1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
lazy_static! {
    // Connections kept open for in-flight transactions; `query`/`execute` look them up
    // with the key `"{default}{thread_id}"`.
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Keyed by the thread-id string; the presence of an entry makes `query`/`execute` use
    // the thread's `TR` connection instead of the pool.
    // NOTE(review): the i32 is presumably a nesting depth — confirm against the
    // transaction begin/commit code, which is outside this view.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Table name -> id of the thread that claimed the table for writes inside a transaction.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
/// MySQL backend: a connection pool plus the state of the query currently being built.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Connection name; combined with the thread id to key transaction connections
    /// and used as a prefix for cached table metadata.
    pub default: String,
    /// Query-builder state (table, fields, conditions, paging, ...); reset by `table()`.
    pub params: Params,
    /// Pool that non-transactional statements borrow connections from.
    pub pool: Pool,
}
35
36impl Mysql {
37 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38 let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40 let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42 match Pool::new(opts) {
43 Ok(pool) => Ok(Self {
44 connection: connection.clone(),
45 default: default.clone(),
46 params: Params::default("mysql"),
47 pool,
48 }),
49 Err(e) => {
50 error!("connect: {e}");
51 Err(e.to_string())
52 }
53 }
54 }
55 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56 if sql.contains("INSERT") {
57 let rows = text.affected_rows();
58 if rows > 1 {
59 if self.params.autoinc {
60 let row = rows;
61 let start_row = text.last_insert_id().unwrap();
62 let end_row = start_row + row;
63
64 let mut ids = array![];
65 for item in start_row..end_row {
66 ids.push(item).unwrap();
67 }
68 (true, ids)
69 } else {
70 (true, JsonValue::from(rows))
71 }
72 } else {
73 (true, JsonValue::from(text.last_insert_id()))
74 }
75 } else {
76 (true, JsonValue::from(text.affected_rows()))
77 }
78 }
79 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80 let mut list = array![];
81 let mut index = 0;
82 text.for_each(|row| {
83 match row {
84 Ok(r) => {
85 let mut data = object! {};
86 for (index, item) in r.columns().iter().enumerate() {
87 let field = item.name_str();
88 let field = field.to_string();
89 let field = field.as_str();
90
91 data[field] = match item.column_type() {
92 ColumnType::MYSQL_TYPE_TINY => {
93 let t = r.get::<bool, _>(index).unwrap_or(true);
94 JsonValue::from(t)
95 }
96 ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98 if t == NULL {
99 JsonValue::from(0.0)
100 } else {
101 match r.get::<f64, _>(index) {
102 None => JsonValue::from(0.0),
103 Some(t) => JsonValue::from(t),
104 }
105 }
106 }
107 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108 let t = r.index(field).clone();
109 if t == NULL {
110 JsonValue::from(0)
111 } else {
112 let t = r.get::<i64, _>(index).unwrap();
113 JsonValue::from(t)
114 }
115 }
116 ColumnType::MYSQL_TYPE_NULL => {
117 let t = r.index(field).clone();
118 if t == NULL {
119 JsonValue::from("".to_string())
120 } else {
121 let t = r.get::<String, _>(index).unwrap_or("".to_string());
122 JsonValue::from(t)
123 }
124 }
125 ColumnType::MYSQL_TYPE_BLOB => {
126 let t = r.index(field).clone();
127 if t == NULL {
128 JsonValue::from("".to_string())
129 } else {
130 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131 if t == NULL {
132 JsonValue::from("".to_string())
133 } else {
134 let t = r.get::<String, _>(index).unwrap_or("".to_string());
135 JsonValue::from(t)
136 }
137 }
138 }
139 ColumnType::MYSQL_TYPE_VAR_STRING => {
140 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r.get::<String, _>(index).unwrap_or("".to_string());
145 JsonValue::from(t)
146 }
147 }
148 ColumnType::MYSQL_TYPE_STRING => {
149 let t = r.index(field).clone();
150 if t == NULL {
151 JsonValue::from("".to_string())
152 } else {
153 let t = r.get::<String, _>(index).unwrap_or("".to_string());
154 JsonValue::from(t)
155 }
156 }
157 ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158 let t = r.index(field).clone();
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_GEOMETRY => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let res = match r.index(field).clone() {
172 Value::Bytes(e) => e,
173 _ => vec![]
174 };
175 let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176 let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177 JsonValue::from(format!("{x},{y}"))
178 }
179 }
180 _ => {
181 let t = r.index(field).clone();
182 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183 JsonValue::from("".to_string())
184 }
185 };
186 }
187 list.push(data).unwrap();
188 }
189 Err(e) => {
190 error!("err: {e} \r\n {sql}");
191 }
192 }
193 index += 1;
194 });
195 (true, list)
196 }
197 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198 let thread_id = format!("{:?}", thread::current().id());
199 let key = format!("{}{}", self.default, thread_id);
200
201 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203 Ok(e) => e,
204 Err(err) => {
205 error!("非事务 execute超时: {err}");
206 return (false, object! {});
207 }
208 };
209 let connection_id = db.connection_id();
210 return match db.query_iter(sql) {
211 Ok(e) => {
212 if self.connection.debug {
213 info!("查询成功: {} {}", thread_id.clone(), sql);
214 }
215 self.query_handle(e, sql)
216 }
217 Err(e) => {
218 error!(
219 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220 );
221 (false, JsonValue::from(e.to_string()))
222 }
223 };
224 } else {
225 let mut tr = TR.lock().unwrap();
226 let db = tr.get_mut(&*key).unwrap();
227 let connection_id = db.connection_id();
228 return match db.query_iter(sql) {
229 Ok(e) => {
230 if self.connection.debug {
231 info!("查询成功: {} {}", thread_id.clone(), sql);
232 }
233 self.query_handle(e, sql)
234 }
235 Err(e) => {
236 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237 (false, JsonValue::from(e.to_string()))
238 }
239 };
240 };
241 }
242 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243 let thread_id = format!("{:?}", thread::current().id());
244 let key = format!("{}{}", self.default, thread_id);
245
246 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248 Ok(e) => e,
249 Err(err) => {
250 error!("非事务: execute超时: {err}");
251 return (false, object! {});
252 }
253 };
254 return match db.exec_iter(sql, ()) {
255 Ok(e) => {
256 if self.connection.debug {
257 info!("提交成功: {} {}", thread_id.clone(), sql);
258 }
259 self.execute_cl(e, sql)
260 }
261 Err(e) => {
262 error!("非事务提交失败: {thread_id} {e} {sql}");
263 (false, JsonValue::from(e.to_string()))
264 }
265 };
266 } else {
267 let mut count_flag: i64 = 0;
269 loop {
270 let mut t = TRANS_TABLE.lock().unwrap();
271 if t.get(&self.params.table).is_none() {
272 t.insert(self.params.table.clone(), thread_id.clone());
273 break;
274 }
275 if t.get(&self.params.table).unwrap().clone() == thread_id {
276 break;
277 }
278 thread::yield_now();
279
280 count_flag += 1;
281 if count_flag == 10000 {
282 warn!("execute循环次数: 1w,强制退出");
283 break;
284 }
285 }
286
287 let mut tr = TR.lock().unwrap();
288 let db = tr.get_mut(&*key).unwrap();
289
290 return match db.exec_iter(sql, ()) {
291 Ok(e) => {
292 if self.connection.debug {
293 info!("提交成功: {} {}", thread_id.clone(), sql);
294 }
295
296 self.execute_cl(e, sql)
297 }
298 Err(e) => {
299 error!("事务提交失败: {thread_id} {e} {sql}");
300 (false, JsonValue::from(e.to_string()))
301 }
302 };
303 };
304 }
305}
306
307impl DbMode for Mysql {
308 fn database_tables(&mut self) -> JsonValue {
309 let sql = "SHOW TABLES".to_string();
310 match self.sql(sql.as_str()) {
311 Ok(e) => {
312 let mut list = vec![];
313 for item in e.members() {
314 for (_, value) in item.entries() {
315 list.push(value.clone());
316 }
317 }
318 list.into()
319 }
320 Err(_) => {
321 array![]
322 }
323 }
324 }
325
326 fn database_create(&mut self, name: &str) -> bool {
327 let sql = format!("CREATE DATABASE {name}");
328
329 let (state, data) = self.execute(sql.as_str());
330 match state {
331 true => data.as_bool().unwrap(),
332 false => {
333 error!("创建数据库失败: {data:?}");
334 false
335 }
336 }
337 }
338}
339
340impl Mode for Mysql {
341 fn table_create(&mut self, options: TableOptions) -> JsonValue {
342 let mut sql = String::new();
343 let mut unique_fields = String::new();
345 let mut unique_name = String::new();
346 let mut unique = String::new();
347 for item in options.table_unique.iter() {
348 if unique_fields.is_empty() {
349 unique_fields = format!("`{item}`");
350 unique_name = format!("{}_unique_{}", options.table_name, item);
351 } else {
352 unique_fields = format!("{unique_fields},`{item}`");
353 unique_name = format!("{unique_name}_{item}");
354 }
355 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357 }
358
359 let mut index = String::new();
361 for row in options.table_index.iter() {
362 let mut index_fields = String::new();
363 let mut index_name = String::new();
364 for item in row.iter() {
365 if index_fields.is_empty() {
366 index_fields = format!("`{item}`");
367 index_name = format!("{}_index_{}", options.table_name, item);
368 } else {
369 index_fields = format!("{index_fields},`{item}`");
370 index_name = format!("{index_name}_{item}");
371 }
372 }
373 if index.is_empty() {
374 index = format!("INDEX `{index_name}` ({index_fields})");
375 } else {
376 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377 }
378 }
379 if index.replace(",", "").is_empty() {
380 index = index.replace(",", "");
381 }
382
383 for (name, field) in options.table_fields.entries() {
384 let row = br_fields::field("mysql", name, field.clone());
385 sql = format!("{sql} {row},\r\n");
386 }
387
388 if !unique.is_empty() {
389 sql = sql.trim_end_matches(",\r\n").to_string();
390 sql = format!("{sql},\r\n{unique}");
391 }
392 if !index.is_empty() {
393 sql = sql.trim_end_matches(",\r\n").to_string();
394 sql = format!("{sql},\r\n{index}");
395 }
396 let collate = format!("{}_bin", self.connection.charset.str());
397
398 let partition = if options.table_partition {
400 sql = format!(
401 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402 sql,
403 options.table_key,
404 options.table_partition_columns[0].clone()
405 );
406 let temp_head = format!(
407 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408 options.table_partition_columns[0].clone()
409 );
410 let mut partition_array = vec![];
411 let mut count = 0;
412 for member in options.table_partition_columns[1].members() {
413 let temp = format!(
414 "PARTITION p{} VALUES LESS THAN ('{}')",
415 count.clone(),
416 member.clone()
417 );
418 count += 1;
419 partition_array.push(temp.clone());
420 }
421 let temp_body = partition_array.join(",\r\n");
422 let temp_end = format!(
423 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424 count.clone()
425 );
426 format!("{temp_head}{temp_body}{temp_end}")
427 } else {
428 sql = if sql.trim_end().ends_with(",") {
429 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430 } else {
431 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432 };
433 "".to_string()
434 };
435 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437 if self.params.sql {
438 return JsonValue::from(sql);
439 }
440
441 let (state, data) = self.execute(sql.as_str());
442
443 match state {
444 true => JsonValue::from(state),
445 false => {
446 info!("创建错误: {data}");
447 JsonValue::from(state)
448 }
449 }
450 }
451
452 fn table_update(&mut self, options: TableOptions) -> JsonValue {
453 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454 TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455 }
456 let mut sql = vec![];
457 let fields_list = self.table_info(&options.table_name);
458 let mut put = vec![];
459 let mut add = vec![];
460 let mut del = vec![];
461 for (key, _) in fields_list.entries() {
462 if options.table_fields[key].is_empty() {
463 del.push(key);
464 }
465 }
466 for (name, field) in options.table_fields.entries() {
467 if !fields_list[name].is_empty() {
468 let old_comment = fields_list[name]["comment"].to_string();
469 let new_comment = br_fields::field("mysql", name, field.clone());
470 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472 if old_comment == new_comment_text {
473 continue;
474 }
475 put.push(name);
476 } else {
477 add.push(name);
478 }
479 }
480
481 for name in add.iter() {
482 let name = name.to_string();
483 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485 }
486 for name in del.iter() {
487 sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488 }
489 for name in put.iter() {
490 let name = name.to_string();
491 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492 sql.push(format!(
493 "ALTER TABLE {} CHANGE `{}` {};\r\n",
494 options.table_name, name, row
495 ));
496 }
497
498 let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499 let (_, pk_list) = self.query(
501 format!(
502 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504 self.connection.database, options.table_name
505 ).as_str(),
506 );
507 let mut pk_vec = vec![];
508 for member in pk_list.members() {
509 pk_vec.push(member["COLUMN_NAME"].to_string());
510 }
511
512 let mut unique_new = vec![];
513 let mut index_new = vec![];
514 for item in index_list.members() {
515 let key_name = item["Key_name"].as_str().unwrap();
516 let non_unique = item["Non_unique"].as_i32().unwrap();
517
518 if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519 {
520 unique_new.push(key_name.to_string());
521 continue;
522 }
523 if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524 {
525 index_new.push(key_name.to_string());
526 continue;
527 }
528 }
529
530 let mut unique_fields = String::new();
531 let mut unique_name = String::new();
532 for item in options.table_unique.iter() {
533 if unique_fields.is_empty() {
534 unique_fields = format!("`{item}`");
535 unique_name = format!("{}_unique_{}", options.table_name, item);
536 } else {
537 unique_fields = format!("{unique_fields},`{item}`");
538 unique_name = format!("{unique_name}_{item}");
539 }
540 }
541 if !unique_name.is_empty() {
542 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543 unique_name = format!("unique_{md5}");
544 for item in &unique_new {
545 if unique_name != *item {
546 sql.push(format!(
547 "alter table {} drop index {};\r\n",
548 options.table_name, item
549 ));
550 }
551 }
552 if !unique_new.contains(&unique_name) {
553 sql.push(format!(
554 "CREATE UNIQUE index {} on {} ({});\r\n",
555 unique_name, options.table_name, unique_fields
556 ));
557 }
558 }
559
560 let mut index_list = vec![];
561 for row in options.table_index.iter() {
563 let mut index_fields = String::new();
564 let mut index_name = String::new();
565 for item in row {
566 if index_fields.is_empty() {
567 index_fields = item.to_string();
568 index_name = format!("{}_index_{}", options.table_name, item);
569 } else {
570 index_fields = format!("{index_fields},{item}");
571 index_name = format!("{index_name}_{item}");
572 }
573 }
574 index_list.push(index_name.clone());
575 if !index_new.contains(&index_name.clone()) {
576 sql.push(format!(
577 "CREATE INDEX {} on {} ({});\r\n",
578 index_name, options.table_name, index_fields
579 ));
580 }
581 }
582
583 for item in index_new {
584 if !index_list.contains(&item.to_string()) {
585 sql.push(format!(
586 "DROP INDEX {} ON {};\r\n",
587 item.clone(),
588 options.table_name
589 ));
590 }
591 }
592
593 if options.table_partition {
595 if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597 {
598 let pk = format!(
599 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600 options.table_name,
601 options.table_key,
602 options.table_partition_columns[0].clone()
603 );
604 sql.push(pk);
605 let temp_head = format!(
606 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607 options.table_name,
608 options.table_partition_columns[0].clone()
609 );
610 let mut partition_array = vec![];
611 let mut count = 0;
612 for member in options.table_partition_columns[1].members() {
613 let temp = format!(
614 "PARTITION p{} VALUES LESS THAN ('{}')",
615 count.clone(),
616 member.clone()
617 );
618 count += 1;
619 partition_array.push(temp.clone());
620 }
621 let temp_body = partition_array.join(",\r\n");
622 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624 }
625 } else if pk_vec.len() != 1 {
626 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627 sql.push(rm_partition);
628 let pk = format!(
629 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630 options.table_name, options.table_key
631 );
632 sql.push(pk);
633 };
634
635 if self.params.sql {
636 return JsonValue::from(sql.join(""));
637 }
638
639 if sql.is_empty() {
640 return JsonValue::from(-1);
641 }
642
643 for item in sql.iter() {
644 let (state, res) = self.execute(item.as_str());
645 match state {
646 true => {}
647 false => {
648 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649 return JsonValue::from(0);
650 }
651 }
652 }
653 JsonValue::from(1)
654 }
655
656 fn table_info(&mut self, table: &str) -> JsonValue {
657 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658 return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659 }
660 let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'");
661 let (state, data) = self.query(sql.as_str());
662 let mut list = object! {};
663 if state {
664 for item in data.members() {
665 if item["TABLE_SCHEMA"] != self.connection.database {
666 continue;
667 }
668 let mut row = object! {};
669 row["field"] = item["COLUMN_NAME"].clone();
670 row["comment"] = item["COLUMN_COMMENT"].clone();
671 row["type"] = item["COLUMN_TYPE"].clone();
672 list[row["field"].as_str().unwrap()] = row.clone();
673 }
674 TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675 list
676 } else {
677 list
678 }
679 }
680
681 fn table_is_exist(&mut self, name: &str) -> bool {
682 let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683 let (state, data) = self.query(sql.as_str());
684 match state {
685 true => {
686 for item in data.members() {
687 if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688 {
689 return true;
690 }
691 }
692 false
693 }
694 false => false,
695 }
696 }
697
698 fn table(&mut self, name: &str) -> &mut Mysql {
699 self.params = Params::default(self.connection.mode.str().as_str());
700 self.params.table = format!("{}{}", self.connection.prefix, name);
701 self.params.join_table = self.params.table.clone();
702 self
703 }
704
705 fn change_table(&mut self, name: &str) -> &mut Self {
706 self.params.join_table = name.to_string();
707 self
708 }
709
710 fn autoinc(&mut self) -> &mut Self {
711 self.params.autoinc = true;
712 self
713 }
714
715 fn fetch_sql(&mut self) -> &mut Self {
716 self.params.sql = true;
717 self
718 }
719
720 fn order(&mut self, field: &str, by: bool) -> &mut Self {
721 self.params.order[field] = {
722 if by {
723 "DESC"
724 } else {
725 "ASC"
726 }
727 }.into();
728 self
729 }
730
731 fn group(&mut self, field: &str) -> &mut Self {
732 let fields: Vec<&str> = field.split(",").collect();
733 for field in fields.iter() {
734 let field = field.to_string();
735 self.params.group[field.as_str()] = field.clone().into();
736 self.params.fields[field.as_str()] = field.clone().into();
737 }
738
739 self
740 }
741
742 fn distinct(&mut self) -> &mut Self {
743 self.params.distinct = true;
744 self
745 }
746
747 fn json(&mut self, field: &str) -> &mut Self {
748 let list: Vec<&str> = field.split(",").collect();
749 for item in list.iter() {
750 self.params.json[item.to_string().as_str()] = item.to_string().into();
751 }
752 self
753 }
754
755 fn location(&mut self, field: &str) -> &mut Self {
756 let list: Vec<&str> = field.split(",").collect();
757 for item in list.iter() {
758 self.params.location[item.to_string().as_str()] = item.to_string().into();
759 }
760 self
761 }
762
763 fn field(&mut self, field: &str) -> &mut Self {
764 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
765 let join_table = if self.params.join_table.is_empty() {
766 self.params.table.clone()
767 } else {
768 self.params.join_table.clone()
769 };
770 for item in list.iter() {
771 if item.contains(" as ") {
772 let text = item.split(" as ").collect::<Vec<&str>>();
773 if text[0].contains("count(") {
774 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
775 } else {
776 self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
777 }
778 } else {
779 self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
780 }
781 }
782 self
783 }
784
785 fn hidden(&mut self, name: &str) -> &mut Self {
786 let hidden: Vec<&str> = name.split(",").collect();
787
788 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
789
790 let mut data = array![];
791 for item in fields_list.members() {
792 data.push(object! {
793 "name":item["COLUMN_NAME"].as_str().unwrap()
794 }).unwrap();
795 }
796
797 for item in data.members() {
798 let name = item["name"].as_str().unwrap();
799 if !hidden.contains(&name) {
800 self.params.fields[name] = name.into();
801 }
802 }
803 self
804 }
805
806 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
807 let table_fields = self.table_info(&self.params.table.clone());
808 let join_table = if self.params.join_table.is_empty() {
809 self.params.table.clone()
810 } else {
811 self.params.join_table.clone()
812 };
813 if value.is_boolean() {
814 if value.as_bool().unwrap() {
815 value = 1.into();
816 } else {
817 value = 0.into();
818 }
819 }
820 match compare {
821 "between" => {
822 self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
823 }
824 "location" => {
825 let comment = table_fields[field]["comment"].to_string();
826 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
827
828 let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
829 self.params.fields[&field_name.clone()] = field_name.clone().into();
830 let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
833 self.params.where_and.push(location);
834 }
835 "set" => {
836 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
837 let mut wheredata = vec![];
838 for item in list.iter() {
839 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
840 }
841 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
842 }
843 "notin" => {
844 let mut text = String::new();
845 for item in value.members() {
846 text = format!("{text},'{item}'");
847 }
848 text = text.trim_start_matches(",").into();
849 self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
850 }
851 "is" => {
852 self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
853 }
854 "isnot" => {
855 self.params.where_and.push(format!("{join_table}.`{field}` is not {value}"));
856 }
857 "notlike" => {
858 self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
859 }
860 "in" => {
861 let mut text = String::new();
862 if value.is_array() {
863 for item in value.members() {
864 text = format!("{text},'{item}'");
865 }
866 } else if value.is_null() {
867 text = format!("{text},null");
868 } else {
869 let value = value.as_str().unwrap();
870
871 let value: Vec<&str> = value.split(",").collect();
872 for item in value.iter() {
873 text = format!("{text},'{item}'");
874 }
875 }
876 text = text.trim_start_matches(",").into();
877
878 self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
879 }
880 _ => {
881 self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
882 }
883 }
884 self
885 }
886
887 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
888 let join_table = if self.params.join_table.is_empty() {
889 self.params.table.clone()
890 } else {
891 self.params.join_table.clone()
892 };
893
894 if value.is_boolean() {
895 if value.as_bool().unwrap() {
896 value = 1.into();
897 } else {
898 value = 0.into();
899 }
900 }
901
902 match compare {
903 "between" => {
904 self.params.where_or.push(format!(
905 "{}.`{}` between '{}' AND '{}'",
906 join_table, field, value[0], value[1]
907 ));
908 }
909 "set" => {
910 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
911 let mut wheredata = vec![];
912 for item in list.iter() {
913 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
914 }
915 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
916 }
917 "notin" => {
918 let mut text = String::new();
919 for item in value.members() {
920 text = format!("{text},'{item}'");
921 }
922 text = text.trim_start_matches(",").into();
923 self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
924 }
925 "is" => {
926 self.params.where_or.push(format!("{join_table}.`{field}` is {value}"));
927 }
928 "isnot" => {
929 self.params.where_or.push(format!("{join_table}.`{field}` IS NOT {value}"));
930 }
931 "in" => {
932 let mut text = String::new();
933 if value.is_array() {
934 for item in value.members() {
935 text = format!("{text},'{item}'");
936 }
937 } else {
938 let value = value.as_str().unwrap();
939 let value: Vec<&str> = value.split(",").collect();
940 for item in value.iter() {
941 text = format!("{text},'{item}'");
942 }
943 }
944 text = text.trim_start_matches(",").into();
945 self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
946 }
947 _ => {
948 self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
949 }
950 }
951 self
952 }
953
954 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
955 self.params.where_column = format!(
956 "{}.`{}` {} {}.`{}`",
957 self.params.table, field_a, compare, self.params.table, field_b
958 );
959 self
960 }
961
962 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
963 self.params.page = page;
964 self.params.limit = limit;
965 self
966 }
967
968 fn column(&mut self, field: &str) -> JsonValue {
969 self.field(field);
970 let sql = self.params.select_sql();
971
972 if self.params.sql {
973 return JsonValue::from(sql);
974 }
975 let (state, data) = self.query(sql.as_str());
976 match state {
977 true => {
978 let mut list = array![];
979 for item in data.members() {
980 if self.params.json[field].is_empty() {
981 list.push(item[field].clone()).unwrap();
982 } else {
983 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
984 list.push(data).unwrap();
985 }
986 }
987 list
988 }
989 false => {
990 array![]
991 }
992 }
993 }
994
995 fn count(&mut self) -> JsonValue {
996 self.params.fields["count"] = "count(*) as count".to_string().into();
997 let sql = self.params.select_sql();
998 if self.params.sql {
999 return JsonValue::from(sql.clone());
1000 }
1001 let (state, data) = self.query(sql.as_str());
1002 if state {
1003 data[0]["count"].clone()
1004 } else {
1005 JsonValue::from(0)
1006 }
1007 }
1008
1009 fn max(&mut self, field: &str) -> JsonValue {
1010 self.params.fields[field] = format!("max({field}) as {field}").into();
1011 let sql = self.params.select_sql();
1012 if self.params.sql {
1013 return JsonValue::from(sql.clone());
1014 }
1015 let (state, data) = self.query(sql.as_str());
1016 if state {
1017 if data.len() > 1 {
1018 return data.clone();
1019 }
1020 data[0][field].clone()
1021 } else {
1022 JsonValue::from(0)
1023 }
1024 }
1025
1026 fn min(&mut self, field: &str) -> JsonValue {
1027 self.params.fields[field] = format!("min({field}) as {field}").into();
1028 let sql = self.params.select_sql();
1029 if self.params.sql {
1030 return JsonValue::from(sql.clone());
1031 }
1032 let (state, data) = self.query(sql.as_str());
1033 if state {
1034 if data.len() > 1 {
1035 return data;
1036 }
1037 data[0][field].clone()
1038 } else {
1039 JsonValue::from(0)
1040 }
1041 }
1042
1043 fn sum(&mut self, field: &str) -> JsonValue {
1044 self.params.fields[field] = format!("sum({field}) as {field}").into();
1045 let sql = self.params.select_sql();
1046 if self.params.sql {
1047 return JsonValue::from(sql.clone());
1048 }
1049 let (state, data) = self.query(sql.as_str());
1050 match state {
1051 true => {
1052 if data.len() > 1 {
1053 return data;
1054 }
1055 data[0][field].clone()
1056 }
1057 false => JsonValue::from(0),
1058 }
1059 }
1060
1061 fn avg(&mut self, field: &str) -> JsonValue {
1062 self.params.fields[field] = format!("avg({field}) as {field}").into();
1063 let sql = self.params.select_sql();
1064 if self.params.sql {
1065 return JsonValue::from(sql.clone());
1066 }
1067 let (state, data) = self.query(sql.as_str());
1068 if state {
1069 if data.len() > 1 {
1070 return data;
1071 }
1072 data[0][field].clone()
1073 } else {
1074 JsonValue::from(0)
1075 }
1076 }
1077
1078 fn select(&mut self) -> JsonValue {
1079 let sql = self.params.select_sql();
1080 if self.params.sql {
1081 return JsonValue::from(sql.clone());
1082 }
1083 let (state, mut data) = self.query(sql.as_str());
1084 match state {
1085 true => {
1086 for (field, _) in self.params.json.entries() {
1087 for item in data.members_mut() {
1088 if !item[field].is_empty() {
1089 let json = item[field].to_string();
1090 item[field] = match json::parse(&json) {
1091 Ok(e) => e,
1092 Err(_) => JsonValue::from(json),
1093 };
1094 }
1095 }
1096 }
1097 data.clone()
1098 }
1099 false => array![],
1100 }
1101 }
1102
1103 fn find(&mut self) -> JsonValue {
1104 self.params.page = 1;
1105 self.params.limit = 1;
1106 let sql = self.params.select_sql();
1107 if self.params.sql {
1108 return JsonValue::from(sql.clone());
1109 }
1110 let (state, mut data) = self.query(sql.as_str());
1111 match state {
1112 true => {
1113 if data.is_empty() {
1114 return object! {};
1115 }
1116 for (field, _) in self.params.json.entries() {
1117 if !data[0][field].is_empty() {
1118 let json = data[0][field].to_string();
1119 let json = json::parse(&json).unwrap_or(array![]);
1120 data[0][field] = json;
1121 } else {
1122 data[0][field] = array![];
1123 }
1124 }
1125 data[0].clone()
1126 }
1127 false => {
1128 error!("find失败: {data:?}");
1129 object! {}
1130 }
1131 }
1132 }
1133
1134 fn value(&mut self, field: &str) -> JsonValue {
1135 self.params.fields = object! {};
1136 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1137 self.params.page = 1;
1138 self.params.limit = 1;
1139 let sql = self.params.select_sql();
1140 if self.params.sql {
1141 return JsonValue::from(sql.clone());
1142 }
1143 let (state, mut data) = self.query(sql.as_str());
1144 match state {
1145 true => {
1146 for (field, _) in self.params.json.entries() {
1147 if !data[0][field].is_empty() {
1148 let json = data[0][field].to_string();
1149 let json = json::parse(&json).unwrap_or(array![]);
1150 data[0][field] = json;
1151 } else {
1152 data[0][field] = array![];
1153 }
1154 }
1155 data[0][field].clone()
1156 }
1157 false => {
1158 if self.connection.debug {
1159 info!("{data:?}");
1160 }
1161 JsonValue::Null
1162 }
1163 }
1164 }
1165 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1166 let fields_list = self.table_info(&self.params.table.clone());
1167
1168 let mut fields = vec![];
1169 let mut values = vec![];
1170 if !self.params.autoinc && data["id"].is_empty() {
1171 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1172 }
1173 for (field, value) in data.entries() {
1174 fields.push(format!("`{field}`"));
1175
1176 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1177 if value.is_empty() {
1178 values.push("NULL".to_string());
1179 continue;
1180 }
1181 let comment = fields_list[field]["comment"].to_string();
1182 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1183 let location = value.to_string().replace(",", " ");
1184 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1185 continue;
1186 }
1187
1188 if value.is_string() || value.is_array() || value.is_object() {
1189 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1190 continue;
1191 } else if value.is_number() || value.is_boolean() || value.is_null() {
1192 values.push(format!("{value}"));
1193 continue;
1194 } else {
1195 values.push(format!("'{value}'"));
1196 continue;
1197 }
1198 }
1199 let fields = fields.join(",");
1200 let values = values.join(",");
1201
1202 let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1203 if self.params.sql {
1204 return JsonValue::from(sql.clone());
1205 }
1206 let (state, ids) = self.execute(sql.as_str());
1207
1208 match state {
1209 true => match self.params.autoinc {
1210 true => ids.clone(),
1211 false => data["id"].clone(),
1212 },
1213 false => {
1214 let thread_id = format!("{:?}", thread::current().id());
1215 error!("添加失败: {thread_id} {ids:?} {sql}");
1216 JsonValue::from("")
1217 }
1218 }
1219 }
1220 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1221 let fields_list = self.table_info(&self.params.table.clone());
1222
1223 let mut fields = String::new();
1224 if !self.params.autoinc && data[0]["id"].is_empty() {
1225 data[0]["id"] = "".into();
1226 }
1227 for (field, _) in data[0].entries() {
1228 fields = format!("{fields},`{field}`");
1229 }
1230 fields = fields.trim_start_matches(",").parse().unwrap();
1231
1232 let core_count = num_cpus::get();
1233 let mut p = pools::Pool::new(core_count * 4);
1234 let autoinc = self.params.autoinc;
1235 for list in data.members() {
1236 let mut item = list.clone();
1237 let params_location = self.params.location.clone();
1238 let fields_list_new = fields_list.clone();
1239 p.execute(move |pcindex| {
1240 if !autoinc && item["id"].is_empty() {
1241 let id = format!(
1242 "{:X}{:X}",
1243 Local::now().timestamp_nanos_opt().unwrap(),
1244 pcindex
1245 );
1246 item["id"] = id.into();
1247 }
1248 let mut values = "".to_string();
1249 for (field, value) in item.entries() {
1250 if params_location.has_key(field) {
1251 if value.is_empty() {
1252 values = format!("{values},NULL");
1253 continue;
1254 }
1255 let comment = fields_list_new[field]["comment"].to_string();
1256 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1257 let location = value.to_string().replace(",", " ");
1258 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1259 continue;
1260 }
1261 if value.is_string() {
1262 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1263 } else if value.is_number() || value.is_boolean() {
1264 values = format!("{values},{value}");
1265 } else {
1266 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1267 }
1268 }
1269 values = format!("({})", values.trim_start_matches(","));
1270 array![item["id"].clone(), values]
1271 });
1272 }
1273 let (ids_list, mut values) = p.insert_all();
1274 values = values.trim_start_matches(",").parse().unwrap();
1275 let sql = format!(
1276 "INSERT INTO {} ({}) VALUES {};",
1277 self.params.table, fields, values
1278 );
1279
1280 if self.params.sql {
1281 return JsonValue::from(sql.clone());
1282 }
1283 let (state, data) = self.execute(sql.as_str());
1284 match state {
1285 true => match autoinc {
1286 true => data,
1287 false => JsonValue::from(ids_list),
1288 },
1289 false => {
1290 error!("insert_all: {data:?}");
1291 array![]
1292 }
1293 }
1294 }
1295 fn update(&mut self, data: JsonValue) -> JsonValue {
1296 let fields_list = self.table_info(&self.params.table.clone());
1297
1298 let mut values = vec![];
1299 for (field, value) in data.entries() {
1300 if !self.params.json[field].is_empty() {
1301 let json = value.to_string().replace("'", "''");
1302 values.push(format!("`{field}`='{json}'"));
1303 continue;
1304 }
1305 if !self.params.location[field].is_empty() {
1306 if value.is_empty() {
1307 values.push(format!("{field}=NULL").to_string());
1308 continue;
1309 }
1310 let comment = fields_list[field]["comment"].to_string();
1311 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1312 let location = value.to_string().replace(",", " ");
1313 values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1314
1315 continue;
1316 }
1317
1318 if value.is_string() {
1319 values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1320 } else if value.is_number() {
1321 values.push(format!("`{field}`= {value}"));
1322 } else if value.is_array() {
1323 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1324 values.push(format!("`{field}`='{array}'"));
1325 continue;
1326 } else if value.is_object() {
1327 if self.params.json[field].is_empty() {
1328 values.push(format!("`{field}`='{value}'"));
1329 } else {
1330 if value.is_empty() {
1331 values.push(format!("`{field}`=''"));
1332 continue;
1333 }
1334 let json = value.to_string();
1335 let json = json.replace("'", "''");
1336 values.push(format!("`{field}`='{json}'"));
1337 }
1338 continue;
1339 } else if value.is_boolean() {
1340 values.push(format!("`{field}`= {value}"));
1341 } else {
1342 values.push(format!("`{field}`=\"{value}\""));
1343 }
1344 }
1345
1346 for (field, value) in self.params.inc_dec.entries() {
1347 values.push(format!("{field} = {}", value.to_string().clone()));
1348 }
1349
1350 let values = values.join(",");
1351
1352 let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1353 if self.params.sql {
1354 return JsonValue::from(sql.clone());
1355 }
1356 let (state, data) = self.execute(sql.as_str());
1357 if state {
1358 data
1359 } else {
1360 let thread_id = format!("{:?}", thread::current().id());
1361 error!("update: {thread_id} {data:?} {sql}");
1362 0.into()
1363 }
1364 }
1365
1366 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1367 let fields_list = self.table_info(&self.params.table.clone());
1368 let mut values = vec![];
1369 let mut ids = vec![];
1370 for (field, _) in data[0].entries() {
1371 if field == "id" {
1372 continue;
1373 }
1374 let mut fields = vec![];
1375 for row in data.members() {
1376 let value = row[field].clone();
1377 let id = row["id"].clone();
1378 ids.push(id.clone());
1379
1380 if self.params.json.has_key(field) {
1381 let json = value.to_string();
1382 let json = json.replace("'", "''");
1383 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1384 continue;
1385 }
1386 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1387 let comment = fields_list[field]["comment"].to_string();
1388 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1389 let location = value.to_string().replace(",", " ");
1390 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1391 fields.push(format!("WHEN '{id}' THEN {location}"));
1392 continue;
1393 }
1394 if value.is_string() {
1395 fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1396 } else if value.is_array() || value.is_object() {
1397 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1398 } else if value.is_number() || value.is_boolean() || value.is_null() {
1399 fields.push(format!("WHEN '{id}' THEN {value}"));
1400 } else {
1401 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1402 }
1403 }
1404 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1405 }
1406 self.where_and("id", "in", ids.into());
1407 for (field, value) in self.params.inc_dec.entries() {
1408 values.push(format!("{} = {}", field, value.to_string().clone()));
1409 }
1410
1411 let values = values.join(",");
1412 let sql = format!(
1413 "UPDATE {} SET {} {} {};",
1414 self.params.table.clone(),
1415 values,
1416 self.params.where_sql(),
1417 self.params.page_limit_sql()
1418 );
1419 if self.params.sql {
1420 return JsonValue::from(sql.clone());
1421 }
1422 let (state, data) = self.execute(sql.as_str());
1423 if state {
1424 data
1425 } else {
1426 error!("update_all: {data:?}");
1427 JsonValue::from(0)
1428 }
1429 }
1430
1431 fn delete(&mut self) -> JsonValue {
1432 let sql = format!(
1433 "delete FROM {} {} {};",
1434 self.params.table.clone(),
1435 self.params.where_sql(),
1436 self.params.page_limit_sql()
1437 );
1438 if self.params.sql {
1439 return JsonValue::from(sql.clone());
1440 }
1441 let (state, data) = self.execute(sql.as_str());
1442 match state {
1443 true => data,
1444 false => {
1445 error!("delete 失败>>> {data:?}");
1446 JsonValue::from(0)
1447 }
1448 }
1449 }
1450 fn transaction(&mut self) -> bool {
1451 let thread_id = format!("{:?}", thread::current().id());
1452
1453 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1454 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1455 t += 1;
1456 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1457 return true;
1458 }
1459 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1460
1461 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1462
1463 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1464 Ok(e) => e,
1465 Err(err) => {
1466 error!("query 超时: {err}");
1467 return false;
1468 }
1469 };
1470 let key = format!("{}{}", self.default, thread_id);
1471 TR.lock().unwrap().insert(key.clone(), conn);
1472
1473 let (state, _) = self.query(sql.as_str());
1474 match state {
1475 true => state,
1476 false => {
1477 TR.lock().unwrap().remove(&*key);
1478 TRANS.lock().unwrap().remove(&*thread_id.clone());
1479 state
1480 }
1481 }
1482 }
1483
1484 fn commit(&mut self) -> bool {
1485 let thread_id = format!("{:?}", thread::current().id());
1486 let sql = "COMMIT".to_string();
1487
1488 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1489 if t > 1 {
1490 t -= 1;
1491 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1492 return true;
1493 }
1494 let (state, data) = self.query(sql.as_str());
1495 TRANS.lock().unwrap().remove(&thread_id);
1496 let key = format!("{}{}", self.default, thread_id);
1497 TR.lock().unwrap().remove(&*key);
1498
1499 let t = TRANS_TABLE.lock().unwrap().clone();
1500 for (key, value) in t.iter() {
1501 if value.clone() == thread_id {
1502 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1503 }
1504 }
1505
1506 match state {
1507 true => {}
1508 false => {
1509 error!("提交事务失败: {data}");
1510 }
1511 }
1512 state
1513 }
1514
1515 fn rollback(&mut self) -> bool {
1516 let thread_id = format!("{:?}", thread::current().id());
1517 let sql = "ROLLBACK".to_string();
1518
1519 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1520 if t > 1 {
1521 t -= 1;
1522 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1523 return true;
1524 }
1525 let (state, data) = self.query(sql.as_str());
1526 TRANS.lock().unwrap().remove(&thread_id);
1527 let key = format!("{}{}", self.default, thread_id);
1528 TR.lock().unwrap().remove(&*key);
1529
1530 let t = TRANS_TABLE.lock().unwrap().clone();
1531 for (key, value) in t.iter() {
1532 if value.clone() == thread_id {
1533 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1534 }
1535 }
1536
1537 match state {
1538 true => {}
1539 false => {
1540 error!("回滚失败: {data}");
1541 }
1542 }
1543 state
1544 }
1545
1546 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1547 let (state, data) = self.query(sql);
1548 match state {
1549 true => Ok(data),
1550 false => Err(data.to_string()),
1551 }
1552 }
1553
1554 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1555 let (state, data) = self.execute(sql);
1556 match state {
1557 true => Ok(data),
1558 false => Err(data.to_string()),
1559 }
1560 }
1561
1562 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1563 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1564 self
1565 }
1566 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1567 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1568 self
1569 }
1570
1571 fn buildsql(&mut self) -> String {
1572 self.fetch_sql();
1573 let sql = self.select().to_string();
1574 format!("( {} ) `{}`", sql, self.params.table)
1575 }
1576
1577 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1578 for field in fields {
1579 self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1580 }
1581 self
1582 }
1583
1584 fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1585 let main_table = if main_table.is_empty() {
1586 self.params.table.clone()
1587 } else {
1588 main_table.to_string()
1589 };
1590 self.params.join_table = right_table.to_string();
1591 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1592 self
1593 }
1594
1595 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1596 let main_fields = if main_fields.is_empty() {
1597 "id"
1598 } else {
1599 main_fields
1600 };
1601 let second_fields = if second_fields.is_empty() {
1602 self.params.table.clone()
1603 } else {
1604 second_fields.to_string().clone()
1605 };
1606 let sec_table_name = format!("{}{}", table, "_2");
1607 let second_table = format!("{} {}", table, sec_table_name.clone());
1608 self.params.join_table = sec_table_name.clone();
1609 self.params.join.push(format!(
1610 " INNER JOIN {} ON {}.{} = {}.{}",
1611 second_table, self.params.table, main_fields, sec_table_name, second_fields
1612 ));
1613 self
1614 }
1615}