1use crate::pools;
2use crate::types::{add_table_prefix, validate_table_name, DbMode, Mode, Params, TableOptions};
3use crate::Connection;
4use chrono::Local;
5use json::{array, object, JsonValue};
6use lazy_static::lazy_static;
7use log::{error, info};
8use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
9use std::cell::Cell;
10use std::collections::{HashMap, HashSet};
11use std::sync::{Arc, Mutex};
12use std::thread;
13
14thread_local! {
15 static THREAD_ID_CACHE: Cell<Option<String>> = const { Cell::new(None) };
16}
17
18fn get_thread_id() -> String {
19 THREAD_ID_CACHE.with(|cache| {
20 if let Some(id) = cache.take() {
21 cache.set(Some(id.clone()));
22 id
23 } else {
24 let id = format!("{:?}", thread::current().id());
25 cache.set(Some(id.clone()));
26 id
27 }
28 })
29}
30
31lazy_static! {
32 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
33 static ref TR: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
34 static ref TR_COUNT: Mutex<HashMap<String, u32>> = Mutex::new(HashMap::new());
35 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
36 static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
37}
38
39#[derive(Clone, Debug)]
40pub struct Sqlite {
41 pub connection: Connection,
43 pub default: String,
45
46 pub params: Params,
47}
48
49impl Sqlite {
50 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
51 let flags = OpenFlags::new().with_create().with_read_write();
52 match Connect::open_thread_safe_with_flags(connection.clone().get_dsn().as_str(), flags) {
53 Ok(e) => {
54 DBS.lock().unwrap().insert(default.clone(), Arc::new(e));
55 Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("sqlite"),
59 })
60 }
61 Err(e) => {
62 error!(
63 "sqlite 启动失败: {} {}",
64 e,
65 connection.clone().get_dsn().as_str()
66 );
67 Err(e.to_string())
68 }
69 }
70 }
71 fn query_handle(&mut self, mut statement: Statement, sql: String) -> (bool, JsonValue) {
72 let thread_id = get_thread_id();
73
74 let mut data = array![];
75 while let State::Row = match statement.next() {
76 Ok(e) => e,
77 Err(e) => {
78 let transaction = *TR_COUNT
79 .lock()
80 .unwrap()
81 .get(&*thread_id.clone())
82 .unwrap_or(&0);
83 if transaction > 0 {
84 error!("{} 查询事务: {} {}", thread_id, e, sql.clone());
85 } else {
86 error!("{} 非事务查询: {} {}", thread_id, e, sql.clone());
87 }
88 return (false, data);
89 }
90 } {
91 let mut list = object! {};
92 let mut index = 0;
93 for field in statement.column_names().iter() {
94 if !list[field.as_str()].is_null() {
95 index += 1;
96 continue;
97 }
98 match statement.column_type(field.as_str()) {
99 Ok(types) => match types {
100 Type::String => {
101 let data = statement.read::<String, _>(index).unwrap();
102 match data.as_str() {
103 "false" => {
104 list[field.as_str()] = JsonValue::from(false);
105 }
106 "true" => {
107 list[field.as_str()] = JsonValue::from(true);
108 }
109 _ => {
110 list[field.as_str()] = JsonValue::from(data.clone());
111 }
112 }
113 }
114 Type::Integer => {
115 let field_info = FIELDS
117 .lock()
118 .map(|f| f.get(&self.params.table).cloned())
119 .unwrap_or(None);
120 if let Some(fields) = field_info {
121 if fields[field.clone()].is_empty() {
122 let data = statement.read::<i64, _>(index).unwrap();
123 list[field.as_str()] = data.into();
124 continue;
125 }
126 match fields[field.clone()]["type"].as_str().unwrap() {
127 "INTEGER" => {
128 let data = statement.read::<i64, _>(index).unwrap();
129 list[field.as_str()] = JsonValue::from(data == 1);
130 }
131 x if x.contains("int(") => {
132 let data = statement.read::<i64, _>(index).unwrap();
133 list[field.as_str()] = data.into();
134 }
135 x if x.contains("decimal(") && x.ends_with(",0)") => {
136 let data = statement.read::<f64, _>(index).unwrap();
137 list[field.as_str()] = data.into();
138 }
139 _ => {
140 let data = statement.read::<i64, _>(index).unwrap();
141 list[field.as_str()] = data.into();
142 }
143 }
144 }
145 }
146 Type::Float => {
147 let data = statement.read::<f64, _>(index).unwrap();
148 list[field.as_str()] = JsonValue::from(data);
149 }
150 Type::Binary => {
151 let data = statement.read::<String, _>(index).unwrap();
152 list[field.as_str()] = JsonValue::from(data.clone());
153 }
154 Type::Null => match statement.read::<String, _>(index) {
155 Ok(data) => {
156 list[field.as_str()] = JsonValue::from(data.clone());
157 }
158 Err(_) => match statement.read::<f64, _>(index) {
159 Ok(data) => {
160 if data == 0.0 {
161 list[field.as_str()] = JsonValue::from("");
162 } else {
163 list[field.as_str()] = JsonValue::from(data);
164 }
165 }
166 Err(_) => match statement.read::<i64, _>(index) {
167 Ok(data) => {
168 if data == 0 {
169 list[field.as_str()] = JsonValue::from("");
170 } else {
171 list[field.as_str()] = JsonValue::from(data);
172 }
173 }
174 Err(e) => {
175 error!("Type:{} {:?}", field.as_str(), e);
176 }
177 },
178 },
179 },
180 },
181 Err(e) => {
182 error!("query Err: {e:?}");
183 }
184 }
185 index += 1;
186 }
187 data.push(list).unwrap();
188 }
189 (true, data)
190 }
191 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
192 let thread_id = get_thread_id();
193
194 let transaction = *TR_COUNT
195 .lock()
196 .unwrap()
197 .get(&*thread_id.clone())
198 .unwrap_or(&0);
199 if transaction > 0 {
200 let dbs = match TR.lock() {
203 Ok(dbs) => dbs,
204 Err(e) => {
205 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
206 return (false, JsonValue::from("数据库锁定失败"));
207 }
208 };
209 let db = dbs.get(&*thread_id.clone()).unwrap().clone();
210 let x = match db.prepare(sql.clone()) {
211 Ok(statement) => self.query_handle(statement, sql.clone()),
212 Err(e) => {
213 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
214 (false, e.to_string().into())
215 }
216 };
217 x
218 } else {
219 let dbs = match DBS.lock() {
221 Ok(dbs) => dbs,
222 Err(e) => {
223 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
224 return (false, JsonValue::from("数据库锁定失败"));
225 }
226 };
227 let db = dbs.get(&*self.default).unwrap().clone();
228 let x = match db.prepare(sql.clone()) {
229 Ok(statement) => self.query_handle(statement, sql.clone()),
230 Err(e) => {
231 error!("{thread_id} 查询非事务: Err: {e}");
232 (false, e.to_string().into())
233 }
234 };
235 x
236 }
237 }
238
239 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
240 let thread_id = get_thread_id();
241
242 let transaction = *TR_COUNT
243 .lock()
244 .unwrap()
245 .get(&*thread_id.clone())
246 .unwrap_or(&0);
247 if transaction > 0 {
248 let dbs = match TR.lock() {
251 Ok(dbs) => dbs,
252 Err(e) => {
253 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
254 return (false, JsonValue::from("数据库锁定失败"));
255 }
256 };
257 let db = match dbs.get(&*thread_id) {
258 Some(db) => db.clone(),
259 None => {
260 error!(
261 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
262 thread_id, self.default, sql
263 );
264 return (false, JsonValue::from("未找到默认数据库配置"));
265 }
266 };
267 match db.execute(sql.clone()) {
268 Ok(_) => {
269 let count = db.change_count();
270 if self.connection.debug {
271 info!(
272 "{} count:{} total_count:{}",
273 thread_id,
274 count,
275 db.total_change_count()
276 );
277 }
278 (true, JsonValue::from(count))
279 }
280 Err(e) => {
281 error!("{} 执行事务: \r\nErr: {}\r\n{}", thread_id, e, sql.clone());
282 (false, JsonValue::from(e.to_string()))
283 }
284 }
285 } else {
286 if self.connection.debug {
287 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
288 }
289 let dbs = match DBS.lock() {
290 Ok(dbs) => dbs,
291 Err(e) => {
292 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
293 return (false, JsonValue::from("数据库锁定失败"));
294 }
295 };
296
297 let db = match dbs.get(&*self.default) {
298 Some(db) => db.clone(),
299 None => {
300 error!(
301 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
302 thread_id, self.default, sql
303 );
304 return (false, JsonValue::from("未找到默认数据库配置"));
305 }
306 };
307 match db.execute(sql.clone()) {
308 Ok(_) => {
309 let count = db.change_count();
310 if self.connection.debug {
311 info!(
312 "{} count: {} total_count: {}",
313 thread_id,
314 count,
315 db.total_change_count()
316 );
317 }
318 (true, JsonValue::from(count))
319 }
320 Err(e) => {
321 let error_msg = e.to_string().to_lowercase();
323 if error_msg.contains("already exists") {
324 (true, JsonValue::from(0))
326 } else {
327 error!(
328 "{} 执行非事务: Err: {}\r\nSQL: {}",
329 thread_id,
330 e,
331 sql.clone()
332 );
333 (false, JsonValue::from(e.to_string()))
334 }
335 }
336 }
337 }
338 }
339
340 fn execute_ddl(&mut self, sql: String) -> (bool, JsonValue) {
344 let thread_id = get_thread_id();
345
346 if self.connection.debug {
349 info!("{} 执行 DDL(非事务): \r\nsql: {}", thread_id, sql.clone());
350 }
351
352 let dbs = match DBS.lock() {
353 Ok(dbs) => dbs,
354 Err(e) => {
355 error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
356 return (false, JsonValue::from("数据库锁定失败"));
357 }
358 };
359
360 let db = match dbs.get(&*self.default) {
361 Some(db) => db.clone(),
362 None => {
363 error!(
364 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
365 thread_id, self.default, sql
366 );
367 return (false, JsonValue::from("未找到默认数据库配置"));
368 }
369 };
370
371 match db.execute(sql.clone()) {
372 Ok(_) => {
373 let count = db.change_count();
374 if self.connection.debug {
375 info!(
376 "{} DDL 执行成功: count: {} total_count: {}",
377 thread_id,
378 count,
379 db.total_change_count()
380 );
381 }
382 (true, JsonValue::from(count))
383 }
384 Err(e) => {
385 let error_msg = e.to_string().to_lowercase();
387 if error_msg.contains("already exists") {
388 (true, JsonValue::from(0))
390 } else {
391 error!(
392 "{} 执行 DDL 失败: Err: {}\r\nSQL: {}",
393 thread_id,
394 e,
395 sql.clone()
396 );
397 (false, JsonValue::from(e.to_string()))
398 }
399 }
400 }
401 }
402}
403
404impl DbMode for Sqlite {
405 fn database_tables(&mut self) -> JsonValue {
406 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
407 match self.sql(sql.as_str()) {
408 Ok(e) => {
409 let mut list = vec![];
410 for item in e.members() {
411 list.push(item["name"].clone());
412 }
413 list.into()
414 }
415 Err(_) => {
416 array![]
417 }
418 }
419 }
420 fn database_create(&mut self, _name: &str) -> bool {
421 true
424 }
425}
426
427impl Sqlite {
428 fn validate_index_fields_in_table(index_fields: &str, table_fields: &[String]) -> bool {
430 let fields: Vec<String> = index_fields
432 .split(',')
433 .map(|f| f.trim_matches('`').trim().to_string())
434 .collect();
435
436 for field in fields {
437 if !table_fields.contains(&field) {
438 return false;
439 }
440 }
441 true
442 }
443
444 fn can_drop_index(index_name: &str) -> bool {
447 !index_name.starts_with("sqlite_autoindex_")
449 }
450}
451
452impl Mode for Sqlite {
453 fn table_create(&mut self, options: TableOptions) -> JsonValue {
454 let thread_id = get_thread_id();
455
456 let table_name = match validate_table_name(&options.table_name) {
458 Ok(n) => n,
459 Err(e) => {
460 error!("表名验证失败: {}", e);
461 return JsonValue::from(false);
462 }
463 };
464
465 if options.table_fields.is_empty() {
467 error!("{} 表 {} 字段列表为空,无法创建表", thread_id, table_name);
468 return JsonValue::from(false);
469 }
470
471 let estimated_capacity = options.table_fields.len() * 100;
473 let mut sql = String::with_capacity(estimated_capacity);
474 let mut unique_fields = String::new();
476 let mut unique_name = String::new();
477 let mut unique = String::new();
478 for item in options.table_unique.iter() {
479 if unique_fields.is_empty() {
480 unique_fields = format!("`{item}`");
481 unique_name = format!("{}_unique_{}", table_name, item);
482 } else {
483 unique_fields = format!("{unique_fields},`{item}`");
484 unique_name = format!("{unique_name}_{item}");
485 }
486 }
487 if !unique_name.is_empty() {
489 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
490 let index_name = format!("unique_{}", md5);
491 unique = format!(
492 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
493 index_name, table_name, unique_fields
494 );
495 }
496
497 let fields_list: Vec<String> = options
499 .table_fields
500 .entries()
501 .map(|(name, _)| name.to_string())
502 .collect();
503
504 if !fields_list.contains(&options.table_key) {
506 error!(
507 "{} 表 {} 主键字段验证失败: 主键字段 '{}' 不在字段列表中",
508 thread_id, table_name, options.table_key
509 );
510 return JsonValue::from(false);
511 }
512
513 if !unique_fields.is_empty()
515 && !Self::validate_index_fields_in_table(&unique_fields, &fields_list)
516 {
517 error!(
518 "{} 表 {} 唯一索引字段验证失败: 字段 {} 不在字段列表中",
519 thread_id, table_name, unique_fields
520 );
521 return JsonValue::from(false);
522 }
523
524 let estimated_indexes = options.table_index.len();
526 let mut index = Vec::with_capacity(estimated_indexes);
527 let mut index_names = Vec::with_capacity(estimated_indexes); for row in options.table_index.iter() {
529 let mut index_fields = String::new();
530 let mut index_name = String::new();
531 for item in row.iter() {
532 if index_fields.is_empty() {
533 index_fields = format!("`{item}`");
534 index_name = format!("index_{item}");
535 } else {
536 index_fields = format!("{index_fields},`{item}`");
537 index_name = format!("{index_name}_{item}");
538 }
539 }
540 if !Self::validate_index_fields_in_table(&index_fields, &fields_list) {
542 error!(
543 "{} 表 {} 索引字段验证失败: 字段 {} 不在字段列表中,跳过创建索引 {}",
544 thread_id, table_name, index_fields, index_name
545 );
546 continue; }
548 let full_index_name = format!("{}_{}", table_name, index_name);
550 index_names.push(full_index_name.clone()); index.push(format!(
552 "CREATE INDEX {} on {} ({});\r\n",
553 full_index_name, table_name, index_fields
554 ));
555 }
556
557 for (name, field) in options.table_fields.entries() {
558 let row = br_fields::field("sqlite", name, field.clone());
559 if row.is_empty() {
561 error!(
562 "{} 表 {} 字段 {} 的 SQL 生成结果为空",
563 thread_id, table_name, name
564 );
565 return JsonValue::from(false);
566 }
567 let escaped_row = if crate::types::DISABLE_FIELD.contains(&name) {
569 if let Some(space_pos) = row.find(' ') {
572 format!("`{}`{}", &row[..space_pos], &row[space_pos..])
573 } else {
574 format!("`{}`", row)
575 }
576 } else {
577 row
578 };
579 sql = format!("{sql} {escaped_row},\r\n");
580 }
581
582 sql = sql.trim_end_matches(",\r\n").to_string();
583
584 sql = if sql.trim_end().ends_with(",") {
587 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
588 } else {
589 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
590 };
591
592 let sql = format!(
593 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
594 table_name, sql
595 );
596 if self.params.sql {
597 let mut list = vec![sql];
598 if !unique.is_empty() {
599 list.push(unique)
600 }
601 if !index.is_empty() {
602 list.extend(index)
603 }
604
605 return JsonValue::from(list.join(""));
606 }
607
608 let (state, error_info) = self.execute_ddl(sql.clone());
611 if state {
612 if !unique.is_empty() {
613 let (_, index_list) =
615 self.query(format!("PRAGMA index_list(`{}`);\r\n", table_name));
616 let mut existing_unique_indexes = HashSet::with_capacity(index_list.len());
617 for item in index_list.members() {
618 if item["unique"].as_usize().unwrap_or(0) == 1 {
619 if let Some(name) = item["name"].as_str() {
620 existing_unique_indexes.insert(name.to_string());
621 }
622 }
623 }
624
625 let index_name = if unique.contains("`") {
627 let start = unique.find("`").unwrap() + 1;
628 let end = unique[start..].find("`").unwrap() + start;
629 unique[start..end].to_string()
630 } else {
631 String::new()
632 };
633
634 if !index_name.is_empty() && !existing_unique_indexes.contains(index_name.as_str())
636 {
637 let old_indexes: Vec<String> =
639 existing_unique_indexes.iter().cloned().collect();
640 for old_index in old_indexes {
641 if old_index != index_name {
642 if Self::can_drop_index(&old_index) {
644 let drop_sql = format!("DROP INDEX IF EXISTS `{}`;", old_index);
645 let (state, result) = self.execute_ddl(drop_sql);
646 if !state {
647 error!("删除旧索引失败: {} 错误: {}", old_index, result);
648 }
649 } else {
650 }
652 }
653 }
654 let (state, result) = self.execute_ddl(unique.clone());
655 if !state {
656 let error_msg = result.to_string().to_lowercase();
658 if error_msg.contains("already exists") {
659 } else {
661 error!(
662 "{} 表 {} 唯一索引创建失败: {} SQL: {}",
663 thread_id, table_name, result, unique
664 );
665 }
666 }
667 } else if !index_name.is_empty() {
668 }
670 }
671 if !index.is_empty() {
672 let (_, index_list) =
674 self.query(format!("PRAGMA index_list(`{}`);\r\n", table_name));
675 let mut existing_indexes = HashSet::with_capacity(index_list.len());
676 for item in index_list.members() {
677 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
678 if !is_unique {
679 if let Some(name) = item["name"].as_str() {
680 existing_indexes.insert(name.to_string());
681 }
682 }
683 }
684
685 for (index_sql, index_name) in index.iter().zip(index_names.iter()) {
687 if !existing_indexes.contains(index_name.as_str()) {
689 let (state, result) = self.execute_ddl(index_sql.clone());
690 if state {
691 } else {
693 error!(
694 "{} {} 索引创建失败: {} 错误: {}",
695 thread_id, table_name, index_name, result
696 );
697 }
698 } else {
699 }
701 }
702 }
703 JsonValue::from(true)
704 } else {
705 error!(
706 "{} 表 {} 创建失败: SQL: {} 错误: {}",
707 thread_id, table_name, sql, error_info
708 );
709 JsonValue::from(false)
710 }
711 }
712 fn table_update(&mut self, options: TableOptions) -> JsonValue {
713 let thread_id = get_thread_id();
714
715 let estimated_fields = options.table_fields.len();
717 let mut sql = String::with_capacity(estimated_fields * 100);
718 let mut add = Vec::with_capacity(estimated_fields);
719 let mut del = Vec::with_capacity(estimated_fields);
720 let mut put = Vec::with_capacity(estimated_fields);
721
722 let table_name = match validate_table_name(&options.table_name) {
724 Ok(n) => n,
725 Err(e) => {
726 error!("表名验证失败: {}", e);
727 return JsonValue::from(0);
728 }
729 };
730 let (_, mut fields_list) = self.query(format!("PRAGMA table_info(`{}`)", table_name));
731 let mut field_old = object! {};
732 for item in fields_list.members_mut() {
733 item["dflt_value"] = item["dflt_value"]
734 .to_string()
735 .trim_start_matches("'")
736 .trim_end_matches("'")
737 .into();
738 let name = match item["name"].as_str() {
739 Some(n) => n,
740 None => {
741 error!("表 {} 字段信息中缺少 name 字段", options.table_name);
742 continue;
743 }
744 };
745 field_old[name] = item.clone();
746 if options.table_fields[name].is_empty() {
747 del.push(name);
748 }
749 }
750
751 let estimated_fields = options.table_fields.len();
752 let mut fields_list = Vec::with_capacity(estimated_fields);
753 let mut fields_list_new = Vec::with_capacity(estimated_fields);
754
755 for (name, field) in options.table_fields.entries() {
756 fields_list_new.push(name.to_string());
757 if field_old[name].is_empty() {
758 add.push(name);
759 } else {
760 fields_list.push(name);
761 let field_mode = field["mode"].as_str().unwrap_or("");
762 let old_value = match field_mode {
763 "select" => {
764 if field_old[name]["dflt_value"].clone().is_empty() {
765 "".to_string()
766 } else {
767 field_old[name]["dflt_value"].clone().to_string()
768 }
769 }
770 "switch" => match field_old[name]["dflt_value"].to_string().parse::<i32>() {
771 Ok(val) => (val == 1).to_string(),
772 Err(_) => {
773 error!(
774 "表 {} 字段 {} 的默认值解析失败,跳过更新",
775 options.table_name, name
776 );
777 continue;
778 }
779 },
780 _ => field_old[name]["dflt_value"].clone().to_string(),
781 };
782 let new_value = match field["mode"].as_str().unwrap() {
783 "select" => field["def"]
784 .members()
785 .map(|x| x.to_string())
786 .collect::<Vec<String>>()
787 .join(","),
788 _ => field["def"].clone().to_string(),
789 };
790 if old_value != new_value {
791 info!(
792 "{} 差异化当前: {} old_value: {} new_value: {} {}",
793 options.table_name,
794 name,
795 old_value,
796 new_value,
797 old_value != new_value
798 );
799 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
800 put.push(name);
801 } else if field_old[name]["pk"].as_i64().unwrap() == 1 && name != options.table_key
802 {
803 info!("{} 主键替换: {}", options.table_name, name);
804 put.push(name);
805 }
806 }
807 }
808
809 if add.is_empty() && del.is_empty() && put.is_empty() {
811 let (_, index_list_result) =
813 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
814
815 let mut existing_unique_indexes: Vec<String> = vec![];
816 let mut existing_indexes: Vec<String> = vec![];
817
818 for item in index_list_result.members() {
819 if let Some(index_name) = item["name"].as_str() {
820 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
821 if is_unique {
822 existing_unique_indexes.push(index_name.to_string());
823 } else {
824 existing_indexes.push(index_name.to_string());
825 }
826 }
827 }
828
829 let mut has_unique_change = false;
831 if !options.table_unique.is_empty() {
832 let mut unique_name = String::new();
833 for item in options.table_unique.iter() {
834 if unique_name.is_empty() {
835 unique_name = format!("{}_unique_{}", options.table_name, item);
836 } else {
837 unique_name = format!("{}_{}", unique_name, item);
838 }
839 }
840 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
841 let new_unique_name = format!("unique_{}", md5);
842 has_unique_change = !existing_unique_indexes.contains(&new_unique_name);
843 }
844
845 let mut has_index_change = false;
847 for row in options.table_index.iter() {
848 let mut index_name = String::new();
849 for item in row.iter() {
850 if index_name.is_empty() {
851 index_name = format!("index_{}", item);
852 } else {
853 index_name = format!("{}_{}", index_name, item);
854 }
855 }
856 let full_index_name = format!("{}_{}", options.table_name, index_name);
857 if !existing_indexes.contains(&full_index_name) {
858 has_index_change = true;
859 break;
860 }
861 }
862
863 if !has_unique_change && !has_index_change {
865 return JsonValue::from(-1);
866 }
867 }
868
869 let mut unique_fields = String::new();
870 let mut unique_name = String::new();
871 let mut unique = String::new();
872
873 for item in options.table_unique.iter() {
875 if unique_fields.is_empty() {
876 unique_fields = format!("`{item}`");
877 unique_name = format!("{}_unique_{}", options.table_name, item);
878 } else {
879 unique_fields = format!("{unique_fields},`{item}`");
880 unique_name = format!("{unique_name}_{item}");
881 }
882 }
883 if !unique_name.is_empty() {
885 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
886 let index_name = format!("unique_{}", md5);
887 unique = format!(
888 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
889 index_name, options.table_name, unique_fields
890 );
891 }
892
893 let estimated_indexes = options.table_index.len();
895 let mut index = Vec::with_capacity(estimated_indexes);
896 for row in options.table_index.iter() {
897 let mut index_fields = String::new();
898 let mut index_name = String::new();
899 for item in row.iter() {
900 if index_fields.is_empty() {
901 index_fields = format!("`{item}`");
902 index_name = format!("index_{item}");
903 } else {
904 index_fields = format!("{index_fields},`{item}`");
905 index_name = format!("{index_name}_{item}");
906 }
907 }
908 if !Self::validate_index_fields_in_table(&index_fields, &fields_list_new) {
910 error!(
911 "{} 索引字段验证失败: 字段 {} 不在新表字段列表中,跳过创建索引 {}",
912 options.table_name, index_fields, index_name
913 );
914 } else {
915 index.push(format!(
916 "create index {}_{} on {} ({});\r\n",
917 options.table_name, index_name, options.table_name, index_fields
918 ));
919 }
920 }
921 for (name, field) in options.table_fields.entries() {
922 let row = br_fields::field("sqlite", name, field.clone());
923 let escaped_row = if crate::types::DISABLE_FIELD.contains(&name) {
925 if let Some(space_pos) = row.find(' ') {
928 format!("`{}`{}", &row[..space_pos], &row[space_pos..])
929 } else {
930 format!("`{}`", row)
931 }
932 } else {
933 row
934 };
935 sql = format!("{sql} {escaped_row},\r\n");
936 }
937
938 if !unique.is_empty() || !index.is_empty() {
939 let (_, index_list_result) =
940 self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
941 let mut existing_unique_indexes = HashSet::with_capacity(index_list_result.len());
943 let mut existing_indexes = HashSet::with_capacity(index_list_result.len());
944
945 for item in index_list_result.members() {
946 if let Some(index_name) = item["name"].as_str() {
947 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
948 if is_unique {
949 existing_unique_indexes.insert(index_name.to_string());
950 } else {
951 existing_indexes.insert(index_name.to_string());
952 }
953 }
954 }
955
956 if !unique_name.is_empty() {
958 if !Self::validate_index_fields_in_table(&unique_fields, &fields_list_new) {
960 error!(
961 "{} 唯一索引字段验证失败: 字段 {} 不在新表字段列表中,跳过创建唯一索引",
962 options.table_name, unique_fields
963 );
964 unique = String::new();
965 } else {
966 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
967 let new_unique_index_name = format!("unique_{}", md5);
968
969 if !existing_unique_indexes.contains(new_unique_index_name.as_str()) {
971 let old_indexes: Vec<String> =
973 existing_unique_indexes.iter().cloned().collect();
974 for old_index in old_indexes {
975 if old_index != new_unique_index_name {
976 if Self::can_drop_index(&old_index) {
978 let drop_sql = format!("DROP INDEX IF EXISTS `{}`;", old_index);
979 let (state, result) = self.execute(drop_sql);
980 if !state {
981 error!("删除旧索引失败: {} 错误: {}", old_index, result);
982 }
983 } else {
984 }
986 }
987 }
988 unique = format!(
990 "CREATE UNIQUE INDEX `{}` on {} ({});\r\n",
991 new_unique_index_name, options.table_name, unique_fields
992 );
993 } else {
994 unique = String::new();
996 }
997 }
998 }
999
1000 let mut index_old_set = HashSet::with_capacity(index_list_result.len());
1002 let mut index_new_set = HashSet::with_capacity(index.len());
1003
1004 for item in index_list_result.members() {
1006 let origin = item["origin"].as_str().unwrap_or("");
1007 let is_unique = item["unique"].as_usize().unwrap_or(0) == 1;
1008 let name = item["name"].as_str().unwrap_or("");
1009
1010 if origin == "c" && !is_unique {
1011 index_old_set.insert(name.to_string());
1012 }
1013 }
1014
1015 for index_sql in index.iter() {
1017 if let Some(start) = index_sql.find("CREATE INDEX") {
1019 if let Some(end) = index_sql[start..].find(" ON ") {
1020 let index_name = index_sql[start + 12..start + end].trim();
1021 index_new_set.insert(index_name.to_string());
1022 }
1023 } else if let Some(start) = index_sql.find("create index") {
1024 if let Some(end) = index_sql[start..].find(" on ") {
1025 let index_name = index_sql[start + 12..start + end].trim();
1026 index_new_set.insert(index_name.to_string());
1027 }
1028 }
1029 }
1030
1031 if unique.is_empty() && index_old_set.len() == index_new_set.len() {
1033 let all_exist = index_new_set
1035 .iter()
1036 .all(|name| index_old_set.contains(name));
1037 if all_exist {
1038 index = vec![];
1039 }
1040 }
1041 }
1042
1043 if add.is_empty()
1045 && del.is_empty()
1046 && put.is_empty()
1047 && unique.is_empty()
1048 && index.is_empty()
1049 {
1050 return JsonValue::from(-1);
1051 }
1052
1053 sql = sql.trim_end_matches(",\r\n").to_string();
1054 let mut create_sql = String::with_capacity(sql.len() + options.table_name.len() + 50);
1056 create_sql.push_str("CREATE TABLE ");
1057 create_sql.push_str(&options.table_name);
1058 create_sql.push_str("_tmp (\r\n");
1059 create_sql.push_str(&sql);
1060 create_sql.push_str("\r\n);\r\n");
1061 let sql = create_sql;
1062
1063 let fields_new_str = fields_list_new.join("`,`");
1065 let fields_old_str = fields_list.join("`,`");
1066 let mut sqls = String::with_capacity(
1067 options.table_name.len() * 2 + fields_new_str.len() + fields_old_str.len() + 100,
1068 );
1069 sqls.push_str("replace INTO ");
1070 sqls.push_str(&options.table_name);
1071 sqls.push_str("_tmp (`");
1072 sqls.push_str(&fields_new_str);
1073 sqls.push_str("`) select `");
1074 sqls.push_str(&fields_old_str);
1075 sqls.push_str("` from ");
1076 sqls.push_str(&options.table_name);
1077 sqls.push_str(";\r\n");
1078
1079 let mut drop_sql = String::with_capacity(options.table_name.len() + 20);
1080 drop_sql.push_str("drop table ");
1081 drop_sql.push_str(&options.table_name);
1082 drop_sql.push_str(";\r\n");
1083
1084 let mut alter_sql = String::with_capacity(options.table_name.len() * 2 + 30);
1085 alter_sql.push_str("alter table ");
1086 alter_sql.push_str(&options.table_name);
1087 alter_sql.push_str("_tmp rename to ");
1088 alter_sql.push_str(&options.table_name);
1089 alter_sql.push_str(";\r\n");
1090
1091 let mut drop_sql_temp = String::with_capacity(options.table_name.len() + 25);
1092 drop_sql_temp.push_str("drop table ");
1093 drop_sql_temp.push_str(&options.table_name);
1094 drop_sql_temp.push_str("_tmp;\r\n");
1095
1096 if self.params.sql {
1097 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
1098 if !unique.is_empty() {
1099 list.push(unique)
1100 }
1101 if !index.is_empty() {
1102 list.extend(index)
1103 }
1104 return JsonValue::from(list.join(""));
1105 }
1106
1107 if add.is_empty()
1108 && del.is_empty()
1109 && unique.is_empty()
1110 && index.is_empty()
1111 && put.is_empty()
1112 {
1113 return JsonValue::from(-1);
1114 }
1115
1116 let (state, _) = self.execute(sql.clone());
1117 let data = match state {
1118 true => {
1119 let (state, _) = self.execute(sqls.clone());
1120 match state {
1121 true => {
1122 let (state, _) = self.execute(drop_sql);
1123 match state {
1124 true => {
1125 let (state, _) = self.execute(alter_sql);
1126 match state {
1127 true => {
1128 let (_, new_table_fields_result) = self.query(format!(
1131 "PRAGMA table_info(`{}`)",
1132 options.table_name
1133 ));
1134 let mut new_table_fields = vec![];
1135 for item in new_table_fields_result.members() {
1136 if let Some(field_name) = item["name"].as_str() {
1137 new_table_fields.push(field_name.to_string());
1138 }
1139 }
1140
1141 if !unique.is_empty() {
1143 let (_, existing_unique_index_list) =
1145 self.query(format!(
1146 "PRAGMA index_list({});\r\n",
1147 options.table_name
1148 ));
1149 let mut existing_unique_index_names =
1150 HashSet::with_capacity(
1151 existing_unique_index_list.len(),
1152 );
1153 for item in existing_unique_index_list.members() {
1154 let is_unique =
1155 item["unique"].as_usize().unwrap_or(0) == 1;
1156 if is_unique {
1157 if let Some(name) = item["name"].as_str() {
1158 existing_unique_index_names
1159 .insert(name.to_string());
1160 }
1161 }
1162 }
1163
1164 let unique_index_name =
1166 if unique.contains("CREATE UNIQUE INDEX") {
1167 let start =
1168 unique.find("CREATE UNIQUE INDEX").unwrap()
1169 + 19;
1170 let end = unique[start..]
1171 .find(" on ")
1172 .unwrap_or(unique.len() - start);
1173 unique[start..start + end]
1174 .trim()
1175 .trim_matches('`')
1176 .to_string()
1177 } else if unique.contains("create unique index") {
1178 let start =
1179 unique.find("create unique index").unwrap()
1180 + 19;
1181 let end = unique[start..]
1182 .find(" on ")
1183 .unwrap_or(unique.len() - start);
1184 unique[start..start + end]
1185 .trim()
1186 .trim_matches('`')
1187 .to_string()
1188 } else {
1189 String::new()
1190 };
1191
1192 let unique_fields_str =
1194 if unique.contains("(") && unique.contains(")") {
1195 let start = unique.find("(").unwrap() + 1;
1196 let end = unique.find(")").unwrap();
1197 unique[start..end].to_string()
1198 } else {
1199 unique_fields.clone()
1200 };
1201
1202 if !unique_index_name.is_empty()
1204 && existing_unique_index_names
1205 .contains(unique_index_name.as_str())
1206 {
1207 } else if Self::validate_index_fields_in_table(
1209 &unique_fields_str,
1210 &new_table_fields,
1211 ) {
1212 let (state, result) = self.execute(unique.clone());
1213 if state {
1214 } else {
1216 let error_msg =
1218 result.to_string().to_lowercase();
1219 if error_msg.contains("already exists") {
1220 } else {
1222 error!(
1223 "{} {} 唯一索引创建失败: {} SQL: {}",
1224 thread_id,
1225 options.table_name,
1226 result,
1227 unique
1228 );
1229 }
1230 }
1231 } else {
1232 }
1235 }
1236
1237 let (_, existing_index_list) = self.query(format!(
1240 "PRAGMA index_list({});\r\n",
1241 options.table_name
1242 ));
1243 let mut existing_index_names =
1244 HashSet::with_capacity(existing_index_list.len());
1245 for item in existing_index_list.members() {
1246 let is_unique =
1247 item["unique"].as_usize().unwrap_or(0) == 1;
1248 if !is_unique {
1249 if let Some(name) = item["name"].as_str() {
1250 existing_index_names.insert(name.to_string());
1251 }
1252 }
1253 }
1254
1255 let estimated_indexes = options.table_index.len();
1257 let mut index_names = Vec::with_capacity(estimated_indexes);
1258 for row in options.table_index.iter() {
1259 let mut index_name = String::new();
1260 for item in row.iter() {
1261 if index_name.is_empty() {
1262 index_name = format!("index_{}", item);
1263 } else {
1264 index_name = format!("{}_{}", index_name, item);
1265 }
1266 }
1267 let full_index_name =
1269 format!("{}_{}", options.table_name, index_name);
1270 index_names.push(full_index_name);
1271 }
1272
1273 let mut index_failed_count = 0;
1274 for (index_sql, index_name) in
1275 index.iter().zip(index_names.iter())
1276 {
1277 let index_fields_str = if index_sql.contains("(")
1279 && index_sql.contains(")")
1280 {
1281 let start = index_sql.find("(").unwrap() + 1;
1282 let end = index_sql.find(")").unwrap();
1283 index_sql[start..end].to_string()
1284 } else {
1285 String::new()
1286 };
1287
1288 if !index_name.is_empty()
1290 && existing_index_names
1291 .contains(index_name.as_str())
1292 {
1293 continue;
1295 }
1296
1297 if !index_fields_str.is_empty()
1298 && Self::validate_index_fields_in_table(
1299 &index_fields_str,
1300 &new_table_fields,
1301 )
1302 {
1303 let (state, result) =
1304 self.execute(index_sql.clone());
1305 match state {
1306 true => {
1307 }
1309 false => {
1310 let error_msg =
1312 result.to_string().to_lowercase();
1313 if error_msg.contains("already exists") {
1314 continue;
1316 }
1317 index_failed_count += 1;
1318 error!(
1319 "{} {} 索引创建失败: {} SQL: {}",
1320 thread_id,
1321 options.table_name,
1322 result,
1323 index_sql
1324 );
1325 }
1327 }
1328 } else {
1329 }
1332 }
1333
1334 if index_failed_count > 0 {
1335 }
1337
1338 return JsonValue::from(1);
1339 }
1340 false => {
1341 error!("{} 修改表名失败", options.table_name);
1342 return JsonValue::from(0);
1343 }
1344 }
1345 }
1346 false => {
1347 error!("{} 删除本表失败", options.table_name);
1348 return JsonValue::from(0);
1349 }
1350 }
1351 }
1352 false => {
1353 error!(
1354 "{} 添加tmp表记录失败 {:#} {:#}",
1355 options.table_name, sql, sqls
1356 );
1357 let sql = format!("drop table {}_tmp", options.table_name);
1358 let (_, _) = self.execute(sql);
1359 0
1360 }
1361 }
1362 }
1363 false => {
1364 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
1365 let (_, _) = self.execute(drop_sql_temp);
1366 0
1367 }
1368 };
1369 JsonValue::from(data)
1370 }
1371
1372 fn table_info(&mut self, table: &str) -> JsonValue {
1373 {
1375 let fields = FIELDS.lock().unwrap();
1376 if let Some(cached) = fields.get(table) {
1377 return cached.clone();
1378 }
1379 }
1380 let sql = format!("PRAGMA table_info(`{table}`)");
1381 let (state, data) = self.query(sql);
1382
1383 match state {
1384 true => {
1385 let mut fields = object! {};
1386 for item in data.members() {
1387 if let Some(name) = item["name"].as_str() {
1388 fields[name] = item.clone();
1389 }
1390 }
1391 let fields_clone = fields.clone();
1393 FIELDS
1394 .lock()
1395 .unwrap()
1396 .insert(table.to_string(), fields_clone);
1397 data
1398 }
1399 false => object! {},
1400 }
1401 }
1402
1403 fn table_is_exist(&mut self, name: &str) -> bool {
1404 let name = match validate_table_name(name) {
1406 Ok(n) => n,
1407 Err(e) => {
1408 error!("表名验证失败: {}", e);
1409 return false;
1410 }
1411 };
1412 let table_name = add_table_prefix(&self.connection.prefix, name);
1414 let sql = format!(
1415 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{table_name}'"
1416 );
1417 let (state, data) = self.query(sql);
1418 if state && !data.is_empty() {
1419 if let Some(count) = data[0]["count"].as_i64() {
1420 if count > 0 {
1421 return true;
1422 }
1423 }
1424 false
1425 } else {
1426 false
1427 }
1428 }
1429
1430 fn table(&mut self, name: &str) -> &mut Sqlite {
1431 self.params = Params::default(self.connection.mode.str().as_str());
1432 self.params.table = add_table_prefix(&self.connection.prefix, name);
1434 self.params.join_table = self.params.table.clone();
1435 self
1436 }
1437 fn change_table(&mut self, name: &str) -> &mut Self {
1438 self.params.join_table = name.to_string();
1439 self
1440 }
1441 fn autoinc(&mut self) -> &mut Self {
1442 self.params.autoinc = true;
1443 self
1444 }
1445
1446 fn fetch_sql(&mut self) -> &mut Self {
1447 self.params.sql = true;
1448 self
1449 }
1450
1451 fn order(&mut self, field: &str, by: bool) -> &mut Self {
1452 self.params.order[field] = {
1453 if by {
1454 "DESC"
1455 } else {
1456 "ASC"
1457 }
1458 }
1459 .into();
1460 self
1461 }
1462
1463 fn group(&mut self, field: &str) -> &mut Self {
1464 let fields: Vec<&str> = field.split(",").collect();
1465 for field in fields.iter() {
1466 let fields = field.to_string();
1467 self.params.group[fields.as_str()] = fields.clone().into();
1468 self.params.fields[fields.as_str()] = fields.clone().into();
1469 }
1470 self
1471 }
1472
1473 fn distinct(&mut self) -> &mut Self {
1474 self.params.distinct = true;
1475 self
1476 }
1477 fn json(&mut self, field: &str) -> &mut Self {
1478 let list: Vec<&str> = field.split(",").collect();
1479 for item in list.iter() {
1480 self.params.json[item.to_string().as_str()] = item.to_string().into();
1481 }
1482 self
1483 }
1484
1485 fn location(&mut self, field: &str) -> &mut Self {
1486 let list: Vec<&str> = field.split(",").collect();
1487 for item in list.iter() {
1488 self.params.location[item.to_string().as_str()] = item.to_string().into();
1489 }
1490 self
1491 }
1492
1493 fn field(&mut self, field: &str) -> &mut Self {
1494 let list: Vec<&str> = field.split(",").collect();
1495 let join_table = if self.params.join_table.is_empty() {
1496 self.params.table.clone()
1497 } else {
1498 self.params.join_table.clone()
1499 };
1500 for item in list.iter() {
1501 let item = item.to_string();
1502 if item.contains(" as ") {
1503 let text = item.split(" as ").collect::<Vec<&str>>().clone();
1504 self.params.fields[item] =
1505 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1506 } else {
1507 self.params.fields[item] = format!("{join_table}.`{item}`").into();
1508 }
1509 }
1510 self
1511 }
1512 fn hidden(&mut self, name: &str) -> &mut Self {
1513 let hidden: Vec<&str> = name.split(",").collect();
1514 let sql = format!("PRAGMA table_info(`{}`)", self.params.table);
1515 let (_, data) = self.query(sql);
1516 for item in data.members() {
1517 let name = item["name"].as_str().unwrap();
1518 if !hidden.contains(&name) {
1519 self.params.fields[name] = name.into();
1520 }
1521 }
1522 self
1523 }
1524
1525 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1526 let join_table = if self.params.join_table.is_empty() {
1527 self.params.table.clone()
1528 } else {
1529 self.params.join_table.clone()
1530 };
1531
1532 if value.is_boolean() {
1533 if value.as_bool().unwrap() {
1534 value = 1.into();
1535 } else {
1536 value = 0.into();
1537 }
1538 }
1539
1540 match compare {
1541 "between" => {
1542 self.params.where_and.push(format!(
1543 "{}.`{}` between '{}' AND '{}'",
1544 join_table, field, value[0], value[1]
1545 ));
1546 }
1547 "set" => {
1548 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
1549 let mut wheredata = vec![];
1550 for item in list.iter() {
1551 wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
1552 }
1553 self.params
1554 .where_and
1555 .push(format!("({})", wheredata.join(" or ")));
1556 }
1557 "notin" => {
1558 let mut text = String::new();
1559 for item in value.members() {
1560 text = format!("{text},'{item}'");
1561 }
1562 text = text.trim_start_matches(",").into();
1563 self.params
1564 .where_and
1565 .push(format!("{join_table}.`{field}` not in ({text})"));
1566 }
1567 "in" => {
1568 let mut text = String::new();
1569 if value.is_array() {
1570 for item in value.members() {
1571 text = format!("{text},'{item}'");
1572 }
1573 } else {
1574 let value = value.to_string();
1575 let value: Vec<&str> = value.split(",").collect();
1576 for item in value.iter() {
1577 text = format!("{text},'{item}'");
1578 }
1579 }
1580 text = text.trim_start_matches(",").into();
1581
1582 self.params
1583 .where_and
1584 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1585 }
1586 "=" => {
1587 if value.is_null() {
1588 self.params
1589 .where_and
1590 .push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
1591 } else {
1592 self.params
1593 .where_and
1594 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1595 }
1596 }
1597 _ => {
1598 if value.is_null() {
1599 self.params
1600 .where_and
1601 .push(format!("{join_table}.`{field}` {compare} {value}"));
1602 } else {
1603 self.params
1604 .where_and
1605 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1606 }
1607 }
1608 }
1609 self
1610 }
1611 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1612 let join_table = if self.params.join_table.is_empty() {
1613 self.params.table.clone()
1614 } else {
1615 self.params.join_table.clone()
1616 };
1617
1618 if value.is_boolean() {
1619 if value.as_bool().unwrap() {
1620 value = 1.into();
1621 } else {
1622 value = 0.into();
1623 }
1624 }
1625 match compare {
1626 "between" => {
1627 self.params.where_or.push(format!(
1628 "{}.`{}` between '{}' AND '{}'",
1629 join_table, field, value[0], value[1]
1630 ));
1631 }
1632 "set" => {
1633 let tt = value.to_string().replace(",", "%");
1634 self.params
1635 .where_or
1636 .push(format!("{join_table}.`{field}` like '%{tt}%'"));
1637 }
1638 "notin" => {
1639 let mut text = String::new();
1640 for item in value.members() {
1641 text = format!("{text},'{item}'");
1642 }
1643 text = text.trim_start_matches(",").into();
1644 self.params
1645 .where_or
1646 .push(format!("{join_table}.`{field}` not in ({text})"));
1647 }
1648 "in" => {
1649 let mut text = String::new();
1650 if value.is_array() {
1651 for item in value.members() {
1652 text = format!("{text},'{item}'");
1653 }
1654 } else {
1655 let value = value.as_str().unwrap();
1656 let value: Vec<&str> = value.split(",").collect();
1657 for item in value.iter() {
1658 text = format!("{text},'{item}'");
1659 }
1660 }
1661 text = text.trim_start_matches(",").into();
1662 self.params
1663 .where_or
1664 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1665 }
1666 _ => {
1667 self.params
1668 .where_or
1669 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1670 }
1671 }
1672 self
1673 }
1674 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1675 self.params.where_column = format!(
1676 "{}.`{}` {} {}.`{}`",
1677 self.params.table, field_a, compare, self.params.table, field_b
1678 );
1679 self
1680 }
1681
1682 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1683 self.params
1684 .update_column
1685 .push(format!("{field_a} = {compare}"));
1686 self
1687 }
1688
1689 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1690 self.params.page = page;
1691 self.params.limit = limit;
1692 self
1693 }
1694
1695 fn column(&mut self, field: &str) -> JsonValue {
1696 self.field(field);
1697 self.group(field);
1698 let sql = self.params.select_sql();
1699 if self.params.sql {
1700 return JsonValue::from(sql.clone());
1701 }
1702 self.table_info(self.params.table.clone().as_str());
1703 let (state, data) = self.query(sql);
1704 if state {
1705 let mut list = array![];
1706 for item in data.members() {
1707 if self.params.json[field].is_empty() {
1708 list.push(item[field].clone()).unwrap();
1709 } else {
1710 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
1711 list.push(data).unwrap();
1712 }
1713 }
1714 list
1715 } else {
1716 array![]
1717 }
1718 }
1719
1720 fn count(&mut self) -> JsonValue {
1721 self.params.fields["count"] = "count(*) as count".to_string().into();
1722 let sql = self.params.select_sql();
1723 if self.params.sql {
1724 return JsonValue::from(sql.clone());
1725 }
1726 let (state, data) = self.query(sql);
1727 match state {
1728 true => data[0]["count"].clone(),
1729 false => JsonValue::from(0),
1730 }
1731 }
1732
1733 fn max(&mut self, field: &str) -> JsonValue {
1734 self.params.fields[field] = format!("max({field}) as {field}").into();
1735 let sql = self.params.select_sql();
1736 if self.params.sql {
1737 return JsonValue::from(sql.clone());
1738 }
1739 let (state, data) = self.query(sql);
1740 if state {
1741 if data.len() > 1 {
1742 return data;
1743 }
1744 data[0][field].clone()
1745 } else {
1746 JsonValue::from(0.0)
1747 }
1748 }
1749
1750 fn min(&mut self, field: &str) -> JsonValue {
1751 self.params.fields[field] = format!("min({field}) as {field}").into();
1752 let sql = self.params.select_sql();
1753 let (state, data) = self.query(sql);
1754 if state {
1755 if data.len() > 1 {
1756 return data;
1757 }
1758 data[0][field].clone()
1759 } else {
1760 JsonValue::from(0.0)
1761 }
1762 }
1763
1764 fn sum(&mut self, field: &str) -> JsonValue {
1765 self.params.fields[field] = format!("sum({field}) as {field}").into();
1766 let sql = self.params.select_sql();
1767 if self.params.sql {
1768 return JsonValue::from(sql.clone());
1769 }
1770 let (state, data) = self.query(sql);
1771 match state {
1772 true => {
1773 if data.len() > 1 {
1774 return data;
1775 }
1776 if self.params.fields.len() > 1 {
1777 return data[0].clone();
1778 }
1779 data[0][field].clone()
1780 }
1781 false => JsonValue::from(0),
1782 }
1783 }
1784 fn avg(&mut self, field: &str) -> JsonValue {
1785 self.params.fields[field] = format!("avg({field}) as {field}").into();
1786 let sql = self.params.select_sql();
1787 if self.params.sql {
1788 return JsonValue::from(sql.clone());
1789 }
1790 let (state, data) = self.query(sql);
1791 if state {
1792 if data.len() > 1 {
1793 return data;
1794 }
1795 data[0][field].clone()
1796 } else {
1797 JsonValue::from(0)
1798 }
1799 }
1800 fn select(&mut self) -> JsonValue {
1801 let sql = self.params.select_sql();
1802 if self.params.sql {
1803 return JsonValue::from(sql.clone());
1804 }
1805 self.table_info(self.params.table.clone().as_str());
1806 let (state, mut data) = self.query(sql.clone());
1807 match state {
1808 true => {
1809 for (field, _) in self.params.json.entries() {
1810 for item in data.members_mut() {
1811 if !item[field].is_empty() {
1812 let json = item[field].to_string();
1813 item[field] = match json::parse(&json) {
1814 Ok(e) => e,
1815 Err(_) => JsonValue::from(json),
1816 };
1817 }
1818 }
1819 }
1820 data.clone()
1821 }
1822 false => {
1823 error!("{data:?}");
1824 array![]
1825 }
1826 }
1827 }
1828 fn find(&mut self) -> JsonValue {
1829 self.params.page = 1;
1830 self.params.limit = 1;
1831 let sql = self.params.select_sql();
1832 if self.params.sql {
1833 return JsonValue::from(sql.clone());
1834 }
1835
1836 self.table_info(self.params.table.clone().as_str());
1837 let (state, mut data) = self.query(sql.clone());
1838 match state {
1839 true => {
1840 if data.is_empty() {
1841 return object! {};
1842 }
1843 for (field, _) in self.params.json.entries() {
1844 if !data[0][field].is_empty() {
1845 let json = data[0][field].to_string();
1846 let json = json::parse(&json).unwrap_or(array![]);
1847 data[0][field] = json;
1848 } else {
1849 data[0][field] = array![];
1850 }
1851 }
1852 data[0].clone()
1853 }
1854 false => {
1855 error!("{data:?}");
1856 object! {}
1857 }
1858 }
1859 }
1860
1861 fn value(&mut self, field: &str) -> JsonValue {
1862 self.params.fields = object! {};
1863 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1864 self.params.page = 1;
1865 self.params.limit = 1;
1866 let sql = self.params.select_sql();
1867 if self.params.sql {
1868 return JsonValue::from(sql.clone());
1869 }
1870 self.table_info(self.params.table.clone().as_str());
1871 let (state, mut data) = self.query(sql.clone());
1872 match state {
1873 true => {
1874 for (field, _) in self.params.json.entries() {
1875 if !data[0][field].is_empty() {
1876 let json = data[0][field].to_string();
1877 let json = json::parse(&json).unwrap_or(array![]);
1878 data[0][field] = json;
1879 } else {
1880 data[0][field] = array![];
1881 }
1882 }
1883 data[0][field].clone()
1884 }
1885 false => {
1886 if self.connection.debug {
1887 info!("{data:?}");
1888 }
1889 JsonValue::Null
1890 }
1891 }
1892 }
1893
1894 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1895 let mut fields = vec![];
1896 let mut values = vec![];
1897
1898 if !self.params.autoinc && data["id"].is_empty() {
1899 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1900 }
1901 for (field, value) in data.entries() {
1902 fields.push(format!("`{field}`"));
1903
1904 if value.is_string() {
1905 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1906 continue;
1907 } else if value.is_array() || value.is_object() {
1908 if self.params.json[field].is_empty() {
1909 let json_str = value.to_string();
1910 values.push(format!("'{}'", json_str.replace("'", "''")));
1911 } else {
1912 let json = value.to_string();
1913 let json = json.replace("'", "''");
1914 values.push(format!("'{json}'"));
1915 }
1916 continue;
1917 } else if value.is_number() || value.is_boolean() || value.is_null() {
1918 values.push(format!("{value}"));
1919 continue;
1920 } else {
1921 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1922 continue;
1923 }
1924 }
1925 let fields = fields.join(",");
1926 let values = values.join(",");
1927
1928 let sql = format!(
1929 "INSERT INTO `{}` ({}) VALUES ({});",
1930 self.params.table, fields, values
1931 );
1932 if self.params.sql {
1933 return JsonValue::from(sql.clone());
1934 }
1935 let (state, ids) = self.execute(sql);
1936 match state {
1937 true => {
1938 if self.params.autoinc {
1939 let (state, ids) =
1940 self.query(format!("select max(id) as id from {}", self.params.table));
1941 return match state {
1942 true => ids[0]["id"].clone(),
1943 false => {
1944 error!("{ids}");
1945 JsonValue::from("")
1946 }
1947 };
1948 }
1949 data["id"].clone()
1950 }
1951 false => {
1952 error!("{ids}");
1953 JsonValue::from("")
1954 }
1955 }
1956 }
1957 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1958 let mut fields = String::new();
1959
1960 if !self.params.autoinc && data[0]["id"].is_empty() {
1961 data[0]["id"] = "".into();
1962 }
1963 for (field, _) in data[0].entries() {
1964 fields = format!("{fields},`{field}`");
1965 }
1966 fields = fields.trim_start_matches(",").parse().unwrap();
1967
1968 let core_count = num_cpus::get();
1969 let mut p = pools::Pool::new(core_count * 4);
1970 let autoinc = self.params.autoinc;
1971 for list in data.members() {
1972 let mut item = list.clone();
1973 p.execute(move |pcindex| {
1974 if !autoinc && item["id"].is_empty() {
1975 let id = format!(
1976 "{:X}{:X}",
1977 Local::now().timestamp_nanos_opt().unwrap(),
1978 pcindex
1979 );
1980 item["id"] = id.into();
1981 }
1982 let mut values = "".to_string();
1983 for (_, value) in item.entries() {
1984 if value.is_string() {
1985 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1986 } else if value.is_number() || value.is_boolean() {
1987 values = format!("{values},{value}");
1988 } else {
1989 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1990 }
1991 }
1992 values = format!("({})", values.trim_start_matches(","));
1993 array![item["id"].clone(), values]
1994 });
1995 }
1996 let (ids_list, mut values) = p.insert_all();
1997
1998 values = values.trim_start_matches(",").parse().unwrap();
1999
2000 let sql = format!(
2001 "INSERT INTO {} ({}) VALUES {};",
2002 self.params.table, fields, values
2003 );
2004 if self.params.sql {
2005 return JsonValue::from(sql.clone());
2006 }
2007 let (state, data) = self.execute(sql.clone());
2008 match state {
2009 true => {
2010 if self.params.autoinc {
2011 let (state, ids) = self.query(format!(
2012 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
2013 self.params.table,
2014 ids_list.len()
2015 ));
2016 return match state {
2017 true => {
2018 let mut idlist = array![];
2019 for item in ids.members() {
2020 idlist.push(item["id"].clone()).unwrap();
2021 }
2022 idlist
2023 }
2024 false => {
2025 error!("批量添加失败: {ids:?} {sql}");
2026 array![]
2027 }
2028 };
2029 }
2030 JsonValue::from(ids_list)
2031 }
2032 false => {
2033 error!("批量添加失败: {data:?} {sql}");
2034 array![]
2035 }
2036 }
2037 }
2038
2039 fn update(&mut self, data: JsonValue) -> JsonValue {
2040 let mut values = vec![];
2041
2042 for (field, value) in data.entries() {
2043 if value.is_string() {
2044 values.push(format!(
2045 "`{}` = '{}'",
2046 field,
2047 value.to_string().replace("'", "''")
2048 ));
2049 } else if value.is_array() || value.is_object() {
2050 if self.params.json[field].is_empty() {
2051 values.push(format!("`{field}` = '{value}'"));
2052 } else {
2053 let json = value.to_string();
2054 let json = json.replace("'", "''");
2055 values.push(format!("`{field}` = '{json}'"));
2056 }
2057 continue;
2058 } else if value.is_number() || value.is_boolean() || value.is_null() {
2059 values.push(format!("`{field}` = {value} "));
2060 } else {
2061 values.push(format!("`{field}` = '{value}' "));
2062 }
2063 }
2064
2065 for (field, value) in self.params.inc_dec.entries() {
2066 values.push(format!("{} = {}", field, value.to_string().clone()));
2067 }
2068
2069 let values = values.join(",");
2070 let sql = format!(
2071 "UPDATE `{}` SET {} {} {};",
2072 self.params.table.clone(),
2073 values,
2074 self.params.where_sql(),
2075 self.params.page_limit_sql()
2076 );
2077 if self.params.sql {
2078 return JsonValue::from(sql.clone());
2079 }
2080 let (state, data) = self.execute(sql);
2081 if state {
2082 data
2083 } else {
2084 error!("{data}");
2085 JsonValue::from(0)
2086 }
2087 }
2088
2089 fn update_all(&mut self, data: JsonValue) -> JsonValue {
2090 let mut values = vec![];
2091 let mut ids = vec![];
2092 for (field, _) in data[0].entries() {
2093 if field == "id" {
2094 continue;
2095 }
2096 let mut fields = vec![];
2097 for row in data.members() {
2098 let value = row[field].clone();
2099 let id = row["id"].clone();
2100 ids.push(id.clone());
2101 if value.is_string() {
2102 fields.push(format!(
2103 "WHEN '{}' THEN '{}'",
2104 id,
2105 value.to_string().replace("'", "''")
2106 ));
2107 } else if value.is_array() || value.is_object() {
2108 if self.params.json[field].is_empty() {
2109 fields.push(format!("WHEN '{id}' THEN '{value}'"));
2110 } else {
2111 let json = value.to_string();
2112 let json = json.replace("'", "''");
2113 fields.push(format!("WHEN '{id}' THEN '{json}'"));
2114 }
2115 continue;
2116 } else if value.is_number() || value.is_boolean() || value.is_null() {
2117 fields.push(format!("WHEN '{id}' THEN {value}"));
2118 } else {
2119 fields.push(format!("WHEN '{id}' THEN '{value}'"));
2120 }
2121 }
2122 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
2123 }
2124
2125 self.where_and("id", "in", ids.into());
2126 for (field, value) in self.params.inc_dec.entries() {
2127 values.push(format!("{} = {}", field, value.to_string().clone()));
2128 }
2129
2130 let values = values.join(",");
2131 let sql = format!(
2132 "UPDATE {} SET {} {} {};",
2133 self.params.table.clone(),
2134 values,
2135 self.params.where_sql(),
2136 self.params.page_limit_sql()
2137 );
2138 if self.params.sql {
2139 return JsonValue::from(sql.clone());
2140 }
2141 let (state, data) = self.execute(sql);
2142 if state {
2143 data
2144 } else {
2145 error!("{data:?}");
2146 JsonValue::from(0)
2147 }
2148 }
2149 fn delete(&mut self) -> JsonValue {
2150 let sql = format!(
2151 "delete FROM `{}` {};",
2152 self.params.table.clone(),
2153 self.params.where_sql()
2154 );
2155 if self.params.sql {
2156 return JsonValue::from(sql.clone());
2157 }
2158 let (state, data) = self.execute(sql);
2159 match state {
2160 true => data,
2161 false => {
2162 error!("delete 失败>>>{data:?}");
2163 JsonValue::from(0)
2164 }
2165 }
2166 }
2167
2168 fn transaction(&mut self) -> bool {
2169 let thread_id = get_thread_id();
2170 let transaction = {
2172 let tr_count = TR_COUNT.lock().unwrap();
2173 *tr_count.get(&*thread_id.clone()).unwrap_or(&0)
2174 };
2175 if transaction > 0 {
2176 let new_count = transaction + 1;
2177 TR_COUNT
2178 .lock()
2179 .unwrap()
2180 .insert(thread_id.clone(), new_count);
2181 return true;
2182 }
2183
2184 loop {
2185 let mut t = TR_COUNT.lock().unwrap();
2186 if t.is_empty() {
2187 t.insert(thread_id.clone(), 1);
2188 break;
2189 }
2190 drop(t);
2191 thread::yield_now();
2192 }
2193
2194 let flags = OpenFlags::new().with_read_write().with_no_mutex();
2195 let db = match Connect::open_thread_safe_with_flags(
2196 self.connection.clone().get_dsn().as_str(),
2197 flags,
2198 ) {
2199 Ok(e) => Arc::new(e),
2200 Err(_) => {
2201 return false;
2202 }
2203 };
2204 TR.lock().unwrap().insert(thread_id.clone(), db);
2205 let (state, data) = self.query("BEGIN".to_string());
2206 if state {
2207 true
2208 } else {
2209 error!("{thread_id} 启动事务失败: {data}");
2210 TR.lock().unwrap().remove(&thread_id.clone());
2211 TR_COUNT.lock().unwrap().remove(&thread_id.clone());
2212 false
2213 }
2214 }
2215
2216 fn commit(&mut self) -> bool {
2217 let thread_id = get_thread_id();
2218
2219 let mut transaction = *TR_COUNT
2220 .lock()
2221 .unwrap()
2222 .get(&*thread_id.clone())
2223 .unwrap_or(&0);
2224 if transaction > 1 {
2225 transaction -= 1;
2226 TR_COUNT
2227 .lock()
2228 .unwrap()
2229 .insert(thread_id.clone(), transaction);
2230 return true;
2231 }
2232 let sql = "COMMIT";
2233 let (state, data) = self.query(sql.to_string());
2234 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
2235 TR.lock().unwrap().remove(&*thread_id.clone());
2236
2237 if state {
2238 true
2239 } else {
2240 let error_msg = data
2241 .as_str()
2242 .map(|s| s.to_string())
2243 .unwrap_or_else(|| data.to_string());
2244 error!(
2245 "提交事务失败 - 线程ID: {} | 数据库配置: {} | 数据库路径: {} | SQL: {} | 错误详情: {}",
2246 thread_id,
2247 self.default,
2248 self.connection.database,
2249 sql,
2250 error_msg
2251 );
2252 false
2253 }
2254 }
2255
2256 fn rollback(&mut self) -> bool {
2257 let thread_id = get_thread_id();
2258 let sql = "ROLLBACK";
2259
2260 let mut t = *TR_COUNT.lock().unwrap().get(&thread_id).unwrap_or(&0);
2261 if t > 1 {
2262 t -= 1;
2263 TR_COUNT.lock().unwrap().insert(thread_id.clone(), t);
2264 return true;
2265 }
2266 let (state, data) = self.query(sql.to_string());
2267 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
2268 TR.lock().unwrap().remove(&*thread_id.clone());
2269
2270 if state {
2271 true
2272 } else {
2273 let error_msg = data
2274 .as_str()
2275 .map(|s| s.to_string())
2276 .unwrap_or_else(|| data.to_string());
2277 error!(
2278 "回滚事务失败 - 线程ID: {} | 数据库配置: {} | 数据库路径: {} | SQL: {} | 错误详情: {}",
2279 thread_id,
2280 self.default,
2281 self.connection.database,
2282 sql,
2283 error_msg
2284 );
2285 false
2286 }
2287 }
2288
2289 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2290 let (state, data) = self.query(sql.to_string());
2291 match state {
2292 true => Ok(data),
2293 false => Err(data.to_string()),
2294 }
2295 }
2296
2297 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2298 let (state, data) = self.execute(sql.to_string());
2299 match state {
2300 true => Ok(data),
2301 false => Err(data.to_string()),
2302 }
2303 }
2304
2305 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2306 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2307 self
2308 }
2309
2310 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2311 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2312 self
2313 }
2314
2315 fn buildsql(&mut self) -> String {
2316 self.fetch_sql();
2317 let sql = self.select().to_string();
2318 format!("( {} ) `{}`", sql, self.params.table)
2319 }
2320
2321 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2322 for field in fields {
2323 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2324 }
2325 self
2326 }
2327
2328 fn join(
2329 &mut self,
2330 main_table: &str,
2331 main_fields: &str,
2332 right_table: &str,
2333 right_fields: &str,
2334 ) -> &mut Self {
2335 let main_table = if main_table.is_empty() {
2336 self.params.table.clone()
2337 } else {
2338 main_table.to_string()
2339 };
2340 self.params.join_table = right_table.to_string();
2341 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2342 self
2343 }
2344
2345 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2346 let main_fields = if main_fields.is_empty() {
2347 "id"
2348 } else {
2349 main_fields
2350 };
2351 let second_fields = if second_fields.is_empty() {
2352 self.params.table.clone()
2353 } else {
2354 second_fields.to_string().clone()
2355 };
2356 let sec_table_name = format!("{}{}", table, "_2");
2357 let second_table = format!("{} {}", table, sec_table_name.clone());
2358 self.params.join_table = sec_table_name.clone();
2359 self.params.join.push(format!(
2360 " INNER JOIN {} ON {}.{} = {}.{}",
2361 second_table, self.params.table, main_fields, sec_table_name, second_fields
2362 ));
2363 self
2364 }
2365}