1use crate::types::{DbMode, Mode, Params, TableOptions, validate_table_name, add_table_prefix};
2use crate::Connection;
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info};
7use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
8use std::collections::{HashMap, HashSet};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::cell::Cell;
12use chrono::Local;
13
14thread_local! {
15 static THREAD_ID_CACHE: Cell<Option<String>> = const { Cell::new(None) };
16}
17
18fn get_thread_id() -> String {
19 THREAD_ID_CACHE.with(|cache| {
20 if let Some(id) = cache.take() {
21 cache.set(Some(id.clone()));
22 id
23 } else {
24 let id = format!("{:?}", thread::current().id());
25 cache.set(Some(id.clone()));
26 id
27 }
28 })
29}
30
31lazy_static! {
32 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
33 static ref TR: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
34 static ref TR_COUNT: Mutex<HashMap<String, u32>> = Mutex::new(HashMap::new());
35 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
36 static ref FIELDS: Mutex<HashMap<String,JsonValue>> = Mutex::new(HashMap::new());
37}
38
39#[derive(Clone, Debug)]
40pub struct Sqlite {
41 pub connection: Connection,
43 pub default: String,
45
46 pub params: Params,
47}
48
49impl Sqlite {
50 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
51 let flags = OpenFlags::new().with_create().with_read_write();
52 match Connect::open_thread_safe_with_flags(connection.clone().get_dsn().as_str(), flags) {
53 Ok(e) => {
54 DBS.lock().unwrap().insert(default.clone(), Arc::new(e));
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!(
63 "sqlite 启动失败: {} {}",
64 e,
65 connection.clone().get_dsn().as_str()
66 );
67 Err(e.to_string())
68 }
69 }
70 }
71 fn query_handle(&mut self, mut statement: Statement, sql: String) -> (bool, JsonValue) {
72 let thread_id = get_thread_id();
73
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
79 if transaction > 0 {
80 error!("{} 查询事务: {} {}", thread_id, e, sql.clone());
81 } else {
82 error!("{} 非事务查询: {} {}", thread_id, e, sql.clone());
83 }
84 return (false, data);
85 }
86 } {
87 let mut list = object! {};
88 let mut index = 0;
89 for field in statement.column_names().iter() {
90 if !list[field.as_str()].is_null() {
91 index += 1;
92 continue;
93 }
94 match statement.column_type(field.as_str()) {
95 Ok(types) => match types {
96 Type::String => {
97 let data = statement.read::<String, _>(index).unwrap();
98 match data.as_str() {
99 "false" => {
100 list[field.as_str()] = JsonValue::from(false);
101 }
102 "true" => {
103 list[field.as_str()] = JsonValue::from(true);
104 }
105 _ => {
106 list[field.as_str()] = JsonValue::from(data.clone());
107 }
108 }
109 }
110 Type::Integer => {
111 let field_info = FIELDS.lock().map(|f| f.get(&self.params.table).cloned()).unwrap_or(None);
113 if let Some(fields) = field_info {
114 if fields[field.clone()].is_empty() {
115 let data = statement.read::<i64, _>(index).unwrap();
116 list[field.as_str()] = data.into();
117 continue;
118 }
119 match fields[field.clone()]["type"].as_str().unwrap() {
120 "INTEGER" => {
121 let data = statement.read::<i64, _>(index).unwrap();
122 list[field.as_str()] = JsonValue::from(data == 1);
123 }
124 x if x.contains("int(") => {
125 let data = statement.read::<i64, _>(index).unwrap();
126 list[field.as_str()] = data.into();
127 }
128 x if x.contains("decimal(") && x.ends_with(",0)") => {
129 let data = statement.read::<f64, _>(index).unwrap();
130 list[field.as_str()] = data.into();
131 }
132 _ => {
133 let data = statement.read::<i64, _>(index).unwrap();
134 list[field.as_str()] = data.into();
135 }
136 }
137 }
138 }
139 Type::Float => {
140 let data = statement.read::<f64, _>(index).unwrap();
141 list[field.as_str()] = JsonValue::from(data);
142 }
143 Type::Binary => {
144 let data = statement.read::<String, _>(index).unwrap();
145 list[field.as_str()] = JsonValue::from(data.clone());
146 }
147 Type::Null => match statement.read::<String, _>(index) {
148 Ok(data) => {
149 list[field.as_str()] = JsonValue::from(data.clone());
150 }
151 Err(_) => match statement.read::<f64, _>(index) {
152 Ok(data) => {
153 if data == 0.0 {
154 list[field.as_str()] = JsonValue::from("");
155 } else {
156 list[field.as_str()] = JsonValue::from(data);
157 }
158 }
159 Err(_) => match statement.read::<i64, _>(index) {
160 Ok(data) => {
161 if data == 0 {
162 list[field.as_str()] = JsonValue::from("");
163 } else {
164 list[field.as_str()] = JsonValue::from(data);
165 }
166 }
167 Err(e) => {
168 error!("Type:{} {:?}", field.as_str(), e);
169 }
170 },
171 },
172 },
173 },
174 Err(e) => {
175 error!("query Err: {e:?}");
176 }
177 }
178 index += 1;
179 }
180 data.push(list).unwrap();
181 }
182 (true, data)
183 }
184 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
185 let thread_id = get_thread_id();
186
187 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
188 if transaction > 0 {
189 let dbs = match TR.lock() {
192 Ok(dbs) => dbs,
193 Err(e) => {
194 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
195 return (false, JsonValue::from("数据库锁定失败"));
196 }
197 };
198 let db = dbs.get(&*thread_id.clone()).unwrap().clone();
199 let x = match db.prepare(sql.clone()) {
200 Ok(statement) => self.query_handle(statement, sql.clone()),
201 Err(e) => {
202 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
203 (false, e.to_string().into())
204 }
205 };
206 x
207 } else {
208 let dbs = match DBS.lock() {
210 Ok(dbs) => dbs,
211 Err(e) => {
212 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
213 return (false, JsonValue::from("数据库锁定失败"));
214 }
215 };
216 let db = dbs.get(&*self.default).unwrap().clone();
217 let x = match db.prepare(sql.clone()) {
218 Ok(statement) => self.query_handle(statement, sql.clone()),
219 Err(e) => {
220 error!("{thread_id} 查询非事务: Err: {e}");
221 (false, e.to_string().into())
222 }
223 };
224 x
225 }
226 }
227
228 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
229 let thread_id = get_thread_id();
230
231 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
232 if transaction > 0 {
233 let dbs = match TR.lock() {
236 Ok(dbs) => dbs,
237 Err(e) => {
238 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
239 return (false, JsonValue::from("数据库锁定失败"));
240 }
241 };
242 let db = match dbs.get(&*thread_id) {
243 Some(db) => db.clone(),
244 None => {
245 error!(
246 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
247 thread_id, self.default, sql
248 );
249 return (false, JsonValue::from("未找到默认数据库配置"));
250 }
251 };
252 match db.execute(sql.clone()) {
253 Ok(_) => {
254 let count = db.change_count();
255 if self.connection.debug {
256 info!(
257 "{} count:{} total_count:{}",
258 thread_id,
259 count,
260 db.total_change_count()
261 );
262 }
263 (true, JsonValue::from(count))
264 }
265 Err(e) => {
266 error!("{} 执行事务: \r\nErr: {}\r\n{}", thread_id, e, sql.clone());
267 (false, JsonValue::from(e.to_string()))
268 }
269 }
270 } else {
271 if self.connection.debug {
272 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
273 }
274 let dbs = match DBS.lock() {
275 Ok(dbs) => dbs,
276 Err(e) => {
277 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
278 return (false, JsonValue::from("数据库锁定失败"));
279 }
280 };
281
282 let db = match dbs.get(&*self.default) {
283 Some(db) => db.clone(),
284 None => {
285 error!(
286 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
287 thread_id, self.default, sql
288 );
289 return (false, JsonValue::from("未找到默认数据库配置"));
290 }
291 };
292 match db.execute(sql.clone()) {
293 Ok(_) => {
294 let count = db.change_count();
295 if self.connection.debug {
296 info!(
297 "{} count: {} total_count: {}",
298 thread_id,
299 count,
300 db.total_change_count()
301 );
302 }
303 (true, JsonValue::from(count))
304 }
305 Err(e) => {
306 let error_msg = e.to_string().to_lowercase();
308 if error_msg.contains("already exists") {
309 (true, JsonValue::from(0))
311 } else {
312 error!(
313 "{} 执行非事务: Err: {}\r\nSQL: {}",
314 thread_id,
315 e,
316 sql.clone()
317 );
318 (false, JsonValue::from(e.to_string()))
319 }
320 }
321 }
322 }
323 }
324
325 fn execute_ddl(&mut self, sql: String) -> (bool, JsonValue) {
329 let thread_id = get_thread_id();
330
331 if self.connection.debug {
334 info!("{} 执行 DDL(非事务): \r\nsql: {}", thread_id, sql.clone());
335 }
336
337 let dbs = match DBS.lock() {
338 Ok(dbs) => dbs,
339 Err(e) => {
340 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
341 return (false, JsonValue::from("数据库锁定失败"));
342 }
343 };
344
345 let db = match dbs.get(&*self.default) {
346 Some(db) => db.clone(),
347 None => {
348 error!(
349 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
350 thread_id, self.default, sql
351 );
352 return (false, JsonValue::from("未找到默认数据库配置"));
353 }
354 };
355
356 match db.execute(sql.clone()) {
357 Ok(_) => {
358 let count = db.change_count();
359 if self.connection.debug {
360 info!(
361 "{} DDL 执行成功: count: {} total_count: {}",
362 thread_id,
363 count,
364 db.total_change_count()
365 );
366 }
367 (true, JsonValue::from(count))
368 }
369 Err(e) => {
370 let error_msg = e.to_string().to_lowercase();
372 if error_msg.contains("already exists") {
373 (true, JsonValue::from(0))
375 } else {
376 error!(
377 "{} 执行 DDL 失败: Err: {}\r\nSQL: {}",
378 thread_id,
379 e,
380 sql.clone()
381 );
382 (false, JsonValue::from(e.to_string()))
383 }
384 }
385 }
386 }
387}
388
389impl DbMode for Sqlite {
390 fn database_tables(&mut self) -> JsonValue {
391 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
392 match self.sql(sql.as_str()) {
393 Ok(e) => {
394 let mut list = vec![];
395 for item in e.members() {
396 list.push(item["name"].clone());
397 }
398 list.into()
399 }
400 Err(_) => {
401 array![]
402 }
403 }
404 }
405 fn database_create(&mut self, _name: &str) -> bool {
406 true
409 }
410}
411
412impl Sqlite {
413 fn validate_index_fields_in_table(
415 index_fields: &str,
416 table_fields: &[String],
417 ) -> bool {
418 let fields: Vec<String> = index_fields
420 .split(',')
421 .map(|f| f.trim_matches('`').trim().to_string())
422 .collect();
423
424 for field in fields {
425 if !table_fields.contains(&field) {
426 return false;
427 }
428 }
429 true
430 }
431
432 fn can_drop_index(index_name: &str) -> bool {
435 !index_name.starts_with("sqlite_autoindex_")
437 }
438}
439
440impl Mode for Sqlite {
441 fn table_create(&mut self, options: TableOptions) -> JsonValue {
442 let thread_id = get_thread_id();
443
444 let table_name = match validate_table_name(&options.table_name) {
446 Ok(n) => n,
447 Err(e) => {
448 error!("表名验证失败: {}", e);
449 return JsonValue::from(false);
450 }
451 };
452
453 if options.table_fields.is_empty() {
455 error!("{} 表 {} 字段列表为空,无法创建表", thread_id, table_name);
456 return JsonValue::from(false);
457 }
458
459 let estimated_capacity = options.table_fields.len() * 100;
461 let mut sql = String::with_capacity(estimated_capacity);
462 let mut unique_fields = String::new();
464 let mut unique_name = String::new();
465 let mut unique = String::new();
466 for item in options.table_unique.iter() {
467 if unique_fields.is_empty() {
468 unique_fields = format!("`{item}`");
469 unique_name = format!("{}_unique_{}", table_name, item);
470 } else {
471 unique_fields = format!("{unique_fields},`{item}`");
472 unique_name = format!("{unique_name}_{item}");
473 }
474 }
475 if !unique_name.is_empty() {
477 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
478 let index_name = format!("unique_{}", md5);
479 unique = format!(
480 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
481 index_name, table_name, unique_fields
482 );
483 }
484
485 let fields_list: Vec<String> = options.table_fields.entries()
487 .map(|(name, _)| name.to_string())
488 .collect();
489
490 if !fields_list.contains(&options.table_key) {
492 error!(
493 "{} 表 {} 主键字段验证失败: 主键字段 '{}' 不在字段列表中",
494 thread_id, table_name, options.table_key
495 );
496 return JsonValue::from(false);
497 }
498
499 if !unique_fields.is_empty()
501 && !Self::validate_index_fields_in_table(&unique_fields, &fields_list)
502 {
503 error!(
504 "{} 表 {} 唯一索引字段验证失败: 字段 {} 不在字段列表中",
505 thread_id, table_name, unique_fields
506 );
507 return JsonValue::from(false);
508 }
509
510 let estimated_indexes = options.table_index.len();
512 let mut index = Vec::with_capacity(estimated_indexes);
513 let mut index_names = Vec::with_capacity(estimated_indexes); for row in options.table_index.iter() {
515 let mut index_fields = String::new();
516 let mut index_name = String::new();
517 for item in row.iter() {
518 if index_fields.is_empty() {
519 index_fields = format!("`{item}`");
520 index_name = format!("index_{item}");
521 } else {
522 index_fields = format!("{index_fields},`{item}`");
523 index_name = format!("{index_name}_{item}");
524 }
525 }
526 if !Self::validate_index_fields_in_table(&index_fields, &fields_list) {
528 error!(
529 "{} 表 {} 索引字段验证失败: 字段 {} 不在字段列表中,跳过创建索引 {}",
530 thread_id, table_name, index_fields, index_name
531 );
532 continue; }
534 let full_index_name = format!("{}_{}", table_name, index_name);
536 index_names.push(full_index_name.clone()); index.push(format!(
538 "CREATE INDEX {} on {} ({});\r\n",
539 full_index_name, table_name, index_fields
540 ));
541 }
542
543 for (name, field) in options.table_fields.entries() {
544 let row = br_fields::field("sqlite", name, field.clone());
545 if row.is_empty() {
547 error!("{} 表 {} 字段 {} 的 SQL 生成结果为空", thread_id, table_name, name);
548 return JsonValue::from(false);
549 }
550 let escaped_row = if crate::types::DISABLE_FIELD.contains(&name) {
552 if let Some(space_pos) = row.find(' ') {
555 format!("`{}`{}", &row[..space_pos], &row[space_pos..])
556 } else {
557 format!("`{}`", row)
558 }
559 } else {
560 row
561 };
562 sql = format!("{sql} {escaped_row},\r\n");
563 }
564
565 sql = sql.trim_end_matches(",\r\n").to_string();
566
567 sql = if sql.trim_end().ends_with(",") {
570 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
571 } else {
572 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
573 };
574
575 let sql = format!(
576 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
577 table_name, sql
578 );
579 if self.params.sql {
580 let mut list = vec![sql];
581 if !unique.is_empty() {
582 list.push(unique)
583 }
584 if !index.is_empty() {
585 list.extend(index)
586 }
587
588 return JsonValue::from(list.join(""));
589 }
590
591 let (state, error_info) = self.execute_ddl(sql.clone());
594 if state {
595 if !unique.is_empty() {
596 let (_, index_list) = self.query(format!("PRAGMA index_list(`{}`);\r\n", table_name));
598 let mut existing_unique_indexes = HashSet::with_capacity(index_list.len());
599 for item in index_list.members() {
600 if item["unique"].as_usize().unwrap_or(0) == 1 {
601 if let Some(name) = item["name"].as_str() {
602 existing_unique_indexes.insert(name.to_string());
603 }
604 }
605 }
606
607 let index_name = if unique.contains("`") {
609 let start = unique.find("`").unwrap() + 1;
610 let end = unique[start..].find("`").unwrap() + start;
611 unique[start..end].to_string()
612 } else {
613 String::new()
614 };
615
616 if !index_name.is_empty() && !existing_unique_indexes.contains(index_name.as_str()) {
618 let old_indexes: Vec<String> = existing_unique_indexes.iter().cloned().collect();
620 for old_index in old_indexes {
621 if old_index != index_name {
622 if Self::can_drop_index(&old_index) {
624 let drop_sql = format!("DROP INDEX IF EXISTS `{}`;", old_index);
625 let (state, result) = self.execute_ddl(drop_sql);
626 if !state {
627 error!("删除旧索引失败: {} 错误: {}", old_index, result);
628 }
629 } else {
630 }
632 }
633 }
634 let (state, result) = self.execute_ddl(unique.clone());
635 if !state {
636 let error_msg = result.to_string().to_lowercase();
638 if error_msg.contains("already exists") {
639 } else {
641 error!(
642 "{} 表 {} 唯一索引创建失败: {} SQL: {}",
643 thread_id, table_name, result, unique
644 );
645 }
646 }
647 } else if !index_name.is_empty() {
648 }
650 }
651 if !index.is_empty() {
652 let (_, index_list) = self.query(format!("PRAGMA index_list(`{}`);\r\n", table_name));
654 let mut existing_indexes = HashSet::with_capacity(index_list.len());
655 for item in index_list.members() {
656 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
657 if !is_unique {
658 if let Some(name) = item["name"].as_str() {
659 existing_indexes.insert(name.to_string());
660 }
661 }
662 }
663
664 for (index_sql, index_name) in index.iter().zip(index_names.iter()) {
666 if !existing_indexes.contains(index_name.as_str()) {
668 let (state, result) = self.execute_ddl(index_sql.clone());
669 if state {
670 } else {
672 error!("{} {} 索引创建失败: {} 错误: {}", thread_id, table_name, index_name, result);
673 }
674 } else {
675 }
677 }
678 }
679 JsonValue::from(true)
680 } else {
681 error!(
682 "{} 表 {} 创建失败: SQL: {} 错误: {}",
683 thread_id, table_name, sql, error_info
684 );
685 JsonValue::from(false)
686 }
687 }
688 fn table_update(&mut self, options: TableOptions) -> JsonValue {
689 let thread_id = get_thread_id();
690
691 let estimated_fields = options.table_fields.len();
693 let mut sql = String::with_capacity(estimated_fields * 100);
694 let mut add = Vec::with_capacity(estimated_fields);
695 let mut del = Vec::with_capacity(estimated_fields);
696 let mut put = Vec::with_capacity(estimated_fields);
697
698 let table_name = match validate_table_name(&options.table_name) {
700 Ok(n) => n,
701 Err(e) => {
702 error!("表名验证失败: {}", e);
703 return JsonValue::from(0);
704 }
705 };
706 let (_, mut fields_list) = self.query(format!("PRAGMA table_info(`{}`)", table_name));
707 let mut field_old = object! {};
708 for item in fields_list.members_mut() {
709 item["dflt_value"] = item["dflt_value"].to_string().trim_start_matches("'").trim_end_matches("'").into();
710 let name = match item["name"].as_str() {
711 Some(n) => n,
712 None => {
713 error!("表 {} 字段信息中缺少 name 字段", options.table_name);
714 continue;
715 }
716 };
717 field_old[name] = item.clone();
718 if options.table_fields[name].is_empty() {
719 del.push(name);
720 }
721 }
722
723 let estimated_fields = options.table_fields.len();
724 let mut fields_list = Vec::with_capacity(estimated_fields);
725 let mut fields_list_new = Vec::with_capacity(estimated_fields);
726
727 for (name, field) in options.table_fields.entries() {
728 fields_list_new.push(name.to_string());
729 if field_old[name].is_empty() {
730 add.push(name);
731 } else {
732 fields_list.push(name);
733 let field_mode = field["mode"].as_str().unwrap_or("");
734 let old_value = match field_mode {
735 "select" => {
736 if field_old[name]["dflt_value"].clone().is_empty() {
737 "".to_string()
738 } else {
739 field_old[name]["dflt_value"].clone().to_string()
740 }
741 }
742 "switch" => {
743 match field_old[name]["dflt_value"].to_string().parse::<i32>() {
744 Ok(val) => (val == 1).to_string(),
745 Err(_) => {
746 error!("表 {} 字段 {} 的默认值解析失败,跳过更新", options.table_name, name);
747 continue;
748 }
749 }
750 }
751 _ => {
752 field_old[name]["dflt_value"].clone().to_string()
753 }
754 };
755 let new_value = match field["mode"].as_str().unwrap() {
756 "select" => {
757 field["def"].members().map(|x| x.to_string()).collect::<Vec<String>>().join(",")
758 }
759 _ => field["def"].clone().to_string()
760 };
761 if old_value != new_value {
762 info!("{} 差异化当前: {} old_value: {} new_value: {} {}",
763 options.table_name,
764 name,
765 old_value,
766 new_value,
767 old_value != new_value
768 );
769 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
770 put.push(name);
771 } else if field_old[name]["pk"].as_i64().unwrap() == 1 && name != options.table_key
772 {
773 info!("{} 主键替换: {}", options.table_name, name);
774 put.push(name);
775 }
776 }
777 }
778
779 let mut unique_fields = String::new();
780 let mut unique_name = String::new();
781 let mut unique = String::new();
782
783 for item in options.table_unique.iter() {
785 if unique_fields.is_empty() {
786 unique_fields = format!("`{item}`");
787 unique_name = format!("{}_unique_{}", options.table_name, item);
788 } else {
789 unique_fields = format!("{unique_fields},`{item}`");
790 unique_name = format!("{unique_name}_{item}");
791 }
792 }
793 if !unique_name.is_empty() {
795 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
796 let index_name = format!("unique_{}", md5);
797 unique = format!(
798 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
799 index_name, options.table_name, unique_fields
800 );
801 }
802
803 let estimated_indexes = options.table_index.len();
805 let mut index = Vec::with_capacity(estimated_indexes);
806 for row in options.table_index.iter() {
807 let mut index_fields = String::new();
808 let mut index_name = String::new();
809 for item in row.iter() {
810 if index_fields.is_empty() {
811 index_fields = format!("`{item}`");
812 index_name = format!("index_{item}");
813 } else {
814 index_fields = format!("{index_fields},`{item}`");
815 index_name = format!("{index_name}_{item}");
816 }
817 }
818 if !Self::validate_index_fields_in_table(&index_fields, &fields_list_new) {
820 error!(
821 "{} 索引字段验证失败: 字段 {} 不在新表字段列表中,跳过创建索引 {}",
822 options.table_name, index_fields, index_name
823 );
824 } else {
825 index.push(format!(
826 "create index {}_{} on {} ({});\r\n",
827 options.table_name, index_name, options.table_name, index_fields
828 ));
829 }
830 }
831 for (name, field) in options.table_fields.entries() {
832 let row = br_fields::field("sqlite", name, field.clone());
833 let escaped_row = if crate::types::DISABLE_FIELD.contains(&name) {
835 if let Some(space_pos) = row.find(' ') {
838 format!("`{}`{}", &row[..space_pos], &row[space_pos..])
839 } else {
840 format!("`{}`", row)
841 }
842 } else {
843 row
844 };
845 sql = format!("{sql} {escaped_row},\r\n");
846 }
847
848 if !unique.is_empty() || !index.is_empty() {
849 let (_, index_list_result) = self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
850 let mut existing_unique_indexes = HashSet::with_capacity(index_list_result.len());
852 let mut existing_indexes = HashSet::with_capacity(index_list_result.len());
853
854 for item in index_list_result.members() {
855 if let Some(index_name) = item["name"].as_str() {
856 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
857 if is_unique {
858 existing_unique_indexes.insert(index_name.to_string());
859 } else {
860 existing_indexes.insert(index_name.to_string());
861 }
862 }
863 }
864
865 if !unique_name.is_empty() {
867 if !Self::validate_index_fields_in_table(&unique_fields, &fields_list_new) {
869 error!(
870 "{} 唯一索引字段验证失败: 字段 {} 不在新表字段列表中,跳过创建唯一索引",
871 options.table_name, unique_fields
872 );
873 unique = String::new();
874 } else {
875 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
876 let new_unique_index_name = format!("unique_{}", md5);
877
878 if !existing_unique_indexes.contains(new_unique_index_name.as_str()) {
880 let old_indexes: Vec<String> = existing_unique_indexes.iter().cloned().collect();
882 for old_index in old_indexes {
883 if old_index != new_unique_index_name {
884 if Self::can_drop_index(&old_index) {
886 let drop_sql = format!("DROP INDEX IF EXISTS `{}`;", old_index);
887 let (state, result) = self.execute(drop_sql);
888 if !state {
889 error!("删除旧索引失败: {} 错误: {}", old_index, result);
890 }
891 } else {
892 }
894 }
895 }
896 unique = format!(
898 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
899 new_unique_index_name, options.table_name, unique_fields
900 );
901 } else {
902 unique = String::new();
904 }
905 }
906 }
907
908 let mut index_old_set = HashSet::with_capacity(index_list_result.len());
910 let mut index_new_set = HashSet::with_capacity(index.len());
911
912 for item in index_list_result.members() {
914 let origin = item["origin"].as_str().unwrap_or("");
915 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
916 let name = item["name"].as_str().unwrap_or("");
917
918 if origin == "c" && !is_unique {
919 index_old_set.insert(name.to_string());
920 }
921 }
922
923 for index_sql in index.iter() {
925 if let Some(start) = index_sql.find("CREATE INDEX") {
927 if let Some(end) = index_sql[start..].find(" ON ") {
928 let index_name = index_sql[start+12..start+end].trim();
929 index_new_set.insert(index_name.to_string());
930 }
931 } else if let Some(start) = index_sql.find("create index") {
932 if let Some(end) = index_sql[start..].find(" on ") {
933 let index_name = index_sql[start+12..start+end].trim();
934 index_new_set.insert(index_name.to_string());
935 }
936 }
937 }
938
939 if unique.is_empty() && index_old_set.len() == index_new_set.len() {
941 let all_exist = index_new_set.iter().all(|name| index_old_set.contains(name));
943 if all_exist {
944 index = vec![];
945 }
946 }
947 }
948
949 if add.is_empty() && del.is_empty() && put.is_empty() && unique.is_empty() && index.is_empty() {
951 return JsonValue::from(-1);
952 }
953
954 sql = sql.trim_end_matches(",\r\n").to_string();
955 let mut create_sql = String::with_capacity(sql.len() + options.table_name.len() + 50);
957 create_sql.push_str("CREATE TABLE ");
958 create_sql.push_str(&options.table_name);
959 create_sql.push_str("_tmp (\r\n");
960 create_sql.push_str(&sql);
961 create_sql.push_str("\r\n);\r\n");
962 let sql = create_sql;
963
964 let fields_new_str = fields_list_new.join("`,`");
966 let fields_old_str = fields_list.join("`,`");
967 let mut sqls = String::with_capacity(
968 options.table_name.len() * 2 + fields_new_str.len() + fields_old_str.len() + 100
969 );
970 sqls.push_str("replace INTO ");
971 sqls.push_str(&options.table_name);
972 sqls.push_str("_tmp (`");
973 sqls.push_str(&fields_new_str);
974 sqls.push_str("`) select `");
975 sqls.push_str(&fields_old_str);
976 sqls.push_str("` from ");
977 sqls.push_str(&options.table_name);
978 sqls.push_str(";\r\n");
979
980 let mut drop_sql = String::with_capacity(options.table_name.len() + 20);
981 drop_sql.push_str("drop table ");
982 drop_sql.push_str(&options.table_name);
983 drop_sql.push_str(";\r\n");
984
985 let mut alter_sql = String::with_capacity(options.table_name.len() * 2 + 30);
986 alter_sql.push_str("alter table ");
987 alter_sql.push_str(&options.table_name);
988 alter_sql.push_str("_tmp rename to ");
989 alter_sql.push_str(&options.table_name);
990 alter_sql.push_str(";\r\n");
991
992 let mut drop_sql_temp = String::with_capacity(options.table_name.len() + 25);
993 drop_sql_temp.push_str("drop table ");
994 drop_sql_temp.push_str(&options.table_name);
995 drop_sql_temp.push_str("_tmp;\r\n");
996
997 if self.params.sql {
998 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
999 if !unique.is_empty() {
1000 list.push(unique)
1001 }
1002 if !index.is_empty() {
1003 list.extend(index)
1004 }
1005 return JsonValue::from(list.join(""));
1006 }
1007
1008 if add.is_empty() && del.is_empty() && unique.is_empty() && index.is_empty() && put.is_empty()
1009 {
1010 return JsonValue::from(-1);
1011 }
1012
1013 let (state, _) = self.execute(sql.clone());
1014 let data = match state {
1015 true => {
1016 let (state, _) = self.execute(sqls.clone());
1017 match state {
1018 true => {
1019 let (state, _) = self.execute(drop_sql);
1020 match state {
1021 true => {
1022 let (state, _) = self.execute(alter_sql);
1023 match state {
1024 true => {
1025 let (_, new_table_fields_result) = self.query(format!("PRAGMA table_info(`{}`)", options.table_name));
1028 let mut new_table_fields = vec![];
1029 for item in new_table_fields_result.members() {
1030 if let Some(field_name) = item["name"].as_str() {
1031 new_table_fields.push(field_name.to_string());
1032 }
1033 }
1034
1035 if !unique.is_empty() {
1037 let (_, existing_unique_index_list) = self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
1039 let mut existing_unique_index_names = HashSet::with_capacity(existing_unique_index_list.len());
1040 for item in existing_unique_index_list.members() {
1041 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
1042 if is_unique {
1043 if let Some(name) = item["name"].as_str() {
1044 existing_unique_index_names.insert(name.to_string());
1045 }
1046 }
1047 }
1048
1049 let unique_index_name = if unique.contains("CREATE UNIQUE INDEX") {
1051 let start = unique.find("CREATE UNIQUE INDEX").unwrap() + 19;
1052 let end = unique[start..].find(" on ").unwrap_or(unique.len() - start);
1053 unique[start..start+end].trim().trim_matches('`').to_string()
1054 } else if unique.contains("create unique index") {
1055 let start = unique.find("create unique index").unwrap() + 19;
1056 let end = unique[start..].find(" on ").unwrap_or(unique.len() - start);
1057 unique[start..start+end].trim().trim_matches('`').to_string()
1058 } else {
1059 String::new()
1060 };
1061
1062 let unique_fields_str = if unique.contains("(") && unique.contains(")") {
1064 let start = unique.find("(").unwrap() + 1;
1065 let end = unique.find(")").unwrap();
1066 unique[start..end].to_string()
1067 } else {
1068 unique_fields.clone()
1069 };
1070
1071 if !unique_index_name.is_empty() && existing_unique_index_names.contains(unique_index_name.as_str()) {
1073 } else if Self::validate_index_fields_in_table(&unique_fields_str, &new_table_fields) {
1075 let (state, result) = self.execute(unique.clone());
1076 if state {
1077 } else {
1079 let error_msg = result.to_string().to_lowercase();
1081 if error_msg.contains("already exists") {
1082 } else {
1084 error!(
1085 "{} {} 唯一索引创建失败: {} SQL: {}",
1086 thread_id, options.table_name, result, unique
1087 );
1088 }
1089 }
1090 } else {
1091 }
1094 }
1095
1096 let (_, existing_index_list) = self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
1099 let mut existing_index_names = HashSet::with_capacity(existing_index_list.len());
1100 for item in existing_index_list.members() {
1101 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
1102 if !is_unique {
1103 if let Some(name) = item["name"].as_str() {
1104 existing_index_names.insert(name.to_string());
1105 }
1106 }
1107 }
1108
1109 let estimated_indexes = options.table_index.len();
1111 let mut index_names = Vec::with_capacity(estimated_indexes);
1112 for row in options.table_index.iter() {
1113 let mut index_name = String::new();
1114 for item in row.iter() {
1115 if index_name.is_empty() {
1116 index_name = format!("index_{}", item);
1117 } else {
1118 index_name = format!("{}_{}", index_name, item);
1119 }
1120 }
1121 let full_index_name = format!("{}_{}", options.table_name, index_name);
1123 index_names.push(full_index_name);
1124 }
1125
1126 let mut index_failed_count = 0;
1127 for (index_sql, index_name) in index.iter().zip(index_names.iter()) {
1128
1129 let index_fields_str = if index_sql.contains("(") && index_sql.contains(")") {
1131 let start = index_sql.find("(").unwrap() + 1;
1132 let end = index_sql.find(")").unwrap();
1133 index_sql[start..end].to_string()
1134 } else {
1135 String::new()
1136 };
1137
1138 if !index_name.is_empty() && existing_index_names.contains(index_name.as_str()) {
1140 continue;
1142 }
1143
1144 if !index_fields_str.is_empty() && Self::validate_index_fields_in_table(&index_fields_str, &new_table_fields) {
1145 let (state, result) = self.execute(index_sql.clone());
1146 match state {
1147 true => {
1148 }
1150 false => {
1151 let error_msg = result.to_string().to_lowercase();
1153 if error_msg.contains("already exists") {
1154 continue;
1156 }
1157 index_failed_count += 1;
1158 error!(
1159 "{} {} 索引创建失败: {} SQL: {}",
1160 thread_id, options.table_name, result, index_sql
1161 );
1162 }
1164 }
1165 } else {
1166 }
1169 }
1170
1171 if index_failed_count > 0 {
1172 }
1174
1175 return JsonValue::from(1);
1176 }
1177 false => {
1178 error!("{} 修改表名失败", options.table_name);
1179 return JsonValue::from(0);
1180 }
1181 }
1182 }
1183 false => {
1184 error!("{} 删除本表失败", options.table_name);
1185 return JsonValue::from(0);
1186 }
1187 }
1188 }
1189 false => {
1190 error!(
1191 "{} 添加tmp表记录失败 {:#} {:#}",
1192 options.table_name, sql, sqls
1193 );
1194 let sql = format!("drop table {}_tmp", options.table_name);
1195 let (_, _) = self.execute(sql);
1196 0
1197 }
1198 }
1199 }
1200 false => {
1201 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
1202 let (_, _) = self.execute(drop_sql_temp);
1203 0
1204 }
1205 };
1206 JsonValue::from(data)
1207 }
1208
1209 fn table_info(&mut self, table: &str) -> JsonValue {
1210 {
1212 let fields = FIELDS.lock().unwrap();
1213 if let Some(cached) = fields.get(table) {
1214 return cached.clone();
1215 }
1216 }
1217 let sql = format!("PRAGMA table_info(`{table}`)");
1218 let (state, data) = self.query(sql);
1219
1220 match state {
1221 true => {
1222 let mut fields = object! {};
1223 for item in data.members() {
1224 if let Some(name) = item["name"].as_str() {
1225 fields[name] = item.clone();
1226 }
1227 }
1228 let fields_clone = fields.clone();
1230 FIELDS.lock().unwrap().insert(table.to_string(), fields_clone);
1231 data
1232 }
1233 false => object! {}
1234 }
1235 }
1236
1237 fn table_is_exist(&mut self, name: &str) -> bool {
1238 let name = match validate_table_name(name) {
1240 Ok(n) => n,
1241 Err(e) => {
1242 error!("表名验证失败: {}", e);
1243 return false;
1244 }
1245 };
1246 let table_name = add_table_prefix(&self.connection.prefix, name);
1248 let sql = format!(
1249 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{table_name}'"
1250 );
1251 let (state, data) = self.query(sql);
1252 if state && !data.is_empty() {
1253 if let Some(count) = data[0]["count"].as_i64() {
1254 if count > 0 {
1255 return true;
1256 }
1257 }
1258 false
1259 } else {
1260 false
1261 }
1262 }
1263
1264 fn table(&mut self, name: &str) -> &mut Sqlite {
1265 self.params = Params::default(self.connection.mode.str().as_str());
1266 self.params.table = add_table_prefix(&self.connection.prefix, name);
1268 self.params.join_table = self.params.table.clone();
1269 self
1270 }
1271 fn change_table(&mut self, name: &str) -> &mut Self {
1272 self.params.join_table = name.to_string();
1273 self
1274 }
1275 fn autoinc(&mut self) -> &mut Self {
1276 self.params.autoinc = true;
1277 self
1278 }
1279
1280 fn fetch_sql(&mut self) -> &mut Self {
1281 self.params.sql = true;
1282 self
1283 }
1284
1285 fn order(&mut self, field: &str, by: bool) -> &mut Self {
1286 self.params.order[field] = {
1287 if by {
1288 "DESC"
1289 } else {
1290 "ASC"
1291 }
1292 }.into();
1293 self
1294 }
1295
1296 fn group(&mut self, field: &str) -> &mut Self {
1297 let fields: Vec<&str> = field.split(",").collect();
1298 for field in fields.iter() {
1299 let fields = field.to_string();
1300 self.params.group[fields.as_str()] = fields.clone().into();
1301 self.params.fields[fields.as_str()] = fields.clone().into();
1302 }
1303 self
1304 }
1305
1306 fn distinct(&mut self) -> &mut Self {
1307 self.params.distinct = true;
1308 self
1309 }
1310 fn json(&mut self, field: &str) -> &mut Self {
1311 let list: Vec<&str> = field.split(",").collect();
1312 for item in list.iter() {
1313 self.params.json[item.to_string().as_str()] = item.to_string().into();
1314 }
1315 self
1316 }
1317
1318 fn location(&mut self, field: &str) -> &mut Self {
1319 let list: Vec<&str> = field.split(",").collect();
1320 for item in list.iter() {
1321 self.params.location[item.to_string().as_str()] = item.to_string().into();
1322 }
1323 self
1324 }
1325
1326 fn field(&mut self, field: &str) -> &mut Self {
1327 let list: Vec<&str> = field.split(",").collect();
1328 let join_table = if self.params.join_table.is_empty() {
1329 self.params.table.clone()
1330 } else {
1331 self.params.join_table.clone()
1332 };
1333 for item in list.iter() {
1334 let item = item.to_string();
1335 if item.contains(" as ") {
1336 let text = item.split(" as ").collect::<Vec<&str>>().clone();
1337 self.params.fields[item] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1338 } else {
1339 self.params.fields[item] = format!("{join_table}.`{item}`").into();
1340 }
1341 }
1342 self
1343 }
1344 fn hidden(&mut self, name: &str) -> &mut Self {
1345 let hidden: Vec<&str> = name.split(",").collect();
1346 let sql = format!("PRAGMA table_info(`{}`)", self.params.table);
1347 let (_, data) = self.query(sql);
1348 for item in data.members() {
1349 let name = item["name"].as_str().unwrap();
1350 if !hidden.contains(&name) {
1351 self.params.fields[name] = name.into();
1352 }
1353 }
1354 self
1355 }
1356
1357 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1358 let join_table = if self.params.join_table.is_empty() {
1359 self.params.table.clone()
1360 } else {
1361 self.params.join_table.clone()
1362 };
1363
1364 if value.is_boolean() {
1365 if value.as_bool().unwrap() {
1366 value = 1.into();
1367 } else {
1368 value = 0.into();
1369 }
1370 }
1371
1372 match compare {
1373 "between" => {
1374 self.params.where_and.push(format!(
1375 "{}.`{}` between '{}' AND '{}'",
1376 join_table, field, value[0], value[1]
1377 ));
1378 }
1379 "set" => {
1380 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
1381 let mut wheredata = vec![];
1382 for item in list.iter() {
1383 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
1384 }
1385 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
1386 }
1387 "notin" => {
1388 let mut text = String::new();
1389 for item in value.members() {
1390 text = format!("{text},'{item}'");
1391 }
1392 text = text.trim_start_matches(",").into();
1393 self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
1394 }
1395 "in" => {
1396 let mut text = String::new();
1397 if value.is_array() {
1398 for item in value.members() {
1399 text = format!("{text},'{item}'");
1400 }
1401 } else {
1402 let value = value.to_string();
1403 let value: Vec<&str> = value.split(",").collect();
1404 for item in value.iter() {
1405 text = format!("{text},'{item}'");
1406 }
1407 }
1408 text = text.trim_start_matches(",").into();
1409
1410 self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
1411 }
1412 "=" => {
1413 if value.is_null() {
1414 self.params.where_and.push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
1415 } else {
1416 self.params.where_and.push(format!(
1417 "{join_table}.`{field}` {compare} '{value}'"
1418 ));
1419 }
1420 }
1421 _ => {
1422 if value.is_null() {
1423 self.params.where_and.push(format!("{join_table}.`{field}` {compare} {value}"));
1424 } else {
1425 self.params.where_and.push(format!(
1426 "{join_table}.`{field}` {compare} '{value}'"
1427 ));
1428 }
1429 }
1430 }
1431 self
1432 }
1433 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1434 let join_table = if self.params.join_table.is_empty() {
1435 self.params.table.clone()
1436 } else {
1437 self.params.join_table.clone()
1438 };
1439
1440 if value.is_boolean() {
1441 if value.as_bool().unwrap() {
1442 value = 1.into();
1443 } else {
1444 value = 0.into();
1445 }
1446 }
1447 match compare {
1448 "between" => {
1449 self.params.where_or.push(format!(
1450 "{}.`{}` between '{}' AND '{}'",
1451 join_table, field, value[0], value[1]
1452 ));
1453 }
1454 "set" => {
1455 let tt = value.to_string().replace(",", "%");
1456 self.params.where_or.push(format!("{join_table}.`{field}` like '%{tt}%'"));
1457 }
1458 "notin" => {
1459 let mut text = String::new();
1460 for item in value.members() {
1461 text = format!("{text},'{item}'");
1462 }
1463 text = text.trim_start_matches(",").into();
1464 self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
1465 }
1466 "in" => {
1467 let mut text = String::new();
1468 if value.is_array() {
1469 for item in value.members() {
1470 text = format!("{text},'{item}'");
1471 }
1472 } else {
1473 let value = value.as_str().unwrap();
1474 let value: Vec<&str> = value.split(",").collect();
1475 for item in value.iter() {
1476 text = format!("{text},'{item}'");
1477 }
1478 }
1479 text = text.trim_start_matches(",").into();
1480 self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
1481 }
1482 _ => {
1483 self.params.where_or.push(format!(
1484 "{join_table}.`{field}` {compare} '{value}'"
1485 ));
1486 }
1487 }
1488 self
1489 }
1490 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1491 self.params.where_column = format!(
1492 "{}.`{}` {} {}.`{}`",
1493 self.params.table, field_a, compare, self.params.table, field_b
1494 );
1495 self
1496 }
1497
1498 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1499 self.params.update_column.push(format!("{field_a} = {compare}"));
1500 self
1501 }
1502
1503 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1504 self.params.page = page;
1505 self.params.limit = limit;
1506 self
1507 }
1508
1509 fn column(&mut self, field: &str) -> JsonValue {
1510 self.field(field);
1511 self.group(field);
1512 let sql = self.params.select_sql();
1513 if self.params.sql {
1514 return JsonValue::from(sql.clone());
1515 }
1516 self.table_info(self.params.table.clone().as_str());
1517 let (state, data) = self.query(sql);
1518 if state {
1519 let mut list = array![];
1520 for item in data.members() {
1521 if self.params.json[field].is_empty() {
1522 list.push(item[field].clone()).unwrap();
1523 } else {
1524 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
1525 list.push(data).unwrap();
1526 }
1527 }
1528 list
1529 } else {
1530 array![]
1531 }
1532 }
1533
1534 fn count(&mut self) -> JsonValue {
1535 self.params.fields["count"] = "count(*) as count".to_string().into();
1536 let sql = self.params.select_sql();
1537 if self.params.sql {
1538 return JsonValue::from(sql.clone());
1539 }
1540 let (state, data) = self.query(sql);
1541 match state {
1542 true => data[0]["count"].clone(),
1543 false => JsonValue::from(0),
1544 }
1545 }
1546
1547 fn max(&mut self, field: &str) -> JsonValue {
1548 self.params.fields[field] = format!("max({field}) as {field}").into();
1549 let sql = self.params.select_sql();
1550 if self.params.sql {
1551 return JsonValue::from(sql.clone());
1552 }
1553 let (state, data) = self.query(sql);
1554 if state {
1555 if data.len() > 1 {
1556 return data;
1557 }
1558 data[0][field].clone()
1559 } else {
1560 JsonValue::from(0.0)
1561 }
1562 }
1563
1564 fn min(&mut self, field: &str) -> JsonValue {
1565 self.params.fields[field] = format!("min({field}) as {field}").into();
1566 let sql = self.params.select_sql();
1567 let (state, data) = self.query(sql);
1568 if state {
1569 if data.len() > 1 {
1570 return data;
1571 }
1572 data[0][field].clone()
1573 } else {
1574 JsonValue::from(0.0)
1575 }
1576 }
1577
1578 fn sum(&mut self, field: &str) -> JsonValue {
1579 self.params.fields[field] = format!("sum({field}) as {field}").into();
1580 let sql = self.params.select_sql();
1581 if self.params.sql {
1582 return JsonValue::from(sql.clone());
1583 }
1584 let (state, data) = self.query(sql);
1585 match state {
1586 true => {
1587 if data.len() > 1 {
1588 return data;
1589 }
1590 if self.params.fields.len() > 1 {
1591 return data[0].clone();
1592 }
1593 data[0][field].clone()
1594 }
1595 false => JsonValue::from(0),
1596 }
1597 }
1598 fn avg(&mut self, field: &str) -> JsonValue {
1599 self.params.fields[field] = format!("avg({field}) as {field}").into();
1600 let sql = self.params.select_sql();
1601 if self.params.sql {
1602 return JsonValue::from(sql.clone());
1603 }
1604 let (state, data) = self.query(sql);
1605 if state {
1606 if data.len() > 1 {
1607 return data;
1608 }
1609 data[0][field].clone()
1610 } else {
1611 JsonValue::from(0)
1612 }
1613 }
1614 fn select(&mut self) -> JsonValue {
1615 let sql = self.params.select_sql();
1616 if self.params.sql {
1617 return JsonValue::from(sql.clone());
1618 }
1619 self.table_info(self.params.table.clone().as_str());
1620 let (state, mut data) = self.query(sql.clone());
1621 match state {
1622 true => {
1623 for (field, _) in self.params.json.entries() {
1624 for item in data.members_mut() {
1625 if !item[field].is_empty() {
1626 let json = item[field].to_string();
1627 item[field] = match json::parse(&json) {
1628 Ok(e) => e,
1629 Err(_) => JsonValue::from(json),
1630 };
1631 }
1632 }
1633 }
1634 data.clone()
1635 }
1636 false => {
1637 error!("{data:?}");
1638 array![]
1639 }
1640 }
1641 }
1642 fn find(&mut self) -> JsonValue {
1643 self.params.page = 1;
1644 self.params.limit = 1;
1645 let sql = self.params.select_sql();
1646 if self.params.sql {
1647 return JsonValue::from(sql.clone());
1648 }
1649
1650 self.table_info(self.params.table.clone().as_str());
1651 let (state, mut data) = self.query(sql.clone());
1652 match state {
1653 true => {
1654 if data.is_empty() {
1655 return object! {};
1656 }
1657 for (field, _) in self.params.json.entries() {
1658 if !data[0][field].is_empty() {
1659 let json = data[0][field].to_string();
1660 let json = json::parse(&json).unwrap_or(array![]);
1661 data[0][field] = json;
1662 } else {
1663 data[0][field] = array![];
1664 }
1665 }
1666 data[0].clone()
1667 }
1668 false => {
1669 error!("{data:?}");
1670 object! {}
1671 }
1672 }
1673 }
1674
1675 fn value(&mut self, field: &str) -> JsonValue {
1676 self.params.fields = object! {};
1677 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1678 self.params.page = 1;
1679 self.params.limit = 1;
1680 let sql = self.params.select_sql();
1681 if self.params.sql {
1682 return JsonValue::from(sql.clone());
1683 }
1684 self.table_info(self.params.table.clone().as_str());
1685 let (state, mut data) = self.query(sql.clone());
1686 match state {
1687 true => {
1688 for (field, _) in self.params.json.entries() {
1689 if !data[0][field].is_empty() {
1690 let json = data[0][field].to_string();
1691 let json = json::parse(&json).unwrap_or(array![]);
1692 data[0][field] = json;
1693 } else {
1694 data[0][field] = array![];
1695 }
1696 }
1697 data[0][field].clone()
1698 }
1699 false => {
1700 if self.connection.debug {
1701 info!("{data:?}");
1702 }
1703 JsonValue::Null
1704 }
1705 }
1706 }
1707
1708 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1709 let mut fields = vec![];
1710 let mut values = vec![];
1711
1712 if !self.params.autoinc && data["id"].is_empty() {
1713 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1714 }
1715 for (field, value) in data.entries() {
1716 fields.push(format!("`{field}`"));
1717
1718 if value.is_string() {
1719 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1720 continue;
1721 } else if value.is_array() || value.is_object() {
1722 if self.params.json[field].is_empty() {
1723 let json_str = value.to_string();
1724 values.push(format!("'{}'", json_str.replace("'", "''")));
1725 } else {
1726 let json = value.to_string();
1727 let json = json.replace("'", "''");
1728 values.push(format!("'{json}'"));
1729 }
1730 continue;
1731 } else if value.is_number() || value.is_boolean() || value.is_null() {
1732 values.push(format!("{value}"));
1733 continue;
1734 } else {
1735 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1736 continue;
1737 }
1738 }
1739 let fields = fields.join(",");
1740 let values = values.join(",");
1741
1742 let sql = format!(
1743 "INSERT INTO `{}` ({}) VALUES ({});",
1744 self.params.table, fields, values
1745 );
1746 if self.params.sql {
1747 return JsonValue::from(sql.clone());
1748 }
1749 let (state, ids) = self.execute(sql);
1750 match state {
1751 true => {
1752 if self.params.autoinc {
1753 let (state, ids) = self.query(format!("select max(id) as id from {}", self.params.table));
1754 return match state {
1755 true => ids[0]["id"].clone(),
1756 false => {
1757 error!("{ids}");
1758 JsonValue::from("")
1759 }
1760 };
1761 }
1762 data["id"].clone()
1763 }
1764 false => {
1765 error!("{ids}");
1766 JsonValue::from("")
1767 }
1768 }
1769 }
1770 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1771 let mut fields = String::new();
1772
1773 if !self.params.autoinc && data[0]["id"].is_empty() {
1774 data[0]["id"] = "".into();
1775 }
1776 for (field, _) in data[0].entries() {
1777 fields = format!("{fields},`{field}`");
1778 }
1779 fields = fields.trim_start_matches(",").parse().unwrap();
1780
1781 let core_count = num_cpus::get();
1782 let mut p = pools::Pool::new(core_count * 4);
1783 let autoinc = self.params.autoinc;
1784 for list in data.members() {
1785 let mut item = list.clone();
1786 p.execute(move |pcindex| {
1787 if !autoinc && item["id"].is_empty() {
1788 let id = format!(
1789 "{:X}{:X}",
1790 Local::now().timestamp_nanos_opt().unwrap(),
1791 pcindex
1792 );
1793 item["id"] = id.into();
1794 }
1795 let mut values = "".to_string();
1796 for (_, value) in item.entries() {
1797 if value.is_string() {
1798 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1799 } else if value.is_number() || value.is_boolean() {
1800 values = format!("{values},{value}");
1801 } else {
1802 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1803 }
1804 }
1805 values = format!("({})", values.trim_start_matches(","));
1806 array![item["id"].clone(), values]
1807 });
1808 }
1809 let (ids_list, mut values) = p.insert_all();
1810
1811 values = values.trim_start_matches(",").parse().unwrap();
1812
1813 let sql = format!(
1814 "INSERT INTO {} ({}) VALUES {};",
1815 self.params.table, fields, values
1816 );
1817 if self.params.sql {
1818 return JsonValue::from(sql.clone());
1819 }
1820 let (state, data) = self.execute(sql.clone());
1821 match state {
1822 true => {
1823 if self.params.autoinc {
1824 let (state, ids) = self.query(format!(
1825 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1826 self.params.table,
1827 ids_list.len()
1828 ));
1829 return match state {
1830 true => {
1831 let mut idlist = array![];
1832 for item in ids.members() {
1833 idlist.push(item["id"].clone()).unwrap();
1834 }
1835 idlist
1836 }
1837 false => {
1838 error!("批量添加失败: {ids:?} {sql}");
1839 array![]
1840 }
1841 };
1842 }
1843 JsonValue::from(ids_list)
1844 }
1845 false => {
1846 error!("批量添加失败: {data:?} {sql}");
1847 array![]
1848 }
1849 }
1850 }
1851
1852 fn update(&mut self, data: JsonValue) -> JsonValue {
1853 let mut values = vec![];
1854
1855 for (field, value) in data.entries() {
1856 if value.is_string() {
1857 values.push(format!(
1858 "`{}` = '{}'",
1859 field,
1860 value.to_string().replace("'", "''")
1861 ));
1862 } else if value.is_array() || value.is_object() {
1863 if self.params.json[field].is_empty() {
1864 values.push(format!("`{field}` = '{value}'"));
1865 } else {
1866 let json = value.to_string();
1867 let json = json.replace("'", "''");
1868 values.push(format!("`{field}` = '{json}'"));
1869 }
1870 continue;
1871 } else if value.is_number() || value.is_boolean() || value.is_null() {
1872 values.push(format!("`{field}` = {value} "));
1873 } else {
1874 values.push(format!("`{field}` = '{value}' "));
1875 }
1876 }
1877
1878 for (field, value) in self.params.inc_dec.entries() {
1879 values.push(format!("{} = {}", field, value.to_string().clone()));
1880 }
1881
1882 let values = values.join(",");
1883 let sql = format!(
1884 "UPDATE `{}` SET {} {} {};",
1885 self.params.table.clone(),
1886 values,
1887 self.params.where_sql(),
1888 self.params.page_limit_sql()
1889 );
1890 if self.params.sql {
1891 return JsonValue::from(sql.clone());
1892 }
1893 let (state, data) = self.execute(sql);
1894 if state {
1895 data
1896 } else {
1897 error!("{data}");
1898 JsonValue::from(0)
1899 }
1900 }
1901
1902 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1903 let mut values = vec![];
1904 let mut ids = vec![];
1905 for (field, _) in data[0].entries() {
1906 if field == "id" {
1907 continue;
1908 }
1909 let mut fields = vec![];
1910 for row in data.members() {
1911 let value = row[field].clone();
1912 let id = row["id"].clone();
1913 ids.push(id.clone());
1914 if value.is_string() {
1915 fields.push(format!(
1916 "WHEN '{}' THEN '{}'",
1917 id,
1918 value.to_string().replace("'", "''")
1919 ));
1920 } else if value.is_array() || value.is_object() {
1921 if self.params.json[field].is_empty() {
1922 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1923 } else {
1924 let json = value.to_string();
1925 let json = json.replace("'", "''");
1926 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1927 }
1928 continue;
1929 } else if value.is_number() || value.is_boolean() || value.is_null() {
1930 fields.push(format!("WHEN '{id}' THEN {value}"));
1931 } else {
1932 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1933 }
1934 }
1935 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1936 }
1937
1938 self.where_and("id", "in", ids.into());
1939 for (field, value) in self.params.inc_dec.entries() {
1940 values.push(format!("{} = {}", field, value.to_string().clone()));
1941 }
1942
1943 let values = values.join(",");
1944 let sql = format!(
1945 "UPDATE {} SET {} {} {};",
1946 self.params.table.clone(),
1947 values,
1948 self.params.where_sql(),
1949 self.params.page_limit_sql()
1950 );
1951 if self.params.sql {
1952 return JsonValue::from(sql.clone());
1953 }
1954 let (state, data) = self.execute(sql);
1955 if state {
1956 data
1957 } else {
1958 error!("{data:?}");
1959 JsonValue::from(0)
1960 }
1961 }
1962 fn delete(&mut self) -> JsonValue {
1963 let sql = format!(
1964 "delete FROM `{}` {};",
1965 self.params.table.clone(),
1966 self.params.where_sql()
1967 );
1968 if self.params.sql {
1969 return JsonValue::from(sql.clone());
1970 }
1971 let (state, data) = self.execute(sql);
1972 match state {
1973 true => data,
1974 false => {
1975 error!("delete 失败>>>{data:?}");
1976 JsonValue::from(0)
1977 }
1978 }
1979 }
1980
1981 fn transaction(&mut self) -> bool {
1982 let thread_id = get_thread_id();
1983 {
1985 use std::fs::OpenOptions;
1986 use std::io::Write;
1987 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
1988 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"sqlite.rs:1985","message":"transaction() entry","data":{{"thread_id":"{}"}},"timestamp":{}}}"#, thread_id, chrono::Utc::now().timestamp_millis());
1989 }
1990 }
1991 let transaction = {
1994 let tr_count = TR_COUNT.lock().unwrap();
1995 *tr_count.get(&*thread_id.clone()).unwrap_or(&0)
1996 };
1997 {
1999 use std::fs::OpenOptions;
2000 use std::io::Write;
2001 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2002 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"sqlite.rs:1992","message":"transaction count check","data":{{"thread_id":"{}","transaction_count":{}}},"timestamp":{}}}"#, thread_id, transaction, chrono::Utc::now().timestamp_millis());
2003 }
2004 }
2005 if transaction > 0 {
2007 let new_count = transaction + 1;
2008 TR_COUNT.lock().unwrap().insert(thread_id.clone(), new_count);
2009 {
2011 use std::fs::OpenOptions;
2012 use std::io::Write;
2013 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2014 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"sqlite.rs:1995","message":"nested transaction","data":{{"thread_id":"{}","new_count":{}}},"timestamp":{}}}"#, thread_id, new_count, chrono::Utc::now().timestamp_millis());
2015 }
2016 }
2017 return true;
2019 }
2020
2021 loop {
2022 let mut t = TR_COUNT.lock().unwrap();
2023 if t.is_empty() {
2024 t.insert(thread_id.clone(), 1);
2025 break;
2026 }
2027 drop(t);
2028 thread::yield_now();
2029 }
2030
2031 let flags = OpenFlags::new().with_read_write().with_no_mutex();
2032 let db = match Connect::open_thread_safe_with_flags(
2033 self.connection.clone().get_dsn().as_str(),
2034 flags,
2035 ) {
2036 Ok(e) => Arc::new(e),
2037 Err(_) => {
2038 {
2040 use std::fs::OpenOptions;
2041 use std::io::Write;
2042 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2043 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"B","location":"sqlite.rs:2015","message":"failed to open connection","data":{{"thread_id":"{}"}},"timestamp":{}}}"#, thread_id, chrono::Utc::now().timestamp_millis());
2044 }
2045 }
2046 return false;
2048 }
2049 };
2050 TR.lock().unwrap().insert(thread_id.clone(), db);
2051 let (state, data) = self.query("BEGIN".to_string());
2052 {
2054 use std::fs::OpenOptions;
2055 use std::io::Write;
2056 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2057 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"sqlite.rs:2019","message":"BEGIN result","data":{{"thread_id":"{}","state":{},"data":"{}"}},"timestamp":{}}}"#, thread_id, state, data.to_string().replace("\"", "\\\""), chrono::Utc::now().timestamp_millis());
2058 }
2059 }
2060 if state {
2062 true
2063 } else {
2064 error!("{thread_id} 启动事务失败: {data}");
2065 TR.lock().unwrap().remove(&thread_id.clone());
2066 TR_COUNT.lock().unwrap().remove(&thread_id.clone());
2067 false
2068 }
2069 }
2070
2071 fn commit(&mut self) -> bool {
2072 let thread_id = get_thread_id();
2073 {
2075 use std::fs::OpenOptions;
2076 use std::io::Write;
2077 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2078 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"D","location":"sqlite.rs:2030","message":"commit() entry","data":{{"thread_id":"{}"}},"timestamp":{}}}"#, thread_id, chrono::Utc::now().timestamp_millis());
2079 }
2080 }
2081 let mut transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
2084 {
2086 use std::fs::OpenOptions;
2087 use std::io::Write;
2088 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2089 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"D","location":"sqlite.rs:2033","message":"transaction count before commit","data":{{"thread_id":"{}","transaction_count":{}}},"timestamp":{}}}"#, thread_id, transaction, chrono::Utc::now().timestamp_millis());
2090 }
2091 }
2092 if transaction > 1 {
2094 transaction -= 1;
2095 TR_COUNT.lock().unwrap().insert(thread_id.clone(), transaction);
2096 {
2098 use std::fs::OpenOptions;
2099 use std::io::Write;
2100 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2101 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"D","location":"sqlite.rs:2037","message":"nested commit, decrement count","data":{{"thread_id":"{}","new_count":{}}},"timestamp":{}}}"#, thread_id, transaction, chrono::Utc::now().timestamp_millis());
2102 }
2103 }
2104 return true;
2106 }
2107 let sql = "COMMIT";
2108 let (state, data) = self.query(sql.to_string());
2109 {
2111 use std::fs::OpenOptions;
2112 use std::io::Write;
2113 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2114 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"E","location":"sqlite.rs:2040","message":"COMMIT result BEFORE cleanup","data":{{"thread_id":"{}","state":{},"data":"{}"}},"timestamp":{}}}"#, thread_id, state, data.to_string().replace("\"", "\\\""), chrono::Utc::now().timestamp_millis());
2115 }
2116 }
2117 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
2119 TR.lock().unwrap().remove(&*thread_id.clone());
2120 {
2122 use std::fs::OpenOptions;
2123 use std::io::Write;
2124 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2125 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"E","location":"sqlite.rs:2042","message":"resources cleaned up","data":{{"thread_id":"{}","state":{}}},"timestamp":{}}}"#, thread_id, state, chrono::Utc::now().timestamp_millis());
2126 }
2127 }
2128 if state {
2131 true
2132 } else {
2133 let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
2134 error!(
2135 "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库路径: {} | SQL: {} | 错误详情: {}",
2136 thread_id,
2137 self.default,
2138 self.connection.database,
2139 sql,
2140 error_msg
2141 );
2142 false
2143 }
2144 }
2145
2146 fn rollback(&mut self) -> bool {
2147 let thread_id = get_thread_id();
2148 {
2150 use std::fs::OpenOptions;
2151 use std::io::Write;
2152 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2153 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"sqlite.rs:2060","message":"rollback() entry","data":{{"thread_id":"{}"}},"timestamp":{}}}"#, thread_id, chrono::Utc::now().timestamp_millis());
2154 }
2155 }
2156 let sql = "ROLLBACK";
2158
2159 let mut t = *TR_COUNT.lock().unwrap().get(&thread_id).unwrap_or(&0);
2160 {
2162 use std::fs::OpenOptions;
2163 use std::io::Write;
2164 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2165 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"sqlite.rs:2064","message":"transaction count before rollback","data":{{"thread_id":"{}","transaction_count":{}}},"timestamp":{}}}"#, thread_id, t, chrono::Utc::now().timestamp_millis());
2166 }
2167 }
2168 if t > 1 {
2170 t -= 1;
2171 TR_COUNT.lock().unwrap().insert(thread_id.clone(), t);
2172 {
2174 use std::fs::OpenOptions;
2175 use std::io::Write;
2176 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2177 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"sqlite.rs:2068","message":"nested rollback, decrement count","data":{{"thread_id":"{}","new_count":{}}},"timestamp":{}}}"#, thread_id, t, chrono::Utc::now().timestamp_millis());
2178 }
2179 }
2180 return true;
2182 }
2183 let (state, data) = self.query(sql.to_string());
2184 {
2186 use std::fs::OpenOptions;
2187 use std::io::Write;
2188 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2189 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"G","location":"sqlite.rs:2070","message":"ROLLBACK result BEFORE cleanup","data":{{"thread_id":"{}","state":{},"data":"{}"}},"timestamp":{}}}"#, thread_id, state, data.to_string().replace("\"", "\\\""), chrono::Utc::now().timestamp_millis());
2190 }
2191 }
2192 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
2194 TR.lock().unwrap().remove(&*thread_id.clone());
2195 {
2197 use std::fs::OpenOptions;
2198 use std::io::Write;
2199 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/Users/xiaduan/coding/scs/.cursor/debug.log") {
2200 let _ = writeln!(file, r#"{{"sessionId":"debug-session","runId":"run1","hypothesisId":"G","location":"sqlite.rs:2072","message":"resources cleaned up","data":{{"thread_id":"{}","state":{}}},"timestamp":{}}}"#, thread_id, state, chrono::Utc::now().timestamp_millis());
2201 }
2202 }
2203 if state {
2206 true
2207 } else {
2208 let error_msg = data.as_str().map(|s| s.to_string()).unwrap_or_else(|| data.to_string());
2209 error!(
2210 "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库路径: {} | SQL: {} | 错误详情: {}",
2211 thread_id,
2212 self.default,
2213 self.connection.database,
2214 sql,
2215 error_msg
2216 );
2217 false
2218 }
2219 }
2220
2221 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2222 let (state, data) = self.query(sql.to_string());
2223 match state {
2224 true => Ok(data),
2225 false => Err(data.to_string()),
2226 }
2227 }
2228
2229 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2230 let (state, data) = self.execute(sql.to_string());
2231 match state {
2232 true => Ok(data),
2233 false => Err(data.to_string()),
2234 }
2235 }
2236
2237 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2238 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2239 self
2240 }
2241
2242 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2243 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2244 self
2245 }
2246
2247 fn buildsql(&mut self) -> String {
2248 self.fetch_sql();
2249 let sql = self.select().to_string();
2250 format!("( {} ) `{}`", sql, self.params.table)
2251 }
2252
2253 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2254 for field in fields {
2255 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2256 }
2257 self
2258 }
2259
2260 fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
2261 let main_table = if main_table.is_empty() {
2262 self.params.table.clone()
2263 } else {
2264 main_table.to_string()
2265 };
2266 self.params.join_table = right_table.to_string();
2267 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2268 self
2269 }
2270
2271 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2272 let main_fields = if main_fields.is_empty() {
2273 "id"
2274 } else {
2275 main_fields
2276 };
2277 let second_fields = if second_fields.is_empty() {
2278 self.params.table.clone()
2279 } else {
2280 second_fields.to_string().clone()
2281 };
2282 let sec_table_name = format!("{}{}", table, "_2");
2283 let second_table = format!("{} {}", table, sec_table_name.clone());
2284 self.params.join_table = sec_table_name.clone();
2285 self.params.join.push(format!(
2286 " INNER JOIN {} ON {}.{} = {}.{}",
2287 second_table, self.params.table, main_fields, sec_table_name, second_fields
2288 ));
2289 self
2290 }
2291}