1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::Connection;
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info};
7use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10use std::thread;
11use chrono::Local;
12
13lazy_static! {
14 static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
15 static ref TR: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
16 static ref TR_COUNT: Mutex<HashMap<String, u32>> = Mutex::new(HashMap::new());
17 static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
18 static ref FIELDS: Mutex<HashMap<String,JsonValue>> = Mutex::new(HashMap::new());
19}
20
21#[derive(Clone, Debug)]
22pub struct Sqlite {
23 pub connection: Connection,
25 pub default: String,
27
28 pub params: Params,
29}
30
31impl Sqlite {
32 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
33 let flags = OpenFlags::new().with_create().with_read_write();
34 match Connect::open_thread_safe_with_flags(connection.clone().get_dsn().as_str(), flags) {
35 Ok(e) => {
36 DBS.lock().unwrap().insert(default.clone(), Arc::new(e));
37 Ok(Self {
38 connection: connection.clone(),
39 default: default.clone(),
40 params: Params::default("sqlite"),
41 })
42 }
43 Err(e) => {
44 error!(
45 "sqlite 启动失败: {} {}",
46 e,
47 connection.clone().get_dsn().as_str()
48 );
49 Err(e.to_string())
50 }
51 }
52 }
53 fn query_handle(&mut self, mut statement: Statement, sql: String) -> (bool, JsonValue) {
54 let thread_id = format!("{:?}", thread::current().id());
55
56 let mut data = array![];
57 while let State::Row = match statement.next() {
58 Ok(e) => e,
59 Err(e) => {
60 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
61 if transaction > 0 {
62 error!("{} 查询事务: {} {}", thread_id, e, sql.clone());
63 } else {
64 error!("{} 非事务查询: {} {}", thread_id, e, sql.clone());
65 }
66 return (false, data);
67 }
68 } {
69 let mut list = object! {};
70 let mut index = 0;
71 for field in statement.column_names().iter() {
72 if !list[field.as_str()].is_null() {
73 index += 1;
74 continue;
75 }
76 match statement.column_type(field.as_str()) {
77 Ok(types) => match types {
78 Type::String => {
79 let data = statement.read::<String, _>(index).unwrap();
80 match data.as_str() {
81 "false" => {
82 list[field.as_str()] = JsonValue::from(false);
83 }
84 "true" => {
85 list[field.as_str()] = JsonValue::from(true);
86 }
87 _ => {
88 list[field.as_str()] = JsonValue::from(data.clone());
89 }
90 }
91 }
92 Type::Integer => {
93 if FIELDS.lock().unwrap().get(&self.params.table).is_none() {
94 let data = statement.read::<i64, _>(index).unwrap();
95 list[field.as_str()] = JsonValue::from(data);
96 } else {
97 let fields = FIELDS.lock().unwrap().get(&self.params.table).unwrap().clone();
98 if fields[field.clone()].is_empty() {
99 let data = statement.read::<i64, _>(index).unwrap();
100 list[field.as_str()] = data.into();
101 continue;
102 }
103 match fields[field.clone()]["type"].as_str().unwrap() {
104 "INTEGER" => {
105 let data = statement.read::<i64, _>(index).unwrap();
106 list[field.as_str()] = JsonValue::from(data == 1);
107 }
108 x if x.contains("int(") => {
109 let data = statement.read::<i64, _>(index).unwrap();
110 list[field.as_str()] = data.into();
111 }
112 x if x.contains("decimal(") && x.ends_with(",0)") => {
113 let data = statement.read::<f64, _>(index).unwrap();
114 list[field.as_str()] = data.into();
115 }
116 _ => {
117 let data = statement.read::<i64, _>(index).unwrap();
118 list[field.as_str()] = data.into();
119 }
120 }
121 }
122 }
123 Type::Float => {
124 let data = statement.read::<f64, _>(index).unwrap();
125 list[field.as_str()] = JsonValue::from(data);
126 }
127 Type::Binary => {
128 let data = statement.read::<String, _>(index).unwrap();
129 list[field.as_str()] = JsonValue::from(data.clone());
130 }
131 Type::Null => match statement.read::<String, _>(index) {
132 Ok(data) => {
133 list[field.as_str()] = JsonValue::from(data.clone());
134 }
135 Err(_) => match statement.read::<f64, _>(index) {
136 Ok(data) => {
137 if data == 0.0 {
138 list[field.as_str()] = JsonValue::from("");
139 } else {
140 list[field.as_str()] = JsonValue::from(data);
141 }
142 }
143 Err(_) => match statement.read::<i64, _>(index) {
144 Ok(data) => {
145 if data == 0 {
146 list[field.as_str()] = JsonValue::from("");
147 } else {
148 list[field.as_str()] = JsonValue::from(data);
149 }
150 }
151 Err(e) => {
152 error!("Type:{} {:?}", field.as_str(), e);
153 }
154 },
155 },
156 },
157 },
158 Err(e) => {
159 error!("query Err: {:?}", e);
160 }
161 }
162 index += 1;
163 }
164 data.push(list).unwrap();
165 }
166 (true, data)
167 }
168 pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
169 let thread_id = format!("{:?}", thread::current().id());
170
171 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
172 if transaction > 0 {
173 if self.connection.debug {
174 info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
175 }
176
177 let dbs = match TR.lock() {
178 Ok(dbs) => dbs,
179 Err(e) => {
180 error!("{} 获取数据库锁失败: {}\r\nSQL: {}", thread_id, e, sql);
181 return (false, JsonValue::from("数据库锁定失败"));
182 }
183 };
184 let db = dbs.get(&*thread_id.clone()).unwrap().clone();
185 let x = match db.prepare(sql.clone()) {
186 Ok(statement) => self.query_handle(statement, sql.clone()),
187 Err(e) => {
188 error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
189 (false, e.to_string().into())
190 }
191 };
192 x
193 } else {
194 if self.connection.debug {
195 info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
196 }
197 let dbs = match DBS.lock() {
198 Ok(dbs) => dbs,
199 Err(e) => {
200 error!("{} 获取数据库锁失败: {}\r\nSQL: {}", thread_id, e, sql);
201 return (false, JsonValue::from("数据库锁定失败"));
202 }
203 };
204 let db = dbs.get(&*self.default).unwrap().clone();
205 let x = match db.prepare(sql.clone()) {
206 Ok(statement) => self.query_handle(statement, sql.clone()),
207 Err(e) => {
208 error!("{} 查询非事务: Err: {}", thread_id, e);
209 (false, e.to_string().into())
210 }
211 };
212 x
213 }
214 }
215
216 pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
217 let thread_id = format!("{:?}", thread::current().id());
218
219 let transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
220 if transaction > 0 {
221 if self.connection.debug {
222 info!("{} 执行事务: sql: {}", thread_id, sql.clone());
223 }
224
225 let dbs = match TR.lock() {
226 Ok(dbs) => dbs,
227 Err(e) => {
228 error!("{} 获取数据库锁失败: {}\r\nSQL: {}", thread_id, e, sql);
229 return (false, JsonValue::from("数据库锁定失败"));
230 }
231 };
232 let db = match dbs.get(&*thread_id) {
233 Some(db) => db.clone(),
234 None => {
235 error!(
236 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
237 thread_id, self.default, sql
238 );
239 return (false, JsonValue::from("未找到默认数据库配置"));
240 }
241 };
242 match db.execute(sql.clone()) {
243 Ok(_) => {
244 let count = db.change_count();
245 if self.connection.debug {
246 info!(
247 "{} count:{} total_count:{}",
248 thread_id,
249 count,
250 db.total_change_count()
251 );
252 }
253 (true, JsonValue::from(count))
254 }
255 Err(e) => {
256 error!("{} 执行事务: \r\nErr: {}\r\n{}", thread_id, e, sql.clone());
257 (false, JsonValue::from(e.to_string()))
258 }
259 }
260 } else {
261 if self.connection.debug {
262 info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
263 }
264 let dbs = match DBS.lock() {
265 Ok(dbs) => dbs,
266 Err(e) => {
267 error!("{} 获取数据库锁失败: {}\r\nSQL: {}", thread_id, e, sql);
268 return (false, JsonValue::from("数据库锁定失败"));
269 }
270 };
271
272 let db = match dbs.get(&*self.default) {
273 Some(db) => db.clone(),
274 None => {
275 error!(
276 "{} 未找到默认数据库配置: {}\r\nSQL: {}",
277 thread_id, self.default, sql
278 );
279 return (false, JsonValue::from("未找到默认数据库配置"));
280 }
281 };
282 match db.execute(sql.clone()) {
283 Ok(_) => {
284 let count = db.change_count();
285 if self.connection.debug {
286 info!(
287 "{} count: {} total_count: {}",
288 thread_id,
289 count,
290 db.total_change_count()
291 );
292 }
293 (true, JsonValue::from(count))
294 }
295 Err(e) => {
296 error!(
297 "{} 执行非事务: Err: {}\r\nSQL: {}",
298 thread_id,
299 e,
300 sql.clone()
301 );
302 (false, JsonValue::from(e.to_string()))
303 }
304 }
305 }
306 }
307}
308
309impl DbMode for Sqlite {
310 fn database_tables(&mut self) -> JsonValue {
311 let sql = "select name from sqlite_master where type='table' order by name;".to_string();
312 match self.sql(sql.as_str()) {
313 Ok(e) => {
314 let mut list = vec![];
315 for item in e.members() {
316 list.push(item["name"].clone());
317 }
318 list.into()
319 }
320 Err(_) => {
321 array![]
322 }
323 }
324 }
325 fn database_create(&mut self, _name: &str) -> bool {
326 todo!()
327 }
328}
329
330impl Mode for Sqlite {
331 fn table_create(&mut self, options: TableOptions) -> JsonValue {
332 let mut sql = String::new();
333 let mut unique_fields = String::new();
335 let mut unique_name = String::new();
336 let mut unique = String::new();
337 for item in options.table_unique.iter() {
338 if unique_fields.is_empty() {
339 unique_fields = format!("`{}`", item);
340 unique_name = format!("unique_{}", item);
341 } else {
342 unique_fields = format!("{},`{}`", unique_fields, item);
343 unique_name = format!("{}_{}", unique_name, item);
344 }
345 unique = format!(
346 "CREATE UNIQUE INDEX {} on {} ({});\r\n",
347 unique_name, options.table_name, unique_fields
348 );
349 }
350
351 let mut index = vec![];
353 for row in options.table_index.iter() {
354 let mut index_fields = String::new();
355 let mut index_name = String::new();
356 for item in row.iter() {
357 if index_fields.is_empty() {
358 index_fields = format!("`{}`", item);
359 index_name = format!("index_{}", item);
360 } else {
361 index_fields = format!("{},`{}`", index_fields, item);
362 index_name = format!("{}_{}", index_name, item);
363 }
364 }
365 index.push(format!(
366 "CREATE INDEX {} on {} ({});\r\n",
367 index_name, options.table_name, index_fields
368 ));
369 }
370
371 for (name, field) in options.table_fields.entries() {
372 let row = br_fields::field("sqlite", name, field.clone());
373 sql = format!("{} {},\r\n", sql, row);
374 }
375
376 sql = sql.trim_end_matches(",\r\n").to_string();
377
378 let sql = format!(
379 "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
380 options.table_name, sql
381 );
382 if self.params.sql {
383 let mut list = vec![sql];
384 if !unique.is_empty() {
385 list.push(unique)
386 }
387 if !index.is_empty() {
388 list.extend(index)
389 }
390
391 return JsonValue::from(list.join(""));
392 }
393
394 let thread_id = format!("{:?}", thread::current().id());
395
396 let (state, _) = self.execute(sql.clone());
397 if state {
398 if !unique.is_empty() {
399 let (state, _) = self.execute(unique.clone());
400 info!(
401 "{} {} 唯一索引创建:{}",
402 thread_id, options.table_name, state
403 );
404 }
405 for sql in index.iter() {
406 let (state, _) = self.execute(sql.clone());
407 info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
408 }
409 JsonValue::from(true)
410 } else {
411 JsonValue::from(false)
412 }
413 }
414 fn table_update(&mut self, options: TableOptions) -> JsonValue {
415 let thread_id = format!("{:?}", thread::current().id());
416
417 let mut sql = String::new();
418 let mut add = vec![];
419 let mut del = vec![];
420 let mut put = vec![];
421
422 let (_, mut fields_list) = self.query(format!("pragma table_info ('{}')", options.table_name));
423 let mut field_old = object! {};
424 for item in fields_list.members_mut() {
425 item["dflt_value"] = item["dflt_value"].to_string().trim_start_matches("'").trim_end_matches("'").into();
426 let name = item["name"].as_str().unwrap();
427 field_old[name] = item.clone();
428 if options.table_fields[name].is_empty() {
429 del.push(name);
430 }
431 }
432
433 let mut fields_list = vec![];
434 let mut fields_list_new = vec![];
435
436 for (name, field) in options.table_fields.entries() {
437 if field_old[name].is_empty() {
438 add.push(name);
439 } else {
440 fields_list.push(name);
441 fields_list_new.push(name);
442 let old_value = match field["mode"].as_str().unwrap() {
443 "select" => {
444 if field_old[name]["dflt_value"].clone().is_empty() {
445 "[]".to_string()
446 } else {
447 field_old[name]["dflt_value"].clone().to_string()
448 }
449 }
450 "switch" => {
451 (field_old[name]["dflt_value"].to_string().parse::<i32>().unwrap() == 1).to_string()
452 }
453 _ => {
454 field_old[name]["dflt_value"].clone().to_string()
455 }
456 };
457 let new_value = field["def"].clone().to_string();
458 if old_value != new_value {
459 info!(
460 "{} 差异化当前: {} old_value: {} new_value: {} {}",
461 options.table_name,
462 name,
463 old_value,
464 new_value,
465 old_value != new_value
466 );
467 info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
468 put.push(name);
469 } else if field_old[name]["pk"].as_i64().unwrap() == 1 && name != options.table_key
470 {
471 info!("{} 主键替换: {}", options.table_name, name);
472 put.push(name);
473 }
474 }
475 }
476
477 let mut unique_fields = String::new();
478 let mut unique_name = String::new();
479 let mut unique = String::new();
480
481 for item in options.table_unique.iter() {
483 if unique_fields.is_empty() {
484 unique_fields = format!("`{}`", item);
485 unique_name = format!("unique_{}", item);
486 } else {
487 unique_fields = format!("{},`{}`", unique_fields, item);
488 unique_name = format!("{}_{}", unique_name, item);
489 }
490 unique = format!(
491 "CREATE UNIQUE INDEX {}_{} on {} ({});\r\n",
492 options.table_name, unique_name, options.table_name, unique_fields
493 )
494 }
495
496 let mut index = vec![];
498 for row in options.table_index.iter() {
499 let mut index_fields = String::new();
500 let mut index_name = String::new();
501 for item in row.iter() {
502 if index_fields.is_empty() {
503 index_fields = item.to_string();
504 index_name = format!("index_{}", item);
505 } else {
506 index_fields = format!("{},{}", index_fields, item);
507 index_name = format!("{}_{}", index_name, item);
508 }
509 }
510 index.push(format!(
511 "create index {}_{} on {} ({});\r\n",
512 options.table_name, index_name, options.table_name, index_fields
513 ));
514 }
515 for (name, field) in options.table_fields.entries() {
516 let row = br_fields::field("sqlite", name, field.clone());
517 sql = format!("{} {},\r\n", sql, row);
518 }
519
520 if !unique.is_empty() || !index.is_empty() {
521 let unique_text = unique.clone();
522 let (_, unique_old) = self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
523 let mut index_old_list = vec![];
524 let mut index_new_list = vec![];
525 for item in unique_old.members() {
526 let origin = item["origin"].as_str().unwrap();
527 let unique_1 = item["unique"].as_usize().unwrap();
528 let name = item["name"].as_str().unwrap();
529
530 if origin == "c" && unique_1 == 1 {
531 if unique.contains(format!(" {} ", name).as_str()) {
532 unique = "".to_string();
533 }
534 continue;
535 }
536 if origin == "c" && unique_1 == 0 {
537 index_old_list.push(item);
538 for item in index.iter() {
539 if item.contains(format!(" {} ", name).as_str()) {
540 index_new_list.push(item.clone());
541 }
542 }
543 continue;
544 }
545 }
546 if unique.is_empty() {
547 if index_old_list.len() == index.len() && index_old_list.len() == index_new_list.len()
548 {
549 index = vec![];
550 } else {
551 unique = unique_text;
552 }
553 }
554 }
555
556 sql = sql.trim_end_matches(",\r\n").to_string();
557 sql = format!(
558 "CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
559 options.table_name, sql
560 );
561
562 let sqls = format!(
563 "replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
564 options.table_name,
565 fields_list_new.join("`,`"),
566 fields_list.join("`,`")
567 );
568 let drop_sql = format!("drop table {};\r\n", options.table_name);
569 let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
570 let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
571
572 if self.params.sql {
573 let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
574 if !unique.is_empty() {
575 list.push(unique)
576 }
577 if !index.is_empty() {
578 list.extend(index)
579 }
580 return JsonValue::from(list.join(""));
581 }
582
583 if add.is_empty() && del.is_empty() && unique.is_empty() && index.is_empty() && put.is_empty()
584 {
585 return JsonValue::from(-1);
586 }
587
588 let (state, _) = self.execute(sql.clone());
589 let data = match state {
590 true => {
591 let (state, _) = self.execute(sqls.clone());
592 match state {
593 true => {
594 let (state, _) = self.execute(drop_sql);
595 match state {
596 true => {
597 let (state, _) = self.execute(alter_sql);
598 match state {
599 true => {
600 if !unique.is_empty() {
601 let (state, _) = self.execute(unique.clone());
602 info!(
603 "{} {} 唯一索引创建:{}",
604 thread_id, options.table_name, state
605 );
606 }
607 for index_sql in index.iter() {
608 let (state, _) = self.execute(index_sql.clone());
609 match state {
610 true => {}
611 false => {
612 error!(
613 "{} 索引创建失败: {} {}",
614 options.table_name, state, index_sql
615 );
616 return JsonValue::from(0);
617 }
618 };
619 }
620
621 return JsonValue::from(1);
622 }
623 false => {
624 error!("{} 修改表名失败", options.table_name);
625 return JsonValue::from(0);
626 }
627 }
628 }
629 false => {
630 error!("{} 删除本表失败", options.table_name);
631 return JsonValue::from(0);
632 }
633 }
634 }
635 false => {
636 error!(
637 "{} 添加tmp表记录失败 {:#} {:#}",
638 options.table_name, sql, sqls
639 );
640 let sql = format!("drop table {}_tmp", options.table_name);
641 let (_, _) = self.execute(sql);
642 0
643 }
644 }
645 }
646 false => {
647 error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
648 let (_, _) = self.execute(drop_sql_temp);
649 0
650 }
651 };
652 JsonValue::from(data)
653 }
654
655 fn table_info(&mut self, table: &str) -> JsonValue {
656 if FIELDS.lock().unwrap().get(table).is_some() {
657 return FIELDS.lock().unwrap().get(table).unwrap().clone();
658 }
659 let sql = format!("PRAGMA table_info({})", table);
660 let (state, data) = self.query(sql);
661
662 match state {
663 true => {
664 let mut fields = object! {};
665 for item in data.members() {
666 fields[item["name"].as_str().unwrap()] = item.clone();
667 }
668 FIELDS.lock().unwrap().insert(table.to_string(), fields.clone());
669 data
670 }
671 false => object! {}
672 }
673 }
674
675 fn table_is_exist(&mut self, name: &str) -> bool {
676 let sql = format!(
677 "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{}'",
678 name
679 );
680 let (state, data) = self.query(sql);
681 if state {
682 if data[0]["count"].as_i64().unwrap() > 0 {
683 return true;
684 }
685 false
686 } else {
687 false
688 }
689 }
690
691 fn table(&mut self, name: &str) -> &mut Sqlite {
692 self.params = Params::default(self.connection.mode.str().as_str());
693 self.params.table = format!("{}{}", self.connection.prefix, name);
694 self.params.join_table = self.params.table.clone();
695 self
696 }
697 fn change_table(&mut self, name: &str) -> &mut Self {
698 self.params.join_table = name.to_string();
699 self
700 }
701 fn autoinc(&mut self) -> &mut Self {
702 self.params.autoinc = true;
703 self
704 }
705
706 fn fetch_sql(&mut self) -> &mut Self {
707 self.params.sql = true;
708 self
709 }
710
711 fn order(&mut self, field: &str, by: bool) -> &mut Self {
712 self.params.order[field] = {
713 if by {
714 "DESC"
715 } else {
716 "ASC"
717 }
718 }.into();
719 self
720 }
721
722 fn group(&mut self, field: &str) -> &mut Self {
723 let fields: Vec<&str> = field.split(",").collect();
724 for field in fields.iter() {
725 let fields = field.to_string();
726 self.params.group[fields.as_str()] = fields.clone().into();
727 self.params.fields[fields.as_str()] = fields.clone().into();
728 }
729 self
730 }
731
732 fn distinct(&mut self) -> &mut Self {
733 self.params.distinct = true;
734 self
735 }
736 fn json(&mut self, field: &str) -> &mut Self {
737 let list: Vec<&str> = field.split(",").collect();
738 for item in list.iter() {
739 self.params.json[item.to_string().as_str()] = item.to_string().into();
740 }
741 self
742 }
743
744 fn field(&mut self, field: &str) -> &mut Self {
745 let list: Vec<&str> = field.split(",").collect();
746 let join_table = if self.params.join_table.is_empty() {
747 self.params.table.clone()
748 } else {
749 self.params.join_table.clone()
750 };
751 for item in list.iter() {
752 let item = item.to_string();
753 if item.contains(" as ") {
754 let text = item.split(" as ").collect::<Vec<&str>>().clone();
755 self.params.fields[item] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
756 } else {
757 self.params.fields[item] = format!("{}.`{}`", join_table, item).into();
758 }
759 }
760 self
761 }
762 fn hidden(&mut self, name: &str) -> &mut Self {
763 let hidden: Vec<&str> = name.split(",").collect();
764 let sql = format!("PRAGMA table_info({})", self.params.table);
765 let (_, data) = self.query(sql);
766 for item in data.members() {
767 let name = item["name"].as_str().unwrap();
768 if !hidden.contains(&name) {
769 self.params.fields[name] = name.into();
770 }
771 }
772 self
773 }
774
775 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
776 let join_table = if self.params.join_table.is_empty() {
777 self.params.table.clone()
778 } else {
779 self.params.join_table.clone()
780 };
781
782 if value.is_boolean() {
783 if value.as_bool().unwrap() {
784 value = 1.into();
785 } else {
786 value = 0.into();
787 }
788 }
789
790 match compare {
791 "between" => {
792 self.params.where_and.push(format!(
793 "{}.`{}` between '{}' AND '{}'",
794 join_table, field, value[0], value[1]
795 ));
796 }
797 "set" => {
798 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
799 let mut wheredata = vec![];
800 for item in list.iter() {
801 wheredata.push(format!("{}.`{}` like '%{}%'", join_table, field, item));
802 }
803 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
804 }
805 "notin" => {
806 let mut text = String::new();
807 for item in value.members() {
808 text = format!("{},'{}'", text, item);
809 }
810 text = text.trim_start_matches(",").into();
811 self.params.where_and.push(format!("{}.`{}` not in ({})", join_table, field, text));
812 }
813 "in" => {
814 let mut text = String::new();
815 if value.is_array() {
816 for item in value.members() {
817 text = format!("{},'{}'", text, item);
818 }
819 } else {
820 let value = value.to_string();
821 let value: Vec<&str> = value.split(",").collect();
822 for item in value.iter() {
823 text = format!("{},'{}'", text, item);
824 }
825 }
826 text = text.trim_start_matches(",").into();
827
828 self.params.where_and.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
829 }
830 "=" => {
831 if value.is_null() {
832 self.params.where_and.push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
833 } else {
834 self.params.where_and.push(format!(
835 "{}.`{}` {} '{}'",
836 join_table, field, compare, value
837 ));
838 }
839 }
840 _ => {
841 if value.is_null() {
842 self.params.where_and.push(format!("{}.`{}` {} {}", join_table, field, compare, value));
843 } else {
844 self.params.where_and.push(format!(
845 "{}.`{}` {} '{}'",
846 join_table, field, compare, value
847 ));
848 }
849 }
850 }
851 self
852 }
853 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
854 let join_table = if self.params.join_table.is_empty() {
855 self.params.table.clone()
856 } else {
857 self.params.join_table.clone()
858 };
859
860 if value.is_boolean() {
861 if value.as_bool().unwrap() {
862 value = 1.into();
863 } else {
864 value = 0.into();
865 }
866 }
867 match compare {
868 "between" => {
869 self.params.where_or.push(format!(
870 "{}.`{}` between '{}' AND '{}'",
871 join_table, field, value[0], value[1]
872 ));
873 }
874 "set" => {
875 let tt = value.to_string().replace(",", "%");
876 self.params.where_or.push(format!("{}.`{}` like '%{}%'", join_table, field, tt));
877 }
878 "notin" => {
879 let mut text = String::new();
880 for item in value.members() {
881 text = format!("{},'{}'", text, item);
882 }
883 text = text.trim_start_matches(",").into();
884 self.params.where_or.push(format!("{}.`{}` not in ({})", join_table, field, text));
885 }
886 "in" => {
887 let mut text = String::new();
888 if value.is_array() {
889 for item in value.members() {
890 text = format!("{},'{}'", text, item);
891 }
892 } else {
893 let value = value.as_str().unwrap();
894 let value: Vec<&str> = value.split(",").collect();
895 for item in value.iter() {
896 text = format!("{},'{}'", text, item);
897 }
898 }
899 text = text.trim_start_matches(",").into();
900 self.params.where_or.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
901 }
902 _ => {
903 self.params.where_or.push(format!(
904 "{}.`{}` {} '{}'",
905 join_table, field, compare, value
906 ));
907 }
908 }
909 self
910 }
911 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
912 self.params.where_column = format!(
913 "{}.`{}` {} {}.`{}`",
914 self.params.table, field_a, compare, self.params.table, field_b
915 );
916 self
917 }
918
919 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
920 self.params.page = page;
921 self.params.limit = limit;
922 self
923 }
924
925 fn column(&mut self, field: &str) -> JsonValue {
926 self.field(field);
927 self.group(field);
928 let sql = self.params.select_sql();
929 if self.params.sql {
930 return JsonValue::from(sql.clone());
931 }
932 self.table_info(self.params.table.clone().as_str());
933 let (state, data) = self.query(sql);
934 if state {
935 let mut list = array![];
936 for item in data.members() {
937 if self.params.json[field].is_empty() {
938 list.push(item[field].clone()).unwrap();
939 } else {
940 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
941 list.push(data).unwrap();
942 }
943 }
944 list
945 } else {
946 array![]
947 }
948 }
949
950 fn count(&mut self) -> JsonValue {
951 self.params.fields["count"] = "count(*) as count".to_string().into();
952 let sql = self.params.select_sql();
953 if self.params.sql {
954 return JsonValue::from(sql.clone());
955 }
956 let (state, data) = self.query(sql);
957 match state {
958 true => data[0]["count"].clone(),
959 false => JsonValue::from(0),
960 }
961 }
962
963 fn max(&mut self, field: &str) -> JsonValue {
964 self.params.fields[field] = format!("max({00}) as {00}", field).into();
965 let sql = self.params.select_sql();
966 if self.params.sql {
967 return JsonValue::from(sql.clone());
968 }
969 let (state, data) = self.query(sql);
970 if state {
971 if data.len() > 1 {
972 return data;
973 }
974 data[0][field].clone()
975 } else {
976 JsonValue::from(0.0)
977 }
978 }
979
980 fn min(&mut self, field: &str) -> JsonValue {
981 self.params.fields[field] = format!("min({00}) as {00}", field).into();
982 let sql = self.params.select_sql();
983 let (state, data) = self.query(sql);
984 if state {
985 if data.len() > 1 {
986 return data;
987 }
988 data[0][field].clone()
989 } else {
990 JsonValue::from(0.0)
991 }
992 }
993
994 fn sum(&mut self, field: &str) -> JsonValue {
995 self.params.fields[field] = format!("sum({00}) as {00}", field).into();
996 let sql = self.params.select_sql();
997 if self.params.sql {
998 return JsonValue::from(sql.clone());
999 }
1000 let (state, data) = self.query(sql);
1001 match state {
1002 true => {
1003 if data.len() > 1 {
1004 return data;
1005 }
1006 if self.params.fields.len() > 1 {
1007 return data[0].clone();
1008 }
1009 data[0][field].clone()
1010 }
1011 false => JsonValue::from(0),
1012 }
1013 }
1014 fn avg(&mut self, field: &str) -> JsonValue {
1015 self.params.fields[field] = format!("avg({00}) as {00}", field).into();
1016 let sql = self.params.select_sql();
1017 if self.params.sql {
1018 return JsonValue::from(sql.clone());
1019 }
1020 let (state, data) = self.query(sql);
1021 if state {
1022 if data.len() > 1 {
1023 return data;
1024 }
1025 data[0][field].clone()
1026 } else {
1027 JsonValue::from(0)
1028 }
1029 }
1030 fn select(&mut self) -> JsonValue {
1031 let sql = self.params.select_sql();
1032 if self.params.sql {
1033 return JsonValue::from(sql.clone());
1034 }
1035 self.table_info(self.params.table.clone().as_str());
1036 let (state, mut data) = self.query(sql.clone());
1037 match state {
1038 true => {
1039 for (field, _) in self.params.json.entries() {
1040 for item in data.members_mut() {
1041 if !item[field].is_empty() {
1042 let json = item[field].to_string();
1043 item[field] = match json::parse(&json) {
1044 Ok(e) => e,
1045 Err(_) => JsonValue::from(json),
1046 };
1047 }
1048 }
1049 }
1050 data.clone()
1051 }
1052 false => {
1053 error!("{:?}", data);
1054 array![]
1055 }
1056 }
1057 }
1058 fn find(&mut self) -> JsonValue {
1059 self.params.page = 1;
1060 self.params.limit = 1;
1061 let sql = self.params.select_sql();
1062 if self.params.sql {
1063 return JsonValue::from(sql.clone());
1064 }
1065
1066 self.table_info(self.params.table.clone().as_str());
1067 let (state, mut data) = self.query(sql.clone());
1068 match state {
1069 true => {
1070 if data.is_empty() {
1071 return object! {};
1072 }
1073 for (field, _) in self.params.json.entries() {
1074 if !data[0][field].is_empty() {
1075 let json = data[0][field].to_string();
1076 let json = json::parse(&json).unwrap_or(array![]);
1077 data[0][field] = json;
1078 } else {
1079 data[0][field] = array![];
1080 }
1081 }
1082 data[0].clone()
1083 }
1084 false => {
1085 error!("{:?}", data);
1086 object! {}
1087 }
1088 }
1089 }
1090
1091 fn value(&mut self, field: &str) -> JsonValue {
1092 self.params.fields = object! {};
1093 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1094 self.params.page = 1;
1095 self.params.limit = 1;
1096 let sql = self.params.select_sql();
1097 if self.params.sql {
1098 return JsonValue::from(sql.clone());
1099 }
1100 self.table_info(self.params.table.clone().as_str());
1101 let (state, mut data) = self.query(sql.clone());
1102 match state {
1103 true => {
1104 for (field, _) in self.params.json.entries() {
1105 if !data[0][field].is_empty() {
1106 let json = data[0][field].to_string();
1107 let json = json::parse(&json).unwrap_or(array![]);
1108 data[0][field] = json;
1109 } else {
1110 data[0][field] = array![];
1111 }
1112 }
1113 data[0][field].clone()
1114 }
1115 false => {
1116 if self.connection.debug {
1117 info!("{:?}", data);
1118 }
1119 JsonValue::Null
1120 }
1121 }
1122 }
1123
1124 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1125 let mut fields = vec![];
1126 let mut values = vec![];
1127
1128 if !self.params.autoinc && data["id"].is_empty() {
1129 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1130 }
1131 for (field, value) in data.entries() {
1132 fields.push(format!("`{}`", field));
1133
1134 if value.is_string() {
1135 if value.to_string().contains("'") {
1136 values.push(format!("\"{}\"", value.to_string().replace("'", "''")));
1137 continue;
1138 } else if value.to_string().contains('"') {
1139 values.push(format!("'{}'", value));
1140 continue;
1141 } else {
1142 values.push(format!("\"{}\"", value));
1143 continue;
1144 }
1145 } else if value.is_array() || value.is_object() {
1146 if self.params.json[field].is_empty() {
1147 values.push(format!("'{}'", value));
1148 } else {
1149 let json = value.to_string();
1150 let json = json.replace("'", "''");
1151 values.push(format!("'{}'", json));
1152 }
1153 continue;
1154 } else if value.is_number() || value.is_boolean() || value.is_null() {
1155 values.push(format!("{}", value));
1156 continue;
1157 } else {
1158 values.push(format!("'{}'", value));
1159 continue;
1160 }
1161 }
1162 let fields = fields.join(",");
1163 let values = values.join(",");
1164
1165 let sql = format!(
1166 "INSERT INTO `{}` ({}) VALUES ({});",
1167 self.params.table, fields, values
1168 );
1169 if self.params.sql {
1170 return JsonValue::from(sql.clone());
1171 }
1172 let (state, ids) = self.execute(sql);
1173 match state {
1174 true => {
1175 if self.params.autoinc {
1176 let (state, ids) = self.query(format!("select max(id) as id from {}", self.params.table));
1177 return match state {
1178 true => ids[0]["id"].clone(),
1179 false => {
1180 error!("{}", ids);
1181 JsonValue::from("")
1182 }
1183 };
1184 }
1185 data["id"].clone()
1186 }
1187 false => {
1188 error!("{}", ids);
1189 JsonValue::from("")
1190 }
1191 }
1192 }
1193 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1194 let mut fields = String::new();
1195
1196 if !self.params.autoinc && data[0]["id"].is_empty() {
1197 data[0]["id"] = "".into();
1198 }
1199 for (field, _) in data[0].entries() {
1200 fields = format!("{},`{}`", fields, field);
1201 }
1202 fields = fields.trim_start_matches(",").parse().unwrap();
1203
1204 let core_count = num_cpus::get();
1205 let mut p = pools::Pool::new(core_count * 4);
1206 let autoinc = self.params.autoinc;
1207 for list in data.members() {
1208 let mut item = list.clone();
1209 p.execute(move |pcindex| {
1210 if !autoinc && item["id"].is_empty() {
1211 let id = format!(
1212 "{:X}{:X}",
1213 Local::now().timestamp_nanos_opt().unwrap(),
1214 pcindex
1215 );
1216 item["id"] = id.into();
1217 }
1218 let mut values = "".to_string();
1219 for (_, value) in item.entries() {
1220 if value.is_string() {
1221 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1222 } else if value.is_number() || value.is_boolean() {
1223 values = format!("{},{}", values, value);
1224 } else {
1225 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1226 }
1227 }
1228 values = format!("({})", values.trim_start_matches(","));
1229 array![item["id"].clone(), values]
1230 });
1231 }
1232 let (ids_list, mut values) = p.insert_all();
1233
1234 values = values.trim_start_matches(",").parse().unwrap();
1235
1236 let sql = format!(
1237 "INSERT INTO {} ({}) VALUES {};",
1238 self.params.table, fields, values
1239 );
1240 if self.params.sql {
1241 return JsonValue::from(sql.clone());
1242 }
1243 let (state, data) = self.execute(sql.clone());
1244 match state {
1245 true => {
1246 if self.params.autoinc {
1247 let (state, ids) = self.query(format!(
1248 "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
1249 self.params.table,
1250 ids_list.len()
1251 ));
1252 return match state {
1253 true => {
1254 let mut idlist = array![];
1255 for item in ids.members() {
1256 idlist.push(item["id"].clone()).unwrap();
1257 }
1258 idlist
1259 }
1260 false => {
1261 error!("批量添加失败: {:?} {}", ids, sql);
1262 array![]
1263 }
1264 };
1265 }
1266 JsonValue::from(ids_list)
1267 }
1268 false => {
1269 error!("批量添加失败: {:?} {}", data, sql);
1270 array![]
1271 }
1272 }
1273 }
1274
1275 fn update(&mut self, data: JsonValue) -> JsonValue {
1276 let mut values = vec![];
1277
1278 for (field, value) in data.entries() {
1279 if value.is_string() {
1280 values.push(format!(
1281 "`{}` = '{}'",
1282 field,
1283 value.to_string().replace("'", "''")
1284 ));
1285 } else if value.is_array() || value.is_object() {
1286 if self.params.json[field].is_empty() {
1287 values.push(format!("`{}` = '{}'", field, value));
1288 } else {
1289 let json = value.to_string();
1290 let json = json.replace("'", "''");
1291 values.push(format!("`{}` = '{}'", field, json));
1292 }
1293 continue;
1294 } else if value.is_number() || value.is_boolean() || value.is_null() {
1295 values.push(format!("`{}` = {} ", field, value));
1296 } else {
1297 values.push(format!("`{}` = '{}' ", field, value));
1298 }
1299 }
1300
1301 for (field, value) in self.params.inc_dec.entries() {
1302 values.push(format!("{} = {}", field, value.to_string().clone()));
1303 }
1304
1305 let values = values.join(",");
1306 let sql = format!(
1307 "UPDATE `{}` SET {} {} {};",
1308 self.params.table.clone(),
1309 values,
1310 self.params.where_sql(),
1311 self.params.page_limit_sql()
1312 );
1313 if self.params.sql {
1314 return JsonValue::from(sql.clone());
1315 }
1316 let (state, data) = self.execute(sql);
1317 if state {
1318 data
1319 } else {
1320 error!("{}", data);
1321 JsonValue::from(0)
1322 }
1323 }
1324
1325 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1326 let mut values = vec![];
1327 let mut ids = vec![];
1328 for (field, _) in data[0].entries() {
1329 if field == "id" {
1330 continue;
1331 }
1332 let mut fields = vec![];
1333 for row in data.members() {
1334 let value = row[field].clone();
1335 let id = row["id"].clone();
1336 ids.push(id.clone());
1337 if value.is_string() {
1338 fields.push(format!(
1339 "WHEN '{}' THEN '{}'",
1340 id,
1341 value.to_string().replace("'", "''")
1342 ));
1343 } else if value.is_array() || value.is_object() {
1344 if self.params.json[field].is_empty() {
1345 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1346 } else {
1347 let json = value.to_string();
1348 let json = json.replace("'", "''");
1349 fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1350 }
1351 continue;
1352 } else if value.is_number() || value.is_boolean() || value.is_null() {
1353 fields.push(format!("WHEN '{}' THEN {}", id, value));
1354 } else {
1355 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1356 }
1357 }
1358 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1359 }
1360
1361 self.where_and("id", "in", ids.into());
1362 for (field, value) in self.params.inc_dec.entries() {
1363 values.push(format!("{} = {}", field, value.to_string().clone()));
1364 }
1365
1366 let values = values.join(",");
1367 let sql = format!(
1368 "UPDATE {} SET {} {} {};",
1369 self.params.table.clone(),
1370 values,
1371 self.params.where_sql(),
1372 self.params.page_limit_sql()
1373 );
1374 if self.params.sql {
1375 return JsonValue::from(sql.clone());
1376 }
1377 let (state, data) = self.execute(sql);
1378 if state {
1379 data
1380 } else {
1381 error!("{:?}", data);
1382 JsonValue::from(0)
1383 }
1384 }
1385 fn delete(&mut self) -> JsonValue {
1386 let sql = format!(
1387 "delete FROM `{}` {};",
1388 self.params.table.clone(),
1389 self.params.where_sql()
1390 );
1391 if self.params.sql {
1392 return JsonValue::from(sql.clone());
1393 }
1394 let (state, data) = self.execute(sql);
1395 match state {
1396 true => data,
1397 false => {
1398 error!("delete 失败>>>{:?}", data);
1399 JsonValue::from(0)
1400 }
1401 }
1402 }
1403
1404 fn transaction(&mut self) -> bool {
1405 let thread_id = format!("{:?}", thread::current().id());
1406 let mut transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
1407 if transaction > 0 {
1408 transaction += 1;
1409 TR_COUNT.lock().unwrap().insert(thread_id.clone(), transaction);
1410 return true;
1411 }
1412
1413 loop {
1414 let mut t = TR_COUNT.lock().unwrap();
1415 if t.is_empty() {
1416 t.insert(thread_id.clone(), 1);
1417 break;
1418 }
1419 drop(t);
1420 thread::yield_now();
1421 }
1422
1423 let flags = OpenFlags::new().with_read_write().with_no_mutex();
1424 let db = match Connect::open_thread_safe_with_flags(
1425 self.connection.clone().get_dsn().as_str(),
1426 flags,
1427 ) {
1428 Ok(e) => Arc::new(e),
1429 Err(_) => {
1430 return false;
1431 }
1432 };
1433 TR.lock().unwrap().insert(thread_id.clone(), db);
1434 let (state, data) = self.query("BEGIN".to_string());
1435 if state {
1436 true
1437 } else {
1438 error!("{} 启动事务失败: {}", thread_id, data);
1439 TR.lock().unwrap().remove(&thread_id.clone());
1440 TR_COUNT.lock().unwrap().remove(&thread_id.clone());
1441 false
1442 }
1443 }
1444
1445 fn commit(&mut self) -> bool {
1446 let thread_id = format!("{:?}", thread::current().id());
1447
1448 let mut transaction = *TR_COUNT.lock().unwrap().get(&*thread_id.clone()).unwrap_or(&0);
1449 if transaction > 1 {
1450 transaction -= 1;
1451 TR_COUNT.lock().unwrap().insert(thread_id.clone(), transaction);
1452 return true;
1453 }
1454 let sql = "COMMIT";
1455 let (state, _) = self.query(sql.to_string());
1456 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
1457 TR.lock().unwrap().remove(&*thread_id.clone());
1458
1459 if state {
1460 true
1461 } else {
1462 error!("{} 提交事务失败", thread_id);
1463 false
1464 }
1465 }
1466
1467 fn rollback(&mut self) -> bool {
1468 let thread_id = format!("{:?}", thread::current().id());
1469 let sql = "ROLLBACK";
1470
1471 let mut t = *TR_COUNT.lock().unwrap().get(&thread_id).unwrap();
1472 if t > 1 {
1473 t -= 1;
1474 TR_COUNT.lock().unwrap().insert(thread_id.clone(), t);
1475 return true;
1476 }
1477 let (state, _) = self.query(sql.to_string());
1478 TR_COUNT.lock().unwrap().remove(&*thread_id.clone());
1479 TR.lock().unwrap().remove(&*thread_id.clone());
1480
1481 if state {
1482 true
1483 } else {
1484 error!("回滚失败: {}", thread_id);
1485 false
1486 }
1487 }
1488
1489 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1490 let (state, data) = self.query(sql.to_string());
1491 match state {
1492 true => Ok(data),
1493 false => Err(data.to_string()),
1494 }
1495 }
1496
1497 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1498 let (state, data) = self.execute(sql.to_string());
1499 match state {
1500 true => Ok(data),
1501 false => Err(data.to_string()),
1502 }
1503 }
1504
1505 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1506 self.params.inc_dec[field] = format!("`{}` + {}", field, num).into();
1507 self
1508 }
1509
1510 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1511 self.params.inc_dec[field] = format!("`{}` - {}", field, num).into();
1512 self
1513 }
1514
1515 fn buildsql(&mut self) -> String {
1516 self.fetch_sql();
1517 let sql = self.select().to_string();
1518 format!("( {} ) `{}`", sql, self.params.table)
1519 }
1520
1521 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1522 let main_fields = if main_fields.is_empty() {
1523 "id"
1524 } else {
1525 main_fields
1526 };
1527 let right_fields = if right_fields.is_empty() {
1528 self.params.table.clone()
1529 } else {
1530 right_fields.to_string().clone()
1531 };
1532 self.params.join_table = table.to_string();
1533 self.params.join.push(format!(
1534 " LEFT JOIN {} ON {}.{} = {}.{}",
1535 table, self.params.table, main_fields, table, right_fields
1536 ));
1537 self
1538 }
1539
1540 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1541 let main_fields = if main_fields.is_empty() {
1542 "id"
1543 } else {
1544 main_fields
1545 };
1546 let second_fields = if second_fields.is_empty() {
1547 self.params.table.clone()
1548 } else {
1549 second_fields.to_string().clone()
1550 };
1551 let sec_table_name = format!("{}{}", table, "_2");
1552 let second_table = format!("{} {}", table, sec_table_name.clone());
1553 self.params.join_table = sec_table_name.clone();
1554 self.params.join.push(format!(
1555 " INNER JOIN {} ON {}.{} = {}.{}",
1556 second_table, self.params.table, main_fields, sec_table_name, second_fields
1557 ));
1558 self
1559 }
1560}