1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
18#[cfg(any(feature = "default", feature = "db-mysql"))]
19#[derive(Clone, Debug)]
20pub struct Mysql {
21 pub connection: Connection,
23 pub default: String,
25 pub params: Params,
26 pub pool: Pool,
27}
28
29impl Mysql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let pool_cfg = &connection.pool;
32 let pool_opts = PoolOpts::default()
33 .with_constraints(
34 PoolConstraints::new(
35 pool_cfg.min_connections as usize,
36 pool_cfg.max_connections as usize,
37 )
38 .unwrap_or_default(),
39 )
40 .with_reset_connection(true);
41
42 let opts = OptsBuilder::new()
43 .pool_opts(pool_opts)
44 .ip_or_hostname(Some(connection.hostname.clone()))
45 .tcp_port(connection.hostport.parse().unwrap_or(3306))
46 .user(Some(connection.username.clone()))
47 .pass(Some(connection.userpass.clone()))
48 .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49 .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50 .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51 .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52 .db_name(Some(connection.database.clone()));
53
54 match Pool::new(opts) {
55 Ok(pool) => Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("mysql"),
59 pool,
60 }),
61 Err(e) => {
62 error!("connect: {e}");
63 Err(e.to_string())
64 }
65 }
66 }
67 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68 if sql.contains("INSERT") {
69 let rows = text.affected_rows();
70 if rows > 1 {
71 if self.params.autoinc {
72 let row = rows;
73 let start_row = text.last_insert_id().unwrap_or(0);
74 let end_row = start_row + row;
75
76 let mut ids = array![];
77 for item in start_row..end_row {
78 let _ = ids.push(item);
79 }
80 (true, ids)
81 } else {
82 (true, JsonValue::from(rows))
83 }
84 } else {
85 (true, JsonValue::from(text.last_insert_id()))
86 }
87 } else {
88 (true, JsonValue::from(text.affected_rows()))
89 }
90 }
91 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92 let mut list = array![];
93 let mut index = 0;
94 text.for_each(|row| {
95 match row {
96 Ok(r) => {
97 let mut data = object! {};
98 for (index, item) in r.columns().iter().enumerate() {
99 let field = item.name_str();
100 let field = field.to_string();
101 let field = field.as_str();
102
103 data[field] = match item.column_type() {
104 ColumnType::MYSQL_TYPE_TINY => {
105 let t = r.get::<bool, _>(index).unwrap_or(true);
106 JsonValue::from(t)
107 }
108 ColumnType::MYSQL_TYPE_FLOAT
109 | ColumnType::MYSQL_TYPE_NEWDECIMAL
110 | ColumnType::MYSQL_TYPE_DOUBLE => {
111 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112 if t == NULL {
113 JsonValue::from(0.0)
114 } else {
115 match r.get::<f64, _>(index) {
116 None => JsonValue::from(0.0),
117 Some(t) => JsonValue::from(t),
118 }
119 }
120 }
121 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122 let t = r.index(field).clone();
123 if t == NULL {
124 JsonValue::from(0)
125 } else {
126 let t = r.get::<i64, _>(index).unwrap_or(0);
127 JsonValue::from(t)
128 }
129 }
130 ColumnType::MYSQL_TYPE_NULL => {
131 let t = r.index(field).clone();
132 if t == NULL {
133 JsonValue::from("".to_string())
134 } else {
135 let t = r.get::<String, _>(index).unwrap_or("".to_string());
136 JsonValue::from(t)
137 }
138 }
139 ColumnType::MYSQL_TYPE_BLOB => {
140 let t = r.index(field).clone();
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r
145 .get::<mysql::Value, _>(index)
146 .unwrap_or("".to_string().into());
147 if t == NULL {
148 JsonValue::from("".to_string())
149 } else {
150 let t = r.get::<String, _>(index).unwrap_or("".to_string());
151 JsonValue::from(t)
152 }
153 }
154 }
155 ColumnType::MYSQL_TYPE_VAR_STRING => {
156 let t = r
157 .get::<mysql::Value, _>(index)
158 .unwrap_or("".to_string().into());
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_STRING => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let t = r.get::<String, _>(index).unwrap_or("".to_string());
172 JsonValue::from(t)
173 }
174 }
175 ColumnType::MYSQL_TYPE_DATE
176 | ColumnType::MYSQL_TYPE_DATETIME
177 | ColumnType::MYSQL_TYPE_LONG_BLOB
178 | ColumnType::MYSQL_TYPE_TIMESTAMP
179 | ColumnType::MYSQL_TYPE_TIME => {
180 let t = r.index(field).clone();
181 if t == NULL {
182 JsonValue::from("".to_string())
183 } else {
184 let t = r.get::<String, _>(index).unwrap_or("".to_string());
185 JsonValue::from(t)
186 }
187 }
188 ColumnType::MYSQL_TYPE_GEOMETRY => {
189 let t = r.index(field).clone();
190 if t == NULL {
191 JsonValue::from("".to_string())
192 } else {
193 let res = match r.index(field).clone() {
194 Value::Bytes(e) => e,
195 _ => vec![],
196 };
197 if res.len() >= 25 {
198 let x = f64::from_le_bytes(
199 res[9..17].try_into().unwrap_or([0u8; 8]),
200 );
201 let y = f64::from_le_bytes(
202 res[17..25].try_into().unwrap_or([0u8; 8]),
203 );
204 JsonValue::from(format!("{x},{y}"))
205 } else {
206 JsonValue::from("".to_string())
207 }
208 }
209 }
210 _ => {
211 let t = r.index(field).clone();
212 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
213 JsonValue::from("".to_string())
214 }
215 };
216 }
217 let _ = list.push(data);
218 }
219 Err(e) => {
220 error!("err: {e} \r\n {sql}");
221 }
222 }
223 index += 1;
224 });
225 (true, list)
226 }
227 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
228 let thread_id = format!("{:?}", thread::current().id());
229 let key = format!("{}{}", self.default, thread_id);
230
231 let debug = self.connection.debug;
232 let params_json = self.params.json.clone();
233 let table_name = self.params.table.clone();
234
235 let is_system_query = sql.contains("INFORMATION_SCHEMA")
236 || sql.contains("information_schema")
237 || sql.starts_with("START TRANSACTION")
238 || sql.starts_with("COMMIT")
239 || sql.starts_with("ROLLBACK")
240 || sql.starts_with("SHOW ");
241
242 let fields_list = if !is_system_query && !table_name.is_empty() {
243 self.table_info(&table_name)
244 } else {
245 object! {}
246 };
247
248 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
249
250 if !in_transaction {
251 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
252 Ok(e) => e,
253 Err(err) => {
254 error!("非事务 execute超时: {err}");
255 return (false, object! {});
256 }
257 };
258 let connection_id = db.connection_id();
259 return match db.query_iter(sql) {
260 Ok(e) => {
261 if debug {
262 info!("查询成功: {} {}", thread_id, sql);
263 }
264 self.query_handle(e, sql)
265 }
266 Err(e) => {
267 error!(
268 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
269 );
270 (false, JsonValue::from(e.to_string()))
271 }
272 };
273 }
274
275 match TRANSACTION_MANAGER.with_conn(&key, |db| {
276 let connection_id = db.connection_id();
277 match db.query_iter(sql) {
278 Ok(text) => {
279 if debug {
280 info!("查询成功: {} {}", thread_id, sql);
281 }
282 let mut list = array![];
283 text.for_each(|row| {
284 if let Ok(row) = row {
285 let mut item = object! {};
286 for column in row.columns_ref() {
287 let field = column.name_str().to_string();
288 let value = &row[column.name_str().as_ref()];
289 let column_type = column.column_type();
290 match column_type {
291 ColumnType::MYSQL_TYPE_VARCHAR
292 | ColumnType::MYSQL_TYPE_STRING
293 | ColumnType::MYSQL_TYPE_VAR_STRING
294 | ColumnType::MYSQL_TYPE_BLOB
295 | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
296 | ColumnType::MYSQL_TYPE_LONG_BLOB
297 | ColumnType::MYSQL_TYPE_TINY_BLOB => {
298 if *value == NULL {
299 item[field.as_str()] = "".into();
300 } else {
301 let data: String = mysql::from_value(value.clone());
302 if params_json.has_key(&field)
303 || fields_list[&field]["type"]
304 .to_string()
305 .contains("json")
306 {
307 item[field.as_str()] = match json::parse(&data) {
308 Ok(e) => e,
309 Err(_) => data.into(),
310 };
311 } else {
312 item[field.as_str()] = data.into();
313 }
314 }
315 }
316 ColumnType::MYSQL_TYPE_TINY
317 | ColumnType::MYSQL_TYPE_SHORT
318 | ColumnType::MYSQL_TYPE_LONG
319 | ColumnType::MYSQL_TYPE_INT24
320 | ColumnType::MYSQL_TYPE_LONGLONG => {
321 if *value == NULL {
322 item[field.as_str()] = 0.into();
323 } else {
324 let data: i64 = mysql::from_value(value.clone());
325 item[field.as_str()] = data.into();
326 }
327 }
328 ColumnType::MYSQL_TYPE_FLOAT
329 | ColumnType::MYSQL_TYPE_DOUBLE
330 | ColumnType::MYSQL_TYPE_DECIMAL
331 | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
332 if *value == NULL {
333 item[field.as_str()] = 0.0.into();
334 } else {
335 let data: f64 = mysql::from_value(value.clone());
336 item[field.as_str()] = data.into();
337 }
338 }
339 _ => {
340 if *value != NULL {
341 let data: String = mysql::from_value(value.clone());
342 item[field.as_str()] = data.into();
343 }
344 }
345 }
346 }
347 let _ = list.push(item);
348 }
349 });
350 (true, list)
351 }
352 Err(e) => {
353 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
354 (false, JsonValue::from(e.to_string()))
355 }
356 }
357 }) {
358 Some(result) => result,
359 None => {
360 error!("事务连接不存在: {key}");
361 (false, object! {})
362 }
363 }
364 }
365 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
366 let thread_id = format!("{:?}", thread::current().id());
367 let key = format!("{}{}", self.default, thread_id);
368
369 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
370
371 if !in_transaction {
372 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
373 Ok(e) => e,
374 Err(err) => {
375 error!("非事务: execute超时: {err}");
376 return (false, object! {});
377 }
378 };
379 return match db.exec_iter(sql, ()) {
380 Ok(e) => {
381 if self.connection.debug {
382 info!("提交成功: {} {}", thread_id, sql);
383 }
384 self.execute_cl(e, sql)
385 }
386 Err(e) => {
387 error!("非事务提交失败: {thread_id} {e} {sql}");
388 (false, JsonValue::from(e.to_string()))
389 }
390 };
391 }
392
393 if !TRANSACTION_MANAGER.acquire_table_lock(
394 &self.params.table,
395 &thread_id,
396 Duration::from_secs(30),
397 ) {
398 error!("获取表锁超时: {} {}", self.params.table, thread_id);
399 return (false, object! {"error": "table lock timeout"});
400 }
401
402 let is_insert = sql.contains("INSERT");
403 let autoinc = self.params.autoinc;
404 let debug = self.connection.debug;
405
406 match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
408 Ok(result) => {
409 let affected_rows = result.affected_rows();
410 let last_insert_id = result.last_insert_id();
411 (true, affected_rows, last_insert_id, None)
412 }
413 Err(e) => (false, 0, None, Some(e.to_string())),
414 }) {
415 Some((true, affected_rows, last_insert_id, _)) => {
416 if debug {
417 info!("提交成功: {} {}", thread_id, sql);
418 }
419 if is_insert {
420 if affected_rows > 1 {
421 if autoinc {
422 let start_row = last_insert_id.unwrap_or(0);
423 let end_row = start_row + affected_rows;
424 let mut ids = array![];
425 for item in start_row..end_row {
426 let _ = ids.push(item);
427 }
428 (true, ids)
429 } else {
430 (true, JsonValue::from(affected_rows))
431 }
432 } else {
433 (true, JsonValue::from(last_insert_id))
434 }
435 } else {
436 (true, JsonValue::from(affected_rows))
437 }
438 }
439 Some((false, _, _, Some(err))) => {
440 error!("事务提交失败: {thread_id} {err} {sql}");
441 (false, JsonValue::from(err))
442 }
443 _ => {
444 error!("事务连接不存在: {key}");
445 (false, object! {})
446 }
447 }
448 }
449}
450
451impl DbMode for Mysql {
452 fn database_tables(&mut self) -> JsonValue {
453 let sql = "SHOW TABLES".to_string();
454 match self.sql(sql.as_str()) {
455 Ok(e) => {
456 let mut list = vec![];
457 for item in e.members() {
458 for (_, value) in item.entries() {
459 list.push(value.clone());
460 }
461 }
462 list.into()
463 }
464 Err(_) => {
465 array![]
466 }
467 }
468 }
469
470 fn database_create(&mut self, name: &str) -> bool {
471 let sql = format!("CREATE DATABASE {name}");
472
473 let (state, data) = self.execute(sql.as_str());
474 match state {
475 true => data.as_bool().unwrap_or(false),
476 false => {
477 error!("创建数据库失败: {data:?}");
478 false
479 }
480 }
481 }
482
483 fn truncate(&mut self, table: &str) -> bool {
484 let sql = format!("TRUNCATE TABLE {table}");
485 let (state, _) = self.execute(sql.as_str());
486 state
487 }
488}
489
490impl Mode for Mysql {
491 fn table_create(&mut self, options: TableOptions) -> JsonValue {
492 let mut sql = String::new();
493 let mut unique_fields = String::new();
495 let mut unique_name = String::new();
496 let mut unique = String::new();
497 for item in options.table_unique.iter() {
498 if unique_fields.is_empty() {
499 unique_fields = format!("`{item}`");
500 unique_name = format!("{}_unique_{}", options.table_name, item);
501 } else {
502 unique_fields = format!("{unique_fields},`{item}`");
503 unique_name = format!("{unique_name}_{item}");
504 }
505 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
506 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
507 }
508
509 let mut index = String::new();
511 for row in options.table_index.iter() {
512 let mut index_fields = String::new();
513 let mut index_name = String::new();
514 for item in row.iter() {
515 if index_fields.is_empty() {
516 index_fields = format!("`{item}`");
517 index_name = format!("{}_index_{}", options.table_name, item);
518 } else {
519 index_fields = format!("{index_fields},`{item}`");
520 index_name = format!("{index_name}_{item}");
521 }
522 }
523 if index.is_empty() {
524 index = format!("INDEX `{index_name}` ({index_fields})");
525 } else {
526 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
527 }
528 }
529 if index.replace(",", "").is_empty() {
530 index = index.replace(",", "");
531 }
532
533 for (name, field) in options.table_fields.entries() {
534 let row = br_fields::field("mysql", name, field.clone());
535 sql = format!("{sql} {row},\r\n");
536 }
537
538 if !unique.is_empty() {
539 sql = sql.trim_end_matches(",\r\n").to_string();
540 sql = format!("{sql},\r\n{unique}");
541 }
542 if !index.is_empty() {
543 sql = sql.trim_end_matches(",\r\n").to_string();
544 sql = format!("{sql},\r\n{index}");
545 }
546 let collate = format!("{}_bin", self.connection.charset.str());
547
548 let partition = if options.table_partition {
550 sql = format!(
551 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
552 sql,
553 options.table_key,
554 options.table_partition_columns[0].clone()
555 );
556 let temp_head = format!(
557 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
558 options.table_partition_columns[0].clone()
559 );
560 let mut partition_array = vec![];
561 let mut count = 0;
562 for member in options.table_partition_columns[1].members() {
563 let temp = format!(
564 "PARTITION p{} VALUES LESS THAN ('{}')",
565 count.clone(),
566 member.clone()
567 );
568 count += 1;
569 partition_array.push(temp.clone());
570 }
571 let temp_body = partition_array.join(",\r\n");
572 let temp_end = format!(
573 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
574 count.clone()
575 );
576 format!("{temp_head}{temp_body}{temp_end}")
577 } else {
578 sql = if sql.trim_end().ends_with(",") {
579 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
580 } else {
581 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
582 };
583 "".to_string()
584 };
585 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
586
587 if self.params.sql {
588 return JsonValue::from(sql);
589 }
590
591 let (state, data) = self.execute(sql.as_str());
592
593 match state {
594 true => JsonValue::from(state),
595 false => {
596 info!("创建错误: {data}");
597 JsonValue::from(state)
598 }
599 }
600 }
601
602 fn table_update(&mut self, options: TableOptions) -> JsonValue {
603 let table_fields_guard = match TABLE_FIELDS.lock() {
604 Ok(g) => g,
605 Err(e) => e.into_inner(),
606 };
607 if table_fields_guard
608 .get(&format!("{}{}", self.default, options.table_name))
609 .is_some()
610 {
611 drop(table_fields_guard);
612 let mut table_fields_guard = match TABLE_FIELDS.lock() {
613 Ok(g) => g,
614 Err(e) => e.into_inner(),
615 };
616 table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
617 } else {
618 drop(table_fields_guard);
619 }
620 let mut sql = vec![];
621 let fields_list = self.table_info(&options.table_name);
622 let mut put = vec![];
623 let mut add = vec![];
624 let mut del = vec![];
625 for (key, _) in fields_list.entries() {
626 if options.table_fields[key].is_empty() {
627 del.push(key);
628 }
629 }
630 for (name, field) in options.table_fields.entries() {
631 if !fields_list[name].is_empty() {
632 let old_comment = fields_list[name]["comment"].to_string();
633 let new_comment = br_fields::field("mysql", name, field.clone());
634 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
635 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
636 if old_comment == new_comment_text {
637 continue;
638 }
639 put.push(name);
640 } else {
641 add.push(name);
642 }
643 }
644
645 for name in add.iter() {
646 let name = name.to_string();
647 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
648 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
649 }
650 for name in del.iter() {
651 sql.push(format!(
652 "ALTER TABLE {} DROP `{name}`;\r\n",
653 options.table_name
654 ));
655 }
656 for name in put.iter() {
657 let name = name.to_string();
658 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
659 sql.push(format!(
660 "ALTER TABLE {} CHANGE `{}` {};\r\n",
661 options.table_name, name, row
662 ));
663 }
664
665 let (_, index_list) =
666 self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
667 let (_, pk_list) = self.query(
669 format!(
670 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
671 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
672 self.connection.database, options.table_name
673 )
674 .as_str(),
675 );
676 let mut pk_vec = vec![];
677 for member in pk_list.members() {
678 pk_vec.push(member["COLUMN_NAME"].to_string());
679 }
680
681 let mut unique_new = vec![];
682 let mut index_new = vec![];
683 for item in index_list.members() {
684 let key_name = item["Key_name"].as_str().unwrap_or("");
685 let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
686
687 if non_unique == 0
688 && (key_name.contains(format!("{}_unique", options.table_name).as_str())
689 || key_name.contains("unique"))
690 {
691 unique_new.push(key_name.to_string());
692 continue;
693 }
694 if non_unique == 1
695 && (key_name.contains(format!("{}_index", options.table_name).as_str())
696 || key_name.contains("index"))
697 {
698 index_new.push(key_name.to_string());
699 continue;
700 }
701 }
702
703 let mut unique_fields = String::new();
704 let mut unique_name = String::new();
705 for item in options.table_unique.iter() {
706 if unique_fields.is_empty() {
707 unique_fields = format!("`{item}`");
708 unique_name = format!("{}_unique_{}", options.table_name, item);
709 } else {
710 unique_fields = format!("{unique_fields},`{item}`");
711 unique_name = format!("{unique_name}_{item}");
712 }
713 }
714 if !unique_name.is_empty() {
715 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
716 unique_name = format!("unique_{md5}");
717 for item in &unique_new {
718 if unique_name != *item {
719 sql.push(format!(
720 "alter table {} drop index {};\r\n",
721 options.table_name, item
722 ));
723 }
724 }
725 if !unique_new.contains(&unique_name) {
726 sql.push(format!(
727 "CREATE UNIQUE index {} on {} ({});\r\n",
728 unique_name, options.table_name, unique_fields
729 ));
730 }
731 }
732
733 let mut index_list = vec![];
734 for row in options.table_index.iter() {
736 let mut index_fields = String::new();
737 let mut index_name = String::new();
738 for item in row {
739 if index_fields.is_empty() {
740 index_fields = item.to_string();
741 index_name = format!("{}_index_{}", options.table_name, item);
742 } else {
743 index_fields = format!("{index_fields},{item}");
744 index_name = format!("{index_name}_{item}");
745 }
746 }
747 index_list.push(index_name.clone());
748 if !index_new.contains(&index_name.clone()) {
749 sql.push(format!(
750 "CREATE INDEX {} on {} ({});\r\n",
751 index_name, options.table_name, index_fields
752 ));
753 }
754 }
755
756 for item in index_new {
757 if !index_list.contains(&item.to_string()) {
758 sql.push(format!(
759 "DROP INDEX {} ON {};\r\n",
760 item.clone(),
761 options.table_name
762 ));
763 }
764 }
765
766 if options.table_partition {
768 if !pk_vec.contains(&options.table_key.to_string().clone())
770 || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
771 {
772 let pk = format!(
773 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
774 options.table_name,
775 options.table_key,
776 options.table_partition_columns[0].clone()
777 );
778 sql.push(pk);
779 let temp_head = format!(
780 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
781 options.table_name,
782 options.table_partition_columns[0].clone()
783 );
784 let mut partition_array = vec![];
785 let mut count = 0;
786 for member in options.table_partition_columns[1].members() {
787 let temp = format!(
788 "PARTITION p{} VALUES LESS THAN ('{}')",
789 count.clone(),
790 member.clone()
791 );
792 count += 1;
793 partition_array.push(temp.clone());
794 }
795 let temp_body = partition_array.join(",\r\n");
796 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
797 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
798 }
799 } else if pk_vec.len() != 1 {
800 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
801 sql.push(rm_partition);
802 let pk = format!(
803 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
804 options.table_name, options.table_key
805 );
806 sql.push(pk);
807 };
808
809 if self.params.sql {
810 return JsonValue::from(sql.join(""));
811 }
812
813 if sql.is_empty() {
814 return JsonValue::from(-1);
815 }
816
817 for item in sql.iter() {
818 let (state, res) = self.execute(item.as_str());
819 match state {
820 true => {}
821 false => {
822 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
823 return JsonValue::from(0);
824 }
825 }
826 }
827 JsonValue::from(1)
828 }
829
830 fn table_info(&mut self, table: &str) -> JsonValue {
831 let table_fields_guard = match TABLE_FIELDS.lock() {
832 Ok(g) => g,
833 Err(e) => e.into_inner(),
834 };
835 if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
836 return cached.clone();
837 }
838 drop(table_fields_guard);
839 let sql = format!(
840 "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'"
841 );
842 let (state, data) = self.query(sql.as_str());
843 let mut list = object! {};
844 if state {
845 for item in data.members() {
846 if item["TABLE_SCHEMA"] != self.connection.database {
847 continue;
848 }
849 let mut row = object! {};
850 row["field"] = item["COLUMN_NAME"].clone();
851 row["comment"] = item["COLUMN_COMMENT"].clone();
852 row["type"] = item["COLUMN_TYPE"].clone();
853 if let Some(field_name) = row["field"].as_str() {
854 list[field_name] = row.clone();
855 }
856 }
857 let mut table_fields_guard = match TABLE_FIELDS.lock() {
858 Ok(g) => g,
859 Err(e) => e.into_inner(),
860 };
861 table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
862 list
863 } else {
864 list
865 }
866 }
867
868 fn table_is_exist(&mut self, name: &str) -> bool {
869 let sql =
870 format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
871 let (state, data) = self.query(sql.as_str());
872 match state {
873 true => {
874 for item in data.members() {
875 if item["TABLE_NAME"] == name
876 && item["TABLE_SCHEMA"] == self.connection.database
877 {
878 return true;
879 }
880 }
881 false
882 }
883 false => false,
884 }
885 }
886
887 fn table(&mut self, name: &str) -> &mut Mysql {
888 self.params = Params::default(self.connection.mode.str().as_str());
889 let table_name = format!("{}{}", self.connection.prefix, name);
890 if !super::sql_safety::validate_table_name(&table_name) {
891 error!("Invalid table name: {}", name);
892 }
893 self.params.table = table_name.clone();
894 self.params.join_table = table_name;
895 self
896 }
897
898 fn change_table(&mut self, name: &str) -> &mut Self {
899 self.params.join_table = name.to_string();
900 self
901 }
902
903 fn autoinc(&mut self) -> &mut Self {
904 self.params.autoinc = true;
905 self
906 }
907
908 fn timestamps(&mut self) -> &mut Self {
909 self.params.timestamps = true;
910 self
911 }
912
913 fn fetch_sql(&mut self) -> &mut Self {
914 self.params.sql = true;
915 self
916 }
917
918 fn order(&mut self, field: &str, by: bool) -> &mut Self {
919 self.params.order[field] = {
920 if by {
921 "DESC"
922 } else {
923 "ASC"
924 }
925 }
926 .into();
927 self
928 }
929
930 fn group(&mut self, field: &str) -> &mut Self {
931 let fields: Vec<&str> = field.split(",").collect();
932 for field in fields.iter() {
933 let field = field.to_string();
934 self.params.group[field.as_str()] = field.clone().into();
935 if !self.params.fields.has_key(field.as_str()) {
936 self.params.fields[field.as_str()] = field.clone().into();
937 }
938 }
939
940 self
941 }
942
943 fn distinct(&mut self) -> &mut Self {
944 self.params.distinct = true;
945 self
946 }
947
948 fn json(&mut self, field: &str) -> &mut Self {
949 let list: Vec<&str> = field.split(",").collect();
950 for item in list.iter() {
951 self.params.json[item.to_string().as_str()] = item.to_string().into();
952 }
953 self
954 }
955
956 fn location(&mut self, field: &str) -> &mut Self {
957 let list: Vec<&str> = field.split(",").collect();
958 for item in list.iter() {
959 self.params.location[item.to_string().as_str()] = item.to_string().into();
960 }
961 self
962 }
963
964 fn field(&mut self, field: &str) -> &mut Self {
965 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
966 let join_table = if self.params.join_table.is_empty() {
967 self.params.table.clone()
968 } else {
969 self.params.join_table.clone()
970 };
971 for item in list.iter() {
972 if item.contains(" as ") {
973 let text = item.split(" as ").collect::<Vec<&str>>();
974 if text[0].contains("count(") {
975 self.params.fields[item.to_string().as_str()] =
976 format!("{} as {}", text[0], text[1]).into();
977 } else {
978 self.params.fields[item.to_string().as_str()] =
979 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
980 }
981 } else {
982 self.params.fields[item.to_string().as_str()] =
983 format!("{join_table}.`{item}`").into();
984 }
985 }
986 self
987 }
988
989 fn field_raw(&mut self, expr: &str) -> &mut Self {
990 self.params.fields[expr] = expr.into();
991 self
992 }
993
994 fn hidden(&mut self, name: &str) -> &mut Self {
995 let hidden: Vec<&str> = name.split(",").collect();
996
997 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
998
999 let mut data = array![];
1000 for item in fields_list.members() {
1001 let _ = data.push(object! {
1002 "name":item["COLUMN_NAME"].as_str().unwrap_or("")
1003 });
1004 }
1005
1006 for item in data.members() {
1007 let name = item["name"].as_str().unwrap_or("");
1008 if !hidden.contains(&name) {
1009 self.params.fields[name] = name.into();
1010 }
1011 }
1012 self
1013 }
1014
1015 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1016 for f in field.split('|') {
1017 if !super::sql_safety::validate_field_name(f) {
1018 error!("Invalid field name: {}", f);
1019 }
1020 }
1021 if !super::sql_safety::validate_compare_orator(compare) {
1022 error!("Invalid compare operator: {}", compare);
1023 }
1024 let table_fields = self.table_info(&self.params.table.clone());
1025 let join_table = if self.params.join_table.is_empty() {
1026 self.params.table.clone()
1027 } else {
1028 self.params.join_table.clone()
1029 };
1030 if value.is_boolean() {
1031 if value.as_bool().unwrap_or(false) {
1032 value = 1.into();
1033 } else {
1034 value = 0.into();
1035 }
1036 }
1037 match compare {
1038 "between" => {
1039 self.params.where_and.push(format!(
1040 "{join_table}.`{field}` between '{}' AND '{}'",
1041 value[0], value[1]
1042 ));
1043 }
1044 "location" => {
1045 let comment = table_fields[field]["comment"].to_string();
1046 let srid = comment
1047 .split("|")
1048 .collect::<Vec<&str>>()
1049 .last()
1050 .unwrap_or(&"0")
1051 .to_string();
1052
1053 let field_name = format!(
1054 "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1055 value[0], value[1], value[4]
1056 );
1057 self.params.fields[&field_name.clone()] = field_name.clone().into();
1058 let location = format!(
1061 "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1062 value[0], value[1], value[2], value[3]
1063 );
1064 self.params.where_and.push(location);
1065 }
1066 "set" => {
1067 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1068 let mut wheredata = vec![];
1069 for item in list.iter() {
1070 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1071 }
1072 self.params
1073 .where_and
1074 .push(format!("({})", wheredata.join(" or ")));
1075 }
1076 "notin" => {
1077 let mut text = String::new();
1078 for item in value.members() {
1079 text = format!("{text},'{item}'");
1080 }
1081 text = text.trim_start_matches(",").into();
1082 self.params
1083 .where_and
1084 .push(format!("{join_table}.`{field}` not in ({text})"));
1085 }
1086 "is" => {
1087 self.params
1088 .where_and
1089 .push(format!("{join_table}.`{field}` is {value}"));
1090 }
1091 "isnot" => {
1092 self.params
1093 .where_and
1094 .push(format!("{join_table}.`{field}` is not {value}"));
1095 }
1096 "notlike" => {
1097 self.params
1098 .where_and
1099 .push(format!("{join_table}.`{field}` not like '{value}'"));
1100 }
1101 "in" => {
1102 let mut text = String::new();
1103 if value.is_array() {
1104 for item in value.members() {
1105 text = format!("{text},'{item}'");
1106 }
1107 } else if value.is_null() {
1108 text = format!("{text},null");
1109 } else {
1110 let value = value.as_str().unwrap_or("");
1111
1112 let value: Vec<&str> = value.split(",").collect();
1113 for item in value.iter() {
1114 text = format!("{text},'{item}'");
1115 }
1116 }
1117 text = text.trim_start_matches(",").into();
1118
1119 self.params
1120 .where_and
1121 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1122 }
1123 _ => {
1124 self.params
1125 .where_and
1126 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1127 }
1128 }
1129 self
1130 }
1131
1132 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1133 for f in field.split('|') {
1134 if !super::sql_safety::validate_field_name(f) {
1135 error!("Invalid field name: {}", f);
1136 }
1137 }
1138 if !super::sql_safety::validate_compare_orator(compare) {
1139 error!("Invalid compare operator: {}", compare);
1140 }
1141 let join_table = if self.params.join_table.is_empty() {
1142 self.params.table.clone()
1143 } else {
1144 self.params.join_table.clone()
1145 };
1146
1147 if value.is_boolean() {
1148 if value.as_bool().unwrap_or(false) {
1149 value = 1.into();
1150 } else {
1151 value = 0.into();
1152 }
1153 }
1154
1155 match compare {
1156 "between" => {
1157 self.params.where_or.push(format!(
1158 "{}.`{}` between '{}' AND '{}'",
1159 join_table, field, value[0], value[1]
1160 ));
1161 }
1162 "set" => {
1163 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1164 let mut wheredata = vec![];
1165 for item in list.iter() {
1166 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1167 }
1168 self.params
1169 .where_or
1170 .push(format!("({})", wheredata.join(" or ")));
1171 }
1172 "notin" => {
1173 let mut text = String::new();
1174 for item in value.members() {
1175 text = format!("{text},'{item}'");
1176 }
1177 text = text.trim_start_matches(",").into();
1178 self.params
1179 .where_or
1180 .push(format!("{join_table}.`{field}` not in ({text})"));
1181 }
1182 "is" => {
1183 self.params
1184 .where_or
1185 .push(format!("{join_table}.`{field}` is {value}"));
1186 }
1187 "isnot" => {
1188 self.params
1189 .where_or
1190 .push(format!("{join_table}.`{field}` IS NOT {value}"));
1191 }
1192 "in" => {
1193 let mut text = String::new();
1194 if value.is_array() {
1195 for item in value.members() {
1196 text = format!("{text},'{item}'");
1197 }
1198 } else {
1199 let value = value.as_str().unwrap_or("");
1200 let value: Vec<&str> = value.split(",").collect();
1201 for item in value.iter() {
1202 text = format!("{text},'{item}'");
1203 }
1204 }
1205 text = text.trim_start_matches(",").into();
1206 self.params
1207 .where_or
1208 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1209 }
1210 _ => {
1211 if field.contains(".") {
1212 self.params
1213 .where_or
1214 .push(format!("{field} {compare} '{value}'"));
1215 } else {
1216 self.params
1217 .where_or
1218 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1219 }
1220 }
1221 }
1222 self
1223 }
1224
1225 fn where_raw(&mut self, expr: &str) -> &mut Self {
1226 self.params.where_and.push(expr.to_string());
1227 self
1228 }
1229
1230 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1231 self.params
1232 .where_and
1233 .push(format!("`{field}` IN ({sub_sql})"));
1234 self
1235 }
1236
1237 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1238 self.params
1239 .where_and
1240 .push(format!("`{field}` NOT IN ({sub_sql})"));
1241 self
1242 }
1243
1244 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1245 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1246 self
1247 }
1248
1249 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1250 self.params
1251 .where_and
1252 .push(format!("NOT EXISTS ({sub_sql})"));
1253 self
1254 }
1255
1256 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1257 self.params.where_column = format!(
1258 "{}.`{}` {} {}.`{}`",
1259 self.params.table, field_a, compare, self.params.table, field_b
1260 );
1261 self
1262 }
1263
1264 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1265 self.params
1266 .update_column
1267 .push(format!("{field_a} = {compare}"));
1268 self
1269 }
1270
1271 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1272 self.params.page = page;
1273 self.params.limit = limit;
1274 self
1275 }
1276
1277 fn limit(&mut self, count: i32) -> &mut Self {
1278 self.params.limit_only = count;
1279 self
1280 }
1281
1282 fn column(&mut self, field: &str) -> JsonValue {
1283 self.field(field);
1284 let sql = self.params.select_sql();
1285
1286 if self.params.sql {
1287 return JsonValue::from(sql);
1288 }
1289 let (state, data) = self.query(sql.as_str());
1290 match state {
1291 true => {
1292 let mut list = array![];
1293 for item in data.members() {
1294 if self.params.json[field].is_empty() {
1295 let _ = list.push(item[field].clone());
1296 } else {
1297 let data =
1298 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1299 let _ = list.push(data);
1300 }
1301 }
1302 list
1303 }
1304 false => {
1305 array![]
1306 }
1307 }
1308 }
1309
1310 fn count(&mut self) -> JsonValue {
1311 if !self.params.fields.is_empty() {
1312 self.group(format!("{}.id", self.params.table).as_str());
1313 }
1314 self.params.fields["count"] = "count(*) as count".into();
1315 let sql = self.params.select_sql();
1316 if self.params.sql {
1317 return JsonValue::from(sql.clone());
1318 }
1319 let (state, data) = self.query(sql.as_str());
1320 if state {
1321 if data.is_empty() {
1322 JsonValue::from(0)
1323 } else {
1324 data[0]["count"].clone()
1325 }
1326 } else {
1327 JsonValue::from(0)
1328 }
1329 }
1330
1331 fn max(&mut self, field: &str) -> JsonValue {
1332 self.params.fields[field] = format!("max({field}) as {field}").into();
1333 let sql = self.params.select_sql();
1334 if self.params.sql {
1335 return JsonValue::from(sql.clone());
1336 }
1337 let (state, data) = self.query(sql.as_str());
1338 if state {
1339 if data.len() > 1 {
1340 return data.clone();
1341 }
1342 data[0][field].clone()
1343 } else {
1344 JsonValue::from(0)
1345 }
1346 }
1347
1348 fn min(&mut self, field: &str) -> JsonValue {
1349 self.params.fields[field] = format!("min({field}) as {field}").into();
1350 let sql = self.params.select_sql();
1351 if self.params.sql {
1352 return JsonValue::from(sql.clone());
1353 }
1354 let (state, data) = self.query(sql.as_str());
1355 if state {
1356 if data.len() > 1 {
1357 return data;
1358 }
1359 data[0][field].clone()
1360 } else {
1361 JsonValue::from(0)
1362 }
1363 }
1364
1365 fn sum(&mut self, field: &str) -> JsonValue {
1366 self.params.fields[field] = format!("sum({field}) as {field}").into();
1367 let sql = self.params.select_sql();
1368 if self.params.sql {
1369 return JsonValue::from(sql.clone());
1370 }
1371 let (state, data) = self.query(sql.as_str());
1372 match state {
1373 true => {
1374 if data.len() > 1 {
1375 return data;
1376 }
1377 data[0][field].clone()
1378 }
1379 false => JsonValue::from(0),
1380 }
1381 }
1382
1383 fn avg(&mut self, field: &str) -> JsonValue {
1384 self.params.fields[field] = format!("avg({field}) as {field}").into();
1385 let sql = self.params.select_sql();
1386 if self.params.sql {
1387 return JsonValue::from(sql.clone());
1388 }
1389 let (state, data) = self.query(sql.as_str());
1390 if state {
1391 if data.len() > 1 {
1392 return data;
1393 }
1394 data[0][field].clone()
1395 } else {
1396 JsonValue::from(0)
1397 }
1398 }
1399
1400 fn having(&mut self, expr: &str) -> &mut Self {
1401 self.params.having.push(expr.to_string());
1402 self
1403 }
1404
1405 fn select(&mut self) -> JsonValue {
1406 let sql = self.params.select_sql();
1407 if self.params.sql {
1408 return JsonValue::from(sql.clone());
1409 }
1410 let (state, mut data) = self.query(sql.as_str());
1411 match state {
1412 true => {
1413 for (field, _) in self.params.json.entries() {
1414 for item in data.members_mut() {
1415 if !item[field].is_empty() {
1416 let json = item[field].to_string();
1417 item[field] = match json::parse(&json) {
1418 Ok(e) => e,
1419 Err(_) => JsonValue::from(json),
1420 };
1421 }
1422 }
1423 }
1424 data.clone()
1425 }
1426 false => array![],
1427 }
1428 }
1429
1430 fn find(&mut self) -> JsonValue {
1431 self.params.page = 1;
1432 self.params.limit = 1;
1433 let sql = self.params.select_sql();
1434 if self.params.sql {
1435 return JsonValue::from(sql.clone());
1436 }
1437 let (state, mut data) = self.query(sql.as_str());
1438 match state {
1439 true => {
1440 if data.is_empty() {
1441 return object! {};
1442 }
1443 for (field, _) in self.params.json.entries() {
1444 if !data[0][field].is_empty() {
1445 let json = data[0][field].to_string();
1446 let json = json::parse(&json).unwrap_or(array![]);
1447 data[0][field] = json;
1448 } else {
1449 data[0][field] = array![];
1450 }
1451 }
1452 data[0].clone()
1453 }
1454 false => {
1455 error!("find失败: {data:?}");
1456 object! {}
1457 }
1458 }
1459 }
1460
1461 fn value(&mut self, field: &str) -> JsonValue {
1462 self.params.fields = object! {};
1463 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1464 self.params.page = 1;
1465 self.params.limit = 1;
1466 let sql = self.params.select_sql();
1467 if self.params.sql {
1468 return JsonValue::from(sql.clone());
1469 }
1470 let (state, mut data) = self.query(sql.as_str());
1471 match state {
1472 true => {
1473 for (field, _) in self.params.json.entries() {
1474 if !data[0][field].is_empty() {
1475 let json = data[0][field].to_string();
1476 let json = json::parse(&json).unwrap_or(array![]);
1477 data[0][field] = json;
1478 } else {
1479 data[0][field] = array![];
1480 }
1481 }
1482 data[0][field].clone()
1483 }
1484 false => {
1485 if self.connection.debug {
1486 info!("{data:?}");
1487 }
1488 JsonValue::Null
1489 }
1490 }
1491 }
1492 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1493 let fields_list = self.table_info(&self.params.table.clone());
1494
1495 let mut fields = vec![];
1496 let mut values = vec![];
1497 if !self.params.autoinc && data["id"].is_empty() {
1498 let thread_id = format!("{:?}", std::thread::current().id());
1499 let thread_num: u64 = thread_id
1500 .trim_start_matches("ThreadId(")
1501 .trim_end_matches(")")
1502 .parse()
1503 .unwrap_or(0);
1504 data["id"] = format!(
1505 "{:X}{:X}",
1506 Local::now().timestamp_nanos_opt().unwrap_or(0),
1507 thread_num
1508 )
1509 .into();
1510 }
1511 for (field, value) in data.entries() {
1512 fields.push(format!("`{field}`"));
1513
1514 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1515 if value.is_empty() {
1516 values.push("NULL".to_string());
1517 continue;
1518 }
1519 let comment = fields_list[field]["comment"].to_string();
1520 let srid = comment
1521 .split("|")
1522 .collect::<Vec<&str>>()
1523 .last()
1524 .unwrap_or(&"0")
1525 .to_string();
1526 let location = value.to_string().replace(",", " ");
1527 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1528 continue;
1529 }
1530
1531 if value.is_string() || value.is_array() || value.is_object() {
1532 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1533 continue;
1534 } else if value.is_number() || value.is_boolean() || value.is_null() {
1535 values.push(format!("{value}"));
1536 continue;
1537 } else {
1538 values.push(format!("'{value}'"));
1539 continue;
1540 }
1541 }
1542 let fields = fields.join(",");
1543 let values = values.join(",");
1544
1545 let sql = format!(
1546 "INSERT INTO {} ({fields}) VALUES ({values});",
1547 self.params.table
1548 );
1549 if self.params.sql {
1550 return JsonValue::from(sql.clone());
1551 }
1552 let (state, ids) = self.execute(sql.as_str());
1553
1554 match state {
1555 true => match self.params.autoinc {
1556 true => ids.clone(),
1557 false => data["id"].clone(),
1558 },
1559 false => {
1560 let thread_id = format!("{:?}", thread::current().id());
1561 error!("添加失败: {thread_id} {ids:?} {sql}");
1562 JsonValue::from("")
1563 }
1564 }
1565 }
1566 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1567 let fields_list = self.table_info(&self.params.table.clone());
1568
1569 let mut fields = String::new();
1570 if !self.params.autoinc && data[0]["id"].is_empty() {
1571 data[0]["id"] = "".into();
1572 }
1573 for (field, _) in data[0].entries() {
1574 fields = format!("{fields},`{field}`");
1575 }
1576 fields = fields.trim_start_matches(",").to_string();
1577
1578 let core_count = num_cpus::get();
1579 let mut p = pools::Pool::new(core_count * 4);
1580 let autoinc = self.params.autoinc;
1581 for list in data.members() {
1582 let mut item = list.clone();
1583 let params_location = self.params.location.clone();
1584 let fields_list_new = fields_list.clone();
1585 p.execute(move |pcindex| {
1586 if !autoinc && item["id"].is_empty() {
1587 let id = format!(
1588 "{:X}{:X}",
1589 Local::now().timestamp_nanos_opt().unwrap_or(0),
1590 pcindex
1591 );
1592 item["id"] = id.into();
1593 }
1594 let mut values = "".to_string();
1595 for (field, value) in item.entries() {
1596 if params_location.has_key(field) {
1597 if value.is_empty() {
1598 values = format!("{values},NULL");
1599 continue;
1600 }
1601 let comment = fields_list_new[field]["comment"].to_string();
1602 let srid = comment
1603 .split("|")
1604 .collect::<Vec<&str>>()
1605 .last()
1606 .unwrap_or(&"0")
1607 .to_string();
1608 let location = value.to_string().replace(",", " ");
1609 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1610 continue;
1611 }
1612 if value.is_string() {
1613 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1614 } else if value.is_number() || value.is_boolean() {
1615 values = format!("{values},{value}");
1616 } else {
1617 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1618 }
1619 }
1620 values = format!("({})", values.trim_start_matches(","));
1621 array![item["id"].clone(), values]
1622 });
1623 }
1624 let (ids_list, mut values) = p.insert_all();
1625 values = values.trim_start_matches(",").to_string();
1626 let sql = format!(
1627 "INSERT INTO {} ({}) VALUES {};",
1628 self.params.table, fields, values
1629 );
1630
1631 if self.params.sql {
1632 return JsonValue::from(sql.clone());
1633 }
1634 let (state, data) = self.execute(sql.as_str());
1635 match state {
1636 true => match autoinc {
1637 true => data,
1638 false => JsonValue::from(ids_list),
1639 },
1640 false => {
1641 error!("insert_all: {data:?}");
1642 array![]
1643 }
1644 }
1645 }
1646 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1647 let fields_list = self.table_info(&self.params.table.clone());
1648
1649 let mut fields = vec![];
1650 let mut values = vec![];
1651 if !self.params.autoinc && data["id"].is_empty() {
1652 let thread_id = format!("{:?}", std::thread::current().id());
1653 let thread_num: u64 = thread_id
1654 .trim_start_matches("ThreadId(")
1655 .trim_end_matches(")")
1656 .parse()
1657 .unwrap_or(0);
1658 data["id"] = format!(
1659 "{:X}{:X}",
1660 Local::now().timestamp_nanos_opt().unwrap_or(0),
1661 thread_num
1662 )
1663 .into();
1664 }
1665 for (field, value) in data.entries() {
1666 fields.push(format!("`{field}`"));
1667
1668 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1669 if value.is_empty() {
1670 values.push("NULL".to_string());
1671 continue;
1672 }
1673 let comment = fields_list[field]["comment"].to_string();
1674 let srid = comment
1675 .split("|")
1676 .collect::<Vec<&str>>()
1677 .last()
1678 .unwrap_or(&"0")
1679 .to_string();
1680 let location = value.to_string().replace(",", " ");
1681 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1682 continue;
1683 }
1684
1685 if value.is_string() || value.is_array() || value.is_object() {
1686 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1687 continue;
1688 } else if value.is_number() || value.is_boolean() || value.is_null() {
1689 values.push(format!("{value}"));
1690 continue;
1691 } else {
1692 values.push(format!("'{value}'"));
1693 continue;
1694 }
1695 }
1696
1697 let conflict_set: Vec<String> = fields
1698 .iter()
1699 .filter(|f| {
1700 let name = f.trim_matches('`');
1701 !conflict_fields.contains(&name) && name != "id"
1702 })
1703 .map(|f| format!("{f}=VALUES({f})"))
1704 .collect();
1705
1706 let fields_str = fields.join(",");
1707 let values_str = values.join(",");
1708
1709 let sql = format!(
1710 "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
1711 self.params.table,
1712 fields_str,
1713 values_str,
1714 conflict_set.join(",")
1715 );
1716 if self.params.sql {
1717 return JsonValue::from(sql.clone());
1718 }
1719 let (state, result) = self.execute(sql.as_str());
1720 match state {
1721 true => match self.params.autoinc {
1722 true => result.clone(),
1723 false => data["id"].clone(),
1724 },
1725 false => {
1726 let thread_id = format!("{:?}", thread::current().id());
1727 error!("upsert失败: {thread_id} {result:?} {sql}");
1728 JsonValue::from("")
1729 }
1730 }
1731 }
1732 fn update(&mut self, data: JsonValue) -> JsonValue {
1733 let fields_list = self.table_info(&self.params.table.clone());
1734
1735 let mut values = vec![];
1736 for (field, value) in data.entries() {
1737 if !self.params.json[field].is_empty() {
1738 let json = value.to_string().replace("'", "''");
1739 values.push(format!("`{field}`='{json}'"));
1740 continue;
1741 }
1742 if !self.params.location[field].is_empty() {
1743 if value.is_empty() {
1744 values.push(format!("{field}=NULL").to_string());
1745 continue;
1746 }
1747 let comment = fields_list[field]["comment"].to_string();
1748 let srid = comment
1749 .split("|")
1750 .collect::<Vec<&str>>()
1751 .last()
1752 .unwrap_or(&"0")
1753 .to_string();
1754 let location = value.to_string().replace(",", " ");
1755 values.push(format!(
1756 "{field}=ST_GeomFromText('POINT({location})',{srid})"
1757 ));
1758
1759 continue;
1760 }
1761
1762 if value.is_string() {
1763 values.push(format!(
1764 "`{field}`='{}'",
1765 value.to_string().replace("'", "''")
1766 ));
1767 } else if value.is_number() {
1768 values.push(format!("`{field}`= {value}"));
1769 } else if value.is_array() {
1770 let array = value
1771 .members()
1772 .map(|x| x.as_str().unwrap_or(""))
1773 .collect::<Vec<&str>>()
1774 .join(",");
1775 values.push(format!("`{field}`='{array}'"));
1776 continue;
1777 } else if value.is_object() {
1778 if self.params.json[field].is_empty() {
1779 values.push(format!("`{field}`='{value}'"));
1780 } else {
1781 if value.is_empty() {
1782 values.push(format!("`{field}`=''"));
1783 continue;
1784 }
1785 let json = value.to_string();
1786 let json = json.replace("'", "''");
1787 values.push(format!("`{field}`='{json}'"));
1788 }
1789 continue;
1790 } else if value.is_boolean() {
1791 values.push(format!("`{field}`= {value}"));
1792 } else {
1793 values.push(format!("`{field}`=\"{value}\""));
1794 }
1795 }
1796
1797 for (field, value) in self.params.inc_dec.entries() {
1798 values.push(format!("{field} = {}", value.to_string().clone()));
1799 }
1800 if !self.params.update_column.is_empty() {
1801 values.extend(self.params.update_column.clone());
1802 }
1803
1804 let values = values.join(",");
1805
1806 let sql = format!(
1807 "UPDATE {} SET {values} {};",
1808 self.params.table.clone(),
1809 self.params.where_sql()
1810 );
1811 if self.params.sql {
1812 return JsonValue::from(sql.clone());
1813 }
1814 let (state, data) = self.execute(sql.as_str());
1815 if state {
1816 data
1817 } else {
1818 let thread_id = format!("{:?}", thread::current().id());
1819 error!("update: {thread_id} {data:?} {sql}");
1820 0.into()
1821 }
1822 }
1823
1824 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1825 let fields_list = self.table_info(&self.params.table.clone());
1826 let mut values = vec![];
1827 let mut ids = vec![];
1828 for (field, _) in data[0].entries() {
1829 if field == "id" {
1830 continue;
1831 }
1832 let mut fields = vec![];
1833 for row in data.members() {
1834 let value = row[field].clone();
1835 let id = row["id"].clone();
1836 ids.push(id.clone());
1837
1838 if self.params.json.has_key(field) {
1839 let json = value.to_string();
1840 let json = json.replace("'", "''");
1841 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1842 continue;
1843 }
1844 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1845 let comment = fields_list[field]["comment"].to_string();
1846 let srid = comment
1847 .split("|")
1848 .collect::<Vec<&str>>()
1849 .last()
1850 .unwrap_or(&"0")
1851 .to_string();
1852 let location = value.to_string().replace(",", " ");
1853 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1854 fields.push(format!("WHEN '{id}' THEN {location}"));
1855 continue;
1856 }
1857 if value.is_string() {
1858 fields.push(format!(
1859 "WHEN '{id}' THEN '{}'",
1860 value.to_string().replace("'", "''")
1861 ));
1862 } else if value.is_array() || value.is_object() {
1863 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1864 } else if value.is_number() || value.is_boolean() || value.is_null() {
1865 fields.push(format!("WHEN '{id}' THEN {value}"));
1866 } else {
1867 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1868 }
1869 }
1870 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1871 }
1872 self.where_and("id", "in", ids.into());
1873 for (field, value) in self.params.inc_dec.entries() {
1874 values.push(format!("{} = {}", field, value.to_string().clone()));
1875 }
1876
1877 let values = values.join(",");
1878 let sql = format!(
1879 "UPDATE {} SET {} {} {};",
1880 self.params.table.clone(),
1881 values,
1882 self.params.where_sql(),
1883 self.params.page_limit_sql()
1884 );
1885 if self.params.sql {
1886 return JsonValue::from(sql.clone());
1887 }
1888 let (state, data) = self.execute(sql.as_str());
1889 if state {
1890 data
1891 } else {
1892 error!("update_all: {data:?}");
1893 JsonValue::from(0)
1894 }
1895 }
1896
1897 fn delete(&mut self) -> JsonValue {
1898 let sql = format!(
1899 "delete FROM {} {} {};",
1900 self.params.table.clone(),
1901 self.params.where_sql(),
1902 self.params.page_limit_sql()
1903 );
1904 if self.params.sql {
1905 return JsonValue::from(sql.clone());
1906 }
1907 let (state, data) = self.execute(sql.as_str());
1908 match state {
1909 true => data,
1910 false => {
1911 error!("delete 失败>>> {data:?}");
1912 JsonValue::from(0)
1913 }
1914 }
1915 }
1916 fn transaction(&mut self) -> bool {
1917 let thread_id = format!("{:?}", thread::current().id());
1918 let key = format!("{}{}", self.default, thread_id);
1919
1920 if TRANSACTION_MANAGER.is_in_transaction(&key) {
1921 let depth = TRANSACTION_MANAGER.get_depth(&key);
1922 TRANSACTION_MANAGER.increment_depth(&key);
1923 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1924 let _ = self.query(&sp);
1925 return true;
1926 }
1927
1928 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1929 Ok(e) => e,
1930 Err(err) => {
1931 error!("transaction 获取连接超时: {err}");
1932 return false;
1933 }
1934 };
1935
1936 if !TRANSACTION_MANAGER.start(&key, conn) {
1937 return false;
1938 }
1939
1940 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
1941 let (state, _) = self.query(sql);
1942 if !state {
1943 TRANSACTION_MANAGER.remove(&key, &thread_id);
1944 }
1945 state
1946 }
1947
1948 fn commit(&mut self) -> bool {
1949 let thread_id = format!("{:?}", thread::current().id());
1950 let key = format!("{}{}", self.default, thread_id);
1951
1952 let depth = TRANSACTION_MANAGER.get_depth(&key);
1953 if depth > 1 {
1954 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1955 let _ = self.query(&sp);
1956 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1957 return true;
1958 }
1959
1960 let sql = "COMMIT";
1961 let (state, data) = self.query(sql);
1962
1963 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1964
1965 if !state {
1966 error!("提交事务失败: {data}");
1967 }
1968 state
1969 }
1970
1971 fn rollback(&mut self) -> bool {
1972 let thread_id = format!("{:?}", thread::current().id());
1973 let key = format!("{}{}", self.default, thread_id);
1974
1975 let depth = TRANSACTION_MANAGER.get_depth(&key);
1976 if depth > 1 {
1977 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1978 let _ = self.query(&sp);
1979 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1980 return true;
1981 }
1982
1983 let sql = "ROLLBACK";
1984 let (state, data) = self.query(sql);
1985
1986 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1987
1988 if !state {
1989 error!("回滚失败: {data}");
1990 }
1991 state
1992 }
1993
1994 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1995 let (state, data) = self.query(sql);
1996 match state {
1997 true => Ok(data),
1998 false => Err(data.to_string()),
1999 }
2000 }
2001
2002 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2003 let (state, data) = self.execute(sql);
2004 match state {
2005 true => Ok(data),
2006 false => Err(data.to_string()),
2007 }
2008 }
2009
2010 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2011 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2012 self
2013 }
2014 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2015 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2016 self
2017 }
2018
2019 fn buildsql(&mut self) -> String {
2020 self.fetch_sql();
2021 let sql = self.select().to_string();
2022 format!("( {} ) `{}`", sql, self.params.table)
2023 }
2024
2025 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2026 for field in fields.clone() {
2027 if field.contains(format!("{}.", self.params.table).as_str()) {
2028 self.params.fields[field] = field.into();
2029 } else {
2030 self.params.fields[field] =
2031 format!("{field} as {}", field.replace(".", "_")).into();
2032 }
2033 }
2034 self
2035 }
2036
2037 fn join(
2038 &mut self,
2039 main_table: &str,
2040 main_fields: &str,
2041 right_table: &str,
2042 right_fields: &str,
2043 ) -> &mut Self {
2044 let main_table = if main_table.is_empty() {
2045 self.params.table.clone()
2046 } else {
2047 main_table.to_string()
2048 };
2049 self.params.join_table = right_table.to_string();
2050 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2051 self
2052 }
2053
2054 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2055 let main_fields = if main_fields.is_empty() {
2056 "id"
2057 } else {
2058 main_fields
2059 };
2060 let second_fields = if second_fields.is_empty() {
2061 self.params.table.clone()
2062 } else {
2063 second_fields.to_string().clone()
2064 };
2065 let sec_table_name = format!("{}{}", table, "_2");
2066 let second_table = format!("{} {}", table, sec_table_name.clone());
2067 self.params.join_table = sec_table_name.clone();
2068 self.params.join.push(format!(
2069 " INNER JOIN {} ON {}.{} = {}.{}",
2070 second_table, self.params.table, main_fields, sec_table_name, second_fields
2071 ));
2072 self
2073 }
2074
2075 fn join_right(
2076 &mut self,
2077 main_table: &str,
2078 main_fields: &str,
2079 right_table: &str,
2080 right_fields: &str,
2081 ) -> &mut Self {
2082 let main_table = if main_table.is_empty() {
2083 self.params.table.clone()
2084 } else {
2085 main_table.to_string()
2086 };
2087 self.params.join_table = right_table.to_string();
2088 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2089 self
2090 }
2091
2092 fn join_full(
2093 &mut self,
2094 main_table: &str,
2095 main_fields: &str,
2096 right_table: &str,
2097 right_fields: &str,
2098 ) -> &mut Self {
2099 let main_table = if main_table.is_empty() {
2100 self.params.table.clone()
2101 } else {
2102 main_table.to_string()
2103 };
2104 self.params.join_table = right_table.to_string();
2105 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2106 self
2107 }
2108
2109 fn union(&mut self, sub_sql: &str) -> &mut Self {
2110 self.params.unions.push(format!("UNION {sub_sql}"));
2111 self
2112 }
2113
2114 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2115 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2116 self
2117 }
2118
2119 fn lock_for_update(&mut self) -> &mut Self {
2120 self.params.lock_mode = "FOR UPDATE".to_string();
2121 self
2122 }
2123
2124 fn lock_for_share(&mut self) -> &mut Self {
2125 self.params.lock_mode = "FOR SHARE".to_string();
2126 self
2127 }
2128}