1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
/// MySQL backend handle: connection settings, query-builder state and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection settings this handle was created from.
    pub connection: Connection,
    /// Name of the connection; used as a prefix for transaction keys and metadata-cache keys.
    pub default: String,
    /// Query-builder state filled in by the chained builder methods.
    pub params: Params,
    /// Connection pool created by `connect`.
    pub pool: Pool,
}
28
29impl Mysql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let pool_cfg = &connection.pool;
32 let pool_opts = PoolOpts::default()
33 .with_constraints(
34 PoolConstraints::new(
35 pool_cfg.min_connections as usize,
36 pool_cfg.max_connections as usize,
37 )
38 .unwrap_or_default(),
39 )
40 .with_reset_connection(true);
41
42 let opts = OptsBuilder::new()
43 .pool_opts(pool_opts)
44 .ip_or_hostname(Some(connection.hostname.clone()))
45 .tcp_port(connection.hostport.parse().unwrap_or(3306))
46 .user(Some(connection.username.clone()))
47 .pass(Some(connection.userpass.clone()))
48 .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49 .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50 .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51 .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52 .db_name(Some(connection.database.clone()));
53
54 match Pool::new(opts) {
55 Ok(pool) => Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("mysql"),
59 pool,
60 }),
61 Err(e) => {
62 error!("connect: {e}");
63 Err(e.to_string())
64 }
65 }
66 }
67 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68 if sql.contains("INSERT") {
69 let rows = text.affected_rows();
70 if rows > 1 {
71 if self.params.autoinc {
72 let row = rows;
73 let start_row = text.last_insert_id().unwrap_or(0);
74 let end_row = start_row + row;
75
76 let mut ids = array![];
77 for item in start_row..end_row {
78 let _ = ids.push(item);
79 }
80 (true, ids)
81 } else {
82 (true, JsonValue::from(rows))
83 }
84 } else {
85 (true, JsonValue::from(text.last_insert_id()))
86 }
87 } else {
88 (true, JsonValue::from(text.affected_rows()))
89 }
90 }
91 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92 let mut list = array![];
93 let mut index = 0;
94 text.for_each(|row| {
95 match row {
96 Ok(r) => {
97 let mut data = object! {};
98 for (index, item) in r.columns().iter().enumerate() {
99 let field = item.name_str();
100 let field = field.to_string();
101 let field = field.as_str();
102
103 data[field] = match item.column_type() {
104 ColumnType::MYSQL_TYPE_TINY => {
105 let t = r.get::<bool, _>(index).unwrap_or(true);
106 JsonValue::from(t)
107 }
108 ColumnType::MYSQL_TYPE_FLOAT
109 | ColumnType::MYSQL_TYPE_NEWDECIMAL
110 | ColumnType::MYSQL_TYPE_DOUBLE => {
111 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112 if t == NULL {
113 JsonValue::from(0.0)
114 } else {
115 match r.get::<f64, _>(index) {
116 None => JsonValue::from(0.0),
117 Some(t) => JsonValue::from(t),
118 }
119 }
120 }
121 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122 let t = r.index(field).clone();
123 if t == NULL {
124 JsonValue::from(0)
125 } else {
126 let t = r.get::<i64, _>(index).unwrap_or(0);
127 JsonValue::from(t)
128 }
129 }
130 ColumnType::MYSQL_TYPE_NULL => {
131 let t = r.index(field).clone();
132 if t == NULL {
133 JsonValue::from("".to_string())
134 } else {
135 let t = r.get::<String, _>(index).unwrap_or("".to_string());
136 JsonValue::from(t)
137 }
138 }
139 ColumnType::MYSQL_TYPE_BLOB => {
140 let t = r.index(field).clone();
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r
145 .get::<mysql::Value, _>(index)
146 .unwrap_or("".to_string().into());
147 if t == NULL {
148 JsonValue::from("".to_string())
149 } else {
150 let t = r.get::<String, _>(index).unwrap_or("".to_string());
151 JsonValue::from(t)
152 }
153 }
154 }
155 ColumnType::MYSQL_TYPE_VAR_STRING => {
156 let t = r
157 .get::<mysql::Value, _>(index)
158 .unwrap_or("".to_string().into());
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_STRING => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let t = r.get::<String, _>(index).unwrap_or("".to_string());
172 JsonValue::from(t)
173 }
174 }
175 ColumnType::MYSQL_TYPE_DATE
176 | ColumnType::MYSQL_TYPE_DATETIME
177 | ColumnType::MYSQL_TYPE_LONG_BLOB
178 | ColumnType::MYSQL_TYPE_TIMESTAMP
179 | ColumnType::MYSQL_TYPE_TIME => {
180 let t = r.index(field).clone();
181 if t == NULL {
182 JsonValue::from("".to_string())
183 } else {
184 let t = r.get::<String, _>(index).unwrap_or("".to_string());
185 JsonValue::from(t)
186 }
187 }
188 ColumnType::MYSQL_TYPE_GEOMETRY => {
189 let t = r.index(field).clone();
190 if t == NULL {
191 JsonValue::from("".to_string())
192 } else {
193 let res = match r.index(field).clone() {
194 Value::Bytes(e) => e,
195 _ => vec![],
196 };
197 if res.len() >= 25 {
198 let x = f64::from_le_bytes(
199 res[9..17].try_into().unwrap_or([0u8; 8]),
200 );
201 let y = f64::from_le_bytes(
202 res[17..25].try_into().unwrap_or([0u8; 8]),
203 );
204 JsonValue::from(format!("{x},{y}"))
205 } else {
206 JsonValue::from("".to_string())
207 }
208 }
209 }
210 _ => {
211 let t = r.index(field).clone();
212 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
213 JsonValue::from("".to_string())
214 }
215 };
216 }
217 let _ = list.push(data);
218 }
219 Err(e) => {
220 error!("err: {e} \r\n {sql}");
221 }
222 }
223 index += 1;
224 });
225 (true, list)
226 }
227 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
228 let thread_id = format!("{:?}", thread::current().id());
229 let key = format!("{}{}", self.default, thread_id);
230
231 let debug = self.connection.debug;
232 let params_json = self.params.json.clone();
233 let table_name = self.params.table.clone();
234
235 let is_system_query = sql.contains("INFORMATION_SCHEMA")
236 || sql.contains("information_schema")
237 || sql.starts_with("START TRANSACTION")
238 || sql.starts_with("COMMIT")
239 || sql.starts_with("ROLLBACK")
240 || sql.starts_with("SHOW ");
241
242 let fields_list = if !is_system_query && !table_name.is_empty() {
243 self.table_info(&table_name)
244 } else {
245 object! {}
246 };
247
248 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
249
250 if !in_transaction {
251 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
252 Ok(e) => e,
253 Err(err) => {
254 error!("非事务 execute超时: {err}");
255 return (false, object! {});
256 }
257 };
258 let connection_id = db.connection_id();
259 return match db.query_iter(sql) {
260 Ok(e) => {
261 if debug {
262 info!("查询成功: {} {}", thread_id, sql);
263 }
264 self.query_handle(e, sql)
265 }
266 Err(e) => {
267 error!(
268 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
269 );
270 (false, JsonValue::from(e.to_string()))
271 }
272 };
273 }
274
275 match TRANSACTION_MANAGER.with_conn(&key, |db| {
276 let connection_id = db.connection_id();
277 match db.query_iter(sql) {
278 Ok(text) => {
279 if debug {
280 info!("查询成功: {} {}", thread_id, sql);
281 }
282 let mut list = array![];
283 text.for_each(|row| {
284 if let Ok(row) = row {
285 let mut item = object! {};
286 for column in row.columns_ref() {
287 let field = column.name_str().to_string();
288 let value = &row[column.name_str().as_ref()];
289 let column_type = column.column_type();
290 match column_type {
291 ColumnType::MYSQL_TYPE_VARCHAR
292 | ColumnType::MYSQL_TYPE_STRING
293 | ColumnType::MYSQL_TYPE_VAR_STRING
294 | ColumnType::MYSQL_TYPE_BLOB
295 | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
296 | ColumnType::MYSQL_TYPE_LONG_BLOB
297 | ColumnType::MYSQL_TYPE_TINY_BLOB => {
298 if *value == NULL {
299 item[field.as_str()] = "".into();
300 } else {
301 let data: String = mysql::from_value(value.clone());
302 if params_json.has_key(&field)
303 || fields_list[&field]["type"]
304 .to_string()
305 .contains("json")
306 {
307 item[field.as_str()] = match json::parse(&data) {
308 Ok(e) => e,
309 Err(_) => data.into(),
310 };
311 } else {
312 item[field.as_str()] = data.into();
313 }
314 }
315 }
316 ColumnType::MYSQL_TYPE_TINY
317 | ColumnType::MYSQL_TYPE_SHORT
318 | ColumnType::MYSQL_TYPE_LONG
319 | ColumnType::MYSQL_TYPE_INT24
320 | ColumnType::MYSQL_TYPE_LONGLONG => {
321 if *value == NULL {
322 item[field.as_str()] = 0.into();
323 } else {
324 let data: i64 = mysql::from_value(value.clone());
325 item[field.as_str()] = data.into();
326 }
327 }
328 ColumnType::MYSQL_TYPE_FLOAT
329 | ColumnType::MYSQL_TYPE_DOUBLE
330 | ColumnType::MYSQL_TYPE_DECIMAL
331 | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
332 if *value == NULL {
333 item[field.as_str()] = 0.0.into();
334 } else {
335 let data: f64 = mysql::from_value(value.clone());
336 item[field.as_str()] = data.into();
337 }
338 }
339 _ => {
340 if *value != NULL {
341 let data: String = mysql::from_value(value.clone());
342 item[field.as_str()] = data.into();
343 }
344 }
345 }
346 }
347 let _ = list.push(item);
348 }
349 });
350 (true, list)
351 }
352 Err(e) => {
353 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
354 (false, JsonValue::from(e.to_string()))
355 }
356 }
357 }) {
358 Some(result) => result,
359 None => {
360 error!("事务连接不存在: {key}");
361 (false, object! {})
362 }
363 }
364 }
365 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
366 let thread_id = format!("{:?}", thread::current().id());
367 let key = format!("{}{}", self.default, thread_id);
368
369 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
370
371 if !in_transaction {
372 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
373 Ok(e) => e,
374 Err(err) => {
375 error!("非事务: execute超时: {err}");
376 return (false, object! {});
377 }
378 };
379 return match db.exec_iter(sql, ()) {
380 Ok(e) => {
381 if self.connection.debug {
382 info!("提交成功: {} {}", thread_id, sql);
383 }
384 self.execute_cl(e, sql)
385 }
386 Err(e) => {
387 error!("非事务提交失败: {thread_id} {e} {sql}");
388 (false, JsonValue::from(e.to_string()))
389 }
390 };
391 }
392
393 if !TRANSACTION_MANAGER.acquire_table_lock(
394 &self.params.table,
395 &thread_id,
396 Duration::from_secs(30),
397 ) {
398 error!("获取表锁超时: {} {}", self.params.table, thread_id);
399 return (false, object! {"error": "table lock timeout"});
400 }
401
402 let is_insert = sql.contains("INSERT");
403 let autoinc = self.params.autoinc;
404 let debug = self.connection.debug;
405
406 match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
408 Ok(result) => {
409 let affected_rows = result.affected_rows();
410 let last_insert_id = result.last_insert_id();
411 (true, affected_rows, last_insert_id, None)
412 }
413 Err(e) => (false, 0, None, Some(e.to_string())),
414 }) {
415 Some((true, affected_rows, last_insert_id, _)) => {
416 if debug {
417 info!("提交成功: {} {}", thread_id, sql);
418 }
419 if is_insert {
420 if affected_rows > 1 {
421 if autoinc {
422 let start_row = last_insert_id.unwrap_or(0);
423 let end_row = start_row + affected_rows;
424 let mut ids = array![];
425 for item in start_row..end_row {
426 let _ = ids.push(item);
427 }
428 (true, ids)
429 } else {
430 (true, JsonValue::from(affected_rows))
431 }
432 } else {
433 (true, JsonValue::from(last_insert_id))
434 }
435 } else {
436 (true, JsonValue::from(affected_rows))
437 }
438 }
439 Some((false, _, _, Some(err))) => {
440 error!("事务提交失败: {thread_id} {err} {sql}");
441 (false, JsonValue::from(err))
442 }
443 _ => {
444 error!("事务连接不存在: {key}");
445 (false, object! {})
446 }
447 }
448 }
449}
450
451impl DbMode for Mysql {
452 fn database_tables(&mut self) -> JsonValue {
453 let sql = "SHOW TABLES".to_string();
454 match self.sql(sql.as_str()) {
455 Ok(e) => {
456 let mut list = vec![];
457 for item in e.members() {
458 for (_, value) in item.entries() {
459 list.push(value.clone());
460 }
461 }
462 list.into()
463 }
464 Err(_) => {
465 array![]
466 }
467 }
468 }
469
470 fn database_create(&mut self, name: &str) -> bool {
471 let sql = format!("CREATE DATABASE {name}");
472
473 let (state, data) = self.execute(sql.as_str());
474 match state {
475 true => data.as_bool().unwrap_or(false),
476 false => {
477 error!("创建数据库失败: {data:?}");
478 false
479 }
480 }
481 }
482}
483
484impl Mode for Mysql {
485 fn table_create(&mut self, options: TableOptions) -> JsonValue {
486 let mut sql = String::new();
487 let mut unique_fields = String::new();
489 let mut unique_name = String::new();
490 let mut unique = String::new();
491 for item in options.table_unique.iter() {
492 if unique_fields.is_empty() {
493 unique_fields = format!("`{item}`");
494 unique_name = format!("{}_unique_{}", options.table_name, item);
495 } else {
496 unique_fields = format!("{unique_fields},`{item}`");
497 unique_name = format!("{unique_name}_{item}");
498 }
499 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
500 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
501 }
502
503 let mut index = String::new();
505 for row in options.table_index.iter() {
506 let mut index_fields = String::new();
507 let mut index_name = String::new();
508 for item in row.iter() {
509 if index_fields.is_empty() {
510 index_fields = format!("`{item}`");
511 index_name = format!("{}_index_{}", options.table_name, item);
512 } else {
513 index_fields = format!("{index_fields},`{item}`");
514 index_name = format!("{index_name}_{item}");
515 }
516 }
517 if index.is_empty() {
518 index = format!("INDEX `{index_name}` ({index_fields})");
519 } else {
520 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
521 }
522 }
523 if index.replace(",", "").is_empty() {
524 index = index.replace(",", "");
525 }
526
527 for (name, field) in options.table_fields.entries() {
528 let row = br_fields::field("mysql", name, field.clone());
529 sql = format!("{sql} {row},\r\n");
530 }
531
532 if !unique.is_empty() {
533 sql = sql.trim_end_matches(",\r\n").to_string();
534 sql = format!("{sql},\r\n{unique}");
535 }
536 if !index.is_empty() {
537 sql = sql.trim_end_matches(",\r\n").to_string();
538 sql = format!("{sql},\r\n{index}");
539 }
540 let collate = format!("{}_bin", self.connection.charset.str());
541
542 let partition = if options.table_partition {
544 sql = format!(
545 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
546 sql,
547 options.table_key,
548 options.table_partition_columns[0].clone()
549 );
550 let temp_head = format!(
551 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
552 options.table_partition_columns[0].clone()
553 );
554 let mut partition_array = vec![];
555 let mut count = 0;
556 for member in options.table_partition_columns[1].members() {
557 let temp = format!(
558 "PARTITION p{} VALUES LESS THAN ('{}')",
559 count.clone(),
560 member.clone()
561 );
562 count += 1;
563 partition_array.push(temp.clone());
564 }
565 let temp_body = partition_array.join(",\r\n");
566 let temp_end = format!(
567 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
568 count.clone()
569 );
570 format!("{temp_head}{temp_body}{temp_end}")
571 } else {
572 sql = if sql.trim_end().ends_with(",") {
573 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
574 } else {
575 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
576 };
577 "".to_string()
578 };
579 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
580
581 if self.params.sql {
582 return JsonValue::from(sql);
583 }
584
585 let (state, data) = self.execute(sql.as_str());
586
587 match state {
588 true => JsonValue::from(state),
589 false => {
590 info!("创建错误: {data}");
591 JsonValue::from(state)
592 }
593 }
594 }
595
596 fn table_update(&mut self, options: TableOptions) -> JsonValue {
597 let table_fields_guard = match TABLE_FIELDS.lock() {
598 Ok(g) => g,
599 Err(e) => e.into_inner(),
600 };
601 if table_fields_guard
602 .get(&format!("{}{}", self.default, options.table_name))
603 .is_some()
604 {
605 drop(table_fields_guard);
606 let mut table_fields_guard = match TABLE_FIELDS.lock() {
607 Ok(g) => g,
608 Err(e) => e.into_inner(),
609 };
610 table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
611 } else {
612 drop(table_fields_guard);
613 }
614 let mut sql = vec![];
615 let fields_list = self.table_info(&options.table_name);
616 let mut put = vec![];
617 let mut add = vec![];
618 let mut del = vec![];
619 for (key, _) in fields_list.entries() {
620 if options.table_fields[key].is_empty() {
621 del.push(key);
622 }
623 }
624 for (name, field) in options.table_fields.entries() {
625 if !fields_list[name].is_empty() {
626 let old_comment = fields_list[name]["comment"].to_string();
627 let new_comment = br_fields::field("mysql", name, field.clone());
628 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
629 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
630 if old_comment == new_comment_text {
631 continue;
632 }
633 put.push(name);
634 } else {
635 add.push(name);
636 }
637 }
638
639 for name in add.iter() {
640 let name = name.to_string();
641 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
642 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
643 }
644 for name in del.iter() {
645 sql.push(format!(
646 "ALTER TABLE {} DROP `{name}`;\r\n",
647 options.table_name
648 ));
649 }
650 for name in put.iter() {
651 let name = name.to_string();
652 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
653 sql.push(format!(
654 "ALTER TABLE {} CHANGE `{}` {};\r\n",
655 options.table_name, name, row
656 ));
657 }
658
659 let (_, index_list) =
660 self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
661 let (_, pk_list) = self.query(
663 format!(
664 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
665 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
666 self.connection.database, options.table_name
667 )
668 .as_str(),
669 );
670 let mut pk_vec = vec![];
671 for member in pk_list.members() {
672 pk_vec.push(member["COLUMN_NAME"].to_string());
673 }
674
675 let mut unique_new = vec![];
676 let mut index_new = vec![];
677 for item in index_list.members() {
678 let key_name = item["Key_name"].as_str().unwrap_or("");
679 let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
680
681 if non_unique == 0
682 && (key_name.contains(format!("{}_unique", options.table_name).as_str())
683 || key_name.contains("unique"))
684 {
685 unique_new.push(key_name.to_string());
686 continue;
687 }
688 if non_unique == 1
689 && (key_name.contains(format!("{}_index", options.table_name).as_str())
690 || key_name.contains("index"))
691 {
692 index_new.push(key_name.to_string());
693 continue;
694 }
695 }
696
697 let mut unique_fields = String::new();
698 let mut unique_name = String::new();
699 for item in options.table_unique.iter() {
700 if unique_fields.is_empty() {
701 unique_fields = format!("`{item}`");
702 unique_name = format!("{}_unique_{}", options.table_name, item);
703 } else {
704 unique_fields = format!("{unique_fields},`{item}`");
705 unique_name = format!("{unique_name}_{item}");
706 }
707 }
708 if !unique_name.is_empty() {
709 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
710 unique_name = format!("unique_{md5}");
711 for item in &unique_new {
712 if unique_name != *item {
713 sql.push(format!(
714 "alter table {} drop index {};\r\n",
715 options.table_name, item
716 ));
717 }
718 }
719 if !unique_new.contains(&unique_name) {
720 sql.push(format!(
721 "CREATE UNIQUE index {} on {} ({});\r\n",
722 unique_name, options.table_name, unique_fields
723 ));
724 }
725 }
726
727 let mut index_list = vec![];
728 for row in options.table_index.iter() {
730 let mut index_fields = String::new();
731 let mut index_name = String::new();
732 for item in row {
733 if index_fields.is_empty() {
734 index_fields = item.to_string();
735 index_name = format!("{}_index_{}", options.table_name, item);
736 } else {
737 index_fields = format!("{index_fields},{item}");
738 index_name = format!("{index_name}_{item}");
739 }
740 }
741 index_list.push(index_name.clone());
742 if !index_new.contains(&index_name.clone()) {
743 sql.push(format!(
744 "CREATE INDEX {} on {} ({});\r\n",
745 index_name, options.table_name, index_fields
746 ));
747 }
748 }
749
750 for item in index_new {
751 if !index_list.contains(&item.to_string()) {
752 sql.push(format!(
753 "DROP INDEX {} ON {};\r\n",
754 item.clone(),
755 options.table_name
756 ));
757 }
758 }
759
760 if options.table_partition {
762 if !pk_vec.contains(&options.table_key.to_string().clone())
764 || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
765 {
766 let pk = format!(
767 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
768 options.table_name,
769 options.table_key,
770 options.table_partition_columns[0].clone()
771 );
772 sql.push(pk);
773 let temp_head = format!(
774 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
775 options.table_name,
776 options.table_partition_columns[0].clone()
777 );
778 let mut partition_array = vec![];
779 let mut count = 0;
780 for member in options.table_partition_columns[1].members() {
781 let temp = format!(
782 "PARTITION p{} VALUES LESS THAN ('{}')",
783 count.clone(),
784 member.clone()
785 );
786 count += 1;
787 partition_array.push(temp.clone());
788 }
789 let temp_body = partition_array.join(",\r\n");
790 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
791 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
792 }
793 } else if pk_vec.len() != 1 {
794 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
795 sql.push(rm_partition);
796 let pk = format!(
797 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
798 options.table_name, options.table_key
799 );
800 sql.push(pk);
801 };
802
803 if self.params.sql {
804 return JsonValue::from(sql.join(""));
805 }
806
807 if sql.is_empty() {
808 return JsonValue::from(-1);
809 }
810
811 for item in sql.iter() {
812 let (state, res) = self.execute(item.as_str());
813 match state {
814 true => {}
815 false => {
816 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
817 return JsonValue::from(0);
818 }
819 }
820 }
821 JsonValue::from(1)
822 }
823
824 fn table_info(&mut self, table: &str) -> JsonValue {
825 let table_fields_guard = match TABLE_FIELDS.lock() {
826 Ok(g) => g,
827 Err(e) => e.into_inner(),
828 };
829 if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
830 return cached.clone();
831 }
832 drop(table_fields_guard);
833 let sql = format!(
834 "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'"
835 );
836 let (state, data) = self.query(sql.as_str());
837 let mut list = object! {};
838 if state {
839 for item in data.members() {
840 if item["TABLE_SCHEMA"] != self.connection.database {
841 continue;
842 }
843 let mut row = object! {};
844 row["field"] = item["COLUMN_NAME"].clone();
845 row["comment"] = item["COLUMN_COMMENT"].clone();
846 row["type"] = item["COLUMN_TYPE"].clone();
847 if let Some(field_name) = row["field"].as_str() {
848 list[field_name] = row.clone();
849 }
850 }
851 let mut table_fields_guard = match TABLE_FIELDS.lock() {
852 Ok(g) => g,
853 Err(e) => e.into_inner(),
854 };
855 table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
856 list
857 } else {
858 list
859 }
860 }
861
862 fn table_is_exist(&mut self, name: &str) -> bool {
863 let sql =
864 format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
865 let (state, data) = self.query(sql.as_str());
866 match state {
867 true => {
868 for item in data.members() {
869 if item["TABLE_NAME"] == name
870 && item["TABLE_SCHEMA"] == self.connection.database
871 {
872 return true;
873 }
874 }
875 false
876 }
877 false => false,
878 }
879 }
880
881 fn table(&mut self, name: &str) -> &mut Mysql {
882 self.params = Params::default(self.connection.mode.str().as_str());
883 let table_name = format!("{}{}", self.connection.prefix, name);
884 if !super::sql_safety::validate_table_name(&table_name) {
885 error!("Invalid table name: {}", name);
886 }
887 self.params.table = table_name.clone();
888 self.params.join_table = table_name;
889 self
890 }
891
892 fn change_table(&mut self, name: &str) -> &mut Self {
893 self.params.join_table = name.to_string();
894 self
895 }
896
897 fn autoinc(&mut self) -> &mut Self {
898 self.params.autoinc = true;
899 self
900 }
901
902 fn fetch_sql(&mut self) -> &mut Self {
903 self.params.sql = true;
904 self
905 }
906
907 fn order(&mut self, field: &str, by: bool) -> &mut Self {
908 self.params.order[field] = {
909 if by {
910 "DESC"
911 } else {
912 "ASC"
913 }
914 }
915 .into();
916 self
917 }
918
919 fn group(&mut self, field: &str) -> &mut Self {
920 let fields: Vec<&str> = field.split(",").collect();
921 for field in fields.iter() {
922 let field = field.to_string();
923 self.params.group[field.as_str()] = field.clone().into();
924 if !self.params.fields.has_key(field.as_str()) {
925 self.params.fields[field.as_str()] = field.clone().into();
926 }
927 }
928
929 self
930 }
931
932 fn distinct(&mut self) -> &mut Self {
933 self.params.distinct = true;
934 self
935 }
936
937 fn json(&mut self, field: &str) -> &mut Self {
938 let list: Vec<&str> = field.split(",").collect();
939 for item in list.iter() {
940 self.params.json[item.to_string().as_str()] = item.to_string().into();
941 }
942 self
943 }
944
945 fn location(&mut self, field: &str) -> &mut Self {
946 let list: Vec<&str> = field.split(",").collect();
947 for item in list.iter() {
948 self.params.location[item.to_string().as_str()] = item.to_string().into();
949 }
950 self
951 }
952
953 fn field(&mut self, field: &str) -> &mut Self {
954 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
955 let join_table = if self.params.join_table.is_empty() {
956 self.params.table.clone()
957 } else {
958 self.params.join_table.clone()
959 };
960 for item in list.iter() {
961 if item.contains(" as ") {
962 let text = item.split(" as ").collect::<Vec<&str>>();
963 if text[0].contains("count(") {
964 self.params.fields[item.to_string().as_str()] =
965 format!("{} as {}", text[0], text[1]).into();
966 } else {
967 self.params.fields[item.to_string().as_str()] =
968 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
969 }
970 } else {
971 self.params.fields[item.to_string().as_str()] =
972 format!("{join_table}.`{item}`").into();
973 }
974 }
975 self
976 }
977
978 fn hidden(&mut self, name: &str) -> &mut Self {
979 let hidden: Vec<&str> = name.split(",").collect();
980
981 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
982
983 let mut data = array![];
984 for item in fields_list.members() {
985 let _ = data.push(object! {
986 "name":item["COLUMN_NAME"].as_str().unwrap_or("")
987 });
988 }
989
990 for item in data.members() {
991 let name = item["name"].as_str().unwrap_or("");
992 if !hidden.contains(&name) {
993 self.params.fields[name] = name.into();
994 }
995 }
996 self
997 }
998
999 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1000 let table_fields = self.table_info(&self.params.table.clone());
1001 let join_table = if self.params.join_table.is_empty() {
1002 self.params.table.clone()
1003 } else {
1004 self.params.join_table.clone()
1005 };
1006 if value.is_boolean() {
1007 if value.as_bool().unwrap_or(false) {
1008 value = 1.into();
1009 } else {
1010 value = 0.into();
1011 }
1012 }
1013 match compare {
1014 "between" => {
1015 self.params.where_and.push(format!(
1016 "{join_table}.`{field}` between '{}' AND '{}'",
1017 value[0], value[1]
1018 ));
1019 }
1020 "location" => {
1021 let comment = table_fields[field]["comment"].to_string();
1022 let srid = comment
1023 .split("|")
1024 .collect::<Vec<&str>>()
1025 .last()
1026 .unwrap_or(&"0")
1027 .to_string();
1028
1029 let field_name = format!(
1030 "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1031 value[0], value[1], value[4]
1032 );
1033 self.params.fields[&field_name.clone()] = field_name.clone().into();
1034 let location = format!(
1037 "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1038 value[0], value[1], value[2], value[3]
1039 );
1040 self.params.where_and.push(location);
1041 }
1042 "set" => {
1043 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1044 let mut wheredata = vec![];
1045 for item in list.iter() {
1046 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1047 }
1048 self.params
1049 .where_and
1050 .push(format!("({})", wheredata.join(" or ")));
1051 }
1052 "notin" => {
1053 let mut text = String::new();
1054 for item in value.members() {
1055 text = format!("{text},'{item}'");
1056 }
1057 text = text.trim_start_matches(",").into();
1058 self.params
1059 .where_and
1060 .push(format!("{join_table}.`{field}` not in ({text})"));
1061 }
1062 "is" => {
1063 self.params
1064 .where_and
1065 .push(format!("{join_table}.`{field}` is {value}"));
1066 }
1067 "isnot" => {
1068 self.params
1069 .where_and
1070 .push(format!("{join_table}.`{field}` is not {value}"));
1071 }
1072 "notlike" => {
1073 self.params
1074 .where_and
1075 .push(format!("{join_table}.`{field}` not like '{value}'"));
1076 }
1077 "in" => {
1078 let mut text = String::new();
1079 if value.is_array() {
1080 for item in value.members() {
1081 text = format!("{text},'{item}'");
1082 }
1083 } else if value.is_null() {
1084 text = format!("{text},null");
1085 } else {
1086 let value = value.as_str().unwrap_or("");
1087
1088 let value: Vec<&str> = value.split(",").collect();
1089 for item in value.iter() {
1090 text = format!("{text},'{item}'");
1091 }
1092 }
1093 text = text.trim_start_matches(",").into();
1094
1095 self.params
1096 .where_and
1097 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1098 }
1099 _ => {
1100 self.params
1101 .where_and
1102 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1103 }
1104 }
1105 self
1106 }
1107
1108 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1109 let join_table = if self.params.join_table.is_empty() {
1110 self.params.table.clone()
1111 } else {
1112 self.params.join_table.clone()
1113 };
1114
1115 if value.is_boolean() {
1116 if value.as_bool().unwrap_or(false) {
1117 value = 1.into();
1118 } else {
1119 value = 0.into();
1120 }
1121 }
1122
1123 match compare {
1124 "between" => {
1125 self.params.where_or.push(format!(
1126 "{}.`{}` between '{}' AND '{}'",
1127 join_table, field, value[0], value[1]
1128 ));
1129 }
1130 "set" => {
1131 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1132 let mut wheredata = vec![];
1133 for item in list.iter() {
1134 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1135 }
1136 self.params
1137 .where_or
1138 .push(format!("({})", wheredata.join(" or ")));
1139 }
1140 "notin" => {
1141 let mut text = String::new();
1142 for item in value.members() {
1143 text = format!("{text},'{item}'");
1144 }
1145 text = text.trim_start_matches(",").into();
1146 self.params
1147 .where_or
1148 .push(format!("{join_table}.`{field}` not in ({text})"));
1149 }
1150 "is" => {
1151 self.params
1152 .where_or
1153 .push(format!("{join_table}.`{field}` is {value}"));
1154 }
1155 "isnot" => {
1156 self.params
1157 .where_or
1158 .push(format!("{join_table}.`{field}` IS NOT {value}"));
1159 }
1160 "in" => {
1161 let mut text = String::new();
1162 if value.is_array() {
1163 for item in value.members() {
1164 text = format!("{text},'{item}'");
1165 }
1166 } else {
1167 let value = value.as_str().unwrap_or("");
1168 let value: Vec<&str> = value.split(",").collect();
1169 for item in value.iter() {
1170 text = format!("{text},'{item}'");
1171 }
1172 }
1173 text = text.trim_start_matches(",").into();
1174 self.params
1175 .where_or
1176 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1177 }
1178 _ => {
1179 if field.contains(".") {
1180 self.params
1181 .where_or
1182 .push(format!("{field} {compare} '{value}'"));
1183 } else {
1184 self.params
1185 .where_or
1186 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1187 }
1188 }
1189 }
1190 self
1191 }
1192
1193 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1194 self.params.where_column = format!(
1195 "{}.`{}` {} {}.`{}`",
1196 self.params.table, field_a, compare, self.params.table, field_b
1197 );
1198 self
1199 }
1200
1201 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1202 self.params
1203 .update_column
1204 .push(format!("{field_a} = {compare}"));
1205 self
1206 }
1207
1208 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1209 self.params.page = page;
1210 self.params.limit = limit;
1211 self
1212 }
1213
1214 fn column(&mut self, field: &str) -> JsonValue {
1215 self.field(field);
1216 let sql = self.params.select_sql();
1217
1218 if self.params.sql {
1219 return JsonValue::from(sql);
1220 }
1221 let (state, data) = self.query(sql.as_str());
1222 match state {
1223 true => {
1224 let mut list = array![];
1225 for item in data.members() {
1226 if self.params.json[field].is_empty() {
1227 let _ = list.push(item[field].clone());
1228 } else {
1229 let data =
1230 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1231 let _ = list.push(data);
1232 }
1233 }
1234 list
1235 }
1236 false => {
1237 array![]
1238 }
1239 }
1240 }
1241
1242 fn count(&mut self) -> JsonValue {
1243 if !self.params.fields.is_empty() {
1244 self.group(format!("{}.id", self.params.table).as_str());
1245 }
1246 self.params.fields["count"] = "count(*) as count".into();
1247 let sql = self.params.select_sql();
1248 if self.params.sql {
1249 return JsonValue::from(sql.clone());
1250 }
1251 let (state, data) = self.query(sql.as_str());
1252 if state {
1253 if data.is_empty() {
1254 JsonValue::from(0)
1255 } else {
1256 data[0]["count"].clone()
1257 }
1258 } else {
1259 JsonValue::from(0)
1260 }
1261 }
1262
1263 fn max(&mut self, field: &str) -> JsonValue {
1264 self.params.fields[field] = format!("max({field}) as {field}").into();
1265 let sql = self.params.select_sql();
1266 if self.params.sql {
1267 return JsonValue::from(sql.clone());
1268 }
1269 let (state, data) = self.query(sql.as_str());
1270 if state {
1271 if data.len() > 1 {
1272 return data.clone();
1273 }
1274 data[0][field].clone()
1275 } else {
1276 JsonValue::from(0)
1277 }
1278 }
1279
1280 fn min(&mut self, field: &str) -> JsonValue {
1281 self.params.fields[field] = format!("min({field}) as {field}").into();
1282 let sql = self.params.select_sql();
1283 if self.params.sql {
1284 return JsonValue::from(sql.clone());
1285 }
1286 let (state, data) = self.query(sql.as_str());
1287 if state {
1288 if data.len() > 1 {
1289 return data;
1290 }
1291 data[0][field].clone()
1292 } else {
1293 JsonValue::from(0)
1294 }
1295 }
1296
1297 fn sum(&mut self, field: &str) -> JsonValue {
1298 self.params.fields[field] = format!("sum({field}) as {field}").into();
1299 let sql = self.params.select_sql();
1300 if self.params.sql {
1301 return JsonValue::from(sql.clone());
1302 }
1303 let (state, data) = self.query(sql.as_str());
1304 match state {
1305 true => {
1306 if data.len() > 1 {
1307 return data;
1308 }
1309 data[0][field].clone()
1310 }
1311 false => JsonValue::from(0),
1312 }
1313 }
1314
1315 fn avg(&mut self, field: &str) -> JsonValue {
1316 self.params.fields[field] = format!("avg({field}) as {field}").into();
1317 let sql = self.params.select_sql();
1318 if self.params.sql {
1319 return JsonValue::from(sql.clone());
1320 }
1321 let (state, data) = self.query(sql.as_str());
1322 if state {
1323 if data.len() > 1 {
1324 return data;
1325 }
1326 data[0][field].clone()
1327 } else {
1328 JsonValue::from(0)
1329 }
1330 }
1331
1332 fn select(&mut self) -> JsonValue {
1333 let sql = self.params.select_sql();
1334 if self.params.sql {
1335 return JsonValue::from(sql.clone());
1336 }
1337 let (state, mut data) = self.query(sql.as_str());
1338 match state {
1339 true => {
1340 for (field, _) in self.params.json.entries() {
1341 for item in data.members_mut() {
1342 if !item[field].is_empty() {
1343 let json = item[field].to_string();
1344 item[field] = match json::parse(&json) {
1345 Ok(e) => e,
1346 Err(_) => JsonValue::from(json),
1347 };
1348 }
1349 }
1350 }
1351 data.clone()
1352 }
1353 false => array![],
1354 }
1355 }
1356
1357 fn find(&mut self) -> JsonValue {
1358 self.params.page = 1;
1359 self.params.limit = 1;
1360 let sql = self.params.select_sql();
1361 if self.params.sql {
1362 return JsonValue::from(sql.clone());
1363 }
1364 let (state, mut data) = self.query(sql.as_str());
1365 match state {
1366 true => {
1367 if data.is_empty() {
1368 return object! {};
1369 }
1370 for (field, _) in self.params.json.entries() {
1371 if !data[0][field].is_empty() {
1372 let json = data[0][field].to_string();
1373 let json = json::parse(&json).unwrap_or(array![]);
1374 data[0][field] = json;
1375 } else {
1376 data[0][field] = array![];
1377 }
1378 }
1379 data[0].clone()
1380 }
1381 false => {
1382 error!("find失败: {data:?}");
1383 object! {}
1384 }
1385 }
1386 }
1387
1388 fn value(&mut self, field: &str) -> JsonValue {
1389 self.params.fields = object! {};
1390 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1391 self.params.page = 1;
1392 self.params.limit = 1;
1393 let sql = self.params.select_sql();
1394 if self.params.sql {
1395 return JsonValue::from(sql.clone());
1396 }
1397 let (state, mut data) = self.query(sql.as_str());
1398 match state {
1399 true => {
1400 for (field, _) in self.params.json.entries() {
1401 if !data[0][field].is_empty() {
1402 let json = data[0][field].to_string();
1403 let json = json::parse(&json).unwrap_or(array![]);
1404 data[0][field] = json;
1405 } else {
1406 data[0][field] = array![];
1407 }
1408 }
1409 data[0][field].clone()
1410 }
1411 false => {
1412 if self.connection.debug {
1413 info!("{data:?}");
1414 }
1415 JsonValue::Null
1416 }
1417 }
1418 }
1419 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1420 let fields_list = self.table_info(&self.params.table.clone());
1421
1422 let mut fields = vec![];
1423 let mut values = vec![];
1424 if !self.params.autoinc && data["id"].is_empty() {
1425 let thread_id = format!("{:?}", std::thread::current().id());
1426 let thread_num: u64 = thread_id
1427 .trim_start_matches("ThreadId(")
1428 .trim_end_matches(")")
1429 .parse()
1430 .unwrap_or(0);
1431 data["id"] = format!(
1432 "{:X}{:X}",
1433 Local::now().timestamp_nanos_opt().unwrap_or(0),
1434 thread_num
1435 )
1436 .into();
1437 }
1438 for (field, value) in data.entries() {
1439 fields.push(format!("`{field}`"));
1440
1441 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1442 if value.is_empty() {
1443 values.push("NULL".to_string());
1444 continue;
1445 }
1446 let comment = fields_list[field]["comment"].to_string();
1447 let srid = comment
1448 .split("|")
1449 .collect::<Vec<&str>>()
1450 .last()
1451 .unwrap_or(&"0")
1452 .to_string();
1453 let location = value.to_string().replace(",", " ");
1454 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1455 continue;
1456 }
1457
1458 if value.is_string() || value.is_array() || value.is_object() {
1459 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1460 continue;
1461 } else if value.is_number() || value.is_boolean() || value.is_null() {
1462 values.push(format!("{value}"));
1463 continue;
1464 } else {
1465 values.push(format!("'{value}'"));
1466 continue;
1467 }
1468 }
1469 let fields = fields.join(",");
1470 let values = values.join(",");
1471
1472 let sql = format!(
1473 "INSERT INTO {} ({fields}) VALUES ({values});",
1474 self.params.table
1475 );
1476 if self.params.sql {
1477 return JsonValue::from(sql.clone());
1478 }
1479 let (state, ids) = self.execute(sql.as_str());
1480
1481 match state {
1482 true => match self.params.autoinc {
1483 true => ids.clone(),
1484 false => data["id"].clone(),
1485 },
1486 false => {
1487 let thread_id = format!("{:?}", thread::current().id());
1488 error!("添加失败: {thread_id} {ids:?} {sql}");
1489 JsonValue::from("")
1490 }
1491 }
1492 }
1493 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1494 let fields_list = self.table_info(&self.params.table.clone());
1495
1496 let mut fields = String::new();
1497 if !self.params.autoinc && data[0]["id"].is_empty() {
1498 data[0]["id"] = "".into();
1499 }
1500 for (field, _) in data[0].entries() {
1501 fields = format!("{fields},`{field}`");
1502 }
1503 fields = fields.trim_start_matches(",").to_string();
1504
1505 let core_count = num_cpus::get();
1506 let mut p = pools::Pool::new(core_count * 4);
1507 let autoinc = self.params.autoinc;
1508 for list in data.members() {
1509 let mut item = list.clone();
1510 let params_location = self.params.location.clone();
1511 let fields_list_new = fields_list.clone();
1512 p.execute(move |pcindex| {
1513 if !autoinc && item["id"].is_empty() {
1514 let id = format!(
1515 "{:X}{:X}",
1516 Local::now().timestamp_nanos_opt().unwrap_or(0),
1517 pcindex
1518 );
1519 item["id"] = id.into();
1520 }
1521 let mut values = "".to_string();
1522 for (field, value) in item.entries() {
1523 if params_location.has_key(field) {
1524 if value.is_empty() {
1525 values = format!("{values},NULL");
1526 continue;
1527 }
1528 let comment = fields_list_new[field]["comment"].to_string();
1529 let srid = comment
1530 .split("|")
1531 .collect::<Vec<&str>>()
1532 .last()
1533 .unwrap_or(&"0")
1534 .to_string();
1535 let location = value.to_string().replace(",", " ");
1536 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1537 continue;
1538 }
1539 if value.is_string() {
1540 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1541 } else if value.is_number() || value.is_boolean() {
1542 values = format!("{values},{value}");
1543 } else {
1544 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1545 }
1546 }
1547 values = format!("({})", values.trim_start_matches(","));
1548 array![item["id"].clone(), values]
1549 });
1550 }
1551 let (ids_list, mut values) = p.insert_all();
1552 values = values.trim_start_matches(",").to_string();
1553 let sql = format!(
1554 "INSERT INTO {} ({}) VALUES {};",
1555 self.params.table, fields, values
1556 );
1557
1558 if self.params.sql {
1559 return JsonValue::from(sql.clone());
1560 }
1561 let (state, data) = self.execute(sql.as_str());
1562 match state {
1563 true => match autoinc {
1564 true => data,
1565 false => JsonValue::from(ids_list),
1566 },
1567 false => {
1568 error!("insert_all: {data:?}");
1569 array![]
1570 }
1571 }
1572 }
1573 fn update(&mut self, data: JsonValue) -> JsonValue {
1574 let fields_list = self.table_info(&self.params.table.clone());
1575
1576 let mut values = vec![];
1577 for (field, value) in data.entries() {
1578 if !self.params.json[field].is_empty() {
1579 let json = value.to_string().replace("'", "''");
1580 values.push(format!("`{field}`='{json}'"));
1581 continue;
1582 }
1583 if !self.params.location[field].is_empty() {
1584 if value.is_empty() {
1585 values.push(format!("{field}=NULL").to_string());
1586 continue;
1587 }
1588 let comment = fields_list[field]["comment"].to_string();
1589 let srid = comment
1590 .split("|")
1591 .collect::<Vec<&str>>()
1592 .last()
1593 .unwrap_or(&"0")
1594 .to_string();
1595 let location = value.to_string().replace(",", " ");
1596 values.push(format!(
1597 "{field}=ST_GeomFromText('POINT({location})',{srid})"
1598 ));
1599
1600 continue;
1601 }
1602
1603 if value.is_string() {
1604 values.push(format!(
1605 "`{field}`='{}'",
1606 value.to_string().replace("'", "''")
1607 ));
1608 } else if value.is_number() {
1609 values.push(format!("`{field}`= {value}"));
1610 } else if value.is_array() {
1611 let array = value
1612 .members()
1613 .map(|x| x.as_str().unwrap_or(""))
1614 .collect::<Vec<&str>>()
1615 .join(",");
1616 values.push(format!("`{field}`='{array}'"));
1617 continue;
1618 } else if value.is_object() {
1619 if self.params.json[field].is_empty() {
1620 values.push(format!("`{field}`='{value}'"));
1621 } else {
1622 if value.is_empty() {
1623 values.push(format!("`{field}`=''"));
1624 continue;
1625 }
1626 let json = value.to_string();
1627 let json = json.replace("'", "''");
1628 values.push(format!("`{field}`='{json}'"));
1629 }
1630 continue;
1631 } else if value.is_boolean() {
1632 values.push(format!("`{field}`= {value}"));
1633 } else {
1634 values.push(format!("`{field}`=\"{value}\""));
1635 }
1636 }
1637
1638 for (field, value) in self.params.inc_dec.entries() {
1639 values.push(format!("{field} = {}", value.to_string().clone()));
1640 }
1641 if !self.params.update_column.is_empty() {
1642 values.extend(self.params.update_column.clone());
1643 }
1644
1645 let values = values.join(",");
1646
1647 let sql = format!(
1648 "UPDATE {} SET {values} {};",
1649 self.params.table.clone(),
1650 self.params.where_sql()
1651 );
1652 if self.params.sql {
1653 return JsonValue::from(sql.clone());
1654 }
1655 let (state, data) = self.execute(sql.as_str());
1656 if state {
1657 data
1658 } else {
1659 let thread_id = format!("{:?}", thread::current().id());
1660 error!("update: {thread_id} {data:?} {sql}");
1661 0.into()
1662 }
1663 }
1664
1665 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1666 let fields_list = self.table_info(&self.params.table.clone());
1667 let mut values = vec![];
1668 let mut ids = vec![];
1669 for (field, _) in data[0].entries() {
1670 if field == "id" {
1671 continue;
1672 }
1673 let mut fields = vec![];
1674 for row in data.members() {
1675 let value = row[field].clone();
1676 let id = row["id"].clone();
1677 ids.push(id.clone());
1678
1679 if self.params.json.has_key(field) {
1680 let json = value.to_string();
1681 let json = json.replace("'", "''");
1682 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1683 continue;
1684 }
1685 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1686 let comment = fields_list[field]["comment"].to_string();
1687 let srid = comment
1688 .split("|")
1689 .collect::<Vec<&str>>()
1690 .last()
1691 .unwrap_or(&"0")
1692 .to_string();
1693 let location = value.to_string().replace(",", " ");
1694 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1695 fields.push(format!("WHEN '{id}' THEN {location}"));
1696 continue;
1697 }
1698 if value.is_string() {
1699 fields.push(format!(
1700 "WHEN '{id}' THEN '{}'",
1701 value.to_string().replace("'", "''")
1702 ));
1703 } else if value.is_array() || value.is_object() {
1704 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1705 } else if value.is_number() || value.is_boolean() || value.is_null() {
1706 fields.push(format!("WHEN '{id}' THEN {value}"));
1707 } else {
1708 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1709 }
1710 }
1711 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1712 }
1713 self.where_and("id", "in", ids.into());
1714 for (field, value) in self.params.inc_dec.entries() {
1715 values.push(format!("{} = {}", field, value.to_string().clone()));
1716 }
1717
1718 let values = values.join(",");
1719 let sql = format!(
1720 "UPDATE {} SET {} {} {};",
1721 self.params.table.clone(),
1722 values,
1723 self.params.where_sql(),
1724 self.params.page_limit_sql()
1725 );
1726 if self.params.sql {
1727 return JsonValue::from(sql.clone());
1728 }
1729 let (state, data) = self.execute(sql.as_str());
1730 if state {
1731 data
1732 } else {
1733 error!("update_all: {data:?}");
1734 JsonValue::from(0)
1735 }
1736 }
1737
1738 fn delete(&mut self) -> JsonValue {
1739 let sql = format!(
1740 "delete FROM {} {} {};",
1741 self.params.table.clone(),
1742 self.params.where_sql(),
1743 self.params.page_limit_sql()
1744 );
1745 if self.params.sql {
1746 return JsonValue::from(sql.clone());
1747 }
1748 let (state, data) = self.execute(sql.as_str());
1749 match state {
1750 true => data,
1751 false => {
1752 error!("delete 失败>>> {data:?}");
1753 JsonValue::from(0)
1754 }
1755 }
1756 }
1757 fn transaction(&mut self) -> bool {
1758 let thread_id = format!("{:?}", thread::current().id());
1759 let key = format!("{}{}", self.default, thread_id);
1760
1761 if TRANSACTION_MANAGER.is_in_transaction(&key) {
1762 return TRANSACTION_MANAGER.increment_depth(&key);
1763 }
1764
1765 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1766 Ok(e) => e,
1767 Err(err) => {
1768 error!("transaction 获取连接超时: {err}");
1769 return false;
1770 }
1771 };
1772
1773 if !TRANSACTION_MANAGER.start(&key, conn) {
1774 return false;
1775 }
1776
1777 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
1778 let (state, _) = self.query(sql);
1779 if !state {
1780 TRANSACTION_MANAGER.remove(&key, &thread_id);
1781 }
1782 state
1783 }
1784
1785 fn commit(&mut self) -> bool {
1786 let thread_id = format!("{:?}", thread::current().id());
1787 let key = format!("{}{}", self.default, thread_id);
1788
1789 let depth = TRANSACTION_MANAGER.get_depth(&key);
1790 if depth > 1 {
1791 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1792 return true;
1793 }
1794
1795 let sql = "COMMIT";
1796 let (state, data) = self.query(sql);
1797
1798 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1799
1800 if !state {
1801 error!("提交事务失败: {data}");
1802 }
1803 state
1804 }
1805
1806 fn rollback(&mut self) -> bool {
1807 let thread_id = format!("{:?}", thread::current().id());
1808 let key = format!("{}{}", self.default, thread_id);
1809
1810 let depth = TRANSACTION_MANAGER.get_depth(&key);
1811 if depth > 1 {
1812 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1813 return true;
1814 }
1815
1816 let sql = "ROLLBACK";
1817 let (state, data) = self.query(sql);
1818
1819 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1820
1821 if !state {
1822 error!("回滚失败: {data}");
1823 }
1824 state
1825 }
1826
1827 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1828 let (state, data) = self.query(sql);
1829 match state {
1830 true => Ok(data),
1831 false => Err(data.to_string()),
1832 }
1833 }
1834
1835 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1836 let (state, data) = self.execute(sql);
1837 match state {
1838 true => Ok(data),
1839 false => Err(data.to_string()),
1840 }
1841 }
1842
1843 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1844 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1845 self
1846 }
1847 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1848 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1849 self
1850 }
1851
1852 fn buildsql(&mut self) -> String {
1853 self.fetch_sql();
1854 let sql = self.select().to_string();
1855 format!("( {} ) `{}`", sql, self.params.table)
1856 }
1857
1858 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1859 for field in fields.clone() {
1860 if field.contains(format!("{}.", self.params.table).as_str()) {
1861 self.params.fields[field] = field.into();
1862 } else {
1863 self.params.fields[field] =
1864 format!("{field} as {}", field.replace(".", "_")).into();
1865 }
1866 }
1867 self
1868 }
1869
1870 fn join(
1871 &mut self,
1872 main_table: &str,
1873 main_fields: &str,
1874 right_table: &str,
1875 right_fields: &str,
1876 ) -> &mut Self {
1877 let main_table = if main_table.is_empty() {
1878 self.params.table.clone()
1879 } else {
1880 main_table.to_string()
1881 };
1882 self.params.join_table = right_table.to_string();
1883 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1884 self
1885 }
1886
1887 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1888 let main_fields = if main_fields.is_empty() {
1889 "id"
1890 } else {
1891 main_fields
1892 };
1893 let second_fields = if second_fields.is_empty() {
1894 self.params.table.clone()
1895 } else {
1896 second_fields.to_string().clone()
1897 };
1898 let sec_table_name = format!("{}{}", table, "_2");
1899 let second_table = format!("{} {}", table, sec_table_name.clone());
1900 self.params.join_table = sec_table_name.clone();
1901 self.params.join.push(format!(
1902 " INNER JOIN {} ON {}.{} = {}.{}",
1903 second_table, self.params.table, main_fields, sec_table_name, second_fields
1904 ));
1905 self
1906 }
1907}