Skip to main content

br_db/types/
sqlite.rs

1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17    static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18    static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19    static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24    /// 当前连接配置
25    pub connection: Connection,
26    /// 当前选中配置
27    pub default: String,
28
29    pub params: Params,
30}
31
32impl Sqlite {
33    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34        let dsn = connection.clone().get_dsn();
35        let db_path = Path::new(&dsn);
36        if let Some(parent) = db_path.parent() {
37            if !parent.as_os_str().is_empty() && !parent.exists() {
38                if let Err(e) = std::fs::create_dir_all(parent) {
39                    error!("sqlite 创建目录失败: {} {:?}", e, parent);
40                    return Err(e.to_string());
41                }
42                info!("sqlite 自动创建目录: {:?}", parent);
43            }
44        }
45
46        let flags = OpenFlags::new().with_create().with_read_write();
47        match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48            Ok(e) => {
49                if let Ok(mut guard) = DBS.lock() {
50                    guard.insert(default.clone(), Arc::new(e));
51                } else {
52                    error!("sqlite 获取数据库锁失败");
53                    return Err("获取数据库锁失败".to_string());
54                }
55                Ok(Self {
56                    connection: connection.clone(),
57                    default: default.clone(),
58                    params: Params::default("sqlite"),
59                })
60            }
61            Err(e) => {
62                error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63                Err(e.to_string())
64            }
65        }
66    }
67    /// Static helper to process query results - can be called from closures
68    fn query_handle_static(
69        mut statement: Statement,
70        sql: &str,
71        table: &str,
72        thread_id: &str,
73    ) -> (bool, JsonValue) {
74        let mut data = array![];
75        while let State::Row = match statement.next() {
76            Ok(e) => e,
77            Err(e) => {
78                let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79                if in_transaction {
80                    error!("{} 查询事务: {} {}", thread_id, e, sql);
81                } else {
82                    error!("{} 非事务查询: {} {}", thread_id, e, sql);
83                }
84                return (false, data);
85            }
86        } {
87            let mut list = object! {};
88            let mut index = 0;
89            for field in statement.column_names().iter() {
90                if !list[field.as_str()].is_null() {
91                    index += 1;
92                    continue;
93                }
94                match statement.column_type(field.as_str()) {
95                    Ok(types) => match types {
96                        Type::String => {
97                            if let Ok(data) = statement.read::<String, _>(index) {
98                                match data.as_str() {
99                                    "false" => {
100                                        list[field.as_str()] = JsonValue::from(false);
101                                    }
102                                    "true" => {
103                                        list[field.as_str()] = JsonValue::from(true);
104                                    }
105                                    _ => {
106                                        list[field.as_str()] = JsonValue::from(data.clone());
107                                    }
108                                }
109                            }
110                        }
111                        Type::Integer => {
112                            let fields_cache =
113                                FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114                            if let Some(fields) = fields_cache {
115                                if fields[field.clone()].is_empty() {
116                                    if let Ok(data) = statement.read::<i64, _>(index) {
117                                        list[field.as_str()] = data.into();
118                                    }
119                                    index += 1;
120                                    continue;
121                                }
122                                let field_type =
123                                    fields[field.clone()]["type"].as_str().unwrap_or("");
124                                match field_type {
125                                    "INTEGER" => {
126                                        if let Ok(data) = statement.read::<i64, _>(index) {
127                                            list[field.as_str()] = JsonValue::from(data == 1);
128                                        }
129                                    }
130                                    x if x.contains("int(") => {
131                                        if let Ok(data) = statement.read::<i64, _>(index) {
132                                            list[field.as_str()] = data.into();
133                                        }
134                                    }
135                                    x if x.contains("decimal(") && x.ends_with(",0)") => {
136                                        if let Ok(data) = statement.read::<f64, _>(index) {
137                                            list[field.as_str()] = data.into();
138                                        }
139                                    }
140                                    _ => {
141                                        if let Ok(data) = statement.read::<i64, _>(index) {
142                                            list[field.as_str()] = data.into();
143                                        }
144                                    }
145                                }
146                            } else if let Ok(data) = statement.read::<i64, _>(index) {
147                                list[field.as_str()] = JsonValue::from(data);
148                            }
149                        }
150                        Type::Float => {
151                            if let Ok(data) = statement.read::<f64, _>(index) {
152                                list[field.as_str()] = JsonValue::from(data);
153                            }
154                        }
155                        Type::Binary => {
156                            if let Ok(data) = statement.read::<String, _>(index) {
157                                list[field.as_str()] = JsonValue::from(data.clone());
158                            }
159                        }
160                        Type::Null => match statement.read::<String, _>(index) {
161                            Ok(data) => {
162                                list[field.as_str()] = JsonValue::from(data.clone());
163                            }
164                            Err(_) => match statement.read::<f64, _>(index) {
165                                Ok(data) => {
166                                    if data == 0.0 {
167                                        list[field.as_str()] = JsonValue::from("");
168                                    } else {
169                                        list[field.as_str()] = JsonValue::from(data);
170                                    }
171                                }
172                                Err(_) => match statement.read::<i64, _>(index) {
173                                    Ok(data) => {
174                                        if data == 0 {
175                                            list[field.as_str()] = JsonValue::from("");
176                                        } else {
177                                            list[field.as_str()] = JsonValue::from(data);
178                                        }
179                                    }
180                                    Err(e) => {
181                                        error!("Type:{} {:?}", field.as_str(), e);
182                                    }
183                                },
184                            },
185                        },
186                    },
187                    Err(e) => {
188                        error!("query Err: {e:?}");
189                    }
190                }
191                index += 1;
192            }
193            let _ = data.push(list);
194        }
195        (true, data)
196    }
197
198    pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199        let thread_id = format!("{:?}", thread::current().id());
200        let table = self.params.table.clone();
201
202        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203            if self.connection.debug {
204                info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205            }
206
207            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208                match db.prepare(sql.clone()) {
209                    Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210                    Err(e) => {
211                        error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212                        (false, e.to_string().into())
213                    }
214                }
215            });
216
217            match result {
218                Some(r) => r,
219                None => {
220                    error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221                    (false, JsonValue::from("未找到事务连接"))
222                }
223            }
224        } else {
225            if self.connection.debug {
226                info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227            }
228            let dbs = match DBS.lock() {
229                Ok(dbs) => dbs,
230                Err(e) => {
231                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232                    return (false, JsonValue::from("数据库锁定失败"));
233                }
234            };
235            let db = match dbs.get(&self.default) {
236                Some(db) => db.clone(),
237                None => {
238                    error!(
239                        "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240                        self.default
241                    );
242                    return (false, JsonValue::from("未找到默认数据库配置"));
243                }
244            };
245            drop(dbs);
246            let result = match db.prepare(sql.clone()) {
247                Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248                Err(e) => {
249                    error!("{thread_id} 查询非事务: Err: {e}");
250                    (false, e.to_string().into())
251                }
252            };
253            result
254        }
255    }
256
257    pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258        let thread_id = format!("{:?}", thread::current().id());
259
260        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261            if self.connection.debug {
262                info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263            }
264
265            let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266                match db.execute(sql.clone()) {
267                    Ok(_) => {
268                        let count = db.change_count();
269                        (true, JsonValue::from(count))
270                    }
271                    Err(e) => (false, JsonValue::from(e.to_string())),
272                }
273            });
274
275            match result {
276                Some((true, count)) => {
277                    if self.connection.debug {
278                        info!("{} count:{}", thread_id, count);
279                    }
280                    (true, count)
281                }
282                Some((false, err)) => {
283                    error!(
284                        "{} 执行事务: \r\nErr: {}\r\n{}",
285                        thread_id,
286                        err,
287                        sql.clone()
288                    );
289                    (false, err)
290                }
291                None => {
292                    error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293                    (false, JsonValue::from("未找到事务连接"))
294                }
295            }
296        } else {
297            if self.connection.debug {
298                info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299            }
300            let dbs = match DBS.lock() {
301                Ok(dbs) => dbs,
302                Err(e) => {
303                    error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304                    return (false, JsonValue::from("数据库锁定失败"));
305                }
306            };
307
308            let db = match dbs.get(&self.default) {
309                Some(db) => db.clone(),
310                None => {
311                    error!(
312                        "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313                        thread_id, self.default, sql
314                    );
315                    return (false, JsonValue::from("未找到默认数据库配置"));
316                }
317            };
318            drop(dbs);
319            match db.execute(sql.clone()) {
320                Ok(_) => {
321                    let count = db.change_count();
322                    if self.connection.debug {
323                        info!(
324                            "{} count: {} total_count: {}",
325                            thread_id,
326                            count,
327                            db.total_change_count()
328                        );
329                    }
330                    (true, JsonValue::from(count))
331                }
332                Err(e) => {
333                    error!(
334                        "{} 执行非事务: Err: {}\r\nSQL: {}",
335                        thread_id,
336                        e,
337                        sql.clone()
338                    );
339                    (false, JsonValue::from(e.to_string()))
340                }
341            }
342        }
343    }
344}
345
346impl DbMode for Sqlite {
347    fn database_tables(&mut self) -> JsonValue {
348        let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349        match self.sql(sql.as_str()) {
350            Ok(e) => {
351                let mut list = vec![];
352                for item in e.members() {
353                    list.push(item["name"].clone());
354                }
355                list.into()
356            }
357            Err(_) => {
358                array![]
359            }
360        }
361    }
362    fn database_create(&mut self, name: &str) -> bool {
363        let current_dsn = self.connection.clone().get_dsn();
364        let current_path = Path::new(&current_dsn);
365
366        let new_path = if let Some(parent) = current_path.parent() {
367            if parent.as_os_str().is_empty() {
368                Path::new(name).to_path_buf()
369            } else {
370                parent.join(name)
371            }
372        } else {
373            Path::new(name).to_path_buf()
374        };
375
376        if let Some(parent) = new_path.parent() {
377            if !parent.as_os_str().is_empty() && !parent.exists() {
378                if let Err(e) = std::fs::create_dir_all(parent) {
379                    error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380                    return false;
381                }
382            }
383        }
384
385        let flags = OpenFlags::new().with_create().with_read_write();
386        match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387            Ok(_) => true,
388            Err(e) => {
389                error!(
390                    "sqlite database_create 创建数据库失败: {} {:?}",
391                    e, new_path
392                );
393                false
394            }
395        }
396    }
397
398    fn truncate(&mut self, table: &str) -> bool {
399        let sql = format!("DELETE FROM `{table}`");
400        let (state, _) = self.execute(sql);
401        state
402    }
403}
404
405impl Mode for Sqlite {
406    fn table_create(&mut self, options: TableOptions) -> JsonValue {
407        let mut sql = String::new();
408        let mut unique_fields = String::new();
409        let mut unique_name = String::new();
410        let mut unique = String::new();
411        for item in options.table_unique.iter() {
412            if unique_fields.is_empty() {
413                unique_fields = format!("`{item}`");
414                unique_name = format!("unique_{item}");
415            } else {
416                unique_fields = format!("{unique_fields},`{item}`");
417                unique_name = format!("{unique_name}_{item}");
418            }
419            unique = format!(
420                "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421                unique_name, options.table_name, unique_fields
422            );
423        }
424
425        let mut index = vec![];
426        for row in options.table_index.iter() {
427            let mut index_fields = String::new();
428            let mut index_name = String::new();
429            for item in row.iter() {
430                if index_fields.is_empty() {
431                    index_fields = format!("`{item}`");
432                    index_name = format!("index_{item}");
433                } else {
434                    index_fields = format!("{index_fields},`{item}`");
435                    index_name = format!("{index_name}_{item}");
436                }
437            }
438            index.push(format!(
439                "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440                index_name, options.table_name, index_fields
441            ));
442        }
443
444        for (name, field) in options.table_fields.entries() {
445            let row = br_fields::field("sqlite", name, field.clone());
446            sql = format!("{sql} {row},\r\n");
447        }
448
449        sql = sql.trim_end_matches(",\r\n").to_string();
450
451        let sql = format!(
452            "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453            options.table_name, sql
454        );
455        if self.params.sql {
456            let mut list = vec![sql];
457            if !unique.is_empty() {
458                list.push(unique)
459            }
460            if !index.is_empty() {
461                list.extend(index)
462            }
463
464            return JsonValue::from(list.join(""));
465        }
466
467        let thread_id = format!("{:?}", thread::current().id());
468
469        let (_, table_exists) = self.query(format!(
470            "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471            options.table_name
472        ));
473        let is_new_table = table_exists.is_empty();
474
475        let (state, _) = self.execute(sql.clone());
476        if state {
477            if is_new_table {
478                if !unique.is_empty() {
479                    let (state, _) = self.execute(unique.clone());
480                    info!(
481                        "{} {} 唯一索引创建:{}",
482                        thread_id, options.table_name, state
483                    );
484                }
485                for sql in index.iter() {
486                    let (state, _) = self.execute(sql.clone());
487                    info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488                }
489            }
490            JsonValue::from(true)
491        } else {
492            JsonValue::from(false)
493        }
494    }
495    fn table_update(&mut self, options: TableOptions) -> JsonValue {
496        let thread_id = format!("{:?}", thread::current().id());
497
498        let mut sql = String::new();
499        let mut add = vec![];
500        let mut del = vec![];
501        let mut put = vec![];
502
503        let (_, mut fields_list) =
504            self.query(format!("pragma table_info ('{}')", options.table_name));
505        let mut field_old = object! {};
506        for item in fields_list.members_mut() {
507            item["dflt_value"] = item["dflt_value"]
508                .to_string()
509                .trim_start_matches("'")
510                .trim_end_matches("'")
511                .into();
512            if let Some(name) = item["name"].as_str() {
513                field_old[name] = item.clone();
514                if options.table_fields[name].is_empty() {
515                    del.push(name.to_string());
516                }
517            }
518        }
519
520        let mut fields_list = vec![];
521        let mut fields_list_new = vec![];
522
523        for (name, field) in options.table_fields.entries() {
524            if field_old[name].is_empty() {
525                add.push(name.to_string());
526            } else {
527                fields_list.push(name.to_string());
528                fields_list_new.push(name.to_string());
529                let field_mode = field["mode"].as_str().unwrap_or("");
530                let old_value = match field_mode {
531                    "select" => {
532                        if field_old[name]["dflt_value"].clone().is_empty() {
533                            "".to_string()
534                        } else {
535                            field_old[name]["dflt_value"].clone().to_string()
536                        }
537                    }
538                    "switch" => (field_old[name]["dflt_value"]
539                        .to_string()
540                        .parse::<i32>()
541                        .unwrap_or(0)
542                        == 1)
543                        .to_string(),
544                    _ => field_old[name]["dflt_value"].clone().to_string(),
545                };
546                let new_value = match field_mode {
547                    "select" => field["def"]
548                        .members()
549                        .map(|x| x.to_string())
550                        .collect::<Vec<String>>()
551                        .join(","),
552                    _ => field["def"].clone().to_string(),
553                };
554                if old_value != new_value {
555                    info!(
556                        "{} 差异化当前: {} old_value: {} new_value: {} {}",
557                        options.table_name,
558                        name,
559                        old_value,
560                        new_value,
561                        old_value != new_value
562                    );
563                    info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
564                    put.push(name.to_string());
565                } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
566                    && name != options.table_key
567                {
568                    info!("{} 主键替换: {}", options.table_name, name);
569                    put.push(name.to_string());
570                }
571            }
572        }
573
574        let mut unique_fields = String::new();
575        let mut unique_name = String::new();
576        let mut unique = String::new();
577
578        // 唯一索引
579        for item in options.table_unique.iter() {
580            if unique_fields.is_empty() {
581                unique_fields = format!("`{item}`");
582                unique_name = format!("unique_{item}");
583            } else {
584                unique_fields = format!("{unique_fields},`{item}`");
585                unique_name = format!("{unique_name}_{item}");
586            }
587            unique = format!(
588                "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
589                options.table_name, unique_name, options.table_name, unique_fields
590            )
591        }
592
593        // 索引
594        let mut index = vec![];
595        for row in options.table_index.iter() {
596            let mut index_fields = String::new();
597            let mut index_name = String::new();
598            for item in row.iter() {
599                if index_fields.is_empty() {
600                    index_fields = item.to_string();
601                    index_name = format!("index_{item}");
602                } else {
603                    index_fields = format!("{index_fields},{item}");
604                    index_name = format!("{index_name}_{item}");
605                }
606            }
607            index.push(format!(
608                "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
609                options.table_name, index_name, options.table_name, index_fields
610            ));
611        }
612        for (name, field) in options.table_fields.entries() {
613            let row = br_fields::field("sqlite", name, field.clone());
614            sql = format!("{sql} {row},\r\n");
615        }
616
617        if !unique.is_empty() || !index.is_empty() {
618            let unique_text = unique.clone();
619            let (_, unique_old) =
620                self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
621            let mut index_old_list = vec![];
622            let mut index_new_list = vec![];
623            for item in unique_old.members() {
624                let origin = item["origin"].as_str().unwrap_or("");
625                let unique_1 = item["unique"].as_usize().unwrap_or(0);
626                let name = item["name"].as_str().unwrap_or("");
627
628                if origin == "c" && unique_1 == 1 {
629                    if unique.contains(format!(" {name} ").as_str()) {
630                        unique = "".to_string();
631                    }
632                    continue;
633                }
634                if origin == "c" && unique_1 == 0 {
635                    index_old_list.push(item);
636                    for item in index.iter() {
637                        if item.contains(format!(" {name} ").as_str()) {
638                            index_new_list.push(item.clone());
639                        }
640                    }
641                    continue;
642                }
643            }
644            if unique.is_empty() {
645                if index_old_list.len() == index.len()
646                    && index_old_list.len() == index_new_list.len()
647                {
648                    index = vec![];
649                } else {
650                    unique = unique_text;
651                }
652            }
653        }
654
655        sql = sql.trim_end_matches(",\r\n").to_string();
656        sql = format!(
657            "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
658            options.table_name, sql
659        );
660
661        let sqls = format!(
662            "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
663            options.table_name,
664            fields_list_new.join("`,`"),
665            fields_list.join("`,`")
666        );
667        let drop_sql = format!("drop table {};\r\n", options.table_name);
668        let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
669        let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
670
671        if self.params.sql {
672            let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
673            if !unique.is_empty() {
674                list.push(unique)
675            }
676            if !index.is_empty() {
677                list.extend(index)
678            }
679            return JsonValue::from(list.join(""));
680        }
681
682        if add.is_empty()
683            && del.is_empty()
684            && unique.is_empty()
685            && index.is_empty()
686            && put.is_empty()
687        {
688            return JsonValue::from(-1);
689        }
690
691        let (state, _) = self.execute(sql.clone());
692        let data = match state {
693            true => {
694                let (state, _) = self.execute(sqls.clone());
695                match state {
696                    true => {
697                        let (state, _) = self.execute(drop_sql);
698                        match state {
699                            true => {
700                                let (state, _) = self.execute(alter_sql);
701                                match state {
702                                    true => {
703                                        if !unique.is_empty() {
704                                            let (state, _) = self.execute(unique.clone());
705                                            info!(
706                                                "{} {} 唯一索引创建:{}",
707                                                thread_id, options.table_name, state
708                                            );
709                                        }
710                                        for index_sql in index.iter() {
711                                            let (state, _) = self.execute(index_sql.clone());
712                                            match state {
713                                                true => {}
714                                                false => {
715                                                    error!(
716                                                        "{} 索引创建失败: {} {}",
717                                                        options.table_name, state, index_sql
718                                                    );
719                                                    return JsonValue::from(0);
720                                                }
721                                            };
722                                        }
723
724                                        return JsonValue::from(1);
725                                    }
726                                    false => {
727                                        error!("{} 修改表名失败", options.table_name);
728                                        return JsonValue::from(0);
729                                    }
730                                }
731                            }
732                            false => {
733                                error!("{} 删除本表失败", options.table_name);
734                                return JsonValue::from(0);
735                            }
736                        }
737                    }
738                    false => {
739                        error!(
740                            "{} 添加tmp表记录失败 {:#} {:#}",
741                            options.table_name, sql, sqls
742                        );
743                        let sql = format!("drop table {}_tmp", options.table_name);
744                        let (_, _) = self.execute(sql);
745                        0
746                    }
747                }
748            }
749            false => {
750                error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
751                let (_, _) = self.execute(drop_sql_temp);
752                0
753            }
754        };
755        JsonValue::from(data)
756    }
757
758    fn table_info(&mut self, table: &str) -> JsonValue {
759        if let Ok(guard) = FIELDS.lock() {
760            if let Some(cached) = guard.get(table) {
761                return cached.clone();
762            }
763        }
764        let sql = format!("PRAGMA table_info({table})");
765        let (state, data) = self.query(sql);
766
767        match state {
768            true => {
769                let mut fields = object! {};
770                for item in data.members() {
771                    if let Some(name) = item["name"].as_str() {
772                        fields[name] = item.clone();
773                    }
774                }
775                if let Ok(mut guard) = FIELDS.lock() {
776                    guard.insert(table.to_string(), fields.clone());
777                }
778                data
779            }
780            false => object! {},
781        }
782    }
783
784    fn table_is_exist(&mut self, name: &str) -> bool {
785        let sql = format!(
786            "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787        );
788        let (state, data) = self.query(sql);
789        if state && !data.is_empty() {
790            let count = data[0]["count"].as_i64().unwrap_or(0);
791            return count > 0;
792        }
793        false
794    }
795
796    fn table(&mut self, name: &str) -> &mut Sqlite {
797        self.params = Params::default(self.connection.mode.str().as_str());
798        let table_name = format!("{}{}", self.connection.prefix, name);
799        if !super::sql_safety::validate_table_name(&table_name) {
800            error!("Invalid table name: {}", name);
801        }
802        self.params.table = table_name.clone();
803        self.params.join_table = table_name;
804        self
805    }
806    fn change_table(&mut self, name: &str) -> &mut Self {
807        self.params.join_table = name.to_string();
808        self
809    }
810    fn autoinc(&mut self) -> &mut Self {
811        self.params.autoinc = true;
812        self
813    }
814
815    fn timestamps(&mut self) -> &mut Self {
816        self.params.timestamps = true;
817        self
818    }
819
820    fn fetch_sql(&mut self) -> &mut Self {
821        self.params.sql = true;
822        self
823    }
824
825    fn order(&mut self, field: &str, by: bool) -> &mut Self {
826        self.params.order[field] = {
827            if by {
828                "DESC"
829            } else {
830                "ASC"
831            }
832        }
833        .into();
834        self
835    }
836
837    fn group(&mut self, field: &str) -> &mut Self {
838        let fields: Vec<&str> = field.split(",").collect();
839        for field in fields.iter() {
840            let fields = field.to_string();
841            self.params.group[fields.as_str()] = fields.clone().into();
842            self.params.fields[fields.as_str()] = fields.clone().into();
843        }
844        self
845    }
846
847    fn distinct(&mut self) -> &mut Self {
848        self.params.distinct = true;
849        self
850    }
851    fn json(&mut self, field: &str) -> &mut Self {
852        let list: Vec<&str> = field.split(",").collect();
853        for item in list.iter() {
854            self.params.json[item.to_string().as_str()] = item.to_string().into();
855        }
856        self
857    }
858
859    fn location(&mut self, field: &str) -> &mut Self {
860        let list: Vec<&str> = field.split(",").collect();
861        for item in list.iter() {
862            self.params.location[item.to_string().as_str()] = item.to_string().into();
863        }
864        self
865    }
866
867    fn field(&mut self, field: &str) -> &mut Self {
868        let list: Vec<&str> = field.split(",").collect();
869        let join_table = if self.params.join_table.is_empty() {
870            self.params.table.clone()
871        } else {
872            self.params.join_table.clone()
873        };
874        for item in list.iter() {
875            let item = item.to_string();
876            if item.contains(" as ") {
877                let text = item.split(" as ").collect::<Vec<&str>>().clone();
878                self.params.fields[item] =
879                    format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880            } else {
881                self.params.fields[item] = format!("{join_table}.`{item}`").into();
882            }
883        }
884        self
885    }
886
887    fn field_raw(&mut self, expr: &str) -> &mut Self {
888        self.params.fields[expr] = expr.into();
889        self
890    }
891
892    fn hidden(&mut self, name: &str) -> &mut Self {
893        let hidden: Vec<&str> = name.split(",").collect();
894        let sql = format!("PRAGMA table_info({})", self.params.table);
895        let (_, data) = self.query(sql);
896        for item in data.members() {
897            if let Some(name) = item["name"].as_str() {
898                if !hidden.contains(&name) {
899                    self.params.fields[name] = name.into();
900                }
901            }
902        }
903        self
904    }
905
906    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907        for f in field.split('|') {
908            if !super::sql_safety::validate_field_name(f) {
909                error!("Invalid field name: {}", f);
910            }
911        }
912        if !super::sql_safety::validate_compare_orator(compare) {
913            error!("Invalid compare operator: {}", compare);
914        }
915        let join_table = if self.params.join_table.is_empty() {
916            self.params.table.clone()
917        } else {
918            self.params.join_table.clone()
919        };
920
921        if value.is_boolean() {
922            if value.as_bool().unwrap_or(false) {
923                value = 1.into();
924            } else {
925                value = 0.into();
926            }
927        }
928
929        match compare {
930            "between" => {
931                self.params.where_and.push(format!(
932                    "{}.`{}` between '{}' AND '{}'",
933                    join_table, field, value[0], value[1]
934                ));
935            }
936            "set" => {
937                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938                let mut wheredata = vec![];
939                for item in list.iter() {
940                    wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941                }
942                self.params
943                    .where_and
944                    .push(format!("({})", wheredata.join(" or ")));
945            }
946            "notin" => {
947                let mut text = String::new();
948                for item in value.members() {
949                    text = format!("{text},'{item}'");
950                }
951                text = text.trim_start_matches(",").into();
952                self.params
953                    .where_and
954                    .push(format!("{join_table}.`{field}` not in ({text})"));
955            }
956            "in" => {
957                let mut text = String::new();
958                if value.is_array() {
959                    for item in value.members() {
960                        text = format!("{text},'{item}'");
961                    }
962                } else {
963                    let value = value.to_string();
964                    let value: Vec<&str> = value.split(",").collect();
965                    for item in value.iter() {
966                        text = format!("{text},'{item}'");
967                    }
968                }
969                text = text.trim_start_matches(",").into();
970
971                self.params
972                    .where_and
973                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
974            }
975            "=" => {
976                if value.is_null() {
977                    self.params
978                        .where_and
979                        .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980                } else {
981                    self.params
982                        .where_and
983                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984                }
985            }
986            // JSON 数组包含查询:LIKE 模糊匹配(SQLite 无原生 JSON 函数)
987            // 用法:.where_and("tags", "json_contains", "紧急".into())
988            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
989            "json_contains" => {
990                if value.is_array() {
991                    if value.is_empty() {
992                        self.params.where_and.push("1=0".to_string());
993                    } else {
994                        let mut parts = vec![];
995                        for item in value.members() {
996                            let escaped = super::sql_safety::escape_string(&item.to_string());
997                            parts
998                                .push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
999                        }
1000                        self.params
1001                            .where_and
1002                            .push(format!("({})", parts.join(" OR ")));
1003                    }
1004                } else {
1005                    let escaped = super::sql_safety::escape_string(&value.to_string());
1006                    self.params
1007                        .where_and
1008                        .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1009                }
1010            }
1011            _ => {
1012                if value.is_null() {
1013                    self.params
1014                        .where_and
1015                        .push(format!("{join_table}.`{field}` {compare} {value}"));
1016                } else {
1017                    self.params
1018                        .where_and
1019                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1020                }
1021            }
1022        }
1023        self
1024    }
1025    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1026        for f in field.split('|') {
1027            if !super::sql_safety::validate_field_name(f) {
1028                error!("Invalid field name: {}", f);
1029            }
1030        }
1031        if !super::sql_safety::validate_compare_orator(compare) {
1032            error!("Invalid compare operator: {}", compare);
1033        }
1034        let join_table = if self.params.join_table.is_empty() {
1035            self.params.table.clone()
1036        } else {
1037            self.params.join_table.clone()
1038        };
1039
1040        if value.is_boolean() {
1041            if value.as_bool().unwrap_or(false) {
1042                value = 1.into();
1043            } else {
1044                value = 0.into();
1045            }
1046        }
1047        match compare {
1048            "between" => {
1049                self.params.where_or.push(format!(
1050                    "{}.`{}` between '{}' AND '{}'",
1051                    join_table, field, value[0], value[1]
1052                ));
1053            }
1054            "set" => {
1055                let tt = value.to_string().replace(",", "%");
1056                self.params
1057                    .where_or
1058                    .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1059            }
1060            "notin" => {
1061                let mut text = String::new();
1062                for item in value.members() {
1063                    text = format!("{text},'{item}'");
1064                }
1065                text = text.trim_start_matches(",").into();
1066                self.params
1067                    .where_or
1068                    .push(format!("{join_table}.`{field}` not in ({text})"));
1069            }
1070            "in" => {
1071                let mut text = String::new();
1072                if value.is_array() {
1073                    for item in value.members() {
1074                        text = format!("{text},'{item}'");
1075                    }
1076                } else {
1077                    let value = value.as_str().unwrap_or("");
1078                    let value: Vec<&str> = value.split(",").collect();
1079                    for item in value.iter() {
1080                        text = format!("{text},'{item}'");
1081                    }
1082                }
1083                text = text.trim_start_matches(",").into();
1084                self.params
1085                    .where_or
1086                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1087            }
1088            // JSON 数组包含查询:LIKE 模糊匹配(SQLite 无原生 JSON 函数)
1089            // 用法:.where_or("tags", "json_contains", "紧急".into())
1090            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1091            "json_contains" => {
1092                if value.is_array() {
1093                    if value.is_empty() {
1094                        self.params.where_or.push("1=0".to_string());
1095                    } else {
1096                        let mut parts = vec![];
1097                        for item in value.members() {
1098                            let escaped = super::sql_safety::escape_string(&item.to_string());
1099                            parts
1100                                .push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
1101                        }
1102                        self.params
1103                            .where_or
1104                            .push(format!("({})", parts.join(" OR ")));
1105                    }
1106                } else {
1107                    let escaped = super::sql_safety::escape_string(&value.to_string());
1108                    self.params
1109                        .where_or
1110                        .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1111                }
1112            }
1113            _ => {
1114                self.params
1115                    .where_or
1116                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1117            }
1118        }
1119        self
1120    }
1121    fn where_raw(&mut self, expr: &str) -> &mut Self {
1122        self.params.where_and.push(expr.to_string());
1123        self
1124    }
1125
1126    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1127        self.params
1128            .where_and
1129            .push(format!("`{field}` IN ({sub_sql})"));
1130        self
1131    }
1132
1133    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1134        self.params
1135            .where_and
1136            .push(format!("`{field}` NOT IN ({sub_sql})"));
1137        self
1138    }
1139
1140    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1141        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1142        self
1143    }
1144
1145    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1146        self.params
1147            .where_and
1148            .push(format!("NOT EXISTS ({sub_sql})"));
1149        self
1150    }
1151
1152    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1153        self.params.where_column = format!(
1154            "{}.`{}` {} {}.`{}`",
1155            self.params.table, field_a, compare, self.params.table, field_b
1156        );
1157        self
1158    }
1159
1160    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1161        self.params
1162            .update_column
1163            .push(format!("{field_a} = {compare}"));
1164        self
1165    }
1166
1167    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1168        self.params.page = page;
1169        self.params.limit = limit;
1170        self
1171    }
1172
1173    fn limit(&mut self, count: i32) -> &mut Self {
1174        self.params.limit_only = count;
1175        self
1176    }
1177
1178    fn column(&mut self, field: &str) -> JsonValue {
1179        self.field(field);
1180        self.group(field);
1181        let sql = self.params.select_sql();
1182        if self.params.sql {
1183            return JsonValue::from(sql.clone());
1184        }
1185        self.table_info(self.params.table.clone().as_str());
1186        let (state, data) = self.query(sql);
1187        if state {
1188            let mut list = array![];
1189            for item in data.members() {
1190                if self.params.json[field].is_empty() {
1191                    let _ = list.push(item[field].clone());
1192                } else {
1193                    let data =
1194                        json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1195                    let _ = list.push(data);
1196                }
1197            }
1198            list
1199        } else {
1200            array![]
1201        }
1202    }
1203
1204    fn count(&mut self) -> JsonValue {
1205        self.params.fields["count"] = "count(*) as count".to_string().into();
1206        let sql = self.params.select_sql();
1207        if self.params.sql {
1208            return JsonValue::from(sql.clone());
1209        }
1210        let (state, data) = self.query(sql);
1211        match state {
1212            true => data[0]["count"].clone(),
1213            false => JsonValue::from(0),
1214        }
1215    }
1216
1217    fn max(&mut self, field: &str) -> JsonValue {
1218        self.params.fields[field] = format!("max({field}) as {field}").into();
1219        let sql = self.params.select_sql();
1220        if self.params.sql {
1221            return JsonValue::from(sql.clone());
1222        }
1223        let (state, data) = self.query(sql);
1224        if state {
1225            if data.len() > 1 {
1226                return data;
1227            }
1228            data[0][field].clone()
1229        } else {
1230            JsonValue::from(0.0)
1231        }
1232    }
1233
1234    fn min(&mut self, field: &str) -> JsonValue {
1235        self.params.fields[field] = format!("min({field}) as {field}").into();
1236        let sql = self.params.select_sql();
1237        let (state, data) = self.query(sql);
1238        if state {
1239            if data.len() > 1 {
1240                return data;
1241            }
1242            data[0][field].clone()
1243        } else {
1244            JsonValue::from(0.0)
1245        }
1246    }
1247
1248    fn sum(&mut self, field: &str) -> JsonValue {
1249        self.params.fields[field] = format!("sum({field}) as {field}").into();
1250        let sql = self.params.select_sql();
1251        if self.params.sql {
1252            return JsonValue::from(sql.clone());
1253        }
1254        let (state, data) = self.query(sql);
1255        match state {
1256            true => {
1257                if data.len() > 1 {
1258                    return data;
1259                }
1260                if self.params.fields.len() > 1 {
1261                    return data[0].clone();
1262                }
1263                data[0][field].clone()
1264            }
1265            false => JsonValue::from(0),
1266        }
1267    }
1268    fn avg(&mut self, field: &str) -> JsonValue {
1269        self.params.fields[field] = format!("avg({field}) as {field}").into();
1270        let sql = self.params.select_sql();
1271        if self.params.sql {
1272            return JsonValue::from(sql.clone());
1273        }
1274        let (state, data) = self.query(sql);
1275        if state {
1276            if data.len() > 1 {
1277                return data;
1278            }
1279            data[0][field].clone()
1280        } else {
1281            JsonValue::from(0)
1282        }
1283    }
1284    fn having(&mut self, expr: &str) -> &mut Self {
1285        self.params.having.push(expr.to_string());
1286        self
1287    }
1288    fn select(&mut self) -> JsonValue {
1289        let sql = self.params.select_sql();
1290        if self.params.sql {
1291            return JsonValue::from(sql.clone());
1292        }
1293        self.table_info(self.params.table.clone().as_str());
1294        let (state, mut data) = self.query(sql.clone());
1295        match state {
1296            true => {
1297                for (field, _) in self.params.json.entries() {
1298                    for item in data.members_mut() {
1299                        if !item[field].is_empty() {
1300                            let json = item[field].to_string();
1301                            item[field] = match json::parse(&json) {
1302                                Ok(e) => e,
1303                                Err(_) => JsonValue::from(json),
1304                            };
1305                        }
1306                    }
1307                }
1308                data.clone()
1309            }
1310            false => {
1311                error!("{data:?}");
1312                array![]
1313            }
1314        }
1315    }
1316    fn find(&mut self) -> JsonValue {
1317        self.params.page = 1;
1318        self.params.limit = 1;
1319        let sql = self.params.select_sql();
1320        if self.params.sql {
1321            return JsonValue::from(sql.clone());
1322        }
1323
1324        self.table_info(self.params.table.clone().as_str());
1325        let (state, mut data) = self.query(sql.clone());
1326        match state {
1327            true => {
1328                if data.is_empty() {
1329                    return object! {};
1330                }
1331                for (field, _) in self.params.json.entries() {
1332                    if !data[0][field].is_empty() {
1333                        let json = data[0][field].to_string();
1334                        let json = json::parse(&json).unwrap_or(array![]);
1335                        data[0][field] = json;
1336                    } else {
1337                        data[0][field] = array![];
1338                    }
1339                }
1340                data[0].clone()
1341            }
1342            false => {
1343                error!("{data:?}");
1344                object! {}
1345            }
1346        }
1347    }
1348
1349    fn value(&mut self, field: &str) -> JsonValue {
1350        self.params.fields = object! {};
1351        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1352        self.params.page = 1;
1353        self.params.limit = 1;
1354        let sql = self.params.select_sql();
1355        if self.params.sql {
1356            return JsonValue::from(sql.clone());
1357        }
1358        self.table_info(self.params.table.clone().as_str());
1359        let (state, mut data) = self.query(sql.clone());
1360        match state {
1361            true => {
1362                for (field, _) in self.params.json.entries() {
1363                    if !data[0][field].is_empty() {
1364                        let json = data[0][field].to_string();
1365                        let json = json::parse(&json).unwrap_or(array![]);
1366                        data[0][field] = json;
1367                    } else {
1368                        data[0][field] = array![];
1369                    }
1370                }
1371                data[0][field].clone()
1372            }
1373            false => {
1374                if self.connection.debug {
1375                    info!("{data:?}");
1376                }
1377                JsonValue::Null
1378            }
1379        }
1380    }
1381
1382    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1383        let mut fields = vec![];
1384        let mut values = vec![];
1385
1386        if !self.params.autoinc && data["id"].is_empty() {
1387            let thread_id = format!("{:?}", std::thread::current().id());
1388            let thread_num: u64 = thread_id
1389                .trim_start_matches("ThreadId(")
1390                .trim_end_matches(")")
1391                .parse()
1392                .unwrap_or(0);
1393            data["id"] = format!(
1394                "{:X}{:X}",
1395                Local::now().timestamp_nanos_opt().unwrap_or(0),
1396                thread_num
1397            )
1398            .into();
1399        }
1400        for (field, value) in data.entries() {
1401            fields.push(format!("`{field}`"));
1402
1403            if value.is_string() {
1404                if value.to_string().contains("'") {
1405                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1406                    continue;
1407                } else if value.to_string().contains('"') {
1408                    values.push(format!("'{value}'"));
1409                    continue;
1410                } else {
1411                    values.push(format!("\"{value}\""));
1412                    continue;
1413                }
1414            } else if value.is_array() || value.is_object() {
1415                if self.params.json[field].is_empty() {
1416                    values.push(format!("'{value}'"));
1417                } else {
1418                    let json = value.to_string();
1419                    let json = json.replace("'", "''");
1420                    values.push(format!("'{json}'"));
1421                }
1422                continue;
1423            } else if value.is_number() || value.is_boolean() || value.is_null() {
1424                values.push(format!("{value}"));
1425                continue;
1426            } else {
1427                values.push(format!("'{value}'"));
1428                continue;
1429            }
1430        }
1431        let fields = fields.join(",");
1432        let values = values.join(",");
1433
1434        let sql = format!(
1435            "INSERT INTO `{}` ({}) VALUES ({});",
1436            self.params.table, fields, values
1437        );
1438        if self.params.sql {
1439            return JsonValue::from(sql.clone());
1440        }
1441        let (state, ids) = self.execute(sql);
1442        match state {
1443            true => {
1444                if self.params.autoinc {
1445                    let (state, ids) =
1446                        self.query(format!("select max(id) as id from {}", self.params.table));
1447                    return match state {
1448                        true => ids[0]["id"].clone(),
1449                        false => {
1450                            error!("{ids}");
1451                            JsonValue::from("")
1452                        }
1453                    };
1454                }
1455                data["id"].clone()
1456            }
1457            false => {
1458                error!("{ids}");
1459                JsonValue::from("")
1460            }
1461        }
1462    }
1463    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1464        let mut fields = String::new();
1465
1466        if !self.params.autoinc && data[0]["id"].is_empty() {
1467            data[0]["id"] = "".into();
1468        }
1469        for (field, _) in data[0].entries() {
1470            fields = format!("{fields},`{field}`");
1471        }
1472        fields = fields.trim_start_matches(",").to_string();
1473
1474        let core_count = num_cpus::get();
1475        let mut p = pools::Pool::new(core_count * 4);
1476        let autoinc = self.params.autoinc;
1477        for list in data.members() {
1478            let mut item = list.clone();
1479            p.execute(move |pcindex| {
1480                if !autoinc && item["id"].is_empty() {
1481                    let id = format!(
1482                        "{:X}{:X}",
1483                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1484                        pcindex
1485                    );
1486                    item["id"] = id.into();
1487                }
1488                let mut values = "".to_string();
1489                for (_, value) in item.entries() {
1490                    if value.is_string() {
1491                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1492                    } else if value.is_number() || value.is_boolean() {
1493                        values = format!("{values},{value}");
1494                    } else {
1495                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1496                    }
1497                }
1498                values = format!("({})", values.trim_start_matches(","));
1499                array![item["id"].clone(), values]
1500            });
1501        }
1502        let (ids_list, mut values) = p.insert_all();
1503
1504        values = values.trim_start_matches(",").to_string();
1505
1506        let sql = format!(
1507            "INSERT INTO {} ({}) VALUES {};",
1508            self.params.table, fields, values
1509        );
1510        if self.params.sql {
1511            return JsonValue::from(sql.clone());
1512        }
1513        let (state, data) = self.execute(sql.clone());
1514        match state {
1515            true => {
1516                if self.params.autoinc {
1517                    let (state, ids) = self.query(format!(
1518                        "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1519                        self.params.table,
1520                        ids_list.len()
1521                    ));
1522                    return match state {
1523                        true => {
1524                            let mut idlist = array![];
1525                            for item in ids.members() {
1526                                let _ = idlist.push(item["id"].clone());
1527                            }
1528                            idlist
1529                        }
1530                        false => {
1531                            error!("批量添加失败: {ids:?} {sql}");
1532                            array![]
1533                        }
1534                    };
1535                }
1536                JsonValue::from(ids_list)
1537            }
1538            false => {
1539                error!("批量添加失败: {data:?} {sql}");
1540                array![]
1541            }
1542        }
1543    }
1544
1545    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1546        let mut fields = vec![];
1547        let mut values = vec![];
1548
1549        if !self.params.autoinc && data["id"].is_empty() {
1550            let thread_id = format!("{:?}", std::thread::current().id());
1551            let thread_num: u64 = thread_id
1552                .trim_start_matches("ThreadId(")
1553                .trim_end_matches(")")
1554                .parse()
1555                .unwrap_or(0);
1556            data["id"] = format!(
1557                "{:X}{:X}",
1558                Local::now().timestamp_nanos_opt().unwrap_or(0),
1559                thread_num
1560            )
1561            .into();
1562        }
1563        for (field, value) in data.entries() {
1564            fields.push(format!("`{field}`"));
1565
1566            if value.is_string() {
1567                if value.to_string().contains("'") {
1568                    values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1569                    continue;
1570                } else if value.to_string().contains('"') {
1571                    values.push(format!("'{value}'"));
1572                    continue;
1573                } else {
1574                    values.push(format!("\"{value}\""));
1575                    continue;
1576                }
1577            } else if value.is_array() || value.is_object() {
1578                if self.params.json[field].is_empty() {
1579                    values.push(format!("'{value}'"));
1580                } else {
1581                    let json = value.to_string();
1582                    let json = json.replace("'", "''");
1583                    values.push(format!("'{json}'"));
1584                }
1585                continue;
1586            } else if value.is_number() || value.is_boolean() || value.is_null() {
1587                values.push(format!("{value}"));
1588                continue;
1589            } else {
1590                values.push(format!("'{value}'"));
1591                continue;
1592            }
1593        }
1594
1595        let conflict_cols: Vec<String> =
1596            conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1597
1598        let update_set: Vec<String> = fields
1599            .iter()
1600            .filter(|f| {
1601                let name = f.trim_matches('`');
1602                !conflict_fields.contains(&name) && name != "id"
1603            })
1604            .map(|f| format!("{f}=excluded.{f}"))
1605            .collect();
1606
1607        let fields_str = fields.join(",");
1608        let values_str = values.join(",");
1609
1610        let sql = format!(
1611            "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1612            self.params.table,
1613            fields_str,
1614            values_str,
1615            conflict_cols.join(","),
1616            update_set.join(",")
1617        );
1618        if self.params.sql {
1619            return JsonValue::from(sql.clone());
1620        }
1621        let (state, result) = self.execute(sql);
1622        match state {
1623            true => {
1624                if self.params.autoinc {
1625                    let (state, ids) =
1626                        self.query(format!("select max(id) as id from {}", self.params.table));
1627                    return match state {
1628                        true => ids[0]["id"].clone(),
1629                        false => {
1630                            error!("{ids}");
1631                            JsonValue::from("")
1632                        }
1633                    };
1634                }
1635                data["id"].clone()
1636            }
1637            false => {
1638                error!("upsert失败: {result}");
1639                JsonValue::from("")
1640            }
1641        }
1642    }
1643
1644    fn update(&mut self, data: JsonValue) -> JsonValue {
1645        let mut values = vec![];
1646
1647        for (field, value) in data.entries() {
1648            if value.is_string() {
1649                values.push(format!(
1650                    "`{}` = '{}'",
1651                    field,
1652                    value.to_string().replace("'", "''")
1653                ));
1654            } else if value.is_array() || value.is_object() {
1655                if self.params.json[field].is_empty() {
1656                    values.push(format!("`{field}` = '{value}'"));
1657                } else {
1658                    let json = value.to_string();
1659                    let json = json.replace("'", "''");
1660                    values.push(format!("`{field}` = '{json}'"));
1661                }
1662                continue;
1663            } else if value.is_number() || value.is_boolean() || value.is_null() {
1664                values.push(format!("`{field}` = {value} "));
1665            } else {
1666                values.push(format!("`{field}` = '{value}' "));
1667            }
1668        }
1669
1670        for (field, value) in self.params.inc_dec.entries() {
1671            values.push(format!("{} = {}", field, value.to_string().clone()));
1672        }
1673
1674        let values = values.join(",");
1675        let sql = format!(
1676            "UPDATE `{}` SET {} {} {};",
1677            self.params.table.clone(),
1678            values,
1679            self.params.where_sql(),
1680            self.params.page_limit_sql()
1681        );
1682        if self.params.sql {
1683            return JsonValue::from(sql.clone());
1684        }
1685        let (state, data) = self.execute(sql);
1686        if state {
1687            data
1688        } else {
1689            error!("{data}");
1690            JsonValue::from(0)
1691        }
1692    }
1693
1694    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1695        let mut values = vec![];
1696        let mut ids = vec![];
1697        for (field, _) in data[0].entries() {
1698            if field == "id" {
1699                continue;
1700            }
1701            let mut fields = vec![];
1702            for row in data.members() {
1703                let value = row[field].clone();
1704                let id = row["id"].clone();
1705                ids.push(id.clone());
1706                if value.is_string() {
1707                    fields.push(format!(
1708                        "WHEN '{}' THEN '{}'",
1709                        id,
1710                        value.to_string().replace("'", "''")
1711                    ));
1712                } else if value.is_array() || value.is_object() {
1713                    if self.params.json[field].is_empty() {
1714                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1715                    } else {
1716                        let json = value.to_string();
1717                        let json = json.replace("'", "''");
1718                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1719                    }
1720                    continue;
1721                } else if value.is_number() || value.is_boolean() || value.is_null() {
1722                    fields.push(format!("WHEN '{id}' THEN {value}"));
1723                } else {
1724                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1725                }
1726            }
1727            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1728        }
1729
1730        self.where_and("id", "in", ids.into());
1731        for (field, value) in self.params.inc_dec.entries() {
1732            values.push(format!("{} = {}", field, value.to_string().clone()));
1733        }
1734
1735        let values = values.join(",");
1736        let sql = format!(
1737            "UPDATE {} SET {} {} {};",
1738            self.params.table.clone(),
1739            values,
1740            self.params.where_sql(),
1741            self.params.page_limit_sql()
1742        );
1743        if self.params.sql {
1744            return JsonValue::from(sql.clone());
1745        }
1746        let (state, data) = self.execute(sql);
1747        if state {
1748            data
1749        } else {
1750            error!("{data:?}");
1751            JsonValue::from(0)
1752        }
1753    }
1754    fn delete(&mut self) -> JsonValue {
1755        let sql = format!(
1756            "delete FROM `{}` {};",
1757            self.params.table.clone(),
1758            self.params.where_sql()
1759        );
1760        if self.params.sql {
1761            return JsonValue::from(sql.clone());
1762        }
1763        let (state, data) = self.execute(sql);
1764        match state {
1765            true => data,
1766            false => {
1767                error!("delete 失败>>>{data:?}");
1768                JsonValue::from(0)
1769            }
1770        }
1771    }
1772
1773    fn transaction(&mut self) -> bool {
1774        let thread_id = format!("{:?}", thread::current().id());
1775
1776        if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1777            let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1778            SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1779            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1780            let _ = self.query(sp);
1781            return true;
1782        }
1783
1784        if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1785            error!("{thread_id} 启动事务失败: 获取写锁超时");
1786            return false;
1787        }
1788
1789        let flags = OpenFlags::new().with_read_write().with_no_mutex();
1790        let db = match Connect::open_thread_safe_with_flags(
1791            self.connection.clone().get_dsn().as_str(),
1792            flags,
1793        ) {
1794            Ok(e) => Arc::new(e),
1795            Err(e) => {
1796                error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1797                SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1798                return false;
1799            }
1800        };
1801
1802        SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1803
1804        let (state, data) = self.query("BEGIN".to_string());
1805        if state {
1806            true
1807        } else {
1808            error!("{thread_id} 启动事务失败: {data}");
1809            SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1810            false
1811        }
1812    }
1813
1814    fn commit(&mut self) -> bool {
1815        let thread_id = format!("{:?}", thread::current().id());
1816
1817        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1818            error!("{thread_id} 提交事务失败: 没有活跃的事务");
1819            return false;
1820        }
1821
1822        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1823        if depth > 1 {
1824            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1825            let _ = self.query(sp);
1826            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1827            return true;
1828        }
1829
1830        let (state, _) = self.query("COMMIT".to_string());
1831        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1832
1833        if state {
1834            true
1835        } else {
1836            error!("{thread_id} 提交事务失败");
1837            false
1838        }
1839    }
1840
1841    fn rollback(&mut self) -> bool {
1842        let thread_id = format!("{:?}", thread::current().id());
1843
1844        if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1845            error!("{thread_id} 回滚失败: 没有活跃的事务");
1846            return false;
1847        }
1848
1849        let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1850        if depth > 1 {
1851            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1852            let _ = self.query(sp);
1853            SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1854            return true;
1855        }
1856
1857        let (state, _) = self.query("ROLLBACK".to_string());
1858        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1859
1860        if state {
1861            true
1862        } else {
1863            error!("回滚失败: {thread_id}");
1864            false
1865        }
1866    }
1867
1868    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1869        let (state, data) = self.query(sql.to_string());
1870        match state {
1871            true => Ok(data),
1872            false => Err(data.to_string()),
1873        }
1874    }
1875
1876    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1877        let (state, data) = self.execute(sql.to_string());
1878        match state {
1879            true => Ok(data),
1880            false => Err(data.to_string()),
1881        }
1882    }
1883
1884    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1885        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1886        self
1887    }
1888
1889    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1890        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1891        self
1892    }
1893
1894    fn buildsql(&mut self) -> String {
1895        self.fetch_sql();
1896        let sql = self.select().to_string();
1897        format!("( {} ) `{}`", sql, self.params.table)
1898    }
1899
1900    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1901        for field in fields {
1902            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1903        }
1904        self
1905    }
1906
1907    fn join(
1908        &mut self,
1909        main_table: &str,
1910        main_fields: &str,
1911        right_table: &str,
1912        right_fields: &str,
1913    ) -> &mut Self {
1914        let main_table = if main_table.is_empty() {
1915            self.params.table.clone()
1916        } else {
1917            main_table.to_string()
1918        };
1919        self.params.join_table = right_table.to_string();
1920        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1921        self
1922    }
1923
1924    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1925        let main_fields = if main_fields.is_empty() {
1926            "id"
1927        } else {
1928            main_fields
1929        };
1930        let second_fields = if second_fields.is_empty() {
1931            self.params.table.clone()
1932        } else {
1933            second_fields.to_string().clone()
1934        };
1935        let sec_table_name = format!("{}{}", table, "_2");
1936        let second_table = format!("{} {}", table, sec_table_name.clone());
1937        self.params.join_table = sec_table_name.clone();
1938        self.params.join.push(format!(
1939            " INNER JOIN {} ON {}.{} = {}.{}",
1940            second_table, self.params.table, main_fields, sec_table_name, second_fields
1941        ));
1942        self
1943    }
1944
1945    fn join_right(
1946        &mut self,
1947        main_table: &str,
1948        main_fields: &str,
1949        right_table: &str,
1950        right_fields: &str,
1951    ) -> &mut Self {
1952        let main_table = if main_table.is_empty() {
1953            self.params.table.clone()
1954        } else {
1955            main_table.to_string()
1956        };
1957        self.params.join_table = right_table.to_string();
1958        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1959        self
1960    }
1961
1962    fn join_full(
1963        &mut self,
1964        main_table: &str,
1965        main_fields: &str,
1966        right_table: &str,
1967        right_fields: &str,
1968    ) -> &mut Self {
1969        let main_table = if main_table.is_empty() {
1970            self.params.table.clone()
1971        } else {
1972            main_table.to_string()
1973        };
1974        self.params.join_table = right_table.to_string();
1975        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1976        self
1977    }
1978
1979    fn union(&mut self, sub_sql: &str) -> &mut Self {
1980        self.params.unions.push(format!("UNION {sub_sql}"));
1981        self
1982    }
1983
1984    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1985        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1986        self
1987    }
1988
1989    fn lock_for_update(&mut self) -> &mut Self {
1990        self.params.lock_mode = "FOR UPDATE".to_string();
1991        self
1992    }
1993
1994    fn lock_for_share(&mut self) -> &mut Self {
1995        self.params.lock_mode = "FOR SHARE".to_string();
1996        self
1997    }
1998}