Skip to main content

amaters_sdk_rust/
client.rs

1//! AmateRS client implementation
2
3use crate::cache::{InvalidationPolicy, QueryCache, QueryCacheConfig};
4use crate::config::{ClientConfig, RetryConfig};
5use crate::connection::{Connection, ConnectionPool};
6use crate::error::{Result, SdkError};
7use crate::fhe::FheEncryptor;
8use crate::streaming::{QueryStream, Row, StreamConfig};
9use amaters_core::{CipherBlob, Key, Query};
10use futures::StreamExt as _;
11use std::sync::Arc;
12use tokio::time::{sleep, timeout};
13use tracing::{debug, info, warn};
14
15// ---------------------------------------------------------------------------
16// Pagination & Sorting types
17// ---------------------------------------------------------------------------
18
/// Settings that control a cursor-based paginated read.
#[derive(Debug, Clone)]
pub struct PaginationConfig {
    /// Upper bound on the number of items returned in one page.
    pub page_size: usize,
    /// Opaque resume token from a previous page; `None` means "start at the beginning".
    pub cursor: Option<String>,
    /// How many items to skip before the page starts.
    ///
    /// With a cursor set, the skip is counted from the cursor position; without
    /// one, it is counted from the start.
    pub offset: usize,
}

impl Default for PaginationConfig {
    /// Pages of 100 items, no cursor, no offset.
    fn default() -> Self {
        Self::new(100)
    }
}

impl PaginationConfig {
    /// Build a config for pages of `page_size` items, starting at the beginning.
    pub fn new(page_size: usize) -> Self {
        PaginationConfig {
            page_size,
            cursor: None,
            offset: 0,
        }
    }

    /// Return the config with its resume cursor replaced by `cursor`.
    #[must_use]
    pub fn with_cursor(self, cursor: impl Into<String>) -> Self {
        Self {
            cursor: Some(cursor.into()),
            ..self
        }
    }

