1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
19lazy_static! {
20 static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
21 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
22 static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
23 Arc::new(Mutex::new(HashMap::new()));
24}
25#[cfg(any(feature = "default", feature = "db-mysql"))]
26#[derive(Clone, Debug)]
27pub struct Mysql {
28 pub connection: Connection,
30 pub default: String,
32 pub params: Params,
33 pub pool: Pool,
34}
35
36impl Mysql {
37 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38 let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40 let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42 match Pool::new(opts) {
43 Ok(pool) => Ok(Self {
44 connection: connection.clone(),
45 default: default.clone(),
46 params: Params::default("mysql"),
47 pool,
48 }),
49 Err(e) => {
50 error!("connect: {e}");
51 Err(e.to_string())
52 }
53 }
54 }
55 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56 if sql.contains("INSERT") {
57 let rows = text.affected_rows();
58 if rows > 1 {
59 if self.params.autoinc {
60 let row = rows;
61 let start_row = text.last_insert_id().unwrap();
62 let end_row = start_row + row;
63
64 let mut ids = array![];
65 for item in start_row..end_row {
66 ids.push(item).unwrap();
67 }
68 (true, ids)
69 } else {
70 (true, JsonValue::from(rows))
71 }
72 } else {
73 (true, JsonValue::from(text.last_insert_id()))
74 }
75 } else {
76 (true, JsonValue::from(text.affected_rows()))
77 }
78 }
79 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80 let mut list = array![];
81 let mut index = 0;
82 text.for_each(|row| {
83 match row {
84 Ok(r) => {
85 let mut data = object! {};
86 for (index, item) in r.columns().iter().enumerate() {
87 let field = item.name_str();
88 let field = field.to_string();
89 let field = field.as_str();
90
91 data[field] = match item.column_type() {
92 ColumnType::MYSQL_TYPE_TINY => {
93 let t = r.get::<bool, _>(index).unwrap_or(true);
94 JsonValue::from(t)
95 }
96 ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98 if t == NULL {
99 JsonValue::from(0.0)
100 } else {
101 match r.get::<f64, _>(index) {
102 None => JsonValue::from(0.0),
103 Some(t) => JsonValue::from(t),
104 }
105 }
106 }
107 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108 let t = r.index(field).clone();
109 if t == NULL {
110 JsonValue::from(0)
111 } else {
112 let t = r.get::<i64, _>(index).unwrap();
113 JsonValue::from(t)
114 }
115 }
116 ColumnType::MYSQL_TYPE_NULL => {
117 let t = r.index(field).clone();
118 if t == NULL {
119 JsonValue::from("".to_string())
120 } else {
121 let t = r.get::<String, _>(index).unwrap_or("".to_string());
122 JsonValue::from(t)
123 }
124 }
125 ColumnType::MYSQL_TYPE_BLOB => {
126 let t = r.index(field).clone();
127 if t == NULL {
128 JsonValue::from("".to_string())
129 } else {
130 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131 if t == NULL {
132 JsonValue::from("".to_string())
133 } else {
134 let t = r.get::<String, _>(index).unwrap_or("".to_string());
135 JsonValue::from(t)
136 }
137 }
138 }
139 ColumnType::MYSQL_TYPE_VAR_STRING => {
140 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r.get::<String, _>(index).unwrap_or("".to_string());
145 JsonValue::from(t)
146 }
147 }
148 ColumnType::MYSQL_TYPE_STRING => {
149 let t = r.index(field).clone();
150 if t == NULL {
151 JsonValue::from("".to_string())
152 } else {
153 let t = r.get::<String, _>(index).unwrap_or("".to_string());
154 JsonValue::from(t)
155 }
156 }
157 ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158 let t = r.index(field).clone();
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_GEOMETRY => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let res = match r.index(field).clone() {
172 Value::Bytes(e) => e,
173 _ => vec![]
174 };
175 let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176 let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177 JsonValue::from(format!("{x},{y}"))
178 }
179 }
180 _ => {
181 let t = r.index(field).clone();
182 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183 JsonValue::from("".to_string())
184 }
185 };
186 }
187 list.push(data).unwrap();
188 }
189 Err(e) => {
190 error!("err: {e} \r\n {sql}");
191 }
192 }
193 index += 1;
194 });
195 (true, list)
196 }
197 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198 let thread_id = format!("{:?}", thread::current().id());
199 let key = format!("{}{}", self.default, thread_id);
200
201 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203 Ok(e) => e,
204 Err(err) => {
205 error!("非事务 execute超时: {err}");
206 return (false, object! {});
207 }
208 };
209 let connection_id = db.connection_id();
210 return match db.query_iter(sql) {
211 Ok(e) => {
212 if self.connection.debug {
213 info!("查询成功: {} {}", thread_id.clone(), sql);
214 }
215 self.query_handle(e, sql)
216 }
217 Err(e) => {
218 error!(
219 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220 );
221 (false, JsonValue::from(e.to_string()))
222 }
223 };
224 } else {
225 let mut tr = TR.lock().unwrap();
226 let db = tr.get_mut(&*key).unwrap();
227 let connection_id = db.connection_id();
228 return match db.query_iter(sql) {
229 Ok(e) => {
230 if self.connection.debug {
231 info!("查询成功: {} {}", thread_id.clone(), sql);
232 }
233 self.query_handle(e, sql)
234 }
235 Err(e) => {
236 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237 (false, JsonValue::from(e.to_string()))
238 }
239 };
240 };
241 }
242 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243 let thread_id = format!("{:?}", thread::current().id());
244 let key = format!("{}{}", self.default, thread_id);
245
246 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248 Ok(e) => e,
249 Err(err) => {
250 error!("非事务: execute超时: {err}");
251 return (false, object! {});
252 }
253 };
254 return match db.exec_iter(sql, ()) {
255 Ok(e) => {
256 if self.connection.debug {
257 info!("提交成功: {} {}", thread_id.clone(), sql);
258 }
259 self.execute_cl(e, sql)
260 }
261 Err(e) => {
262 error!("非事务提交失败: {thread_id} {e} {sql}");
263 (false, JsonValue::from(e.to_string()))
264 }
265 };
266 } else {
267 let mut count_flag: i64 = 0;
269 loop {
270 let mut t = TRANS_TABLE.lock().unwrap();
271 if t.get(&self.params.table).is_none() {
272 t.insert(self.params.table.clone(), thread_id.clone());
273 break;
274 }
275 if t.get(&self.params.table).unwrap().clone() == thread_id {
276 break;
277 }
278 thread::yield_now();
279
280 count_flag += 1;
281 if count_flag == 10000 {
282 warn!("execute循环次数: 1w,强制退出");
283 break;
284 }
285 }
286
287 let mut tr = TR.lock().unwrap();
288 let db = tr.get_mut(&*key).unwrap();
289
290 return match db.exec_iter(sql, ()) {
291 Ok(e) => {
292 if self.connection.debug {
293 info!("提交成功: {} {}", thread_id.clone(), sql);
294 }
295
296 self.execute_cl(e, sql)
297 }
298 Err(e) => {
299 error!("事务提交失败: {thread_id} {e} {sql}");
300 (false, JsonValue::from(e.to_string()))
301 }
302 };
303 };
304 }
305}
306
307impl DbMode for Mysql {
308 fn database_tables(&mut self) -> JsonValue {
309 let sql = "SHOW TABLES".to_string();
310 match self.sql(sql.as_str()) {
311 Ok(e) => {
312 let mut list = vec![];
313 for item in e.members() {
314 for (_, value) in item.entries() {
315 list.push(value.clone());
316 }
317 }
318 list.into()
319 }
320 Err(_) => {
321 array![]
322 }
323 }
324 }
325
326 fn database_create(&mut self, name: &str) -> bool {
327 let sql = format!("CREATE DATABASE {name}");
328
329 let (state, data) = self.execute(sql.as_str());
330 match state {
331 true => data.as_bool().unwrap(),
332 false => {
333 error!("创建数据库失败: {data:?}");
334 false
335 }
336 }
337 }
338}
339
340impl Mode for Mysql {
341 fn table_create(&mut self, options: TableOptions) -> JsonValue {
342 let mut sql = String::new();
343 let mut unique_fields = String::new();
345 let mut unique_name = String::new();
346 let mut unique = String::new();
347 for item in options.table_unique.iter() {
348 if unique_fields.is_empty() {
349 unique_fields = format!("`{item}`");
350 unique_name = format!("{}_unique_{}", options.table_name, item);
351 } else {
352 unique_fields = format!("{unique_fields},`{item}`");
353 unique_name = format!("{unique_name}_{item}");
354 }
355 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357 }
358
359 let mut index = String::new();
361 for row in options.table_index.iter() {
362 let mut index_fields = String::new();
363 let mut index_name = String::new();
364 for item in row.iter() {
365 if index_fields.is_empty() {
366 index_fields = format!("`{item}`");
367 index_name = format!("{}_index_{}", options.table_name, item);
368 } else {
369 index_fields = format!("{index_fields},`{item}`");
370 index_name = format!("{index_name}_{item}");
371 }
372 }
373 if index.is_empty() {
374 index = format!("INDEX `{index_name}` ({index_fields})");
375 } else {
376 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377 }
378 }
379 if index.replace(",", "").is_empty() {
380 index = index.replace(",", "");
381 }
382
383 for (name, field) in options.table_fields.entries() {
384 let row = br_fields::field("mysql", name, field.clone());
385 sql = format!("{sql} {row},\r\n");
386 }
387
388 if !unique.is_empty() {
389 sql = sql.trim_end_matches(",\r\n").to_string();
390 sql = format!("{sql},\r\n{unique}");
391 }
392 if !index.is_empty() {
393 sql = sql.trim_end_matches(",\r\n").to_string();
394 sql = format!("{sql},\r\n{index}");
395 }
396 let collate = format!("{}_bin", self.connection.charset.str());
397
398 let partition = if options.table_partition {
400 sql = format!(
401 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402 sql,
403 options.table_key,
404 options.table_partition_columns[0].clone()
405 );
406 let temp_head = format!(
407 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408 options.table_partition_columns[0].clone()
409 );
410 let mut partition_array = vec![];
411 let mut count = 0;
412 for member in options.table_partition_columns[1].members() {
413 let temp = format!(
414 "PARTITION p{} VALUES LESS THAN ('{}')",
415 count.clone(),
416 member.clone()
417 );
418 count += 1;
419 partition_array.push(temp.clone());
420 }
421 let temp_body = partition_array.join(",\r\n");
422 let temp_end = format!(
423 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424 count.clone()
425 );
426 format!("{temp_head}{temp_body}{temp_end}")
427 } else {
428 sql = if sql.trim_end().ends_with(",") {
429 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430 } else {
431 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432 };
433 "".to_string()
434 };
435 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437 if self.params.sql {
438 return JsonValue::from(sql);
439 }
440
441 let (state, data) = self.execute(sql.as_str());
442
443 match state {
444 true => JsonValue::from(state),
445 false => {
446 info!("创建错误: {data}");
447 JsonValue::from(state)
448 }
449 }
450 }
451
452 fn table_update(&mut self, options: TableOptions) -> JsonValue {
453 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454 TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455 }
456 let mut sql = vec![];
457 let fields_list = self.table_info(&options.table_name);
458 let mut put = vec![];
459 let mut add = vec![];
460 let mut del = vec![];
461 for (key, _) in fields_list.entries() {
462 if options.table_fields[key].is_empty() {
463 del.push(key);
464 }
465 }
466 for (name, field) in options.table_fields.entries() {
467 if !fields_list[name].is_empty() {
468 let old_comment = fields_list[name]["comment"].to_string();
469 let new_comment = br_fields::field("mysql", name, field.clone());
470 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472 if old_comment == new_comment_text {
473 continue;
474 }
475 put.push(name);
476 } else {
477 add.push(name);
478 }
479 }
480
481 for name in add.iter() {
482 let name = name.to_string();
483 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485 }
486 for name in del.iter() {
487 sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488 }
489 for name in put.iter() {
490 let name = name.to_string();
491 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492 sql.push(format!(
493 "ALTER TABLE {} CHANGE `{}` {};\r\n",
494 options.table_name, name, row
495 ));
496 }
497
498 let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499 let (_, pk_list) = self.query(
501 format!(
502 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504 self.connection.database, options.table_name
505 ).as_str(),
506 );
507 let mut pk_vec = vec![];
508 for member in pk_list.members() {
509 pk_vec.push(member["COLUMN_NAME"].to_string());
510 }
511
512 let mut unique_new = vec![];
513 let mut index_new = vec![];
514 for item in index_list.members() {
515 let key_name = item["Key_name"].as_str().unwrap();
516 let non_unique = item["Non_unique"].as_i32().unwrap();
517
518 if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519 {
520 unique_new.push(key_name.to_string());
521 continue;
522 }
523 if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524 {
525 index_new.push(key_name.to_string());
526 continue;
527 }
528 }
529
530 let mut unique_fields = String::new();
531 let mut unique_name = String::new();
532 for item in options.table_unique.iter() {
533 if unique_fields.is_empty() {
534 unique_fields = format!("`{item}`");
535 unique_name = format!("{}_unique_{}", options.table_name, item);
536 } else {
537 unique_fields = format!("{unique_fields},`{item}`");
538 unique_name = format!("{unique_name}_{item}");
539 }
540 }
541 if !unique_name.is_empty() {
542 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543 unique_name = format!("unique_{md5}");
544 for item in &unique_new {
545 if unique_name != *item {
546 sql.push(format!(
547 "alter table {} drop index {};\r\n",
548 options.table_name, item
549 ));
550 }
551 }
552 if !unique_new.contains(&unique_name) {
553 sql.push(format!(
554 "CREATE UNIQUE index {} on {} ({});\r\n",
555 unique_name, options.table_name, unique_fields
556 ));
557 }
558 }
559
560 let mut index_list = vec![];
561 for row in options.table_index.iter() {
563 let mut index_fields = String::new();
564 let mut index_name = String::new();
565 for item in row {
566 if index_fields.is_empty() {
567 index_fields = item.to_string();
568 index_name = format!("{}_index_{}", options.table_name, item);
569 } else {
570 index_fields = format!("{index_fields},{item}");
571 index_name = format!("{index_name}_{item}");
572 }
573 }
574 index_list.push(index_name.clone());
575 if !index_new.contains(&index_name.clone()) {
576 sql.push(format!(
577 "CREATE INDEX {} on {} ({});\r\n",
578 index_name, options.table_name, index_fields
579 ));
580 }
581 }
582
583 for item in index_new {
584 if !index_list.contains(&item.to_string()) {
585 sql.push(format!(
586 "DROP INDEX {} ON {};\r\n",
587 item.clone(),
588 options.table_name
589 ));
590 }
591 }
592
593 if options.table_partition {
595 if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597 {
598 let pk = format!(
599 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600 options.table_name,
601 options.table_key,
602 options.table_partition_columns[0].clone()
603 );
604 sql.push(pk);
605 let temp_head = format!(
606 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607 options.table_name,
608 options.table_partition_columns[0].clone()
609 );
610 let mut partition_array = vec![];
611 let mut count = 0;
612 for member in options.table_partition_columns[1].members() {
613 let temp = format!(
614 "PARTITION p{} VALUES LESS THAN ('{}')",
615 count.clone(),
616 member.clone()
617 );
618 count += 1;
619 partition_array.push(temp.clone());
620 }
621 let temp_body = partition_array.join(",\r\n");
622 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624 }
625 } else if pk_vec.len() != 1 {
626 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627 sql.push(rm_partition);
628 let pk = format!(
629 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630 options.table_name, options.table_key
631 );
632 sql.push(pk);
633 };
634
635 if self.params.sql {
636 return JsonValue::from(sql.join(""));
637 }
638
639 if sql.is_empty() {
640 return JsonValue::from(-1);
641 }
642
643 for item in sql.iter() {
644 let (state, res) = self.execute(item.as_str());
645 match state {
646 true => {}
647 false => {
648 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649 return JsonValue::from(0);
650 }
651 }
652 }
653 JsonValue::from(1)
654 }
655
656 fn table_info(&mut self, table: &str) -> JsonValue {
657 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658 return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659 }
660 let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'");
661 let (state, data) = self.query(sql.as_str());
662 let mut list = object! {};
663 if state {
664 for item in data.members() {
665 if item["TABLE_SCHEMA"] != self.connection.database {
666 continue;
667 }
668 let mut row = object! {};
669 row["field"] = item["COLUMN_NAME"].clone();
670 row["comment"] = item["COLUMN_COMMENT"].clone();
671 row["type"] = item["COLUMN_TYPE"].clone();
672 list[row["field"].as_str().unwrap()] = row.clone();
673 }
674 TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675 list
676 } else {
677 list
678 }
679 }
680
681 fn table_is_exist(&mut self, name: &str) -> bool {
682 let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683 let (state, data) = self.query(sql.as_str());
684 match state {
685 true => {
686 for item in data.members() {
687 if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688 {
689 return true;
690 }
691 }
692 false
693 }
694 false => false,
695 }
696 }
697
698 fn table(&mut self, name: &str) -> &mut Mysql {
699 self.params = Params::default(self.connection.mode.str().as_str());
700 self.params.table = format!("{}{}", self.connection.prefix, name);
701 self.params.join_table = self.params.table.clone();
702 self
703 }
704
705 fn change_table(&mut self, name: &str) -> &mut Self {
706 self.params.join_table = name.to_string();
707 self
708 }
709
710 fn autoinc(&mut self) -> &mut Self {
711 self.params.autoinc = true;
712 self
713 }
714
715 fn fetch_sql(&mut self) -> &mut Self {
716 self.params.sql = true;
717 self
718 }
719
720 fn order(&mut self, field: &str, by: bool) -> &mut Self {
721 self.params.order[field] = {
722 if by {
723 "DESC"
724 } else {
725 "ASC"
726 }
727 }.into();
728 self
729 }
730
731 fn group(&mut self, field: &str) -> &mut Self {
732 let fields: Vec<&str> = field.split(",").collect();
733 for field in fields.iter() {
734 let field = field.to_string();
735 self.params.group[field.as_str()] = field.clone().into();
736 if !self.params.fields.has_key(field.as_str()) {
737 self.params.fields[field.as_str()] = field.clone().into();
738 }
739 }
740
741 self
742 }
743
744 fn distinct(&mut self) -> &mut Self {
745 self.params.distinct = true;
746 self
747 }
748
749 fn json(&mut self, field: &str) -> &mut Self {
750 let list: Vec<&str> = field.split(",").collect();
751 for item in list.iter() {
752 self.params.json[item.to_string().as_str()] = item.to_string().into();
753 }
754 self
755 }
756
757 fn location(&mut self, field: &str) -> &mut Self {
758 let list: Vec<&str> = field.split(",").collect();
759 for item in list.iter() {
760 self.params.location[item.to_string().as_str()] = item.to_string().into();
761 }
762 self
763 }
764
765 fn field(&mut self, field: &str) -> &mut Self {
766 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
767 let join_table = if self.params.join_table.is_empty() {
768 self.params.table.clone()
769 } else {
770 self.params.join_table.clone()
771 };
772 for item in list.iter() {
773 if item.contains(" as ") {
774 let text = item.split(" as ").collect::<Vec<&str>>();
775 if text[0].contains("count(") {
776 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
777 } else {
778 self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
779 }
780 } else {
781 self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
782 }
783 }
784 self
785 }
786
787 fn hidden(&mut self, name: &str) -> &mut Self {
788 let hidden: Vec<&str> = name.split(",").collect();
789
790 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
791
792 let mut data = array![];
793 for item in fields_list.members() {
794 data.push(object! {
795 "name":item["COLUMN_NAME"].as_str().unwrap()
796 }).unwrap();
797 }
798
799 for item in data.members() {
800 let name = item["name"].as_str().unwrap();
801 if !hidden.contains(&name) {
802 self.params.fields[name] = name.into();
803 }
804 }
805 self
806 }
807
808 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
809 let table_fields = self.table_info(&self.params.table.clone());
810 let join_table = if self.params.join_table.is_empty() {
811 self.params.table.clone()
812 } else {
813 self.params.join_table.clone()
814 };
815 if value.is_boolean() {
816 if value.as_bool().unwrap() {
817 value = 1.into();
818 } else {
819 value = 0.into();
820 }
821 }
822 match compare {
823 "between" => {
824 self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
825 }
826 "location" => {
827 let comment = table_fields[field]["comment"].to_string();
828 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
829
830 let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
831 self.params.fields[&field_name.clone()] = field_name.clone().into();
832 let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
835 self.params.where_and.push(location);
836 }
837 "set" => {
838 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
839 let mut wheredata = vec![];
840 for item in list.iter() {
841 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
842 }
843 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
844 }
845 "notin" => {
846 let mut text = String::new();
847 for item in value.members() {
848 text = format!("{text},'{item}'");
849 }
850 text = text.trim_start_matches(",").into();
851 self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
852 }
853 "is" => {
854 self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
855 }
856 "isnot" => {
857 self.params.where_and.push(format!("{join_table}.`{field}` is not {value}"));
858 }
859 "notlike" => {
860 self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
861 }
862 "in" => {
863 let mut text = String::new();
864 if value.is_array() {
865 for item in value.members() {
866 text = format!("{text},'{item}'");
867 }
868 } else if value.is_null() {
869 text = format!("{text},null");
870 } else {
871 let value = value.as_str().unwrap();
872
873 let value: Vec<&str> = value.split(",").collect();
874 for item in value.iter() {
875 text = format!("{text},'{item}'");
876 }
877 }
878 text = text.trim_start_matches(",").into();
879
880 self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
881 }
882 _ => {
883 self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
884 }
885 }
886 self
887 }
888
889 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
890 let join_table = if self.params.join_table.is_empty() {
891 self.params.table.clone()
892 } else {
893 self.params.join_table.clone()
894 };
895
896 if value.is_boolean() {
897 if value.as_bool().unwrap() {
898 value = 1.into();
899 } else {
900 value = 0.into();
901 }
902 }
903
904 match compare {
905 "between" => {
906 self.params.where_or.push(format!(
907 "{}.`{}` between '{}' AND '{}'",
908 join_table, field, value[0], value[1]
909 ));
910 }
911 "set" => {
912 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
913 let mut wheredata = vec![];
914 for item in list.iter() {
915 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
916 }
917 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
918 }
919 "notin" => {
920 let mut text = String::new();
921 for item in value.members() {
922 text = format!("{text},'{item}'");
923 }
924 text = text.trim_start_matches(",").into();
925 self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
926 }
927 "is" => {
928 self.params.where_or.push(format!("{join_table}.`{field}` is {value}"));
929 }
930 "isnot" => {
931 self.params.where_or.push(format!("{join_table}.`{field}` IS NOT {value}"));
932 }
933 "in" => {
934 let mut text = String::new();
935 if value.is_array() {
936 for item in value.members() {
937 text = format!("{text},'{item}'");
938 }
939 } else {
940 let value = value.as_str().unwrap();
941 let value: Vec<&str> = value.split(",").collect();
942 for item in value.iter() {
943 text = format!("{text},'{item}'");
944 }
945 }
946 text = text.trim_start_matches(",").into();
947 self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
948 }
949 _ => {
950 if field.contains(".") {
951 self.params.where_or.push(format!("{field} {compare} '{value}'"));
952 } else {
953 self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
954 }
955 }
956 }
957 self
958 }
959
960 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
961 self.params.where_column = format!(
962 "{}.`{}` {} {}.`{}`",
963 self.params.table, field_a, compare, self.params.table, field_b
964 );
965 self
966 }
967
968 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
969 self.params.page = page;
970 self.params.limit = limit;
971 self
972 }
973
974 fn column(&mut self, field: &str) -> JsonValue {
975 self.field(field);
976 let sql = self.params.select_sql();
977
978 if self.params.sql {
979 return JsonValue::from(sql);
980 }
981 let (state, data) = self.query(sql.as_str());
982 match state {
983 true => {
984 let mut list = array![];
985 for item in data.members() {
986 if self.params.json[field].is_empty() {
987 list.push(item[field].clone()).unwrap();
988 } else {
989 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
990 list.push(data).unwrap();
991 }
992 }
993 list
994 }
995 false => {
996 array![]
997 }
998 }
999 }
1000
1001 fn count(&mut self) -> JsonValue {
1002 if !self.params.fields.is_empty() {
1003 self.group(format!("{}.id", self.params.table).as_str());
1004 }
1005 self.params.fields["count"] = "count(*) as count".into();
1006 let sql = self.params.select_sql();
1007 if self.params.sql {
1008 return JsonValue::from(sql.clone());
1009 }
1010 let (state, data) = self.query(sql.as_str());
1011 if state {
1012 if data.is_empty() {
1013 JsonValue::from(0)
1014 } else {
1015 data[0]["count"].clone()
1016 }
1017 } else {
1018 JsonValue::from(0)
1019 }
1020 }
1021
1022 fn max(&mut self, field: &str) -> JsonValue {
1023 self.params.fields[field] = format!("max({field}) as {field}").into();
1024 let sql = self.params.select_sql();
1025 if self.params.sql {
1026 return JsonValue::from(sql.clone());
1027 }
1028 let (state, data) = self.query(sql.as_str());
1029 if state {
1030 if data.len() > 1 {
1031 return data.clone();
1032 }
1033 data[0][field].clone()
1034 } else {
1035 JsonValue::from(0)
1036 }
1037 }
1038
1039 fn min(&mut self, field: &str) -> JsonValue {
1040 self.params.fields[field] = format!("min({field}) as {field}").into();
1041 let sql = self.params.select_sql();
1042 if self.params.sql {
1043 return JsonValue::from(sql.clone());
1044 }
1045 let (state, data) = self.query(sql.as_str());
1046 if state {
1047 if data.len() > 1 {
1048 return data;
1049 }
1050 data[0][field].clone()
1051 } else {
1052 JsonValue::from(0)
1053 }
1054 }
1055
1056 fn sum(&mut self, field: &str) -> JsonValue {
1057 self.params.fields[field] = format!("sum({field}) as {field}").into();
1058 let sql = self.params.select_sql();
1059 if self.params.sql {
1060 return JsonValue::from(sql.clone());
1061 }
1062 let (state, data) = self.query(sql.as_str());
1063 match state {
1064 true => {
1065 if data.len() > 1 {
1066 return data;
1067 }
1068 data[0][field].clone()
1069 }
1070 false => JsonValue::from(0),
1071 }
1072 }
1073
1074 fn avg(&mut self, field: &str) -> JsonValue {
1075 self.params.fields[field] = format!("avg({field}) as {field}").into();
1076 let sql = self.params.select_sql();
1077 if self.params.sql {
1078 return JsonValue::from(sql.clone());
1079 }
1080 let (state, data) = self.query(sql.as_str());
1081 if state {
1082 if data.len() > 1 {
1083 return data;
1084 }
1085 data[0][field].clone()
1086 } else {
1087 JsonValue::from(0)
1088 }
1089 }
1090
1091 fn select(&mut self) -> JsonValue {
1092 let sql = self.params.select_sql();
1093 if self.params.sql {
1094 return JsonValue::from(sql.clone());
1095 }
1096 let (state, mut data) = self.query(sql.as_str());
1097 match state {
1098 true => {
1099 for (field, _) in self.params.json.entries() {
1100 for item in data.members_mut() {
1101 if !item[field].is_empty() {
1102 let json = item[field].to_string();
1103 item[field] = match json::parse(&json) {
1104 Ok(e) => e,
1105 Err(_) => JsonValue::from(json),
1106 };
1107 }
1108 }
1109 }
1110 data.clone()
1111 }
1112 false => array![],
1113 }
1114 }
1115
1116 fn find(&mut self) -> JsonValue {
1117 self.params.page = 1;
1118 self.params.limit = 1;
1119 let sql = self.params.select_sql();
1120 if self.params.sql {
1121 return JsonValue::from(sql.clone());
1122 }
1123 let (state, mut data) = self.query(sql.as_str());
1124 match state {
1125 true => {
1126 if data.is_empty() {
1127 return object! {};
1128 }
1129 for (field, _) in self.params.json.entries() {
1130 if !data[0][field].is_empty() {
1131 let json = data[0][field].to_string();
1132 let json = json::parse(&json).unwrap_or(array![]);
1133 data[0][field] = json;
1134 } else {
1135 data[0][field] = array![];
1136 }
1137 }
1138 data[0].clone()
1139 }
1140 false => {
1141 error!("find失败: {data:?}");
1142 object! {}
1143 }
1144 }
1145 }
1146
1147 fn value(&mut self, field: &str) -> JsonValue {
1148 self.params.fields = object! {};
1149 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1150 self.params.page = 1;
1151 self.params.limit = 1;
1152 let sql = self.params.select_sql();
1153 if self.params.sql {
1154 return JsonValue::from(sql.clone());
1155 }
1156 let (state, mut data) = self.query(sql.as_str());
1157 match state {
1158 true => {
1159 for (field, _) in self.params.json.entries() {
1160 if !data[0][field].is_empty() {
1161 let json = data[0][field].to_string();
1162 let json = json::parse(&json).unwrap_or(array![]);
1163 data[0][field] = json;
1164 } else {
1165 data[0][field] = array![];
1166 }
1167 }
1168 data[0][field].clone()
1169 }
1170 false => {
1171 if self.connection.debug {
1172 info!("{data:?}");
1173 }
1174 JsonValue::Null
1175 }
1176 }
1177 }
1178 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1179 let fields_list = self.table_info(&self.params.table.clone());
1180
1181 let mut fields = vec![];
1182 let mut values = vec![];
1183 if !self.params.autoinc && data["id"].is_empty() {
1184 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1185 }
1186 for (field, value) in data.entries() {
1187 fields.push(format!("`{field}`"));
1188
1189 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1190 if value.is_empty() {
1191 values.push("NULL".to_string());
1192 continue;
1193 }
1194 let comment = fields_list[field]["comment"].to_string();
1195 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1196 let location = value.to_string().replace(",", " ");
1197 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1198 continue;
1199 }
1200
1201 if value.is_string() || value.is_array() || value.is_object() {
1202 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1203 continue;
1204 } else if value.is_number() || value.is_boolean() || value.is_null() {
1205 values.push(format!("{value}"));
1206 continue;
1207 } else {
1208 values.push(format!("'{value}'"));
1209 continue;
1210 }
1211 }
1212 let fields = fields.join(",");
1213 let values = values.join(",");
1214
1215 let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1216 if self.params.sql {
1217 return JsonValue::from(sql.clone());
1218 }
1219 let (state, ids) = self.execute(sql.as_str());
1220
1221 match state {
1222 true => match self.params.autoinc {
1223 true => ids.clone(),
1224 false => data["id"].clone(),
1225 },
1226 false => {
1227 let thread_id = format!("{:?}", thread::current().id());
1228 error!("添加失败: {thread_id} {ids:?} {sql}");
1229 JsonValue::from("")
1230 }
1231 }
1232 }
1233 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1234 let fields_list = self.table_info(&self.params.table.clone());
1235
1236 let mut fields = String::new();
1237 if !self.params.autoinc && data[0]["id"].is_empty() {
1238 data[0]["id"] = "".into();
1239 }
1240 for (field, _) in data[0].entries() {
1241 fields = format!("{fields},`{field}`");
1242 }
1243 fields = fields.trim_start_matches(",").parse().unwrap();
1244
1245 let core_count = num_cpus::get();
1246 let mut p = pools::Pool::new(core_count * 4);
1247 let autoinc = self.params.autoinc;
1248 for list in data.members() {
1249 let mut item = list.clone();
1250 let params_location = self.params.location.clone();
1251 let fields_list_new = fields_list.clone();
1252 p.execute(move |pcindex| {
1253 if !autoinc && item["id"].is_empty() {
1254 let id = format!(
1255 "{:X}{:X}",
1256 Local::now().timestamp_nanos_opt().unwrap(),
1257 pcindex
1258 );
1259 item["id"] = id.into();
1260 }
1261 let mut values = "".to_string();
1262 for (field, value) in item.entries() {
1263 if params_location.has_key(field) {
1264 if value.is_empty() {
1265 values = format!("{values},NULL");
1266 continue;
1267 }
1268 let comment = fields_list_new[field]["comment"].to_string();
1269 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1270 let location = value.to_string().replace(",", " ");
1271 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1272 continue;
1273 }
1274 if value.is_string() {
1275 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1276 } else if value.is_number() || value.is_boolean() {
1277 values = format!("{values},{value}");
1278 } else {
1279 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1280 }
1281 }
1282 values = format!("({})", values.trim_start_matches(","));
1283 array![item["id"].clone(), values]
1284 });
1285 }
1286 let (ids_list, mut values) = p.insert_all();
1287 values = values.trim_start_matches(",").parse().unwrap();
1288 let sql = format!(
1289 "INSERT INTO {} ({}) VALUES {};",
1290 self.params.table, fields, values
1291 );
1292
1293 if self.params.sql {
1294 return JsonValue::from(sql.clone());
1295 }
1296 let (state, data) = self.execute(sql.as_str());
1297 match state {
1298 true => match autoinc {
1299 true => data,
1300 false => JsonValue::from(ids_list),
1301 },
1302 false => {
1303 error!("insert_all: {data:?}");
1304 array![]
1305 }
1306 }
1307 }
1308 fn update(&mut self, data: JsonValue) -> JsonValue {
1309 let fields_list = self.table_info(&self.params.table.clone());
1310
1311 let mut values = vec![];
1312 for (field, value) in data.entries() {
1313 if !self.params.json[field].is_empty() {
1314 let json = value.to_string().replace("'", "''");
1315 values.push(format!("`{field}`='{json}'"));
1316 continue;
1317 }
1318 if !self.params.location[field].is_empty() {
1319 if value.is_empty() {
1320 values.push(format!("{field}=NULL").to_string());
1321 continue;
1322 }
1323 let comment = fields_list[field]["comment"].to_string();
1324 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1325 let location = value.to_string().replace(",", " ");
1326 values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1327
1328 continue;
1329 }
1330
1331 if value.is_string() {
1332 values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1333 } else if value.is_number() {
1334 values.push(format!("`{field}`= {value}"));
1335 } else if value.is_array() {
1336 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1337 values.push(format!("`{field}`='{array}'"));
1338 continue;
1339 } else if value.is_object() {
1340 if self.params.json[field].is_empty() {
1341 values.push(format!("`{field}`='{value}'"));
1342 } else {
1343 if value.is_empty() {
1344 values.push(format!("`{field}`=''"));
1345 continue;
1346 }
1347 let json = value.to_string();
1348 let json = json.replace("'", "''");
1349 values.push(format!("`{field}`='{json}'"));
1350 }
1351 continue;
1352 } else if value.is_boolean() {
1353 values.push(format!("`{field}`= {value}"));
1354 } else {
1355 values.push(format!("`{field}`=\"{value}\""));
1356 }
1357 }
1358
1359 for (field, value) in self.params.inc_dec.entries() {
1360 values.push(format!("{field} = {}", value.to_string().clone()));
1361 }
1362
1363 let values = values.join(",");
1364
1365 let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1366 if self.params.sql {
1367 return JsonValue::from(sql.clone());
1368 }
1369 let (state, data) = self.execute(sql.as_str());
1370 if state {
1371 data
1372 } else {
1373 let thread_id = format!("{:?}", thread::current().id());
1374 error!("update: {thread_id} {data:?} {sql}");
1375 0.into()
1376 }
1377 }
1378
1379 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1380 let fields_list = self.table_info(&self.params.table.clone());
1381 let mut values = vec![];
1382 let mut ids = vec![];
1383 for (field, _) in data[0].entries() {
1384 if field == "id" {
1385 continue;
1386 }
1387 let mut fields = vec![];
1388 for row in data.members() {
1389 let value = row[field].clone();
1390 let id = row["id"].clone();
1391 ids.push(id.clone());
1392
1393 if self.params.json.has_key(field) {
1394 let json = value.to_string();
1395 let json = json.replace("'", "''");
1396 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1397 continue;
1398 }
1399 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1400 let comment = fields_list[field]["comment"].to_string();
1401 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1402 let location = value.to_string().replace(",", " ");
1403 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1404 fields.push(format!("WHEN '{id}' THEN {location}"));
1405 continue;
1406 }
1407 if value.is_string() {
1408 fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1409 } else if value.is_array() || value.is_object() {
1410 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1411 } else if value.is_number() || value.is_boolean() || value.is_null() {
1412 fields.push(format!("WHEN '{id}' THEN {value}"));
1413 } else {
1414 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1415 }
1416 }
1417 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1418 }
1419 self.where_and("id", "in", ids.into());
1420 for (field, value) in self.params.inc_dec.entries() {
1421 values.push(format!("{} = {}", field, value.to_string().clone()));
1422 }
1423
1424 let values = values.join(",");
1425 let sql = format!(
1426 "UPDATE {} SET {} {} {};",
1427 self.params.table.clone(),
1428 values,
1429 self.params.where_sql(),
1430 self.params.page_limit_sql()
1431 );
1432 if self.params.sql {
1433 return JsonValue::from(sql.clone());
1434 }
1435 let (state, data) = self.execute(sql.as_str());
1436 if state {
1437 data
1438 } else {
1439 error!("update_all: {data:?}");
1440 JsonValue::from(0)
1441 }
1442 }
1443
1444 fn delete(&mut self) -> JsonValue {
1445 let sql = format!(
1446 "delete FROM {} {} {};",
1447 self.params.table.clone(),
1448 self.params.where_sql(),
1449 self.params.page_limit_sql()
1450 );
1451 if self.params.sql {
1452 return JsonValue::from(sql.clone());
1453 }
1454 let (state, data) = self.execute(sql.as_str());
1455 match state {
1456 true => data,
1457 false => {
1458 error!("delete 失败>>> {data:?}");
1459 JsonValue::from(0)
1460 }
1461 }
1462 }
1463 fn transaction(&mut self) -> bool {
1464 let thread_id = format!("{:?}", thread::current().id());
1465
1466 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1467 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1468 t += 1;
1469 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1470 return true;
1471 }
1472 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1473
1474 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1475
1476 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1477 Ok(e) => e,
1478 Err(err) => {
1479 error!("query 超时: {err}");
1480 return false;
1481 }
1482 };
1483 let key = format!("{}{}", self.default, thread_id);
1484 TR.lock().unwrap().insert(key.clone(), conn);
1485
1486 let (state, _) = self.query(sql.as_str());
1487 match state {
1488 true => state,
1489 false => {
1490 TR.lock().unwrap().remove(&*key);
1491 TRANS.lock().unwrap().remove(&*thread_id.clone());
1492 state
1493 }
1494 }
1495 }
1496
1497 fn commit(&mut self) -> bool {
1498 let thread_id = format!("{:?}", thread::current().id());
1499 let sql = "COMMIT".to_string();
1500
1501 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1502 if t > 1 {
1503 t -= 1;
1504 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1505 return true;
1506 }
1507 let (state, data) = self.query(sql.as_str());
1508 TRANS.lock().unwrap().remove(&thread_id);
1509 let key = format!("{}{}", self.default, thread_id);
1510 TR.lock().unwrap().remove(&*key);
1511
1512 let t = TRANS_TABLE.lock().unwrap().clone();
1513 for (key, value) in t.iter() {
1514 if value.clone() == thread_id {
1515 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1516 }
1517 }
1518
1519 match state {
1520 true => {}
1521 false => {
1522 error!("提交事务失败: {data}");
1523 }
1524 }
1525 state
1526 }
1527
1528 fn rollback(&mut self) -> bool {
1529 let thread_id = format!("{:?}", thread::current().id());
1530 let sql = "ROLLBACK".to_string();
1531
1532 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1533 if t > 1 {
1534 t -= 1;
1535 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1536 return true;
1537 }
1538 let (state, data) = self.query(sql.as_str());
1539 TRANS.lock().unwrap().remove(&thread_id);
1540 let key = format!("{}{}", self.default, thread_id);
1541 TR.lock().unwrap().remove(&*key);
1542
1543 let t = TRANS_TABLE.lock().unwrap().clone();
1544 for (key, value) in t.iter() {
1545 if value.clone() == thread_id {
1546 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1547 }
1548 }
1549
1550 match state {
1551 true => {}
1552 false => {
1553 error!("回滚失败: {data}");
1554 }
1555 }
1556 state
1557 }
1558
1559 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1560 let (state, data) = self.query(sql);
1561 match state {
1562 true => Ok(data),
1563 false => Err(data.to_string()),
1564 }
1565 }
1566
1567 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1568 let (state, data) = self.execute(sql);
1569 match state {
1570 true => Ok(data),
1571 false => Err(data.to_string()),
1572 }
1573 }
1574
1575 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1576 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1577 self
1578 }
1579 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1580 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1581 self
1582 }
1583
1584 fn buildsql(&mut self) -> String {
1585 self.fetch_sql();
1586 let sql = self.select().to_string();
1587 format!("( {} ) `{}`", sql, self.params.table)
1588 }
1589
1590 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1591 for field in fields.clone() {
1592 if field.contains(format!("{}.", self.params.table).as_str()) {
1593 self.params.fields[field] = field.into();
1594 } else {
1595 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1596 }
1597 }
1598 self
1599 }
1600
1601 fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1602 let main_table = if main_table.is_empty() {
1603 self.params.table.clone()
1604 } else {
1605 main_table.to_string()
1606 };
1607 self.params.join_table = right_table.to_string();
1608 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1609 self
1610 }
1611
1612 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1613 let main_fields = if main_fields.is_empty() {
1614 "id"
1615 } else {
1616 main_fields
1617 };
1618 let second_fields = if second_fields.is_empty() {
1619 self.params.table.clone()
1620 } else {
1621 second_fields.to_string().clone()
1622 };
1623 let sec_table_name = format!("{}{}", table, "_2");
1624 let second_table = format!("{} {}", table, sec_table_name.clone());
1625 self.params.join_table = sec_table_name.clone();
1626 self.params.join.push(format!(
1627 " INNER JOIN {} ON {}.{} = {}.{}",
1628 second_table, self.params.table, main_fields, sec_table_name, second_fields
1629 ));
1630 self
1631 }
1632}