Skip to main content

br_db/types/
sqlite.rs

1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17    static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18    static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19    static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24    /// 当前连接配置
25    pub connection: Connection,
26    /// 当前选中配置
27    pub default: String,
28
29    pub params: Params,
30}
31
32impl Sqlite {
33    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34        let dsn = connection.clone().get_dsn();
35        let db_path = Path::new(&dsn);
36        if let Some(parent) = db_path.parent() {
37            if !parent.as_os_str().is_empty() && !parent.exists() {
38                if let Err(e) = std::fs::create_dir_all(parent) {
39                    error!("sqlite 创建目录失败: {} {:?}", e, parent);
40                    return Err(e.to_string());
41                }
42                info!("sqlite 自动创建目录: {:?}", parent);
43            }
44        }
45
46        let flags = OpenFlags::new().with_create().with_read_write();
47        match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48            Ok(e) => {
49                if let Ok(mut guard) = DBS.lock() {
50                    guard.insert(default.clone(), Arc::new(e));
51                } else {
52                    error!("sqlite 获取数据库锁失败");
53                    return Err("获取数据库锁失败".to_string());
54                }
55                Ok(Self {
56                    connection: connection.clone(),
57                    default: default.clone(),
58                    params: Params::default("sqlite"),
59                })
60            }
61            Err(e) => {
62                error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63                Err(e.to_string())
64            }
65        }
66    }
67    /// Static helper to process query results - can be called from closures
68    fn query_handle_static(
69        mut statement: Statement,
70        sql: &str,
71        table: &str,
72        thread_id: &str,
73    ) -> (bool, JsonValue) {
74        let mut data = array![];
75        while let State::Row = match statement.next() {
76            Ok(e) => e,
77            Err(e) => {
78                let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79                if in_transaction {
80                    error!("{} 查询事务: {} {}", thread_id, e, sql);
81                } else {
82                    error!("{} 非事务查询: {} {}", thread_id, e, sql);
83                }
84                return (false, data);
85            }
86        } {
87            let mut list = object! {};
88            let mut index = 0;
89            for field in statement.column_names().iter() {
90                if !list[field.as_str()].is_null() {
91                    index += 1;
92                    continue;
93                }
94                match statement.column_type(field.as_str()) {
95                    Ok(types) => match types {
96                        Type::String => {
97                            if let Ok(data) = statement.read::<String, _>(index) {
98                                match data.as_str() {
99                                    "false" => {
100                                        list[field.as_str()] = JsonValue::from(false);
101                                    }
102                                    "true" => {
103                                        list[field.as_str()] = JsonValue::from(true);
104                                    }
105                                    _ => {
106                                        list[field.as_str()] = JsonValue::from(data.clone());
107                                    }
108                                }
109                            }
110                        }
111                        Type::Integer => {
112                            let fields_cache =
113                                FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114                            if let Some(fields) = fields_cache {
115                                if fields[field.clone()].is_empty() {
116                                    if let Ok(data) = statement.read::<i64, _>(index) {
117                                        list[field.as_str()] = data.into();
118                                    }
119                                    index += 1;
120                                    continue;
121                                }
122                                let field_type =
123                                    fields[field.clone()]["type"].as_str().unwrap_or("");
124                                match field_type {
125                                    "INTEGER" => {
126                                        if let Ok(data) = statement.read::<i64, _>(index) {
127                                            list[field.as_str()] = JsonValue::from(data == 1);
128                                        }
129                                    }
130                                    x if x.contains("int(") => {
131                                        if let Ok(data) = statement.read::<i64, _>(index) {
132                                            list[field.as_str()] = data.into();
133                                        }
134                                    }
135                                    x if x.contains("decimal(") && x.ends_with(",0)") => {
136                                        if let Ok(data) = statement.read::<f64, _>(index) {
137                                            list[field.as_str()] = data.into();
138                                        }
139                                    }
140                                    _ => {
141                                        if let Ok(data) = statement.read::<i64, _>(index) {
142                                            list[field.as_str()] = data.into();
143                                        }
144                                    }
145                                }
146                            } else if let Ok(data) = statement.read::<i64, _>(index) {
147                                list[field.as_str()] = JsonValue::from(data);
148                            }
149                        }
150                        Type::Float => {
151                            if let Ok(data) = statement.read::<f64, _>(index) {
152                                list[field.as_str()] = JsonValue::from(data);
153                            }
154                        }
155                        Type::Binary => {
156                            if let Ok(data) = statement.read::<String, _>(index) {
157                                list[field.as_str()] = JsonValue::from(data.clone());
158                            }
159                        }
160                        Type::Null => match statement.read::<String, _>(index) {
161                            Ok(data) => {
162                                list[field.as_str()] = JsonValue::from(data.clone());
163                            }
164                            Err(_) => match statement.read::<f64, _>(index) {
165                                Ok(data) => {
166                                    if data == 0.0 {
167                                        list[field.as_str()] = JsonValue::from("");
168                                    } else {
169                                        list[field.as_str()] = JsonValue::from(data);
170                                    }
171                                }
172                                Err(_) => match statement.read::<i64, _>(index) {
173                                    Ok(data) => {
174                                        if data == 0 {
175                                            list[field.as_str()] = JsonValue::from("");
176                                        } else {
177                                            list[field.as_str()] = JsonValue::from(data);
178                                        }
179                                    }
180                                    Err(e) => {
181                                        error!("Type:{} {:?}", field.as_str(), e);
182                                    }
183                                },
184                            },
185                        },
186                    },
187                    Err(e) => {
188                        error!("query Err: {e:?}");
189                    }
190                }
191                index += 1;
192            }
193            let _ = data.push(list);
194        }
195        (true, data)
196    }
197
198    pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199        let thread_id = format!("{:?}", thread::current().id());
200        let table = self.params.table.clone();
201
202        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203            if self.connection.debug {
204                info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205            }
206
207            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208                match db.prepare(sql.clone()) {
209                    Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210                    Err(e) => {
211                        error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212                        (false, e.to_string().into())
213                    }
214                }
215            });
216
217            match result {
218                Some(r) => r,
219                None => {
220                    error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221                    (false, JsonValue::from("未找到事务连接"))
222                }
223            }
224        } else {
225            if self.connection.debug {
226                info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227            }
228            let dbs = match DBS.lock() {
229                Ok(dbs) => dbs,
230                Err(e) => {
231                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232                    return (false, JsonValue::from("数据库锁定失败"));
233                }
234            };
235            let db = match dbs.get(&self.default) {
236                Some(db) => db.clone(),
237                None => {
238                    error!(
239                        "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240                        self.default
241                    );
242                    return (false, JsonValue::from("未找到默认数据库配置"));
243                }
244            };
245            drop(dbs);
246            let result = match db.prepare(sql.clone()) {
247                Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248                Err(e) => {
249                    error!("{thread_id} 查询非事务: Err: {e}");
250                    (false, e.to_string().into())
251                }
252            };
253            result
254        }
255    }
256
257    pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258        let thread_id = format!("{:?}", thread::current().id());
259
260        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261            if self.connection.debug {
262                info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263            }
264
265            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266                match db.execute(sql.clone()) {
267                    Ok(_) => {
268                        let count = db.change_count();
269                        (true, JsonValue::from(count))
270                    }
271                    Err(e) => (false, JsonValue::from(e.to_string())),
272                }
273            });
274
275            match result {
276                Some((true, count)) => {
277                    if self.connection.debug {
278                        info!("{} count:{}", thread_id, count);
279                    }
280                    (true, count)
281                }
282                Some((false, err)) => {
283                    error!(
284                        "{} 执行事务: \r\nErr: {}\r\n{}",
285                        thread_id,
286                        err,
287                        sql.clone()
288                    );
289                    (false, err)
290                }
291                None => {
292                    error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293                    (false, JsonValue::from("未找到事务连接"))
294                }
295            }
296        } else {
297            if self.connection.debug {
298                info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299            }
300            let dbs = match DBS.lock() {
301                Ok(dbs) => dbs,
302                Err(e) => {
303                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304                    return (false, JsonValue::from("数据库锁定失败"));
305                }
306            };
307
308            let db = match dbs.get(&self.default) {
309                Some(db) => db.clone(),
310                None => {
311                    error!(
312                        "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313                        thread_id, self.default, sql
314                    );
315                    return (false, JsonValue::from("未找到默认数据库配置"));
316                }
317            };
318            drop(dbs);
319            match db.execute(sql.clone()) {
320                Ok(_) => {
321                    let count = db.change_count();
322                    if self.connection.debug {
323                        info!(
324                            "{} count: {} total_count: {}",
325                            thread_id,
326                            count,
327                            db.total_change_count()
328                        );
329                    }
330                    (true, JsonValue::from(count))
331                }
332                Err(e) => {
333                    error!(
334                        "{} 执行非事务: Err: {}\r\nSQL: {}",
335                        thread_id,
336                        e,
337                        sql.clone()
338                    );
339                    (false, JsonValue::from(e.to_string()))
340                }
341            }
342        }
343    }
344}
345
346impl DbMode for Sqlite {
347    fn database_tables(&mut self) -> JsonValue {
348        let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349        match self.sql(sql.as_str()) {
350            Ok(e) => {
351                let mut list = vec![];
352                for item in e.members() {
353                    list.push(item["name"].clone());
354                }
355                list.into()
356            }
357            Err(_) => {
358                array![]
359            }
360        }
361    }
362    fn database_create(&mut self, name: &str) -> bool {
363        let current_dsn = self.connection.clone().get_dsn();
364        let current_path = Path::new(&current_dsn);
365
366        let new_path = if let Some(parent) = current_path.parent() {
367            if parent.as_os_str().is_empty() {
368                Path::new(name).to_path_buf()
369            } else {
370                parent.join(name)
371            }
372        } else {
373            Path::new(name).to_path_buf()
374        };
375
376        if let Some(parent) = new_path.parent() {
377            if !parent.as_os_str().is_empty() && !parent.exists() {
378                if let Err(e) = std::fs::create_dir_all(parent) {
379                    error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380                    return false;
381                }
382            }
383        }
384
385        let flags = OpenFlags::new().with_create().with_read_write();
386        match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387            Ok(_) => true,
388            Err(e) => {
389                error!(
390                    "sqlite database_create 创建数据库失败: {} {:?}",
391                    e, new_path
392                );
393                false
394            }
395        }
396    }
397
398    fn truncate(&mut self, table: &str) -> bool {
399        let sql = format!("DELETE FROM `{table}`");
400        let (state, _) = self.execute(sql);
401        state
402    }
403}
404
405impl Mode for Sqlite {
406    fn table_create(&mut self, options: TableOptions) -> JsonValue {
407        let mut sql = String::new();
408        let mut unique_fields = String::new();
409        let mut unique_name = String::new();
410        let mut unique = String::new();
411        for item in options.table_unique.iter() {
412            if unique_fields.is_empty() {
413                unique_fields = format!("`{item}`");
414                unique_name = format!("unique_{item}");
415            } else {
416                unique_fields = format!("{unique_fields},`{item}`");
417                unique_name = format!("{unique_name}_{item}");
418            }
419            unique = format!(
420                "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421                unique_name, options.table_name, unique_fields
422            );
423        }
424
425        let mut index = vec![];
426        for row in options.table_index.iter() {
427            let mut index_fields = String::new();
428            let mut index_name = String::new();
429            for item in row.iter() {
430                if index_fields.is_empty() {
431                    index_fields = format!("`{item}`");
432                    index_name = format!("index_{item}");
433                } else {
434                    index_fields = format!("{index_fields},`{item}`");
435                    index_name = format!("{index_name}_{item}");
436                }
437            }
438            index.push(format!(
439                "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440                index_name, options.table_name, index_fields
441            ));
442        }
443
444        for (name, field) in options.table_fields.entries() {
445            let row = br_fields::field("sqlite", name, field.clone());
446            sql = format!("{sql} {row},\r\n");
447        }
448
449        sql = sql.trim_end_matches(",\r\n").to_string();
450
451        let sql = format!(
452            "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453            options.table_name, sql
454        );
455        if self.params.sql {
456            let mut list = vec![sql];
457            if !unique.is_empty() {
458                list.push(unique)
459            }
460            if !index.is_empty() {
461                list.extend(index)
462            }
463
464            return JsonValue::from(list.join(""));
465        }
466
467        let thread_id = format!("{:?}", thread::current().id());
468
469        let (_, table_exists) = self.query(format!(
470            "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471            options.table_name
472        ));
473        let is_new_table = table_exists.is_empty();
474
475        let (state, _) = self.execute(sql.clone());
476        if state {
477            if is_new_table {
478                if !unique.is_empty() {
479                    let (state, _) = self.execute(unique.clone());
480                    info!(
481                        "{} {} 唯一索引创建:{}",
482                        thread_id, options.table_name, state
483                    );
484                }
485                for sql in index.iter() {
486                    let (state, _) = self.execute(sql.clone());
487                    info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488                }
489            }
490            JsonValue::from(true)
491        } else {
492            JsonValue::from(false)
493        }
494    }
495    fn table_update(&mut self, options: TableOptions) -> JsonValue {
496        let thread_id = format!("{:?}", thread::current().id());
497
498        let mut sql = String::new();
499        let mut add = vec![];
500        let mut del = vec![];
501        let mut put = vec![];
502
503        let (_, mut fields_list) =
504            self.query(format!("pragma table_info ('{}')", options.table_name));
505        let mut field_old = object! {};
506        for item in fields_list.members_mut() {
507            item["dflt_value"] = item["dflt_value"]
508                .to_string()
509                .trim_start_matches("'")
510                .trim_end_matches("'")
511                .into();
512            if let Some(name) = item["name"].as_str() {
513                field_old[name] = item.clone();
514                if options.table_fields[name].is_empty() {
515                    del.push(name.to_string());
516                }
517            }
518        }
519
520        let mut fields_list = vec![];
521        let mut fields_list_new = vec![];
522
523        for (name, field) in options.table_fields.entries() {
524            if field_old[name].is_empty() {
525                add.push(name.to_string());
526            } else {
527                fields_list.push(name.to_string());
528                fields_list_new.push(name.to_string());
529                let field_mode = field["mode"].as_str().unwrap_or("");
530                let old_value = match field_mode {
531                    "select" => {
532                        if field_old[name]["dflt_value"].clone().is_empty() {
533                            "".to_string()
534                        } else {
535                            field_old[name]["dflt_value"].clone().to_string()
536                        }
537                    }
538                    "switch" => (field_old[name]["dflt_value"]
539                        .to_string()
540                        .parse::<i32>()
541                        .unwrap_or(0)
542                        == 1)
543                        .to_string(),
544                    _ => field_old[name]["dflt_value"].clone().to_string(),
545                };
546                let new_value = match field_mode {
547                    "select" => field["def"]
548                        .members()
549                        .map(|x| x.to_string())
550                        .collect::<Vec<String>>()
551                        .join(","),
552                    _ => field["def"].clone().to_string(),
553                };
554                if old_value != new_value {
555                    info!(
556                        "{} 差异化当前: {} old_value: {} new_value: {} {}",
557                        options.table_name,
558                        name,
559                        old_value,
560                        new_value,
561                        old_value != new_value
562                    );
563                    info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
564                    put.push(name.to_string());
565                } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
566                    && name != options.table_key
567                {
568                    info!("{} 主键替换: {}", options.table_name, name);
569                    put.push(name.to_string());
570                }
571            }
572        }
573
574        let mut unique_fields = String::new();
575        let mut unique_name = String::new();
576        let mut unique = String::new();
577
578        // 唯一索引
579        for item in options.table_unique.iter() {
580            if unique_fields.is_empty() {
581                unique_fields = format!("`{item}`");
582                unique_name = format!("unique_{item}");
583            } else {
584                unique_fields = format!("{unique_fields},`{item}`");
585                unique_name = format!("{unique_name}_{item}");
586            }
587            unique = format!(
588                "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
589                options.table_name, unique_name, options.table_name, unique_fields
590            )
591        }
592
593        // 索引
594        let mut index = vec![];
595        for row in options.table_index.iter() {
596            let mut index_fields = String::new();
597            let mut index_name = String::new();
598            for item in row.iter() {
599                if index_fields.is_empty() {
600                    index_fields = item.to_string();
601                    index_name = format!("index_{item}");
602                } else {
603                    index_fields = format!("{index_fields},{item}");
604                    index_name = format!("{index_name}_{item}");
605                }
606            }
607            index.push(format!(
608                "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
609                options.table_name, index_name, options.table_name, index_fields
610            ));
611        }
612        for (name, field) in options.table_fields.entries() {
613            let row = br_fields::field("sqlite", name, field.clone());
614            sql = format!("{sql} {row},\r\n");
615        }
616
617        if !unique.is_empty() || !index.is_empty() {
618            let unique_text = unique.clone();
619            let (_, unique_old) =
620                self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
621            let mut index_old_list = vec![];
622            let mut index_new_list = vec![];
623            for item in unique_old.members() {
624                let origin = item["origin"].as_str().unwrap_or("");
625                let unique_1 = item["unique"].as_usize().unwrap_or(0);
626                let name = item["name"].as_str().unwrap_or("");
627
628                if origin == "c" && unique_1 == 1 {
629                    if unique.contains(format!(" {name} ").as_str()) {
630                        unique = "".to_string();
631                    }
632                    continue;
633                }
634                if origin == "c" && unique_1 == 0 {
635                    index_old_list.push(item);
636                    for item in index.iter() {
637                        if item.contains(format!(" {name} ").as_str()) {
638                            index_new_list.push(item.clone());
639                        }
640                    }
641                    continue;
642                }
643            }
644            if unique.is_empty() {
645                if index_old_list.len() == index.len()
646                    && index_old_list.len() == index_new_list.len()
647                {
648                    index = vec![];
649                } else {
650                    unique = unique_text;
651                }
652            }
653        }
654
655        sql = sql.trim_end_matches(",\r\n").to_string();
656        sql = format!(
657            "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
658            options.table_name, sql
659        );
660
661        let sqls = format!(
662            "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
663            options.table_name,
664            fields_list_new.join("`,`"),
665            fields_list.join("`,`")
666        );
667        let drop_sql = format!("drop table {};\r\n", options.table_name);
668        let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
669        let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
670
671        if self.params.sql {
672            let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
673            if !unique.is_empty() {
674                list.push(unique)
675            }
676            if !index.is_empty() {
677                list.extend(index)
678            }
679            return JsonValue::from(list.join(""));
680        }
681
682        if add.is_empty()
683            && del.is_empty()
684            && unique.is_empty()
685            && index.is_empty()
686            && put.is_empty()
687        {
688            return JsonValue::from(-1);
689        }
690
691        let (state, _) = self.execute(sql.clone());
692        let data = match state {
693            true => {
694                let (state, _) = self.execute(sqls.clone());
695                match state {
696                    true => {
697                        let (state, _) = self.execute(drop_sql);
698                        match state {
699                            true => {
700                                let (state, _) = self.execute(alter_sql);
701                                match state {
702                                    true => {
703                                        if !unique.is_empty() {
704                                            let (state, _) = self.execute(unique.clone());
705                                            info!(
706                                                "{} {} 唯一索引创建:{}",
707                                                thread_id, options.table_name, state
708                                            );
709                                        }
710                                        for index_sql in index.iter() {
711                                            let (state, _) = self.execute(index_sql.clone());
712                                            match state {
713                                                true => {}
714                                                false => {
715                                                    error!(
716                                                        "{} 索引创建失败: {} {}",
717                                                        options.table_name, state, index_sql
718                                                    );
719                                                    return JsonValue::from(0);
720                                                }
721                                            };
722                                        }
723
724                                        return JsonValue::from(1);
725                                    }
726                                    false => {
727                                        error!("{} 修改表名失败", options.table_name);
728                                        return JsonValue::from(0);
729                                    }
730                                }
731                            }
732                            false => {
733                                error!("{} 删除本表失败", options.table_name);
734                                return JsonValue::from(0);
735                            }
736                        }
737                    }
738                    false => {
739                        error!(
740                            "{} 添加tmp表记录失败 {:#} {:#}",
741                            options.table_name, sql, sqls
742                        );
743                        let sql = format!("drop table {}_tmp", options.table_name);
744                        let (_, _) = self.execute(sql);
745                        0
746                    }
747                }
748            }
749            false => {
750                error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
751                let (_, _) = self.execute(drop_sql_temp);
752                0
753            }
754        };
755        JsonValue::from(data)
756    }
757
758    fn table_info(&mut self, table: &str) -> JsonValue {
759        if let Ok(guard) = FIELDS.lock() {
760            if let Some(cached) = guard.get(table) {
761                return cached.clone();
762            }
763        }
764        let sql = format!("PRAGMA table_info({table})");
765        let (state, data) = self.query(sql);
766
767        match state {
768            true => {
769                let mut fields = object! {};
770                for item in data.members() {
771                    if let Some(name) = item["name"].as_str() {
772                        fields[name] = item.clone();
773                    }
774                }
775                if let Ok(mut guard) = FIELDS.lock() {
776                    guard.insert(table.to_string(), fields.clone());
777                }
778                data
779            }
780            false => object! {},
781        }
782    }
783
784    fn table_is_exist(&mut self, name: &str) -> bool {
785        let sql = format!(
786            "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787        );
788        let (state, data) = self.query(sql);
789        if state && !data.is_empty() {
790            let count = data[0]["count"].as_i64().unwrap_or(0);
791            return count > 0;
792        }
793        false
794    }
795
796    fn table(&mut self, name: &str) -> &mut Sqlite {
797        self.params = Params::default(self.connection.mode.str().as_str());
798        let table_name = format!("{}{}", self.connection.prefix, name);
799        if !super::sql_safety::validate_table_name(&table_name) {
800            error!("Invalid table name: {}", name);
801        }
802        self.params.table = table_name.clone();
803        self.params.join_table = table_name;
804        self
805    }
806    fn change_table(&mut self, name: &str) -> &mut Self {
807        self.params.join_table = name.to_string();
808        self
809    }
810    fn autoinc(&mut self) -> &mut Self {
811        self.params.autoinc = true;
812        self
813    }
814
815    fn timestamps(&mut self) -> &mut Self {
816        self.params.timestamps = true;
817        self
818    }
819
820    fn fetch_sql(&mut self) -> &mut Self {
821        self.params.sql = true;
822        self
823    }
824
825    fn order(&mut self, field: &str, by: bool) -> &mut Self {
826        self.params.order[field] = {
827            if by {
828                "DESC"
829            } else {
830                "ASC"
831            }
832        }
833        .into();
834        self
835    }
836
837    fn group(&mut self, field: &str) -> &mut Self {
838        let fields: Vec<&str> = field.split(",").collect();
839        for field in fields.iter() {
840            let fields = field.to_string();
841            self.params.group[fields.as_str()] = fields.clone().into();
842            self.params.fields[fields.as_str()] = fields.clone().into();
843        }
844        self
845    }
846
847    fn distinct(&mut self) -> &mut Self {
848        self.params.distinct = true;
849        self
850    }
851    fn json(&mut self, field: &str) -> &mut Self {
852        let list: Vec<&str> = field.split(",").collect();
853        for item in list.iter() {
854            self.params.json[item.to_string().as_str()] = item.to_string().into();
855        }
856        self
857    }
858
859    fn location(&mut self, field: &str) -> &mut Self {
860        let list: Vec<&str> = field.split(",").collect();
861        for item in list.iter() {
862            self.params.location[item.to_string().as_str()] = item.to_string().into();
863        }
864        self
865    }
866
867    fn field(&mut self, field: &str) -> &mut Self {
868        let list: Vec<&str> = field.split(",").collect();
869        let join_table = if self.params.join_table.is_empty() {
870            self.params.table.clone()
871        } else {
872            self.params.join_table.clone()
873        };
874        for item in list.iter() {
875            let item = item.to_string();
876            if item.contains(" as ") {
877                let text = item.split(" as ").collect::<Vec<&str>>().clone();
878                self.params.fields[item] =
879                    format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880            } else {
881                self.params.fields[item] = format!("{join_table}.`{item}`").into();
882            }
883        }
884        self
885    }
886
887    fn field_raw(&mut self, expr: &str) -> &mut Self {
888        self.params.fields[expr] = expr.into();
889        self
890    }
891
892    fn hidden(&mut self, name: &str) -> &mut Self {
893        let hidden: Vec<&str> = name.split(",").collect();
894        let sql = format!("PRAGMA table_info({})", self.params.table);
895        let (_, data) = self.query(sql);
896        for item in data.members() {
897            if let Some(name) = item["name"].as_str() {
898                if !hidden.contains(&name) {
899                    self.params.fields[name] = name.into();
900                }
901            }
902        }
903        self
904    }
905
906    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907        for f in field.split('|') {
908            if !super::sql_safety::validate_field_name(f) {
909                error!("Invalid field name: {}", f);
910            }
911        }
912        if !super::sql_safety::validate_compare_orator(compare) {
913            error!("Invalid compare operator: {}", compare);
914        }
915        let join_table = if self.params.join_table.is_empty() {
916            self.params.table.clone()
917        } else {
918            self.params.join_table.clone()
919        };
920
921        if value.is_boolean() {
922            if value.as_bool().unwrap_or(false) {
923                value = 1.into();
924            } else {
925                value = 0.into();
926            }
927        }
928
929        match compare {
930            "between" => {
931                self.params.where_and.push(format!(
932                    "{}.`{}` between '{}' AND '{}'",
933                    join_table, field, value[0], value[1]
934                ));
935            }
936            "set" => {
937                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938                let mut wheredata = vec![];
939                for item in list.iter() {
940                    wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941                }
942                self.params
943                    .where_and
944                    .push(format!("({})", wheredata.join(" or ")));
945            }
946            "notin" => {
947                let mut text = String::new();
948                for item in value.members() {
949                    text = format!("{text},'{item}'");
950                }
951                text = text.trim_start_matches(",").into();
952                self.params
953                    .where_and
954                    .push(format!("{join_table}.`{field}` not in ({text})"));
955            }
956            "in" => {
957                let mut text = String::new();
958                if value.is_array() {
959                    for item in value.members() {
960                        text = format!("{text},'{item}'");
961                    }
962                } else {
963                    let value = value.to_string();
964                    let value: Vec<&str> = value.split(",").collect();
965                    for item in value.iter() {
966                        text = format!("{text},'{item}'");
967                    }
968                }
969                text = text.trim_start_matches(",").into();
970
971                self.params
972                    .where_and
973                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
974            }
975            "=" => {
976                if value.is_null() {
977                    self.params
978                        .where_and
979                        .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980                } else {
981                    self.params
982                        .where_and
983                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984                }
985            }
986            // JSON 数组包含查询:LIKE 模糊匹配(SQLite 无原生 JSON 函数)
987            // 用法:.where_and("tags", "json_contains", "紧急".into())
988            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
989            "json_contains" => {
990                if value.is_array() {
991                    if value.is_empty() {
992                        self.params.where_and.push("1=0".to_string());
993                    } else {
994                        let mut parts = vec![];
995                        for item in value.members() {
996                            let escaped = super::sql_safety::escape_string(&item.to_string());
997                            parts.push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
998                        }
999                        self.params
1000                            .where_and
1001                            .push(format!("({})", parts.join(" OR ")));
1002                    }
1003                } else {
1004                    let escaped = super::sql_safety::escape_string(&value.to_string());
1005                    self.params
1006                        .where_and
1007                        .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1008                }
1009            }
1010            _ => {
1011                if value.is_null() {
1012                    self.params
1013                        .where_and
1014                        .push(format!("{join_table}.`{field}` {compare} {value}"));
1015                } else {
1016                    self.params
1017                        .where_and
1018                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1019                }
1020            }
1021        }
1022        self
1023    }
1024    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1025        for f in field.split('|') {
1026            if !super::sql_safety::validate_field_name(f) {
1027                error!("Invalid field name: {}", f);
1028            }
1029        }
1030        if !super::sql_safety::validate_compare_orator(compare) {
1031            error!("Invalid compare operator: {}", compare);
1032        }
1033        let join_table = if self.params.join_table.is_empty() {
1034            self.params.table.clone()
1035        } else {
1036            self.params.join_table.clone()
1037        };
1038
1039        if value.is_boolean() {
1040            if value.as_bool().unwrap_or(false) {
1041                value = 1.into();
1042            } else {
1043                value = 0.into();
1044            }
1045        }
1046        match compare {
1047            "between" => {
1048                self.params.where_or.push(format!(
1049                    "{}.`{}` between '{}' AND '{}'",
1050                    join_table, field, value[0], value[1]
1051                ));
1052            }
1053            "set" => {
1054                let tt = value.to_string().replace(",", "%");
1055                self.params
1056                    .where_or
1057                    .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1058            }
1059            "notin" => {
1060                let mut text = String::new();
1061                for item in value.members() {
1062                    text = format!("{text},'{item}'");
1063                }
1064                text = text.trim_start_matches(",").into();
1065                self.params
1066                    .where_or
1067                    .push(format!("{join_table}.`{field}` not in ({text})"));
1068            }
1069            "in" => {
1070                let mut text = String::new();
1071                if value.is_array() {
1072                    for item in value.members() {
1073                        text = format!("{text},'{item}'");
1074                    }
1075                } else {
1076                    let value = value.as_str().unwrap_or("");
1077                    let value: Vec<&str> = value.split(",").collect();
1078                    for item in value.iter() {
1079                        text = format!("{text},'{item}'");
1080                    }
1081                }
1082                text = text.trim_start_matches(",").into();
1083                self.params
1084                    .where_or
1085                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1086            }
1087            // JSON 数组包含查询:LIKE 模糊匹配(SQLite 无原生 JSON 函数)
1088            // 用法:.where_or("tags", "json_contains", "紧急".into())
1089            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1090            "json_contains" => {
1091                if value.is_array() {
1092                    if value.is_empty() {
1093                        self.params.where_or.push("1=0".to_string());
1094                    } else {
1095                        let mut parts = vec![];
1096                        for item in value.members() {
1097                            let escaped = super::sql_safety::escape_string(&item.to_string());
1098                            parts.push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
1099                        }
1100                        self.params
1101                            .where_or
1102                            .push(format!("({})", parts.join(" OR ")));
1103                    }
1104                } else {
1105                    let escaped = super::sql_safety::escape_string(&value.to_string());
1106                    self.params
1107                        .where_or
1108                        .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1109                }
1110            }
1111            _ => {
1112                self.params
1113                    .where_or
1114                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1115            }
1116        }
1117        self
1118    }
1119    fn where_raw(&mut self, expr: &str) -> &mut Self {
1120        self.params.where_and.push(expr.to_string());
1121        self
1122    }
1123
1124    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1125        self.params
1126            .where_and
1127            .push(format!("`{field}` IN ({sub_sql})"));
1128        self
1129    }
1130
1131    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1132        self.params
1133            .where_and
1134            .push(format!("`{field}` NOT IN ({sub_sql})"));
1135        self
1136    }
1137
1138    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1139        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1140        self
1141    }
1142
1143    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1144        self.params
1145            .where_and
1146            .push(format!("NOT EXISTS ({sub_sql})"));
1147        self
1148    }
1149
1150    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1151        self.params.where_column = format!(
1152            "{}.`{}` {} {}.`{}`",
1153            self.params.table, field_a, compare, self.params.table, field_b
1154        );
1155        self
1156    }
1157
1158    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1159        self.params
1160            .update_column
1161            .push(format!("{field_a} = {compare}"));
1162        self
1163    }
1164
1165    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1166        self.params.page = page;
1167        self.params.limit = limit;
1168        self
1169    }
1170
1171    fn limit(&mut self, count: i32) -> &mut Self {
1172        self.params.limit_only = count;
1173        self
1174    }
1175
1176    fn column(&mut self, field: &str) -> JsonValue {
1177        self.field(field);
1178        self.group(field);
1179        let sql = self.params.select_sql();
1180        if self.params.sql {
1181            return JsonValue::from(sql.clone());
1182        }
1183        self.table_info(self.params.table.clone().as_str());
1184        let (state, data) = self.query(sql);
1185        if state {
1186            let mut list = array![];
1187            for item in data.members() {
1188                if self.params.json[field].is_empty() {
1189                    let _ = list.push(item[field].clone());
1190                } else {
1191                    let data =
1192                        json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1193                    let _ = list.push(data);
1194                }
1195            }
1196            list
1197        } else {
1198            array![]
1199        }
1200    }
1201
1202    fn count(&mut self) -> JsonValue {
1203        self.params.fields["count"] = "count(*) as count".to_string().into();
1204        let sql = self.params.select_sql();
1205        if self.params.sql {
1206            return JsonValue::from(sql.clone());
1207        }
1208        let (state, data) = self.query(sql);
1209        match state {
1210            true => data[0]["count"].clone(),
1211            false => JsonValue::from(0),
1212        }
1213    }
1214
1215    fn max(&mut self, field: &str) -> JsonValue {
1216        self.params.fields[field] = format!("max({field}) as {field}").into();
1217        let sql = self.params.select_sql();
1218        if self.params.sql {
1219            return JsonValue::from(sql.clone());
1220        }
1221        let (state, data) = self.query(sql);
1222        if state {
1223            if data.len() > 1 {
1224                return data;
1225            }
1226            data[0][field].clone()
1227        } else {
1228            JsonValue::from(0.0)
1229        }
1230    }
1231
1232    fn min(&mut self, field: &str) -> JsonValue {
1233        self.params.fields[field] = format!("min({field}) as {field}").into();
1234        let sql = self.params.select_sql();
1235        let (state, data) = self.query(sql);
1236        if state {
1237            if data.len() > 1 {
1238                return data;
1239            }
1240            data[0][field].clone()
1241        } else {
1242            JsonValue::from(0.0)
1243        }
1244    }
1245
1246    fn sum(&mut self, field: &str) -> JsonValue {
1247        self.params.fields[field] = format!("sum({field}) as {field}").into();
1248        let sql = self.params.select_sql();
1249        if self.params.sql {
1250            return JsonValue::from(sql.clone());
1251        }
1252        let (state, data) = self.query(sql);
1253        match state {
1254            true => {
1255                if data.len() > 1 {
1256                    return data;
1257                }
1258                if self.params.fields.len() > 1 {
1259                    return data[0].clone();
1260                }
1261                data[0][field].clone()
1262            }
1263            false => JsonValue::from(0),
1264        }
1265    }
1266    fn avg(&mut self, field: &str) -> JsonValue {
1267        self.params.fields[field] = format!("avg({field}) as {field}").into();
1268        let sql = self.params.select_sql();
1269        if self.params.sql {
1270            return JsonValue::from(sql.clone());
1271        }
1272        let (state, data) = self.query(sql);
1273        if state {
1274            if data.len() > 1 {
1275                return data;
1276            }
1277            data[0][field].clone()
1278        } else {
1279            JsonValue::from(0)
1280        }
1281    }
1282    fn having(&mut self, expr: &str) -> &mut Self {
1283        self.params.having.push(expr.to_string());
1284        self
1285    }
1286    fn select(&mut self) -> JsonValue {
1287        let sql = self.params.select_sql();
1288        if self.params.sql {
1289            return JsonValue::from(sql.clone());
1290        }
1291        self.table_info(self.params.table.clone().as_str());
1292        let (state, mut data) = self.query(sql.clone());
1293        match state {
1294            true => {
1295                for (field, _) in self.params.json.entries() {
1296                    for item in data.members_mut() {
1297                        if !item[field].is_empty() {
1298                            let json = item[field].to_string();
1299                            item[field] = match json::parse(&json) {
1300                                Ok(e) => e,
1301                                Err(_) => JsonValue::from(json),
1302                            };
1303                        }
1304                    }
1305                }
1306                data.clone()
1307            }
1308            false => {
1309                error!("{data:?}");
1310                array![]
1311            }
1312        }
1313    }
1314    fn find(&mut self) -> JsonValue {
1315        self.params.page = 1;
1316        self.params.limit = 1;
1317        let sql = self.params.select_sql();
1318        if self.params.sql {
1319            return JsonValue::from(sql.clone());
1320        }
1321
1322        self.table_info(self.params.table.clone().as_str());
1323        let (state, mut data) = self.query(sql.clone());
1324        match state {
1325            true => {
1326                if data.is_empty() {
1327                    return object! {};
1328                }
1329                for (field, _) in self.params.json.entries() {
1330                    if !data[0][field].is_empty() {
1331                        let json = data[0][field].to_string();
1332                        let json = json::parse(&json).unwrap_or(array![]);
1333                        data[0][field] = json;
1334                    } else {
1335                        data[0][field] = array![];
1336                    }
1337                }
1338                data[0].clone()
1339            }
1340            false => {
1341                error!("{data:?}");
1342                object! {}
1343            }
1344        }
1345    }
1346
1347    fn value(&mut self, field: &str) -> JsonValue {
1348        self.params.fields = object! {};
1349        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1350        self.params.page = 1;
1351        self.params.limit = 1;
1352        let sql = self.params.select_sql();
1353        if self.params.sql {
1354            return JsonValue::from(sql.clone());
1355        }
1356        self.table_info(self.params.table.clone().as_str());
1357        let (state, mut data) = self.query(sql.clone());
1358        match state {
1359            true => {
1360                for (field, _) in self.params.json.entries() {
1361                    if !data[0][field].is_empty() {
1362                        let json = data[0][field].to_string();
1363                        let json = json::parse(&json).unwrap_or(array![]);
1364                        data[0][field] = json;
1365                    } else {
1366                        data[0][field] = array![];
1367                    }
1368                }
1369                data[0][field].clone()
1370            }
1371            false => {
1372                if self.connection.debug {
1373                    info!("{data:?}");
1374                }
1375                JsonValue::Null
1376            }
1377        }
1378    }
1379
1380    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1381        let mut fields = vec![];
1382        let mut values = vec![];
1383
1384        if !self.params.autoinc && data["id"].is_empty() {
1385            let thread_id = format!("{:?}", std::thread::current().id());
1386            let thread_num: u64 = thread_id
1387                .trim_start_matches("ThreadId(")
1388                .trim_end_matches(")")
1389                .parse()
1390                .unwrap_or(0);
1391            data["id"] = format!(
1392                "{:X}{:X}",
1393                Local::now().timestamp_nanos_opt().unwrap_or(0),
1394                thread_num
1395            )
1396            .into();
1397        }
1398        for (field, value) in data.entries() {
1399            fields.push(format!("`{field}`"));
1400
1401            if value.is_string() {
1402                if value.to_string().contains("'") {
1403                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1404                    continue;
1405                } else if value.to_string().contains('"') {
1406                    values.push(format!("'{value}'"));
1407                    continue;
1408                } else {
1409                    values.push(format!("\"{value}\""));
1410                    continue;
1411                }
1412            } else if value.is_array() || value.is_object() {
1413                if self.params.json[field].is_empty() {
1414                    values.push(format!("'{value}'"));
1415                } else {
1416                    let json = value.to_string();
1417                    let json = json.replace("'", "''");
1418                    values.push(format!("'{json}'"));
1419                }
1420                continue;
1421            } else if value.is_number() || value.is_boolean() || value.is_null() {
1422                values.push(format!("{value}"));
1423                continue;
1424            } else {
1425                values.push(format!("'{value}'"));
1426                continue;
1427            }
1428        }
1429        let fields = fields.join(",");
1430        let values = values.join(",");
1431
1432        let sql = format!(
1433            "INSERT INTO `{}` ({}) VALUES ({});",
1434            self.params.table, fields, values
1435        );
1436        if self.params.sql {
1437            return JsonValue::from(sql.clone());
1438        }
1439        let (state, ids) = self.execute(sql);
1440        match state {
1441            true => {
1442                if self.params.autoinc {
1443                    let (state, ids) =
1444                        self.query(format!("select max(id) as id from {}", self.params.table));
1445                    return match state {
1446                        true => ids[0]["id"].clone(),
1447                        false => {
1448                            error!("{ids}");
1449                            JsonValue::from("")
1450                        }
1451                    };
1452                }
1453                data["id"].clone()
1454            }
1455            false => {
1456                error!("{ids}");
1457                JsonValue::from("")
1458            }
1459        }
1460    }
1461    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1462        let mut fields = String::new();
1463
1464        if !self.params.autoinc && data[0]["id"].is_empty() {
1465            data[0]["id"] = "".into();
1466        }
1467        for (field, _) in data[0].entries() {
1468            fields = format!("{fields},`{field}`");
1469        }
1470        fields = fields.trim_start_matches(",").to_string();
1471
1472        let core_count = num_cpus::get();
1473        let mut p = pools::Pool::new(core_count * 4);
1474        let autoinc = self.params.autoinc;
1475        for list in data.members() {
1476            let mut item = list.clone();
1477            p.execute(move |pcindex| {
1478                if !autoinc && item["id"].is_empty() {
1479                    let id = format!(
1480                        "{:X}{:X}",
1481                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1482                        pcindex
1483                    );
1484                    item["id"] = id.into();
1485                }
1486                let mut values = "".to_string();
1487                for (_, value) in item.entries() {
1488                    if value.is_string() {
1489                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1490                    } else if value.is_number() || value.is_boolean() {
1491                        values = format!("{values},{value}");
1492                    } else {
1493                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1494                    }
1495                }
1496                values = format!("({})", values.trim_start_matches(","));
1497                array![item["id"].clone(), values]
1498            });
1499        }
1500        let (ids_list, mut values) = p.insert_all();
1501
1502        values = values.trim_start_matches(",").to_string();
1503
1504        let sql = format!(
1505            "INSERT INTO {} ({}) VALUES {};",
1506            self.params.table, fields, values
1507        );
1508        if self.params.sql {
1509            return JsonValue::from(sql.clone());
1510        }
1511        let (state, data) = self.execute(sql.clone());
1512        match state {
1513            true => {
1514                if self.params.autoinc {
1515                    let (state, ids) = self.query(format!(
1516                        "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1517                        self.params.table,
1518                        ids_list.len()
1519                    ));
1520                    return match state {
1521                        true => {
1522                            let mut idlist = array![];
1523                            for item in ids.members() {
1524                                let _ = idlist.push(item["id"].clone());
1525                            }
1526                            idlist
1527                        }
1528                        false => {
1529                            error!("批量添加失败: {ids:?} {sql}");
1530                            array![]
1531                        }
1532                    };
1533                }
1534                JsonValue::from(ids_list)
1535            }
1536            false => {
1537                error!("批量添加失败: {data:?} {sql}");
1538                array![]
1539            }
1540        }
1541    }
1542
1543    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1544        let mut fields = vec![];
1545        let mut values = vec![];
1546
1547        if !self.params.autoinc && data["id"].is_empty() {
1548            let thread_id = format!("{:?}", std::thread::current().id());
1549            let thread_num: u64 = thread_id
1550                .trim_start_matches("ThreadId(")
1551                .trim_end_matches(")")
1552                .parse()
1553                .unwrap_or(0);
1554            data["id"] = format!(
1555                "{:X}{:X}",
1556                Local::now().timestamp_nanos_opt().unwrap_or(0),
1557                thread_num
1558            )
1559            .into();
1560        }
1561        for (field, value) in data.entries() {
1562            fields.push(format!("`{field}`"));
1563
1564            if value.is_string() {
1565                if value.to_string().contains("'") {
1566                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1567                    continue;
1568                } else if value.to_string().contains('"') {
1569                    values.push(format!("'{value}'"));
1570                    continue;
1571                } else {
1572                    values.push(format!("\"{value}\""));
1573                    continue;
1574                }
1575            } else if value.is_array() || value.is_object() {
1576                if self.params.json[field].is_empty() {
1577                    values.push(format!("'{value}'"));
1578                } else {
1579                    let json = value.to_string();
1580                    let json = json.replace("'", "''");
1581                    values.push(format!("'{json}'"));
1582                }
1583                continue;
1584            } else if value.is_number() || value.is_boolean() || value.is_null() {
1585                values.push(format!("{value}"));
1586                continue;
1587            } else {
1588                values.push(format!("'{value}'"));
1589                continue;
1590            }
1591        }
1592
1593        let conflict_cols: Vec<String> =
1594            conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1595
1596        let update_set: Vec<String> = fields
1597            .iter()
1598            .filter(|f| {
1599                let name = f.trim_matches('`');
1600                !conflict_fields.contains(&name) && name != "id"
1601            })
1602            .map(|f| format!("{f}=excluded.{f}"))
1603            .collect();
1604
1605        let fields_str = fields.join(",");
1606        let values_str = values.join(",");
1607
1608        let sql = format!(
1609            "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1610            self.params.table,
1611            fields_str,
1612            values_str,
1613            conflict_cols.join(","),
1614            update_set.join(",")
1615        );
1616        if self.params.sql {
1617            return JsonValue::from(sql.clone());
1618        }
1619        let (state, result) = self.execute(sql);
1620        match state {
1621            true => {
1622                if self.params.autoinc {
1623                    let (state, ids) =
1624                        self.query(format!("select max(id) as id from {}", self.params.table));
1625                    return match state {
1626                        true => ids[0]["id"].clone(),
1627                        false => {
1628                            error!("{ids}");
1629                            JsonValue::from("")
1630                        }
1631                    };
1632                }
1633                data["id"].clone()
1634            }
1635            false => {
1636                error!("upsert失败: {result}");
1637                JsonValue::from("")
1638            }
1639        }
1640    }
1641
1642    fn update(&mut self, data: JsonValue) -> JsonValue {
1643        let mut values = vec![];
1644
1645        for (field, value) in data.entries() {
1646            if value.is_string() {
1647                values.push(format!(
1648                    "`{}` = '{}'",
1649                    field,
1650                    value.to_string().replace("'", "''")
1651                ));
1652            } else if value.is_array() || value.is_object() {
1653                if self.params.json[field].is_empty() {
1654                    values.push(format!("`{field}` = '{value}'"));
1655                } else {
1656                    let json = value.to_string();
1657                    let json = json.replace("'", "''");
1658                    values.push(format!("`{field}` = '{json}'"));
1659                }
1660                continue;
1661            } else if value.is_number() || value.is_boolean() || value.is_null() {
1662                values.push(format!("`{field}` = {value} "));
1663            } else {
1664                values.push(format!("`{field}` = '{value}' "));
1665            }
1666        }
1667
1668        for (field, value) in self.params.inc_dec.entries() {
1669            values.push(format!("{} = {}", field, value.to_string().clone()));
1670        }
1671
1672        let values = values.join(",");
1673        let sql = format!(
1674            "UPDATE `{}` SET {} {} {};",
1675            self.params.table.clone(),
1676            values,
1677            self.params.where_sql(),
1678            self.params.page_limit_sql()
1679        );
1680        if self.params.sql {
1681            return JsonValue::from(sql.clone());
1682        }
1683        let (state, data) = self.execute(sql);
1684        if state {
1685            data
1686        } else {
1687            error!("{data}");
1688            JsonValue::from(0)
1689        }
1690    }
1691
1692    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1693        let mut values = vec![];
1694        let mut ids = vec![];
1695        for (field, _) in data[0].entries() {
1696            if field == "id" {
1697                continue;
1698            }
1699            let mut fields = vec![];
1700            for row in data.members() {
1701                let value = row[field].clone();
1702                let id = row["id"].clone();
1703                ids.push(id.clone());
1704                if value.is_string() {
1705                    fields.push(format!(
1706                        "WHEN '{}' THEN '{}'",
1707                        id,
1708                        value.to_string().replace("'", "''")
1709                    ));
1710                } else if value.is_array() || value.is_object() {
1711                    if self.params.json[field].is_empty() {
1712                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1713                    } else {
1714                        let json = value.to_string();
1715                        let json = json.replace("'", "''");
1716                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1717                    }
1718                    continue;
1719                } else if value.is_number() || value.is_boolean() || value.is_null() {
1720                    fields.push(format!("WHEN '{id}' THEN {value}"));
1721                } else {
1722                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1723                }
1724            }
1725            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1726        }
1727
1728        self.where_and("id", "in", ids.into());
1729        for (field, value) in self.params.inc_dec.entries() {
1730            values.push(format!("{} = {}", field, value.to_string().clone()));
1731        }
1732
1733        let values = values.join(",");
1734        let sql = format!(
1735            "UPDATE {} SET {} {} {};",
1736            self.params.table.clone(),
1737            values,
1738            self.params.where_sql(),
1739            self.params.page_limit_sql()
1740        );
1741        if self.params.sql {
1742            return JsonValue::from(sql.clone());
1743        }
1744        let (state, data) = self.execute(sql);
1745        if state {
1746            data
1747        } else {
1748            error!("{data:?}");
1749            JsonValue::from(0)
1750        }
1751    }
1752    fn delete(&mut self) -> JsonValue {
1753        let sql = format!(
1754            "delete FROM `{}` {};",
1755            self.params.table.clone(),
1756            self.params.where_sql()
1757        );
1758        if self.params.sql {
1759            return JsonValue::from(sql.clone());
1760        }
1761        let (state, data) = self.execute(sql);
1762        match state {
1763            true => data,
1764            false => {
1765                error!("delete 失败>>>{data:?}");
1766                JsonValue::from(0)
1767            }
1768        }
1769    }
1770
1771    fn transaction(&mut self) -> bool {
1772        let thread_id = format!("{:?}", thread::current().id());
1773
1774        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1775            let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1776            SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1777            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1778            let _ = self.query(sp);
1779            return true;
1780        }
1781
1782        if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1783            error!("{thread_id} 启动事务失败: 获取写锁超时");
1784            return false;
1785        }
1786
1787        let flags = OpenFlags::new().with_read_write().with_no_mutex();
1788        let db = match Connect::open_thread_safe_with_flags(
1789            self.connection.clone().get_dsn().as_str(),
1790            flags,
1791        ) {
1792            Ok(e) => Arc::new(e),
1793            Err(e) => {
1794                error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1795                SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1796                return false;
1797            }
1798        };
1799
1800        SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1801
1802        let (state, data) = self.query("BEGIN".to_string());
1803        if state {
1804            true
1805        } else {
1806            error!("{thread_id} 启动事务失败: {data}");
1807            SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1808            false
1809        }
1810    }
1811
1812    fn commit(&mut self) -> bool {
1813        let thread_id = format!("{:?}", thread::current().id());
1814
1815        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1816            error!("{thread_id} 提交事务失败: 没有活跃的事务");
1817            return false;
1818        }
1819
1820        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1821        if depth > 1 {
1822            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1823            let _ = self.query(sp);
1824            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1825            return true;
1826        }
1827
1828        let (state, _) = self.query("COMMIT".to_string());
1829        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1830
1831        if state {
1832            true
1833        } else {
1834            error!("{thread_id} 提交事务失败");
1835            false
1836        }
1837    }
1838
1839    fn rollback(&mut self) -> bool {
1840        let thread_id = format!("{:?}", thread::current().id());
1841
1842        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1843            error!("{thread_id} 回滚失败: 没有活跃的事务");
1844            return false;
1845        }
1846
1847        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1848        if depth > 1 {
1849            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1850            let _ = self.query(sp);
1851            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1852            return true;
1853        }
1854
1855        let (state, _) = self.query("ROLLBACK".to_string());
1856        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1857
1858        if state {
1859            true
1860        } else {
1861            error!("回滚失败: {thread_id}");
1862            false
1863        }
1864    }
1865
1866    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1867        let (state, data) = self.query(sql.to_string());
1868        match state {
1869            true => Ok(data),
1870            false => Err(data.to_string()),
1871        }
1872    }
1873
1874    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1875        let (state, data) = self.execute(sql.to_string());
1876        match state {
1877            true => Ok(data),
1878            false => Err(data.to_string()),
1879        }
1880    }
1881
1882    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1883        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1884        self
1885    }
1886
1887    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1888        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1889        self
1890    }
1891
1892    fn buildsql(&mut self) -> String {
1893        self.fetch_sql();
1894        let sql = self.select().to_string();
1895        format!("( {} ) `{}`", sql, self.params.table)
1896    }
1897
1898    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1899        for field in fields {
1900            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1901        }
1902        self
1903    }
1904
1905    fn join(
1906        &mut self,
1907        main_table: &str,
1908        main_fields: &str,
1909        right_table: &str,
1910        right_fields: &str,
1911    ) -> &mut Self {
1912        let main_table = if main_table.is_empty() {
1913            self.params.table.clone()
1914        } else {
1915            main_table.to_string()
1916        };
1917        self.params.join_table = right_table.to_string();
1918        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1919        self
1920    }
1921
1922    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1923        let main_fields = if main_fields.is_empty() {
1924            "id"
1925        } else {
1926            main_fields
1927        };
1928        let second_fields = if second_fields.is_empty() {
1929            self.params.table.clone()
1930        } else {
1931            second_fields.to_string().clone()
1932        };
1933        let sec_table_name = format!("{}{}", table, "_2");
1934        let second_table = format!("{} {}", table, sec_table_name.clone());
1935        self.params.join_table = sec_table_name.clone();
1936        self.params.join.push(format!(
1937            " INNER JOIN {} ON {}.{} = {}.{}",
1938            second_table, self.params.table, main_fields, sec_table_name, second_fields
1939        ));
1940        self
1941    }
1942
1943    fn join_right(
1944        &mut self,
1945        main_table: &str,
1946        main_fields: &str,
1947        right_table: &str,
1948        right_fields: &str,
1949    ) -> &mut Self {
1950        let main_table = if main_table.is_empty() {
1951            self.params.table.clone()
1952        } else {
1953            main_table.to_string()
1954        };
1955        self.params.join_table = right_table.to_string();
1956        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1957        self
1958    }
1959
1960    fn join_full(
1961        &mut self,
1962        main_table: &str,
1963        main_fields: &str,
1964        right_table: &str,
1965        right_fields: &str,
1966    ) -> &mut Self {
1967        let main_table = if main_table.is_empty() {
1968            self.params.table.clone()
1969        } else {
1970            main_table.to_string()
1971        };
1972        self.params.join_table = right_table.to_string();
1973        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1974        self
1975    }
1976
1977    fn union(&mut self, sub_sql: &str) -> &mut Self {
1978        self.params.unions.push(format!("UNION {sub_sql}"));
1979        self
1980    }
1981
1982    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1983        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1984        self
1985    }
1986
1987    fn lock_for_update(&mut self) -> &mut Self {
1988        self.params.lock_mode = "FOR UPDATE".to_string();
1989        self
1990    }
1991
1992    fn lock_for_share(&mut self) -> &mut Self {
1993        self.params.lock_mode = "FOR SHARE".to_string();
1994        self
1995    }
1996}