1use crate::convert::{cipher_blob_to_proto, create_version, key_to_proto, query_from_proto};
7use crate::error::{NetError, NetResult};
8use crate::proto::{aql, query};
9use amaters_core::Query;
10use amaters_core::traits::StorageEngine;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, warn};
14
15#[cfg(feature = "compute")]
16use amaters_core::compute::{FheExecutor, KeyManager, PredicateCompiler};
17#[cfg(feature = "compute")]
18use amaters_core::types::Key;
19#[cfg(feature = "compute")]
20use std::collections::HashMap;
21
22pub struct AqlServiceImpl<S: StorageEngine> {
26 storage: Arc<S>,
28 start_time: Instant,
30 #[cfg(feature = "compute")]
32 key_manager: Arc<KeyManager>,
33}
34
35impl<S: StorageEngine> AqlServiceImpl<S> {
36 #[cfg(feature = "compute")]
38 pub fn new(storage: Arc<S>) -> Self {
39 Self {
40 storage,
41 start_time: Instant::now(),
42 key_manager: Arc::new(KeyManager::new()),
43 }
44 }
45
46 #[cfg(not(feature = "compute"))]
48 pub fn new(storage: Arc<S>) -> Self {
49 Self {
50 storage,
51 start_time: Instant::now(),
52 }
53 }
54
55 #[cfg(feature = "compute")]
57 pub fn with_key_manager(storage: Arc<S>, key_manager: Arc<KeyManager>) -> Self {
58 Self {
59 storage,
60 start_time: Instant::now(),
61 key_manager,
62 }
63 }
64
65 pub async fn execute_query(&self, request: aql::QueryRequest) -> aql::QueryResponse {
67 let start_time = Instant::now();
68
69 info!(
70 "ExecuteQuery request received: request_id={:?}",
71 request.request_id
72 );
73
74 let proto_query = match request.query {
76 Some(q) => q,
77 None => {
78 let execution_time_ms = start_time.elapsed().as_millis() as u64;
79 return aql::QueryResponse {
80 response: Some(aql::query_response::Response::Error(
81 crate::proto::errors::ErrorResponse {
82 code: crate::proto::errors::ErrorCode::ErrorProtocolMissingField as i32,
83 message: "Missing query in request".to_string(),
84 category: crate::proto::errors::ErrorCategory::CategoryClientError
85 as i32,
86 details: None,
87 retry_after: None,
88 },
89 )),
90 request_id: request.request_id,
91 execution_time_ms,
92 };
93 }
94 };
95
96 let query = match query_from_proto(proto_query) {
97 Ok(q) => q,
98 Err(e) => {
99 error!("Failed to parse query: {}", e);
100 let execution_time_ms = start_time.elapsed().as_millis() as u64;
101 return aql::QueryResponse {
102 response: Some(aql::query_response::Response::Error(
103 crate::proto::errors::ErrorResponse {
104 code: e.error_code() as i32,
105 message: e.to_string(),
106 category: e.error_category() as i32,
107 details: None,
108 retry_after: None,
109 },
110 )),
111 request_id: request.request_id,
112 execution_time_ms,
113 };
114 }
115 };
116
117 let result = self.execute_query_internal(query).await;
119
120 let execution_time_ms = start_time.elapsed().as_millis() as u64;
121
122 match result {
124 Ok(query_result) => aql::QueryResponse {
125 response: Some(aql::query_response::Response::Result(query_result)),
126 request_id: request.request_id,
127 execution_time_ms,
128 },
129 Err(e) => {
130 error!("Query execution failed: {}", e);
131 aql::QueryResponse {
132 response: Some(aql::query_response::Response::Error(
133 crate::proto::errors::ErrorResponse {
134 code: e.error_code() as i32,
135 message: e.to_string(),
136 category: e.error_category() as i32,
137 details: None,
138 retry_after: None,
139 },
140 )),
141 request_id: request.request_id,
142 execution_time_ms,
143 }
144 }
145 }
146 }
147
148 #[doc(hidden)]
153 pub async fn execute_query_internal(&self, query: Query) -> NetResult<query::QueryResult> {
154 match query {
155 Query::Get { collection, key } => {
156 debug!(
157 "Executing GET query: collection={}, key={:?}",
158 collection, key
159 );
160
161 let result = self.storage.get(&key).await?;
162
163 let result = match result {
164 Some(value) => query::QueryResult {
165 result: Some(query::query_result::Result::Single(query::SingleResult {
166 value: Some(cipher_blob_to_proto(&value)),
167 })),
168 },
169 None => query::QueryResult {
170 result: Some(query::query_result::Result::Single(query::SingleResult {
171 value: None,
172 })),
173 },
174 };
175
176 Ok(result)
177 }
178 Query::Set {
179 collection,
180 key,
181 value,
182 } => {
183 debug!(
184 "Executing SET query: collection={}, key={:?}",
185 collection, key
186 );
187
188 self.storage.put(&key, &value).await?;
189
190 Ok(query::QueryResult {
191 result: Some(query::query_result::Result::Success(query::SuccessResult {
192 affected_rows: 1,
193 })),
194 })
195 }
196 Query::Delete { collection, key } => {
197 debug!(
198 "Executing DELETE query: collection={}, key={:?}",
199 collection, key
200 );
201
202 self.storage.delete(&key).await?;
203
204 Ok(query::QueryResult {
205 result: Some(query::query_result::Result::Success(query::SuccessResult {
206 affected_rows: 1,
207 })),
208 })
209 }
210 Query::Range {
211 collection,
212 start,
213 end,
214 } => {
215 debug!(
216 "Executing RANGE query: collection={}, start={:?}, end={:?}",
217 collection, start, end
218 );
219
220 let results = self.storage.range(&start, &end).await?;
221
222 let values: Vec<query::KeyValue> = results
223 .into_iter()
224 .map(|(k, v)| query::KeyValue {
225 key: Some(key_to_proto(&k)),
226 value: Some(cipher_blob_to_proto(&v)),
227 })
228 .collect();
229
230 Ok(query::QueryResult {
231 result: Some(query::query_result::Result::Multi(query::MultiResult {
232 values,
233 })),
234 })
235 }
236 Query::Filter {
237 collection,
238 predicate,
239 } => {
240 #[cfg(feature = "compute")]
241 {
242 info!("Executing FILTER query with FHE predicate evaluation");
243
244 let mut compiler = PredicateCompiler::new();
251
252 let circuit = match compiler
255 .compile(&predicate, amaters_core::compute::EncryptedType::U8)
256 {
257 Ok(c) => c,
258 Err(e) => {
259 error!("Failed to compile predicate: {}", e);
260 return Err(NetError::ServerInternal(format!(
261 "Predicate compilation failed: {}",
262 e
263 )));
264 }
265 };
266
267 debug!(
268 "Compiled predicate circuit: depth={}, gates={}",
269 circuit.depth, circuit.gate_count
270 );
271
272 let min_key = Key::from_slice(&[]);
276 let max_key = Key::from_slice(&[0xFF; 256]); let all_rows = match self.storage.range(&min_key, &max_key).await {
279 Ok(rows) => rows,
280 Err(e) => {
281 error!("Failed to retrieve rows for filtering: {}", e);
282 return Err(NetError::from(e));
283 }
284 };
285
286 debug!("Retrieved {} rows for filtering", all_rows.len());
287
288 if all_rows.len() > 1000 {
290 warn!(
291 "Filter query retrieved {} rows, which may cause performance issues",
292 all_rows.len()
293 );
294 }
295
296 let rhs = match PredicateCompiler::extract_rhs_value(&predicate) {
298 Ok(r) => r,
299 Err(e) => {
300 error!("Failed to extract RHS value: {}", e);
301 return Err(NetError::ServerInternal(format!(
302 "RHS extraction failed: {}",
303 e
304 )));
305 }
306 };
307
308 let executor = FheExecutor::new();
310
311 let mut results = Vec::new();
315 let mut execution_errors = 0;
316
317 for (key, value_blob) in all_rows {
318 let mut inputs = HashMap::new();
320 inputs.insert("value".to_string(), value_blob.clone());
321 inputs.insert("rhs".to_string(), rhs.clone());
322
323 match executor.execute(&circuit, &inputs) {
326 Ok(result_blob) => {
327 results.push(query::KeyValue {
331 key: Some(key_to_proto(&key)),
332 value: Some(cipher_blob_to_proto(&value_blob)),
333 });
336
337 debug!(
338 "Executed predicate on key {:?}, result blob size: {}",
339 key,
340 result_blob.as_bytes().len()
341 );
342 }
343 Err(e) => {
344 execution_errors += 1;
345 warn!("FHE execution failed for key {:?}: {}", key, e);
346 }
348 }
349 }
350
351 if execution_errors > 0 {
352 warn!(
353 "Filter query had {} FHE execution errors out of {} total rows",
354 execution_errors,
355 execution_errors + results.len()
356 );
357 }
358
359 info!(
360 "FILTER query completed, processed {} rows successfully",
361 results.len()
362 );
363
364 Ok(query::QueryResult {
367 result: Some(query::query_result::Result::Multi(query::MultiResult {
368 values: results,
369 })),
370 })
371 }
372
373 #[cfg(not(feature = "compute"))]
374 {
375 let _ = (collection, predicate);
376 warn!("FILTER queries require compute feature to be enabled");
377 Err(NetError::ServerInternal(
378 "FILTER queries require compute feature".to_string(),
379 ))
380 }
381 }
382 Query::Update {
383 collection,
384 predicate,
385 updates,
386 } => {
387 warn!("UPDATE queries are not yet fully implemented");
388 Err(NetError::ServerInternal(
389 "UPDATE queries are not yet implemented".to_string(),
390 ))
391 }
392 }
393 }
394
395 pub async fn health_check(
397 &self,
398 _request: aql::HealthCheckRequest,
399 ) -> aql::HealthCheckResponse {
400 debug!("HealthCheck request received");
401
402 aql::HealthCheckResponse {
403 status: aql::HealthStatus::HealthServing as i32,
404 message: Some("Service is healthy".to_string()),
405 }
406 }
407
408 pub async fn get_server_info(
410 &self,
411 _request: aql::ServerInfoRequest,
412 ) -> aql::ServerInfoResponse {
413 debug!("GetServerInfo request received");
414
415 aql::ServerInfoResponse {
416 version: Some(create_version()),
417 supported_versions: vec![create_version()],
418 capabilities: vec![
419 "query.get".to_string(),
420 "query.set".to_string(),
421 "query.delete".to_string(),
422 "query.range".to_string(),
423 ],
424 uptime_seconds: self.start_time.elapsed().as_secs(),
425 }
426 }
427}
428
429pub struct AqlServerBuilder<S: StorageEngine> {
431 storage: Arc<S>,
432}
433
434impl<S: StorageEngine> AqlServerBuilder<S> {
435 pub fn new(storage: Arc<S>) -> Self {
437 Self { storage }
438 }
439
440 pub fn build(self) -> AqlServiceImpl<S> {
442 AqlServiceImpl::new(self.storage)
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449 use amaters_core::storage::MemoryStorage;
450 use amaters_core::types::{CipherBlob, Key};
451
452 #[tokio::test]
453 async fn test_service_creation() {
454 let storage = Arc::new(MemoryStorage::new());
455 let service = AqlServiceImpl::new(storage);
456 assert!(service.start_time.elapsed().as_secs() < 1);
457 }
458
459 #[tokio::test]
460 async fn test_get_query_execution() {
461 let storage = Arc::new(MemoryStorage::new());
462 let key = Key::from_str("test_key");
463 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
464
465 storage.put(&key, &value).await.expect("Failed to put");
466
467 let service = AqlServiceImpl::new(storage);
468
469 let query = Query::Get {
470 collection: "test".to_string(),
471 key: key.clone(),
472 };
473
474 let result = service.execute_query_internal(query).await;
475 assert!(result.is_ok());
476
477 let query_result = result.expect("Query failed");
478 match query_result.result {
479 Some(query::query_result::Result::Single(single)) => {
480 assert!(single.value.is_some());
481 }
482 _ => panic!("Expected single result"),
483 }
484 }
485
486 #[tokio::test]
487 async fn test_set_query_execution() {
488 let storage = Arc::new(MemoryStorage::new());
489 let service = AqlServiceImpl::new(storage.clone());
490
491 let key = Key::from_str("test_key");
492 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
493
494 let query = Query::Set {
495 collection: "test".to_string(),
496 key: key.clone(),
497 value: value.clone(),
498 };
499
500 let result = service.execute_query_internal(query).await;
501 assert!(result.is_ok());
502
503 let stored = storage.get(&key).await.expect("Failed to get");
505 assert!(stored.is_some());
506 assert_eq!(stored.expect("No value"), value);
507 }
508
509 #[tokio::test]
510 async fn test_delete_query_execution() {
511 let storage = Arc::new(MemoryStorage::new());
512 let key = Key::from_str("test_key");
513 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
514
515 storage.put(&key, &value).await.expect("Failed to put");
516
517 let service = AqlServiceImpl::new(storage.clone());
518
519 let query = Query::Delete {
520 collection: "test".to_string(),
521 key: key.clone(),
522 };
523
524 let result = service.execute_query_internal(query).await;
525 assert!(result.is_ok());
526
527 let stored = storage.get(&key).await.expect("Failed to get");
529 assert!(stored.is_none());
530 }
531
532 #[tokio::test]
533 async fn test_range_query_execution() {
534 let storage = Arc::new(MemoryStorage::new());
535
536 for i in 0..10 {
538 let key = Key::from_str(&format!("key_{:02}", i));
539 let value = CipherBlob::new(vec![i as u8]);
540 storage.put(&key, &value).await.expect("Failed to put");
541 }
542
543 let service = AqlServiceImpl::new(storage);
544
545 let query = Query::Range {
546 collection: "test".to_string(),
547 start: Key::from_str("key_03"),
548 end: Key::from_str("key_07"),
549 };
550
551 let result = service.execute_query_internal(query).await;
552 assert!(result.is_ok());
553
554 let query_result = result.expect("Query failed");
555 match query_result.result {
556 Some(query::query_result::Result::Multi(multi)) => {
557 assert!(!multi.values.is_empty());
558 }
559 _ => panic!("Expected multi result"),
560 }
561 }
562
563 #[tokio::test]
564 async fn test_get_nonexistent_key() {
565 let storage = Arc::new(MemoryStorage::new());
566 let service = AqlServiceImpl::new(storage);
567
568 let query = Query::Get {
569 collection: "test".to_string(),
570 key: Key::from_str("nonexistent"),
571 };
572
573 let result = service.execute_query_internal(query).await;
574 assert!(result.is_ok());
575
576 let query_result = result.expect("Query failed");
577 match query_result.result {
578 Some(query::query_result::Result::Single(single)) => {
579 assert!(single.value.is_none());
580 }
581 _ => panic!("Expected single result"),
582 }
583 }
584
585 #[tokio::test]
586 async fn test_health_check() {
587 let storage = Arc::new(MemoryStorage::new());
588 let service = AqlServiceImpl::new(storage);
589
590 let request = aql::HealthCheckRequest { service: None };
591 let response = service.health_check(request).await;
592
593 assert_eq!(response.status, aql::HealthStatus::HealthServing as i32);
594 }
595
596 #[tokio::test]
597 async fn test_server_info() {
598 let storage = Arc::new(MemoryStorage::new());
599 let service = AqlServiceImpl::new(storage);
600
601 let request = aql::ServerInfoRequest {};
602 let response = service.get_server_info(request).await;
603
604 assert!(response.version.is_some());
605 assert!(!response.capabilities.is_empty());
606 assert!(response.capabilities.contains(&"query.get".to_string()));
607 }
608}