1use crate::config::{ClientConfig, RetryConfig};
4use crate::connection::{Connection, ConnectionPool};
5use crate::error::{Result, SdkError};
6use crate::fhe::FheEncryptor;
7use amaters_core::{CipherBlob, Key, Query};
8use std::sync::Arc;
9use tokio::time::{sleep, timeout};
10use tracing::{debug, info, warn};
11
12#[derive(Clone)]
17pub struct AmateRSClient {
18 pool: Arc<ConnectionPool>,
19 config: Arc<ClientConfig>,
20 encryptor: Option<Arc<FheEncryptor>>,
21}
22
23impl AmateRSClient {
24 pub async fn connect(addr: impl Into<String>) -> Result<Self> {
37 let config = ClientConfig::new(addr);
38 Self::connect_with_config(config).await
39 }
40
41 pub async fn connect_with_config(config: ClientConfig) -> Result<Self> {
59 info!("Connecting to AmateRS server at {}", config.server_addr);
60
61 let pool = ConnectionPool::new(config.clone());
62
63 let _conn = pool.get().await?;
65
66 info!("Successfully connected to AmateRS server");
67
68 Ok(Self {
69 pool: Arc::new(pool),
70 config: Arc::new(config),
71 encryptor: None,
72 })
73 }
74
75 pub fn with_encryptor(mut self, encryptor: FheEncryptor) -> Self {
77 self.encryptor = Some(Arc::new(encryptor));
78 self
79 }
80
81 pub fn encryptor(&self) -> Option<&Arc<FheEncryptor>> {
83 self.encryptor.as_ref()
84 }
85
86 async fn execute_with_retry<F, Fut, T>(&self, operation: F) -> Result<T>
88 where
89 F: Fn(Connection) -> Fut,
90 Fut: std::future::Future<Output = Result<T>>,
91 {
92 let retry_config = &self.config.retry_config;
93 let mut attempt = 0;
94
95 loop {
96 attempt += 1;
97
98 let conn = self.pool.get().await?;
100
101 match operation(conn).await {
103 Ok(result) => return Ok(result),
104 Err(e) if e.is_retryable() && attempt <= retry_config.max_retries => {
105 let backoff = retry_config.backoff_duration(attempt);
106 warn!(
107 "Operation failed (attempt {}), retrying after {:?}: {}",
108 attempt, backoff, e
109 );
110 sleep(backoff).await;
111 }
112 Err(e) => return Err(e),
113 }
114 }
115 }
116
117 pub async fn set(&self, collection: &str, key: &Key, value: &CipherBlob) -> Result<()> {
134 debug!("Set: collection={}, key={}", collection, key);
135
136 let collection = collection.to_string();
137 let key = key.clone();
138 let value = value.clone();
139
140 self.execute_with_retry(move |conn| {
141 let collection = collection.clone();
142 let key = key.clone();
143 let value = value.clone();
144
145 async move {
146 use amaters_net::convert::{create_version, query_to_proto};
147 use amaters_net::proto::aql::QueryRequest;
148 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
149
150 let mut client = AqlServiceClient::new(conn.channel().clone());
151
152 let query = Query::Set {
153 collection,
154 key,
155 value,
156 };
157
158 let proto_query = query_to_proto(&query)?;
159
160 let request = tonic::Request::new(QueryRequest {
161 query: Some(proto_query),
162 request_id: Some(uuid::Uuid::new_v4().to_string()),
163 timeout_ms: Some(30000),
164 transaction_id: None,
165 version: Some(create_version()),
166 });
167
168 let response = client.execute_query(request).await?;
169
170 match response.into_inner().response {
172 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
173 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
174 SdkError::OperationFailed(format!("Server error: {}", e.message)),
175 ),
176 None => Err(SdkError::OperationFailed(
177 "Empty response from server".to_string(),
178 )),
179 }
180 }
181 })
182 .await
183 }
184
185 pub async fn get(&self, collection: &str, key: &Key) -> Result<Option<CipherBlob>> {
205 debug!("Get: collection={}, key={}", collection, key);
206
207 let collection = collection.to_string();
208 let key = key.clone();
209
210 self.execute_with_retry(move |conn| {
211 let collection = collection.clone();
212 let key = key.clone();
213
214 async move {
215 use amaters_net::convert::{
216 cipher_blob_from_proto, create_version, query_to_proto,
217 };
218 use amaters_net::proto::aql::QueryRequest;
219 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
220
221 let mut client = AqlServiceClient::new(conn.channel().clone());
222
223 let query = Query::Get { collection, key };
224
225 let proto_query = query_to_proto(&query)?;
226
227 let request = tonic::Request::new(QueryRequest {
228 query: Some(proto_query),
229 request_id: Some(uuid::Uuid::new_v4().to_string()),
230 timeout_ms: Some(30000),
231 transaction_id: None,
232 version: Some(create_version()),
233 });
234
235 let response = client.execute_query(request).await?;
236
237 match response.into_inner().response {
239 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
240 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
241 match result.result {
242 Some(QueryResultEnum::Single(single)) => {
243 if let Some(value) = single.value {
244 Ok(Some(cipher_blob_from_proto(value)?))
245 } else {
246 Ok(None)
247 }
248 }
249 _ => Err(SdkError::OperationFailed(
250 "Expected single result".to_string(),
251 )),
252 }
253 }
254 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
255 SdkError::OperationFailed(format!("Server error: {}", e.message)),
256 ),
257 None => Err(SdkError::OperationFailed(
258 "Empty response from server".to_string(),
259 )),
260 }
261 }
262 })
263 .await
264 }
265
266 pub async fn delete(&self, collection: &str, key: &Key) -> Result<()> {
281 debug!("Delete: collection={}, key={}", collection, key);
282
283 let collection = collection.to_string();
284 let key = key.clone();
285
286 self.execute_with_retry(move |conn| {
287 let collection = collection.clone();
288 let key = key.clone();
289
290 async move {
291 use amaters_net::convert::{create_version, query_to_proto};
292 use amaters_net::proto::aql::QueryRequest;
293 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
294
295 let mut client = AqlServiceClient::new(conn.channel().clone());
296
297 let query = Query::Delete { collection, key };
298
299 let proto_query = query_to_proto(&query)?;
300
301 let request = tonic::Request::new(QueryRequest {
302 query: Some(proto_query),
303 request_id: Some(uuid::Uuid::new_v4().to_string()),
304 timeout_ms: Some(30000),
305 transaction_id: None,
306 version: Some(create_version()),
307 });
308
309 let response = client.execute_query(request).await?;
310
311 match response.into_inner().response {
313 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
314 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
315 SdkError::OperationFailed(format!("Server error: {}", e.message)),
316 ),
317 None => Err(SdkError::OperationFailed(
318 "Empty response from server".to_string(),
319 )),
320 }
321 }
322 })
323 .await
324 }
325
326 pub async fn contains(&self, collection: &str, key: &Key) -> Result<bool> {
344 debug!("Contains: collection={}, key={}", collection, key);
345
346 let result = self.get(collection, key).await?;
348 Ok(result.is_some())
349 }
350
351 pub async fn execute_query(&self, query: &Query) -> Result<QueryResult> {
372 debug!("Executing query: {:?}", query);
373
374 let query = query.clone();
375
376 self.execute_with_retry(move |conn| {
377 let query = query.clone();
378
379 async move {
380 use amaters_net::convert::{
381 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
382 };
383 use amaters_net::proto::aql::QueryRequest;
384 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
385
386 let mut client = AqlServiceClient::new(conn.channel().clone());
387
388 let proto_query = query_to_proto(&query)?;
389
390 let request = tonic::Request::new(QueryRequest {
391 query: Some(proto_query),
392 request_id: Some(uuid::Uuid::new_v4().to_string()),
393 timeout_ms: Some(30000),
394 transaction_id: None,
395 version: Some(create_version()),
396 });
397
398 let response = client.execute_query(request).await?;
399
400 match response.into_inner().response {
402 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
403 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
404 match result.result {
405 Some(QueryResultEnum::Single(single)) => {
406 let value = if let Some(v) = single.value {
407 Some(cipher_blob_from_proto(v)?)
408 } else {
409 None
410 };
411 Ok(QueryResult::Single(value))
412 }
413 Some(QueryResultEnum::Multi(multi)) => {
414 let mut values = Vec::new();
415 for kv in multi.values {
416 let key = kv.key.ok_or_else(|| {
417 SdkError::OperationFailed(
418 "Missing key in result".to_string(),
419 )
420 })?;
421 let value = kv.value.ok_or_else(|| {
422 SdkError::OperationFailed(
423 "Missing value in result".to_string(),
424 )
425 })?;
426 values.push((
427 key_from_proto(key),
428 cipher_blob_from_proto(value)?,
429 ));
430 }
431 Ok(QueryResult::Multi(values))
432 }
433 Some(QueryResultEnum::Success(success)) => Ok(QueryResult::Success {
434 affected_rows: success.affected_rows,
435 }),
436 None => Err(SdkError::OperationFailed(
437 "Empty result from server".to_string(),
438 )),
439 }
440 }
441 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
442 SdkError::OperationFailed(format!("Server error: {}", e.message)),
443 ),
444 None => Err(SdkError::OperationFailed(
445 "Empty response from server".to_string(),
446 )),
447 }
448 }
449 })
450 .await
451 }
452
453 pub async fn execute_batch(&self, queries: Vec<Query>) -> Result<Vec<QueryResult>> {
457 debug!("Executing batch of {} queries", queries.len());
458
459 self.execute_with_retry(move |conn| {
460 let queries = queries.clone();
461
462 async move {
463 use amaters_net::convert::{
464 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
465 };
466 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
467 use amaters_net::proto::aql::{BatchRequest, IsolationLevel};
468
469 let mut client = AqlServiceClient::new(conn.channel().clone());
470
471 let mut proto_queries = Vec::new();
473 for query in &queries {
474 proto_queries.push(query_to_proto(query)?);
475 }
476
477 let request = tonic::Request::new(BatchRequest {
478 queries: proto_queries,
479 request_id: Some(uuid::Uuid::new_v4().to_string()),
480 timeout_ms: Some(60000), isolation_level: IsolationLevel::IsolationDefault as i32,
482 version: Some(create_version()),
483 });
484
485 let response = client.execute_batch(request).await?;
486
487 match response.into_inner().response {
489 Some(amaters_net::proto::aql::batch_response::Response::Results(
490 batch_result,
491 )) => {
492 let mut results = Vec::new();
493 for result in batch_result.results {
494 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
495 let query_result = match result.result {
496 Some(QueryResultEnum::Single(single)) => {
497 let value = if let Some(v) = single.value {
498 Some(cipher_blob_from_proto(v)?)
499 } else {
500 None
501 };
502 QueryResult::Single(value)
503 }
504 Some(QueryResultEnum::Multi(multi)) => {
505 let mut values = Vec::new();
506 for kv in multi.values {
507 let key = kv.key.ok_or_else(|| {
508 SdkError::OperationFailed(
509 "Missing key in result".to_string(),
510 )
511 })?;
512 let value = kv.value.ok_or_else(|| {
513 SdkError::OperationFailed(
514 "Missing value in result".to_string(),
515 )
516 })?;
517 values.push((
518 key_from_proto(key),
519 cipher_blob_from_proto(value)?,
520 ));
521 }
522 QueryResult::Multi(values)
523 }
524 Some(QueryResultEnum::Success(success)) => QueryResult::Success {
525 affected_rows: success.affected_rows,
526 },
527 None => {
528 return Err(SdkError::OperationFailed(
529 "Empty result from server".to_string(),
530 ));
531 }
532 };
533 results.push(query_result);
534 }
535 Ok(results)
536 }
537 Some(amaters_net::proto::aql::batch_response::Response::Error(e)) => Err(
538 SdkError::OperationFailed(format!("Batch error: {}", e.message)),
539 ),
540 None => Err(SdkError::OperationFailed(
541 "Empty response from server".to_string(),
542 )),
543 }
544 }
545 })
546 .await
547 }
548
549 pub fn pool_stats(&self) -> crate::connection::PoolStats {
551 self.pool.stats()
552 }
553
554 pub fn close(&self) {
556 info!("Closing client");
557 self.pool.close_all();
558 }
559
560 pub async fn health_check(&self) -> Result<()> {
564 debug!("Performing health check");
565
566 let result = timeout(
567 self.config.request_timeout,
568 self.execute_with_retry(|conn| async move {
569 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
570 use amaters_net::proto::aql::{HealthCheckRequest, HealthStatus};
571
572 let mut client = AqlServiceClient::new(conn.channel().clone());
573
574 let request = tonic::Request::new(HealthCheckRequest { service: None });
575
576 let response = client.health_check(request).await?;
577 let health_response = response.into_inner();
578
579 if health_response.status == HealthStatus::HealthServing as i32 {
580 Ok(())
581 } else {
582 Err(SdkError::OperationFailed(format!(
583 "Server unhealthy: {:?}",
584 health_response.message
585 )))
586 }
587 }),
588 )
589 .await;
590
591 match result {
592 Ok(Ok(())) => {
593 debug!("Health check passed");
594 Ok(())
595 }
596 Ok(Err(e)) => {
597 warn!("Health check failed: {}", e);
598 Err(e)
599 }
600 Err(_) => {
601 warn!("Health check timeout");
602 Err(SdkError::Timeout("health check timeout".to_string()))
603 }
604 }
605 }
606
607 pub async fn server_info(&self) -> Result<ServerInfo> {
611 debug!("Getting server info");
612
613 self.execute_with_retry(|conn| async move {
614 use amaters_net::proto::aql::ServerInfoRequest;
615 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
616
617 let mut client = AqlServiceClient::new(conn.channel().clone());
618
619 let request = tonic::Request::new(ServerInfoRequest {});
620
621 let response = client.get_server_info(request).await?;
622 let info = response.into_inner();
623
624 Ok(ServerInfo {
625 version: info.version.map(|v| (v.major, v.minor, v.patch)),
626 supported_versions: info
627 .supported_versions
628 .into_iter()
629 .map(|v| (v.major, v.minor, v.patch))
630 .collect(),
631 capabilities: info.capabilities,
632 uptime_seconds: info.uptime_seconds,
633 })
634 })
635 .await
636 }
637
638 pub async fn range(
656 &self,
657 collection: &str,
658 start: &Key,
659 end: &Key,
660 ) -> Result<Vec<(Key, CipherBlob)>> {
661 debug!(
662 "Range: collection={}, start={}, end={}",
663 collection, start, end
664 );
665
666 let collection = collection.to_string();
667 let start = start.clone();
668 let end = end.clone();
669
670 self.execute_with_retry(move |conn| {
671 let collection = collection.clone();
672 let start = start.clone();
673 let end = end.clone();
674
675 async move {
676 use amaters_net::convert::{
677 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
678 };
679 use amaters_net::proto::aql::QueryRequest;
680 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
681
682 let mut client = AqlServiceClient::new(conn.channel().clone());
683
684 let query = Query::Range {
685 collection,
686 start,
687 end,
688 };
689
690 let proto_query = query_to_proto(&query)?;
691
692 let request = tonic::Request::new(QueryRequest {
693 query: Some(proto_query),
694 request_id: Some(uuid::Uuid::new_v4().to_string()),
695 timeout_ms: Some(30000),
696 transaction_id: None,
697 version: Some(create_version()),
698 });
699
700 let response = client.execute_query(request).await?;
701
702 match response.into_inner().response {
704 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
705 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
706 match result.result {
707 Some(QueryResultEnum::Multi(multi)) => {
708 let mut values = Vec::new();
709 for kv in multi.values {
710 let key = kv.key.ok_or_else(|| {
711 SdkError::OperationFailed(
712 "Missing key in result".to_string(),
713 )
714 })?;
715 let value = kv.value.ok_or_else(|| {
716 SdkError::OperationFailed(
717 "Missing value in result".to_string(),
718 )
719 })?;
720 values.push((
721 key_from_proto(key),
722 cipher_blob_from_proto(value)?,
723 ));
724 }
725 Ok(values)
726 }
727 _ => Err(SdkError::OperationFailed(
728 "Expected multi result for range query".to_string(),
729 )),
730 }
731 }
732 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
733 SdkError::OperationFailed(format!("Server error: {}", e.message)),
734 ),
735 None => Err(SdkError::OperationFailed(
736 "Empty response from server".to_string(),
737 )),
738 }
739 }
740 })
741 .await
742 }
743}
744
/// Server metadata as returned by `AmateRSClient::server_info`.
///
/// All fields are plain standard-library types, so the struct also derives
/// `PartialEq`, `Eq`, and `Default` to make values easy to compare and to
/// construct in tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ServerInfo {
    /// Server version as `(major, minor, patch)`, when the server reports one.
    pub version: Option<(u32, u32, u32)>,
    /// Versions listed by the server as supported, each as `(major, minor, patch)`.
    pub supported_versions: Vec<(u32, u32, u32)>,
    /// Capability strings reported by the server.
    pub capabilities: Vec<String>,
    /// Server uptime in seconds, as reported by the server.
    pub uptime_seconds: u64,
}
757
758#[derive(Debug, Clone)]
760pub enum QueryResult {
761 Single(Option<CipherBlob>),
763 Multi(Vec<(Key, CipherBlob)>),
765 Success { affected_rows: u64 },
767}
768
#[cfg(test)]
mod tests {
    use super::*;

    /// The default retry policy allows three retries and yields a non-zero
    /// backoff for the first attempt.
    #[tokio::test]
    async fn test_retry_config() {
        let policy = RetryConfig::default();
        assert_eq!(policy.max_retries, 3);

        assert!(policy.backoff_duration(1).as_millis() > 0);
    }

    /// A `Success` value exposes the `affected_rows` it was built with.
    #[test]
    fn test_query_result() {
        let outcome = QueryResult::Success { affected_rows: 5 };
        if let QueryResult::Success { affected_rows } = outcome {
            assert_eq!(affected_rows, 5);
        } else {
            panic!("expected Success");
        }
    }
}