Skip to main content

br_db/types/
sqlite.rs

1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
lazy_static! {
    /// Process-wide registry of opened SQLite connections.
    /// `Sqlite::connect` inserts into it using its `default` argument as the key, and
    /// `query` / `execute` look the connection up again via `self.default`.
    static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
    /// Global list of strings guarded by a mutex.
    // NOTE(review): not referenced in the visible part of this file; presumably a buffer of
    // collected SQL statements — confirm against its users.
    static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
    /// Cache of column metadata per table name. `table_info` stores an object keyed by
    /// column name here, and `query_handle_static` reads each column's `"type"` entry from it
    /// when decoding integer cells.
    static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
}
21
/// SQLite backend handle.
///
/// The struct itself holds only configuration and query parameters; the open database
/// connection lives in the global `DBS` map and is looked up by `default`.
#[derive(Clone, Debug)]
pub struct Sqlite {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration (also the key into `DBS`).
    pub default: String,

    /// Per-query parameters (for example the current table name and the "return SQL only"
    /// flag that the methods in this file read).
    pub params: Params,
}
31
32impl Sqlite {
33    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34        let dsn = connection.clone().get_dsn();
35        let db_path = Path::new(&dsn);
36        if let Some(parent) = db_path.parent() {
37            if !parent.as_os_str().is_empty() && !parent.exists() {
38                if let Err(e) = std::fs::create_dir_all(parent) {
39                    error!("sqlite 创建目录失败: {} {:?}", e, parent);
40                    return Err(e.to_string());
41                }
42                info!("sqlite 自动创建目录: {:?}", parent);
43            }
44        }
45
46        let flags = OpenFlags::new().with_create().with_read_write();
47        match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48            Ok(e) => {
49                if let Ok(mut guard) = DBS.lock() {
50                    guard.insert(default.clone(), Arc::new(e));
51                } else {
52                    error!("sqlite 获取数据库锁失败");
53                    return Err("获取数据库锁失败".to_string());
54                }
55                Ok(Self {
56                    connection: connection.clone(),
57                    default: default.clone(),
58                    params: Params::default("sqlite"),
59                })
60            }
61            Err(e) => {
62                error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63                Err(e.to_string())
64            }
65        }
66    }
67    /// Static helper to process query results - can be called from closures
68    fn query_handle_static(
69        mut statement: Statement,
70        sql: &str,
71        table: &str,
72        thread_id: &str,
73    ) -> (bool, JsonValue) {
74        let mut data = array![];
75        while let State::Row = match statement.next() {
76            Ok(e) => e,
77            Err(e) => {
78                let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79                if in_transaction {
80                    error!("{} 查询事务: {} {}", thread_id, e, sql);
81                } else {
82                    error!("{} 非事务查询: {} {}", thread_id, e, sql);
83                }
84                return (false, data);
85            }
86        } {
87            let mut list = object! {};
88            let mut index = 0;
89            for field in statement.column_names().iter() {
90                if !list[field.as_str()].is_null() {
91                    index += 1;
92                    continue;
93                }
94                match statement.column_type(field.as_str()) {
95                    Ok(types) => match types {
96                        Type::String => {
97                            if let Ok(data) = statement.read::<String, _>(index) {
98                                match data.as_str() {
99                                    "false" => {
100                                        list[field.as_str()] = JsonValue::from(false);
101                                    }
102                                    "true" => {
103                                        list[field.as_str()] = JsonValue::from(true);
104                                    }
105                                    _ => {
106                                        list[field.as_str()] = JsonValue::from(data.clone());
107                                    }
108                                }
109                            }
110                        }
111                        Type::Integer => {
112                            let fields_cache =
113                                FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114                            if let Some(fields) = fields_cache {
115                                if fields[field.clone()].is_empty() {
116                                    if let Ok(data) = statement.read::<i64, _>(index) {
117                                        list[field.as_str()] = data.into();
118                                    }
119                                    index += 1;
120                                    continue;
121                                }
122                                let field_type =
123                                    fields[field.clone()]["type"].as_str().unwrap_or("");
124                                match field_type {
125                                    "INTEGER" => {
126                                        if let Ok(data) = statement.read::<i64, _>(index) {
127                                            list[field.as_str()] = JsonValue::from(data == 1);
128                                        }
129                                    }
130                                    x if x.contains("int(") => {
131                                        if let Ok(data) = statement.read::<i64, _>(index) {
132                                            list[field.as_str()] = data.into();
133                                        }
134                                    }
135                                    x if x.contains("decimal(") && x.ends_with(",0)") => {
136                                        if let Ok(data) = statement.read::<f64, _>(index) {
137                                            list[field.as_str()] = data.into();
138                                        }
139                                    }
140                                    _ => {
141                                        if let Ok(data) = statement.read::<i64, _>(index) {
142                                            list[field.as_str()] = data.into();
143                                        }
144                                    }
145                                }
146                            } else if let Ok(data) = statement.read::<i64, _>(index) {
147                                list[field.as_str()] = JsonValue::from(data);
148                            }
149                        }
150                        Type::Float => {
151                            if let Ok(data) = statement.read::<f64, _>(index) {
152                                list[field.as_str()] = JsonValue::from(data);
153                            }
154                        }
155                        Type::Binary => {
156                            if let Ok(data) = statement.read::<String, _>(index) {
157                                list[field.as_str()] = JsonValue::from(data.clone());
158                            }
159                        }
160                        Type::Null => match statement.read::<String, _>(index) {
161                            Ok(data) => {
162                                list[field.as_str()] = JsonValue::from(data.clone());
163                            }
164                            Err(_) => match statement.read::<f64, _>(index) {
165                                Ok(data) => {
166                                    if data == 0.0 {
167                                        list[field.as_str()] = JsonValue::from("");
168                                    } else {
169                                        list[field.as_str()] = JsonValue::from(data);
170                                    }
171                                }
172                                Err(_) => match statement.read::<i64, _>(index) {
173                                    Ok(data) => {
174                                        if data == 0 {
175                                            list[field.as_str()] = JsonValue::from("");
176                                        } else {
177                                            list[field.as_str()] = JsonValue::from(data);
178                                        }
179                                    }
180                                    Err(e) => {
181                                        error!("Type:{} {:?}", field.as_str(), e);
182                                    }
183                                },
184                            },
185                        },
186                    },
187                    Err(e) => {
188                        error!("query Err: {e:?}");
189                    }
190                }
191                index += 1;
192            }
193            let _ = data.push(list);
194        }
195        (true, data)
196    }
197
198    pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199        let thread_id = format!("{:?}", thread::current().id());
200        let table = self.params.table.clone();
201
202        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203            if self.connection.debug {
204                info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205            }
206
207            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208                match db.prepare(sql.clone()) {
209                    Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210                    Err(e) => {
211                        error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212                        (false, e.to_string().into())
213                    }
214                }
215            });
216
217            match result {
218                Some(r) => r,
219                None => {
220                    error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221                    (false, JsonValue::from("未找到事务连接"))
222                }
223            }
224        } else {
225            if self.connection.debug {
226                info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227            }
228            let dbs = match DBS.lock() {
229                Ok(dbs) => dbs,
230                Err(e) => {
231                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232                    return (false, JsonValue::from("数据库锁定失败"));
233                }
234            };
235            let db = match dbs.get(&self.default) {
236                Some(db) => db.clone(),
237                None => {
238                    error!(
239                        "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240                        self.default
241                    );
242                    return (false, JsonValue::from("未找到默认数据库配置"));
243                }
244            };
245            drop(dbs);
246            let result = match db.prepare(sql.clone()) {
247                Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248                Err(e) => {
249                    error!("{thread_id} 查询非事务: Err: {e}");
250                    (false, e.to_string().into())
251                }
252            };
253            result
254        }
255    }
256
257    pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258        let thread_id = format!("{:?}", thread::current().id());
259
260        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261            if self.connection.debug {
262                info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263            }
264
265            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266                match db.execute(sql.clone()) {
267                    Ok(_) => {
268                        let count = db.change_count();
269                        (true, JsonValue::from(count))
270                    }
271                    Err(e) => (false, JsonValue::from(e.to_string())),
272                }
273            });
274
275            match result {
276                Some((true, count)) => {
277                    if self.connection.debug {
278                        info!("{} count:{}", thread_id, count);
279                    }
280                    (true, count)
281                }
282                Some((false, err)) => {
283                    error!(
284                        "{} 执行事务: \r\nErr: {}\r\n{}",
285                        thread_id,
286                        err,
287                        sql.clone()
288                    );
289                    (false, err)
290                }
291                None => {
292                    error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293                    (false, JsonValue::from("未找到事务连接"))
294                }
295            }
296        } else {
297            if self.connection.debug {
298                info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299            }
300            let dbs = match DBS.lock() {
301                Ok(dbs) => dbs,
302                Err(e) => {
303                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304                    return (false, JsonValue::from("数据库锁定失败"));
305                }
306            };
307
308            let db = match dbs.get(&self.default) {
309                Some(db) => db.clone(),
310                None => {
311                    error!(
312                        "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313                        thread_id, self.default, sql
314                    );
315                    return (false, JsonValue::from("未找到默认数据库配置"));
316                }
317            };
318            drop(dbs);
319            match db.execute(sql.clone()) {
320                Ok(_) => {
321                    let count = db.change_count();
322                    if self.connection.debug {
323                        info!(
324                            "{} count: {} total_count: {}",
325                            thread_id,
326                            count,
327                            db.total_change_count()
328                        );
329                    }
330                    (true, JsonValue::from(count))
331                }
332                Err(e) => {
333                    error!(
334                        "{} 执行非事务: Err: {}\r\nSQL: {}",
335                        thread_id,
336                        e,
337                        sql.clone()
338                    );
339                    (false, JsonValue::from(e.to_string()))
340                }
341            }
342        }
343    }
344}
345
346impl DbMode for Sqlite {
347    fn database_tables(&mut self) -> JsonValue {
348        let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349        match self.sql(sql.as_str()) {
350            Ok(e) => {
351                let mut list = vec![];
352                for item in e.members() {
353                    list.push(item["name"].clone());
354                }
355                list.into()
356            }
357            Err(_) => {
358                array![]
359            }
360        }
361    }
362    fn database_create(&mut self, name: &str) -> bool {
363        let current_dsn = self.connection.clone().get_dsn();
364        let current_path = Path::new(&current_dsn);
365
366        let new_path = if let Some(parent) = current_path.parent() {
367            if parent.as_os_str().is_empty() {
368                Path::new(name).to_path_buf()
369            } else {
370                parent.join(name)
371            }
372        } else {
373            Path::new(name).to_path_buf()
374        };
375
376        if let Some(parent) = new_path.parent() {
377            if !parent.as_os_str().is_empty() && !parent.exists() {
378                if let Err(e) = std::fs::create_dir_all(parent) {
379                    error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380                    return false;
381                }
382            }
383        }
384
385        let flags = OpenFlags::new().with_create().with_read_write();
386        match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387            Ok(_) => true,
388            Err(e) => {
389                error!(
390                    "sqlite database_create 创建数据库失败: {} {:?}",
391                    e, new_path
392                );
393                false
394            }
395        }
396    }
397
398    fn truncate(&mut self, table: &str) -> bool {
399        let sql = format!("DELETE FROM `{table}`");
400        let (state, _) = self.execute(sql);
401        state
402    }
403}
404
405impl Mode for Sqlite {
406    fn table_create(&mut self, options: TableOptions) -> JsonValue {
407        let mut sql = String::new();
408        let mut unique_fields = String::new();
409        let mut unique_name = String::new();
410        let mut unique = String::new();
411        for item in options.table_unique.iter() {
412            if unique_fields.is_empty() {
413                unique_fields = format!("`{item}`");
414                unique_name = format!("unique_{item}");
415            } else {
416                unique_fields = format!("{unique_fields},`{item}`");
417                unique_name = format!("{unique_name}_{item}");
418            }
419            unique = format!(
420                "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421                unique_name, options.table_name, unique_fields
422            );
423        }
424
425        let mut index = vec![];
426        for row in options.table_index.iter() {
427            let mut index_fields = String::new();
428            let mut index_name = String::new();
429            for item in row.iter() {
430                if index_fields.is_empty() {
431                    index_fields = format!("`{item}`");
432                    index_name = format!("index_{item}");
433                } else {
434                    index_fields = format!("{index_fields},`{item}`");
435                    index_name = format!("{index_name}_{item}");
436                }
437            }
438            index.push(format!(
439                "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440                index_name, options.table_name, index_fields
441            ));
442        }
443
444        for (name, field) in options.table_fields.entries() {
445            let row = br_fields::field("sqlite", name, field.clone());
446            sql = format!("{sql} {row},\r\n");
447        }
448
449        sql = sql.trim_end_matches(",\r\n").to_string();
450
451        let sql = format!(
452            "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453            options.table_name, sql
454        );
455        if self.params.sql {
456            let mut list = vec![sql];
457            if !unique.is_empty() {
458                list.push(unique)
459            }
460            if !index.is_empty() {
461                list.extend(index)
462            }
463
464            return JsonValue::from(list.join(""));
465        }
466
467        let thread_id = format!("{:?}", thread::current().id());
468
469        let (_, table_exists) = self.query(format!(
470            "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471            options.table_name
472        ));
473        let is_new_table = table_exists.is_empty();
474
475        let (state, _) = self.execute(sql.clone());
476        if state {
477            if is_new_table {
478                if !unique.is_empty() {
479                    let (state, _) = self.execute(unique.clone());
480                    info!(
481                        "{} {} 唯一索引创建:{}",
482                        thread_id, options.table_name, state
483                    );
484                }
485                for sql in index.iter() {
486                    let (state, _) = self.execute(sql.clone());
487                    info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488                }
489            }
490            JsonValue::from(true)
491        } else {
492            JsonValue::from(false)
493        }
494    }
    /// Brings an existing table in line with `options` by rebuilding it: a `<table>_tmp`
    /// table is created with the new definition, rows are copied over, the old table is
    /// dropped, the tmp table is renamed, and the indexes are recreated.
    ///
    /// Returns:
    /// * a JSON string with the generated SQL when `self.params.sql` is set (nothing is
    ///   executed);
    /// * `-1` when no column or index difference was detected;
    /// * `1` when the rebuild succeeded;
    /// * `0` when any step failed.
    // NOTE(review): the rebuild steps are executed one by one with no transaction opened
    // here; if the rename fails after the drop, the original table is gone — confirm callers
    // wrap this in a transaction.
    fn table_update(&mut self, options: TableOptions) -> JsonValue {
        let thread_id = format!("{:?}", thread::current().id());

        let mut sql = String::new();
        // Column names to add / delete / modify; only used to decide whether a rebuild is needed.
        let mut add = vec![];
        let mut del = vec![];
        let mut put = vec![];

        // Current columns of the table as reported by SQLite.
        let (_, mut fields_list) =
            self.query(format!("pragma table_info ('{}')", options.table_name));
        let mut field_old = object! {};
        for item in fields_list.members_mut() {
            // Strip the surrounding single quotes from the stored default value.
            item["dflt_value"] = item["dflt_value"]
                .to_string()
                .trim_start_matches("'")
                .trim_end_matches("'")
                .into();
            if let Some(name) = item["name"].as_str() {
                field_old[name] = item.clone();
                // Column exists in the database but not in the new definition.
                if options.table_fields[name].is_empty() {
                    del.push(name.to_string());
                }
            }
        }

        // Columns present both before and after; these are the ones copied into the tmp table.
        // Both lists receive the same names.
        let mut fields_list = vec![];
        let mut fields_list_new = vec![];

        for (name, field) in options.table_fields.entries() {
            if field_old[name].is_empty() {
                add.push(name.to_string());
            } else {
                fields_list.push(name.to_string());
                fields_list_new.push(name.to_string());
                let field_mode = field["mode"].as_str().unwrap_or("");
                // Normalise the stored default so it can be compared with the declared one.
                let old_value = match field_mode {
                    "select" => {
                        if field_old[name]["dflt_value"].clone().is_empty() {
                            "".to_string()
                        } else {
                            field_old[name]["dflt_value"].clone().to_string()
                        }
                    }
                    // Stored as an integer, compared as "true"/"false".
                    "switch" => (field_old[name]["dflt_value"]
                        .to_string()
                        .parse::<i32>()
                        .unwrap_or(0)
                        == 1)
                        .to_string(),
                    _ => field_old[name]["dflt_value"].clone().to_string(),
                };
                let new_value = match field_mode {
                    // Declared default is a list; compare it as a comma-joined string.
                    "select" => field["def"]
                        .members()
                        .map(|x| x.to_string())
                        .collect::<Vec<String>>()
                        .join(","),
                    _ => field["def"].clone().to_string(),
                };
                if old_value != new_value {
                    info!(
                        "{} 差异化当前: {} old_value: {} new_value: {} {}",
                        options.table_name,
                        name,
                        old_value,
                        new_value,
                        old_value != new_value
                    );
                    info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
                    put.push(name.to_string());
                } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
                    && name != options.table_key
                {
                    // Column is currently the primary key but is no longer the declared key.
                    info!("{} 主键替换: {}", options.table_name, name);
                    put.push(name.to_string());
                }
            }
        }

        let mut unique_fields = String::new();
        let mut unique_name = String::new();
        let mut unique = String::new();

        // Unique index: a single composite index over all `table_unique` columns, named
        // `<table>_unique_<col>_<col>...`.
        for item in options.table_unique.iter() {
            if unique_fields.is_empty() {
                unique_fields = format!("`{item}`");
                unique_name = format!("unique_{item}");
            } else {
                unique_fields = format!("{unique_fields},`{item}`");
                unique_name = format!("{unique_name}_{item}");
            }
            unique = format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
                options.table_name, unique_name, options.table_name, unique_fields
            )
        }

        // Plain indexes: one per row of `table_index`, named `<table>_index_<col>_<col>...`.
        let mut index = vec![];
        for row in options.table_index.iter() {
            let mut index_fields = String::new();
            let mut index_name = String::new();
            for item in row.iter() {
                if index_fields.is_empty() {
                    index_fields = item.to_string();
                    index_name = format!("index_{item}");
                } else {
                    index_fields = format!("{index_fields},{item}");
                    index_name = format!("{index_name}_{item}");
                }
            }
            index.push(format!(
                "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
                options.table_name, index_name, options.table_name, index_fields
            ));
        }
        // Column definitions for the tmp table.
        for (name, field) in options.table_fields.entries() {
            let row = br_fields::field("sqlite", name, field.clone());
            sql = format!("{sql} {row},\r\n");
        }

        // Compare the wanted indexes with the existing ones; `unique` / `index` are cleared
        // when the database already has matching indexes so they do not force a rebuild.
        if !unique.is_empty() || !index.is_empty() {
            let unique_text = unique.clone();
            let (_, unique_old) =
                self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
            let mut index_old_list = vec![];
            let mut index_new_list = vec![];
            for item in unique_old.members() {
                let origin = item["origin"].as_str().unwrap_or("");
                let unique_1 = item["unique"].as_usize().unwrap_or(0);
                let name = item["name"].as_str().unwrap_or("");

                // Existing unique index with origin "c": matched by name against the
                // generated statement.
                if origin == "c" && unique_1 == 1 {
                    if unique.contains(format!(" {name} ").as_str()) {
                        unique = "".to_string();
                    }
                    continue;
                }
                // Existing non-unique index with origin "c".
                if origin == "c" && unique_1 == 0 {
                    index_old_list.push(item);
                    for item in index.iter() {
                        if item.contains(format!(" {name} ").as_str()) {
                            index_new_list.push(item.clone());
                        }
                    }
                    continue;
                }
            }
            if unique.is_empty() {
                if index_old_list.len() == index.len()
                    && index_old_list.len() == index_new_list.len()
                {
                    // Every wanted index already exists: nothing to recreate.
                    index = vec![];
                } else {
                    // The table will be rebuilt for the plain indexes, so the unique index
                    // has to be recreated as well.
                    unique = unique_text;
                }
            }
        }

        sql = sql.trim_end_matches(",\r\n").to_string();
        sql = format!(
            "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
            options.table_name, sql
        );

        // NOTE(review): `{00}` appears to be a positional reference to argument 0 (the table
        // name) — confirm; `{0}` would be the conventional spelling.
        let sqls = format!(
            "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
            options.table_name,
            fields_list_new.join("`,`"),
            fields_list.join("`,`")
        );
        let drop_sql = format!("drop table {};\r\n", options.table_name);
        let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
        let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);

        if self.params.sql {
            // NOTE(review): in this script `drop_sql_temp` comes after the rename, when the
            // tmp table no longer exists under that name — confirm this statement is
            // intended here.
            let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
            if !unique.is_empty() {
                list.push(unique)
            }
            if !index.is_empty() {
                list.extend(index)
            }
            return JsonValue::from(list.join(""));
        }

        // Nothing differs: report "no change" without touching the table.
        if add.is_empty()
            && del.is_empty()
            && unique.is_empty()
            && index.is_empty()
            && put.is_empty()
        {
            return JsonValue::from(-1);
        }

        // Rebuild sequence: create tmp -> copy rows -> drop old -> rename tmp -> indexes.
        let (state, _) = self.execute(sql.clone());
        let data = match state {
            true => {
                let (state, _) = self.execute(sqls.clone());
                match state {
                    true => {
                        let (state, _) = self.execute(drop_sql);
                        match state {
                            true => {
                                let (state, _) = self.execute(alter_sql);
                                match state {
                                    true => {
                                        // The unique-index result is only logged; a failure
                                        // here does not change the return value.
                                        if !unique.is_empty() {
                                            let (state, _) = self.execute(unique.clone());
                                            info!(
                                                "{} {} 唯一索引创建:{}",
                                                thread_id, options.table_name, state
                                            );
                                        }
                                        for index_sql in index.iter() {
                                            let (state, _) = self.execute(index_sql.clone());
                                            match state {
                                                true => {}
                                                false => {
                                                    error!(
                                                        "{} 索引创建失败: {} {}",
                                                        options.table_name, state, index_sql
                                                    );
                                                    return JsonValue::from(0);
                                                }
                                            };
                                        }

                                        return JsonValue::from(1);
                                    }
                                    false => {
                                        error!("{} 修改表名失败", options.table_name);
                                        return JsonValue::from(0);
                                    }
                                }
                            }
                            false => {
                                error!("{} 删除本表失败", options.table_name);
                                return JsonValue::from(0);
                            }
                        }
                    }
                    false => {
                        error!(
                            "{} 添加tmp表记录失败 {:#} {:#}",
                            options.table_name, sql, sqls
                        );
                        // Copy failed: remove the half-filled tmp table again.
                        let sql = format!("drop table {}_tmp", options.table_name);
                        let (_, _) = self.execute(sql);
                        0
                    }
                }
            }
            false => {
                error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
                // Creating the tmp table failed; drop any tmp table with that name.
                // NOTE(review): presumably cleans up a leftover from an earlier failed run —
                // confirm.
                let (_, _) = self.execute(drop_sql_temp);
                0
            }
        };
        JsonValue::from(data)
    }
