1use super::{
18 planner::{ExecutionStep, IndexSelection, ParallelizationInfo, QueryPlan, StepType},
19 ComparisonOperator, Condition,
20};
21use crate::{
22 schema::SchemaManager, storage::StorageEngine, Config, Error, Result, RowKey, TableId, Value,
23};
24use crossbeam::channel;
25use std::cmp::Ordering;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Instant;
29
30pub use super::result::{QueryResult, QueryRow};
32
/// Worker count suggested by [`DEFAULT_PARALLELIZATION`].
const DEFAULT_PARALLEL_WORKERS: usize = 4;

/// Fallback parallelization settings, used by `execute_parallel_table_scan` when none of the
/// plan's steps is marked as parallelizable.
static DEFAULT_PARALLELIZATION: ParallelizationInfo = ParallelizationInfo {
    can_parallelize: true,
    suggested_threads: DEFAULT_PARALLEL_WORKERS,
    partition_key: None,
};
43
/// Executes [`QueryPlan`]s against the storage engine and produces [`QueryResult`]s.
#[derive(Debug, Clone)]
pub struct QueryExecutor {
    /// Storage engine that serves every `get`, `scan` and `put` issued by the executor.
    storage: Arc<StorageEngine>,
    /// Schema manager handed to `new`; stored but not read by the code in this file.
    _schema: Arc<SchemaManager>,
    /// Copy of the configuration handed to `new`; only read by the unit tests in this file.
    _config: Config,
}
54
55impl QueryExecutor {
56 pub fn new(storage: Arc<StorageEngine>, schema: Arc<SchemaManager>, config: &Config) -> Self {
58 Self {
59 storage,
60 _schema: schema,
61 _config: config.clone(),
62 }
63 }
64
65 pub async fn execute(&self, plan: &QueryPlan) -> Result<QueryResult> {
67 let start_time = Instant::now();
68
69 let has_insert_step = plan
71 .steps
72 .iter()
73 .any(|step| matches!(step.step_type, StepType::Insert));
74 let is_create_table =
75 plan.steps.is_empty() && plan.table.is_some() && plan.estimated_rows == 0;
76
77 #[cfg(debug_assertions)]
78 eprintln!(
79 "DEBUG: Plan steps: {:?}, has_insert_step: {}, is_create_table: {}",
80 plan.steps.iter().map(|s| &s.step_type).collect::<Vec<_>>(),
81 has_insert_step,
82 is_create_table
83 );
84
85 let result = match plan.plan_type {
86 super::planner::PlanType::PointLookup => self.execute_point_lookup(plan).await,
87 super::planner::PlanType::IndexScan => self.execute_index_scan(plan).await,
88 super::planner::PlanType::RangeScan => self.execute_range_scan(plan).await,
89 super::planner::PlanType::TableScan if has_insert_step => {
90 #[cfg(feature = "experimental")]
91 {
92 self.execute_insert_operation(plan).await
93 }
94 #[cfg(not(feature = "experimental"))]
95 {
96 Err(Error::UnsupportedFormat(
97 "INSERT operations require the 'experimental' feature. \
98 Add 'experimental' to your Cargo.toml features."
99 .to_string(),
100 ))
101 }
102 }
103 super::planner::PlanType::TableScan if is_create_table => {
104 self.execute_create_table_operation(plan).await
105 }
106 super::planner::PlanType::TableScan => self.execute_table_scan(plan).await,
107 super::planner::PlanType::Join => self.execute_join(plan).await,
108 super::planner::PlanType::Aggregation => self.execute_aggregation(plan).await,
109 super::planner::PlanType::Subquery => self.execute_subquery(plan).await,
110 };
111
112 let mut query_result = result?;
113 let elapsed_ms = start_time.elapsed().as_millis() as u64;
114
115 #[cfg(debug_assertions)]
116 eprintln!(
117 "DEBUG: Final result before metadata update - rows_affected: {}",
118 query_result.rows_affected
119 );
120
121 query_result.execution_time_ms = elapsed_ms;
122 query_result.metadata.plan_info = Some(super::result::PlanInfo {
123 plan_type: format!("{:?}", plan.plan_type),
124 estimated_cost: plan.estimated_cost,
125 actual_cost: elapsed_ms as f64,
126 indexes_used: Vec::new(), steps: plan
128 .steps
129 .iter()
130 .map(|s| format!("{:?}", s.step_type))
131 .collect(),
132 parallelization: plan
133 .steps
134 .iter()
135 .find(|s| s.parallelization.can_parallelize)
136 .map(|s| super::result::ParallelizationInfo {
137 threads_used: s.parallelization.suggested_threads,
138 effective: true,
139 partitions: Vec::new(),
140 }),
141 });
142 Ok(query_result)
143 }
144
145 fn require_table<'a>(&self, plan: &'a QueryPlan) -> Result<&'a TableId> {
149 plan.table
150 .as_ref()
151 .ok_or_else(|| Error::query_execution("Missing table in plan"))
152 }
153
154 fn find_condition<'a>(steps: &'a [ExecutionStep], column: &str) -> Option<&'a Condition> {
156 steps
157 .iter()
158 .flat_map(|s| s.conditions.iter())
159 .find(|c| c.column == column)
160 }
161
162 fn scan_pairs_to_rows(&self, pairs: Vec<(RowKey, Value)>) -> Result<Vec<QueryRow>> {
164 let mut rows = Vec::with_capacity(pairs.len());
165 for (row_key, row_data) in pairs {
166 rows.push(self.storage_data_to_query_row(row_data, &row_key)?);
167 }
168 Ok(rows)
169 }
170
171 async fn full_scan_rows(&self, table: &TableId) -> Result<Vec<QueryRow>> {
173 let scan_results = self.storage.scan(table, None, None, None, None).await?;
174 self.scan_pairs_to_rows(scan_results)
175 }
176
177 async fn point_lookup_rows(
179 &self,
180 table: &TableId,
181 condition: &Condition,
182 ) -> Result<Vec<QueryRow>> {
183 let row_key = self.condition_to_row_key(condition)?;
184 match self.storage.get(table, &row_key).await? {
185 Some(row_data) => Ok(vec![self.storage_data_to_query_row(row_data, &row_key)?]),
186 None => Ok(Vec::new()),
187 }
188 }
189
190 fn make_result(rows: Vec<QueryRow>) -> QueryResult {
192 QueryResult::with_rows(rows)
193 }
194
195 async fn execute_point_lookup(&self, plan: &QueryPlan) -> Result<QueryResult> {
199 let table = self.require_table(plan)?;
200
201 let lookup_condition = plan
203 .steps
204 .iter()
205 .find_map(|step| step.conditions.first())
206 .ok_or_else(|| Error::query_execution("No lookup condition found"))?;
207
208 let row_key = self.condition_to_row_key(lookup_condition)?;
209
210 #[cfg(debug_assertions)]
211 eprintln!(
212 "DEBUG: SELECT point lookup using row key: {:?}",
213 std::str::from_utf8(row_key.as_bytes()).unwrap_or("<invalid-utf8>")
214 );
215
216 let mut rows = Vec::new();
217 if let Some(row_data) = self.storage.get(table, &row_key).await? {
218 rows.push(self.storage_data_to_query_row(row_data, &row_key)?);
219 }
220
221 Ok(Self::make_result(rows))
222 }
223
224 async fn execute_index_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
226 let table = self.require_table(plan)?;
227
228 let index_selection = plan
229 .selected_indexes
230 .first()
231 .ok_or_else(|| Error::query_execution("No index selected"))?;
232
233 let mut rows = match index_selection.index_type {
234 super::planner::IndexType::Secondary => {
235 self.execute_secondary_index_scan(table, index_selection, &plan.steps)
236 .await?
237 }
238 super::planner::IndexType::BloomFilter => {
239 self.execute_bloom_filter_scan(table, index_selection, &plan.steps)
240 .await?
241 }
242 super::planner::IndexType::Primary => {
243 self.execute_primary_index_scan(table, index_selection, &plan.steps)
244 .await?
245 }
246 super::planner::IndexType::Composite => {
247 self.execute_composite_index_scan(table, index_selection, &plan.steps)
248 .await?
249 }
250 };
251
252 rows = self.apply_execution_steps(rows, &plan.steps).await?;
253 Ok(Self::make_result(rows))
254 }
255
256 async fn execute_range_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
258 let table = self.require_table(plan)?;
259
260 let mut rows = self.full_scan_rows(table).await?;
263 rows = self.apply_execution_steps(rows, &plan.steps).await?;
264 Ok(Self::make_result(rows))
265 }
266
267 async fn execute_table_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
269 let table = self.require_table(plan)?;
270
271 #[cfg(debug_assertions)]
272 log::debug!("executor: Scanning for table: {:?}", table.name());
273
274 let can_parallelize = plan
275 .steps
276 .iter()
277 .any(|step| step.parallelization.can_parallelize);
278
279 let mut rows = if can_parallelize {
280 self.execute_parallel_table_scan(table, &plan.steps).await?
281 } else {
282 self.full_scan_rows(table).await?
283 };
284
285 rows = self.apply_execution_steps(rows, &plan.steps).await?;
286 Ok(Self::make_result(rows))
287 }
288
289 async fn execute_join(&self, _plan: &QueryPlan) -> Result<QueryResult> {
291 Ok(QueryResult::new())
292 }
293
294 async fn execute_aggregation(&self, _plan: &QueryPlan) -> Result<QueryResult> {
296 Ok(QueryResult::new())
297 }
298
299 async fn execute_subquery(&self, _plan: &QueryPlan) -> Result<QueryResult> {
301 Ok(QueryResult::new())
302 }
303
304 async fn execute_secondary_index_scan(
309 &self,
310 table: &TableId,
311 index_selection: &IndexSelection,
312 steps: &[ExecutionStep],
313 ) -> Result<Vec<QueryRow>> {
314 Self::find_condition(steps, &index_selection.columns[0])
316 .ok_or_else(|| Error::query_execution("No condition found for index"))?;
317 self.full_scan_rows(table).await
318 }
319
320 async fn execute_bloom_filter_scan(
322 &self,
323 table: &TableId,
324 index_selection: &IndexSelection,
325 steps: &[ExecutionStep],
326 ) -> Result<Vec<QueryRow>> {
327 let condition = Self::find_condition(steps, &index_selection.columns[0])
328 .ok_or_else(|| Error::query_execution("No condition found for bloom filter"))?;
329 self.point_lookup_rows(table, condition).await
330 }
331
332 async fn execute_primary_index_scan(
334 &self,
335 table: &TableId,
336 index_selection: &IndexSelection,
337 steps: &[ExecutionStep],
338 ) -> Result<Vec<QueryRow>> {
339 let condition = Self::find_condition(steps, &index_selection.columns[0])
340 .ok_or_else(|| Error::query_execution("No condition found for primary key"))?;
341 self.point_lookup_rows(table, condition).await
342 }
343
344 async fn execute_composite_index_scan(
347 &self,
348 table: &TableId,
349 _index_selection: &IndexSelection,
350 _steps: &[ExecutionStep],
351 ) -> Result<Vec<QueryRow>> {
352 self.full_scan_rows(table).await
353 }
354
355 async fn execute_parallel_table_scan(
364 &self,
365 table: &TableId,
366 steps: &[ExecutionStep],
367 ) -> Result<Vec<QueryRow>> {
368 let parallelization = steps
369 .iter()
370 .find(|step| step.parallelization.can_parallelize)
371 .map(|step| &step.parallelization)
372 .unwrap_or(&DEFAULT_PARALLELIZATION);
373
374 let thread_count = parallelization.suggested_threads;
375 let (tx, rx) = channel::unbounded();
376
377 let mut handles = Vec::with_capacity(thread_count);
378 for worker_id in 0..thread_count {
379 let storage = self.storage.clone();
380 let table = table.clone();
381 let tx = tx.clone();
382
383 handles.push(tokio::spawn(async move {
384 match storage.scan(&table, None, None, None, None).await {
385 Ok(results) => {
386 for pair in results {
387 if tx.send(pair).is_err() {
389 break;
390 }
391 }
392 }
393 Err(e) => log::error!("Worker {} error: {:?}", worker_id, e),
394 }
395 }));
396 }
397
398 drop(tx);
400
401 let mut rows = Vec::new();
402 while let Ok((row_key, row_data)) = rx.recv() {
403 rows.push(self.storage_data_to_query_row(row_data, &row_key)?);
404 }
405
406 for handle in handles {
407 let _ = handle.await;
408 }
409
410 Ok(rows)
411 }
412
413 async fn apply_execution_steps(
421 &self,
422 mut rows: Vec<QueryRow>,
423 steps: &[ExecutionStep],
424 ) -> Result<Vec<QueryRow>> {
425 for step in steps {
426 match step.step_type {
427 StepType::Filter => rows = self.apply_filter_step(rows, step)?,
428 StepType::Sort => rows = self.apply_sort_step(rows, step),
429 StepType::Project => rows = self.apply_project_step(rows, step),
430 StepType::Limit
432 | StepType::Aggregate
433 | StepType::Join
434 | StepType::Scan
435 | StepType::Insert => {}
436 }
437 }
438 Ok(rows)
439 }
440
441 fn apply_filter_step(
443 &self,
444 rows: Vec<QueryRow>,
445 step: &ExecutionStep,
446 ) -> Result<Vec<QueryRow>> {
447 let mut filtered_rows = Vec::with_capacity(rows.len());
448 for row in rows {
449 let mut matches = true;
450 for condition in &step.conditions {
451 if !self.evaluate_condition(&row, condition)? {
452 matches = false;
453 break;
454 }
455 }
456 if matches {
457 filtered_rows.push(row);
458 }
459 }
460 Ok(filtered_rows)
461 }
462
463 fn apply_sort_step(&self, mut rows: Vec<QueryRow>, step: &ExecutionStep) -> Vec<QueryRow> {
465 let Some(sort_column) = step.columns.first() else {
466 return rows;
467 };
468
469 rows.sort_by(|a, b| {
470 let a_val = a.values.get(sort_column).unwrap_or(&Value::Null);
471 let b_val = b.values.get(sort_column).unwrap_or(&Value::Null);
472 self.compare_values(a_val, b_val).unwrap_or(Ordering::Equal)
473 });
474 rows
475 }
476
477 fn apply_project_step(&self, rows: Vec<QueryRow>, step: &ExecutionStep) -> Vec<QueryRow> {
479 rows.into_iter()
480 .map(|row| {
481 let mut projected_values = HashMap::with_capacity(step.columns.len());
482 for column in &step.columns {
483 if let Some(value) = row.values.get(column) {
484 projected_values.insert(column.clone(), value.clone());
485 }
486 }
487 QueryRow::with_values(row.key, projected_values)
488 })
489 .collect()
490 }
491
492 fn evaluate_condition(&self, row: &QueryRow, condition: &Condition) -> Result<bool> {
496 let row_value = row.values.get(&condition.column).unwrap_or(&Value::Null);
497
498 match condition.operator {
499 ComparisonOperator::Equal => Ok(row_value == &condition.value),
500 ComparisonOperator::NotEqual => Ok(row_value != &condition.value),
501 ComparisonOperator::LessThan => Ok(matches!(
502 self.compare_values(row_value, &condition.value)?,
503 Ordering::Less
504 )),
505 ComparisonOperator::LessThanOrEqual => Ok(matches!(
506 self.compare_values(row_value, &condition.value)?,
507 Ordering::Less | Ordering::Equal
508 )),
509 ComparisonOperator::GreaterThan => Ok(matches!(
510 self.compare_values(row_value, &condition.value)?,
511 Ordering::Greater
512 )),
513 ComparisonOperator::GreaterThanOrEqual => Ok(matches!(
514 self.compare_values(row_value, &condition.value)?,
515 Ordering::Greater | Ordering::Equal
516 )),
517 ComparisonOperator::In => Ok(row_value == &condition.value),
519 ComparisonOperator::NotIn => Ok(row_value != &condition.value),
520 ComparisonOperator::Like => match (row_value, &condition.value) {
521 (Value::Text(row_text), Value::Text(pattern)) => Ok(row_text.contains(pattern)),
522 _ => Ok(false),
523 },
524 ComparisonOperator::NotLike => match (row_value, &condition.value) {
525 (Value::Text(row_text), Value::Text(pattern)) => Ok(!row_text.contains(pattern)),
526 _ => Ok(true),
527 },
528 }
529 }
530
531 fn compare_values(&self, a: &Value, b: &Value) -> Result<Ordering> {
533 match (a, b) {
534 (Value::Integer(a), Value::Integer(b)) => Ok(a.cmp(b)),
535 (Value::Float(a), Value::Float(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
536 (Value::Text(a), Value::Text(b)) => Ok(a.cmp(b)),
537 (Value::Boolean(a), Value::Boolean(b)) => Ok(a.cmp(b)),
538 (Value::Uuid(a), Value::Uuid(b)) => Ok(a.cmp(b)),
541 (Value::Null, Value::Null) => Ok(Ordering::Equal),
542 (Value::Null, _) => Ok(Ordering::Less),
543 (_, Value::Null) => Ok(Ordering::Greater),
544 _ => Err(Error::query_execution(
545 "Cannot compare values of different types",
546 )),
547 }
548 }
549
550 fn value_to_row_key(&self, value: &Value) -> Result<RowKey> {
562 match value {
563 Value::Integer(i) => Ok(RowKey::new(i.to_be_bytes().to_vec())),
564 Value::Text(s) => Ok(RowKey::new(s.as_bytes().to_vec())),
565 Value::Float(f) => Ok(RowKey::new(f.to_be_bytes().to_vec())),
566 Value::Boolean(b) => Ok(RowKey::new(vec![u8::from(*b)])),
567 Value::Null => Ok(RowKey::new(vec![0])),
568 Value::Uuid(bytes) => Ok(RowKey::new(bytes.to_vec())),
571 Value::BigInt(i) => Ok(RowKey::new(i.to_be_bytes().to_vec())),
572 Value::Tuple(components) => {
576 let mut result = Vec::new();
577 for component in components {
578 let raw = self.value_to_raw_pk_bytes(component)?;
579 let len = raw.len();
580 if len > u16::MAX as usize {
581 return Err(Error::query_execution(
582 "Composite partition key component too large",
583 ));
584 }
585 result.extend_from_slice(&(len as u16).to_be_bytes());
586 result.extend_from_slice(&raw);
587 result.push(0x00);
588 }
589 Ok(RowKey::new(result))
590 }
591 _ => Err(Error::query_execution("Cannot convert value to row key")),
592 }
593 }
594
595 fn value_to_raw_pk_bytes(&self, value: &Value) -> Result<Vec<u8>> {
599 match value {
600 Value::Integer(i) => Ok(i.to_be_bytes().to_vec()),
601 Value::Text(s) => Ok(s.as_bytes().to_vec()),
602 Value::Float(f) => Ok(f.to_be_bytes().to_vec()),
603 Value::Boolean(b) => Ok(vec![u8::from(*b)]),
604 Value::Null => Ok(Vec::new()),
605 Value::Uuid(bytes) => Ok(bytes.to_vec()),
606 Value::BigInt(i) => Ok(i.to_be_bytes().to_vec()),
607 _ => Err(Error::query_execution(
608 "Cannot serialize value as partition key component",
609 )),
610 }
611 }
612
613 fn condition_to_row_key(&self, condition: &Condition) -> Result<RowKey> {
615 if condition.column == "id" {
617 if let Value::Integer(id) = &condition.value {
618 return Ok(RowKey::new(format!("user_key_{}", id).into_bytes()));
619 }
620 }
621 self.value_to_row_key(&condition.value)
622 }
623
624 fn storage_data_to_query_row(&self, data: Value, key: &RowKey) -> Result<QueryRow> {
626 let mut values = HashMap::new();
627
628 match data {
630 Value::Map(map) => {
631 for (map_key, map_value) in map {
632 if let Value::Text(column_name) = map_key {
633 values.insert(column_name, map_value);
634 }
635 }
636 }
637 other => {
638 values.insert("data".to_string(), other);
639 }
640 }
641
642 if values.is_empty() {
644 values.insert("id".to_string(), Value::Text(format!("{:?}", key)));
645 }
646
647 Ok(QueryRow::with_values(key.clone(), values))
648 }
649
650 #[cfg(feature = "experimental")]
654 async fn execute_insert_operation(&self, plan: &QueryPlan) -> Result<QueryResult> {
655 let table_id = self
656 .require_table(plan)
657 .map_err(|_| Error::query_execution("No table specified in INSERT plan"))?;
658
659 let mut inserted_count: u64 = 0;
660
661 for step in &plan.steps {
662 if !matches!(step.step_type, StepType::Insert) {
663 continue;
664 }
665
666 #[cfg(debug_assertions)]
667 eprintln!("DEBUG: INSERT step conditions: {:?}", step.conditions);
668
669 let mut key_value = format!("test_key_{}", inserted_count);
672 for condition in &step.conditions {
673 if condition.column == "id" {
674 if let Value::Integer(id) = &condition.value {
675 key_value = format!("user_key_{}", id);
676 break;
677 }
678 }
679 }
680
681 #[cfg(debug_assertions)]
682 eprintln!("DEBUG: Using row key: {}", key_value);
683
684 let row_key = RowKey::new(key_value.into_bytes());
685
686 let mut value_map: HashMap<String, Value> = step
689 .conditions
690 .iter()
691 .map(|c| (c.column.clone(), c.value.clone()))
692 .collect();
693
694 if value_map.is_empty() {
695 value_map.insert("id".to_string(), Value::Integer(inserted_count as i32 + 1));
696 value_map.insert(
697 "name".to_string(),
698 Value::Text(format!("TestUser{}", inserted_count + 1)),
699 );
700 }
701
702 let row_value = map_to_value(value_map);
703
704 self.storage.put(table_id, row_key, row_value).await?;
705 inserted_count += 1;
706
707 #[cfg(debug_assertions)]
708 eprintln!(
709 "DEBUG: execute_insert_operation - stored row {} in table {}",
710 inserted_count, table_id
711 );
712 }
713
714 if inserted_count == 0 {
717 let row_key = RowKey::new(b"default_test_key".to_vec());
718 let mut value_map = HashMap::new();
719 value_map.insert("id".to_string(), Value::Integer(1));
720 value_map.insert("name".to_string(), Value::Text("DefaultUser".to_string()));
721
722 self.storage
723 .put(table_id, row_key, map_to_value(value_map))
724 .await?;
725 inserted_count = 1;
726 }
727
728 #[cfg(debug_assertions)]
729 eprintln!(
730 "DEBUG: execute_insert_operation called, returning rows_affected: {}",
731 inserted_count
732 );
733
734 Ok(QueryResult {
735 rows: vec![],
736 rows_affected: inserted_count,
737 execution_time_ms: 0,
738 metadata: super::result::QueryMetadata::default(),
739 })
740 }
741
742 async fn execute_create_table_operation(&self, _plan: &QueryPlan) -> Result<QueryResult> {
744 Ok(QueryResult {
745 rows: vec![],
746 rows_affected: 0,
747 execution_time_ms: 0,
748 metadata: super::result::QueryMetadata::default(),
749 })
750 }
751}
752
/// Converts a column-name → value map into a `Value::Map` whose keys are `Value::Text`.
#[cfg(feature = "experimental")]
fn map_to_value(map: HashMap<String, Value>) -> Value {
    let entries = map
        .into_iter()
        .map(|(column, value)| (Value::Text(column), value));
    Value::Map(entries.collect())
}
758
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds an executor over storage and schema rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the directory outlives the executor in each test.
    async fn make_executor() -> (TempDir, QueryExecutor, Config) {
        let config = Config::default();
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();

        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
        let engine = crate::storage::StorageEngine::open(
            root,
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let schema_manager = crate::schema::SchemaManager::new(root).await.unwrap();

        let executor = QueryExecutor::new(Arc::new(engine), Arc::new(schema_manager), &config);
        (temp_dir, executor, config)
    }

    /// Shorthand for building a condition in the tests below.
    fn condition(column: &str, operator: ComparisonOperator, value: Value) -> Condition {
        Condition {
            column: column.to_string(),
            operator,
            value,
        }
    }

    /// The executor keeps a copy of the configuration it was built with.
    #[tokio::test]
    async fn test_query_executor_creation() {
        let (_tmp, executor, config) = make_executor().await;
        assert_eq!(
            executor._config.query.query_parallelism,
            config.query.query_parallelism
        );
    }

    /// Integers and text compare in their natural order.
    #[tokio::test]
    async fn test_value_comparison() {
        let (_tmp, executor, _) = make_executor().await;

        let integers = executor
            .compare_values(&Value::Integer(10), &Value::Integer(20))
            .unwrap();
        assert_eq!(integers, Ordering::Less);

        let apple = Value::Text("apple".to_string());
        let banana = Value::Text("banana".to_string());
        let texts = executor.compare_values(&apple, &banana).unwrap();
        assert_eq!(texts, Ordering::Less);
    }

    /// Equality and `Like` conditions match a row holding the expected values.
    #[tokio::test]
    async fn test_condition_evaluation() {
        let (_tmp, executor, _) = make_executor().await;

        let mut columns = HashMap::new();
        columns.insert("id".to_string(), Value::Integer(1));
        columns.insert("name".to_string(), Value::Text("test".to_string()));
        let row = QueryRow::with_values(RowKey::new(vec![1]), columns);

        let by_id = condition("id", ComparisonOperator::Equal, Value::Integer(1));
        assert!(executor.evaluate_condition(&row, &by_id).unwrap());

        let by_name = condition(
            "name",
            ComparisonOperator::Like,
            Value::Text("test".to_string()),
        );
        assert!(executor.evaluate_condition(&row, &by_name).unwrap());
    }

    /// Integer `id` conditions map to `user_key_<id>`; other conditions use the raw value.
    #[tokio::test]
    async fn test_condition_to_row_key_mapping() {
        let (_tmp, executor, _) = make_executor().await;

        let id_condition = condition("id", ComparisonOperator::Equal, Value::Integer(42));
        let id_key = executor
            .condition_to_row_key(&id_condition)
            .expect("id condition key");
        assert_eq!(
            std::str::from_utf8(id_key.as_bytes()).unwrap(),
            "user_key_42"
        );

        let name_condition = condition(
            "username",
            ComparisonOperator::Equal,
            Value::Text("carol".to_string()),
        );
        let name_key = executor
            .condition_to_row_key(&name_condition)
            .expect("fallback key");
        assert_eq!(name_key.as_bytes(), b"carol");
    }
}