1use crate::cache::{InvalidationPolicy, QueryCache, QueryCacheConfig};
4use crate::config::{ClientConfig, RetryConfig};
5use crate::connection::{Connection, ConnectionPool};
6use crate::error::{Result, SdkError};
7use crate::fhe::FheEncryptor;
8use crate::streaming::{QueryStream, Row, StreamConfig};
9use amaters_core::{CipherBlob, Key, Query};
10use futures::StreamExt as _;
11use std::sync::Arc;
12use tokio::time::{sleep, timeout};
13use tracing::{debug, info, warn};
14
15#[derive(Debug, Clone)]
21pub struct PaginationConfig {
22 pub page_size: usize,
24 pub cursor: Option<String>,
26 pub offset: usize,
31}
32
33impl Default for PaginationConfig {
34 fn default() -> Self {
35 Self {
36 page_size: 100,
37 cursor: None,
38 offset: 0,
39 }
40 }
41}
42
43impl PaginationConfig {
44 pub fn new(page_size: usize) -> Self {
46 Self {
47 page_size,
48 cursor: None,
49 offset: 0,
50 }
51 }
52
53 #[must_use]
55 pub fn with_cursor(mut self, cursor: impl Into<String>) -> Self {
56 self.cursor = Some(cursor.into());
57 self
58 }
59
60 #[must_use]
62 pub fn with_offset(mut self, offset: usize) -> Self {
63 self.offset = offset;
64 self
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct PaginatedResult<T> {
71 pub items: Vec<T>,
73 pub next_cursor: Option<String>,
75 pub has_more: bool,
77 pub total_hint: Option<usize>,
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83pub enum SortOrder {
84 Ascending,
86 Descending,
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum SortField {
93 Key,
95 Value,
97 Timestamp,
99}
100
101#[derive(Debug, Clone)]
103pub struct SortConfig {
104 pub field: SortField,
106 pub order: SortOrder,
108}
109
110impl SortConfig {
111 pub fn new(field: SortField, order: SortOrder) -> Self {
113 Self { field, order }
114 }
115}
116
117const CURSOR_SEPARATOR: u8 = b'|';
123
124fn encode_cursor(key: &Key) -> String {
129 let key_bytes = key.as_bytes();
130 let hash = blake3::hash(key_bytes);
131 let key_hex = hex_encode(key_bytes);
132 let hash_hex = hex_encode(hash.as_bytes());
133 format!("{}{}{}", key_hex, CURSOR_SEPARATOR as char, hash_hex)
134}
135
136fn decode_cursor(cursor: &str) -> Result<Key> {
141 let parts: Vec<&str> = cursor.split(CURSOR_SEPARATOR as char).collect();
142 if parts.len() != 2 {
143 return Err(SdkError::InvalidArgument(
144 "malformed cursor: expected two parts separated by '|'".to_string(),
145 ));
146 }
147
148 let key_bytes = hex_decode(parts[0])
149 .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor key: {}", e)))?;
150
151 let hash_bytes = hex_decode(parts[1])
152 .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor hash: {}", e)))?;
153
154 let expected_hash = blake3::hash(&key_bytes);
156 if hash_bytes.len() != 32 || expected_hash.as_bytes() != hash_bytes.as_slice() {
157 return Err(SdkError::InvalidArgument(
158 "cursor integrity check failed: hash mismatch".to_string(),
159 ));
160 }
161
162 Ok(Key::from_slice(&key_bytes))
163}
164
165fn hex_encode(bytes: &[u8]) -> String {
167 use std::fmt::Write;
168 bytes
169 .iter()
170 .fold(String::with_capacity(bytes.len() * 2), |mut s, b| {
171 let _ = write!(&mut s, "{:02x}", b);
172 s
173 })
174}
175
176fn hex_decode(hex: &str) -> std::result::Result<Vec<u8>, String> {
178 if hex.len() % 2 != 0 {
179 return Err("odd-length hex string".to_string());
180 }
181 (0..hex.len())
182 .step_by(2)
183 .map(|i| {
184 u8::from_str_radix(&hex[i..i + 2], 16)
185 .map_err(|e| format!("invalid hex at offset {}: {}", i, e))
186 })
187 .collect()
188}
189
190#[derive(Clone)]
195pub struct AmateRSClient {
196 pool: Arc<ConnectionPool>,
197 config: Arc<ClientConfig>,
198 encryptor: Option<Arc<FheEncryptor>>,
199 cache: Option<Arc<QueryCache>>,
200}
201
202impl AmateRSClient {
203 pub async fn connect(addr: impl Into<String>) -> Result<Self> {
216 let config = ClientConfig::new(addr);
217 Self::connect_with_config(config).await
218 }
219
220 pub async fn connect_with_config(config: ClientConfig) -> Result<Self> {
238 info!("Connecting to AmateRS server at {}", config.server_addr);
239
240 let pool = ConnectionPool::new(config.clone());
241
242 let _conn = pool.get().await?;
244
245 info!("Successfully connected to AmateRS server");
246
247 Ok(Self {
248 pool: Arc::new(pool),
249 config: Arc::new(config),
250 encryptor: None,
251 cache: None,
252 })
253 }
254
255 pub fn with_encryptor(mut self, encryptor: FheEncryptor) -> Self {
257 self.encryptor = Some(Arc::new(encryptor));
258 self
259 }
260
261 pub fn encryptor(&self) -> Option<&Arc<FheEncryptor>> {
263 self.encryptor.as_ref()
264 }
265
266 pub fn with_cache(mut self, config: QueryCacheConfig) -> Self {
282 self.cache = Some(Arc::new(QueryCache::new(config)));
283 self
284 }
285
286 pub fn cache(&self) -> Option<&Arc<QueryCache>> {
288 self.cache.as_ref()
289 }
290
291 async fn execute_with_retry<F, Fut, T>(&self, operation: F) -> Result<T>
293 where
294 F: Fn(Connection) -> Fut,
295 Fut: std::future::Future<Output = Result<T>>,
296 {
297 let retry_config = &self.config.retry_config;
298 let mut attempt = 0;
299
300 loop {
301 attempt += 1;
302
303 let conn = self.pool.get().await?;
305
306 match operation(conn).await {
308 Ok(result) => return Ok(result),
309 Err(e) if e.is_retryable() && attempt <= retry_config.max_retries => {
310 let backoff = retry_config.backoff_duration(attempt);
311 warn!(
312 "Operation failed (attempt {}), retrying after {:?}: {}",
313 attempt, backoff, e
314 );
315 sleep(backoff).await;
316 }
317 Err(e) => return Err(e),
318 }
319 }
320 }
321
322 pub async fn set(&self, collection: &str, key: &Key, value: &CipherBlob) -> Result<()> {
339 debug!("Set: collection={}, key={}", collection, key);
340
341 if let Some(ref cache) = self.cache {
343 if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
344 let cache_key = QueryCache::make_key(collection, key.as_bytes());
345 cache.invalidate(&cache_key);
346 }
347 }
348
349 let collection = collection.to_string();
350 let key = key.clone();
351 let value = value.clone();
352
353 self.execute_with_retry(move |conn| {
354 let collection = collection.clone();
355 let key = key.clone();
356 let value = value.clone();
357
358 async move {
359 use amaters_net::convert::{create_version, query_to_proto};
360 use amaters_net::proto::aql::QueryRequest;
361 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
362
363 let mut client = AqlServiceClient::new(conn.channel().clone());
364
365 let query = Query::Set {
366 collection,
367 key,
368 value,
369 };
370
371 let proto_query = query_to_proto(&query)?;
372
373 let request = tonic::Request::new(QueryRequest {
374 query: Some(proto_query),
375 request_id: Some(uuid::Uuid::new_v4().to_string()),
376 timeout_ms: Some(30000),
377 transaction_id: None,
378 version: Some(create_version()),
379 });
380
381 let response = client.execute_query(request).await?;
382
383 match response.into_inner().response {
385 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
386 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
387 SdkError::OperationFailed(format!("Server error: {}", e.message)),
388 ),
389 None => Err(SdkError::OperationFailed(
390 "Empty response from server".to_string(),
391 )),
392 }
393 }
394 })
395 .await
396 }
397
398 pub async fn get(&self, collection: &str, key: &Key) -> Result<Option<CipherBlob>> {
418 debug!("Get: collection={}, key={}", collection, key);
419
420 if let Some(ref cache) = self.cache {
422 let cache_key = QueryCache::make_key(collection, key.as_bytes());
423 if let Some(cached_data) = cache.get(&cache_key) {
424 debug!("Cache hit for collection={}, key={}", collection, key);
425 return Ok(Some(CipherBlob::new(cached_data)));
426 }
427 }
428
429 let collection = collection.to_string();
430 let key = key.clone();
431 let cache = self.cache.clone();
432 let coll_for_cache = collection.clone();
433 let key_for_cache = key.clone();
434
435 let result = self
436 .execute_with_retry(move |conn| {
437 let collection = collection.clone();
438 let key = key.clone();
439
440 async move {
441 use amaters_net::convert::{
442 cipher_blob_from_proto, create_version, query_to_proto,
443 };
444 use amaters_net::proto::aql::QueryRequest;
445 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
446
447 let mut client = AqlServiceClient::new(conn.channel().clone());
448
449 let query = Query::Get { collection, key };
450
451 let proto_query = query_to_proto(&query)?;
452
453 let request = tonic::Request::new(QueryRequest {
454 query: Some(proto_query),
455 request_id: Some(uuid::Uuid::new_v4().to_string()),
456 timeout_ms: Some(30000),
457 transaction_id: None,
458 version: Some(create_version()),
459 });
460
461 let response = client.execute_query(request).await?;
462
463 match response.into_inner().response {
465 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
466 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
467 match result.result {
468 Some(QueryResultEnum::Single(single)) => {
469 if let Some(value) = single.value {
470 Ok(Some(cipher_blob_from_proto(value)?))
471 } else {
472 Ok(None)
473 }
474 }
475 _ => Err(SdkError::OperationFailed(
476 "Expected single result".to_string(),
477 )),
478 }
479 }
480 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
481 SdkError::OperationFailed(format!("Server error: {}", e.message)),
482 ),
483 None => Err(SdkError::OperationFailed(
484 "Empty response from server".to_string(),
485 )),
486 }
487 }
488 })
489 .await;
490
491 if let Ok(Some(ref blob)) = result {
493 if let Some(ref c) = cache {
494 let ck = QueryCache::make_key(&coll_for_cache, key_for_cache.as_bytes());
495 c.put_with_collection(&ck, blob.to_vec(), Some(&coll_for_cache));
496 }
497 }
498
499 result
500 }
501
502 pub async fn delete(&self, collection: &str, key: &Key) -> Result<()> {
517 debug!("Delete: collection={}, key={}", collection, key);
518
519 if let Some(ref cache) = self.cache {
521 if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
522 let cache_key = QueryCache::make_key(collection, key.as_bytes());
523 cache.invalidate(&cache_key);
524 }
525 }
526
527 let collection = collection.to_string();
528 let key = key.clone();
529
530 self.execute_with_retry(move |conn| {
531 let collection = collection.clone();
532 let key = key.clone();
533
534 async move {
535 use amaters_net::convert::{create_version, query_to_proto};
536 use amaters_net::proto::aql::QueryRequest;
537 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
538
539 let mut client = AqlServiceClient::new(conn.channel().clone());
540
541 let query = Query::Delete { collection, key };
542
543 let proto_query = query_to_proto(&query)?;
544
545 let request = tonic::Request::new(QueryRequest {
546 query: Some(proto_query),
547 request_id: Some(uuid::Uuid::new_v4().to_string()),
548 timeout_ms: Some(30000),
549 transaction_id: None,
550 version: Some(create_version()),
551 });
552
553 let response = client.execute_query(request).await?;
554
555 match response.into_inner().response {
557 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
558 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
559 SdkError::OperationFailed(format!("Server error: {}", e.message)),
560 ),
561 None => Err(SdkError::OperationFailed(
562 "Empty response from server".to_string(),
563 )),
564 }
565 }
566 })
567 .await
568 }
569
570 pub async fn contains(&self, collection: &str, key: &Key) -> Result<bool> {
588 debug!("Contains: collection={}, key={}", collection, key);
589
590 let result = self.get(collection, key).await?;
592 Ok(result.is_some())
593 }
594
595 pub async fn execute_query(&self, query: &Query) -> Result<QueryResult> {
616 debug!("Executing query: {:?}", query);
617
618 let query = query.clone();
619
620 self.execute_with_retry(move |conn| {
621 let query = query.clone();
622
623 async move {
624 use amaters_net::convert::{
625 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
626 };
627 use amaters_net::proto::aql::QueryRequest;
628 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
629
630 let mut client = AqlServiceClient::new(conn.channel().clone());
631
632 let proto_query = query_to_proto(&query)?;
633
634 let request = tonic::Request::new(QueryRequest {
635 query: Some(proto_query),
636 request_id: Some(uuid::Uuid::new_v4().to_string()),
637 timeout_ms: Some(30000),
638 transaction_id: None,
639 version: Some(create_version()),
640 });
641
642 let response = client.execute_query(request).await?;
643
644 match response.into_inner().response {
646 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
647 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
648 match result.result {
649 Some(QueryResultEnum::Single(single)) => {
650 let value = if let Some(v) = single.value {
651 Some(cipher_blob_from_proto(v)?)
652 } else {
653 None
654 };
655 Ok(QueryResult::Single(value))
656 }
657 Some(QueryResultEnum::Multi(multi)) => {
658 let mut values = Vec::new();
659 for kv in multi.values {
660 let key = kv.key.ok_or_else(|| {
661 SdkError::OperationFailed(
662 "Missing key in result".to_string(),
663 )
664 })?;
665 let value = kv.value.ok_or_else(|| {
666 SdkError::OperationFailed(
667 "Missing value in result".to_string(),
668 )
669 })?;
670 values.push((
671 key_from_proto(key),
672 cipher_blob_from_proto(value)?,
673 ));
674 }
675 Ok(QueryResult::Multi(values))
676 }
677 Some(QueryResultEnum::Success(success)) => Ok(QueryResult::Success {
678 affected_rows: success.affected_rows,
679 }),
680 None => Err(SdkError::OperationFailed(
681 "Empty result from server".to_string(),
682 )),
683 }
684 }
685 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
686 SdkError::OperationFailed(format!("Server error: {}", e.message)),
687 ),
688 None => Err(SdkError::OperationFailed(
689 "Empty response from server".to_string(),
690 )),
691 }
692 }
693 })
694 .await
695 }
696
697 pub async fn execute_batch(&self, queries: Vec<Query>) -> Result<Vec<QueryResult>> {
701 debug!("Executing batch of {} queries", queries.len());
702
703 self.execute_with_retry(move |conn| {
704 let queries = queries.clone();
705
706 async move {
707 use amaters_net::convert::{
708 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
709 };
710 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
711 use amaters_net::proto::aql::{BatchRequest, IsolationLevel};
712
713 let mut client = AqlServiceClient::new(conn.channel().clone());
714
715 let mut proto_queries = Vec::new();
717 for query in &queries {
718 proto_queries.push(query_to_proto(query)?);
719 }
720
721 let request = tonic::Request::new(BatchRequest {
722 queries: proto_queries,
723 request_id: Some(uuid::Uuid::new_v4().to_string()),
724 timeout_ms: Some(60000), isolation_level: IsolationLevel::IsolationDefault as i32,
726 version: Some(create_version()),
727 });
728
729 let response = client.execute_batch(request).await?;
730
731 match response.into_inner().response {
733 Some(amaters_net::proto::aql::batch_response::Response::Results(
734 batch_result,
735 )) => {
736 let mut results = Vec::new();
737 for result in batch_result.results {
738 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
739 let query_result = match result.result {
740 Some(QueryResultEnum::Single(single)) => {
741 let value = if let Some(v) = single.value {
742 Some(cipher_blob_from_proto(v)?)
743 } else {
744 None
745 };
746 QueryResult::Single(value)
747 }
748 Some(QueryResultEnum::Multi(multi)) => {
749 let mut values = Vec::new();
750 for kv in multi.values {
751 let key = kv.key.ok_or_else(|| {
752 SdkError::OperationFailed(
753 "Missing key in result".to_string(),
754 )
755 })?;
756 let value = kv.value.ok_or_else(|| {
757 SdkError::OperationFailed(
758 "Missing value in result".to_string(),
759 )
760 })?;
761 values.push((
762 key_from_proto(key),
763 cipher_blob_from_proto(value)?,
764 ));
765 }
766 QueryResult::Multi(values)
767 }
768 Some(QueryResultEnum::Success(success)) => QueryResult::Success {
769 affected_rows: success.affected_rows,
770 },
771 None => {
772 return Err(SdkError::OperationFailed(
773 "Empty result from server".to_string(),
774 ));
775 }
776 };
777 results.push(query_result);
778 }
779 Ok(results)
780 }
781 Some(amaters_net::proto::aql::batch_response::Response::Error(e)) => Err(
782 SdkError::OperationFailed(format!("Batch error: {}", e.message)),
783 ),
784 None => Err(SdkError::OperationFailed(
785 "Empty response from server".to_string(),
786 )),
787 }
788 }
789 })
790 .await
791 }
792
793 pub fn pool_stats(&self) -> crate::connection::PoolStats {
795 self.pool.stats()
796 }
797
798 pub fn close(&self) {
800 info!("Closing client");
801 self.pool.close_all();
802 }
803
804 pub async fn health_check(&self) -> Result<()> {
808 debug!("Performing health check");
809
810 let result = timeout(
811 self.config.request_timeout,
812 self.execute_with_retry(|conn| async move {
813 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
814 use amaters_net::proto::aql::{HealthCheckRequest, HealthStatus};
815
816 let mut client = AqlServiceClient::new(conn.channel().clone());
817
818 let request = tonic::Request::new(HealthCheckRequest { service: None });
819
820 let response = client.health_check(request).await?;
821 let health_response = response.into_inner();
822
823 if health_response.status == HealthStatus::HealthServing as i32 {
824 Ok(())
825 } else {
826 Err(SdkError::OperationFailed(format!(
827 "Server unhealthy: {:?}",
828 health_response.message
829 )))
830 }
831 }),
832 )
833 .await;
834
835 match result {
836 Ok(Ok(())) => {
837 debug!("Health check passed");
838 Ok(())
839 }
840 Ok(Err(e)) => {
841 warn!("Health check failed: {}", e);
842 Err(e)
843 }
844 Err(_) => {
845 warn!("Health check timeout");
846 Err(SdkError::Timeout("health check timeout".to_string()))
847 }
848 }
849 }
850
851 pub async fn server_info(&self) -> Result<ServerInfo> {
855 debug!("Getting server info");
856
857 self.execute_with_retry(|conn| async move {
858 use amaters_net::proto::aql::ServerInfoRequest;
859 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
860
861 let mut client = AqlServiceClient::new(conn.channel().clone());
862
863 let request = tonic::Request::new(ServerInfoRequest {});
864
865 let response = client.get_server_info(request).await?;
866 let info = response.into_inner();
867
868 Ok(ServerInfo {
869 version: info.version.map(|v| (v.major, v.minor, v.patch)),
870 supported_versions: info
871 .supported_versions
872 .into_iter()
873 .map(|v| (v.major, v.minor, v.patch))
874 .collect(),
875 capabilities: info.capabilities,
876 uptime_seconds: info.uptime_seconds,
877 })
878 })
879 .await
880 }
881
882 pub async fn range(
900 &self,
901 collection: &str,
902 start: &Key,
903 end: &Key,
904 ) -> Result<Vec<(Key, CipherBlob)>> {
905 debug!(
906 "Range: collection={}, start={}, end={}",
907 collection, start, end
908 );
909
910 let collection = collection.to_string();
911 let start = start.clone();
912 let end = end.clone();
913
914 self.execute_with_retry(move |conn| {
915 let collection = collection.clone();
916 let start = start.clone();
917 let end = end.clone();
918
919 async move {
920 use amaters_net::convert::{
921 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
922 };
923 use amaters_net::proto::aql::QueryRequest;
924 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
925
926 let mut client = AqlServiceClient::new(conn.channel().clone());
927
928 let query = Query::Range {
929 collection,
930 start,
931 end,
932 };
933
934 let proto_query = query_to_proto(&query)?;
935
936 let request = tonic::Request::new(QueryRequest {
937 query: Some(proto_query),
938 request_id: Some(uuid::Uuid::new_v4().to_string()),
939 timeout_ms: Some(30000),
940 transaction_id: None,
941 version: Some(create_version()),
942 });
943
944 let response = client.execute_query(request).await?;
945
946 match response.into_inner().response {
948 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
949 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
950 match result.result {
951 Some(QueryResultEnum::Multi(multi)) => {
952 let mut values = Vec::new();
953 for kv in multi.values {
954 let key = kv.key.ok_or_else(|| {
955 SdkError::OperationFailed(
956 "Missing key in result".to_string(),
957 )
958 })?;
959 let value = kv.value.ok_or_else(|| {
960 SdkError::OperationFailed(
961 "Missing value in result".to_string(),
962 )
963 })?;
964 values.push((
965 key_from_proto(key),
966 cipher_blob_from_proto(value)?,
967 ));
968 }
969 Ok(values)
970 }
971 _ => Err(SdkError::OperationFailed(
972 "Expected multi result for range query".to_string(),
973 )),
974 }
975 }
976 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
977 SdkError::OperationFailed(format!("Server error: {}", e.message)),
978 ),
979 None => Err(SdkError::OperationFailed(
980 "Empty response from server".to_string(),
981 )),
982 }
983 }
984 })
985 .await
986 }
987
988 pub async fn range_paginated(
997 &self,
998 collection: &str,
999 start: &Key,
1000 end: &Key,
1001 page_size: usize,
1002 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1003 let pagination = PaginationConfig::new(page_size);
1004 self.range_with_cursor(collection, start, end, &pagination)
1005 .await
1006 }
1007
1008 pub async fn range_with_cursor(
1013 &self,
1014 collection: &str,
1015 start: &Key,
1016 end: &Key,
1017 pagination: &PaginationConfig,
1018 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1019 let effective_start = if let Some(ref cursor_str) = pagination.cursor {
1020 decode_cursor(cursor_str)?
1021 } else {
1022 start.clone()
1023 };
1024
1025 let all = self.range(collection, &effective_start, end).await?;
1027
1028 let base_iter: Box<dyn Iterator<Item = (Key, CipherBlob)>> =
1032 if pagination.cursor.is_some() && !all.is_empty() && all[0].0 == effective_start {
1033 Box::new(all.into_iter().skip(1))
1034 } else {
1035 Box::new(all.into_iter())
1036 };
1037
1038 let after_offset: Vec<(Key, CipherBlob)> = base_iter.skip(pagination.offset).collect();
1040
1041 let has_more = after_offset.len() > pagination.page_size;
1042 let items: Vec<(Key, CipherBlob)> = after_offset
1043 .into_iter()
1044 .take(pagination.page_size)
1045 .collect();
1046
1047 let next_cursor = if has_more {
1048 items.last().map(|(k, _)| encode_cursor(k))
1049 } else {
1050 None
1051 };
1052
1053 Ok(PaginatedResult {
1054 items,
1055 next_cursor,
1056 has_more,
1057 total_hint: None,
1058 })
1059 }
1060
1061 pub async fn range_sorted(
1069 &self,
1070 collection: &str,
1071 start: &Key,
1072 end: &Key,
1073 sort: &SortConfig,
1074 ) -> Result<Vec<(Key, CipherBlob)>> {
1075 let mut results = self.range(collection, start, end).await?;
1076 sort_results(&mut results, sort);
1077 Ok(results)
1078 }
1079
1080 pub async fn scan(
1090 &self,
1091 collection: &str,
1092 prefix: &Key,
1093 pagination: &PaginationConfig,
1094 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1095 let start = prefix.clone();
1096 let end = prefix_end_key(prefix);
1097 self.range_with_cursor(collection, &start, &end, pagination)
1098 .await
1099 }
1100
1101 pub async fn count(&self, collection: &str) -> Result<usize> {
1111 debug!("Count: collection={}", collection);
1112
1113 let start = Key::from_slice(&[0u8]);
1115 let end = Key::from_slice(&[0xFF; 32]);
1116 let results = self.range(collection, &start, &end).await?;
1117 Ok(results.len())
1118 }
1119
1120 pub fn transaction(&self, collection: impl Into<String>) -> crate::transaction::Transaction {
1145 crate::transaction::Transaction::new(Arc::new(self.clone()), collection)
1146 }
1147
1148 #[doc(hidden)]
1163 pub fn new_offline(config: ClientConfig) -> Self {
1164 Self {
1165 pool: Arc::new(ConnectionPool::new(config.clone())),
1166 config: Arc::new(config),
1167 encryptor: None,
1168 cache: None,
1169 }
1170 }
1171
1172 pub async fn stream_query(&self, query: Query, config: StreamConfig) -> Result<QueryStream> {
1191 debug!("stream_query: query={:?}", query);
1192
1193 let conn = self.pool.get().await?;
1196
1197 let timeout_secs = config.timeout_secs;
1198 let (query_stream, sender) = QueryStream::new(&config);
1199 let cancel_token = sender.cancel_token();
1200
1201 let proto_query = {
1203 use amaters_net::convert::query_to_proto;
1204 query_to_proto(&query)?
1205 };
1206
1207 let request = tonic::Request::new(amaters_net::proto::aql::QueryRequest {
1208 query: Some(proto_query),
1209 request_id: Some(uuid::Uuid::new_v4().to_string()),
1210 timeout_ms: timeout_secs.map(|s| (s * 1000) as u32),
1211 transaction_id: None,
1212 version: Some(amaters_net::convert::create_version()),
1213 });
1214
1215 let mut grpc_client = {
1217 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
1218 AqlServiceClient::new(conn.channel().clone())
1219 };
1220
1221 let response_stream = grpc_client
1222 .execute_stream(request)
1223 .await
1224 .map_err(SdkError::Grpc)?
1225 .into_inner();
1226
1227 tokio::spawn(async move {
1229 let _conn = conn;
1231
1232 let mut pinned = std::pin::pin!(response_stream);
1233
1234 loop {
1235 let item = tokio::select! {
1237 biased;
1238 _ = cancel_token.cancelled() => {
1239 debug!("stream_query: cancelled by consumer");
1240 break;
1241 }
1242 item = pinned.next() => item,
1243 };
1244
1245 match item {
1246 None => break,
1248
1249 Some(Err(status)) => {
1251 let _ = sender.send_error(SdkError::Grpc(status)).await;
1252 break;
1253 }
1254
1255 Some(Ok(response)) => {
1257 use amaters_net::proto::aql::stream_response::Chunk;
1258
1259 match response.chunk {
1260 Some(Chunk::Batch(batch)) => {
1262 for kv in batch.values {
1263 if sender.is_cancelled() {
1264 return;
1265 }
1266 let key = kv.key.map(|k| k.data).unwrap_or_default();
1267 let value = kv.value.map(|v| v.data).unwrap_or_default();
1268 let row = Row::new(key, value);
1269 if !sender.send_row(row).await {
1270 return;
1271 }
1272 }
1273 }
1274
1275 Some(Chunk::Value(kv)) => {
1277 if sender.is_cancelled() {
1278 return;
1279 }
1280 let key = kv.key.map(|k| k.data).unwrap_or_default();
1281 let value = kv.value.map(|v| v.data).unwrap_or_default();
1282 let row = Row::new(key, value);
1283 if !sender.send_row(row).await {
1284 return;
1285 }
1286 }
1287
1288 Some(Chunk::End(_)) => break,
1290
1291 Some(Chunk::Error(e)) => {
1293 let _ = sender
1294 .send_error(SdkError::OperationFailed(e.message))
1295 .await;
1296 break;
1297 }
1298
1299 None => {}
1301 }
1302 }
1303 }
1304 }
1305 });
1307
1308 Ok(query_stream)
1309 }
1310}
1311
1312fn sort_results(results: &mut [(Key, CipherBlob)], sort: &SortConfig) {
1318 match (sort.field, sort.order) {
1319 (SortField::Key, SortOrder::Ascending) => {
1320 results.sort_by(|a, b| a.0.cmp(&b.0));
1321 }
1322 (SortField::Key, SortOrder::Descending) => {
1323 results.sort_by(|a, b| b.0.cmp(&a.0));
1324 }
1325 (SortField::Value, SortOrder::Ascending) => {
1326 results.sort_by(|a, b| a.1.as_bytes().cmp(b.1.as_bytes()));
1327 }
1328 (SortField::Value, SortOrder::Descending) => {
1329 results.sort_by(|a, b| b.1.as_bytes().cmp(a.1.as_bytes()));
1330 }
1331 (SortField::Timestamp, SortOrder::Ascending) => {
1333 results.sort_by(|a, b| a.0.cmp(&b.0));
1334 }
1335 (SortField::Timestamp, SortOrder::Descending) => {
1336 results.sort_by(|a, b| b.0.cmp(&a.0));
1337 }
1338 }
1339}
1340
1341fn prefix_end_key(prefix: &Key) -> Key {
1348 let mut bytes = prefix.to_vec();
1349 while let Some(last) = bytes.last_mut() {
1351 if *last < 0xFF {
1352 *last += 1;
1353 return Key::from_slice(&bytes);
1354 }
1355 bytes.pop();
1356 }
1357 let mut extended = prefix.to_vec();
1359 extended.push(0xFF);
1360 Key::from_slice(&extended)
1361}
1362
1363#[derive(Debug, Clone)]
1365pub struct ServerInfo {
1366 pub version: Option<(u32, u32, u32)>,
1368 pub supported_versions: Vec<(u32, u32, u32)>,
1370 pub capabilities: Vec<String>,
1372 pub uptime_seconds: u64,
1374}
1375
1376#[derive(Debug, Clone)]
1385pub struct PaginatedQueryBuilder {
1386 collection: String,
1387 page_size: Option<usize>,
1388 cursor: Option<String>,
1389 sort: Option<SortConfig>,
1390 limit: Option<usize>,
1392 offset: Option<usize>,
1394}
1395
1396impl PaginatedQueryBuilder {
1397 pub fn new(collection: impl Into<String>) -> Self {
1399 Self {
1400 collection: collection.into(),
1401 page_size: None,
1402 cursor: None,
1403 sort: None,
1404 limit: None,
1405 offset: None,
1406 }
1407 }
1408
1409 #[must_use]
1411 pub fn page_size(mut self, size: usize) -> Self {
1412 self.page_size = Some(size);
1413 self
1414 }
1415
1416 #[must_use]
1418 pub fn limit(mut self, n: usize) -> Self {
1419 self.limit = Some(n);
1420 self
1421 }
1422
1423 #[must_use]
1425 pub fn offset(mut self, n: usize) -> Self {
1426 self.offset = Some(n);
1427 self
1428 }
1429
1430 #[must_use]
1432 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
1433 self.cursor = Some(cursor.into());
1434 self
1435 }
1436
1437 #[must_use]
1439 pub fn sort_by(mut self, field: SortField, order: SortOrder) -> Self {
1440 self.sort = Some(SortConfig::new(field, order));
1441 self
1442 }
1443
1444 pub fn build_paginated(&self) -> PaginationConfig {
1448 let page_size = self.limit.or(self.page_size).unwrap_or(100);
1449 PaginationConfig {
1450 page_size,
1451 cursor: self.cursor.clone(),
1452 offset: self.offset.unwrap_or(0),
1453 }
1454 }
1455
1456 pub fn collection(&self) -> &str {
1458 &self.collection
1459 }
1460
1461 pub fn sort_config(&self) -> Option<&SortConfig> {
1463 self.sort.as_ref()
1464 }
1465}
1466
1467#[derive(Debug, Clone)]
1469pub enum QueryResult {
1470 Single(Option<CipherBlob>),
1472 Multi(Vec<(Key, CipherBlob)>),
1474 Success { affected_rows: u64 },
1476}
1477
1478#[cfg(test)]
1479mod tests {
1480 use super::*;
1481
1482 #[tokio::test]
1485 async fn test_retry_config() {
1486 let config = RetryConfig::default();
1487 assert_eq!(config.max_retries, 3);
1488
1489 let backoff = config.backoff_duration(1);
1490 assert!(backoff.as_millis() > 0);
1491 }
1492
1493 #[test]
1494 fn test_query_result() {
1495 let result = QueryResult::Success { affected_rows: 5 };
1496 match result {
1497 QueryResult::Success { affected_rows } => {
1498 assert_eq!(affected_rows, 5);
1499 }
1500 _ => panic!("expected Success"),
1501 }
1502 }
1503
1504 #[test]
1507 fn test_pagination_config_default() {
1508 let config = PaginationConfig::default();
1509 assert_eq!(config.page_size, 100);
1510 assert!(config.cursor.is_none());
1511 }
1512
1513 #[test]
1514 fn test_pagination_config_with_cursor() {
1515 let config = PaginationConfig::new(50).with_cursor("abc123");
1516 assert_eq!(config.page_size, 50);
1517 assert_eq!(config.cursor.as_deref(), Some("abc123"));
1518 }
1519
1520 #[test]
1523 fn test_paginated_result_has_more() {
1524 let result: PaginatedResult<u8> = PaginatedResult {
1526 items: vec![1, 2, 3],
1527 next_cursor: None,
1528 has_more: false,
1529 total_hint: None,
1530 };
1531 assert!(!result.has_more);
1532 assert!(result.next_cursor.is_none());
1533 assert_eq!(result.items.len(), 3);
1534 }
1535
1536 #[test]
1537 fn test_paginated_result_with_more() {
1538 let result: PaginatedResult<u8> = PaginatedResult {
1539 items: vec![1, 2, 3],
1540 next_cursor: Some("cursor_xyz".to_string()),
1541 has_more: true,
1542 total_hint: Some(100),
1543 };
1544 assert!(result.has_more);
1545 assert_eq!(result.next_cursor.as_deref(), Some("cursor_xyz"));
1546 assert_eq!(result.total_hint, Some(100));
1547 }
1548
1549 #[test]
1552 fn test_sort_ascending() {
1553 let mut data = vec![
1554 (Key::from_str("c"), CipherBlob::new(vec![3])),
1555 (Key::from_str("a"), CipherBlob::new(vec![1])),
1556 (Key::from_str("b"), CipherBlob::new(vec![2])),
1557 ];
1558 let sort = SortConfig::new(SortField::Key, SortOrder::Ascending);
1559 sort_results(&mut data, &sort);
1560
1561 assert_eq!(data[0].0.to_string_lossy(), "a");
1562 assert_eq!(data[1].0.to_string_lossy(), "b");
1563 assert_eq!(data[2].0.to_string_lossy(), "c");
1564 }
1565
1566 #[test]
1567 fn test_sort_descending() {
1568 let mut data = vec![
1569 (Key::from_str("a"), CipherBlob::new(vec![1])),
1570 (Key::from_str("c"), CipherBlob::new(vec![3])),
1571 (Key::from_str("b"), CipherBlob::new(vec![2])),
1572 ];
1573 let sort = SortConfig::new(SortField::Key, SortOrder::Descending);
1574 sort_results(&mut data, &sort);
1575
1576 assert_eq!(data[0].0.to_string_lossy(), "c");
1577 assert_eq!(data[1].0.to_string_lossy(), "b");
1578 assert_eq!(data[2].0.to_string_lossy(), "a");
1579 }
1580
1581 #[test]
1582 fn test_sort_by_value_ascending() {
1583 let mut data = vec![
1584 (Key::from_str("x"), CipherBlob::new(vec![30])),
1585 (Key::from_str("y"), CipherBlob::new(vec![10])),
1586 (Key::from_str("z"), CipherBlob::new(vec![20])),
1587 ];
1588 let sort = SortConfig::new(SortField::Value, SortOrder::Ascending);
1589 sort_results(&mut data, &sort);
1590
1591 assert_eq!(data[0].1.to_vec(), vec![10]);
1592 assert_eq!(data[1].1.to_vec(), vec![20]);
1593 assert_eq!(data[2].1.to_vec(), vec![30]);
1594 }
1595
1596 #[test]
1599 fn test_cursor_encoding() {
1600 let key = Key::from_str("user:999");
1601 let cursor = encode_cursor(&key);
1602
1603 assert!(!cursor.is_empty());
1605 assert!(cursor.contains('|'));
1606
1607 let decoded = decode_cursor(&cursor).expect("decode should succeed");
1609 assert_eq!(decoded, key);
1610 }
1611
1612 #[test]
1613 fn test_cursor_encoding_binary_key() {
1614 let key = Key::from_slice(&[0x00, 0xFF, 0xAB, 0xCD]);
1615 let cursor = encode_cursor(&key);
1616 let decoded = decode_cursor(&cursor).expect("decode should succeed");
1617 assert_eq!(decoded, key);
1618 }
1619
1620 #[test]
1621 fn test_cursor_integrity() {
1622 let key = Key::from_str("original_key");
1623 let cursor = encode_cursor(&key);
1624
1625 let mut tampered = cursor.clone();
1627 let bytes = unsafe { tampered.as_bytes_mut() };
1628 if !bytes.is_empty() {
1629 bytes[0] ^= 0x01; }
1631
1632 let result = decode_cursor(&tampered);
1633 assert!(result.is_err(), "tampered cursor should be rejected");
1634 }
1635
1636 #[test]
1637 fn test_cursor_malformed_no_separator() {
1638 let result = decode_cursor("noseparatorhere");
1639 assert!(result.is_err());
1640 }
1641
1642 #[test]
1643 fn test_cursor_malformed_empty() {
1644 let result = decode_cursor("|");
1645 assert!(result.is_err());
1647 }
1648
1649 #[test]
1652 fn test_prefix_end_key() {
1653 let prefix = Key::from_str("user:");
1654 let end = prefix_end_key(&prefix);
1655 assert_eq!(end.to_string_lossy(), "user;");
1657 }
1658
1659 #[test]
1660 fn test_prefix_end_key_all_ff() {
1661 let prefix = Key::from_slice(&[0xFF, 0xFF]);
1662 let end = prefix_end_key(&prefix);
1663 assert_eq!(end.to_vec(), vec![0xFF, 0xFF, 0xFF]);
1665 }
1666
1667 #[test]
1670 fn test_scan_with_prefix_key_generation() {
1671 let prefix = Key::from_str("item:");
1673 let end = prefix_end_key(&prefix);
1674
1675 let within = Key::from_str("item:abc");
1677 assert!(within < end);
1678
1679 let outside = Key::from_str("item;abc");
1681 assert!(outside >= end);
1682 }
1683
1684 #[test]
1687 fn test_query_builder_pagination() {
1688 let builder = PaginatedQueryBuilder::new("users")
1689 .page_size(25)
1690 .cursor("some_cursor");
1691
1692 let config = builder.build_paginated();
1693 assert_eq!(config.page_size, 25);
1694 assert_eq!(config.cursor.as_deref(), Some("some_cursor"));
1695 assert_eq!(builder.collection(), "users");
1696 }
1697
1698 #[test]
1699 fn test_query_builder_pagination_defaults() {
1700 let builder = PaginatedQueryBuilder::new("events");
1701 let config = builder.build_paginated();
1702 assert_eq!(config.page_size, 100);
1703 assert!(config.cursor.is_none());
1704 }
1705
1706 #[test]
1707 fn test_query_builder_sorting() {
1708 let builder = PaginatedQueryBuilder::new("logs")
1709 .sort_by(SortField::Timestamp, SortOrder::Descending)
1710 .page_size(50);
1711
1712 let sort = builder.sort_config().expect("sort should be set");
1713 assert_eq!(sort.field, SortField::Timestamp);
1714 assert_eq!(sort.order, SortOrder::Descending);
1715 }
1716
1717 #[test]
1718 fn test_query_builder_no_sorting() {
1719 let builder = PaginatedQueryBuilder::new("data");
1720 assert!(builder.sort_config().is_none());
1721 }
1722
1723 #[test]
1726 fn test_hex_encode_decode() {
1727 let original = vec![0x00, 0x0A, 0xFF, 0x42, 0x99];
1728 let encoded = hex_encode(&original);
1729 assert_eq!(encoded, "000aff4299");
1730 let decoded = hex_decode(&encoded).expect("decode should succeed");
1731 assert_eq!(decoded, original);
1732 }
1733
1734 #[test]
1735 fn test_hex_decode_odd_length() {
1736 let result = hex_decode("abc");
1737 assert!(result.is_err());
1738 }
1739
1740 #[test]
1741 fn test_hex_decode_invalid_chars() {
1742 let result = hex_decode("zzzz");
1743 assert!(result.is_err());
1744 }
1745
1746 #[test]
1749 fn test_sort_by_timestamp_ascending() {
1750 let mut data = vec![
1751 (Key::from_str("ts:003"), CipherBlob::new(vec![3])),
1752 (Key::from_str("ts:001"), CipherBlob::new(vec![1])),
1753 (Key::from_str("ts:002"), CipherBlob::new(vec![2])),
1754 ];
1755 let sort = SortConfig::new(SortField::Timestamp, SortOrder::Ascending);
1756 sort_results(&mut data, &sort);
1757
1758 assert_eq!(data[0].0.to_string_lossy(), "ts:001");
1759 assert_eq!(data[1].0.to_string_lossy(), "ts:002");
1760 assert_eq!(data[2].0.to_string_lossy(), "ts:003");
1761 }
1762}