Skip to main content

amaters_net/
server.rs

1//! gRPC server implementation for AmateRS AQL Service
2//!
3//! This module provides the server implementation that connects the network layer
4//! with the storage engine to handle client requests.
5
6use crate::convert::{cipher_blob_to_proto, create_version, key_to_proto, query_from_proto};
7use crate::error::{NetError, NetResult};
8use crate::proto::{aql, query};
9use crate::server_admin::{LogEntry, push_log_entry};
10use amaters_core::Query;
11use amaters_core::Update as UpdateOp;
12use amaters_core::traits::StorageEngine;
13use amaters_core::types::{CipherBlob, Key};
14use futures::StreamExt;
15use parking_lot::RwLock;
16use std::collections::VecDeque;
17use std::sync::Arc;
18use std::time::Instant;
19use tracing::{debug, error, info, warn};
20
21#[cfg(feature = "compute")]
22use amaters_core::compute::{FheExecutor, KeyManager, PredicateCompiler};
23#[cfg(feature = "compute")]
24use std::collections::HashMap;
25
26/// AQL service implementation
27///
28/// This service handles all AQL query requests and connects them to the underlying storage engine.
29pub struct AqlServiceImpl<S: StorageEngine> {
30    /// Storage engine for executing queries
31    storage: Arc<S>,
32    /// Server start time for uptime calculation
33    start_time: Instant,
34    /// Ring buffer for recent log entries (capacity: 256).
35    recent_log: Arc<RwLock<VecDeque<LogEntry>>>,
36    /// FHE key manager for encrypted operations
37    #[cfg(feature = "compute")]
38    key_manager: Arc<KeyManager>,
39}
40
41impl<S: StorageEngine> AqlServiceImpl<S> {
42    /// Create a new AQL service with the given storage engine
43    #[cfg(feature = "compute")]
44    pub fn new(storage: Arc<S>) -> Self {
45        Self {
46            storage,
47            start_time: Instant::now(),
48            recent_log: Arc::new(RwLock::new(VecDeque::new())),
49            key_manager: Arc::new(KeyManager::new()),
50        }
51    }
52
53    /// Create a new AQL service with the given storage engine (without compute)
54    #[cfg(not(feature = "compute"))]
55    pub fn new(storage: Arc<S>) -> Self {
56        Self {
57            storage,
58            start_time: Instant::now(),
59            recent_log: Arc::new(RwLock::new(VecDeque::new())),
60        }
61    }
62
63    /// Create a new AQL service with a custom key manager
64    #[cfg(feature = "compute")]
65    pub fn with_key_manager(storage: Arc<S>, key_manager: Arc<KeyManager>) -> Self {
66        Self {
67            storage,
68            start_time: Instant::now(),
69            recent_log: Arc::new(RwLock::new(VecDeque::new())),
70            key_manager,
71        }
72    }
73
74    /// Execute a query and return the result
75    pub async fn execute_query(&self, request: aql::QueryRequest) -> aql::QueryResponse {
76        let start_time = Instant::now();
77
78        info!(
79            "ExecuteQuery request received: request_id={:?}",
80            request.request_id
81        );
82
83        // Extract and validate the query
84        let proto_query = match request.query {
85            Some(q) => q,
86            None => {
87                let execution_time_ms = start_time.elapsed().as_millis() as u64;
88                return aql::QueryResponse {
89                    response: Some(aql::query_response::Response::Error(
90                        crate::proto::errors::ErrorResponse {
91                            code: crate::proto::errors::ErrorCode::ErrorProtocolMissingField as i32,
92                            message: "Missing query in request".to_string(),
93                            category: crate::proto::errors::ErrorCategory::CategoryClientError
94                                as i32,
95                            details: None,
96                            retry_after: None,
97                        },
98                    )),
99                    request_id: request.request_id,
100                    execution_time_ms,
101                };
102            }
103        };
104
105        let query = match query_from_proto(proto_query) {
106            Ok(q) => q,
107            Err(e) => {
108                error!("Failed to parse query: {}", e);
109                let execution_time_ms = start_time.elapsed().as_millis() as u64;
110                return aql::QueryResponse {
111                    response: Some(aql::query_response::Response::Error(
112                        crate::proto::errors::ErrorResponse {
113                            code: e.error_code() as i32,
114                            message: e.to_string(),
115                            category: e.error_category() as i32,
116                            details: None,
117                            retry_after: None,
118                        },
119                    )),
120                    request_id: request.request_id,
121                    execution_time_ms,
122                };
123            }
124        };
125
126        // Execute the query
127        let result = self.execute_query_internal(query).await;
128
129        let execution_time_ms = start_time.elapsed().as_millis() as u64;
130
131        // Build response
132        let response = match result {
133            Ok(query_result) => aql::QueryResponse {
134                response: Some(aql::query_response::Response::Result(query_result)),
135                request_id: request.request_id,
136                execution_time_ms,
137            },
138            Err(e) => {
139                error!("Query execution failed: {}", e);
140                push_log_entry(
141                    &self.recent_log,
142                    format!("ExecuteQuery elapsed={}ms error={}", execution_time_ms, e),
143                );
144                return aql::QueryResponse {
145                    response: Some(aql::query_response::Response::Error(
146                        crate::proto::errors::ErrorResponse {
147                            code: e.error_code() as i32,
148                            message: e.to_string(),
149                            category: e.error_category() as i32,
150                            details: None,
151                            retry_after: None,
152                        },
153                    )),
154                    request_id: request.request_id,
155                    execution_time_ms,
156                };
157            }
158        };
159        push_log_entry(
160            &self.recent_log,
161            format!("ExecuteQuery elapsed={}ms ok", execution_time_ms),
162        );
163        response
164    }
165
166    /// Execute a query and return the result
167    ///
168    /// This is an internal method used for testing and direct query execution.
169    /// For production use, prefer `execute_query` which handles protocol details.
170    #[doc(hidden)]
171    #[tracing::instrument(skip(self), fields(trace_id = tracing::field::Empty, duration_us = tracing::field::Empty))]
172    pub async fn execute_query_internal(&self, query: Query) -> NetResult<query::QueryResult> {
173        match query {
174            Query::Get { collection, key } => {
175                debug!(
176                    "Executing GET query: collection={}, key={:?}",
177                    collection, key
178                );
179
180                // Intercept __admin__:<command> keys and dispatch to built-in handlers.
181                // The CLI encodes admin commands as Get queries with a special key prefix so
182                // that the admin wire protocol works over the existing gRPC path without a
183                // dedicated RPC.  Keys that are not admin commands fall through to storage as
184                // normal.
185                let key_str = key.to_string_lossy();
186                if let Some(admin_cmd) = key_str.strip_prefix("__admin__:") {
187                    if let Some(json) = self.handle_admin_command(admin_cmd).await {
188                        let blob = CipherBlob::new(json.into_bytes());
189                        return Ok(query::QueryResult {
190                            result: Some(query::query_result::Result::Single(
191                                query::SingleResult {
192                                    value: Some(cipher_blob_to_proto(&blob)),
193                                },
194                            )),
195                        });
196                    }
197                    // Unrecognised admin command — return None so CLI falls back to mock data.
198                    return Ok(query::QueryResult {
199                        result: Some(query::query_result::Result::Single(query::SingleResult {
200                            value: None,
201                        })),
202                    });
203                }
204
205                let result = self.storage.get(&key).await?;
206
207                let result = match result {
208                    Some(value) => query::QueryResult {
209                        result: Some(query::query_result::Result::Single(query::SingleResult {
210                            value: Some(cipher_blob_to_proto(&value)),
211                        })),
212                    },
213                    None => query::QueryResult {
214                        result: Some(query::query_result::Result::Single(query::SingleResult {
215                            value: None,
216                        })),
217                    },
218                };
219
220                Ok(result)
221            }
222            Query::Set {
223                collection,
224                key,
225                value,
226            } => {
227                debug!(
228                    "Executing SET query: collection={}, key={:?}",
229                    collection, key
230                );
231
232                self.storage.put(&key, &value).await?;
233
234                Ok(query::QueryResult {
235                    result: Some(query::query_result::Result::Success(query::SuccessResult {
236                        affected_rows: 1,
237                    })),
238                })
239            }
240            Query::Delete { collection, key } => {
241                debug!(
242                    "Executing DELETE query: collection={}, key={:?}",
243                    collection, key
244                );
245
246                self.storage.delete(&key).await?;
247
248                Ok(query::QueryResult {
249                    result: Some(query::query_result::Result::Success(query::SuccessResult {
250                        affected_rows: 1,
251                    })),
252                })
253            }
254            Query::Range {
255                collection,
256                start,
257                end,
258            } => {
259                debug!(
260                    "Executing RANGE query: collection={}, start={:?}, end={:?}",
261                    collection, start, end
262                );
263
264                let results = self.storage.range(&start, &end).await?;
265
266                let values: Vec<query::KeyValue> = results
267                    .into_iter()
268                    .map(|(k, v)| query::KeyValue {
269                        key: Some(key_to_proto(&k)),
270                        value: Some(cipher_blob_to_proto(&v)),
271                        encrypted_predicate_result: None,
272                    })
273                    .collect();
274
275                Ok(query::QueryResult {
276                    result: Some(query::query_result::Result::Multi(query::MultiResult {
277                        values,
278                    })),
279                })
280            }
281            Query::Filter {
282                collection,
283                predicate,
284            } => {
285                // Retrieve all candidate rows for the collection via full range scan.
286                let min_key = Key::from_slice(&[]);
287                let max_key = Key::from_slice(&[0xFF; 256]);
288
289                let all_rows = match self.storage.range(&min_key, &max_key).await {
290                    Ok(rows) => rows,
291                    Err(e) => {
292                        error!("Failed to retrieve rows for filter: {}", e);
293                        return Err(NetError::from(e));
294                    }
295                };
296
297                debug!("Filter: retrieved {} candidate rows", all_rows.len());
298
299                if all_rows.len() > 1000 {
300                    warn!(
301                        "Filter query retrieved {} rows, which may cause performance issues",
302                        all_rows.len()
303                    );
304                }
305
306                // Probe the first row to decide between plaintext and FHE mode.
307                // If evaluate_plaintext returns Some(_) for the first value, all
308                // values are assumed to be plaintext; the server filters in-place.
309                // If it returns None (FHE ciphertext detected), fall through to FHE.
310                let first_is_plaintext = all_rows
311                    .first()
312                    .map(|(_, v)| predicate.evaluate_plaintext(v).is_some())
313                    .unwrap_or(true); // empty collection → treat as plaintext (return empty)
314
315                if first_is_plaintext {
316                    info!("Executing FILTER query with server-side plaintext predicate evaluation");
317
318                    let mut results = Vec::new();
319                    let mut excluded: usize = 0;
320
321                    for (key, value_blob) in all_rows {
322                        match predicate.evaluate_plaintext(&value_blob) {
323                            Some(true) => {
324                                results.push(query::KeyValue {
325                                    key: Some(key_to_proto(&key)),
326                                    value: Some(cipher_blob_to_proto(&value_blob)),
327                                    encrypted_predicate_result: None,
328                                });
329                            }
330                            Some(false) => {
331                                // Row does not match predicate; skip it.
332                                excluded += 1;
333                            }
334                            None => {
335                                // Mid-collection the encoding switched away from plaintext.
336                                // Include the row conservatively (unknown state).
337                                warn!(
338                                    "Plaintext evaluation returned None for key {:?} mid-scan; \
339                                     including row conservatively",
340                                    key
341                                );
342                                results.push(query::KeyValue {
343                                    key: Some(key_to_proto(&key)),
344                                    value: Some(cipher_blob_to_proto(&value_blob)),
345                                    encrypted_predicate_result: None,
346                                });
347                            }
348                        }
349                    }
350
351                    info!(
352                        "FILTER query completed: {} rows matched, {} rows excluded by plaintext predicate",
353                        results.len(),
354                        excluded
355                    );
356
357                    return Ok(query::QueryResult {
358                        result: Some(query::query_result::Result::Multi(query::MultiResult {
359                            values: results,
360                        })),
361                    });
362                }
363
364                // FHE path — values are ciphertexts, use homomorphic evaluation.
365                #[cfg(feature = "compute")]
366                {
367                    info!("Executing FILTER query with FHE predicate evaluation");
368
369                    // Key isolation: Both `PredicateCompiler` and `FheExecutor` are
370                    // created as stack-local values for each filter call. This means
371                    // concurrent filter requests do not share mutable compiler or
372                    // executor state, providing per-request isolation without
373                    // additional synchronisation overhead.
374
375                    // 1. Compile predicate to FHE circuit
376                    let mut compiler = PredicateCompiler::new();
377
378                    // For now, assume U8 type - in production, this should be inferred
379                    // from the data or provided by the client
380                    let circuit = match compiler
381                        .compile(&predicate, amaters_core::compute::EncryptedType::U8)
382                    {
383                        Ok(c) => c,
384                        Err(e) => {
385                            error!("Failed to compile predicate: {}", e);
386                            return Err(NetError::ServerInternal(format!(
387                                "Predicate compilation failed: {}",
388                                e
389                            )));
390                        }
391                    };
392
393                    debug!(
394                        "Compiled predicate circuit: depth={}, gates={}",
395                        circuit.depth, circuit.gate_count
396                    );
397
398                    // 2. Extract RHS value from predicate
399                    let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
400                        Ok(r) => r,
401                        Err(e) => {
402                            error!("Failed to extract RHS value: {}", e);
403                            return Err(NetError::ServerInternal(format!(
404                                "RHS extraction failed: {}",
405                                e
406                            )));
407                        }
408                    };
409
410                    // 3. Set up FHE executor (per-request instance for isolation)
411                    let executor = FheExecutor::new();
412
413                    // 4. Execute circuit on each row and populate encrypted_predicate_result.
414                    // The client decrypts the encrypted boolean to learn which rows matched.
415                    let mut results = Vec::new();
416                    let mut execution_errors = 0;
417
418                    for (key, value_blob) in all_rows {
419                        // Build inputs: value from storage + RHS from predicate
420                        let mut inputs = HashMap::new();
421                        inputs.insert("value".to_string(), value_blob.clone());
422                        inputs.insert("rhs".to_string(), rhs.clone());
423
424                        // Execute FHE circuit - result is encrypted boolean
425                        // Catch execution errors and continue processing other rows
426                        match executor.execute(&circuit, &inputs) {
427                            Ok(result_blob) => {
428                                let result_bytes = result_blob.as_bytes().to_vec();
429
430                                debug!(
431                                    "Executed predicate on key {:?}, result blob size: {}",
432                                    key,
433                                    result_bytes.len()
434                                );
435
436                                results.push(query::KeyValue {
437                                    key: Some(key_to_proto(&key)),
438                                    value: Some(cipher_blob_to_proto(&value_blob)),
439                                    encrypted_predicate_result: Some(result_bytes),
440                                });
441                            }
442                            Err(e) => {
443                                execution_errors += 1;
444                                warn!("FHE execution failed for key {:?}: {}", key, e);
445                                // Continue processing other rows instead of failing the entire query
446                            }
447                        }
448                    }
449
450                    if execution_errors > 0 {
451                        warn!(
452                            "Filter query had {} FHE execution errors out of {} total rows",
453                            execution_errors,
454                            execution_errors + results.len()
455                        );
456                    }
457
458                    info!(
459                        "FILTER query completed, processed {} rows successfully",
460                        results.len()
461                    );
462
463                    Ok(query::QueryResult {
464                        result: Some(query::query_result::Result::Multi(query::MultiResult {
465                            values: results,
466                        })),
467                    })
468                }
469
470                #[cfg(not(feature = "compute"))]
471                {
472                    let _ = (collection, predicate);
473                    warn!("FILTER query reached FHE path but compute feature is disabled");
474                    Err(NetError::ServerInternal(
475                        "FILTER queries on encrypted values require the compute feature"
476                            .to_string(),
477                    ))
478                }
479            }
480            Query::Update {
481                collection,
482                predicate,
483                updates,
484            } => {
485                debug!(
486                    "Executing UPDATE query: collection={}, updates_count={}",
487                    collection,
488                    updates.len()
489                );
490
491                #[cfg(feature = "compute")]
492                {
493                    // With compute feature: compile predicate and evaluate against each row
494                    // to determine which rows should be updated.
495
496                    let mut compiler = PredicateCompiler::new();
497                    let circuit = match compiler
498                        .compile(&predicate, amaters_core::compute::EncryptedType::U8)
499                    {
500                        Ok(c) => c,
501                        Err(e) => {
502                            error!("Failed to compile update predicate: {}", e);
503                            return Err(NetError::ServerInternal(format!(
504                                "Update predicate compilation failed: {}",
505                                e
506                            )));
507                        }
508                    };
509
510                    let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
511                        Ok(r) => r,
512                        Err(e) => {
513                            error!("Failed to extract RHS value for update predicate: {}", e);
514                            return Err(NetError::ServerInternal(format!(
515                                "Update RHS extraction failed: {}",
516                                e
517                            )));
518                        }
519                    };
520
521                    let executor = FheExecutor::new();
522
523                    // Get all candidate rows
524                    let min_key = Key::from_slice(&[]);
525                    let max_key = Key::from_slice(&[0xFF; 256]);
526                    let all_rows = self.storage.range(&min_key, &max_key).await?;
527
528                    let mut affected_rows: u64 = 0;
529
530                    for (key, value_blob) in &all_rows {
531                        // Build inputs for predicate evaluation
532                        let mut inputs = HashMap::new();
533                        inputs.insert("value".to_string(), value_blob.clone());
534                        inputs.insert("rhs".to_string(), rhs.clone());
535
536                        // Evaluate predicate; on error skip this row
537                        let matches = match executor.execute(&circuit, &inputs) {
538                            Ok(result_blob) => {
539                                // Check if result is truthy (any non-zero byte)
540                                result_blob.as_bytes().iter().any(|&b| b != 0)
541                            }
542                            Err(e) => {
543                                warn!("FHE predicate evaluation failed for key {:?}: {}", key, e);
544                                continue;
545                            }
546                        };
547
548                        if !matches {
549                            continue;
550                        }
551
552                        // Apply updates to matching row
553                        let mut current_value = value_blob.clone();
554                        for update_op in &updates {
555                            current_value = apply_update_operation(&current_value, update_op);
556                        }
557
558                        self.storage.put(key, &current_value).await?;
559                        affected_rows += 1;
560                    }
561
562                    info!(
563                        "UPDATE query completed: {} rows affected out of {} total",
564                        affected_rows,
565                        all_rows.len()
566                    );
567
568                    Ok(query::QueryResult {
569                        result: Some(query::query_result::Result::Success(query::SuccessResult {
570                            affected_rows,
571                        })),
572                    })
573                }
574
575                #[cfg(not(feature = "compute"))]
576                {
577                    // Without compute feature: apply updates to ALL rows in the collection.
578                    // We cannot evaluate predicates without FHE support, so we treat
579                    // the update as unconditional.
580                    let _ = predicate;
581
582                    let all_keys = self.storage.keys().await?;
583
584                    if all_keys.is_empty() {
585                        info!(
586                            "UPDATE query on collection '{}': no keys found, 0 rows affected",
587                            collection
588                        );
589                        return Ok(query::QueryResult {
590                            result: Some(query::query_result::Result::Success(
591                                query::SuccessResult { affected_rows: 0 },
592                            )),
593                        });
594                    }
595
596                    let mut affected_rows: u64 = 0;
597
598                    for key in &all_keys {
599                        let value_opt = self.storage.get(key).await?;
600                        let current_value = match value_opt {
601                            Some(v) => v,
602                            None => continue,
603                        };
604
605                        let mut updated_value = current_value;
606                        for update_op in &updates {
607                            updated_value = apply_update_operation(&updated_value, update_op);
608                        }
609
610                        self.storage.put(key, &updated_value).await?;
611                        affected_rows += 1;
612                    }
613
614                    info!(
615                        "UPDATE query completed: {} rows affected in collection '{}'",
616                        affected_rows, collection
617                    );
618
619                    Ok(query::QueryResult {
620                        result: Some(query::query_result::Result::Success(query::SuccessResult {
621                            affected_rows,
622                        })),
623                    })
624                }
625            }
626        }
627    }
628
629    /// Execute a batch of queries as a transaction
630    ///
631    /// All queries are executed sequentially. If any query fails, all previously
632    /// completed write operations (Set/Delete) are rolled back, and an error
633    /// response is returned. Read-only operations (Get/Range) are not tracked
634    /// for rollback since they don't mutate state.
635    #[tracing::instrument(skip(self, request), fields(trace_id = tracing::field::Empty, query_count = request.queries.len(), duration_us = tracing::field::Empty))]
636    pub async fn execute_batch(&self, request: aql::BatchRequest) -> aql::BatchResponse {
637        let start_time = Instant::now();
638
639        info!(
640            "ExecuteBatch request received: request_id={:?}, query_count={}",
641            request.request_id,
642            request.queries.len()
643        );
644
645        // Handle empty batch
646        if request.queries.is_empty() {
647            let execution_time_ms = start_time.elapsed().as_millis() as u64;
648            return aql::BatchResponse {
649                response: Some(aql::batch_response::Response::Results(aql::BatchResult {
650                    results: Vec::new(),
651                })),
652                request_id: request.request_id,
653                execution_time_ms,
654            };
655        }
656
657        let mut results = Vec::with_capacity(request.queries.len());
658        let mut rollback_ops: Vec<RollbackOp> = Vec::new();
659
660        for (idx, proto_query) in request.queries.into_iter().enumerate() {
661            // Convert proto query to core query
662            let core_query = match query_from_proto(proto_query) {
663                Ok(q) => q,
664                Err(e) => {
665                    error!("Failed to parse query {} in batch: {}", idx, e);
666                    // Rollback all completed write operations
667                    self.rollback_operations(&rollback_ops).await;
668                    let execution_time_ms = start_time.elapsed().as_millis() as u64;
669                    push_log_entry(
670                        &self.recent_log,
671                        format!(
672                            "ExecuteBatch elapsed={}ms error=parse_query_{}: {}",
673                            execution_time_ms, idx, e
674                        ),
675                    );
676                    return aql::BatchResponse {
677                        response: Some(aql::batch_response::Response::Error(
678                            crate::proto::errors::ErrorResponse {
679                                code: e.error_code() as i32,
680                                message: format!("Query {} in batch failed to parse: {}", idx, e),
681                                category: e.error_category() as i32,
682                                details: None,
683                                retry_after: None,
684                            },
685                        )),
686                        request_id: request.request_id,
687                        execution_time_ms,
688                    };
689                }
690            };
691
692            // Track rollback info before executing write operations
693            let rollback_op = self.build_rollback_op(&core_query).await;
694
695            match self.execute_query_internal(core_query).await {
696                Ok(query_result) => {
697                    // Record the rollback operation only after successful execution
698                    if let Some(op) = rollback_op {
699                        rollback_ops.push(op);
700                    }
701                    results.push(query_result);
702                }
703                Err(e) => {
704                    error!("Query {} in batch failed: {}", idx, e);
705                    // Rollback all completed write operations
706                    self.rollback_operations(&rollback_ops).await;
707                    let execution_time_ms = start_time.elapsed().as_millis() as u64;
708                    push_log_entry(
709                        &self.recent_log,
710                        format!(
711                            "ExecuteBatch elapsed={}ms error=query_{}: {}",
712                            execution_time_ms, idx, e
713                        ),
714                    );
715                    return aql::BatchResponse {
716                        response: Some(aql::batch_response::Response::Error(
717                            crate::proto::errors::ErrorResponse {
718                                code: e.error_code() as i32,
719                                message: format!("Query {} in batch failed: {}", idx, e),
720                                category: e.error_category() as i32,
721                                details: None,
722                                retry_after: None,
723                            },
724                        )),
725                        request_id: request.request_id,
726                        execution_time_ms,
727                    };
728                }
729            }
730        }
731
732        let execution_time_ms = start_time.elapsed().as_millis() as u64;
733        info!(
734            "ExecuteBatch completed successfully: {} queries in {}ms",
735            results.len(),
736            execution_time_ms
737        );
738        push_log_entry(
739            &self.recent_log,
740            format!(
741                "ExecuteBatch elapsed={}ms queries={} ok",
742                execution_time_ms,
743                results.len()
744            ),
745        );
746
747        aql::BatchResponse {
748            response: Some(aql::batch_response::Response::Results(aql::BatchResult {
749                results,
750            })),
751            request_id: request.request_id,
752            execution_time_ms,
753        }
754    }
755
756    /// Build a rollback operation for a query (before executing it)
757    ///
758    /// For Set operations: save the old value (if any) so we can restore it
759    /// For Delete operations: save the current value so we can re-insert it
760    /// For Update operations: snapshot all current key-value pairs so we can restore them
761    /// For Get/Range/Filter: no rollback needed (read-only)
762    async fn build_rollback_op(&self, query: &Query) -> Option<RollbackOp> {
763        match query {
764            Query::Set { key, .. } => {
765                // Capture the old value before overwriting
766                let old_value = match self.storage.get(key).await {
767                    Ok(v) => v,
768                    Err(e) => {
769                        warn!("Failed to read old value for rollback tracking: {}", e);
770                        None
771                    }
772                };
773                Some(RollbackOp::UndoSet {
774                    key: key.clone(),
775                    old_value,
776                })
777            }
778            Query::Delete { key, .. } => {
779                // Capture the current value before deleting
780                let old_value = match self.storage.get(key).await {
781                    Ok(v) => v,
782                    Err(e) => {
783                        warn!("Failed to read value for rollback tracking: {}", e);
784                        None
785                    }
786                };
787                Some(RollbackOp::UndoDelete {
788                    key: key.clone(),
789                    old_value,
790                })
791            }
792            Query::Update { .. } => {
793                // Capture all current key-value pairs before the update modifies them
794                let keys = match self.storage.keys().await {
795                    Ok(k) => k,
796                    Err(e) => {
797                        warn!("Failed to list keys for update rollback tracking: {}", e);
798                        return Some(RollbackOp::UndoUpdate {
799                            snapshots: Vec::new(),
800                        });
801                    }
802                };
803                let mut snapshots = Vec::with_capacity(keys.len());
804                for key in &keys {
805                    let value = match self.storage.get(key).await {
806                        Ok(v) => v,
807                        Err(e) => {
808                            warn!(
809                                "Failed to read value for key {:?} during update rollback tracking: {}",
810                                key, e
811                            );
812                            None
813                        }
814                    };
815                    snapshots.push((key.clone(), value));
816                }
817                Some(RollbackOp::UndoUpdate { snapshots })
818            }
819            // Read-only operations don't need rollback
820            Query::Get { .. } | Query::Range { .. } | Query::Filter { .. } => None,
821        }
822    }
823
824    /// Rollback completed write operations in reverse order
825    ///
826    /// Best-effort rollback: if a rollback operation itself fails, we log
827    /// a warning and continue rolling back remaining operations.
828    async fn rollback_operations(&self, ops: &[RollbackOp]) {
829        if ops.is_empty() {
830            return;
831        }
832
833        warn!("Rolling back {} operations due to batch failure", ops.len());
834
835        for (idx, op) in ops.iter().rev().enumerate() {
836            match op {
837                RollbackOp::UndoSet { key, old_value } => {
838                    match old_value {
839                        Some(value) => {
840                            // Restore the old value
841                            if let Err(e) = self.storage.put(key, value).await {
842                                error!(
843                                    "Rollback failed for UndoSet (restore) at index {}: {}",
844                                    idx, e
845                                );
846                            } else {
847                                debug!("Rolled back Set: restored old value for key {:?}", key);
848                            }
849                        }
850                        None => {
851                            // Key didn't exist before, so delete it
852                            if let Err(e) = self.storage.delete(key).await {
853                                error!(
854                                    "Rollback failed for UndoSet (delete) at index {}: {}",
855                                    idx, e
856                                );
857                            } else {
858                                debug!("Rolled back Set: deleted new key {:?}", key);
859                            }
860                        }
861                    }
862                }
863                RollbackOp::UndoDelete { key, old_value } => {
864                    if let Some(value) = old_value {
865                        // Re-insert the deleted value
866                        if let Err(e) = self.storage.put(key, value).await {
867                            error!("Rollback failed for UndoDelete at index {}: {}", idx, e);
868                        } else {
869                            debug!("Rolled back Delete: restored value for key {:?}", key);
870                        }
871                    }
872                    // If old_value was None, the key didn't exist before delete,
873                    // so nothing to restore
874                }
875                RollbackOp::UndoUpdate { snapshots } => {
876                    // First, collect all current keys so we can detect keys added by the update
877                    let current_keys = match self.storage.keys().await {
878                        Ok(k) => k,
879                        Err(e) => {
880                            error!(
881                                "Rollback failed for UndoUpdate at index {}: cannot list keys: {}",
882                                idx, e
883                            );
884                            continue;
885                        }
886                    };
887
888                    // Build a set of keys that existed before the update
889                    let snapshot_keys: std::collections::HashSet<&Key> =
890                        snapshots.iter().map(|(k, _)| k).collect();
891
892                    // Remove any keys that were created by the update (not in snapshot)
893                    for key in &current_keys {
894                        if !snapshot_keys.contains(key) {
895                            if let Err(e) = self.storage.delete(key).await {
896                                error!(
897                                    "Rollback failed for UndoUpdate (remove new key) at index {}: {}",
898                                    idx, e
899                                );
900                            } else {
901                                debug!("Rolled back Update: removed new key {:?}", key);
902                            }
903                        }
904                    }
905
906                    // Restore all snapshotted values
907                    for (key, old_value) in snapshots {
908                        match old_value {
909                            Some(value) => {
910                                if let Err(e) = self.storage.put(key, value).await {
911                                    error!(
912                                        "Rollback failed for UndoUpdate (restore) at index {}: {}",
913                                        idx, e
914                                    );
915                                } else {
916                                    debug!("Rolled back Update: restored value for key {:?}", key);
917                                }
918                            }
919                            None => {
920                                // Key existed in snapshot as None — delete it if it was created
921                                if let Err(e) = self.storage.delete(key).await {
922                                    error!(
923                                        "Rollback failed for UndoUpdate (delete) at index {}: {}",
924                                        idx, e
925                                    );
926                                }
927                            }
928                        }
929                    }
930                    debug!("Rolled back Update operation at index {}", idx);
931                }
932            }
933        }
934
935        info!("Rollback completed");
936    }
937
938    /// Execute a streaming query that returns results in chunks
939    ///
940    /// This method executes a range or filter query and returns results as a stream
941    /// of `StreamResponse` messages, each containing a batch of key-value pairs.
942    /// The chunk size controls how many items are included per message.
943    ///
944    /// # Arguments
945    /// * `request` - The query request to execute
946    /// * `config` - Streaming configuration (chunk size, max results, timeout)
947    ///
948    /// # Returns
949    /// A boxed stream of `Result<aql::StreamResponse, NetError>` messages
950    pub fn execute_stream(
951        &self,
952        request: aql::QueryRequest,
953        config: StreamConfig,
954    ) -> futures::stream::BoxStream<'static, Result<aql::StreamResponse, NetError>> {
955        use futures::StreamExt;
956
957        let storage = self.storage.clone();
958        let recent_log = self.recent_log.clone();
959        let request_id = request.request_id.clone();
960
961        let stream = async_stream::stream! {
962            let start_time = Instant::now();
963
964            info!(
965                "ExecuteStream request received: request_id={:?}, chunk_size={}",
966                request_id, config.chunk_size
967            );
968
969            // Extract and validate the query
970            let proto_query = match request.query {
971                Some(q) => q,
972                None => {
973                    yield Err(NetError::MissingField("query".to_string()));
974                    return;
975                }
976            };
977
978            let core_query = match query_from_proto(proto_query) {
979                Ok(q) => q,
980                Err(e) => {
981                    error!("Failed to parse stream query: {}", e);
982                    yield Err(e);
983                    return;
984                }
985            };
986
987            // Only Range queries are supported for streaming (they return multiple results)
988            let results = match core_query {
989                Query::Range { collection, start, end } => {
990                    debug!(
991                        "Executing streaming RANGE query: collection={}, start={:?}, end={:?}",
992                        collection, start, end
993                    );
994                    match storage.range(&start, &end).await {
995                        Ok(rows) => rows,
996                        Err(e) => {
997                            error!("Storage range query failed: {}", e);
998                            yield Err(NetError::from(e));
999                            return;
1000                        }
1001                    }
1002                }
1003                Query::Get { collection, key } => {
1004                    debug!(
1005                        "Executing streaming GET query: collection={}, key={:?}",
1006                        collection, key
1007                    );
1008                    match storage.get(&key).await {
1009                        Ok(Some(value)) => vec![(key, value)],
1010                        Ok(None) => Vec::new(),
1011                        Err(e) => {
1012                            error!("Storage get query failed: {}", e);
1013                            yield Err(NetError::from(e));
1014                            return;
1015                        }
1016                    }
1017                }
1018                _ => {
1019                    yield Err(NetError::InvalidRequest(
1020                        "Only Range and Get queries are supported for streaming".to_string(),
1021                    ));
1022                    return;
1023                }
1024            };
1025
1026            // Apply max_results limit if configured
1027            let results = if let Some(max) = config.max_results {
1028                if results.len() > max {
1029                    results.into_iter().take(max).collect::<Vec<_>>()
1030                } else {
1031                    results
1032                }
1033            } else {
1034                results
1035            };
1036
1037            let total_count = results.len();
1038
1039            // Check timeout before starting to stream
1040            if start_time.elapsed() > config.timeout {
1041                yield Err(NetError::Timeout(
1042                    "Query execution exceeded timeout before streaming began".to_string(),
1043                ));
1044                return;
1045            }
1046
1047            // Stream results in chunks
1048            let mut sequence: u64 = 0;
1049            let chunks_iter: Vec<Vec<(Key, CipherBlob)>> = results
1050                .chunks(config.chunk_size)
1051                .map(|c| c.to_vec())
1052                .collect();
1053            let total_chunks = chunks_iter.len();
1054
1055            for (chunk_idx, chunk) in chunks_iter.into_iter().enumerate() {
1056                // Check timeout for each chunk
1057                if start_time.elapsed() > config.timeout {
1058                    yield Err(NetError::Timeout(
1059                        format!("Streaming timed out at chunk {}/{}", chunk_idx + 1, total_chunks)
1060                    ));
1061                    return;
1062                }
1063
1064                let has_more = chunk_idx + 1 < total_chunks;
1065                let values: Vec<query::KeyValue> = chunk
1066                    .into_iter()
1067                    .map(|(k, v)| query::KeyValue {
1068                        key: Some(key_to_proto(&k)),
1069                        value: Some(cipher_blob_to_proto(&v)),
1070                        encrypted_predicate_result: None,
1071                    })
1072                    .collect();
1073
1074                yield Ok(aql::StreamResponse {
1075                    chunk: Some(aql::stream_response::Chunk::Batch(aql::StreamBatch {
1076                        values,
1077                        has_more,
1078                    })),
1079                    sequence,
1080                });
1081
1082                sequence += 1;
1083            }
1084
1085            // Send end marker
1086            yield Ok(aql::StreamResponse {
1087                chunk: Some(aql::stream_response::Chunk::End(aql::StreamEnd {
1088                    total_count: total_count as u64,
1089                })),
1090                sequence,
1091            });
1092
1093            let elapsed_ms = start_time.elapsed().as_millis() as u64;
1094            info!(
1095                "ExecuteStream completed: {} items in {} chunks, {}ms",
1096                total_count,
1097                total_chunks,
1098                elapsed_ms
1099            );
1100            push_log_entry(
1101                &recent_log,
1102                format!(
1103                    "ExecuteStream elapsed={}ms items={} chunks={} ok",
1104                    elapsed_ms, total_count, total_chunks
1105                ),
1106            );
1107        };
1108
1109        stream.boxed()
1110    }
1111
1112    /// Health check
1113    #[tracing::instrument(skip(self, _request))]
1114    pub async fn health_check(
1115        &self,
1116        _request: aql::HealthCheckRequest,
1117    ) -> aql::HealthCheckResponse {
1118        debug!("HealthCheck request received");
1119        push_log_entry(&self.recent_log, "HealthCheck ok".to_string());
1120
1121        aql::HealthCheckResponse {
1122            status: aql::HealthStatus::HealthServing as i32,
1123            message: Some("Service is healthy".to_string()),
1124        }
1125    }
1126
1127    /// Get server information
1128    #[tracing::instrument(skip(self, _request))]
1129    pub async fn get_server_info(
1130        &self,
1131        _request: aql::ServerInfoRequest,
1132    ) -> aql::ServerInfoResponse {
1133        debug!("GetServerInfo request received");
1134        push_log_entry(&self.recent_log, "GetServerInfo ok".to_string());
1135
1136        let mut capabilities = vec![
1137            "query.get".to_string(),
1138            "query.set".to_string(),
1139            "query.delete".to_string(),
1140            "query.range".to_string(),
1141            "query.update".to_string(),
1142        ];
1143
1144        #[cfg(feature = "compute")]
1145        capabilities.push("query.filter".to_string());
1146
1147        aql::ServerInfoResponse {
1148            version: Some(create_version()),
1149            supported_versions: vec![create_version()],
1150            capabilities,
1151            uptime_seconds: self.start_time.elapsed().as_secs(),
1152        }
1153    }
1154
1155    /// Handle a decoded admin command and return a JSON string if supported.
1156    ///
1157    /// Delegates to [`crate::server_admin::handle_admin_command`].  The
1158    /// interceptor in `execute_query_internal` remains here; only the logic
1159    /// moves to `admin.rs`.
1160    async fn handle_admin_command(&self, cmd: &str) -> Option<String> {
1161        crate::server_admin::handle_admin_command(
1162            cmd,
1163            self.start_time.elapsed().as_secs(),
1164            &self.recent_log,
1165            &self.storage,
1166        )
1167        .await
1168    }
1169}
1170
1171// `AqlServerBuilder` lives in `crate::server_builder`; re-export so existing
1172// callers can continue to write `crate::server::AqlServerBuilder`.
1173pub use crate::server_builder::AqlServerBuilder;
1174pub use crate::server_types::StreamConfig;
1175use crate::server_types::{RollbackOp, apply_update_operation};
1176
1177#[cfg(test)]
1178mod tests {
1179    use super::*;
1180    use amaters_core::storage::MemoryStorage;
1181    use amaters_core::types::{CipherBlob, Key};
1182
1183    #[tokio::test]
1184    async fn test_service_creation() {
1185        let storage = Arc::new(MemoryStorage::new());
1186        let service = AqlServiceImpl::new(storage);
1187        assert!(service.start_time.elapsed().as_secs() < 1);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_get_query_execution() {
1192        let storage = Arc::new(MemoryStorage::new());
1193        let key = Key::from_str("test_key");
1194        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1195
1196        storage.put(&key, &value).await.expect("Failed to put");
1197
1198        let service = AqlServiceImpl::new(storage);
1199
1200        let query = Query::Get {
1201            collection: "test".to_string(),
1202            key: key.clone(),
1203        };
1204
1205        let result = service.execute_query_internal(query).await;
1206        assert!(result.is_ok());
1207
1208        let query_result = result.expect("Query failed");
1209        match query_result.result {
1210            Some(query::query_result::Result::Single(single)) => {
1211                assert!(single.value.is_some());
1212            }
1213            _ => panic!("Expected single result"),
1214        }
1215    }
1216
1217    #[tokio::test]
1218    async fn test_set_query_execution() {
1219        let storage = Arc::new(MemoryStorage::new());
1220        let service = AqlServiceImpl::new(storage.clone());
1221
1222        let key = Key::from_str("test_key");
1223        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1224
1225        let query = Query::Set {
1226            collection: "test".to_string(),
1227            key: key.clone(),
1228            value: value.clone(),
1229        };
1230
1231        let result = service.execute_query_internal(query).await;
1232        assert!(result.is_ok());
1233
1234        // Verify the value was stored
1235        let stored = storage.get(&key).await.expect("Failed to get");
1236        assert!(stored.is_some());
1237        assert_eq!(stored.expect("No value"), value);
1238    }
1239
1240    #[tokio::test]
1241    async fn test_delete_query_execution() {
1242        let storage = Arc::new(MemoryStorage::new());
1243        let key = Key::from_str("test_key");
1244        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1245
1246        storage.put(&key, &value).await.expect("Failed to put");
1247
1248        let service = AqlServiceImpl::new(storage.clone());
1249
1250        let query = Query::Delete {
1251            collection: "test".to_string(),
1252            key: key.clone(),
1253        };
1254
1255        let result = service.execute_query_internal(query).await;
1256        assert!(result.is_ok());
1257
1258        // Verify the value was deleted
1259        let stored = storage.get(&key).await.expect("Failed to get");
1260        assert!(stored.is_none());
1261    }
1262
1263    #[tokio::test]
1264    async fn test_range_query_execution() {
1265        let storage = Arc::new(MemoryStorage::new());
1266
1267        // Insert test data
1268        for i in 0..10 {
1269            let key = Key::from_str(&format!("key_{:02}", i));
1270            let value = CipherBlob::new(vec![i as u8]);
1271            storage.put(&key, &value).await.expect("Failed to put");
1272        }
1273
1274        let service = AqlServiceImpl::new(storage);
1275
1276        let query = Query::Range {
1277            collection: "test".to_string(),
1278            start: Key::from_str("key_03"),
1279            end: Key::from_str("key_07"),
1280        };
1281
1282        let result = service.execute_query_internal(query).await;
1283        assert!(result.is_ok());
1284
1285        let query_result = result.expect("Query failed");
1286        match query_result.result {
1287            Some(query::query_result::Result::Multi(multi)) => {
1288                assert!(!multi.values.is_empty());
1289            }
1290            _ => panic!("Expected multi result"),
1291        }
1292    }
1293
1294    #[tokio::test]
1295    async fn test_get_nonexistent_key() {
1296        let storage = Arc::new(MemoryStorage::new());
1297        let service = AqlServiceImpl::new(storage);
1298
1299        let query = Query::Get {
1300            collection: "test".to_string(),
1301            key: Key::from_str("nonexistent"),
1302        };
1303
1304        let result = service.execute_query_internal(query).await;
1305        assert!(result.is_ok());
1306
1307        let query_result = result.expect("Query failed");
1308        match query_result.result {
1309            Some(query::query_result::Result::Single(single)) => {
1310                assert!(single.value.is_none());
1311            }
1312            _ => panic!("Expected single result"),
1313        }
1314    }
1315
1316    #[tokio::test]
1317    async fn test_health_check() {
1318        let storage = Arc::new(MemoryStorage::new());
1319        let service = AqlServiceImpl::new(storage);
1320
1321        let request = aql::HealthCheckRequest { service: None };
1322        let response = service.health_check(request).await;
1323
1324        assert_eq!(response.status, aql::HealthStatus::HealthServing as i32);
1325    }
1326
1327    #[tokio::test]
1328    async fn test_server_info() {
1329        let storage = Arc::new(MemoryStorage::new());
1330        let service = AqlServiceImpl::new(storage);
1331
1332        let request = aql::ServerInfoRequest {};
1333        let response = service.get_server_info(request).await;
1334
1335        assert!(response.version.is_some());
1336        assert!(!response.capabilities.is_empty());
1337        assert!(response.capabilities.contains(&"query.get".to_string()));
1338    }
1339
#[cfg(feature = "compute")]
#[tokio::test]
async fn test_server_info_advertises_filter() {
    let service = AqlServiceImpl::new(Arc::new(MemoryStorage::new()));

    let response = service.get_server_info(aql::ServerInfoRequest {}).await;

    // With `compute` enabled the filter capability must be listed.
    let advertises_filter = response
        .capabilities
        .iter()
        .any(|cap| cap == "query.filter");
    assert!(
        advertises_filter,
        "capabilities should advertise query.filter when compute feature is enabled"
    );
}
1354
#[cfg(feature = "compute")]
#[tokio::test]
async fn test_filter_query_execution() {
    use amaters_core::{ColumnRef, Predicate};

    let backend = Arc::new(MemoryStorage::new());

    // Rows row_00..row_04 hold the single bytes 0..4. Per the original test's
    // notes, single-byte blobs are handled as plaintext and filtered without FHE.
    for i in 0u8..5 {
        let key = Key::from_str(&format!("row_{:02}", i));
        let value = CipherBlob::new(vec![i]);
        backend
            .put(&key, &value)
            .await
            .expect("Failed to insert test data");
    }

    let service = AqlServiceImpl::new(backend);

    // Predicate `value > 2`: only the rows holding 3 and 4 should match.
    let threshold = CipherBlob::new(vec![2]);
    let predicate = Predicate::Gt(ColumnRef::new("value".to_string()), threshold);

    let outcome = service
        .execute_query_internal(Query::Filter {
            collection: "test".to_string(),
            predicate,
        })
        .await
        .expect("plaintext filter query should succeed");

    let multi = match outcome.result {
        Some(query::query_result::Result::Multi(multi)) => multi,
        other => panic!("Expected Multi result from filter query, got {:?}", other),
    };

    assert_eq!(
        multi.values.len(),
        2,
        "expected 2 matching rows (values 3 and 4)"
    );
    // Plaintext matches must not carry an encrypted predicate result.
    assert!(
        multi
            .values
            .iter()
            .all(|kv| kv.encrypted_predicate_result.is_none()),
        "plaintext filter results should not carry encrypted_predicate_result"
    );
}
1408
1409    #[cfg(not(feature = "compute"))]
1410    #[tokio::test]
1411    async fn test_filter_query_requires_compute_feature() {
1412        use amaters_core::{ColumnRef, Predicate};
1413
1414        let storage = Arc::new(MemoryStorage::new());
1415        let service = AqlServiceImpl::new(storage);
1416
1417        let rhs_blob = CipherBlob::new(vec![1]);
1418        let predicate = Predicate::Gt(ColumnRef::new("value".to_string()), rhs_blob);
1419
1420        let filter_query = Query::Filter {
1421            collection: "test".to_string(),
1422            predicate,
1423        };
1424
1425        let result = service.execute_query_internal(filter_query).await;
1426        assert!(
1427            result.is_err(),
1428            "Filter should fail without compute feature"
1429        );
1430        let err_msg = result
1431            .as_ref()
1432            .err()
1433            .map(|e| e.to_string())
1434            .unwrap_or_default();
1435        assert!(
1436            err_msg.contains("compute feature"),
1437            "Error should mention compute feature: {}",
1438            err_msg
1439        );
1440    }
1441
1442    // ---------------------------------------------------------------
1443    // UPDATE query tests (non-compute path: updates ALL rows)
1444    // ---------------------------------------------------------------
1445
1446    /// Helper to build a dummy predicate (used by UPDATE queries).
1447    /// Without the compute feature the predicate is ignored, so we
1448    /// just need a syntactically valid one.
1449    #[cfg(not(feature = "compute"))]
1450    fn dummy_predicate() -> amaters_core::Predicate {
1451        amaters_core::Predicate::Eq(
1452            amaters_core::ColumnRef::new("col"),
1453            CipherBlob::new(vec![0]),
1454        )
1455    }
1456
1457    #[cfg(not(feature = "compute"))]
1458    #[tokio::test]
1459    async fn test_update_set_single_key() {
1460        let storage = Arc::new(MemoryStorage::new());
1461        let key = Key::from_str("row_00");
1462        let original = CipherBlob::new(vec![10, 20, 30]);
1463        storage.put(&key, &original).await.expect("Failed to put");
1464
1465        let service = AqlServiceImpl::new(storage.clone());
1466
1467        let new_blob = CipherBlob::new(vec![99, 88, 77]);
1468        let query = Query::Update {
1469            collection: "test".to_string(),
1470            predicate: dummy_predicate(),
1471            updates: vec![amaters_core::Update::Set(
1472                amaters_core::ColumnRef::new("val"),
1473                new_blob.clone(),
1474            )],
1475        };
1476
1477        let result = service
1478            .execute_query_internal(query)
1479            .await
1480            .expect("Update failed");
1481        match result.result {
1482            Some(query::query_result::Result::Success(s)) => {
1483                assert_eq!(s.affected_rows, 1);
1484            }
1485            other => panic!("Expected Success, got {:?}", other),
1486        }
1487
1488        let stored = storage
1489            .get(&key)
1490            .await
1491            .expect("Failed to get")
1492            .expect("Key missing after update");
1493        assert_eq!(stored, new_blob);
1494    }
1495
1496    #[cfg(not(feature = "compute"))]
1497    #[tokio::test]
1498    async fn test_update_set_multiple_keys() {
1499        let storage = Arc::new(MemoryStorage::new());
1500
1501        for i in 0u8..5 {
1502            let key = Key::from_str(&format!("row_{:02}", i));
1503            let value = CipherBlob::new(vec![i]);
1504            storage.put(&key, &value).await.expect("Failed to put");
1505        }
1506
1507        let service = AqlServiceImpl::new(storage.clone());
1508
1509        let replacement = CipherBlob::new(vec![255]);
1510        let query = Query::Update {
1511            collection: "data".to_string(),
1512            predicate: dummy_predicate(),
1513            updates: vec![amaters_core::Update::Set(
1514                amaters_core::ColumnRef::new("v"),
1515                replacement.clone(),
1516            )],
1517        };
1518
1519        let result = service
1520            .execute_query_internal(query)
1521            .await
1522            .expect("Update failed");
1523        match result.result {
1524            Some(query::query_result::Result::Success(s)) => {
1525                assert_eq!(s.affected_rows, 5);
1526            }
1527            other => panic!("Expected Success, got {:?}", other),
1528        }
1529
1530        // Verify all keys were updated
1531        for i in 0u8..5 {
1532            let key = Key::from_str(&format!("row_{:02}", i));
1533            let stored = storage
1534                .get(&key)
1535                .await
1536                .expect("Failed to get")
1537                .expect("Key missing");
1538            assert_eq!(stored, replacement);
1539        }
1540    }
1541
1542    #[cfg(not(feature = "compute"))]
1543    #[tokio::test]
1544    async fn test_update_nonexistent_collection() {
1545        // No keys in storage at all — update should succeed with 0 affected rows
1546        let storage = Arc::new(MemoryStorage::new());
1547        let service = AqlServiceImpl::new(storage);
1548
1549        let query = Query::Update {
1550            collection: "ghost".to_string(),
1551            predicate: dummy_predicate(),
1552            updates: vec![amaters_core::Update::Set(
1553                amaters_core::ColumnRef::new("x"),
1554                CipherBlob::new(vec![1]),
1555            )],
1556        };
1557
1558        let result = service
1559            .execute_query_internal(query)
1560            .await
1561            .expect("Update on empty storage should not error");
1562        match result.result {
1563            Some(query::query_result::Result::Success(s)) => {
1564                assert_eq!(s.affected_rows, 0);
1565            }
1566            other => panic!("Expected Success with 0 rows, got {:?}", other),
1567        }
1568    }
1569
1570    #[cfg(not(feature = "compute"))]
1571    #[tokio::test]
1572    async fn test_update_add_operation() {
1573        let storage = Arc::new(MemoryStorage::new());
1574        let key = Key::from_str("counter");
1575        let original = CipherBlob::new(vec![10, 20]);
1576        storage.put(&key, &original).await.expect("Failed to put");
1577
1578        let service = AqlServiceImpl::new(storage.clone());
1579
1580        let addend = CipherBlob::new(vec![5, 3]);
1581        let query = Query::Update {
1582            collection: "c".to_string(),
1583            predicate: dummy_predicate(),
1584            updates: vec![amaters_core::Update::Add(
1585                amaters_core::ColumnRef::new("v"),
1586                addend,
1587            )],
1588        };
1589
1590        service
1591            .execute_query_internal(query)
1592            .await
1593            .expect("Update failed");
1594
1595        let stored = storage
1596            .get(&key)
1597            .await
1598            .expect("Failed to get")
1599            .expect("Key missing");
1600        assert_eq!(stored.as_bytes(), &[15, 23]);
1601    }
1602
1603    #[cfg(not(feature = "compute"))]
1604    #[tokio::test]
1605    async fn test_update_mul_operation() {
1606        let storage = Arc::new(MemoryStorage::new());
1607        let key = Key::from_str("product");
1608        let original = CipherBlob::new(vec![3, 4]);
1609        storage.put(&key, &original).await.expect("Failed to put");
1610
1611        let service = AqlServiceImpl::new(storage.clone());
1612
1613        let factor = CipherBlob::new(vec![2, 5]);
1614        let query = Query::Update {
1615            collection: "c".to_string(),
1616            predicate: dummy_predicate(),
1617            updates: vec![amaters_core::Update::Mul(
1618                amaters_core::ColumnRef::new("v"),
1619                factor,
1620            )],
1621        };
1622
1623        service
1624            .execute_query_internal(query)
1625            .await
1626            .expect("Update failed");
1627
1628        let stored = storage
1629            .get(&key)
1630            .await
1631            .expect("Failed to get")
1632            .expect("Key missing");
1633        assert_eq!(stored.as_bytes(), &[6, 20]);
1634    }
1635
1636    #[cfg(not(feature = "compute"))]
1637    #[tokio::test]
1638    async fn test_update_multiple_operations_per_key() {
1639        let storage = Arc::new(MemoryStorage::new());
1640        let key = Key::from_str("multi_op");
1641        let original = CipherBlob::new(vec![2]);
1642        storage.put(&key, &original).await.expect("Failed to put");
1643
1644        let service = AqlServiceImpl::new(storage.clone());
1645
1646        // Add 3 then multiply by 10: (2 + 3) * 10 = 50
1647        let query = Query::Update {
1648            collection: "c".to_string(),
1649            predicate: dummy_predicate(),
1650            updates: vec![
1651                amaters_core::Update::Add(
1652                    amaters_core::ColumnRef::new("v"),
1653                    CipherBlob::new(vec![3]),
1654                ),
1655                amaters_core::Update::Mul(
1656                    amaters_core::ColumnRef::new("v"),
1657                    CipherBlob::new(vec![10]),
1658                ),
1659            ],
1660        };
1661
1662        service
1663            .execute_query_internal(query)
1664            .await
1665            .expect("Update failed");
1666
1667        let stored = storage
1668            .get(&key)
1669            .await
1670            .expect("Failed to get")
1671            .expect("Key missing");
1672        assert_eq!(stored.as_bytes(), &[50]);
1673    }
1674
1675    #[cfg(not(feature = "compute"))]
1676    #[tokio::test]
1677    async fn test_update_returns_affected_count() {
1678        let storage = Arc::new(MemoryStorage::new());
1679
1680        // Insert exactly 7 keys
1681        for i in 0u8..7 {
1682            let key = Key::from_str(&format!("k{}", i));
1683            storage
1684                .put(&key, &CipherBlob::new(vec![i]))
1685                .await
1686                .expect("Failed to put");
1687        }
1688
1689        let service = AqlServiceImpl::new(storage);
1690
1691        let query = Query::Update {
1692            collection: "c".to_string(),
1693            predicate: dummy_predicate(),
1694            updates: vec![amaters_core::Update::Set(
1695                amaters_core::ColumnRef::new("v"),
1696                CipherBlob::new(vec![0]),
1697            )],
1698        };
1699
1700        let result = service
1701            .execute_query_internal(query)
1702            .await
1703            .expect("Update failed");
1704        match result.result {
1705            Some(query::query_result::Result::Success(s)) => {
1706                assert_eq!(s.affected_rows, 7);
1707            }
1708            other => panic!("Expected Success with 7 rows, got {:?}", other),
1709        }
1710    }
1711
1712    #[cfg(not(feature = "compute"))]
1713    #[tokio::test]
1714    async fn test_update_preserves_other_collections() {
1715        // Since our storage is flat (no collection namespacing at the storage level),
1716        // we verify that keys with different prefixes are still present after update.
1717        let storage = Arc::new(MemoryStorage::new());
1718
1719        let key_a = Key::from_str("collA_row1");
1720        let key_b = Key::from_str("collB_row1");
1721        let val_a = CipherBlob::new(vec![1, 2, 3]);
1722        let val_b = CipherBlob::new(vec![4, 5, 6]);
1723
1724        storage.put(&key_a, &val_a).await.expect("Failed to put A");
1725        storage.put(&key_b, &val_b).await.expect("Failed to put B");
1726
1727        let service = AqlServiceImpl::new(storage.clone());
1728
1729        // Update sets all keys; verify key_b is still readable (even though changed)
1730        let query = Query::Update {
1731            collection: "collA".to_string(),
1732            predicate: dummy_predicate(),
1733            updates: vec![amaters_core::Update::Set(
1734                amaters_core::ColumnRef::new("v"),
1735                CipherBlob::new(vec![99]),
1736            )],
1737        };
1738
1739        service
1740            .execute_query_internal(query)
1741            .await
1742            .expect("Update failed");
1743
1744        // Both keys should still exist in storage
1745        let stored_a = storage.get(&key_a).await.expect("Failed to get A");
1746        assert!(stored_a.is_some(), "key_a should still exist");
1747
1748        let stored_b = storage.get(&key_b).await.expect("Failed to get B");
1749        assert!(stored_b.is_some(), "key_b should still exist");
1750    }
1751
1752    #[cfg(not(feature = "compute"))]
1753    #[tokio::test]
1754    async fn test_update_empty_updates_vec() {
1755        // An update with an empty updates vector should succeed and not modify values
1756        let storage = Arc::new(MemoryStorage::new());
1757        let key = Key::from_str("keep_me");
1758        let original = CipherBlob::new(vec![42]);
1759        storage.put(&key, &original).await.expect("Failed to put");
1760
1761        let service = AqlServiceImpl::new(storage.clone());
1762
1763        let query = Query::Update {
1764            collection: "c".to_string(),
1765            predicate: dummy_predicate(),
1766            updates: vec![], // no operations
1767        };
1768
1769        let result = service
1770            .execute_query_internal(query)
1771            .await
1772            .expect("Update with empty ops should succeed");
1773        match result.result {
1774            Some(query::query_result::Result::Success(s)) => {
1775                // The row was "affected" (iterated) even though no ops were applied
1776                assert_eq!(s.affected_rows, 1);
1777            }
1778            other => panic!("Expected Success, got {:?}", other),
1779        }
1780
1781        // Value should be unchanged
1782        let stored = storage
1783            .get(&key)
1784            .await
1785            .expect("Failed to get")
1786            .expect("Key missing");
1787        assert_eq!(stored, original);
1788    }
1789
1790    #[cfg(not(feature = "compute"))]
1791    #[tokio::test]
1792    async fn test_update_then_select_verifies_changes() {
1793        let storage = Arc::new(MemoryStorage::new());
1794
1795        // Insert 3 rows
1796        for i in 0u8..3 {
1797            let key = Key::from_str(&format!("sel_{:02}", i));
1798            let value = CipherBlob::new(vec![i, i, i]);
1799            storage.put(&key, &value).await.expect("Failed to put");
1800        }
1801
1802        let service = AqlServiceImpl::new(storage.clone());
1803
1804        // Update: add [1, 1, 1] to every row
1805        let update_query = Query::Update {
1806            collection: "c".to_string(),
1807            predicate: dummy_predicate(),
1808            updates: vec![amaters_core::Update::Add(
1809                amaters_core::ColumnRef::new("v"),
1810                CipherBlob::new(vec![1, 1, 1]),
1811            )],
1812        };
1813
1814        service
1815            .execute_query_internal(update_query)
1816            .await
1817            .expect("Update failed");
1818
1819        // Now read back each key and verify the addition
1820        for i in 0u8..3 {
1821            let key = Key::from_str(&format!("sel_{:02}", i));
1822            let get_query = Query::Get {
1823                collection: "c".to_string(),
1824                key: key.clone(),
1825            };
1826
1827            let result = service
1828                .execute_query_internal(get_query)
1829                .await
1830                .expect("Get failed");
1831
1832            match result.result {
1833                Some(query::query_result::Result::Single(single)) => {
1834                    let proto_val = single.value.expect("Expected value from get");
1835                    // The proto value data should equal [i+1, i+1, i+1]
1836                    let expected = vec![i + 1, i + 1, i + 1];
1837                    assert_eq!(
1838                        proto_val.data, expected,
1839                        "Row sel_{:02} should have been updated",
1840                        i
1841                    );
1842                }
1843                other => panic!("Expected Single result, got {:?}", other),
1844            }
1845        }
1846    }
1847
1848    /// With compute enabled, the UPDATE handler compiles the predicate and
1849    /// evaluates it via FHE. This test verifies the code path runs without
1850    /// panicking and returns a valid result (either success or a known FHE
1851    /// error), similar to the existing filter compute test.
1852    #[cfg(feature = "compute")]
1853    #[tokio::test]
1854    async fn test_update_with_compute_feature() {
1855        use amaters_core::{ColumnRef, Predicate};
1856
1857        let storage = Arc::new(MemoryStorage::new());
1858
1859        for i in 0u8..3 {
1860            let key = Key::from_str(&format!("row_{:02}", i));
1861            let value = CipherBlob::new(vec![i]);
1862            storage
1863                .put(&key, &value)
1864                .await
1865                .expect("Failed to insert test data");
1866        }
1867
1868        let service = AqlServiceImpl::new(storage);
1869
1870        let rhs_blob = CipherBlob::new(vec![1]);
1871        let predicate = Predicate::Eq(ColumnRef::new("value"), rhs_blob);
1872
1873        let update_query = Query::Update {
1874            collection: "test".to_string(),
1875            predicate,
1876            updates: vec![amaters_core::Update::Set(
1877                ColumnRef::new("v"),
1878                CipherBlob::new(vec![99]),
1879            )],
1880        };
1881
1882        let result = service.execute_query_internal(update_query).await;
1883
1884        // Accept either Ok (FHE evaluated successfully) or a known FHE error
1885        match result {
1886            Ok(query_result) => {
1887                match query_result.result {
1888                    Some(query::query_result::Result::Success(s)) => {
1889                        // Some or all rows may have been affected
1890                        assert!(s.affected_rows <= 3);
1891                    }
1892                    other => panic!("Expected Success result from update, got {:?}", other),
1893                }
1894            }
1895            Err(e) => {
1896                let msg = e.to_string();
1897                assert!(
1898                    msg.contains("FHE")
1899                        || msg.contains("fhe")
1900                        || msg.contains("Predicate compilation")
1901                        || msg.contains("compilation failed")
1902                        || msg.contains("execution")
1903                        || msg.contains("RHS"),
1904                    "Unexpected error from update query: {}",
1905                    msg
1906                );
1907            }
1908        }
1909    }
1910
1911    // UPDATE rollback tests are in server_rollback_tests.rs
1912    include!("server_rollback_tests.rs");
1913
1914    /// Compile-time test: verifies that `AqlServerBuilder::build_grpc_service` compiles
1915    /// without the `compression` feature. No runtime assertions are needed — if the
1916    /// `#[cfg(feature = "compression")]` block were unconditional this test would fail
1917    /// to compile (or worse, panic at runtime) when the feature is absent.
1918    #[tokio::test]
1919    async fn test_compression_feature_gate_disabled() {
1920        let storage = Arc::new(MemoryStorage::new());
1921        let builder = AqlServerBuilder::new(storage);
1922        // build_grpc_service should always compile regardless of compression feature.
1923        let _server = builder.build_grpc_service();
1924        // If we reach here, the feature-gate is working correctly.
1925    }
1926}