1use crate::optimizer_integration::StorageBackend;
64use crate::soch_ql::SochValue as QuerySochValue;
65use crate::sql::ast::*;
66use crate::sql::bridge::{ExecutionResult, SqlConnection};
67use crate::sql::error::{SqlError, SqlResult};
68use sochdb_core::SochValue as CoreSochValue;
69use sochdb_storage::{Database, KernelTxnHandle};
70use std::collections::HashMap;
71use std::sync::Arc;
72
73pub fn convert_core_to_query(value: &CoreSochValue) -> QuerySochValue {
83 match value {
84 CoreSochValue::Null => QuerySochValue::Null,
85 CoreSochValue::Bool(b) => QuerySochValue::Bool(*b),
86 CoreSochValue::Int(i) => QuerySochValue::Int(*i),
87 CoreSochValue::UInt(u) => QuerySochValue::UInt(*u),
88 CoreSochValue::Float(f) => QuerySochValue::Float(*f),
89 CoreSochValue::Text(s) => QuerySochValue::Text(s.clone()),
90 CoreSochValue::Binary(b) => QuerySochValue::Binary(b.clone()),
91 CoreSochValue::Array(arr) => {
92 QuerySochValue::Array(arr.iter().map(convert_core_to_query).collect())
93 }
94 CoreSochValue::Object(map) => {
95 match serde_json::to_string(map) {
97 Ok(json) => QuerySochValue::Text(json),
98 Err(_) => QuerySochValue::Text(format!("{:?}", map)),
99 }
100 }
101 CoreSochValue::Ref { table, id } => {
102 QuerySochValue::Text(format!("{}/{}", table, id))
103 }
104 }
105}
106
107pub fn convert_query_to_core(value: &QuerySochValue) -> CoreSochValue {
109 match value {
110 QuerySochValue::Null => CoreSochValue::Null,
111 QuerySochValue::Bool(b) => CoreSochValue::Bool(*b),
112 QuerySochValue::Int(i) => CoreSochValue::Int(*i),
113 QuerySochValue::UInt(u) => CoreSochValue::UInt(*u),
114 QuerySochValue::Float(f) => CoreSochValue::Float(*f),
115 QuerySochValue::Text(s) => CoreSochValue::Text(s.clone()),
116 QuerySochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
117 QuerySochValue::Array(arr) => {
118 CoreSochValue::Array(arr.iter().map(convert_query_to_core).collect())
119 }
120 }
121}
122
123fn convert_row_core_to_query(
125 row: HashMap<String, CoreSochValue>,
126) -> HashMap<String, QuerySochValue> {
127 row.into_iter()
128 .map(|(k, v)| (k, convert_core_to_query(&v)))
129 .collect()
130}
131
132fn convert_rows_core_to_query(
134 rows: Vec<HashMap<String, CoreSochValue>>,
135) -> Vec<HashMap<String, QuerySochValue>> {
136 rows.into_iter().map(convert_row_core_to_query).collect()
137}
138
139pub struct DatabaseStorageBackend {
158 db: Arc<Database>,
159}
160
161impl DatabaseStorageBackend {
162 pub fn new(db: Arc<Database>) -> Self {
164 Self { db }
165 }
166
167 pub fn database(&self) -> &Arc<Database> {
169 &self.db
170 }
171
172 fn with_read_txn<F, T>(&self, f: F) -> sochdb_core::Result<T>
174 where
175 F: FnOnce(KernelTxnHandle) -> sochdb_core::Result<T>,
176 {
177 let txn = self.db.begin_read_only_fast();
178 let result = f(txn);
179 self.db.abort_read_only_fast(txn);
180 result
181 }
182}
183
184impl StorageBackend for DatabaseStorageBackend {
185 fn table_scan(
186 &self,
187 table: &str,
188 columns: &[String],
189 predicate: Option<&str>,
190 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
191 self.with_read_txn(|txn| {
192 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
193
194 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
195 self.db.query(txn, table)
196 } else {
197 self.db.query(txn, table).columns(&col_refs)
198 };
199
200 let result = query.execute()?;
201 let mut rows = convert_rows_core_to_query(result.rows);
202
203 if let Some(pred) = predicate {
205 rows = apply_simple_predicate(&rows, pred);
206 }
207
208 Ok(rows)
209 })
210 }
211
212 fn primary_key_lookup(
213 &self,
214 table: &str,
215 key: &QuerySochValue,
216 ) -> sochdb_core::Result<Option<HashMap<String, QuerySochValue>>> {
217 let row_id = match key {
218 QuerySochValue::Int(i) => *i as u64,
219 QuerySochValue::UInt(u) => *u,
220 _ => return Ok(None),
221 };
222
223 self.with_read_txn(|txn| {
224 let result = self.db.read_row(txn, table, row_id, None)?;
225 Ok(result.map(convert_row_core_to_query))
226 })
227 }
228
229 fn secondary_index_seek(
230 &self,
231 table: &str,
232 index: &str,
233 key: &QuerySochValue,
234 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
235 let column_name = index.to_string();
237 let core_key = convert_query_to_core(key);
238
239 self.with_read_txn(|txn| {
240 let result = self.db.query(txn, table).execute()?;
241 let rows: Vec<HashMap<String, QuerySochValue>> = result
242 .rows
243 .into_iter()
244 .filter(|row| {
245 row.get(&column_name)
246 .map(|v| v == &core_key)
247 .unwrap_or(false)
248 })
249 .map(convert_row_core_to_query)
250 .collect();
251 Ok(rows)
252 })
253 }
254
255 fn time_index_scan(
256 &self,
257 table: &str,
258 start_us: u64,
259 end_us: u64,
260 ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
261 self.with_read_txn(|txn| {
262 let result = self.db.query(txn, table).execute()?;
263 let rows: Vec<HashMap<String, QuerySochValue>> = result
264 .rows
265 .into_iter()
266 .filter(|row| {
267 if let Some(CoreSochValue::UInt(ts)) = row.get("_timestamp") {
268 *ts >= start_us && *ts <= end_us
269 } else if let Some(CoreSochValue::Int(ts)) = row.get("_timestamp") {
270 let ts = *ts as u64;
271 ts >= start_us && ts <= end_us
272 } else {
273 false
274 }
275 })
276 .map(convert_row_core_to_query)
277 .collect();
278 Ok(rows)
279 })
280 }
281
282 fn vector_search(
283 &self,
284 table: &str,
285 query: &[f32],
286 k: usize,
287 ) -> sochdb_core::Result<Vec<(f32, HashMap<String, QuerySochValue>)>> {
288 self.with_read_txn(|txn| {
289 let result = self.db.query(txn, table).execute()?;
290
291 let mut scored: Vec<(f32, HashMap<String, CoreSochValue>)> = result
292 .rows
293 .into_iter()
294 .filter_map(|row| {
295 let vec_col = row.get("_vector").or_else(|| row.get("_embedding"));
296
297 if let Some(CoreSochValue::Array(arr)) = vec_col {
298 let vec: Vec<f32> = arr
299 .iter()
300 .filter_map(|v| match v {
301 CoreSochValue::Float(f) => Some(*f as f32),
302 CoreSochValue::Int(i) => Some(*i as f32),
303 _ => None,
304 })
305 .collect();
306
307 if vec.len() == query.len() {
308 let dist = euclidean_distance(&vec, query);
309 Some((dist, row))
310 } else {
311 None
312 }
313 } else {
314 None
315 }
316 })
317 .collect();
318
319 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
320 scored.truncate(k);
321
322 Ok(scored
323 .into_iter()
324 .map(|(dist, row)| (dist, convert_row_core_to_query(row)))
325 .collect())
326 })
327 }
328
329 fn row_count(&self, table: &str) -> usize {
330 let txn = self.db.begin_read_only_fast();
331 let count = self
332 .db
333 .query(txn, table)
334 .execute()
335 .map(|r| r.rows_scanned)
336 .unwrap_or(0);
337 self.db.abort_read_only_fast(txn);
338 count
339 }
340}
341
342pub struct DatabaseSqlConnection {
358 db: Arc<Database>,
359 active_txn: Option<KernelTxnHandle>,
360 explicit_txn: bool,
362 next_row_ids: HashMap<String, u64>,
364}
365
366impl DatabaseSqlConnection {
367 pub fn new(db: Arc<Database>) -> Self {
369 Self {
370 db,
371 active_txn: None,
372 explicit_txn: false,
373 next_row_ids: HashMap::new(),
374 }
375 }
376
377 pub fn database(&self) -> &Arc<Database> {
379 &self.db
380 }
381
382 fn ensure_write_txn(&mut self) -> SqlResult<KernelTxnHandle> {
384 if let Some(txn) = self.active_txn {
385 Ok(txn)
386 } else {
387 let txn = self
388 .db
389 .begin_transaction()
390 .map_err(|e| SqlError::ExecutionError(format!("Failed to begin txn: {}", e)))?;
391 self.active_txn = Some(txn);
392 Ok(txn)
393 }
394 }
395
396 fn auto_commit_if_implicit(&mut self) -> SqlResult<()> {
399 if self.explicit_txn {
400 return Ok(()); }
402 if let Some(txn) = self.active_txn.take() {
403 self.db
404 .commit(txn)
405 .map_err(|e| SqlError::ExecutionError(format!("Commit failed: {}", e)))?;
406 }
407 Ok(())
408 }
409
410 fn next_row_id(&mut self, table: &str) -> u64 {
412 let counter = self.next_row_ids.entry(table.to_string()).or_insert(0);
413 *counter += 1;
414 *counter
415 }
416
417 fn init_row_id_counter(&mut self, table: &str) {
419 if self.next_row_ids.contains_key(table) {
420 return;
421 }
422 let txn = self.db.begin_read_only_fast();
423 let max_id = self
424 .db
425 .query(txn, table)
426 .execute()
427 .map(|r| r.rows_scanned as u64)
428 .unwrap_or(0);
429 self.db.abort_read_only_fast(txn);
430 self.next_row_ids.insert(table.to_string(), max_id);
431 }
432
433 fn eval_expr(
435 &self,
436 expr: &Expr,
437 row: &HashMap<String, CoreSochValue>,
438 params: &[CoreSochValue],
439 ) -> Option<CoreSochValue> {
440 match expr {
441 Expr::Column(col_ref) => {
442 if let Some(ref tbl) = col_ref.table {
444 let qualified = format!("{}.{}", tbl, col_ref.column);
445 if let Some(v) = row.get(&qualified) {
446 return Some(v.clone());
447 }
448 }
449 let col_name = &col_ref.column;
451 row.get(col_name).cloned()
452 }
453 Expr::Literal(lit) => Some(literal_to_core(lit)),
454 Expr::Placeholder(idx) => params.get((*idx as usize).saturating_sub(1)).cloned(),
455 Expr::BinaryOp { left, op, right } => {
456 let lhs = self.eval_expr(left, row, params)?;
457 let rhs = self.eval_expr(right, row, params)?;
458 Some(eval_binary_op(&lhs, op, &rhs))
459 }
460 Expr::UnaryOp { op, expr: inner } => {
461 let val = self.eval_expr(inner, row, params)?;
462 Some(eval_unary_op(op, &val))
463 }
464 Expr::IsNull { expr: inner, negated } => {
465 let val = self.eval_expr(inner, row, params)?;
466 let is_null = matches!(val, CoreSochValue::Null);
467 Some(CoreSochValue::Bool(if *negated { !is_null } else { is_null }))
468 }
469 Expr::Between {
470 expr: inner,
471 low,
472 high,
473 negated,
474 } => {
475 let val = self.eval_expr(inner, row, params)?;
476 let lo = self.eval_expr(low, row, params)?;
477 let hi = self.eval_expr(high, row, params)?;
478 let in_range = compare_values(&val, &lo) != std::cmp::Ordering::Less
479 && compare_values(&val, &hi) != std::cmp::Ordering::Greater;
480 Some(CoreSochValue::Bool(if *negated {
481 !in_range
482 } else {
483 in_range
484 }))
485 }
486 Expr::InList {
487 expr: inner,
488 list,
489 negated,
490 } => {
491 let val = self.eval_expr(inner, row, params)?;
492 let found = list
493 .iter()
494 .any(|item| self.eval_expr(item, row, params) == Some(val.clone()));
495 Some(CoreSochValue::Bool(if *negated { !found } else { found }))
496 }
497 Expr::Like {
498 expr: inner,
499 pattern,
500 negated,
501 ..
502 } => {
503 let val = self.eval_expr(inner, row, params)?;
504 let pat = self.eval_expr(pattern, row, params)?;
505 if let (CoreSochValue::Text(s), CoreSochValue::Text(p)) = (&val, &pat) {
506 let matched = sql_like_match(s, p);
507 Some(CoreSochValue::Bool(if *negated { !matched } else { matched }))
508 } else {
509 Some(CoreSochValue::Bool(false))
510 }
511 }
512 Expr::Function(func_call) => {
513 let func_name = func_call.name.name().to_uppercase();
514 match func_name.as_str() {
515 "UPPER" => {
516 let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
517 if let CoreSochValue::Text(s) = val {
518 Some(CoreSochValue::Text(s.to_uppercase()))
519 } else {
520 Some(CoreSochValue::Null)
521 }
522 }
523 "LOWER" => {
524 let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
525 if let CoreSochValue::Text(s) = val {
526 Some(CoreSochValue::Text(s.to_lowercase()))
527 } else {
528 Some(CoreSochValue::Null)
529 }
530 }
531 "LENGTH" | "LEN" => {
532 let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
533 if let CoreSochValue::Text(s) = val {
534 Some(CoreSochValue::Int(s.len() as i64))
535 } else {
536 Some(CoreSochValue::Null)
537 }
538 }
539 "COALESCE" => {
540 for arg in &func_call.args {
541 if let Some(val) = self.eval_expr(arg, row, params) {
542 if !matches!(val, CoreSochValue::Null) {
543 return Some(val);
544 }
545 }
546 }
547 Some(CoreSochValue::Null)
548 }
549 _ => {
551 func_call
552 .args
553 .first()
554 .and_then(|a| self.eval_expr(a, row, params))
555 }
556 }
557 }
558 _ => None,
559 }
560 }
561
562 fn row_matches(
564 &self,
565 expr: &Expr,
566 row: &HashMap<String, CoreSochValue>,
567 params: &[CoreSochValue],
568 ) -> bool {
569 match self.eval_expr(expr, row, params) {
570 Some(CoreSochValue::Bool(b)) => b,
571 _ => false,
572 }
573 }
574
575 fn find_row_id(
579 &self,
580 table: &str,
581 target_row: &HashMap<String, CoreSochValue>,
582 txn: KernelTxnHandle,
583 ) -> SqlResult<Option<u64>> {
584 let entries = self
585 .db
586 .scan(txn, table.as_bytes())
587 .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)))?;
588
589 for (key_bytes, _value_bytes) in entries {
590 if let Ok(key_str) = String::from_utf8(key_bytes) {
591 let parts: Vec<&str> = key_str.split('/').collect();
593 if parts.len() == 2 {
594 if let Ok(row_id) = parts[1].parse::<u64>() {
595 if let Ok(Some(row)) = self.db.read_row(txn, table, row_id, None) {
596 if rows_equal(&row, target_row) {
597 return Ok(Some(row_id));
598 }
599 }
600 }
601 }
602 }
603 }
604
605 Ok(None)
606 }
607}
608
609impl SqlConnection for DatabaseSqlConnection {
610 fn select(
611 &self,
612 table: &str,
613 columns: &[String],
614 where_clause: Option<&Expr>,
615 order_by: &[OrderByItem],
616 limit: Option<usize>,
617 offset: Option<usize>,
618 params: &[CoreSochValue],
619 ) -> SqlResult<ExecutionResult> {
620 let txn = self.db.begin_read_only_fast();
621
622 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
623 self.db.query(txn, table)
624 } else {
625 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
626 self.db.query(txn, table).columns(&col_refs)
627 };
628
629 let result = query
630 .execute()
631 .map_err(|e| SqlError::ExecutionError(format!("Query failed: {}", e)));
632
633 self.db.abort_read_only_fast(txn);
634
635 let result = result?;
636 let mut rows = result.rows;
637
638 if let Some(expr) = where_clause {
640 rows.retain(|row| self.row_matches(expr, row, params));
641 }
642
643 if !order_by.is_empty() {
645 rows.sort_by(|a, b| {
646 for item in order_by {
647 let col_name = extract_order_by_column(&item.expr);
648 let va = a.get(&col_name);
649 let vb = b.get(&col_name);
650 let cmp = compare_optional_values(va, vb);
651 let cmp = if !item.asc { cmp.reverse() } else { cmp };
652 if cmp != std::cmp::Ordering::Equal {
653 return cmp;
654 }
655 }
656 std::cmp::Ordering::Equal
657 });
658 }
659
660 if let Some(off) = offset {
662 rows = rows.into_iter().skip(off).collect();
663 }
664
665 if let Some(lim) = limit {
667 rows.truncate(lim);
668 }
669
670 let result_columns = if columns.is_empty() || columns.iter().any(|c| c == "*") {
672 rows.first()
673 .map(|r| r.keys().cloned().collect())
674 .unwrap_or_default()
675 } else {
676 columns.to_vec()
677 };
678
679 Ok(ExecutionResult::Rows {
680 columns: result_columns,
681 rows,
682 })
683 }
684
685 fn insert(
686 &mut self,
687 table: &str,
688 columns: Option<&[String]>,
689 rows: &[Vec<Expr>],
690 _on_conflict: Option<&OnConflict>,
691 params: &[CoreSochValue],
692 ) -> SqlResult<ExecutionResult> {
693 let txn = self.ensure_write_txn()?;
694 self.init_row_id_counter(table);
695
696 let schema = self.db.get_table_schema(table);
697 let col_names: Vec<String> = if let Some(cols) = columns {
698 cols.to_vec()
699 } else if let Some(ref s) = schema {
700 s.columns.iter().map(|c| c.name.clone()).collect()
701 } else {
702 return Err(SqlError::InvalidArgument(
703 "INSERT requires column names when table has no schema".into(),
704 ));
705 };
706
707 let mut inserted = 0;
708 for row_exprs in rows {
709 let row_id = self.next_row_id(table);
710 let mut values = HashMap::new();
711
712 for (i, expr) in row_exprs.iter().enumerate() {
713 if i < col_names.len() {
714 let value = match expr {
715 Expr::Literal(lit) => literal_to_core(lit),
716 Expr::Placeholder(idx) => params
717 .get((*idx as usize).saturating_sub(1))
718 .cloned()
719 .unwrap_or(CoreSochValue::Null),
720 _ => CoreSochValue::Null,
721 };
722 values.insert(col_names[i].clone(), value);
723 }
724 }
725
726 self.db
727 .insert_row(txn, table, row_id, &values)
728 .map_err(|e| SqlError::ExecutionError(format!("Insert failed: {}", e)))?;
729 inserted += 1;
730 }
731
732 self.auto_commit_if_implicit()?;
733 Ok(ExecutionResult::RowsAffected(inserted))
734 }
735
736 fn update(
737 &mut self,
738 table: &str,
739 assignments: &[Assignment],
740 where_clause: Option<&Expr>,
741 params: &[CoreSochValue],
742 ) -> SqlResult<ExecutionResult> {
743 let txn = self.ensure_write_txn()?;
744
745 let result = self
746 .db
747 .query(txn, table)
748 .execute()
749 .map_err(|e| SqlError::ExecutionError(format!("Scan for update failed: {}", e)))?;
750
751 let mut updated = 0;
752
753 for row in &result.rows {
754 let matches = match where_clause {
755 Some(expr) => self.row_matches(expr, row, params),
756 None => true,
757 };
758
759 if matches {
760 let row_id = self.find_row_id(table, row, txn)?;
761 if let Some(row_id) = row_id {
762 let mut new_values = row.clone();
763 for assignment in assignments {
764 let col_name = assignment.column.clone();
765 let value = match &assignment.value {
766 Expr::Literal(lit) => literal_to_core(lit),
767 Expr::Placeholder(idx) => params
768 .get((*idx as usize).saturating_sub(1))
769 .cloned()
770 .unwrap_or(CoreSochValue::Null),
771 _ => self
772 .eval_expr(&assignment.value, row, params)
773 .unwrap_or(CoreSochValue::Null),
774 };
775 new_values.insert(col_name, value);
776 }
777
778 self.db
779 .insert_row(txn, table, row_id, &new_values)
780 .map_err(|e| SqlError::ExecutionError(format!("Update failed: {}", e)))?;
781 updated += 1;
782 }
783 }
784 }
785
786 self.auto_commit_if_implicit()?;
787 Ok(ExecutionResult::RowsAffected(updated))
788 }
789
790 fn delete(
791 &mut self,
792 table: &str,
793 where_clause: Option<&Expr>,
794 params: &[CoreSochValue],
795 ) -> SqlResult<ExecutionResult> {
796 let txn = self.ensure_write_txn()?;
797
798 let result = self
799 .db
800 .query(txn, table)
801 .execute()
802 .map_err(|e| SqlError::ExecutionError(format!("Scan for delete failed: {}", e)))?;
803
804 let mut deleted = 0;
805
806 for row in &result.rows {
807 let matches = match where_clause {
808 Some(expr) => self.row_matches(expr, row, params),
809 None => true,
810 };
811
812 if matches {
813 if let Some(row_id) = self.find_row_id(table, row, txn)? {
814 let key = format!("{}/{}", table, row_id);
815 self.db
816 .delete(txn, key.as_bytes())
817 .map_err(|e| SqlError::ExecutionError(format!("Delete failed: {}", e)))?;
818 deleted += 1;
819 }
820 }
821 }
822
823 self.auto_commit_if_implicit()?;
824 Ok(ExecutionResult::RowsAffected(deleted))
825 }
826
827 fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
828 use sochdb_storage::DbColumnDef;
829 use sochdb_storage::DbTableSchema;
830
831 let table_name = stmt.name.name().to_string();
832
833 if self.db.get_table_schema(&table_name).is_some() {
834 if stmt.if_not_exists {
835 return Ok(ExecutionResult::Ok);
836 }
837 return Err(SqlError::InvalidArgument(format!(
838 "Table '{}' already exists",
839 table_name
840 )));
841 }
842
843 let columns: Vec<DbColumnDef> = stmt
844 .columns
845 .iter()
846 .map(|col| {
847 let col_type = sql_type_to_db_type(&col.data_type);
848 let nullable = !col.constraints.iter().any(|c| matches!(c, ColumnConstraint::NotNull));
849 DbColumnDef {
850 name: col.name.clone(),
851 col_type,
852 nullable,
853 }
854 })
855 .collect();
856
857 let schema = DbTableSchema {
858 name: table_name,
859 columns,
860 };
861
862 self.db
863 .register_table(schema)
864 .map_err(|e| SqlError::ExecutionError(format!("Create table failed: {}", e)))?;
865
866 Ok(ExecutionResult::Ok)
867 }
868
869 fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
870 let table_name = stmt
871 .names
872 .first()
873 .map(|n| n.name().to_string())
874 .unwrap_or_default();
875
876 if self.db.get_table_schema(&table_name).is_none() {
877 if stmt.if_exists {
878 return Ok(ExecutionResult::Ok);
879 }
880 return Err(SqlError::TableNotFound(table_name));
881 }
882 Ok(ExecutionResult::Ok)
885 }
886
887 fn create_index(&mut self, _stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
888 Ok(ExecutionResult::Ok)
890 }
891
892 fn drop_index(&mut self, _stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
893 Ok(ExecutionResult::Ok)
894 }
895
896 fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
897 use sochdb_storage::{DbColumnDef, DbColumnType};
898
899 let table_name = stmt.name.name().to_string();
900 let mut schema = self
901 .db
902 .get_table_schema(&table_name)
903 .ok_or_else(|| SqlError::TableNotFound(table_name.clone()))?;
904
905 let original_name = table_name.clone();
906
907 for op in &stmt.operations {
908 match op {
909 AlterTableOp::AddColumn(col_def) => {
910 if schema.columns.iter().any(|c| c.name == col_def.name) {
912 return Err(SqlError::InvalidArgument(format!(
913 "Column '{}' already exists in table '{}'",
914 col_def.name, schema.name
915 )));
916 }
917 let col_type = sql_type_to_db_type(&col_def.data_type);
918 let nullable = !col_def
919 .constraints
920 .iter()
921 .any(|c| matches!(c, ColumnConstraint::NotNull));
922 schema.columns.push(DbColumnDef {
923 name: col_def.name.clone(),
924 col_type,
925 nullable,
926 });
927 }
928 AlterTableOp::DropColumn { name, .. } => {
929 let idx = schema
930 .columns
931 .iter()
932 .position(|c| c.name == *name)
933 .ok_or_else(|| {
934 SqlError::InvalidArgument(format!(
935 "Column '{}' not found in table '{}'",
936 name, schema.name
937 ))
938 })?;
939 schema.columns.remove(idx);
940 }
941 AlterTableOp::RenameColumn { old_name, new_name } => {
942 let col = schema
943 .columns
944 .iter_mut()
945 .find(|c| c.name == *old_name)
946 .ok_or_else(|| {
947 SqlError::InvalidArgument(format!(
948 "Column '{}' not found in table '{}'",
949 old_name, schema.name
950 ))
951 })?;
952 col.name = new_name.clone();
953 }
954 AlterTableOp::RenameTable(new_name) => {
955 schema.name = new_name.name().to_string();
956 }
957 AlterTableOp::AlterColumn { name, operation } => {
958 let col = schema
959 .columns
960 .iter_mut()
961 .find(|c| c.name == *name)
962 .ok_or_else(|| {
963 SqlError::InvalidArgument(format!(
964 "Column '{}' not found in table '{}'",
965 name, schema.name
966 ))
967 })?;
968 match operation {
969 AlterColumnOp::SetType(data_type) => {
970 col.col_type = sql_type_to_db_type(data_type);
971 }
972 AlterColumnOp::SetNotNull => {
973 col.nullable = false;
974 }
975 AlterColumnOp::DropNotNull => {
976 col.nullable = true;
977 }
978 AlterColumnOp::SetDefault(_) | AlterColumnOp::DropDefault => {
979 }
982 }
983 }
984 AlterTableOp::AddConstraint(_) | AlterTableOp::DropConstraint { .. } => {
985 return Err(SqlError::NotImplemented(
986 "ADD/DROP CONSTRAINT not yet implemented".into(),
987 ));
988 }
989 }
990 }
991
992 self.db
993 .update_table_schema(&original_name, schema)
994 .map_err(|e| SqlError::ExecutionError(format!("ALTER TABLE failed: {}", e)))?;
995
996 Ok(ExecutionResult::Ok)
997 }
998
999 fn begin(&mut self, _stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1000 if self.active_txn.is_some() {
1001 return Err(SqlError::TransactionError(
1002 "Transaction already active".into(),
1003 ));
1004 }
1005 let txn = self
1006 .db
1007 .begin_transaction()
1008 .map_err(|e| SqlError::ExecutionError(format!("Begin failed: {}", e)))?;
1009 self.active_txn = Some(txn);
1010 self.explicit_txn = true;
1011 Ok(ExecutionResult::TransactionOk)
1012 }
1013
1014 fn commit(&mut self) -> SqlResult<ExecutionResult> {
1015 if let Some(txn) = self.active_txn.take() {
1016 self.explicit_txn = false;
1017 self.db
1018 .commit(txn)
1019 .map_err(|e| SqlError::TransactionError(format!("Commit failed: {}", e)))?;
1020 Ok(ExecutionResult::TransactionOk)
1021 } else {
1022 Err(SqlError::TransactionError("No active transaction".into()))
1023 }
1024 }
1025
1026 fn rollback(&mut self, _savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1027 if let Some(txn) = self.active_txn.take() {
1028 self.explicit_txn = false;
1029 self.db
1030 .abort(txn)
1031 .map_err(|e| SqlError::TransactionError(format!("Rollback failed: {}", e)))?;
1032 Ok(ExecutionResult::TransactionOk)
1033 } else {
1034 Err(SqlError::TransactionError("No active transaction".into()))
1035 }
1036 }
1037
1038 fn table_exists(&self, table: &str) -> SqlResult<bool> {
1039 Ok(self.db.get_table_schema(table).is_some())
1040 }
1041
1042 fn index_exists(&self, _index: &str) -> SqlResult<bool> {
1043 Ok(false)
1045 }
1046
1047 fn scan_all(
1048 &self,
1049 table: &str,
1050 columns: &[String],
1051 ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1052 let txn = self.db.begin_read_only_fast();
1053
1054 let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
1055 self.db.query(txn, table)
1056 } else {
1057 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
1058 self.db.query(txn, table).columns(&col_refs)
1059 };
1060
1061 let result = query
1062 .execute()
1063 .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)));
1064
1065 self.db.abort_read_only_fast(txn);
1066 Ok(result?.rows)
1067 }
1068
1069 fn eval_join_predicate(
1070 &self,
1071 expr: &Expr,
1072 row: &HashMap<String, CoreSochValue>,
1073 params: &[CoreSochValue],
1074 ) -> Option<bool> {
1075 let val = self.eval_expr(expr, row, params)?;
1076 match val {
1077 CoreSochValue::Bool(b) => Some(b),
1078 CoreSochValue::Null => Some(false),
1079 _ => Some(false),
1080 }
1081 }
1082}
1083
1084fn apply_simple_predicate(
1093 rows: &[HashMap<String, QuerySochValue>],
1094 predicate: &str,
1095) -> Vec<HashMap<String, QuerySochValue>> {
1096 let operators = [">=", "<=", "!=", "=", ">", "<"];
1097
1098 for op in &operators {
1099 if let Some(idx) = predicate.find(op) {
1100 let column = predicate[..idx].trim();
1101 let value_str = predicate[idx + op.len()..].trim().trim_matches('\'');
1102
1103 return rows
1104 .iter()
1105 .filter(|row| {
1106 if let Some(val) = row.get(column) {
1107 let val_str = match val {
1108 QuerySochValue::Text(s) => s.clone(),
1109 QuerySochValue::Int(i) => i.to_string(),
1110 QuerySochValue::UInt(u) => u.to_string(),
1111 QuerySochValue::Float(f) => f.to_string(),
1112 QuerySochValue::Bool(b) => b.to_string(),
1113 _ => return false,
1114 };
1115
1116 match *op {
1117 "=" => val_str == value_str,
1118 "!=" => val_str != value_str,
1119 ">" => val_str.as_str() > value_str,
1120 "<" => (val_str.as_str()) < value_str,
1121 ">=" => val_str.as_str() >= value_str,
1122 "<=" => val_str.as_str() <= value_str,
1123 _ => false,
1124 }
1125 } else {
1126 false
1127 }
1128 })
1129 .cloned()
1130 .collect();
1131 }
1132 }
1133
1134 rows.to_vec()
1135}
1136
1137fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
1139 a.iter()
1140 .zip(b.iter())
1141 .map(|(x, y)| (x - y) * (x - y))
1142 .sum::<f32>()
1143 .sqrt()
1144}
1145
1146fn literal_to_core(lit: &Literal) -> CoreSochValue {
1148 match lit {
1149 Literal::Integer(i) => CoreSochValue::Int(*i),
1150 Literal::Float(f) => CoreSochValue::Float(*f),
1151 Literal::String(s) => CoreSochValue::Text(s.clone()),
1152 Literal::Boolean(b) => CoreSochValue::Bool(*b),
1153 Literal::Null => CoreSochValue::Null,
1154 Literal::Blob(b) => CoreSochValue::Binary(b.clone()),
1155 }
1156}
1157
1158fn eval_binary_op(
1160 lhs: &CoreSochValue,
1161 op: &BinaryOperator,
1162 rhs: &CoreSochValue,
1163) -> CoreSochValue {
1164 match op {
1165 BinaryOperator::Eq => CoreSochValue::Bool(lhs == rhs),
1166 BinaryOperator::Ne => CoreSochValue::Bool(lhs != rhs),
1167 BinaryOperator::Lt => {
1168 CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Less)
1169 }
1170 BinaryOperator::Gt => {
1171 CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Greater)
1172 }
1173 BinaryOperator::Le => {
1174 CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Greater)
1175 }
1176 BinaryOperator::Ge => {
1177 CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Less)
1178 }
1179 BinaryOperator::And => {
1180 let a = matches!(lhs, CoreSochValue::Bool(true));
1181 let b = matches!(rhs, CoreSochValue::Bool(true));
1182 CoreSochValue::Bool(a && b)
1183 }
1184 BinaryOperator::Or => {
1185 let a = matches!(lhs, CoreSochValue::Bool(true));
1186 let b = matches!(rhs, CoreSochValue::Bool(true));
1187 CoreSochValue::Bool(a || b)
1188 }
1189 BinaryOperator::Plus => numeric_op(lhs, rhs, |a, b| a + b, |a, b| a + b),
1190 BinaryOperator::Minus => numeric_op(lhs, rhs, |a, b| a - b, |a, b| a - b),
1191 BinaryOperator::Multiply => numeric_op(lhs, rhs, |a, b| a * b, |a, b| a * b),
1192 BinaryOperator::Divide => numeric_op(
1193 lhs,
1194 rhs,
1195 |a, b| if b != 0 { a / b } else { 0 },
1196 |a, b| if b != 0.0 { a / b } else { 0.0 },
1197 ),
1198 BinaryOperator::Modulo => numeric_op(
1199 lhs,
1200 rhs,
1201 |a, b| if b != 0 { a % b } else { 0 },
1202 |a, b| if b != 0.0 { a % b } else { 0.0 },
1203 ),
1204 BinaryOperator::Like => {
1205 if let (CoreSochValue::Text(s), CoreSochValue::Text(pattern)) = (lhs, rhs) {
1206 CoreSochValue::Bool(sql_like_match(s, pattern))
1207 } else {
1208 CoreSochValue::Bool(false)
1209 }
1210 }
1211 BinaryOperator::Concat => {
1212 let a = value_to_string(lhs);
1213 let b = value_to_string(rhs);
1214 CoreSochValue::Text(format!("{}{}", a, b))
1215 }
1216 _ => CoreSochValue::Null,
1217 }
1218}
1219
1220fn eval_unary_op(op: &UnaryOperator, val: &CoreSochValue) -> CoreSochValue {
1222 match op {
1223 UnaryOperator::Not => match val {
1224 CoreSochValue::Bool(b) => CoreSochValue::Bool(!b),
1225 _ => CoreSochValue::Null,
1226 },
1227 UnaryOperator::Minus => match val {
1228 CoreSochValue::Int(i) => CoreSochValue::Int(-i),
1229 CoreSochValue::Float(f) => CoreSochValue::Float(-f),
1230 _ => CoreSochValue::Null,
1231 },
1232 UnaryOperator::Plus => val.clone(),
1233 _ => CoreSochValue::Null,
1234 }
1235}
1236
1237fn numeric_op(
1239 lhs: &CoreSochValue,
1240 rhs: &CoreSochValue,
1241 int_op: impl Fn(i64, i64) -> i64,
1242 float_op: impl Fn(f64, f64) -> f64,
1243) -> CoreSochValue {
1244 match (lhs, rhs) {
1245 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => CoreSochValue::Int(int_op(*a, *b)),
1246 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1247 CoreSochValue::Float(float_op(*a, *b))
1248 }
1249 (CoreSochValue::Int(a), CoreSochValue::Float(b)) => {
1250 CoreSochValue::Float(float_op(*a as f64, *b))
1251 }
1252 (CoreSochValue::Float(a), CoreSochValue::Int(b)) => {
1253 CoreSochValue::Float(float_op(*a, *b as f64))
1254 }
1255 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => {
1256 CoreSochValue::Int(int_op(*a as i64, *b as i64))
1257 }
1258 _ => CoreSochValue::Null,
1259 }
1260}
1261
1262fn compare_values(a: &CoreSochValue, b: &CoreSochValue) -> std::cmp::Ordering {
1264 match (a, b) {
1265 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => a.cmp(b),
1266 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => a.cmp(b),
1267 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1268 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
1269 }
1270 (CoreSochValue::Text(a), CoreSochValue::Text(b)) => a.cmp(b),
1271 (CoreSochValue::Int(a), CoreSochValue::Float(b)) => (*a as f64)
1272 .partial_cmp(b)
1273 .unwrap_or(std::cmp::Ordering::Equal),
1274 (CoreSochValue::Float(a), CoreSochValue::Int(b)) => a
1275 .partial_cmp(&(*b as f64))
1276 .unwrap_or(std::cmp::Ordering::Equal),
1277 (CoreSochValue::Null, CoreSochValue::Null) => std::cmp::Ordering::Equal,
1278 (CoreSochValue::Null, _) => std::cmp::Ordering::Less,
1279 (_, CoreSochValue::Null) => std::cmp::Ordering::Greater,
1280 _ => std::cmp::Ordering::Equal,
1281 }
1282}
1283
1284fn compare_optional_values(
1286 a: Option<&CoreSochValue>,
1287 b: Option<&CoreSochValue>,
1288) -> std::cmp::Ordering {
1289 match (a, b) {
1290 (Some(a), Some(b)) => compare_values(a, b),
1291 (None, Some(_)) => std::cmp::Ordering::Less,
1292 (Some(_), None) => std::cmp::Ordering::Greater,
1293 (None, None) => std::cmp::Ordering::Equal,
1294 }
1295}
1296
1297fn extract_order_by_column(expr: &Expr) -> String {
1299 match expr {
1300 Expr::Column(col_ref) => col_ref.column.clone(),
1301 _ => String::new(),
1302 }
1303}
1304
1305fn rows_equal(
1307 a: &HashMap<String, CoreSochValue>,
1308 b: &HashMap<String, CoreSochValue>,
1309) -> bool {
1310 if a.len() != b.len() {
1311 return false;
1312 }
1313 a.iter().all(|(k, v)| b.get(k) == Some(v))
1314}
1315
1316fn value_to_string(v: &CoreSochValue) -> String {
1318 match v {
1319 CoreSochValue::Text(s) => s.clone(),
1320 CoreSochValue::Int(i) => i.to_string(),
1321 CoreSochValue::UInt(u) => u.to_string(),
1322 CoreSochValue::Float(f) => f.to_string(),
1323 CoreSochValue::Bool(b) => b.to_string(),
1324 CoreSochValue::Null => "NULL".to_string(),
1325 _ => String::new(),
1326 }
1327}
1328
1329fn sql_like_match(s: &str, pattern: &str) -> bool {
1333 let mut regex_pattern = String::with_capacity(pattern.len() * 2 + 2);
1335 regex_pattern.push('^');
1336 for ch in pattern.chars() {
1337 match ch {
1338 '%' => regex_pattern.push_str(".*"),
1339 '_' => regex_pattern.push('.'),
1340 '.' | '(' | ')' | '[' | ']' | '{' | '}' | '+' | '*' | '?' | '^' | '$' | '|'
1341 | '\\' => {
1342 regex_pattern.push('\\');
1343 regex_pattern.push(ch);
1344 }
1345 _ => regex_pattern.push(ch),
1346 }
1347 }
1348 regex_pattern.push('$');
1349
1350 regex::Regex::new(®ex_pattern)
1351 .map(|re| re.is_match(s))
1352 .unwrap_or(false)
1353}
1354
1355fn sql_type_to_db_type(dt: &DataType) -> sochdb_storage::DbColumnType {
1357 use sochdb_storage::DbColumnType;
1358 match dt {
1359 DataType::TinyInt | DataType::SmallInt | DataType::Int | DataType::BigInt => {
1360 DbColumnType::Int64
1361 }
1362 DataType::Float | DataType::Double | DataType::Decimal { .. } => DbColumnType::Float64,
1363 DataType::Boolean => DbColumnType::Bool,
1364 DataType::Binary(_) | DataType::Varbinary(_) | DataType::Blob => DbColumnType::Binary,
1365 _ => DbColumnType::Text,
1367 }
1368}
1369
1370pub struct NamespacedSqlConnection<C: SqlConnection> {
1392 inner: C,
1393 namespace: String,
1394 database: String,
1395}
1396
1397impl<C: SqlConnection> NamespacedSqlConnection<C> {
1398 pub fn new(inner: C, namespace: impl Into<String>, database: impl Into<String>) -> Self {
1400 Self {
1401 inner,
1402 namespace: namespace.into(),
1403 database: database.into(),
1404 }
1405 }
1406
1407 fn prefix_table(&self, table: &str) -> String {
1409 format!("{}:{}:{}", self.namespace, self.database, table)
1410 }
1411
1412 pub fn namespace(&self) -> &str {
1414 &self.namespace
1415 }
1416
1417 pub fn database(&self) -> &str {
1419 &self.database
1420 }
1421
1422 pub fn inner(&self) -> &C {
1424 &self.inner
1425 }
1426
1427 pub fn inner_mut(&mut self) -> &mut C {
1429 &mut self.inner
1430 }
1431}
1432
1433impl<C: SqlConnection> SqlConnection for NamespacedSqlConnection<C> {
1434 fn select(
1435 &self,
1436 table: &str,
1437 columns: &[String],
1438 where_clause: Option<&Expr>,
1439 order_by: &[OrderByItem],
1440 limit: Option<usize>,
1441 offset: Option<usize>,
1442 params: &[CoreSochValue],
1443 ) -> SqlResult<ExecutionResult> {
1444 self.inner.select(&self.prefix_table(table), columns, where_clause, order_by, limit, offset, params)
1445 }
1446
1447 fn insert(
1448 &mut self,
1449 table: &str,
1450 columns: Option<&[String]>,
1451 rows: &[Vec<Expr>],
1452 on_conflict: Option<&OnConflict>,
1453 params: &[CoreSochValue],
1454 ) -> SqlResult<ExecutionResult> {
1455 self.inner.insert(&self.prefix_table(table), columns, rows, on_conflict, params)
1456 }
1457
1458 fn update(
1459 &mut self,
1460 table: &str,
1461 assignments: &[Assignment],
1462 where_clause: Option<&Expr>,
1463 params: &[CoreSochValue],
1464 ) -> SqlResult<ExecutionResult> {
1465 self.inner.update(&self.prefix_table(table), assignments, where_clause, params)
1466 }
1467
1468 fn delete(
1469 &mut self,
1470 table: &str,
1471 where_clause: Option<&Expr>,
1472 params: &[CoreSochValue],
1473 ) -> SqlResult<ExecutionResult> {
1474 self.inner.delete(&self.prefix_table(table), where_clause, params)
1475 }
1476
1477 fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
1478 let mut prefixed = stmt.clone();
1480 let original_name = stmt.name.name().to_string();
1481 prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1482 self.inner.create_table(&prefixed)
1483 }
1484
1485 fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
1486 let mut prefixed = stmt.clone();
1487 prefixed.names = stmt.names.iter()
1488 .map(|n| ObjectName::new(self.prefix_table(n.name())))
1489 .collect();
1490 self.inner.drop_table(&prefixed)
1491 }
1492
1493 fn create_index(&mut self, stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
1494 self.inner.create_index(stmt)
1495 }
1496
1497 fn drop_index(&mut self, stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
1498 self.inner.drop_index(stmt)
1499 }
1500
1501 fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
1502 let mut prefixed = stmt.clone();
1503 let original_name = stmt.name.name().to_string();
1504 prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1505 self.inner.alter_table(&prefixed)
1506 }
1507
1508 fn begin(&mut self, stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1509 self.inner.begin(stmt)
1510 }
1511
1512 fn commit(&mut self) -> SqlResult<ExecutionResult> {
1513 self.inner.commit()
1514 }
1515
1516 fn rollback(&mut self, savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1517 self.inner.rollback(savepoint)
1518 }
1519
1520 fn table_exists(&self, table: &str) -> SqlResult<bool> {
1521 self.inner.table_exists(&self.prefix_table(table))
1522 }
1523
1524 fn index_exists(&self, index: &str) -> SqlResult<bool> {
1525 self.inner.index_exists(index)
1526 }
1527
1528 fn scan_all(
1529 &self,
1530 table: &str,
1531 columns: &[String],
1532 ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1533 self.inner.scan_all(&self.prefix_table(table), columns)
1534 }
1535
1536 fn eval_join_predicate(
1537 &self,
1538 expr: &Expr,
1539 row: &HashMap<String, CoreSochValue>,
1540 params: &[CoreSochValue],
1541 ) -> Option<bool> {
1542 self.inner.eval_join_predicate(expr, row, params)
1543 }
1544}
1545
1546#[cfg(test)]
1551mod tests {
1552 use super::*;
1553
1554 #[test]
1555 fn test_convert_core_to_query_basic_types() {
1556 assert_eq!(
1557 convert_core_to_query(&CoreSochValue::Null),
1558 QuerySochValue::Null
1559 );
1560 assert_eq!(
1561 convert_core_to_query(&CoreSochValue::Bool(true)),
1562 QuerySochValue::Bool(true)
1563 );
1564 assert_eq!(
1565 convert_core_to_query(&CoreSochValue::Int(42)),
1566 QuerySochValue::Int(42)
1567 );
1568 assert_eq!(
1569 convert_core_to_query(&CoreSochValue::UInt(100)),
1570 QuerySochValue::UInt(100)
1571 );
1572 assert_eq!(
1573 convert_core_to_query(&CoreSochValue::Float(3.14)),
1574 QuerySochValue::Float(3.14)
1575 );
1576 assert_eq!(
1577 convert_core_to_query(&CoreSochValue::Text("hello".into())),
1578 QuerySochValue::Text("hello".into())
1579 );
1580 }
1581
1582 #[test]
1583 fn test_convert_core_to_query_object() {
1584 let mut map = HashMap::new();
1585 map.insert("name".to_string(), CoreSochValue::Text("Alice".into()));
1586 let result = convert_core_to_query(&CoreSochValue::Object(map));
1587 match result {
1588 QuerySochValue::Text(s) => assert!(s.contains("Alice")),
1589 _ => panic!("Expected Text for Object conversion"),
1590 }
1591 }
1592
1593 #[test]
1594 fn test_convert_core_to_query_ref() {
1595 let result = convert_core_to_query(&CoreSochValue::Ref {
1596 table: "users".into(),
1597 id: 42,
1598 });
1599 assert_eq!(result, QuerySochValue::Text("users/42".into()));
1600 }
1601
1602 #[test]
1603 fn test_convert_roundtrip() {
1604 let original = QuerySochValue::Int(42);
1605 let core = convert_query_to_core(&original);
1606 let back = convert_core_to_query(&core);
1607 assert_eq!(original, back);
1608 }
1609
1610 #[test]
1611 fn test_apply_simple_predicate_eq() {
1612 let rows = vec![
1613 {
1614 let mut m = HashMap::new();
1615 m.insert("name".into(), QuerySochValue::Text("Alice".into()));
1616 m.insert("age".into(), QuerySochValue::Int(30));
1617 m
1618 },
1619 {
1620 let mut m = HashMap::new();
1621 m.insert("name".into(), QuerySochValue::Text("Bob".into()));
1622 m.insert("age".into(), QuerySochValue::Int(25));
1623 m
1624 },
1625 ];
1626
1627 let filtered = apply_simple_predicate(&rows, "name = Alice");
1628 assert_eq!(filtered.len(), 1);
1629 assert_eq!(
1630 filtered[0].get("name"),
1631 Some(&QuerySochValue::Text("Alice".into()))
1632 );
1633 }
1634
1635 #[test]
1636 fn test_apply_simple_predicate_neq() {
1637 let rows = vec![
1638 {
1639 let mut m = HashMap::new();
1640 m.insert("status".into(), QuerySochValue::Text("active".into()));
1641 m
1642 },
1643 {
1644 let mut m = HashMap::new();
1645 m.insert("status".into(), QuerySochValue::Text("inactive".into()));
1646 m
1647 },
1648 ];
1649
1650 let filtered = apply_simple_predicate(&rows, "status != active");
1651 assert_eq!(filtered.len(), 1);
1652 assert_eq!(
1653 filtered[0].get("status"),
1654 Some(&QuerySochValue::Text("inactive".into()))
1655 );
1656 }
1657
1658 #[test]
1659 fn test_literal_to_core() {
1660 assert_eq!(
1661 literal_to_core(&Literal::Integer(42)),
1662 CoreSochValue::Int(42)
1663 );
1664 assert_eq!(
1665 literal_to_core(&Literal::Float(3.14)),
1666 CoreSochValue::Float(3.14)
1667 );
1668 assert_eq!(
1669 literal_to_core(&Literal::String("hi".into())),
1670 CoreSochValue::Text("hi".into())
1671 );
1672 assert_eq!(
1673 literal_to_core(&Literal::Boolean(true)),
1674 CoreSochValue::Bool(true)
1675 );
1676 assert_eq!(literal_to_core(&Literal::Null), CoreSochValue::Null);
1677 }
1678
1679 #[test]
1680 fn test_euclidean_distance() {
1681 let a = vec![1.0, 0.0, 0.0];
1682 let b = vec![0.0, 1.0, 0.0];
1683 let dist = euclidean_distance(&a, &b);
1684 assert!((dist - std::f32::consts::SQRT_2).abs() < 1e-6);
1685 }
1686
1687 #[test]
1688 fn test_compare_values() {
1689 assert_eq!(
1690 compare_values(&CoreSochValue::Int(1), &CoreSochValue::Int(2)),
1691 std::cmp::Ordering::Less
1692 );
1693 assert_eq!(
1694 compare_values(
1695 &CoreSochValue::Text("a".into()),
1696 &CoreSochValue::Text("b".into())
1697 ),
1698 std::cmp::Ordering::Less
1699 );
1700 assert_eq!(
1701 compare_values(&CoreSochValue::Null, &CoreSochValue::Int(1)),
1702 std::cmp::Ordering::Less
1703 );
1704 }
1705
1706 #[test]
1707 fn test_eval_binary_op() {
1708 assert_eq!(
1709 eval_binary_op(
1710 &CoreSochValue::Int(10),
1711 &BinaryOperator::Plus,
1712 &CoreSochValue::Int(5)
1713 ),
1714 CoreSochValue::Int(15)
1715 );
1716 assert_eq!(
1717 eval_binary_op(
1718 &CoreSochValue::Int(10),
1719 &BinaryOperator::Eq,
1720 &CoreSochValue::Int(10)
1721 ),
1722 CoreSochValue::Bool(true)
1723 );
1724 assert_eq!(
1725 eval_binary_op(
1726 &CoreSochValue::Text("hello".into()),
1727 &BinaryOperator::Concat,
1728 &CoreSochValue::Text(" world".into())
1729 ),
1730 CoreSochValue::Text("hello world".into())
1731 );
1732 }
1733
1734 #[test]
1735 fn test_sql_like_match() {
1736 assert!(sql_like_match("hello", "hello"));
1737 assert!(sql_like_match("hello", "%llo"));
1738 assert!(sql_like_match("hello", "h%o"));
1739 assert!(sql_like_match("hello", "h_llo"));
1740 assert!(!sql_like_match("hello", "world"));
1741 assert!(sql_like_match("file.txt", "file%"));
1743 assert!(sql_like_match("test(1)", "%(%"));
1744 }
1745
1746 #[test]
1747 fn test_sql_type_to_db_type() {
1748 use sochdb_storage::DbColumnType;
1749 assert_eq!(sql_type_to_db_type(&DataType::Int), DbColumnType::Int64);
1750 assert_eq!(sql_type_to_db_type(&DataType::BigInt), DbColumnType::Int64);
1751 assert_eq!(
1752 sql_type_to_db_type(&DataType::Float),
1753 DbColumnType::Float64
1754 );
1755 assert_eq!(
1756 sql_type_to_db_type(&DataType::Boolean),
1757 DbColumnType::Bool
1758 );
1759 assert_eq!(sql_type_to_db_type(&DataType::Blob), DbColumnType::Binary);
1760 assert_eq!(sql_type_to_db_type(&DataType::Text), DbColumnType::Text);
1761 assert_eq!(
1762 sql_type_to_db_type(&DataType::Varchar(Some(255))),
1763 DbColumnType::Text
1764 );
1765 }
1766
1767 fn setup_test_db() -> (std::sync::Arc<sochdb_storage::Database>, tempfile::TempDir) {
1772 let tmp = tempfile::tempdir().expect("tmpdir");
1773 let db = sochdb_storage::Database::open(tmp.path()).expect("open db");
1774 (db, tmp)
1775 }
1776
1777 #[test]
1778 fn test_integration_storage_backend_table_scan() {
1779 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1780 let (db, _tmp) = setup_test_db();
1781
1782 db.register_table(DbTableSchema {
1784 name: "users".into(),
1785 columns: vec![
1786 DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1787 DbColumnDef { name: "name".into(), col_type: DbColumnType::Text, nullable: false },
1788 DbColumnDef { name: "age".into(), col_type: DbColumnType::Int64, nullable: true },
1789 ],
1790 })
1791 .expect("register table");
1792
1793 let txn = db.begin_transaction().expect("begin txn");
1795 let mut vals = std::collections::HashMap::new();
1796 vals.insert("id".into(), CoreSochValue::Int(1));
1797 vals.insert("name".into(), CoreSochValue::Text("Alice".into()));
1798 vals.insert("age".into(), CoreSochValue::Int(30));
1799 db.insert_row(txn, "users", 1, &vals).expect("insert 1");
1800
1801 vals.clear();
1802 vals.insert("id".into(), CoreSochValue::Int(2));
1803 vals.insert("name".into(), CoreSochValue::Text("Bob".into()));
1804 vals.insert("age".into(), CoreSochValue::Int(25));
1805 db.insert_row(txn, "users", 2, &vals).expect("insert 2");
1806 db.commit(txn).expect("commit");
1807
1808 let backend = DatabaseStorageBackend::new(db.clone());
1810 let rows = backend
1811 .table_scan("users", &["id".into(), "name".into(), "age".into()], None)
1812 .expect("table_scan");
1813
1814 assert_eq!(rows.len(), 2);
1815 let names: Vec<_> = rows
1817 .iter()
1818 .filter_map(|r| match r.get("name") {
1819 Some(crate::soch_ql::SochValue::Text(t)) => Some(t.clone()),
1820 _ => None,
1821 })
1822 .collect();
1823 assert!(names.contains(&"Alice".to_string()));
1824 assert!(names.contains(&"Bob".to_string()));
1825 }
1826
1827 #[test]
1828 fn test_integration_storage_backend_primary_key_lookup() {
1829 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1830 let (db, _tmp) = setup_test_db();
1831
1832 db.register_table(DbTableSchema {
1833 name: "items".into(),
1834 columns: vec![
1835 DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1836 DbColumnDef { name: "label".into(), col_type: DbColumnType::Text, nullable: false },
1837 ],
1838 })
1839 .expect("register");
1840
1841 let txn = db.begin_transaction().expect("txn");
1842 let mut v = std::collections::HashMap::new();
1843 v.insert("id".into(), CoreSochValue::Int(42));
1844 v.insert("label".into(), CoreSochValue::Text("answer".into()));
1845 db.insert_row(txn, "items", 42, &v).expect("insert");
1846 db.commit(txn).expect("commit");
1847
1848 let backend = DatabaseStorageBackend::new(db.clone());
1849 let row = backend
1850 .primary_key_lookup("items", &crate::soch_ql::SochValue::Int(42))
1851 .expect("pk lookup");
1852 assert!(row.is_some());
1853 let row = row.unwrap();
1854 assert_eq!(
1855 row.get("label"),
1856 Some(&crate::soch_ql::SochValue::Text("answer".into()))
1857 );
1858 }
1859
1860 #[test]
1861 fn test_integration_sochql_executor_reads_storage() {
1862 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1863 use sochdb_core::{Catalog, SochSchema, SochType};
1864 let (db, _tmp) = setup_test_db();
1865
1866 db.register_table(DbTableSchema {
1868 name: "events".into(),
1869 columns: vec![
1870 DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1871 DbColumnDef { name: "kind".into(), col_type: DbColumnType::Text, nullable: false },
1872 DbColumnDef { name: "score".into(), col_type: DbColumnType::Float64, nullable: true },
1873 ],
1874 })
1875 .expect("register events");
1876
1877 let txn = db.begin_transaction().expect("txn");
1879 for i in 1..=5u64 {
1880 let mut vals = std::collections::HashMap::new();
1881 vals.insert("id".into(), CoreSochValue::Int(i as i64));
1882 vals.insert("kind".into(), CoreSochValue::Text(format!("event_{}", i)));
1883 vals.insert("score".into(), CoreSochValue::Float(i as f64 * 1.5));
1884 db.insert_row(txn, "events", i, &vals).expect("insert");
1885 }
1886 db.commit(txn).expect("commit");
1887
1888 let mut catalog = Catalog::new("test");
1890 let schema = SochSchema {
1891 name: "events".into(),
1892 fields: vec![
1893 sochdb_core::SochField { name: "id".into(), field_type: SochType::Int, nullable: false, default: None },
1894 sochdb_core::SochField { name: "kind".into(), field_type: SochType::Text, nullable: false, default: None },
1895 sochdb_core::SochField { name: "score".into(), field_type: SochType::Float, nullable: true, default: None },
1896 ],
1897 primary_key: None,
1898 indexes: vec![],
1899 };
1900 catalog.create_table(schema, 0).expect("register catalog");
1901
1902 let backend = std::sync::Arc::new(DatabaseStorageBackend::new(db.clone()));
1904 let executor = crate::soch_ql_executor::SochQlExecutor::with_storage(backend);
1905 let result = executor
1906 .execute("SELECT * FROM events", &catalog)
1907 .expect("select *");
1908
1909 assert_eq!(result.rows.len(), 5, "Expected 5 rows from storage, got {}", result.rows.len());
1911 assert_eq!(result.columns, vec!["id", "kind", "score"]);
1912
1913 let first_row_kind = &result.rows[0];
1915 assert_eq!(first_row_kind.len(), 3);
1917 }
1918
1919 #[test]
1920 fn test_integration_sql_connection_crud() {
1921 use crate::sql::bridge::SqlConnection;
1922
1923 let (db, _tmp) = setup_test_db();
1924 let mut conn = DatabaseSqlConnection::new(db.clone());
1925
1926 let create = CreateTableStmt {
1928 span: crate::sql::token::Span::new(0, 0, 0, 0),
1929 name: crate::sql::ast::ObjectName::new("products"),
1930 columns: vec![
1931 crate::sql::ast::ColumnDef {
1932 name: "id".into(),
1933 data_type: DataType::Int,
1934 constraints: vec![],
1935 },
1936 crate::sql::ast::ColumnDef {
1937 name: "name".into(),
1938 data_type: DataType::Text,
1939 constraints: vec![],
1940 },
1941 crate::sql::ast::ColumnDef {
1942 name: "price".into(),
1943 data_type: DataType::Float,
1944 constraints: vec![],
1945 },
1946 ],
1947 if_not_exists: false,
1948 constraints: vec![],
1949 options: vec![],
1950 };
1951 let result = conn.create_table(&create).expect("create table");
1952 assert!(matches!(result, crate::sql::bridge::ExecutionResult::Ok));
1953
1954 let begin_stmt = crate::sql::ast::BeginStmt {
1956 isolation_level: None,
1957 read_only: false,
1958 };
1959 let result = conn.begin(&begin_stmt).expect("begin");
1960 assert!(matches!(result, crate::sql::bridge::ExecutionResult::TransactionOk));
1961
1962 let values = vec![vec![
1964 Expr::Literal(Literal::Integer(1)),
1965 Expr::Literal(Literal::String("Widget".into())),
1966 Expr::Literal(Literal::Float(9.99)),
1967 ]];
1968 let cols = vec!["id".into(), "name".into(), "price".into()];
1969 let result = conn
1970 .insert("products", Some(&cols), &values, None, &[])
1971 .expect("insert");
1972 match &result {
1973 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
1974 _ => panic!("Expected RowsAffected, got {:?}", result),
1975 }
1976
1977 let result = conn.commit().expect("commit");
1979 assert!(matches!(result, crate::sql::bridge::ExecutionResult::TransactionOk));
1980
1981 let columns = vec!["name".into()];
1983 let result = conn
1984 .select("products", &columns, None, &[], None, None, &[])
1985 .expect("select");
1986 match result {
1987 crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
1988 assert!(!rows.is_empty(), "Expected rows from SELECT");
1989 assert_eq!(columns, vec!["name"]);
1990 let name = rows[0].get("name").expect("name column");
1992 match name {
1993 CoreSochValue::Text(s) => assert_eq!(s, "Widget"),
1994 other => panic!("Expected Text, got {:?}", other),
1995 }
1996 }
1997 other => panic!("Expected Rows, got {:?}", other),
1998 }
1999 }
2000
2001 #[test]
2002 fn test_integration_row_count() {
2003 use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
2004 let (db, _tmp) = setup_test_db();
2005
2006 db.register_table(DbTableSchema {
2007 name: "metrics".into(),
2008 columns: vec![
2009 DbColumnDef { name: "ts".into(), col_type: DbColumnType::UInt64, nullable: false },
2010 DbColumnDef { name: "val".into(), col_type: DbColumnType::Float64, nullable: false },
2011 ],
2012 })
2013 .expect("register");
2014
2015 let txn = db.begin_transaction().expect("txn");
2016 for i in 0..10u64 {
2017 let mut v = std::collections::HashMap::new();
2018 v.insert("ts".into(), CoreSochValue::UInt(i));
2019 v.insert("val".into(), CoreSochValue::Float(i as f64));
2020 db.insert_row(txn, "metrics", i, &v).expect("insert");
2021 }
2022 db.commit(txn).expect("commit");
2023
2024 let backend = DatabaseStorageBackend::new(db.clone());
2025 assert_eq!(backend.row_count("metrics"), 10);
2026 assert_eq!(backend.row_count("nonexistent"), 0);
2027 }
2028
2029 #[test]
2034 fn test_sqlbridge_create_insert_select() {
2035 let (db, _tmp) = setup_test_db();
2036 let conn = DatabaseSqlConnection::new(db.clone());
2037 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2038
2039 let r = bridge.execute("CREATE TABLE cities (id INT, name TEXT, pop FLOAT)").unwrap();
2041 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2042
2043 let r = bridge.execute("INSERT INTO cities (id, name, pop) VALUES (1, 'Tokyo', 13.96)").unwrap();
2045 match &r {
2046 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2047 other => panic!("Expected RowsAffected, got {:?}", other),
2048 }
2049
2050 let r = bridge.execute("INSERT INTO cities (id, name, pop) VALUES (2, 'Delhi', 11.03)").unwrap();
2051 match &r {
2052 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2053 other => panic!("Expected RowsAffected, got {:?}", other),
2054 }
2055
2056 let r = bridge.execute("SELECT name, pop FROM cities").unwrap();
2058 match &r {
2059 crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2060 assert_eq!(rows.len(), 2);
2061 assert!(columns.contains(&"name".to_string()));
2062 }
2063 other => panic!("Expected Rows, got {:?}", other),
2064 }
2065 }
2066
2067 #[test]
2068 fn test_sqlbridge_update_delete() {
2069 let (db, _tmp) = setup_test_db();
2070 let conn = DatabaseSqlConnection::new(db.clone());
2071 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2072
2073 bridge.execute("CREATE TABLE items (id INT, qty INT)").unwrap();
2074 bridge.execute("INSERT INTO items (id, qty) VALUES (1, 10)").unwrap();
2075 bridge.execute("INSERT INTO items (id, qty) VALUES (2, 20)").unwrap();
2076 bridge.execute("INSERT INTO items (id, qty) VALUES (3, 30)").unwrap();
2077
2078 let r = bridge.execute("UPDATE items SET qty = 99 WHERE id = 2").unwrap();
2080 match &r {
2081 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2082 other => panic!("Expected RowsAffected for UPDATE, got {:?}", other),
2083 }
2084
2085 let r = bridge.execute("DELETE FROM items WHERE id = 3").unwrap();
2087 match &r {
2088 crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2089 other => panic!("Expected RowsAffected for DELETE, got {:?}", other),
2090 }
2091
2092 let r = bridge.execute("SELECT id, qty FROM items").unwrap();
2094 match &r {
2095 crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2096 assert_eq!(rows.len(), 2, "Expected 2 rows after delete");
2097 }
2098 other => panic!("Expected Rows, got {:?}", other),
2099 }
2100 }
2101
2102 #[test]
2103 fn test_sqlbridge_transaction_commit() {
2104 let (db, _tmp) = setup_test_db();
2105 let conn = DatabaseSqlConnection::new(db.clone());
2106 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2107
2108 bridge.execute("CREATE TABLE txtest (id INT, val TEXT)").unwrap();
2109
2110 let r = bridge.execute("BEGIN").unwrap();
2112 assert!(matches!(r, crate::sql::bridge::ExecutionResult::TransactionOk));
2113
2114 bridge.execute("INSERT INTO txtest (id, val) VALUES (1, 'committed')").unwrap();
2115
2116 let r = bridge.execute("COMMIT").unwrap();
2117 assert!(matches!(r, crate::sql::bridge::ExecutionResult::TransactionOk));
2118
2119 let r = bridge.execute("SELECT val FROM txtest").unwrap();
2121 match &r {
2122 crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2123 assert_eq!(rows.len(), 1);
2124 }
2125 other => panic!("Expected Rows, got {:?}", other),
2126 }
2127 }
2128
2129 #[test]
2130 fn test_sqlbridge_drop_table() {
2131 let (db, _tmp) = setup_test_db();
2132 let conn = DatabaseSqlConnection::new(db.clone());
2133 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2134
2135 bridge.execute("CREATE TABLE ephemeral (x INT)").unwrap();
2136 let r = bridge.execute("DROP TABLE ephemeral").unwrap();
2137 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2138
2139 let r = bridge.execute("DROP TABLE IF EXISTS ephemeral").unwrap();
2141 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2142 }
2143
2144 #[test]
2145 fn test_sqlbridge_if_not_exists() {
2146 let (db, _tmp) = setup_test_db();
2147 let conn = DatabaseSqlConnection::new(db.clone());
2148 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2149
2150 bridge.execute("CREATE TABLE dup (id INT)").unwrap();
2151 let r = bridge.execute("CREATE TABLE IF NOT EXISTS dup (id INT)").unwrap();
2153 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2154 }
2155
2156 #[test]
2157 fn test_sqlbridge_alter_add_column() {
2158 let (db, _tmp) = setup_test_db();
2159 let conn = DatabaseSqlConnection::new(db.clone());
2160 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2161
2162 bridge
2163 .execute("CREATE TABLE alter_test (id INT, name TEXT)")
2164 .unwrap();
2165
2166 let r = bridge
2167 .execute("ALTER TABLE alter_test ADD COLUMN age INT")
2168 .unwrap();
2169 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2170
2171 let schema = db.get_table_schema("alter_test").unwrap();
2173 assert_eq!(schema.columns.len(), 3);
2174 assert_eq!(schema.columns[2].name, "age");
2175 }
2176
2177 #[test]
2178 fn test_sqlbridge_alter_drop_column() {
2179 let (db, _tmp) = setup_test_db();
2180 let conn = DatabaseSqlConnection::new(db.clone());
2181 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2182
2183 bridge
2184 .execute("CREATE TABLE drop_col_test (id INT, name TEXT, age INT)")
2185 .unwrap();
2186
2187 let r = bridge
2188 .execute("ALTER TABLE drop_col_test DROP COLUMN age")
2189 .unwrap();
2190 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2191
2192 let schema = db.get_table_schema("drop_col_test").unwrap();
2193 assert_eq!(schema.columns.len(), 2);
2194 assert!(schema.columns.iter().all(|c| c.name != "age"));
2195 }
2196
2197 #[test]
2198 fn test_sqlbridge_alter_rename_column() {
2199 let (db, _tmp) = setup_test_db();
2200 let conn = DatabaseSqlConnection::new(db.clone());
2201 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2202
2203 bridge
2204 .execute("CREATE TABLE rename_col_test (id INT, name TEXT)")
2205 .unwrap();
2206
2207 let r = bridge
2208 .execute("ALTER TABLE rename_col_test RENAME COLUMN name TO full_name")
2209 .unwrap();
2210 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2211
2212 let schema = db.get_table_schema("rename_col_test").unwrap();
2213 assert_eq!(schema.columns[1].name, "full_name");
2214 }
2215
2216 #[test]
2217 fn test_sqlbridge_alter_rename_table() {
2218 let (db, _tmp) = setup_test_db();
2219 let conn = DatabaseSqlConnection::new(db.clone());
2220 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2221
2222 bridge
2223 .execute("CREATE TABLE old_name (id INT)")
2224 .unwrap();
2225
2226 let r = bridge
2227 .execute("ALTER TABLE old_name RENAME TO new_name")
2228 .unwrap();
2229 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2230
2231 assert!(db.get_table_schema("old_name").is_none());
2232 assert!(db.get_table_schema("new_name").is_some());
2233 }
2234
2235 #[test]
2236 fn test_sqlbridge_alter_multiple_ops() {
2237 let (db, _tmp) = setup_test_db();
2238 let conn = DatabaseSqlConnection::new(db.clone());
2239 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2240
2241 bridge
2242 .execute("CREATE TABLE multi_alter (id INT, a TEXT, b TEXT)")
2243 .unwrap();
2244
2245 let r = bridge
2247 .execute("ALTER TABLE multi_alter ADD COLUMN c INT, DROP COLUMN b")
2248 .unwrap();
2249 assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2250
2251 let schema = db.get_table_schema("multi_alter").unwrap();
2252 let col_names: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2253 assert_eq!(col_names, vec!["id", "a", "c"]);
2254 }
2255
2256 #[test]
2257 fn test_sqlbridge_alter_errors() {
2258 let (db, _tmp) = setup_test_db();
2259 let conn = DatabaseSqlConnection::new(db.clone());
2260 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2261
2262 bridge
2263 .execute("CREATE TABLE err_test (id INT, name TEXT)")
2264 .unwrap();
2265
2266 let r = bridge.execute("ALTER TABLE err_test ADD COLUMN name TEXT");
2268 assert!(r.is_err());
2269
2270 let r = bridge.execute("ALTER TABLE err_test DROP COLUMN nonexistent");
2272 assert!(r.is_err());
2273
2274 let r = bridge.execute("ALTER TABLE no_such_table ADD COLUMN x INT");
2276 assert!(r.is_err());
2277 }
2278
2279 fn setup_join_tables(bridge: &mut crate::sql::bridge::SqlBridge<DatabaseSqlConnection>) {
2285 bridge.execute("CREATE TABLE users (id INT, name TEXT, dept TEXT)").unwrap();
2286 bridge.execute("INSERT INTO users (id, name, dept) VALUES (1, 'Alice', 'eng')").unwrap();
2287 bridge.execute("INSERT INTO users (id, name, dept) VALUES (2, 'Bob', 'sales')").unwrap();
2288 bridge.execute("INSERT INTO users (id, name, dept) VALUES (3, 'Carol', 'eng')").unwrap();
2289
2290 bridge.execute("CREATE TABLE orders (oid INT, user_id INT, amount FLOAT)").unwrap();
2291 bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (10, 1, 99.50)").unwrap();
2292 bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (11, 1, 45.00)").unwrap();
2293 bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (12, 2, 200.00)").unwrap();
2294 }
2296
2297 #[test]
2298 fn test_sqlbridge_inner_join() {
2299 let (db, _tmp) = setup_test_db();
2300 let conn = DatabaseSqlConnection::new(db.clone());
2301 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2302 setup_join_tables(&mut bridge);
2303
2304 let r = bridge.execute(
2305 "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id"
2306 ).unwrap();
2307
2308 let rows = r.rows().unwrap();
2309 assert_eq!(rows.len(), 3); let alice_rows: Vec<_> = rows.iter()
2313 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Alice".into())))
2314 .collect();
2315 assert_eq!(alice_rows.len(), 2);
2316
2317 let bob_rows: Vec<_> = rows.iter()
2319 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Bob".into())))
2320 .collect();
2321 assert_eq!(bob_rows.len(), 1);
2322
2323 let carol_rows: Vec<_> = rows.iter()
2325 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2326 .collect();
2327 assert_eq!(carol_rows.len(), 0);
2328 }
2329
2330 #[test]
2331 fn test_sqlbridge_left_join() {
2332 let (db, _tmp) = setup_test_db();
2333 let conn = DatabaseSqlConnection::new(db.clone());
2334 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2335 setup_join_tables(&mut bridge);
2336
2337 let r = bridge.execute(
2338 "SELECT users.name, orders.amount FROM users LEFT JOIN orders ON users.id = orders.user_id"
2339 ).unwrap();
2340
2341 let rows = r.rows().unwrap();
2342 assert_eq!(rows.len(), 4); let carol_rows: Vec<_> = rows.iter()
2346 .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2347 .collect();
2348 assert_eq!(carol_rows.len(), 1);
2349 assert_eq!(carol_rows[0].get("amount"), Some(&CoreSochValue::Null));
2350 }
2351
2352 #[test]
2353 fn test_sqlbridge_right_join() {
2354 let (db, _tmp) = setup_test_db();
2355 let conn = DatabaseSqlConnection::new(db.clone());
2356 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2357 setup_join_tables(&mut bridge);
2358
2359 let r = bridge.execute(
2361 "SELECT users.name, orders.oid FROM users RIGHT JOIN orders ON users.id = orders.user_id"
2362 ).unwrap();
2363
2364 let rows = r.rows().unwrap();
2365 assert_eq!(rows.len(), 3); }
2367
2368 #[test]
2369 fn test_sqlbridge_cross_join() {
2370 let (db, _tmp) = setup_test_db();
2371 let conn = DatabaseSqlConnection::new(db.clone());
2372 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2373 setup_join_tables(&mut bridge);
2374
2375 let r = bridge.execute(
2376 "SELECT users.name, orders.oid FROM users CROSS JOIN orders"
2377 ).unwrap();
2378
2379 let rows = r.rows().unwrap();
2380 assert_eq!(rows.len(), 9); }
2382
2383 #[test]
2384 fn test_sqlbridge_join_with_where() {
2385 let (db, _tmp) = setup_test_db();
2386 let conn = DatabaseSqlConnection::new(db.clone());
2387 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2388 setup_join_tables(&mut bridge);
2389
2390 let r = bridge.execute(
2391 "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id WHERE orders.amount > 50"
2392 ).unwrap();
2393
2394 let rows = r.rows().unwrap();
2395 assert_eq!(rows.len(), 2); }
2397
2398 #[test]
2399 fn test_sqlbridge_join_with_alias() {
2400 let (db, _tmp) = setup_test_db();
2401 let conn = DatabaseSqlConnection::new(db.clone());
2402 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2403 setup_join_tables(&mut bridge);
2404
2405 let r = bridge.execute(
2406 "SELECT u.name, o.amount FROM users u INNER JOIN orders o ON u.id = o.user_id"
2407 ).unwrap();
2408
2409 let rows = r.rows().unwrap();
2410 assert_eq!(rows.len(), 3);
2411 }
2412
2413 #[test]
2414 fn test_sqlbridge_join_with_limit() {
2415 let (db, _tmp) = setup_test_db();
2416 let conn = DatabaseSqlConnection::new(db.clone());
2417 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2418 setup_join_tables(&mut bridge);
2419
2420 let r = bridge.execute(
2421 "SELECT users.name, orders.oid FROM users INNER JOIN orders ON users.id = orders.user_id LIMIT 2"
2422 ).unwrap();
2423
2424 let rows = r.rows().unwrap();
2425 assert_eq!(rows.len(), 2);
2426 }
2427
2428 #[test]
2429 fn test_sqlbridge_join_three_tables() {
2430 let (db, _tmp) = setup_test_db();
2431 let conn = DatabaseSqlConnection::new(db.clone());
2432 let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2433 setup_join_tables(&mut bridge);
2434
2435 bridge.execute("CREATE TABLE departments (code TEXT, dname TEXT)").unwrap();
2437 bridge.execute("INSERT INTO departments (code, dname) VALUES ('eng', 'Engineering')").unwrap();
2438 bridge.execute("INSERT INTO departments (code, dname) VALUES ('sales', 'Sales')").unwrap();
2439
2440 let r = bridge.execute(
2441 "SELECT users.name, departments.dname FROM users INNER JOIN departments ON users.dept = departments.code"
2442 ).unwrap();
2443
2444 let rows = r.rows().unwrap();
2445 assert_eq!(rows.len(), 3); }
2447
2448 #[test]
2449 fn test_namespaced_connection_prefixes_tables() {
2450 let ns_conn = NamespacedSqlConnection::new(
2451 MockConn::default(),
2452 "prod",
2453 "app",
2454 );
2455 assert_eq!(ns_conn.prefix_table("users"), "prod:app:users");
2456 assert_eq!(ns_conn.prefix_table("posts"), "prod:app:posts");
2457 assert_eq!(ns_conn.namespace(), "prod");
2458 assert_eq!(ns_conn.database(), "app");
2459 }
2460
2461 #[test]
2462 fn test_namespaced_connection_isolates_data() {
2463 let (db, _tmp) = setup_test_db();
2464
2465 let conn_a = NamespacedSqlConnection::new(
2467 DatabaseSqlConnection::new(db.clone()),
2468 "tenant_a",
2469 "db1",
2470 );
2471 let conn_b = NamespacedSqlConnection::new(
2472 DatabaseSqlConnection::new(db.clone()),
2473 "tenant_b",
2474 "db1",
2475 );
2476
2477 let mut bridge_a = crate::sql::bridge::SqlBridge::new(conn_a);
2478 let mut bridge_b = crate::sql::bridge::SqlBridge::new(conn_b);
2479
2480 bridge_a.execute("CREATE TABLE users (name TEXT, age INTEGER)").unwrap();
2482 bridge_a.execute("INSERT INTO users (name, age) VALUES ('Alice', 30)").unwrap();
2483
2484 bridge_b.execute("CREATE TABLE users (name TEXT, age INTEGER)").unwrap();
2486 bridge_b.execute("INSERT INTO users (name, age) VALUES ('Bob', 25)").unwrap();
2487
2488 let result_a = bridge_a.execute("SELECT * FROM users").unwrap();
2490 let rows_a = result_a.rows().unwrap();
2491 assert_eq!(rows_a.len(), 1);
2492
2493 let result_b = bridge_b.execute("SELECT * FROM users").unwrap();
2495 let rows_b = result_b.rows().unwrap();
2496 assert_eq!(rows_b.len(), 1);
2497 }
2498
2499 #[derive(Default)]
2501 struct MockConn;
2502 impl crate::sql::bridge::SqlConnection for MockConn {
2503 fn select(&self, _: &str, _: &[String], _: Option<&Expr>, _: &[OrderByItem], _: Option<usize>, _: Option<usize>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2504 Ok(ExecutionResult::Rows { columns: vec![], rows: vec![] })
2505 }
2506 fn insert(&mut self, _: &str, _: Option<&[String]>, _: &[Vec<Expr>], _: Option<&OnConflict>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2507 Ok(ExecutionResult::RowsAffected(0))
2508 }
2509 fn update(&mut self, _: &str, _: &[Assignment], _: Option<&Expr>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2510 Ok(ExecutionResult::RowsAffected(0))
2511 }
2512 fn delete(&mut self, _: &str, _: Option<&Expr>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2513 Ok(ExecutionResult::RowsAffected(0))
2514 }
2515 fn create_table(&mut self, _: &CreateTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2516 fn drop_table(&mut self, _: &DropTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2517 fn create_index(&mut self, _: &CreateIndexStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2518 fn drop_index(&mut self, _: &DropIndexStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2519 fn alter_table(&mut self, _: &AlterTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2520 fn begin(&mut self, _: &BeginStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2521 fn commit(&mut self) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2522 fn rollback(&mut self, _: Option<&str>) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2523 fn table_exists(&self, _: &str) -> SqlResult<bool> { Ok(false) }
2524 fn index_exists(&self, _: &str) -> SqlResult<bool> { Ok(false) }
2525 fn scan_all(&self, _: &str, _: &[String]) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> { Ok(vec![]) }
2526 fn eval_join_predicate(&self, _: &Expr, _: &HashMap<String, CoreSochValue>, _: &[CoreSochValue]) -> Option<bool> { Some(true) }
2527 }
2528}