1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::Connection;
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::Queryable;
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
19
20lazy_static! {
21 static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
22 static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
23 static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
24 Arc::new(Mutex::new(HashMap::new()));
25}
/// MySQL backend: a connection pool plus the query-builder state of the
/// statement currently being assembled.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection settings (host, port, credentials, database, charset, ...).
    pub connection: Connection,
    /// Name combined with the thread id to form the key of the transaction
    /// connection map. NOTE(review): presumably the configured connection name — confirm.
    pub default: String,
    /// Query-builder state (table, fields, where clauses, flags).
    pub params: Params,
    /// Connection pool used for every statement issued outside a transaction.
    pub pool: Pool,
}
36
37impl Mysql {
38 pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
39 let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
40
41 let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
42
43 match Pool::new(opts) {
44 Ok(pool) => Ok(Self {
45 connection: connection.clone(),
46 default: default.clone(),
47 params: Params::default("mysql"),
48 pool,
49 }),
50 Err(e) => {
51 error!("connect: {}", e);
52 Err(e.to_string())
53 }
54 }
55 }
56 fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
57 if sql.contains("INSERT") {
58 let rows = text.affected_rows();
59 if rows > 1 {
60 if self.params.autoinc {
61 let row = rows;
62 let start_row = text.last_insert_id().unwrap();
63 let end_row = start_row + row;
64
65 let mut ids = array![];
66 for item in start_row..end_row {
67 ids.push(item).unwrap();
68 }
69 (true, ids)
70 } else {
71 (true, JsonValue::from(rows))
72 }
73 } else {
74 (true, JsonValue::from(text.last_insert_id()))
75 }
76 } else {
77 (true, JsonValue::from(text.affected_rows()))
78 }
79 }
80 fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
81 let mut list = array![];
82 let mut index = 0;
83 text.for_each(|row| {
84 match row {
85 Ok(r) => {
86 let mut data = object! {};
87 let mut index = 0;
88 for item in r.columns().iter() {
89 let field = item.name_str();
90 let field = field.to_string();
91 let field = field.as_str();
92 if !data[field].is_null() {
93 index += 1;
94 continue;
95 }
96 data[field] = match item.column_type() {
97 ColumnType::MYSQL_TYPE_TINY => {
98 let data = r.get::<bool, _>(index).unwrap_or(true);
99 JsonValue::from(data)
100 }
101 ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
102 let data = r.index(field).clone();
103 if data == NULL {
104 JsonValue::from(0.0)
105 } else {
106 match r.get::<f64, _>(index) {
107 None => JsonValue::from(0.0),
108 Some(data) => JsonValue::from(data),
109 }
110 }
111 }
112 ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
113 let data = r.index(field).clone();
114 if data == NULL {
115 JsonValue::from(0)
116 } else {
117 let data = r.get::<i64, _>(index).unwrap();
118 JsonValue::from(data)
119 }
120 }
121 ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME => {
122 let data = r.index(field).clone();
123 if data == NULL {
124 JsonValue::from("".to_string())
125 } else {
126 let data = r.get::<String, _>(index).unwrap();
127 JsonValue::from(data)
128 }
129 }
130 ColumnType::MYSQL_TYPE_BLOB => {
131 let data = r.index(field).clone();
132 if data == NULL {
133 JsonValue::from("".to_string())
134 } else {
135 let data = r.get::<String, _>(index).unwrap();
136 JsonValue::from(data)
137 }
138 }
139 ColumnType::MYSQL_TYPE_VAR_STRING | ColumnType::MYSQL_TYPE_STRING => {
140 let data = r.index(field).clone();
141 if data == NULL {
142 JsonValue::from("".to_string())
143 } else {
144 let data = r.get::<String, _>(index).unwrap();
145 JsonValue::from(data)
146 }
147 }
148 ColumnType::MYSQL_TYPE_LONG_BLOB => {
149 let data = r.index(field).clone();
150 if data == NULL {
151 JsonValue::from("".to_string())
152 } else {
153 let data = r.get::<String, _>(index).unwrap();
154 JsonValue::from(data)
155 }
156 }
157 ColumnType::MYSQL_TYPE_TIMESTAMP => {
158 let data = r.index(field).clone();
159 if data == NULL {
160 JsonValue::from("".to_string())
161 } else {
162 let data = r.get::<String, _>(index).unwrap();
163 JsonValue::from(data)
164 }
165 }
166 ColumnType::MYSQL_TYPE_NULL => {
167 let data = r.index(field).clone();
168 if data == NULL {
169 JsonValue::from("".to_string())
170 } else {
171 let data = r.get::<String, _>(index).unwrap();
172 JsonValue::from(data)
173 }
174 }
175 _ => {
176 let data = r.index(field).clone();
177 info!("未知: {} {:?} {:?}", field, item.column_type(), data);
178 JsonValue::from("".to_string())
179 }
180 };
181 index += 1;
182 }
183 list.push(data).unwrap();
184 }
185 Err(e) => {
186 error!("err: {} \r\n {}", e, sql);
187 }
188 }
189 index += 1;
190 });
191 (true, list)
192 }
193 fn query(&mut self, sql: &str) -> (bool, JsonValue) {
194 let thread_id = format!("{:?}", thread::current().id());
195 let key = format!("{}{}", self.default, thread_id);
196
197 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
198 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
199 Ok(e) => e,
200 Err(err) => {
201 error!("execute超时: {}", err);
202 return (false, object! {});
203 }
204 };
205 let connection_id = db.connection_id();
206 return match db.query_iter(sql) {
207 Ok(e) => {
208 if self.connection.debug {
209 info!("查询成功: {} {}", thread_id.clone(), sql);
210 }
211 self.query_handle(e, sql)
212 }
213 Err(e) => {
214 error!(
215 "非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}] 连接ID: {}",
216 thread_id,
217 e,
218 sql,
219 connection_id
220 );
221 (false, JsonValue::from(e.to_string()))
222 }
223 };
224 } else {
225 let mut tr = TR.lock().unwrap();
226 let db = tr.get_mut(&*key).unwrap();
227 let connection_id = db.connection_id();
228 return match db.query_iter(sql) {
229 Ok(e) => {
230 if self.connection.debug {
231 info!("查询成功: {} {}", thread_id.clone(), sql);
232 }
233 self.query_handle(e, sql)
234 }
235 Err(e) => {
236 error!(
237 "事务查询失败: {} {} {} 连接ID: {}",
238 thread_id,
239 e,
240 sql,
241 connection_id
242 );
243 (false, JsonValue::from(e.to_string()))
244 }
245 };
246 };
247 }
248 fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
249 let thread_id = format!("{:?}", thread::current().id());
250 let key = format!("{}{}", self.default, thread_id);
251
252 if TRANS.lock().unwrap().get(&*thread_id).is_none() {
253 let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
254 Ok(e) => e,
255 Err(err) => {
256 error!("非事务: execute超时: {}", err);
257 return (false, object! {});
258 }
259 };
260 return match db.exec_iter(sql, ()) {
261 Ok(e) => {
262 if self.connection.debug {
263 info!("提交成功: {} {}", thread_id.clone(), sql);
264 }
265 self.execute_cl(e, sql)
266 }
267 Err(e) => {
268 error!("非事务提交失败: {} {} {}", thread_id, e, sql);
269 (false, JsonValue::from(e.to_string()))
270 }
271 };
272 } else {
273 let mut count_flag: i64 = 0;
275 loop {
276 let mut t = TRANS_TABLE.lock().unwrap();
277 if t.get(&self.params.table).is_none() {
278 t.insert(self.params.table.clone(), thread_id.clone());
279 break;
280 }
281 if t.get(&self.params.table).unwrap().clone() == thread_id {
282 break;
283 }
284 thread::yield_now();
285
286 count_flag += 1;
287 if count_flag == 10000 {
288 warn!("execute循环次数: 1w,强制退出");
289 break;
290 }
291 }
292
293 let mut tr = TR.lock().unwrap();
294 let db = tr.get_mut(&*key).unwrap();
295
296 return match db.exec_iter(sql, ()) {
297 Ok(e) => {
298 if self.connection.debug {
299 info!("提交成功: {} {}", thread_id.clone(), sql);
300 }
301
302 self.execute_cl(e, sql)
303 }
304 Err(e) => {
305 error!("事务提交失败: {} {} {}", thread_id, e, sql);
306 (false, JsonValue::from(e.to_string()))
307 }
308 };
309 };
310 }
311}
312
313impl DbMode for Mysql {
314 fn database_tables(&mut self) -> JsonValue {
315 let sql = "SHOW TABLES".to_string();
316 match self.sql(sql.as_str()) {
317 Ok(e) => {
318 let mut list = vec![];
319 for item in e.members() {
320 for (_, value) in item.entries() {
321 list.push(value.clone());
322 }
323 }
324 list.into()
325 }
326 Err(_) => {
327 array![]
328 }
329 }
330 }
331
332 fn database_create(&mut self, name: &str) -> bool {
333 let sql = format!("CREATE DATABASE {}", name);
334
335 let (state, data) = self.execute(sql.as_str());
336 match state {
337 true => data.as_bool().unwrap(),
338 false => {
339 error!("创建数据库失败: {:?}", data);
340 false
341 }
342 }
343 }
344}
345
346impl Mode for Mysql {
347 fn table_create(&mut self, options: TableOptions) -> JsonValue {
348 let mut sql = String::new();
349 let mut unique_fields = String::new();
351 let mut unique_name = String::new();
352 let mut unique = String::new();
353 for item in options.table_unique.iter() {
354 if unique_fields.is_empty() {
355 unique_fields = format!("`{}`", item);
356 unique_name = format!("{}_unique_{}", options.table_name, item);
357 } else {
358 unique_fields = format!("{},`{}`", unique_fields, item);
359 unique_name = format!("{}_{}", unique_name, item);
360 }
361 let digest = md5::compute(unique_name.clone());
362 let text = format!("unique_{:x}", digest);
363 unique = format!("UNIQUE KEY `{}` ({})", text, unique_fields);
364 }
365
366 let mut index = String::new();
368 for row in options.table_index.iter() {
369 let mut index_fields = String::new();
370 let mut index_name = String::new();
371 for item in row.iter() {
372 if index_fields.is_empty() {
373 index_fields = format!("`{}`", item);
374 index_name = format!("{}_index_{}", options.table_name, item);
375 } else {
376 index_fields = format!("{},`{}`", index_fields, item);
377 index_name = format!("{}_{}", index_name, item);
378 }
379 }
380 if index.is_empty() {
381 index = format!("INDEX `{}` ({})", index_name, index_fields);
382 } else {
383 index = format!("{},\r\nINDEX `{}` ({})", index, index_name, index_fields);
384 }
385 }
386 if index.replace(",", "").is_empty() {
387 index = index.replace(",", "");
388 }
389
390 for (name, field) in options.table_fields.entries() {
391 let row = br_fields::field("mysql", name, field.clone());
392 sql = format!("{} {},\r\n", sql, row);
393 }
394
395 if !unique.is_empty() {
396 sql = sql.trim_end_matches(",\r\n").to_string();
397 sql = format!("{},\r\n{}", sql, unique);
398 }
399 if !index.is_empty() {
400 sql = sql.trim_end_matches(",\r\n").to_string();
401 sql = format!("{},\r\n{}", sql, index);
402 }
403 let collate = format!("{}_bin", self.connection.charset.str());
404
405 let partition = if options.table_partition {
407 sql = format!(
408 "{},\r\nPRIMARY KEY(`{}`,`{}`)",
409 sql,
410 options.table_key,
411 options.table_partition_columns[0].clone()
412 );
413 let temp_head = format!(
414 "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
415 options.table_partition_columns[0].clone()
416 );
417 let mut partition_array = vec![];
418 let mut count = 0;
419 for member in options.table_partition_columns[1].members() {
420 let temp = format!(
421 "PARTITION p{} VALUES LESS THAN ('{}')",
422 count.clone(),
423 member.clone()
424 );
425 count += 1;
426 partition_array.push(temp.clone());
427 }
428 let temp_body = partition_array.join(",\r\n");
429 let temp_end = format!(
430 ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
431 count.clone()
432 );
433 format!("{}{}{}", temp_head, temp_body, temp_end)
434 } else {
435 sql = if sql.trim_end().ends_with(",") {
436 format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
437 } else {
438 format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
439 };
440 "".to_string()
441 };
442 let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
443
444 if self.params.sql {
445 return JsonValue::from(sql);
446 }
447
448 let (state, data) = self.execute(sql.as_str());
449
450 match state {
451 true => JsonValue::from(state),
452 false => {
453 info!("创建错误: {}", data);
454 JsonValue::from(state)
455 }
456 }
457 }
458
459 fn table_update(&mut self, options: TableOptions) -> JsonValue {
460 let mut sql = vec![];
461
462 let fields_list = self.table_info(&options.table_name);
463 let mut put = vec![];
464 let mut add = vec![];
465 let mut del = vec![];
466 for (key, _) in fields_list.entries() {
467 if options.table_fields[key].is_empty() {
468 del.push(key);
469 }
470 }
471 for (name, field) in options.table_fields.entries() {
472 if !fields_list[name].is_empty() {
473 let old_comment = fields_list[name]["comment"].to_string();
474 let new_comment = br_fields::field("mysql", name, field.clone());
475 let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
476 let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
477 if old_comment == new_comment_text {
478 continue;
479 }
480 put.push(name);
481 } else {
482 add.push(name);
483 }
484 }
485
486 for name in add.iter() {
487 let name = name.to_string();
488 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
489 sql.push(format!(
490 "ALTER TABLE {} add {};\r\n",
491 options.table_name, row
492 ));
493 }
494 for name in del.iter() {
495 sql.push(format!(
496 "ALTER TABLE {} DROP `{}`;\r\n",
497 options.table_name, name
498 ));
499 }
500 for name in put.iter() {
501 let name = name.to_string();
502 let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
503 sql.push(format!(
504 "ALTER TABLE {} CHANGE `{}` {};\r\n",
505 options.table_name, name, row
506 ));
507 }
508
509 let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
510 let (_, pk_list) = self.query(
512 format!(
513 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
514 WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
515 self.connection.database, options.table_name
516 ).as_str(),
517 );
518 let mut pk_vec = vec![];
519 for member in pk_list.members() {
520 pk_vec.push(member["COLUMN_NAME"].to_string());
521 }
522
523 let mut unique_new = vec![];
524 let mut index_new = vec![];
525 for item in index_list.members() {
526 let key_name = item["Key_name"].as_str().unwrap();
527 let non_unique = item["Non_unique"].as_i32().unwrap();
528
529 if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
530 {
531 unique_new.push(key_name.to_string());
532 continue;
533 }
534 if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
535 {
536 index_new.push(key_name.to_string());
537 continue;
538 }
539 }
540
541 let mut unique_fields = String::new();
542 let mut unique_name = String::new();
543 for item in options.table_unique.iter() {
544 if unique_fields.is_empty() {
545 unique_fields = format!("`{}`", item);
546 unique_name = format!("{}_unique_{}", options.table_name, item);
547 } else {
548 unique_fields = format!("{},`{}`", unique_fields, item);
549 unique_name = format!("{}_{}", unique_name, item);
550 }
551 }
552 if !unique_name.is_empty() {
553 let digest = md5::compute(unique_name);
554 unique_name = format!("unique_{:x}", digest);
555 for item in &unique_new {
556 if unique_name != *item {
557 sql.push(format!(
558 "alter table {} drop index {};\r\n",
559 options.table_name, item
560 ));
561 }
562 }
563 if !unique_new.contains(&unique_name) {
564 sql.push(format!(
565 "CREATE UNIQUE index {} on {} ({});\r\n",
566 unique_name, options.table_name, unique_fields
567 ));
568 }
569 }
570
571 let mut index_list = vec![];
572 for row in options.table_index.iter() {
574 let mut index_fields = String::new();
575 let mut index_name = String::new();
576 for item in row {
577 if index_fields.is_empty() {
578 index_fields = item.to_string();
579 index_name = format!("{}_index_{}", options.table_name, item);
580 } else {
581 index_fields = format!("{},{}", index_fields, item);
582 index_name = format!("{}_{}", index_name, item);
583 }
584 }
585 index_list.push(index_name.clone());
586 if !index_new.contains(&index_name.clone()) {
587 sql.push(format!(
588 "CREATE INDEX {} on {} ({});\r\n",
589 index_name, options.table_name, index_fields
590 ));
591 }
592 }
593
594 for item in index_new {
595 if !index_list.contains(&item.to_string()) {
596 sql.push(format!(
597 "DROP INDEX {} ON {};\r\n",
598 item.clone(),
599 options.table_name
600 ));
601 }
602 }
603
604 if options.table_partition {
606 if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
608 {
609 let pk = format!(
610 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
611 options.table_name,
612 options.table_key,
613 options.table_partition_columns[0].clone()
614 );
615 sql.push(pk);
616 let temp_head = format!(
617 "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
618 options.table_name,
619 options.table_partition_columns[0].clone()
620 );
621 let mut partition_array = vec![];
622 let mut count = 0;
623 for member in options.table_partition_columns[1].members() {
624 let temp = format!(
625 "PARTITION p{} VALUES LESS THAN ('{}')",
626 count.clone(),
627 member.clone()
628 );
629 count += 1;
630 partition_array.push(temp.clone());
631 }
632 let temp_body = partition_array.join(",\r\n");
633 let temp_end = format!(",PARTITION p{} VALUES LESS THAN (MAXVALUE) )", count);
634 sql.push(format!("{}{}{};\r\n", temp_head, temp_body, temp_end));
635 }
636 } else if pk_vec.len() != 1 {
637 let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
638 sql.push(rm_partition);
639 let pk = format!(
640 "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
641 options.table_name, options.table_key
642 );
643 sql.push(pk);
644 };
645
646 if self.params.sql {
647 return JsonValue::from(sql.join(""));
648 }
649
650 if sql.is_empty() {
651 return JsonValue::from(-1);
652 }
653
654 for item in sql.iter() {
655 let (state, res) = self.execute(item.as_str());
656 match state {
657 true => {}
658 false => {
659 info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
660 return JsonValue::from(0);
661 }
662 }
663 }
664 JsonValue::from(1)
665 }
666
667 fn table_info(&mut self, table: &str) -> JsonValue {
668 let sql = format!(
669 "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}'",
670 table
671 );
672 let (state, data) = self.query(sql.as_str());
673 let mut list = object! {};
674 if state {
675 for item in data.members() {
676 if item["TABLE_SCHEMA"] != self.connection.database {
677 continue;
678 }
679 let mut row = object! {};
680 row["field"] = item["COLUMN_NAME"].clone();
681 row["comment"] = item["COLUMN_COMMENT"].clone();
682 row["type"] = item["COLUMN_TYPE"].clone();
683 list[row["field"].as_str().unwrap()] = row.clone();
684 }
685 list
686 } else {
687 list
688 }
689 }
690
691 fn table_is_exist(&mut self, name: &str) -> bool {
692 let sql = format!(
693 "select * from information_schema.TABLES where TABLE_NAME like '%{}%'",
694 name
695 );
696 let (state, data) = self.query(sql.as_str());
697 match state {
698 true => {
699 for item in data.members() {
700 if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
701 {
702 return true;
703 }
704 }
705 false
706 }
707 false => false,
708 }
709 }
710
711 fn table(&mut self, name: &str) -> &mut Mysql {
712 self.params = Params::default(self.connection.mode.str().as_str());
713 self.params.table = format!("{}{}", self.connection.prefix, name);
714 self.params.join_table = self.params.table.clone();
715 self
716 }
717
718 fn change_table(&mut self, name: &str) -> &mut Self {
719 self.params.join_table = name.to_string();
720 self
721 }
722
723 fn autoinc(&mut self) -> &mut Self {
724 self.params.autoinc = true;
725 self
726 }
727
728 fn fetch_sql(&mut self) -> &mut Self {
729 self.params.sql = true;
730 self
731 }
732
733 fn order(&mut self, field: &str, by: bool) -> &mut Self {
734 self.params.order[field] = {
735 if by {
736 "DESC"
737 } else {
738 "ASC"
739 }
740 }.into();
741 self
742 }
743
744 fn group(&mut self, field: &str) -> &mut Self {
745 let fields: Vec<&str> = field.split(",").collect();
746 for field in fields.iter() {
747 let field = field.to_string();
748 self.params.group[field.as_str()] = field.clone().into();
749 self.params.fields[field.as_str()] = field.clone().into();
750 }
751
752 self
753 }
754
755 fn distinct(&mut self) -> &mut Self {
756 self.params.distinct = true;
757 self
758 }
759
760 fn json(&mut self, field: &str) -> &mut Self {
761 let list: Vec<&str> = field.split(",").collect();
762 for item in list.iter() {
763 self.params.json[item.to_string().as_str()] = item.to_string().into();
764 }
765 self
766 }
767
768 fn column(&mut self, field: &str) -> JsonValue {
769 self.field(field);
770 let sql = self.params.select_sql();
771
772 if self.params.sql {
773 return JsonValue::from(sql);
774 }
775 let (state, data) = self.query(sql.as_str());
776 match state {
777 true => {
778 let mut list = array![];
779 for item in data.members() {
780 if self.params.json[field].is_empty() {
781 list.push(item[field].clone()).unwrap();
782 } else {
783 let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
784 list.push(data).unwrap();
785 }
786 }
787 list
788 }
789 false => {
790 array![]
791 }
792 }
793 }
794
795 fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
796 let join_table = if self.params.join_table.is_empty() {
797 self.params.table.clone()
798 } else {
799 self.params.join_table.clone()
800 };
801 if value.is_boolean() {
802 if value.as_bool().unwrap() {
803 value = 1.into();
804 } else {
805 value = 0.into();
806 }
807 }
808 match compare {
809 "between" => {
810 self.params.where_and.push(format!(
811 "{}.`{}` between '{}' AND '{}'",
812 join_table, field, value[0], value[1]
813 ));
814 }
815 "set" => {
816 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
817 let mut wheredata = vec![];
818 for item in list.iter() {
819 wheredata.push(format!(
820 "FIND_IN_SET('{}',{}.`{}`)",
821 item, join_table, field
822 ));
823 }
824 self.params.where_and.push(format!("({})", wheredata.join(" or ")));
825 }
826 "notin" => {
827 let mut text = String::new();
828 for item in value.members() {
829 text = format!("{},'{}'", text, item);
830 }
831 text = text.trim_start_matches(",").into();
832 self.params.where_and.push(format!("{}.`{}` not in ({})", join_table, field, text));
833 }
834 "is" => {
835 self.params.where_and.push(format!("{}.`{}` is {}", join_table, field, value));
836 }
837 "notlike" => {
838 self.params.where_and.push(format!("{}.`{}` not like '{}'", join_table, field, value));
839 }
840 "in" => {
841 let mut text = String::new();
842 if value.is_array() {
843 for item in value.members() {
844 text = format!("{},'{}'", text, item);
845 }
846 } else if value.is_null() {
847 text = format!("{},null", text);
848 } else {
849 let value = value.as_str().unwrap();
850
851 let value: Vec<&str> = value.split(",").collect();
852 for item in value.iter() {
853 text = format!("{},'{}'", text, item);
854 }
855 }
856 text = text.trim_start_matches(",").into();
857
858 self.params.where_and.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
859 }
860 _ => {
861 self.params.where_and.push(format!(
862 "{}.`{}` {} '{}'",
863 join_table, field, compare, value
864 ));
865 }
866 }
867 self
868 }
869
870 fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
871 let join_table = if self.params.join_table.is_empty() {
872 self.params.table.clone()
873 } else {
874 self.params.join_table.clone()
875 };
876
877 if value.is_boolean() {
878 if value.as_bool().unwrap() {
879 value = 1.into();
880 } else {
881 value = 0.into();
882 }
883 }
884
885 match compare {
886 "between" => {
887 self.params.where_or.push(format!(
888 "{}.`{}` between '{}' AND '{}'",
889 join_table, field, value[0], value[1]
890 ));
891 }
892 "set" => {
893 let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
894 let mut wheredata = vec![];
895 for item in list.iter() {
896 wheredata.push(format!(
897 "FIND_IN_SET('{}',{}.`{}`)",
898 item, join_table, field
899 ));
900 }
901 self.params.where_or.push(format!("({})", wheredata.join(" or ")));
902 }
903 "notin" => {
904 let mut text = String::new();
905 for item in value.members() {
906 text = format!("{},'{}'", text, item);
907 }
908 text = text.trim_start_matches(",").into();
909 self.params.where_or.push(format!("{}.`{}` not in ({})", join_table, field, text));
910 }
911 "in" => {
912 let mut text = String::new();
913 if value.is_array() {
914 for item in value.members() {
915 text = format!("{},'{}'", text, item);
916 }
917 } else {
918 let value = value.as_str().unwrap();
919 let value: Vec<&str> = value.split(",").collect();
920 for item in value.iter() {
921 text = format!("{},'{}'", text, item);
922 }
923 }
924 text = text.trim_start_matches(",").into();
925 self.params.where_or.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
926 }
927 _ => {
928 self.params.where_or.push(format!(
929 "{}.`{}` {} '{}'",
930 join_table, field, compare, value
931 ));
932 }
933 }
934 self
935 }
936
937 fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
938 self.params.where_column = format!(
939 "{}.`{}` {} {}.`{}`",
940 self.params.table, field_a, compare, self.params.table, field_b
941 );
942 self
943 }
944
945 fn count(&mut self) -> JsonValue {
946 self.params.fields["count"] = "count(*) as count".to_string().into();
947 let sql = self.params.select_sql();
948 if self.params.sql {
949 return JsonValue::from(sql.clone());
950 }
951 let (state, data) = self.query(sql.as_str());
952 if state {
953 data[0]["count"].clone()
954 } else {
955 JsonValue::from(0)
956 }
957 }
958
959 fn max(&mut self, field: &str) -> JsonValue {
960 self.params.fields[field] = format!("max({00}) as {00}", field).into();
961 let sql = self.params.select_sql();
962 if self.params.sql {
963 return JsonValue::from(sql.clone());
964 }
965 let (state, data) = self.query(sql.as_str());
966 if state {
967 if data.len() > 1 {
968 return data.clone();
969 }
970 data[0][field].clone()
971 } else {
972 JsonValue::from(0)
973 }
974 }
975
976 fn min(&mut self, field: &str) -> JsonValue {
977 self.params.fields[field] = format!("min({00}) as {00}", field).into();
978 let sql = self.params.select_sql();
979 if self.params.sql {
980 return JsonValue::from(sql.clone());
981 }
982 let (state, data) = self.query(sql.as_str());
983 if state {
984 if data.len() > 1 {
985 return data;
986 }
987 data[0][field].clone()
988 } else {
989 JsonValue::from(0)
990 }
991 }
992
993 fn sum(&mut self, field: &str) -> JsonValue {
994 self.params.fields[field] = format!("sum({00}) as {00}", field).into();
995 let sql = self.params.select_sql();
996 if self.params.sql {
997 return JsonValue::from(sql.clone());
998 }
999 let (state, data) = self.query(sql.as_str());
1000 match state {
1001 true => {
1002 if data.len() > 1 {
1003 return data;
1004 }
1005 data[0][field].clone()
1006 }
1007 false => JsonValue::from(0),
1008 }
1009 }
1010
1011 fn avg(&mut self, field: &str) -> JsonValue {
1012 self.params.fields[field] = format!("avg({00}) as {00}", field).into();
1013 let sql = self.params.select_sql();
1014 if self.params.sql {
1015 return JsonValue::from(sql.clone());
1016 }
1017 let (state, data) = self.query(sql.as_str());
1018 if state {
1019 if data.len() > 1 {
1020 return data;
1021 }
1022 data[0][field].clone()
1023 } else {
1024 JsonValue::from(0)
1025 }
1026 }
1027
1028 fn select(&mut self) -> JsonValue {
1029 let sql = self.params.select_sql();
1030 if self.params.sql {
1031 return JsonValue::from(sql.clone());
1032 }
1033
1034 let (state, mut data) = self.query(sql.as_str());
1035 match state {
1036 true => {
1037 for (field, _) in self.params.json.entries() {
1038 for item in data.members_mut() {
1039 if !item[field].is_empty() {
1040 let json = item[field].to_string();
1041 item[field] = match json::parse(&json) {
1042 Ok(e) => e,
1043 Err(_) => JsonValue::from(json),
1044 };
1045 }
1046 }
1047 }
1048 data.clone()
1049 }
1050 false => array![],
1051 }
1052 }
1053
1054 fn find(&mut self) -> JsonValue {
1055 self.params.page = 1;
1056 self.params.limit = 1;
1057 let sql = self.params.select_sql();
1058 if self.params.sql {
1059 return JsonValue::from(sql.clone());
1060 }
1061 let (state, mut data) = self.query(sql.as_str());
1062 match state {
1063 true => {
1064 if data.is_empty() {
1065 return object! {};
1066 }
1067 for (field, _) in self.params.json.entries() {
1068 if !data[0][field].is_empty() {
1069 let json = data[0][field].to_string();
1070 let json = json::parse(&json).unwrap_or(array![]);
1071 data[0][field] = json;
1072 } else {
1073 data[0][field] = array![];
1074 }
1075 }
1076 data[0].clone()
1077 }
1078 false => {
1079 error!("find失败: {:?}", data);
1080 object! {}
1081 }
1082 }
1083 }
1084
1085 fn value(&mut self, field: &str) -> JsonValue {
1086 self.params.fields = object! {};
1087 self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1088 self.params.page = 1;
1089 self.params.limit = 1;
1090 let sql = self.params.select_sql();
1091 if self.params.sql {
1092 return JsonValue::from(sql.clone());
1093 }
1094 let (state, mut data) = self.query(sql.as_str());
1095 match state {
1096 true => {
1097 for (field, _) in self.params.json.entries() {
1098 if !data[0][field].is_empty() {
1099 let json = data[0][field].to_string();
1100 let json = json::parse(&json).unwrap_or(array![]);
1101 data[0][field] = json;
1102 } else {
1103 data[0][field] = array![];
1104 }
1105 }
1106 data[0][field].clone()
1107 }
1108 false => {
1109 if self.connection.debug {
1110 info!("{:?}", data);
1111 }
1112 JsonValue::Null
1113 }
1114 }
1115 }
1116
1117 fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1118 let mut fields = vec![];
1119 let mut values = vec![];
1120 if !self.params.autoinc && data["id"].is_empty() {
1121 data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1122 }
1123 for (field, value) in data.entries() {
1124 fields.push(format!("`{}`", field));
1125 if value.is_string() || value.is_array() || value.is_object() {
1126 values.push(format!("'{}'", value.to_string().replace("'", "''")));
1127 continue;
1128 } else if value.is_number() || value.is_boolean() || value.is_null() {
1129 values.push(format!("{}", value));
1130 continue;
1131 } else {
1132 values.push(format!("'{}'", value));
1133 continue;
1134 }
1135 }
1136 let fields = fields.join(",");
1137 let values = values.join(",");
1138
1139 let sql = format!(
1140 "INSERT INTO {} ({}) VALUES ({});",
1141 self.params.table, fields, values
1142 );
1143 if self.params.sql {
1144 return JsonValue::from(sql.clone());
1145 }
1146 let (state, ids) = self.execute(sql.as_str());
1147
1148 match state {
1149 true => match self.params.autoinc {
1150 true => ids.clone(),
1151 false => data["id"].clone(),
1152 },
1153 false => {
1154 let thread_id = format!("{:?}", thread::current().id());
1155 error!("添加失败: {} {:?} {}", thread_id, ids, sql);
1156 JsonValue::from("")
1157 }
1158 }
1159 }
1160
1161 fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1162 let mut fields = String::new();
1163 if !self.params.autoinc && data[0]["id"].is_empty() {
1164 data[0]["id"] = "".into();
1165 }
1166 for (field, _) in data[0].entries() {
1167 fields = format!("{},`{}`", fields, field);
1168 }
1169 fields = fields.trim_start_matches(",").parse().unwrap();
1170
1171 let core_count = num_cpus::get();
1172 let mut p = pools::Pool::new(core_count * 4);
1173 let autoinc = self.params.autoinc;
1174 for list in data.members() {
1175 let mut item = list.clone();
1176 p.execute(move |pcindex| {
1177 if !autoinc && item["id"].is_empty() {
1178 let id = format!(
1179 "{:X}{:X}",
1180 Local::now().timestamp_nanos_opt().unwrap(),
1181 pcindex
1182 );
1183 item["id"] = id.into();
1184 }
1185 let mut values = "".to_string();
1186 for (_, value) in item.entries() {
1187 if value.is_string() {
1188 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1189 } else if value.is_number() || value.is_boolean() {
1190 values = format!("{},{}", values, value);
1191 } else {
1192 values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1193 }
1194 }
1195 values = format!("({})", values.trim_start_matches(","));
1196 array![item["id"].clone(), values]
1197 });
1198 }
1199 let (ids_list, mut values) = p.insert_all();
1200 values = values.trim_start_matches(",").parse().unwrap();
1201 let sql = format!(
1202 "INSERT INTO {} ({}) VALUES {};",
1203 self.params.table, fields, values
1204 );
1205
1206 if self.params.sql {
1207 return JsonValue::from(sql.clone());
1208 }
1209 let (state, data) = self.execute(sql.as_str());
1210 match state {
1211 true => match autoinc {
1212 true => data,
1213 false => JsonValue::from(ids_list),
1214 },
1215 false => {
1216 error!("insert_all: {:?}", data);
1217 array![]
1218 }
1219 }
1220 }
1221
1222 fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1223 self.params.page = page;
1224 self.params.limit = limit;
1225 self
1226 }
1227
1228 fn update(&mut self, data: JsonValue) -> JsonValue {
1229 let mut values = vec![];
1230 for (field, value) in data.entries() {
1231 if value.is_string() {
1232 values.push(format!(
1233 "`{}`='{}'",
1234 field,
1235 value.to_string().replace("'", "''")
1236 ));
1237 } else if value.is_number() {
1238 values.push(format!("`{}`= {}", field, value));
1239 } else if value.is_array() {
1240 if self.params.json[field].is_empty() {
1241 let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1242 values.push(format!("`{}`='{}'", field, array));
1243 } else {
1244 let json = value.to_string();
1245 let json = json.replace("'", "''");
1246 values.push(format!("`{}`='{}'", field, json));
1247 }
1248 continue;
1249 } else if value.is_object() {
1250 if self.params.json[field].is_empty() {
1251 values.push(format!("`{}`='{}'", field, value));
1252 } else {
1253 if value.is_empty() {
1254 values.push(format!("`{}`=''", field));
1255 continue;
1256 }
1257 let json = value.to_string();
1258 let json = json.replace("'", "''");
1259 values.push(format!("`{}`='{}'", field, json));
1260 }
1261 continue;
1262 } else if value.is_boolean() {
1263 values.push(format!("`{}`= {}", field, value));
1264 } else {
1265 values.push(format!("`{}`=\"{}\"", field, value));
1266 }
1267 }
1268
1269 for (field, value) in self.params.inc_dec.entries() {
1270 values.push(format!("{} = {}", field, value.to_string().clone()));
1271 }
1272
1273 let values = values.join(",");
1274
1275 let sql = format!(
1276 "UPDATE {} SET {} {};",
1277 self.params.table.clone(),
1278 values,
1279 self.params.where_sql()
1280 );
1281 if self.params.sql {
1282 return JsonValue::from(sql.clone());
1283 }
1284 let (state, data) = self.execute(sql.as_str());
1285 if state {
1286 data
1287 } else {
1288 let thread_id = format!("{:?}", thread::current().id());
1289 error!("update: {} {:?} {}", thread_id, data, sql);
1290 0.into()
1291 }
1292 }
1293 fn update_all(&mut self, data: JsonValue) -> JsonValue {
1294 let mut values = vec![];
1295
1296 let mut ids = vec![];
1297 for (field, _) in data[0].entries() {
1298 if field == "id" {
1299 continue;
1300 }
1301 let mut fields = vec![];
1302 for row in data.members() {
1303 let value = row[field].clone();
1304 let id = row["id"].clone();
1305 ids.push(id.clone());
1306 if value.is_string() {
1307 fields.push(format!(
1308 "WHEN '{}' THEN '{}'",
1309 id,
1310 value.to_string().replace("'", "''")
1311 ));
1312 } else if value.is_array() || value.is_object() {
1313 if self.params.json[field].is_empty() {
1314 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1315 } else {
1316 let json = value.to_string();
1317 let json = json.replace("'", "''");
1318 fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1319 }
1320 continue;
1321 } else if value.is_number() || value.is_boolean() || value.is_null() {
1322 fields.push(format!("WHEN '{}' THEN {}", id, value));
1323 } else {
1324 fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1325 }
1326 }
1327 values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1328 }
1329 self.where_and("id", "in", ids.into());
1330 for (field, value) in self.params.inc_dec.entries() {
1331 values.push(format!("{} = {}", field, value.to_string().clone()));
1332 }
1333
1334 let values = values.join(",");
1335 let sql = format!(
1336 "UPDATE {} SET {} {} {};",
1337 self.params.table.clone(),
1338 values,
1339 self.params.where_sql(),
1340 self.params.page_limit_sql()
1341 );
1342 if self.params.sql {
1343 return JsonValue::from(sql.clone());
1344 }
1345 let (state, data) = self.execute(sql.as_str());
1346 if state {
1347 data
1348 } else {
1349 error!("update_all: {:?}", data);
1350 JsonValue::from(0)
1351 }
1352 }
1353 fn delete(&mut self) -> JsonValue {
1354 let sql = format!(
1355 "delete FROM {} {} {};",
1356 self.params.table.clone(),
1357 self.params.where_sql(),
1358 self.params.page_limit_sql()
1359 );
1360 if self.params.sql {
1361 return JsonValue::from(sql.clone());
1362 }
1363 let (state, data) = self.execute(sql.as_str());
1364 match state {
1365 true => data,
1366 false => {
1367 error!("delete 失败>>> {:?}", data);
1368 JsonValue::from(0)
1369 }
1370 }
1371 }
1372 fn field(&mut self, field: &str) -> &mut Self {
1373 let list: Vec<&str> = field.split(",").collect();
1374 let join_table = if self.params.join_table.is_empty() {
1375 self.params.table.clone()
1376 } else {
1377 self.params.join_table.clone()
1378 };
1379 for item in list.iter() {
1380 if item.contains(" as ") {
1381 let text = item.split(" as ").collect::<Vec<&str>>();
1382 if text[0].contains("count(") {
1383 self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
1384 } else {
1385 self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1386 }
1387 } else {
1388 self.params.fields[item.to_string().as_str()] = format!("{}.`{}`", join_table, item).into();
1389 }
1390 }
1391 self
1392 }
1393
1394 fn hidden(&mut self, name: &str) -> &mut Self {
1395 let hidden: Vec<&str> = name.split(",").collect();
1396
1397 let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
1398
1399 let mut data = array![];
1400 for item in fields_list.members() {
1401 data.push(object! {
1402 "name":item["COLUMN_NAME"].as_str().unwrap()
1403 }).unwrap();
1404 }
1405
1406 for item in data.members() {
1407 let name = item["name"].as_str().unwrap();
1408 if !hidden.contains(&name) {
1409 self.params.fields[name] = name.into();
1410 }
1411 }
1412 self
1413 }
1414
1415 fn transaction(&mut self) -> bool {
1416 let thread_id = format!("{:?}", thread::current().id());
1417
1418 if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1419 let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1420 t += 1;
1421 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1422 return true;
1423 }
1424 TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1425
1426 let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1427
1428 let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1429 Ok(e) => e,
1430 Err(err) => {
1431 error!("query 超时: {}", err);
1432 return false;
1433 }
1434 };
1435 let key = format!("{}{}", self.default, thread_id);
1436 TR.lock().unwrap().insert(key.clone(), conn);
1437
1438 let (state, _) = self.query(sql.as_str());
1439 match state {
1440 true => state,
1441 false => {
1442 TR.lock().unwrap().remove(&*key);
1443 TRANS.lock().unwrap().remove(&*thread_id.clone());
1444 state
1445 }
1446 }
1447 }
1448 fn commit(&mut self) -> bool {
1449 let thread_id = format!("{:?}", thread::current().id());
1450 let sql = "COMMIT".to_string();
1451
1452 let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1453 if t > 1 {
1454 t -= 1;
1455 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1456 return true;
1457 }
1458 let (state, data) = self.query(sql.as_str());
1459 TRANS.lock().unwrap().remove(&thread_id);
1460 let key = format!("{}{}", self.default, thread_id);
1461 TR.lock().unwrap().remove(&*key);
1462
1463 let t = TRANS_TABLE.lock().unwrap().clone();
1464 for (key, value) in t.iter() {
1465 if value.clone() == thread_id {
1466 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1467 }
1468 }
1469
1470 match state {
1471 true => {}
1472 false => {
1473 error!("提交事务失败: {}", data);
1474 }
1475 }
1476 state
1477 }
1478
1479 fn rollback(&mut self) -> bool {
1480 let thread_id = format!("{:?}", thread::current().id());
1481 let sql = "ROLLBACK".to_string();
1482
1483 let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1484 if t > 1 {
1485 t -= 1;
1486 TRANS.lock().unwrap().insert(thread_id.clone(), t);
1487 return true;
1488 }
1489 let (state, data) = self.query(sql.as_str());
1490 TRANS.lock().unwrap().remove(&thread_id);
1491 let key = format!("{}{}", self.default, thread_id);
1492 TR.lock().unwrap().remove(&*key);
1493
1494 let t = TRANS_TABLE.lock().unwrap().clone();
1495 for (key, value) in t.iter() {
1496 if value.clone() == thread_id {
1497 TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1498 }
1499 }
1500
1501 match state {
1502 true => {}
1503 false => {
1504 error!("回滚失败: {}", data);
1505 }
1506 }
1507 state
1508 }
1509
1510 fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1511 let (state, data) = self.query(sql);
1512 match state {
1513 true => Ok(data),
1514 false => Err(data.to_string()),
1515 }
1516 }
1517
1518 fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1519 let (state, data) = self.execute(sql);
1520 match state {
1521 true => Ok(data),
1522 false => Err(data.to_string()),
1523 }
1524 }
1525
1526 fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1527 self.params.inc_dec[field] = format!("`{}` + {}", field, num).into();
1528 self
1529 }
1530
1531 fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1532 self.params.inc_dec[field] = format!("`{}` - {}", field, num).into();
1533 self
1534 }
1535 fn buildsql(&mut self) -> String {
1536 self.fetch_sql();
1537 let sql = self.select().to_string();
1538 format!("( {} ) `{}`", sql, self.params.table)
1539 }
1540
1541 fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1542 let main_fields = if main_fields.is_empty() {
1543 "id"
1544 } else {
1545 main_fields
1546 };
1547 let right_fields = if right_fields.is_empty() {
1548 self.params.table.clone()
1549 } else {
1550 right_fields.to_string().clone()
1551 };
1552 self.params.join_table = table.to_string();
1553 self.params.join.push(format!(
1554 " LEFT JOIN {} ON {}.{} = {}.{}",
1555 table, self.params.table, main_fields, table, right_fields
1556 ));
1557 self
1558 }
1559
1560 fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1561 let main_fields = if main_fields.is_empty() {
1562 "id"
1563 } else {
1564 main_fields
1565 };
1566 let second_fields = if second_fields.is_empty() {
1567 self.params.table.clone()
1568 } else {
1569 second_fields.to_string().clone()
1570 };
1571 let sec_table_name = format!("{}{}", table, "_2");
1572 let second_table = format!("{} {}", table, sec_table_name.clone());
1573 self.params.join_table = sec_table_name.clone();
1574 self.params.join.push(format!(
1575 " INNER JOIN {} ON {}.{} = {}.{}",
1576 second_table, self.params.table, main_fields, sec_table_name, second_fields
1577 ));
1578 self
1579 }
1580}