1use crate::convert::{cipher_blob_to_proto, create_version, key_to_proto, query_from_proto};
7use crate::error::{NetError, NetResult};
8use crate::proto::{aql, query};
9use crate::server_admin::{LogEntry, push_log_entry};
10use amaters_core::Query;
11use amaters_core::Update as UpdateOp;
12use amaters_core::traits::StorageEngine;
13use amaters_core::types::{CipherBlob, Key};
14use futures::StreamExt;
15use parking_lot::RwLock;
16use std::collections::VecDeque;
17use std::sync::Arc;
18use std::time::Instant;
19use tracing::{debug, error, info, warn};
20
21#[cfg(feature = "compute")]
22use amaters_core::compute::{FheExecutor, KeyManager, PredicateCompiler};
23#[cfg(feature = "compute")]
24use std::collections::HashMap;
25
26pub struct AqlServiceImpl<S: StorageEngine> {
30 storage: Arc<S>,
32 start_time: Instant,
34 recent_log: Arc<RwLock<VecDeque<LogEntry>>>,
36 #[cfg(feature = "compute")]
38 key_manager: Arc<KeyManager>,
39}
40
41impl<S: StorageEngine> AqlServiceImpl<S> {
42 #[cfg(feature = "compute")]
44 pub fn new(storage: Arc<S>) -> Self {
45 Self {
46 storage,
47 start_time: Instant::now(),
48 recent_log: Arc::new(RwLock::new(VecDeque::new())),
49 key_manager: Arc::new(KeyManager::new()),
50 }
51 }
52
53 #[cfg(not(feature = "compute"))]
55 pub fn new(storage: Arc<S>) -> Self {
56 Self {
57 storage,
58 start_time: Instant::now(),
59 recent_log: Arc::new(RwLock::new(VecDeque::new())),
60 }
61 }
62
63 #[cfg(feature = "compute")]
65 pub fn with_key_manager(storage: Arc<S>, key_manager: Arc<KeyManager>) -> Self {
66 Self {
67 storage,
68 start_time: Instant::now(),
69 recent_log: Arc::new(RwLock::new(VecDeque::new())),
70 key_manager,
71 }
72 }
73
74 pub async fn execute_query(&self, request: aql::QueryRequest) -> aql::QueryResponse {
76 let start_time = Instant::now();
77
78 info!(
79 "ExecuteQuery request received: request_id={:?}",
80 request.request_id
81 );
82
83 let proto_query = match request.query {
85 Some(q) => q,
86 None => {
87 let execution_time_ms = start_time.elapsed().as_millis() as u64;
88 return aql::QueryResponse {
89 response: Some(aql::query_response::Response::Error(
90 crate::proto::errors::ErrorResponse {
91 code: crate::proto::errors::ErrorCode::ErrorProtocolMissingField as i32,
92 message: "Missing query in request".to_string(),
93 category: crate::proto::errors::ErrorCategory::CategoryClientError
94 as i32,
95 details: None,
96 retry_after: None,
97 },
98 )),
99 request_id: request.request_id,
100 execution_time_ms,
101 };
102 }
103 };
104
105 let query = match query_from_proto(proto_query) {
106 Ok(q) => q,
107 Err(e) => {
108 error!("Failed to parse query: {}", e);
109 let execution_time_ms = start_time.elapsed().as_millis() as u64;
110 return aql::QueryResponse {
111 response: Some(aql::query_response::Response::Error(
112 crate::proto::errors::ErrorResponse {
113 code: e.error_code() as i32,
114 message: e.to_string(),
115 category: e.error_category() as i32,
116 details: None,
117 retry_after: None,
118 },
119 )),
120 request_id: request.request_id,
121 execution_time_ms,
122 };
123 }
124 };
125
126 let result = self.execute_query_internal(query).await;
128
129 let execution_time_ms = start_time.elapsed().as_millis() as u64;
130
131 let response = match result {
133 Ok(query_result) => aql::QueryResponse {
134 response: Some(aql::query_response::Response::Result(query_result)),
135 request_id: request.request_id,
136 execution_time_ms,
137 },
138 Err(e) => {
139 error!("Query execution failed: {}", e);
140 push_log_entry(
141 &self.recent_log,
142 format!("ExecuteQuery elapsed={}ms error={}", execution_time_ms, e),
143 );
144 return aql::QueryResponse {
145 response: Some(aql::query_response::Response::Error(
146 crate::proto::errors::ErrorResponse {
147 code: e.error_code() as i32,
148 message: e.to_string(),
149 category: e.error_category() as i32,
150 details: None,
151 retry_after: None,
152 },
153 )),
154 request_id: request.request_id,
155 execution_time_ms,
156 };
157 }
158 };
159 push_log_entry(
160 &self.recent_log,
161 format!("ExecuteQuery elapsed={}ms ok", execution_time_ms),
162 );
163 response
164 }
165
166 #[doc(hidden)]
171 #[tracing::instrument(skip(self), fields(trace_id = tracing::field::Empty, duration_us = tracing::field::Empty))]
172 pub async fn execute_query_internal(&self, query: Query) -> NetResult<query::QueryResult> {
173 match query {
174 Query::Get { collection, key } => {
175 debug!(
176 "Executing GET query: collection={}, key={:?}",
177 collection, key
178 );
179
180 let key_str = key.to_string_lossy();
186 if let Some(admin_cmd) = key_str.strip_prefix("__admin__:") {
187 if let Some(json) = self.handle_admin_command(admin_cmd).await {
188 let blob = CipherBlob::new(json.into_bytes());
189 return Ok(query::QueryResult {
190 result: Some(query::query_result::Result::Single(
191 query::SingleResult {
192 value: Some(cipher_blob_to_proto(&blob)),
193 },
194 )),
195 });
196 }
197 return Ok(query::QueryResult {
199 result: Some(query::query_result::Result::Single(query::SingleResult {
200 value: None,
201 })),
202 });
203 }
204
205 let result = self.storage.get(&key).await?;
206
207 let result = match result {
208 Some(value) => query::QueryResult {
209 result: Some(query::query_result::Result::Single(query::SingleResult {
210 value: Some(cipher_blob_to_proto(&value)),
211 })),
212 },
213 None => query::QueryResult {
214 result: Some(query::query_result::Result::Single(query::SingleResult {
215 value: None,
216 })),
217 },
218 };
219
220 Ok(result)
221 }
222 Query::Set {
223 collection,
224 key,
225 value,
226 } => {
227 debug!(
228 "Executing SET query: collection={}, key={:?}",
229 collection, key
230 );
231
232 self.storage.put(&key, &value).await?;
233
234 Ok(query::QueryResult {
235 result: Some(query::query_result::Result::Success(query::SuccessResult {
236 affected_rows: 1,
237 })),
238 })
239 }
240 Query::Delete { collection, key } => {
241 debug!(
242 "Executing DELETE query: collection={}, key={:?}",
243 collection, key
244 );
245
246 self.storage.delete(&key).await?;
247
248 Ok(query::QueryResult {
249 result: Some(query::query_result::Result::Success(query::SuccessResult {
250 affected_rows: 1,
251 })),
252 })
253 }
254 Query::Range {
255 collection,
256 start,
257 end,
258 } => {
259 debug!(
260 "Executing RANGE query: collection={}, start={:?}, end={:?}",
261 collection, start, end
262 );
263
264 let results = self.storage.range(&start, &end).await?;
265
266 let values: Vec<query::KeyValue> = results
267 .into_iter()
268 .map(|(k, v)| query::KeyValue {
269 key: Some(key_to_proto(&k)),
270 value: Some(cipher_blob_to_proto(&v)),
271 encrypted_predicate_result: None,
272 })
273 .collect();
274
275 Ok(query::QueryResult {
276 result: Some(query::query_result::Result::Multi(query::MultiResult {
277 values,
278 })),
279 })
280 }
281 Query::Filter {
282 collection,
283 predicate,
284 } => {
285 let min_key = Key::from_slice(&[]);
287 let max_key = Key::from_slice(&[0xFF; 256]);
288
289 let all_rows = match self.storage.range(&min_key, &max_key).await {
290 Ok(rows) => rows,
291 Err(e) => {
292 error!("Failed to retrieve rows for filter: {}", e);
293 return Err(NetError::from(e));
294 }
295 };
296
297 debug!("Filter: retrieved {} candidate rows", all_rows.len());
298
299 if all_rows.len() > 1000 {
300 warn!(
301 "Filter query retrieved {} rows, which may cause performance issues",
302 all_rows.len()
303 );
304 }
305
306 let first_is_plaintext = all_rows
311 .first()
312 .map(|(_, v)| predicate.evaluate_plaintext(v).is_some())
313 .unwrap_or(true); if first_is_plaintext {
316 info!("Executing FILTER query with server-side plaintext predicate evaluation");
317
318 let mut results = Vec::new();
319 let mut excluded: usize = 0;
320
321 for (key, value_blob) in all_rows {
322 match predicate.evaluate_plaintext(&value_blob) {
323 Some(true) => {
324 results.push(query::KeyValue {
325 key: Some(key_to_proto(&key)),
326 value: Some(cipher_blob_to_proto(&value_blob)),
327 encrypted_predicate_result: None,
328 });
329 }
330 Some(false) => {
331 excluded += 1;
333 }
334 None => {
335 warn!(
338 "Plaintext evaluation returned None for key {:?} mid-scan; \
339 including row conservatively",
340 key
341 );
342 results.push(query::KeyValue {
343 key: Some(key_to_proto(&key)),
344 value: Some(cipher_blob_to_proto(&value_blob)),
345 encrypted_predicate_result: None,
346 });
347 }
348 }
349 }
350
351 info!(
352 "FILTER query completed: {} rows matched, {} rows excluded by plaintext predicate",
353 results.len(),
354 excluded
355 );
356
357 return Ok(query::QueryResult {
358 result: Some(query::query_result::Result::Multi(query::MultiResult {
359 values: results,
360 })),
361 });
362 }
363
364 #[cfg(feature = "compute")]
366 {
367 info!("Executing FILTER query with FHE predicate evaluation");
368
369 let mut compiler = PredicateCompiler::new();
377
378 let circuit = match compiler
381 .compile(&predicate, amaters_core::compute::EncryptedType::U8)
382 {
383 Ok(c) => c,
384 Err(e) => {
385 error!("Failed to compile predicate: {}", e);
386 return Err(NetError::ServerInternal(format!(
387 "Predicate compilation failed: {}",
388 e
389 )));
390 }
391 };
392
393 debug!(
394 "Compiled predicate circuit: depth={}, gates={}",
395 circuit.depth, circuit.gate_count
396 );
397
398 let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
400 Ok(r) => r,
401 Err(e) => {
402 error!("Failed to extract RHS value: {}", e);
403 return Err(NetError::ServerInternal(format!(
404 "RHS extraction failed: {}",
405 e
406 )));
407 }
408 };
409
410 let executor = FheExecutor::new();
412
413 let mut results = Vec::new();
416 let mut execution_errors = 0;
417
418 for (key, value_blob) in all_rows {
419 let mut inputs = HashMap::new();
421 inputs.insert("value".to_string(), value_blob.clone());
422 inputs.insert("rhs".to_string(), rhs.clone());
423
424 match executor.execute(&circuit, &inputs) {
427 Ok(result_blob) => {
428 let result_bytes = result_blob.as_bytes().to_vec();
429
430 debug!(
431 "Executed predicate on key {:?}, result blob size: {}",
432 key,
433 result_bytes.len()
434 );
435
436 results.push(query::KeyValue {
437 key: Some(key_to_proto(&key)),
438 value: Some(cipher_blob_to_proto(&value_blob)),
439 encrypted_predicate_result: Some(result_bytes),
440 });
441 }
442 Err(e) => {
443 execution_errors += 1;
444 warn!("FHE execution failed for key {:?}: {}", key, e);
445 }
447 }
448 }
449
450 if execution_errors > 0 {
451 warn!(
452 "Filter query had {} FHE execution errors out of {} total rows",
453 execution_errors,
454 execution_errors + results.len()
455 );
456 }
457
458 info!(
459 "FILTER query completed, processed {} rows successfully",
460 results.len()
461 );
462
463 Ok(query::QueryResult {
464 result: Some(query::query_result::Result::Multi(query::MultiResult {
465 values: results,
466 })),
467 })
468 }
469
470 #[cfg(not(feature = "compute"))]
471 {
472 let _ = (collection, predicate);
473 warn!("FILTER query reached FHE path but compute feature is disabled");
474 Err(NetError::ServerInternal(
475 "FILTER queries on encrypted values require the compute feature"
476 .to_string(),
477 ))
478 }
479 }
480 Query::Update {
481 collection,
482 predicate,
483 updates,
484 } => {
485 debug!(
486 "Executing UPDATE query: collection={}, updates_count={}",
487 collection,
488 updates.len()
489 );
490
491 #[cfg(feature = "compute")]
492 {
493 let mut compiler = PredicateCompiler::new();
497 let circuit = match compiler
498 .compile(&predicate, amaters_core::compute::EncryptedType::U8)
499 {
500 Ok(c) => c,
501 Err(e) => {
502 error!("Failed to compile update predicate: {}", e);
503 return Err(NetError::ServerInternal(format!(
504 "Update predicate compilation failed: {}",
505 e
506 )));
507 }
508 };
509
510 let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
511 Ok(r) => r,
512 Err(e) => {
513 error!("Failed to extract RHS value for update predicate: {}", e);
514 return Err(NetError::ServerInternal(format!(
515 "Update RHS extraction failed: {}",
516 e
517 )));
518 }
519 };
520
521 let executor = FheExecutor::new();
522
523 let min_key = Key::from_slice(&[]);
525 let max_key = Key::from_slice(&[0xFF; 256]);
526 let all_rows = self.storage.range(&min_key, &max_key).await?;
527
528 let mut affected_rows: u64 = 0;
529
530 for (key, value_blob) in &all_rows {
531 let mut inputs = HashMap::new();
533 inputs.insert("value".to_string(), value_blob.clone());
534 inputs.insert("rhs".to_string(), rhs.clone());
535
536 let matches = match executor.execute(&circuit, &inputs) {
538 Ok(result_blob) => {
539 result_blob.as_bytes().iter().any(|&b| b != 0)
541 }
542 Err(e) => {
543 warn!("FHE predicate evaluation failed for key {:?}: {}", key, e);
544 continue;
545 }
546 };
547
548 if !matches {
549 continue;
550 }
551
552 let mut current_value = value_blob.clone();
554 for update_op in &updates {
555 current_value = apply_update_operation(¤t_value, update_op);
556 }
557
558 self.storage.put(key, ¤t_value).await?;
559 affected_rows += 1;
560 }
561
562 info!(
563 "UPDATE query completed: {} rows affected out of {} total",
564 affected_rows,
565 all_rows.len()
566 );
567
568 Ok(query::QueryResult {
569 result: Some(query::query_result::Result::Success(query::SuccessResult {
570 affected_rows,
571 })),
572 })
573 }
574
575 #[cfg(not(feature = "compute"))]
576 {
577 let _ = predicate;
581
582 let all_keys = self.storage.keys().await?;
583
584 if all_keys.is_empty() {
585 info!(
586 "UPDATE query on collection '{}': no keys found, 0 rows affected",
587 collection
588 );
589 return Ok(query::QueryResult {
590 result: Some(query::query_result::Result::Success(
591 query::SuccessResult { affected_rows: 0 },
592 )),
593 });
594 }
595
596 let mut affected_rows: u64 = 0;
597
598 for key in &all_keys {
599 let value_opt = self.storage.get(key).await?;
600 let current_value = match value_opt {
601 Some(v) => v,
602 None => continue,
603 };
604
605 let mut updated_value = current_value;
606 for update_op in &updates {
607 updated_value = apply_update_operation(&updated_value, update_op);
608 }
609
610 self.storage.put(key, &updated_value).await?;
611 affected_rows += 1;
612 }
613
614 info!(
615 "UPDATE query completed: {} rows affected in collection '{}'",
616 affected_rows, collection
617 );
618
619 Ok(query::QueryResult {
620 result: Some(query::query_result::Result::Success(query::SuccessResult {
621 affected_rows,
622 })),
623 })
624 }
625 }
626 }
627 }
628
629 #[tracing::instrument(skip(self, request), fields(trace_id = tracing::field::Empty, query_count = request.queries.len(), duration_us = tracing::field::Empty))]
636 pub async fn execute_batch(&self, request: aql::BatchRequest) -> aql::BatchResponse {
637 let start_time = Instant::now();
638
639 info!(
640 "ExecuteBatch request received: request_id={:?}, query_count={}",
641 request.request_id,
642 request.queries.len()
643 );
644
645 if request.queries.is_empty() {
647 let execution_time_ms = start_time.elapsed().as_millis() as u64;
648 return aql::BatchResponse {
649 response: Some(aql::batch_response::Response::Results(aql::BatchResult {
650 results: Vec::new(),
651 })),
652 request_id: request.request_id,
653 execution_time_ms,
654 };
655 }
656
657 let mut results = Vec::with_capacity(request.queries.len());
658 let mut rollback_ops: Vec<RollbackOp> = Vec::new();
659
660 for (idx, proto_query) in request.queries.into_iter().enumerate() {
661 let core_query = match query_from_proto(proto_query) {
663 Ok(q) => q,
664 Err(e) => {
665 error!("Failed to parse query {} in batch: {}", idx, e);
666 self.rollback_operations(&rollback_ops).await;
668 let execution_time_ms = start_time.elapsed().as_millis() as u64;
669 push_log_entry(
670 &self.recent_log,
671 format!(
672 "ExecuteBatch elapsed={}ms error=parse_query_{}: {}",
673 execution_time_ms, idx, e
674 ),
675 );
676 return aql::BatchResponse {
677 response: Some(aql::batch_response::Response::Error(
678 crate::proto::errors::ErrorResponse {
679 code: e.error_code() as i32,
680 message: format!("Query {} in batch failed to parse: {}", idx, e),
681 category: e.error_category() as i32,
682 details: None,
683 retry_after: None,
684 },
685 )),
686 request_id: request.request_id,
687 execution_time_ms,
688 };
689 }
690 };
691
692 let rollback_op = self.build_rollback_op(&core_query).await;
694
695 match self.execute_query_internal(core_query).await {
696 Ok(query_result) => {
697 if let Some(op) = rollback_op {
699 rollback_ops.push(op);
700 }
701 results.push(query_result);
702 }
703 Err(e) => {
704 error!("Query {} in batch failed: {}", idx, e);
705 self.rollback_operations(&rollback_ops).await;
707 let execution_time_ms = start_time.elapsed().as_millis() as u64;
708 push_log_entry(
709 &self.recent_log,
710 format!(
711 "ExecuteBatch elapsed={}ms error=query_{}: {}",
712 execution_time_ms, idx, e
713 ),
714 );
715 return aql::BatchResponse {
716 response: Some(aql::batch_response::Response::Error(
717 crate::proto::errors::ErrorResponse {
718 code: e.error_code() as i32,
719 message: format!("Query {} in batch failed: {}", idx, e),
720 category: e.error_category() as i32,
721 details: None,
722 retry_after: None,
723 },
724 )),
725 request_id: request.request_id,
726 execution_time_ms,
727 };
728 }
729 }
730 }
731
732 let execution_time_ms = start_time.elapsed().as_millis() as u64;
733 info!(
734 "ExecuteBatch completed successfully: {} queries in {}ms",
735 results.len(),
736 execution_time_ms
737 );
738 push_log_entry(
739 &self.recent_log,
740 format!(
741 "ExecuteBatch elapsed={}ms queries={} ok",
742 execution_time_ms,
743 results.len()
744 ),
745 );
746
747 aql::BatchResponse {
748 response: Some(aql::batch_response::Response::Results(aql::BatchResult {
749 results,
750 })),
751 request_id: request.request_id,
752 execution_time_ms,
753 }
754 }
755
756 async fn build_rollback_op(&self, query: &Query) -> Option<RollbackOp> {
763 match query {
764 Query::Set { key, .. } => {
765 let old_value = match self.storage.get(key).await {
767 Ok(v) => v,
768 Err(e) => {
769 warn!("Failed to read old value for rollback tracking: {}", e);
770 None
771 }
772 };
773 Some(RollbackOp::UndoSet {
774 key: key.clone(),
775 old_value,
776 })
777 }
778 Query::Delete { key, .. } => {
779 let old_value = match self.storage.get(key).await {
781 Ok(v) => v,
782 Err(e) => {
783 warn!("Failed to read value for rollback tracking: {}", e);
784 None
785 }
786 };
787 Some(RollbackOp::UndoDelete {
788 key: key.clone(),
789 old_value,
790 })
791 }
792 Query::Update { .. } => {
793 let keys = match self.storage.keys().await {
795 Ok(k) => k,
796 Err(e) => {
797 warn!("Failed to list keys for update rollback tracking: {}", e);
798 return Some(RollbackOp::UndoUpdate {
799 snapshots: Vec::new(),
800 });
801 }
802 };
803 let mut snapshots = Vec::with_capacity(keys.len());
804 for key in &keys {
805 let value = match self.storage.get(key).await {
806 Ok(v) => v,
807 Err(e) => {
808 warn!(
809 "Failed to read value for key {:?} during update rollback tracking: {}",
810 key, e
811 );
812 None
813 }
814 };
815 snapshots.push((key.clone(), value));
816 }
817 Some(RollbackOp::UndoUpdate { snapshots })
818 }
819 Query::Get { .. } | Query::Range { .. } | Query::Filter { .. } => None,
821 }
822 }
823
824 async fn rollback_operations(&self, ops: &[RollbackOp]) {
829 if ops.is_empty() {
830 return;
831 }
832
833 warn!("Rolling back {} operations due to batch failure", ops.len());
834
835 for (idx, op) in ops.iter().rev().enumerate() {
836 match op {
837 RollbackOp::UndoSet { key, old_value } => {
838 match old_value {
839 Some(value) => {
840 if let Err(e) = self.storage.put(key, value).await {
842 error!(
843 "Rollback failed for UndoSet (restore) at index {}: {}",
844 idx, e
845 );
846 } else {
847 debug!("Rolled back Set: restored old value for key {:?}", key);
848 }
849 }
850 None => {
851 if let Err(e) = self.storage.delete(key).await {
853 error!(
854 "Rollback failed for UndoSet (delete) at index {}: {}",
855 idx, e
856 );
857 } else {
858 debug!("Rolled back Set: deleted new key {:?}", key);
859 }
860 }
861 }
862 }
863 RollbackOp::UndoDelete { key, old_value } => {
864 if let Some(value) = old_value {
865 if let Err(e) = self.storage.put(key, value).await {
867 error!("Rollback failed for UndoDelete at index {}: {}", idx, e);
868 } else {
869 debug!("Rolled back Delete: restored value for key {:?}", key);
870 }
871 }
872 }
875 RollbackOp::UndoUpdate { snapshots } => {
876 let current_keys = match self.storage.keys().await {
878 Ok(k) => k,
879 Err(e) => {
880 error!(
881 "Rollback failed for UndoUpdate at index {}: cannot list keys: {}",
882 idx, e
883 );
884 continue;
885 }
886 };
887
888 let snapshot_keys: std::collections::HashSet<&Key> =
890 snapshots.iter().map(|(k, _)| k).collect();
891
892 for key in ¤t_keys {
894 if !snapshot_keys.contains(key) {
895 if let Err(e) = self.storage.delete(key).await {
896 error!(
897 "Rollback failed for UndoUpdate (remove new key) at index {}: {}",
898 idx, e
899 );
900 } else {
901 debug!("Rolled back Update: removed new key {:?}", key);
902 }
903 }
904 }
905
906 for (key, old_value) in snapshots {
908 match old_value {
909 Some(value) => {
910 if let Err(e) = self.storage.put(key, value).await {
911 error!(
912 "Rollback failed for UndoUpdate (restore) at index {}: {}",
913 idx, e
914 );
915 } else {
916 debug!("Rolled back Update: restored value for key {:?}", key);
917 }
918 }
919 None => {
920 if let Err(e) = self.storage.delete(key).await {
922 error!(
923 "Rollback failed for UndoUpdate (delete) at index {}: {}",
924 idx, e
925 );
926 }
927 }
928 }
929 }
930 debug!("Rolled back Update operation at index {}", idx);
931 }
932 }
933 }
934
935 info!("Rollback completed");
936 }
937
938 pub fn execute_stream(
951 &self,
952 request: aql::QueryRequest,
953 config: StreamConfig,
954 ) -> futures::stream::BoxStream<'static, Result<aql::StreamResponse, NetError>> {
955 use futures::StreamExt;
956
957 let storage = self.storage.clone();
958 let recent_log = self.recent_log.clone();
959 let request_id = request.request_id.clone();
960
961 let stream = async_stream::stream! {
962 let start_time = Instant::now();
963
964 info!(
965 "ExecuteStream request received: request_id={:?}, chunk_size={}",
966 request_id, config.chunk_size
967 );
968
969 let proto_query = match request.query {
971 Some(q) => q,
972 None => {
973 yield Err(NetError::MissingField("query".to_string()));
974 return;
975 }
976 };
977
978 let core_query = match query_from_proto(proto_query) {
979 Ok(q) => q,
980 Err(e) => {
981 error!("Failed to parse stream query: {}", e);
982 yield Err(e);
983 return;
984 }
985 };
986
987 let results = match core_query {
989 Query::Range { collection, start, end } => {
990 debug!(
991 "Executing streaming RANGE query: collection={}, start={:?}, end={:?}",
992 collection, start, end
993 );
994 match storage.range(&start, &end).await {
995 Ok(rows) => rows,
996 Err(e) => {
997 error!("Storage range query failed: {}", e);
998 yield Err(NetError::from(e));
999 return;
1000 }
1001 }
1002 }
1003 Query::Get { collection, key } => {
1004 debug!(
1005 "Executing streaming GET query: collection={}, key={:?}",
1006 collection, key
1007 );
1008 match storage.get(&key).await {
1009 Ok(Some(value)) => vec![(key, value)],
1010 Ok(None) => Vec::new(),
1011 Err(e) => {
1012 error!("Storage get query failed: {}", e);
1013 yield Err(NetError::from(e));
1014 return;
1015 }
1016 }
1017 }
1018 _ => {
1019 yield Err(NetError::InvalidRequest(
1020 "Only Range and Get queries are supported for streaming".to_string(),
1021 ));
1022 return;
1023 }
1024 };
1025
1026 let results = if let Some(max) = config.max_results {
1028 if results.len() > max {
1029 results.into_iter().take(max).collect::<Vec<_>>()
1030 } else {
1031 results
1032 }
1033 } else {
1034 results
1035 };
1036
1037 let total_count = results.len();
1038
1039 if start_time.elapsed() > config.timeout {
1041 yield Err(NetError::Timeout(
1042 "Query execution exceeded timeout before streaming began".to_string(),
1043 ));
1044 return;
1045 }
1046
1047 let mut sequence: u64 = 0;
1049 let chunks_iter: Vec<Vec<(Key, CipherBlob)>> = results
1050 .chunks(config.chunk_size)
1051 .map(|c| c.to_vec())
1052 .collect();
1053 let total_chunks = chunks_iter.len();
1054
1055 for (chunk_idx, chunk) in chunks_iter.into_iter().enumerate() {
1056 if start_time.elapsed() > config.timeout {
1058 yield Err(NetError::Timeout(
1059 format!("Streaming timed out at chunk {}/{}", chunk_idx + 1, total_chunks)
1060 ));
1061 return;
1062 }
1063
1064 let has_more = chunk_idx + 1 < total_chunks;
1065 let values: Vec<query::KeyValue> = chunk
1066 .into_iter()
1067 .map(|(k, v)| query::KeyValue {
1068 key: Some(key_to_proto(&k)),
1069 value: Some(cipher_blob_to_proto(&v)),
1070 encrypted_predicate_result: None,
1071 })
1072 .collect();
1073
1074 yield Ok(aql::StreamResponse {
1075 chunk: Some(aql::stream_response::Chunk::Batch(aql::StreamBatch {
1076 values,
1077 has_more,
1078 })),
1079 sequence,
1080 });
1081
1082 sequence += 1;
1083 }
1084
1085 yield Ok(aql::StreamResponse {
1087 chunk: Some(aql::stream_response::Chunk::End(aql::StreamEnd {
1088 total_count: total_count as u64,
1089 })),
1090 sequence,
1091 });
1092
1093 let elapsed_ms = start_time.elapsed().as_millis() as u64;
1094 info!(
1095 "ExecuteStream completed: {} items in {} chunks, {}ms",
1096 total_count,
1097 total_chunks,
1098 elapsed_ms
1099 );
1100 push_log_entry(
1101 &recent_log,
1102 format!(
1103 "ExecuteStream elapsed={}ms items={} chunks={} ok",
1104 elapsed_ms, total_count, total_chunks
1105 ),
1106 );
1107 };
1108
1109 stream.boxed()
1110 }
1111
1112 #[tracing::instrument(skip(self, _request))]
1114 pub async fn health_check(
1115 &self,
1116 _request: aql::HealthCheckRequest,
1117 ) -> aql::HealthCheckResponse {
1118 debug!("HealthCheck request received");
1119 push_log_entry(&self.recent_log, "HealthCheck ok".to_string());
1120
1121 aql::HealthCheckResponse {
1122 status: aql::HealthStatus::HealthServing as i32,
1123 message: Some("Service is healthy".to_string()),
1124 }
1125 }
1126
1127 #[tracing::instrument(skip(self, _request))]
1129 pub async fn get_server_info(
1130 &self,
1131 _request: aql::ServerInfoRequest,
1132 ) -> aql::ServerInfoResponse {
1133 debug!("GetServerInfo request received");
1134 push_log_entry(&self.recent_log, "GetServerInfo ok".to_string());
1135
1136 let mut capabilities = vec![
1137 "query.get".to_string(),
1138 "query.set".to_string(),
1139 "query.delete".to_string(),
1140 "query.range".to_string(),
1141 "query.update".to_string(),
1142 ];
1143
1144 #[cfg(feature = "compute")]
1145 capabilities.push("query.filter".to_string());
1146
1147 aql::ServerInfoResponse {
1148 version: Some(create_version()),
1149 supported_versions: vec![create_version()],
1150 capabilities,
1151 uptime_seconds: self.start_time.elapsed().as_secs(),
1152 }
1153 }
1154
1155 async fn handle_admin_command(&self, cmd: &str) -> Option<String> {
1161 crate::server_admin::handle_admin_command(
1162 cmd,
1163 self.start_time.elapsed().as_secs(),
1164 &self.recent_log,
1165 &self.storage,
1166 )
1167 .await
1168 }
1169}
1170
1171pub use crate::server_builder::AqlServerBuilder;
1174pub use crate::server_types::StreamConfig;
1175use crate::server_types::{RollbackOp, apply_update_operation};
1176
1177#[cfg(test)]
1178mod tests {
1179 use super::*;
1180 use amaters_core::storage::MemoryStorage;
1181 use amaters_core::types::{CipherBlob, Key};
1182
1183 #[tokio::test]
1184 async fn test_service_creation() {
1185 let storage = Arc::new(MemoryStorage::new());
1186 let service = AqlServiceImpl::new(storage);
1187 assert!(service.start_time.elapsed().as_secs() < 1);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_get_query_execution() {
1192 let storage = Arc::new(MemoryStorage::new());
1193 let key = Key::from_str("test_key");
1194 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1195
1196 storage.put(&key, &value).await.expect("Failed to put");
1197
1198 let service = AqlServiceImpl::new(storage);
1199
1200 let query = Query::Get {
1201 collection: "test".to_string(),
1202 key: key.clone(),
1203 };
1204
1205 let result = service.execute_query_internal(query).await;
1206 assert!(result.is_ok());
1207
1208 let query_result = result.expect("Query failed");
1209 match query_result.result {
1210 Some(query::query_result::Result::Single(single)) => {
1211 assert!(single.value.is_some());
1212 }
1213 _ => panic!("Expected single result"),
1214 }
1215 }
1216
1217 #[tokio::test]
1218 async fn test_set_query_execution() {
1219 let storage = Arc::new(MemoryStorage::new());
1220 let service = AqlServiceImpl::new(storage.clone());
1221
1222 let key = Key::from_str("test_key");
1223 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1224
1225 let query = Query::Set {
1226 collection: "test".to_string(),
1227 key: key.clone(),
1228 value: value.clone(),
1229 };
1230
1231 let result = service.execute_query_internal(query).await;
1232 assert!(result.is_ok());
1233
1234 let stored = storage.get(&key).await.expect("Failed to get");
1236 assert!(stored.is_some());
1237 assert_eq!(stored.expect("No value"), value);
1238 }
1239
1240 #[tokio::test]
1241 async fn test_delete_query_execution() {
1242 let storage = Arc::new(MemoryStorage::new());
1243 let key = Key::from_str("test_key");
1244 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1245
1246 storage.put(&key, &value).await.expect("Failed to put");
1247
1248 let service = AqlServiceImpl::new(storage.clone());
1249
1250 let query = Query::Delete {
1251 collection: "test".to_string(),
1252 key: key.clone(),
1253 };
1254
1255 let result = service.execute_query_internal(query).await;
1256 assert!(result.is_ok());
1257
1258 let stored = storage.get(&key).await.expect("Failed to get");
1260 assert!(stored.is_none());
1261 }
1262
1263 #[tokio::test]
1264 async fn test_range_query_execution() {
1265 let storage = Arc::new(MemoryStorage::new());
1266
1267 for i in 0..10 {
1269 let key = Key::from_str(&format!("key_{:02}", i));
1270 let value = CipherBlob::new(vec![i as u8]);
1271 storage.put(&key, &value).await.expect("Failed to put");
1272 }
1273
1274 let service = AqlServiceImpl::new(storage);
1275
1276 let query = Query::Range {
1277 collection: "test".to_string(),
1278 start: Key::from_str("key_03"),
1279 end: Key::from_str("key_07"),
1280 };
1281
1282 let result = service.execute_query_internal(query).await;
1283 assert!(result.is_ok());
1284
1285 let query_result = result.expect("Query failed");
1286 match query_result.result {
1287 Some(query::query_result::Result::Multi(multi)) => {
1288 assert!(!multi.values.is_empty());
1289 }
1290 _ => panic!("Expected multi result"),
1291 }
1292 }
1293
1294 #[tokio::test]
1295 async fn test_get_nonexistent_key() {
1296 let storage = Arc::new(MemoryStorage::new());
1297 let service = AqlServiceImpl::new(storage);
1298
1299 let query = Query::Get {
1300 collection: "test".to_string(),
1301 key: Key::from_str("nonexistent"),
1302 };
1303
1304 let result = service.execute_query_internal(query).await;
1305 assert!(result.is_ok());
1306
1307 let query_result = result.expect("Query failed");
1308 match query_result.result {
1309 Some(query::query_result::Result::Single(single)) => {
1310 assert!(single.value.is_none());
1311 }
1312 _ => panic!("Expected single result"),
1313 }
1314 }
1315
1316 #[tokio::test]
1317 async fn test_health_check() {
1318 let storage = Arc::new(MemoryStorage::new());
1319 let service = AqlServiceImpl::new(storage);
1320
1321 let request = aql::HealthCheckRequest { service: None };
1322 let response = service.health_check(request).await;
1323
1324 assert_eq!(response.status, aql::HealthStatus::HealthServing as i32);
1325 }
1326
1327 #[tokio::test]
1328 async fn test_server_info() {
1329 let storage = Arc::new(MemoryStorage::new());
1330 let service = AqlServiceImpl::new(storage);
1331
1332 let request = aql::ServerInfoRequest {};
1333 let response = service.get_server_info(request).await;
1334
1335 assert!(response.version.is_some());
1336 assert!(!response.capabilities.is_empty());
1337 assert!(response.capabilities.contains(&"query.get".to_string()));
1338 }
1339
/// With the `compute` feature enabled, the advertised capabilities are
/// expected to contain `query.filter`.
#[cfg(feature = "compute")]
#[tokio::test]
async fn test_server_info_advertises_filter() {
    let service = AqlServiceImpl::new(Arc::new(MemoryStorage::new()));
    let info = service.get_server_info(aql::ServerInfoRequest {}).await;

    let advertises_filter = info.capabilities.iter().any(|cap| cap == "query.filter");
    assert!(
        advertises_filter,
        "capabilities should advertise query.filter when compute feature is enabled"
    );
}
1354
/// Filter query with the `compute` feature: five one-byte rows (0..=4) are
/// stored, a `Gt` predicate with right-hand side `[2]` is applied, and the
/// test expects exactly two rows back, none carrying an
/// `encrypted_predicate_result`.
#[cfg(feature = "compute")]
#[tokio::test]
async fn test_filter_query_execution() {
    use amaters_core::{ColumnRef, Predicate};

    let storage = Arc::new(MemoryStorage::new());

    // Seed row_00..row_04, each holding its own index as a single byte.
    for idx in 0u8..5 {
        let row_key = Key::from_str(&format!("row_{:02}", idx));
        let row_value = CipherBlob::new(vec![idx]);
        storage
            .put(&row_key, &row_value)
            .await
            .expect("Failed to insert test data");
    }

    let service = AqlServiceImpl::new(storage);

    let threshold = CipherBlob::new(vec![2]);
    let filter_query = Query::Filter {
        collection: "test".to_string(),
        predicate: Predicate::Gt(ColumnRef::new("value".to_string()), threshold),
    };

    let outcome = service
        .execute_query_internal(filter_query)
        .await
        .expect("plaintext filter query should succeed");

    // Unwrap the Multi payload up front so the assertions read flat.
    let multi = match outcome.result {
        Some(query::query_result::Result::Multi(multi)) => multi,
        other => panic!("Expected Multi result from filter query, got {:?}", other),
    };

    assert_eq!(
        multi.values.len(),
        2,
        "expected 2 matching rows (values 3 and 4)"
    );
    assert!(
        multi
            .values
            .iter()
            .all(|kv| kv.encrypted_predicate_result.is_none()),
        "plaintext filter results should not carry encrypted_predicate_result"
    );
}
1408
1409 #[cfg(not(feature = "compute"))]
1410 #[tokio::test]
1411 async fn test_filter_query_requires_compute_feature() {
1412 use amaters_core::{ColumnRef, Predicate};
1413
1414 let storage = Arc::new(MemoryStorage::new());
1415 let service = AqlServiceImpl::new(storage);
1416
1417 let rhs_blob = CipherBlob::new(vec![1]);
1418 let predicate = Predicate::Gt(ColumnRef::new("value".to_string()), rhs_blob);
1419
1420 let filter_query = Query::Filter {
1421 collection: "test".to_string(),
1422 predicate,
1423 };
1424
1425 let result = service.execute_query_internal(filter_query).await;
1426 assert!(
1427 result.is_err(),
1428 "Filter should fail without compute feature"
1429 );
1430 let err_msg = result
1431 .as_ref()
1432 .err()
1433 .map(|e| e.to_string())
1434 .unwrap_or_default();
1435 assert!(
1436 err_msg.contains("compute feature"),
1437 "Error should mention compute feature: {}",
1438 err_msg
1439 );
1440 }
1441
1442 #[cfg(not(feature = "compute"))]
1450 fn dummy_predicate() -> amaters_core::Predicate {
1451 amaters_core::Predicate::Eq(
1452 amaters_core::ColumnRef::new("col"),
1453 CipherBlob::new(vec![0]),
1454 )
1455 }
1456
1457 #[cfg(not(feature = "compute"))]
1458 #[tokio::test]
1459 async fn test_update_set_single_key() {
1460 let storage = Arc::new(MemoryStorage::new());
1461 let key = Key::from_str("row_00");
1462 let original = CipherBlob::new(vec![10, 20, 30]);
1463 storage.put(&key, &original).await.expect("Failed to put");
1464
1465 let service = AqlServiceImpl::new(storage.clone());
1466
1467 let new_blob = CipherBlob::new(vec![99, 88, 77]);
1468 let query = Query::Update {
1469 collection: "test".to_string(),
1470 predicate: dummy_predicate(),
1471 updates: vec![amaters_core::Update::Set(
1472 amaters_core::ColumnRef::new("val"),
1473 new_blob.clone(),
1474 )],
1475 };
1476
1477 let result = service
1478 .execute_query_internal(query)
1479 .await
1480 .expect("Update failed");
1481 match result.result {
1482 Some(query::query_result::Result::Success(s)) => {
1483 assert_eq!(s.affected_rows, 1);
1484 }
1485 other => panic!("Expected Success, got {:?}", other),
1486 }
1487
1488 let stored = storage
1489 .get(&key)
1490 .await
1491 .expect("Failed to get")
1492 .expect("Key missing after update");
1493 assert_eq!(stored, new_blob);
1494 }
1495
1496 #[cfg(not(feature = "compute"))]
1497 #[tokio::test]
1498 async fn test_update_set_multiple_keys() {
1499 let storage = Arc::new(MemoryStorage::new());
1500
1501 for i in 0u8..5 {
1502 let key = Key::from_str(&format!("row_{:02}", i));
1503 let value = CipherBlob::new(vec![i]);
1504 storage.put(&key, &value).await.expect("Failed to put");
1505 }
1506
1507 let service = AqlServiceImpl::new(storage.clone());
1508
1509 let replacement = CipherBlob::new(vec![255]);
1510 let query = Query::Update {
1511 collection: "data".to_string(),
1512 predicate: dummy_predicate(),
1513 updates: vec![amaters_core::Update::Set(
1514 amaters_core::ColumnRef::new("v"),
1515 replacement.clone(),
1516 )],
1517 };
1518
1519 let result = service
1520 .execute_query_internal(query)
1521 .await
1522 .expect("Update failed");
1523 match result.result {
1524 Some(query::query_result::Result::Success(s)) => {
1525 assert_eq!(s.affected_rows, 5);
1526 }
1527 other => panic!("Expected Success, got {:?}", other),
1528 }
1529
1530 for i in 0u8..5 {
1532 let key = Key::from_str(&format!("row_{:02}", i));
1533 let stored = storage
1534 .get(&key)
1535 .await
1536 .expect("Failed to get")
1537 .expect("Key missing");
1538 assert_eq!(stored, replacement);
1539 }
1540 }
1541
1542 #[cfg(not(feature = "compute"))]
1543 #[tokio::test]
1544 async fn test_update_nonexistent_collection() {
1545 let storage = Arc::new(MemoryStorage::new());
1547 let service = AqlServiceImpl::new(storage);
1548
1549 let query = Query::Update {
1550 collection: "ghost".to_string(),
1551 predicate: dummy_predicate(),
1552 updates: vec![amaters_core::Update::Set(
1553 amaters_core::ColumnRef::new("x"),
1554 CipherBlob::new(vec![1]),
1555 )],
1556 };
1557
1558 let result = service
1559 .execute_query_internal(query)
1560 .await
1561 .expect("Update on empty storage should not error");
1562 match result.result {
1563 Some(query::query_result::Result::Success(s)) => {
1564 assert_eq!(s.affected_rows, 0);
1565 }
1566 other => panic!("Expected Success with 0 rows, got {:?}", other),
1567 }
1568 }
1569
1570 #[cfg(not(feature = "compute"))]
1571 #[tokio::test]
1572 async fn test_update_add_operation() {
1573 let storage = Arc::new(MemoryStorage::new());
1574 let key = Key::from_str("counter");
1575 let original = CipherBlob::new(vec![10, 20]);
1576 storage.put(&key, &original).await.expect("Failed to put");
1577
1578 let service = AqlServiceImpl::new(storage.clone());
1579
1580 let addend = CipherBlob::new(vec![5, 3]);
1581 let query = Query::Update {
1582 collection: "c".to_string(),
1583 predicate: dummy_predicate(),
1584 updates: vec![amaters_core::Update::Add(
1585 amaters_core::ColumnRef::new("v"),
1586 addend,
1587 )],
1588 };
1589
1590 service
1591 .execute_query_internal(query)
1592 .await
1593 .expect("Update failed");
1594
1595 let stored = storage
1596 .get(&key)
1597 .await
1598 .expect("Failed to get")
1599 .expect("Key missing");
1600 assert_eq!(stored.as_bytes(), &[15, 23]);
1601 }
1602
1603 #[cfg(not(feature = "compute"))]
1604 #[tokio::test]
1605 async fn test_update_mul_operation() {
1606 let storage = Arc::new(MemoryStorage::new());
1607 let key = Key::from_str("product");
1608 let original = CipherBlob::new(vec![3, 4]);
1609 storage.put(&key, &original).await.expect("Failed to put");
1610
1611 let service = AqlServiceImpl::new(storage.clone());
1612
1613 let factor = CipherBlob::new(vec![2, 5]);
1614 let query = Query::Update {
1615 collection: "c".to_string(),
1616 predicate: dummy_predicate(),
1617 updates: vec![amaters_core::Update::Mul(
1618 amaters_core::ColumnRef::new("v"),
1619 factor,
1620 )],
1621 };
1622
1623 service
1624 .execute_query_internal(query)
1625 .await
1626 .expect("Update failed");
1627
1628 let stored = storage
1629 .get(&key)
1630 .await
1631 .expect("Failed to get")
1632 .expect("Key missing");
1633 assert_eq!(stored.as_bytes(), &[6, 20]);
1634 }
1635
1636 #[cfg(not(feature = "compute"))]
1637 #[tokio::test]
1638 async fn test_update_multiple_operations_per_key() {
1639 let storage = Arc::new(MemoryStorage::new());
1640 let key = Key::from_str("multi_op");
1641 let original = CipherBlob::new(vec![2]);
1642 storage.put(&key, &original).await.expect("Failed to put");
1643
1644 let service = AqlServiceImpl::new(storage.clone());
1645
1646 let query = Query::Update {
1648 collection: "c".to_string(),
1649 predicate: dummy_predicate(),
1650 updates: vec![
1651 amaters_core::Update::Add(
1652 amaters_core::ColumnRef::new("v"),
1653 CipherBlob::new(vec![3]),
1654 ),
1655 amaters_core::Update::Mul(
1656 amaters_core::ColumnRef::new("v"),
1657 CipherBlob::new(vec![10]),
1658 ),
1659 ],
1660 };
1661
1662 service
1663 .execute_query_internal(query)
1664 .await
1665 .expect("Update failed");
1666
1667 let stored = storage
1668 .get(&key)
1669 .await
1670 .expect("Failed to get")
1671 .expect("Key missing");
1672 assert_eq!(stored.as_bytes(), &[50]);
1673 }
1674
1675 #[cfg(not(feature = "compute"))]
1676 #[tokio::test]
1677 async fn test_update_returns_affected_count() {
1678 let storage = Arc::new(MemoryStorage::new());
1679
1680 for i in 0u8..7 {
1682 let key = Key::from_str(&format!("k{}", i));
1683 storage
1684 .put(&key, &CipherBlob::new(vec![i]))
1685 .await
1686 .expect("Failed to put");
1687 }
1688
1689 let service = AqlServiceImpl::new(storage);
1690
1691 let query = Query::Update {
1692 collection: "c".to_string(),
1693 predicate: dummy_predicate(),
1694 updates: vec![amaters_core::Update::Set(
1695 amaters_core::ColumnRef::new("v"),
1696 CipherBlob::new(vec![0]),
1697 )],
1698 };
1699
1700 let result = service
1701 .execute_query_internal(query)
1702 .await
1703 .expect("Update failed");
1704 match result.result {
1705 Some(query::query_result::Result::Success(s)) => {
1706 assert_eq!(s.affected_rows, 7);
1707 }
1708 other => panic!("Expected Success with 7 rows, got {:?}", other),
1709 }
1710 }
1711
1712 #[cfg(not(feature = "compute"))]
1713 #[tokio::test]
1714 async fn test_update_preserves_other_collections() {
1715 let storage = Arc::new(MemoryStorage::new());
1718
1719 let key_a = Key::from_str("collA_row1");
1720 let key_b = Key::from_str("collB_row1");
1721 let val_a = CipherBlob::new(vec![1, 2, 3]);
1722 let val_b = CipherBlob::new(vec![4, 5, 6]);
1723
1724 storage.put(&key_a, &val_a).await.expect("Failed to put A");
1725 storage.put(&key_b, &val_b).await.expect("Failed to put B");
1726
1727 let service = AqlServiceImpl::new(storage.clone());
1728
1729 let query = Query::Update {
1731 collection: "collA".to_string(),
1732 predicate: dummy_predicate(),
1733 updates: vec![amaters_core::Update::Set(
1734 amaters_core::ColumnRef::new("v"),
1735 CipherBlob::new(vec![99]),
1736 )],
1737 };
1738
1739 service
1740 .execute_query_internal(query)
1741 .await
1742 .expect("Update failed");
1743
1744 let stored_a = storage.get(&key_a).await.expect("Failed to get A");
1746 assert!(stored_a.is_some(), "key_a should still exist");
1747
1748 let stored_b = storage.get(&key_b).await.expect("Failed to get B");
1749 assert!(stored_b.is_some(), "key_b should still exist");
1750 }
1751
1752 #[cfg(not(feature = "compute"))]
1753 #[tokio::test]
1754 async fn test_update_empty_updates_vec() {
1755 let storage = Arc::new(MemoryStorage::new());
1757 let key = Key::from_str("keep_me");
1758 let original = CipherBlob::new(vec![42]);
1759 storage.put(&key, &original).await.expect("Failed to put");
1760
1761 let service = AqlServiceImpl::new(storage.clone());
1762
1763 let query = Query::Update {
1764 collection: "c".to_string(),
1765 predicate: dummy_predicate(),
1766 updates: vec![], };
1768
1769 let result = service
1770 .execute_query_internal(query)
1771 .await
1772 .expect("Update with empty ops should succeed");
1773 match result.result {
1774 Some(query::query_result::Result::Success(s)) => {
1775 assert_eq!(s.affected_rows, 1);
1777 }
1778 other => panic!("Expected Success, got {:?}", other),
1779 }
1780
1781 let stored = storage
1783 .get(&key)
1784 .await
1785 .expect("Failed to get")
1786 .expect("Key missing");
1787 assert_eq!(stored, original);
1788 }
1789
1790 #[cfg(not(feature = "compute"))]
1791 #[tokio::test]
1792 async fn test_update_then_select_verifies_changes() {
1793 let storage = Arc::new(MemoryStorage::new());
1794
1795 for i in 0u8..3 {
1797 let key = Key::from_str(&format!("sel_{:02}", i));
1798 let value = CipherBlob::new(vec![i, i, i]);
1799 storage.put(&key, &value).await.expect("Failed to put");
1800 }
1801
1802 let service = AqlServiceImpl::new(storage.clone());
1803
1804 let update_query = Query::Update {
1806 collection: "c".to_string(),
1807 predicate: dummy_predicate(),
1808 updates: vec![amaters_core::Update::Add(
1809 amaters_core::ColumnRef::new("v"),
1810 CipherBlob::new(vec![1, 1, 1]),
1811 )],
1812 };
1813
1814 service
1815 .execute_query_internal(update_query)
1816 .await
1817 .expect("Update failed");
1818
1819 for i in 0u8..3 {
1821 let key = Key::from_str(&format!("sel_{:02}", i));
1822 let get_query = Query::Get {
1823 collection: "c".to_string(),
1824 key: key.clone(),
1825 };
1826
1827 let result = service
1828 .execute_query_internal(get_query)
1829 .await
1830 .expect("Get failed");
1831
1832 match result.result {
1833 Some(query::query_result::Result::Single(single)) => {
1834 let proto_val = single.value.expect("Expected value from get");
1835 let expected = vec![i + 1, i + 1, i + 1];
1837 assert_eq!(
1838 proto_val.data, expected,
1839 "Row sel_{:02} should have been updated",
1840 i
1841 );
1842 }
1843 other => panic!("Expected Single result, got {:?}", other),
1844 }
1845 }
1846 }
1847
/// Update with the `compute` feature enabled. The test tolerates two
/// outcomes: a `Success` reporting at most three affected rows (three rows
/// are stored), or an error whose text contains one of a fixed set of
/// FHE/compilation/execution-related markers.
#[cfg(feature = "compute")]
#[tokio::test]
async fn test_update_with_compute_feature() {
    use amaters_core::{ColumnRef, Predicate};

    let storage = Arc::new(MemoryStorage::new());

    for idx in 0u8..3 {
        let row_key = Key::from_str(&format!("row_{:02}", idx));
        let row_value = CipherBlob::new(vec![idx]);
        storage
            .put(&row_key, &row_value)
            .await
            .expect("Failed to insert test data");
    }

    let service = AqlServiceImpl::new(storage);

    let update_query = Query::Update {
        collection: "test".to_string(),
        predicate: Predicate::Eq(ColumnRef::new("value"), CipherBlob::new(vec![1])),
        updates: vec![amaters_core::Update::Set(
            ColumnRef::new("v"),
            CipherBlob::new(vec![99]),
        )],
    };

    match service.execute_query_internal(update_query).await {
        Ok(query_result) => match query_result.result {
            Some(query::query_result::Result::Success(s)) => {
                assert!(s.affected_rows <= 3);
            }
            other => panic!("Expected Success result from update, got {:?}", other),
        },
        Err(e) => {
            let msg = e.to_string();
            // Any one of these substrings marks the error as an accepted one.
            let accepted_markers = [
                "FHE",
                "fhe",
                "Predicate compilation",
                "compilation failed",
                "execution",
                "RHS",
            ];
            let recognised = accepted_markers
                .iter()
                .any(|marker| msg.contains(*marker));
            assert!(recognised, "Unexpected error from update query: {}", msg);
        }
    }
}
1910
1911 include!("server_rollback_tests.rs");
1913
1914 #[tokio::test]
1919 async fn test_compression_feature_gate_disabled() {
1920 let storage = Arc::new(MemoryStorage::new());
1921 let builder = AqlServerBuilder::new(storage);
1922 let _server = builder.build_grpc_service();
1924 }
1926}