1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19 static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24 pub connection: Connection,
26 pub default: String,
28
29 pub params: Params,
30}
31
32impl Sqlite {
33 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34 let dsn = connection.clone().get_dsn();
35 let db_path = Path::new(&dsn);
36 if let Some(parent) = db_path.parent() {
37 if !parent.as_os_str().is_empty() && !parent.exists() {
38 if let Err(e) = std::fs::create_dir_all(parent) {
39 error!("sqlite 创建目录失败: {} {:?}", e, parent);
40 return Err(e.to_string());
41 }
42 info!("sqlite 自动创建目录: {:?}", parent);
43 }
44 }
45
46 let flags = OpenFlags::new().with_create().with_read_write();
47 match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48 Ok(e) => {
49 if let Ok(mut guard) = DBS.lock() {
50 guard.insert(default.clone(), Arc::new(e));
51 } else {
52 error!("sqlite 获取数据库锁失败");
53 return Err("获取数据库锁失败".to_string());
54 }
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63 Err(e.to_string())
64 }
65 }
66 }
67 fn query_handle_static(
69 mut statement: Statement,
70 sql: &str,
71 table: &str,
72 thread_id: &str,
73 ) -> (bool, JsonValue) {
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79 if in_transaction {
80 error!("{} 查询事务: {} {}", thread_id, e, sql);
81 } else {
82 error!("{} 非事务查询: {} {}", thread_id, e, sql);
83 }
84 return (false, data);
85 }
86 } {
87 let mut list = object! {};
88 let mut index = 0;
89 for field in statement.column_names().iter() {
90 if !list[field.as_str()].is_null() {
91 index += 1;
92 continue;
93 }
94 match statement.column_type(field.as_str()) {
95 Ok(types) => match types {
96 Type::String => {
97 if let Ok(data) = statement.read::<String, _>(index) {
98 match data.as_str() {
99 "false" => {
100 list[field.as_str()] = JsonValue::from(false);
101 }
102 "true" => {
103 list[field.as_str()] = JsonValue::from(true);
104 }
105 _ => {
106 list[field.as_str()] = JsonValue::from(data.clone());
107 }
108 }
109 }
110 }
111 Type::Integer => {
112 let fields_cache =
113 FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114 if let Some(fields) = fields_cache {
115 if fields[field.clone()].is_empty() {
116 if let Ok(data) = statement.read::<i64, _>(index) {
117 list[field.as_str()] = data.into();
118 }
119 index += 1;
120 continue;
121 }
122 let field_type =
123 fields[field.clone()]["type"].as_str().unwrap_or("");
124 match field_type {
125 "INTEGER" => {
126 if let Ok(data) = statement.read::<i64, _>(index) {
127 list[field.as_str()] = JsonValue::from(data == 1);
128 }
129 }
130 x if x.contains("int(") => {
131 if let Ok(data) = statement.read::<i64, _>(index) {
132 list[field.as_str()] = data.into();
133 }
134 }
135 x if x.contains("decimal(") && x.ends_with(",0)") => {
136 if let Ok(data) = statement.read::<f64, _>(index) {
137 list[field.as_str()] = data.into();
138 }
139 }
140 _ => {
141 if let Ok(data) = statement.read::<i64, _>(index) {
142 list[field.as_str()] = data.into();
143 }
144 }
145 }
146 } else if let Ok(data) = statement.read::<i64, _>(index) {
147 list[field.as_str()] = JsonValue::from(data);
148 }
149 }
150 Type::Float => {
151 if let Ok(data) = statement.read::<f64, _>(index) {
152 list[field.as_str()] = JsonValue::from(data);
153 }
154 }
155 Type::Binary => {
156 if let Ok(data) = statement.read::<String, _>(index) {
157 list[field.as_str()] = JsonValue::from(data.clone());
158 }
159 }
160 Type::Null => match statement.read::<String, _>(index) {
161 Ok(data) => {
162 list[field.as_str()] = JsonValue::from(data.clone());
163 }
164 Err(_) => match statement.read::<f64, _>(index) {
165 Ok(data) => {
166 if data == 0.0 {
167 list[field.as_str()] = JsonValue::from("");
168 } else {
169 list[field.as_str()] = JsonValue::from(data);
170 }
171 }
172 Err(_) => match statement.read::<i64, _>(index) {
173 Ok(data) => {
174 if data == 0 {
175 list[field.as_str()] = JsonValue::from("");
176 } else {
177 list[field.as_str()] = JsonValue::from(data);
178 }
179 }
180 Err(e) => {
181 error!("Type:{} {:?}", field.as_str(), e);
182 }
183 },
184 },
185 },
186 },
187 Err(e) => {
188 error!("query Err: {e:?}");
189 }
190 }
191 index += 1;
192 }
193 let _ = data.push(list);
194 }
195 (true, data)
196 }
197
198 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199 let thread_id = format!("{:?}", thread::current().id());
200 let table = self.params.table.clone();
201
202 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203 if self.connection.debug {
204 info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205 }
206
207 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208 match db.prepare(sql.clone()) {
209 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210 Err(e) => {
211 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212 (false, e.to_string().into())
213 }
214 }
215 });
216
217 match result {
218 Some(r) => r,
219 None => {
220 error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221 (false, JsonValue::from("未找到事务连接"))
222 }
223 }
224 } else {
225 if self.connection.debug {
226 info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227 }
228 let dbs = match DBS.lock() {
229 Ok(dbs) => dbs,
230 Err(e) => {
231 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232 return (false, JsonValue::from("数据库锁定失败"));
233 }
234 };
235 let db = match dbs.get(&self.default) {
236 Some(db) => db.clone(),
237 None => {
238 error!(
239 "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240 self.default
241 );
242 return (false, JsonValue::from("未找到默认数据库配置"));
243 }
244 };
245 drop(dbs);
246 let result = match db.prepare(sql.clone()) {
247 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248 Err(e) => {
249 error!("{thread_id} 查询非事务: Err: {e}");
250 (false, e.to_string().into())
251 }
252 };
253 result
254 }
255 }
256
257 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258 let thread_id = format!("{:?}", thread::current().id());
259
260 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261 if self.connection.debug {
262 info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263 }
264
265 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266 match db.execute(sql.clone()) {
267 Ok(_) => {
268 let count = db.change_count();
269 (true, JsonValue::from(count))
270 }
271 Err(e) => (false, JsonValue::from(e.to_string())),
272 }
273 });
274
275 match result {
276 Some((true, count)) => {
277 if self.connection.debug {
278 info!("{} count:{}", thread_id, count);
279 }
280 (true, count)
281 }
282 Some((false, err)) => {
283 error!(
284 "{} 执行事务: \r\nErr: {}\r\n{}",
285 thread_id,
286 err,
287 sql.clone()
288 );
289 (false, err)
290 }
291 None => {
292 error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293 (false, JsonValue::from("未找到事务连接"))
294 }
295 }
296 } else {
297 if self.connection.debug {
298 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299 }
300 let dbs = match DBS.lock() {
301 Ok(dbs) => dbs,
302 Err(e) => {
303 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304 return (false, JsonValue::from("数据库锁定失败"));
305 }
306 };
307
308 let db = match dbs.get(&self.default) {
309 Some(db) => db.clone(),
310 None => {
311 error!(
312 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313 thread_id, self.default, sql
314 );
315 return (false, JsonValue::from("未找到默认数据库配置"));
316 }
317 };
318 drop(dbs);
319 match db.execute(sql.clone()) {
320 Ok(_) => {
321 let count = db.change_count();
322 if self.connection.debug {
323 info!(
324 "{} count: {} total_count: {}",
325 thread_id,
326 count,
327 db.total_change_count()
328 );
329 }
330 (true, JsonValue::from(count))
331 }
332 Err(e) => {
333 error!(
334 "{} 执行非事务: Err: {}\r\nSQL: {}",
335 thread_id,
336 e,
337 sql.clone()
338 );
339 (false, JsonValue::from(e.to_string()))
340 }
341 }
342 }
343 }
344}
345
346impl DbMode for Sqlite {
347 fn database_tables(&mut self) -> JsonValue {
348 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349 match self.sql(sql.as_str()) {
350 Ok(e) => {
351 let mut list = vec![];
352 for item in e.members() {
353 list.push(item["name"].clone());
354 }
355 list.into()
356 }
357 Err(_) => {
358 array![]
359 }
360 }
361 }
362 fn database_create(&mut self, name: &str) -> bool {
363 let current_dsn = self.connection.clone().get_dsn();
364 let current_path = Path::new(¤t_dsn);
365
366 let new_path = if let Some(parent) = current_path.parent() {
367 if parent.as_os_str().is_empty() {
368 Path::new(name).to_path_buf()
369 } else {
370 parent.join(name)
371 }
372 } else {
373 Path::new(name).to_path_buf()
374 };
375
376 if let Some(parent) = new_path.parent() {
377 if !parent.as_os_str().is_empty() && !parent.exists() {
378 if let Err(e) = std::fs::create_dir_all(parent) {
379 error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380 return false;
381 }
382 }
383 }
384
385 let flags = OpenFlags::new().with_create().with_read_write();
386 match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387 Ok(_) => true,
388 Err(e) => {
389 error!(
390 "sqlite database_create 创建数据库失败: {} {:?}",
391 e, new_path
392 );
393 false
394 }
395 }
396 }
397}
398
399impl Mode for Sqlite {
400 fn table_create(&mut self, options: TableOptions) -> JsonValue {
401 let mut sql = String::new();
402 let mut unique_fields = String::new();
403 let mut unique_name = String::new();
404 let mut unique = String::new();
405 for item in options.table_unique.iter() {
406 if unique_fields.is_empty() {
407 unique_fields = format!("`{item}`");
408 unique_name = format!("unique_{item}");
409 } else {
410 unique_fields = format!("{unique_fields},`{item}`");
411 unique_name = format!("{unique_name}_{item}");
412 }
413 unique = format!(
414 "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
415 unique_name, options.table_name, unique_fields
416 );
417 }
418
419 let mut index = vec![];
420 for row in options.table_index.iter() {
421 let mut index_fields = String::new();
422 let mut index_name = String::new();
423 for item in row.iter() {
424 if index_fields.is_empty() {
425 index_fields = format!("`{item}`");
426 index_name = format!("index_{item}");
427 } else {
428 index_fields = format!("{index_fields},`{item}`");
429 index_name = format!("{index_name}_{item}");
430 }
431 }
432 index.push(format!(
433 "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
434 index_name, options.table_name, index_fields
435 ));
436 }
437
438 for (name, field) in options.table_fields.entries() {
439 let row = br_fields::field("sqlite", name, field.clone());
440 sql = format!("{sql} {row},\r\n");
441 }
442
443 sql = sql.trim_end_matches(",\r\n").to_string();
444
445 let sql = format!(
446 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
447 options.table_name, sql
448 );
449 if self.params.sql {
450 let mut list = vec![sql];
451 if !unique.is_empty() {
452 list.push(unique)
453 }
454 if !index.is_empty() {
455 list.extend(index)
456 }
457
458 return JsonValue::from(list.join(""));
459 }
460
461 let thread_id = format!("{:?}", thread::current().id());
462
463 let (_, table_exists) = self.query(format!(
464 "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
465 options.table_name
466 ));
467 let is_new_table = table_exists.is_empty();
468
469 let (state, _) = self.execute(sql.clone());
470 if state {
471 if is_new_table {
472 if !unique.is_empty() {
473 let (state, _) = self.execute(unique.clone());
474 info!(
475 "{} {} 唯一索引创建:{}",
476 thread_id, options.table_name, state
477 );
478 }
479 for sql in index.iter() {
480 let (state, _) = self.execute(sql.clone());
481 info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
482 }
483 }
484 JsonValue::from(true)
485 } else {
486 JsonValue::from(false)
487 }
488 }
489 fn table_update(&mut self, options: TableOptions) -> JsonValue {
490 let thread_id = format!("{:?}", thread::current().id());
491
492 let mut sql = String::new();
493 let mut add = vec![];
494 let mut del = vec![];
495 let mut put = vec![];
496
497 let (_, mut fields_list) =
498 self.query(format!("pragma table_info ('{}')", options.table_name));
499 let mut field_old = object! {};
500 for item in fields_list.members_mut() {
501 item["dflt_value"] = item["dflt_value"]
502 .to_string()
503 .trim_start_matches("'")
504 .trim_end_matches("'")
505 .into();
506 if let Some(name) = item["name"].as_str() {
507 field_old[name] = item.clone();
508 if options.table_fields[name].is_empty() {
509 del.push(name.to_string());
510 }
511 }
512 }
513
514 let mut fields_list = vec![];
515 let mut fields_list_new = vec![];
516
517 for (name, field) in options.table_fields.entries() {
518 if field_old[name].is_empty() {
519 add.push(name.to_string());
520 } else {
521 fields_list.push(name.to_string());
522 fields_list_new.push(name.to_string());
523 let field_mode = field["mode"].as_str().unwrap_or("");
524 let old_value = match field_mode {
525 "select" => {
526 if field_old[name]["dflt_value"].clone().is_empty() {
527 "".to_string()
528 } else {
529 field_old[name]["dflt_value"].clone().to_string()
530 }
531 }
532 "switch" => (field_old[name]["dflt_value"]
533 .to_string()
534 .parse::<i32>()
535 .unwrap_or(0)
536 == 1)
537 .to_string(),
538 _ => field_old[name]["dflt_value"].clone().to_string(),
539 };
540 let new_value = match field_mode {
541 "select" => field["def"]
542 .members()
543 .map(|x| x.to_string())
544 .collect::<Vec<String>>()
545 .join(","),
546 _ => field["def"].clone().to_string(),
547 };
548 if old_value != new_value {
549 info!(
550 "{} 差异化当前: {} old_value: {} new_value: {} {}",
551 options.table_name,
552 name,
553 old_value,
554 new_value,
555 old_value != new_value
556 );
557 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
558 put.push(name.to_string());
559 } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
560 && name != options.table_key
561 {
562 info!("{} 主键替换: {}", options.table_name, name);
563 put.push(name.to_string());
564 }
565 }
566 }
567
568 let mut unique_fields = String::new();
569 let mut unique_name = String::new();
570 let mut unique = String::new();
571
572 for item in options.table_unique.iter() {
574 if unique_fields.is_empty() {
575 unique_fields = format!("`{item}`");
576 unique_name = format!("unique_{item}");
577 } else {
578 unique_fields = format!("{unique_fields},`{item}`");
579 unique_name = format!("{unique_name}_{item}");
580 }
581 unique = format!(
582 "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
583 options.table_name, unique_name, options.table_name, unique_fields
584 )
585 }
586
587 let mut index = vec![];
589 for row in options.table_index.iter() {
590 let mut index_fields = String::new();
591 let mut index_name = String::new();
592 for item in row.iter() {
593 if index_fields.is_empty() {
594 index_fields = item.to_string();
595 index_name = format!("index_{item}");
596 } else {
597 index_fields = format!("{index_fields},{item}");
598 index_name = format!("{index_name}_{item}");
599 }
600 }
601 index.push(format!(
602 "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
603 options.table_name, index_name, options.table_name, index_fields
604 ));
605 }
606 for (name, field) in options.table_fields.entries() {
607 let row = br_fields::field("sqlite", name, field.clone());
608 sql = format!("{sql} {row},\r\n");
609 }
610
611 if !unique.is_empty() || !index.is_empty() {
612 let unique_text = unique.clone();
613 let (_, unique_old) =
614 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
615 let mut index_old_list = vec![];
616 let mut index_new_list = vec![];
617 for item in unique_old.members() {
618 let origin = item["origin"].as_str().unwrap_or("");
619 let unique_1 = item["unique"].as_usize().unwrap_or(0);
620 let name = item["name"].as_str().unwrap_or("");
621
622 if origin == "c" && unique_1 == 1 {
623 if unique.contains(format!(" {name} ").as_str()) {
624 unique = "".to_string();
625 }
626 continue;
627 }
628 if origin == "c" && unique_1 == 0 {
629 index_old_list.push(item);
630 for item in index.iter() {
631 if item.contains(format!(" {name} ").as_str()) {
632 index_new_list.push(item.clone());
633 }
634 }
635 continue;
636 }
637 }
638 if unique.is_empty() {
639 if index_old_list.len() == index.len()
640 && index_old_list.len() == index_new_list.len()
641 {
642 index = vec![];
643 } else {
644 unique = unique_text;
645 }
646 }
647 }
648
649 sql = sql.trim_end_matches(",\r\n").to_string();
650 sql = format!(
651 "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
652 options.table_name, sql
653 );
654
655 let sqls = format!(
656 "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
657 options.table_name,
658 fields_list_new.join("`,`"),
659 fields_list.join("`,`")
660 );
661 let drop_sql = format!("drop table {};\r\n", options.table_name);
662 let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
663 let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
664
665 if self.params.sql {
666 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
667 if !unique.is_empty() {
668 list.push(unique)
669 }
670 if !index.is_empty() {
671 list.extend(index)
672 }
673 return JsonValue::from(list.join(""));
674 }
675
676 if add.is_empty()
677 && del.is_empty()
678 && unique.is_empty()
679 && index.is_empty()
680 && put.is_empty()
681 {
682 return JsonValue::from(-1);
683 }
684
685 let (state, _) = self.execute(sql.clone());
686 let data = match state {
687 true => {
688 let (state, _) = self.execute(sqls.clone());
689 match state {
690 true => {
691 let (state, _) = self.execute(drop_sql);
692 match state {
693 true => {
694 let (state, _) = self.execute(alter_sql);
695 match state {
696 true => {
697 if !unique.is_empty() {
698 let (state, _) = self.execute(unique.clone());
699 info!(
700 "{} {} 唯一索引创建:{}",
701 thread_id, options.table_name, state
702 );
703 }
704 for index_sql in index.iter() {
705 let (state, _) = self.execute(index_sql.clone());
706 match state {
707 true => {}
708 false => {
709 error!(
710 "{} 索引创建失败: {} {}",
711 options.table_name, state, index_sql
712 );
713 return JsonValue::from(0);
714 }
715 };
716 }
717
718 return JsonValue::from(1);
719 }
720 false => {
721 error!("{} 修改表名失败", options.table_name);
722 return JsonValue::from(0);
723 }
724 }
725 }
726 false => {
727 error!("{} 删除本表失败", options.table_name);
728 return JsonValue::from(0);
729 }
730 }
731 }
732 false => {
733 error!(
734 "{} 添加tmp表记录失败 {:#} {:#}",
735 options.table_name, sql, sqls
736 );
737 let sql = format!("drop table {}_tmp", options.table_name);
738 let (_, _) = self.execute(sql);
739 0
740 }
741 }
742 }
743 false => {
744 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
745 let (_, _) = self.execute(drop_sql_temp);
746 0
747 }
748 };
749 JsonValue::from(data)
750 }
751
752 fn table_info(&mut self, table: &str) -> JsonValue {
753 if let Ok(guard) = FIELDS.lock() {
754 if let Some(cached) = guard.get(table) {
755 return cached.clone();
756 }
757 }
758 let sql = format!("PRAGMA table_info({table})");
759 let (state, data) = self.query(sql);
760
761 match state {
762 true => {
763 let mut fields = object! {};
764 for item in data.members() {
765 if let Some(name) = item["name"].as_str() {
766 fields[name] = item.clone();
767 }
768 }
769 if let Ok(mut guard) = FIELDS.lock() {
770 guard.insert(table.to_string(), fields.clone());
771 }
772 data
773 }
774 false => object! {},
775 }
776 }
777
778 fn table_is_exist(&mut self, name: &str) -> bool {
779 let sql = format!(
780 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
781 );
782 let (state, data) = self.query(sql);
783 if state && !data.is_empty() {
784 let count = data[0]["count"].as_i64().unwrap_or(0);
785 return count > 0;
786 }
787 false
788 }
789
790 fn table(&mut self, name: &str) -> &mut Sqlite {
791 self.params = Params::default(self.connection.mode.str().as_str());
792 let table_name = format!("{}{}", self.connection.prefix, name);
793 if !super::sql_safety::validate_table_name(&table_name) {
794 error!("Invalid table name: {}", name);
795 }
796 self.params.table = table_name.clone();
797 self.params.join_table = table_name;
798 self
799 }
800 fn change_table(&mut self, name: &str) -> &mut Self {
801 self.params.join_table = name.to_string();
802 self
803 }
804 fn autoinc(&mut self) -> &mut Self {
805 self.params.autoinc = true;
806 self
807 }
808
809 fn fetch_sql(&mut self) -> &mut Self {
810 self.params.sql = true;
811 self
812 }
813
814 fn order(&mut self, field: &str, by: bool) -> &mut Self {
815 self.params.order[field] = {
816 if by {
817 "DESC"
818 } else {
819 "ASC"
820 }
821 }
822 .into();
823 self
824 }
825
826 fn group(&mut self, field: &str) -> &mut Self {
827 let fields: Vec<&str> = field.split(",").collect();
828 for field in fields.iter() {
829 let fields = field.to_string();
830 self.params.group[fields.as_str()] = fields.clone().into();
831 self.params.fields[fields.as_str()] = fields.clone().into();
832 }
833 self
834 }
835
836 fn distinct(&mut self) -> &mut Self {
837 self.params.distinct = true;
838 self
839 }
840 fn json(&mut self, field: &str) -> &mut Self {
841 let list: Vec<&str> = field.split(",").collect();
842 for item in list.iter() {
843 self.params.json[item.to_string().as_str()] = item.to_string().into();
844 }
845 self
846 }
847
848 fn location(&mut self, field: &str) -> &mut Self {
849 let list: Vec<&str> = field.split(",").collect();
850 for item in list.iter() {
851 self.params.location[item.to_string().as_str()] = item.to_string().into();
852 }
853 self
854 }
855
856 fn field(&mut self, field: &str) -> &mut Self {
857 let list: Vec<&str> = field.split(",").collect();
858 let join_table = if self.params.join_table.is_empty() {
859 self.params.table.clone()
860 } else {
861 self.params.join_table.clone()
862 };
863 for item in list.iter() {
864 let item = item.to_string();
865 if item.contains(" as ") {
866 let text = item.split(" as ").collect::<Vec<&str>>().clone();
867 self.params.fields[item] =
868 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
869 } else {
870 self.params.fields[item] = format!("{join_table}.`{item}`").into();
871 }
872 }
873 self
874 }
875 fn hidden(&mut self, name: &str) -> &mut Self {
876 let hidden: Vec<&str> = name.split(",").collect();
877 let sql = format!("PRAGMA table_info({})", self.params.table);
878 let (_, data) = self.query(sql);
879 for item in data.members() {
880 if let Some(name) = item["name"].as_str() {
881 if !hidden.contains(&name) {
882 self.params.fields[name] = name.into();
883 }
884 }
885 }
886 self
887 }
888
889 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
890 let join_table = if self.params.join_table.is_empty() {
891 self.params.table.clone()
892 } else {
893 self.params.join_table.clone()
894 };
895
896 if value.is_boolean() {
897 if value.as_bool().unwrap_or(false) {
898 value = 1.into();
899 } else {
900 value = 0.into();
901 }
902 }
903
904 match compare {
905 "between" => {
906 self.params.where_and.push(format!(
907 "{}.`{}` between '{}' AND '{}'",
908 join_table, field, value[0], value[1]
909 ));
910 }
911 "set" => {
912 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
913 let mut wheredata = vec![];
914 for item in list.iter() {
915 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
916 }
917 self.params
918 .where_and
919 .push(format!("({})", wheredata.join(" or ")));
920 }
921 "notin" => {
922 let mut text = String::new();
923 for item in value.members() {
924 text = format!("{text},'{item}'");
925 }
926 text = text.trim_start_matches(",").into();
927 self.params
928 .where_and
929 .push(format!("{join_table}.`{field}` not in ({text})"));
930 }
931 "in" => {
932 let mut text = String::new();
933 if value.is_array() {
934 for item in value.members() {
935 text = format!("{text},'{item}'");
936 }
937 } else {
938 let value = value.to_string();
939 let value: Vec<&str> = value.split(",").collect();
940 for item in value.iter() {
941 text = format!("{text},'{item}'");
942 }
943 }
944 text = text.trim_start_matches(",").into();
945
946 self.params
947 .where_and
948 .push(format!("{join_table}.`{field}` {compare} ({text})"));
949 }
950 "=" => {
951 if value.is_null() {
952 self.params
953 .where_and
954 .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
955 } else {
956 self.params
957 .where_and
958 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
959 }
960 }
961 _ => {
962 if value.is_null() {
963 self.params
964 .where_and
965 .push(format!("{join_table}.`{field}` {compare} {value}"));
966 } else {
967 self.params
968 .where_and
969 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
970 }
971 }
972 }
973 self
974 }
975 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
976 let join_table = if self.params.join_table.is_empty() {
977 self.params.table.clone()
978 } else {
979 self.params.join_table.clone()
980 };
981
982 if value.is_boolean() {
983 if value.as_bool().unwrap_or(false) {
984 value = 1.into();
985 } else {
986 value = 0.into();
987 }
988 }
989 match compare {
990 "between" => {
991 self.params.where_or.push(format!(
992 "{}.`{}` between '{}' AND '{}'",
993 join_table, field, value[0], value[1]
994 ));
995 }
996 "set" => {
997 let tt = value.to_string().replace(",", "%");
998 self.params
999 .where_or
1000 .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1001 }
1002 "notin" => {
1003 let mut text = String::new();
1004 for item in value.members() {
1005 text = format!("{text},'{item}'");
1006 }
1007 text = text.trim_start_matches(",").into();
1008 self.params
1009 .where_or
1010 .push(format!("{join_table}.`{field}` not in ({text})"));
1011 }
1012 "in" => {
1013 let mut text = String::new();
1014 if value.is_array() {
1015 for item in value.members() {
1016 text = format!("{text},'{item}'");
1017 }
1018 } else {
1019 let value = value.as_str().unwrap_or("");
1020 let value: Vec<&str> = value.split(",").collect();
1021 for item in value.iter() {
1022 text = format!("{text},'{item}'");
1023 }
1024 }
1025 text = text.trim_start_matches(",").into();
1026 self.params
1027 .where_or
1028 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1029 }
1030 _ => {
1031 self.params
1032 .where_or
1033 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1034 }
1035 }
1036 self
1037 }
1038 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1039 self.params.where_column = format!(
1040 "{}.`{}` {} {}.`{}`",
1041 self.params.table, field_a, compare, self.params.table, field_b
1042 );
1043 self
1044 }
1045
1046 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1047 self.params
1048 .update_column
1049 .push(format!("{field_a} = {compare}"));
1050 self
1051 }
1052
1053 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1054 self.params.page = page;
1055 self.params.limit = limit;
1056 self
1057 }
1058
1059 fn column(&mut self, field: &str) -> JsonValue {
1060 self.field(field);
1061 self.group(field);
1062 let sql = self.params.select_sql();
1063 if self.params.sql {
1064 return JsonValue::from(sql.clone());
1065 }
1066 self.table_info(self.params.table.clone().as_str());
1067 let (state, data) = self.query(sql);
1068 if state {
1069 let mut list = array![];
1070 for item in data.members() {
1071 if self.params.json[field].is_empty() {
1072 let _ = list.push(item[field].clone());
1073 } else {
1074 let data =
1075 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1076 let _ = list.push(data);
1077 }
1078 }
1079 list
1080 } else {
1081 array![]
1082 }
1083 }
1084
1085 fn count(&mut self) -> JsonValue {
1086 self.params.fields["count"] = "count(*) as count".to_string().into();
1087 let sql = self.params.select_sql();
1088 if self.params.sql {
1089 return JsonValue::from(sql.clone());
1090 }
1091 let (state, data) = self.query(sql);
1092 match state {
1093 true => data[0]["count"].clone(),
1094 false => JsonValue::from(0),
1095 }
1096 }
1097
1098 fn max(&mut self, field: &str) -> JsonValue {
1099 self.params.fields[field] = format!("max({field}) as {field}").into();
1100 let sql = self.params.select_sql();
1101 if self.params.sql {
1102 return JsonValue::from(sql.clone());
1103 }
1104 let (state, data) = self.query(sql);
1105 if state {
1106 if data.len() > 1 {
1107 return data;
1108 }
1109 data[0][field].clone()
1110 } else {
1111 JsonValue::from(0.0)
1112 }
1113 }
1114
1115 fn min(&mut self, field: &str) -> JsonValue {
1116 self.params.fields[field] = format!("min({field}) as {field}").into();
1117 let sql = self.params.select_sql();
1118 let (state, data) = self.query(sql);
1119 if state {
1120 if data.len() > 1 {
1121 return data;
1122 }
1123 data[0][field].clone()
1124 } else {
1125 JsonValue::from(0.0)
1126 }
1127 }
1128
1129 fn sum(&mut self, field: &str) -> JsonValue {
1130 self.params.fields[field] = format!("sum({field}) as {field}").into();
1131 let sql = self.params.select_sql();
1132 if self.params.sql {
1133 return JsonValue::from(sql.clone());
1134 }
1135 let (state, data) = self.query(sql);
1136 match state {
1137 true => {
1138 if data.len() > 1 {
1139 return data;
1140 }
1141 if self.params.fields.len() > 1 {
1142 return data[0].clone();
1143 }
1144 data[0][field].clone()
1145 }
1146 false => JsonValue::from(0),
1147 }
1148 }
1149 fn avg(&mut self, field: &str) -> JsonValue {
1150 self.params.fields[field] = format!("avg({field}) as {field}").into();
1151 let sql = self.params.select_sql();
1152 if self.params.sql {
1153 return JsonValue::from(sql.clone());
1154 }
1155 let (state, data) = self.query(sql);
1156 if state {
1157 if data.len() > 1 {
1158 return data;
1159 }
1160 data[0][field].clone()
1161 } else {
1162 JsonValue::from(0)
1163 }
1164 }
1165 fn select(&mut self) -> JsonValue {
1166 let sql = self.params.select_sql();
1167 if self.params.sql {
1168 return JsonValue::from(sql.clone());
1169 }
1170 self.table_info(self.params.table.clone().as_str());
1171 let (state, mut data) = self.query(sql.clone());
1172 match state {
1173 true => {
1174 for (field, _) in self.params.json.entries() {
1175 for item in data.members_mut() {
1176 if !item[field].is_empty() {
1177 let json = item[field].to_string();
1178 item[field] = match json::parse(&json) {
1179 Ok(e) => e,
1180 Err(_) => JsonValue::from(json),
1181 };
1182 }
1183 }
1184 }
1185 data.clone()
1186 }
1187 false => {
1188 error!("{data:?}");
1189 array![]
1190 }
1191 }
1192 }
1193 fn find(&mut self) -> JsonValue {
1194 self.params.page = 1;
1195 self.params.limit = 1;
1196 let sql = self.params.select_sql();
1197 if self.params.sql {
1198 return JsonValue::from(sql.clone());
1199 }
1200
1201 self.table_info(self.params.table.clone().as_str());
1202 let (state, mut data) = self.query(sql.clone());
1203 match state {
1204 true => {
1205 if data.is_empty() {
1206 return object! {};
1207 }
1208 for (field, _) in self.params.json.entries() {
1209 if !data[0][field].is_empty() {
1210 let json = data[0][field].to_string();
1211 let json = json::parse(&json).unwrap_or(array![]);
1212 data[0][field] = json;
1213 } else {
1214 data[0][field] = array![];
1215 }
1216 }
1217 data[0].clone()
1218 }
1219 false => {
1220 error!("{data:?}");
1221 object! {}
1222 }
1223 }
1224 }
1225
1226 fn value(&mut self, field: &str) -> JsonValue {
1227 self.params.fields = object! {};
1228 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1229 self.params.page = 1;
1230 self.params.limit = 1;
1231 let sql = self.params.select_sql();
1232 if self.params.sql {
1233 return JsonValue::from(sql.clone());
1234 }
1235 self.table_info(self.params.table.clone().as_str());
1236 let (state, mut data) = self.query(sql.clone());
1237 match state {
1238 true => {
1239 for (field, _) in self.params.json.entries() {
1240 if !data[0][field].is_empty() {
1241 let json = data[0][field].to_string();
1242 let json = json::parse(&json).unwrap_or(array![]);
1243 data[0][field] = json;
1244 } else {
1245 data[0][field] = array![];
1246 }
1247 }
1248 data[0][field].clone()
1249 }
1250 false => {
1251 if self.connection.debug {
1252 info!("{data:?}");
1253 }
1254 JsonValue::Null
1255 }
1256 }
1257 }
1258
1259 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1260 let mut fields = vec![];
1261 let mut values = vec![];
1262
1263 if !self.params.autoinc && data["id"].is_empty() {
1264 let thread_id = format!("{:?}", std::thread::current().id());
1265 let thread_num: u64 = thread_id
1266 .trim_start_matches("ThreadId(")
1267 .trim_end_matches(")")
1268 .parse()
1269 .unwrap_or(0);
1270 data["id"] = format!(
1271 "{:X}{:X}",
1272 Local::now().timestamp_nanos_opt().unwrap_or(0),
1273 thread_num
1274 )
1275 .into();
1276 }
1277 for (field, value) in data.entries() {
1278 fields.push(format!("`{field}`"));
1279
1280 if value.is_string() {
1281 if value.to_string().contains("'") {
1282 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1283 continue;
1284 } else if value.to_string().contains('"') {
1285 values.push(format!("'{value}'"));
1286 continue;
1287 } else {
1288 values.push(format!("\"{value}\""));
1289 continue;
1290 }
1291 } else if value.is_array() || value.is_object() {
1292 if self.params.json[field].is_empty() {
1293 values.push(format!("'{value}'"));
1294 } else {
1295 let json = value.to_string();
1296 let json = json.replace("'", "''");
1297 values.push(format!("'{json}'"));
1298 }
1299 continue;
1300 } else if value.is_number() || value.is_boolean() || value.is_null() {
1301 values.push(format!("{value}"));
1302 continue;
1303 } else {
1304 values.push(format!("'{value}'"));
1305 continue;
1306 }
1307 }
1308 let fields = fields.join(",");
1309 let values = values.join(",");
1310
1311 let sql = format!(
1312 "INSERT INTO `{}` ({}) VALUES ({});",
1313 self.params.table, fields, values
1314 );
1315 if self.params.sql {
1316 return JsonValue::from(sql.clone());
1317 }
1318 let (state, ids) = self.execute(sql);
1319 match state {
1320 true => {
1321 if self.params.autoinc {
1322 let (state, ids) =
1323 self.query(format!("select max(id) as id from {}", self.params.table));
1324 return match state {
1325 true => ids[0]["id"].clone(),
1326 false => {
1327 error!("{ids}");
1328 JsonValue::from("")
1329 }
1330 };
1331 }
1332 data["id"].clone()
1333 }
1334 false => {
1335 error!("{ids}");
1336 JsonValue::from("")
1337 }
1338 }
1339 }
1340 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1341 let mut fields = String::new();
1342
1343 if !self.params.autoinc && data[0]["id"].is_empty() {
1344 data[0]["id"] = "".into();
1345 }
1346 for (field, _) in data[0].entries() {
1347 fields = format!("{fields},`{field}`");
1348 }
1349 fields = fields.trim_start_matches(",").to_string();
1350
1351 let core_count = num_cpus::get();
1352 let mut p = pools::Pool::new(core_count * 4);
1353 let autoinc = self.params.autoinc;
1354 for list in data.members() {
1355 let mut item = list.clone();
1356 p.execute(move |pcindex| {
1357 if !autoinc && item["id"].is_empty() {
1358 let id = format!(
1359 "{:X}{:X}",
1360 Local::now().timestamp_nanos_opt().unwrap_or(0),
1361 pcindex
1362 );
1363 item["id"] = id.into();
1364 }
1365 let mut values = "".to_string();
1366 for (_, value) in item.entries() {
1367 if value.is_string() {
1368 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1369 } else if value.is_number() || value.is_boolean() {
1370 values = format!("{values},{value}");
1371 } else {
1372 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1373 }
1374 }
1375 values = format!("({})", values.trim_start_matches(","));
1376 array![item["id"].clone(), values]
1377 });
1378 }
1379 let (ids_list, mut values) = p.insert_all();
1380
1381 values = values.trim_start_matches(",").to_string();
1382
1383 let sql = format!(
1384 "INSERT INTO {} ({}) VALUES {};",
1385 self.params.table, fields, values
1386 );
1387 if self.params.sql {
1388 return JsonValue::from(sql.clone());
1389 }
1390 let (state, data) = self.execute(sql.clone());
1391 match state {
1392 true => {
1393 if self.params.autoinc {
1394 let (state, ids) = self.query(format!(
1395 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1396 self.params.table,
1397 ids_list.len()
1398 ));
1399 return match state {
1400 true => {
1401 let mut idlist = array![];
1402 for item in ids.members() {
1403 let _ = idlist.push(item["id"].clone());
1404 }
1405 idlist
1406 }
1407 false => {
1408 error!("批量添加失败: {ids:?} {sql}");
1409 array![]
1410 }
1411 };
1412 }
1413 JsonValue::from(ids_list)
1414 }
1415 false => {
1416 error!("批量添加失败: {data:?} {sql}");
1417 array![]
1418 }
1419 }
1420 }
1421
1422 fn update(&mut self, data: JsonValue) -> JsonValue {
1423 let mut values = vec![];
1424
1425 for (field, value) in data.entries() {
1426 if value.is_string() {
1427 values.push(format!(
1428 "`{}` = '{}'",
1429 field,
1430 value.to_string().replace("'", "''")
1431 ));
1432 } else if value.is_array() || value.is_object() {
1433 if self.params.json[field].is_empty() {
1434 values.push(format!("`{field}` = '{value}'"));
1435 } else {
1436 let json = value.to_string();
1437 let json = json.replace("'", "''");
1438 values.push(format!("`{field}` = '{json}'"));
1439 }
1440 continue;
1441 } else if value.is_number() || value.is_boolean() || value.is_null() {
1442 values.push(format!("`{field}` = {value} "));
1443 } else {
1444 values.push(format!("`{field}` = '{value}' "));
1445 }
1446 }
1447
1448 for (field, value) in self.params.inc_dec.entries() {
1449 values.push(format!("{} = {}", field, value.to_string().clone()));
1450 }
1451
1452 let values = values.join(",");
1453 let sql = format!(
1454 "UPDATE `{}` SET {} {} {};",
1455 self.params.table.clone(),
1456 values,
1457 self.params.where_sql(),
1458 self.params.page_limit_sql()
1459 );
1460 if self.params.sql {
1461 return JsonValue::from(sql.clone());
1462 }
1463 let (state, data) = self.execute(sql);
1464 if state {
1465 data
1466 } else {
1467 error!("{data}");
1468 JsonValue::from(0)
1469 }
1470 }
1471
1472 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1473 let mut values = vec![];
1474 let mut ids = vec![];
1475 for (field, _) in data[0].entries() {
1476 if field == "id" {
1477 continue;
1478 }
1479 let mut fields = vec![];
1480 for row in data.members() {
1481 let value = row[field].clone();
1482 let id = row["id"].clone();
1483 ids.push(id.clone());
1484 if value.is_string() {
1485 fields.push(format!(
1486 "WHEN '{}' THEN '{}'",
1487 id,
1488 value.to_string().replace("'", "''")
1489 ));
1490 } else if value.is_array() || value.is_object() {
1491 if self.params.json[field].is_empty() {
1492 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1493 } else {
1494 let json = value.to_string();
1495 let json = json.replace("'", "''");
1496 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1497 }
1498 continue;
1499 } else if value.is_number() || value.is_boolean() || value.is_null() {
1500 fields.push(format!("WHEN '{id}' THEN {value}"));
1501 } else {
1502 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1503 }
1504 }
1505 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1506 }
1507
1508 self.where_and("id", "in", ids.into());
1509 for (field, value) in self.params.inc_dec.entries() {
1510 values.push(format!("{} = {}", field, value.to_string().clone()));
1511 }
1512
1513 let values = values.join(",");
1514 let sql = format!(
1515 "UPDATE {} SET {} {} {};",
1516 self.params.table.clone(),
1517 values,
1518 self.params.where_sql(),
1519 self.params.page_limit_sql()
1520 );
1521 if self.params.sql {
1522 return JsonValue::from(sql.clone());
1523 }
1524 let (state, data) = self.execute(sql);
1525 if state {
1526 data
1527 } else {
1528 error!("{data:?}");
1529 JsonValue::from(0)
1530 }
1531 }
1532 fn delete(&mut self) -> JsonValue {
1533 let sql = format!(
1534 "delete FROM `{}` {};",
1535 self.params.table.clone(),
1536 self.params.where_sql()
1537 );
1538 if self.params.sql {
1539 return JsonValue::from(sql.clone());
1540 }
1541 let (state, data) = self.execute(sql);
1542 match state {
1543 true => data,
1544 false => {
1545 error!("delete 失败>>>{data:?}");
1546 JsonValue::from(0)
1547 }
1548 }
1549 }
1550
1551 fn transaction(&mut self) -> bool {
1552 let thread_id = format!("{:?}", thread::current().id());
1553
1554 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1555 SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1556 return true;
1557 }
1558
1559 if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1560 error!("{thread_id} 启动事务失败: 获取写锁超时");
1561 return false;
1562 }
1563
1564 let flags = OpenFlags::new().with_read_write().with_no_mutex();
1565 let db = match Connect::open_thread_safe_with_flags(
1566 self.connection.clone().get_dsn().as_str(),
1567 flags,
1568 ) {
1569 Ok(e) => Arc::new(e),
1570 Err(e) => {
1571 error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1572 SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1573 return false;
1574 }
1575 };
1576
1577 SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1578
1579 let (state, data) = self.query("BEGIN".to_string());
1580 if state {
1581 true
1582 } else {
1583 error!("{thread_id} 启动事务失败: {data}");
1584 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1585 false
1586 }
1587 }
1588
1589 fn commit(&mut self) -> bool {
1590 let thread_id = format!("{:?}", thread::current().id());
1591
1592 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1593 error!("{thread_id} 提交事务失败: 没有活跃的事务");
1594 return false;
1595 }
1596
1597 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1598 if depth > 1 {
1599 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1600 return true;
1601 }
1602
1603 let (state, _) = self.query("COMMIT".to_string());
1604 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1605
1606 if state {
1607 true
1608 } else {
1609 error!("{thread_id} 提交事务失败");
1610 false
1611 }
1612 }
1613
1614 fn rollback(&mut self) -> bool {
1615 let thread_id = format!("{:?}", thread::current().id());
1616
1617 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1618 error!("{thread_id} 回滚失败: 没有活跃的事务");
1619 return false;
1620 }
1621
1622 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1623 if depth > 1 {
1624 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1625 return true;
1626 }
1627
1628 let (state, _) = self.query("ROLLBACK".to_string());
1629 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1630
1631 if state {
1632 true
1633 } else {
1634 error!("回滚失败: {thread_id}");
1635 false
1636 }
1637 }
1638
1639 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1640 let (state, data) = self.query(sql.to_string());
1641 match state {
1642 true => Ok(data),
1643 false => Err(data.to_string()),
1644 }
1645 }
1646
1647 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1648 let (state, data) = self.execute(sql.to_string());
1649 match state {
1650 true => Ok(data),
1651 false => Err(data.to_string()),
1652 }
1653 }
1654
1655 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1656 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1657 self
1658 }
1659
1660 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1661 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1662 self
1663 }
1664
1665 fn buildsql(&mut self) -> String {
1666 self.fetch_sql();
1667 let sql = self.select().to_string();
1668 format!("( {} ) `{}`", sql, self.params.table)
1669 }
1670
1671 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1672 for field in fields {
1673 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1674 }
1675 self
1676 }
1677
1678 fn join(
1679 &mut self,
1680 main_table: &str,
1681 main_fields: &str,
1682 right_table: &str,
1683 right_fields: &str,
1684 ) -> &mut Self {
1685 let main_table = if main_table.is_empty() {
1686 self.params.table.clone()
1687 } else {
1688 main_table.to_string()
1689 };
1690 self.params.join_table = right_table.to_string();
1691 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1692 self
1693 }
1694
1695 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1696 let main_fields = if main_fields.is_empty() {
1697 "id"
1698 } else {
1699 main_fields
1700 };
1701 let second_fields = if second_fields.is_empty() {
1702 self.params.table.clone()
1703 } else {
1704 second_fields.to_string().clone()
1705 };
1706 let sec_table_name = format!("{}{}", table, "_2");
1707 let second_table = format!("{} {}", table, sec_table_name.clone());
1708 self.params.join_table = sec_table_name.clone();
1709 self.params.join.push(format!(
1710 " INNER JOIN {} ON {}.{} = {}.{}",
1711 second_table, self.params.table, main_fields, sec_table_name, second_fields
1712 ));
1713 self
1714 }
1715}