br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
lazy_static! {
    /// Shared connections keyed by a string. The lookups in this file build the key as
    /// `<selected config name><formatted thread id>` and use the stored connection for
    /// statements issued while the thread is marked in `TRANS`.
    static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
    /// Per-thread marker keyed by the formatted thread id. `query`/`execute` only test
    /// whether an entry exists for the current thread.
    /// NOTE(review): the `i32` value is presumably a transaction nesting depth — confirm
    /// against the transaction code.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
}
/// PostgreSQL backend handle: connection settings, the query-builder state and the
/// connection pool that statements are executed on.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration.
    pub default: String,
    /// Query-builder state accumulated by the chained builder methods.
    pub params: Params,
    /// Connection pool used for statements that run outside a transaction.
    pub client: Pools,
}
29
30impl Pgsql {
31    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {:?}", e))?;
33
34        let cp_connection = connection.clone();
35        let config = object! {
36            debug: cp_connection.debug,
37            username: cp_connection.username,
38            userpass: cp_connection.userpass,
39            database: cp_connection.database,
40            hostname: cp_connection.hostname,
41            hostport: port,
42            charset: cp_connection.charset.str(),
43        };
44        let mut pgsql = br_pgsql::Pgsql::new(config)?;
45
46        let pools = pgsql.pools()?;
47        Ok(Self {
48            connection,
49            default: default.clone(),
50            params: Params::default("pgsql"),
51            client: pools,
52        })
53    }
54
55    //fn execute_cl(&mut self, _text: &str, _sql: &str) -> (bool, JsonValue) {
56    //    (true, JsonValue::from(""))
57    //}
58    //fn query_cl(rows: Vec<Row>, _sql: &str) -> (bool, JsonValue) {
59    //    let mut list = array![];
60    //    for row in rows.iter() {
61    //        let mut items = object! {};
62    //        for (index, column) in row.columns().iter().enumerate() {
63    //            match column.type_() {
64    //                &Type::BOOL => {
65    //                    let res = row.get::<&str, bool>(column.name());
66    //                    items[column.name()] = res.into();
67    //                }
68    //                &Type::NAME | &Type::VARCHAR | &Type::TEXT => {
69    //                    let res = row.try_get::<&str, &str>(column.name()).unwrap_or("");
70    //                    items[column.name()] = res.into();
71    //                }
72    //                &Type::NUMERIC => {
73    //                    let res = row.try_get::<&str, _>(column.name()).unwrap_or(0.0);
74    //                    items[column.name()] = res.into();
75    //                }
76    //                &Type::FLOAT4 => {
77    //                    //let res = row.try_get::<&str, float32x2x4_t>(column.name()).unwrap_or(float32x2x4_t::from(0.0));
78    //                }
79    //                &Type::INT4 => {
80    //                    let res = row.try_get::<&str, i32>(column.name()).unwrap_or(0);
81    //                    items[column.name()] = res.into();
82    //                }
83    //                _ => {
84    //                    let _res = row.try_get::<usize, Vec<&str>>(index);
85    //                    println!("{:?}", column.type_());
86    //
87    //                    let res = row.try_get::<&str, &str>(column.name()).unwrap_or("");
88    //                    items[column.name()] = res.into();
89    //                    println!("{}", res);
90    //                    warn!("未实现: {:?}", column.type_());
91    //                }
92    //            }
93    //        }
94    //        list.push(items).unwrap();
95    //    }
96    //    (true, list)
97    //}
98
99    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
100        let thread_id = format!("{:?}", thread::current().id());
101        let _key = format!("{}{}", self.default, thread_id);
102
103        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
104            let key = format!("{}{}", self.default, thread_id);
105            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
106            let mut t = db.lock().unwrap();
107            match t.query(sql) {
108                Ok(e) => {
109                    if self.connection.debug {
110                        info!("查询成功: {} {}", thread_id.clone(), sql);
111                    }
112                    (true, e.rows)
113                }
114                Err(e) => {
115                    error!("事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
116                    (false, JsonValue::from(e.to_string()))
117                }
118            }
119        } else {
120            let mut db = match self.client.get_connect() {
121                Ok(c) => c,
122                Err(e) => {
123                    error!("非事务查询失败: get_connect 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
124                    return (false, JsonValue::from(e.to_string()));
125                }
126            };
127            match db.query(sql) {
128                Ok(e) => {
129                    if self.connection.debug {
130                        info!("查询成功: {} {}", thread_id.clone(), sql);
131                    }
132                    (true, e.rows)
133                }
134                Err(e) => {
135                    error!("非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
136                    (false, JsonValue::from(e.to_string()))
137                }
138            }
139        }
140    }
141    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
142        let thread_id = format!("{:?}", thread::current().id());
143
144        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
145            let key = format!("{}{}", self.default, thread_id);
146            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
147            let mut t = db.lock().unwrap();
148            match t.execute(sql) {
149                Ok(e) => {
150                    if self.connection.debug {
151                        info!("提交成功: {} {}", thread_id.clone(), sql);
152                    }
153                    if sql.contains("INSERT") {
154                        (true, e.rows)
155                    } else {
156                        (true, e.affect_count.into())
157                    }
158                }
159                Err(e) => {
160                    error!("事务提交失败: {} {} {}", thread_id, e, sql);
161                    (false, JsonValue::from(e.to_string()))
162                }
163            }
164        } else {
165            let mut db = self.client.get_connect().unwrap();
166            match db.execute(sql) {
167                Ok(e) => {
168                    if self.connection.debug {
169                        info!("提交成功: {} {}", thread_id.clone(), sql);
170                    }
171                    if sql.contains("INSERT") {
172                        (true, e.rows)
173                    } else {
174                        (true, e.affect_count.into())
175                    }
176                }
177                Err(e) => {
178                    error!("非事务提交失败: {} {} {}", thread_id, e, sql);
179                    (false, JsonValue::from(e.to_string()))
180                }
181            }
182        }
183    }
184}
185
186impl DbMode for Pgsql {
187    fn database_tables(&mut self) -> JsonValue {
188        let sql = "SHOW TABLES".to_string();
189        match self.sql(sql.as_str()) {
190            Ok(e) => {
191                let mut list = vec![];
192                for item in e.members() {
193                    for (_, value) in item.entries() {
194                        list.push(value.clone());
195                    }
196                }
197                list.into()
198            }
199            Err(_) => {
200                array![]
201            }
202        }
203    }
204
205    fn database_create(&mut self, name: &str) -> bool {
206        let sql = format!("CREATE DATABASE {}", name);
207
208        let (state, data) = self.execute(sql.as_str());
209        match state {
210            true => data.as_bool().unwrap(),
211            false => {
212                error!("创建数据库失败: {:?}", data);
213                false
214            }
215        }
216    }
217}
218
219impl Mode for Pgsql {
220    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
221        let mut sql = String::new();
222        let mut comments = vec![];
223
224        // 唯一约束
225        if !options.table_unique.is_empty() {
226            let unique = format!(
227                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
228                options.table_unique.join("_"),
229                options.table_name,
230                options.table_unique.join(",")
231            );
232            comments.push(unique);
233        }
234
235        // 唯一索引
236        for row in options.table_index.iter() {
237            let index = format!(
238                "CREATE INDEX {01}_index_{} ON {} ({})",
239                row.join("_"),
240                options.table_name,
241                row.join(",")
242            );
243            comments.push(index);
244        }
245
246        for (name, field) in options.table_fields.entries_mut() {
247            field["table_name"] = options.table_name.clone().into();
248            let row = br_fields::field("pgsql", name, field.clone());
249            let rows = row.split("comment").collect::<Vec<&str>>();
250            comments.push(format!(
251                "COMMENT ON COLUMN {}.{} IS {};",
252                options.table_name, name, rows[1]
253            ));
254            sql = format!("{} {},\r\n", sql, rows[0]);
255        }
256
257        let primary_key = format!(
258            "CONSTRAINT {}_{} PRIMARY KEY ({})",
259            options.table_name, options.table_key, options.table_key
260        );
261        let sql = format!(
262            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
263            options.table_name,
264            sql.trim_end_matches(",\r\n"),
265            primary_key
266        );
267        comments.insert(0, sql);
268
269        for (_name, field) in options.table_fields.entries() {
270            field["mode"].as_str().unwrap();
271            {}
272        }
273
274        if self.params.sql {
275            let info = comments.join("\r\n");
276            return JsonValue::from(info);
277        }
278        for comment in comments {
279            let (state, _) = self.execute(comment.as_str());
280            match state {
281                true => {}
282                false => {
283                    return JsonValue::from(state);
284                }
285            }
286        }
287        JsonValue::from(true)
288    }
289
290    fn table_update(&mut self, options: TableOptions) -> JsonValue {
291        let fields_list = self.table_info(&options.table_name);
292        let mut put = vec![];
293        let mut add = vec![];
294        let mut del = vec![];
295        let mut comments = vec![];
296
297        for (key, _) in fields_list.entries() {
298            if options.table_fields[key].is_empty() {
299                del.push(key);
300            }
301        }
302        for (name, field) in options.table_fields.entries() {
303            if !fields_list[name].is_empty() {
304                let old_comment = fields_list[name]["comment"].to_string();
305                let new_comment = br_fields::field("pgsql", name, field.clone());
306                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
307                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
308                if old_comment == new_comment_text {
309                    continue;
310                }
311                put.push(name);
312            } else {
313                add.push(name);
314            }
315        }
316
317        for name in add.iter() {
318            let name = name.to_string();
319            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
320            let rows = row.split("comment").collect::<Vec<&str>>();
321            comments.push(format!(
322                r#"ALTER TABLE "{}" add {};"#,
323                options.table_name, rows[0]
324            ));
325            comments.push(format!(
326                "COMMENT ON COLUMN {}.{} IS {};",
327                options.table_name, name, rows[1]
328            ));
329        }
330        for name in del.iter() {
331            comments.push(format!(
332                "ALTER TABLE {} DROP {};\r\n",
333                options.table_name, name
334            ));
335        }
336        for name in put.iter() {
337            let name = name.to_string();
338            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
339            let rows = row.split("comment").collect::<Vec<&str>>();
340
341            let sql = rows[0].split(" ").collect::<Vec<&str>>();
342            let text = format!(
343                "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
344                options.table_name, name, sql[1]
345            );
346            comments.push(text.clone());
347            if sql.len() > 3 && !sql[3].is_empty() {
348                let text = format!(
349                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
350                    options.table_name, name, sql[3]
351                );
352                comments.push(text.clone());
353            }
354
355            comments.push(format!(
356                "COMMENT ON COLUMN {}.{} IS {};",
357                options.table_name, name, rows[1]
358            ));
359        }
360
361        let mut unique_new = vec![];
362        let mut index_new = vec![];
363        let mut primary_key = vec![];
364        let (_, index_list) = self.query(
365            format!(
366                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
367                options.table_name
368            ).as_str(),
369        );
370        for item in index_list.members() {
371            let key_name = item["indexname"].as_str().unwrap();
372            let indexdef = item["indexdef"].to_string();
373
374            if indexdef.contains(
375                format!(
376                    "CREATE UNIQUE INDEX {}_{} ON",
377                    options.table_name, options.table_key
378                ).as_str(),
379            ) {
380                primary_key.push(key_name.to_string());
381                continue;
382            }
383            if indexdef.contains("CREATE UNIQUE INDEX") {
384                unique_new.push(key_name.to_string());
385                continue;
386            }
387            if indexdef.contains("CREATE INDEX") {
388                index_new.push(key_name.to_string());
389                continue;
390            }
391        }
392
393        if !options.table_unique.is_empty() {
394            let name = format!(
395                "{}_unique_{}",
396                options.table_name,
397                options.table_unique.join("_")
398            );
399            let unique = format!(
400                "CREATE UNIQUE INDEX {} ON {} ({});",
401                name,
402                options.table_name,
403                options.table_unique.join(",")
404            );
405            if !unique_new.contains(&name) {
406                comments.push(unique);
407            } else {
408                unique_new.retain(|x| *x != name);
409            }
410        }
411
412        // 唯一索引
413        for row in options.table_index.iter() {
414            let name = format!("{}_index_{}", options.table_name, row.join("_"));
415            let index = format!(
416                "CREATE INDEX {} ON {} ({})",
417                name,
418                options.table_name,
419                row.join(",")
420            );
421            if !index_new.contains(&name) {
422                comments.push(index);
423            } else {
424                index_new.retain(|x| *x != name);
425            }
426        }
427
428        for item in unique_new {
429            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
430        }
431        for item in index_new {
432            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
433        }
434
435        if self.params.sql {
436            return JsonValue::from(comments.join(""));
437        }
438
439        if comments.is_empty() {
440            return JsonValue::from(-1);
441        }
442
443        for item in comments.iter() {
444            let (state, res) = self.execute(item.as_str());
445            match state {
446                true => {}
447                false => {
448                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
449                    return JsonValue::from(0);
450                }
451            }
452        }
453        JsonValue::from(1)
454    }
455
456    fn table_info(&mut self, table: &str) -> JsonValue {
457        let sql = format!(
458            "SELECT  COL.COLUMN_NAME,
459    COL.DATA_TYPE,
460    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
461    LEFT JOIN
462    pg_catalog.pg_description DESCRIPTION
463    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
464    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{}'",
465            table
466        );
467        let (state, data) = self.query(sql.as_str());
468        let mut list = object! {};
469        if state {
470            for item in data.members() {
471                let mut row = object! {};
472                row["field"] = item["column_name"].clone();
473                row["comment"] = item["comment"].clone();
474                row["type"] = item["data_type"].clone();
475                list[row["field"].as_str().unwrap()] = row.clone();
476            }
477            list
478        } else {
479            list
480        }
481    }
482
483    fn table_is_exist(&mut self, name: &str) -> bool {
484        let sql = format!(
485            "SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{}')",
486            name
487        );
488        let (state, data) = self.query(sql.as_str());
489        match state {
490            true => {
491                for item in data.members() {
492                    if item.has_key("exists") {
493                        return item["exists"].as_bool().unwrap();
494                    }
495                }
496                false
497            }
498            false => false,
499        }
500    }
501
502    fn table(&mut self, name: &str) -> &mut Pgsql {
503        self.params = Params::default(self.connection.mode.str().as_str());
504        self.params.table = format!("{}{}", self.connection.prefix, name);
505        self.params.join_table = self.params.table.clone();
506        self
507    }
508
509    fn change_table(&mut self, name: &str) -> &mut Self {
510        self.params.join_table = name.to_string();
511        self
512    }
513
514    fn autoinc(&mut self) -> &mut Self {
515        self.params.autoinc = true;
516        self
517    }
518
519    fn fetch_sql(&mut self) -> &mut Self {
520        self.params.sql = true;
521        self
522    }
523
524    fn order(&mut self, field: &str, by: bool) -> &mut Self {
525        self.params.order[field] = {
526            if by {
527                "DESC"
528            } else {
529                "ASC"
530            }
531        }.into();
532        self
533    }
534
535    fn group(&mut self, field: &str) -> &mut Self {
536        let fields: Vec<&str> = field.split(",").collect();
537        for field in fields.iter() {
538            let field = field.to_string();
539            self.params.group[field.as_str()] = field.clone().into();
540            self.params.fields[field.as_str()] = field.clone().into();
541        }
542        self
543    }
544
545    fn distinct(&mut self) -> &mut Self {
546        self.params.distinct = true;
547        self
548    }
549
550    fn json(&mut self, field: &str) -> &mut Self {
551        let list: Vec<&str> = field.split(",").collect();
552        for item in list.iter() {
553            self.params.json[item.to_string().as_str()] = item.to_string().into();
554        }
555        self
556    }
557
558    fn field(&mut self, field: &str) -> &mut Self {
559        let list: Vec<&str> = field.split(",").collect();
560        let join_table = if self.params.join_table.is_empty() {
561            self.params.table.clone()
562        } else {
563            self.params.join_table.clone()
564        };
565        for item in list.iter() {
566            if item.contains(" as ") {
567                let text = item.split(" as ").collect::<Vec<&str>>();
568                if text[0].contains("count(") {
569                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
570                } else {
571                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
572                }
573            } else {
574                self.params.fields[item.to_string().as_str()] = format!("{}.{}", join_table, item).into();
575            }
576        }
577        self
578    }
579
580    fn hidden(&mut self, name: &str) -> &mut Self {
581        let hidden: Vec<&str> = name.split(",").collect();
582
583        let fields_list = self.table_info(self.params.clone().table.as_str());
584        let mut data = array![];
585        for item in fields_list.members() {
586            data.push(object! {
587                "name":item["field"].as_str().unwrap()
588            }).unwrap();
589        }
590
591        for item in data.members() {
592            let name = item["name"].as_str().unwrap();
593            if !hidden.contains(&name) {
594                self.params.fields[name] = name.into();
595            }
596        }
597        self
598    }
599
600    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
601        let join_table = if self.params.join_table.is_empty() {
602            self.params.table.clone()
603        } else {
604            self.params.join_table.clone()
605        };
606        if value.is_boolean() {
607            if value.as_bool().unwrap() {
608                value = 1.into();
609            } else {
610                value = 0.into();
611            }
612        }
613        match compare {
614            "between" => {
615                self.params.where_and.push(format!(
616                    "{}.{} between '{}' AND '{}'",
617                    join_table, field, value[0], value[1]
618                ));
619            }
620            "set" => {
621                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
622                let mut wheredata = vec![];
623                for item in list.iter() {
624                    wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
625                }
626                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
627            }
628            "notin" => {
629                let mut text = String::new();
630                for item in value.members() {
631                    text = format!("{},'{}'", text, item);
632                }
633                text = text.trim_start_matches(",").into();
634                self.params.where_and.push(format!("{}.{} not in ({})", join_table, field, text));
635            }
636            "is" => {
637                self.params.where_and.push(format!("{}.{} is {}", join_table, field, value));
638            }
639            "notlike" => {
640                self.params.where_and.push(format!("{}.{} not like '{}'", join_table, field, value));
641            }
642            "in" => {
643                let mut text = String::new();
644                if value.is_array() {
645                    for item in value.members() {
646                        text = format!("{},'{}'", text, item);
647                    }
648                } else if value.is_null() {
649                    text = format!("{},null", text);
650                } else {
651                    let value = value.as_str().unwrap();
652
653                    let value: Vec<&str> = value.split(",").collect();
654                    for item in value.iter() {
655                        text = format!("{},'{}'", text, item);
656                    }
657                }
658                text = text.trim_start_matches(",").into();
659
660                self.params.where_and.push(format!("{}.{} {} ({})", join_table, field, compare, text));
661            }
662            _ => {
663                self.params.where_and.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
664            }
665        }
666        self
667    }
668
669    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
670        let join_table = if self.params.join_table.is_empty() {
671            self.params.table.clone()
672        } else {
673            self.params.join_table.clone()
674        };
675
676        if value.is_boolean() {
677            if value.as_bool().unwrap() {
678                value = 1.into();
679            } else {
680                value = 0.into();
681            }
682        }
683
684        match compare {
685            "between" => {
686                self.params.where_or.push(format!(
687                    "{}.{} between '{}' AND '{}'",
688                    join_table, field, value[0], value[1]
689                ));
690            }
691            "set" => {
692                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
693                let mut wheredata = vec![];
694                for item in list.iter() {
695                    wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
696                }
697                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
698            }
699            "notin" => {
700                let mut text = String::new();
701                for item in value.members() {
702                    text = format!("{},'{}'", text, item);
703                }
704                text = text.trim_start_matches(",").into();
705                self.params.where_or.push(format!("{}.{} not in ({})", join_table, field, text));
706            }
707            "in" => {
708                let mut text = String::new();
709                if value.is_array() {
710                    for item in value.members() {
711                        text = format!("{},'{}'", text, item);
712                    }
713                } else {
714                    let value = value.as_str().unwrap();
715                    let value: Vec<&str> = value.split(",").collect();
716                    for item in value.iter() {
717                        text = format!("{},'{}'", text, item);
718                    }
719                }
720                text = text.trim_start_matches(",").into();
721                self.params.where_or.push(format!("{}.{} {} ({})", join_table, field, compare, text));
722            }
723            _ => {
724                self.params.where_or.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
725            }
726        }
727        self
728    }
729
730    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
731        self.params.where_column = format!(
732            "{}.{} {} {}.{}",
733            self.params.table, field_a, compare, self.params.table, field_b
734        );
735        self
736    }
737
738    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
739        self.params.page = page;
740        self.params.limit = limit;
741        self
742    }
743
744    fn column(&mut self, field: &str) -> JsonValue {
745        self.field(field);
746        let sql = self.params.select_sql();
747
748        if self.params.sql {
749            return JsonValue::from(sql);
750        }
751        let (state, data) = self.query(sql.as_str());
752        match state {
753            true => {
754                let mut list = array![];
755                for item in data.members() {
756                    if self.params.json[field].is_empty() {
757                        list.push(item[field].clone()).unwrap();
758                    } else {
759                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
760                        list.push(data).unwrap();
761                    }
762                }
763                list
764            }
765            false => {
766                array![]
767            }
768        }
769    }
770
771    fn count(&mut self) -> JsonValue {
772        self.params.fields["count"] = "count(*) as count".to_string().into();
773        let sql = self.params.select_sql();
774        if self.params.sql {
775            return JsonValue::from(sql.clone());
776        }
777        let (state, data) = self.query(sql.as_str());
778        if state {
779            data[0]["count"].clone()
780        } else {
781            JsonValue::from(0)
782        }
783    }
784
785    fn max(&mut self, field: &str) -> JsonValue {
786        self.params.fields[field] = format!("max({00}) as {00}", field).into();
787        let sql = self.params.select_sql();
788        if self.params.sql {
789            return JsonValue::from(sql.clone());
790        }
791        let (state, data) = self.query(sql.as_str());
792        if state {
793            if data.len() > 1 {
794                return data.clone();
795            }
796            data[0][field].clone()
797        } else {
798            JsonValue::from(0)
799        }
800    }
801
802    fn min(&mut self, field: &str) -> JsonValue {
803        self.params.fields[field] = format!("min({00}) as {00}", field).into();
804        let sql = self.params.select_sql();
805        if self.params.sql {
806            return JsonValue::from(sql.clone());
807        }
808        let (state, data) = self.query(sql.as_str());
809        if state {
810            if data.len() > 1 {
811                return data;
812            }
813            data[0][field].clone()
814        } else {
815            JsonValue::from(0)
816        }
817    }
818
819    fn sum(&mut self, field: &str) -> JsonValue {
820        self.params.fields[field] = format!("sum({00}) as {00}", field).into();
821        let sql = self.params.select_sql();
822        if self.params.sql {
823            return JsonValue::from(sql.clone());
824        }
825        let (state, data) = self.query(sql.as_str());
826        match state {
827            true => {
828                if data.len() > 1 {
829                    return data;
830                }
831                data[0][field].clone()
832            }
833            false => JsonValue::from(0),
834        }
835    }
836
837    fn avg(&mut self, field: &str) -> JsonValue {
838        self.params.fields[field] = format!("avg({00}) as {00}", field).into();
839        let sql = self.params.select_sql();
840        if self.params.sql {
841            return JsonValue::from(sql.clone());
842        }
843        let (state, data) = self.query(sql.as_str());
844        if state {
845            if data.len() > 1 {
846                return data;
847            }
848            data[0][field].clone()
849        } else {
850            JsonValue::from(0)
851        }
852    }
853
854    fn select(&mut self) -> JsonValue {
855        let sql = self.params.select_sql();
856        if self.params.sql {
857            return JsonValue::from(sql.clone());
858        }
859        let (state, mut data) = self.query(sql.as_str());
860        match state {
861            true => {
862                for (field, _) in self.params.json.entries() {
863                    for item in data.members_mut() {
864                        if !item[field].is_empty() {
865                            let json = item[field].to_string();
866                            item[field] = match json::parse(&json) {
867                                Ok(e) => e,
868                                Err(_) => JsonValue::from(json),
869                            };
870                        }
871                    }
872                }
873                data.clone()
874            }
875            false => array![],
876        }
877    }
878
879    fn find(&mut self) -> JsonValue {
880        self.params.page = 1;
881        self.params.limit = 1;
882        let sql = self.params.select_sql();
883        if self.params.sql {
884            return JsonValue::from(sql.clone());
885        }
886        let (state, mut data) = self.query(sql.as_str());
887        match state {
888            true => {
889                if data.is_empty() {
890                    return object! {};
891                }
892                for (field, _) in self.params.json.entries() {
893                    if !data[0][field].is_empty() {
894                        let json = data[0][field].to_string();
895                        let json = json::parse(&json).unwrap_or(array![]);
896                        data[0][field] = json;
897                    } else {
898                        data[0][field] = array![];
899                    }
900                }
901                data[0].clone()
902            }
903            false => {
904                object! {}
905            }
906        }
907    }
908
909    fn value(&mut self, field: &str) -> JsonValue {
910        self.params.fields = object! {};
911        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
912        self.params.page = 1;
913        self.params.limit = 1;
914        let sql = self.params.select_sql();
915        if self.params.sql {
916            return JsonValue::from(sql.clone());
917        }
918        let (state, mut data) = self.query(sql.as_str());
919        match state {
920            true => {
921                for (field, _) in self.params.json.entries() {
922                    if !data[0][field].is_empty() {
923                        let json = data[0][field].to_string();
924                        let json = json::parse(&json).unwrap_or(array![]);
925                        data[0][field] = json;
926                    } else {
927                        data[0][field] = array![];
928                    }
929                }
930                data[0][field].clone()
931            }
932            false => {
933                if self.connection.debug {
934                    info!("{:?}", data);
935                }
936                JsonValue::Null
937            }
938        }
939    }
940
941    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
942        let mut fields = vec![];
943        let mut values = vec![];
944        if !self.params.autoinc && data["id"].is_empty() {
945            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
946        }
947        for (field, value) in data.entries() {
948            fields.push(field);
949
950            if value.is_string() {
951                values.push(format!("'{}'", value.to_string().replace("'", "''")));
952                continue;
953            } else if value.is_array() {
954                if self.params.json[field].is_empty() {
955                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
956                    values.push(format!("'{}'", array));
957                } else {
958                    let json = value.to_string();
959                    let json = json.replace("'", "''");
960                    values.push(format!("'{}'", json));
961                }
962                continue;
963            } else if value.is_object() {
964                if self.params.json[field].is_empty() {
965                    values.push(format!("'{}'", value));
966                } else {
967                    let json = value.to_string();
968                    let json = json.replace("'", "''");
969                    values.push(format!("'{}'", json));
970                }
971                continue;
972            } else if value.is_number() || value.is_boolean() || value.is_null() {
973                values.push(format!("{}", value));
974                continue;
975            } else {
976                values.push(format!("'{}'", value));
977                continue;
978            }
979        }
980        let fields = fields.join(",");
981        let values = values.join(",");
982
983        let sql = format!(
984            "INSERT INTO {} ({}) VALUES ({});",
985            self.params.table, fields, values
986        );
987        if self.params.sql {
988            return JsonValue::from(sql.clone());
989        }
990        let (state, ids) = self.execute(sql.as_str());
991
992        match state {
993            true => match self.params.autoinc {
994                true => ids.clone(),
995                false => data["id"].clone(),
996            },
997            false => {
998                let thread_id = format!("{:?}", thread::current().id());
999                error!("添加失败: {} {:?} {}", thread_id, ids, sql);
1000                JsonValue::from("")
1001            }
1002        }
1003    }
1004    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1005        let mut fields = String::new();
1006        if !self.params.autoinc && data[0]["id"].is_empty() {
1007            data[0]["id"] = "".into();
1008        }
1009        for (field, _) in data[0].entries() {
1010            fields = format!("{},{}", fields, field);
1011        }
1012        fields = fields.trim_start_matches(",").parse().unwrap();
1013
1014        let core_count = num_cpus::get();
1015        let mut p = pools::Pool::new(core_count * 4);
1016
1017        let mut rng = rng();
1018        let i: i32 = rng.random_range(1..=100);
1019        let autoinc = self.params.autoinc;
1020        for list in data.members() {
1021            let mut item = list.clone();
1022            p.execute(move |pcindex| {
1023                if !autoinc && item["id"].is_empty() {
1024                    let id = format!(
1025                        "{:X}{:X}{}",
1026                        Local::now().timestamp_nanos_opt().unwrap(),
1027                        pcindex,
1028                        i
1029                    );
1030                    item["id"] = id.into();
1031                }
1032                let mut values = "".to_string();
1033                for (_, value) in item.entries() {
1034                    if value.is_string() {
1035                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1036                    } else if value.is_number() {
1037                        values = format!("{},{}", values, value);
1038                    } else if value.is_boolean() {
1039                        values = format!("{},{}", values, value);
1040                        continue;
1041                    } else {
1042                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1043                    }
1044                }
1045                values = format!("({})", values.trim_start_matches(","));
1046                array![item["id"].clone(), values]
1047            });
1048        }
1049        let (ids_list, mut values) = p.insert_all();
1050        values = values.trim_start_matches(",").parse().unwrap();
1051        let sql = format!(
1052            "INSERT INTO {} ({}) VALUES {};",
1053            self.params.table, fields, values
1054        );
1055
1056        if self.params.sql {
1057            return JsonValue::from(sql.clone());
1058        }
1059        let (state, data) = self.execute(sql.as_str());
1060        match state {
1061            true => match autoinc {
1062                true => data,
1063                false => JsonValue::from(ids_list),
1064            },
1065            false => {
1066                error!("insert_all: {:?}", data);
1067                array![]
1068            }
1069        }
1070    }
1071    fn update(&mut self, data: JsonValue) -> JsonValue {
1072        let mut values = vec![];
1073        for (field, value) in data.entries() {
1074            if value.is_string() {
1075                values.push(format!(
1076                    "{}='{}'",
1077                    field,
1078                    value.to_string().replace("'", "''")
1079                ));
1080            } else if value.is_number() {
1081                values.push(format!("{}= {}", field, value));
1082            } else if value.is_array() {
1083                if self.params.json[field].is_empty() {
1084                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1085                    values.push(format!("{}='{}'", field, array));
1086                } else {
1087                    let json = value.to_string();
1088                    let json = json.replace("'", "''");
1089                    values.push(format!("{}='{}'", field, json));
1090                }
1091                continue;
1092            } else if value.is_object() {
1093                if self.params.json[field].is_empty() {
1094                    values.push(format!("{}='{}'", field, value));
1095                } else {
1096                    if value.is_empty() {
1097                        values.push(format!("{}=''", field));
1098                        continue;
1099                    }
1100                    let json = value.to_string();
1101                    let json = json.replace("'", "''");
1102                    values.push(format!("{}='{}'", field, json));
1103                }
1104                continue;
1105            } else if value.is_boolean() {
1106                values.push(format!("{}= {}", field, value));
1107            } else {
1108                values.push(format!("{}=\"{}\"", field, value));
1109            }
1110        }
1111
1112        for (field, value) in self.params.inc_dec.entries() {
1113            values.push(format!("{} = {}", field, value.to_string().clone()));
1114        }
1115
1116        let values = values.join(",");
1117
1118        let sql = format!(
1119            "UPDATE {} SET {} {};",
1120            self.params.table.clone(),
1121            values,
1122            self.params.where_sql()
1123        );
1124        if self.params.sql {
1125            return JsonValue::from(sql.clone());
1126        }
1127        let (state, data) = self.execute(sql.as_str());
1128        if state {
1129            data
1130        } else {
1131            let thread_id = format!("{:?}", thread::current().id());
1132            error!("update: {} {:?} {}", thread_id, data, sql);
1133            0.into()
1134        }
1135    }
1136    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1137        let mut values = vec![];
1138
1139        let mut ids = vec![];
1140        for (field, _) in data[0].entries() {
1141            if field == "id" {
1142                continue;
1143            }
1144            let mut fields = vec![];
1145            for row in data.members() {
1146                let value = row[field].clone();
1147                let id = row["id"].clone();
1148                ids.push(id.clone());
1149                if value.is_string() {
1150                    fields.push(format!(
1151                        "WHEN '{}' THEN '{}'",
1152                        id,
1153                        value.to_string().replace("'", "''")
1154                    ));
1155                } else if value.is_array() || value.is_object() {
1156                    if self.params.json[field].is_empty() {
1157                        fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1158                    } else {
1159                        let json = value.to_string();
1160                        let json = json.replace("'", "''");
1161                        fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1162                    }
1163                    continue;
1164                } else if value.is_number() || value.is_boolean() || value.is_null() {
1165                    fields.push(format!("WHEN '{}' THEN {}", id, value));
1166                } else {
1167                    fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1168                }
1169            }
1170            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1171        }
1172        self.where_and("id", "in", ids.into());
1173        for (field, value) in self.params.inc_dec.entries() {
1174            values.push(format!("{} = {}", field, value.to_string().clone()));
1175        }
1176
1177        let values = values.join(",");
1178        let sql = format!(
1179            "UPDATE {} SET {} {} {};",
1180            self.params.table.clone(),
1181            values,
1182            self.params.where_sql(),
1183            self.params.page_limit_sql()
1184        );
1185        if self.params.sql {
1186            return JsonValue::from(sql.clone());
1187        }
1188        let (state, data) = self.execute(sql.as_str());
1189        if state {
1190            data
1191        } else {
1192            error!("update_all: {:?}", data);
1193            JsonValue::from(0)
1194        }
1195    }
1196
1197    fn delete(&mut self) -> JsonValue {
1198        let sql = format!(
1199            "delete FROM {} {} {};",
1200            self.params.table.clone(),
1201            self.params.where_sql(),
1202            self.params.page_limit_sql()
1203        );
1204        if self.params.sql {
1205            return JsonValue::from(sql.clone());
1206        }
1207        let (state, data) = self.execute(sql.as_str());
1208        match state {
1209            true => data,
1210            false => {
1211                error!("delete 失败>>> {:?}", data);
1212                JsonValue::from(0)
1213            }
1214        }
1215    }
1216
1217    fn transaction(&mut self) -> bool {
1218        let thread_id = format!("{:?}", thread::current().id());
1219
1220        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1221            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1222            t += 1;
1223            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1224            return true;
1225        }
1226
1227        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1228        let key = format!("{}{}", self.default, thread_id);
1229        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(self.client.get_connect().unwrap())));
1230
1231        let sql = "START TRANSACTION;".to_string();
1232        let (state, _) = self.execute(sql.as_str());
1233        match state {
1234            true => {
1235                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1236                let (state, _) = self.execute(sql.as_str());
1237                match state {
1238                    true => state,
1239                    false => {
1240                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1241                        TR.lock().unwrap().remove(&key.clone());
1242                        state
1243                    }
1244                }
1245            }
1246            false => {
1247                TRANS.lock().unwrap().remove(&*thread_id.clone());
1248                TR.lock().unwrap().remove(&key.clone());
1249                state
1250            }
1251        }
1252    }
1253    fn commit(&mut self) -> bool {
1254        let thread_id = format!("{:?}", thread::current().id());
1255        let sql = "COMMIT".to_string();
1256
1257        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1258        if t > 1 {
1259            t -= 1;
1260            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1261            return true;
1262        }
1263        let (state, data) = self.execute(sql.as_str());
1264        TRANS.lock().unwrap().remove(&thread_id);
1265        let key = format!("{}{}", self.default, thread_id);
1266        TR.lock().unwrap().remove(&*key);
1267        match state {
1268            true => {}
1269            false => {
1270                error!("提交事务失败: {}", data);
1271            }
1272        }
1273        state
1274    }
1275
1276    fn rollback(&mut self) -> bool {
1277        let thread_id = format!("{:?}", thread::current().id());
1278        let sql = "ROLLBACK".to_string();
1279
1280        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1281        if t > 1 {
1282            t -= 1;
1283            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1284            return true;
1285        }
1286        let (state, data) = self.execute(sql.as_str());
1287        TRANS.lock().unwrap().remove(&thread_id);
1288        let key = format!("{}{}", self.default, thread_id);
1289        TR.lock().unwrap().remove(&*key);
1290        match state {
1291            true => {}
1292            false => {
1293                error!("回滚失败: {}", data);
1294            }
1295        }
1296        state
1297    }
1298
1299    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1300        let (state, data) = self.query(sql);
1301        match state {
1302            true => Ok(data),
1303            false => Err(data.to_string()),
1304        }
1305    }
1306
1307    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1308        let (state, data) = self.execute(sql);
1309        match state {
1310            true => Ok(data),
1311            false => Err(data.to_string()),
1312        }
1313    }
1314
1315    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1316        self.params.inc_dec[field] = format!("{} + {}", field, num).into();
1317        self
1318    }
1319
1320    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1321        self.params.inc_dec[field] = format!("{} - {}", field, num).into();
1322        self
1323    }
1324    fn buildsql(&mut self) -> String {
1325        self.fetch_sql();
1326        let sql = self.select().to_string();
1327        format!("( {} ) {}", sql, self.params.table)
1328    }
1329
1330    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1331        let main_fields = if main_fields.is_empty() {
1332            "id"
1333        } else {
1334            main_fields
1335        };
1336        let right_fields = if right_fields.is_empty() {
1337            self.params.table.clone()
1338        } else {
1339            right_fields.to_string().clone()
1340        };
1341        self.params.join_table = table.to_string();
1342        self.params.join.push(format!(
1343            " LEFT JOIN {} ON {}.{} = {}.{}",
1344            table, self.params.table, main_fields, table, right_fields
1345        ));
1346        self
1347    }
1348
1349    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1350        let main_fields = if main_fields.is_empty() {
1351            "id"
1352        } else {
1353            main_fields
1354        };
1355        let second_fields = if second_fields.is_empty() {
1356            self.params.table.clone()
1357        } else {
1358            second_fields.to_string().clone()
1359        };
1360        let sec_table_name = format!("{}{}", table, "_2");
1361        let second_table = format!("{} {}", table, sec_table_name.clone());
1362        self.params.join_table = sec_table_name.clone();
1363        self.params.join.push(format!(
1364            " INNER JOIN {} ON {}.{} = {}.{}",
1365            second_table, self.params.table, main_fields, sec_table_name, second_fields
1366        ));
1367        self
1368    }
1369}