amaters_net/
server.rs

//! gRPC server implementation for the AmateRS AQL service.
//!
//! This module provides the server implementation that connects the network layer
//! with the storage engine to handle client requests.
5
6use crate::convert::{cipher_blob_to_proto, create_version, key_to_proto, query_from_proto};
7use crate::error::{NetError, NetResult};
8use crate::proto::{aql, query};
9use amaters_core::Query;
10use amaters_core::traits::StorageEngine;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, warn};
14
15#[cfg(feature = "compute")]
16use amaters_core::compute::{FheExecutor, KeyManager, PredicateCompiler};
17#[cfg(feature = "compute")]
18use amaters_core::types::Key;
19#[cfg(feature = "compute")]
20use std::collections::HashMap;
21
22/// AQL service implementation
23///
24/// This service handles all AQL query requests and connects them to the underlying storage engine.
25pub struct AqlServiceImpl<S: StorageEngine> {
26    /// Storage engine for executing queries
27    storage: Arc<S>,
28    /// Server start time for uptime calculation
29    start_time: Instant,
30    /// FHE key manager for encrypted operations
31    #[cfg(feature = "compute")]
32    key_manager: Arc<KeyManager>,
33}
34
35impl<S: StorageEngine> AqlServiceImpl<S> {
36    /// Create a new AQL service with the given storage engine
37    #[cfg(feature = "compute")]
38    pub fn new(storage: Arc<S>) -> Self {
39        Self {
40            storage,
41            start_time: Instant::now(),
42            key_manager: Arc::new(KeyManager::new()),
43        }
44    }
45
46    /// Create a new AQL service with the given storage engine (without compute)
47    #[cfg(not(feature = "compute"))]
48    pub fn new(storage: Arc<S>) -> Self {
49        Self {
50            storage,
51            start_time: Instant::now(),
52        }
53    }
54
55    /// Create a new AQL service with a custom key manager
56    #[cfg(feature = "compute")]
57    pub fn with_key_manager(storage: Arc<S>, key_manager: Arc<KeyManager>) -> Self {
58        Self {
59            storage,
60            start_time: Instant::now(),
61            key_manager,
62        }
63    }
64
65    /// Execute a query and return the result
66    pub async fn execute_query(&self, request: aql::QueryRequest) -> aql::QueryResponse {
67        let start_time = Instant::now();
68
69        info!(
70            "ExecuteQuery request received: request_id={:?}",
71            request.request_id
72        );
73
74        // Extract and validate the query
75        let proto_query = match request.query {
76            Some(q) => q,
77            None => {
78                let execution_time_ms = start_time.elapsed().as_millis() as u64;
79                return aql::QueryResponse {
80                    response: Some(aql::query_response::Response::Error(
81                        crate::proto::errors::ErrorResponse {
82                            code: crate::proto::errors::ErrorCode::ErrorProtocolMissingField as i32,
83                            message: "Missing query in request".to_string(),
84                            category: crate::proto::errors::ErrorCategory::CategoryClientError
85                                as i32,
86                            details: None,
87                            retry_after: None,
88                        },
89                    )),
90                    request_id: request.request_id,
91                    execution_time_ms,
92                };
93            }
94        };
95
96        let query = match query_from_proto(proto_query) {
97            Ok(q) => q,
98            Err(e) => {
99                error!("Failed to parse query: {}", e);
100                let execution_time_ms = start_time.elapsed().as_millis() as u64;
101                return aql::QueryResponse {
102                    response: Some(aql::query_response::Response::Error(
103                        crate::proto::errors::ErrorResponse {
104                            code: e.error_code() as i32,
105                            message: e.to_string(),
106                            category: e.error_category() as i32,
107                            details: None,
108                            retry_after: None,
109                        },
110                    )),
111                    request_id: request.request_id,
112                    execution_time_ms,
113                };
114            }
115        };
116
117        // Execute the query
118        let result = self.execute_query_internal(query).await;
119
120        let execution_time_ms = start_time.elapsed().as_millis() as u64;
121
122        // Build response
123        match result {
124            Ok(query_result) => aql::QueryResponse {
125                response: Some(aql::query_response::Response::Result(query_result)),
126                request_id: request.request_id,
127                execution_time_ms,
128            },
129            Err(e) => {
130                error!("Query execution failed: {}", e);
131                aql::QueryResponse {
132                    response: Some(aql::query_response::Response::Error(
133                        crate::proto::errors::ErrorResponse {
134                            code: e.error_code() as i32,
135                            message: e.to_string(),
136                            category: e.error_category() as i32,
137                            details: None,
138                            retry_after: None,
139                        },
140                    )),
141                    request_id: request.request_id,
142                    execution_time_ms,
143                }
144            }
145        }
146    }
147
148    /// Execute a query and return the result
149    ///
150    /// This is an internal method used for testing and direct query execution.
151    /// For production use, prefer `execute_query` which handles protocol details.
152    #[doc(hidden)]
153    pub async fn execute_query_internal(&self, query: Query) -> NetResult<query::QueryResult> {
154        match query {
155            Query::Get { collection, key } => {
156                debug!(
157                    "Executing GET query: collection={}, key={:?}",
158                    collection, key
159                );
160
161                let result = self.storage.get(&key).await?;
162
163                let result = match result {
164                    Some(value) => query::QueryResult {
165                        result: Some(query::query_result::Result::Single(query::SingleResult {
166                            value: Some(cipher_blob_to_proto(&value)),
167                        })),
168                    },
169                    None => query::QueryResult {
170                        result: Some(query::query_result::Result::Single(query::SingleResult {
171                            value: None,
172                        })),
173                    },
174                };
175
176                Ok(result)
177            }
178            Query::Set {
179                collection,
180                key,
181                value,
182            } => {
183                debug!(
184                    "Executing SET query: collection={}, key={:?}",
185                    collection, key
186                );
187
188                self.storage.put(&key, &value).await?;
189
190                Ok(query::QueryResult {
191                    result: Some(query::query_result::Result::Success(query::SuccessResult {
192                        affected_rows: 1,
193                    })),
194                })
195            }
196            Query::Delete { collection, key } => {
197                debug!(
198                    "Executing DELETE query: collection={}, key={:?}",
199                    collection, key
200                );
201
202                self.storage.delete(&key).await?;
203
204                Ok(query::QueryResult {
205                    result: Some(query::query_result::Result::Success(query::SuccessResult {
206                        affected_rows: 1,
207                    })),
208                })
209            }
210            Query::Range {
211                collection,
212                start,
213                end,
214            } => {
215                debug!(
216                    "Executing RANGE query: collection={}, start={:?}, end={:?}",
217                    collection, start, end
218                );
219
220                let results = self.storage.range(&start, &end).await?;
221
222                let values: Vec<query::KeyValue> = results
223                    .into_iter()
224                    .map(|(k, v)| query::KeyValue {
225                        key: Some(key_to_proto(&k)),
226                        value: Some(cipher_blob_to_proto(&v)),
227                    })
228                    .collect();
229
230                Ok(query::QueryResult {
231                    result: Some(query::query_result::Result::Multi(query::MultiResult {
232                        values,
233                    })),
234                })
235            }
236            Query::Filter {
237                collection,
238                predicate,
239            } => {
240                #[cfg(feature = "compute")]
241                {
242                    info!("Executing FILTER query with FHE predicate evaluation");
243
244                    // WARNING: FHE filter queries are currently limited to simple predicates
245                    // Complex nested predicates may fail due to global server key conflicts
246                    // when multiple requests are processed concurrently.
247                    // TODO: Refactor to use per-request server keys instead of global state
248
249                    // 1. Compile predicate to FHE circuit
250                    let mut compiler = PredicateCompiler::new();
251
252                    // For now, assume U8 type - in production, this should be inferred
253                    // from the data or provided by the client
254                    let circuit = match compiler
255                        .compile(&predicate, amaters_core::compute::EncryptedType::U8)
256                    {
257                        Ok(c) => c,
258                        Err(e) => {
259                            error!("Failed to compile predicate: {}", e);
260                            return Err(NetError::ServerInternal(format!(
261                                "Predicate compilation failed: {}",
262                                e
263                            )));
264                        }
265                    };
266
267                    debug!(
268                        "Compiled predicate circuit: depth={}, gates={}",
269                        circuit.depth, circuit.gate_count
270                    );
271
272                    // 2. Get all candidate rows from storage
273                    // Use full range scan to get all data in the collection
274                    // Create minimal and maximal keys for full scan
275                    let min_key = Key::from_slice(&[]);
276                    let max_key = Key::from_slice(&[0xFF; 256]); // Reduced from 1024 to avoid potential issues
277
278                    let all_rows = match self.storage.range(&min_key, &max_key).await {
279                        Ok(rows) => rows,
280                        Err(e) => {
281                            error!("Failed to retrieve rows for filtering: {}", e);
282                            return Err(NetError::from(e));
283                        }
284                    };
285
286                    debug!("Retrieved {} rows for filtering", all_rows.len());
287
288                    // Limit the number of rows to prevent resource exhaustion
289                    if all_rows.len() > 1000 {
290                        warn!(
291                            "Filter query retrieved {} rows, which may cause performance issues",
292                            all_rows.len()
293                        );
294                    }
295
296                    // 3. Extract RHS value from predicate
297                    let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
298                        Ok(r) => r,
299                        Err(e) => {
300                            error!("Failed to extract RHS value: {}", e);
301                            return Err(NetError::ServerInternal(format!(
302                                "RHS extraction failed: {}",
303                                e
304                            )));
305                        }
306                    };
307
308                    // 4. Set up FHE executor
309                    let executor = FheExecutor::new();
310
311                    // 5. Execute circuit on each row
312                    // Note: For v0.1.0, we return encrypted booleans to the client
313                    // The client will decrypt and filter locally
314                    let mut results = Vec::new();
315                    let mut execution_errors = 0;
316
317                    for (key, value_blob) in all_rows {
318                        // Build inputs: value from storage + RHS from predicate
319                        let mut inputs = HashMap::new();
320                        inputs.insert("value".to_string(), value_blob.clone());
321                        inputs.insert("rhs".to_string(), rhs.clone());
322
323                        // Execute FHE circuit - result is encrypted boolean
324                        // Catch execution errors and continue processing other rows
325                        match executor.execute(&circuit, &inputs) {
326                            Ok(result_blob) => {
327                                // Store the key, value, and encrypted boolean result
328                                // For now, we'll pack these into the KeyValue structure
329                                // In a future version, we should have a dedicated FilterResult proto
330                                results.push(query::KeyValue {
331                                    key: Some(key_to_proto(&key)),
332                                    value: Some(cipher_blob_to_proto(&value_blob)),
333                                    // TODO: Add encrypted_predicate_result field to proto
334                                    // For now, client needs to re-evaluate locally or we return all
335                                });
336
337                                debug!(
338                                    "Executed predicate on key {:?}, result blob size: {}",
339                                    key,
340                                    result_blob.as_bytes().len()
341                                );
342                            }
343                            Err(e) => {
344                                execution_errors += 1;
345                                warn!("FHE execution failed for key {:?}: {}", key, e);
346                                // Continue processing other rows instead of failing the entire query
347                            }
348                        }
349                    }
350
351                    if execution_errors > 0 {
352                        warn!(
353                            "Filter query had {} FHE execution errors out of {} total rows",
354                            execution_errors,
355                            execution_errors + results.len()
356                        );
357                    }
358
359                    info!(
360                        "FILTER query completed, processed {} rows successfully",
361                        results.len()
362                    );
363
364                    // Return all rows with their values
365                    // TODO: Update proto to include encrypted boolean results
366                    Ok(query::QueryResult {
367                        result: Some(query::query_result::Result::Multi(query::MultiResult {
368                            values: results,
369                        })),
370                    })
371                }
372
373                #[cfg(not(feature = "compute"))]
374                {
375                    let _ = (collection, predicate);
376                    warn!("FILTER queries require compute feature to be enabled");
377                    Err(NetError::ServerInternal(
378                        "FILTER queries require compute feature".to_string(),
379                    ))
380                }
381            }
382            Query::Update {
383                collection,
384                predicate,
385                updates,
386            } => {
387                warn!("UPDATE queries are not yet fully implemented");
388                Err(NetError::ServerInternal(
389                    "UPDATE queries are not yet implemented".to_string(),
390                ))
391            }
392        }
393    }
394
395    /// Health check
396    pub async fn health_check(
397        &self,
398        _request: aql::HealthCheckRequest,
399    ) -> aql::HealthCheckResponse {
400        debug!("HealthCheck request received");
401
402        aql::HealthCheckResponse {
403            status: aql::HealthStatus::HealthServing as i32,
404            message: Some("Service is healthy".to_string()),
405        }
406    }
407
408    /// Get server information
409    pub async fn get_server_info(
410        &self,
411        _request: aql::ServerInfoRequest,
412    ) -> aql::ServerInfoResponse {
413        debug!("GetServerInfo request received");
414
415        aql::ServerInfoResponse {
416            version: Some(create_version()),
417            supported_versions: vec![create_version()],
418            capabilities: vec![
419                "query.get".to_string(),
420                "query.set".to_string(),
421                "query.delete".to_string(),
422                "query.range".to_string(),
423            ],
424            uptime_seconds: self.start_time.elapsed().as_secs(),
425        }
426    }
427}
428
429/// Server builder for creating AQL service instances
430pub struct AqlServerBuilder<S: StorageEngine> {
431    storage: Arc<S>,
432}
433
434impl<S: StorageEngine> AqlServerBuilder<S> {
435    /// Create a new server builder with the given storage engine
436    pub fn new(storage: Arc<S>) -> Self {
437        Self { storage }
438    }
439
440    /// Build the service implementation
441    pub fn build(self) -> AqlServiceImpl<S> {
442        AqlServiceImpl::new(self.storage)
443    }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use amaters_core::storage::MemoryStorage;
    use amaters_core::types::{CipherBlob, Key};

    /// Collection name used by every query in these tests.
    const COLLECTION: &str = "test";

    /// Fresh, empty in-memory storage behind an `Arc`.
    fn empty_storage() -> Arc<MemoryStorage> {
        Arc::new(MemoryStorage::new())
    }

    /// Key/value pair shared by the single-key tests.
    fn sample_entry() -> (Key, CipherBlob) {
        (
            Key::from_str("test_key"),
            CipherBlob::new(vec![1, 2, 3, 4, 5]),
        )
    }

    /// A newly built service has (practically) zero uptime.
    #[tokio::test]
    async fn test_service_creation() {
        let service = AqlServiceImpl::new(empty_storage());
        let uptime_secs = service.start_time.elapsed().as_secs();
        assert!(uptime_secs < 1);
    }

    /// GET on a stored key yields a `Single` result that carries a value.
    #[tokio::test]
    async fn test_get_query_execution() {
        let storage = empty_storage();
        let (key, value) = sample_entry();
        storage.put(&key, &value).await.expect("Failed to put");

        let service = AqlServiceImpl::new(storage);
        let get = Query::Get {
            collection: COLLECTION.to_string(),
            key,
        };

        let outcome = service.execute_query_internal(get).await;
        assert!(outcome.is_ok());

        match outcome.expect("Query failed").result {
            Some(query::query_result::Result::Single(single)) => {
                assert!(single.value.is_some());
            }
            _ => panic!("Expected single result"),
        }
    }

    /// SET writes the value through to the storage engine.
    #[tokio::test]
    async fn test_set_query_execution() {
        let storage = empty_storage();
        let service = AqlServiceImpl::new(Arc::clone(&storage));
        let (key, value) = sample_entry();

        let set = Query::Set {
            collection: COLLECTION.to_string(),
            key: key.clone(),
            value: value.clone(),
        };

        let outcome = service.execute_query_internal(set).await;
        assert!(outcome.is_ok());

        // The value must now be readable straight from storage.
        let stored = storage.get(&key).await.expect("Failed to get");
        assert!(stored.is_some());
        assert_eq!(stored.expect("No value"), value);
    }

    /// DELETE removes a previously stored key from the storage engine.
    #[tokio::test]
    async fn test_delete_query_execution() {
        let storage = empty_storage();
        let (key, value) = sample_entry();
        storage.put(&key, &value).await.expect("Failed to put");

        let service = AqlServiceImpl::new(Arc::clone(&storage));
        let delete = Query::Delete {
            collection: COLLECTION.to_string(),
            key: key.clone(),
        };

        let outcome = service.execute_query_internal(delete).await;
        assert!(outcome.is_ok());

        // The key must be gone from storage afterwards.
        let stored = storage.get(&key).await.expect("Failed to get");
        assert!(stored.is_none());
    }

    /// RANGE over part of the key space yields a non-empty `Multi` result.
    #[tokio::test]
    async fn test_range_query_execution() {
        let storage = empty_storage();

        // Seed ten keys: key_00 ..= key_09, each holding its own index as a single byte.
        for index in 0u8..10 {
            let key = Key::from_str(&format!("key_{:02}", index));
            let value = CipherBlob::new(vec![index]);
            storage.put(&key, &value).await.expect("Failed to put");
        }

        let service = AqlServiceImpl::new(storage);
        let range = Query::Range {
            collection: COLLECTION.to_string(),
            start: Key::from_str("key_03"),
            end: Key::from_str("key_07"),
        };

        let outcome = service.execute_query_internal(range).await;
        assert!(outcome.is_ok());

        match outcome.expect("Query failed").result {
            Some(query::query_result::Result::Multi(multi)) => {
                assert!(!multi.values.is_empty());
            }
            _ => panic!("Expected multi result"),
        }
    }

    /// GET on a missing key succeeds with a `Single` result that has no value.
    #[tokio::test]
    async fn test_get_nonexistent_key() {
        let service = AqlServiceImpl::new(empty_storage());
        let get = Query::Get {
            collection: COLLECTION.to_string(),
            key: Key::from_str("nonexistent"),
        };

        let outcome = service.execute_query_internal(get).await;
        assert!(outcome.is_ok());

        match outcome.expect("Query failed").result {
            Some(query::query_result::Result::Single(single)) => {
                assert!(single.value.is_none());
            }
            _ => panic!("Expected single result"),
        }
    }

    /// The health check reports the serving status.
    #[tokio::test]
    async fn test_health_check() {
        let service = AqlServiceImpl::new(empty_storage());

        let response = service
            .health_check(aql::HealthCheckRequest { service: None })
            .await;

        assert_eq!(response.status, aql::HealthStatus::HealthServing as i32);
    }

    /// Server info carries a version and advertises at least `query.get`.
    #[tokio::test]
    async fn test_server_info() {
        let service = AqlServiceImpl::new(empty_storage());

        let response = service.get_server_info(aql::ServerInfoRequest {}).await;

        assert!(response.version.is_some());
        assert!(!response.capabilities.is_empty());
        assert!(response.capabilities.iter().any(|cap| cap == "query.get"));
    }
}