757
758    fn table_info(&mut self, table: &str) -> JsonValue {
759        if let Ok(guard) = FIELDS.lock() {
760            if let Some(cached) = guard.get(table) {
761                return cached.clone();
762            }
763        }
764        let sql = format!("PRAGMA table_info({table})");
765        let (state, data) = self.query(sql);
766
767        match state {
768            true => {
769                let mut fields = object! {};
770                for item in data.members() {
771                    if let Some(name) = item["name"].as_str() {
772                        fields[name] = item.clone();
773                    }
774                }
775                if let Ok(mut guard) = FIELDS.lock() {
776                    guard.insert(table.to_string(), fields.clone());
777                }
778                data
779            }
780            false => object! {},
781        }
782    }
783
784    fn table_is_exist(&mut self, name: &str) -> bool {
785        let sql = format!(
786            "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787        );
788        let (state, data) = self.query(sql);
789        if state && !data.is_empty() {
790            let count = data[0]["count"].as_i64().unwrap_or(0);
791            return count > 0;
792        }
793        false
794    }
795
796    fn table(&mut self, name: &str) -> &mut Sqlite {
797        self.params = Params::default(self.connection.mode.str().as_str());
798        let table_name = format!("{}{}", self.connection.prefix, name);
799        if !super::sql_safety::validate_table_name(&table_name) {
800            error!("Invalid table name: {}", name);
801        }
802        self.params.table = table_name.clone();
803        self.params.join_table = table_name;
804        self
805    }
806    fn change_table(&mut self, name: &str) -> &mut Self {
807        self.params.join_table = name.to_string();
808        self
809    }
810    fn autoinc(&mut self) -> &mut Self {
811        self.params.autoinc = true;
812        self
813    }
814
815    fn timestamps(&mut self) -> &mut Self {
816        self.params.timestamps = true;
817        self
818    }
819
820    fn fetch_sql(&mut self) -> &mut Self {
821        self.params.sql = true;
822        self
823    }
824
825    fn order(&mut self, field: &str, by: bool) -> &mut Self {
826        self.params.order[field] = {
827            if by {
828                "DESC"
829            } else {
830                "ASC"
831            }
832        }
833        .into();
834        self
835    }
836
837    fn group(&mut self, field: &str) -> &mut Self {
838        let fields: Vec<&str> = field.split(",").collect();
839        for field in fields.iter() {
840            let fields = field.to_string();
841            self.params.group[fields.as_str()] = fields.clone().into();
842            self.params.fields[fields.as_str()] = fields.clone().into();
843        }
844        self
845    }
846
847    fn distinct(&mut self) -> &mut Self {
848        self.params.distinct = true;
849        self
850    }
851    fn json(&mut self, field: &str) -> &mut Self {
852        let list: Vec<&str> = field.split(",").collect();
853        for item in list.iter() {
854            self.params.json[item.to_string().as_str()] = item.to_string().into();
855        }
856        self
857    }
858
859    fn location(&mut self, field: &str) -> &mut Self {
860        let list: Vec<&str> = field.split(",").collect();
861        for item in list.iter() {
862            self.params.location[item.to_string().as_str()] = item.to_string().into();
863        }
864        self
865    }
866
867    fn field(&mut self, field: &str) -> &mut Self {
868        let list: Vec<&str> = field.split(",").collect();
869        let join_table = if self.params.join_table.is_empty() {
870            self.params.table.clone()
871        } else {
872            self.params.join_table.clone()
873        };
874        for item in list.iter() {
875            let item = item.to_string();
876            if item.contains(" as ") {
877                let text = item.split(" as ").collect::<Vec<&str>>().clone();
878                self.params.fields[item] =
879                    format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880            } else {
881                self.params.fields[item] = format!("{join_table}.`{item}`").into();
882            }
883        }
884        self
885    }
886
887    fn field_raw(&mut self, expr: &str) -> &mut Self {
888        self.params.fields[expr] = expr.into();
889        self
890    }
891
892    fn hidden(&mut self, name: &str) -> &mut Self {
893        let hidden: Vec<&str> = name.split(",").collect();
894        let sql = format!("PRAGMA table_info({})", self.params.table);
895        let (_, data) = self.query(sql);
896        for item in data.members() {
897            if let Some(name) = item["name"].as_str() {
898                if !hidden.contains(&name) {
899                    self.params.fields[name] = name.into();
900                }
901            }
902        }
903        self
904    }
905
906    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907        for f in field.split('|') {
908            if !super::sql_safety::validate_field_name(f) {
909                error!("Invalid field name: {}", f);
910            }
911        }
912        if !super::sql_safety::validate_compare_orator(compare) {
913            error!("Invalid compare operator: {}", compare);
914        }
915        let join_table = if self.params.join_table.is_empty() {
916            self.params.table.clone()
917        } else {
918            self.params.join_table.clone()
919        };
920
921        if value.is_boolean() {
922            if value.as_bool().unwrap_or(false) {
923                value = 1.into();
924            } else {
925                value = 0.into();
926            }
927        }
928
929        match compare {
930            "between" => {
931                self.params.where_and.push(format!(
932                    "{}.`{}` between '{}' AND '{}'",
933                    join_table, field, value[0], value[1]
934                ));
935            }
936            "set" => {
937                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938                let mut wheredata = vec![];
939                for item in list.iter() {
940                    wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941                }
942                self.params
943                    .where_and
944                    .push(format!("({})", wheredata.join(" or ")));
945            }
946            "notin" => {
947                let mut text = String::new();
948                for item in value.members() {
949                    text = format!("{text},'{item}'");
950                }
951                text = text.trim_start_matches(",").into();
952                self.params
953                    .where_and
954                    .push(format!("{join_table}.`{field}` not in ({text})"));
955            }
956            "in" => {
957                let mut text = String::new();
958                if value.is_array() {
959                    for item in value.members() {
960                        text = format!("{text},'{item}'");
961                    }
962                } else {
963                    let value = value.to_string();
964                    let value: Vec<&str> = value.split(",").collect();
965                    for item in value.iter() {
966                        text = format!("{text},'{item}'");
967                    }
968                }
969                text = text.trim_start_matches(",").into();
970
971                self.params
972                    .where_and
973                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
974            }
975            "=" => {
976                if value.is_null() {
977                    self.params
978                        .where_and
979                        .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980                } else {
981                    self.params
982                        .where_and
983                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984                }
985            }
986            _ => {
987                if value.is_null() {
988                    self.params
989                        .where_and
990                        .push(format!("{join_table}.`{field}` {compare} {value}"));
991                } else {
992                    self.params
993                        .where_and
994                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
995                }
996            }
997        }
998        self
999    }
1000    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1001        for f in field.split('|') {
1002            if !super::sql_safety::validate_field_name(f) {
1003                error!("Invalid field name: {}", f);
1004            }
1005        }
1006        if !super::sql_safety::validate_compare_orator(compare) {
1007            error!("Invalid compare operator: {}", compare);
1008        }
1009        let join_table = if self.params.join_table.is_empty() {
1010            self.params.table.clone()
1011        } else {
1012            self.params.join_table.clone()
1013        };
1014
1015        if value.is_boolean() {
1016            if value.as_bool().unwrap_or(false) {
1017                value = 1.into();
1018            } else {
1019                value = 0.into();
1020            }
1021        }
1022        match compare {
1023            "between" => {
1024                self.params.where_or.push(format!(
1025                    "{}.`{}` between '{}' AND '{}'",
1026                    join_table, field, value[0], value[1]
1027                ));
1028            }
1029            "set" => {
1030                let tt = value.to_string().replace(",", "%");
1031                self.params
1032                    .where_or
1033                    .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1034            }
1035            "notin" => {
1036                let mut text = String::new();
1037                for item in value.members() {
1038                    text = format!("{text},'{item}'");
1039                }
1040                text = text.trim_start_matches(",").into();
1041                self.params
1042                    .where_or
1043                    .push(format!("{join_table}.`{field}` not in ({text})"));
1044            }
1045            "in" => {
1046                let mut text = String::new();
1047                if value.is_array() {
1048                    for item in value.members() {
1049                        text = format!("{text},'{item}'");
1050                    }
1051                } else {
1052                    let value = value.as_str().unwrap_or("");
1053                    let value: Vec<&str> = value.split(",").collect();
1054                    for item in value.iter() {
1055                        text = format!("{text},'{item}'");
1056                    }
1057                }
1058                text = text.trim_start_matches(",").into();
1059                self.params
1060                    .where_or
1061                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1062            }
1063            _ => {
1064                self.params
1065                    .where_or
1066                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1067            }
1068        }
1069        self
1070    }
1071    fn where_raw(&mut self, expr: &str) -> &mut Self {
1072        self.params.where_and.push(expr.to_string());
1073        self
1074    }
1075
1076    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1077        self.params
1078            .where_and
1079            .push(format!("`{field}` IN ({sub_sql})"));
1080        self
1081    }
1082
1083    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1084        self.params
1085            .where_and
1086            .push(format!("`{field}` NOT IN ({sub_sql})"));
1087        self
1088    }
1089
1090    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1091        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1092        self
1093    }
1094
1095    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1096        self.params
1097            .where_and
1098            .push(format!("NOT EXISTS ({sub_sql})"));
1099        self
1100    }
1101
1102    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1103        self.params.where_column = format!(
1104            "{}.`{}` {} {}.`{}`",
1105            self.params.table, field_a, compare, self.params.table, field_b
1106        );
1107        self
1108    }
1109
1110    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1111        self.params
1112            .update_column
1113            .push(format!("{field_a} = {compare}"));
1114        self
1115    }
1116
1117    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1118        self.params.page = page;
1119        self.params.limit = limit;
1120        self
1121    }
1122
1123    fn limit(&mut self, count: i32) -> &mut Self {
1124        self.params.limit_only = count;
1125        self
1126    }
1127
1128    fn column(&mut self, field: &str) -> JsonValue {
1129        self.field(field);
1130        self.group(field);
1131        let sql = self.params.select_sql();
1132        if self.params.sql {
1133            return JsonValue::from(sql.clone());
1134        }
1135        self.table_info(self.params.table.clone().as_str());
1136        let (state, data) = self.query(sql);
1137        if state {
1138            let mut list = array![];
1139            for item in data.members() {
1140                if self.params.json[field].is_empty() {
1141                    let _ = list.push(item[field].clone());
1142                } else {
1143                    let data =
1144                        json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1145                    let _ = list.push(data);
1146                }
1147            }
1148            list
1149        } else {
1150            array![]
1151        }
1152    }
1153
1154    fn count(&mut self) -> JsonValue {
1155        self.params.fields["count"] = "count(*) as count".to_string().into();
1156        let sql = self.params.select_sql();
1157        if self.params.sql {
1158            return JsonValue::from(sql.clone());
1159        }
1160        let (state, data) = self.query(sql);
1161        match state {
1162            true => data[0]["count"].clone(),
1163            false => JsonValue::from(0),
1164        }
1165    }
1166
1167    fn max(&mut self, field: &str) -> JsonValue {
1168        self.params.fields[field] = format!("max({field}) as {field}").into();
1169        let sql = self.params.select_sql();
1170        if self.params.sql {
1171            return JsonValue::from(sql.clone());
1172        }
1173        let (state, data) = self.query(sql);
1174        if state {
1175            if data.len() > 1 {
1176                return data;
1177            }
1178            data[0][field].clone()
1179        } else {
1180            JsonValue::from(0.0)
1181        }
1182    }
1183
1184    fn min(&mut self, field: &str) -> JsonValue {
1185        self.params.fields[field] = format!("min({field}) as {field}").into();
1186        let sql = self.params.select_sql();
1187        let (state, data) = self.query(sql);
1188        if state {
1189            if data.len() > 1 {
1190                return data;
1191            }
1192            data[0][field].clone()
1193        } else {
1194            JsonValue::from(0.0)
1195        }
1196    }
1197
1198    fn sum(&mut self, field: &str) -> JsonValue {
1199        self.params.fields[field] = format!("sum({field}) as {field}").into();
1200        let sql = self.params.select_sql();
1201        if self.params.sql {
1202            return JsonValue::from(sql.clone());
1203        }
1204        let (state, data) = self.query(sql);
1205        match state {
1206            true => {
1207                if data.len() > 1 {
1208                    return data;
1209                }
1210                if self.params.fields.len() > 1 {
1211                    return data[0].clone();
1212                }
1213                data[0][field].clone()
1214            }
1215            false => JsonValue::from(0),
1216        }
1217    }
1218    fn avg(&mut self, field: &str) -> JsonValue {
1219        self.params.fields[field] = format!("avg({field}) as {field}").into();
1220        let sql = self.params.select_sql();
1221        if self.params.sql {
1222            return JsonValue::from(sql.clone());
1223        }
1224        let (state, data) = self.query(sql);
1225        if state {
1226            if data.len() > 1 {
1227                return data;
1228            }
1229            data[0][field].clone()
1230        } else {
1231            JsonValue::from(0)
1232        }
1233    }
1234    fn having(&mut self, expr: &str) -> &mut Self {
1235        self.params.having.push(expr.to_string());
1236        self
1237    }
1238    fn select(&mut self) -> JsonValue {
1239        let sql = self.params.select_sql();
1240        if self.params.sql {
1241            return JsonValue::from(sql.clone());
1242        }
1243        self.table_info(self.params.table.clone().as_str());
1244        let (state, mut data) = self.query(sql.clone());
1245        match state {
1246            true => {
1247                for (field, _) in self.params.json.entries() {
1248                    for item in data.members_mut() {
1249                        if !item[field].is_empty() {
1250                            let json = item[field].to_string();
1251                            item[field] = match json::parse(&json) {
1252                                Ok(e) => e,
1253                                Err(_) => JsonValue::from(json),
1254                            };
1255                        }
1256                    }
1257                }
1258                data.clone()
1259            }
1260            false => {
1261                error!("{data:?}");
1262                array![]
1263            }
1264        }
1265    }
1266    fn find(&mut self) -> JsonValue {
1267        self.params.page = 1;
1268        self.params.limit = 1;
1269        let sql = self.params.select_sql();
1270        if self.params.sql {
1271            return JsonValue::from(sql.clone());
1272        }
1273
1274        self.table_info(self.params.table.clone().as_str());
1275        let (state, mut data) = self.query(sql.clone());
1276        match state {
1277            true => {
1278                if data.is_empty() {
1279                    return object! {};
1280                }
1281                for (field, _) in self.params.json.entries() {
1282                    if !data[0][field].is_empty() {
1283                        let json = data[0][field].to_string();
1284                        let json = json::parse(&json).unwrap_or(array![]);
1285                        data[0][field] = json;
1286                    } else {
1287                        data[0][field] = array![];
1288                    }
1289                }
1290                data[0].clone()
1291            }
1292            false => {
1293                error!("{data:?}");
1294                object! {}
1295            }
1296        }
1297    }
1298
1299    fn value(&mut self, field: &str) -> JsonValue {
1300        self.params.fields = object! {};
1301        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1302        self.params.page = 1;
1303        self.params.limit = 1;
1304        let sql = self.params.select_sql();
1305        if self.params.sql {
1306            return JsonValue::from(sql.clone());
1307        }
1308        self.table_info(self.params.table.clone().as_str());
1309        let (state, mut data) = self.query(sql.clone());
1310        match state {
1311            true => {
1312                for (field, _) in self.params.json.entries() {
1313                    if !data[0][field].is_empty() {
1314                        let json = data[0][field].to_string();
1315                        let json = json::parse(&json).unwrap_or(array![]);
1316                        data[0][field] = json;
1317                    } else {
1318                        data[0][field] = array![];
1319                    }
1320                }
1321                data[0][field].clone()
1322            }
1323            false => {
1324                if self.connection.debug {
1325                    info!("{data:?}");
1326                }
1327                JsonValue::Null
1328            }
1329        }
1330    }
1331
1332    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1333        let mut fields = vec![];
1334        let mut values = vec![];
1335
1336        if !self.params.autoinc && data["id"].is_empty() {
1337            let thread_id = format!("{:?}", std::thread::current().id());
1338            let thread_num: u64 = thread_id
1339                .trim_start_matches("ThreadId(")
1340                .trim_end_matches(")")
1341                .parse()
1342                .unwrap_or(0);
1343            data["id"] = format!(
1344                "{:X}{:X}",
1345                Local::now().timestamp_nanos_opt().unwrap_or(0),
1346                thread_num
1347            )
1348            .into();
1349        }
1350        for (field, value) in data.entries() {
1351            fields.push(format!("`{field}`"));
1352
1353            if value.is_string() {
1354                if value.to_string().contains("'") {
1355                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1356                    continue;
1357                } else if value.to_string().contains('"') {
1358                    values.push(format!("'{value}'"));
1359                    continue;
1360                } else {
1361                    values.push(format!("\"{value}\""));
1362                    continue;
1363                }
1364            } else if value.is_array() || value.is_object() {
1365                if self.params.json[field].is_empty() {
1366                    values.push(format!("'{value}'"));
1367                } else {
1368                    let json = value.to_string();
1369                    let json = json.replace("'", "''");
1370                    values.push(format!("'{json}'"));
1371                }
1372                continue;
1373            } else if value.is_number() || value.is_boolean() || value.is_null() {
1374                values.push(format!("{value}"));
1375                continue;
1376            } else {
1377                values.push(format!("'{value}'"));
1378                continue;
1379            }
1380        }
1381        let fields = fields.join(",");
1382        let values = values.join(",");
1383
1384        let sql = format!(
1385            "INSERT INTO `{}` ({}) VALUES ({});",
1386            self.params.table, fields, values
1387        );
1388        if self.params.sql {
1389            return JsonValue::from(sql.clone());
1390        }
1391        let (state, ids) = self.execute(sql);
1392        match state {
1393            true => {
1394                if self.params.autoinc {
1395                    let (state, ids) =
1396                        self.query(format!("select max(id) as id from {}", self.params.table));
1397                    return match state {
1398                        true => ids[0]["id"].clone(),
1399                        false => {
1400                            error!("{ids}");
1401                            JsonValue::from("")
1402                        }
1403                    };
1404                }
1405                data["id"].clone()
1406            }
1407            false => {
1408                error!("{ids}");
1409                JsonValue::from("")
1410            }
1411        }
1412    }
1413    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1414        let mut fields = String::new();
1415
1416        if !self.params.autoinc && data[0]["id"].is_empty() {
1417            data[0]["id"] = "".into();
1418        }
1419        for (field, _) in data[0].entries() {
1420            fields = format!("{fields},`{field}`");
1421        }
1422        fields = fields.trim_start_matches(",").to_string();
1423
1424        let core_count = num_cpus::get();
1425        let mut p = pools::Pool::new(core_count * 4);
1426        let autoinc = self.params.autoinc;
1427        for list in data.members() {
1428            let mut item = list.clone();
1429            p.execute(move |pcindex| {
1430                if !autoinc && item["id"].is_empty() {
1431                    let id = format!(
1432                        "{:X}{:X}",
1433                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1434                        pcindex
1435                    );
1436                    item["id"] = id.into();
1437                }
1438                let mut values = "".to_string();
1439                for (_, value) in item.entries() {
1440                    if value.is_string() {
1441                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1442                    } else if value.is_number() || value.is_boolean() {
1443                        values = format!("{values},{value}");
1444                    } else {
1445                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1446                    }
1447                }
1448                values = format!("({})", values.trim_start_matches(","));
1449                array![item["id"].clone(), values]
1450            });
1451        }
1452        let (ids_list, mut values) = p.insert_all();
1453
1454        values = values.trim_start_matches(",").to_string();
1455
1456        let sql = format!(
1457            "INSERT INTO {} ({}) VALUES {};",
1458            self.params.table, fields, values
1459        );
1460        if self.params.sql {
1461            return JsonValue::from(sql.clone());
1462        }
1463        let (state, data) = self.execute(sql.clone());
1464        match state {
1465            true => {
1466                if self.params.autoinc {
1467                    let (state, ids) = self.query(format!(
1468                        "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1469                        self.params.table,
1470                        ids_list.len()
1471                    ));
1472                    return match state {
1473                        true => {
1474                            let mut idlist = array![];
1475                            for item in ids.members() {
1476                                let _ = idlist.push(item["id"].clone());
1477                            }
1478                            idlist
1479                        }
1480                        false => {
1481                            error!("批量添加失败: {ids:?} {sql}");
1482                            array![]
1483                        }
1484                    };
1485                }
1486                JsonValue::from(ids_list)
1487            }
1488            false => {
1489                error!("批量添加失败: {data:?} {sql}");
1490                array![]
1491            }
1492        }
1493    }
1494
1495    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1496        let mut fields = vec![];
1497        let mut values = vec![];
1498
1499        if !self.params.autoinc && data["id"].is_empty() {
1500            let thread_id = format!("{:?}", std::thread::current().id());
1501            let thread_num: u64 = thread_id
1502                .trim_start_matches("ThreadId(")
1503                .trim_end_matches(")")
1504                .parse()
1505                .unwrap_or(0);
1506            data["id"] = format!(
1507                "{:X}{:X}",
1508                Local::now().timestamp_nanos_opt().unwrap_or(0),
1509                thread_num
1510            )
1511            .into();
1512        }
1513        for (field, value) in data.entries() {
1514            fields.push(format!("`{field}`"));
1515
1516            if value.is_string() {
1517                if value.to_string().contains("'") {
1518                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1519                    continue;
1520                } else if value.to_string().contains('"') {
1521                    values.push(format!("'{value}'"));
1522                    continue;
1523                } else {
1524                    values.push(format!("\"{value}\""));
1525                    continue;
1526                }
1527            } else if value.is_array() || value.is_object() {
1528                if self.params.json[field].is_empty() {
1529                    values.push(format!("'{value}'"));
1530                } else {
1531                    let json = value.to_string();
1532                    let json = json.replace("'", "''");
1533                    values.push(format!("'{json}'"));
1534                }
1535                continue;
1536            } else if value.is_number() || value.is_boolean() || value.is_null() {
1537                values.push(format!("{value}"));
1538                continue;
1539            } else {
1540                values.push(format!("'{value}'"));
1541                continue;
1542            }
1543        }
1544
1545        let conflict_cols: Vec<String> =
1546            conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1547
1548        let update_set: Vec<String> = fields
1549            .iter()
1550            .filter(|f| {
1551                let name = f.trim_matches('`');
1552                !conflict_fields.contains(&name) && name != "id"
1553            })
1554            .map(|f| format!("{f}=excluded.{f}"))
1555            .collect();
1556
1557        let fields_str = fields.join(",");
1558        let values_str = values.join(",");
1559
1560        let sql = format!(
1561            "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1562            self.params.table,
1563            fields_str,
1564            values_str,
1565            conflict_cols.join(","),
1566            update_set.join(",")
1567        );
1568        if self.params.sql {
1569            return JsonValue::from(sql.clone());
1570        }
1571        let (state, result) = self.execute(sql);
1572        match state {
1573            true => {
1574                if self.params.autoinc {
1575                    let (state, ids) =
1576                        self.query(format!("select max(id) as id from {}", self.params.table));
1577                    return match state {
1578                        true => ids[0]["id"].clone(),
1579                        false => {
1580                            error!("{ids}");
1581                            JsonValue::from("")
1582                        }
1583                    };
1584                }
1585                data["id"].clone()
1586            }
1587            false => {
1588                error!("upsert失败: {result}");
1589                JsonValue::from("")
1590            }
1591        }
1592    }
1593
1594    fn update(&mut self, data: JsonValue) -> JsonValue {
1595        let mut values = vec![];
1596
1597        for (field, value) in data.entries() {
1598            if value.is_string() {
1599                values.push(format!(
1600                    "`{}` = '{}'",
1601                    field,
1602                    value.to_string().replace("'", "''")
1603                ));
1604            } else if value.is_array() || value.is_object() {
1605                if self.params.json[field].is_empty() {
1606                    values.push(format!("`{field}` = '{value}'"));
1607                } else {
1608                    let json = value.to_string();
1609                    let json = json.replace("'", "''");
1610                    values.push(format!("`{field}` = '{json}'"));
1611                }
1612                continue;
1613            } else if value.is_number() || value.is_boolean() || value.is_null() {
1614                values.push(format!("`{field}` = {value} "));
1615            } else {
1616                values.push(format!("`{field}` = '{value}' "));
1617            }
1618        }
1619
1620        for (field, value) in self.params.inc_dec.entries() {
1621            values.push(format!("{} = {}", field, value.to_string().clone()));
1622        }
1623
1624        let values = values.join(",");
1625        let sql = format!(
1626            "UPDATE `{}` SET {} {} {};",
1627            self.params.table.clone(),
1628            values,
1629            self.params.where_sql(),
1630            self.params.page_limit_sql()
1631        );
1632        if self.params.sql {
1633            return JsonValue::from(sql.clone());
1634        }
1635        let (state, data) = self.execute(sql);
1636        if state {
1637            data
1638        } else {
1639            error!("{data}");
1640            JsonValue::from(0)
1641        }
1642    }
1643
1644    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1645        let mut values = vec![];
1646        let mut ids = vec![];
1647        for (field, _) in data[0].entries() {
1648            if field == "id" {
1649                continue;
1650            }
1651            let mut fields = vec![];
1652            for row in data.members() {
1653                let value = row[field].clone();
1654                let id = row["id"].clone();
1655                ids.push(id.clone());
1656                if value.is_string() {
1657                    fields.push(format!(
1658                        "WHEN '{}' THEN '{}'",
1659                        id,
1660                        value.to_string().replace("'", "''")
1661                    ));
1662                } else if value.is_array() || value.is_object() {
1663                    if self.params.json[field].is_empty() {
1664                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1665                    } else {
1666                        let json = value.to_string();
1667                        let json = json.replace("'", "''");
1668                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1669                    }
1670                    continue;
1671                } else if value.is_number() || value.is_boolean() || value.is_null() {
1672                    fields.push(format!("WHEN '{id}' THEN {value}"));
1673                } else {
1674                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1675                }
1676            }
1677            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1678        }
1679
1680        self.where_and("id", "in", ids.into());
1681        for (field, value) in self.params.inc_dec.entries() {
1682            values.push(format!("{} = {}", field, value.to_string().clone()));
1683        }
1684
1685        let values = values.join(",");
1686        let sql = format!(
1687            "UPDATE {} SET {} {} {};",
1688            self.params.table.clone(),
1689            values,
1690            self.params.where_sql(),
1691            self.params.page_limit_sql()
1692        );
1693        if self.params.sql {
1694            return JsonValue::from(sql.clone());
1695        }
1696        let (state, data) = self.execute(sql);
1697        if state {
1698            data
1699        } else {
1700            error!("{data:?}");
1701            JsonValue::from(0)
1702        }
1703    }
1704    fn delete(&mut self) -> JsonValue {
1705        let sql = format!(
1706            "delete FROM `{}` {};",
1707            self.params.table.clone(),
1708            self.params.where_sql()
1709        );
1710        if self.params.sql {
1711            return JsonValue::from(sql.clone());
1712        }
1713        let (state, data) = self.execute(sql);
1714        match state {
1715            true => data,
1716            false => {
1717                error!("delete 失败>>>{data:?}");
1718                JsonValue::from(0)
1719            }
1720        }
1721    }
1722
1723    fn transaction(&mut self) -> bool {
1724        let thread_id = format!("{:?}", thread::current().id());
1725
1726        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1727            let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1728            SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1729            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1730            let _ = self.query(sp);
1731            return true;
1732        }
1733
1734        if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1735            error!("{thread_id} 启动事务失败: 获取写锁超时");
1736            return false;
1737        }
1738
1739        let flags = OpenFlags::new().with_read_write().with_no_mutex();
1740        let db = match Connect::open_thread_safe_with_flags(
1741            self.connection.clone().get_dsn().as_str(),
1742            flags,
1743        ) {
1744            Ok(e) => Arc::new(e),
1745            Err(e) => {
1746                error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1747                SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1748                return false;
1749            }
1750        };
1751
1752        SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1753
1754        let (state, data) = self.query("BEGIN".to_string());
1755        if state {
1756            true
1757        } else {
1758            error!("{thread_id} 启动事务失败: {data}");
1759            SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1760            false
1761        }
1762    }
1763
1764    fn commit(&mut self) -> bool {
1765        let thread_id = format!("{:?}", thread::current().id());
1766
1767        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1768            error!("{thread_id} 提交事务失败: 没有活跃的事务");
1769            return false;
1770        }
1771
1772        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1773        if depth > 1 {
1774            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1775            let _ = self.query(sp);
1776            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1777            return true;
1778        }
1779
1780        let (state, _) = self.query("COMMIT".to_string());
1781        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1782
1783        if state {
1784            true
1785        } else {
1786            error!("{thread_id} 提交事务失败");
1787            false
1788        }
1789    }
1790
1791    fn rollback(&mut self) -> bool {
1792        let thread_id = format!("{:?}", thread::current().id());
1793
1794        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1795            error!("{thread_id} 回滚失败: 没有活跃的事务");
1796            return false;
1797        }
1798
1799        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1800        if depth > 1 {
1801            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1802            let _ = self.query(sp);
1803            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1804            return true;
1805        }
1806
1807        let (state, _) = self.query("ROLLBACK".to_string());
1808        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1809
1810        if state {
1811            true
1812        } else {
1813            error!("回滚失败: {thread_id}");
1814            false
1815        }
1816    }
1817
1818    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1819        let (state, data) = self.query(sql.to_string());
1820        match state {
1821            true => Ok(data),
1822            false => Err(data.to_string()),
1823        }
1824    }
1825
1826    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1827        let (state, data) = self.execute(sql.to_string());
1828        match state {
1829            true => Ok(data),
1830            false => Err(data.to_string()),
1831        }
1832    }
1833
1834    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1835        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1836        self
1837    }
1838
1839    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1840        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1841        self
1842    }
1843
1844    fn buildsql(&mut self) -> String {
1845        self.fetch_sql();
1846        let sql = self.select().to_string();
1847        format!("( {} ) `{}`", sql, self.params.table)
1848    }
1849
1850    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1851        for field in fields {
1852            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1853        }
1854        self
1855    }
1856
1857    fn join(
1858        &mut self,
1859        main_table: &str,
1860        main_fields: &str,
1861        right_table: &str,
1862        right_fields: &str,
1863    ) -> &mut Self {
1864        let main_table = if main_table.is_empty() {
1865            self.params.table.clone()
1866        } else {
1867            main_table.to_string()
1868        };
1869        self.params.join_table = right_table.to_string();
1870        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1871        self
1872    }
1873
1874    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1875        let main_fields = if main_fields.is_empty() {
1876            "id"
1877        } else {
1878            main_fields
1879        };
1880        let second_fields = if second_fields.is_empty() {
1881            self.params.table.clone()
1882        } else {
1883            second_fields.to_string().clone()
1884        };
1885        let sec_table_name = format!("{}{}", table, "_2");
1886        let second_table = format!("{} {}", table, sec_table_name.clone());
1887        self.params.join_table = sec_table_name.clone();
1888        self.params.join.push(format!(
1889            " INNER JOIN {} ON {}.{} = {}.{}",
1890            second_table, self.params.table, main_fields, sec_table_name, second_fields
1891        ));
1892        self
1893    }
1894
1895    fn join_right(
1896        &mut self,
1897        main_table: &str,
1898        main_fields: &str,
1899        right_table: &str,
1900        right_fields: &str,
1901    ) -> &mut Self {
1902        let main_table = if main_table.is_empty() {
1903            self.params.table.clone()
1904        } else {
1905            main_table.to_string()
1906        };
1907        self.params.join_table = right_table.to_string();
1908        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1909        self
1910    }
1911
1912    fn join_full(
1913        &mut self,
1914        main_table: &str,
1915        main_fields: &str,
1916        right_table: &str,
1917        right_fields: &str,
1918    ) -> &mut Self {
1919        let main_table = if main_table.is_empty() {
1920            self.params.table.clone()
1921        } else {
1922            main_table.to_string()
1923        };
1924        self.params.join_table = right_table.to_string();
1925        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1926        self
1927    }
1928
1929    fn union(&mut self, sub_sql: &str) -> &mut Self {
1930        self.params.unions.push(format!("UNION {sub_sql}"));
1931        self
1932    }
1933
1934    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1935        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1936        self
1937    }
1938
1939    fn lock_for_update(&mut self) -> &mut Self {
1940        self.params.lock_mode = "FOR UPDATE".to_string();
1941        self
1942    }
1943
1944    fn lock_for_share(&mut self) -> &mut Self {
1945        self.params.lock_mode = "FOR SHARE".to_string();
1946        self
1947    }
1948}