    /// Return the config with its skip count replaced by `offset`.
    #[must_use]
    pub fn with_offset(self, offset: usize) -> Self {
        Self { offset, ..self }
    }
}
67
/// One page of results from a paginated query.
#[derive(Debug, Clone)]
pub struct PaginatedResult<T> {
    /// The items that make up this page.
    pub items: Vec<T>,
    /// Opaque token for requesting the following page; `None` on the last page.
    pub next_cursor: Option<String>,
    /// `true` when further items exist beyond this page.
    pub has_more: bool,
    /// Best-effort total item count; `None` when no estimate is available.
    pub total_hint: Option<usize>,
}
80
/// Direction in which sorted results are arranged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Smallest first (A → Z, 0 → 9).
    Ascending,
    /// Largest first (Z → A, 9 → 0).
    Descending,
}
89
/// Which attribute of a result entry drives the sort.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortField {
    /// Order by the entry's key.
    Key,
    /// Order by the entry's value bytes, compared lexicographically.
    Value,
    /// Order by timestamp (a byte-ordering proxy for insertion order).
    Timestamp,
}
100
101/// Sort configuration for query results.
102#[derive(Debug, Clone)]
103pub struct SortConfig {
104    /// The field to sort by.
105    pub field: SortField,
106    /// The sort direction.
107    pub order: SortOrder,
108}
109
110impl SortConfig {
111    /// Create a new sort configuration.
112    pub fn new(field: SortField, order: SortOrder) -> Self {
113        Self { field, order }
114    }
115}
116
117// ---------------------------------------------------------------------------
118// Cursor encoding / decoding helpers
119// ---------------------------------------------------------------------------
120
/// Byte that divides a cursor into its hex-encoded key and its hex-encoded hash.
///
/// `|` can never appear inside either hex field, so it is unambiguous.
const CURSOR_SEPARATOR: u8 = b'|';
123
124/// Encode a key into an opaque cursor string.
125///
126/// Format: hex(key_bytes) + "|" + hex(blake3(key_bytes))
127/// The blake3 hash provides integrity so tampered cursors are rejected.
128fn encode_cursor(key: &Key) -> String {
129    let key_bytes = key.as_bytes();
130    let hash = blake3::hash(key_bytes);
131    let key_hex = hex_encode(key_bytes);
132    let hash_hex = hex_encode(hash.as_bytes());
133    format!("{}{}{}", key_hex, CURSOR_SEPARATOR as char, hash_hex)
134}
135
136/// Decode an opaque cursor string back into a `Key`.
137///
138/// Returns an error if the cursor is malformed or if the integrity hash does
139/// not match (i.e. the cursor was tampered with).
140fn decode_cursor(cursor: &str) -> Result<Key> {
141    let parts: Vec<&str> = cursor.split(CURSOR_SEPARATOR as char).collect();
142    if parts.len() != 2 {
143        return Err(SdkError::InvalidArgument(
144            "malformed cursor: expected two parts separated by '|'".to_string(),
145        ));
146    }
147
148    let key_bytes = hex_decode(parts[0])
149        .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor key: {}", e)))?;
150
151    let hash_bytes = hex_decode(parts[1])
152        .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor hash: {}", e)))?;
153
154    // Verify integrity
155    let expected_hash = blake3::hash(&key_bytes);
156    if hash_bytes.len() != 32 || expected_hash.as_bytes() != hash_bytes.as_slice() {
157        return Err(SdkError::InvalidArgument(
158            "cursor integrity check failed: hash mismatch".to_string(),
159        ));
160    }
161
162    Ok(Key::from_slice(&key_bytes))
163}
164
/// Render `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
175
/// Decode a hexadecimal string (upper- or lowercase) into bytes.
///
/// Decoding works on the raw bytes of `hex`, so input containing non-ASCII
/// characters is rejected with an error instead of panicking on a string
/// slice that lands in the middle of a multi-byte character. Only the
/// characters `0-9`, `a-f` and `A-F` are accepted; in particular a sign
/// prefix such as `"+f"` is rejected.
///
/// # Errors
///
/// Returns a description of the problem when the input has odd length or
/// contains a character that is not a hex digit. The reported offset is the
/// byte offset of the two-character pair that failed.
fn hex_decode(hex: &str) -> std::result::Result<Vec<u8>, String> {
    /// Map one ASCII hex digit to its 4-bit value.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let raw = hex.as_bytes();
    if raw.len() % 2 != 0 {
        return Err("odd-length hex string".to_string());
    }

    raw.chunks_exact(2)
        .enumerate()
        .map(|(pair_index, pair)| match (nibble(pair[0]), nibble(pair[1])) {
            (Some(high), Some(low)) => Ok((high << 4) | low),
            _ => Err(format!(
                "invalid hex at offset {}: invalid digit found in string",
                pair_index * 2
            )),
        })
        .collect()
}
189
190/// AmateRS client for interacting with the database
191///
192/// The client manages connections, handles retries, and provides
193/// high-level operations for working with encrypted data.
194#[derive(Clone)]
195pub struct AmateRSClient {
196    pool: Arc<ConnectionPool>,
197    config: Arc<ClientConfig>,
198    encryptor: Option<Arc<FheEncryptor>>,
199    cache: Option<Arc<QueryCache>>,
200}
201
202impl AmateRSClient {
203    /// Connect to an AmateRS server
204    ///
205    /// # Example
206    ///
207    /// ```no_run
208    /// use amaters_sdk_rust::AmateRSClient;
209    ///
210    /// # async fn example() -> anyhow::Result<()> {
211    /// let client = AmateRSClient::connect("http://localhost:50051").await?;
212    /// # Ok(())
213    /// # }
214    /// ```
215    pub async fn connect(addr: impl Into<String>) -> Result<Self> {
216        let config = ClientConfig::new(addr);
217        Self::connect_with_config(config).await
218    }
219
220    /// Connect with a custom configuration
221    ///
222    /// # Example
223    ///
224    /// ```no_run
225    /// use amaters_sdk_rust::{AmateRSClient, ClientConfig};
226    /// use std::time::Duration;
227    ///
228    /// # async fn example() -> anyhow::Result<()> {
229    /// let config = ClientConfig::new("http://localhost:50051")
230    ///     .with_connect_timeout(Duration::from_secs(5))
231    ///     .with_max_connections(20);
232    ///
233    /// let client = AmateRSClient::connect_with_config(config).await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub async fn connect_with_config(config: ClientConfig) -> Result<Self> {
238        info!("Connecting to AmateRS server at {}", config.server_addr);
239
240        let pool = ConnectionPool::new(config.clone());
241
242        // Test connection by getting one
243        let _conn = pool.get().await?;
244
245        info!("Successfully connected to AmateRS server");
246
247        Ok(Self {
248            pool: Arc::new(pool),
249            config: Arc::new(config),
250            encryptor: None,
251            cache: None,
252        })
253    }
254
255    /// Set the FHE encryptor for client-side encryption
256    pub fn with_encryptor(mut self, encryptor: FheEncryptor) -> Self {
257        self.encryptor = Some(Arc::new(encryptor));
258        self
259    }
260
261    /// Get the encryptor (if set)
262    pub fn encryptor(&self) -> Option<&Arc<FheEncryptor>> {
263        self.encryptor.as_ref()
264    }
265
266    /// Enable client-side query result caching with the given configuration.
267    ///
268    /// # Example
269    ///
270    /// ```no_run
271    /// use amaters_sdk_rust::{AmateRSClient, QueryCacheConfig};
272    /// use std::time::Duration;
273    ///
274    /// # async fn example() -> anyhow::Result<()> {
275    /// let client = AmateRSClient::connect("http://localhost:50051")
276    ///     .await?
277    ///     .with_cache(QueryCacheConfig::default().with_ttl(Duration::from_secs(120)));
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub fn with_cache(mut self, config: QueryCacheConfig) -> Self {
282        self.cache = Some(Arc::new(QueryCache::new(config)));
283        self
284    }
285
286    /// Get a reference to the cache (if enabled).
287    pub fn cache(&self) -> Option<&Arc<QueryCache>> {
288        self.cache.as_ref()
289    }
290
291    /// Execute a query with retry logic
292    async fn execute_with_retry<F, Fut, T>(&self, operation: F) -> Result<T>
293    where
294        F: Fn(Connection) -> Fut,
295        Fut: std::future::Future<Output = Result<T>>,
296    {
297        let retry_config = &self.config.retry_config;
298        let mut attempt = 0;
299
300        loop {
301            attempt += 1;
302
303            // Get connection from pool
304            let conn = self.pool.get().await?;
305
306            // Try the operation
307            match operation(conn).await {
308                Ok(result) => return Ok(result),
309                Err(e) if e.is_retryable() && attempt <= retry_config.max_retries => {
310                    let backoff = retry_config.backoff_duration(attempt);
311                    warn!(
312                        "Operation failed (attempt {}), retrying after {:?}: {}",
313                        attempt, backoff, e
314                    );
315                    sleep(backoff).await;
316                }
317                Err(e) => return Err(e),
318            }
319        }
320    }
321
322    /// Set a key-value pair
323    ///
324    /// # Example
325    ///
326    /// ```no_run
327    /// use amaters_sdk_rust::AmateRSClient;
328    /// use amaters_core::{Key, CipherBlob};
329    ///
330    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
331    /// let key = Key::from_str("user:123");
332    /// let value = CipherBlob::new(vec![1, 2, 3, 4]);
333    ///
334    /// client.set("users", &key, &value).await?;
335    /// # Ok(())
336    /// # }
337    /// ```
338    pub async fn set(&self, collection: &str, key: &Key, value: &CipherBlob) -> Result<()> {
339        debug!("Set: collection={}, key={}", collection, key);
340
341        // Invalidate cache entry on write (if policy allows)
342        if let Some(ref cache) = self.cache {
343            if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
344                let cache_key = QueryCache::make_key(collection, key.as_bytes());
345                cache.invalidate(&cache_key);
346            }
347        }
348
349        let collection = collection.to_string();
350        let key = key.clone();
351        let value = value.clone();
352
353        self.execute_with_retry(move |conn| {
354            let collection = collection.clone();
355            let key = key.clone();
356            let value = value.clone();
357
358            async move {
359                use amaters_net::convert::{create_version, query_to_proto};
360                use amaters_net::proto::aql::QueryRequest;
361                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
362
363                let mut client = AqlServiceClient::new(conn.channel().clone());
364
365                let query = Query::Set {
366                    collection,
367                    key,
368                    value,
369                };
370
371                let proto_query = query_to_proto(&query)?;
372
373                let request = tonic::Request::new(QueryRequest {
374                    query: Some(proto_query),
375                    request_id: Some(uuid::Uuid::new_v4().to_string()),
376                    timeout_ms: Some(30000),
377                    transaction_id: None,
378                    version: Some(create_version()),
379                });
380
381                let response = client.execute_query(request).await?;
382
383                // Handle response, check for errors
384                match response.into_inner().response {
385                    Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
386                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
387                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
388                    ),
389                    None => Err(SdkError::OperationFailed(
390                        "Empty response from server".to_string(),
391                    )),
392                }
393            }
394        })
395        .await
396    }
397
398    /// Get a value by key
399    ///
400    /// Returns `None` if the key doesn't exist.
401    ///
402    /// # Example
403    ///
404    /// ```no_run
405    /// use amaters_sdk_rust::AmateRSClient;
406    /// use amaters_core::Key;
407    ///
408    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
409    /// let key = Key::from_str("user:123");
410    ///
411    /// if let Some(value) = client.get("users", &key).await? {
412    ///     println!("Found value: {} bytes", value.len());
413    /// }
414    /// # Ok(())
415    /// # }
416    /// ```
417    pub async fn get(&self, collection: &str, key: &Key) -> Result<Option<CipherBlob>> {
418        debug!("Get: collection={}, key={}", collection, key);
419
420        // Check cache first
421        if let Some(ref cache) = self.cache {
422            let cache_key = QueryCache::make_key(collection, key.as_bytes());
423            if let Some(cached_data) = cache.get(&cache_key) {
424                debug!("Cache hit for collection={}, key={}", collection, key);
425                return Ok(Some(CipherBlob::new(cached_data)));
426            }
427        }
428
429        let collection = collection.to_string();
430        let key = key.clone();
431        let cache = self.cache.clone();
432        let coll_for_cache = collection.clone();
433        let key_for_cache = key.clone();
434
435        let result = self
436            .execute_with_retry(move |conn| {
437                let collection = collection.clone();
438                let key = key.clone();
439
440                async move {
441                    use amaters_net::convert::{
442                        cipher_blob_from_proto, create_version, query_to_proto,
443                    };
444                    use amaters_net::proto::aql::QueryRequest;
445                    use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
446
447                    let mut client = AqlServiceClient::new(conn.channel().clone());
448
449                    let query = Query::Get { collection, key };
450
451                    let proto_query = query_to_proto(&query)?;
452
453                    let request = tonic::Request::new(QueryRequest {
454                        query: Some(proto_query),
455                        request_id: Some(uuid::Uuid::new_v4().to_string()),
456                        timeout_ms: Some(30000),
457                        transaction_id: None,
458                        version: Some(create_version()),
459                    });
460
461                    let response = client.execute_query(request).await?;
462
463                    // Handle response and extract SingleResult
464                    match response.into_inner().response {
465                        Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
466                            use amaters_net::proto::query::query_result::Result as QueryResultEnum;
467                            match result.result {
468                                Some(QueryResultEnum::Single(single)) => {
469                                    if let Some(value) = single.value {
470                                        Ok(Some(cipher_blob_from_proto(value)?))
471                                    } else {
472                                        Ok(None)
473                                    }
474                                }
475                                _ => Err(SdkError::OperationFailed(
476                                    "Expected single result".to_string(),
477                                )),
478                            }
479                        }
480                        Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
481                            SdkError::OperationFailed(format!("Server error: {}", e.message)),
482                        ),
483                        None => Err(SdkError::OperationFailed(
484                            "Empty response from server".to_string(),
485                        )),
486                    }
487                }
488            })
489            .await;
490
491        // Cache the result on success
492        if let Ok(Some(ref blob)) = result {
493            if let Some(ref c) = cache {
494                let ck = QueryCache::make_key(&coll_for_cache, key_for_cache.as_bytes());
495                c.put_with_collection(&ck, blob.to_vec(), Some(&coll_for_cache));
496            }
497        }
498
499        result
500    }
501
502    /// Delete a key
503    ///
504    /// # Example
505    ///
506    /// ```no_run
507    /// use amaters_sdk_rust::AmateRSClient;
508    /// use amaters_core::Key;
509    ///
510    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
511    /// let key = Key::from_str("user:123");
512    /// client.delete("users", &key).await?;
513    /// # Ok(())
514    /// # }
515    /// ```
516    pub async fn delete(&self, collection: &str, key: &Key) -> Result<()> {
517        debug!("Delete: collection={}, key={}", collection, key);
518
519        // Invalidate cache entry on write (if policy allows)
520        if let Some(ref cache) = self.cache {
521            if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
522                let cache_key = QueryCache::make_key(collection, key.as_bytes());
523                cache.invalidate(&cache_key);
524            }
525        }
526
527        let collection = collection.to_string();
528        let key = key.clone();
529
530        self.execute_with_retry(move |conn| {
531            let collection = collection.clone();
532            let key = key.clone();
533
534            async move {
535                use amaters_net::convert::{create_version, query_to_proto};
536                use amaters_net::proto::aql::QueryRequest;
537                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
538
539                let mut client = AqlServiceClient::new(conn.channel().clone());
540
541                let query = Query::Delete { collection, key };
542
543                let proto_query = query_to_proto(&query)?;
544
545                let request = tonic::Request::new(QueryRequest {
546                    query: Some(proto_query),
547                    request_id: Some(uuid::Uuid::new_v4().to_string()),
548                    timeout_ms: Some(30000),
549                    transaction_id: None,
550                    version: Some(create_version()),
551                });
552
553                let response = client.execute_query(request).await?;
554
555                // Handle response, check for success
556                match response.into_inner().response {
557                    Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
558                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
559                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
560                    ),
561                    None => Err(SdkError::OperationFailed(
562                        "Empty response from server".to_string(),
563                    )),
564                }
565            }
566        })
567        .await
568    }
569
570    /// Check if a key exists
571    ///
572    /// # Example
573    ///
574    /// ```no_run
575    /// use amaters_sdk_rust::AmateRSClient;
576    /// use amaters_core::Key;
577    ///
578    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
579    /// let key = Key::from_str("user:123");
580    ///
581    /// if client.contains("users", &key).await? {
582    ///     println!("Key exists");
583    /// }
584    /// # Ok(())
585    /// # }
586    /// ```
587    pub async fn contains(&self, collection: &str, key: &Key) -> Result<bool> {
588        debug!("Contains: collection={}, key={}", collection, key);
589
590        // Use get and check if result is Some
591        let result = self.get(collection, key).await?;
592        Ok(result.is_some())
593    }
594
595    /// Execute a query
596    ///
597    /// This is a lower-level method that executes arbitrary queries.
598    ///
599    /// # Example
600    ///
601    /// ```no_run
602    /// use amaters_sdk_rust::AmateRSClient;
603    /// use amaters_core::{Query, Key};
604    ///
605    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
606    /// let query = Query::Get {
607    ///     collection: "users".to_string(),
608    ///     key: Key::from_str("user:123"),
609    /// };
610    ///
611    /// client.execute_query(&query).await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub async fn execute_query(&self, query: &Query) -> Result<QueryResult> {
616        debug!("Executing query: {:?}", query);
617
618        let query = query.clone();
619
620        self.execute_with_retry(move |conn| {
621            let query = query.clone();
622
623            async move {
624                use amaters_net::convert::{
625                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
626                };
627                use amaters_net::proto::aql::QueryRequest;
628                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
629
630                let mut client = AqlServiceClient::new(conn.channel().clone());
631
632                let proto_query = query_to_proto(&query)?;
633
634                let request = tonic::Request::new(QueryRequest {
635                    query: Some(proto_query),
636                    request_id: Some(uuid::Uuid::new_v4().to_string()),
637                    timeout_ms: Some(30000),
638                    transaction_id: None,
639                    version: Some(create_version()),
640                });
641
642                let response = client.execute_query(request).await?;
643
644                // Handle response and convert to QueryResult
645                match response.into_inner().response {
646                    Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
647                        use amaters_net::proto::query::query_result::Result as QueryResultEnum;
648                        match result.result {
649                            Some(QueryResultEnum::Single(single)) => {
650                                let value = if let Some(v) = single.value {
651                                    Some(cipher_blob_from_proto(v)?)
652                                } else {
653                                    None
654                                };
655                                Ok(QueryResult::Single(value))
656                            }
657                            Some(QueryResultEnum::Multi(multi)) => {
658                                let mut values = Vec::new();
659                                for kv in multi.values {
660                                    let key = kv.key.ok_or_else(|| {
661                                        SdkError::OperationFailed(
662                                            "Missing key in result".to_string(),
663                                        )
664                                    })?;
665                                    let value = kv.value.ok_or_else(|| {
666                                        SdkError::OperationFailed(
667                                            "Missing value in result".to_string(),
668                                        )
669                                    })?;
670                                    values.push((
671                                        key_from_proto(key),
672                                        cipher_blob_from_proto(value)?,
673                                    ));
674                                }
675                                Ok(QueryResult::Multi(values))
676                            }
677                            Some(QueryResultEnum::Success(success)) => Ok(QueryResult::Success {
678                                affected_rows: success.affected_rows,
679                            }),
680                            None => Err(SdkError::OperationFailed(
681                                "Empty result from server".to_string(),
682                            )),
683                        }
684                    }
685                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
686                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
687                    ),
688                    None => Err(SdkError::OperationFailed(
689                        "Empty response from server".to_string(),
690                    )),
691                }
692            }
693        })
694        .await
695    }
696
697    /// Execute a batch of queries
698    ///
699    /// All queries are executed atomically (all succeed or all fail).
700    pub async fn execute_batch(&self, queries: Vec<Query>) -> Result<Vec<QueryResult>> {
701        debug!("Executing batch of {} queries", queries.len());
702
703        self.execute_with_retry(move |conn| {
704            let queries = queries.clone();
705
706            async move {
707                use amaters_net::convert::{
708                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
709                };
710                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
711                use amaters_net::proto::aql::{BatchRequest, IsolationLevel};
712
713                let mut client = AqlServiceClient::new(conn.channel().clone());
714
715                // Convert queries to proto
716                let mut proto_queries = Vec::new();
717                for query in &queries {
718                    proto_queries.push(query_to_proto(query)?);
719                }
720
721                let request = tonic::Request::new(BatchRequest {
722                    queries: proto_queries,
723                    request_id: Some(uuid::Uuid::new_v4().to_string()),
724                    timeout_ms: Some(60000), // 60 seconds for batch
725                    isolation_level: IsolationLevel::IsolationDefault as i32,
726                    version: Some(create_version()),
727                });
728
729                let response = client.execute_batch(request).await?;
730
731                // Handle response and convert results
732                match response.into_inner().response {
733                    Some(amaters_net::proto::aql::batch_response::Response::Results(
734                        batch_result,
735                    )) => {
736                        let mut results = Vec::new();
737                        for result in batch_result.results {
738                            use amaters_net::proto::query::query_result::Result as QueryResultEnum;
739                            let query_result = match result.result {
740                                Some(QueryResultEnum::Single(single)) => {
741                                    let value = if let Some(v) = single.value {
742                                        Some(cipher_blob_from_proto(v)?)
743                                    } else {
744                                        None
745                                    };
746                                    QueryResult::Single(value)
747                                }
748                                Some(QueryResultEnum::Multi(multi)) => {
749                                    let mut values = Vec::new();
750                                    for kv in multi.values {
751                                        let key = kv.key.ok_or_else(|| {
752                                            SdkError::OperationFailed(
753                                                "Missing key in result".to_string(),
754                                            )
755                                        })?;
756                                        let value = kv.value.ok_or_else(|| {
757                                            SdkError::OperationFailed(
758                                                "Missing value in result".to_string(),
759                                            )
760                                        })?;
761                                        values.push((
762                                            key_from_proto(key),
763                                            cipher_blob_from_proto(value)?,
764                                        ));
765                                    }
766                                    QueryResult::Multi(values)
767                                }
768                                Some(QueryResultEnum::Success(success)) => QueryResult::Success {
769                                    affected_rows: success.affected_rows,
770                                },
771                                None => {
772                                    return Err(SdkError::OperationFailed(
773                                        "Empty result from server".to_string(),
774                                    ));
775                                }
776                            };
777                            results.push(query_result);
778                        }
779                        Ok(results)
780                    }
781                    Some(amaters_net::proto::aql::batch_response::Response::Error(e)) => Err(
782                        SdkError::OperationFailed(format!("Batch error: {}", e.message)),
783                    ),
784                    None => Err(SdkError::OperationFailed(
785                        "Empty response from server".to_string(),
786                    )),
787                }
788            }
789        })
790        .await
791    }
792
793    /// Get connection pool statistics
794    pub fn pool_stats(&self) -> crate::connection::PoolStats {
795        self.pool.stats()
796    }
797
798    /// Close all connections
799    pub fn close(&self) {
800        info!("Closing client");
801        self.pool.close_all();
802    }
803
804    /// Health check
805    ///
806    /// Returns `Ok(())` if the server is healthy.
807    pub async fn health_check(&self) -> Result<()> {
808        debug!("Performing health check");
809
810        let result = timeout(
811            self.config.request_timeout,
812            self.execute_with_retry(|conn| async move {
813                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
814                use amaters_net::proto::aql::{HealthCheckRequest, HealthStatus};
815
816                let mut client = AqlServiceClient::new(conn.channel().clone());
817
818                let request = tonic::Request::new(HealthCheckRequest { service: None });
819
820                let response = client.health_check(request).await?;
821                let health_response = response.into_inner();
822
823                if health_response.status == HealthStatus::HealthServing as i32 {
824                    Ok(())
825                } else {
826                    Err(SdkError::OperationFailed(format!(
827                        "Server unhealthy: {:?}",
828                        health_response.message
829                    )))
830                }
831            }),
832        )
833        .await;
834
835        match result {
836            Ok(Ok(())) => {
837                debug!("Health check passed");
838                Ok(())
839            }
840            Ok(Err(e)) => {
841                warn!("Health check failed: {}", e);
842                Err(e)
843            }
844            Err(_) => {
845                warn!("Health check timeout");
846                Err(SdkError::Timeout("health check timeout".to_string()))
847            }
848        }
849    }
850
851    /// Get server information
852    ///
853    /// Returns information about the server including version, capabilities, and uptime.
854    pub async fn server_info(&self) -> Result<ServerInfo> {
855        debug!("Getting server info");
856
857        self.execute_with_retry(|conn| async move {
858            use amaters_net::proto::aql::ServerInfoRequest;
859            use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
860
861            let mut client = AqlServiceClient::new(conn.channel().clone());
862
863            let request = tonic::Request::new(ServerInfoRequest {});
864
865            let response = client.get_server_info(request).await?;
866            let info = response.into_inner();
867
868            Ok(ServerInfo {
869                version: info.version.map(|v| (v.major, v.minor, v.patch)),
870                supported_versions: info
871                    .supported_versions
872                    .into_iter()
873                    .map(|v| (v.major, v.minor, v.patch))
874                    .collect(),
875                capabilities: info.capabilities,
876                uptime_seconds: info.uptime_seconds,
877            })
878        })
879        .await
880    }
881
882    /// Range query - retrieve keys in a range
883    ///
884    /// # Example
885    ///
886    /// ```no_run
887    /// use amaters_sdk_rust::AmateRSClient;
888    /// use amaters_core::Key;
889    ///
890    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
891    /// let start = Key::from_str("user:000");
892    /// let end = Key::from_str("user:999");
893    ///
894    /// let results = client.range("users", &start, &end).await?;
895    /// println!("Found {} keys in range", results.len());
896    /// # Ok(())
897    /// # }
898    /// ```
899    pub async fn range(
900        &self,
901        collection: &str,
902        start: &Key,
903        end: &Key,
904    ) -> Result<Vec<(Key, CipherBlob)>> {
905        debug!(
906            "Range: collection={}, start={}, end={}",
907            collection, start, end
908        );
909
910        let collection = collection.to_string();
911        let start = start.clone();
912        let end = end.clone();
913
914        self.execute_with_retry(move |conn| {
915            let collection = collection.clone();
916            let start = start.clone();
917            let end = end.clone();
918
919            async move {
920                use amaters_net::convert::{
921                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
922                };
923                use amaters_net::proto::aql::QueryRequest;
924                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
925
926                let mut client = AqlServiceClient::new(conn.channel().clone());
927
928                let query = Query::Range {
929                    collection,
930                    start,
931                    end,
932                };
933
934                let proto_query = query_to_proto(&query)?;
935
936                let request = tonic::Request::new(QueryRequest {
937                    query: Some(proto_query),
938                    request_id: Some(uuid::Uuid::new_v4().to_string()),
939                    timeout_ms: Some(30000),
940                    transaction_id: None,
941                    version: Some(create_version()),
942                });
943
944                let response = client.execute_query(request).await?;
945
946                // Handle response and extract MultiResult
947                match response.into_inner().response {
948                    Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
949                        use amaters_net::proto::query::query_result::Result as QueryResultEnum;
950                        match result.result {
951                            Some(QueryResultEnum::Multi(multi)) => {
952                                let mut values = Vec::new();
953                                for kv in multi.values {
954                                    let key = kv.key.ok_or_else(|| {
955                                        SdkError::OperationFailed(
956                                            "Missing key in result".to_string(),
957                                        )
958                                    })?;
959                                    let value = kv.value.ok_or_else(|| {
960                                        SdkError::OperationFailed(
961                                            "Missing value in result".to_string(),
962                                        )
963                                    })?;
964                                    values.push((
965                                        key_from_proto(key),
966                                        cipher_blob_from_proto(value)?,
967                                    ));
968                                }
969                                Ok(values)
970                            }
971                            _ => Err(SdkError::OperationFailed(
972                                "Expected multi result for range query".to_string(),
973                            )),
974                        }
975                    }
976                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
977                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
978                    ),
979                    None => Err(SdkError::OperationFailed(
980                        "Empty response from server".to_string(),
981                    )),
982                }
983            }
984        })
985        .await
986    }
987
988    // -----------------------------------------------------------------------
989    // Paginated range queries
990    // -----------------------------------------------------------------------
991
992    /// Range query with simple pagination.
993    ///
994    /// Returns the first `page_size` items in the range `[start, end)`. Use
995    /// the returned cursor to fetch subsequent pages via [`Self::range_with_cursor`].
996    pub async fn range_paginated(
997        &self,
998        collection: &str,
999        start: &Key,
1000        end: &Key,
1001        page_size: usize,
1002    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1003        let pagination = PaginationConfig::new(page_size);
1004        self.range_with_cursor(collection, start, end, &pagination)
1005            .await
1006    }
1007
1008    /// Range query with full cursor-based pagination.
1009    ///
1010    /// If `pagination.cursor` is `Some`, the scan resumes from the key
1011    /// encoded in the cursor (exclusive). Otherwise it starts from `start`.
1012    pub async fn range_with_cursor(
1013        &self,
1014        collection: &str,
1015        start: &Key,
1016        end: &Key,
1017        pagination: &PaginationConfig,
1018    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1019        let effective_start = if let Some(ref cursor_str) = pagination.cursor {
1020            decode_cursor(cursor_str)?
1021        } else {
1022            start.clone()
1023        };
1024
1025        // Fetch page_size + 1 items so we can detect "has_more"
1026        let all = self.range(collection, &effective_start, end).await?;
1027
1028        // If we resumed from a cursor the first item may be the cursor key
1029        // itself (inclusive range). Skip it so the page is exclusive of the
1030        // previous last item.
1031        let base_iter: Box<dyn Iterator<Item = (Key, CipherBlob)>> =
1032            if pagination.cursor.is_some() && !all.is_empty() && all[0].0 == effective_start {
1033                Box::new(all.into_iter().skip(1))
1034            } else {
1035                Box::new(all.into_iter())
1036            };
1037
1038        // Apply offset: skip `pagination.offset` additional items.
1039        let after_offset: Vec<(Key, CipherBlob)> = base_iter.skip(pagination.offset).collect();
1040
1041        let has_more = after_offset.len() > pagination.page_size;
1042        let items: Vec<(Key, CipherBlob)> = after_offset
1043            .into_iter()
1044            .take(pagination.page_size)
1045            .collect();
1046
1047        let next_cursor = if has_more {
1048            items.last().map(|(k, _)| encode_cursor(k))
1049        } else {
1050            None
1051        };
1052
1053        Ok(PaginatedResult {
1054            items,
1055            next_cursor,
1056            has_more,
1057            total_hint: None,
1058        })
1059    }
1060
1061    // -----------------------------------------------------------------------
1062    // Sorted range queries
1063    // -----------------------------------------------------------------------
1064
1065    /// Range query with results sorted according to `sort`.
1066    ///
1067    /// The full range is fetched from the server and then sorted client-side.
1068    pub async fn range_sorted(
1069        &self,
1070        collection: &str,
1071        start: &Key,
1072        end: &Key,
1073        sort: &SortConfig,
1074    ) -> Result<Vec<(Key, CipherBlob)>> {
1075        let mut results = self.range(collection, start, end).await?;
1076        sort_results(&mut results, sort);
1077        Ok(results)
1078    }
1079
1080    // -----------------------------------------------------------------------
1081    // Prefix scan
1082    // -----------------------------------------------------------------------
1083
1084    /// Scan keys with a given prefix, returning paginated results.
1085    ///
1086    /// This constructs a range `[prefix, prefix_end)` where `prefix_end` is
1087    /// the lexicographic successor of `prefix`, then delegates to
1088    /// [`Self::range_with_cursor`].
1089    pub async fn scan(
1090        &self,
1091        collection: &str,
1092        prefix: &Key,
1093        pagination: &PaginationConfig,
1094    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1095        let start = prefix.clone();
1096        let end = prefix_end_key(prefix);
1097        self.range_with_cursor(collection, &start, &end, pagination)
1098            .await
1099    }
1100
1101    // -----------------------------------------------------------------------
1102    // Count
1103    // -----------------------------------------------------------------------
1104
1105    /// Count the number of keys in a collection.
1106    ///
1107    /// This performs a full range scan and counts the results. For very large
1108    /// collections a server-side count would be more efficient, but this
1109    /// provides a correct answer using the existing API surface.
1110    pub async fn count(&self, collection: &str) -> Result<usize> {
1111        debug!("Count: collection={}", collection);
1112
1113        // Use a full range scan (min key → max key)
1114        let start = Key::from_slice(&[0u8]);
1115        let end = Key::from_slice(&[0xFF; 32]);
1116        let results = self.range(collection, &start, &end).await?;
1117        Ok(results.len())
1118    }
1119
1120    // -----------------------------------------------------------------------
1121    // Transaction factory
1122    // -----------------------------------------------------------------------
1123
1124    /// Begin a new transaction bound to `collection`.
1125    ///
1126    /// All operations are buffered locally until [`crate::Transaction::commit`] or
1127    /// [`crate::Transaction::rollback`] is called.  Dropping the transaction without
1128    /// committing or rolling back emits a `tracing::warn!` for every uncommitted
1129    /// operation.
1130    ///
1131    /// # Example
1132    ///
1133    /// ```no_run
1134    /// use amaters_sdk_rust::{AmateRSClient, Transaction};
1135    /// use amaters_core::{Key, CipherBlob};
1136    ///
1137    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
1138    /// let mut tx = client.transaction("users");
1139    /// tx.set(Key::from_str("user:1"), CipherBlob::new(vec![1, 2, 3]))?;
1140    /// tx.commit().await?;
1141    /// # Ok(())
1142    /// # }
1143    /// ```
1144    pub fn transaction(&self, collection: impl Into<String>) -> crate::transaction::Transaction {
1145        crate::transaction::Transaction::new(Arc::new(self.clone()), collection)
1146    }
1147
1148    // -----------------------------------------------------------------------
1149    // Offline constructor (test / stub use only)
1150    // -----------------------------------------------------------------------
1151
1152    /// Create a client that is **not** connected to any server.
1153    ///
1154    /// This constructor skips the connection-pool probe so it can be used in
1155    /// unit tests and other contexts where no live server is available.
1156    /// Operations that require a server connection (e.g. `get`, `set`, `stream_query`) will
1157    /// fail with a `SdkError::Connection` error if called on this client.
1158    ///
1159    /// # Note
1160    ///
1161    /// This method is intended for **testing and development** only.
1162    #[doc(hidden)]
1163    pub fn new_offline(config: ClientConfig) -> Self {
1164        Self {
1165            pool: Arc::new(ConnectionPool::new(config.clone())),
1166            config: Arc::new(config),
1167            encryptor: None,
1168            cache: None,
1169        }
1170    }
1171
1172    // -----------------------------------------------------------------------
1173    // Streaming API
1174    // -----------------------------------------------------------------------
1175
1176    /// Stream query results row by row using a real gRPC server-streaming RPC.
1177    ///
1178    /// Returns a [`QueryStream`] that implements [`futures::Stream`].  The
1179    /// stream is backed by a bounded mpsc channel (capacity =
1180    /// `config.buffer_size`) so the producer is automatically throttled when
1181    /// the consumer is slow.  Dropping the returned stream cancels the
1182    /// background task.
1183    ///
1184    /// The server currently supports `Range` and `Get` queries for streaming.
1185    /// Other query variants will be rejected by the server with a gRPC error
1186    /// that is forwarded to the stream as `Err(SdkError::Grpc(...))`.
1187    ///
1188    /// Returns `Err(SdkError::Connection(...))` if there is no live server
1189    /// connection available.
1190    pub async fn stream_query(&self, query: Query, config: StreamConfig) -> Result<QueryStream> {
1191        debug!("stream_query: query={:?}", query);
1192
1193        // Acquire a connection from the pool.  If the pool has no live
1194        // connections (e.g. offline client) this returns an error immediately.
1195        let conn = self.pool.get().await?;
1196
1197        let timeout_secs = config.timeout_secs;
1198        let (query_stream, sender) = QueryStream::new(&config);
1199        let cancel_token = sender.cancel_token();
1200
1201        // Build the proto request from the core Query.
1202        let proto_query = {
1203            use amaters_net::convert::query_to_proto;
1204            query_to_proto(&query)?
1205        };
1206
1207        let request = tonic::Request::new(amaters_net::proto::aql::QueryRequest {
1208            query: Some(proto_query),
1209            request_id: Some(uuid::Uuid::new_v4().to_string()),
1210            timeout_ms: timeout_secs.map(|s| (s * 1000) as u32),
1211            transaction_id: None,
1212            version: Some(amaters_net::convert::create_version()),
1213        });
1214
1215        // Start the server-streaming RPC.
1216        let mut grpc_client = {
1217            use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
1218            AqlServiceClient::new(conn.channel().clone())
1219        };
1220
1221        let response_stream = grpc_client
1222            .execute_stream(request)
1223            .await
1224            .map_err(SdkError::Grpc)?
1225            .into_inner();
1226
1227        // Spawn background task: pump the gRPC stream into the mpsc channel.
1228        tokio::spawn(async move {
1229            // Hold the connection alive for the duration of the stream.
1230            let _conn = conn;
1231
1232            let mut pinned = std::pin::pin!(response_stream);
1233
1234            loop {
1235                // Wait for the next chunk while honouring cancellation.
1236                let item = tokio::select! {
1237                    biased;
1238                    _ = cancel_token.cancelled() => {
1239                        debug!("stream_query: cancelled by consumer");
1240                        break;
1241                    }
1242                    item = pinned.next() => item,
1243                };
1244
1245                match item {
1246                    // gRPC stream closed cleanly.
1247                    None => break,
1248
1249                    // gRPC transport or server error.
1250                    Some(Err(status)) => {
1251                        let _ = sender.send_error(SdkError::Grpc(status)).await;
1252                        break;
1253                    }
1254
1255                    // A StreamResponse message.
1256                    Some(Ok(response)) => {
1257                        use amaters_net::proto::aql::stream_response::Chunk;
1258
1259                        match response.chunk {
1260                            // Batched key-value pairs — what the server actually emits today.
1261                            Some(Chunk::Batch(batch)) => {
1262                                for kv in batch.values {
1263                                    if sender.is_cancelled() {
1264                                        return;
1265                                    }
1266                                    let key = kv.key.map(|k| k.data).unwrap_or_default();
1267                                    let value = kv.value.map(|v| v.data).unwrap_or_default();
1268                                    let row = Row::new(key, value);
1269                                    if !sender.send_row(row).await {
1270                                        return;
1271                                    }
1272                                }
1273                            }
1274
1275                            // Legacy single-item message.
1276                            Some(Chunk::Value(kv)) => {
1277                                if sender.is_cancelled() {
1278                                    return;
1279                                }
1280                                let key = kv.key.map(|k| k.data).unwrap_or_default();
1281                                let value = kv.value.map(|v| v.data).unwrap_or_default();
1282                                let row = Row::new(key, value);
1283                                if !sender.send_row(row).await {
1284                                    return;
1285                                }
1286                            }
1287
1288                            // End-of-stream marker — close cleanly.
1289                            Some(Chunk::End(_)) => break,
1290
1291                            // Server-side error embedded in the stream.
1292                            Some(Chunk::Error(e)) => {
1293                                let _ = sender
1294                                    .send_error(SdkError::OperationFailed(e.message))
1295                                    .await;
1296                                break;
1297                            }
1298
1299                            // Malformed/empty chunk — skip.
1300                            None => {}
1301                        }
1302                    }
1303                }
1304            }
1305            // Dropping `sender` closes the channel, which ends the QueryStream.
1306        });
1307
1308        Ok(query_stream)
1309    }
1310}
1311
1312// ---------------------------------------------------------------------------
1313// Free-standing helpers
1314// ---------------------------------------------------------------------------
1315
1316/// Sort a mutable slice of (Key, CipherBlob) pairs in-place according to `sort`.
1317fn sort_results(results: &mut [(Key, CipherBlob)], sort: &SortConfig) {
1318    match (sort.field, sort.order) {
1319        (SortField::Key, SortOrder::Ascending) => {
1320            results.sort_by(|a, b| a.0.cmp(&b.0));
1321        }
1322        (SortField::Key, SortOrder::Descending) => {
1323            results.sort_by(|a, b| b.0.cmp(&a.0));
1324        }
1325        (SortField::Value, SortOrder::Ascending) => {
1326            results.sort_by(|a, b| a.1.as_bytes().cmp(b.1.as_bytes()));
1327        }
1328        (SortField::Value, SortOrder::Descending) => {
1329            results.sort_by(|a, b| b.1.as_bytes().cmp(a.1.as_bytes()));
1330        }
1331        // Timestamp is approximated by key ordering (insertion-order proxy)
1332        (SortField::Timestamp, SortOrder::Ascending) => {
1333            results.sort_by(|a, b| a.0.cmp(&b.0));
1334        }
1335        (SortField::Timestamp, SortOrder::Descending) => {
1336            results.sort_by(|a, b| b.0.cmp(&a.0));
1337        }
1338    }
1339}
1340
1341/// Compute the lexicographic successor of `prefix` to define an exclusive
1342/// upper bound for prefix scans.
1343///
1344/// For example `"user:"` → `"user;"` (`:` + 1 = `;` in ASCII).
1345/// If the prefix is all `0xFF` bytes, returns a key with an extra `0xFF`
1346/// byte appended so the range is always valid.
1347fn prefix_end_key(prefix: &Key) -> Key {
1348    let mut bytes = prefix.to_vec();
1349    // Walk from the last byte backwards, incrementing the first byte < 0xFF.
1350    while let Some(last) = bytes.last_mut() {
1351        if *last < 0xFF {
1352            *last += 1;
1353            return Key::from_slice(&bytes);
1354        }
1355        bytes.pop();
1356    }
1357    // All bytes were 0xFF — extend with one more 0xFF.
1358    let mut extended = prefix.to_vec();
1359    extended.push(0xFF);
1360    Key::from_slice(&extended)
1361}
1362
/// Description of a server as returned by `AmateRSClient::server_info`.
#[derive(Clone, Debug)]
pub struct ServerInfo {
    /// Version of the server as `(major, minor, patch)`; `None` when the
    /// server did not report one.
    pub version: Option<(u32, u32, u32)>,
    /// Protocol versions the server supports, each as `(major, minor, patch)`.
    pub supported_versions: Vec<(u32, u32, u32)>,
    /// Capability names advertised by the server.
    pub capabilities: Vec<String>,
    /// How long the server has been running, in seconds.
    pub uptime_seconds: u64,
}
1375
1376// ---------------------------------------------------------------------------
1377// PaginatedQueryBuilder
1378// ---------------------------------------------------------------------------
1379
1380/// Builder for constructing paginated and sorted queries.
1381///
1382/// Provides a fluent API on top of the core `QueryBuilder` with pagination
1383/// and sorting options.
1384#[derive(Debug, Clone)]
1385pub struct PaginatedQueryBuilder {
1386    collection: String,
1387    page_size: Option<usize>,
1388    cursor: Option<String>,
1389    sort: Option<SortConfig>,
1390    /// Alias for `page_size` — the maximum number of results to return.
1391    limit: Option<usize>,
1392    /// Number of items to skip (applied after cursor resume).
1393    offset: Option<usize>,
1394}
1395
1396impl PaginatedQueryBuilder {
1397    /// Create a new paginated query builder for a collection.
1398    pub fn new(collection: impl Into<String>) -> Self {
1399        Self {
1400            collection: collection.into(),
1401            page_size: None,
1402            cursor: None,
1403            sort: None,
1404            limit: None,
1405            offset: None,
1406        }
1407    }
1408
1409    /// Set the page size for pagination.
1410    #[must_use]
1411    pub fn page_size(mut self, size: usize) -> Self {
1412        self.page_size = Some(size);
1413        self
1414    }
1415
1416    /// Set the maximum number of results to return (alias for `page_size`).
1417    #[must_use]
1418    pub fn limit(mut self, n: usize) -> Self {
1419        self.limit = Some(n);
1420        self
1421    }
1422
1423    /// Set the number of items to skip (applied after any cursor resume).
1424    #[must_use]
1425    pub fn offset(mut self, n: usize) -> Self {
1426        self.offset = Some(n);
1427        self
1428    }
1429
1430    /// Set the cursor to resume pagination from.
1431    #[must_use]
1432    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
1433        self.cursor = Some(cursor.into());
1434        self
1435    }
1436
1437    /// Set the sort order for results.
1438    #[must_use]
1439    pub fn sort_by(mut self, field: SortField, order: SortOrder) -> Self {
1440        self.sort = Some(SortConfig::new(field, order));
1441        self
1442    }
1443
1444    /// Build a `PaginationConfig` from the current builder state.
1445    ///
1446    /// `limit` takes precedence over `page_size` when both are set.
1447    pub fn build_paginated(&self) -> PaginationConfig {
1448        let page_size = self.limit.or(self.page_size).unwrap_or(100);
1449        PaginationConfig {
1450            page_size,
1451            cursor: self.cursor.clone(),
1452            offset: self.offset.unwrap_or(0),
1453        }
1454    }
1455
1456    /// Get the collection name.
1457    pub fn collection(&self) -> &str {
1458        &self.collection
1459    }
1460
1461    /// Get the sort configuration, if set.
1462    pub fn sort_config(&self) -> Option<&SortConfig> {
1463        self.sort.as_ref()
1464    }
1465}
1466
1467/// Query execution result
1468#[derive(Debug, Clone)]
1469pub enum QueryResult {
1470    /// Single value result
1471    Single(Option<CipherBlob>),
1472    /// Multiple values result
1473    Multi(Vec<(Key, CipherBlob)>),
1474    /// Success result (no data)
1475    Success { affected_rows: u64 },
1476}
1477
1478#[cfg(test)]
1479mod tests {
1480    use super::*;
1481
1482    // --- existing tests ----------------------------------------------------
1483
1484    #[tokio::test]
1485    async fn test_retry_config() {
1486        let config = RetryConfig::default();
1487        assert_eq!(config.max_retries, 3);
1488
1489        let backoff = config.backoff_duration(1);
1490        assert!(backoff.as_millis() > 0);
1491    }
1492
1493    #[test]
1494    fn test_query_result() {
1495        let result = QueryResult::Success { affected_rows: 5 };
1496        match result {
1497            QueryResult::Success { affected_rows } => {
1498                assert_eq!(affected_rows, 5);
1499            }
1500            _ => panic!("expected Success"),
1501        }
1502    }
1503
1504    // --- pagination config tests -------------------------------------------
1505
1506    #[test]
1507    fn test_pagination_config_default() {
1508        let config = PaginationConfig::default();
1509        assert_eq!(config.page_size, 100);
1510        assert!(config.cursor.is_none());
1511    }
1512
1513    #[test]
1514    fn test_pagination_config_with_cursor() {
1515        let config = PaginationConfig::new(50).with_cursor("abc123");
1516        assert_eq!(config.page_size, 50);
1517        assert_eq!(config.cursor.as_deref(), Some("abc123"));
1518    }
1519
1520    // --- paginated result tests --------------------------------------------
1521
1522    #[test]
1523    fn test_paginated_result_has_more() {
1524        // Items less than page_size means no more pages
1525        let result: PaginatedResult<u8> = PaginatedResult {
1526            items: vec![1, 2, 3],
1527            next_cursor: None,
1528            has_more: false,
1529            total_hint: None,
1530        };
1531        assert!(!result.has_more);
1532        assert!(result.next_cursor.is_none());
1533        assert_eq!(result.items.len(), 3);
1534    }
1535
1536    #[test]
1537    fn test_paginated_result_with_more() {
1538        let result: PaginatedResult<u8> = PaginatedResult {
1539            items: vec![1, 2, 3],
1540            next_cursor: Some("cursor_xyz".to_string()),
1541            has_more: true,
1542            total_hint: Some(100),
1543        };
1544        assert!(result.has_more);
1545        assert_eq!(result.next_cursor.as_deref(), Some("cursor_xyz"));
1546        assert_eq!(result.total_hint, Some(100));
1547    }
1548
1549    // --- sort config tests -------------------------------------------------
1550
1551    #[test]
1552    fn test_sort_ascending() {
1553        let mut data = vec![
1554            (Key::from_str("c"), CipherBlob::new(vec![3])),
1555            (Key::from_str("a"), CipherBlob::new(vec![1])),
1556            (Key::from_str("b"), CipherBlob::new(vec![2])),
1557        ];
1558        let sort = SortConfig::new(SortField::Key, SortOrder::Ascending);
1559        sort_results(&mut data, &sort);
1560
1561        assert_eq!(data[0].0.to_string_lossy(), "a");
1562        assert_eq!(data[1].0.to_string_lossy(), "b");
1563        assert_eq!(data[2].0.to_string_lossy(), "c");
1564    }
1565
1566    #[test]
1567    fn test_sort_descending() {
1568        let mut data = vec![
1569            (Key::from_str("a"), CipherBlob::new(vec![1])),
1570            (Key::from_str("c"), CipherBlob::new(vec![3])),
1571            (Key::from_str("b"), CipherBlob::new(vec![2])),
1572        ];
1573        let sort = SortConfig::new(SortField::Key, SortOrder::Descending);
1574        sort_results(&mut data, &sort);
1575
1576        assert_eq!(data[0].0.to_string_lossy(), "c");
1577        assert_eq!(data[1].0.to_string_lossy(), "b");
1578        assert_eq!(data[2].0.to_string_lossy(), "a");
1579    }
1580
1581    #[test]
1582    fn test_sort_by_value_ascending() {
1583        let mut data = vec![
1584            (Key::from_str("x"), CipherBlob::new(vec![30])),
1585            (Key::from_str("y"), CipherBlob::new(vec![10])),
1586            (Key::from_str("z"), CipherBlob::new(vec![20])),
1587        ];
1588        let sort = SortConfig::new(SortField::Value, SortOrder::Ascending);
1589        sort_results(&mut data, &sort);
1590
1591        assert_eq!(data[0].1.to_vec(), vec![10]);
1592        assert_eq!(data[1].1.to_vec(), vec![20]);
1593        assert_eq!(data[2].1.to_vec(), vec![30]);
1594    }
1595
1596    // --- cursor encoding / decoding tests ----------------------------------
1597
1598    #[test]
1599    fn test_cursor_encoding() {
1600        let key = Key::from_str("user:999");
1601        let cursor = encode_cursor(&key);
1602
1603        // Should be non-empty and contain the separator
1604        assert!(!cursor.is_empty());
1605        assert!(cursor.contains('|'));
1606
1607        // Decode and verify round-trip
1608        let decoded = decode_cursor(&cursor).expect("decode should succeed");
1609        assert_eq!(decoded, key);
1610    }
1611
1612    #[test]
1613    fn test_cursor_encoding_binary_key() {
1614        let key = Key::from_slice(&[0x00, 0xFF, 0xAB, 0xCD]);
1615        let cursor = encode_cursor(&key);
1616        let decoded = decode_cursor(&cursor).expect("decode should succeed");
1617        assert_eq!(decoded, key);
1618    }
1619
1620    #[test]
1621    fn test_cursor_integrity() {
1622        let key = Key::from_str("original_key");
1623        let cursor = encode_cursor(&key);
1624
1625        // Tamper with the cursor by flipping a character in the key portion
1626        let mut tampered = cursor.clone();
1627        let bytes = unsafe { tampered.as_bytes_mut() };
1628        if !bytes.is_empty() {
1629            bytes[0] ^= 0x01; // flip a bit
1630        }
1631
1632        let result = decode_cursor(&tampered);
1633        assert!(result.is_err(), "tampered cursor should be rejected");
1634    }
1635
1636    #[test]
1637    fn test_cursor_malformed_no_separator() {
1638        let result = decode_cursor("noseparatorhere");
1639        assert!(result.is_err());
1640    }
1641
1642    #[test]
1643    fn test_cursor_malformed_empty() {
1644        let result = decode_cursor("|");
1645        // Both parts are empty — hash check will fail
1646        assert!(result.is_err());
1647    }
1648
1649    // --- prefix end key tests ----------------------------------------------
1650
1651    #[test]
1652    fn test_prefix_end_key() {
1653        let prefix = Key::from_str("user:");
1654        let end = prefix_end_key(&prefix);
1655        // "user:" → "user;" because ':' + 1 = ';'
1656        assert_eq!(end.to_string_lossy(), "user;");
1657    }
1658
1659    #[test]
1660    fn test_prefix_end_key_all_ff() {
1661        let prefix = Key::from_slice(&[0xFF, 0xFF]);
1662        let end = prefix_end_key(&prefix);
1663        // Should extend with one more 0xFF
1664        assert_eq!(end.to_vec(), vec![0xFF, 0xFF, 0xFF]);
1665    }
1666
1667    // --- scan prefix helper ------------------------------------------------
1668
1669    #[test]
1670    fn test_scan_with_prefix_key_generation() {
1671        // Verify that prefix_end_key produces a correct exclusive upper bound
1672        let prefix = Key::from_str("item:");
1673        let end = prefix_end_key(&prefix);
1674
1675        // Keys within the prefix should be < end
1676        let within = Key::from_str("item:abc");
1677        assert!(within < end);
1678
1679        // Keys outside the prefix should be >= end
1680        let outside = Key::from_str("item;abc");
1681        assert!(outside >= end);
1682    }
1683
1684    // --- query builder pagination tests ------------------------------------
1685
1686    #[test]
1687    fn test_query_builder_pagination() {
1688        let builder = PaginatedQueryBuilder::new("users")
1689            .page_size(25)
1690            .cursor("some_cursor");
1691
1692        let config = builder.build_paginated();
1693        assert_eq!(config.page_size, 25);
1694        assert_eq!(config.cursor.as_deref(), Some("some_cursor"));
1695        assert_eq!(builder.collection(), "users");
1696    }
1697
1698    #[test]
1699    fn test_query_builder_pagination_defaults() {
1700        let builder = PaginatedQueryBuilder::new("events");
1701        let config = builder.build_paginated();
1702        assert_eq!(config.page_size, 100);
1703        assert!(config.cursor.is_none());
1704    }
1705
1706    #[test]
1707    fn test_query_builder_sorting() {
1708        let builder = PaginatedQueryBuilder::new("logs")
1709            .sort_by(SortField::Timestamp, SortOrder::Descending)
1710            .page_size(50);
1711
1712        let sort = builder.sort_config().expect("sort should be set");
1713        assert_eq!(sort.field, SortField::Timestamp);
1714        assert_eq!(sort.order, SortOrder::Descending);
1715    }
1716
1717    #[test]
1718    fn test_query_builder_no_sorting() {
1719        let builder = PaginatedQueryBuilder::new("data");
1720        assert!(builder.sort_config().is_none());
1721    }
1722
1723    // --- hex encode/decode round-trip --------------------------------------
1724
1725    #[test]
1726    fn test_hex_encode_decode() {
1727        let original = vec![0x00, 0x0A, 0xFF, 0x42, 0x99];
1728        let encoded = hex_encode(&original);
1729        assert_eq!(encoded, "000aff4299");
1730        let decoded = hex_decode(&encoded).expect("decode should succeed");
1731        assert_eq!(decoded, original);
1732    }
1733
1734    #[test]
1735    fn test_hex_decode_odd_length() {
1736        let result = hex_decode("abc");
1737        assert!(result.is_err());
1738    }
1739
1740    #[test]
1741    fn test_hex_decode_invalid_chars() {
1742        let result = hex_decode("zzzz");
1743        assert!(result.is_err());
1744    }
1745
1746    // --- sort with timestamp field -----------------------------------------
1747
1748    #[test]
1749    fn test_sort_by_timestamp_ascending() {
1750        let mut data = vec![
1751            (Key::from_str("ts:003"), CipherBlob::new(vec![3])),
1752            (Key::from_str("ts:001"), CipherBlob::new(vec![1])),
1753            (Key::from_str("ts:002"), CipherBlob::new(vec![2])),
1754        ];
1755        let sort = SortConfig::new(SortField::Timestamp, SortOrder::Ascending);
1756        sort_results(&mut data, &sort);
1757
1758        assert_eq!(data[0].0.to_string_lossy(), "ts:001");
1759        assert_eq!(data[1].0.to_string_lossy(), "ts:002");
1760        assert_eq!(data[2].0.to_string_lossy(), "ts:003");
1761    }
1762}