1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19 static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24 pub connection: Connection,
26 pub default: String,
28
29 pub params: Params,
30}
31
32impl Sqlite {
33 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34 let dsn = connection.clone().get_dsn();
35 let db_path = Path::new(&dsn);
36 if let Some(parent) = db_path.parent() {
37 if !parent.as_os_str().is_empty() && !parent.exists() {
38 if let Err(e) = std::fs::create_dir_all(parent) {
39 error!("sqlite 创建目录失败: {} {:?}", e, parent);
40 return Err(e.to_string());
41 }
42 info!("sqlite 自动创建目录: {:?}", parent);
43 }
44 }
45
46 let flags = OpenFlags::new().with_create().with_read_write();
47 match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48 Ok(e) => {
49 if let Ok(mut guard) = DBS.lock() {
50 guard.insert(default.clone(), Arc::new(e));
51 } else {
52 error!("sqlite 获取数据库锁失败");
53 return Err("获取数据库锁失败".to_string());
54 }
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63 Err(e.to_string())
64 }
65 }
66 }
67 fn query_handle_static(
69 mut statement: Statement,
70 sql: &str,
71 table: &str,
72 thread_id: &str,
73 ) -> (bool, JsonValue) {
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79 if in_transaction {
80 error!("{} 查询事务: {} {}", thread_id, e, sql);
81 } else {
82 error!("{} 非事务查询: {} {}", thread_id, e, sql);
83 }
84 return (false, data);
85 }
86 } {
87 let mut list = object! {};
88 let mut index = 0;
89 for field in statement.column_names().iter() {
90 if !list[field.as_str()].is_null() {
91 index += 1;
92 continue;
93 }
94 match statement.column_type(field.as_str()) {
95 Ok(types) => match types {
96 Type::String => {
97 if let Ok(data) = statement.read::<String, _>(index) {
98 match data.as_str() {
99 "false" => {
100 list[field.as_str()] = JsonValue::from(false);
101 }
102 "true" => {
103 list[field.as_str()] = JsonValue::from(true);
104 }
105 _ => {
106 list[field.as_str()] = JsonValue::from(data.clone());
107 }
108 }
109 }
110 }
111 Type::Integer => {
112 let fields_cache =
113 FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114 if let Some(fields) = fields_cache {
115 if fields[field.clone()].is_empty() {
116 if let Ok(data) = statement.read::<i64, _>(index) {
117 list[field.as_str()] = data.into();
118 }
119 index += 1;
120 continue;
121 }
122 let field_type =
123 fields[field.clone()]["type"].as_str().unwrap_or("");
124 match field_type {
125 "INTEGER" => {
126 if let Ok(data) = statement.read::<i64, _>(index) {
127 list[field.as_str()] = JsonValue::from(data == 1);
128 }
129 }
130 x if x.contains("int(") => {
131 if let Ok(data) = statement.read::<i64, _>(index) {
132 list[field.as_str()] = data.into();
133 }
134 }
135 x if x.contains("decimal(") && x.ends_with(",0)") => {
136 if let Ok(data) = statement.read::<f64, _>(index) {
137 list[field.as_str()] = data.into();
138 }
139 }
140 _ => {
141 if let Ok(data) = statement.read::<i64, _>(index) {
142 list[field.as_str()] = data.into();
143 }
144 }
145 }
146 } else if let Ok(data) = statement.read::<i64, _>(index) {
147 list[field.as_str()] = JsonValue::from(data);
148 }
149 }
150 Type::Float => {
151 if let Ok(data) = statement.read::<f64, _>(index) {
152 list[field.as_str()] = JsonValue::from(data);
153 }
154 }
155 Type::Binary => {
156 if let Ok(data) = statement.read::<String, _>(index) {
157 list[field.as_str()] = JsonValue::from(data.clone());
158 }
159 }
160 Type::Null => match statement.read::<String, _>(index) {
161 Ok(data) => {
162 list[field.as_str()] = JsonValue::from(data.clone());
163 }
164 Err(_) => match statement.read::<f64, _>(index) {
165 Ok(data) => {
166 if data == 0.0 {
167 list[field.as_str()] = JsonValue::from("");
168 } else {
169 list[field.as_str()] = JsonValue::from(data);
170 }
171 }
172 Err(_) => match statement.read::<i64, _>(index) {
173 Ok(data) => {
174 if data == 0 {
175 list[field.as_str()] = JsonValue::from("");
176 } else {
177 list[field.as_str()] = JsonValue::from(data);
178 }
179 }
180 Err(e) => {
181 error!("Type:{} {:?}", field.as_str(), e);
182 }
183 },
184 },
185 },
186 },
187 Err(e) => {
188 error!("query Err: {e:?}");
189 }
190 }
191 index += 1;
192 }
193 let _ = data.push(list);
194 }
195 (true, data)
196 }
197
198 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199 let thread_id = format!("{:?}", thread::current().id());
200 let table = self.params.table.clone();
201
202 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203 if self.connection.debug {
204 info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205 }
206
207 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208 match db.prepare(sql.clone()) {
209 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210 Err(e) => {
211 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212 (false, e.to_string().into())
213 }
214 }
215 });
216
217 match result {
218 Some(r) => r,
219 None => {
220 error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221 (false, JsonValue::from("未找到事务连接"))
222 }
223 }
224 } else {
225 if self.connection.debug {
226 info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227 }
228 let dbs = match DBS.lock() {
229 Ok(dbs) => dbs,
230 Err(e) => {
231 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232 return (false, JsonValue::from("数据库锁定失败"));
233 }
234 };
235 let db = match dbs.get(&self.default) {
236 Some(db) => db.clone(),
237 None => {
238 error!(
239 "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240 self.default
241 );
242 return (false, JsonValue::from("未找到默认数据库配置"));
243 }
244 };
245 drop(dbs);
246 let result = match db.prepare(sql.clone()) {
247 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248 Err(e) => {
249 error!("{thread_id} 查询非事务: Err: {e}");
250 (false, e.to_string().into())
251 }
252 };
253 result
254 }
255 }
256
257 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258 let thread_id = format!("{:?}", thread::current().id());
259
260 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261 if self.connection.debug {
262 info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263 }
264
265 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266 match db.execute(sql.clone()) {
267 Ok(_) => {
268 let count = db.change_count();
269 (true, JsonValue::from(count))
270 }
271 Err(e) => (false, JsonValue::from(e.to_string())),
272 }
273 });
274
275 match result {
276 Some((true, count)) => {
277 if self.connection.debug {
278 info!("{} count:{}", thread_id, count);
279 }
280 (true, count)
281 }
282 Some((false, err)) => {
283 error!(
284 "{} 执行事务: \r\nErr: {}\r\n{}",
285 thread_id,
286 err,
287 sql.clone()
288 );
289 (false, err)
290 }
291 None => {
292 error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293 (false, JsonValue::from("未找到事务连接"))
294 }
295 }
296 } else {
297 if self.connection.debug {
298 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299 }
300 let dbs = match DBS.lock() {
301 Ok(dbs) => dbs,
302 Err(e) => {
303 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304 return (false, JsonValue::from("数据库锁定失败"));
305 }
306 };
307
308 let db = match dbs.get(&self.default) {
309 Some(db) => db.clone(),
310 None => {
311 error!(
312 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313 thread_id, self.default, sql
314 );
315 return (false, JsonValue::from("未找到默认数据库配置"));
316 }
317 };
318 drop(dbs);
319 match db.execute(sql.clone()) {
320 Ok(_) => {
321 let count = db.change_count();
322 if self.connection.debug {
323 info!(
324 "{} count: {} total_count: {}",
325 thread_id,
326 count,
327 db.total_change_count()
328 );
329 }
330 (true, JsonValue::from(count))
331 }
332 Err(e) => {
333 error!(
334 "{} 执行非事务: Err: {}\r\nSQL: {}",
335 thread_id,
336 e,
337 sql.clone()
338 );
339 (false, JsonValue::from(e.to_string()))
340 }
341 }
342 }
343 }
344}
345
346impl DbMode for Sqlite {
347 fn database_tables(&mut self) -> JsonValue {
348 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349 match self.sql(sql.as_str()) {
350 Ok(e) => {
351 let mut list = vec![];
352 for item in e.members() {
353 list.push(item["name"].clone());
354 }
355 list.into()
356 }
357 Err(_) => {
358 array![]
359 }
360 }
361 }
362 fn database_create(&mut self, name: &str) -> bool {
363 let current_dsn = self.connection.clone().get_dsn();
364 let current_path = Path::new(¤t_dsn);
365
366 let new_path = if let Some(parent) = current_path.parent() {
367 if parent.as_os_str().is_empty() {
368 Path::new(name).to_path_buf()
369 } else {
370 parent.join(name)
371 }
372 } else {
373 Path::new(name).to_path_buf()
374 };
375
376 if let Some(parent) = new_path.parent() {
377 if !parent.as_os_str().is_empty() && !parent.exists() {
378 if let Err(e) = std::fs::create_dir_all(parent) {
379 error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380 return false;
381 }
382 }
383 }
384
385 let flags = OpenFlags::new().with_create().with_read_write();
386 match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387 Ok(_) => true,
388 Err(e) => {
389 error!(
390 "sqlite database_create 创建数据库失败: {} {:?}",
391 e, new_path
392 );
393 false
394 }
395 }
396 }
397
398 fn truncate(&mut self, table: &str) -> bool {
399 let sql = format!("DELETE FROM `{table}`");
400 let (state, _) = self.execute(sql);
401 state
402 }
403}
404
405impl Mode for Sqlite {
406 fn table_create(&mut self, options: TableOptions) -> JsonValue {
407 let mut sql = String::new();
408 let mut unique_fields = String::new();
409 let mut unique_name = String::new();
410 let mut unique = String::new();
411 for item in options.table_unique.iter() {
412 if unique_fields.is_empty() {
413 unique_fields = format!("`{item}`");
414 unique_name = format!("unique_{item}");
415 } else {
416 unique_fields = format!("{unique_fields},`{item}`");
417 unique_name = format!("{unique_name}_{item}");
418 }
419 unique = format!(
420 "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421 unique_name, options.table_name, unique_fields
422 );
423 }
424
425 let mut index = vec![];
426 for row in options.table_index.iter() {
427 let mut index_fields = String::new();
428 let mut index_name = String::new();
429 for item in row.iter() {
430 if index_fields.is_empty() {
431 index_fields = format!("`{item}`");
432 index_name = format!("index_{item}");
433 } else {
434 index_fields = format!("{index_fields},`{item}`");
435 index_name = format!("{index_name}_{item}");
436 }
437 }
438 index.push(format!(
439 "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440 index_name, options.table_name, index_fields
441 ));
442 }
443
444 for (name, field) in options.table_fields.entries() {
445 let row = br_fields::field("sqlite", name, field.clone());
446 sql = format!("{sql} {row},\r\n");
447 }
448
449 sql = sql.trim_end_matches(",\r\n").to_string();
450
451 let sql = format!(
452 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453 options.table_name, sql
454 );
455 if self.params.sql {
456 let mut list = vec![sql];
457 if !unique.is_empty() {
458 list.push(unique)
459 }
460 if !index.is_empty() {
461 list.extend(index)
462 }
463
464 return JsonValue::from(list.join(""));
465 }
466
467 let thread_id = format!("{:?}", thread::current().id());
468
469 let (_, table_exists) = self.query(format!(
470 "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471 options.table_name
472 ));
473 let is_new_table = table_exists.is_empty();
474
475 let (state, _) = self.execute(sql.clone());
476 if state {
477 if is_new_table {
478 if !unique.is_empty() {
479 let (state, _) = self.execute(unique.clone());
480 info!(
481 "{} {} 唯一索引创建:{}",
482 thread_id, options.table_name, state
483 );
484 }
485 for sql in index.iter() {
486 let (state, _) = self.execute(sql.clone());
487 info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488 }
489 }
490 JsonValue::from(true)
491 } else {
492 JsonValue::from(false)
493 }
494 }
495 fn table_update(&mut self, options: TableOptions) -> JsonValue {
496 let thread_id = format!("{:?}", thread::current().id());
497
498 let mut sql = String::new();
499 let mut add = vec![];
500 let mut del = vec![];
501 let mut put = vec![];
502
503 let (_, mut fields_list) =
504 self.query(format!("pragma table_info ('{}')", options.table_name));
505 let mut field_old = object! {};
506 for item in fields_list.members_mut() {
507 item["dflt_value"] = item["dflt_value"]
508 .to_string()
509 .trim_start_matches("'")
510 .trim_end_matches("'")
511 .into();
512 if let Some(name) = item["name"].as_str() {
513 field_old[name] = item.clone();
514 if options.table_fields[name].is_empty() {
515 del.push(name.to_string());
516 }
517 }
518 }
519
520 let mut fields_list = vec![];
521 let mut fields_list_new = vec![];
522
523 for (name, field) in options.table_fields.entries() {
524 if field_old[name].is_empty() {
525 add.push(name.to_string());
526 } else {
527 fields_list.push(name.to_string());
528 fields_list_new.push(name.to_string());
529 let field_mode = field["mode"].as_str().unwrap_or("");
530 let old_value = match field_mode {
531 "select" => {
532 if field_old[name]["dflt_value"].clone().is_empty() {
533 "".to_string()
534 } else {
535 field_old[name]["dflt_value"].clone().to_string()
536 }
537 }
538 "switch" => (field_old[name]["dflt_value"]
539 .to_string()
540 .parse::<i32>()
541 .unwrap_or(0)
542 == 1)
543 .to_string(),
544 _ => field_old[name]["dflt_value"].clone().to_string(),
545 };
546 let new_value = match field_mode {
547 "select" => field["def"]
548 .members()
549 .map(|x| x.to_string())
550 .collect::<Vec<String>>()
551 .join(","),
552 _ => field["def"].clone().to_string(),
553 };
554 if old_value != new_value {
555 info!(
556 "{} 差异化当前: {} old_value: {} new_value: {} {}",
557 options.table_name,
558 name,
559 old_value,
560 new_value,
561 old_value != new_value
562 );
563 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
564 put.push(name.to_string());
565 } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
566 && name != options.table_key
567 {
568 info!("{} 主键替换: {}", options.table_name, name);
569 put.push(name.to_string());
570 }
571 }
572 }
573
574 let mut unique_fields = String::new();
575 let mut unique_name = String::new();
576 let mut unique = String::new();
577
578 for item in options.table_unique.iter() {
580 if unique_fields.is_empty() {
581 unique_fields = format!("`{item}`");
582 unique_name = format!("unique_{item}");
583 } else {
584 unique_fields = format!("{unique_fields},`{item}`");
585 unique_name = format!("{unique_name}_{item}");
586 }
587 unique = format!(
588 "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
589 options.table_name, unique_name, options.table_name, unique_fields
590 )
591 }
592
593 let mut index = vec![];
595 for row in options.table_index.iter() {
596 let mut index_fields = String::new();
597 let mut index_name = String::new();
598 for item in row.iter() {
599 if index_fields.is_empty() {
600 index_fields = item.to_string();
601 index_name = format!("index_{item}");
602 } else {
603 index_fields = format!("{index_fields},{item}");
604 index_name = format!("{index_name}_{item}");
605 }
606 }
607 index.push(format!(
608 "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
609 options.table_name, index_name, options.table_name, index_fields
610 ));
611 }
612 for (name, field) in options.table_fields.entries() {
613 let row = br_fields::field("sqlite", name, field.clone());
614 sql = format!("{sql} {row},\r\n");
615 }
616
617 if !unique.is_empty() || !index.is_empty() {
618 let unique_text = unique.clone();
619 let (_, unique_old) =
620 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
621 let mut index_old_list = vec![];
622 let mut index_new_list = vec![];
623 for item in unique_old.members() {
624 let origin = item["origin"].as_str().unwrap_or("");
625 let unique_1 = item["unique"].as_usize().unwrap_or(0);
626 let name = item["name"].as_str().unwrap_or("");
627
628 if origin == "c" && unique_1 == 1 {
629 if unique.contains(format!(" {name} ").as_str()) {
630 unique = "".to_string();
631 }
632 continue;
633 }
634 if origin == "c" && unique_1 == 0 {
635 index_old_list.push(item);
636 for item in index.iter() {
637 if item.contains(format!(" {name} ").as_str()) {
638 index_new_list.push(item.clone());
639 }
640 }
641 continue;
642 }
643 }
644 if unique.is_empty() {
645 if index_old_list.len() == index.len()
646 && index_old_list.len() == index_new_list.len()
647 {
648 index = vec![];
649 } else {
650 unique = unique_text;
651 }
652 }
653 }
654
655 sql = sql.trim_end_matches(",\r\n").to_string();
656 sql = format!(
657 "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
658 options.table_name, sql
659 );
660
661 let sqls = format!(
662 "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
663 options.table_name,
664 fields_list_new.join("`,`"),
665 fields_list.join("`,`")
666 );
667 let drop_sql = format!("drop table {};\r\n", options.table_name);
668 let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
669 let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
670
671 if self.params.sql {
672 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
673 if !unique.is_empty() {
674 list.push(unique)
675 }
676 if !index.is_empty() {
677 list.extend(index)
678 }
679 return JsonValue::from(list.join(""));
680 }
681
682 if add.is_empty()
683 && del.is_empty()
684 && unique.is_empty()
685 && index.is_empty()
686 && put.is_empty()
687 {
688 return JsonValue::from(-1);
689 }
690
691 let (state, _) = self.execute(sql.clone());
692 let data = match state {
693 true => {
694 let (state, _) = self.execute(sqls.clone());
695 match state {
696 true => {
697 let (state, _) = self.execute(drop_sql);
698 match state {
699 true => {
700 let (state, _) = self.execute(alter_sql);
701 match state {
702 true => {
703 if !unique.is_empty() {
704 let (state, _) = self.execute(unique.clone());
705 info!(
706 "{} {} 唯一索引创建:{}",
707 thread_id, options.table_name, state
708 );
709 }
710 for index_sql in index.iter() {
711 let (state, _) = self.execute(index_sql.clone());
712 match state {
713 true => {}
714 false => {
715 error!(
716 "{} 索引创建失败: {} {}",
717 options.table_name, state, index_sql
718 );
719 return JsonValue::from(0);
720 }
721 };
722 }
723
724 return JsonValue::from(1);
725 }
726 false => {
727 error!("{} 修改表名失败", options.table_name);
728 return JsonValue::from(0);
729 }
730 }
731 }
732 false => {
733 error!("{} 删除本表失败", options.table_name);
734 return JsonValue::from(0);
735 }
736 }
737 }
738 false => {
739 error!(
740 "{} 添加tmp表记录失败 {:#} {:#}",
741 options.table_name, sql, sqls
742 );
743 let sql = format!("drop table {}_tmp", options.table_name);
744 let (_, _) = self.execute(sql);
745 0
746 }
747 }
748 }
749 false => {
750 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
751 let (_, _) = self.execute(drop_sql_temp);
752 0
753 }
754 };
755 JsonValue::from(data)
756 }
757
758 fn table_info(&mut self, table: &str) -> JsonValue {
759 if let Ok(guard) = FIELDS.lock() {
760 if let Some(cached) = guard.get(table) {
761 return cached.clone();
762 }
763 }
764 let sql = format!("PRAGMA table_info({table})");
765 let (state, data) = self.query(sql);
766
767 match state {
768 true => {
769 let mut fields = object! {};
770 for item in data.members() {
771 if let Some(name) = item["name"].as_str() {
772 fields[name] = item.clone();
773 }
774 }
775 if let Ok(mut guard) = FIELDS.lock() {
776 guard.insert(table.to_string(), fields.clone());
777 }
778 data
779 }
780 false => object! {},
781 }
782 }
783
784 fn table_is_exist(&mut self, name: &str) -> bool {
785 let sql = format!(
786 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787 );
788 let (state, data) = self.query(sql);
789 if state && !data.is_empty() {
790 let count = data[0]["count"].as_i64().unwrap_or(0);
791 return count > 0;
792 }
793 false
794 }
795
796 fn table(&mut self, name: &str) -> &mut Sqlite {
797 self.params = Params::default(self.connection.mode.str().as_str());
798 let table_name = format!("{}{}", self.connection.prefix, name);
799 if !super::sql_safety::validate_table_name(&table_name) {
800 error!("Invalid table name: {}", name);
801 }
802 self.params.table = table_name.clone();
803 self.params.join_table = table_name;
804 self
805 }
806 fn change_table(&mut self, name: &str) -> &mut Self {
807 self.params.join_table = name.to_string();
808 self
809 }
810 fn autoinc(&mut self) -> &mut Self {
811 self.params.autoinc = true;
812 self
813 }
814
815 fn timestamps(&mut self) -> &mut Self {
816 self.params.timestamps = true;
817 self
818 }
819
820 fn fetch_sql(&mut self) -> &mut Self {
821 self.params.sql = true;
822 self
823 }
824
825 fn order(&mut self, field: &str, by: bool) -> &mut Self {
826 self.params.order[field] = {
827 if by {
828 "DESC"
829 } else {
830 "ASC"
831 }
832 }
833 .into();
834 self
835 }
836
837 fn group(&mut self, field: &str) -> &mut Self {
838 let fields: Vec<&str> = field.split(",").collect();
839 for field in fields.iter() {
840 let fields = field.to_string();
841 self.params.group[fields.as_str()] = fields.clone().into();
842 self.params.fields[fields.as_str()] = fields.clone().into();
843 }
844 self
845 }
846
847 fn distinct(&mut self) -> &mut Self {
848 self.params.distinct = true;
849 self
850 }
851 fn json(&mut self, field: &str) -> &mut Self {
852 let list: Vec<&str> = field.split(",").collect();
853 for item in list.iter() {
854 self.params.json[item.to_string().as_str()] = item.to_string().into();
855 }
856 self
857 }
858
859 fn location(&mut self, field: &str) -> &mut Self {
860 let list: Vec<&str> = field.split(",").collect();
861 for item in list.iter() {
862 self.params.location[item.to_string().as_str()] = item.to_string().into();
863 }
864 self
865 }
866
867 fn field(&mut self, field: &str) -> &mut Self {
868 let list: Vec<&str> = field.split(",").collect();
869 let join_table = if self.params.join_table.is_empty() {
870 self.params.table.clone()
871 } else {
872 self.params.join_table.clone()
873 };
874 for item in list.iter() {
875 let item = item.to_string();
876 if item.contains(" as ") {
877 let text = item.split(" as ").collect::<Vec<&str>>().clone();
878 self.params.fields[item] =
879 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880 } else {
881 self.params.fields[item] = format!("{join_table}.`{item}`").into();
882 }
883 }
884 self
885 }
886
887 fn field_raw(&mut self, expr: &str) -> &mut Self {
888 self.params.fields[expr] = expr.into();
889 self
890 }
891
892 fn hidden(&mut self, name: &str) -> &mut Self {
893 let hidden: Vec<&str> = name.split(",").collect();
894 let sql = format!("PRAGMA table_info({})", self.params.table);
895 let (_, data) = self.query(sql);
896 for item in data.members() {
897 if let Some(name) = item["name"].as_str() {
898 if !hidden.contains(&name) {
899 self.params.fields[name] = name.into();
900 }
901 }
902 }
903 self
904 }
905
906 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907 for f in field.split('|') {
908 if !super::sql_safety::validate_field_name(f) {
909 error!("Invalid field name: {}", f);
910 }
911 }
912 if !super::sql_safety::validate_compare_orator(compare) {
913 error!("Invalid compare operator: {}", compare);
914 }
915 let join_table = if self.params.join_table.is_empty() {
916 self.params.table.clone()
917 } else {
918 self.params.join_table.clone()
919 };
920
921 if value.is_boolean() {
922 if value.as_bool().unwrap_or(false) {
923 value = 1.into();
924 } else {
925 value = 0.into();
926 }
927 }
928
929 match compare {
930 "between" => {
931 self.params.where_and.push(format!(
932 "{}.`{}` between '{}' AND '{}'",
933 join_table, field, value[0], value[1]
934 ));
935 }
936 "set" => {
937 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938 let mut wheredata = vec![];
939 for item in list.iter() {
940 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941 }
942 self.params
943 .where_and
944 .push(format!("({})", wheredata.join(" or ")));
945 }
946 "notin" => {
947 let mut text = String::new();
948 for item in value.members() {
949 text = format!("{text},'{item}'");
950 }
951 text = text.trim_start_matches(",").into();
952 self.params
953 .where_and
954 .push(format!("{join_table}.`{field}` not in ({text})"));
955 }
956 "in" => {
957 let mut text = String::new();
958 if value.is_array() {
959 for item in value.members() {
960 text = format!("{text},'{item}'");
961 }
962 } else {
963 let value = value.to_string();
964 let value: Vec<&str> = value.split(",").collect();
965 for item in value.iter() {
966 text = format!("{text},'{item}'");
967 }
968 }
969 text = text.trim_start_matches(",").into();
970
971 self.params
972 .where_and
973 .push(format!("{join_table}.`{field}` {compare} ({text})"));
974 }
975 "=" => {
976 if value.is_null() {
977 self.params
978 .where_and
979 .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980 } else {
981 self.params
982 .where_and
983 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984 }
985 }
986 "json_contains" => {
990 if value.is_array() {
991 if value.is_empty() {
992 self.params.where_and.push("1=0".to_string());
993 } else {
994 let mut parts = vec![];
995 for item in value.members() {
996 let escaped = super::sql_safety::escape_string(&item.to_string());
997 parts.push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
998 }
999 self.params
1000 .where_and
1001 .push(format!("({})", parts.join(" OR ")));
1002 }
1003 } else {
1004 let escaped = super::sql_safety::escape_string(&value.to_string());
1005 self.params
1006 .where_and
1007 .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1008 }
1009 }
1010 _ => {
1011 if value.is_null() {
1012 self.params
1013 .where_and
1014 .push(format!("{join_table}.`{field}` {compare} {value}"));
1015 } else {
1016 self.params
1017 .where_and
1018 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1019 }
1020 }
1021 }
1022 self
1023 }
1024 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1025 for f in field.split('|') {
1026 if !super::sql_safety::validate_field_name(f) {
1027 error!("Invalid field name: {}", f);
1028 }
1029 }
1030 if !super::sql_safety::validate_compare_orator(compare) {
1031 error!("Invalid compare operator: {}", compare);
1032 }
1033 let join_table = if self.params.join_table.is_empty() {
1034 self.params.table.clone()
1035 } else {
1036 self.params.join_table.clone()
1037 };
1038
1039 if value.is_boolean() {
1040 if value.as_bool().unwrap_or(false) {
1041 value = 1.into();
1042 } else {
1043 value = 0.into();
1044 }
1045 }
1046 match compare {
1047 "between" => {
1048 self.params.where_or.push(format!(
1049 "{}.`{}` between '{}' AND '{}'",
1050 join_table, field, value[0], value[1]
1051 ));
1052 }
1053 "set" => {
1054 let tt = value.to_string().replace(",", "%");
1055 self.params
1056 .where_or
1057 .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1058 }
1059 "notin" => {
1060 let mut text = String::new();
1061 for item in value.members() {
1062 text = format!("{text},'{item}'");
1063 }
1064 text = text.trim_start_matches(",").into();
1065 self.params
1066 .where_or
1067 .push(format!("{join_table}.`{field}` not in ({text})"));
1068 }
1069 "in" => {
1070 let mut text = String::new();
1071 if value.is_array() {
1072 for item in value.members() {
1073 text = format!("{text},'{item}'");
1074 }
1075 } else {
1076 let value = value.as_str().unwrap_or("");
1077 let value: Vec<&str> = value.split(",").collect();
1078 for item in value.iter() {
1079 text = format!("{text},'{item}'");
1080 }
1081 }
1082 text = text.trim_start_matches(",").into();
1083 self.params
1084 .where_or
1085 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1086 }
1087 "json_contains" => {
1091 if value.is_array() {
1092 if value.is_empty() {
1093 self.params.where_or.push("1=0".to_string());
1094 } else {
1095 let mut parts = vec![];
1096 for item in value.members() {
1097 let escaped = super::sql_safety::escape_string(&item.to_string());
1098 parts.push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
1099 }
1100 self.params
1101 .where_or
1102 .push(format!("({})", parts.join(" OR ")));
1103 }
1104 } else {
1105 let escaped = super::sql_safety::escape_string(&value.to_string());
1106 self.params
1107 .where_or
1108 .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1109 }
1110 }
1111 _ => {
1112 self.params
1113 .where_or
1114 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1115 }
1116 }
1117 self
1118 }
1119 fn where_raw(&mut self, expr: &str) -> &mut Self {
1120 self.params.where_and.push(expr.to_string());
1121 self
1122 }
1123
1124 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1125 self.params
1126 .where_and
1127 .push(format!("`{field}` IN ({sub_sql})"));
1128 self
1129 }
1130
1131 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1132 self.params
1133 .where_and
1134 .push(format!("`{field}` NOT IN ({sub_sql})"));
1135 self
1136 }
1137
1138 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1139 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1140 self
1141 }
1142
1143 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1144 self.params
1145 .where_and
1146 .push(format!("NOT EXISTS ({sub_sql})"));
1147 self
1148 }
1149
1150 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1151 self.params.where_column = format!(
1152 "{}.`{}` {} {}.`{}`",
1153 self.params.table, field_a, compare, self.params.table, field_b
1154 );
1155 self
1156 }
1157
1158 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1159 self.params
1160 .update_column
1161 .push(format!("{field_a} = {compare}"));
1162 self
1163 }
1164
1165 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1166 self.params.page = page;
1167 self.params.limit = limit;
1168 self
1169 }
1170
1171 fn limit(&mut self, count: i32) -> &mut Self {
1172 self.params.limit_only = count;
1173 self
1174 }
1175
1176 fn column(&mut self, field: &str) -> JsonValue {
1177 self.field(field);
1178 self.group(field);
1179 let sql = self.params.select_sql();
1180 if self.params.sql {
1181 return JsonValue::from(sql.clone());
1182 }
1183 self.table_info(self.params.table.clone().as_str());
1184 let (state, data) = self.query(sql);
1185 if state {
1186 let mut list = array![];
1187 for item in data.members() {
1188 if self.params.json[field].is_empty() {
1189 let _ = list.push(item[field].clone());
1190 } else {
1191 let data =
1192 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1193 let _ = list.push(data);
1194 }
1195 }
1196 list
1197 } else {
1198 array![]
1199 }
1200 }
1201
1202 fn count(&mut self) -> JsonValue {
1203 self.params.fields["count"] = "count(*) as count".to_string().into();
1204 let sql = self.params.select_sql();
1205 if self.params.sql {
1206 return JsonValue::from(sql.clone());
1207 }
1208 let (state, data) = self.query(sql);
1209 match state {
1210 true => data[0]["count"].clone(),
1211 false => JsonValue::from(0),
1212 }
1213 }
1214
1215 fn max(&mut self, field: &str) -> JsonValue {
1216 self.params.fields[field] = format!("max({field}) as {field}").into();
1217 let sql = self.params.select_sql();
1218 if self.params.sql {
1219 return JsonValue::from(sql.clone());
1220 }
1221 let (state, data) = self.query(sql);
1222 if state {
1223 if data.len() > 1 {
1224 return data;
1225 }
1226 data[0][field].clone()
1227 } else {
1228 JsonValue::from(0.0)
1229 }
1230 }
1231
1232 fn min(&mut self, field: &str) -> JsonValue {
1233 self.params.fields[field] = format!("min({field}) as {field}").into();
1234 let sql = self.params.select_sql();
1235 let (state, data) = self.query(sql);
1236 if state {
1237 if data.len() > 1 {
1238 return data;
1239 }
1240 data[0][field].clone()
1241 } else {
1242 JsonValue::from(0.0)
1243 }
1244 }
1245
1246 fn sum(&mut self, field: &str) -> JsonValue {
1247 self.params.fields[field] = format!("sum({field}) as {field}").into();
1248 let sql = self.params.select_sql();
1249 if self.params.sql {
1250 return JsonValue::from(sql.clone());
1251 }
1252 let (state, data) = self.query(sql);
1253 match state {
1254 true => {
1255 if data.len() > 1 {
1256 return data;
1257 }
1258 if self.params.fields.len() > 1 {
1259 return data[0].clone();
1260 }
1261 data[0][field].clone()
1262 }
1263 false => JsonValue::from(0),
1264 }
1265 }
1266 fn avg(&mut self, field: &str) -> JsonValue {
1267 self.params.fields[field] = format!("avg({field}) as {field}").into();
1268 let sql = self.params.select_sql();
1269 if self.params.sql {
1270 return JsonValue::from(sql.clone());
1271 }
1272 let (state, data) = self.query(sql);
1273 if state {
1274 if data.len() > 1 {
1275 return data;
1276 }
1277 data[0][field].clone()
1278 } else {
1279 JsonValue::from(0)
1280 }
1281 }
1282 fn having(&mut self, expr: &str) -> &mut Self {
1283 self.params.having.push(expr.to_string());
1284 self
1285 }
1286 fn select(&mut self) -> JsonValue {
1287 let sql = self.params.select_sql();
1288 if self.params.sql {
1289 return JsonValue::from(sql.clone());
1290 }
1291 self.table_info(self.params.table.clone().as_str());
1292 let (state, mut data) = self.query(sql.clone());
1293 match state {
1294 true => {
1295 for (field, _) in self.params.json.entries() {
1296 for item in data.members_mut() {
1297 if !item[field].is_empty() {
1298 let json = item[field].to_string();
1299 item[field] = match json::parse(&json) {
1300 Ok(e) => e,
1301 Err(_) => JsonValue::from(json),
1302 };
1303 }
1304 }
1305 }
1306 data.clone()
1307 }
1308 false => {
1309 error!("{data:?}");
1310 array![]
1311 }
1312 }
1313 }
1314 fn find(&mut self) -> JsonValue {
1315 self.params.page = 1;
1316 self.params.limit = 1;
1317 let sql = self.params.select_sql();
1318 if self.params.sql {
1319 return JsonValue::from(sql.clone());
1320 }
1321
1322 self.table_info(self.params.table.clone().as_str());
1323 let (state, mut data) = self.query(sql.clone());
1324 match state {
1325 true => {
1326 if data.is_empty() {
1327 return object! {};
1328 }
1329 for (field, _) in self.params.json.entries() {
1330 if !data[0][field].is_empty() {
1331 let json = data[0][field].to_string();
1332 let json = json::parse(&json).unwrap_or(array![]);
1333 data[0][field] = json;
1334 } else {
1335 data[0][field] = array![];
1336 }
1337 }
1338 data[0].clone()
1339 }
1340 false => {
1341 error!("{data:?}");
1342 object! {}
1343 }
1344 }
1345 }
1346
1347 fn value(&mut self, field: &str) -> JsonValue {
1348 self.params.fields = object! {};
1349 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1350 self.params.page = 1;
1351 self.params.limit = 1;
1352 let sql = self.params.select_sql();
1353 if self.params.sql {
1354 return JsonValue::from(sql.clone());
1355 }
1356 self.table_info(self.params.table.clone().as_str());
1357 let (state, mut data) = self.query(sql.clone());
1358 match state {
1359 true => {
1360 for (field, _) in self.params.json.entries() {
1361 if !data[0][field].is_empty() {
1362 let json = data[0][field].to_string();
1363 let json = json::parse(&json).unwrap_or(array![]);
1364 data[0][field] = json;
1365 } else {
1366 data[0][field] = array![];
1367 }
1368 }
1369 data[0][field].clone()
1370 }
1371 false => {
1372 if self.connection.debug {
1373 info!("{data:?}");
1374 }
1375 JsonValue::Null
1376 }
1377 }
1378 }
1379
1380 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1381 let mut fields = vec![];
1382 let mut values = vec![];
1383
1384 if !self.params.autoinc && data["id"].is_empty() {
1385 let thread_id = format!("{:?}", std::thread::current().id());
1386 let thread_num: u64 = thread_id
1387 .trim_start_matches("ThreadId(")
1388 .trim_end_matches(")")
1389 .parse()
1390 .unwrap_or(0);
1391 data["id"] = format!(
1392 "{:X}{:X}",
1393 Local::now().timestamp_nanos_opt().unwrap_or(0),
1394 thread_num
1395 )
1396 .into();
1397 }
1398 for (field, value) in data.entries() {
1399 fields.push(format!("`{field}`"));
1400
1401 if value.is_string() {
1402 if value.to_string().contains("'") {
1403 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1404 continue;
1405 } else if value.to_string().contains('"') {
1406 values.push(format!("'{value}'"));
1407 continue;
1408 } else {
1409 values.push(format!("\"{value}\""));
1410 continue;
1411 }
1412 } else if value.is_array() || value.is_object() {
1413 if self.params.json[field].is_empty() {
1414 values.push(format!("'{value}'"));
1415 } else {
1416 let json = value.to_string();
1417 let json = json.replace("'", "''");
1418 values.push(format!("'{json}'"));
1419 }
1420 continue;
1421 } else if value.is_number() || value.is_boolean() || value.is_null() {
1422 values.push(format!("{value}"));
1423 continue;
1424 } else {
1425 values.push(format!("'{value}'"));
1426 continue;
1427 }
1428 }
1429 let fields = fields.join(",");
1430 let values = values.join(",");
1431
1432 let sql = format!(
1433 "INSERT INTO `{}` ({}) VALUES ({});",
1434 self.params.table, fields, values
1435 );
1436 if self.params.sql {
1437 return JsonValue::from(sql.clone());
1438 }
1439 let (state, ids) = self.execute(sql);
1440 match state {
1441 true => {
1442 if self.params.autoinc {
1443 let (state, ids) =
1444 self.query(format!("select max(id) as id from {}", self.params.table));
1445 return match state {
1446 true => ids[0]["id"].clone(),
1447 false => {
1448 error!("{ids}");
1449 JsonValue::from("")
1450 }
1451 };
1452 }
1453 data["id"].clone()
1454 }
1455 false => {
1456 error!("{ids}");
1457 JsonValue::from("")
1458 }
1459 }
1460 }
1461 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1462 let mut fields = String::new();
1463
1464 if !self.params.autoinc && data[0]["id"].is_empty() {
1465 data[0]["id"] = "".into();
1466 }
1467 for (field, _) in data[0].entries() {
1468 fields = format!("{fields},`{field}`");
1469 }
1470 fields = fields.trim_start_matches(",").to_string();
1471
1472 let core_count = num_cpus::get();
1473 let mut p = pools::Pool::new(core_count * 4);
1474 let autoinc = self.params.autoinc;
1475 for list in data.members() {
1476 let mut item = list.clone();
1477 p.execute(move |pcindex| {
1478 if !autoinc && item["id"].is_empty() {
1479 let id = format!(
1480 "{:X}{:X}",
1481 Local::now().timestamp_nanos_opt().unwrap_or(0),
1482 pcindex
1483 );
1484 item["id"] = id.into();
1485 }
1486 let mut values = "".to_string();
1487 for (_, value) in item.entries() {
1488 if value.is_string() {
1489 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1490 } else if value.is_number() || value.is_boolean() {
1491 values = format!("{values},{value}");
1492 } else {
1493 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1494 }
1495 }
1496 values = format!("({})", values.trim_start_matches(","));
1497 array![item["id"].clone(), values]
1498 });
1499 }
1500 let (ids_list, mut values) = p.insert_all();
1501
1502 values = values.trim_start_matches(",").to_string();
1503
1504 let sql = format!(
1505 "INSERT INTO {} ({}) VALUES {};",
1506 self.params.table, fields, values
1507 );
1508 if self.params.sql {
1509 return JsonValue::from(sql.clone());
1510 }
1511 let (state, data) = self.execute(sql.clone());
1512 match state {
1513 true => {
1514 if self.params.autoinc {
1515 let (state, ids) = self.query(format!(
1516 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1517 self.params.table,
1518 ids_list.len()
1519 ));
1520 return match state {
1521 true => {
1522 let mut idlist = array![];
1523 for item in ids.members() {
1524 let _ = idlist.push(item["id"].clone());
1525 }
1526 idlist
1527 }
1528 false => {
1529 error!("批量添加失败: {ids:?} {sql}");
1530 array![]
1531 }
1532 };
1533 }
1534 JsonValue::from(ids_list)
1535 }
1536 false => {
1537 error!("批量添加失败: {data:?} {sql}");
1538 array![]
1539 }
1540 }
1541 }
1542
1543 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1544 let mut fields = vec![];
1545 let mut values = vec![];
1546
1547 if !self.params.autoinc && data["id"].is_empty() {
1548 let thread_id = format!("{:?}", std::thread::current().id());
1549 let thread_num: u64 = thread_id
1550 .trim_start_matches("ThreadId(")
1551 .trim_end_matches(")")
1552 .parse()
1553 .unwrap_or(0);
1554 data["id"] = format!(
1555 "{:X}{:X}",
1556 Local::now().timestamp_nanos_opt().unwrap_or(0),
1557 thread_num
1558 )
1559 .into();
1560 }
1561 for (field, value) in data.entries() {
1562 fields.push(format!("`{field}`"));
1563
1564 if value.is_string() {
1565 if value.to_string().contains("'") {
1566 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1567 continue;
1568 } else if value.to_string().contains('"') {
1569 values.push(format!("'{value}'"));
1570 continue;
1571 } else {
1572 values.push(format!("\"{value}\""));
1573 continue;
1574 }
1575 } else if value.is_array() || value.is_object() {
1576 if self.params.json[field].is_empty() {
1577 values.push(format!("'{value}'"));
1578 } else {
1579 let json = value.to_string();
1580 let json = json.replace("'", "''");
1581 values.push(format!("'{json}'"));
1582 }
1583 continue;
1584 } else if value.is_number() || value.is_boolean() || value.is_null() {
1585 values.push(format!("{value}"));
1586 continue;
1587 } else {
1588 values.push(format!("'{value}'"));
1589 continue;
1590 }
1591 }
1592
1593 let conflict_cols: Vec<String> =
1594 conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1595
1596 let update_set: Vec<String> = fields
1597 .iter()
1598 .filter(|f| {
1599 let name = f.trim_matches('`');
1600 !conflict_fields.contains(&name) && name != "id"
1601 })
1602 .map(|f| format!("{f}=excluded.{f}"))
1603 .collect();
1604
1605 let fields_str = fields.join(",");
1606 let values_str = values.join(",");
1607
1608 let sql = format!(
1609 "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1610 self.params.table,
1611 fields_str,
1612 values_str,
1613 conflict_cols.join(","),
1614 update_set.join(",")
1615 );
1616 if self.params.sql {
1617 return JsonValue::from(sql.clone());
1618 }
1619 let (state, result) = self.execute(sql);
1620 match state {
1621 true => {
1622 if self.params.autoinc {
1623 let (state, ids) =
1624 self.query(format!("select max(id) as id from {}", self.params.table));
1625 return match state {
1626 true => ids[0]["id"].clone(),
1627 false => {
1628 error!("{ids}");
1629 JsonValue::from("")
1630 }
1631 };
1632 }
1633 data["id"].clone()
1634 }
1635 false => {
1636 error!("upsert失败: {result}");
1637 JsonValue::from("")
1638 }
1639 }
1640 }
1641
1642 fn update(&mut self, data: JsonValue) -> JsonValue {
1643 let mut values = vec![];
1644
1645 for (field, value) in data.entries() {
1646 if value.is_string() {
1647 values.push(format!(
1648 "`{}` = '{}'",
1649 field,
1650 value.to_string().replace("'", "''")
1651 ));
1652 } else if value.is_array() || value.is_object() {
1653 if self.params.json[field].is_empty() {
1654 values.push(format!("`{field}` = '{value}'"));
1655 } else {
1656 let json = value.to_string();
1657 let json = json.replace("'", "''");
1658 values.push(format!("`{field}` = '{json}'"));
1659 }
1660 continue;
1661 } else if value.is_number() || value.is_boolean() || value.is_null() {
1662 values.push(format!("`{field}` = {value} "));
1663 } else {
1664 values.push(format!("`{field}` = '{value}' "));
1665 }
1666 }
1667
1668 for (field, value) in self.params.inc_dec.entries() {
1669 values.push(format!("{} = {}", field, value.to_string().clone()));
1670 }
1671
1672 let values = values.join(",");
1673 let sql = format!(
1674 "UPDATE `{}` SET {} {} {};",
1675 self.params.table.clone(),
1676 values,
1677 self.params.where_sql(),
1678 self.params.page_limit_sql()
1679 );
1680 if self.params.sql {
1681 return JsonValue::from(sql.clone());
1682 }
1683 let (state, data) = self.execute(sql);
1684 if state {
1685 data
1686 } else {
1687 error!("{data}");
1688 JsonValue::from(0)
1689 }
1690 }
1691
1692 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1693 let mut values = vec![];
1694 let mut ids = vec![];
1695 for (field, _) in data[0].entries() {
1696 if field == "id" {
1697 continue;
1698 }
1699 let mut fields = vec![];
1700 for row in data.members() {
1701 let value = row[field].clone();
1702 let id = row["id"].clone();
1703 ids.push(id.clone());
1704 if value.is_string() {
1705 fields.push(format!(
1706 "WHEN '{}' THEN '{}'",
1707 id,
1708 value.to_string().replace("'", "''")
1709 ));
1710 } else if value.is_array() || value.is_object() {
1711 if self.params.json[field].is_empty() {
1712 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1713 } else {
1714 let json = value.to_string();
1715 let json = json.replace("'", "''");
1716 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1717 }
1718 continue;
1719 } else if value.is_number() || value.is_boolean() || value.is_null() {
1720 fields.push(format!("WHEN '{id}' THEN {value}"));
1721 } else {
1722 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1723 }
1724 }
1725 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1726 }
1727
1728 self.where_and("id", "in", ids.into());
1729 for (field, value) in self.params.inc_dec.entries() {
1730 values.push(format!("{} = {}", field, value.to_string().clone()));
1731 }
1732
1733 let values = values.join(",");
1734 let sql = format!(
1735 "UPDATE {} SET {} {} {};",
1736 self.params.table.clone(),
1737 values,
1738 self.params.where_sql(),
1739 self.params.page_limit_sql()
1740 );
1741 if self.params.sql {
1742 return JsonValue::from(sql.clone());
1743 }
1744 let (state, data) = self.execute(sql);
1745 if state {
1746 data
1747 } else {
1748 error!("{data:?}");
1749 JsonValue::from(0)
1750 }
1751 }
1752 fn delete(&mut self) -> JsonValue {
1753 let sql = format!(
1754 "delete FROM `{}` {};",
1755 self.params.table.clone(),
1756 self.params.where_sql()
1757 );
1758 if self.params.sql {
1759 return JsonValue::from(sql.clone());
1760 }
1761 let (state, data) = self.execute(sql);
1762 match state {
1763 true => data,
1764 false => {
1765 error!("delete 失败>>>{data:?}");
1766 JsonValue::from(0)
1767 }
1768 }
1769 }
1770
1771 fn transaction(&mut self) -> bool {
1772 let thread_id = format!("{:?}", thread::current().id());
1773
1774 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1775 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1776 SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1777 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1778 let _ = self.query(sp);
1779 return true;
1780 }
1781
1782 if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1783 error!("{thread_id} 启动事务失败: 获取写锁超时");
1784 return false;
1785 }
1786
1787 let flags = OpenFlags::new().with_read_write().with_no_mutex();
1788 let db = match Connect::open_thread_safe_with_flags(
1789 self.connection.clone().get_dsn().as_str(),
1790 flags,
1791 ) {
1792 Ok(e) => Arc::new(e),
1793 Err(e) => {
1794 error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1795 SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1796 return false;
1797 }
1798 };
1799
1800 SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1801
1802 let (state, data) = self.query("BEGIN".to_string());
1803 if state {
1804 true
1805 } else {
1806 error!("{thread_id} 启动事务失败: {data}");
1807 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1808 false
1809 }
1810 }
1811
1812 fn commit(&mut self) -> bool {
1813 let thread_id = format!("{:?}", thread::current().id());
1814
1815 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1816 error!("{thread_id} 提交事务失败: 没有活跃的事务");
1817 return false;
1818 }
1819
1820 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1821 if depth > 1 {
1822 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1823 let _ = self.query(sp);
1824 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1825 return true;
1826 }
1827
1828 let (state, _) = self.query("COMMIT".to_string());
1829 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1830
1831 if state {
1832 true
1833 } else {
1834 error!("{thread_id} 提交事务失败");
1835 false
1836 }
1837 }
1838
1839 fn rollback(&mut self) -> bool {
1840 let thread_id = format!("{:?}", thread::current().id());
1841
1842 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1843 error!("{thread_id} 回滚失败: 没有活跃的事务");
1844 return false;
1845 }
1846
1847 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1848 if depth > 1 {
1849 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1850 let _ = self.query(sp);
1851 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1852 return true;
1853 }
1854
1855 let (state, _) = self.query("ROLLBACK".to_string());
1856 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1857
1858 if state {
1859 true
1860 } else {
1861 error!("回滚失败: {thread_id}");
1862 false
1863 }
1864 }
1865
1866 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1867 let (state, data) = self.query(sql.to_string());
1868 match state {
1869 true => Ok(data),
1870 false => Err(data.to_string()),
1871 }
1872 }
1873
1874 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1875 let (state, data) = self.execute(sql.to_string());
1876 match state {
1877 true => Ok(data),
1878 false => Err(data.to_string()),
1879 }
1880 }
1881
1882 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1883 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1884 self
1885 }
1886
1887 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1888 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1889 self
1890 }
1891
1892 fn buildsql(&mut self) -> String {
1893 self.fetch_sql();
1894 let sql = self.select().to_string();
1895 format!("( {} ) `{}`", sql, self.params.table)
1896 }
1897
1898 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1899 for field in fields {
1900 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1901 }
1902 self
1903 }
1904
1905 fn join(
1906 &mut self,
1907 main_table: &str,
1908 main_fields: &str,
1909 right_table: &str,
1910 right_fields: &str,
1911 ) -> &mut Self {
1912 let main_table = if main_table.is_empty() {
1913 self.params.table.clone()
1914 } else {
1915 main_table.to_string()
1916 };
1917 self.params.join_table = right_table.to_string();
1918 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1919 self
1920 }
1921
1922 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1923 let main_fields = if main_fields.is_empty() {
1924 "id"
1925 } else {
1926 main_fields
1927 };
1928 let second_fields = if second_fields.is_empty() {
1929 self.params.table.clone()
1930 } else {
1931 second_fields.to_string().clone()
1932 };
1933 let sec_table_name = format!("{}{}", table, "_2");
1934 let second_table = format!("{} {}", table, sec_table_name.clone());
1935 self.params.join_table = sec_table_name.clone();
1936 self.params.join.push(format!(
1937 " INNER JOIN {} ON {}.{} = {}.{}",
1938 second_table, self.params.table, main_fields, sec_table_name, second_fields
1939 ));
1940 self
1941 }
1942
1943 fn join_right(
1944 &mut self,
1945 main_table: &str,
1946 main_fields: &str,
1947 right_table: &str,
1948 right_fields: &str,
1949 ) -> &mut Self {
1950 let main_table = if main_table.is_empty() {
1951 self.params.table.clone()
1952 } else {
1953 main_table.to_string()
1954 };
1955 self.params.join_table = right_table.to_string();
1956 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1957 self
1958 }
1959
1960 fn join_full(
1961 &mut self,
1962 main_table: &str,
1963 main_fields: &str,
1964 right_table: &str,
1965 right_fields: &str,
1966 ) -> &mut Self {
1967 let main_table = if main_table.is_empty() {
1968 self.params.table.clone()
1969 } else {
1970 main_table.to_string()
1971 };
1972 self.params.join_table = right_table.to_string();
1973 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1974 self
1975 }
1976
1977 fn union(&mut self, sub_sql: &str) -> &mut Self {
1978 self.params.unions.push(format!("UNION {sub_sql}"));
1979 self
1980 }
1981
1982 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1983 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1984 self
1985 }
1986
1987 fn lock_for_update(&mut self) -> &mut Self {
1988 self.params.lock_mode = "FOR UPDATE".to_string();
1989 self
1990 }
1991
1992 fn lock_for_share(&mut self) -> &mut Self {
1993 self.params.lock_mode = "FOR SHARE".to_string();
1994 self
1995 }
1996}