1use crate::cache::{InvalidationPolicy, QueryCache, QueryCacheConfig};
4use crate::config::{ClientConfig, RetryConfig};
5use crate::connection::{Connection, ConnectionPool};
6use crate::error::{Result, SdkError};
7use crate::fhe::FheEncryptor;
8use crate::streaming::{QueryStream, Row, StreamConfig};
9use amaters_core::{CipherBlob, Key, Query};
10use futures::StreamExt as _;
11use std::sync::Arc;
12use tokio::time::{sleep, timeout};
13use tracing::{debug, info, warn};
14
/// Pagination settings for range and scan queries.
#[derive(Debug, Clone)]
pub struct PaginationConfig {
    /// Maximum number of items returned in one page.
    pub page_size: usize,
    /// Cursor string taken from a previous page's `next_cursor`; `None` starts at the
    /// beginning of the requested range.
    pub cursor: Option<String>,
    /// Number of items to skip before the page begins (applied after the cursor position).
    pub offset: usize,
}
32
33impl Default for PaginationConfig {
34 fn default() -> Self {
35 Self {
36 page_size: 100,
37 cursor: None,
38 offset: 0,
39 }
40 }
41}
42
43impl PaginationConfig {
44 pub fn new(page_size: usize) -> Self {
46 Self {
47 page_size,
48 cursor: None,
49 offset: 0,
50 }
51 }
52
53 #[must_use]
55 pub fn with_cursor(mut self, cursor: impl Into<String>) -> Self {
56 self.cursor = Some(cursor.into());
57 self
58 }
59
60 #[must_use]
62 pub fn with_offset(mut self, offset: usize) -> Self {
63 self.offset = offset;
64 self
65 }
66}
67
/// One page of results together with the information needed to fetch the next page.
#[derive(Debug, Clone)]
pub struct PaginatedResult<T> {
    /// Items contained in this page.
    pub items: Vec<T>,
    /// Cursor to pass in the next request, when one is available.
    pub next_cursor: Option<String>,
    /// Whether more items exist beyond this page.
    pub has_more: bool,
    /// Optional hint about the total number of matching items.
    pub total_hint: Option<usize>,
}
80
/// Direction in which results are sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Smallest element first.
    Ascending,
    /// Largest element first.
    Descending,
}
89
/// Field by which results are sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortField {
    /// Sort by key.
    Key,
    /// Sort by the raw bytes of the value.
    Value,
    /// Sort by timestamp. NOTE(review): `sort_results` in this file currently orders
    /// this variant by key.
    Timestamp,
}
100
/// Sort field and direction for client-side sorting of range results.
#[derive(Debug, Clone)]
pub struct SortConfig {
    /// Field to sort by.
    pub field: SortField,
    /// Direction to sort in.
    pub order: SortOrder,
}
109
110impl SortConfig {
111 pub fn new(field: SortField, order: SortOrder) -> Self {
113 Self { field, order }
114 }
115}
116
/// Byte separating the hex-encoded key from the hex-encoded hash inside a cursor string.
/// Hex output never contains `|`, so the separator cannot collide with either part.
const CURSOR_SEPARATOR: u8 = b'|';
123
124fn encode_cursor(key: &Key) -> String {
129 let key_bytes = key.as_bytes();
130 let hash = blake3::hash(key_bytes);
131 let key_hex = hex_encode(key_bytes);
132 let hash_hex = hex_encode(hash.as_bytes());
133 format!("{}{}{}", key_hex, CURSOR_SEPARATOR as char, hash_hex)
134}
135
136fn decode_cursor(cursor: &str) -> Result<Key> {
141 let parts: Vec<&str> = cursor.split(CURSOR_SEPARATOR as char).collect();
142 if parts.len() != 2 {
143 return Err(SdkError::InvalidArgument(
144 "malformed cursor: expected two parts separated by '|'".to_string(),
145 ));
146 }
147
148 let key_bytes = hex_decode(parts[0])
149 .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor key: {}", e)))?;
150
151 let hash_bytes = hex_decode(parts[1])
152 .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor hash: {}", e)))?;
153
154 let expected_hash = blake3::hash(&key_bytes);
156 if hash_bytes.len() != 32 || expected_hash.as_bytes() != hash_bytes.as_slice() {
157 return Err(SdkError::InvalidArgument(
158 "cursor integrity check failed: hash mismatch".to_string(),
159 ));
160 }
161
162 Ok(Key::from_slice(&key_bytes))
163}
164
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a `String` cannot fail, so the result is deliberately ignored.
        let _ = write!(&mut encoded, "{:02x}", byte);
    }
    encoded
}
175
/// Decodes a hexadecimal string (upper- or lowercase digits) into raw bytes.
///
/// The input is validated byte by byte, so non-ASCII text is rejected with an error
/// instead of panicking on a string slice that splits a multi-byte character, and sign
/// characters such as `+` are not accepted as digits.
///
/// # Errors
///
/// Returns a description of the problem when the input has odd length or contains a
/// character that is not a hex digit. The reported offset is the byte offset of the
/// two-character pair containing the bad digit.
fn hex_decode(hex: &str) -> std::result::Result<Vec<u8>, String> {
    /// Converts one ASCII hex digit to its numeric value.
    fn nibble(digit: u8, pair_offset: usize) -> std::result::Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at offset {}: invalid digit found in string",
                pair_offset
            )),
        }
    }

    let raw = hex.as_bytes();
    if raw.len() % 2 != 0 {
        return Err("odd-length hex string".to_string());
    }

    raw.chunks_exact(2)
        .enumerate()
        .map(|(index, pair)| -> std::result::Result<u8, String> {
            let pair_offset = index * 2;
            let high = nibble(pair[0], pair_offset)?;
            let low = nibble(pair[1], pair_offset)?;
            Ok((high << 4) | low)
        })
        .collect()
}
189
/// Client handle for an AmateRS server.
///
/// Every field is reference-counted, so clones of a client share the same connection
/// pool, configuration, encryptor and cache.
#[derive(Clone)]
pub struct AmateRSClient {
    /// Connection pool used for every request.
    pool: Arc<ConnectionPool>,
    /// Client configuration (server address, retry policy, request timeout).
    config: Arc<ClientConfig>,
    /// Optional FHE encryptor attached via `with_encryptor`.
    encryptor: Option<Arc<FheEncryptor>>,
    /// Optional query cache attached via `with_cache`.
    cache: Option<Arc<QueryCache>>,
}
201
202impl AmateRSClient {
203 pub async fn connect(addr: impl Into<String>) -> Result<Self> {
216 let config = ClientConfig::new(addr);
217 Self::connect_with_config(config).await
218 }
219
220 pub async fn connect_with_config(config: ClientConfig) -> Result<Self> {
238 info!("Connecting to AmateRS server at {}", config.server_addr);
239
240 let pool = ConnectionPool::new(config.clone());
241
242 let _conn = pool.get().await?;
244
245 info!("Successfully connected to AmateRS server");
246
247 Ok(Self {
248 pool: Arc::new(pool),
249 config: Arc::new(config),
250 encryptor: None,
251 cache: None,
252 })
253 }
254
255 pub fn with_encryptor(mut self, encryptor: FheEncryptor) -> Self {
257 self.encryptor = Some(Arc::new(encryptor));
258 self
259 }
260
261 pub fn encryptor(&self) -> Option<&Arc<FheEncryptor>> {
263 self.encryptor.as_ref()
264 }
265
266 pub fn with_cache(mut self, config: QueryCacheConfig) -> Self {
282 self.cache = Some(Arc::new(QueryCache::new(config)));
283 self
284 }
285
286 pub fn cache(&self) -> Option<&Arc<QueryCache>> {
288 self.cache.as_ref()
289 }
290
291 async fn execute_with_retry<F, Fut, T>(&self, operation: F) -> Result<T>
293 where
294 F: Fn(Connection) -> Fut,
295 Fut: std::future::Future<Output = Result<T>>,
296 {
297 let retry_config = &self.config.retry_config;
298 let mut attempt = 0;
299
300 loop {
301 attempt += 1;
302
303 let conn = self.pool.get().await?;
305
306 match operation(conn).await {
308 Ok(result) => return Ok(result),
309 Err(e) if e.is_retryable() && attempt <= retry_config.max_retries => {
310 let backoff = retry_config.backoff_duration(attempt);
311 warn!(
312 "Operation failed (attempt {}), retrying after {:?}: {}",
313 attempt, backoff, e
314 );
315 sleep(backoff).await;
316 }
317 Err(e) => return Err(e),
318 }
319 }
320 }
321
322 pub async fn set(&self, collection: &str, key: &Key, value: &CipherBlob) -> Result<()> {
339 debug!("Set: collection={}, key={}", collection, key);
340
341 if let Some(ref cache) = self.cache {
343 if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
344 let cache_key = QueryCache::make_key(collection, key.as_bytes());
345 cache.invalidate(&cache_key);
346 }
347 }
348
349 let collection = collection.to_string();
350 let key = key.clone();
351 let value = value.clone();
352
353 self.execute_with_retry(move |conn| {
354 let collection = collection.clone();
355 let key = key.clone();
356 let value = value.clone();
357
358 async move {
359 use amaters_net::convert::{create_version, query_to_proto};
360 use amaters_net::proto::aql::QueryRequest;
361 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
362
363 let mut client = AqlServiceClient::new(conn.channel().clone());
364
365 let query = Query::Set {
366 collection,
367 key,
368 value,
369 };
370
371 let proto_query = query_to_proto(&query)?;
372
373 let request = tonic::Request::new(QueryRequest {
374 query: Some(proto_query),
375 request_id: Some(uuid::Uuid::new_v4().to_string()),
376 timeout_ms: Some(30000),
377 transaction_id: None,
378 version: Some(create_version()),
379 });
380
381 let response = client.execute_query(request).await?;
382
383 match response.into_inner().response {
385 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
386 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
387 SdkError::OperationFailed(format!("Server error: {}", e.message)),
388 ),
389 None => Err(SdkError::OperationFailed(
390 "Empty response from server".to_string(),
391 )),
392 }
393 }
394 })
395 .await
396 }
397
398 pub async fn get(&self, collection: &str, key: &Key) -> Result<Option<CipherBlob>> {
418 debug!("Get: collection={}, key={}", collection, key);
419
420 if let Some(ref cache) = self.cache {
422 let cache_key = QueryCache::make_key(collection, key.as_bytes());
423 if let Some(cached_data) = cache.get(&cache_key) {
424 debug!("Cache hit for collection={}, key={}", collection, key);
425 return Ok(Some(CipherBlob::new(cached_data)));
426 }
427 }
428
429 let collection = collection.to_string();
430 let key = key.clone();
431 let cache = self.cache.clone();
432 let coll_for_cache = collection.clone();
433 let key_for_cache = key.clone();
434
435 let result = self
436 .execute_with_retry(move |conn| {
437 let collection = collection.clone();
438 let key = key.clone();
439
440 async move {
441 use amaters_net::convert::{
442 cipher_blob_from_proto, create_version, query_to_proto,
443 };
444 use amaters_net::proto::aql::QueryRequest;
445 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
446
447 let mut client = AqlServiceClient::new(conn.channel().clone());
448
449 let query = Query::Get { collection, key };
450
451 let proto_query = query_to_proto(&query)?;
452
453 let request = tonic::Request::new(QueryRequest {
454 query: Some(proto_query),
455 request_id: Some(uuid::Uuid::new_v4().to_string()),
456 timeout_ms: Some(30000),
457 transaction_id: None,
458 version: Some(create_version()),
459 });
460
461 let response = client.execute_query(request).await?;
462
463 match response.into_inner().response {
465 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
466 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
467 match result.result {
468 Some(QueryResultEnum::Single(single)) => {
469 if let Some(value) = single.value {
470 Ok(Some(cipher_blob_from_proto(value)?))
471 } else {
472 Ok(None)
473 }
474 }
475 _ => Err(SdkError::OperationFailed(
476 "Expected single result".to_string(),
477 )),
478 }
479 }
480 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
481 SdkError::OperationFailed(format!("Server error: {}", e.message)),
482 ),
483 None => Err(SdkError::OperationFailed(
484 "Empty response from server".to_string(),
485 )),
486 }
487 }
488 })
489 .await;
490
491 if let Ok(Some(ref blob)) = result {
493 if let Some(ref c) = cache {
494 let ck = QueryCache::make_key(&coll_for_cache, key_for_cache.as_bytes());
495 c.put_with_collection(&ck, blob.to_vec(), Some(&coll_for_cache));
496 }
497 }
498
499 result
500 }
501
502 pub async fn delete(&self, collection: &str, key: &Key) -> Result<()> {
517 debug!("Delete: collection={}, key={}", collection, key);
518
519 if let Some(ref cache) = self.cache {
521 if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
522 let cache_key = QueryCache::make_key(collection, key.as_bytes());
523 cache.invalidate(&cache_key);
524 }
525 }
526
527 let collection = collection.to_string();
528 let key = key.clone();
529
530 self.execute_with_retry(move |conn| {
531 let collection = collection.clone();
532 let key = key.clone();
533
534 async move {
535 use amaters_net::convert::{create_version, query_to_proto};
536 use amaters_net::proto::aql::QueryRequest;
537 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
538
539 let mut client = AqlServiceClient::new(conn.channel().clone());
540
541 let query = Query::Delete { collection, key };
542
543 let proto_query = query_to_proto(&query)?;
544
545 let request = tonic::Request::new(QueryRequest {
546 query: Some(proto_query),
547 request_id: Some(uuid::Uuid::new_v4().to_string()),
548 timeout_ms: Some(30000),
549 transaction_id: None,
550 version: Some(create_version()),
551 });
552
553 let response = client.execute_query(request).await?;
554
555 match response.into_inner().response {
557 Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
558 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
559 SdkError::OperationFailed(format!("Server error: {}", e.message)),
560 ),
561 None => Err(SdkError::OperationFailed(
562 "Empty response from server".to_string(),
563 )),
564 }
565 }
566 })
567 .await
568 }
569
570 pub async fn contains(&self, collection: &str, key: &Key) -> Result<bool> {
588 debug!("Contains: collection={}, key={}", collection, key);
589
590 let result = self.get(collection, key).await?;
592 Ok(result.is_some())
593 }
594
595 pub async fn execute_query(&self, query: &Query) -> Result<QueryResult> {
616 debug!("Executing query: {:?}", query);
617
618 let query = query.clone();
619
620 self.execute_with_retry(move |conn| {
621 let query = query.clone();
622
623 async move {
624 use amaters_net::convert::{
625 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
626 };
627 use amaters_net::proto::aql::QueryRequest;
628 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
629
630 let mut client = AqlServiceClient::new(conn.channel().clone());
631
632 let proto_query = query_to_proto(&query)?;
633
634 let request = tonic::Request::new(QueryRequest {
635 query: Some(proto_query),
636 request_id: Some(uuid::Uuid::new_v4().to_string()),
637 timeout_ms: Some(30000),
638 transaction_id: None,
639 version: Some(create_version()),
640 });
641
642 let response = client.execute_query(request).await?;
643
644 match response.into_inner().response {
646 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
647 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
648 match result.result {
649 Some(QueryResultEnum::Single(single)) => {
650 let value = if let Some(v) = single.value {
651 Some(cipher_blob_from_proto(v)?)
652 } else {
653 None
654 };
655 Ok(QueryResult::Single(value))
656 }
657 Some(QueryResultEnum::Multi(multi)) => {
658 let mut values = Vec::new();
659 for kv in multi.values {
660 let key = kv.key.ok_or_else(|| {
661 SdkError::OperationFailed(
662 "Missing key in result".to_string(),
663 )
664 })?;
665 let value = kv.value.ok_or_else(|| {
666 SdkError::OperationFailed(
667 "Missing value in result".to_string(),
668 )
669 })?;
670 values.push((
671 key_from_proto(key),
672 cipher_blob_from_proto(value)?,
673 ));
674 }
675 Ok(QueryResult::Multi(values))
676 }
677 Some(QueryResultEnum::Success(success)) => Ok(QueryResult::Success {
678 affected_rows: success.affected_rows,
679 }),
680 None => Err(SdkError::OperationFailed(
681 "Empty result from server".to_string(),
682 )),
683 }
684 }
685 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
686 SdkError::OperationFailed(format!("Server error: {}", e.message)),
687 ),
688 None => Err(SdkError::OperationFailed(
689 "Empty response from server".to_string(),
690 )),
691 }
692 }
693 })
694 .await
695 }
696
697 pub async fn execute_batch(&self, queries: Vec<Query>) -> Result<Vec<QueryResult>> {
701 debug!("Executing batch of {} queries", queries.len());
702
703 self.execute_with_retry(move |conn| {
704 let queries = queries.clone();
705
706 async move {
707 use amaters_net::convert::{
708 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
709 };
710 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
711 use amaters_net::proto::aql::{BatchRequest, IsolationLevel};
712
713 let mut client = AqlServiceClient::new(conn.channel().clone());
714
715 let mut proto_queries = Vec::new();
717 for query in &queries {
718 proto_queries.push(query_to_proto(query)?);
719 }
720
721 let request = tonic::Request::new(BatchRequest {
722 queries: proto_queries,
723 request_id: Some(uuid::Uuid::new_v4().to_string()),
724 timeout_ms: Some(60000), isolation_level: IsolationLevel::IsolationDefault as i32,
726 version: Some(create_version()),
727 });
728
729 let response = client.execute_batch(request).await?;
730
731 match response.into_inner().response {
733 Some(amaters_net::proto::aql::batch_response::Response::Results(
734 batch_result,
735 )) => {
736 let mut results = Vec::new();
737 for result in batch_result.results {
738 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
739 let query_result = match result.result {
740 Some(QueryResultEnum::Single(single)) => {
741 let value = if let Some(v) = single.value {
742 Some(cipher_blob_from_proto(v)?)
743 } else {
744 None
745 };
746 QueryResult::Single(value)
747 }
748 Some(QueryResultEnum::Multi(multi)) => {
749 let mut values = Vec::new();
750 for kv in multi.values {
751 let key = kv.key.ok_or_else(|| {
752 SdkError::OperationFailed(
753 "Missing key in result".to_string(),
754 )
755 })?;
756 let value = kv.value.ok_or_else(|| {
757 SdkError::OperationFailed(
758 "Missing value in result".to_string(),
759 )
760 })?;
761 values.push((
762 key_from_proto(key),
763 cipher_blob_from_proto(value)?,
764 ));
765 }
766 QueryResult::Multi(values)
767 }
768 Some(QueryResultEnum::Success(success)) => QueryResult::Success {
769 affected_rows: success.affected_rows,
770 },
771 None => {
772 return Err(SdkError::OperationFailed(
773 "Empty result from server".to_string(),
774 ));
775 }
776 };
777 results.push(query_result);
778 }
779 Ok(results)
780 }
781 Some(amaters_net::proto::aql::batch_response::Response::Error(e)) => Err(
782 SdkError::OperationFailed(format!("Batch error: {}", e.message)),
783 ),
784 None => Err(SdkError::OperationFailed(
785 "Empty response from server".to_string(),
786 )),
787 }
788 }
789 })
790 .await
791 }
792
793 pub fn pool_stats(&self) -> crate::connection::PoolStats {
795 self.pool.stats()
796 }
797
798 pub fn close(&self) {
800 info!("Closing client");
801 self.pool.close_all();
802 }
803
804 pub async fn health_check(&self) -> Result<()> {
808 debug!("Performing health check");
809
810 let result = timeout(
811 self.config.request_timeout,
812 self.execute_with_retry(|conn| async move {
813 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
814 use amaters_net::proto::aql::{HealthCheckRequest, HealthStatus};
815
816 let mut client = AqlServiceClient::new(conn.channel().clone());
817
818 let request = tonic::Request::new(HealthCheckRequest { service: None });
819
820 let response = client.health_check(request).await?;
821 let health_response = response.into_inner();
822
823 if health_response.status == HealthStatus::HealthServing as i32 {
824 Ok(())
825 } else {
826 Err(SdkError::OperationFailed(format!(
827 "Server unhealthy: {:?}",
828 health_response.message
829 )))
830 }
831 }),
832 )
833 .await;
834
835 match result {
836 Ok(Ok(())) => {
837 debug!("Health check passed");
838 Ok(())
839 }
840 Ok(Err(e)) => {
841 warn!("Health check failed: {}", e);
842 Err(e)
843 }
844 Err(_) => {
845 warn!("Health check timeout");
846 Err(SdkError::Timeout("health check timeout".to_string()))
847 }
848 }
849 }
850
851 pub async fn server_info(&self) -> Result<ServerInfo> {
855 debug!("Getting server info");
856
857 self.execute_with_retry(|conn| async move {
858 use amaters_net::proto::aql::ServerInfoRequest;
859 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
860
861 let mut client = AqlServiceClient::new(conn.channel().clone());
862
863 let request = tonic::Request::new(ServerInfoRequest {});
864
865 let response = client.get_server_info(request).await?;
866 let info = response.into_inner();
867
868 Ok(ServerInfo {
869 version: info.version.map(|v| (v.major, v.minor, v.patch)),
870 supported_versions: info
871 .supported_versions
872 .into_iter()
873 .map(|v| (v.major, v.minor, v.patch))
874 .collect(),
875 capabilities: info.capabilities,
876 uptime_seconds: info.uptime_seconds,
877 })
878 })
879 .await
880 }
881
882 pub async fn range(
900 &self,
901 collection: &str,
902 start: &Key,
903 end: &Key,
904 ) -> Result<Vec<(Key, CipherBlob)>> {
905 debug!(
906 "Range: collection={}, start={}, end={}",
907 collection, start, end
908 );
909
910 let collection = collection.to_string();
911 let start = start.clone();
912 let end = end.clone();
913
914 self.execute_with_retry(move |conn| {
915 let collection = collection.clone();
916 let start = start.clone();
917 let end = end.clone();
918
919 async move {
920 use amaters_net::convert::{
921 cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
922 };
923 use amaters_net::proto::aql::QueryRequest;
924 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
925
926 let mut client = AqlServiceClient::new(conn.channel().clone());
927
928 let query = Query::Range {
929 collection,
930 start,
931 end,
932 };
933
934 let proto_query = query_to_proto(&query)?;
935
936 let request = tonic::Request::new(QueryRequest {
937 query: Some(proto_query),
938 request_id: Some(uuid::Uuid::new_v4().to_string()),
939 timeout_ms: Some(30000),
940 transaction_id: None,
941 version: Some(create_version()),
942 });
943
944 let response = client.execute_query(request).await?;
945
946 match response.into_inner().response {
948 Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
949 use amaters_net::proto::query::query_result::Result as QueryResultEnum;
950 match result.result {
951 Some(QueryResultEnum::Multi(multi)) => {
952 let mut values = Vec::new();
953 for kv in multi.values {
954 let key = kv.key.ok_or_else(|| {
955 SdkError::OperationFailed(
956 "Missing key in result".to_string(),
957 )
958 })?;
959 let value = kv.value.ok_or_else(|| {
960 SdkError::OperationFailed(
961 "Missing value in result".to_string(),
962 )
963 })?;
964 values.push((
965 key_from_proto(key),
966 cipher_blob_from_proto(value)?,
967 ));
968 }
969 Ok(values)
970 }
971 _ => Err(SdkError::OperationFailed(
972 "Expected multi result for range query".to_string(),
973 )),
974 }
975 }
976 Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
977 SdkError::OperationFailed(format!("Server error: {}", e.message)),
978 ),
979 None => Err(SdkError::OperationFailed(
980 "Empty response from server".to_string(),
981 )),
982 }
983 }
984 })
985 .await
986 }
987
988 pub async fn range_paginated(
997 &self,
998 collection: &str,
999 start: &Key,
1000 end: &Key,
1001 page_size: usize,
1002 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1003 let pagination = PaginationConfig::new(page_size);
1004 self.range_with_cursor(collection, start, end, &pagination)
1005 .await
1006 }
1007
1008 pub async fn range_with_cursor(
1013 &self,
1014 collection: &str,
1015 start: &Key,
1016 end: &Key,
1017 pagination: &PaginationConfig,
1018 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1019 let effective_start = if let Some(ref cursor_str) = pagination.cursor {
1020 decode_cursor(cursor_str)?
1021 } else {
1022 start.clone()
1023 };
1024
1025 let all = self.range(collection, &effective_start, end).await?;
1027
1028 let base_iter: Box<dyn Iterator<Item = (Key, CipherBlob)>> =
1032 if pagination.cursor.is_some() && !all.is_empty() && all[0].0 == effective_start {
1033 Box::new(all.into_iter().skip(1))
1034 } else {
1035 Box::new(all.into_iter())
1036 };
1037
1038 let after_offset: Vec<(Key, CipherBlob)> = base_iter.skip(pagination.offset).collect();
1040
1041 let has_more = after_offset.len() > pagination.page_size;
1042 let items: Vec<(Key, CipherBlob)> = after_offset
1043 .into_iter()
1044 .take(pagination.page_size)
1045 .collect();
1046
1047 let next_cursor = if has_more {
1048 items.last().map(|(k, _)| encode_cursor(k))
1049 } else {
1050 None
1051 };
1052
1053 Ok(PaginatedResult {
1054 items,
1055 next_cursor,
1056 has_more,
1057 total_hint: None,
1058 })
1059 }
1060
1061 pub async fn range_sorted(
1069 &self,
1070 collection: &str,
1071 start: &Key,
1072 end: &Key,
1073 sort: &SortConfig,
1074 ) -> Result<Vec<(Key, CipherBlob)>> {
1075 let mut results = self.range(collection, start, end).await?;
1076 sort_results(&mut results, sort);
1077 Ok(results)
1078 }
1079
1080 pub async fn scan(
1090 &self,
1091 collection: &str,
1092 prefix: &Key,
1093 pagination: &PaginationConfig,
1094 ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1095 let start = prefix.clone();
1096 let end = prefix_end_key(prefix);
1097 self.range_with_cursor(collection, &start, &end, pagination)
1098 .await
1099 }
1100
1101 pub async fn count(&self, collection: &str) -> Result<usize> {
1111 debug!("Count: collection={}", collection);
1112
1113 let start = Key::from_slice(&[0u8]);
1115 let end = Key::from_slice(&[0xFF; 32]);
1116 let results = self.range(collection, &start, &end).await?;
1117 Ok(results.len())
1118 }
1119
1120 #[doc(hidden)]
1135 pub fn new_offline(config: ClientConfig) -> Self {
1136 Self {
1137 pool: Arc::new(ConnectionPool::new(config.clone())),
1138 config: Arc::new(config),
1139 encryptor: None,
1140 cache: None,
1141 }
1142 }
1143
1144 pub async fn stream_query(&self, query: Query, config: StreamConfig) -> Result<QueryStream> {
1163 debug!("stream_query: query={:?}", query);
1164
1165 let conn = self.pool.get().await?;
1168
1169 let timeout_secs = config.timeout_secs;
1170 let (query_stream, sender) = QueryStream::new(&config);
1171 let cancel_token = sender.cancel_token();
1172
1173 let proto_query = {
1175 use amaters_net::convert::query_to_proto;
1176 query_to_proto(&query)?
1177 };
1178
1179 let request = tonic::Request::new(amaters_net::proto::aql::QueryRequest {
1180 query: Some(proto_query),
1181 request_id: Some(uuid::Uuid::new_v4().to_string()),
1182 timeout_ms: timeout_secs.map(|s| (s * 1000) as u32),
1183 transaction_id: None,
1184 version: Some(amaters_net::convert::create_version()),
1185 });
1186
1187 let mut grpc_client = {
1189 use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
1190 AqlServiceClient::new(conn.channel().clone())
1191 };
1192
1193 let response_stream = grpc_client
1194 .execute_stream(request)
1195 .await
1196 .map_err(SdkError::Grpc)?
1197 .into_inner();
1198
1199 tokio::spawn(async move {
1201 let _conn = conn;
1203
1204 let mut pinned = std::pin::pin!(response_stream);
1205
1206 loop {
1207 let item = tokio::select! {
1209 biased;
1210 _ = cancel_token.cancelled() => {
1211 debug!("stream_query: cancelled by consumer");
1212 break;
1213 }
1214 item = pinned.next() => item,
1215 };
1216
1217 match item {
1218 None => break,
1220
1221 Some(Err(status)) => {
1223 let _ = sender.send_error(SdkError::Grpc(status)).await;
1224 break;
1225 }
1226
1227 Some(Ok(response)) => {
1229 use amaters_net::proto::aql::stream_response::Chunk;
1230
1231 match response.chunk {
1232 Some(Chunk::Batch(batch)) => {
1234 for kv in batch.values {
1235 if sender.is_cancelled() {
1236 return;
1237 }
1238 let key = kv.key.map(|k| k.data).unwrap_or_default();
1239 let value = kv.value.map(|v| v.data).unwrap_or_default();
1240 let row = Row::new(key, value);
1241 if !sender.send_row(row).await {
1242 return;
1243 }
1244 }
1245 }
1246
1247 Some(Chunk::Value(kv)) => {
1249 if sender.is_cancelled() {
1250 return;
1251 }
1252 let key = kv.key.map(|k| k.data).unwrap_or_default();
1253 let value = kv.value.map(|v| v.data).unwrap_or_default();
1254 let row = Row::new(key, value);
1255 if !sender.send_row(row).await {
1256 return;
1257 }
1258 }
1259
1260 Some(Chunk::End(_)) => break,
1262
1263 Some(Chunk::Error(e)) => {
1265 let _ = sender
1266 .send_error(SdkError::OperationFailed(e.message))
1267 .await;
1268 break;
1269 }
1270
1271 None => {}
1273 }
1274 }
1275 }
1276 }
1277 });
1279
1280 Ok(query_stream)
1281 }
1282}
1283
1284fn sort_results(results: &mut [(Key, CipherBlob)], sort: &SortConfig) {
1290 match (sort.field, sort.order) {
1291 (SortField::Key, SortOrder::Ascending) => {
1292 results.sort_by(|a, b| a.0.cmp(&b.0));
1293 }
1294 (SortField::Key, SortOrder::Descending) => {
1295 results.sort_by(|a, b| b.0.cmp(&a.0));
1296 }
1297 (SortField::Value, SortOrder::Ascending) => {
1298 results.sort_by(|a, b| a.1.as_bytes().cmp(b.1.as_bytes()));
1299 }
1300 (SortField::Value, SortOrder::Descending) => {
1301 results.sort_by(|a, b| b.1.as_bytes().cmp(a.1.as_bytes()));
1302 }
1303 (SortField::Timestamp, SortOrder::Ascending) => {
1305 results.sort_by(|a, b| a.0.cmp(&b.0));
1306 }
1307 (SortField::Timestamp, SortOrder::Descending) => {
1308 results.sort_by(|a, b| b.0.cmp(&a.0));
1309 }
1310 }
1311}
1312
1313fn prefix_end_key(prefix: &Key) -> Key {
1320 let mut bytes = prefix.to_vec();
1321 while let Some(last) = bytes.last_mut() {
1323 if *last < 0xFF {
1324 *last += 1;
1325 return Key::from_slice(&bytes);
1326 }
1327 bytes.pop();
1328 }
1329 let mut extended = prefix.to_vec();
1331 extended.push(0xFF);
1332 Key::from_slice(&extended)
1333}
1334
/// Information reported by the server in response to `AmateRSClient::server_info`.
#[derive(Debug, Clone)]
pub struct ServerInfo {
    /// Server version as `(major, minor, patch)`, when the server reported one.
    pub version: Option<(u32, u32, u32)>,
    /// Versions the server supports, each as `(major, minor, patch)`.
    pub supported_versions: Vec<(u32, u32, u32)>,
    /// Capability strings advertised by the server.
    pub capabilities: Vec<String>,
    /// Server uptime in seconds.
    pub uptime_seconds: u64,
}
1347
/// Builder collecting pagination and sorting options for a query on one collection.
#[derive(Debug, Clone)]
pub struct PaginatedQueryBuilder {
    /// Name of the collection being queried.
    collection: String,
    /// Requested page size; used by `build_paginated` only when `limit` is unset.
    page_size: Option<usize>,
    /// Cursor to resume from.
    cursor: Option<String>,
    /// Requested sort field and direction.
    sort: Option<SortConfig>,
    /// Requested limit; takes precedence over `page_size` in `build_paginated`.
    limit: Option<usize>,
    /// Number of items to skip.
    offset: Option<usize>,
}
1367
1368impl PaginatedQueryBuilder {
1369 pub fn new(collection: impl Into<String>) -> Self {
1371 Self {
1372 collection: collection.into(),
1373 page_size: None,
1374 cursor: None,
1375 sort: None,
1376 limit: None,
1377 offset: None,
1378 }
1379 }
1380
1381 #[must_use]
1383 pub fn page_size(mut self, size: usize) -> Self {
1384 self.page_size = Some(size);
1385 self
1386 }
1387
1388 #[must_use]
1390 pub fn limit(mut self, n: usize) -> Self {
1391 self.limit = Some(n);
1392 self
1393 }
1394
1395 #[must_use]
1397 pub fn offset(mut self, n: usize) -> Self {
1398 self.offset = Some(n);
1399 self
1400 }
1401
1402 #[must_use]
1404 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
1405 self.cursor = Some(cursor.into());
1406 self
1407 }
1408
1409 #[must_use]
1411 pub fn sort_by(mut self, field: SortField, order: SortOrder) -> Self {
1412 self.sort = Some(SortConfig::new(field, order));
1413 self
1414 }
1415
1416 pub fn build_paginated(&self) -> PaginationConfig {
1420 let page_size = self.limit.or(self.page_size).unwrap_or(100);
1421 PaginationConfig {
1422 page_size,
1423 cursor: self.cursor.clone(),
1424 offset: self.offset.unwrap_or(0),
1425 }
1426 }
1427
1428 pub fn collection(&self) -> &str {
1430 &self.collection
1431 }
1432
1433 pub fn sort_config(&self) -> Option<&SortConfig> {
1435 self.sort.as_ref()
1436 }
1437}
1438
/// Decoded result of a query executed through `execute_query` or `execute_batch`.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// Result of a single-value lookup; `None` when the server returned no value.
    Single(Option<CipherBlob>),
    /// Key/value pairs returned by a multi-value query.
    Multi(Vec<(Key, CipherBlob)>),
    /// Acknowledgement carrying the number of affected rows reported by the server.
    Success { affected_rows: u64 },
}
1449
1450#[cfg(test)]
1451mod tests {
1452 use super::*;
1453
1454 #[tokio::test]
1457 async fn test_retry_config() {
1458 let config = RetryConfig::default();
1459 assert_eq!(config.max_retries, 3);
1460
1461 let backoff = config.backoff_duration(1);
1462 assert!(backoff.as_millis() > 0);
1463 }
1464
1465 #[test]
1466 fn test_query_result() {
1467 let result = QueryResult::Success { affected_rows: 5 };
1468 match result {
1469 QueryResult::Success { affected_rows } => {
1470 assert_eq!(affected_rows, 5);
1471 }
1472 _ => panic!("expected Success"),
1473 }
1474 }
1475
1476 #[test]
1479 fn test_pagination_config_default() {
1480 let config = PaginationConfig::default();
1481 assert_eq!(config.page_size, 100);
1482 assert!(config.cursor.is_none());
1483 }
1484
1485 #[test]
1486 fn test_pagination_config_with_cursor() {
1487 let config = PaginationConfig::new(50).with_cursor("abc123");
1488 assert_eq!(config.page_size, 50);
1489 assert_eq!(config.cursor.as_deref(), Some("abc123"));
1490 }
1491
1492 #[test]
1495 fn test_paginated_result_has_more() {
1496 let result: PaginatedResult<u8> = PaginatedResult {
1498 items: vec![1, 2, 3],
1499 next_cursor: None,
1500 has_more: false,
1501 total_hint: None,
1502 };
1503 assert!(!result.has_more);
1504 assert!(result.next_cursor.is_none());
1505 assert_eq!(result.items.len(), 3);
1506 }
1507
1508 #[test]
1509 fn test_paginated_result_with_more() {
1510 let result: PaginatedResult<u8> = PaginatedResult {
1511 items: vec![1, 2, 3],
1512 next_cursor: Some("cursor_xyz".to_string()),
1513 has_more: true,
1514 total_hint: Some(100),
1515 };
1516 assert!(result.has_more);
1517 assert_eq!(result.next_cursor.as_deref(), Some("cursor_xyz"));
1518 assert_eq!(result.total_hint, Some(100));
1519 }
1520
/// Sorting by `SortField::Key` in ascending order must arrange the keys as a, b, c.
#[test]
fn test_sort_ascending() {
    let ordering = SortConfig::new(SortField::Key, SortOrder::Ascending);
    let mut entries = vec![
        (Key::from_str("c"), CipherBlob::new(vec![3])),
        (Key::from_str("a"), CipherBlob::new(vec![1])),
        (Key::from_str("b"), CipherBlob::new(vec![2])),
    ];

    sort_results(&mut entries, &ordering);

    // Walk the sorted entries alongside the expected key sequence.
    assert_eq!(entries.len(), 3);
    for (entry, expected) in entries.iter().zip(["a", "b", "c"]) {
        assert_eq!(entry.0.to_string_lossy(), expected);
    }
}
1537
/// Sorting by `SortField::Key` in descending order must arrange the keys as c, b, a.
#[test]
fn test_sort_descending() {
    let ordering = SortConfig::new(SortField::Key, SortOrder::Descending);
    let mut entries = vec![
        (Key::from_str("a"), CipherBlob::new(vec![1])),
        (Key::from_str("c"), CipherBlob::new(vec![3])),
        (Key::from_str("b"), CipherBlob::new(vec![2])),
    ];

    sort_results(&mut entries, &ordering);

    // Walk the sorted entries alongside the expected key sequence.
    assert_eq!(entries.len(), 3);
    for (entry, expected) in entries.iter().zip(["c", "b", "a"]) {
        assert_eq!(entry.0.to_string_lossy(), expected);
    }
}
1552
/// Sorting by `SortField::Value` in ascending order must arrange the blobs as
/// `[10]`, `[20]`, `[30]`, regardless of their keys.
#[test]
fn test_sort_by_value_ascending() {
    let ordering = SortConfig::new(SortField::Value, SortOrder::Ascending);
    let mut entries = vec![
        (Key::from_str("x"), CipherBlob::new(vec![30])),
        (Key::from_str("y"), CipherBlob::new(vec![10])),
        (Key::from_str("z"), CipherBlob::new(vec![20])),
    ];

    sort_results(&mut entries, &ordering);

    // Each blob holds a single byte; compare it against the expected sequence.
    assert_eq!(entries.len(), 3);
    for (entry, expected) in entries.iter().zip([10, 20, 30]) {
        assert_eq!(entry.1.to_vec(), vec![expected]);
    }
}
1567
/// An encoded cursor is non-empty, contains the `|` separator, and decodes back
/// to the key it was created from.
#[test]
fn test_cursor_encoding() {
    let original = Key::from_str("user:999");
    let token = encode_cursor(&original);

    assert!(!token.is_empty());
    assert!(token.contains('|'));

    // Round trip: decoding must reproduce the original key exactly.
    let round_tripped = decode_cursor(&token).expect("decode should succeed");
    assert_eq!(round_tripped, original);
}
1583
/// Cursor round-tripping must also work for keys made of arbitrary bytes
/// (here 0x00, 0xFF, 0xAB, 0xCD), not only for textual keys.
#[test]
fn test_cursor_encoding_binary_key() {
    let original = Key::from_slice(&[0x00, 0xFF, 0xAB, 0xCD]);

    let round_tripped =
        decode_cursor(&encode_cursor(&original)).expect("decode should succeed");

    assert_eq!(round_tripped, original);
}
1591
/// A cursor whose first byte has been altered must be rejected by `decode_cursor`.
///
/// The tampering is performed on an owned copy of the cursor bytes using only
/// safe APIs, and the test fails loudly if the cursor is empty or was not
/// actually modified, so it can never pass without exercising the check.
#[test]
fn test_cursor_integrity() {
    let key = Key::from_str("original_key");
    let cursor = encode_cursor(&key);

    // Flip the lowest bit of the first byte on a copy of the cursor.
    let mut bytes = cursor.as_bytes().to_vec();
    let first = bytes
        .first_mut()
        .expect("encoded cursor should not be empty");
    *first ^= 0x01;

    // Flipping the low bit of an ASCII byte yields another ASCII byte, so the
    // buffer is still valid UTF-8; anything else is reported instead of being
    // smuggled into a `String` through `unsafe`.
    let tampered =
        String::from_utf8(bytes).expect("tampered cursor should remain valid UTF-8");
    assert_ne!(tampered, cursor, "tampering should change the cursor");

    let result = decode_cursor(&tampered);
    assert!(result.is_err(), "tampered cursor should be rejected");
}
1607
/// A cursor string that contains no `|` separator must fail to decode.
#[test]
fn test_cursor_malformed_no_separator() {
    assert!(decode_cursor("noseparatorhere").is_err());
}
1613
/// A cursor consisting solely of the separator (both parts empty) must fail to decode.
#[test]
fn test_cursor_malformed_empty() {
    assert!(decode_cursor("|").is_err());
}
1620
/// For the prefix `user:` the computed end key must be `user;`
/// (`;` is the byte immediately after `:`).
#[test]
fn test_prefix_end_key() {
    let upper_bound = prefix_end_key(&Key::from_str("user:"));
    assert_eq!(upper_bound.to_string_lossy(), "user;");
}
1630
/// For a prefix made entirely of 0xFF bytes the expected end key is the prefix
/// with one more 0xFF byte appended.
#[test]
fn test_prefix_end_key_all_ff() {
    let saturated = Key::from_slice(&[0xFF; 2]);
    let upper_bound = prefix_end_key(&saturated);
    assert_eq!(upper_bound.to_vec(), vec![0xFF; 3]);
}
1638
/// The end key derived from the prefix `item:` must sort after keys that start
/// with the prefix and not after keys that fall beyond it.
#[test]
fn test_scan_with_prefix_key_generation() {
    let end = prefix_end_key(&Key::from_str("item:"));

    // A key carrying the prefix lies strictly below the end bound.
    assert!(Key::from_str("item:abc") < end);

    // A key past the prefix range is at or above the end bound.
    assert!(Key::from_str("item;abc") >= end);
}
1655
/// Page size and cursor set on a `PaginatedQueryBuilder` must show up in the
/// config returned by `build_paginated`, and the collection name must be kept.
#[test]
fn test_query_builder_pagination() {
    let builder = PaginatedQueryBuilder::new("users");
    let builder = builder.page_size(25);
    let builder = builder.cursor("some_cursor");

    let paging = builder.build_paginated();

    assert_eq!(paging.page_size, 25);
    assert_eq!(paging.cursor.as_deref(), Some("some_cursor"));
    assert_eq!(builder.collection(), "users");
}
1669
/// A freshly created `PaginatedQueryBuilder` must produce a pagination config
/// with page size 100 and no cursor.
#[test]
fn test_query_builder_pagination_defaults() {
    let paging = PaginatedQueryBuilder::new("events").build_paginated();

    assert_eq!(paging.page_size, 100);
    assert!(paging.cursor.is_none());
}
1677
/// A sort configured through `sort_by` must be retrievable via `sort_config`
/// with the same field and order.
#[test]
fn test_query_builder_sorting() {
    let builder = PaginatedQueryBuilder::new("logs")
        .sort_by(SortField::Timestamp, SortOrder::Descending)
        .page_size(50);

    let chosen = builder.sort_config().expect("sort should be set");

    assert_eq!(
        (chosen.field, chosen.order),
        (SortField::Timestamp, SortOrder::Descending)
    );
}
1688
/// Without a call to `sort_by`, the builder must report no sort configuration.
#[test]
fn test_query_builder_no_sorting() {
    assert!(PaginatedQueryBuilder::new("data").sort_config().is_none());
}
1694
/// `hex_encode` must emit lowercase, zero-padded hex, and `hex_decode` must
/// turn that text back into the original bytes.
#[test]
fn test_hex_encode_decode() {
    let raw = vec![0x00, 0x0A, 0xFF, 0x42, 0x99];

    let hex = hex_encode(&raw);
    assert_eq!(hex, "000aff4299");

    assert_eq!(hex_decode(&hex).expect("decode should succeed"), raw);
}
1705
/// Hex input with an odd number of characters must be rejected.
#[test]
fn test_hex_decode_odd_length() {
    assert!(hex_decode("abc").is_err());
}
1711
/// Hex input containing characters outside the hex alphabet must be rejected.
#[test]
fn test_hex_decode_invalid_chars() {
    assert!(hex_decode("zzzz").is_err());
}
1717
/// Sorting by `SortField::Timestamp` in ascending order must arrange the entries
/// as ts:001, ts:002, ts:003.
// NOTE(review): the entries carry no separate timestamp here, so the expected
// order presumably comes from the keys — confirm against `sort_results`.
#[test]
fn test_sort_by_timestamp_ascending() {
    let ordering = SortConfig::new(SortField::Timestamp, SortOrder::Ascending);
    let mut entries = vec![
        (Key::from_str("ts:003"), CipherBlob::new(vec![3])),
        (Key::from_str("ts:001"), CipherBlob::new(vec![1])),
        (Key::from_str("ts:002"), CipherBlob::new(vec![2])),
    ];

    sort_results(&mut entries, &ordering);

    // Walk the sorted entries alongside the expected key sequence.
    assert_eq!(entries.len(), 3);
    for (entry, expected) in entries.iter().zip(["ts:001", "ts:002", "ts:003"]) {
        assert_eq!(entry.0.to_string_lossy(), expected);
    }
}
1734}