br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
16lazy_static! {
17    static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
18    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
19}
20#[derive(Clone)]
21pub struct Pgsql {
22    /// 当前连接配置
23    pub connection: Connection,
24    /// 当前选中配置
25    pub default: String,
26    pub params: Params,
27    pub client: Pools,
28}
29
30impl Pgsql {
31    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {:?}", e))?;
33
34        let cp_connection = connection.clone();
35        let config = object! {
36            debug: cp_connection.debug,
37            username: cp_connection.username,
38            userpass: cp_connection.userpass,
39            database: cp_connection.database,
40            hostname: cp_connection.hostname,
41            hostport: port,
42            charset: cp_connection.charset.str(),
43        };
44        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
45
46        let pools = pgsql.pools()?;
47        Ok(Self {
48            connection,
49            default: default.clone(),
50            params: Params::default("pgsql"),
51            client: pools,
52        })
53    }
54    
55    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
56        let thread_id = format!("{:?}", thread::current().id());
57        let _key = format!("{}{}", self.default, thread_id);
58        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
59            let key = format!("{}{}", self.default, thread_id);
60            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
61            let mut t = db.lock().unwrap();
62            match t.query(sql) {
63                Ok(e) => {
64                    if self.connection.debug {
65                        info!("查询成功: {} {}", thread_id.clone(), sql);
66                    }
67                    (true, e.rows)
68                }
69                Err(e) => {
70                    error!("事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
71                    (false, JsonValue::from(e.to_string()))
72                }
73            }
74        } else {
75            let mut db = match self.client.get_connect() {
76                Ok(c) => c,
77                Err(e) => {
78                    error!("非事务查询失败: get_connect 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
79                    return (false, JsonValue::from(e.to_string()));
80                }
81            };
82            match db.query(sql) {
83                Ok(e) => {
84                    if self.connection.debug {
85                        info!("查询成功: {} {}", thread_id.clone(), sql);
86                    }
87                    (true, e.rows)
88                }
89                Err(e) => {
90                    error!("非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}]",thread_id,e,sql);
91                    (false, JsonValue::from(e.to_string()))
92                }
93            }
94        }
95    }
96    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
97        let thread_id = format!("{:?}", thread::current().id());
98
99        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
100            let key = format!("{}{}", self.default, thread_id);
101            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
102            let mut t = db.lock().unwrap();
103            match t.execute(sql) {
104                Ok(e) => {
105                    if self.connection.debug {
106                        info!("提交成功: {} {}", thread_id.clone(), sql);
107                    }
108                    if sql.contains("INSERT") {
109                        (true, e.rows)
110                    } else {
111                        (true, e.affect_count.into())
112                    }
113                }
114                Err(e) => {
115                    error!("事务提交失败: {} {} {}", thread_id, e, sql);
116                    (false, JsonValue::from(e.to_string()))
117                }
118            }
119        } else {
120            let mut db = self.client.get_connect().unwrap();
121            match db.execute(sql) {
122                Ok(e) => {
123                    if self.connection.debug {
124                        info!("提交成功: {} {}", thread_id.clone(), sql);
125                    }
126                    if sql.contains("INSERT") {
127                        (true, e.rows)
128                    } else {
129                        (true, e.affect_count.into())
130                    }
131                }
132                Err(e) => {
133                    error!("非事务提交失败: {} {} {}", thread_id, e, sql);
134                    (false, JsonValue::from(e.to_string()))
135                }
136            }
137        }
138    }
139}
140
141impl DbMode for Pgsql {
142    fn database_tables(&mut self) -> JsonValue {
143        let sql = "SHOW TABLES".to_string();
144        match self.sql(sql.as_str()) {
145            Ok(e) => {
146                let mut list = vec![];
147                for item in e.members() {
148                    for (_, value) in item.entries() {
149                        list.push(value.clone());
150                    }
151                }
152                list.into()
153            }
154            Err(_) => {
155                array![]
156            }
157        }
158    }
159
160    fn database_create(&mut self, name: &str) -> bool {
161        let sql = format!("CREATE DATABASE {}", name);
162
163        let (state, data) = self.execute(sql.as_str());
164        match state {
165            true => data.as_bool().unwrap(),
166            false => {
167                error!("创建数据库失败: {:?}", data);
168                false
169            }
170        }
171    }
172}
173
174impl Mode for Pgsql {
175    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
176        let mut sql = String::new();
177        let mut comments = vec![];
178
179        // 唯一约束
180        if !options.table_unique.is_empty() {
181            let unique = format!(
182                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
183                options.table_unique.join("_"),
184                options.table_name,
185                options.table_unique.join(",")
186            );
187            comments.push(unique);
188        }
189
190        // 唯一索引
191        for row in options.table_index.iter() {
192            let index = format!(
193                "CREATE INDEX {01}_index_{} ON {} ({})",
194                row.join("_"),
195                options.table_name,
196                row.join(",")
197            );
198            comments.push(index);
199        }
200
201        for (name, field) in options.table_fields.entries_mut() {
202            field["table_name"] = options.table_name.clone().into();
203            let row = br_fields::field("pgsql", name, field.clone());
204            let rows = row.split("comment").collect::<Vec<&str>>();
205            comments.push(format!(
206                "COMMENT ON COLUMN {}.{} IS {};",
207                options.table_name, name, rows[1]
208            ));
209            sql = format!("{} {},\r\n", sql, rows[0]);
210        }
211
212        let primary_key = format!(
213            "CONSTRAINT {}_{} PRIMARY KEY ({})",
214            options.table_name, options.table_key, options.table_key
215        );
216        let sql = format!(
217            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
218            options.table_name,
219            sql.trim_end_matches(",\r\n"),
220            primary_key
221        );
222        comments.insert(0, sql);
223
224        for (_name, field) in options.table_fields.entries() {
225            field["mode"].as_str().unwrap();
226            {}
227        }
228
229        if self.params.sql {
230            let info = comments.join("\r\n");
231            return JsonValue::from(info);
232        }
233        for comment in comments {
234            let (state, _) = self.execute(comment.as_str());
235            match state {
236                true => {}
237                false => {
238                    return JsonValue::from(state);
239                }
240            }
241        }
242        JsonValue::from(true)
243    }
244
245    fn table_update(&mut self, options: TableOptions) -> JsonValue {
246        let fields_list = self.table_info(&options.table_name);
247        let mut put = vec![];
248        let mut add = vec![];
249        let mut del = vec![];
250        let mut comments = vec![];
251
252        for (key, _) in fields_list.entries() {
253            if options.table_fields[key].is_empty() {
254                del.push(key);
255            }
256        }
257        for (name, field) in options.table_fields.entries() {
258            if !fields_list[name].is_empty() {
259                let old_comment = fields_list[name]["comment"].to_string();
260                let new_comment = br_fields::field("pgsql", name, field.clone());
261                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
262                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
263                if old_comment == new_comment_text {
264                    continue;
265                }
266                put.push(name);
267            } else {
268                add.push(name);
269            }
270        }
271
272        for name in add.iter() {
273            let name = name.to_string();
274            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
275            let rows = row.split("comment").collect::<Vec<&str>>();
276            comments.push(format!(
277                r#"ALTER TABLE "{}" add {};"#,
278                options.table_name, rows[0]
279            ));
280            comments.push(format!(
281                "COMMENT ON COLUMN {}.{} IS {};",
282                options.table_name, name, rows[1]
283            ));
284        }
285        for name in del.iter() {
286            comments.push(format!(
287                "ALTER TABLE {} DROP {};\r\n",
288                options.table_name, name
289            ));
290        }
291        for name in put.iter() {
292            let name = name.to_string();
293            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
294            let rows = row.split("comment").collect::<Vec<&str>>();
295
296            let sql = rows[0].split(" ").collect::<Vec<&str>>();
297            
298            if sql[1].contains("BOOLEAN") {
299                let text =     format!(
300                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
301                    options.table_name, name
302                );
303                comments.push(text.clone());
304                let text =     format!(
305                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
306                    options.table_name, name, sql[1]
307                );
308                comments.push(text.clone());
309            } else {
310                let text =  format!(
311                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
312                    options.table_name, name, sql[1]
313                );
314                comments.push(text.clone());
315            };
316            
317            if sql.len() > 3 && !sql[3].is_empty() {
318                let text = format!(
319                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
320                    options.table_name, name, sql[3]
321                );
322                comments.push(text.clone());
323            }
324
325            comments.push(format!(
326                "COMMENT ON COLUMN {}.{} IS {};",
327                options.table_name, name, rows[1]
328            ));
329        }
330
331        let mut unique_new = vec![];
332        let mut index_new = vec![];
333        let mut primary_key = vec![];
334        let (_, index_list) = self.query(
335            format!(
336                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
337                options.table_name
338            ).as_str(),
339        );
340        for item in index_list.members() {
341            let key_name = item["indexname"].as_str().unwrap();
342            let indexdef = item["indexdef"].to_string();
343
344            if indexdef.contains(
345                format!(
346                    "CREATE UNIQUE INDEX {}_{} ON",
347                    options.table_name, options.table_key
348                ).as_str(),
349            ) {
350                primary_key.push(key_name.to_string());
351                continue;
352            }
353            if indexdef.contains("CREATE UNIQUE INDEX") {
354                unique_new.push(key_name.to_string());
355                continue;
356            }
357            if indexdef.contains("CREATE INDEX") {
358                index_new.push(key_name.to_string());
359                continue;
360            }
361        }
362
363        if !options.table_unique.is_empty() {
364            let name = format!(
365                "{}_unique_{}",
366                options.table_name,
367                options.table_unique.join("_")
368            );
369            let unique = format!(
370                "CREATE UNIQUE INDEX {} ON {} ({});",
371                name,
372                options.table_name,
373                options.table_unique.join(",")
374            );
375            if !unique_new.contains(&name) {
376                comments.push(unique);
377            } else {
378                unique_new.retain(|x| *x != name);
379            }
380        }
381
382        // 唯一索引
383        for row in options.table_index.iter() {
384            let name = format!("{}_index_{}", options.table_name, row.join("_"));
385            let index = format!(
386                "CREATE INDEX {} ON {} ({})",
387                name,
388                options.table_name,
389                row.join(",")
390            );
391            if !index_new.contains(&name) {
392                comments.push(index);
393            } else {
394                index_new.retain(|x| *x != name);
395            }
396        }
397
398        for item in unique_new {
399            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
400        }
401        for item in index_new {
402            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
403        }
404
405        if self.params.sql {
406            return JsonValue::from(comments.join(""));
407        }
408
409        if comments.is_empty() {
410            return JsonValue::from(-1);
411        }
412
413        for item in comments.iter() {
414            let (state, res) = self.execute(item.as_str());
415            match state {
416                true => {}
417                false => {
418                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
419                    return JsonValue::from(0);
420                }
421            }
422        }
423        JsonValue::from(1)
424    }
425
426    fn table_info(&mut self, table: &str) -> JsonValue {
427        let sql = format!(
428            "SELECT  COL.COLUMN_NAME,
429    COL.DATA_TYPE,
430    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
431    LEFT JOIN
432    pg_catalog.pg_description DESCRIPTION
433    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
434    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{}'",
435            table
436        );
437        let (state, data) = self.query(sql.as_str());
438        let mut list = object! {};
439        if state {
440            for item in data.members() {
441                let mut row = object! {};
442                row["field"] = item["column_name"].clone();
443                row["comment"] = item["comment"].clone();
444                row["type"] = item["data_type"].clone();
445                list[row["field"].as_str().unwrap()] = row.clone();
446            }
447            list
448        } else {
449            list
450        }
451    }
452
453    fn table_is_exist(&mut self, name: &str) -> bool {
454        let sql = format!(
455            "SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{}')",
456            name
457        );
458        let (state, data) = self.query(sql.as_str());
459        match state {
460            true => {
461                for item in data.members() {
462                    if item.has_key("exists") {
463                        return item["exists"].as_bool().unwrap();
464                    }
465                }
466                false
467            }
468            false => false,
469        }
470    }
471
472    fn table(&mut self, name: &str) -> &mut Pgsql {
473        self.params = Params::default(self.connection.mode.str().as_str());
474        self.params.table = format!("{}{}", self.connection.prefix, name);
475        self.params.join_table = self.params.table.clone();
476        self
477    }
478
479    fn change_table(&mut self, name: &str) -> &mut Self {
480        self.params.join_table = name.to_string();
481        self
482    }
483
484    fn autoinc(&mut self) -> &mut Self {
485        self.params.autoinc = true;
486        self
487    }
488
489    fn fetch_sql(&mut self) -> &mut Self {
490        self.params.sql = true;
491        self
492    }
493
494    fn order(&mut self, field: &str, by: bool) -> &mut Self {
495        self.params.order[field] = {
496            if by {
497                "DESC"
498            } else {
499                "ASC"
500            }
501        }.into();
502        self
503    }
504
505    fn group(&mut self, field: &str) -> &mut Self {
506        let fields: Vec<&str> = field.split(",").collect();
507        for field in fields.iter() {
508            let field = field.to_string();
509            self.params.group[field.as_str()] = field.clone().into();
510            self.params.fields[field.as_str()] = field.clone().into();
511        }
512        self
513    }
514
515    fn distinct(&mut self) -> &mut Self {
516        self.params.distinct = true;
517        self
518    }
519
520    fn json(&mut self, field: &str) -> &mut Self {
521        let list: Vec<&str> = field.split(",").collect();
522        for item in list.iter() {
523            self.params.json[item.to_string().as_str()] = item.to_string().into();
524        }
525        self
526    }
527
528    fn field(&mut self, field: &str) -> &mut Self {
529        let list: Vec<&str> = field.split(",").collect();
530        let join_table = if self.params.join_table.is_empty() {
531            self.params.table.clone()
532        } else {
533            self.params.join_table.clone()
534        };
535        for item in list.iter() {
536            if item.contains(" as ") {
537                let text = item.split(" as ").collect::<Vec<&str>>();
538                if text[0].contains("count(") {
539                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
540                } else {
541                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
542                }
543            } else {
544                self.params.fields[item.to_string().as_str()] = format!("{}.{}", join_table, item).into();
545            }
546        }
547        self
548    }
549
550    fn hidden(&mut self, name: &str) -> &mut Self {
551        let hidden: Vec<&str> = name.split(",").collect();
552
553        let fields_list = self.table_info(self.params.clone().table.as_str());
554        let mut data = array![];
555        for item in fields_list.members() {
556            data.push(object! {
557                "name":item["field"].as_str().unwrap()
558            }).unwrap();
559        }
560
561        for item in data.members() {
562            let name = item["name"].as_str().unwrap();
563            if !hidden.contains(&name) {
564                self.params.fields[name] = name.into();
565            }
566        }
567        self
568    }
569
570    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
571        let join_table = if self.params.join_table.is_empty() {
572            self.params.table.clone()
573        } else {
574            self.params.join_table.clone()
575        };
576        if value.is_boolean() {
577            if value.as_bool().unwrap() {
578                value = 1.into();
579            } else {
580                value = 0.into();
581            }
582        }
583        match compare {
584            "between" => {
585                self.params.where_and.push(format!(
586                    "{}.{} between '{}' AND '{}'",
587                    join_table, field, value[0], value[1]
588                ));
589            }
590            "set" => {
591                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
592                let mut wheredata = vec![];
593                for item in list.iter() {
594                    wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
595                }
596                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
597            }
598            "notin" => {
599                let mut text = String::new();
600                for item in value.members() {
601                    text = format!("{},'{}'", text, item);
602                }
603                text = text.trim_start_matches(",").into();
604                self.params.where_and.push(format!("{}.{} not in ({})", join_table, field, text));
605            }
606            "is" => {
607                self.params.where_and.push(format!("{}.{} is {}", join_table, field, value));
608            }
609            "notlike" => {
610                self.params.where_and.push(format!("{}.{} not like '{}'", join_table, field, value));
611            }
612            "in" => {
613                let mut text = String::new();
614                if value.is_array() {
615                    for item in value.members() {
616                        text = format!("{},'{}'", text, item);
617                    }
618                } else if value.is_null() {
619                    text = format!("{},null", text);
620                } else {
621                    let value = value.as_str().unwrap();
622
623                    let value: Vec<&str> = value.split(",").collect();
624                    for item in value.iter() {
625                        text = format!("{},'{}'", text, item);
626                    }
627                }
628                text = text.trim_start_matches(",").into();
629
630                self.params.where_and.push(format!("{}.{} {} ({})", join_table, field, compare, text));
631            }
632            _ => {
633                self.params.where_and.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
634            }
635        }
636        self
637    }
638
639    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
640        let join_table = if self.params.join_table.is_empty() {
641            self.params.table.clone()
642        } else {
643            self.params.join_table.clone()
644        };
645
646        if value.is_boolean() {
647            if value.as_bool().unwrap() {
648                value = 1.into();
649            } else {
650                value = 0.into();
651            }
652        }
653
654        match compare {
655            "between" => {
656                self.params.where_or.push(format!(
657                    "{}.{} between '{}' AND '{}'",
658                    join_table, field, value[0], value[1]
659                ));
660            }
661            "set" => {
662                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
663                let mut wheredata = vec![];
664                for item in list.iter() {
665                    wheredata.push(format!("'{}' = ANY (string_to_array({}.{},','))", item, join_table, field));
666                }
667                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
668            }
669            "notin" => {
670                let mut text = String::new();
671                for item in value.members() {
672                    text = format!("{},'{}'", text, item);
673                }
674                text = text.trim_start_matches(",").into();
675                self.params.where_or.push(format!("{}.{} not in ({})", join_table, field, text));
676            }
677            "in" => {
678                let mut text = String::new();
679                if value.is_array() {
680                    for item in value.members() {
681                        text = format!("{},'{}'", text, item);
682                    }
683                } else {
684                    let value = value.as_str().unwrap();
685                    let value: Vec<&str> = value.split(",").collect();
686                    for item in value.iter() {
687                        text = format!("{},'{}'", text, item);
688                    }
689                }
690                text = text.trim_start_matches(",").into();
691                self.params.where_or.push(format!("{}.{} {} ({})", join_table, field, compare, text));
692            }
693            _ => {
694                self.params.where_or.push(format!("{}.{} {} '{}'", join_table, field, compare, value));
695            }
696        }
697        self
698    }
699
700    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
701        self.params.where_column = format!(
702            "{}.{} {} {}.{}",
703            self.params.table, field_a, compare, self.params.table, field_b
704        );
705        self
706    }
707
708    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
709        self.params.page = page;
710        self.params.limit = limit;
711        self
712    }
713
714    fn column(&mut self, field: &str) -> JsonValue {
715        self.field(field);
716        let sql = self.params.select_sql();
717
718        if self.params.sql {
719            return JsonValue::from(sql);
720        }
721        let (state, data) = self.query(sql.as_str());
722        match state {
723            true => {
724                let mut list = array![];
725                for item in data.members() {
726                    if self.params.json[field].is_empty() {
727                        list.push(item[field].clone()).unwrap();
728                    } else {
729                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
730                        list.push(data).unwrap();
731                    }
732                }
733                list
734            }
735            false => {
736                array![]
737            }
738        }
739    }
740
741    fn count(&mut self) -> JsonValue {
742        self.params.fields["count"] = "count(*) as count".to_string().into();
743        let sql = self.params.select_sql();
744        if self.params.sql {
745            return JsonValue::from(sql.clone());
746        }
747        let (state, data) = self.query(sql.as_str());
748        if state {
749            data[0]["count"].clone()
750        } else {
751            JsonValue::from(0)
752        }
753    }
754
755    fn max(&mut self, field: &str) -> JsonValue {
756        self.params.fields[field] = format!("max({00}) as {00}", field).into();
757        let sql = self.params.select_sql();
758        if self.params.sql {
759            return JsonValue::from(sql.clone());
760        }
761        let (state, data) = self.query(sql.as_str());
762        if state {
763            if data.len() > 1 {
764                return data.clone();
765            }
766            data[0][field].clone()
767        } else {
768            JsonValue::from(0)
769        }
770    }
771
772    fn min(&mut self, field: &str) -> JsonValue {
773        self.params.fields[field] = format!("min({00}) as {00}", field).into();
774        let sql = self.params.select_sql();
775        if self.params.sql {
776            return JsonValue::from(sql.clone());
777        }
778        let (state, data) = self.query(sql.as_str());
779        if state {
780            if data.len() > 1 {
781                return data;
782            }
783            data[0][field].clone()
784        } else {
785            JsonValue::from(0)
786        }
787    }
788
789    fn sum(&mut self, field: &str) -> JsonValue {
790        self.params.fields[field] = format!("sum({00}) as {00}", field).into();
791        let sql = self.params.select_sql();
792        if self.params.sql {
793            return JsonValue::from(sql.clone());
794        }
795        let (state, data) = self.query(sql.as_str());
796        match state {
797            true => {
798                if data.len() > 1 {
799                    return data;
800                }
801                data[0][field].clone()
802            }
803            false => JsonValue::from(0),
804        }
805    }
806
807    fn avg(&mut self, field: &str) -> JsonValue {
808        self.params.fields[field] = format!("avg({00}) as {00}", field).into();
809        let sql = self.params.select_sql();
810        if self.params.sql {
811            return JsonValue::from(sql.clone());
812        }
813        let (state, data) = self.query(sql.as_str());
814        if state {
815            if data.len() > 1 {
816                return data;
817            }
818            data[0][field].clone()
819        } else {
820            JsonValue::from(0)
821        }
822    }
823
824    fn select(&mut self) -> JsonValue {
825        let sql = self.params.select_sql();
826        if self.params.sql {
827            return JsonValue::from(sql.clone());
828        }
829        let (state, mut data) = self.query(sql.as_str());
830        match state {
831            true => {
832                for (field, _) in self.params.json.entries() {
833                    for item in data.members_mut() {
834                        if !item[field].is_empty() {
835                            let json = item[field].to_string();
836                            item[field] = match json::parse(&json) {
837                                Ok(e) => e,
838                                Err(_) => JsonValue::from(json),
839                            };
840                        }
841                    }
842                }
843                data.clone()
844            }
845            false => array![],
846        }
847    }
848
849    fn find(&mut self) -> JsonValue {
850        self.params.page = 1;
851        self.params.limit = 1;
852        let sql = self.params.select_sql();
853        if self.params.sql {
854            return JsonValue::from(sql.clone());
855        }
856        let (state, mut data) = self.query(sql.as_str());
857        match state {
858            true => {
859                if data.is_empty() {
860                    return object! {};
861                }
862                for (field, _) in self.params.json.entries() {
863                    if !data[0][field].is_empty() {
864                        let json = data[0][field].to_string();
865                        let json = json::parse(&json).unwrap_or(array![]);
866                        data[0][field] = json;
867                    } else {
868                        data[0][field] = array![];
869                    }
870                }
871                data[0].clone()
872            }
873            false => {
874                object! {}
875            }
876        }
877    }
878
879    fn value(&mut self, field: &str) -> JsonValue {
880        self.params.fields = object! {};
881        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
882        self.params.page = 1;
883        self.params.limit = 1;
884        let sql = self.params.select_sql();
885        if self.params.sql {
886            return JsonValue::from(sql.clone());
887        }
888        let (state, mut data) = self.query(sql.as_str());
889        match state {
890            true => {
891                for (field, _) in self.params.json.entries() {
892                    if !data[0][field].is_empty() {
893                        let json = data[0][field].to_string();
894                        let json = json::parse(&json).unwrap_or(array![]);
895                        data[0][field] = json;
896                    } else {
897                        data[0][field] = array![];
898                    }
899                }
900                data[0][field].clone()
901            }
902            false => {
903                if self.connection.debug {
904                    info!("{:?}", data);
905                }
906                JsonValue::Null
907            }
908        }
909    }
910
911    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
912        let mut fields = vec![];
913        let mut values = vec![];
914        if !self.params.autoinc && data["id"].is_empty() {
915            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
916        }
917        for (field, value) in data.entries() {
918            fields.push(field);
919
920            if value.is_string() {
921                values.push(format!("'{}'", value.to_string().replace("'", "''")));
922                continue;
923            } else if value.is_array() {
924                if self.params.json[field].is_empty() {
925                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
926                    values.push(format!("'{}'", array));
927                } else {
928                    let json = value.to_string();
929                    let json = json.replace("'", "''");
930                    values.push(format!("'{}'", json));
931                }
932                continue;
933            } else if value.is_object() {
934                if self.params.json[field].is_empty() {
935                    values.push(format!("'{}'", value));
936                } else {
937                    let json = value.to_string();
938                    let json = json.replace("'", "''");
939                    values.push(format!("'{}'", json));
940                }
941                continue;
942            } else if value.is_number() || value.is_boolean() || value.is_null() {
943                values.push(format!("{}", value));
944                continue;
945            } else {
946                values.push(format!("'{}'", value));
947                continue;
948            }
949        }
950        let fields = fields.join(",");
951        let values = values.join(",");
952
953        let sql = format!(
954            "INSERT INTO {} ({}) VALUES ({});",
955            self.params.table, fields, values
956        );
957        if self.params.sql {
958            return JsonValue::from(sql.clone());
959        }
960        let (state, ids) = self.execute(sql.as_str());
961
962        match state {
963            true => match self.params.autoinc {
964                true => ids.clone(),
965                false => data["id"].clone(),
966            },
967            false => {
968                let thread_id = format!("{:?}", thread::current().id());
969                error!("添加失败: {} {:?} {}", thread_id, ids, sql);
970                JsonValue::from("")
971            }
972        }
973    }
974    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
975        let mut fields = String::new();
976        if !self.params.autoinc && data[0]["id"].is_empty() {
977            data[0]["id"] = "".into();
978        }
979        for (field, _) in data[0].entries() {
980            fields = format!("{},{}", fields, field);
981        }
982        fields = fields.trim_start_matches(",").parse().unwrap();
983
984        let core_count = num_cpus::get();
985        let mut p = pools::Pool::new(core_count * 4);
986
987        let mut rng = rng();
988        let i: i32 = rng.random_range(1..=100);
989        let autoinc = self.params.autoinc;
990        for list in data.members() {
991            let mut item = list.clone();
992            p.execute(move |pcindex| {
993                if !autoinc && item["id"].is_empty() {
994                    let id = format!(
995                        "{:X}{:X}{}",
996                        Local::now().timestamp_nanos_opt().unwrap(),
997                        pcindex,
998                        i
999                    );
1000                    item["id"] = id.into();
1001                }
1002                let mut values = "".to_string();
1003                for (_, value) in item.entries() {
1004                    if value.is_string() {
1005                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1006                    } else if value.is_number() {
1007                        values = format!("{},{}", values, value);
1008                    } else if value.is_boolean() {
1009                        values = format!("{},{}", values, value);
1010                        continue;
1011                    } else {
1012                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1013                    }
1014                }
1015                values = format!("({})", values.trim_start_matches(","));
1016                array![item["id"].clone(), values]
1017            });
1018        }
1019        let (ids_list, mut values) = p.insert_all();
1020        values = values.trim_start_matches(",").parse().unwrap();
1021        let sql = format!(
1022            "INSERT INTO {} ({}) VALUES {};",
1023            self.params.table, fields, values
1024        );
1025
1026        if self.params.sql {
1027            return JsonValue::from(sql.clone());
1028        }
1029        let (state, data) = self.execute(sql.as_str());
1030        match state {
1031            true => match autoinc {
1032                true => data,
1033                false => JsonValue::from(ids_list),
1034            },
1035            false => {
1036                error!("insert_all: {:?}", data);
1037                array![]
1038            }
1039        }
1040    }
1041    fn update(&mut self, data: JsonValue) -> JsonValue {
1042        let mut values = vec![];
1043        for (field, value) in data.entries() {
1044            if value.is_string() {
1045                values.push(format!(
1046                    "{}='{}'",
1047                    field,
1048                    value.to_string().replace("'", "''")
1049                ));
1050            } else if value.is_number() {
1051                values.push(format!("{}= {}", field, value));
1052            } else if value.is_array() {
1053                if self.params.json[field].is_empty() {
1054                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1055                    values.push(format!("{}='{}'", field, array));
1056                } else {
1057                    let json = value.to_string();
1058                    let json = json.replace("'", "''");
1059                    values.push(format!("{}='{}'", field, json));
1060                }
1061                continue;
1062            } else if value.is_object() {
1063                if self.params.json[field].is_empty() {
1064                    values.push(format!("{}='{}'", field, value));
1065                } else {
1066                    if value.is_empty() {
1067                        values.push(format!("{}=''", field));
1068                        continue;
1069                    }
1070                    let json = value.to_string();
1071                    let json = json.replace("'", "''");
1072                    values.push(format!("{}='{}'", field, json));
1073                }
1074                continue;
1075            } else if value.is_boolean() {
1076                values.push(format!("{}= {}", field, value));
1077            } else {
1078                values.push(format!("{}=\"{}\"", field, value));
1079            }
1080        }
1081
1082        for (field, value) in self.params.inc_dec.entries() {
1083            values.push(format!("{} = {}", field, value.to_string().clone()));
1084        }
1085
1086        let values = values.join(",");
1087
1088        let sql = format!(
1089            "UPDATE {} SET {} {};",
1090            self.params.table.clone(),
1091            values,
1092            self.params.where_sql()
1093        );
1094        if self.params.sql {
1095            return JsonValue::from(sql.clone());
1096        }
1097        let (state, data) = self.execute(sql.as_str());
1098        if state {
1099            data
1100        } else {
1101            let thread_id = format!("{:?}", thread::current().id());
1102            error!("update: {} {:?} {}", thread_id, data, sql);
1103            0.into()
1104        }
1105    }
1106    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1107        let mut values = vec![];
1108
1109        let mut ids = vec![];
1110        for (field, _) in data[0].entries() {
1111            if field == "id" {
1112                continue;
1113            }
1114            let mut fields = vec![];
1115            for row in data.members() {
1116                let value = row[field].clone();
1117                let id = row["id"].clone();
1118                ids.push(id.clone());
1119                if value.is_string() {
1120                    fields.push(format!(
1121                        "WHEN '{}' THEN '{}'",
1122                        id,
1123                        value.to_string().replace("'", "''")
1124                    ));
1125                } else if value.is_array() || value.is_object() {
1126                    if self.params.json[field].is_empty() {
1127                        fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1128                    } else {
1129                        let json = value.to_string();
1130                        let json = json.replace("'", "''");
1131                        fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1132                    }
1133                    continue;
1134                } else if value.is_number() || value.is_boolean() || value.is_null() {
1135                    fields.push(format!("WHEN '{}' THEN {}", id, value));
1136                } else {
1137                    fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1138                }
1139            }
1140            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1141        }
1142        self.where_and("id", "in", ids.into());
1143        for (field, value) in self.params.inc_dec.entries() {
1144            values.push(format!("{} = {}", field, value.to_string().clone()));
1145        }
1146
1147        let values = values.join(",");
1148        let sql = format!(
1149            "UPDATE {} SET {} {} {};",
1150            self.params.table.clone(),
1151            values,
1152            self.params.where_sql(),
1153            self.params.page_limit_sql()
1154        );
1155        if self.params.sql {
1156            return JsonValue::from(sql.clone());
1157        }
1158        let (state, data) = self.execute(sql.as_str());
1159        if state {
1160            data
1161        } else {
1162            error!("update_all: {:?}", data);
1163            JsonValue::from(0)
1164        }
1165    }
1166
1167    fn delete(&mut self) -> JsonValue {
1168        let sql = format!(
1169            "delete FROM {} {} {};",
1170            self.params.table.clone(),
1171            self.params.where_sql(),
1172            self.params.page_limit_sql()
1173        );
1174        if self.params.sql {
1175            return JsonValue::from(sql.clone());
1176        }
1177        let (state, data) = self.execute(sql.as_str());
1178        match state {
1179            true => data,
1180            false => {
1181                error!("delete 失败>>> {:?}", data);
1182                JsonValue::from(0)
1183            }
1184        }
1185    }
1186
1187    fn transaction(&mut self) -> bool {
1188        let thread_id = format!("{:?}", thread::current().id());
1189
1190        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1191            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1192            t += 1;
1193            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1194            return true;
1195        }
1196
1197        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1198        let key = format!("{}{}", self.default, thread_id);
1199        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(self.client.get_connect().unwrap())));
1200
1201        let sql = "START TRANSACTION;".to_string();
1202        let (state, _) = self.execute(sql.as_str());
1203        match state {
1204            true => {
1205                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1206                let (state, _) = self.execute(sql.as_str());
1207                match state {
1208                    true => state,
1209                    false => {
1210                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1211                        TR.lock().unwrap().remove(&key.clone());
1212                        state
1213                    }
1214                }
1215            }
1216            false => {
1217                TRANS.lock().unwrap().remove(&*thread_id.clone());
1218                TR.lock().unwrap().remove(&key.clone());
1219                state
1220            }
1221        }
1222    }
1223    fn commit(&mut self) -> bool {
1224        let thread_id = format!("{:?}", thread::current().id());
1225        let sql = "COMMIT".to_string();
1226
1227        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1228        if t > 1 {
1229            t -= 1;
1230            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1231            return true;
1232        }
1233        let (state, data) = self.execute(sql.as_str());
1234        TRANS.lock().unwrap().remove(&thread_id);
1235        let key = format!("{}{}", self.default, thread_id);
1236        TR.lock().unwrap().remove(&*key);
1237        match state {
1238            true => {}
1239            false => {
1240                error!("提交事务失败: {}", data);
1241            }
1242        }
1243        state
1244    }
1245
1246    fn rollback(&mut self) -> bool {
1247        let thread_id = format!("{:?}", thread::current().id());
1248        let sql = "ROLLBACK".to_string();
1249
1250        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1251        if t > 1 {
1252            t -= 1;
1253            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1254            return true;
1255        }
1256        let (state, data) = self.execute(sql.as_str());
1257        TRANS.lock().unwrap().remove(&thread_id);
1258        let key = format!("{}{}", self.default, thread_id);
1259        TR.lock().unwrap().remove(&*key);
1260        match state {
1261            true => {}
1262            false => {
1263                error!("回滚失败: {}", data);
1264            }
1265        }
1266        state
1267    }
1268
1269    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1270        let (state, data) = self.query(sql);
1271        match state {
1272            true => Ok(data),
1273            false => Err(data.to_string()),
1274        }
1275    }
1276
1277    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1278        let (state, data) = self.execute(sql);
1279        match state {
1280            true => Ok(data),
1281            false => Err(data.to_string()),
1282        }
1283    }
1284
1285    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1286        self.params.inc_dec[field] = format!("{} + {}", field, num).into();
1287        self
1288    }
1289
1290    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1291        self.params.inc_dec[field] = format!("{} - {}", field, num).into();
1292        self
1293    }
1294    fn buildsql(&mut self) -> String {
1295        self.fetch_sql();
1296        let sql = self.select().to_string();
1297        format!("( {} ) {}", sql, self.params.table)
1298    }
1299
1300    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1301        let main_fields = if main_fields.is_empty() {
1302            "id"
1303        } else {
1304            main_fields
1305        };
1306        let right_fields = if right_fields.is_empty() {
1307            self.params.table.clone()
1308        } else {
1309            right_fields.to_string().clone()
1310        };
1311        self.params.join_table = table.to_string();
1312        self.params.join.push(format!(
1313            " LEFT JOIN {} ON {}.{} = {}.{}",
1314            table, self.params.table, main_fields, table, right_fields
1315        ));
1316        self
1317    }
1318
1319    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1320        let main_fields = if main_fields.is_empty() {
1321            "id"
1322        } else {
1323            main_fields
1324        };
1325        let second_fields = if second_fields.is_empty() {
1326            self.params.table.clone()
1327        } else {
1328            second_fields.to_string().clone()
1329        };
1330        let sec_table_name = format!("{}{}", table, "_2");
1331        let second_table = format!("{} {}", table, sec_table_name.clone());
1332        self.params.join_table = sec_table_name.clone();
1333        self.params.join.push(format!(
1334            " INNER JOIN {} ON {}.{} = {}.{}",
1335            second_table, self.params.table, main_fields, sec_table_name, second_fields
1336        ));
1337        self
1338    }
1339}