1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
/// MySQL backend handle: connection settings, the per-statement builder state and the
/// connection pool used to run statements.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection settings this handle was created from (host, credentials, pool options, ...).
    pub connection: Connection,
    /// Name of this connection. It is combined with the current thread id to key transactions
    /// and with a table name to key the cached column metadata.
    pub default: String,
    /// Query-builder state (table, fields, where clauses, flags) for the statement being built.
    pub params: Params,
    /// Pool that supplies connections for statements executed outside a transaction.
    pub pool: Pool,
}
28
29impl Mysql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let pool_cfg = &connection.pool;
32 let pool_opts = PoolOpts::default()
33 .with_constraints(
34 PoolConstraints::new(
35 pool_cfg.min_connections as usize,
36 pool_cfg.max_connections as usize,
37 )
38 .unwrap_or_default(),
39 )
40 .with_reset_connection(true);
41
42 let opts = OptsBuilder::new()
43 .pool_opts(pool_opts)
44 .ip_or_hostname(Some(connection.hostname.clone()))
45 .tcp_port(connection.hostport.parse().unwrap_or(3306))
46 .user(Some(connection.username.clone()))
47 .pass(Some(connection.userpass.clone()))
48 .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49 .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50 .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51 .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52 .db_name(Some(connection.database.clone()));
53
54 match Pool::new(opts) {
55 Ok(pool) => Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("mysql"),
59 pool,
60 }),
61 Err(e) => {
62 error!("connect: {e}");
63 Err(e.to_string())
64 }
65 }
66 }
67 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68 if sql.contains("INSERT") {
69 let rows = text.affected_rows();
70 if rows > 1 {
71 if self.params.autoinc {
72 let row = rows;
73 let start_row = text.last_insert_id().unwrap_or(0);
74 let end_row = start_row + row;
75
76 let mut ids = array![];
77 for item in start_row..end_row {
78 let _ = ids.push(item);
79 }
80 (true, ids)
81 } else {
82 (true, JsonValue::from(rows))
83 }
84 } else {
85 (true, JsonValue::from(text.last_insert_id()))
86 }
87 } else {
88 (true, JsonValue::from(text.affected_rows()))
89 }
90 }
91 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92 let mut list = array![];
93 let mut index = 0;
94 text.for_each(|row| {
95 match row {
96 Ok(r) => {
97 let mut data = object! {};
98 for (index, item) in r.columns().iter().enumerate() {
99 let field = item.name_str();
100 let field = field.to_string();
101 let field = field.as_str();
102
103 data[field] = match item.column_type() {
104 ColumnType::MYSQL_TYPE_TINY => {
105 let t = r.get::<bool, _>(index).unwrap_or(true);
106 JsonValue::from(t)
107 }
108 ColumnType::MYSQL_TYPE_FLOAT
109 | ColumnType::MYSQL_TYPE_NEWDECIMAL
110 | ColumnType::MYSQL_TYPE_DOUBLE => {
111 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112 if t == NULL {
113 JsonValue::from(0.0)
114 } else {
115 match r.get::<f64, _>(index) {
116 None => JsonValue::from(0.0),
117 Some(t) => JsonValue::from(t),
118 }
119 }
120 }
121 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122 let t = r.index(field).clone();
123 if t == NULL {
124 JsonValue::from(0)
125 } else {
126 let t = r.get::<i64, _>(index).unwrap_or(0);
127 JsonValue::from(t)
128 }
129 }
130 ColumnType::MYSQL_TYPE_NULL => {
131 let t = r.index(field).clone();
132 if t == NULL {
133 JsonValue::from("".to_string())
134 } else {
135 let t = r.get::<String, _>(index).unwrap_or("".to_string());
136 JsonValue::from(t)
137 }
138 }
139 ColumnType::MYSQL_TYPE_BLOB => {
140 let t = r.index(field).clone();
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r
145 .get::<mysql::Value, _>(index)
146 .unwrap_or("".to_string().into());
147 if t == NULL {
148 JsonValue::from("".to_string())
149 } else {
150 let t = r.get::<String, _>(index).unwrap_or("".to_string());
151 JsonValue::from(t)
152 }
153 }
154 }
155 ColumnType::MYSQL_TYPE_VAR_STRING => {
156 let t = r
157 .get::<mysql::Value, _>(index)
158 .unwrap_or("".to_string().into());
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_STRING => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let t = r.get::<String, _>(index).unwrap_or("".to_string());
172 JsonValue::from(t)
173 }
174 }
175 ColumnType::MYSQL_TYPE_DATE
176 | ColumnType::MYSQL_TYPE_DATETIME
177 | ColumnType::MYSQL_TYPE_LONG_BLOB
178 | ColumnType::MYSQL_TYPE_TIMESTAMP
179 | ColumnType::MYSQL_TYPE_TIME => {
180 let t = r.index(field).clone();
181 if t == NULL {
182 JsonValue::from("".to_string())
183 } else {
184 let t = r.get::<String, _>(index).unwrap_or("".to_string());
185 JsonValue::from(t)
186 }
187 }
188 ColumnType::MYSQL_TYPE_GEOMETRY => {
189 let t = r.index(field).clone();
190 if t == NULL {
191 JsonValue::from("".to_string())
192 } else {
193 let res = match r.index(field).clone() {
194 Value::Bytes(e) => e,
195 _ => vec![],
196 };
197 if res.len() >= 25 {
198 let x = f64::from_le_bytes(
199 res[9..17].try_into().unwrap_or([0u8; 8]),
200 );
201 let y = f64::from_le_bytes(
202 res[17..25].try_into().unwrap_or([0u8; 8]),
203 );
204 JsonValue::from(format!("{x},{y}"))
205 } else {
206 JsonValue::from("".to_string())
207 }
208 }
209 }
210 ColumnType::MYSQL_TYPE_JSON => {
211 let t = r.index(field).clone();
212 if t == NULL {
213 json::array![]
214 } else {
215 let t = r.get::<String, _>(index).unwrap_or("".to_string());
216 match json::parse(&t) {
217 Ok(v) => v,
218 Err(_) => JsonValue::from(t),
219 }
220 }
221 }
222 _ => {
223 let t = r.index(field).clone();
224 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
225 JsonValue::from("".to_string())
226 }
227 };
228 }
229 let _ = list.push(data);
230 }
231 Err(e) => {
232 error!("err: {e} \r\n {sql}");
233 }
234 }
235 index += 1;
236 });
237 (true, list)
238 }
239 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
240 let thread_id = format!("{:?}", thread::current().id());
241 let key = format!("{}{}", self.default, thread_id);
242
243 let debug = self.connection.debug;
244 let params_json = self.params.json.clone();
245 let table_name = self.params.table.clone();
246
247 let is_system_query = sql.contains("INFORMATION_SCHEMA")
248 || sql.contains("information_schema")
249 || sql.starts_with("START TRANSACTION")
250 || sql.starts_with("COMMIT")
251 || sql.starts_with("ROLLBACK")
252 || sql.starts_with("SHOW ");
253
254 let fields_list = if !is_system_query && !table_name.is_empty() {
255 self.table_info(&table_name)
256 } else {
257 object! {}
258 };
259
260 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
261
262 if !in_transaction {
263 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
264 Ok(e) => e,
265 Err(err) => {
266 error!("非事务 execute超时: {err}");
267 return (false, object! {});
268 }
269 };
270 let connection_id = db.connection_id();
271 return match db.query_iter(sql) {
272 Ok(e) => {
273 if debug {
274 info!("查询成功: {} {}", thread_id, sql);
275 }
276 self.query_handle(e, sql)
277 }
278 Err(e) => {
279 error!(
280 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
281 );
282 (false, JsonValue::from(e.to_string()))
283 }
284 };
285 }
286
287 match TRANSACTION_MANAGER.with_conn(&key, |db| {
288 let connection_id = db.connection_id();
289 match db.query_iter(sql) {
290 Ok(text) => {
291 if debug {
292 info!("查询成功: {} {}", thread_id, sql);
293 }
294 let mut list = array![];
295 text.for_each(|row| {
296 if let Ok(row) = row {
297 let mut item = object! {};
298 for column in row.columns_ref() {
299 let field = column.name_str().to_string();
300 let value = &row[column.name_str().as_ref()];
301 let column_type = column.column_type();
302 match column_type {
303 ColumnType::MYSQL_TYPE_VARCHAR
304 | ColumnType::MYSQL_TYPE_STRING
305 | ColumnType::MYSQL_TYPE_VAR_STRING
306 | ColumnType::MYSQL_TYPE_BLOB
307 | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
308 | ColumnType::MYSQL_TYPE_LONG_BLOB
309 | ColumnType::MYSQL_TYPE_TINY_BLOB => {
310 if *value == NULL {
311 item[field.as_str()] = "".into();
312 } else {
313 let data: String = mysql::from_value(value.clone());
314 if params_json.has_key(&field)
315 || fields_list[&field]["type"]
316 .to_string()
317 .contains("json")
318 {
319 item[field.as_str()] = match json::parse(&data) {
320 Ok(e) => e,
321 Err(_) => data.into(),
322 };
323 } else {
324 item[field.as_str()] = data.into();
325 }
326 }
327 }
328 ColumnType::MYSQL_TYPE_TINY
329 | ColumnType::MYSQL_TYPE_SHORT
330 | ColumnType::MYSQL_TYPE_LONG
331 | ColumnType::MYSQL_TYPE_INT24
332 | ColumnType::MYSQL_TYPE_LONGLONG => {
333 if *value == NULL {
334 item[field.as_str()] = 0.into();
335 } else {
336 let data: i64 = mysql::from_value(value.clone());
337 item[field.as_str()] = data.into();
338 }
339 }
340 ColumnType::MYSQL_TYPE_FLOAT
341 | ColumnType::MYSQL_TYPE_DOUBLE
342 | ColumnType::MYSQL_TYPE_DECIMAL
343 | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
344 if *value == NULL {
345 item[field.as_str()] = 0.0.into();
346 } else {
347 let data: f64 = mysql::from_value(value.clone());
348 item[field.as_str()] = data.into();
349 }
350 }
351 ColumnType::MYSQL_TYPE_JSON => {
352 if *value == NULL {
353 item[field.as_str()] = json::array![];
354 } else {
355 let data: String = mysql::from_value(value.clone());
356 match json::parse(&data) {
357 Ok(v) => item[field.as_str()] = v,
358 Err(_) => item[field.as_str()] = data.into(),
359 }
360 }
361 }
362 _ => {
363 if *value != NULL {
364 let data: String = mysql::from_value(value.clone());
365 item[field.as_str()] = data.into();
366 }
367 }
368 }
369 }
370 let _ = list.push(item);
371 }
372 });
373 (true, list)
374 }
375 Err(e) => {
376 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
377 (false, JsonValue::from(e.to_string()))
378 }
379 }
380 }) {
381 Some(result) => result,
382 None => {
383 error!("事务连接不存在: {key}");
384 (false, object! {})
385 }
386 }
387 }
388 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
389 let thread_id = format!("{:?}", thread::current().id());
390 let key = format!("{}{}", self.default, thread_id);
391
392 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
393
394 if !in_transaction {
395 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
396 Ok(e) => e,
397 Err(err) => {
398 error!("非事务: execute超时: {err}");
399 return (false, object! {});
400 }
401 };
402 return match db.exec_iter(sql, ()) {
403 Ok(e) => {
404 if self.connection.debug {
405 info!("提交成功: {} {}", thread_id, sql);
406 }
407 self.execute_cl(e, sql)
408 }
409 Err(e) => {
410 error!("非事务提交失败: {thread_id} {e} {sql}");
411 (false, JsonValue::from(e.to_string()))
412 }
413 };
414 }
415
416 if !TRANSACTION_MANAGER.acquire_table_lock(
417 &self.params.table,
418 &thread_id,
419 Duration::from_secs(30),
420 ) {
421 error!("获取表锁超时: {} {}", self.params.table, thread_id);
422 return (false, object! {"error": "table lock timeout"});
423 }
424
425 let is_insert = sql.contains("INSERT");
426 let autoinc = self.params.autoinc;
427 let debug = self.connection.debug;
428
429 match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
431 Ok(result) => {
432 let affected_rows = result.affected_rows();
433 let last_insert_id = result.last_insert_id();
434 (true, affected_rows, last_insert_id, None)
435 }
436 Err(e) => (false, 0, None, Some(e.to_string())),
437 }) {
438 Some((true, affected_rows, last_insert_id, _)) => {
439 if debug {
440 info!("提交成功: {} {}", thread_id, sql);
441 }
442 if is_insert {
443 if affected_rows > 1 {
444 if autoinc {
445 let start_row = last_insert_id.unwrap_or(0);
446 let end_row = start_row + affected_rows;
447 let mut ids = array![];
448 for item in start_row..end_row {
449 let _ = ids.push(item);
450 }
451 (true, ids)
452 } else {
453 (true, JsonValue::from(affected_rows))
454 }
455 } else {
456 (true, JsonValue::from(last_insert_id))
457 }
458 } else {
459 (true, JsonValue::from(affected_rows))
460 }
461 }
462 Some((false, _, _, Some(err))) => {
463 error!("事务提交失败: {thread_id} {err} {sql}");
464 (false, JsonValue::from(err))
465 }
466 _ => {
467 error!("事务连接不存在: {key}");
468 (false, object! {})
469 }
470 }
471 }
472}
473
474impl DbMode for Mysql {
475 fn database_tables(&mut self) -> JsonValue {
476 let sql = "SHOW TABLES".to_string();
477 match self.sql(sql.as_str()) {
478 Ok(e) => {
479 let mut list = vec![];
480 for item in e.members() {
481 for (_, value) in item.entries() {
482 list.push(value.clone());
483 }
484 }
485 list.into()
486 }
487 Err(_) => {
488 array![]
489 }
490 }
491 }
492
493 fn database_create(&mut self, name: &str) -> bool {
494 let sql = format!("CREATE DATABASE {name}");
495
496 let (state, data) = self.execute(sql.as_str());
497 match state {
498 true => data.as_bool().unwrap_or(false),
499 false => {
500 error!("创建数据库失败: {data:?}");
501 false
502 }
503 }
504 }
505
506 fn truncate(&mut self, table: &str) -> bool {
507 let sql = format!("TRUNCATE TABLE {table}");
508 let (state, _) = self.execute(sql.as_str());
509 state
510 }
511}
512
513impl Mode for Mysql {
514 fn table_create(&mut self, options: TableOptions) -> JsonValue {
515 let mut sql = String::new();
516 let mut unique_fields = String::new();
518 let mut unique_name = String::new();
519 let mut unique = String::new();
520 for item in options.table_unique.iter() {
521 if unique_fields.is_empty() {
522 unique_fields = format!("`{item}`");
523 unique_name = format!("{}_unique_{}", options.table_name, item);
524 } else {
525 unique_fields = format!("{unique_fields},`{item}`");
526 unique_name = format!("{unique_name}_{item}");
527 }
528 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
529 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
530 }
531
532 let mut index = String::new();
534 for row in options.table_index.iter() {
535 let mut index_fields = String::new();
536 let mut index_name = String::new();
537 for item in row.iter() {
538 if index_fields.is_empty() {
539 index_fields = format!("`{item}`");
540 index_name = format!("{}_index_{}", options.table_name, item);
541 } else {
542 index_fields = format!("{index_fields},`{item}`");
543 index_name = format!("{index_name}_{item}");
544 }
545 }
546 if index.is_empty() {
547 index = format!("INDEX `{index_name}` ({index_fields})");
548 } else {
549 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
550 }
551 }
552 if index.replace(",", "").is_empty() {
553 index = index.replace(",", "");
554 }
555
556 for (name, field) in options.table_fields.entries() {
557 let row = br_fields::field("mysql", name, field.clone());
558 sql = format!("{sql} {row},\r\n");
559 }
560
561 if !unique.is_empty() {
562 sql = sql.trim_end_matches(",\r\n").to_string();
563 sql = format!("{sql},\r\n{unique}");
564 }
565 if !index.is_empty() {
566 sql = sql.trim_end_matches(",\r\n").to_string();
567 sql = format!("{sql},\r\n{index}");
568 }
569 let collate = format!("{}_bin", self.connection.charset.str());
570
571 let partition = if options.table_partition {
573 sql = format!(
574 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
575 sql,
576 options.table_key,
577 options.table_partition_columns[0].clone()
578 );
579 let temp_head = format!(
580 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
581 options.table_partition_columns[0].clone()
582 );
583 let mut partition_array = vec![];
584 let mut count = 0;
585 for member in options.table_partition_columns[1].members() {
586 let temp = format!(
587 "PARTITION p{} VALUES LESS THAN ('{}')",
588 count.clone(),
589 member.clone()
590 );
591 count += 1;
592 partition_array.push(temp.clone());
593 }
594 let temp_body = partition_array.join(",\r\n");
595 let temp_end = format!(
596 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
597 count.clone()
598 );
599 format!("{temp_head}{temp_body}{temp_end}")
600 } else {
601 sql = if sql.trim_end().ends_with(",") {
602 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
603 } else {
604 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
605 };
606 "".to_string()
607 };
608 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
609
610 if self.params.sql {
611 return JsonValue::from(sql);
612 }
613
614 let (state, data) = self.execute(sql.as_str());
615
616 match state {
617 true => JsonValue::from(state),
618 false => {
619 info!("创建错误: {data}");
620 JsonValue::from(state)
621 }
622 }
623 }
624
625 fn table_update(&mut self, options: TableOptions) -> JsonValue {
626 let table_fields_guard = match TABLE_FIELDS.lock() {
627 Ok(g) => g,
628 Err(e) => e.into_inner(),
629 };
630 if table_fields_guard
631 .get(&format!("{}{}", self.default, options.table_name))
632 .is_some()
633 {
634 drop(table_fields_guard);
635 let mut table_fields_guard = match TABLE_FIELDS.lock() {
636 Ok(g) => g,
637 Err(e) => e.into_inner(),
638 };
639 table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
640 } else {
641 drop(table_fields_guard);
642 }
643 let mut sql = vec![];
644 let fields_list = self.table_info(&options.table_name);
645 let mut put = vec![];
646 let mut add = vec![];
647 let mut del = vec![];
648 for (key, _) in fields_list.entries() {
649 if options.table_fields[key].is_empty() {
650 del.push(key);
651 }
652 }
653 for (name, field) in options.table_fields.entries() {
654 if !fields_list[name].is_empty() {
655 let old_comment = fields_list[name]["comment"].to_string();
656 let new_comment = br_fields::field("mysql", name, field.clone());
657 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
658 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
659 if old_comment == new_comment_text {
660 continue;
661 }
662 put.push(name);
663 } else {
664 add.push(name);
665 }
666 }
667
668 for name in add.iter() {
669 let name = name.to_string();
670 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
671 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
672 }
673 for name in del.iter() {
674 sql.push(format!(
675 "ALTER TABLE {} DROP `{name}`;\r\n",
676 options.table_name
677 ));
678 }
679 for name in put.iter() {
680 let name = name.to_string();
681 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
682 sql.push(format!(
683 "ALTER TABLE {} CHANGE `{}` {};\r\n",
684 options.table_name, name, row
685 ));
686 }
687
688 let (_, index_list) =
689 self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
690 let (_, pk_list) = self.query(
692 format!(
693 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
694 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
695 self.connection.database, options.table_name
696 )
697 .as_str(),
698 );
699 let mut pk_vec = vec![];
700 for member in pk_list.members() {
701 pk_vec.push(member["COLUMN_NAME"].to_string());
702 }
703
704 let mut unique_new = vec![];
705 let mut index_new = vec![];
706 for item in index_list.members() {
707 let key_name = item["Key_name"].as_str().unwrap_or("");
708 let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
709
710 if non_unique == 0
711 && (key_name.contains(format!("{}_unique", options.table_name).as_str())
712 || key_name.contains("unique"))
713 {
714 unique_new.push(key_name.to_string());
715 continue;
716 }
717 if non_unique == 1
718 && (key_name.contains(format!("{}_index", options.table_name).as_str())
719 || key_name.contains("index"))
720 {
721 index_new.push(key_name.to_string());
722 continue;
723 }
724 }
725
726 let mut unique_fields = String::new();
727 let mut unique_name = String::new();
728 for item in options.table_unique.iter() {
729 if unique_fields.is_empty() {
730 unique_fields = format!("`{item}`");
731 unique_name = format!("{}_unique_{}", options.table_name, item);
732 } else {
733 unique_fields = format!("{unique_fields},`{item}`");
734 unique_name = format!("{unique_name}_{item}");
735 }
736 }
737 if !unique_name.is_empty() {
738 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
739 unique_name = format!("unique_{md5}");
740 for item in &unique_new {
741 if unique_name != *item {
742 sql.push(format!(
743 "alter table {} drop index {};\r\n",
744 options.table_name, item
745 ));
746 }
747 }
748 if !unique_new.contains(&unique_name) {
749 sql.push(format!(
750 "CREATE UNIQUE index {} on {} ({});\r\n",
751 unique_name, options.table_name, unique_fields
752 ));
753 }
754 }
755
756 let mut index_list = vec![];
757 for row in options.table_index.iter() {
759 let mut index_fields = String::new();
760 let mut index_name = String::new();
761 for item in row {
762 if index_fields.is_empty() {
763 index_fields = item.to_string();
764 index_name = format!("{}_index_{}", options.table_name, item);
765 } else {
766 index_fields = format!("{index_fields},{item}");
767 index_name = format!("{index_name}_{item}");
768 }
769 }
770 index_list.push(index_name.clone());
771 if !index_new.contains(&index_name.clone()) {
772 sql.push(format!(
773 "CREATE INDEX {} on {} ({});\r\n",
774 index_name, options.table_name, index_fields
775 ));
776 }
777 }
778
779 for item in index_new {
780 if !index_list.contains(&item.to_string()) {
781 sql.push(format!(
782 "DROP INDEX {} ON {};\r\n",
783 item.clone(),
784 options.table_name
785 ));
786 }
787 }
788
789 if options.table_partition {
791 if !pk_vec.contains(&options.table_key.to_string().clone())
793 || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
794 {
795 let pk = format!(
796 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
797 options.table_name,
798 options.table_key,
799 options.table_partition_columns[0].clone()
800 );
801 sql.push(pk);
802 let temp_head = format!(
803 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
804 options.table_name,
805 options.table_partition_columns[0].clone()
806 );
807 let mut partition_array = vec![];
808 let mut count = 0;
809 for member in options.table_partition_columns[1].members() {
810 let temp = format!(
811 "PARTITION p{} VALUES LESS THAN ('{}')",
812 count.clone(),
813 member.clone()
814 );
815 count += 1;
816 partition_array.push(temp.clone());
817 }
818 let temp_body = partition_array.join(",\r\n");
819 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
820 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
821 }
822 } else if pk_vec.len() != 1 {
823 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
824 sql.push(rm_partition);
825 let pk = format!(
826 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
827 options.table_name, options.table_key
828 );
829 sql.push(pk);
830 };
831
832 if self.params.sql {
833 return JsonValue::from(sql.join(""));
834 }
835
836 if sql.is_empty() {
837 return JsonValue::from(-1);
838 }
839
840 for item in sql.iter() {
841 let (state, res) = self.execute(item.as_str());
842 match state {
843 true => {}
844 false => {
845 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
846 return JsonValue::from(0);
847 }
848 }
849 }
850 JsonValue::from(1)
851 }
852
853 fn table_info(&mut self, table: &str) -> JsonValue {
854 let table_fields_guard = match TABLE_FIELDS.lock() {
855 Ok(g) => g,
856 Err(e) => e.into_inner(),
857 };
858 if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
859 return cached.clone();
860 }
861 drop(table_fields_guard);
862 let sql = format!(
863 "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'"
864 );
865 let (state, data) = self.query(sql.as_str());
866 let mut list = object! {};
867 if state {
868 for item in data.members() {
869 if item["TABLE_SCHEMA"] != self.connection.database {
870 continue;
871 }
872 let mut row = object! {};
873 row["field"] = item["COLUMN_NAME"].clone();
874 row["comment"] = item["COLUMN_COMMENT"].clone();
875 row["type"] = item["COLUMN_TYPE"].clone();
876 if let Some(field_name) = row["field"].as_str() {
877 list[field_name] = row.clone();
878 }
879 }
880 let mut table_fields_guard = match TABLE_FIELDS.lock() {
881 Ok(g) => g,
882 Err(e) => e.into_inner(),
883 };
884 table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
885 list
886 } else {
887 list
888 }
889 }
890
891 fn table_is_exist(&mut self, name: &str) -> bool {
892 let sql = format!(
893 "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = '{}' AND TABLE_SCHEMA = '{}'",
894 name, self.connection.database
895 );
896 let (state, data) = self.query(sql.as_str());
897 match state {
898 true => !data.is_empty() && !data.is_empty(),
899 false => false,
900 }
901 }
902
903 fn table(&mut self, name: &str) -> &mut Mysql {
904 self.params = Params::default(self.connection.mode.str().as_str());
905 let table_name = format!("{}{}", self.connection.prefix, name);
906 if !super::sql_safety::validate_table_name(&table_name) {
907 error!("Invalid table name: {}", name);
908 }
909 self.params.table = table_name.clone();
910 self.params.join_table = table_name;
911 self
912 }
913
914 fn change_table(&mut self, name: &str) -> &mut Self {
915 self.params.join_table = name.to_string();
916 self
917 }
918
919 fn autoinc(&mut self) -> &mut Self {
920 self.params.autoinc = true;
921 self
922 }
923
924 fn timestamps(&mut self) -> &mut Self {
925 self.params.timestamps = true;
926 self
927 }
928
929 fn fetch_sql(&mut self) -> &mut Self {
930 self.params.sql = true;
931 self
932 }
933
934 fn order(&mut self, field: &str, by: bool) -> &mut Self {
935 self.params.order[field] = {
936 if by {
937 "DESC"
938 } else {
939 "ASC"
940 }
941 }
942 .into();
943 self
944 }
945
946 fn group(&mut self, field: &str) -> &mut Self {
947 let fields: Vec<&str> = field.split(",").collect();
948 for field in fields.iter() {
949 let field = field.to_string();
950 self.params.group[field.as_str()] = field.clone().into();
951 if !self.params.fields.has_key(field.as_str()) {
952 self.params.fields[field.as_str()] = field.clone().into();
953 }
954 }
955
956 self
957 }
958
959 fn distinct(&mut self) -> &mut Self {
960 self.params.distinct = true;
961 self
962 }
963
964 fn json(&mut self, field: &str) -> &mut Self {
965 let list: Vec<&str> = field.split(",").collect();
966 for item in list.iter() {
967 self.params.json[item.to_string().as_str()] = item.to_string().into();
968 }
969 self
970 }
971
972 fn location(&mut self, field: &str) -> &mut Self {
973 let list: Vec<&str> = field.split(",").collect();
974 for item in list.iter() {
975 self.params.location[item.to_string().as_str()] = item.to_string().into();
976 }
977 self
978 }
979
980 fn field(&mut self, field: &str) -> &mut Self {
981 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
982 let join_table = if self.params.join_table.is_empty() {
983 self.params.table.clone()
984 } else {
985 self.params.join_table.clone()
986 };
987 for item in list.iter() {
988 let lower = item.to_lowercase();
989 let is_expr = lower.contains("count(")
990 || lower.contains("sum(")
991 || lower.contains("avg(")
992 || lower.contains("max(")
993 || lower.contains("min(")
994 || lower.contains("case ");
995 if is_expr {
996 self.params.fields[item.to_string().as_str()] = (*item).into();
997 } else if item.contains(" as ") {
998 let text = item.split(" as ").collect::<Vec<&str>>();
999 self.params.fields[item.to_string().as_str()] =
1000 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1001 } else {
1002 self.params.fields[item.to_string().as_str()] =
1003 format!("{join_table}.`{item}`").into();
1004 }
1005 }
1006 self
1007 }
1008
1009 fn field_raw(&mut self, expr: &str) -> &mut Self {
1010 self.params.fields[expr] = expr.into();
1011 self
1012 }
1013
1014 fn hidden(&mut self, name: &str) -> &mut Self {
1015 let hidden: Vec<&str> = name.split(",").collect();
1016
1017 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
1018
1019 let mut data = array![];
1020 for item in fields_list.members() {
1021 let _ = data.push(object! {
1022 "name":item["COLUMN_NAME"].as_str().unwrap_or("")
1023 });
1024 }
1025
1026 for item in data.members() {
1027 let name = item["name"].as_str().unwrap_or("");
1028 if !hidden.contains(&name) {
1029 self.params.fields[name] = name.into();
1030 }
1031 }
1032 self
1033 }
1034
1035 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1036 for f in field.split('|') {
1037 if !super::sql_safety::validate_field_name(f) {
1038 error!("Invalid field name: {}", f);
1039 }
1040 }
1041 if !super::sql_safety::validate_compare_orator(compare) {
1042 error!("Invalid compare operator: {}", compare);
1043 }
1044 let table_fields = self.table_info(&self.params.table.clone());
1045 let join_table = if self.params.join_table.is_empty() {
1046 self.params.table.clone()
1047 } else {
1048 self.params.join_table.clone()
1049 };
1050 if value.is_boolean() {
1051 if value.as_bool().unwrap_or(false) {
1052 value = 1.into();
1053 } else {
1054 value = 0.into();
1055 }
1056 }
1057 match compare {
1058 "between" => {
1059 self.params.where_and.push(format!(
1060 "{join_table}.`{field}` between '{}' AND '{}'",
1061 value[0], value[1]
1062 ));
1063 }
1064 "location" => {
1065 let comment = table_fields[field]["comment"].to_string();
1066 let srid = comment
1067 .split("|")
1068 .collect::<Vec<&str>>()
1069 .last()
1070 .unwrap_or(&"0")
1071 .to_string();
1072
1073 let field_name = format!(
1074 "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1075 value[0], value[1], value[4]
1076 );
1077 self.params.fields[&field_name.clone()] = field_name.clone().into();
1078 let location = format!(
1081 "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1082 value[0], value[1], value[2], value[3]
1083 );
1084 self.params.where_and.push(location);
1085 }
1086 "set" => {
1087 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1088 let mut wheredata = vec![];
1089 for item in list.iter() {
1090 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1091 }
1092 self.params
1093 .where_and
1094 .push(format!("({})", wheredata.join(" or ")));
1095 }
1096 "notin" => {
1097 let mut text = String::new();
1098 for item in value.members() {
1099 text = format!("{text},'{item}'");
1100 }
1101 text = text.trim_start_matches(",").into();
1102 self.params
1103 .where_and
1104 .push(format!("{join_table}.`{field}` not in ({text})"));
1105 }
1106 "is" => {
1107 self.params
1108 .where_and
1109 .push(format!("{join_table}.`{field}` is {value}"));
1110 }
1111 "isnot" => {
1112 self.params
1113 .where_and
1114 .push(format!("{join_table}.`{field}` is not {value}"));
1115 }
1116 "notlike" => {
1117 self.params
1118 .where_and
1119 .push(format!("{join_table}.`{field}` not like '{value}'"));
1120 }
1121 "in" => {
1122 if value.is_array() && value.is_empty() {
1123 self.params.where_and.push("1=0".to_string());
1124 return self;
1125 }
1126 let mut text = String::new();
1127 if value.is_array() {
1128 for item in value.members() {
1129 text = format!("{text},'{item}'");
1130 }
1131 } else if value.is_null() {
1132 text = format!("{text},null");
1133 } else {
1134 let value = value.as_str().unwrap_or("");
1135
1136 let value: Vec<&str> = value.split(",").collect();
1137 for item in value.iter() {
1138 text = format!("{text},'{item}'");
1139 }
1140 }
1141 text = text.trim_start_matches(",").into();
1142
1143 self.params
1144 .where_and
1145 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1146 }
1147 _ => {
1148 self.params
1149 .where_and
1150 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1151 }
1152 }
1153 self
1154 }
1155
1156 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1157 for f in field.split('|') {
1158 if !super::sql_safety::validate_field_name(f) {
1159 error!("Invalid field name: {}", f);
1160 }
1161 }
1162 if !super::sql_safety::validate_compare_orator(compare) {
1163 error!("Invalid compare operator: {}", compare);
1164 }
1165 let join_table = if self.params.join_table.is_empty() {
1166 self.params.table.clone()
1167 } else {
1168 self.params.join_table.clone()
1169 };
1170
1171 if value.is_boolean() {
1172 if value.as_bool().unwrap_or(false) {
1173 value = 1.into();
1174 } else {
1175 value = 0.into();
1176 }
1177 }
1178
1179 match compare {
1180 "between" => {
1181 self.params.where_or.push(format!(
1182 "{}.`{}` between '{}' AND '{}'",
1183 join_table, field, value[0], value[1]
1184 ));
1185 }
1186 "set" => {
1187 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1188 let mut wheredata = vec![];
1189 for item in list.iter() {
1190 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1191 }
1192 self.params
1193 .where_or
1194 .push(format!("({})", wheredata.join(" or ")));
1195 }
1196 "notin" => {
1197 let mut text = String::new();
1198 for item in value.members() {
1199 text = format!("{text},'{item}'");
1200 }
1201 text = text.trim_start_matches(",").into();
1202 self.params
1203 .where_or
1204 .push(format!("{join_table}.`{field}` not in ({text})"));
1205 }
1206 "is" => {
1207 self.params
1208 .where_or
1209 .push(format!("{join_table}.`{field}` is {value}"));
1210 }
1211 "isnot" => {
1212 self.params
1213 .where_or
1214 .push(format!("{join_table}.`{field}` IS NOT {value}"));
1215 }
1216 "in" => {
1217 if value.is_array() && value.is_empty() {
1218 self.params.where_or.push("1=0".to_string());
1219 return self;
1220 }
1221 let mut text = String::new();
1222 if value.is_array() {
1223 for item in value.members() {
1224 text = format!("{text},'{item}'");
1225 }
1226 } else {
1227 let value = value.as_str().unwrap_or("");
1228 let value: Vec<&str> = value.split(",").collect();
1229 for item in value.iter() {
1230 text = format!("{text},'{item}'");
1231 }
1232 }
1233 text = text.trim_start_matches(",").into();
1234 self.params
1235 .where_or
1236 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1237 }
1238 _ => {
1239 if field.contains(".") {
1240 self.params
1241 .where_or
1242 .push(format!("{field} {compare} '{value}'"));
1243 } else {
1244 self.params
1245 .where_or
1246 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1247 }
1248 }
1249 }
1250 self
1251 }
1252
1253 fn where_raw(&mut self, expr: &str) -> &mut Self {
1254 self.params.where_and.push(expr.to_string());
1255 self
1256 }
1257
1258 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1259 self.params
1260 .where_and
1261 .push(format!("`{field}` IN ({sub_sql})"));
1262 self
1263 }
1264
1265 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1266 self.params
1267 .where_and
1268 .push(format!("`{field}` NOT IN ({sub_sql})"));
1269 self
1270 }
1271
1272 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1273 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1274 self
1275 }
1276
1277 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1278 self.params
1279 .where_and
1280 .push(format!("NOT EXISTS ({sub_sql})"));
1281 self
1282 }
1283
1284 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1285 self.params.where_column = format!(
1286 "{}.`{}` {} {}.`{}`",
1287 self.params.table, field_a, compare, self.params.table, field_b
1288 );
1289 self
1290 }
1291
1292 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1293 self.params
1294 .update_column
1295 .push(format!("{field_a} = {compare}"));
1296 self
1297 }
1298
1299 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1300 self.params.page = page;
1301 self.params.limit = limit;
1302 self
1303 }
1304
1305 fn limit(&mut self, count: i32) -> &mut Self {
1306 self.params.limit_only = count;
1307 self
1308 }
1309
1310 fn column(&mut self, field: &str) -> JsonValue {
1311 self.field(field);
1312 let sql = self.params.select_sql();
1313
1314 if self.params.sql {
1315 return JsonValue::from(sql);
1316 }
1317 let (state, data) = self.query(sql.as_str());
1318 match state {
1319 true => {
1320 let mut list = array![];
1321 for item in data.members() {
1322 if self.params.json[field].is_empty() {
1323 let _ = list.push(item[field].clone());
1324 } else {
1325 let data =
1326 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1327 let _ = list.push(data);
1328 }
1329 }
1330 list
1331 }
1332 false => {
1333 array![]
1334 }
1335 }
1336 }
1337
1338 fn count(&mut self) -> JsonValue {
1339 if !self.params.fields.is_empty() {
1340 self.group(format!("{}.id", self.params.table).as_str());
1341 }
1342 self.params.fields["count"] = "count(*) as count".into();
1343 let sql = self.params.select_sql();
1344 if self.params.sql {
1345 return JsonValue::from(sql.clone());
1346 }
1347 let (state, data) = self.query(sql.as_str());
1348 if state {
1349 if data.is_empty() {
1350 JsonValue::from(0)
1351 } else {
1352 data[0]["count"].clone()
1353 }
1354 } else {
1355 JsonValue::from(0)
1356 }
1357 }
1358
1359 fn max(&mut self, field: &str) -> JsonValue {
1360 self.params.fields[field] = format!("max({field}) as {field}").into();
1361 let sql = self.params.select_sql();
1362 if self.params.sql {
1363 return JsonValue::from(sql.clone());
1364 }
1365 let (state, data) = self.query(sql.as_str());
1366 if state {
1367 if data.len() > 1 {
1368 return data.clone();
1369 }
1370 data[0][field].clone()
1371 } else {
1372 JsonValue::from(0)
1373 }
1374 }
1375
1376 fn min(&mut self, field: &str) -> JsonValue {
1377 self.params.fields[field] = format!("min({field}) as {field}").into();
1378 let sql = self.params.select_sql();
1379 if self.params.sql {
1380 return JsonValue::from(sql.clone());
1381 }
1382 let (state, data) = self.query(sql.as_str());
1383 if state {
1384 if data.len() > 1 {
1385 return data;
1386 }
1387 data[0][field].clone()
1388 } else {
1389 JsonValue::from(0)
1390 }
1391 }
1392
1393 fn sum(&mut self, field: &str) -> JsonValue {
1394 self.params.fields[field] = format!("sum({field}) as {field}").into();
1395 let sql = self.params.select_sql();
1396 if self.params.sql {
1397 return JsonValue::from(sql.clone());
1398 }
1399 let (state, data) = self.query(sql.as_str());
1400 match state {
1401 true => {
1402 if data.len() > 1 {
1403 return data;
1404 }
1405 data[0][field].clone()
1406 }
1407 false => JsonValue::from(0),
1408 }
1409 }
1410
1411 fn avg(&mut self, field: &str) -> JsonValue {
1412 self.params.fields[field] = format!("avg({field}) as {field}").into();
1413 let sql = self.params.select_sql();
1414 if self.params.sql {
1415 return JsonValue::from(sql.clone());
1416 }
1417 let (state, data) = self.query(sql.as_str());
1418 if state {
1419 if data.len() > 1 {
1420 return data;
1421 }
1422 data[0][field].clone()
1423 } else {
1424 JsonValue::from(0)
1425 }
1426 }
1427
1428 fn having(&mut self, expr: &str) -> &mut Self {
1429 self.params.having.push(expr.to_string());
1430 self
1431 }
1432
1433 fn select(&mut self) -> JsonValue {
1434 let sql = self.params.select_sql();
1435 if self.params.sql {
1436 return JsonValue::from(sql.clone());
1437 }
1438 let (state, mut data) = self.query(sql.as_str());
1439 match state {
1440 true => {
1441 for (field, _) in self.params.json.entries() {
1442 for item in data.members_mut() {
1443 if !item[field].is_empty() {
1444 let json = item[field].to_string();
1445 item[field] = match json::parse(&json) {
1446 Ok(e) => e,
1447 Err(_) => JsonValue::from(json),
1448 };
1449 }
1450 }
1451 }
1452 data.clone()
1453 }
1454 false => array![],
1455 }
1456 }
1457
1458 fn find(&mut self) -> JsonValue {
1459 self.params.page = 1;
1460 self.params.limit = 1;
1461 let sql = self.params.select_sql();
1462 if self.params.sql {
1463 return JsonValue::from(sql.clone());
1464 }
1465 let (state, mut data) = self.query(sql.as_str());
1466 match state {
1467 true => {
1468 if data.is_empty() {
1469 return object! {};
1470 }
1471 for (field, _) in self.params.json.entries() {
1472 if !data[0][field].is_empty() {
1473 let json = data[0][field].to_string();
1474 let json = json::parse(&json).unwrap_or(array![]);
1475 data[0][field] = json;
1476 } else {
1477 data[0][field] = array![];
1478 }
1479 }
1480 data[0].clone()
1481 }
1482 false => {
1483 error!("find失败: {data:?}");
1484 object! {}
1485 }
1486 }
1487 }
1488
1489 fn value(&mut self, field: &str) -> JsonValue {
1490 self.params.fields = object! {};
1491 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1492 self.params.page = 1;
1493 self.params.limit = 1;
1494 let sql = self.params.select_sql();
1495 if self.params.sql {
1496 return JsonValue::from(sql.clone());
1497 }
1498 let (state, mut data) = self.query(sql.as_str());
1499 match state {
1500 true => {
1501 for (field, _) in self.params.json.entries() {
1502 if !data[0][field].is_empty() {
1503 let json = data[0][field].to_string();
1504 let json = json::parse(&json).unwrap_or(array![]);
1505 data[0][field] = json;
1506 } else {
1507 data[0][field] = array![];
1508 }
1509 }
1510 data[0][field].clone()
1511 }
1512 false => {
1513 if self.connection.debug {
1514 info!("{data:?}");
1515 }
1516 JsonValue::Null
1517 }
1518 }
1519 }
1520 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1521 let fields_list = self.table_info(&self.params.table.clone());
1522
1523 let mut fields = vec![];
1524 let mut values = vec![];
1525 if !self.params.autoinc && data["id"].is_empty() {
1526 let thread_id = format!("{:?}", std::thread::current().id());
1527 let thread_num: u64 = thread_id
1528 .trim_start_matches("ThreadId(")
1529 .trim_end_matches(")")
1530 .parse()
1531 .unwrap_or(0);
1532 data["id"] = format!(
1533 "{:X}{:X}",
1534 Local::now().timestamp_nanos_opt().unwrap_or(0),
1535 thread_num
1536 )
1537 .into();
1538 }
1539 for (field, value) in data.entries() {
1540 fields.push(format!("`{field}`"));
1541
1542 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1543 if value.is_empty() {
1544 values.push("NULL".to_string());
1545 continue;
1546 }
1547 let comment = fields_list[field]["comment"].to_string();
1548 let srid = comment
1549 .split("|")
1550 .collect::<Vec<&str>>()
1551 .last()
1552 .unwrap_or(&"0")
1553 .to_string();
1554 let location = value.to_string().replace(",", " ");
1555 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1556 continue;
1557 }
1558
1559 if value.is_string() || value.is_array() || value.is_object() {
1560 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1561 continue;
1562 } else if value.is_number() {
1563 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1564 if col_type.contains("int") {
1565 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1566 } else {
1567 values.push(format!("{value}"));
1568 }
1569 continue;
1570 } else if value.is_boolean() || value.is_null() {
1571 values.push(format!("{value}"));
1572 continue;
1573 } else {
1574 values.push(format!("'{value}'"));
1575 continue;
1576 }
1577 }
1578 let fields = fields.join(",");
1579 let values = values.join(",");
1580
1581 let sql = format!(
1582 "INSERT INTO {} ({fields}) VALUES ({values});",
1583 self.params.table
1584 );
1585 if self.params.sql {
1586 return JsonValue::from(sql.clone());
1587 }
1588 let (state, ids) = self.execute(sql.as_str());
1589
1590 match state {
1591 true => match self.params.autoinc {
1592 true => ids.clone(),
1593 false => data["id"].clone(),
1594 },
1595 false => {
1596 let thread_id = format!("{:?}", thread::current().id());
1597 error!("添加失败: {thread_id} {ids:?} {sql}");
1598 JsonValue::from("")
1599 }
1600 }
1601 }
1602 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1603 let fields_list = self.table_info(&self.params.table.clone());
1604
1605 let mut fields = String::new();
1606 if !self.params.autoinc && data[0]["id"].is_empty() {
1607 data[0]["id"] = "".into();
1608 }
1609 for (field, _) in data[0].entries() {
1610 fields = format!("{fields},`{field}`");
1611 }
1612 fields = fields.trim_start_matches(",").to_string();
1613
1614 let core_count = num_cpus::get();
1615 let mut p = pools::Pool::new(core_count * 4);
1616 let autoinc = self.params.autoinc;
1617 for list in data.members() {
1618 let mut item = list.clone();
1619 let params_location = self.params.location.clone();
1620 let fields_list_new = fields_list.clone();
1621 p.execute(move |pcindex| {
1622 if !autoinc && item["id"].is_empty() {
1623 let id = format!(
1624 "{:X}{:X}",
1625 Local::now().timestamp_nanos_opt().unwrap_or(0),
1626 pcindex
1627 );
1628 item["id"] = id.into();
1629 }
1630 let mut values = "".to_string();
1631 for (field, value) in item.entries() {
1632 if params_location.has_key(field) {
1633 if value.is_empty() {
1634 values = format!("{values},NULL");
1635 continue;
1636 }
1637 let comment = fields_list_new[field]["comment"].to_string();
1638 let srid = comment
1639 .split("|")
1640 .collect::<Vec<&str>>()
1641 .last()
1642 .unwrap_or(&"0")
1643 .to_string();
1644 let location = value.to_string().replace(",", " ");
1645 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1646 continue;
1647 }
1648 if value.is_string() {
1649 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1650 } else if value.is_number() {
1651 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1652 if col_type.contains("int") {
1653 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1654 } else {
1655 values = format!("{values},{value}");
1656 }
1657 } else if value.is_boolean() {
1658 values = format!("{values},{value}");
1659 } else {
1660 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1661 }
1662 }
1663 values = format!("({})", values.trim_start_matches(","));
1664 array![item["id"].clone(), values]
1665 });
1666 }
1667 let (ids_list, mut values) = p.insert_all();
1668 values = values.trim_start_matches(",").to_string();
1669 let sql = format!(
1670 "INSERT INTO {} ({}) VALUES {};",
1671 self.params.table, fields, values
1672 );
1673
1674 if self.params.sql {
1675 return JsonValue::from(sql.clone());
1676 }
1677 let (state, data) = self.execute(sql.as_str());
1678 match state {
1679 true => match autoinc {
1680 true => data,
1681 false => JsonValue::from(ids_list),
1682 },
1683 false => {
1684 error!("insert_all: {data:?}");
1685 array![]
1686 }
1687 }
1688 }
1689 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1690 let fields_list = self.table_info(&self.params.table.clone());
1691
1692 let mut fields = vec![];
1693 let mut values = vec![];
1694 if !self.params.autoinc && data["id"].is_empty() {
1695 let thread_id = format!("{:?}", std::thread::current().id());
1696 let thread_num: u64 = thread_id
1697 .trim_start_matches("ThreadId(")
1698 .trim_end_matches(")")
1699 .parse()
1700 .unwrap_or(0);
1701 data["id"] = format!(
1702 "{:X}{:X}",
1703 Local::now().timestamp_nanos_opt().unwrap_or(0),
1704 thread_num
1705 )
1706 .into();
1707 }
1708 for (field, value) in data.entries() {
1709 fields.push(format!("`{field}`"));
1710
1711 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1712 if value.is_empty() {
1713 values.push("NULL".to_string());
1714 continue;
1715 }
1716 let comment = fields_list[field]["comment"].to_string();
1717 let srid = comment
1718 .split("|")
1719 .collect::<Vec<&str>>()
1720 .last()
1721 .unwrap_or(&"0")
1722 .to_string();
1723 let location = value.to_string().replace(",", " ");
1724 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1725 continue;
1726 }
1727
1728 if value.is_string() || value.is_array() || value.is_object() {
1729 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1730 continue;
1731 } else if value.is_number() {
1732 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1733 if col_type.contains("int") {
1734 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1735 } else {
1736 values.push(format!("{value}"));
1737 }
1738 continue;
1739 } else if value.is_boolean() || value.is_null() {
1740 values.push(format!("{value}"));
1741 continue;
1742 } else {
1743 values.push(format!("'{value}'"));
1744 continue;
1745 }
1746 }
1747
1748 let conflict_set: Vec<String> = fields
1749 .iter()
1750 .filter(|f| {
1751 let name = f.trim_matches('`');
1752 !conflict_fields.contains(&name) && name != "id"
1753 })
1754 .map(|f| format!("{f}=VALUES({f})"))
1755 .collect();
1756
1757 let fields_str = fields.join(",");
1758 let values_str = values.join(",");
1759
1760 let sql = format!(
1761 "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
1762 self.params.table,
1763 fields_str,
1764 values_str,
1765 conflict_set.join(",")
1766 );
1767 if self.params.sql {
1768 return JsonValue::from(sql.clone());
1769 }
1770 let (state, result) = self.execute(sql.as_str());
1771 match state {
1772 true => match self.params.autoinc {
1773 true => result.clone(),
1774 false => data["id"].clone(),
1775 },
1776 false => {
1777 let thread_id = format!("{:?}", thread::current().id());
1778 error!("upsert失败: {thread_id} {result:?} {sql}");
1779 JsonValue::from("")
1780 }
1781 }
1782 }
1783 fn update(&mut self, data: JsonValue) -> JsonValue {
1784 let fields_list = self.table_info(&self.params.table.clone());
1785
1786 let mut values = vec![];
1787 for (field, value) in data.entries() {
1788 if !self.params.json[field].is_empty() {
1789 let json = value.to_string().replace("'", "''");
1790 values.push(format!("`{field}`='{json}'"));
1791 continue;
1792 }
1793 if !self.params.location[field].is_empty() {
1794 if value.is_empty() {
1795 values.push(format!("{field}=NULL").to_string());
1796 continue;
1797 }
1798 let comment = fields_list[field]["comment"].to_string();
1799 let srid = comment
1800 .split("|")
1801 .collect::<Vec<&str>>()
1802 .last()
1803 .unwrap_or(&"0")
1804 .to_string();
1805 let location = value.to_string().replace(",", " ");
1806 values.push(format!(
1807 "{field}=ST_GeomFromText('POINT({location})',{srid})"
1808 ));
1809
1810 continue;
1811 }
1812
1813 if value.is_string() {
1814 values.push(format!(
1815 "`{field}`='{}'",
1816 value.to_string().replace("'", "''")
1817 ));
1818 } else if value.is_number() {
1819 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1820 if col_type.contains("int") {
1821 values.push(format!(
1822 "`{field}`= {}",
1823 value.as_f64().unwrap_or(0.0) as i64
1824 ));
1825 } else {
1826 values.push(format!("`{field}`= {value}"));
1827 }
1828 } else if value.is_array() {
1829 let array = value
1830 .members()
1831 .map(|x| x.as_str().unwrap_or(""))
1832 .collect::<Vec<&str>>()
1833 .join(",");
1834 values.push(format!("`{field}`='{array}'"));
1835 continue;
1836 } else if value.is_object() {
1837 if self.params.json[field].is_empty() {
1838 values.push(format!("`{field}`='{value}'"));
1839 } else {
1840 if value.is_empty() {
1841 values.push(format!("`{field}`=''"));
1842 continue;
1843 }
1844 let json = value.to_string();
1845 let json = json.replace("'", "''");
1846 values.push(format!("`{field}`='{json}'"));
1847 }
1848 continue;
1849 } else if value.is_boolean() {
1850 values.push(format!("`{field}`= {value}"));
1851 } else {
1852 values.push(format!("`{field}`=\"{value}\""));
1853 }
1854 }
1855
1856 for (field, value) in self.params.inc_dec.entries() {
1857 values.push(format!("{field} = {}", value.to_string().clone()));
1858 }
1859 if !self.params.update_column.is_empty() {
1860 values.extend(self.params.update_column.clone());
1861 }
1862
1863 let values = values.join(",");
1864
1865 let sql = format!(
1866 "UPDATE {} SET {values} {};",
1867 self.params.table.clone(),
1868 self.params.where_sql()
1869 );
1870 if self.params.sql {
1871 return JsonValue::from(sql.clone());
1872 }
1873 let (state, data) = self.execute(sql.as_str());
1874 if state {
1875 data
1876 } else {
1877 let thread_id = format!("{:?}", thread::current().id());
1878 error!("update: {thread_id} {data:?} {sql}");
1879 0.into()
1880 }
1881 }
1882
1883 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1884 let fields_list = self.table_info(&self.params.table.clone());
1885 let mut values = vec![];
1886 let mut ids = vec![];
1887 for (field, _) in data[0].entries() {
1888 if field == "id" {
1889 continue;
1890 }
1891 let mut fields = vec![];
1892 for row in data.members() {
1893 let value = row[field].clone();
1894 let id = row["id"].clone();
1895 ids.push(id.clone());
1896
1897 if self.params.json.has_key(field) {
1898 let json = value.to_string();
1899 let json = json.replace("'", "''");
1900 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1901 continue;
1902 }
1903 if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1904 let comment = fields_list[field]["comment"].to_string();
1905 let srid = comment
1906 .split("|")
1907 .collect::<Vec<&str>>()
1908 .last()
1909 .unwrap_or(&"0")
1910 .to_string();
1911 let location = value.to_string().replace(",", " ");
1912 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1913 fields.push(format!("WHEN '{id}' THEN {location}"));
1914 continue;
1915 }
1916 if value.is_string() {
1917 fields.push(format!(
1918 "WHEN '{id}' THEN '{}'",
1919 value.to_string().replace("'", "''")
1920 ));
1921 } else if value.is_array() || value.is_object() {
1922 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1923 } else if value.is_number() {
1924 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1925 if col_type.contains("int") {
1926 fields.push(format!(
1927 "WHEN '{id}' THEN {}",
1928 value.as_f64().unwrap_or(0.0) as i64
1929 ));
1930 } else {
1931 fields.push(format!("WHEN '{id}' THEN {value}"));
1932 }
1933 } else if value.is_boolean() || value.is_null() {
1934 fields.push(format!("WHEN '{id}' THEN {value}"));
1935 } else {
1936 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1937 }
1938 }
1939 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1940 }
1941 self.where_and("id", "in", ids.into());
1942 for (field, value) in self.params.inc_dec.entries() {
1943 values.push(format!("{} = {}", field, value.to_string().clone()));
1944 }
1945
1946 let values = values.join(",");
1947 let sql = format!(
1948 "UPDATE {} SET {} {} {};",
1949 self.params.table.clone(),
1950 values,
1951 self.params.where_sql(),
1952 self.params.page_limit_sql()
1953 );
1954 if self.params.sql {
1955 return JsonValue::from(sql.clone());
1956 }
1957 let (state, data) = self.execute(sql.as_str());
1958 if state {
1959 data
1960 } else {
1961 error!("update_all: {data:?}");
1962 JsonValue::from(0)
1963 }
1964 }
1965
1966 fn delete(&mut self) -> JsonValue {
1967 let sql = format!(
1968 "delete FROM {} {} {};",
1969 self.params.table.clone(),
1970 self.params.where_sql(),
1971 self.params.page_limit_sql()
1972 );
1973 if self.params.sql {
1974 return JsonValue::from(sql.clone());
1975 }
1976 let (state, data) = self.execute(sql.as_str());
1977 match state {
1978 true => data,
1979 false => {
1980 error!("delete 失败>>> {data:?}");
1981 JsonValue::from(0)
1982 }
1983 }
1984 }
1985 fn transaction(&mut self) -> bool {
1986 let thread_id = format!("{:?}", thread::current().id());
1987 let key = format!("{}{}", self.default, thread_id);
1988
1989 if TRANSACTION_MANAGER.is_in_transaction(&key) {
1990 let depth = TRANSACTION_MANAGER.get_depth(&key);
1991 TRANSACTION_MANAGER.increment_depth(&key);
1992 let sp = format!("SAVEPOINT sp_{}", depth + 1);
1993 let _ = self.query(&sp);
1994 return true;
1995 }
1996
1997 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1998 Ok(e) => e,
1999 Err(err) => {
2000 error!("transaction 获取连接超时: {err}");
2001 return false;
2002 }
2003 };
2004
2005 if !TRANSACTION_MANAGER.start(&key, conn) {
2006 return false;
2007 }
2008
2009 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
2010 let (state, _) = self.query(sql);
2011 if !state {
2012 TRANSACTION_MANAGER.remove(&key, &thread_id);
2013 }
2014 state
2015 }
2016
2017 fn commit(&mut self) -> bool {
2018 let thread_id = format!("{:?}", thread::current().id());
2019 let key = format!("{}{}", self.default, thread_id);
2020
2021 let depth = TRANSACTION_MANAGER.get_depth(&key);
2022 if depth > 1 {
2023 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
2024 let _ = self.query(&sp);
2025 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2026 return true;
2027 }
2028
2029 let sql = "COMMIT";
2030 let (state, data) = self.query(sql);
2031
2032 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2033
2034 if !state {
2035 error!("提交事务失败: {data}");
2036 }
2037 state
2038 }
2039
2040 fn rollback(&mut self) -> bool {
2041 let thread_id = format!("{:?}", thread::current().id());
2042 let key = format!("{}{}", self.default, thread_id);
2043
2044 let depth = TRANSACTION_MANAGER.get_depth(&key);
2045 if depth > 1 {
2046 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2047 let _ = self.query(&sp);
2048 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2049 return true;
2050 }
2051
2052 let sql = "ROLLBACK";
2053 let (state, data) = self.query(sql);
2054
2055 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2056
2057 if !state {
2058 error!("回滚失败: {data}");
2059 }
2060 state
2061 }
2062
2063 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2064 let (state, data) = self.query(sql);
2065 match state {
2066 true => Ok(data),
2067 false => Err(data.to_string()),
2068 }
2069 }
2070
2071 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2072 let (state, data) = self.execute(sql);
2073 match state {
2074 true => Ok(data),
2075 false => Err(data.to_string()),
2076 }
2077 }
2078
2079 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2080 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2081 self
2082 }
2083 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2084 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2085 self
2086 }
2087
2088 fn buildsql(&mut self) -> String {
2089 self.fetch_sql();
2090 let sql = self.select().to_string();
2091 format!("( {} ) `{}`", sql, self.params.table)
2092 }
2093
2094 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2095 for field in fields.clone() {
2096 if field.contains(format!("{}.", self.params.table).as_str()) {
2097 self.params.fields[field] = field.into();
2098 } else {
2099 self.params.fields[field] =
2100 format!("{field} as {}", field.replace(".", "_")).into();
2101 }
2102 }
2103 self
2104 }
2105
2106 fn join(
2107 &mut self,
2108 main_table: &str,
2109 main_fields: &str,
2110 right_table: &str,
2111 right_fields: &str,
2112 ) -> &mut Self {
2113 let main_table = if main_table.is_empty() {
2114 self.params.table.clone()
2115 } else {
2116 main_table.to_string()
2117 };
2118 self.params.join_table = right_table.to_string();
2119 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2120 self
2121 }
2122
2123 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2124 let main_fields = if main_fields.is_empty() {
2125 "id"
2126 } else {
2127 main_fields
2128 };
2129 let second_fields = if second_fields.is_empty() {
2130 self.params.table.clone()
2131 } else {
2132 second_fields.to_string().clone()
2133 };
2134 let sec_table_name = format!("{}{}", table, "_2");
2135 let second_table = format!("{} {}", table, sec_table_name.clone());
2136 self.params.join_table = sec_table_name.clone();
2137 self.params.join.push(format!(
2138 " INNER JOIN {} ON {}.{} = {}.{}",
2139 second_table, self.params.table, main_fields, sec_table_name, second_fields
2140 ));
2141 self
2142 }
2143
2144 fn join_right(
2145 &mut self,
2146 main_table: &str,
2147 main_fields: &str,
2148 right_table: &str,
2149 right_fields: &str,
2150 ) -> &mut Self {
2151 let main_table = if main_table.is_empty() {
2152 self.params.table.clone()
2153 } else {
2154 main_table.to_string()
2155 };
2156 self.params.join_table = right_table.to_string();
2157 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2158 self
2159 }
2160
2161 fn join_full(
2162 &mut self,
2163 main_table: &str,
2164 main_fields: &str,
2165 right_table: &str,
2166 right_fields: &str,
2167 ) -> &mut Self {
2168 let main_table = if main_table.is_empty() {
2169 self.params.table.clone()
2170 } else {
2171 main_table.to_string()
2172 };
2173 self.params.join_table = right_table.to_string();
2174 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2175 self
2176 }
2177
2178 fn union(&mut self, sub_sql: &str) -> &mut Self {
2179 self.params.unions.push(format!("UNION {sub_sql}"));
2180 self
2181 }
2182
2183 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2184 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2185 self
2186 }
2187
2188 fn lock_for_update(&mut self) -> &mut Self {
2189 self.params.lock_mode = "FOR UPDATE".to_string();
2190 self
2191 }
2192
2193 fn lock_for_share(&mut self) -> &mut Self {
2194 self.params.lock_mode = "FOR SHARE".to_string();
2195 self
2196 }
2197}