1use crate::optimizer_integration::StorageBackend;
64use crate::soch_ql::SochValue as QuerySochValue;
65use crate::sql::ast::*;
66use crate::sql::bridge::{ExecutionResult, SqlConnection};
67use crate::sql::error::{SqlError, SqlResult};
68use sochdb_core::SochValue as CoreSochValue;
69use sochdb_storage::{Database, KernelTxnHandle};
70use std::collections::HashMap;
71use std::sync::Arc;
72
73pub fn convert_core_to_query(value: &CoreSochValue) -> QuerySochValue {
83 match value {
84 CoreSochValue::Null => QuerySochValue::Null,
85 CoreSochValue::Bool(b) => QuerySochValue::Bool(*b),
86 CoreSochValue::Int(i) => QuerySochValue::Int(*i),
87 CoreSochValue::UInt(u) => QuerySochValue::UInt(*u),
88 CoreSochValue::Float(f) => QuerySochValue::Float(*f),
89 CoreSochValue::Text(s) => QuerySochValue::Text(s.clone()),
90 CoreSochValue::Binary(b) => QuerySochValue::Binary(b.clone()),
91 CoreSochValue::Array(arr) => {
92 QuerySochValue::Array(arr.iter().map(convert_core_to_query).collect())
93 }
94 CoreSochValue::Object(map) => {
95 match serde_json::to_string(map) {
97 Ok(json) => QuerySochValue::Text(json),
98 Err(_) => QuerySochValue::Text(format!("{:?}", map)),
99 }
100 }
101 CoreSochValue::Ref { table, id } => QuerySochValue::Text(format!("{}/{}", table, id)),
102 }
103}
104
105pub fn convert_query_to_core(value: &QuerySochValue) -> CoreSochValue {
107 match value {
108 QuerySochValue::Null => CoreSochValue::Null,
109 QuerySochValue::Bool(b) => CoreSochValue::Bool(*b),
110 QuerySochValue::Int(i) => CoreSochValue::Int(*i),
111 QuerySochValue::UInt(u) => CoreSochValue::UInt(*u),
112 QuerySochValue::Float(f) => CoreSochValue::Float(*f),
113 QuerySochValue::Text(s) => CoreSochValue::Text(s.clone()),
114 QuerySochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
115 QuerySochValue::Array(arr) => {
116 CoreSochValue::Array(arr.iter().map(convert_query_to_core).collect())
117 }
118 }
119}
120
121fn convert_row_core_to_query(
123 row: HashMap<String, CoreSochValue>,
124) -> HashMap<String, QuerySochValue> {
125 row.into_iter()
126 .map(|(k, v)| (k, convert_core_to_query(&v)))
127 .collect()
128}
129
130fn convert_rows_core_to_query(
132 rows: Vec<HashMap<String, CoreSochValue>>,
133) -> Vec<HashMap<String, QuerySochValue>> {
134 rows.into_iter().map(convert_row_core_to_query).collect()
135}
136
137pub struct DatabaseStorageBackend {
156 db: Arc<Database>,
157}
158
159impl DatabaseStorageBackend {
160 pub fn new(db: Arc<Database>) -> Self {
162 Self { db }
163 }
164
165 pub fn database(&self) -> &Arc<Database> {
167 &self.db
168 }
169
170 fn with_read_txn<F, T>(&self, f: F) -> sochdb_core::Result<T>
172 where
173 F: FnOnce(KernelTxnHandle) -> sochdb_core::Result<T>,
174 {
175 let txn = self.db.begin_read_only_fast();
176 let result = f(txn);
177 self.db.abort_read_only_fast(txn);
178 result
179 }
180}
181
182impl StorageBackend for DatabaseStorageBackend {
183 fn table_scan(
184 &self,
185 table: &str,
186 columns: &[String],
187 predicate: Option<&str>,
188 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
189 self.with_read_txn(|txn| {
190 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
191
192 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
193 self.db.query(txn, table)
194 } else {
195 self.db.query(txn, table).columns(&col_refs)
196 };
197
198 let result = query.execute()?;
199 let mut rows = convert_rows_core_to_query(result.rows);
200
201 if let Some(pred) = predicate {
203 rows = apply_simple_predicate(&rows, pred);
204 }
205
206 Ok(rows)
207 })
208 }
209
210 fn primary_key_lookup(
211 &self,
212 table: &str,
213 key: &QuerySochValue,
214 ) -> sochdb_core::Result<Option<HashMap<String, QuerySochValue>>> {
215 let row_id = match key {
216 QuerySochValue::Int(i) => *i as u64,
217 QuerySochValue::UInt(u) => *u,
218 _ => return Ok(None),
219 };
220
221 self.with_read_txn(|txn| {
222 let result = self.db.read_row(txn, table, row_id, None)?;
223 Ok(result.map(convert_row_core_to_query))
224 })
225 }
226
227 fn secondary_index_seek(
228 &self,
229 table: &str,
230 index: &str,
231 key: &QuerySochValue,
232 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
233 let column_name = index.to_string();
235 let core_key = convert_query_to_core(key);
236
237 self.with_read_txn(|txn| {
238 let result = self.db.query(txn, table).execute()?;
239 let rows: Vec<HashMap<String, QuerySochValue>> = result
240 .rows
241 .into_iter()
242 .filter(|row| {
243 row.get(&column_name)
244 .map(|v| v == &core_key)
245 .unwrap_or(false)
246 })
247 .map(convert_row_core_to_query)
248 .collect();
249 Ok(rows)
250 })
251 }
252
253 fn time_index_scan(
254 &self,
255 table: &str,
256 start_us: u64,
257 end_us: u64,
258 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
259 self.with_read_txn(|txn| {
260 let result = self.db.query(txn, table).execute()?;
261 let rows: Vec<HashMap<String, QuerySochValue>> = result
262 .rows
263 .into_iter()
264 .filter(|row| {
265 if let Some(CoreSochValue::UInt(ts)) = row.get("_timestamp") {
266 *ts >= start_us && *ts <= end_us
267 } else if let Some(CoreSochValue::Int(ts)) = row.get("_timestamp") {
268 let ts = *ts as u64;
269 ts >= start_us && ts <= end_us
270 } else {
271 false
272 }
273 })
274 .map(convert_row_core_to_query)
275 .collect();
276 Ok(rows)
277 })
278 }
279
280 fn vector_search(
281 &self,
282 table: &str,
283 query: &[f32],
284 k: usize,
285 ) -> sochdb_core::Result<Vec<(f32, HashMap<String, QuerySochValue>)>> {
286 self.with_read_txn(|txn| {
287 let result = self.db.query(txn, table).execute()?;
288
289 let mut scored: Vec<(f32, HashMap<String, CoreSochValue>)> = result
290 .rows
291 .into_iter()
292 .filter_map(|row| {
293 let vec_col = row.get("_vector").or_else(|| row.get("_embedding"));
294
295 if let Some(CoreSochValue::Array(arr)) = vec_col {
296 let vec: Vec<f32> = arr
297 .iter()
298 .filter_map(|v| match v {
299 CoreSochValue::Float(f) => Some(*f as f32),
300 CoreSochValue::Int(i) => Some(*i as f32),
301 _ => None,
302 })
303 .collect();
304
305 if vec.len() == query.len() {
306 let dist = euclidean_distance(&vec, query);
307 Some((dist, row))
308 } else {
309 None
310 }
311 } else {
312 None
313 }
314 })
315 .collect();
316
317 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
318 scored.truncate(k);
319
320 Ok(scored
321 .into_iter()
322 .map(|(dist, row)| (dist, convert_row_core_to_query(row)))
323 .collect())
324 })
325 }
326
327 fn row_count(&self, table: &str) -> usize {
328 let txn = self.db.begin_read_only_fast();
329 let count = self
330 .db
331 .query(txn, table)
332 .execute()
333 .map(|r| r.rows_scanned)
334 .unwrap_or(0);
335 self.db.abort_read_only_fast(txn);
336 count
337 }
338}
339
340pub struct DatabaseSqlConnection {
356 db: Arc<Database>,
357 active_txn: Option<KernelTxnHandle>,
358 explicit_txn: bool,
360 next_row_ids: HashMap<String, u64>,
362}
363
364impl DatabaseSqlConnection {
365 pub fn new(db: Arc<Database>) -> Self {
367 Self {
368 db,
369 active_txn: None,
370 explicit_txn: false,
371 next_row_ids: HashMap::new(),
372 }
373 }
374
375 pub fn database(&self) -> &Arc<Database> {
377 &self.db
378 }
379
380 fn ensure_write_txn(&mut self) -> SqlResult<KernelTxnHandle> {
382 if let Some(txn) = self.active_txn {
383 Ok(txn)
384 } else {
385 let txn = self
386 .db
387 .begin_transaction()
388 .map_err(|e| SqlError::ExecutionError(format!("Failed to begin txn: {}", e)))?;
389 self.active_txn = Some(txn);
390 Ok(txn)
391 }
392 }
393
394 fn auto_commit_if_implicit(&mut self) -> SqlResult<()> {
397 if self.explicit_txn {
398 return Ok(()); }
400 if let Some(txn) = self.active_txn.take() {
401 self.db
402 .commit(txn)
403 .map_err(|e| SqlError::ExecutionError(format!("Commit failed: {}", e)))?;
404 }
405 Ok(())
406 }
407
408 fn next_row_id(&mut self, table: &str) -> u64 {
410 let counter = self.next_row_ids.entry(table.to_string()).or_insert(0);
411 *counter += 1;
412 *counter
413 }
414
415 fn init_row_id_counter(&mut self, table: &str) {
417 if self.next_row_ids.contains_key(table) {
418 return;
419 }
420 let txn = self.db.begin_read_only_fast();
421 let max_id = self
422 .db
423 .query(txn, table)
424 .execute()
425 .map(|r| r.rows_scanned as u64)
426 .unwrap_or(0);
427 self.db.abort_read_only_fast(txn);
428 self.next_row_ids.insert(table.to_string(), max_id);
429 }
430
431 fn eval_expr(
433 &self,
434 expr: &Expr,
435 row: &HashMap<String, CoreSochValue>,
436 params: &[CoreSochValue],
437 ) -> Option<CoreSochValue> {
438 match expr {
439 Expr::Column(col_ref) => {
440 if let Some(ref tbl) = col_ref.table {
442 let qualified = format!("{}.{}", tbl, col_ref.column);
443 if let Some(v) = row.get(&qualified) {
444 return Some(v.clone());
445 }
446 }
447 let col_name = &col_ref.column;
449 row.get(col_name).cloned()
450 }
451 Expr::Literal(lit) => Some(literal_to_core(lit)),
452 Expr::Placeholder(idx) => params.get((*idx as usize).saturating_sub(1)).cloned(),
453 Expr::BinaryOp { left, op, right } => {
454 let lhs = self.eval_expr(left, row, params)?;
455 let rhs = self.eval_expr(right, row, params)?;
456 Some(eval_binary_op(&lhs, op, &rhs))
457 }
458 Expr::UnaryOp { op, expr: inner } => {
459 let val = self.eval_expr(inner, row, params)?;
460 Some(eval_unary_op(op, &val))
461 }
462 Expr::IsNull {
463 expr: inner,
464 negated,
465 } => {
466 let val = self.eval_expr(inner, row, params)?;
467 let is_null = matches!(val, CoreSochValue::Null);
468 Some(CoreSochValue::Bool(if *negated {
469 !is_null
470 } else {
471 is_null
472 }))
473 }
474 Expr::Between {
475 expr: inner,
476 low,
477 high,
478 negated,
479 } => {
480 let val = self.eval_expr(inner, row, params)?;
481 let lo = self.eval_expr(low, row, params)?;
482 let hi = self.eval_expr(high, row, params)?;
483 let in_range = compare_values(&val, &lo) != std::cmp::Ordering::Less
484 && compare_values(&val, &hi) != std::cmp::Ordering::Greater;
485 Some(CoreSochValue::Bool(if *negated {
486 !in_range
487 } else {
488 in_range
489 }))
490 }
491 Expr::InList {
492 expr: inner,
493 list,
494 negated,
495 } => {
496 let val = self.eval_expr(inner, row, params)?;
497 let found = list
498 .iter()
499 .any(|item| self.eval_expr(item, row, params) == Some(val.clone()));
500 Some(CoreSochValue::Bool(if *negated { !found } else { found }))
501 }
502 Expr::Like {
503 expr: inner,
504 pattern,
505 negated,
506 ..
507 } => {
508 let val = self.eval_expr(inner, row, params)?;
509 let pat = self.eval_expr(pattern, row, params)?;
510 if let (CoreSochValue::Text(s), CoreSochValue::Text(p)) = (&val, &pat) {
511 let matched = sql_like_match(s, p);
512 Some(CoreSochValue::Bool(if *negated {
513 !matched
514 } else {
515 matched
516 }))
517 } else {
518 Some(CoreSochValue::Bool(false))
519 }
520 }
521 Expr::Function(func_call) => {
522 let func_name = func_call.name.name().to_uppercase();
523 match func_name.as_str() {
524 "UPPER" => {
525 let val = func_call
526 .args
527 .first()
528 .and_then(|a| self.eval_expr(a, row, params))?;
529 if let CoreSochValue::Text(s) = val {
530 Some(CoreSochValue::Text(s.to_uppercase()))
531 } else {
532 Some(CoreSochValue::Null)
533 }
534 }
535 "LOWER" => {
536 let val = func_call
537 .args
538 .first()
539 .and_then(|a| self.eval_expr(a, row, params))?;
540 if let CoreSochValue::Text(s) = val {
541 Some(CoreSochValue::Text(s.to_lowercase()))
542 } else {
543 Some(CoreSochValue::Null)
544 }
545 }
546 "LENGTH" | "LEN" => {
547 let val = func_call
548 .args
549 .first()
550 .and_then(|a| self.eval_expr(a, row, params))?;
551 if let CoreSochValue::Text(s) = val {
552 Some(CoreSochValue::Int(s.len() as i64))
553 } else {
554 Some(CoreSochValue::Null)
555 }
556 }
557 "COALESCE" => {
558 for arg in &func_call.args {
559 if let Some(val) = self.eval_expr(arg, row, params) {
560 if !matches!(val, CoreSochValue::Null) {
561 return Some(val);
562 }
563 }
564 }
565 Some(CoreSochValue::Null)
566 }
567 _ => func_call
569 .args
570 .first()
571 .and_then(|a| self.eval_expr(a, row, params)),
572 }
573 }
574 _ => None,
575 }
576 }
577
578 fn row_matches(
580 &self,
581 expr: &Expr,
582 row: &HashMap<String, CoreSochValue>,
583 params: &[CoreSochValue],
584 ) -> bool {
585 match self.eval_expr(expr, row, params) {
586 Some(CoreSochValue::Bool(b)) => b,
587 _ => false,
588 }
589 }
590
591 fn find_row_id(
595 &self,
596 table: &str,
597 target_row: &HashMap<String, CoreSochValue>,
598 txn: KernelTxnHandle,
599 ) -> SqlResult<Option<u64>> {
600 let entries = self
601 .db
602 .scan(txn, table.as_bytes())
603 .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)))?;
604
605 for (key_bytes, _value_bytes) in entries {
606 if let Ok(key_str) = String::from_utf8(key_bytes) {
607 let parts: Vec<&str> = key_str.split('/').collect();
609 if parts.len() == 2 {
610 if let Ok(row_id) = parts[1].parse::<u64>() {
611 if let Ok(Some(row)) = self.db.read_row(txn, table, row_id, None) {
612 if rows_equal(&row, target_row) {
613 return Ok(Some(row_id));
614 }
615 }
616 }
617 }
618 }
619 }
620
621 Ok(None)
622 }
623}
624
625impl SqlConnection for DatabaseSqlConnection {
626 fn select(
627 &self,
628 table: &str,
629 columns: &[String],
630 where_clause: Option<&Expr>,
631 order_by: &[OrderByItem],
632 limit: Option<usize>,
633 offset: Option<usize>,
634 params: &[CoreSochValue],
635 ) -> SqlResult<ExecutionResult> {
636 let txn = self.db.begin_read_only_fast();
637
638 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
639 self.db.query(txn, table)
640 } else {
641 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
642 self.db.query(txn, table).columns(&col_refs)
643 };
644
645 let result = query
646 .execute()
647 .map_err(|e| SqlError::ExecutionError(format!("Query failed: {}", e)));
648
649 self.db.abort_read_only_fast(txn);
650
651 let result = result?;
652 let mut rows = result.rows;
653
654 if let Some(expr) = where_clause {
656 rows.retain(|row| self.row_matches(expr, row, params));
657 }
658
659 if !order_by.is_empty() {
661 rows.sort_by(|a, b| {
662 for item in order_by {
663 let col_name = extract_order_by_column(&item.expr);
664 let va = a.get(&col_name);
665 let vb = b.get(&col_name);
666 let cmp = compare_optional_values(va, vb);
667 let cmp = if !item.asc { cmp.reverse() } else { cmp };
668 if cmp != std::cmp::Ordering::Equal {
669 return cmp;
670 }
671 }
672 std::cmp::Ordering::Equal
673 });
674 }
675
676 if let Some(off) = offset {
678 rows = rows.into_iter().skip(off).collect();
679 }
680
681 if let Some(lim) = limit {
683 rows.truncate(lim);
684 }
685
686 let result_columns = if columns.is_empty() || columns.iter().any(|c| c == "*") {
688 rows.first()
689 .map(|r| r.keys().cloned().collect())
690 .unwrap_or_default()
691 } else {
692 columns.to_vec()
693 };
694
695 Ok(ExecutionResult::Rows {
696 columns: result_columns,
697 rows,
698 })
699 }
700
701 fn insert(
702 &mut self,
703 table: &str,
704 columns: Option<&[String]>,
705 rows: &[Vec<Expr>],
706 _on_conflict: Option<&OnConflict>,
707 params: &[CoreSochValue],
708 ) -> SqlResult<ExecutionResult> {
709 let txn = self.ensure_write_txn()?;
710 self.init_row_id_counter(table);
711
712 let schema = self.db.get_table_schema(table);
713 let col_names: Vec<String> = if let Some(cols) = columns {
714 cols.to_vec()
715 } else if let Some(ref s) = schema {
716 s.columns.iter().map(|c| c.name.clone()).collect()
717 } else {
718 return Err(SqlError::InvalidArgument(
719 "INSERT requires column names when table has no schema".into(),
720 ));
721 };
722
723 let mut inserted = 0;
724 for row_exprs in rows {
725 let row_id = self.next_row_id(table);
726 let mut values = HashMap::new();
727
728 for (i, expr) in row_exprs.iter().enumerate() {
729 if i < col_names.len() {
730 let value = match expr {
731 Expr::Literal(lit) => literal_to_core(lit),
732 Expr::Placeholder(idx) => params
733 .get((*idx as usize).saturating_sub(1))
734 .cloned()
735 .unwrap_or(CoreSochValue::Null),
736 _ => CoreSochValue::Null,
737 };
738 values.insert(col_names[i].clone(), value);
739 }
740 }
741
742 self.db
743 .insert_row(txn, table, row_id, &values)
744 .map_err(|e| SqlError::ExecutionError(format!("Insert failed: {}", e)))?;
745 inserted += 1;
746 }
747
748 self.auto_commit_if_implicit()?;
749 Ok(ExecutionResult::RowsAffected(inserted))
750 }
751
752 fn update(
753 &mut self,
754 table: &str,
755 assignments: &[Assignment],
756 where_clause: Option<&Expr>,
757 params: &[CoreSochValue],
758 ) -> SqlResult<ExecutionResult> {
759 let txn = self.ensure_write_txn()?;
760
761 let result = self
762 .db
763 .query(txn, table)
764 .execute()
765 .map_err(|e| SqlError::ExecutionError(format!("Scan for update failed: {}", e)))?;
766
767 let mut updated = 0;
768
769 for row in &result.rows {
770 let matches = match where_clause {
771 Some(expr) => self.row_matches(expr, row, params),
772 None => true,
773 };
774
775 if matches {
776 let row_id = self.find_row_id(table, row, txn)?;
777 if let Some(row_id) = row_id {
778 let mut new_values = row.clone();
779 for assignment in assignments {
780 let col_name = assignment.column.clone();
781 let value = match &assignment.value {
782 Expr::Literal(lit) => literal_to_core(lit),
783 Expr::Placeholder(idx) => params
784 .get((*idx as usize).saturating_sub(1))
785 .cloned()
786 .unwrap_or(CoreSochValue::Null),
787 _ => self
788 .eval_expr(&assignment.value, row, params)
789 .unwrap_or(CoreSochValue::Null),
790 };
791 new_values.insert(col_name, value);
792 }
793
794 self.db
795 .insert_row(txn, table, row_id, &new_values)
796 .map_err(|e| SqlError::ExecutionError(format!("Update failed: {}", e)))?;
797 updated += 1;
798 }
799 }
800 }
801
802 self.auto_commit_if_implicit()?;
803 Ok(ExecutionResult::RowsAffected(updated))
804 }
805
806 fn delete(
807 &mut self,
808 table: &str,
809 where_clause: Option<&Expr>,
810 params: &[CoreSochValue],
811 ) -> SqlResult<ExecutionResult> {
812 let txn = self.ensure_write_txn()?;
813
814 let result = self
815 .db
816 .query(txn, table)
817 .execute()
818 .map_err(|e| SqlError::ExecutionError(format!("Scan for delete failed: {}", e)))?;
819
820 let mut deleted = 0;
821
822 for row in &result.rows {
823 let matches = match where_clause {
824 Some(expr) => self.row_matches(expr, row, params),
825 None => true,
826 };
827
828 if matches {
829 if let Some(row_id) = self.find_row_id(table, row, txn)? {
830 let key = format!("{}/{}", table, row_id);
831 self.db
832 .delete(txn, key.as_bytes())
833 .map_err(|e| SqlError::ExecutionError(format!("Delete failed: {}", e)))?;
834 deleted += 1;
835 }
836 }
837 }
838
839 self.auto_commit_if_implicit()?;
840 Ok(ExecutionResult::RowsAffected(deleted))
841 }
842
843 fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
844 use sochdb_storage::DbColumnDef;
845 use sochdb_storage::DbTableSchema;
846
847 let table_name = stmt.name.name().to_string();
848
849 if self.db.get_table_schema(&table_name).is_some() {
850 if stmt.if_not_exists {
851 return Ok(ExecutionResult::Ok);
852 }
853 return Err(SqlError::InvalidArgument(format!(
854 "Table '{}' already exists",
855 table_name
856 )));
857 }
858
859 let columns: Vec<DbColumnDef> = stmt
860 .columns
861 .iter()
862 .map(|col| {
863 let col_type = sql_type_to_db_type(&col.data_type);
864 let nullable = !col
865 .constraints
866 .iter()
867 .any(|c| matches!(c, ColumnConstraint::NotNull));
868 DbColumnDef {
869 name: col.name.clone(),
870 col_type,
871 nullable,
872 }
873 })
874 .collect();
875
876 let schema = DbTableSchema {
877 name: table_name,
878 columns,
879 };
880
881 self.db
882 .register_table(schema)
883 .map_err(|e| SqlError::ExecutionError(format!("Create table failed: {}", e)))?;
884
885 Ok(ExecutionResult::Ok)
886 }
887
888 fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
889 let table_name = stmt
890 .names
891 .first()
892 .map(|n| n.name().to_string())
893 .unwrap_or_default();
894
895 if self.db.get_table_schema(&table_name).is_none() {
896 if stmt.if_exists {
897 return Ok(ExecutionResult::Ok);
898 }
899 return Err(SqlError::TableNotFound(table_name));
900 }
901 Ok(ExecutionResult::Ok)
904 }
905
906 fn create_index(&mut self, _stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
907 Ok(ExecutionResult::Ok)
909 }
910
911 fn drop_index(&mut self, _stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
912 Ok(ExecutionResult::Ok)
913 }
914
915 fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
916 use sochdb_storage::DbColumnDef;
917
918 let table_name = stmt.name.name().to_string();
919 let mut schema = self
920 .db
921 .get_table_schema(&table_name)
922 .ok_or_else(|| SqlError::TableNotFound(table_name.clone()))?;
923
924 let original_name = table_name.clone();
925
926 for op in &stmt.operations {
927 match op {
928 AlterTableOp::AddColumn(col_def) => {
929 if schema.columns.iter().any(|c| c.name == col_def.name) {
931 return Err(SqlError::InvalidArgument(format!(
932 "Column '{}' already exists in table '{}'",
933 col_def.name, schema.name
934 )));
935 }
936 let col_type = sql_type_to_db_type(&col_def.data_type);
937 let nullable = !col_def
938 .constraints
939 .iter()
940 .any(|c| matches!(c, ColumnConstraint::NotNull));
941 schema.columns.push(DbColumnDef {
942 name: col_def.name.clone(),
943 col_type,
944 nullable,
945 });
946 }
947 AlterTableOp::DropColumn { name, .. } => {
948 let idx = schema
949 .columns
950 .iter()
951 .position(|c| c.name == *name)
952 .ok_or_else(|| {
953 SqlError::InvalidArgument(format!(
954 "Column '{}' not found in table '{}'",
955 name, schema.name
956 ))
957 })?;
958 schema.columns.remove(idx);
959 }
960 AlterTableOp::RenameColumn { old_name, new_name } => {
961 let col = schema
962 .columns
963 .iter_mut()
964 .find(|c| c.name == *old_name)
965 .ok_or_else(|| {
966 SqlError::InvalidArgument(format!(
967 "Column '{}' not found in table '{}'",
968 old_name, schema.name
969 ))
970 })?;
971 col.name = new_name.clone();
972 }
973 AlterTableOp::RenameTable(new_name) => {
974 schema.name = new_name.name().to_string();
975 }
976 AlterTableOp::AlterColumn { name, operation } => {
977 let col = schema
978 .columns
979 .iter_mut()
980 .find(|c| c.name == *name)
981 .ok_or_else(|| {
982 SqlError::InvalidArgument(format!(
983 "Column '{}' not found in table '{}'",
984 name, schema.name
985 ))
986 })?;
987 match operation {
988 AlterColumnOp::SetType(data_type) => {
989 col.col_type = sql_type_to_db_type(data_type);
990 }
991 AlterColumnOp::SetNotNull => {
992 col.nullable = false;
993 }
994 AlterColumnOp::DropNotNull => {
995 col.nullable = true;
996 }
997 AlterColumnOp::SetDefault(_) | AlterColumnOp::DropDefault => {
998 }
1001 }
1002 }
1003 AlterTableOp::AddConstraint(_) | AlterTableOp::DropConstraint { .. } => {
1004 return Err(SqlError::NotImplemented(
1005 "ADD/DROP CONSTRAINT not yet implemented".into(),
1006 ));
1007 }
1008 }
1009 }
1010
1011 self.db
1012 .update_table_schema(&original_name, schema)
1013 .map_err(|e| SqlError::ExecutionError(format!("ALTER TABLE failed: {}", e)))?;
1014
1015 Ok(ExecutionResult::Ok)
1016 }
1017
1018 fn begin(&mut self, _stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1019 if self.active_txn.is_some() {
1020 return Err(SqlError::TransactionError(
1021 "Transaction already active".into(),
1022 ));
1023 }
1024 let txn = self
1025 .db
1026 .begin_transaction()
1027 .map_err(|e| SqlError::ExecutionError(format!("Begin failed: {}", e)))?;
1028 self.active_txn = Some(txn);
1029 self.explicit_txn = true;
1030 Ok(ExecutionResult::TransactionOk)
1031 }
1032
1033 fn commit(&mut self) -> SqlResult<ExecutionResult> {
1034 if let Some(txn) = self.active_txn.take() {
1035 self.explicit_txn = false;
1036 self.db
1037 .commit(txn)
1038 .map_err(|e| SqlError::TransactionError(format!("Commit failed: {}", e)))?;
1039 Ok(ExecutionResult::TransactionOk)
1040 } else {
1041 Err(SqlError::TransactionError("No active transaction".into()))
1042 }
1043 }
1044
1045 fn rollback(&mut self, _savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1046 if let Some(txn) = self.active_txn.take() {
1047 self.explicit_txn = false;
1048 self.db
1049 .abort(txn)
1050 .map_err(|e| SqlError::TransactionError(format!("Rollback failed: {}", e)))?;
1051 Ok(ExecutionResult::TransactionOk)
1052 } else {
1053 Err(SqlError::TransactionError("No active transaction".into()))
1054 }
1055 }
1056
1057 fn table_exists(&self, table: &str) -> SqlResult<bool> {
1058 Ok(self.db.get_table_schema(table).is_some())
1059 }
1060
1061 fn index_exists(&self, _index: &str) -> SqlResult<bool> {
1062 Ok(false)
1064 }
1065
1066 fn scan_all(
1067 &self,
1068 table: &str,
1069 columns: &[String],
1070 ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1071 let txn = self.db.begin_read_only_fast();
1072
1073 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
1074 self.db.query(txn, table)
1075 } else {
1076 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
1077 self.db.query(txn, table).columns(&col_refs)
1078 };
1079
1080 let result = query
1081 .execute()
1082 .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)));
1083
1084 self.db.abort_read_only_fast(txn);
1085 Ok(result?.rows)
1086 }
1087
1088 fn eval_join_predicate(
1089 &self,
1090 expr: &Expr,
1091 row: &HashMap<String, CoreSochValue>,
1092 params: &[CoreSochValue],
1093 ) -> Option<bool> {
1094 let val = self.eval_expr(expr, row, params)?;
1095 match val {
1096 CoreSochValue::Bool(b) => Some(b),
1097 CoreSochValue::Null => Some(false),
1098 _ => Some(false),
1099 }
1100 }
1101}
1102
1103fn apply_simple_predicate(
1112 rows: &[HashMap<String, QuerySochValue>],
1113 predicate: &str,
1114) -> Vec<HashMap<String, QuerySochValue>> {
1115 let operators = [">=", "<=", "!=", "=", ">", "<"];
1116
1117 for op in &operators {
1118 if let Some(idx) = predicate.find(op) {
1119 let column = predicate[..idx].trim();
1120 let value_str = predicate[idx + op.len()..].trim().trim_matches('\'');
1121
1122 return rows
1123 .iter()
1124 .filter(|row| {
1125 if let Some(val) = row.get(column) {
1126 let val_str = match val {
1127 QuerySochValue::Text(s) => s.clone(),
1128 QuerySochValue::Int(i) => i.to_string(),
1129 QuerySochValue::UInt(u) => u.to_string(),
1130 QuerySochValue::Float(f) => f.to_string(),
1131 QuerySochValue::Bool(b) => b.to_string(),
1132 _ => return false,
1133 };
1134
1135 match *op {
1136 "=" => val_str == value_str,
1137 "!=" => val_str != value_str,
1138 ">" => val_str.as_str() > value_str,
1139 "<" => (val_str.as_str()) < value_str,
1140 ">=" => val_str.as_str() >= value_str,
1141 "<=" => val_str.as_str() <= value_str,
1142 _ => false,
1143 }
1144 } else {
1145 false
1146 }
1147 })
1148 .cloned()
1149 .collect();
1150 }
1151 }
1152
1153 rows.to_vec()
1154}
1155
1156fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
1158 a.iter()
1159 .zip(b.iter())
1160 .map(|(x, y)| (x - y) * (x - y))
1161 .sum::<f32>()
1162 .sqrt()
1163}
1164
1165fn literal_to_core(lit: &Literal) -> CoreSochValue {
1167 match lit {
1168 Literal::Integer(i) => CoreSochValue::Int(*i),
1169 Literal::Float(f) => CoreSochValue::Float(*f),
1170 Literal::String(s) => CoreSochValue::Text(s.clone()),
1171 Literal::Boolean(b) => CoreSochValue::Bool(*b),
1172 Literal::Null => CoreSochValue::Null,
1173 Literal::Blob(b) => CoreSochValue::Binary(b.clone()),
1174 }
1175}
1176
1177fn eval_binary_op(lhs: &CoreSochValue, op: &BinaryOperator, rhs: &CoreSochValue) -> CoreSochValue {
1179 match op {
1180 BinaryOperator::Eq => CoreSochValue::Bool(lhs == rhs),
1181 BinaryOperator::Ne => CoreSochValue::Bool(lhs != rhs),
1182 BinaryOperator::Lt => {
1183 CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Less)
1184 }
1185 BinaryOperator::Gt => {
1186 CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Greater)
1187 }
1188 BinaryOperator::Le => {
1189 CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Greater)
1190 }
1191 BinaryOperator::Ge => {
1192 CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Less)
1193 }
1194 BinaryOperator::And => {
1195 let a = matches!(lhs, CoreSochValue::Bool(true));
1196 let b = matches!(rhs, CoreSochValue::Bool(true));
1197 CoreSochValue::Bool(a && b)
1198 }
1199 BinaryOperator::Or => {
1200 let a = matches!(lhs, CoreSochValue::Bool(true));
1201 let b = matches!(rhs, CoreSochValue::Bool(true));
1202 CoreSochValue::Bool(a || b)
1203 }
1204 BinaryOperator::Plus => numeric_op(lhs, rhs, |a, b| a + b, |a, b| a + b),
1205 BinaryOperator::Minus => numeric_op(lhs, rhs, |a, b| a - b, |a, b| a - b),
1206 BinaryOperator::Multiply => numeric_op(lhs, rhs, |a, b| a * b, |a, b| a * b),
1207 BinaryOperator::Divide => {
1212 if is_numeric_zero(rhs) {
1213 CoreSochValue::Null
1214 } else {
1215 numeric_op(lhs, rhs, |a, b| a / b, |a, b| a / b)
1216 }
1217 }
1218 BinaryOperator::Modulo => {
1219 if is_numeric_zero(rhs) {
1220 CoreSochValue::Null
1221 } else {
1222 numeric_op(lhs, rhs, |a, b| a % b, |a, b| a % b)
1223 }
1224 }
1225 BinaryOperator::Like => {
1226 if let (CoreSochValue::Text(s), CoreSochValue::Text(pattern)) = (lhs, rhs) {
1227 CoreSochValue::Bool(sql_like_match(s, pattern))
1228 } else {
1229 CoreSochValue::Bool(false)
1230 }
1231 }
1232 BinaryOperator::Concat => {
1233 let a = value_to_string(lhs);
1234 let b = value_to_string(rhs);
1235 CoreSochValue::Text(format!("{}{}", a, b))
1236 }
1237 _ => CoreSochValue::Null,
1238 }
1239}
1240
1241fn eval_unary_op(op: &UnaryOperator, val: &CoreSochValue) -> CoreSochValue {
1243 match op {
1244 UnaryOperator::Not => match val {
1245 CoreSochValue::Bool(b) => CoreSochValue::Bool(!b),
1246 _ => CoreSochValue::Null,
1247 },
1248 UnaryOperator::Minus => match val {
1249 CoreSochValue::Int(i) => CoreSochValue::Int(-i),
1250 CoreSochValue::Float(f) => CoreSochValue::Float(-f),
1251 _ => CoreSochValue::Null,
1252 },
1253 UnaryOperator::Plus => val.clone(),
1254 _ => CoreSochValue::Null,
1255 }
1256}
1257
1258fn is_numeric_zero(v: &CoreSochValue) -> bool {
1262 match v {
1263 CoreSochValue::Int(0) | CoreSochValue::UInt(0) => true,
1264 CoreSochValue::Float(f) => *f == 0.0,
1265 _ => false,
1266 }
1267}
1268
1269fn numeric_op(
1270 lhs: &CoreSochValue,
1271 rhs: &CoreSochValue,
1272 int_op: impl Fn(i64, i64) -> i64,
1273 float_op: impl Fn(f64, f64) -> f64,
1274) -> CoreSochValue {
1275 match (lhs, rhs) {
1276 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => CoreSochValue::Int(int_op(*a, *b)),
1277 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1278 CoreSochValue::Float(float_op(*a, *b))
1279 }
1280 (CoreSochValue::Int(a), CoreSochValue::Float(b)) => {
1281 CoreSochValue::Float(float_op(*a as f64, *b))
1282 }
1283 (CoreSochValue::Float(a), CoreSochValue::Int(b)) => {
1284 CoreSochValue::Float(float_op(*a, *b as f64))
1285 }
1286 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => {
1287 CoreSochValue::Int(int_op(*a as i64, *b as i64))
1288 }
1289 _ => CoreSochValue::Null,
1290 }
1291}
1292
1293fn compare_values(a: &CoreSochValue, b: &CoreSochValue) -> std::cmp::Ordering {
1295 match (a, b) {
1296 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => a.cmp(b),
1297 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => a.cmp(b),
1298 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1299 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
1300 }
1301 (CoreSochValue::Text(a), CoreSochValue::Text(b)) => a.cmp(b),
1302 (CoreSochValue::Int(a), CoreSochValue::Float(b)) => (*a as f64)
1303 .partial_cmp(b)
1304 .unwrap_or(std::cmp::Ordering::Equal),
1305 (CoreSochValue::Float(a), CoreSochValue::Int(b)) => a
1306 .partial_cmp(&(*b as f64))
1307 .unwrap_or(std::cmp::Ordering::Equal),
1308 (CoreSochValue::Null, CoreSochValue::Null) => std::cmp::Ordering::Equal,
1309 (CoreSochValue::Null, _) => std::cmp::Ordering::Less,
1310 (_, CoreSochValue::Null) => std::cmp::Ordering::Greater,
1311 _ => std::cmp::Ordering::Equal,
1312 }
1313}
1314
1315fn compare_optional_values(
1317 a: Option<&CoreSochValue>,
1318 b: Option<&CoreSochValue>,
1319) -> std::cmp::Ordering {
1320 match (a, b) {
1321 (Some(a), Some(b)) => compare_values(a, b),
1322 (None, Some(_)) => std::cmp::Ordering::Less,
1323 (Some(_), None) => std::cmp::Ordering::Greater,
1324 (None, None) => std::cmp::Ordering::Equal,
1325 }
1326}
1327
1328fn extract_order_by_column(expr: &Expr) -> String {
1330 match expr {
1331 Expr::Column(col_ref) => col_ref.column.clone(),
1332 _ => String::new(),
1333 }
1334}
1335
1336fn rows_equal(a: &HashMap<String, CoreSochValue>, b: &HashMap<String, CoreSochValue>) -> bool {
1338 if a.len() != b.len() {
1339 return false;
1340 }
1341 a.iter().all(|(k, v)| b.get(k) == Some(v))
1342}
1343
1344fn value_to_string(v: &CoreSochValue) -> String {
1346 match v {
1347 CoreSochValue::Text(s) => s.clone(),
1348 CoreSochValue::Int(i) => i.to_string(),
1349 CoreSochValue::UInt(u) => u.to_string(),
1350 CoreSochValue::Float(f) => f.to_string(),
1351 CoreSochValue::Bool(b) => b.to_string(),
1352 CoreSochValue::Null => "NULL".to_string(),
1353 _ => String::new(),
1354 }
1355}
1356
1357fn sql_like_match(s: &str, pattern: &str) -> bool {
1362 crate::like::like_match(s, pattern)
1363}
1364
1365fn sql_type_to_db_type(dt: &DataType) -> sochdb_storage::DbColumnType {
1367 use sochdb_storage::DbColumnType;
1368 match dt {
1369 DataType::TinyInt | DataType::SmallInt | DataType::Int | DataType::BigInt => {
1370 DbColumnType::Int64
1371 }
1372 DataType::Float | DataType::Double | DataType::Decimal { .. } => DbColumnType::Float64,
1373 DataType::Boolean => DbColumnType::Bool,
1374 DataType::Binary(_) | DataType::Varbinary(_) | DataType::Blob => DbColumnType::Binary,
1375 _ => DbColumnType::Text,
1377 }
1378}
1379
1380pub struct NamespacedSqlConnection<C: SqlConnection> {
1402 inner: C,
1403 namespace: String,
1404 database: String,
1405}
1406
1407impl<C: SqlConnection> NamespacedSqlConnection<C> {
1408 pub fn new(inner: C, namespace: impl Into<String>, database: impl Into<String>) -> Self {
1410 Self {
1411 inner,
1412 namespace: namespace.into(),
1413 database: database.into(),
1414 }
1415 }
1416
1417 fn prefix_table(&self, table: &str) -> String {
1419 format!("{}:{}:{}", self.namespace, self.database, table)
1420 }
1421
1422 pub fn namespace(&self) -> &str {
1424 &self.namespace
1425 }
1426
1427 pub fn database(&self) -> &str {
1429 &self.database
1430 }
1431
1432 pub fn inner(&self) -> &C {
1434 &self.inner
1435 }
1436
1437 pub fn inner_mut(&mut self) -> &mut C {
1439 &mut self.inner
1440 }
1441}
1442
1443impl<C: SqlConnection> SqlConnection for NamespacedSqlConnection<C> {
1444 fn select(
1445 &self,
1446 table: &str,
1447 columns: &[String],
1448 where_clause: Option<&Expr>,
1449 order_by: &[OrderByItem],
1450 limit: Option<usize>,
1451 offset: Option<usize>,
1452 params: &[CoreSochValue],
1453 ) -> SqlResult<ExecutionResult> {
1454 self.inner.select(
1455 &self.prefix_table(table),
1456 columns,
1457 where_clause,
1458 order_by,
1459 limit,
1460 offset,
1461 params,
1462 )
1463 }
1464
1465 fn insert(
1466 &mut self,
1467 table: &str,
1468 columns: Option<&[String]>,
1469 rows: &[Vec<Expr>],
1470 on_conflict: Option<&OnConflict>,
1471 params: &[CoreSochValue],
1472 ) -> SqlResult<ExecutionResult> {
1473 self.inner.insert(
1474 &self.prefix_table(table),
1475 columns,
1476 rows,
1477 on_conflict,
1478 params,
1479 )
1480 }
1481
1482 fn update(
1483 &mut self,
1484 table: &str,
1485 assignments: &[Assignment],
1486 where_clause: Option<&Expr>,
1487 params: &[CoreSochValue],
1488 ) -> SqlResult<ExecutionResult> {
1489 self.inner
1490 .update(&self.prefix_table(table), assignments, where_clause, params)
1491 }
1492
1493 fn delete(
1494 &mut self,
1495 table: &str,
1496 where_clause: Option<&Expr>,
1497 params: &[CoreSochValue],
1498 ) -> SqlResult<ExecutionResult> {
1499 self.inner
1500 .delete(&self.prefix_table(table), where_clause, params)
1501 }
1502
1503 fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
1504 let mut prefixed = stmt.clone();
1506 let original_name = stmt.name.name().to_string();
1507 prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1508 self.inner.create_table(&prefixed)
1509 }
1510
1511 fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
1512 let mut prefixed = stmt.clone();
1513 prefixed.names = stmt
1514 .names
1515 .iter()
1516 .map(|n| ObjectName::new(self.prefix_table(n.name())))
1517 .collect();
1518 self.inner.drop_table(&prefixed)
1519 }
1520
1521 fn create_index(&mut self, stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
1522 self.inner.create_index(stmt)
1523 }
1524
1525 fn drop_index(&mut self, stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
1526 self.inner.drop_index(stmt)
1527 }
1528
1529 fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
1530 let mut prefixed = stmt.clone();
1531 let original_name = stmt.name.name().to_string();
1532 prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1533 self.inner.alter_table(&prefixed)
1534 }
1535
1536 fn begin(&mut self, stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1537 self.inner.begin(stmt)
1538 }
1539
1540 fn commit(&mut self) -> SqlResult<ExecutionResult> {
1541 self.inner.commit()
1542 }
1543
1544 fn rollback(&mut self, savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1545 self.inner.rollback(savepoint)
1546 }
1547
1548 fn table_exists(&self, table: &str) -> SqlResult<bool> {
1549 self.inner.table_exists(&self.prefix_table(table))
1550 }
1551
1552 fn index_exists(&self, index: &str) -> SqlResult<bool> {
1553 self.inner.index_exists(index)
1554 }
1555
1556 fn scan_all(
1557 &self,
1558 table: &str,
1559 columns: &[String],
1560 ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1561 self.inner.scan_all(&self.prefix_table(table), columns)
1562 }
1563
1564 fn eval_join_predicate(
1565 &self,
1566 expr: &Expr,
1567 row: &HashMap<String, CoreSochValue>,
1568 params: &[CoreSochValue],
1569 ) -> Option<bool> {
1570 self.inner.eval_join_predicate(expr, row, params)
1571 }
1572}
1573
1574#[cfg(test)]
1579mod tests {
1580 use super::*;
1581
1582 #[test]
1583 #[allow(clippy::approx_constant)] fn test_convert_core_to_query_basic_types() {
1585 assert_eq!(
1586 convert_core_to_query(&CoreSochValue::Null),
1587 QuerySochValue::Null
1588 );
1589 assert_eq!(
1590 convert_core_to_query(&CoreSochValue::Bool(true)),
1591 QuerySochValue::Bool(true)
1592 );
1593 assert_eq!(
1594 convert_core_to_query(&CoreSochValue::Int(42)),
1595 QuerySochValue::Int(42)
1596 );
1597 assert_eq!(
1598 convert_core_to_query(&CoreSochValue::UInt(100)),
1599 QuerySochValue::UInt(100)
1600 );
1601 assert_eq!(
1602 convert_core_to_query(&CoreSochValue::Float(3.14)),
1603 QuerySochValue::Float(3.14)
1604 );
1605 assert_eq!(
1606 convert_core_to_query(&CoreSochValue::Text("hello".into())),
1607 QuerySochValue::Text("hello".into())
1608 );
1609 }
1610
1611 #[test]
1612 fn test_convert_core_to_query_object() {
1613 let mut map = HashMap::new();
1614 map.insert("name".to_string(), CoreSochValue::Text("Alice".into()));
1615 let result = convert_core_to_query(&CoreSochValue::Object(map));
1616 match result {
1617 QuerySochValue::Text(s) => assert!(s.contains("Alice")),
1618 _ => panic!("Expected Text for Object conversion"),
1619 }
1620 }
1621
1622 #[test]
1623 fn test_convert_core_to_query_ref() {
1624 let result = convert_core_to_query(&CoreSochValue::Ref {
1625 table: "users".into(),
1626 id: 42,
1627 });
1628 assert_eq!(result, QuerySochValue::Text("users/42".into()));
1629 }
1630
1631 #[test]
1632 fn test_convert_roundtrip() {
1633 let original = QuerySochValue::Int(42);
1634 let core = convert_query_to_core(&original);
1635 let back = convert_core_to_query(&core);
1636 assert_eq!(original, back);
1637 }
1638
1639 #[test]
1640 fn test_apply_simple_predicate_eq() {
1641 let rows = vec![
1642 {
1643 let mut m = HashMap::new();
1644 m.insert("name".into(), QuerySochValue::Text("Alice".into()));
1645 m.insert("age".into(), QuerySochValue::Int(30));
1646 m
1647 },
1648 {
1649 let mut m = HashMap::new();
1650 m.insert("name".into(), QuerySochValue::Text("Bob".into()));
1651 m.insert("age".into(), QuerySochValue::Int(25));
1652 m
1653 },
1654 ];
1655
1656 let filtered = apply_simple_predicate(&rows, "name = Alice");
1657 assert_eq!(filtered.len(), 1);
1658 assert_eq!(
1659 filtered[0].get("name"),
1660 Some(&QuerySochValue::Text("Alice".into()))
1661 );
1662 }
1663
1664 #[test]
1665 fn test_apply_simple_predicate_neq() {
1666 let rows = vec![
1667 {
1668 let mut m = HashMap::new();
1669 m.insert("status".into(), QuerySochValue::Text("active".into()));
1670 m
1671 },
1672 {
1673 let mut m = HashMap::new();
1674 m.insert("status".into(), QuerySochValue::Text("inactive".into()));
1675 m
1676 },
1677 ];
1678
1679 let filtered = apply_simple_predicate(&rows, "status != active");
1680 assert_eq!(filtered.len(), 1);
1681 assert_eq!(
1682 filtered[0].get("status"),
1683 Some(&QuerySochValue::Text("inactive".into()))
1684 );
1685 }
1686
1687 #[test]
1688 #[allow(clippy::approx_constant)] fn test_literal_to_core() {
1690 assert_eq!(
1691 literal_to_core(&Literal::Integer(42)),
1692 CoreSochValue::Int(42)
1693 );
1694 assert_eq!(
1695 literal_to_core(&Literal::Float(3.14)),
1696 CoreSochValue::Float(3.14)
1697 );
1698 assert_eq!(
1699 literal_to_core(&Literal::String("hi".into())),
1700 CoreSochValue::Text("hi".into())
1701 );
1702 assert_eq!(
1703 literal_to_core(&Literal::Boolean(true)),
1704 CoreSochValue::Bool(true)
1705 );
1706 assert_eq!(literal_to_core(&Literal::Null), CoreSochValue::Null);
1707 }
1708
1709 #[test]
1710 fn test_euclidean_distance() {
1711 let a = vec![1.0, 0.0, 0.0];
1712 let b = vec![0.0, 1.0, 0.0];
1713 let dist = euclidean_distance(&a, &b);
1714 assert!((dist - std::f32::consts::SQRT_2).abs() < 1e-6);
1715 }
1716
1717 #[test]
1718 fn test_compare_values() {
1719 assert_eq!(
1720 compare_values(&CoreSochValue::Int(1), &CoreSochValue::Int(2)),
1721 std::cmp::Ordering::Less
1722 );
1723 assert_eq!(
1724 compare_values(
1725 &CoreSochValue::Text("a".into()),
1726 &CoreSochValue::Text("b".into())
1727 ),
1728 std::cmp::Ordering::Less
1729 );
1730 assert_eq!(
1731 compare_values(&CoreSochValue::Null, &CoreSochValue::Int(1)),
1732 std::cmp::Ordering::Less
1733 );
1734 }
1735
1736 #[test]
1737 fn test_eval_binary_op() {
1738 assert_eq!(
1739 eval_binary_op(
1740 &CoreSochValue::Int(10),
1741 &BinaryOperator::Plus,
1742 &CoreSochValue::Int(5)
1743 ),
1744 CoreSochValue::Int(15)
1745 );
1746 assert_eq!(
1747 eval_binary_op(
1748 &CoreSochValue::Int(10),
1749 &BinaryOperator::Eq,
1750 &CoreSochValue::Int(10)
1751 ),
1752 CoreSochValue::Bool(true)
1753 );
1754 assert_eq!(
1755 eval_binary_op(
1756 &CoreSochValue::Text("hello".into()),
1757 &BinaryOperator::Concat,
1758 &CoreSochValue::Text(" world".into())
1759 ),
1760 CoreSochValue::Text("hello world".into())
1761 );
1762 }
1763
1764 #[test]
1765 fn test_sql_like_match() {
1766 assert!(sql_like_match("hello", "hello"));
1767 assert!(sql_like_match("hello", "%llo"));
1768 assert!(sql_like_match("hello", "h%o"));
1769 assert!(sql_like_match("hello", "h_llo"));
1770 assert!(!sql_like_match("hello", "world"));
1771 assert!(sql_like_match("file.txt", "file%"));
1773 assert!(sql_like_match("test(1)", "%(%"));
1774 }
1775
1776 #[test]
1777 fn test_sql_type_to_db_type() {
1778 use sochdb_storage::DbColumnType;
1779 assert_eq!(sql_type_to_db_type(&DataType::Int), DbColumnType::Int64);
1780 assert_eq!(sql_type_to_db_type(&DataType::BigInt), DbColumnType::Int64);
1781 assert_eq!(sql_type_to_db_type(&DataType::Float), DbColumnType::Float64);
1782 assert_eq!(sql_type_to_db_type(&DataType::Boolean), DbColumnType::Bool);
1783 assert_eq!(sql_type_to_db_type(&DataType::Blob), DbColumnType::Binary);
1784 assert_eq!(sql_type_to_db_type(&DataType::Text), DbColumnType::Text);
1785 assert_eq!(
1786 sql_type_to_db_type(&DataType::Varchar(Some(255))),
1787 DbColumnType::Text
1788 );
1789 }
1790
1791 fn setup_test_db() -> (std::sync::Arc<sochdb_storage::Database>, tempfile::TempDir) {
1796 let tmp = tempfile::tempdir().expect("tmpdir");
1797 let db = sochdb_storage::Database::open(tmp.path()).expect("open db");
1798 (db, tmp)
1799 }
1800
1801 #[test]
1802 fn test_integration_storage_backend_table_scan() {
1803 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1804 let (db, _tmp) = setup_test_db();
1805
1806 db.register_table(DbTableSchema {
1808 name: "users".into(),
1809 columns: vec![
1810 DbColumnDef {
1811 name: "id".into(),
1812 col_type: DbColumnType::Int64,
1813 nullable: false,
1814 },
1815 DbColumnDef {
1816 name: "name".into(),
1817 col_type: DbColumnType::Text,
1818 nullable: false,
1819 },
1820 DbColumnDef {
1821 name: "age".into(),
1822 col_type: DbColumnType::Int64,
1823 nullable: true,
1824 },
1825 ],
1826 })
1827 .expect("register table");
1828
1829 let txn = db.begin_transaction().expect("begin txn");
1831 let mut vals = std::collections::HashMap::new();
1832 vals.insert("id".into(), CoreSochValue::Int(1));
1833 vals.insert("name".into(), CoreSochValue::Text("Alice".into()));
1834 vals.insert("age".into(), CoreSochValue::Int(30));
1835 db.insert_row(txn, "users", 1, &vals).expect("insert 1");
1836
1837 vals.clear();
1838 vals.insert("id".into(), CoreSochValue::Int(2));
1839 vals.insert("name".into(), CoreSochValue::Text("Bob".into()));
1840 vals.insert("age".into(), CoreSochValue::Int(25));
1841 db.insert_row(txn, "users", 2, &vals).expect("insert 2");
1842 db.commit(txn).expect("commit");
1843
1844 let backend = DatabaseStorageBackend::new(db.clone());
1846 let rows = backend
1847 .table_scan("users", &["id".into(), "name".into(), "age".into()], None)
1848 .expect("table_scan");
1849
1850 assert_eq!(rows.len(), 2);
1851 let names: Vec<_> = rows
1853 .iter()
1854 .filter_map(|r| match r.get("name") {
1855 Some(crate::soch_ql::SochValue::Text(t)) => Some(t.clone()),
1856 _ => None,
1857 })
1858 .collect();
1859 assert!(names.contains(&"Alice".to_string()));
1860 assert!(names.contains(&"Bob".to_string()));
1861 }
1862
1863 #[test]
1864 fn test_integration_storage_backend_primary_key_lookup() {
1865 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1866 let (db, _tmp) = setup_test_db();
1867
1868 db.register_table(DbTableSchema {
1869 name: "items".into(),
1870 columns: vec![
1871 DbColumnDef {
1872 name: "id".into(),
1873 col_type: DbColumnType::Int64,
1874 nullable: false,
1875 },
1876 DbColumnDef {
1877 name: "label".into(),
1878 col_type: DbColumnType::Text,
1879 nullable: false,
1880 },
1881 ],
1882 })
1883 .expect("register");
1884
1885 let txn = db.begin_transaction().expect("txn");
1886 let mut v = std::collections::HashMap::new();
1887 v.insert("id".into(), CoreSochValue::Int(42));
1888 v.insert("label".into(), CoreSochValue::Text("answer".into()));
1889 db.insert_row(txn, "items", 42, &v).expect("insert");
1890 db.commit(txn).expect("commit");
1891
1892 let backend = DatabaseStorageBackend::new(db.clone());
1893 let row = backend
1894 .primary_key_lookup("items", &crate::soch_ql::SochValue::Int(42))
1895 .expect("pk lookup");
1896 assert!(row.is_some());
1897 let row = row.unwrap();
1898 assert_eq!(
1899 row.get("label"),
1900 Some(&crate::soch_ql::SochValue::Text("answer".into()))
1901 );
1902 }
1903
1904 #[test]
1905 fn test_integration_sochql_executor_reads_storage() {
1906 use sochdb_core::{Catalog, SochSchema, SochType};
1907 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1908 let (db, _tmp) = setup_test_db();
1909
1910 db.register_table(DbTableSchema {
1912 name: "events".into(),
1913 columns: vec![
1914 DbColumnDef {
1915 name: "id".into(),
1916 col_type: DbColumnType::Int64,
1917 nullable: false,
1918 },
1919 DbColumnDef {
1920 name: "kind".into(),
1921 col_type: DbColumnType::Text,
1922 nullable: false,
1923 },
1924 DbColumnDef {
1925 name: "score".into(),
1926 col_type: DbColumnType::Float64,
1927 nullable: true,
1928 },
1929 ],
1930 })
1931 .expect("register events");
1932
1933 let txn = db.begin_transaction().expect("txn");
1935 for i in 1..=5u64 {
1936 let mut vals = std::collections::HashMap::new();
1937 vals.insert("id".into(), CoreSochValue::Int(i as i64));
1938 vals.insert("kind".into(), CoreSochValue::Text(format!("event_{}", i)));
1939 vals.insert("score".into(), CoreSochValue::Float(i as f64 * 1.5));
1940 db.insert_row(txn, "events", i, &vals).expect("insert");
1941 }
1942 db.commit(txn).expect("commit");
1943
1944 let mut catalog = Catalog::new("test");
1946 let schema = SochSchema {
1947 name: "events".into(),
1948 fields: vec![
1949 sochdb_core::SochField {
1950 name: "id".into(),
1951 field_type: SochType::Int,
1952 nullable: false,
1953 default: None,
1954 },
1955 sochdb_core::SochField {
1956 name: "kind".into(),
1957 field_type: SochType::Text,
1958 nullable: false,
1959 default: None,
1960 },
1961 sochdb_core::SochField {
1962 name: "score".into(),
1963 field_type: SochType::Float,
1964 nullable: true,
1965 default: None,
1966 },
1967 ],
1968 primary_key: None,
1969 indexes: vec![],
1970 };
1971 catalog.create_table(schema, 0).expect("register catalog");
1972
1973 let backend = std::sync::Arc::new(DatabaseStorageBackend::new(db.clone()));
1975 let executor = crate::soch_ql_executor::SochQlExecutor::with_storage(backend);
1976 let result = executor
1977 .execute("SELECT * FROM events", &catalog)
1978 .expect("select *");
1979
1980 assert_eq!(
1982 result.rows.len(),
1983 5,
1984 "Expected 5 rows from storage, got {}",
1985 result.rows.len()
1986 );
1987 assert_eq!(result.columns, vec!["id", "kind", "score"]);
1988
1989 let first_row_kind = &result.rows[0];
1991 assert_eq!(first_row_kind.len(), 3);
1993 }
1994
1995 #[test]
1996 fn test_integration_sql_connection_crud() {
1997 use crate::sql::bridge::SqlConnection;
1998
1999 let (db, _tmp) = setup_test_db();
2000 let mut conn = DatabaseSqlConnection::new(db.clone());
2001
2002 let create = CreateTableStmt {
2004 span: crate::sql::token::Span::new(0, 0, 0, 0),
2005 name: crate::sql::ast::ObjectName::new("products"),
2006 columns: vec![
2007 crate::sql::ast::ColumnDef {
2008 name: "id".into(),
2009 data_type: DataType::Int,
2010 constraints: vec![],
2011 },
2012 crate::sql::ast::ColumnDef {
2013 name: "name".into(),
2014 data_type: DataType::Text,
2015 constraints: vec![],
2016 },
2017 crate::sql::ast::ColumnDef {
2018 name: "price".into(),
2019 data_type: DataType::Float,
2020 constraints: vec![],
2021 },
2022 ],
2023 if_not_exists: false,
2024 constraints: vec![],
2025 options: vec![],
2026 };
2027 let result = conn.create_table(&create).expect("create table");
2028 assert!(matches!(result, crate::sql::bridge::ExecutionResult::Ok));
2029
2030 let begin_stmt = crate::sql::ast::BeginStmt {
2032 isolation_level: None,
2033 read_only: false,
2034 };
2035 let result = conn.begin(&begin_stmt).expect("begin");
2036 assert!(matches!(
2037 result,
2038 crate::sql::bridge::ExecutionResult::TransactionOk
2039 ));
2040
2041 let values = vec![vec![
2043 Expr::Literal(Literal::Integer(1)),
2044 Expr::Literal(Literal::String("Widget".into())),
2045 Expr::Literal(Literal::Float(9.99)),
2046 ]];
2047 let cols = vec!["id".into(), "name".into(), "price".into()];
2048 let result = conn
2049 .insert("products", Some(&cols), &values, None, &[])
2050 .expect("insert");
2051 match &result {
2052 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2053 _ => panic!("Expected RowsAffected, got {:?}", result),
2054 }
2055
2056 let result = conn.commit().expect("commit");
2058 assert!(matches!(
2059 result,
2060 crate::sql::bridge::ExecutionResult::TransactionOk
2061 ));
2062
2063 let columns = vec!["name".into()];
2065 let result = conn
2066 .select("products", &columns, None, &[], None, None, &[])
2067 .expect("select");
2068 match result {
2069 crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2070 assert!(!rows.is_empty(), "Expected rows from SELECT");
2071 assert_eq!(columns, vec!["name"]);
2072 let name = rows[0].get("name").expect("name column");
2074 match name {
2075 CoreSochValue::Text(s) => assert_eq!(s, "Widget"),
2076 other => panic!("Expected Text, got {:?}", other),
2077 }
2078 }
2079 other => panic!("Expected Rows, got {:?}", other),
2080 }
2081 }
2082
2083 #[test]
2084 fn test_integration_row_count() {
2085 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
2086 let (db, _tmp) = setup_test_db();
2087
2088 db.register_table(DbTableSchema {
2089 name: "metrics".into(),
2090 columns: vec![
2091 DbColumnDef {
2092 name: "ts".into(),
2093 col_type: DbColumnType::UInt64,
2094 nullable: false,
2095 },
2096 DbColumnDef {
2097 name: "val".into(),
2098 col_type: DbColumnType::Float64,
2099 nullable: false,
2100 },
2101 ],
2102 })
2103 .expect("register");
2104
2105 let txn = db.begin_transaction().expect("txn");
2106 for i in 0..10u64 {
2107 let mut v = std::collections::HashMap::new();
2108 v.insert("ts".into(), CoreSochValue::UInt(i));
2109 v.insert("val".into(), CoreSochValue::Float(i as f64));
2110 db.insert_row(txn, "metrics", i, &v).expect("insert");
2111 }
2112 db.commit(txn).expect("commit");
2113
2114 let backend = DatabaseStorageBackend::new(db.clone());
2115 assert_eq!(backend.row_count("metrics"), 10);
2116 assert_eq!(backend.row_count("nonexistent"), 0);
2117 }
2118
2119 #[test]
2124 fn test_sqlbridge_create_insert_select() {
2125 let (db, _tmp) = setup_test_db();
2126 let conn = DatabaseSqlConnection::new(db.clone());
2127 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2128
2129 let r = bridge
2131 .execute("CREATE TABLE cities (id INT, name TEXT, pop FLOAT)")
2132 .unwrap();
2133 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2134
2135 let r = bridge
2137 .execute("INSERT INTO cities (id, name, pop) VALUES (1, 'Tokyo', 13.96)")
2138 .unwrap();
2139 match &r {
2140 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2141 other => panic!("Expected RowsAffected, got {:?}", other),
2142 }
2143
2144 let r = bridge
2145 .execute("INSERT INTO cities (id, name, pop) VALUES (2, 'Delhi', 11.03)")
2146 .unwrap();
2147 match &r {
2148 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2149 other => panic!("Expected RowsAffected, got {:?}", other),
2150 }
2151
2152 let r = bridge.execute("SELECT name, pop FROM cities").unwrap();
2154 match &r {
2155 crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2156 assert_eq!(rows.len(), 2);
2157 assert!(columns.contains(&"name".to_string()));
2158 }
2159 other => panic!("Expected Rows, got {:?}", other),
2160 }
2161 }
2162
2163 #[test]
2164 fn test_sqlbridge_update_delete() {
2165 let (db, _tmp) = setup_test_db();
2166 let conn = DatabaseSqlConnection::new(db.clone());
2167 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2168
2169 bridge
2170 .execute("CREATE TABLE items (id INT, qty INT)")
2171 .unwrap();
2172 bridge
2173 .execute("INSERT INTO items (id, qty) VALUES (1, 10)")
2174 .unwrap();
2175 bridge
2176 .execute("INSERT INTO items (id, qty) VALUES (2, 20)")
2177 .unwrap();
2178 bridge
2179 .execute("INSERT INTO items (id, qty) VALUES (3, 30)")
2180 .unwrap();
2181
2182 let r = bridge
2184 .execute("UPDATE items SET qty = 99 WHERE id = 2")
2185 .unwrap();
2186 match &r {
2187 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2188 other => panic!("Expected RowsAffected for UPDATE, got {:?}", other),
2189 }
2190
2191 let r = bridge.execute("DELETE FROM items WHERE id = 3").unwrap();
2193 match &r {
2194 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2195 other => panic!("Expected RowsAffected for DELETE, got {:?}", other),
2196 }
2197
2198 let r = bridge.execute("SELECT id, qty FROM items").unwrap();
2200 match &r {
2201 crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2202 assert_eq!(rows.len(), 2, "Expected 2 rows after delete");
2203 }
2204 other => panic!("Expected Rows, got {:?}", other),
2205 }
2206 }
2207
2208 #[test]
2209 fn test_sqlbridge_transaction_commit() {
2210 let (db, _tmp) = setup_test_db();
2211 let conn = DatabaseSqlConnection::new(db.clone());
2212 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2213
2214 bridge
2215 .execute("CREATE TABLE txtest (id INT, val TEXT)")
2216 .unwrap();
2217
2218 let r = bridge.execute("BEGIN").unwrap();
2220 assert!(matches!(
2221 r,
2222 crate::sql::bridge::ExecutionResult::TransactionOk
2223 ));
2224
2225 bridge
2226 .execute("INSERT INTO txtest (id, val) VALUES (1, 'committed')")
2227 .unwrap();
2228
2229 let r = bridge.execute("COMMIT").unwrap();
2230 assert!(matches!(
2231 r,
2232 crate::sql::bridge::ExecutionResult::TransactionOk
2233 ));
2234
2235 let r = bridge.execute("SELECT val FROM txtest").unwrap();
2237 match &r {
2238 crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2239 assert_eq!(rows.len(), 1);
2240 }
2241 other => panic!("Expected Rows, got {:?}", other),
2242 }
2243 }
2244
2245 #[test]
2246 fn test_sqlbridge_drop_table() {
2247 let (db, _tmp) = setup_test_db();
2248 let conn = DatabaseSqlConnection::new(db.clone());
2249 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2250
2251 bridge.execute("CREATE TABLE ephemeral (x INT)").unwrap();
2252 let r = bridge.execute("DROP TABLE ephemeral").unwrap();
2253 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2254
2255 let r = bridge.execute("DROP TABLE IF EXISTS ephemeral").unwrap();
2257 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2258 }
2259
2260 #[test]
2261 fn test_sqlbridge_if_not_exists() {
2262 let (db, _tmp) = setup_test_db();
2263 let conn = DatabaseSqlConnection::new(db.clone());
2264 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2265
2266 bridge.execute("CREATE TABLE dup (id INT)").unwrap();
2267 let r = bridge
2269 .execute("CREATE TABLE IF NOT EXISTS dup (id INT)")
2270 .unwrap();
2271 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2272 }
2273
2274 #[test]
2275 fn test_sqlbridge_alter_add_column() {
2276 let (db, _tmp) = setup_test_db();
2277 let conn = DatabaseSqlConnection::new(db.clone());
2278 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2279
2280 bridge
2281 .execute("CREATE TABLE alter_test (id INT, name TEXT)")
2282 .unwrap();
2283
2284 let r = bridge
2285 .execute("ALTER TABLE alter_test ADD COLUMN age INT")
2286 .unwrap();
2287 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2288
2289 let schema = db.get_table_schema("alter_test").unwrap();
2291 assert_eq!(schema.columns.len(), 3);
2292 assert_eq!(schema.columns[2].name, "age");
2293 }
2294
2295 #[test]
2296 fn test_sqlbridge_alter_drop_column() {
2297 let (db, _tmp) = setup_test_db();
2298 let conn = DatabaseSqlConnection::new(db.clone());
2299 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2300
2301 bridge
2302 .execute("CREATE TABLE drop_col_test (id INT, name TEXT, age INT)")
2303 .unwrap();
2304
2305 let r = bridge
2306 .execute("ALTER TABLE drop_col_test DROP COLUMN age")
2307 .unwrap();
2308 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2309
2310 let schema = db.get_table_schema("drop_col_test").unwrap();
2311 assert_eq!(schema.columns.len(), 2);
2312 assert!(schema.columns.iter().all(|c| c.name != "age"));
2313 }
2314
2315 #[test]
2316 fn test_sqlbridge_alter_rename_column() {
2317 let (db, _tmp) = setup_test_db();
2318 let conn = DatabaseSqlConnection::new(db.clone());
2319 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2320
2321 bridge
2322 .execute("CREATE TABLE rename_col_test (id INT, name TEXT)")
2323 .unwrap();
2324
2325 let r = bridge
2326 .execute("ALTER TABLE rename_col_test RENAME COLUMN name TO full_name")
2327 .unwrap();
2328 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2329
2330 let schema = db.get_table_schema("rename_col_test").unwrap();
2331 assert_eq!(schema.columns[1].name, "full_name");
2332 }
2333
2334 #[test]
2335 fn test_sqlbridge_alter_rename_table() {
2336 let (db, _tmp) = setup_test_db();
2337 let conn = DatabaseSqlConnection::new(db.clone());
2338 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2339
2340 bridge.execute("CREATE TABLE old_name (id INT)").unwrap();
2341
2342 let r = bridge
2343 .execute("ALTER TABLE old_name RENAME TO new_name")
2344 .unwrap();
2345 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2346
2347 assert!(db.get_table_schema("old_name").is_none());
2348 assert!(db.get_table_schema("new_name").is_some());
2349 }
2350
2351 #[test]
2352 fn test_sqlbridge_alter_multiple_ops() {
2353 let (db, _tmp) = setup_test_db();
2354 let conn = DatabaseSqlConnection::new(db.clone());
2355 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2356
2357 bridge
2358 .execute("CREATE TABLE multi_alter (id INT, a TEXT, b TEXT)")
2359 .unwrap();
2360
2361 let r = bridge
2363 .execute("ALTER TABLE multi_alter ADD COLUMN c INT, DROP COLUMN b")
2364 .unwrap();
2365 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2366
2367 let schema = db.get_table_schema("multi_alter").unwrap();
2368 let col_names: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2369 assert_eq!(col_names, vec!["id", "a", "c"]);
2370 }
2371
2372 #[test]
2373 fn test_sqlbridge_alter_errors() {
2374 let (db, _tmp) = setup_test_db();
2375 let conn = DatabaseSqlConnection::new(db.clone());
2376 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2377
2378 bridge
2379 .execute("CREATE TABLE err_test (id INT, name TEXT)")
2380 .unwrap();
2381
2382 let r = bridge.execute("ALTER TABLE err_test ADD COLUMN name TEXT");
2384 assert!(r.is_err());
2385
2386 let r = bridge.execute("ALTER TABLE err_test DROP COLUMN nonexistent");
2388 assert!(r.is_err());
2389
2390 let r = bridge.execute("ALTER TABLE no_such_table ADD COLUMN x INT");
2392 assert!(r.is_err());
2393 }
2394
2395 fn setup_join_tables(bridge: &mut crate::sql::bridge::SqlBridge<DatabaseSqlConnection>) {
2401 bridge
2402 .execute("CREATE TABLE users (id INT, name TEXT, dept TEXT)")
2403 .unwrap();
2404 bridge
2405 .execute("INSERT INTO users (id, name, dept) VALUES (1, 'Alice', 'eng')")
2406 .unwrap();
2407 bridge
2408 .execute("INSERT INTO users (id, name, dept) VALUES (2, 'Bob', 'sales')")
2409 .unwrap();
2410 bridge
2411 .execute("INSERT INTO users (id, name, dept) VALUES (3, 'Carol', 'eng')")
2412 .unwrap();
2413
2414 bridge
2415 .execute("CREATE TABLE orders (oid INT, user_id INT, amount FLOAT)")
2416 .unwrap();
2417 bridge
2418 .execute("INSERT INTO orders (oid, user_id, amount) VALUES (10, 1, 99.50)")
2419 .unwrap();
2420 bridge
2421 .execute("INSERT INTO orders (oid, user_id, amount) VALUES (11, 1, 45.00)")
2422 .unwrap();
2423 bridge
2424 .execute("INSERT INTO orders (oid, user_id, amount) VALUES (12, 2, 200.00)")
2425 .unwrap();
2426 }
2428
2429 #[test]
2430 fn test_sqlbridge_inner_join() {
2431 let (db, _tmp) = setup_test_db();
2432 let conn = DatabaseSqlConnection::new(db.clone());
2433 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2434 setup_join_tables(&mut bridge);
2435
2436 let r = bridge.execute(
2437 "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id"
2438 ).unwrap();
2439
2440 let rows = r.rows().unwrap();
2441 assert_eq!(rows.len(), 3); let alice_rows: Vec<_> = rows
2445 .iter()
2446 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Alice".into())))
2447 .collect();
2448 assert_eq!(alice_rows.len(), 2);
2449
2450 let bob_rows: Vec<_> = rows
2452 .iter()
2453 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Bob".into())))
2454 .collect();
2455 assert_eq!(bob_rows.len(), 1);
2456
2457 let carol_rows: Vec<_> = rows
2459 .iter()
2460 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2461 .collect();
2462 assert_eq!(carol_rows.len(), 0);
2463 }
2464
2465 #[test]
2466 fn test_sqlbridge_left_join() {
2467 let (db, _tmp) = setup_test_db();
2468 let conn = DatabaseSqlConnection::new(db.clone());
2469 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2470 setup_join_tables(&mut bridge);
2471
2472 let r = bridge.execute(
2473 "SELECT users.name, orders.amount FROM users LEFT JOIN orders ON users.id = orders.user_id"
2474 ).unwrap();
2475
2476 let rows = r.rows().unwrap();
2477 assert_eq!(rows.len(), 4); let carol_rows: Vec<_> = rows
2481 .iter()
2482 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2483 .collect();
2484 assert_eq!(carol_rows.len(), 1);
2485 assert_eq!(carol_rows[0].get("amount"), Some(&CoreSochValue::Null));
2486 }
2487
2488 #[test]
2489 fn test_sqlbridge_right_join() {
2490 let (db, _tmp) = setup_test_db();
2491 let conn = DatabaseSqlConnection::new(db.clone());
2492 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2493 setup_join_tables(&mut bridge);
2494
2495 let r = bridge.execute(
2497 "SELECT users.name, orders.oid FROM users RIGHT JOIN orders ON users.id = orders.user_id"
2498 ).unwrap();
2499
2500 let rows = r.rows().unwrap();
2501 assert_eq!(rows.len(), 3); }
2503
2504 #[test]
2505 fn test_sqlbridge_cross_join() {
2506 let (db, _tmp) = setup_test_db();
2507 let conn = DatabaseSqlConnection::new(db.clone());
2508 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2509 setup_join_tables(&mut bridge);
2510
2511 let r = bridge
2512 .execute("SELECT users.name, orders.oid FROM users CROSS JOIN orders")
2513 .unwrap();
2514
2515 let rows = r.rows().unwrap();
2516 assert_eq!(rows.len(), 9); }
2518
2519 #[test]
2520 fn test_sqlbridge_join_with_where() {
2521 let (db, _tmp) = setup_test_db();
2522 let conn = DatabaseSqlConnection::new(db.clone());
2523 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2524 setup_join_tables(&mut bridge);
2525
2526 let r = bridge.execute(
2527 "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id WHERE orders.amount > 50"
2528 ).unwrap();
2529
2530 let rows = r.rows().unwrap();
2531 assert_eq!(rows.len(), 2); }
2533
2534 #[test]
2535 fn test_sqlbridge_join_with_alias() {
2536 let (db, _tmp) = setup_test_db();
2537 let conn = DatabaseSqlConnection::new(db.clone());
2538 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2539 setup_join_tables(&mut bridge);
2540
2541 let r = bridge
2542 .execute("SELECT u.name, o.amount FROM users u INNER JOIN orders o ON u.id = o.user_id")
2543 .unwrap();
2544
2545 let rows = r.rows().unwrap();
2546 assert_eq!(rows.len(), 3);
2547 }
2548
2549 #[test]
2550 fn test_sqlbridge_join_with_limit() {
2551 let (db, _tmp) = setup_test_db();
2552 let conn = DatabaseSqlConnection::new(db.clone());
2553 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2554 setup_join_tables(&mut bridge);
2555
2556 let r = bridge.execute(
2557 "SELECT users.name, orders.oid FROM users INNER JOIN orders ON users.id = orders.user_id LIMIT 2"
2558 ).unwrap();
2559
2560 let rows = r.rows().unwrap();
2561 assert_eq!(rows.len(), 2);
2562 }
2563
2564 #[test]
2565 fn test_sqlbridge_join_three_tables() {
2566 let (db, _tmp) = setup_test_db();
2567 let conn = DatabaseSqlConnection::new(db.clone());
2568 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2569 setup_join_tables(&mut bridge);
2570
2571 bridge
2573 .execute("CREATE TABLE departments (code TEXT, dname TEXT)")
2574 .unwrap();
2575 bridge
2576 .execute("INSERT INTO departments (code, dname) VALUES ('eng', 'Engineering')")
2577 .unwrap();
2578 bridge
2579 .execute("INSERT INTO departments (code, dname) VALUES ('sales', 'Sales')")
2580 .unwrap();
2581
2582 let r = bridge.execute(
2583 "SELECT users.name, departments.dname FROM users INNER JOIN departments ON users.dept = departments.code"
2584 ).unwrap();
2585
2586 let rows = r.rows().unwrap();
2587 assert_eq!(rows.len(), 3); }
2589
2590 #[test]
2591 fn test_namespaced_connection_prefixes_tables() {
2592 let ns_conn = NamespacedSqlConnection::new(MockConn::default(), "prod", "app");
2593 assert_eq!(ns_conn.prefix_table("users"), "prod:app:users");
2594 assert_eq!(ns_conn.prefix_table("posts"), "prod:app:posts");
2595 assert_eq!(ns_conn.namespace(), "prod");
2596 assert_eq!(ns_conn.database(), "app");
2597 }
2598
2599 #[test]
2600 fn test_namespaced_connection_isolates_data() {
2601 let (db, _tmp) = setup_test_db();
2602
2603 let conn_a =
2605 NamespacedSqlConnection::new(DatabaseSqlConnection::new(db.clone()), "tenant_a", "db1");
2606 let conn_b =
2607 NamespacedSqlConnection::new(DatabaseSqlConnection::new(db.clone()), "tenant_b", "db1");
2608
2609 let mut bridge_a = crate::sql::bridge::SqlBridge::new(conn_a);
2610 let mut bridge_b = crate::sql::bridge::SqlBridge::new(conn_b);
2611
2612 bridge_a
2614 .execute("CREATE TABLE users (name TEXT, age INTEGER)")
2615 .unwrap();
2616 bridge_a
2617 .execute("INSERT INTO users (name, age) VALUES ('Alice', 30)")
2618 .unwrap();
2619
2620 bridge_b
2622 .execute("CREATE TABLE users (name TEXT, age INTEGER)")
2623 .unwrap();
2624 bridge_b
2625 .execute("INSERT INTO users (name, age) VALUES ('Bob', 25)")
2626 .unwrap();
2627
2628 let result_a = bridge_a.execute("SELECT * FROM users").unwrap();
2630 let rows_a = result_a.rows().unwrap();
2631 assert_eq!(rows_a.len(), 1);
2632
2633 let result_b = bridge_b.execute("SELECT * FROM users").unwrap();
2635 let rows_b = result_b.rows().unwrap();
2636 assert_eq!(rows_b.len(), 1);
2637 }
2638
2639 #[derive(Default)]
2641 struct MockConn;
2642 impl crate::sql::bridge::SqlConnection for MockConn {
2643 fn select(
2644 &self,
2645 _: &str,
2646 _: &[String],
2647 _: Option<&Expr>,
2648 _: &[OrderByItem],
2649 _: Option<usize>,
2650 _: Option<usize>,
2651 _: &[CoreSochValue],
2652 ) -> SqlResult<ExecutionResult> {
2653 Ok(ExecutionResult::Rows {
2654 columns: vec![],
2655 rows: vec![],
2656 })
2657 }
2658 fn insert(
2659 &mut self,
2660 _: &str,
2661 _: Option<&[String]>,
2662 _: &[Vec<Expr>],
2663 _: Option<&OnConflict>,
2664 _: &[CoreSochValue],
2665 ) -> SqlResult<ExecutionResult> {
2666 Ok(ExecutionResult::RowsAffected(0))
2667 }
2668 fn update(
2669 &mut self,
2670 _: &str,
2671 _: &[Assignment],
2672 _: Option<&Expr>,
2673 _: &[CoreSochValue],
2674 ) -> SqlResult<ExecutionResult> {
2675 Ok(ExecutionResult::RowsAffected(0))
2676 }
2677 fn delete(
2678 &mut self,
2679 _: &str,
2680 _: Option<&Expr>,
2681 _: &[CoreSochValue],
2682 ) -> SqlResult<ExecutionResult> {
2683 Ok(ExecutionResult::RowsAffected(0))
2684 }
2685 fn create_table(&mut self, _: &CreateTableStmt) -> SqlResult<ExecutionResult> {
2686 Ok(ExecutionResult::Ok)
2687 }
2688 fn drop_table(&mut self, _: &DropTableStmt) -> SqlResult<ExecutionResult> {
2689 Ok(ExecutionResult::Ok)
2690 }
2691 fn create_index(&mut self, _: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
2692 Ok(ExecutionResult::Ok)
2693 }
2694 fn drop_index(&mut self, _: &DropIndexStmt) -> SqlResult<ExecutionResult> {
2695 Ok(ExecutionResult::Ok)
2696 }
2697 fn alter_table(&mut self, _: &AlterTableStmt) -> SqlResult<ExecutionResult> {
2698 Ok(ExecutionResult::Ok)
2699 }
2700 fn begin(&mut self, _: &BeginStmt) -> SqlResult<ExecutionResult> {
2701 Ok(ExecutionResult::TransactionOk)
2702 }
2703 fn commit(&mut self) -> SqlResult<ExecutionResult> {
2704 Ok(ExecutionResult::TransactionOk)
2705 }
2706 fn rollback(&mut self, _: Option<&str>) -> SqlResult<ExecutionResult> {
2707 Ok(ExecutionResult::TransactionOk)
2708 }
2709 fn table_exists(&self, _: &str) -> SqlResult<bool> {
2710 Ok(false)
2711 }
2712 fn index_exists(&self, _: &str) -> SqlResult<bool> {
2713 Ok(false)
2714 }
2715 fn scan_all(
2716 &self,
2717 _: &str,
2718 _: &[String],
2719 ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
2720 Ok(vec![])
2721 }
2722 fn eval_join_predicate(
2723 &self,
2724 _: &Expr,
2725 _: &HashMap<String, CoreSochValue>,
2726 _: &[CoreSochValue],
2727 ) -> Option<bool> {
2728 Some(true)
2729 }
2730 }
2731}