1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19 static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24 pub connection: Connection,
26 pub default: String,
28
29 pub params: Params,
30}
31
32impl Sqlite {
33 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34 let dsn = connection.clone().get_dsn();
35 let db_path = Path::new(&dsn);
36 if let Some(parent) = db_path.parent() {
37 if !parent.as_os_str().is_empty() && !parent.exists() {
38 if let Err(e) = std::fs::create_dir_all(parent) {
39 error!("sqlite 创建目录失败: {} {:?}", e, parent);
40 return Err(e.to_string());
41 }
42 info!("sqlite 自动创建目录: {:?}", parent);
43 }
44 }
45
46 let flags = OpenFlags::new().with_create().with_read_write();
47 match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48 Ok(e) => {
49 if let Ok(mut guard) = DBS.lock() {
50 guard.insert(default.clone(), Arc::new(e));
51 } else {
52 error!("sqlite 获取数据库锁失败");
53 return Err("获取数据库锁失败".to_string());
54 }
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63 Err(e.to_string())
64 }
65 }
66 }
67 fn query_handle_static(
69 mut statement: Statement,
70 sql: &str,
71 table: &str,
72 thread_id: &str,
73 ) -> (bool, JsonValue) {
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79 if in_transaction {
80 error!("{} 查询事务: {} {}", thread_id, e, sql);
81 } else {
82 error!("{} 非事务查询: {} {}", thread_id, e, sql);
83 }
84 return (false, data);
85 }
86 } {
87 let mut list = object! {};
88 let mut index = 0;
89 for field in statement.column_names().iter() {
90 if !list[field.as_str()].is_null() {
91 index += 1;
92 continue;
93 }
94 match statement.column_type(field.as_str()) {
95 Ok(types) => match types {
96 Type::String => {
97 if let Ok(data) = statement.read::<String, _>(index) {
98 match data.as_str() {
99 "false" => {
100 list[field.as_str()] = JsonValue::from(false);
101 }
102 "true" => {
103 list[field.as_str()] = JsonValue::from(true);
104 }
105 _ => {
106 list[field.as_str()] = JsonValue::from(data.clone());
107 }
108 }
109 }
110 }
111 Type::Integer => {
112 let fields_cache =
113 FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114 if let Some(fields) = fields_cache {
115 if fields[field.clone()].is_empty() {
116 if let Ok(data) = statement.read::<i64, _>(index) {
117 list[field.as_str()] = data.into();
118 }
119 index += 1;
120 continue;
121 }
122 let field_type =
123 fields[field.clone()]["type"].as_str().unwrap_or("");
124 match field_type {
125 "INTEGER" => {
126 if let Ok(data) = statement.read::<i64, _>(index) {
127 list[field.as_str()] = JsonValue::from(data == 1);
128 }
129 }
130 x if x.contains("int(") => {
131 if let Ok(data) = statement.read::<i64, _>(index) {
132 list[field.as_str()] = data.into();
133 }
134 }
135 x if x.contains("decimal(") && x.ends_with(",0)") => {
136 if let Ok(data) = statement.read::<f64, _>(index) {
137 list[field.as_str()] = data.into();
138 }
139 }
140 _ => {
141 if let Ok(data) = statement.read::<i64, _>(index) {
142 list[field.as_str()] = data.into();
143 }
144 }
145 }
146 } else if let Ok(data) = statement.read::<i64, _>(index) {
147 list[field.as_str()] = JsonValue::from(data);
148 }
149 }
150 Type::Float => {
151 if let Ok(data) = statement.read::<f64, _>(index) {
152 list[field.as_str()] = JsonValue::from(data);
153 }
154 }
155 Type::Binary => {
156 if let Ok(data) = statement.read::<String, _>(index) {
157 list[field.as_str()] = JsonValue::from(data.clone());
158 }
159 }
160 Type::Null => match statement.read::<String, _>(index) {
161 Ok(data) => {
162 list[field.as_str()] = JsonValue::from(data.clone());
163 }
164 Err(_) => match statement.read::<f64, _>(index) {
165 Ok(data) => {
166 if data == 0.0 {
167 list[field.as_str()] = JsonValue::from("");
168 } else {
169 list[field.as_str()] = JsonValue::from(data);
170 }
171 }
172 Err(_) => match statement.read::<i64, _>(index) {
173 Ok(data) => {
174 if data == 0 {
175 list[field.as_str()] = JsonValue::from("");
176 } else {
177 list[field.as_str()] = JsonValue::from(data);
178 }
179 }
180 Err(e) => {
181 error!("Type:{} {:?}", field.as_str(), e);
182 }
183 },
184 },
185 },
186 },
187 Err(e) => {
188 error!("query Err: {e:?}");
189 }
190 }
191 index += 1;
192 }
193 let _ = data.push(list);
194 }
195 (true, data)
196 }
197
198 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199 let thread_id = format!("{:?}", thread::current().id());
200 let table = self.params.table.clone();
201
202 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203 if self.connection.debug {
204 info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205 }
206
207 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208 match db.prepare(sql.clone()) {
209 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210 Err(e) => {
211 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212 (false, e.to_string().into())
213 }
214 }
215 });
216
217 match result {
218 Some(r) => r,
219 None => {
220 error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221 (false, JsonValue::from("未找到事务连接"))
222 }
223 }
224 } else {
225 if self.connection.debug {
226 info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227 }
228 let dbs = match DBS.lock() {
229 Ok(dbs) => dbs,
230 Err(e) => {
231 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232 return (false, JsonValue::from("数据库锁定失败"));
233 }
234 };
235 let db = match dbs.get(&self.default) {
236 Some(db) => db.clone(),
237 None => {
238 error!(
239 "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240 self.default
241 );
242 return (false, JsonValue::from("未找到默认数据库配置"));
243 }
244 };
245 drop(dbs);
246 let result = match db.prepare(sql.clone()) {
247 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248 Err(e) => {
249 error!("{thread_id} 查询非事务: Err: {e}");
250 (false, e.to_string().into())
251 }
252 };
253 result
254 }
255 }
256
257 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258 let thread_id = format!("{:?}", thread::current().id());
259
260 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261 if self.connection.debug {
262 info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263 }
264
265 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266 match db.execute(sql.clone()) {
267 Ok(_) => {
268 let count = db.change_count();
269 (true, JsonValue::from(count))
270 }
271 Err(e) => (false, JsonValue::from(e.to_string())),
272 }
273 });
274
275 match result {
276 Some((true, count)) => {
277 if self.connection.debug {
278 info!("{} count:{}", thread_id, count);
279 }
280 (true, count)
281 }
282 Some((false, err)) => {
283 error!(
284 "{} 执行事务: \r\nErr: {}\r\n{}",
285 thread_id,
286 err,
287 sql.clone()
288 );
289 (false, err)
290 }
291 None => {
292 error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293 (false, JsonValue::from("未找到事务连接"))
294 }
295 }
296 } else {
297 if self.connection.debug {
298 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299 }
300 let dbs = match DBS.lock() {
301 Ok(dbs) => dbs,
302 Err(e) => {
303 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304 return (false, JsonValue::from("数据库锁定失败"));
305 }
306 };
307
308 let db = match dbs.get(&self.default) {
309 Some(db) => db.clone(),
310 None => {
311 error!(
312 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313 thread_id, self.default, sql
314 );
315 return (false, JsonValue::from("未找到默认数据库配置"));
316 }
317 };
318 drop(dbs);
319 match db.execute(sql.clone()) {
320 Ok(_) => {
321 let count = db.change_count();
322 if self.connection.debug {
323 info!(
324 "{} count: {} total_count: {}",
325 thread_id,
326 count,
327 db.total_change_count()
328 );
329 }
330 (true, JsonValue::from(count))
331 }
332 Err(e) => {
333 error!(
334 "{} 执行非事务: Err: {}\r\nSQL: {}",
335 thread_id,
336 e,
337 sql.clone()
338 );
339 (false, JsonValue::from(e.to_string()))
340 }
341 }
342 }
343 }
344}
345
346impl DbMode for Sqlite {
347 fn database_tables(&mut self) -> JsonValue {
348 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349 match self.sql(sql.as_str()) {
350 Ok(e) => {
351 let mut list = vec![];
352 for item in e.members() {
353 list.push(item["name"].clone());
354 }
355 list.into()
356 }
357 Err(_) => {
358 array![]
359 }
360 }
361 }
362 fn database_create(&mut self, name: &str) -> bool {
363 let current_dsn = self.connection.clone().get_dsn();
364 let current_path = Path::new(¤t_dsn);
365
366 let new_path = if let Some(parent) = current_path.parent() {
367 if parent.as_os_str().is_empty() {
368 Path::new(name).to_path_buf()
369 } else {
370 parent.join(name)
371 }
372 } else {
373 Path::new(name).to_path_buf()
374 };
375
376 if let Some(parent) = new_path.parent() {
377 if !parent.as_os_str().is_empty() && !parent.exists() {
378 if let Err(e) = std::fs::create_dir_all(parent) {
379 error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380 return false;
381 }
382 }
383 }
384
385 let flags = OpenFlags::new().with_create().with_read_write();
386 match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387 Ok(_) => true,
388 Err(e) => {
389 error!(
390 "sqlite database_create 创建数据库失败: {} {:?}",
391 e, new_path
392 );
393 false
394 }
395 }
396 }
397
398 fn truncate(&mut self, table: &str) -> bool {
399 let sql = format!("DELETE FROM `{table}`");
400 let (state, _) = self.execute(sql);
401 state
402 }
403}
404
405impl Mode for Sqlite {
406 fn table_create(&mut self, options: TableOptions) -> JsonValue {
407 let mut sql = String::new();
408 let mut unique_fields = String::new();
409 let mut unique_name = String::new();
410 let mut unique = String::new();
411 for item in options.table_unique.iter() {
412 if unique_fields.is_empty() {
413 unique_fields = format!("`{item}`");
414 unique_name = format!("unique_{item}");
415 } else {
416 unique_fields = format!("{unique_fields},`{item}`");
417 unique_name = format!("{unique_name}_{item}");
418 }
419 unique = format!(
420 "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421 unique_name, options.table_name, unique_fields
422 );
423 }
424
425 let mut index = vec![];
426 for row in options.table_index.iter() {
427 let mut index_fields = String::new();
428 let mut index_name = String::new();
429 for item in row.iter() {
430 if index_fields.is_empty() {
431 index_fields = format!("`{item}`");
432 index_name = format!("index_{item}");
433 } else {
434 index_fields = format!("{index_fields},`{item}`");
435 index_name = format!("{index_name}_{item}");
436 }
437 }
438 index.push(format!(
439 "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440 index_name, options.table_name, index_fields
441 ));
442 }
443
444 for (name, field) in options.table_fields.entries() {
445 let row = br_fields::field("sqlite", name, field.clone());
446 sql = format!("{sql} {row},\r\n");
447 }
448
449 sql = sql.trim_end_matches(",\r\n").to_string();
450
451 let sql = format!(
452 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453 options.table_name, sql
454 );
455 if self.params.sql {
456 let mut list = vec![sql];
457 if !unique.is_empty() {
458 list.push(unique)
459 }
460 if !index.is_empty() {
461 list.extend(index)
462 }
463
464 return JsonValue::from(list.join(""));
465 }
466
467 let thread_id = format!("{:?}", thread::current().id());
468
469 let (_, table_exists) = self.query(format!(
470 "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471 options.table_name
472 ));
473 let is_new_table = table_exists.is_empty();
474
475 let (state, _) = self.execute(sql.clone());
476 if state {
477 if is_new_table {
478 if !unique.is_empty() {
479 let (state, _) = self.execute(unique.clone());
480 info!(
481 "{} {} 唯一索引创建:{}",
482 thread_id, options.table_name, state
483 );
484 }
485 for sql in index.iter() {
486 let (state, _) = self.execute(sql.clone());
487 info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488 }
489 }
490 JsonValue::from(true)
491 } else {
492 JsonValue::from(false)
493 }
494 }
495 fn table_update(&mut self, options: TableOptions) -> JsonValue {
496 let thread_id = format!("{:?}", thread::current().id());
497
498 let mut sql = String::new();
499 let mut add = vec![];
500 let mut del = vec![];
501 let mut put = vec![];
502
503 let (_, mut fields_list) =
504 self.query(format!("pragma table_info ('{}')", options.table_name));
505 let mut field_old = object! {};
506 for item in fields_list.members_mut() {
507 item["dflt_value"] = item["dflt_value"]
508 .to_string()
509 .trim_start_matches("'")
510 .trim_end_matches("'")
511 .into();
512 if let Some(name) = item["name"].as_str() {
513 field_old[name] = item.clone();
514 if options.table_fields[name].is_empty() {
515 del.push(name.to_string());
516 }
517 }
518 }
519
520 let mut fields_list = vec![];
521 let mut fields_list_new = vec![];
522
523 for (name, field) in options.table_fields.entries() {
524 if field_old[name].is_empty() {
525 add.push(name.to_string());
526 } else {
527 fields_list.push(name.to_string());
528 fields_list_new.push(name.to_string());
529 let field_mode = field["mode"].as_str().unwrap_or("");
530 let old_value = match field_mode {
531 "select" => {
532 if field_old[name]["dflt_value"].clone().is_empty() {
533 "".to_string()
534 } else {
535 field_old[name]["dflt_value"].clone().to_string()
536 }
537 }
538 "switch" => (field_old[name]["dflt_value"]
539 .to_string()
540 .parse::<i32>()
541 .unwrap_or(0)
542 == 1)
543 .to_string(),
544 _ => field_old[name]["dflt_value"].clone().to_string(),
545 };
546 let new_value = match field_mode {
547 "select" => field["def"]
548 .members()
549 .map(|x| x.to_string())
550 .collect::<Vec<String>>()
551 .join(","),
552 _ => field["def"].clone().to_string(),
553 };
554 if old_value != new_value {
555 info!(
556 "{} 差异化当前: {} old_value: {} new_value: {} {}",
557 options.table_name,
558 name,
559 old_value,
560 new_value,
561 old_value != new_value
562 );
563 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
564 put.push(name.to_string());
565 } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
566 && name != options.table_key
567 {
568 info!("{} 主键替换: {}", options.table_name, name);
569 put.push(name.to_string());
570 }
571 }
572 }
573
574 let mut unique_fields = String::new();
575 let mut unique_name = String::new();
576 let mut unique = String::new();
577
578 for item in options.table_unique.iter() {
580 if unique_fields.is_empty() {
581 unique_fields = format!("`{item}`");
582 unique_name = format!("unique_{item}");
583 } else {
584 unique_fields = format!("{unique_fields},`{item}`");
585 unique_name = format!("{unique_name}_{item}");
586 }
587 unique = format!(
588 "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
589 options.table_name, unique_name, options.table_name, unique_fields
590 )
591 }
592
593 let mut index = vec![];
595 for row in options.table_index.iter() {
596 let mut index_fields = String::new();
597 let mut index_name = String::new();
598 for item in row.iter() {
599 if index_fields.is_empty() {
600 index_fields = item.to_string();
601 index_name = format!("index_{item}");
602 } else {
603 index_fields = format!("{index_fields},{item}");
604 index_name = format!("{index_name}_{item}");
605 }
606 }
607 index.push(format!(
608 "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
609 options.table_name, index_name, options.table_name, index_fields
610 ));
611 }
612 for (name, field) in options.table_fields.entries() {
613 let row = br_fields::field("sqlite", name, field.clone());
614 sql = format!("{sql} {row},\r\n");
615 }
616
617 if !unique.is_empty() || !index.is_empty() {
618 let unique_text = unique.clone();
619 let (_, unique_old) =
620 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
621 let mut index_old_list = vec![];
622 let mut index_new_list = vec![];
623 for item in unique_old.members() {
624 let origin = item["origin"].as_str().unwrap_or("");
625 let unique_1 = item["unique"].as_usize().unwrap_or(0);
626 let name = item["name"].as_str().unwrap_or("");
627
628 if origin == "c" && unique_1 == 1 {
629 if unique.contains(format!(" {name} ").as_str()) {
630 unique = "".to_string();
631 }
632 continue;
633 }
634 if origin == "c" && unique_1 == 0 {
635 index_old_list.push(item);
636 for item in index.iter() {
637 if item.contains(format!(" {name} ").as_str()) {
638 index_new_list.push(item.clone());
639 }
640 }
641 continue;
642 }
643 }
644 if unique.is_empty() {
645 if index_old_list.len() == index.len()
646 && index_old_list.len() == index_new_list.len()
647 {
648 index = vec![];
649 } else {
650 unique = unique_text;
651 }
652 }
653 }
654
655 sql = sql.trim_end_matches(",\r\n").to_string();
656 sql = format!(
657 "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
658 options.table_name, sql
659 );
660
661 let sqls = format!(
662 "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
663 options.table_name,
664 fields_list_new.join("`,`"),
665 fields_list.join("`,`")
666 );
667 let drop_sql = format!("drop table {};\r\n", options.table_name);
668 let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
669 let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
670
671 if self.params.sql {
672 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
673 if !unique.is_empty() {
674 list.push(unique)
675 }
676 if !index.is_empty() {
677 list.extend(index)
678 }
679 return JsonValue::from(list.join(""));
680 }
681
682 if add.is_empty()
683 && del.is_empty()
684 && unique.is_empty()
685 && index.is_empty()
686 && put.is_empty()
687 {
688 return JsonValue::from(-1);
689 }
690
691 let (state, _) = self.execute(sql.clone());
692 let data = match state {
693 true => {
694 let (state, _) = self.execute(sqls.clone());
695 match state {
696 true => {
697 let (state, _) = self.execute(drop_sql);
698 match state {
699 true => {
700 let (state, _) = self.execute(alter_sql);
701 match state {
702 true => {
703 if !unique.is_empty() {
704 let (state, _) = self.execute(unique.clone());
705 info!(
706 "{} {} 唯一索引创建:{}",
707 thread_id, options.table_name, state
708 );
709 }
710 for index_sql in index.iter() {
711 let (state, _) = self.execute(index_sql.clone());
712 match state {
713 true => {}
714 false => {
715 error!(
716 "{} 索引创建失败: {} {}",
717 options.table_name, state, index_sql
718 );
719 return JsonValue::from(0);
720 }
721 };
722 }
723
724 return JsonValue::from(1);
725 }
726 false => {
727 error!("{} 修改表名失败", options.table_name);
728 return JsonValue::from(0);
729 }
730 }
731 }
732 false => {
733 error!("{} 删除本表失败", options.table_name);
734 return JsonValue::from(0);
735 }
736 }
737 }
738 false => {
739 error!(
740 "{} 添加tmp表记录失败 {:#} {:#}",
741 options.table_name, sql, sqls
742 );
743 let sql = format!("drop table {}_tmp", options.table_name);
744 let (_, _) = self.execute(sql);
745 0
746 }
747 }
748 }
749 false => {
750 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
751 let (_, _) = self.execute(drop_sql_temp);
752 0
753 }
754 };
755 JsonValue::from(data)
756 }
757
758 fn table_info(&mut self, table: &str) -> JsonValue {
759 if let Ok(guard) = FIELDS.lock() {
760 if let Some(cached) = guard.get(table) {
761 return cached.clone();
762 }
763 }
764 let sql = format!("PRAGMA table_info({table})");
765 let (state, data) = self.query(sql);
766
767 match state {
768 true => {
769 let mut fields = object! {};
770 for item in data.members() {
771 if let Some(name) = item["name"].as_str() {
772 fields[name] = item.clone();
773 }
774 }
775 if let Ok(mut guard) = FIELDS.lock() {
776 guard.insert(table.to_string(), fields.clone());
777 }
778 data
779 }
780 false => object! {},
781 }
782 }
783
784 fn table_is_exist(&mut self, name: &str) -> bool {
785 let sql = format!(
786 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787 );
788 let (state, data) = self.query(sql);
789 if state && !data.is_empty() {
790 let count = data[0]["count"].as_i64().unwrap_or(0);
791 return count > 0;
792 }
793 false
794 }
795
796 fn table(&mut self, name: &str) -> &mut Sqlite {
797 self.params = Params::default(self.connection.mode.str().as_str());
798 let table_name = format!("{}{}", self.connection.prefix, name);
799 if !super::sql_safety::validate_table_name(&table_name) {
800 error!("Invalid table name: {}", name);
801 }
802 self.params.table = table_name.clone();
803 self.params.join_table = table_name;
804 self
805 }
806 fn change_table(&mut self, name: &str) -> &mut Self {
807 self.params.join_table = name.to_string();
808 self
809 }
810 fn autoinc(&mut self) -> &mut Self {
811 self.params.autoinc = true;
812 self
813 }
814
815 fn timestamps(&mut self) -> &mut Self {
816 self.params.timestamps = true;
817 self
818 }
819
820 fn fetch_sql(&mut self) -> &mut Self {
821 self.params.sql = true;
822 self
823 }
824
825 fn order(&mut self, field: &str, by: bool) -> &mut Self {
826 self.params.order[field] = {
827 if by {
828 "DESC"
829 } else {
830 "ASC"
831 }
832 }
833 .into();
834 self
835 }
836
837 fn group(&mut self, field: &str) -> &mut Self {
838 let fields: Vec<&str> = field.split(",").collect();
839 for field in fields.iter() {
840 let fields = field.to_string();
841 self.params.group[fields.as_str()] = fields.clone().into();
842 self.params.fields[fields.as_str()] = fields.clone().into();
843 }
844 self
845 }
846
847 fn distinct(&mut self) -> &mut Self {
848 self.params.distinct = true;
849 self
850 }
851 fn json(&mut self, field: &str) -> &mut Self {
852 let list: Vec<&str> = field.split(",").collect();
853 for item in list.iter() {
854 self.params.json[item.to_string().as_str()] = item.to_string().into();
855 }
856 self
857 }
858
859 fn location(&mut self, field: &str) -> &mut Self {
860 let list: Vec<&str> = field.split(",").collect();
861 for item in list.iter() {
862 self.params.location[item.to_string().as_str()] = item.to_string().into();
863 }
864 self
865 }
866
867 fn field(&mut self, field: &str) -> &mut Self {
868 let list: Vec<&str> = field.split(",").collect();
869 let join_table = if self.params.join_table.is_empty() {
870 self.params.table.clone()
871 } else {
872 self.params.join_table.clone()
873 };
874 for item in list.iter() {
875 let item = item.to_string();
876 if item.contains(" as ") {
877 let text = item.split(" as ").collect::<Vec<&str>>().clone();
878 self.params.fields[item] =
879 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880 } else {
881 self.params.fields[item] = format!("{join_table}.`{item}`").into();
882 }
883 }
884 self
885 }
886
887 fn field_raw(&mut self, expr: &str) -> &mut Self {
888 self.params.fields[expr] = expr.into();
889 self
890 }
891
892 fn hidden(&mut self, name: &str) -> &mut Self {
893 let hidden: Vec<&str> = name.split(",").collect();
894 let sql = format!("PRAGMA table_info({})", self.params.table);
895 let (_, data) = self.query(sql);
896 for item in data.members() {
897 if let Some(name) = item["name"].as_str() {
898 if !hidden.contains(&name) {
899 self.params.fields[name] = name.into();
900 }
901 }
902 }
903 self
904 }
905
906 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907 for f in field.split('|') {
908 if !super::sql_safety::validate_field_name(f) {
909 error!("Invalid field name: {}", f);
910 }
911 }
912 if !super::sql_safety::validate_compare_orator(compare) {
913 error!("Invalid compare operator: {}", compare);
914 }
915 let join_table = if self.params.join_table.is_empty() {
916 self.params.table.clone()
917 } else {
918 self.params.join_table.clone()
919 };
920
921 if value.is_boolean() {
922 if value.as_bool().unwrap_or(false) {
923 value = 1.into();
924 } else {
925 value = 0.into();
926 }
927 }
928
929 match compare {
930 "between" => {
931 self.params.where_and.push(format!(
932 "{}.`{}` between '{}' AND '{}'",
933 join_table, field, value[0], value[1]
934 ));
935 }
936 "set" => {
937 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938 let mut wheredata = vec![];
939 for item in list.iter() {
940 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941 }
942 self.params
943 .where_and
944 .push(format!("({})", wheredata.join(" or ")));
945 }
946 "notin" => {
947 let mut text = String::new();
948 for item in value.members() {
949 text = format!("{text},'{item}'");
950 }
951 text = text.trim_start_matches(",").into();
952 self.params
953 .where_and
954 .push(format!("{join_table}.`{field}` not in ({text})"));
955 }
956 "in" => {
957 let mut text = String::new();
958 if value.is_array() {
959 for item in value.members() {
960 text = format!("{text},'{item}'");
961 }
962 } else {
963 let value = value.to_string();
964 let value: Vec<&str> = value.split(",").collect();
965 for item in value.iter() {
966 text = format!("{text},'{item}'");
967 }
968 }
969 text = text.trim_start_matches(",").into();
970
971 self.params
972 .where_and
973 .push(format!("{join_table}.`{field}` {compare} ({text})"));
974 }
975 "=" => {
976 if value.is_null() {
977 self.params
978 .where_and
979 .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980 } else {
981 self.params
982 .where_and
983 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984 }
985 }
986 _ => {
987 if value.is_null() {
988 self.params
989 .where_and
990 .push(format!("{join_table}.`{field}` {compare} {value}"));
991 } else {
992 self.params
993 .where_and
994 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
995 }
996 }
997 }
998 self
999 }
1000 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1001 for f in field.split('|') {
1002 if !super::sql_safety::validate_field_name(f) {
1003 error!("Invalid field name: {}", f);
1004 }
1005 }
1006 if !super::sql_safety::validate_compare_orator(compare) {
1007 error!("Invalid compare operator: {}", compare);
1008 }
1009 let join_table = if self.params.join_table.is_empty() {
1010 self.params.table.clone()
1011 } else {
1012 self.params.join_table.clone()
1013 };
1014
1015 if value.is_boolean() {
1016 if value.as_bool().unwrap_or(false) {
1017 value = 1.into();
1018 } else {
1019 value = 0.into();
1020 }
1021 }
1022 match compare {
1023 "between" => {
1024 self.params.where_or.push(format!(
1025 "{}.`{}` between '{}' AND '{}'",
1026 join_table, field, value[0], value[1]
1027 ));
1028 }
1029 "set" => {
1030 let tt = value.to_string().replace(",", "%");
1031 self.params
1032 .where_or
1033 .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1034 }
1035 "notin" => {
1036 let mut text = String::new();
1037 for item in value.members() {
1038 text = format!("{text},'{item}'");
1039 }
1040 text = text.trim_start_matches(",").into();
1041 self.params
1042 .where_or
1043 .push(format!("{join_table}.`{field}` not in ({text})"));
1044 }
1045 "in" => {
1046 let mut text = String::new();
1047 if value.is_array() {
1048 for item in value.members() {
1049 text = format!("{text},'{item}'");
1050 }
1051 } else {
1052 let value = value.as_str().unwrap_or("");
1053 let value: Vec<&str> = value.split(",").collect();
1054 for item in value.iter() {
1055 text = format!("{text},'{item}'");
1056 }
1057 }
1058 text = text.trim_start_matches(",").into();
1059 self.params
1060 .where_or
1061 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1062 }
1063 _ => {
1064 self.params
1065 .where_or
1066 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1067 }
1068 }
1069 self
1070 }
1071 fn where_raw(&mut self, expr: &str) -> &mut Self {
1072 self.params.where_and.push(expr.to_string());
1073 self
1074 }
1075
1076 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1077 self.params
1078 .where_and
1079 .push(format!("`{field}` IN ({sub_sql})"));
1080 self
1081 }
1082
1083 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1084 self.params
1085 .where_and
1086 .push(format!("`{field}` NOT IN ({sub_sql})"));
1087 self
1088 }
1089
1090 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1091 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1092 self
1093 }
1094
1095 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1096 self.params
1097 .where_and
1098 .push(format!("NOT EXISTS ({sub_sql})"));
1099 self
1100 }
1101
1102 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1103 self.params.where_column = format!(
1104 "{}.`{}` {} {}.`{}`",
1105 self.params.table, field_a, compare, self.params.table, field_b
1106 );
1107 self
1108 }
1109
1110 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1111 self.params
1112 .update_column
1113 .push(format!("{field_a} = {compare}"));
1114 self
1115 }
1116
1117 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1118 self.params.page = page;
1119 self.params.limit = limit;
1120 self
1121 }
1122
1123 fn limit(&mut self, count: i32) -> &mut Self {
1124 self.params.limit_only = count;
1125 self
1126 }
1127
1128 fn column(&mut self, field: &str) -> JsonValue {
1129 self.field(field);
1130 self.group(field);
1131 let sql = self.params.select_sql();
1132 if self.params.sql {
1133 return JsonValue::from(sql.clone());
1134 }
1135 self.table_info(self.params.table.clone().as_str());
1136 let (state, data) = self.query(sql);
1137 if state {
1138 let mut list = array![];
1139 for item in data.members() {
1140 if self.params.json[field].is_empty() {
1141 let _ = list.push(item[field].clone());
1142 } else {
1143 let data =
1144 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1145 let _ = list.push(data);
1146 }
1147 }
1148 list
1149 } else {
1150 array![]
1151 }
1152 }
1153
1154 fn count(&mut self) -> JsonValue {
1155 self.params.fields["count"] = "count(*) as count".to_string().into();
1156 let sql = self.params.select_sql();
1157 if self.params.sql {
1158 return JsonValue::from(sql.clone());
1159 }
1160 let (state, data) = self.query(sql);
1161 match state {
1162 true => data[0]["count"].clone(),
1163 false => JsonValue::from(0),
1164 }
1165 }
1166
1167 fn max(&mut self, field: &str) -> JsonValue {
1168 self.params.fields[field] = format!("max({field}) as {field}").into();
1169 let sql = self.params.select_sql();
1170 if self.params.sql {
1171 return JsonValue::from(sql.clone());
1172 }
1173 let (state, data) = self.query(sql);
1174 if state {
1175 if data.len() > 1 {
1176 return data;
1177 }
1178 data[0][field].clone()
1179 } else {
1180 JsonValue::from(0.0)
1181 }
1182 }
1183
1184 fn min(&mut self, field: &str) -> JsonValue {
1185 self.params.fields[field] = format!("min({field}) as {field}").into();
1186 let sql = self.params.select_sql();
1187 let (state, data) = self.query(sql);
1188 if state {
1189 if data.len() > 1 {
1190 return data;
1191 }
1192 data[0][field].clone()
1193 } else {
1194 JsonValue::from(0.0)
1195 }
1196 }
1197
1198 fn sum(&mut self, field: &str) -> JsonValue {
1199 self.params.fields[field] = format!("sum({field}) as {field}").into();
1200 let sql = self.params.select_sql();
1201 if self.params.sql {
1202 return JsonValue::from(sql.clone());
1203 }
1204 let (state, data) = self.query(sql);
1205 match state {
1206 true => {
1207 if data.len() > 1 {
1208 return data;
1209 }
1210 if self.params.fields.len() > 1 {
1211 return data[0].clone();
1212 }
1213 data[0][field].clone()
1214 }
1215 false => JsonValue::from(0),
1216 }
1217 }
1218 fn avg(&mut self, field: &str) -> JsonValue {
1219 self.params.fields[field] = format!("avg({field}) as {field}").into();
1220 let sql = self.params.select_sql();
1221 if self.params.sql {
1222 return JsonValue::from(sql.clone());
1223 }
1224 let (state, data) = self.query(sql);
1225 if state {
1226 if data.len() > 1 {
1227 return data;
1228 }
1229 data[0][field].clone()
1230 } else {
1231 JsonValue::from(0)
1232 }
1233 }
1234 fn having(&mut self, expr: &str) -> &mut Self {
1235 self.params.having.push(expr.to_string());
1236 self
1237 }
1238 fn select(&mut self) -> JsonValue {
1239 let sql = self.params.select_sql();
1240 if self.params.sql {
1241 return JsonValue::from(sql.clone());
1242 }
1243 self.table_info(self.params.table.clone().as_str());
1244 let (state, mut data) = self.query(sql.clone());
1245 match state {
1246 true => {
1247 for (field, _) in self.params.json.entries() {
1248 for item in data.members_mut() {
1249 if !item[field].is_empty() {
1250 let json = item[field].to_string();
1251 item[field] = match json::parse(&json) {
1252 Ok(e) => e,
1253 Err(_) => JsonValue::from(json),
1254 };
1255 }
1256 }
1257 }
1258 data.clone()
1259 }
1260 false => {
1261 error!("{data:?}");
1262 array![]
1263 }
1264 }
1265 }
1266 fn find(&mut self) -> JsonValue {
1267 self.params.page = 1;
1268 self.params.limit = 1;
1269 let sql = self.params.select_sql();
1270 if self.params.sql {
1271 return JsonValue::from(sql.clone());
1272 }
1273
1274 self.table_info(self.params.table.clone().as_str());
1275 let (state, mut data) = self.query(sql.clone());
1276 match state {
1277 true => {
1278 if data.is_empty() {
1279 return object! {};
1280 }
1281 for (field, _) in self.params.json.entries() {
1282 if !data[0][field].is_empty() {
1283 let json = data[0][field].to_string();
1284 let json = json::parse(&json).unwrap_or(array![]);
1285 data[0][field] = json;
1286 } else {
1287 data[0][field] = array![];
1288 }
1289 }
1290 data[0].clone()
1291 }
1292 false => {
1293 error!("{data:?}");
1294 object! {}
1295 }
1296 }
1297 }
1298
1299 fn value(&mut self, field: &str) -> JsonValue {
1300 self.params.fields = object! {};
1301 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1302 self.params.page = 1;
1303 self.params.limit = 1;
1304 let sql = self.params.select_sql();
1305 if self.params.sql {
1306 return JsonValue::from(sql.clone());
1307 }
1308 self.table_info(self.params.table.clone().as_str());
1309 let (state, mut data) = self.query(sql.clone());
1310 match state {
1311 true => {
1312 for (field, _) in self.params.json.entries() {
1313 if !data[0][field].is_empty() {
1314 let json = data[0][field].to_string();
1315 let json = json::parse(&json).unwrap_or(array![]);
1316 data[0][field] = json;
1317 } else {
1318 data[0][field] = array![];
1319 }
1320 }
1321 data[0][field].clone()
1322 }
1323 false => {
1324 if self.connection.debug {
1325 info!("{data:?}");
1326 }
1327 JsonValue::Null
1328 }
1329 }
1330 }
1331
1332 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1333 let mut fields = vec![];
1334 let mut values = vec![];
1335
1336 if !self.params.autoinc && data["id"].is_empty() {
1337 let thread_id = format!("{:?}", std::thread::current().id());
1338 let thread_num: u64 = thread_id
1339 .trim_start_matches("ThreadId(")
1340 .trim_end_matches(")")
1341 .parse()
1342 .unwrap_or(0);
1343 data["id"] = format!(
1344 "{:X}{:X}",
1345 Local::now().timestamp_nanos_opt().unwrap_or(0),
1346 thread_num
1347 )
1348 .into();
1349 }
1350 for (field, value) in data.entries() {
1351 fields.push(format!("`{field}`"));
1352
1353 if value.is_string() {
1354 if value.to_string().contains("'") {
1355 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1356 continue;
1357 } else if value.to_string().contains('"') {
1358 values.push(format!("'{value}'"));
1359 continue;
1360 } else {
1361 values.push(format!("\"{value}\""));
1362 continue;
1363 }
1364 } else if value.is_array() || value.is_object() {
1365 if self.params.json[field].is_empty() {
1366 values.push(format!("'{value}'"));
1367 } else {
1368 let json = value.to_string();
1369 let json = json.replace("'", "''");
1370 values.push(format!("'{json}'"));
1371 }
1372 continue;
1373 } else if value.is_number() || value.is_boolean() || value.is_null() {
1374 values.push(format!("{value}"));
1375 continue;
1376 } else {
1377 values.push(format!("'{value}'"));
1378 continue;
1379 }
1380 }
1381 let fields = fields.join(",");
1382 let values = values.join(",");
1383
1384 let sql = format!(
1385 "INSERT INTO `{}` ({}) VALUES ({});",
1386 self.params.table, fields, values
1387 );
1388 if self.params.sql {
1389 return JsonValue::from(sql.clone());
1390 }
1391 let (state, ids) = self.execute(sql);
1392 match state {
1393 true => {
1394 if self.params.autoinc {
1395 let (state, ids) =
1396 self.query(format!("select max(id) as id from {}", self.params.table));
1397 return match state {
1398 true => ids[0]["id"].clone(),
1399 false => {
1400 error!("{ids}");
1401 JsonValue::from("")
1402 }
1403 };
1404 }
1405 data["id"].clone()
1406 }
1407 false => {
1408 error!("{ids}");
1409 JsonValue::from("")
1410 }
1411 }
1412 }
1413 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1414 let mut fields = String::new();
1415
1416 if !self.params.autoinc && data[0]["id"].is_empty() {
1417 data[0]["id"] = "".into();
1418 }
1419 for (field, _) in data[0].entries() {
1420 fields = format!("{fields},`{field}`");
1421 }
1422 fields = fields.trim_start_matches(",").to_string();
1423
1424 let core_count = num_cpus::get();
1425 let mut p = pools::Pool::new(core_count * 4);
1426 let autoinc = self.params.autoinc;
1427 for list in data.members() {
1428 let mut item = list.clone();
1429 p.execute(move |pcindex| {
1430 if !autoinc && item["id"].is_empty() {
1431 let id = format!(
1432 "{:X}{:X}",
1433 Local::now().timestamp_nanos_opt().unwrap_or(0),
1434 pcindex
1435 );
1436 item["id"] = id.into();
1437 }
1438 let mut values = "".to_string();
1439 for (_, value) in item.entries() {
1440 if value.is_string() {
1441 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1442 } else if value.is_number() || value.is_boolean() {
1443 values = format!("{values},{value}");
1444 } else {
1445 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1446 }
1447 }
1448 values = format!("({})", values.trim_start_matches(","));
1449 array![item["id"].clone(), values]
1450 });
1451 }
1452 let (ids_list, mut values) = p.insert_all();
1453
1454 values = values.trim_start_matches(",").to_string();
1455
1456 let sql = format!(
1457 "INSERT INTO {} ({}) VALUES {};",
1458 self.params.table, fields, values
1459 );
1460 if self.params.sql {
1461 return JsonValue::from(sql.clone());
1462 }
1463 let (state, data) = self.execute(sql.clone());
1464 match state {
1465 true => {
1466 if self.params.autoinc {
1467 let (state, ids) = self.query(format!(
1468 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1469 self.params.table,
1470 ids_list.len()
1471 ));
1472 return match state {
1473 true => {
1474 let mut idlist = array![];
1475 for item in ids.members() {
1476 let _ = idlist.push(item["id"].clone());
1477 }
1478 idlist
1479 }
1480 false => {
1481 error!("批量添加失败: {ids:?} {sql}");
1482 array![]
1483 }
1484 };
1485 }
1486 JsonValue::from(ids_list)
1487 }
1488 false => {
1489 error!("批量添加失败: {data:?} {sql}");
1490 array![]
1491 }
1492 }
1493 }
1494
1495 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1496 let mut fields = vec![];
1497 let mut values = vec![];
1498
1499 if !self.params.autoinc && data["id"].is_empty() {
1500 let thread_id = format!("{:?}", std::thread::current().id());
1501 let thread_num: u64 = thread_id
1502 .trim_start_matches("ThreadId(")
1503 .trim_end_matches(")")
1504 .parse()
1505 .unwrap_or(0);
1506 data["id"] = format!(
1507 "{:X}{:X}",
1508 Local::now().timestamp_nanos_opt().unwrap_or(0),
1509 thread_num
1510 )
1511 .into();
1512 }
1513 for (field, value) in data.entries() {
1514 fields.push(format!("`{field}`"));
1515
1516 if value.is_string() {
1517 if value.to_string().contains("'") {
1518 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1519 continue;
1520 } else if value.to_string().contains('"') {
1521 values.push(format!("'{value}'"));
1522 continue;
1523 } else {
1524 values.push(format!("\"{value}\""));
1525 continue;
1526 }
1527 } else if value.is_array() || value.is_object() {
1528 if self.params.json[field].is_empty() {
1529 values.push(format!("'{value}'"));
1530 } else {
1531 let json = value.to_string();
1532 let json = json.replace("'", "''");
1533 values.push(format!("'{json}'"));
1534 }
1535 continue;
1536 } else if value.is_number() || value.is_boolean() || value.is_null() {
1537 values.push(format!("{value}"));
1538 continue;
1539 } else {
1540 values.push(format!("'{value}'"));
1541 continue;
1542 }
1543 }
1544
1545 let conflict_cols: Vec<String> =
1546 conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1547
1548 let update_set: Vec<String> = fields
1549 .iter()
1550 .filter(|f| {
1551 let name = f.trim_matches('`');
1552 !conflict_fields.contains(&name) && name != "id"
1553 })
1554 .map(|f| format!("{f}=excluded.{f}"))
1555 .collect();
1556
1557 let fields_str = fields.join(",");
1558 let values_str = values.join(",");
1559
1560 let sql = format!(
1561 "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1562 self.params.table,
1563 fields_str,
1564 values_str,
1565 conflict_cols.join(","),
1566 update_set.join(",")
1567 );
1568 if self.params.sql {
1569 return JsonValue::from(sql.clone());
1570 }
1571 let (state, result) = self.execute(sql);
1572 match state {
1573 true => {
1574 if self.params.autoinc {
1575 let (state, ids) =
1576 self.query(format!("select max(id) as id from {}", self.params.table));
1577 return match state {
1578 true => ids[0]["id"].clone(),
1579 false => {
1580 error!("{ids}");
1581 JsonValue::from("")
1582 }
1583 };
1584 }
1585 data["id"].clone()
1586 }
1587 false => {
1588 error!("upsert失败: {result}");
1589 JsonValue::from("")
1590 }
1591 }
1592 }
1593
1594 fn update(&mut self, data: JsonValue) -> JsonValue {
1595 let mut values = vec![];
1596
1597 for (field, value) in data.entries() {
1598 if value.is_string() {
1599 values.push(format!(
1600 "`{}` = '{}'",
1601 field,
1602 value.to_string().replace("'", "''")
1603 ));
1604 } else if value.is_array() || value.is_object() {
1605 if self.params.json[field].is_empty() {
1606 values.push(format!("`{field}` = '{value}'"));
1607 } else {
1608 let json = value.to_string();
1609 let json = json.replace("'", "''");
1610 values.push(format!("`{field}` = '{json}'"));
1611 }
1612 continue;
1613 } else if value.is_number() || value.is_boolean() || value.is_null() {
1614 values.push(format!("`{field}` = {value} "));
1615 } else {
1616 values.push(format!("`{field}` = '{value}' "));
1617 }
1618 }
1619
1620 for (field, value) in self.params.inc_dec.entries() {
1621 values.push(format!("{} = {}", field, value.to_string().clone()));
1622 }
1623
1624 let values = values.join(",");
1625 let sql = format!(
1626 "UPDATE `{}` SET {} {} {};",
1627 self.params.table.clone(),
1628 values,
1629 self.params.where_sql(),
1630 self.params.page_limit_sql()
1631 );
1632 if self.params.sql {
1633 return JsonValue::from(sql.clone());
1634 }
1635 let (state, data) = self.execute(sql);
1636 if state {
1637 data
1638 } else {
1639 error!("{data}");
1640 JsonValue::from(0)
1641 }
1642 }
1643
1644 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1645 let mut values = vec![];
1646 let mut ids = vec![];
1647 for (field, _) in data[0].entries() {
1648 if field == "id" {
1649 continue;
1650 }
1651 let mut fields = vec![];
1652 for row in data.members() {
1653 let value = row[field].clone();
1654 let id = row["id"].clone();
1655 ids.push(id.clone());
1656 if value.is_string() {
1657 fields.push(format!(
1658 "WHEN '{}' THEN '{}'",
1659 id,
1660 value.to_string().replace("'", "''")
1661 ));
1662 } else if value.is_array() || value.is_object() {
1663 if self.params.json[field].is_empty() {
1664 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1665 } else {
1666 let json = value.to_string();
1667 let json = json.replace("'", "''");
1668 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1669 }
1670 continue;
1671 } else if value.is_number() || value.is_boolean() || value.is_null() {
1672 fields.push(format!("WHEN '{id}' THEN {value}"));
1673 } else {
1674 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1675 }
1676 }
1677 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1678 }
1679
1680 self.where_and("id", "in", ids.into());
1681 for (field, value) in self.params.inc_dec.entries() {
1682 values.push(format!("{} = {}", field, value.to_string().clone()));
1683 }
1684
1685 let values = values.join(",");
1686 let sql = format!(
1687 "UPDATE {} SET {} {} {};",
1688 self.params.table.clone(),
1689 values,
1690 self.params.where_sql(),
1691 self.params.page_limit_sql()
1692 );
1693 if self.params.sql {
1694 return JsonValue::from(sql.clone());
1695 }
1696 let (state, data) = self.execute(sql);
1697 if state {
1698 data
1699 } else {
1700 error!("{data:?}");
1701 JsonValue::from(0)
1702 }
1703 }
1704 fn delete(&mut self) -> JsonValue {
1705 let sql = format!(
1706 "delete FROM `{}` {};",
1707 self.params.table.clone(),
1708 self.params.where_sql()
1709 );
1710 if self.params.sql {
1711 return JsonValue::from(sql.clone());
1712 }
1713 let (state, data) = self.execute(sql);
1714 match state {
1715 true => data,
1716 false => {
1717 error!("delete 失败>>>{data:?}");
1718 JsonValue::from(0)
1719 }
1720 }
1721 }
1722
1723 fn transaction(&mut self) -> bool {
1724 let thread_id = format!("{:?}", thread::current().id());
1725
1726 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1727 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1728 SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1729 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1730 let _ = self.query(sp);
1731 return true;
1732 }
1733
1734 if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1735 error!("{thread_id} 启动事务失败: 获取写锁超时");
1736 return false;
1737 }
1738
1739 let flags = OpenFlags::new().with_read_write().with_no_mutex();
1740 let db = match Connect::open_thread_safe_with_flags(
1741 self.connection.clone().get_dsn().as_str(),
1742 flags,
1743 ) {
1744 Ok(e) => Arc::new(e),
1745 Err(e) => {
1746 error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1747 SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1748 return false;
1749 }
1750 };
1751
1752 SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1753
1754 let (state, data) = self.query("BEGIN".to_string());
1755 if state {
1756 true
1757 } else {
1758 error!("{thread_id} 启动事务失败: {data}");
1759 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1760 false
1761 }
1762 }
1763
1764 fn commit(&mut self) -> bool {
1765 let thread_id = format!("{:?}", thread::current().id());
1766
1767 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1768 error!("{thread_id} 提交事务失败: 没有活跃的事务");
1769 return false;
1770 }
1771
1772 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1773 if depth > 1 {
1774 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1775 let _ = self.query(sp);
1776 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1777 return true;
1778 }
1779
1780 let (state, _) = self.query("COMMIT".to_string());
1781 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1782
1783 if state {
1784 true
1785 } else {
1786 error!("{thread_id} 提交事务失败");
1787 false
1788 }
1789 }
1790
1791 fn rollback(&mut self) -> bool {
1792 let thread_id = format!("{:?}", thread::current().id());
1793
1794 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1795 error!("{thread_id} 回滚失败: 没有活跃的事务");
1796 return false;
1797 }
1798
1799 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1800 if depth > 1 {
1801 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1802 let _ = self.query(sp);
1803 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1804 return true;
1805 }
1806
1807 let (state, _) = self.query("ROLLBACK".to_string());
1808 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1809
1810 if state {
1811 true
1812 } else {
1813 error!("回滚失败: {thread_id}");
1814 false
1815 }
1816 }
1817
1818 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1819 let (state, data) = self.query(sql.to_string());
1820 match state {
1821 true => Ok(data),
1822 false => Err(data.to_string()),
1823 }
1824 }
1825
1826 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1827 let (state, data) = self.execute(sql.to_string());
1828 match state {
1829 true => Ok(data),
1830 false => Err(data.to_string()),
1831 }
1832 }
1833
1834 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1835 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1836 self
1837 }
1838
1839 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1840 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1841 self
1842 }
1843
1844 fn buildsql(&mut self) -> String {
1845 self.fetch_sql();
1846 let sql = self.select().to_string();
1847 format!("( {} ) `{}`", sql, self.params.table)
1848 }
1849
1850 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1851 for field in fields {
1852 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1853 }
1854 self
1855 }
1856
1857 fn join(
1858 &mut self,
1859 main_table: &str,
1860 main_fields: &str,
1861 right_table: &str,
1862 right_fields: &str,
1863 ) -> &mut Self {
1864 let main_table = if main_table.is_empty() {
1865 self.params.table.clone()
1866 } else {
1867 main_table.to_string()
1868 };
1869 self.params.join_table = right_table.to_string();
1870 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1871 self
1872 }
1873
1874 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1875 let main_fields = if main_fields.is_empty() {
1876 "id"
1877 } else {
1878 main_fields
1879 };
1880 let second_fields = if second_fields.is_empty() {
1881 self.params.table.clone()
1882 } else {
1883 second_fields.to_string().clone()
1884 };
1885 let sec_table_name = format!("{}{}", table, "_2");
1886 let second_table = format!("{} {}", table, sec_table_name.clone());
1887 self.params.join_table = sec_table_name.clone();
1888 self.params.join.push(format!(
1889 " INNER JOIN {} ON {}.{} = {}.{}",
1890 second_table, self.params.table, main_fields, sec_table_name, second_fields
1891 ));
1892 self
1893 }
1894
1895 fn join_right(
1896 &mut self,
1897 main_table: &str,
1898 main_fields: &str,
1899 right_table: &str,
1900 right_fields: &str,
1901 ) -> &mut Self {
1902 let main_table = if main_table.is_empty() {
1903 self.params.table.clone()
1904 } else {
1905 main_table.to_string()
1906 };
1907 self.params.join_table = right_table.to_string();
1908 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1909 self
1910 }
1911
1912 fn join_full(
1913 &mut self,
1914 main_table: &str,
1915 main_fields: &str,
1916 right_table: &str,
1917 right_fields: &str,
1918 ) -> &mut Self {
1919 let main_table = if main_table.is_empty() {
1920 self.params.table.clone()
1921 } else {
1922 main_table.to_string()
1923 };
1924 self.params.join_table = right_table.to_string();
1925 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1926 self
1927 }
1928
1929 fn union(&mut self, sub_sql: &str) -> &mut Self {
1930 self.params.unions.push(format!("UNION {sub_sql}"));
1931 self
1932 }
1933
1934 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1935 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1936 self
1937 }
1938
1939 fn lock_for_update(&mut self) -> &mut Self {
1940 self.params.lock_mode = "FOR UPDATE".to_string();
1941 self
1942 }
1943
1944 fn lock_for_share(&mut self) -> &mut Self {
1945 self.params.lock_mode = "FOR SHARE".to_string();
1946 self
1947 }
1948}