1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
/// MySQL backend handle bundling the connection settings, the state of the
/// statement currently being built, and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection configuration this handle was created from.
    pub connection: Connection,
    /// Name prefixed to per-thread transaction keys and table-field cache keys.
    pub default: String,
    /// Builder state (table, fields, filters, flags) for the pending statement.
    pub params: Params,
    /// Connection pool created in `connect`.
    pub pool: Pool,
}
28
29impl Mysql {
30 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31 let pool_cfg = &connection.pool;
32 let pool_opts = PoolOpts::default()
33 .with_constraints(
34 PoolConstraints::new(
35 pool_cfg.min_connections as usize,
36 pool_cfg.max_connections as usize,
37 )
38 .unwrap_or_default(),
39 )
40 .with_reset_connection(true);
41
42 let opts = OptsBuilder::new()
43 .pool_opts(pool_opts)
44 .ip_or_hostname(Some(connection.hostname.clone()))
45 .tcp_port(connection.hostport.parse().unwrap_or(3306))
46 .user(Some(connection.username.clone()))
47 .pass(Some(connection.userpass.clone()))
48 .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49 .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50 .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51 .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52 .db_name(Some(connection.database.clone()));
53
54 match Pool::new(opts) {
55 Ok(pool) => Ok(Self {
56 connection: connection.clone(),
57 default: default.clone(),
58 params: Params::default("mysql"),
59 pool,
60 }),
61 Err(e) => {
62 error!("connect: {e}");
63 Err(e.to_string())
64 }
65 }
66 }
67 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68 if sql.contains("INSERT") {
69 let rows = text.affected_rows();
70 if rows > 1 {
71 if self.params.autoinc {
72 let row = rows;
73 let start_row = text.last_insert_id().unwrap_or(0);
74 let end_row = start_row + row;
75
76 let mut ids = array![];
77 for item in start_row..end_row {
78 let _ = ids.push(item);
79 }
80 (true, ids)
81 } else {
82 (true, JsonValue::from(rows))
83 }
84 } else {
85 (true, JsonValue::from(text.last_insert_id()))
86 }
87 } else {
88 (true, JsonValue::from(text.affected_rows()))
89 }
90 }
91 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92 let mut list = array![];
93 let mut index = 0;
94 text.for_each(|row| {
95 match row {
96 Ok(r) => {
97 let mut data = object! {};
98 for (index, item) in r.columns().iter().enumerate() {
99 let field = item.name_str();
100 let field = field.to_string();
101 let field = field.as_str();
102
103 data[field] = match item.column_type() {
104 ColumnType::MYSQL_TYPE_TINY => {
105 let t = r.get::<bool, _>(index).unwrap_or(true);
106 JsonValue::from(t)
107 }
108 ColumnType::MYSQL_TYPE_FLOAT
109 | ColumnType::MYSQL_TYPE_NEWDECIMAL
110 | ColumnType::MYSQL_TYPE_DOUBLE => {
111 let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112 if t == NULL {
113 JsonValue::from(0.0)
114 } else {
115 match r.get::<f64, _>(index) {
116 None => JsonValue::from(0.0),
117 Some(t) => JsonValue::from(t),
118 }
119 }
120 }
121 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122 let t = r.index(field).clone();
123 if t == NULL {
124 JsonValue::from(0)
125 } else {
126 let t = r.get::<i64, _>(index).unwrap_or(0);
127 JsonValue::from(t)
128 }
129 }
130 ColumnType::MYSQL_TYPE_NULL => {
131 let t = r.index(field).clone();
132 if t == NULL {
133 JsonValue::from("".to_string())
134 } else {
135 let t = r.get::<String, _>(index).unwrap_or("".to_string());
136 JsonValue::from(t)
137 }
138 }
139 ColumnType::MYSQL_TYPE_BLOB => {
140 let t = r.index(field).clone();
141 if t == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let t = r
145 .get::<mysql::Value, _>(index)
146 .unwrap_or("".to_string().into());
147 if t == NULL {
148 JsonValue::from("".to_string())
149 } else {
150 let t = r.get::<String, _>(index).unwrap_or("".to_string());
151 JsonValue::from(t)
152 }
153 }
154 }
155 ColumnType::MYSQL_TYPE_VAR_STRING => {
156 let t = r
157 .get::<mysql::Value, _>(index)
158 .unwrap_or("".to_string().into());
159 if t == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let t = r.get::<String, _>(index).unwrap_or("".to_string());
163 JsonValue::from(t)
164 }
165 }
166 ColumnType::MYSQL_TYPE_STRING => {
167 let t = r.index(field).clone();
168 if t == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let t = r.get::<String, _>(index).unwrap_or("".to_string());
172 JsonValue::from(t)
173 }
174 }
175 ColumnType::MYSQL_TYPE_DATE
176 | ColumnType::MYSQL_TYPE_DATETIME
177 | ColumnType::MYSQL_TYPE_LONG_BLOB
178 | ColumnType::MYSQL_TYPE_TIMESTAMP
179 | ColumnType::MYSQL_TYPE_TIME => {
180 let t = r.index(field).clone();
181 if t == NULL {
182 JsonValue::from("".to_string())
183 } else {
184 let t = r.get::<String, _>(index).unwrap_or("".to_string());
185 JsonValue::from(t)
186 }
187 }
188 ColumnType::MYSQL_TYPE_GEOMETRY => {
189 let t = r.index(field).clone();
190 if t == NULL {
191 JsonValue::from("".to_string())
192 } else {
193 let res = match r.index(field).clone() {
194 Value::Bytes(e) => e,
195 _ => vec![],
196 };
197 if res.len() >= 25 {
198 let x = f64::from_le_bytes(
199 res[9..17].try_into().unwrap_or([0u8; 8]),
200 );
201 let y = f64::from_le_bytes(
202 res[17..25].try_into().unwrap_or([0u8; 8]),
203 );
204 JsonValue::from(format!("{x},{y}"))
205 } else {
206 JsonValue::from("".to_string())
207 }
208 }
209 }
210 ColumnType::MYSQL_TYPE_JSON => {
211 let t = r.index(field).clone();
212 if t == NULL {
213 json::array![]
214 } else {
215 let t = r.get::<String, _>(index).unwrap_or("".to_string());
216 match json::parse(&t) {
217 Ok(v) => v,
218 Err(_) => JsonValue::from(t),
219 }
220 }
221 }
222 _ => {
223 let t = r.index(field).clone();
224 info!("未知: {} {:?} {:?}", field, item.column_type(), t);
225 JsonValue::from("".to_string())
226 }
227 };
228 }
229 let _ = list.push(data);
230 }
231 Err(e) => {
232 error!("err: {e} \r\n {sql}");
233 }
234 }
235 index += 1;
236 });
237 (true, list)
238 }
239 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
240 let thread_id = format!("{:?}", thread::current().id());
241 let key = format!("{}{}", self.default, thread_id);
242
243 let debug = self.connection.debug;
244 let params_json = self.params.json.clone();
245 let table_name = self.params.table.clone();
246
247 let is_system_query = sql.contains("INFORMATION_SCHEMA")
248 || sql.contains("information_schema")
249 || sql.starts_with("START TRANSACTION")
250 || sql.starts_with("COMMIT")
251 || sql.starts_with("ROLLBACK")
252 || sql.starts_with("SHOW ");
253
254 let fields_list = if !is_system_query && !table_name.is_empty() {
255 self.table_info(&table_name)
256 } else {
257 object! {}
258 };
259
260 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
261
262 if !in_transaction {
263 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
264 Ok(e) => e,
265 Err(err) => {
266 error!("非事务 execute超时: {err}");
267 return (false, object! {});
268 }
269 };
270 let connection_id = db.connection_id();
271 return match db.query_iter(sql) {
272 Ok(e) => {
273 if debug {
274 info!("查询成功: {} {}", thread_id, sql);
275 }
276 self.query_handle(e, sql)
277 }
278 Err(e) => {
279 error!(
280 "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
281 );
282 (false, JsonValue::from(e.to_string()))
283 }
284 };
285 }
286
287 match TRANSACTION_MANAGER.with_conn(&key, |db| {
288 let connection_id = db.connection_id();
289 match db.query_iter(sql) {
290 Ok(text) => {
291 if debug {
292 info!("查询成功: {} {}", thread_id, sql);
293 }
294 let mut list = array![];
295 text.for_each(|row| {
296 if let Ok(row) = row {
297 let mut item = object! {};
298 for column in row.columns_ref() {
299 let field = column.name_str().to_string();
300 let value = &row[column.name_str().as_ref()];
301 let column_type = column.column_type();
302 match column_type {
303 ColumnType::MYSQL_TYPE_VARCHAR
304 | ColumnType::MYSQL_TYPE_STRING
305 | ColumnType::MYSQL_TYPE_VAR_STRING
306 | ColumnType::MYSQL_TYPE_BLOB
307 | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
308 | ColumnType::MYSQL_TYPE_LONG_BLOB
309 | ColumnType::MYSQL_TYPE_TINY_BLOB => {
310 if *value == NULL {
311 item[field.as_str()] = "".into();
312 } else {
313 let data: String = mysql::from_value(value.clone());
314 if params_json.has_key(&field)
315 || fields_list[&field]["type"]
316 .to_string()
317 .contains("json")
318 {
319 item[field.as_str()] = match json::parse(&data) {
320 Ok(e) => e,
321 Err(_) => data.into(),
322 };
323 } else {
324 item[field.as_str()] = data.into();
325 }
326 }
327 }
328 ColumnType::MYSQL_TYPE_TINY
329 | ColumnType::MYSQL_TYPE_SHORT
330 | ColumnType::MYSQL_TYPE_LONG
331 | ColumnType::MYSQL_TYPE_INT24
332 | ColumnType::MYSQL_TYPE_LONGLONG => {
333 if *value == NULL {
334 item[field.as_str()] = 0.into();
335 } else {
336 let data: i64 = mysql::from_value(value.clone());
337 item[field.as_str()] = data.into();
338 }
339 }
340 ColumnType::MYSQL_TYPE_FLOAT
341 | ColumnType::MYSQL_TYPE_DOUBLE
342 | ColumnType::MYSQL_TYPE_DECIMAL
343 | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
344 if *value == NULL {
345 item[field.as_str()] = 0.0.into();
346 } else {
347 let data: f64 = mysql::from_value(value.clone());
348 item[field.as_str()] = data.into();
349 }
350 }
351 ColumnType::MYSQL_TYPE_JSON => {
352 if *value == NULL {
353 item[field.as_str()] = json::array![];
354 } else {
355 let data: String = mysql::from_value(value.clone());
356 match json::parse(&data) {
357 Ok(v) => item[field.as_str()] = v,
358 Err(_) => item[field.as_str()] = data.into(),
359 }
360 }
361 }
362 _ => {
363 if *value != NULL {
364 let data: String = mysql::from_value(value.clone());
365 item[field.as_str()] = data.into();
366 }
367 }
368 }
369 }
370 let _ = list.push(item);
371 }
372 });
373 (true, list)
374 }
375 Err(e) => {
376 error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
377 (false, JsonValue::from(e.to_string()))
378 }
379 }
380 }) {
381 Some(result) => result,
382 None => {
383 error!("事务连接不存在: {key}");
384 (false, object! {})
385 }
386 }
387 }
388 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
389 let thread_id = format!("{:?}", thread::current().id());
390 let key = format!("{}{}", self.default, thread_id);
391
392 let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
393
394 if !in_transaction {
395 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
396 Ok(e) => e,
397 Err(err) => {
398 error!("非事务: execute超时: {err}");
399 return (false, object! {});
400 }
401 };
402 return match db.exec_iter(sql, ()) {
403 Ok(e) => {
404 if self.connection.debug {
405 info!("提交成功: {} {}", thread_id, sql);
406 }
407 self.execute_cl(e, sql)
408 }
409 Err(e) => {
410 error!("非事务提交失败: {thread_id} {e} {sql}");
411 (false, JsonValue::from(e.to_string()))
412 }
413 };
414 }
415
416 if !TRANSACTION_MANAGER.acquire_table_lock(
417 &self.params.table,
418 &thread_id,
419 Duration::from_secs(30),
420 ) {
421 error!("获取表锁超时: {} {}", self.params.table, thread_id);
422 return (false, object! {"error": "table lock timeout"});
423 }
424
425 let is_insert = sql.contains("INSERT");
426 let autoinc = self.params.autoinc;
427 let debug = self.connection.debug;
428
429 match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
431 Ok(result) => {
432 let affected_rows = result.affected_rows();
433 let last_insert_id = result.last_insert_id();
434 (true, affected_rows, last_insert_id, None)
435 }
436 Err(e) => (false, 0, None, Some(e.to_string())),
437 }) {
438 Some((true, affected_rows, last_insert_id, _)) => {
439 if debug {
440 info!("提交成功: {} {}", thread_id, sql);
441 }
442 if is_insert {
443 if affected_rows > 1 {
444 if autoinc {
445 let start_row = last_insert_id.unwrap_or(0);
446 let end_row = start_row + affected_rows;
447 let mut ids = array![];
448 for item in start_row..end_row {
449 let _ = ids.push(item);
450 }
451 (true, ids)
452 } else {
453 (true, JsonValue::from(affected_rows))
454 }
455 } else {
456 (true, JsonValue::from(last_insert_id))
457 }
458 } else {
459 (true, JsonValue::from(affected_rows))
460 }
461 }
462 Some((false, _, _, Some(err))) => {
463 error!("事务提交失败: {thread_id} {err} {sql}");
464 (false, JsonValue::from(err))
465 }
466 _ => {
467 error!("事务连接不存在: {key}");
468 (false, object! {})
469 }
470 }
471 }
472}
473
474impl DbMode for Mysql {
475 fn database_tables(&mut self) -> JsonValue {
476 let sql = "SHOW TABLES".to_string();
477 match self.sql(sql.as_str()) {
478 Ok(e) => {
479 let mut list = vec![];
480 for item in e.members() {
481 for (_, value) in item.entries() {
482 list.push(value.clone());
483 }
484 }
485 list.into()
486 }
487 Err(_) => {
488 array![]
489 }
490 }
491 }
492
493 fn database_create(&mut self, name: &str) -> bool {
494 let sql = format!("CREATE DATABASE {name}");
495
496 let (state, data) = self.execute(sql.as_str());
497 match state {
498 true => data.as_bool().unwrap_or(false),
499 false => {
500 error!("创建数据库失败: {data:?}");
501 false
502 }
503 }
504 }
505
506 fn truncate(&mut self, table: &str) -> bool {
507 let sql = format!("TRUNCATE TABLE {table}");
508 let (state, _) = self.execute(sql.as_str());
509 state
510 }
511}
512
513impl Mode for Mysql {
514 fn table_create(&mut self, options: TableOptions) -> JsonValue {
515 let mut sql = String::new();
516 let mut unique_fields = String::new();
518 let mut unique_name = String::new();
519 let mut unique = String::new();
520 for item in options.table_unique.iter() {
521 if unique_fields.is_empty() {
522 unique_fields = format!("`{item}`");
523 unique_name = format!("{}_unique_{}", options.table_name, item);
524 } else {
525 unique_fields = format!("{unique_fields},`{item}`");
526 unique_name = format!("{unique_name}_{item}");
527 }
528 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
529 unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
530 }
531
532 let mut index = String::new();
534 for row in options.table_index.iter() {
535 let mut index_fields = String::new();
536 let mut index_name = String::new();
537 for item in row.iter() {
538 if index_fields.is_empty() {
539 index_fields = format!("`{item}`");
540 index_name = format!("{}_index_{}", options.table_name, item);
541 } else {
542 index_fields = format!("{index_fields},`{item}`");
543 index_name = format!("{index_name}_{item}");
544 }
545 }
546 if index.is_empty() {
547 index = format!("INDEX `{index_name}` ({index_fields})");
548 } else {
549 index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
550 }
551 }
552 if index.replace(",", "").is_empty() {
553 index = index.replace(",", "");
554 }
555
556 for (name, field) in options.table_fields.entries() {
557 let row = br_fields::field("mysql", name, field.clone());
558 sql = format!("{sql} {row},\r\n");
559 }
560
561 if !unique.is_empty() {
562 sql = sql.trim_end_matches(",\r\n").to_string();
563 sql = format!("{sql},\r\n{unique}");
564 }
565 if !index.is_empty() {
566 sql = sql.trim_end_matches(",\r\n").to_string();
567 sql = format!("{sql},\r\n{index}");
568 }
569 let collate = format!("{}_bin", self.connection.charset.str());
570
571 let partition = if options.table_partition {
573 sql = format!(
574 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
575 sql,
576 options.table_key,
577 options.table_partition_columns[0].clone()
578 );
579 let temp_head = format!(
580 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
581 options.table_partition_columns[0].clone()
582 );
583 let mut partition_array = vec![];
584 let mut count = 0;
585 for member in options.table_partition_columns[1].members() {
586 let temp = format!(
587 "PARTITION p{} VALUES LESS THAN ('{}')",
588 count.clone(),
589 member.clone()
590 );
591 count += 1;
592 partition_array.push(temp.clone());
593 }
594 let temp_body = partition_array.join(",\r\n");
595 let temp_end = format!(
596 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
597 count.clone()
598 );
599 format!("{temp_head}{temp_body}{temp_end}")
600 } else {
601 sql = if sql.trim_end().ends_with(",") {
602 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
603 } else {
604 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
605 };
606 "".to_string()
607 };
608 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
609
610 if self.params.sql {
611 return JsonValue::from(sql);
612 }
613
614 let (state, data) = self.execute(sql.as_str());
615
616 match state {
617 true => JsonValue::from(state),
618 false => {
619 info!("创建错误: {data}");
620 JsonValue::from(state)
621 }
622 }
623 }
624
625 fn table_update(&mut self, options: TableOptions) -> JsonValue {
626 let table_fields_guard = match TABLE_FIELDS.lock() {
627 Ok(g) => g,
628 Err(e) => e.into_inner(),
629 };
630 if table_fields_guard
631 .get(&format!("{}{}", self.default, options.table_name))
632 .is_some()
633 {
634 drop(table_fields_guard);
635 let mut table_fields_guard = match TABLE_FIELDS.lock() {
636 Ok(g) => g,
637 Err(e) => e.into_inner(),
638 };
639 table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
640 } else {
641 drop(table_fields_guard);
642 }
643 let mut sql = vec![];
644 let fields_list = self.table_info(&options.table_name);
645 let mut put = vec![];
646 let mut add = vec![];
647 let mut del = vec![];
648 for (key, _) in fields_list.entries() {
649 if options.table_fields[key].is_empty() {
650 del.push(key);
651 }
652 }
653 for (name, field) in options.table_fields.entries() {
654 if !fields_list[name].is_empty() {
655 let old_comment = fields_list[name]["comment"].to_string();
656 let new_comment = br_fields::field("mysql", name, field.clone());
657 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
658 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
659 if old_comment == new_comment_text {
660 continue;
661 }
662 put.push(name);
663 } else {
664 add.push(name);
665 }
666 }
667
668 for name in add.iter() {
669 let name = name.to_string();
670 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
671 sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
672 }
673 for name in del.iter() {
674 sql.push(format!(
675 "ALTER TABLE {} DROP `{name}`;\r\n",
676 options.table_name
677 ));
678 }
679 for name in put.iter() {
680 let name = name.to_string();
681 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
682 sql.push(format!(
683 "ALTER TABLE {} CHANGE `{}` {};\r\n",
684 options.table_name, name, row
685 ));
686 }
687
688 let (_, index_list) =
689 self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
690 let (_, pk_list) = self.query(
692 format!(
693 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
694 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
695 self.connection.database, options.table_name
696 )
697 .as_str(),
698 );
699 let mut pk_vec = vec![];
700 for member in pk_list.members() {
701 pk_vec.push(member["COLUMN_NAME"].to_string());
702 }
703
704 let mut unique_new = vec![];
705 let mut index_new = vec![];
706 for item in index_list.members() {
707 let key_name = item["Key_name"].as_str().unwrap_or("");
708 let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
709
710 if non_unique == 0
711 && (key_name.contains(format!("{}_unique", options.table_name).as_str())
712 || key_name.contains("unique"))
713 {
714 unique_new.push(key_name.to_string());
715 continue;
716 }
717 if non_unique == 1
718 && (key_name.contains(format!("{}_index", options.table_name).as_str())
719 || key_name.contains("index"))
720 {
721 index_new.push(key_name.to_string());
722 continue;
723 }
724 }
725
726 let mut unique_fields = String::new();
727 let mut unique_name = String::new();
728 for item in options.table_unique.iter() {
729 if unique_fields.is_empty() {
730 unique_fields = format!("`{item}`");
731 unique_name = format!("{}_unique_{}", options.table_name, item);
732 } else {
733 unique_fields = format!("{unique_fields},`{item}`");
734 unique_name = format!("{unique_name}_{item}");
735 }
736 }
737 if !unique_name.is_empty() {
738 let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
739 unique_name = format!("unique_{md5}");
740 for item in &unique_new {
741 if unique_name != *item {
742 sql.push(format!(
743 "alter table {} drop index {};\r\n",
744 options.table_name, item
745 ));
746 }
747 }
748 if !unique_new.contains(&unique_name) {
749 sql.push(format!(
750 "CREATE UNIQUE index {} on {} ({});\r\n",
751 unique_name, options.table_name, unique_fields
752 ));
753 }
754 }
755
756 let mut index_list = vec![];
757 for row in options.table_index.iter() {
759 let mut index_fields = String::new();
760 let mut index_name = String::new();
761 for item in row {
762 if index_fields.is_empty() {
763 index_fields = item.to_string();
764 index_name = format!("{}_index_{}", options.table_name, item);
765 } else {
766 index_fields = format!("{index_fields},{item}");
767 index_name = format!("{index_name}_{item}");
768 }
769 }
770 index_list.push(index_name.clone());
771 if !index_new.contains(&index_name.clone()) {
772 sql.push(format!(
773 "CREATE INDEX {} on {} ({});\r\n",
774 index_name, options.table_name, index_fields
775 ));
776 }
777 }
778
779 for item in index_new {
780 if !index_list.contains(&item.to_string()) {
781 sql.push(format!(
782 "DROP INDEX {} ON {};\r\n",
783 item.clone(),
784 options.table_name
785 ));
786 }
787 }
788
789 if options.table_partition {
791 if !pk_vec.contains(&options.table_key.to_string().clone())
793 || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
794 {
795 let pk = format!(
796 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
797 options.table_name,
798 options.table_key,
799 options.table_partition_columns[0].clone()
800 );
801 sql.push(pk);
802 let temp_head = format!(
803 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
804 options.table_name,
805 options.table_partition_columns[0].clone()
806 );
807 let mut partition_array = vec![];
808 let mut count = 0;
809 for member in options.table_partition_columns[1].members() {
810 let temp = format!(
811 "PARTITION p{} VALUES LESS THAN ('{}')",
812 count.clone(),
813 member.clone()
814 );
815 count += 1;
816 partition_array.push(temp.clone());
817 }
818 let temp_body = partition_array.join(",\r\n");
819 let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
820 sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
821 }
822 } else if pk_vec.len() != 1 {
823 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
824 sql.push(rm_partition);
825 let pk = format!(
826 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
827 options.table_name, options.table_key
828 );
829 sql.push(pk);
830 };
831
832 if self.params.sql {
833 return JsonValue::from(sql.join(""));
834 }
835
836 if sql.is_empty() {
837 return JsonValue::from(-1);
838 }
839
840 for item in sql.iter() {
841 let (state, res) = self.execute(item.as_str());
842 match state {
843 true => {}
844 false => {
845 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
846 return JsonValue::from(0);
847 }
848 }
849 }
850 JsonValue::from(1)
851 }
852
853 fn table_info(&mut self, table: &str) -> JsonValue {
854 let table_fields_guard = match TABLE_FIELDS.lock() {
855 Ok(g) => g,
856 Err(e) => e.into_inner(),
857 };
858 if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
859 return cached.clone();
860 }
861 drop(table_fields_guard);
862 let sql = format!(
863 "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'"
864 );
865 let (state, data) = self.query(sql.as_str());
866 let mut list = object! {};
867 if state {
868 for item in data.members() {
869 if item["TABLE_SCHEMA"] != self.connection.database {
870 continue;
871 }
872 let mut row = object! {};
873 row["field"] = item["COLUMN_NAME"].clone();
874 row["comment"] = item["COLUMN_COMMENT"].clone();
875 row["type"] = item["COLUMN_TYPE"].clone();
876 if let Some(field_name) = row["field"].as_str() {
877 list[field_name] = row.clone();
878 }
879 }
880 let mut table_fields_guard = match TABLE_FIELDS.lock() {
881 Ok(g) => g,
882 Err(e) => e.into_inner(),
883 };
884 table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
885 list
886 } else {
887 list
888 }
889 }
890
891 fn table_is_exist(&mut self, name: &str) -> bool {
892 let sql = format!(
893 "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = '{}' AND TABLE_SCHEMA = '{}'",
894 name, self.connection.database
895 );
896 let (state, data) = self.query(sql.as_str());
897 match state {
898 true => !data.is_empty() && !data.is_empty(),
899 false => false,
900 }
901 }
902
903 fn table(&mut self, name: &str) -> &mut Mysql {
904 self.params = Params::default(self.connection.mode.str().as_str());
905 let table_name = format!("{}{}", self.connection.prefix, name);
906 if !super::sql_safety::validate_table_name(&table_name) {
907 error!("Invalid table name: {}", name);
908 }
909 self.params.table = table_name.clone();
910 self.params.join_table = table_name;
911 self
912 }
913
/// Sets the table name used to qualify columns in subsequent `field` /
/// `where_*` calls. `name` is stored as given (no prefix is added).
fn change_table(&mut self, name: &str) -> &mut Self {
    self.params.join_table = name.to_string();
    self
}
918
/// Marks the statement as targeting an auto-increment key, so multi-row
/// inserts report the list of generated ids instead of a row count.
fn autoinc(&mut self) -> &mut Self {
    self.params.autoinc = true;
    self
}
923
/// Sets the `timestamps` flag on the builder state.
fn timestamps(&mut self) -> &mut Self {
    self.params.timestamps = true;
    self
}
928
/// Requests the generated SQL text instead of executing it (honoured by
/// `table_create` and `table_update`, which check `params.sql`).
fn fetch_sql(&mut self) -> &mut Self {
    self.params.sql = true;
    self
}
933
934 fn order(&mut self, field: &str, by: bool) -> &mut Self {
935 self.params.order[field] = {
936 if by {
937 "DESC"
938 } else {
939 "ASC"
940 }
941 }
942 .into();
943 self
944 }
945
946 fn group(&mut self, field: &str) -> &mut Self {
947 let fields: Vec<&str> = field.split(",").collect();
948 for field in fields.iter() {
949 let field = field.to_string();
950 self.params.group[field.as_str()] = field.clone().into();
951 if !self.params.fields.has_key(field.as_str()) {
952 self.params.fields[field.as_str()] = field.clone().into();
953 }
954 }
955
956 self
957 }
958
/// Sets the `distinct` flag on the builder state.
fn distinct(&mut self) -> &mut Self {
    self.params.distinct = true;
    self
}
963
964 fn json(&mut self, field: &str) -> &mut Self {
965 let list: Vec<&str> = field.split(",").collect();
966 for item in list.iter() {
967 self.params.json[item.to_string().as_str()] = item.to_string().into();
968 }
969 self
970 }
971
972 fn location(&mut self, field: &str) -> &mut Self {
973 let list: Vec<&str> = field.split(",").collect();
974 for item in list.iter() {
975 self.params.location[item.to_string().as_str()] = item.to_string().into();
976 }
977 self
978 }
979
980 fn field(&mut self, field: &str) -> &mut Self {
981 let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
982 let join_table = if self.params.join_table.is_empty() {
983 self.params.table.clone()
984 } else {
985 self.params.join_table.clone()
986 };
987 for item in list.iter() {
988 let lower = item.to_lowercase();
989 let is_expr = lower.contains("count(")
990 || lower.contains("sum(")
991 || lower.contains("avg(")
992 || lower.contains("max(")
993 || lower.contains("min(")
994 || lower.contains("case ");
995 if is_expr {
996 self.params.fields[item.to_string().as_str()] = (*item).into();
997 } else if item.contains(" as ") {
998 let text = item.split(" as ").collect::<Vec<&str>>();
999 self.params.fields[item.to_string().as_str()] =
1000 format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1001 } else {
1002 self.params.fields[item.to_string().as_str()] =
1003 format!("{join_table}.`{item}`").into();
1004 }
1005 }
1006 self
1007 }
1008
/// Adds `expr` to the selected fields verbatim, keyed by its own text (no
/// table qualification or quoting is applied).
fn field_raw(&mut self, expr: &str) -> &mut Self {
    self.params.fields[expr] = expr.into();
    self
}
1013
1014 fn hidden(&mut self, name: &str) -> &mut Self {
1015 let hidden: Vec<&str> = name.split(",").collect();
1016
1017 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
1018
1019 let mut data = array![];
1020 for item in fields_list.members() {
1021 let _ = data.push(object! {
1022 "name":item["COLUMN_NAME"].as_str().unwrap_or("")
1023 });
1024 }
1025
1026 for item in data.members() {
1027 let name = item["name"].as_str().unwrap_or("");
1028 if !hidden.contains(&name) {
1029 self.params.fields[name] = name.into();
1030 }
1031 }
1032 self
1033 }
1034
1035 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1036 for f in field.split('|') {
1037 if !super::sql_safety::validate_field_name(f) {
1038 error!("Invalid field name: {}", f);
1039 }
1040 }
1041 if !super::sql_safety::validate_compare_orator(compare) {
1042 error!("Invalid compare operator: {}", compare);
1043 }
1044 let table_fields = self.table_info(&self.params.table.clone());
1045 let join_table = if self.params.join_table.is_empty() {
1046 self.params.table.clone()
1047 } else {
1048 self.params.join_table.clone()
1049 };
1050 if value.is_boolean() {
1051 if value.as_bool().unwrap_or(false) {
1052 value = 1.into();
1053 } else {
1054 value = 0.into();
1055 }
1056 }
1057 match compare {
1058 "between" => {
1059 self.params.where_and.push(format!(
1060 "{join_table}.`{field}` between '{}' AND '{}'",
1061 value[0], value[1]
1062 ));
1063 }
1064 "location" => {
1065 let comment = table_fields[field]["comment"].to_string();
1066 let srid = comment
1067 .split("|")
1068 .collect::<Vec<&str>>()
1069 .last()
1070 .unwrap_or(&"0")
1071 .to_string();
1072
1073 let field_name = format!(
1074 "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1075 value[0], value[1], value[4]
1076 );
1077 self.params.fields[&field_name.clone()] = field_name.clone().into();
1078 let location = format!(
1081 "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1082 value[0], value[1], value[2], value[3]
1083 );
1084 self.params.where_and.push(location);
1085 }
1086 "set" => {
1087 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1088 let mut wheredata = vec![];
1089 for item in list.iter() {
1090 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1091 }
1092 self.params
1093 .where_and
1094 .push(format!("({})", wheredata.join(" or ")));
1095 }
1096 "notin" => {
1097 let mut text = String::new();
1098 for item in value.members() {
1099 text = format!("{text},'{item}'");
1100 }
1101 text = text.trim_start_matches(",").into();
1102 self.params
1103 .where_and
1104 .push(format!("{join_table}.`{field}` not in ({text})"));
1105 }
1106 "is" => {
1107 self.params
1108 .where_and
1109 .push(format!("{join_table}.`{field}` is {value}"));
1110 }
1111 "isnot" => {
1112 self.params
1113 .where_and
1114 .push(format!("{join_table}.`{field}` is not {value}"));
1115 }
1116 "notlike" => {
1117 self.params
1118 .where_and
1119 .push(format!("{join_table}.`{field}` not like '{value}'"));
1120 }
1121 "in" => {
1122 if value.is_array() && value.is_empty() {
1123 self.params.where_and.push("1=0".to_string());
1124 return self;
1125 }
1126 let mut text = String::new();
1127 if value.is_array() {
1128 for item in value.members() {
1129 text = format!("{text},'{item}'");
1130 }
1131 } else if value.is_null() {
1132 text = format!("{text},null");
1133 } else {
1134 let value = value.as_str().unwrap_or("");
1135
1136 let value: Vec<&str> = value.split(",").collect();
1137 for item in value.iter() {
1138 text = format!("{text},'{item}'");
1139 }
1140 }
1141 text = text.trim_start_matches(",").into();
1142
1143 self.params
1144 .where_and
1145 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1146 }
1147 "json_contains" => {
1151 if value.is_array() {
1152 if value.is_empty() {
1153 self.params.where_and.push("1=0".to_string());
1154 } else {
1155 let mut parts = vec![];
1156 for item in value.members() {
1157 let escaped = super::sql_safety::escape_string(&item.to_string());
1158 parts.push(format!(
1159 "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1160 escaped
1161 ));
1162 }
1163 self.params
1164 .where_and
1165 .push(format!("({})", parts.join(" OR ")));
1166 }
1167 } else {
1168 let escaped = super::sql_safety::escape_string(&value.to_string());
1169 self.params.where_and.push(format!(
1170 "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1171 escaped
1172 ));
1173 }
1174 }
1175 _ => {
1176 self.params
1177 .where_and
1178 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1179 }
1180 }
1181 self
1182 }
1183
1184 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1185 for f in field.split('|') {
1186 if !super::sql_safety::validate_field_name(f) {
1187 error!("Invalid field name: {}", f);
1188 }
1189 }
1190 if !super::sql_safety::validate_compare_orator(compare) {
1191 error!("Invalid compare operator: {}", compare);
1192 }
1193 let join_table = if self.params.join_table.is_empty() {
1194 self.params.table.clone()
1195 } else {
1196 self.params.join_table.clone()
1197 };
1198
1199 if value.is_boolean() {
1200 if value.as_bool().unwrap_or(false) {
1201 value = 1.into();
1202 } else {
1203 value = 0.into();
1204 }
1205 }
1206
1207 match compare {
1208 "between" => {
1209 self.params.where_or.push(format!(
1210 "{}.`{}` between '{}' AND '{}'",
1211 join_table, field, value[0], value[1]
1212 ));
1213 }
1214 "set" => {
1215 let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1216 let mut wheredata = vec![];
1217 for item in list.iter() {
1218 wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1219 }
1220 self.params
1221 .where_or
1222 .push(format!("({})", wheredata.join(" or ")));
1223 }
1224 "notin" => {
1225 let mut text = String::new();
1226 for item in value.members() {
1227 text = format!("{text},'{item}'");
1228 }
1229 text = text.trim_start_matches(",").into();
1230 self.params
1231 .where_or
1232 .push(format!("{join_table}.`{field}` not in ({text})"));
1233 }
1234 "is" => {
1235 self.params
1236 .where_or
1237 .push(format!("{join_table}.`{field}` is {value}"));
1238 }
1239 "isnot" => {
1240 self.params
1241 .where_or
1242 .push(format!("{join_table}.`{field}` IS NOT {value}"));
1243 }
1244 "in" => {
1245 if value.is_array() && value.is_empty() {
1246 self.params.where_or.push("1=0".to_string());
1247 return self;
1248 }
1249 let mut text = String::new();
1250 if value.is_array() {
1251 for item in value.members() {
1252 text = format!("{text},'{item}'");
1253 }
1254 } else {
1255 let value = value.as_str().unwrap_or("");
1256 let value: Vec<&str> = value.split(",").collect();
1257 for item in value.iter() {
1258 text = format!("{text},'{item}'");
1259 }
1260 }
1261 text = text.trim_start_matches(",").into();
1262 self.params
1263 .where_or
1264 .push(format!("{join_table}.`{field}` {compare} ({text})"));
1265 }
1266 "json_contains" => {
1270 if value.is_array() {
1271 if value.is_empty() {
1272 self.params.where_or.push("1=0".to_string());
1273 } else {
1274 let mut parts = vec![];
1275 for item in value.members() {
1276 let escaped = super::sql_safety::escape_string(&item.to_string());
1277 parts.push(format!(
1278 "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1279 escaped
1280 ));
1281 }
1282 self.params
1283 .where_or
1284 .push(format!("({})", parts.join(" OR ")));
1285 }
1286 } else {
1287 let escaped = super::sql_safety::escape_string(&value.to_string());
1288 self.params.where_or.push(format!(
1289 "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1290 escaped
1291 ));
1292 }
1293 }
1294 _ => {
1295 if field.contains(".") {
1296 self.params
1297 .where_or
1298 .push(format!("{field} {compare} '{value}'"));
1299 } else {
1300 self.params
1301 .where_or
1302 .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1303 }
1304 }
1305 }
1306 self
1307 }
1308
1309 fn where_raw(&mut self, expr: &str) -> &mut Self {
1310 self.params.where_and.push(expr.to_string());
1311 self
1312 }
1313
1314 fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1315 self.params
1316 .where_and
1317 .push(format!("`{field}` IN ({sub_sql})"));
1318 self
1319 }
1320
1321 fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1322 self.params
1323 .where_and
1324 .push(format!("`{field}` NOT IN ({sub_sql})"));
1325 self
1326 }
1327
1328 fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1329 self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1330 self
1331 }
1332
1333 fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1334 self.params
1335 .where_and
1336 .push(format!("NOT EXISTS ({sub_sql})"));
1337 self
1338 }
1339
1340 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1341 self.params.where_column = format!(
1342 "{}.`{}` {} {}.`{}`",
1343 self.params.table, field_a, compare, self.params.table, field_b
1344 );
1345 self
1346 }
1347
1348 fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1349 self.params
1350 .update_column
1351 .push(format!("{field_a} = {compare}"));
1352 self
1353 }
1354
1355 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1356 self.params.page = page;
1357 self.params.limit = limit;
1358 self
1359 }
1360
1361 fn limit(&mut self, count: i32) -> &mut Self {
1362 self.params.limit_only = count;
1363 self
1364 }
1365
1366 fn column(&mut self, field: &str) -> JsonValue {
1367 self.field(field);
1368 let sql = self.params.select_sql();
1369
1370 if self.params.sql {
1371 return JsonValue::from(sql);
1372 }
1373 let (state, data) = self.query(sql.as_str());
1374 match state {
1375 true => {
1376 let mut list = array![];
1377 for item in data.members() {
1378 if self.params.json[field].is_empty() {
1379 let _ = list.push(item[field].clone());
1380 } else {
1381 let data =
1382 json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1383 let _ = list.push(data);
1384 }
1385 }
1386 list
1387 }
1388 false => {
1389 array![]
1390 }
1391 }
1392 }
1393
1394 fn count(&mut self) -> JsonValue {
1395 if !self.params.fields.is_empty() {
1396 self.group(format!("{}.id", self.params.table).as_str());
1397 }
1398 self.params.fields["count"] = "count(*) as count".into();
1399 let sql = self.params.select_sql();
1400 if self.params.sql {
1401 return JsonValue::from(sql.clone());
1402 }
1403 let (state, data) = self.query(sql.as_str());
1404 if state {
1405 if data.is_empty() {
1406 JsonValue::from(0)
1407 } else {
1408 data[0]["count"].clone()
1409 }
1410 } else {
1411 JsonValue::from(0)
1412 }
1413 }
1414
1415 fn max(&mut self, field: &str) -> JsonValue {
1416 self.params.fields[field] = format!("max({field}) as {field}").into();
1417 let sql = self.params.select_sql();
1418 if self.params.sql {
1419 return JsonValue::from(sql.clone());
1420 }
1421 let (state, data) = self.query(sql.as_str());
1422 if state {
1423 if data.len() > 1 {
1424 return data.clone();
1425 }
1426 data[0][field].clone()
1427 } else {
1428 JsonValue::from(0)
1429 }
1430 }
1431
1432 fn min(&mut self, field: &str) -> JsonValue {
1433 self.params.fields[field] = format!("min({field}) as {field}").into();
1434 let sql = self.params.select_sql();
1435 if self.params.sql {
1436 return JsonValue::from(sql.clone());
1437 }
1438 let (state, data) = self.query(sql.as_str());
1439 if state {
1440 if data.len() > 1 {
1441 return data;
1442 }
1443 data[0][field].clone()
1444 } else {
1445 JsonValue::from(0)
1446 }
1447 }
1448
1449 fn sum(&mut self, field: &str) -> JsonValue {
1450 self.params.fields[field] = format!("sum({field}) as {field}").into();
1451 let sql = self.params.select_sql();
1452 if self.params.sql {
1453 return JsonValue::from(sql.clone());
1454 }
1455 let (state, data) = self.query(sql.as_str());
1456 match state {
1457 true => {
1458 if data.len() > 1 {
1459 return data;
1460 }
1461 data[0][field].clone()
1462 }
1463 false => JsonValue::from(0),
1464 }
1465 }
1466
1467 fn avg(&mut self, field: &str) -> JsonValue {
1468 self.params.fields[field] = format!("avg({field}) as {field}").into();
1469 let sql = self.params.select_sql();
1470 if self.params.sql {
1471 return JsonValue::from(sql.clone());
1472 }
1473 let (state, data) = self.query(sql.as_str());
1474 if state {
1475 if data.len() > 1 {
1476 return data;
1477 }
1478 data[0][field].clone()
1479 } else {
1480 JsonValue::from(0)
1481 }
1482 }
1483
1484 fn having(&mut self, expr: &str) -> &mut Self {
1485 self.params.having.push(expr.to_string());
1486 self
1487 }
1488
1489 fn select(&mut self) -> JsonValue {
1490 let sql = self.params.select_sql();
1491 if self.params.sql {
1492 return JsonValue::from(sql.clone());
1493 }
1494 let (state, mut data) = self.query(sql.as_str());
1495 match state {
1496 true => {
1497 for (field, _) in self.params.json.entries() {
1498 for item in data.members_mut() {
1499 if !item[field].is_empty() {
1500 let json = item[field].to_string();
1501 item[field] = match json::parse(&json) {
1502 Ok(e) => e,
1503 Err(_) => JsonValue::from(json),
1504 };
1505 }
1506 }
1507 }
1508 data.clone()
1509 }
1510 false => array![],
1511 }
1512 }
1513
1514 fn find(&mut self) -> JsonValue {
1515 self.params.page = 1;
1516 self.params.limit = 1;
1517 let sql = self.params.select_sql();
1518 if self.params.sql {
1519 return JsonValue::from(sql.clone());
1520 }
1521 let (state, mut data) = self.query(sql.as_str());
1522 match state {
1523 true => {
1524 if data.is_empty() {
1525 return object! {};
1526 }
1527 for (field, _) in self.params.json.entries() {
1528 if !data[0][field].is_empty() {
1529 let json = data[0][field].to_string();
1530 let json = json::parse(&json).unwrap_or(array![]);
1531 data[0][field] = json;
1532 } else {
1533 data[0][field] = array![];
1534 }
1535 }
1536 data[0].clone()
1537 }
1538 false => {
1539 error!("find失败: {data:?}");
1540 object! {}
1541 }
1542 }
1543 }
1544
1545 fn value(&mut self, field: &str) -> JsonValue {
1546 self.params.fields = object! {};
1547 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1548 self.params.page = 1;
1549 self.params.limit = 1;
1550 let sql = self.params.select_sql();
1551 if self.params.sql {
1552 return JsonValue::from(sql.clone());
1553 }
1554 let (state, mut data) = self.query(sql.as_str());
1555 match state {
1556 true => {
1557 for (field, _) in self.params.json.entries() {
1558 if !data[0][field].is_empty() {
1559 let json = data[0][field].to_string();
1560 let json = json::parse(&json).unwrap_or(array![]);
1561 data[0][field] = json;
1562 } else {
1563 data[0][field] = array![];
1564 }
1565 }
1566 data[0][field].clone()
1567 }
1568 false => {
1569 if self.connection.debug {
1570 info!("{data:?}");
1571 }
1572 JsonValue::Null
1573 }
1574 }
1575 }
1576 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1577 let fields_list = self.table_info(&self.params.table.clone());
1578
1579 let mut fields = vec![];
1580 let mut values = vec![];
1581 if !self.params.autoinc && data["id"].is_empty() {
1582 let thread_id = format!("{:?}", std::thread::current().id());
1583 let thread_num: u64 = thread_id
1584 .trim_start_matches("ThreadId(")
1585 .trim_end_matches(")")
1586 .parse()
1587 .unwrap_or(0);
1588 data["id"] = format!(
1589 "{:X}{:X}",
1590 Local::now().timestamp_nanos_opt().unwrap_or(0),
1591 thread_num
1592 )
1593 .into();
1594 }
1595 for (field, value) in data.entries() {
1596 fields.push(format!("`{field}`"));
1597
1598 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1599 let is_location = (self.params.location.has_key(field)
1600 && !self.params.location[field].is_empty())
1601 || col_type == "point";
1602 if is_location {
1603 if value.is_empty() {
1604 values.push("NULL".to_string());
1605 continue;
1606 }
1607 let comment = fields_list[field]["comment"].to_string();
1608 let srid = comment
1609 .split("|")
1610 .collect::<Vec<&str>>()
1611 .last()
1612 .unwrap_or(&"0")
1613 .to_string();
1614 let location = value.to_string().replace(",", " ");
1615 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1616 continue;
1617 }
1618
1619 if value.is_string() || value.is_array() || value.is_object() {
1620 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1621 continue;
1622 } else if value.is_number() {
1623 if col_type.contains("int") {
1624 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1625 } else {
1626 values.push(format!("{value}"));
1627 }
1628 continue;
1629 } else if value.is_boolean() || value.is_null() {
1630 values.push(format!("{value}"));
1631 continue;
1632 } else {
1633 values.push(format!("'{value}'"));
1634 continue;
1635 }
1636 }
1637 let fields = fields.join(",");
1638 let values = values.join(",");
1639
1640 let sql = format!(
1641 "INSERT INTO {} ({fields}) VALUES ({values});",
1642 self.params.table
1643 );
1644 if self.params.sql {
1645 return JsonValue::from(sql.clone());
1646 }
1647 let (state, ids) = self.execute(sql.as_str());
1648
1649 match state {
1650 true => match self.params.autoinc {
1651 true => ids.clone(),
1652 false => data["id"].clone(),
1653 },
1654 false => {
1655 let thread_id = format!("{:?}", thread::current().id());
1656 error!("添加失败: {thread_id} {ids:?} {sql}");
1657 JsonValue::from("")
1658 }
1659 }
1660 }
1661 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1662 let fields_list = self.table_info(&self.params.table.clone());
1663
1664 let mut fields = String::new();
1665 if !self.params.autoinc && data[0]["id"].is_empty() {
1666 data[0]["id"] = "".into();
1667 }
1668 for (field, _) in data[0].entries() {
1669 fields = format!("{fields},`{field}`");
1670 }
1671 fields = fields.trim_start_matches(",").to_string();
1672
1673 let core_count = num_cpus::get();
1674 let mut p = pools::Pool::new(core_count * 4);
1675 let autoinc = self.params.autoinc;
1676 for list in data.members() {
1677 let mut item = list.clone();
1678 let params_location = self.params.location.clone();
1679 let fields_list_new = fields_list.clone();
1680 p.execute(move |pcindex| {
1681 if !autoinc && item["id"].is_empty() {
1682 let id = format!(
1683 "{:X}{:X}",
1684 Local::now().timestamp_nanos_opt().unwrap_or(0),
1685 pcindex
1686 );
1687 item["id"] = id.into();
1688 }
1689 let mut values = "".to_string();
1690 for (field, value) in item.entries() {
1691 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1692 let is_location = params_location.has_key(field) || col_type == "point";
1693 if is_location {
1694 if value.is_empty() {
1695 values = format!("{values},NULL");
1696 continue;
1697 }
1698 let comment = fields_list_new[field]["comment"].to_string();
1699 let srid = comment
1700 .split("|")
1701 .collect::<Vec<&str>>()
1702 .last()
1703 .unwrap_or(&"0")
1704 .to_string();
1705 let location = value.to_string().replace(",", " ");
1706 values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1707 continue;
1708 }
1709 if value.is_string() {
1710 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1711 } else if value.is_number() {
1712 let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1713 if col_type.contains("int") {
1714 values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1715 } else {
1716 values = format!("{values},{value}");
1717 }
1718 } else if value.is_boolean() {
1719 values = format!("{values},{value}");
1720 } else {
1721 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1722 }
1723 }
1724 values = format!("({})", values.trim_start_matches(","));
1725 array![item["id"].clone(), values]
1726 });
1727 }
1728 let (ids_list, mut values) = p.insert_all();
1729 values = values.trim_start_matches(",").to_string();
1730 let sql = format!(
1731 "INSERT INTO {} ({}) VALUES {};",
1732 self.params.table, fields, values
1733 );
1734
1735 if self.params.sql {
1736 return JsonValue::from(sql.clone());
1737 }
1738 let (state, data) = self.execute(sql.as_str());
1739 match state {
1740 true => match autoinc {
1741 true => data,
1742 false => JsonValue::from(ids_list),
1743 },
1744 false => {
1745 error!("insert_all: {data:?}");
1746 array![]
1747 }
1748 }
1749 }
1750 fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1751 let fields_list = self.table_info(&self.params.table.clone());
1752
1753 let mut fields = vec![];
1754 let mut values = vec![];
1755 if !self.params.autoinc && data["id"].is_empty() {
1756 let thread_id = format!("{:?}", std::thread::current().id());
1757 let thread_num: u64 = thread_id
1758 .trim_start_matches("ThreadId(")
1759 .trim_end_matches(")")
1760 .parse()
1761 .unwrap_or(0);
1762 data["id"] = format!(
1763 "{:X}{:X}",
1764 Local::now().timestamp_nanos_opt().unwrap_or(0),
1765 thread_num
1766 )
1767 .into();
1768 }
1769 for (field, value) in data.entries() {
1770 fields.push(format!("`{field}`"));
1771
1772 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1773 let is_location = (self.params.location.has_key(field)
1774 && !self.params.location[field].is_empty())
1775 || col_type == "point";
1776 if is_location {
1777 if value.is_empty() {
1778 values.push("NULL".to_string());
1779 continue;
1780 }
1781 let comment = fields_list[field]["comment"].to_string();
1782 let srid = comment
1783 .split("|")
1784 .collect::<Vec<&str>>()
1785 .last()
1786 .unwrap_or(&"0")
1787 .to_string();
1788 let location = value.to_string().replace(",", " ");
1789 values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1790 continue;
1791 }
1792
1793 if value.is_string() || value.is_array() || value.is_object() {
1794 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1795 continue;
1796 } else if value.is_number() {
1797 if col_type.contains("int") {
1798 values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1799 } else {
1800 values.push(format!("{value}"));
1801 }
1802 continue;
1803 } else if value.is_boolean() || value.is_null() {
1804 values.push(format!("{value}"));
1805 continue;
1806 } else {
1807 values.push(format!("'{value}'"));
1808 continue;
1809 }
1810 }
1811
1812 let conflict_set: Vec<String> = fields
1813 .iter()
1814 .filter(|f| {
1815 let name = f.trim_matches('`');
1816 !conflict_fields.contains(&name) && name != "id"
1817 })
1818 .map(|f| format!("{f}=VALUES({f})"))
1819 .collect();
1820
1821 let fields_str = fields.join(",");
1822 let values_str = values.join(",");
1823
1824 let sql = format!(
1825 "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
1826 self.params.table,
1827 fields_str,
1828 values_str,
1829 conflict_set.join(",")
1830 );
1831 if self.params.sql {
1832 return JsonValue::from(sql.clone());
1833 }
1834 let (state, result) = self.execute(sql.as_str());
1835 match state {
1836 true => match self.params.autoinc {
1837 true => result.clone(),
1838 false => data["id"].clone(),
1839 },
1840 false => {
1841 let thread_id = format!("{:?}", thread::current().id());
1842 error!("upsert失败: {thread_id} {result:?} {sql}");
1843 JsonValue::from("")
1844 }
1845 }
1846 }
1847 fn update(&mut self, data: JsonValue) -> JsonValue {
1848 let fields_list = self.table_info(&self.params.table.clone());
1849
1850 let mut values = vec![];
1851 for (field, value) in data.entries() {
1852 if !self.params.json[field].is_empty() {
1853 let json = value.to_string().replace("'", "''");
1854 values.push(format!("`{field}`='{json}'"));
1855 continue;
1856 }
1857 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1858 let is_location = !self.params.location[field].is_empty() || col_type == "point";
1859 if is_location {
1860 if value.is_empty() {
1861 values.push(format!("{field}=NULL").to_string());
1862 continue;
1863 }
1864 let comment = fields_list[field]["comment"].to_string();
1865 let srid = comment
1866 .split("|")
1867 .collect::<Vec<&str>>()
1868 .last()
1869 .unwrap_or(&"0")
1870 .to_string();
1871 let location = value.to_string().replace(",", " ");
1872 values.push(format!(
1873 "{field}=ST_GeomFromText('POINT({location})',{srid})"
1874 ));
1875
1876 continue;
1877 }
1878
1879 if value.is_string() {
1880 values.push(format!(
1881 "`{field}`='{}'",
1882 value.to_string().replace("'", "''")
1883 ));
1884 } else if value.is_number() {
1885 if col_type.contains("int") {
1886 values.push(format!(
1887 "`{field}`= {}",
1888 value.as_f64().unwrap_or(0.0) as i64
1889 ));
1890 } else {
1891 values.push(format!("`{field}`= {value}"));
1892 }
1893 } else if value.is_array() {
1894 let array = value
1895 .members()
1896 .map(|x| x.as_str().unwrap_or(""))
1897 .collect::<Vec<&str>>()
1898 .join(",");
1899 values.push(format!("`{field}`='{array}'"));
1900 continue;
1901 } else if value.is_object() {
1902 if self.params.json[field].is_empty() {
1903 values.push(format!("`{field}`='{value}'"));
1904 } else {
1905 if value.is_empty() {
1906 values.push(format!("`{field}`=''"));
1907 continue;
1908 }
1909 let json = value.to_string();
1910 let json = json.replace("'", "''");
1911 values.push(format!("`{field}`='{json}'"));
1912 }
1913 continue;
1914 } else if value.is_boolean() {
1915 values.push(format!("`{field}`= {value}"));
1916 } else {
1917 values.push(format!("`{field}`=\"{value}\""));
1918 }
1919 }
1920
1921 for (field, value) in self.params.inc_dec.entries() {
1922 values.push(format!("{field} = {}", value.to_string().clone()));
1923 }
1924 if !self.params.update_column.is_empty() {
1925 values.extend(self.params.update_column.clone());
1926 }
1927
1928 let values = values.join(",");
1929
1930 let sql = format!(
1931 "UPDATE {} SET {values} {};",
1932 self.params.table.clone(),
1933 self.params.where_sql()
1934 );
1935 if self.params.sql {
1936 return JsonValue::from(sql.clone());
1937 }
1938 let (state, data) = self.execute(sql.as_str());
1939 if state {
1940 data
1941 } else {
1942 let thread_id = format!("{:?}", thread::current().id());
1943 error!("update: {thread_id} {data:?} {sql}");
1944 0.into()
1945 }
1946 }
1947
1948 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1949 let fields_list = self.table_info(&self.params.table.clone());
1950 let mut values = vec![];
1951 let mut ids = vec![];
1952 for (field, _) in data[0].entries() {
1953 if field == "id" {
1954 continue;
1955 }
1956 let mut fields = vec![];
1957 for row in data.members() {
1958 let value = row[field].clone();
1959 let id = row["id"].clone();
1960 ids.push(id.clone());
1961
1962 if self.params.json.has_key(field) {
1963 let json = value.to_string();
1964 let json = json.replace("'", "''");
1965 fields.push(format!("WHEN '{id}' THEN '{json}'"));
1966 continue;
1967 }
1968 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1969 let is_location = (self.params.location.has_key(field)
1970 && !self.params.location[field].is_empty())
1971 || col_type == "point";
1972 if is_location {
1973 let comment = fields_list[field]["comment"].to_string();
1974 let srid = comment
1975 .split("|")
1976 .collect::<Vec<&str>>()
1977 .last()
1978 .unwrap_or(&"0")
1979 .to_string();
1980 let location = value.to_string().replace(",", " ");
1981 let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1982 fields.push(format!("WHEN '{id}' THEN {location}"));
1983 continue;
1984 }
1985 if value.is_string() {
1986 fields.push(format!(
1987 "WHEN '{id}' THEN '{}'",
1988 value.to_string().replace("'", "''")
1989 ));
1990 } else if value.is_array() || value.is_object() {
1991 fields.push(format!("WHEN '{id}' THEN '{value}'"));
1992 } else if value.is_number() {
1993 let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1994 if col_type.contains("int") {
1995 fields.push(format!(
1996 "WHEN '{id}' THEN {}",
1997 value.as_f64().unwrap_or(0.0) as i64
1998 ));
1999 } else {
2000 fields.push(format!("WHEN '{id}' THEN {value}"));
2001 }
2002 } else if value.is_boolean() || value.is_null() {
2003 fields.push(format!("WHEN '{id}' THEN {value}"));
2004 } else {
2005 fields.push(format!("WHEN '{id}' THEN '{value}'"));
2006 }
2007 }
2008 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
2009 }
2010 self.where_and("id", "in", ids.into());
2011 for (field, value) in self.params.inc_dec.entries() {
2012 values.push(format!("{} = {}", field, value.to_string().clone()));
2013 }
2014
2015 let values = values.join(",");
2016 let sql = format!(
2017 "UPDATE {} SET {} {} {};",
2018 self.params.table.clone(),
2019 values,
2020 self.params.where_sql(),
2021 self.params.page_limit_sql()
2022 );
2023 if self.params.sql {
2024 return JsonValue::from(sql.clone());
2025 }
2026 let (state, data) = self.execute(sql.as_str());
2027 if state {
2028 data
2029 } else {
2030 error!("update_all: {data:?}");
2031 JsonValue::from(0)
2032 }
2033 }
2034
2035 fn delete(&mut self) -> JsonValue {
2036 let sql = format!(
2037 "delete FROM {} {} {};",
2038 self.params.table.clone(),
2039 self.params.where_sql(),
2040 self.params.page_limit_sql()
2041 );
2042 if self.params.sql {
2043 return JsonValue::from(sql.clone());
2044 }
2045 let (state, data) = self.execute(sql.as_str());
2046 match state {
2047 true => data,
2048 false => {
2049 error!("delete 失败>>> {data:?}");
2050 JsonValue::from(0)
2051 }
2052 }
2053 }
2054 fn transaction(&mut self) -> bool {
2055 let thread_id = format!("{:?}", thread::current().id());
2056 let key = format!("{}{}", self.default, thread_id);
2057
2058 if TRANSACTION_MANAGER.is_in_transaction(&key) {
2059 let depth = TRANSACTION_MANAGER.get_depth(&key);
2060 TRANSACTION_MANAGER.increment_depth(&key);
2061 let sp = format!("SAVEPOINT sp_{}", depth + 1);
2062 let _ = self.query(&sp);
2063 return true;
2064 }
2065
2066 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
2067 Ok(e) => e,
2068 Err(err) => {
2069 error!("transaction 获取连接超时: {err}");
2070 return false;
2071 }
2072 };
2073
2074 if !TRANSACTION_MANAGER.start(&key, conn) {
2075 return false;
2076 }
2077
2078 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
2079 let (state, _) = self.query(sql);
2080 if !state {
2081 TRANSACTION_MANAGER.remove(&key, &thread_id);
2082 }
2083 state
2084 }
2085
2086 fn commit(&mut self) -> bool {
2087 let thread_id = format!("{:?}", thread::current().id());
2088 let key = format!("{}{}", self.default, thread_id);
2089
2090 let depth = TRANSACTION_MANAGER.get_depth(&key);
2091 if depth > 1 {
2092 let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
2093 let _ = self.query(&sp);
2094 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2095 return true;
2096 }
2097
2098 let sql = "COMMIT";
2099 let (state, data) = self.query(sql);
2100
2101 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2102
2103 if !state {
2104 error!("提交事务失败: {data}");
2105 }
2106 state
2107 }
2108
2109 fn rollback(&mut self) -> bool {
2110 let thread_id = format!("{:?}", thread::current().id());
2111 let key = format!("{}{}", self.default, thread_id);
2112
2113 let depth = TRANSACTION_MANAGER.get_depth(&key);
2114 if depth > 1 {
2115 let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2116 let _ = self.query(&sp);
2117 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2118 return true;
2119 }
2120
2121 let sql = "ROLLBACK";
2122 let (state, data) = self.query(sql);
2123
2124 TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2125
2126 if !state {
2127 error!("回滚失败: {data}");
2128 }
2129 state
2130 }
2131
2132 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2133 let (state, data) = self.query(sql);
2134 match state {
2135 true => Ok(data),
2136 false => Err(data.to_string()),
2137 }
2138 }
2139
2140 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2141 let (state, data) = self.execute(sql);
2142 match state {
2143 true => Ok(data),
2144 false => Err(data.to_string()),
2145 }
2146 }
2147
2148 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2149 self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2150 self
2151 }
2152 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2153 self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2154 self
2155 }
2156
2157 fn buildsql(&mut self) -> String {
2158 self.fetch_sql();
2159 let sql = self.select().to_string();
2160 format!("( {} ) `{}`", sql, self.params.table)
2161 }
2162
2163 fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2164 for field in fields.clone() {
2165 if field.contains(format!("{}.", self.params.table).as_str()) {
2166 self.params.fields[field] = field.into();
2167 } else {
2168 self.params.fields[field] =
2169 format!("{field} as {}", field.replace(".", "_")).into();
2170 }
2171 }
2172 self
2173 }
2174
2175 fn join(
2176 &mut self,
2177 main_table: &str,
2178 main_fields: &str,
2179 right_table: &str,
2180 right_fields: &str,
2181 ) -> &mut Self {
2182 let main_table = if main_table.is_empty() {
2183 self.params.table.clone()
2184 } else {
2185 main_table.to_string()
2186 };
2187 self.params.join_table = right_table.to_string();
2188 self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2189 self
2190 }
2191
2192 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2193 let main_fields = if main_fields.is_empty() {
2194 "id"
2195 } else {
2196 main_fields
2197 };
2198 let second_fields = if second_fields.is_empty() {
2199 self.params.table.clone()
2200 } else {
2201 second_fields.to_string().clone()
2202 };
2203 let sec_table_name = format!("{}{}", table, "_2");
2204 let second_table = format!("{} {}", table, sec_table_name.clone());
2205 self.params.join_table = sec_table_name.clone();
2206 self.params.join.push(format!(
2207 " INNER JOIN {} ON {}.{} = {}.{}",
2208 second_table, self.params.table, main_fields, sec_table_name, second_fields
2209 ));
2210 self
2211 }
2212
2213 fn join_right(
2214 &mut self,
2215 main_table: &str,
2216 main_fields: &str,
2217 right_table: &str,
2218 right_fields: &str,
2219 ) -> &mut Self {
2220 let main_table = if main_table.is_empty() {
2221 self.params.table.clone()
2222 } else {
2223 main_table.to_string()
2224 };
2225 self.params.join_table = right_table.to_string();
2226 self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2227 self
2228 }
2229
2230 fn join_full(
2231 &mut self,
2232 main_table: &str,
2233 main_fields: &str,
2234 right_table: &str,
2235 right_fields: &str,
2236 ) -> &mut Self {
2237 let main_table = if main_table.is_empty() {
2238 self.params.table.clone()
2239 } else {
2240 main_table.to_string()
2241 };
2242 self.params.join_table = right_table.to_string();
2243 self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2244 self
2245 }
2246
2247 fn union(&mut self, sub_sql: &str) -> &mut Self {
2248 self.params.unions.push(format!("UNION {sub_sql}"));
2249 self
2250 }
2251
2252 fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2253 self.params.unions.push(format!("UNION ALL {sub_sql}"));
2254 self
2255 }
2256
2257 fn lock_for_update(&mut self) -> &mut Self {
2258 self.params.lock_mode = "FOR UPDATE".to_string();
2259 self
2260 }
2261
2262 fn lock_for_share(&mut self) -> &mut Self {
2263 self.params.lock_mode = "FOR SHARE".to_string();
2264 self
2265 }
2266}