1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
19lazy_static! {
20 static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
21 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
22 static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
23 Arc::new(Mutex::new(HashMap::new()));
24}
25#[cfg(any(feature = "default", feature = "db-mysql"))]
26#[derive(Clone, Debug)]
27pub struct Mysql {
28 pub connection: Connection,
30 pub default: String,
32 pub params: Params,
33 pub pool: Pool,
34}
35
36impl Mysql {
37 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38 let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40 let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42 match Pool::new(opts) {
43 Ok(pool) => Ok(Self {
44 connection: connection.clone(),
45 default: default.clone(),
46 params: Params::default("mysql"),
47 pool,
48 }),
49 Err(e) => {
50 error!("connect: {e}");
51 Err(e.to_string())
52 }
53 }
54 }
55 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56 if sql.contains("INSERT") {
57 let rows = text.affected_rows();
58 if rows > 1 {
59 if self.params.autoinc {
60 let row = rows;
61 let start_row = text.last_insert_id().unwrap();
62 let end_row = start_row + row;
63
64 let mut ids = array![];
65 for item in start_row..end_row {
66 ids.push(item).unwrap();
67 }
68 (true, ids)
69 } else {
70 (true, JsonValue::from(rows))
71 }
72 } else {
73 (true, JsonValue::from(text.last_insert_id()))
74 }
75 } else {
76 (true, JsonValue::from(text.affected_rows()))
77 }
78 }
79 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80 let mut list = array![];
81 let mut index = 0;
82 text.for_each(|row| {
83 match row {
84 Ok(r) => {
85 let mut data = object! {};
86 for (index, item) in r.columns().iter().enumerate() {
87 let field = item.name_str();
88 let field = field.to_string();
89 let field = field.as_str();
90
91 data[field] = match item.column_type() {
92 ColumnType::MYSQL_TYPE_TINY => {
93 let t = r.get::<bool, _>(index).unwrap_or(true);
94 JsonValue::from(t)
95 }
96 ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98 if t == NULL {
99 JsonValue::from(0.0)
100 } else {
101 match r.get::<f64, _>(index) {
102 None => JsonValue::from(0.0),
103 Some(t) => JsonValue::from(t),
104 }
105 }
106 }
107 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108 let t = r.index(field).clone();
109 if t == NULL {
110 JsonValue::from(0)
111 } else {
112 let t = r.get::<i64, _>(index).unwrap();
113 JsonValue::from(t)
114 }
115 }
116 ColumnType::MYSQL_TYPE_NULL => {
117 let t = r.index(field).clone();
118 if t == NULL {
119 JsonValue::from("".to_string())
120 } else {
121 let t = r.get::<String, _>(index).unwrap_or("".to_string());
122 JsonValue::from(t)
123 }
124 }
125 ColumnType::MYSQL_TYPE_BLOB => {
126 let t = r.index(field).clone();
127 if t == NULL {
128 JsonValue::from("".to_string())
129 } else {
130 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131 if t == NULL {
132 JsonValue::from("".to_string())
133 } else {
134 let t = r.get::<String, _>(index).unwrap_or("".to_string());
135 JsonValue::from(t)
136 }
137 }
138 }
139 ColumnType::MYSQL_TYPE_VAR_STRING => {
140 let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r.get::<String, _>(index).unwrap_or("".to_string());
145 JsonValue::from(t)
146 }
147 }
148 ColumnType::MYSQL_TYPE_STRING => {
149 let t = r.index(field).clone();
150 if t == NULL {
151 JsonValue::from("".to_string())
152 } else {
153 let t = r.get::<String, _>(index).unwrap_or("".to_string());
154 JsonValue::from(t)
155 }
156 }
157 ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158 let t = r.index(field).clone();
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_GEOMETRY => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let res = match r.index(field).clone() {
172 Value::Bytes(e) => e,
173 _ => vec![]
174 };
175 let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176 let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177 JsonValue::from(format!("{x},{y}"))
178 }
179 }
180 _ => {
181 let t = r.index(field).clone();
182 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183 JsonValue::from("".to_string())
184 }
185 };
186 }
187 list.push(data).unwrap();
188 }
189 Err(e) => {
190 error!("err: {e} \r\n {sql}");
191 }
192 }
193 index += 1;
194 });
195 (true, list)
196 }
197 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198 let thread_id = format!("{:?}", thread::current().id());
199 let key = format!("{}{}", self.default, thread_id);
200
201 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203 Ok(e) => e,
204 Err(err) => {
205 error!("非事务 execute超时: {err}");
206 return (false, object! {});
207 }
208 };
209 let connection_id = db.connection_id();
210 return match db.query_iter(sql) {
211 Ok(e) => {
212 if self.connection.debug {
213 info!("查询成功: {} {}", thread_id.clone(), sql);
214 }
215 self.query_handle(e, sql)
216 }
217 Err(e) => {
218 error!(
219 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220 );
221 (false, JsonValue::from(e.to_string()))
222 }
223 };
224 } else {
225 let mut tr = TR.lock().unwrap();
226 let db = tr.get_mut(&*key).unwrap();
227 let connection_id = db.connection_id();
228 return match db.query_iter(sql) {
229 Ok(e) => {
230 if self.connection.debug {
231 info!("查询成功: {} {}", thread_id.clone(), sql);
232 }
233 self.query_handle(e, sql)
234 }
235 Err(e) => {
236 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237 (false, JsonValue::from(e.to_string()))
238 }
239 };
240 };
241 }
242 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243 let thread_id = format!("{:?}", thread::current().id());
244 let key = format!("{}{}", self.default, thread_id);
245
246 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248 Ok(e) => e,
249 Err(err) => {
250 error!("非事务: execute超时: {err}");
251 return (false, object! {});
252 }
253 };
254 return match db.exec_iter(sql, ()) {
255 Ok(e) => {
256 if self.connection.debug {
257 info!("提交成功: {} {}", thread_id.clone(), sql);
258 }
259 self.execute_cl(e, sql)
260 }
261 Err(e) => {
262 error!("非事务提交失败: {thread_id} {e} {sql}");
263 (false, JsonValue::from(e.to_string()))
264 }
265 };
266 } else {
267 let mut count_flag: i64 = 0;
269 loop {
270 let mut t = TRANS_TABLE.lock().unwrap();
271 if t.get(&self.params.table).is_none() {
272 t.insert(self.params.table.clone(), thread_id.clone());
273 break;
274 }
275 if t.get(&self.params.table).unwrap().clone() == thread_id {
276 break;
277 }
278 thread::yield_now();
279
280 count_flag += 1;
281 if count_flag == 10000 {
282 warn!("execute循环次数: 1w,强制退出");
283 break;
284 }
285 }
286
287 let mut tr = TR.lock().unwrap();
288 let db = tr.get_mut(&*key).unwrap();
289
290 return match db.exec_iter(sql, ()) {
291 Ok(e) => {
292 if self.connection.debug {
293 info!("提交成功: {} {}", thread_id.clone(), sql);
294 }
295
296 self.execute_cl(e, sql)
297 }
298 Err(e) => {
299 error!("事务提交失败: {thread_id} {e} {sql}");
300 (false, JsonValue::from(e.to_string()))
301 }
302 };
303 };
304 }
305}
306
307impl DbMode for Mysql {
308 fn database_tables(&mut self) -> JsonValue {
309 let sql = "SHOW TABLES".to_string();
310 match self.sql(sql.as_str()) {
311 Ok(e) => {
312 let mut list = vec![];
313 for item in e.members() {
314 for (_, value) in item.entries() {
315 list.push(value.clone());
316 }
317 }
318 list.into()
319 }
320 Err(_) => {
321 array![]
322 }
323 }
324 }
325
326 fn database_create(&mut self, name: &str) -> bool {
327 let sql = format!("CREATE DATABASE {name}");
328
329 let (state, data) = self.execute(sql.as_str());
330 match state {
331 true => data.as_bool().unwrap(),
332 false => {
333 error!("创建数据库失败: {data:?}");
334 false
335 }
336 }
337 }
338}
339
340impl Mode for Mysql {
341 fn table_create(&mut self, options: TableOptions) -> JsonValue {
342 let mut sql = String::new();
343 let mut unique_fields = String::new();
345 let mut unique_name = String::new();
346 let mut unique = String::new();
347 for item in options.table_unique.iter() {
348 if unique_fields.is_empty() {
349 unique_fields = format!("`{item}`");
350 unique_name = format!("{}_unique_{}", options.table_name, item);
351 } else {
352 unique_fields = format!("{unique_fields},`{item}`");
353 unique_name = format!("{unique_name}_{item}");
354 }
355 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357 }
358
359 let mut index = String::new();
361 for row in options.table_index.iter() {
362 let mut index_fields = String::new();
363 let mut index_name = String::new();
364 for item in row.iter() {
365 if index_fields.is_empty() {
366 index_fields = format!("`{item}`");
367 index_name = format!("{}_index_{}", options.table_name, item);
368 } else {
369 index_fields = format!("{index_fields},`{item}`");
370 index_name = format!("{index_name}_{item}");
371 }
372 }
373 if index.is_empty() {
374 index = format!("INDEX `{index_name}` ({index_fields})");
375 } else {
376 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377 }
378 }
379 if index.replace(",", "").is_empty() {
380 index = index.replace(",", "");
381 }
382
383 for (name, field) in options.table_fields.entries() {
384 let row = br_fields::field("mysql", name, field.clone());
385 sql = format!("{sql} {row},\r\n");
386 }
387
388 if !unique.is_empty() {
389 sql = sql.trim_end_matches(",\r\n").to_string();
390 sql = format!("{sql},\r\n{unique}");
391 }
392 if !index.is_empty() {
393 sql = sql.trim_end_matches(",\r\n").to_string();
394 sql = format!("{sql},\r\n{index}");
395 }
396 let collate = format!("{}_bin", self.connection.charset.str());
397
398 let partition = if options.table_partition {
400 sql = format!(
401 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402 sql,
403 options.table_key,
404 options.table_partition_columns[0].clone()
405 );
406 let temp_head = format!(
407 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408 options.table_partition_columns[0].clone()
409 );
410 let mut partition_array = vec![];
411 let mut count = 0;
412 for member in options.table_partition_columns[1].members() {
413 let temp = format!(
414 "PARTITION p{} VALUES LESS THAN ('{}')",
415 count.clone(),
416 member.clone()
417 );
418 count += 1;
419 partition_array.push(temp.clone());
420 }
421 let temp_body = partition_array.join(",\r\n");
422 let temp_end = format!(
423 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424 count.clone()
425 );
426 format!("{temp_head}{temp_body}{temp_end}")
427 } else {
428 sql = if sql.trim_end().ends_with(",") {
429 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430 } else {
431 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432 };
433 "".to_string()
434 };
435 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437 if self.params.sql {
438 return JsonValue::from(sql);
439 }
440
441 let (state, data) = self.execute(sql.as_str());
442
443 match state {
444 true => JsonValue::from(state),
445 false => {
446 info!("创建错误: {data}");
447 JsonValue::from(state)
448 }
449 }
450 }
451
452 fn table_update(&mut self, options: TableOptions) -> JsonValue {
453 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454 TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455 }
456 let mut sql = vec![];
457 let fields_list = self.table_info(&options.table_name);
458 let mut put = vec![];
459 let mut add = vec![];
460 let mut del = vec![];
461 for (key, _) in fields_list.entries() {
462 if options.table_fields[key].is_empty() {
463 del.push(key);
464 }
465 }
466 for (name, field) in options.table_fields.entries() {
467 if !fields_list[name].is_empty() {
468 let old_comment = fields_list[name]["comment"].to_string();
469 let new_comment = br_fields::field("mysql", name, field.clone());
470 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472 if old_comment == new_comment_text {
473 continue;
474 }
475 put.push(name);
476 } else {
477 add.push(name);
478 }
479 }
480
481 for name in add.iter() {
482 let name = name.to_string();
483 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485 }
486 for name in del.iter() {
487 sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488 }
489 for name in put.iter() {
490 let name = name.to_string();
491 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492 sql.push(format!(
493 "ALTER TABLE {} CHANGE `{}` {};\r\n",
494 options.table_name, name, row
495 ));
496 }
497
498 let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499 let (_, pk_list) = self.query(
501 format!(
502 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504 self.connection.database, options.table_name
505 ).as_str(),
506 );
507 let mut pk_vec = vec![];
508 for member in pk_list.members() {
509 pk_vec.push(member["COLUMN_NAME"].to_string());
510 }
511
512 let mut unique_new = vec![];
513 let mut index_new = vec![];
514 for item in index_list.members() {
515 let key_name = item["Key_name"].as_str().unwrap();
516 let non_unique = item["Non_unique"].as_i32().unwrap();
517
518 if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519 {
520 unique_new.push(key_name.to_string());
521 continue;
522 }
523 if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524 {
525 index_new.push(key_name.to_string());
526 continue;
527 }
528 }
529
530 let mut unique_fields = String::new();
531 let mut unique_name = String::new();
532 for item in options.table_unique.iter() {
533 if unique_fields.is_empty() {
534 unique_fields = format!("`{item}`");
535 unique_name = format!("{}_unique_{}", options.table_name, item);
536 } else {
537 unique_fields = format!("{unique_fields},`{item}`");
538 unique_name = format!("{unique_name}_{item}");
539 }
540 }
541 if !unique_name.is_empty() {
542 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543 unique_name = format!("unique_{md5}");
544 for item in &unique_new {
545 if unique_name != *item {
546 sql.push(format!(
547 "alter table {} drop index {};\r\n",
548 options.table_name, item
549 ));
550 }
551 }
552 if !unique_new.contains(&unique_name) {
553 sql.push(format!(
554 "CREATE UNIQUE index {} on {} ({});\r\n",
555 unique_name, options.table_name, unique_fields
556 ));
557 }
558 }
559
560 let mut index_list = vec![];
561 for row in options.table_index.iter() {
563 let mut index_fields = String::new();
564 let mut index_name = String::new();
565 for item in row {
566 if index_fields.is_empty() {
567 index_fields = item.to_string();
568 index_name = format!("{}_index_{}", options.table_name, item);
569 } else {
570 index_fields = format!("{index_fields},{item}");
571 index_name = format!("{index_name}_{item}");
572 }
573 }
574 index_list.push(index_name.clone());
575 if !index_new.contains(&index_name.clone()) {
576 sql.push(format!(
577 "CREATE INDEX {} on {} ({});\r\n",
578 index_name, options.table_name, index_fields
579 ));
580 }
581 }
582
583 for item in index_new {
584 if !index_list.contains(&item.to_string()) {
585 sql.push(format!(
586 "DROP INDEX {} ON {};\r\n",
587 item.clone(),
588 options.table_name
589 ));
590 }
591 }
592
593 if options.table_partition {
595 if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597 {
598 let pk = format!(
599 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600 options.table_name,
601 options.table_key,
602 options.table_partition_columns[0].clone()
603 );
604 sql.push(pk);
605 let temp_head = format!(
606 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607 options.table_name,
608 options.table_partition_columns[0].clone()
609 );
610 let mut partition_array = vec![];
611 let mut count = 0;
612 for member in options.table_partition_columns[1].members() {
613 let temp = format!(
614 "PARTITION p{} VALUES LESS THAN ('{}')",
615 count.clone(),
616 member.clone()
617 );
618 count += 1;
619 partition_array.push(temp.clone());
620 }
621 let temp_body = partition_array.join(",\r\n");
622 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624 }
625 } else if pk_vec.len() != 1 {
626 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627 sql.push(rm_partition);
628 let pk = format!(
629 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630 options.table_name, options.table_key
631 );
632 sql.push(pk);
633 };
634
635 if self.params.sql {
636 return JsonValue::from(sql.join(""));
637 }
638
639 if sql.is_empty() {
640 return JsonValue::from(-1);
641 }
642
643 for item in sql.iter() {
644 let (state, res) = self.execute(item.as_str());
645 match state {
646 true => {}
647 false => {
648 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649 return JsonValue::from(0);
650 }
651 }
652 }
653 JsonValue::from(1)
654 }
655
656 fn table_info(&mut self, table: &str) -> JsonValue {
657 if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658 return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659 }
660 let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'");
661 let (state, data) = self.query(sql.as_str());
662 let mut list = object! {};
663 if state {
664 for item in data.members() {
665 if item["TABLE_SCHEMA"] != self.connection.database {
666 continue;
667 }
668 let mut row = object! {};
669 row["field"] = item["COLUMN_NAME"].clone();
670 row["comment"] = item["COLUMN_COMMENT"].clone();
671 row["type"] = item["COLUMN_TYPE"].clone();
672 list[row["field"].as_str().unwrap()] = row.clone();
673 }
674 TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675 list
676 } else {
677 list
678 }
679 }
680
681 fn table_is_exist(&mut self, name: &str) -> bool {
682 let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683 let (state, data) = self.query(sql.as_str());
684 match state {
685 true => {
686 for item in data.members() {
687 if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688 {
689 return true;
690 }
691 }
692 false
693 }
694 false => false,
695 }
696 }
697
698 fn table(&mut self, name: &str) -> &mut Mysql {
699 self.params = Params::default(self.connection.mode.str().as_str());
700 self.params.table = format!("{}{}", self.connection.prefix, name);
701 self.params.join_table = self.params.table.clone();
702 self
703 }
704
705 fn change_table(&mut self, name: &str) -> &mut Self {
706 self.params.join_table = name.to_string();
707 self
708 }
709
710 fn autoinc(&mut self) -> &mut Self {
711 self.params.autoinc = true;
712 self
713 }
714
715 fn fetch_sql(&mut self) -> &mut Self {
716 self.params.sql = true;
717 self
718 }
719
720 fn order(&mut self, field: &str, by: bool) -> &mut Self {
721 self.params.order[field] = {
722 if by {
723 "DESC"
724 } else {
725 "ASC"
726 }
727 }.into();
728 self
729 }
730
731 fn group(&mut self, field: &str) -> &mut Self {
732 let fields: Vec<&str> = field.split(",").collect();
733 for field in fields.iter() {
734 let field = field.to_string();
735 self.params.group[field.as_str()] = field.clone().into();
736 self.params.fields[field.as_str()] = field.clone().into();
737 }
738
739 self
740 }
741
742 fn distinct(&mut self) -> &mut Self {
743 self.params.distinct = true;
744 self
745 }
746
747 fn json(&mut self, field: &str) -> &mut Self {
748 let list: Vec<&str> = field.split(",").collect();
749 for item in list.iter() {
750 self.params.json[item.to_string().as_str()] = item.to_string().into();
751 }
752 self
753 }
754
755 fn location(&mut self, field: &str) -> &mut Self {
756 let list: Vec<&str> = field.split(",").collect();
757 for item in list.iter() {
758 self.params.location[item.to_string().as_str()] = item.to_string().into();
759 }
760 self
761 }
762
763 fn field(&mut self, field: &str) -> &mut Self {
764 let list: Vec<&str> = field.split(",").collect();
765 let join_table = if self.params.join_table.is_empty() {
766 self.params.table.clone()
767 } else {
768 self.params.join_table.clone()
769 };
770 for item in list.iter() {
771 if item.contains(" as ") {
772 let text = item.split(" as ").collect::<Vec<&str>>();
773 if text[0].contains("count(") {
774 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
775 } else {
776 self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
777 }
778 } else {
779 self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
780 }
781 }
782 self
783 }
784
785 fn hidden(&mut self, name: &str) -> &mut Self {
786 let hidden: Vec<&str> = name.split(",").collect();
787
788 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
789
790 let mut data = array![];
791 for item in fields_list.members() {
792 data.push(object! {
793 "name":item["COLUMN_NAME"].as_str().unwrap()
794 }).unwrap();
795 }
796
797 for item in data.members() {
798 let name = item["name"].as_str().unwrap();
799 if !hidden.contains(&name) {
800 self.params.fields[name] = name.into();
801 }
802 }
803 self
804 }
805
806 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
807 let table_fields = self.table_info(&self.params.table.clone());
808 let join_table = if self.params.join_table.is_empty() {
809 self.params.table.clone()
810 } else {
811 self.params.join_table.clone()
812 };
813 if value.is_boolean() {
814 if value.as_bool().unwrap() {
815 value = 1.into();
816 } else {
817 value = 0.into();
818 }
819 }
820 match compare {
821 "between" => {
822 self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
823 }
824 "location" => {
825 let comment = table_fields[field]["comment"].to_string();
826 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
827
828 let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
829 self.params.fields[&field_name.clone()] = field_name.clone().into();
830 let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
833 self.params.where_and.push(location);
834 }
835 "set" => {
836 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
837 let mut wheredata = vec![];
838 for item in list.iter() {
839 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
840 }
841 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
842 }
843 "notin" => {
844 let mut text = String::new();
845 for item in value.members() {
846 text = format!("{text},'{item}'");
847 }
848 text = text.trim_start_matches(",").into();
849 self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
850 }
851 "is" => {
852 self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
853 }
854 "notlike" => {
855 self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
856 }
857 "in" => {
858 let mut text = String::new();
859 if value.is_array() {
860 for item in value.members() {
861 text = format!("{text},'{item}'");
862 }
863 } else if value.is_null() {
864 text = format!("{text},null");
865 } else {
866 let value = value.as_str().unwrap();
867
868 let value: Vec<&str> = value.split(",").collect();
869 for item in value.iter() {
870 text = format!("{text},'{item}'");
871 }
872 }
873 text = text.trim_start_matches(",").into();
874
875 self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
876 }
877 _ => {
878 self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
879 }
880 }
881 self
882 }
883
884 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
885 let join_table = if self.params.join_table.is_empty() {
886 self.params.table.clone()
887 } else {
888 self.params.join_table.clone()
889 };
890
891 if value.is_boolean() {
892 if value.as_bool().unwrap() {
893 value = 1.into();
894 } else {
895 value = 0.into();
896 }
897 }
898
899 match compare {
900 "between" => {
901 self.params.where_or.push(format!(
902 "{}.`{}` between '{}' AND '{}'",
903 join_table, field, value[0], value[1]
904 ));
905 }
906 "set" => {
907 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
908 let mut wheredata = vec![];
909 for item in list.iter() {
910 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
911 }
912 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
913 }
914 "notin" => {
915 let mut text = String::new();
916 for item in value.members() {
917 text = format!("{text},'{item}'");
918 }
919 text = text.trim_start_matches(",").into();
920 self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
921 }
922 "in" => {
923 let mut text = String::new();
924 if value.is_array() {
925 for item in value.members() {
926 text = format!("{text},'{item}'");
927 }
928 } else {
929 let value = value.as_str().unwrap();
930 let value: Vec<&str> = value.split(",").collect();
931 for item in value.iter() {
932 text = format!("{text},'{item}'");
933 }
934 }
935 text = text.trim_start_matches(",").into();
936 self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
937 }
938 _ => {
939 self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
940 }
941 }
942 self
943 }
944
945 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
946 self.params.where_column = format!(
947 "{}.`{}` {} {}.`{}`",
948 self.params.table, field_a, compare, self.params.table, field_b
949 );
950 self
951 }
952
953 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
954 self.params.page = page;
955 self.params.limit = limit;
956 self
957 }
958
959 fn column(&mut self, field: &str) -> JsonValue {
960 self.field(field);
961 let sql = self.params.select_sql();
962
963 if self.params.sql {
964 return JsonValue::from(sql);
965 }
966 let (state, data) = self.query(sql.as_str());
967 match state {
968 true => {
969 let mut list = array![];
970 for item in data.members() {
971 if self.params.json[field].is_empty() {
972 list.push(item[field].clone()).unwrap();
973 } else {
974 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
975 list.push(data).unwrap();
976 }
977 }
978 list
979 }
980 false => {
981 array![]
982 }
983 }
984 }
985
986 fn count(&mut self) -> JsonValue {
987 self.params.fields["count"] = "count(*) as count".to_string().into();
988 let sql = self.params.select_sql();
989 if self.params.sql {
990 return JsonValue::from(sql.clone());
991 }
992 let (state, data) = self.query(sql.as_str());
993 if state {
994 data[0]["count"].clone()
995 } else {
996 JsonValue::from(0)
997 }
998 }
999
1000 fn max(&mut self, field: &str) -> JsonValue {
1001 self.params.fields[field] = format!("max({field}) as {field}").into();
1002 let sql = self.params.select_sql();
1003 if self.params.sql {
1004 return JsonValue::from(sql.clone());
1005 }
1006 let (state, data) = self.query(sql.as_str());
1007 if state {
1008 if data.len() > 1 {
1009 return data.clone();
1010 }
1011 data[0][field].clone()
1012 } else {
1013 JsonValue::from(0)
1014 }
1015 }
1016
1017 fn min(&mut self, field: &str) -> JsonValue {
1018 self.params.fields[field] = format!("min({field}) as {field}").into();
1019 let sql = self.params.select_sql();
1020 if self.params.sql {
1021 return JsonValue::from(sql.clone());
1022 }
1023 let (state, data) = self.query(sql.as_str());
1024 if state {
1025 if data.len() > 1 {
1026 return data;
1027 }
1028 data[0][field].clone()
1029 } else {
1030 JsonValue::from(0)
1031 }
1032 }
1033
1034 fn sum(&mut self, field: &str) -> JsonValue {
1035 self.params.fields[field] = format!("sum({field}) as {field}").into();
1036 let sql = self.params.select_sql();
1037 if self.params.sql {
1038 return JsonValue::from(sql.clone());
1039 }
1040 let (state, data) = self.query(sql.as_str());
1041 match state {
1042 true => {
1043 if data.len() > 1 {
1044 return data;
1045 }
1046 data[0][field].clone()
1047 }
1048 false => JsonValue::from(0),
1049 }
1050 }
1051
1052 fn avg(&mut self, field: &str) -> JsonValue {
1053 self.params.fields[field] = format!("avg({field}) as {field}").into();
1054 let sql = self.params.select_sql();
1055 if self.params.sql {
1056 return JsonValue::from(sql.clone());
1057 }
1058 let (state, data) = self.query(sql.as_str());
1059 if state {
1060 if data.len() > 1 {
1061 return data;
1062 }
1063 data[0][field].clone()
1064 } else {
1065 JsonValue::from(0)
1066 }
1067 }
1068
1069 fn select(&mut self) -> JsonValue {
1070 let sql = self.params.select_sql();
1071 if self.params.sql {
1072 return JsonValue::from(sql.clone());
1073 }
1074 let (state, mut data) = self.query(sql.as_str());
1075 match state {
1076 true => {
1077 for (field, _) in self.params.json.entries() {
1078 for item in data.members_mut() {
1079 if !item[field].is_empty() {
1080 let json = item[field].to_string();
1081 item[field] = match json::parse(&json) {
1082 Ok(e) => e,
1083 Err(_) => JsonValue::from(json),
1084 };
1085 }
1086 }
1087 }
1088 data.clone()
1089 }
1090 false => array![],
1091 }
1092 }
1093
1094 fn find(&mut self) -> JsonValue {
1095 self.params.page = 1;
1096 self.params.limit = 1;
1097 let sql = self.params.select_sql();
1098 if self.params.sql {
1099 return JsonValue::from(sql.clone());
1100 }
1101 let (state, mut data) = self.query(sql.as_str());
1102 match state {
1103 true => {
1104 if data.is_empty() {
1105 return object! {};
1106 }
1107 for (field, _) in self.params.json.entries() {
1108 if !data[0][field].is_empty() {
1109 let json = data[0][field].to_string();
1110 let json = json::parse(&json).unwrap_or(array![]);
1111 data[0][field] = json;
1112 } else {
1113 data[0][field] = array![];
1114 }
1115 }
1116 data[0].clone()
1117 }
1118 false => {
1119 error!("find失败: {data:?}");
1120 object! {}
1121 }
1122 }
1123 }
1124
1125 fn value(&mut self, field: &str) -> JsonValue {
1126 self.params.fields = object! {};
1127 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1128 self.params.page = 1;
1129 self.params.limit = 1;
1130 let sql = self.params.select_sql();
1131 if self.params.sql {
1132 return JsonValue::from(sql.clone());
1133 }
1134 let (state, mut data) = self.query(sql.as_str());
1135 match state {
1136 true => {
1137 for (field, _) in self.params.json.entries() {
1138 if !data[0][field].is_empty() {
1139 let json = data[0][field].to_string();
1140 let json = json::parse(&json).unwrap_or(array![]);
1141 data[0][field] = json;
1142 } else {
1143 data[0][field] = array![];
1144 }
1145 }
1146 data[0][field].clone()
1147 }
1148 false => {
1149 if self.connection.debug {
1150 info!("{data:?}");
1151 }
1152 JsonValue::Null
1153 }
1154 }
1155 }
1156 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1157 let fields_list = self.table_info(&self.params.table.clone());
1158
1159 let mut fields = vec![];
1160 let mut values = vec![];
1161 if !self.params.autoinc && data["id"].is_empty() {
1162 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1163 }
1164 for (field, value) in data.entries() {
1165 fields.push(format!("`{field}`"));
1166
1167 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1168 if value.is_empty() {
1169 values.push("NULL".to_string());
1170 continue;
1171 }
1172 let comment = fields_list[field]["comment"].to_string();
1173 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1174 let location = value.to_string().replace(",", " ");
1175 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1176 continue;
1177 }
1178
1179 if value.is_string() || value.is_array() || value.is_object() {
1180 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1181 continue;
1182 } else if value.is_number() || value.is_boolean() || value.is_null() {
1183 values.push(format!("{value}"));
1184 continue;
1185 } else {
1186 values.push(format!("'{value}'"));
1187 continue;
1188 }
1189 }
1190 let fields = fields.join(",");
1191 let values = values.join(",");
1192
1193 let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1194 if self.params.sql {
1195 return JsonValue::from(sql.clone());
1196 }
1197 let (state, ids) = self.execute(sql.as_str());
1198
1199 match state {
1200 true => match self.params.autoinc {
1201 true => ids.clone(),
1202 false => data["id"].clone(),
1203 },
1204 false => {
1205 let thread_id = format!("{:?}", thread::current().id());
1206 error!("添加失败: {thread_id} {ids:?} {sql}");
1207 JsonValue::from("")
1208 }
1209 }
1210 }
1211 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1212 let fields_list = self.table_info(&self.params.table.clone());
1213
1214 let mut fields = String::new();
1215 if !self.params.autoinc && data[0]["id"].is_empty() {
1216 data[0]["id"] = "".into();
1217 }
1218 for (field, _) in data[0].entries() {
1219 fields = format!("{fields},`{field}`");
1220 }
1221 fields = fields.trim_start_matches(",").parse().unwrap();
1222
1223 let core_count = num_cpus::get();
1224 let mut p = pools::Pool::new(core_count * 4);
1225 let autoinc = self.params.autoinc;
1226 for list in data.members() {
1227 let mut item = list.clone();
1228 let params_location = self.params.location.clone();
1229 let fields_list_new = fields_list.clone();
1230 p.execute(move |pcindex| {
1231 if !autoinc && item["id"].is_empty() {
1232 let id = format!(
1233 "{:X}{:X}",
1234 Local::now().timestamp_nanos_opt().unwrap(),
1235 pcindex
1236 );
1237 item["id"] = id.into();
1238 }
1239 let mut values = "".to_string();
1240 for (field, value) in item.entries() {
1241 if params_location.has_key(field) {
1242 if value.is_empty() {
1243 values = format!("{values},NULL");
1244 continue;
1245 }
1246 let comment = fields_list_new[field]["comment"].to_string();
1247 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1248 let location = value.to_string().replace(",", " ");
1249 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1250 continue;
1251 }
1252 if value.is_string() {
1253 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1254 } else if value.is_number() || value.is_boolean() {
1255 values = format!("{values},{value}");
1256 } else {
1257 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1258 }
1259 }
1260 values = format!("({})", values.trim_start_matches(","));
1261 array![item["id"].clone(), values]
1262 });
1263 }
1264 let (ids_list, mut values) = p.insert_all();
1265 values = values.trim_start_matches(",").parse().unwrap();
1266 let sql = format!(
1267 "INSERT INTO {} ({}) VALUES {};",
1268 self.params.table, fields, values
1269 );
1270
1271 if self.params.sql {
1272 return JsonValue::from(sql.clone());
1273 }
1274 let (state, data) = self.execute(sql.as_str());
1275 match state {
1276 true => match autoinc {
1277 true => data,
1278 false => JsonValue::from(ids_list),
1279 },
1280 false => {
1281 error!("insert_all: {data:?}");
1282 array![]
1283 }
1284 }
1285 }
1286 fn update(&mut self, data: JsonValue) -> JsonValue {
1287 let fields_list = self.table_info(&self.params.table.clone());
1288
1289 let mut values = vec![];
1290 for (field, value) in data.entries() {
1291 if !self.params.json[field].is_empty() {
1292 let json = value.to_string().replace("'", "''");
1293 values.push(format!("`{field}`='{json}'"));
1294 continue;
1295 }
1296 if !self.params.location[field].is_empty() {
1297 if value.is_empty() {
1298 values.push(format!("{field}=NULL").to_string());
1299 continue;
1300 }
1301 let comment = fields_list[field]["comment"].to_string();
1302 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1303 let location = value.to_string().replace(",", " ");
1304 values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1305
1306 continue;
1307 }
1308
1309 if value.is_string() {
1310 values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1311 } else if value.is_number() {
1312 values.push(format!("`{field}`= {value}"));
1313 } else if value.is_array() {
1314 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1315 values.push(format!("`{field}`='{array}'"));
1316 continue;
1317 } else if value.is_object() {
1318 if self.params.json[field].is_empty() {
1319 values.push(format!("`{field}`='{value}'"));
1320 } else {
1321 if value.is_empty() {
1322 values.push(format!("`{field}`=''"));
1323 continue;
1324 }
1325 let json = value.to_string();
1326 let json = json.replace("'", "''");
1327 values.push(format!("`{field}`='{json}'"));
1328 }
1329 continue;
1330 } else if value.is_boolean() {
1331 values.push(format!("`{field}`= {value}"));
1332 } else {
1333 values.push(format!("`{field}`=\"{value}\""));
1334 }
1335 }
1336
1337 for (field, value) in self.params.inc_dec.entries() {
1338 values.push(format!("{field} = {}", value.to_string().clone()));
1339 }
1340
1341 let values = values.join(",");
1342
1343 let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1344 if self.params.sql {
1345 return JsonValue::from(sql.clone());
1346 }
1347 let (state, data) = self.execute(sql.as_str());
1348 if state {
1349 data
1350 } else {
1351 let thread_id = format!("{:?}", thread::current().id());
1352 error!("update: {thread_id} {data:?} {sql}");
1353 0.into()
1354 }
1355 }
1356
1357 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1358 let fields_list = self.table_info(&self.params.table.clone());
1359 let mut values = vec![];
1360 let mut ids = vec![];
1361 for (field, _) in data[0].entries() {
1362 if field == "id" {
1363 continue;
1364 }
1365 let mut fields = vec![];
1366 for row in data.members() {
1367 let value = row[field].clone();
1368 let id = row["id"].clone();
1369 ids.push(id.clone());
1370
1371 if self.params.json.has_key(field) {
1372 let json = value.to_string();
1373 let json = json.replace("'", "''");
1374 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1375 continue;
1376 }
1377 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1378 let comment = fields_list[field]["comment"].to_string();
1379 let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1380 let location = value.to_string().replace(",", " ");
1381 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1382 fields.push(format!("WHEN '{id}' THEN {location}"));
1383 continue;
1384 }
1385 if value.is_string() {
1386 fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1387 } else if value.is_array() || value.is_object() {
1388 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1389 } else if value.is_number() || value.is_boolean() || value.is_null() {
1390 fields.push(format!("WHEN '{id}' THEN {value}"));
1391 } else {
1392 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1393 }
1394 }
1395 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1396 }
1397 self.where_and("id", "in", ids.into());
1398 for (field, value) in self.params.inc_dec.entries() {
1399 values.push(format!("{} = {}", field, value.to_string().clone()));
1400 }
1401
1402 let values = values.join(",");
1403 let sql = format!(
1404 "UPDATE {} SET {} {} {};",
1405 self.params.table.clone(),
1406 values,
1407 self.params.where_sql(),
1408 self.params.page_limit_sql()
1409 );
1410 if self.params.sql {
1411 return JsonValue::from(sql.clone());
1412 }
1413 let (state, data) = self.execute(sql.as_str());
1414 if state {
1415 data
1416 } else {
1417 error!("update_all: {data:?}");
1418 JsonValue::from(0)
1419 }
1420 }
1421
1422 fn delete(&mut self) -> JsonValue {
1423 let sql = format!(
1424 "delete FROM {} {} {};",
1425 self.params.table.clone(),
1426 self.params.where_sql(),
1427 self.params.page_limit_sql()
1428 );
1429 if self.params.sql {
1430 return JsonValue::from(sql.clone());
1431 }
1432 let (state, data) = self.execute(sql.as_str());
1433 match state {
1434 true => data,
1435 false => {
1436 error!("delete 失败>>> {data:?}");
1437 JsonValue::from(0)
1438 }
1439 }
1440 }
1441 fn transaction(&mut self) -> bool {
1442 let thread_id = format!("{:?}", thread::current().id());
1443
1444 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1445 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1446 t += 1;
1447 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1448 return true;
1449 }
1450 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1451
1452 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1453
1454 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1455 Ok(e) => e,
1456 Err(err) => {
1457 error!("query 超时: {err}");
1458 return false;
1459 }
1460 };
1461 let key = format!("{}{}", self.default, thread_id);
1462 TR.lock().unwrap().insert(key.clone(), conn);
1463
1464 let (state, _) = self.query(sql.as_str());
1465 match state {
1466 true => state,
1467 false => {
1468 TR.lock().unwrap().remove(&*key);
1469 TRANS.lock().unwrap().remove(&*thread_id.clone());
1470 state
1471 }
1472 }
1473 }
1474
1475 fn commit(&mut self) -> bool {
1476 let thread_id = format!("{:?}", thread::current().id());
1477 let sql = "COMMIT".to_string();
1478
1479 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1480 if t > 1 {
1481 t -= 1;
1482 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1483 return true;
1484 }
1485 let (state, data) = self.query(sql.as_str());
1486 TRANS.lock().unwrap().remove(&thread_id);
1487 let key = format!("{}{}", self.default, thread_id);
1488 TR.lock().unwrap().remove(&*key);
1489
1490 let t = TRANS_TABLE.lock().unwrap().clone();
1491 for (key, value) in t.iter() {
1492 if value.clone() == thread_id {
1493 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1494 }
1495 }
1496
1497 match state {
1498 true => {}
1499 false => {
1500 error!("提交事务失败: {data}");
1501 }
1502 }
1503 state
1504 }
1505
1506 fn rollback(&mut self) -> bool {
1507 let thread_id = format!("{:?}", thread::current().id());
1508 let sql = "ROLLBACK".to_string();
1509
1510 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1511 if t > 1 {
1512 t -= 1;
1513 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1514 return true;
1515 }
1516 let (state, data) = self.query(sql.as_str());
1517 TRANS.lock().unwrap().remove(&thread_id);
1518 let key = format!("{}{}", self.default, thread_id);
1519 TR.lock().unwrap().remove(&*key);
1520
1521 let t = TRANS_TABLE.lock().unwrap().clone();
1522 for (key, value) in t.iter() {
1523 if value.clone() == thread_id {
1524 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1525 }
1526 }
1527
1528 match state {
1529 true => {}
1530 false => {
1531 error!("回滚失败: {data}");
1532 }
1533 }
1534 state
1535 }
1536
1537 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1538 let (state, data) = self.query(sql);
1539 match state {
1540 true => Ok(data),
1541 false => Err(data.to_string()),
1542 }
1543 }
1544
1545 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1546 let (state, data) = self.execute(sql);
1547 match state {
1548 true => Ok(data),
1549 false => Err(data.to_string()),
1550 }
1551 }
1552
1553 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1554 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1555 self
1556 }
1557 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1558 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1559 self
1560 }
1561
1562 fn buildsql(&mut self) -> String {
1563 self.fetch_sql();
1564 let sql = self.select().to_string();
1565 format!("( {} ) `{}`", sql, self.params.table)
1566 }
1567
1568 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1569 let main_table = if self.params.join_table.is_empty() {
1570 self.params.table.clone()
1571 } else {
1572 self.params.join_table.clone()
1573 };
1574 self.params.join_table = table.to_string();
1575 self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1576 self
1577 }
1578
1579 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1580 let main_fields = if main_fields.is_empty() {
1581 "id"
1582 } else {
1583 main_fields
1584 };
1585 let second_fields = if second_fields.is_empty() {
1586 self.params.table.clone()
1587 } else {
1588 second_fields.to_string().clone()
1589 };
1590 let sec_table_name = format!("{}{}", table, "_2");
1591 let second_table = format!("{} {}", table, sec_table_name.clone());
1592 self.params.join_table = sec_table_name.clone();
1593 self.params.join.push(format!(
1594 " INNER JOIN {} ON {}.{} = {}.{}",
1595 second_table, self.params.table, main_fields, sec_table_name, second_fields
1596 ));
1597 self
1598 }
1599}