1use crate::pools;
2use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::Connection;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
10use std::collections::HashMap;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15
16lazy_static! {
17 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
18 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
19 static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
20}
21
22#[derive(Clone, Debug)]
23pub struct Sqlite {
24 pub connection: Connection,
26 pub default: String,
28
29 pub params: Params,
30}
31
32impl Sqlite {
33 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
34 let dsn = connection.clone().get_dsn();
35 let db_path = Path::new(&dsn);
36 if let Some(parent) = db_path.parent() {
37 if !parent.as_os_str().is_empty() && !parent.exists() {
38 if let Err(e) = std::fs::create_dir_all(parent) {
39 error!("sqlite 创建目录失败: {} {:?}", e, parent);
40 return Err(e.to_string());
41 }
42 info!("sqlite 自动创建目录: {:?}", parent);
43 }
44 }
45
46 let flags = OpenFlags::new().with_create().with_read_write();
47 match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
48 Ok(e) => {
49 if let Ok(mut guard) = DBS.lock() {
50 guard.insert(default.clone(), Arc::new(e));
51 } else {
52 error!("sqlite 获取数据库锁失败");
53 return Err("获取数据库锁失败".to_string());
54 }
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!("sqlite 启动失败: {} {}", e, dsn.as_str());
63 Err(e.to_string())
64 }
65 }
66 }
67 fn query_handle_static(
69 mut statement: Statement,
70 sql: &str,
71 table: &str,
72 thread_id: &str,
73 ) -> (bool, JsonValue) {
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
79 if in_transaction {
80 error!("{} 查询事务: {} {}", thread_id, e, sql);
81 } else {
82 error!("{} 非事务查询: {} {}", thread_id, e, sql);
83 }
84 return (false, data);
85 }
86 } {
87 let mut list = object! {};
88 let mut index = 0;
89 for field in statement.column_names().iter() {
90 if !list[field.as_str()].is_null() {
91 index += 1;
92 continue;
93 }
94 match statement.column_type(field.as_str()) {
95 Ok(types) => match types {
96 Type::String => {
97 if let Ok(data) = statement.read::<String, _>(index) {
98 match data.as_str() {
99 "false" => {
100 list[field.as_str()] = JsonValue::from(false);
101 }
102 "true" => {
103 list[field.as_str()] = JsonValue::from(true);
104 }
105 _ => {
106 list[field.as_str()] = JsonValue::from(data.clone());
107 }
108 }
109 }
110 }
111 Type::Integer => {
112 let fields_cache =
113 FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
114 if let Some(fields) = fields_cache {
115 if fields[field.clone()].is_empty() {
116 if let Ok(data) = statement.read::<i64, _>(index) {
117 list[field.as_str()] = data.into();
118 }
119 index += 1;
120 continue;
121 }
122 let field_type =
123 fields[field.clone()]["type"].as_str().unwrap_or("");
124 match field_type {
125 "INTEGER" => {
126 if let Ok(data) = statement.read::<i64, _>(index) {
127 list[field.as_str()] = JsonValue::from(data == 1);
128 }
129 }
130 x if x.contains("int(") => {
131 if let Ok(data) = statement.read::<i64, _>(index) {
132 list[field.as_str()] = data.into();
133 }
134 }
135 x if x.contains("decimal(") && x.ends_with(",0)") => {
136 if let Ok(data) = statement.read::<f64, _>(index) {
137 list[field.as_str()] = data.into();
138 }
139 }
140 _ => {
141 if let Ok(data) = statement.read::<i64, _>(index) {
142 list[field.as_str()] = data.into();
143 }
144 }
145 }
146 } else if let Ok(data) = statement.read::<i64, _>(index) {
147 list[field.as_str()] = JsonValue::from(data);
148 }
149 }
150 Type::Float => {
151 if let Ok(data) = statement.read::<f64, _>(index) {
152 list[field.as_str()] = JsonValue::from(data);
153 }
154 }
155 Type::Binary => {
156 if let Ok(data) = statement.read::<String, _>(index) {
157 list[field.as_str()] = JsonValue::from(data.clone());
158 }
159 }
160 Type::Null => match statement.read::<String, _>(index) {
161 Ok(data) => {
162 list[field.as_str()] = JsonValue::from(data.clone());
163 }
164 Err(_) => match statement.read::<f64, _>(index) {
165 Ok(data) => {
166 if data == 0.0 {
167 list[field.as_str()] = JsonValue::from("");
168 } else {
169 list[field.as_str()] = JsonValue::from(data);
170 }
171 }
172 Err(_) => match statement.read::<i64, _>(index) {
173 Ok(data) => {
174 if data == 0 {
175 list[field.as_str()] = JsonValue::from("");
176 } else {
177 list[field.as_str()] = JsonValue::from(data);
178 }
179 }
180 Err(e) => {
181 error!("Type:{} {:?}", field.as_str(), e);
182 }
183 },
184 },
185 },
186 },
187 Err(e) => {
188 error!("query Err: {e:?}");
189 }
190 }
191 index += 1;
192 }
193 let _ = data.push(list);
194 }
195 (true, data)
196 }
197
198 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
199 let thread_id = format!("{:?}", thread::current().id());
200 let table = self.params.table.clone();
201
202 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
203 if self.connection.debug {
204 info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
205 }
206
207 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
208 match db.prepare(sql.clone()) {
209 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
210 Err(e) => {
211 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
212 (false, e.to_string().into())
213 }
214 }
215 });
216
217 match result {
218 Some(r) => r,
219 None => {
220 error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
221 (false, JsonValue::from("未找到事务连接"))
222 }
223 }
224 } else {
225 if self.connection.debug {
226 info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
227 }
228 let dbs = match DBS.lock() {
229 Ok(dbs) => dbs,
230 Err(e) => {
231 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
232 return (false, JsonValue::from("数据库锁定失败"));
233 }
234 };
235 let db = match dbs.get(&self.default) {
236 Some(db) => db.clone(),
237 None => {
238 error!(
239 "{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
240 self.default
241 );
242 return (false, JsonValue::from("未找到默认数据库配置"));
243 }
244 };
245 drop(dbs);
246 let result = match db.prepare(sql.clone()) {
247 Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
248 Err(e) => {
249 error!("{thread_id} 查询非事务: Err: {e}");
250 (false, e.to_string().into())
251 }
252 };
253 result
254 }
255 }
256
257 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
258 let thread_id = format!("{:?}", thread::current().id());
259
260 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
261 if self.connection.debug {
262 info!("{} 执行事务: sql: {}", thread_id, sql.clone());
263 }
264
265 let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
266 match db.execute(sql.clone()) {
267 Ok(_) => {
268 let count = db.change_count();
269 (true, JsonValue::from(count))
270 }
271 Err(e) => (false, JsonValue::from(e.to_string())),
272 }
273 });
274
275 match result {
276 Some((true, count)) => {
277 if self.connection.debug {
278 info!("{} count:{}", thread_id, count);
279 }
280 (true, count)
281 }
282 Some((false, err)) => {
283 error!(
284 "{} 执行事务: \r\nErr: {}\r\n{}",
285 thread_id,
286 err,
287 sql.clone()
288 );
289 (false, err)
290 }
291 None => {
292 error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
293 (false, JsonValue::from("未找到事务连接"))
294 }
295 }
296 } else {
297 if self.connection.debug {
298 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
299 }
300 let dbs = match DBS.lock() {
301 Ok(dbs) => dbs,
302 Err(e) => {
303 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
304 return (false, JsonValue::from("数据库锁定失败"));
305 }
306 };
307
308 let db = match dbs.get(&self.default) {
309 Some(db) => db.clone(),
310 None => {
311 error!(
312 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
313 thread_id, self.default, sql
314 );
315 return (false, JsonValue::from("未找到默认数据库配置"));
316 }
317 };
318 drop(dbs);
319 match db.execute(sql.clone()) {
320 Ok(_) => {
321 let count = db.change_count();
322 if self.connection.debug {
323 info!(
324 "{} count: {} total_count: {}",
325 thread_id,
326 count,
327 db.total_change_count()
328 );
329 }
330 (true, JsonValue::from(count))
331 }
332 Err(e) => {
333 error!(
334 "{} 执行非事务: Err: {}\r\nSQL: {}",
335 thread_id,
336 e,
337 sql.clone()
338 );
339 (false, JsonValue::from(e.to_string()))
340 }
341 }
342 }
343 }
344}
345
346impl DbMode for Sqlite {
347 fn database_tables(&mut self) -> JsonValue {
348 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
349 match self.sql(sql.as_str()) {
350 Ok(e) => {
351 let mut list = vec![];
352 for item in e.members() {
353 list.push(item["name"].clone());
354 }
355 list.into()
356 }
357 Err(_) => {
358 array![]
359 }
360 }
361 }
362 fn database_create(&mut self, name: &str) -> bool {
363 let current_dsn = self.connection.clone().get_dsn();
364 let current_path = Path::new(¤t_dsn);
365
366 let new_path = if let Some(parent) = current_path.parent() {
367 if parent.as_os_str().is_empty() {
368 Path::new(name).to_path_buf()
369 } else {
370 parent.join(name)
371 }
372 } else {
373 Path::new(name).to_path_buf()
374 };
375
376 if let Some(parent) = new_path.parent() {
377 if !parent.as_os_str().is_empty() && !parent.exists() {
378 if let Err(e) = std::fs::create_dir_all(parent) {
379 error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
380 return false;
381 }
382 }
383 }
384
385 let flags = OpenFlags::new().with_create().with_read_write();
386 match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
387 Ok(_) => true,
388 Err(e) => {
389 error!(
390 "sqlite database_create 创建数据库失败: {} {:?}",
391 e, new_path
392 );
393 false
394 }
395 }
396 }
397
398 fn truncate(&mut self, table: &str) -> bool {
399 let sql = format!("DELETE FROM `{table}`");
400 let (state, _) = self.execute(sql);
401 state
402 }
403}
404
405impl Mode for Sqlite {
406 fn table_create(&mut self, options: TableOptions) -> JsonValue {
407 let mut sql = String::new();
408 let mut unique_fields = String::new();
409 let mut unique_name = String::new();
410 let mut unique = String::new();
411 for item in options.table_unique.iter() {
412 if unique_fields.is_empty() {
413 unique_fields = format!("`{item}`");
414 unique_name = format!("unique_{item}");
415 } else {
416 unique_fields = format!("{unique_fields},`{item}`");
417 unique_name = format!("{unique_name}_{item}");
418 }
419 unique = format!(
420 "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
421 unique_name, options.table_name, unique_fields
422 );
423 }
424
425 let mut index = vec![];
426 for row in options.table_index.iter() {
427 let mut index_fields = String::new();
428 let mut index_name = String::new();
429 for item in row.iter() {
430 if index_fields.is_empty() {
431 index_fields = format!("`{item}`");
432 index_name = format!("index_{item}");
433 } else {
434 index_fields = format!("{index_fields},`{item}`");
435 index_name = format!("{index_name}_{item}");
436 }
437 }
438 index.push(format!(
439 "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
440 index_name, options.table_name, index_fields
441 ));
442 }
443
444 for (name, field) in options.table_fields.entries() {
445 let row = br_fields::field("sqlite", name, field.clone());
446 sql = format!("{sql} {row},\r\n");
447 }
448
449 sql = sql.trim_end_matches(",\r\n").to_string();
450
451 let sql = format!(
452 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
453 options.table_name, sql
454 );
455 if self.params.sql {
456 let mut list = vec![sql];
457 if !unique.is_empty() {
458 list.push(unique)
459 }
460 if !index.is_empty() {
461 list.extend(index)
462 }
463
464 return JsonValue::from(list.join(""));
465 }
466
467 let thread_id = format!("{:?}", thread::current().id());
468
469 let (_, table_exists) = self.query(format!(
470 "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
471 options.table_name
472 ));
473 let is_new_table = table_exists.is_empty();
474
475 let (state, _) = self.execute(sql.clone());
476 if state {
477 if is_new_table {
478 if !unique.is_empty() {
479 let (state, _) = self.execute(unique.clone());
480 info!(
481 "{} {} 唯一索引创建:{}",
482 thread_id, options.table_name, state
483 );
484 }
485 for sql in index.iter() {
486 let (state, _) = self.execute(sql.clone());
487 info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
488 }
489 }
490 JsonValue::from(true)
491 } else {
492 JsonValue::from(false)
493 }
494 }
495 fn table_update(&mut self, options: TableOptions) -> JsonValue {
496 let thread_id = format!("{:?}", thread::current().id());
497
498 let mut sql = String::new();
499 let mut add = vec![];
500 let mut del = vec![];
501 let mut put = vec![];
502
503 let (_, mut fields_list) =
504 self.query(format!("pragma table_info ('{}')", options.table_name));
505 let mut field_old = object! {};
506 for item in fields_list.members_mut() {
507 item["dflt_value"] = item["dflt_value"]
508 .to_string()
509 .trim_start_matches("'")
510 .trim_end_matches("'")
511 .into();
512 if let Some(name) = item["name"].as_str() {
513 field_old[name] = item.clone();
514 if options.table_fields[name].is_empty() {
515 del.push(name.to_string());
516 }
517 }
518 }
519
520 let mut fields_list = vec![];
521 let mut fields_list_new = vec![];
522
523 for (name, field) in options.table_fields.entries() {
524 if field_old[name].is_empty() {
525 add.push(name.to_string());
526 } else {
527 fields_list.push(name.to_string());
528 fields_list_new.push(name.to_string());
529 let field_mode = field["mode"].as_str().unwrap_or("");
530 let old_value = match field_mode {
531 "select" => {
532 if field_old[name]["dflt_value"].clone().is_empty() {
533 "".to_string()
534 } else {
535 field_old[name]["dflt_value"].clone().to_string()
536 }
537 }
538 "switch" => (field_old[name]["dflt_value"]
539 .to_string()
540 .parse::<i32>()
541 .unwrap_or(0)
542 == 1)
543 .to_string(),
544 _ => field_old[name]["dflt_value"].clone().to_string(),
545 };
546 let new_value = match field_mode {
547 "select" => field["def"]
548 .members()
549 .map(|x| x.to_string())
550 .collect::<Vec<String>>()
551 .join(","),
552 _ => field["def"].clone().to_string(),
553 };
554 if old_value != new_value {
555 info!(
556 "{} 差异化当前: {} old_value: {} new_value: {} {}",
557 options.table_name,
558 name,
559 old_value,
560 new_value,
561 old_value != new_value
562 );
563 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
564 put.push(name.to_string());
565 } else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
566 && name != options.table_key
567 {
568 info!("{} 主键替换: {}", options.table_name, name);
569 put.push(name.to_string());
570 }
571 }
572 }
573
574 let mut unique_fields = String::new();
575 let mut unique_name = String::new();
576 let mut unique = String::new();
577
578 for item in options.table_unique.iter() {
580 if unique_fields.is_empty() {
581 unique_fields = format!("`{item}`");
582 unique_name = format!("unique_{item}");
583 } else {
584 unique_fields = format!("{unique_fields},`{item}`");
585 unique_name = format!("{unique_name}_{item}");
586 }
587 unique = format!(
588 "CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
589 options.table_name, unique_name, options.table_name, unique_fields
590 )
591 }
592
593 let mut index = vec![];
595 for row in options.table_index.iter() {
596 let mut index_fields = String::new();
597 let mut index_name = String::new();
598 for item in row.iter() {
599 if index_fields.is_empty() {
600 index_fields = item.to_string();
601 index_name = format!("index_{item}");
602 } else {
603 index_fields = format!("{index_fields},{item}");
604 index_name = format!("{index_name}_{item}");
605 }
606 }
607 index.push(format!(
608 "CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
609 options.table_name, index_name, options.table_name, index_fields
610 ));
611 }
612 for (name, field) in options.table_fields.entries() {
613 let row = br_fields::field("sqlite", name, field.clone());
614 sql = format!("{sql} {row},\r\n");
615 }
616
617 if !unique.is_empty() || !index.is_empty() {
618 let unique_text = unique.clone();
619 let (_, unique_old) =
620 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
621 let mut index_old_list = vec![];
622 let mut index_new_list = vec![];
623 for item in unique_old.members() {
624 let origin = item["origin"].as_str().unwrap_or("");
625 let unique_1 = item["unique"].as_usize().unwrap_or(0);
626 let name = item["name"].as_str().unwrap_or("");
627
628 if origin == "c" && unique_1 == 1 {
629 if unique.contains(format!(" {name} ").as_str()) {
630 unique = "".to_string();
631 }
632 continue;
633 }
634 if origin == "c" && unique_1 == 0 {
635 index_old_list.push(item);
636 for item in index.iter() {
637 if item.contains(format!(" {name} ").as_str()) {
638 index_new_list.push(item.clone());
639 }
640 }
641 continue;
642 }
643 }
644 if unique.is_empty() {
645 if index_old_list.len() == index.len()
646 && index_old_list.len() == index_new_list.len()
647 {
648 index = vec![];
649 } else {
650 unique = unique_text;
651 }
652 }
653 }
654
655 sql = sql.trim_end_matches(",\r\n").to_string();
656 sql = format!(
657 "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
658 options.table_name, sql
659 );
660
661 let sqls = format!(
662 "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
663 options.table_name,
664 fields_list_new.join("`,`"),
665 fields_list.join("`,`")
666 );
667 let drop_sql = format!("drop table {};\r\n", options.table_name);
668 let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
669 let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
670
671 if self.params.sql {
672 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
673 if !unique.is_empty() {
674 list.push(unique)
675 }
676 if !index.is_empty() {
677 list.extend(index)
678 }
679 return JsonValue::from(list.join(""));
680 }
681
682 if add.is_empty()
683 && del.is_empty()
684 && unique.is_empty()
685 && index.is_empty()
686 && put.is_empty()
687 {
688 return JsonValue::from(-1);
689 }
690
691 let (state, _) = self.execute(sql.clone());
692 let data = match state {
693 true => {
694 let (state, _) = self.execute(sqls.clone());
695 match state {
696 true => {
697 let (state, _) = self.execute(drop_sql);
698 match state {
699 true => {
700 let (state, _) = self.execute(alter_sql);
701 match state {
702 true => {
703 if !unique.is_empty() {
704 let (state, _) = self.execute(unique.clone());
705 info!(
706 "{} {} 唯一索引创建:{}",
707 thread_id, options.table_name, state
708 );
709 }
710 for index_sql in index.iter() {
711 let (state, _) = self.execute(index_sql.clone());
712 match state {
713 true => {}
714 false => {
715 error!(
716 "{} 索引创建失败: {} {}",
717 options.table_name, state, index_sql
718 );
719 return JsonValue::from(0);
720 }
721 };
722 }
723
724 return JsonValue::from(1);
725 }
726 false => {
727 error!("{} 修改表名失败", options.table_name);
728 return JsonValue::from(0);
729 }
730 }
731 }
732 false => {
733 error!("{} 删除本表失败", options.table_name);
734 return JsonValue::from(0);
735 }
736 }
737 }
738 false => {
739 error!(
740 "{} 添加tmp表记录失败 {:#} {:#}",
741 options.table_name, sql, sqls
742 );
743 let sql = format!("drop table {}_tmp", options.table_name);
744 let (_, _) = self.execute(sql);
745 0
746 }
747 }
748 }
749 false => {
750 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
751 let (_, _) = self.execute(drop_sql_temp);
752 0
753 }
754 };
755 JsonValue::from(data)
756 }
757
758 fn table_info(&mut self, table: &str) -> JsonValue {
759 if let Ok(guard) = FIELDS.lock() {
760 if let Some(cached) = guard.get(table) {
761 return cached.clone();
762 }
763 }
764 let sql = format!("PRAGMA table_info({table})");
765 let (state, data) = self.query(sql);
766
767 match state {
768 true => {
769 let mut fields = object! {};
770 for item in data.members() {
771 if let Some(name) = item["name"].as_str() {
772 fields[name] = item.clone();
773 }
774 }
775 if let Ok(mut guard) = FIELDS.lock() {
776 guard.insert(table.to_string(), fields.clone());
777 }
778 data
779 }
780 false => object! {},
781 }
782 }
783
784 fn table_is_exist(&mut self, name: &str) -> bool {
785 let sql = format!(
786 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
787 );
788 let (state, data) = self.query(sql);
789 if state && !data.is_empty() {
790 let count = data[0]["count"].as_i64().unwrap_or(0);
791 return count > 0;
792 }
793 false
794 }
795
796 fn table(&mut self, name: &str) -> &mut Sqlite {
797 self.params = Params::default(self.connection.mode.str().as_str());
798 let table_name = format!("{}{}", self.connection.prefix, name);
799 if !super::sql_safety::validate_table_name(&table_name) {
800 error!("Invalid table name: {}", name);
801 }
802 self.params.table = table_name.clone();
803 self.params.join_table = table_name;
804 self
805 }
806 fn change_table(&mut self, name: &str) -> &mut Self {
807 self.params.join_table = name.to_string();
808 self
809 }
810 fn autoinc(&mut self) -> &mut Self {
811 self.params.autoinc = true;
812 self
813 }
814
815 fn timestamps(&mut self) -> &mut Self {
816 self.params.timestamps = true;
817 self
818 }
819
820 fn fetch_sql(&mut self) -> &mut Self {
821 self.params.sql = true;
822 self
823 }
824
825 fn order(&mut self, field: &str, by: bool) -> &mut Self {
826 self.params.order[field] = {
827 if by {
828 "DESC"
829 } else {
830 "ASC"
831 }
832 }
833 .into();
834 self
835 }
836
837 fn group(&mut self, field: &str) -> &mut Self {
838 let fields: Vec<&str> = field.split(",").collect();
839 for field in fields.iter() {
840 let fields = field.to_string();
841 self.params.group[fields.as_str()] = fields.clone().into();
842 self.params.fields[fields.as_str()] = fields.clone().into();
843 }
844 self
845 }
846
847 fn distinct(&mut self) -> &mut Self {
848 self.params.distinct = true;
849 self
850 }
851 fn json(&mut self, field: &str) -> &mut Self {
852 let list: Vec<&str> = field.split(",").collect();
853 for item in list.iter() {
854 self.params.json[item.to_string().as_str()] = item.to_string().into();
855 }
856 self
857 }
858
859 fn location(&mut self, field: &str) -> &mut Self {
860 let list: Vec<&str> = field.split(",").collect();
861 for item in list.iter() {
862 self.params.location[item.to_string().as_str()] = item.to_string().into();
863 }
864 self
865 }
866
867 fn field(&mut self, field: &str) -> &mut Self {
868 let list: Vec<&str> = field.split(",").collect();
869 let join_table = if self.params.join_table.is_empty() {
870 self.params.table.clone()
871 } else {
872 self.params.join_table.clone()
873 };
874 for item in list.iter() {
875 let item = item.to_string();
876 if item.contains(" as ") {
877 let text = item.split(" as ").collect::<Vec<&str>>().clone();
878 self.params.fields[item] =
879 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
880 } else {
881 self.params.fields[item] = format!("{join_table}.`{item}`").into();
882 }
883 }
884 self
885 }
886
887 fn field_raw(&mut self, expr: &str) -> &mut Self {
888 self.params.fields[expr] = expr.into();
889 self
890 }
891
892 fn hidden(&mut self, name: &str) -> &mut Self {
893 let hidden: Vec<&str> = name.split(",").collect();
894 let sql = format!("PRAGMA table_info({})", self.params.table);
895 let (_, data) = self.query(sql);
896 for item in data.members() {
897 if let Some(name) = item["name"].as_str() {
898 if !hidden.contains(&name) {
899 self.params.fields[name] = name.into();
900 }
901 }
902 }
903 self
904 }
905
906 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
907 for f in field.split('|') {
908 if !super::sql_safety::validate_field_name(f) {
909 error!("Invalid field name: {}", f);
910 }
911 }
912 if !super::sql_safety::validate_compare_orator(compare) {
913 error!("Invalid compare operator: {}", compare);
914 }
915 let join_table = if self.params.join_table.is_empty() {
916 self.params.table.clone()
917 } else {
918 self.params.join_table.clone()
919 };
920
921 if value.is_boolean() {
922 if value.as_bool().unwrap_or(false) {
923 value = 1.into();
924 } else {
925 value = 0.into();
926 }
927 }
928
929 match compare {
930 "between" => {
931 self.params.where_and.push(format!(
932 "{}.`{}` between '{}' AND '{}'",
933 join_table, field, value[0], value[1]
934 ));
935 }
936 "set" => {
937 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
938 let mut wheredata = vec![];
939 for item in list.iter() {
940 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
941 }
942 self.params
943 .where_and
944 .push(format!("({})", wheredata.join(" or ")));
945 }
946 "notin" => {
947 let mut text = String::new();
948 for item in value.members() {
949 text = format!("{text},'{item}'");
950 }
951 text = text.trim_start_matches(",").into();
952 self.params
953 .where_and
954 .push(format!("{join_table}.`{field}` not in ({text})"));
955 }
956 "in" => {
957 let mut text = String::new();
958 if value.is_array() {
959 for item in value.members() {
960 text = format!("{text},'{item}'");
961 }
962 } else {
963 let value = value.to_string();
964 let value: Vec<&str> = value.split(",").collect();
965 for item in value.iter() {
966 text = format!("{text},'{item}'");
967 }
968 }
969 text = text.trim_start_matches(",").into();
970
971 self.params
972 .where_and
973 .push(format!("{join_table}.`{field}` {compare} ({text})"));
974 }
975 "=" => {
976 if value.is_null() {
977 self.params
978 .where_and
979 .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
980 } else {
981 self.params
982 .where_and
983 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
984 }
985 }
986 "json_contains" => {
990 if value.is_array() {
991 if value.is_empty() {
992 self.params.where_and.push("1=0".to_string());
993 } else {
994 let mut parts = vec![];
995 for item in value.members() {
996 let escaped = super::sql_safety::escape_string(&item.to_string());
997 parts
998 .push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
999 }
1000 self.params
1001 .where_and
1002 .push(format!("({})", parts.join(" OR ")));
1003 }
1004 } else {
1005 let escaped = super::sql_safety::escape_string(&value.to_string());
1006 self.params
1007 .where_and
1008 .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1009 }
1010 }
1011 _ => {
1012 if value.is_null() {
1013 self.params
1014 .where_and
1015 .push(format!("{join_table}.`{field}` {compare} {value}"));
1016 } else {
1017 self.params
1018 .where_and
1019 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1020 }
1021 }
1022 }
1023 self
1024 }
1025 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1026 for f in field.split('|') {
1027 if !super::sql_safety::validate_field_name(f) {
1028 error!("Invalid field name: {}", f);
1029 }
1030 }
1031 if !super::sql_safety::validate_compare_orator(compare) {
1032 error!("Invalid compare operator: {}", compare);
1033 }
1034 let join_table = if self.params.join_table.is_empty() {
1035 self.params.table.clone()
1036 } else {
1037 self.params.join_table.clone()
1038 };
1039
1040 if value.is_boolean() {
1041 if value.as_bool().unwrap_or(false) {
1042 value = 1.into();
1043 } else {
1044 value = 0.into();
1045 }
1046 }
1047 match compare {
1048 "between" => {
1049 self.params.where_or.push(format!(
1050 "{}.`{}` between '{}' AND '{}'",
1051 join_table, field, value[0], value[1]
1052 ));
1053 }
1054 "set" => {
1055 let tt = value.to_string().replace(",", "%");
1056 self.params
1057 .where_or
1058 .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1059 }
1060 "notin" => {
1061 let mut text = String::new();
1062 for item in value.members() {
1063 text = format!("{text},'{item}'");
1064 }
1065 text = text.trim_start_matches(",").into();
1066 self.params
1067 .where_or
1068 .push(format!("{join_table}.`{field}` not in ({text})"));
1069 }
1070 "in" => {
1071 let mut text = String::new();
1072 if value.is_array() {
1073 for item in value.members() {
1074 text = format!("{text},'{item}'");
1075 }
1076 } else {
1077 let value = value.as_str().unwrap_or("");
1078 let value: Vec<&str> = value.split(",").collect();
1079 for item in value.iter() {
1080 text = format!("{text},'{item}'");
1081 }
1082 }
1083 text = text.trim_start_matches(",").into();
1084 self.params
1085 .where_or
1086 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1087 }
1088 "json_contains" => {
1092 if value.is_array() {
1093 if value.is_empty() {
1094 self.params.where_or.push("1=0".to_string());
1095 } else {
1096 let mut parts = vec![];
1097 for item in value.members() {
1098 let escaped = super::sql_safety::escape_string(&item.to_string());
1099 parts
1100 .push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
1101 }
1102 self.params
1103 .where_or
1104 .push(format!("({})", parts.join(" OR ")));
1105 }
1106 } else {
1107 let escaped = super::sql_safety::escape_string(&value.to_string());
1108 self.params
1109 .where_or
1110 .push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
1111 }
1112 }
1113 _ => {
1114 self.params
1115 .where_or
1116 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1117 }
1118 }
1119 self
1120 }
1121 fn where_raw(&mut self, expr: &str) -> &mut Self {
1122 self.params.where_and.push(expr.to_string());
1123 self
1124 }
1125
1126 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1127 self.params
1128 .where_and
1129 .push(format!("`{field}` IN ({sub_sql})"));
1130 self
1131 }
1132
1133 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1134 self.params
1135 .where_and
1136 .push(format!("`{field}` NOT IN ({sub_sql})"));
1137 self
1138 }
1139
1140 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1141 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1142 self
1143 }
1144
1145 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1146 self.params
1147 .where_and
1148 .push(format!("NOT EXISTS ({sub_sql})"));
1149 self
1150 }
1151
1152 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1153 self.params.where_column = format!(
1154 "{}.`{}` {} {}.`{}`",
1155 self.params.table, field_a, compare, self.params.table, field_b
1156 );
1157 self
1158 }
1159
1160 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1161 self.params
1162 .update_column
1163 .push(format!("{field_a} = {compare}"));
1164 self
1165 }
1166
1167 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1168 self.params.page = page;
1169 self.params.limit = limit;
1170 self
1171 }
1172
1173 fn limit(&mut self, count: i32) -> &mut Self {
1174 self.params.limit_only = count;
1175 self
1176 }
1177
1178 fn column(&mut self, field: &str) -> JsonValue {
1179 self.field(field);
1180 self.group(field);
1181 let sql = self.params.select_sql();
1182 if self.params.sql {
1183 return JsonValue::from(sql.clone());
1184 }
1185 self.table_info(self.params.table.clone().as_str());
1186 let (state, data) = self.query(sql);
1187 if state {
1188 let mut list = array![];
1189 for item in data.members() {
1190 if self.params.json[field].is_empty() {
1191 let _ = list.push(item[field].clone());
1192 } else {
1193 let data =
1194 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1195 let _ = list.push(data);
1196 }
1197 }
1198 list
1199 } else {
1200 array![]
1201 }
1202 }
1203
1204 fn count(&mut self) -> JsonValue {
1205 self.params.fields["count"] = "count(*) as count".to_string().into();
1206 let sql = self.params.select_sql();
1207 if self.params.sql {
1208 return JsonValue::from(sql.clone());
1209 }
1210 let (state, data) = self.query(sql);
1211 match state {
1212 true => data[0]["count"].clone(),
1213 false => JsonValue::from(0),
1214 }
1215 }
1216
1217 fn max(&mut self, field: &str) -> JsonValue {
1218 self.params.fields[field] = format!("max({field}) as {field}").into();
1219 let sql = self.params.select_sql();
1220 if self.params.sql {
1221 return JsonValue::from(sql.clone());
1222 }
1223 let (state, data) = self.query(sql);
1224 if state {
1225 if data.len() > 1 {
1226 return data;
1227 }
1228 data[0][field].clone()
1229 } else {
1230 JsonValue::from(0.0)
1231 }
1232 }
1233
1234 fn min(&mut self, field: &str) -> JsonValue {
1235 self.params.fields[field] = format!("min({field}) as {field}").into();
1236 let sql = self.params.select_sql();
1237 let (state, data) = self.query(sql);
1238 if state {
1239 if data.len() > 1 {
1240 return data;
1241 }
1242 data[0][field].clone()
1243 } else {
1244 JsonValue::from(0.0)
1245 }
1246 }
1247
1248 fn sum(&mut self, field: &str) -> JsonValue {
1249 self.params.fields[field] = format!("sum({field}) as {field}").into();
1250 let sql = self.params.select_sql();
1251 if self.params.sql {
1252 return JsonValue::from(sql.clone());
1253 }
1254 let (state, data) = self.query(sql);
1255 match state {
1256 true => {
1257 if data.len() > 1 {
1258 return data;
1259 }
1260 if self.params.fields.len() > 1 {
1261 return data[0].clone();
1262 }
1263 data[0][field].clone()
1264 }
1265 false => JsonValue::from(0),
1266 }
1267 }
1268 fn avg(&mut self, field: &str) -> JsonValue {
1269 self.params.fields[field] = format!("avg({field}) as {field}").into();
1270 let sql = self.params.select_sql();
1271 if self.params.sql {
1272 return JsonValue::from(sql.clone());
1273 }
1274 let (state, data) = self.query(sql);
1275 if state {
1276 if data.len() > 1 {
1277 return data;
1278 }
1279 data[0][field].clone()
1280 } else {
1281 JsonValue::from(0)
1282 }
1283 }
1284 fn having(&mut self, expr: &str) -> &mut Self {
1285 self.params.having.push(expr.to_string());
1286 self
1287 }
1288 fn select(&mut self) -> JsonValue {
1289 let sql = self.params.select_sql();
1290 if self.params.sql {
1291 return JsonValue::from(sql.clone());
1292 }
1293 self.table_info(self.params.table.clone().as_str());
1294 let (state, mut data) = self.query(sql.clone());
1295 match state {
1296 true => {
1297 for (field, _) in self.params.json.entries() {
1298 for item in data.members_mut() {
1299 if !item[field].is_empty() {
1300 let json = item[field].to_string();
1301 item[field] = match json::parse(&json) {
1302 Ok(e) => e,
1303 Err(_) => JsonValue::from(json),
1304 };
1305 }
1306 }
1307 }
1308 data.clone()
1309 }
1310 false => {
1311 error!("{data:?}");
1312 array![]
1313 }
1314 }
1315 }
1316 fn find(&mut self) -> JsonValue {
1317 self.params.page = 1;
1318 self.params.limit = 1;
1319 let sql = self.params.select_sql();
1320 if self.params.sql {
1321 return JsonValue::from(sql.clone());
1322 }
1323
1324 self.table_info(self.params.table.clone().as_str());
1325 let (state, mut data) = self.query(sql.clone());
1326 match state {
1327 true => {
1328 if data.is_empty() {
1329 return object! {};
1330 }
1331 for (field, _) in self.params.json.entries() {
1332 if !data[0][field].is_empty() {
1333 let json = data[0][field].to_string();
1334 let json = json::parse(&json).unwrap_or(array![]);
1335 data[0][field] = json;
1336 } else {
1337 data[0][field] = array![];
1338 }
1339 }
1340 data[0].clone()
1341 }
1342 false => {
1343 error!("{data:?}");
1344 object! {}
1345 }
1346 }
1347 }
1348
1349 fn value(&mut self, field: &str) -> JsonValue {
1350 self.params.fields = object! {};
1351 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1352 self.params.page = 1;
1353 self.params.limit = 1;
1354 let sql = self.params.select_sql();
1355 if self.params.sql {
1356 return JsonValue::from(sql.clone());
1357 }
1358 self.table_info(self.params.table.clone().as_str());
1359 let (state, mut data) = self.query(sql.clone());
1360 match state {
1361 true => {
1362 for (field, _) in self.params.json.entries() {
1363 if !data[0][field].is_empty() {
1364 let json = data[0][field].to_string();
1365 let json = json::parse(&json).unwrap_or(array![]);
1366 data[0][field] = json;
1367 } else {
1368 data[0][field] = array![];
1369 }
1370 }
1371 data[0][field].clone()
1372 }
1373 false => {
1374 if self.connection.debug {
1375 info!("{data:?}");
1376 }
1377 JsonValue::Null
1378 }
1379 }
1380 }
1381
1382 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1383 let mut fields = vec![];
1384 let mut values = vec![];
1385
1386 if !self.params.autoinc && data["id"].is_empty() {
1387 let thread_id = format!("{:?}", std::thread::current().id());
1388 let thread_num: u64 = thread_id
1389 .trim_start_matches("ThreadId(")
1390 .trim_end_matches(")")
1391 .parse()
1392 .unwrap_or(0);
1393 data["id"] = format!(
1394 "{:X}{:X}",
1395 Local::now().timestamp_nanos_opt().unwrap_or(0),
1396 thread_num
1397 )
1398 .into();
1399 }
1400 for (field, value) in data.entries() {
1401 fields.push(format!("`{field}`"));
1402
1403 if value.is_string() {
1404 if value.to_string().contains("'") {
1405 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1406 continue;
1407 } else if value.to_string().contains('"') {
1408 values.push(format!("'{value}'"));
1409 continue;
1410 } else {
1411 values.push(format!("\"{value}\""));
1412 continue;
1413 }
1414 } else if value.is_array() || value.is_object() {
1415 if self.params.json[field].is_empty() {
1416 values.push(format!("'{value}'"));
1417 } else {
1418 let json = value.to_string();
1419 let json = json.replace("'", "''");
1420 values.push(format!("'{json}'"));
1421 }
1422 continue;
1423 } else if value.is_number() || value.is_boolean() || value.is_null() {
1424 values.push(format!("{value}"));
1425 continue;
1426 } else {
1427 values.push(format!("'{value}'"));
1428 continue;
1429 }
1430 }
1431 let fields = fields.join(",");
1432 let values = values.join(",");
1433
1434 let sql = format!(
1435 "INSERT INTO `{}` ({}) VALUES ({});",
1436 self.params.table, fields, values
1437 );
1438 if self.params.sql {
1439 return JsonValue::from(sql.clone());
1440 }
1441 let (state, ids) = self.execute(sql);
1442 match state {
1443 true => {
1444 if self.params.autoinc {
1445 let (state, ids) =
1446 self.query(format!("select max(id) as id from {}", self.params.table));
1447 return match state {
1448 true => ids[0]["id"].clone(),
1449 false => {
1450 error!("{ids}");
1451 JsonValue::from("")
1452 }
1453 };
1454 }
1455 data["id"].clone()
1456 }
1457 false => {
1458 error!("{ids}");
1459 JsonValue::from("")
1460 }
1461 }
1462 }
1463 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1464 let mut fields = String::new();
1465
1466 if !self.params.autoinc && data[0]["id"].is_empty() {
1467 data[0]["id"] = "".into();
1468 }
1469 for (field, _) in data[0].entries() {
1470 fields = format!("{fields},`{field}`");
1471 }
1472 fields = fields.trim_start_matches(",").to_string();
1473
1474 let core_count = num_cpus::get();
1475 let mut p = pools::Pool::new(core_count * 4);
1476 let autoinc = self.params.autoinc;
1477 for list in data.members() {
1478 let mut item = list.clone();
1479 p.execute(move |pcindex| {
1480 if !autoinc && item["id"].is_empty() {
1481 let id = format!(
1482 "{:X}{:X}",
1483 Local::now().timestamp_nanos_opt().unwrap_or(0),
1484 pcindex
1485 );
1486 item["id"] = id.into();
1487 }
1488 let mut values = "".to_string();
1489 for (_, value) in item.entries() {
1490 if value.is_string() {
1491 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1492 } else if value.is_number() || value.is_boolean() {
1493 values = format!("{values},{value}");
1494 } else {
1495 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1496 }
1497 }
1498 values = format!("({})", values.trim_start_matches(","));
1499 array![item["id"].clone(), values]
1500 });
1501 }
1502 let (ids_list, mut values) = p.insert_all();
1503
1504 values = values.trim_start_matches(",").to_string();
1505
1506 let sql = format!(
1507 "INSERT INTO {} ({}) VALUES {};",
1508 self.params.table, fields, values
1509 );
1510 if self.params.sql {
1511 return JsonValue::from(sql.clone());
1512 }
1513 let (state, data) = self.execute(sql.clone());
1514 match state {
1515 true => {
1516 if self.params.autoinc {
1517 let (state, ids) = self.query(format!(
1518 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1519 self.params.table,
1520 ids_list.len()
1521 ));
1522 return match state {
1523 true => {
1524 let mut idlist = array![];
1525 for item in ids.members() {
1526 let _ = idlist.push(item["id"].clone());
1527 }
1528 idlist
1529 }
1530 false => {
1531 error!("批量添加失败: {ids:?} {sql}");
1532 array![]
1533 }
1534 };
1535 }
1536 JsonValue::from(ids_list)
1537 }
1538 false => {
1539 error!("批量添加失败: {data:?} {sql}");
1540 array![]
1541 }
1542 }
1543 }
1544
1545 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1546 let mut fields = vec![];
1547 let mut values = vec![];
1548
1549 if !self.params.autoinc && data["id"].is_empty() {
1550 let thread_id = format!("{:?}", std::thread::current().id());
1551 let thread_num: u64 = thread_id
1552 .trim_start_matches("ThreadId(")
1553 .trim_end_matches(")")
1554 .parse()
1555 .unwrap_or(0);
1556 data["id"] = format!(
1557 "{:X}{:X}",
1558 Local::now().timestamp_nanos_opt().unwrap_or(0),
1559 thread_num
1560 )
1561 .into();
1562 }
1563 for (field, value) in data.entries() {
1564 fields.push(format!("`{field}`"));
1565
1566 if value.is_string() {
1567 if value.to_string().contains("'") {
1568 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1569 continue;
1570 } else if value.to_string().contains('"') {
1571 values.push(format!("'{value}'"));
1572 continue;
1573 } else {
1574 values.push(format!("\"{value}\""));
1575 continue;
1576 }
1577 } else if value.is_array() || value.is_object() {
1578 if self.params.json[field].is_empty() {
1579 values.push(format!("'{value}'"));
1580 } else {
1581 let json = value.to_string();
1582 let json = json.replace("'", "''");
1583 values.push(format!("'{json}'"));
1584 }
1585 continue;
1586 } else if value.is_number() || value.is_boolean() || value.is_null() {
1587 values.push(format!("{value}"));
1588 continue;
1589 } else {
1590 values.push(format!("'{value}'"));
1591 continue;
1592 }
1593 }
1594
1595 let conflict_cols: Vec<String> =
1596 conflict_fields.iter().map(|f| format!("`{}`", f)).collect();
1597
1598 let update_set: Vec<String> = fields
1599 .iter()
1600 .filter(|f| {
1601 let name = f.trim_matches('`');
1602 !conflict_fields.contains(&name) && name != "id"
1603 })
1604 .map(|f| format!("{f}=excluded.{f}"))
1605 .collect();
1606
1607 let fields_str = fields.join(",");
1608 let values_str = values.join(",");
1609
1610 let sql = format!(
1611 "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1612 self.params.table,
1613 fields_str,
1614 values_str,
1615 conflict_cols.join(","),
1616 update_set.join(",")
1617 );
1618 if self.params.sql {
1619 return JsonValue::from(sql.clone());
1620 }
1621 let (state, result) = self.execute(sql);
1622 match state {
1623 true => {
1624 if self.params.autoinc {
1625 let (state, ids) =
1626 self.query(format!("select max(id) as id from {}", self.params.table));
1627 return match state {
1628 true => ids[0]["id"].clone(),
1629 false => {
1630 error!("{ids}");
1631 JsonValue::from("")
1632 }
1633 };
1634 }
1635 data["id"].clone()
1636 }
1637 false => {
1638 error!("upsert失败: {result}");
1639 JsonValue::from("")
1640 }
1641 }
1642 }
1643
1644 fn update(&mut self, data: JsonValue) -> JsonValue {
1645 let mut values = vec![];
1646
1647 for (field, value) in data.entries() {
1648 if value.is_string() {
1649 values.push(format!(
1650 "`{}` = '{}'",
1651 field,
1652 value.to_string().replace("'", "''")
1653 ));
1654 } else if value.is_array() || value.is_object() {
1655 if self.params.json[field].is_empty() {
1656 values.push(format!("`{field}` = '{value}'"));
1657 } else {
1658 let json = value.to_string();
1659 let json = json.replace("'", "''");
1660 values.push(format!("`{field}` = '{json}'"));
1661 }
1662 continue;
1663 } else if value.is_number() || value.is_boolean() || value.is_null() {
1664 values.push(format!("`{field}` = {value} "));
1665 } else {
1666 values.push(format!("`{field}` = '{value}' "));
1667 }
1668 }
1669
1670 for (field, value) in self.params.inc_dec.entries() {
1671 values.push(format!("{} = {}", field, value.to_string().clone()));
1672 }
1673
1674 let values = values.join(",");
1675 let sql = format!(
1676 "UPDATE `{}` SET {} {} {};",
1677 self.params.table.clone(),
1678 values,
1679 self.params.where_sql(),
1680 self.params.page_limit_sql()
1681 );
1682 if self.params.sql {
1683 return JsonValue::from(sql.clone());
1684 }
1685 let (state, data) = self.execute(sql);
1686 if state {
1687 data
1688 } else {
1689 error!("{data}");
1690 JsonValue::from(0)
1691 }
1692 }
1693
1694 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1695 let mut values = vec![];
1696 let mut ids = vec![];
1697 for (field, _) in data[0].entries() {
1698 if field == "id" {
1699 continue;
1700 }
1701 let mut fields = vec![];
1702 for row in data.members() {
1703 let value = row[field].clone();
1704 let id = row["id"].clone();
1705 ids.push(id.clone());
1706 if value.is_string() {
1707 fields.push(format!(
1708 "WHEN '{}' THEN '{}'",
1709 id,
1710 value.to_string().replace("'", "''")
1711 ));
1712 } else if value.is_array() || value.is_object() {
1713 if self.params.json[field].is_empty() {
1714 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1715 } else {
1716 let json = value.to_string();
1717 let json = json.replace("'", "''");
1718 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1719 }
1720 continue;
1721 } else if value.is_number() || value.is_boolean() || value.is_null() {
1722 fields.push(format!("WHEN '{id}' THEN {value}"));
1723 } else {
1724 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1725 }
1726 }
1727 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1728 }
1729
1730 self.where_and("id", "in", ids.into());
1731 for (field, value) in self.params.inc_dec.entries() {
1732 values.push(format!("{} = {}", field, value.to_string().clone()));
1733 }
1734
1735 let values = values.join(",");
1736 let sql = format!(
1737 "UPDATE {} SET {} {} {};",
1738 self.params.table.clone(),
1739 values,
1740 self.params.where_sql(),
1741 self.params.page_limit_sql()
1742 );
1743 if self.params.sql {
1744 return JsonValue::from(sql.clone());
1745 }
1746 let (state, data) = self.execute(sql);
1747 if state {
1748 data
1749 } else {
1750 error!("{data:?}");
1751 JsonValue::from(0)
1752 }
1753 }
1754 fn delete(&mut self) -> JsonValue {
1755 let sql = format!(
1756 "delete FROM `{}` {};",
1757 self.params.table.clone(),
1758 self.params.where_sql()
1759 );
1760 if self.params.sql {
1761 return JsonValue::from(sql.clone());
1762 }
1763 let (state, data) = self.execute(sql);
1764 match state {
1765 true => data,
1766 false => {
1767 error!("delete 失败>>>{data:?}");
1768 JsonValue::from(0)
1769 }
1770 }
1771 }
1772
1773 fn transaction(&mut self) -> bool {
1774 let thread_id = format!("{:?}", thread::current().id());
1775
1776 if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1777 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1778 SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
1779 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1780 let _ = self.query(sp);
1781 return true;
1782 }
1783
1784 if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, Duration::from_secs(30)) {
1785 error!("{thread_id} 启动事务失败: 获取写锁超时");
1786 return false;
1787 }
1788
1789 let flags = OpenFlags::new().with_read_write().with_no_mutex();
1790 let db = match Connect::open_thread_safe_with_flags(
1791 self.connection.clone().get_dsn().as_str(),
1792 flags,
1793 ) {
1794 Ok(e) => Arc::new(e),
1795 Err(e) => {
1796 error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
1797 SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
1798 return false;
1799 }
1800 };
1801
1802 SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);
1803
1804 let (state, data) = self.query("BEGIN".to_string());
1805 if state {
1806 true
1807 } else {
1808 error!("{thread_id} 启动事务失败: {data}");
1809 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1810 false
1811 }
1812 }
1813
1814 fn commit(&mut self) -> bool {
1815 let thread_id = format!("{:?}", thread::current().id());
1816
1817 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1818 error!("{thread_id} 提交事务失败: 没有活跃的事务");
1819 return false;
1820 }
1821
1822 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1823 if depth > 1 {
1824 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1825 let _ = self.query(sp);
1826 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1827 return true;
1828 }
1829
1830 let (state, _) = self.query("COMMIT".to_string());
1831 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1832
1833 if state {
1834 true
1835 } else {
1836 error!("{thread_id} 提交事务失败");
1837 false
1838 }
1839 }
1840
1841 fn rollback(&mut self) -> bool {
1842 let thread_id = format!("{:?}", thread::current().id());
1843
1844 if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
1845 error!("{thread_id} 回滚失败: 没有活跃的事务");
1846 return false;
1847 }
1848
1849 let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
1850 if depth > 1 {
1851 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1852 let _ = self.query(sp);
1853 SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
1854 return true;
1855 }
1856
1857 let (state, _) = self.query("ROLLBACK".to_string());
1858 SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
1859
1860 if state {
1861 true
1862 } else {
1863 error!("回滚失败: {thread_id}");
1864 false
1865 }
1866 }
1867
1868 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1869 let (state, data) = self.query(sql.to_string());
1870 match state {
1871 true => Ok(data),
1872 false => Err(data.to_string()),
1873 }
1874 }
1875
1876 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1877 let (state, data) = self.execute(sql.to_string());
1878 match state {
1879 true => Ok(data),
1880 false => Err(data.to_string()),
1881 }
1882 }
1883
1884 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1885 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1886 self
1887 }
1888
1889 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1890 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1891 self
1892 }
1893
1894 fn buildsql(&mut self) -> String {
1895 self.fetch_sql();
1896 let sql = self.select().to_string();
1897 format!("( {} ) `{}`", sql, self.params.table)
1898 }
1899
1900 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1901 for field in fields {
1902 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1903 }
1904 self
1905 }
1906
1907 fn join(
1908 &mut self,
1909 main_table: &str,
1910 main_fields: &str,
1911 right_table: &str,
1912 right_fields: &str,
1913 ) -> &mut Self {
1914 let main_table = if main_table.is_empty() {
1915 self.params.table.clone()
1916 } else {
1917 main_table.to_string()
1918 };
1919 self.params.join_table = right_table.to_string();
1920 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1921 self
1922 }
1923
1924 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1925 let main_fields = if main_fields.is_empty() {
1926 "id"
1927 } else {
1928 main_fields
1929 };
1930 let second_fields = if second_fields.is_empty() {
1931 self.params.table.clone()
1932 } else {
1933 second_fields.to_string().clone()
1934 };
1935 let sec_table_name = format!("{}{}", table, "_2");
1936 let second_table = format!("{} {}", table, sec_table_name.clone());
1937 self.params.join_table = sec_table_name.clone();
1938 self.params.join.push(format!(
1939 " INNER JOIN {} ON {}.{} = {}.{}",
1940 second_table, self.params.table, main_fields, sec_table_name, second_fields
1941 ));
1942 self
1943 }
1944
1945 fn join_right(
1946 &mut self,
1947 main_table: &str,
1948 main_fields: &str,
1949 right_table: &str,
1950 right_fields: &str,
1951 ) -> &mut Self {
1952 let main_table = if main_table.is_empty() {
1953 self.params.table.clone()
1954 } else {
1955 main_table.to_string()
1956 };
1957 self.params.join_table = right_table.to_string();
1958 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1959 self
1960 }
1961
1962 fn join_full(
1963 &mut self,
1964 main_table: &str,
1965 main_fields: &str,
1966 right_table: &str,
1967 right_fields: &str,
1968 ) -> &mut Self {
1969 let main_table = if main_table.is_empty() {
1970 self.params.table.clone()
1971 } else {
1972 main_table.to_string()
1973 };
1974 self.params.join_table = right_table.to_string();
1975 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1976 self
1977 }
1978
1979 fn union(&mut self, sub_sql: &str) -> &mut Self {
1980 self.params.unions.push(format!("UNION {sub_sql}"));
1981 self
1982 }
1983
1984 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1985 self.params.unions.push(format!("UNION ALL {sub_sql}"));
1986 self
1987 }
1988
1989 fn lock_for_update(&mut self) -> &mut Self {
1990 self.params.lock_mode = "FOR UPDATE".to_string();
1991 self
1992 }
1993
1994 fn lock_for_share(&mut self) -> &mut Self {
1995 self.params.lock_mode = "FOR SHARE".to_string();
1996 self
1997 }
1998}