Skip to main content

amaters_sdk_rust/
client.rs

1//! AmateRS client implementation
2
3use crate::cache::{InvalidationPolicy, QueryCache, QueryCacheConfig};
4use crate::config::{ClientConfig, RetryConfig};
5use crate::connection::{Connection, ConnectionPool};
6use crate::error::{Result, SdkError};
7use crate::fhe::FheEncryptor;
8use crate::streaming::{QueryStream, Row, StreamConfig};
9use amaters_core::{CipherBlob, Key, Query};
10use futures::StreamExt as _;
11use std::sync::Arc;
12use tokio::time::{sleep, timeout};
13use tracing::{debug, info, warn};
14
15// ---------------------------------------------------------------------------
16// Pagination & Sorting types
17// ---------------------------------------------------------------------------
18
/// Settings that describe one page of a cursor-paginated query.
#[derive(Debug, Clone)]
pub struct PaginationConfig {
    /// Upper bound on the number of items returned in a single page.
    pub page_size: usize,
    /// Opaque resume token. `None` means the listing starts from the beginning.
    pub cursor: Option<String>,
    /// Number of items to skip before the page starts.
    ///
    /// The skip is counted from the cursor position when `cursor` is set, and
    /// from the very first item otherwise. With both set, the first `offset`
    /// items that follow the cursor position are skipped.
    pub offset: usize,
}

impl Default for PaginationConfig {
    /// Pages of 100 items, starting at the beginning, with nothing skipped.
    fn default() -> Self {
        Self::new(100)
    }
}

impl PaginationConfig {
    /// Build a config for pages of `page_size` items, with no cursor and no offset.
    pub fn new(page_size: usize) -> Self {
        Self {
            offset: 0,
            cursor: None,
            page_size,
        }
    }

    /// Return this config with `cursor` as the resume position.
    #[must_use]
    pub fn with_cursor(self, cursor: impl Into<String>) -> Self {
        Self {
            cursor: Some(cursor.into()),
            ..self
        }
    }

    /// Return this config with `offset` items skipped.
    #[must_use]
    pub fn with_offset(self, offset: usize) -> Self {
        Self { offset, ..self }
    }
}
67
/// One page of results returned by a paginated query.
///
/// `T` is the item type carried by the page.
#[derive(Debug, Clone)]
pub struct PaginatedResult<T> {
    /// Items in the current page.
    pub items: Vec<T>,
    /// Opaque cursor to pass back in order to fetch the next page.
    /// `None` if this is the last page.
    pub next_cursor: Option<String>,
    /// Whether there are more items after this page.
    pub has_more: bool,
    /// Optional hint about the total number of items (may not always be available).
    pub total_hint: Option<usize>,
}
80
/// Direction in which sorted results are ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Sort in ascending order (A → Z, 0 → 9).
    Ascending,
    /// Sort in descending order (Z → A, 9 → 0).
    Descending,
}
89
/// Attribute of a result entry that sorting is keyed on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortField {
    /// Sort by key.
    Key,
    /// Sort by value bytes (lexicographic).
    Value,
    /// Sort by timestamp (insertion order proxy — byte ordering).
    // NOTE(review): how the timestamp is derived is not visible in this part of
    // the file — confirm against the sorting implementation.
    Timestamp,
}
100
101/// Sort configuration for query results.
102#[derive(Debug, Clone)]
103pub struct SortConfig {
104    /// The field to sort by.
105    pub field: SortField,
106    /// The sort direction.
107    pub order: SortOrder,
108}
109
110impl SortConfig {
111    /// Create a new sort configuration.
112    pub fn new(field: SortField, order: SortOrder) -> Self {
113        Self { field, order }
114    }
115}
116
117// ---------------------------------------------------------------------------
118// Cursor encoding / decoding helpers
119// ---------------------------------------------------------------------------
120
/// Separator between the key payload and the integrity hash inside a cursor.
///
/// Both halves of a cursor are hex-encoded, so `|` can never occur inside
/// either of them.
const CURSOR_SEPARATOR: u8 = b'|';
123
124/// Encode a key into an opaque cursor string.
125///
126/// Format: hex(key_bytes) + "|" + hex(blake3(key_bytes))
127/// The blake3 hash provides integrity so tampered cursors are rejected.
128fn encode_cursor(key: &Key) -> String {
129    let key_bytes = key.as_bytes();
130    let hash = blake3::hash(key_bytes);
131    let key_hex = hex_encode(key_bytes);
132    let hash_hex = hex_encode(hash.as_bytes());
133    format!("{}{}{}", key_hex, CURSOR_SEPARATOR as char, hash_hex)
134}
135
136/// Decode an opaque cursor string back into a `Key`.
137///
138/// Returns an error if the cursor is malformed or if the integrity hash does
139/// not match (i.e. the cursor was tampered with).
140fn decode_cursor(cursor: &str) -> Result<Key> {
141    let parts: Vec<&str> = cursor.split(CURSOR_SEPARATOR as char).collect();
142    if parts.len() != 2 {
143        return Err(SdkError::InvalidArgument(
144            "malformed cursor: expected two parts separated by '|'".to_string(),
145        ));
146    }
147
148    let key_bytes = hex_decode(parts[0])
149        .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor key: {}", e)))?;
150
151    let hash_bytes = hex_decode(parts[1])
152        .map_err(|e| SdkError::InvalidArgument(format!("malformed cursor hash: {}", e)))?;
153
154    // Verify integrity
155    let expected_hash = blake3::hash(&key_bytes);
156    if hash_bytes.len() != 32 || expected_hash.as_bytes() != hash_bytes.as_slice() {
157        return Err(SdkError::InvalidArgument(
158            "cursor integrity check failed: hash mismatch".to_string(),
159        ));
160    }
161
162    Ok(Key::from_slice(&key_bytes))
163}
164
/// Render `bytes` as a lowercase hex string, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // The result of formatting into the `String` is deliberately ignored.
        let _ = write!(out, "{:02x}", byte);
    }
    out
}
175
/// Decode a hex string into bytes.
///
/// Both lowercase and uppercase digits are accepted. Decoding works on the
/// raw bytes of `hex`, so arbitrary (including non-ASCII) input is rejected
/// with an error instead of panicking on a `str` slice that falls inside a
/// multi-byte character. Sign characters such as `+` are not hex digits and
/// are rejected as well.
///
/// # Errors
///
/// Returns a description of the problem when `hex` has an odd length or when
/// any byte is not an ASCII hex digit. The reported offset is the byte offset
/// of the offending two-character pair.
fn hex_decode(hex: &str) -> std::result::Result<Vec<u8>, String> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let raw = hex.as_bytes();
    if raw.len() % 2 != 0 {
        return Err("odd-length hex string".to_string());
    }

    raw.chunks_exact(2)
        .enumerate()
        .map(|(pair_index, pair)| match (nibble(pair[0]), nibble(pair[1])) {
            (Some(high), Some(low)) => Ok((high << 4) | low),
            _ => Err(format!(
                "invalid hex at offset {}: invalid digit found in string",
                pair_index * 2
            )),
        })
        .collect()
}
189
/// AmateRS client for interacting with the database
///
/// The client manages connections, handles retries, and provides
/// high-level operations for working with encrypted data.
///
/// Every field is either an `Arc` or an optional `Arc`, so cloning the client
/// produces a handle that shares the same pool, configuration, encryptor and
/// cache as the original.
#[derive(Clone)]
pub struct AmateRSClient {
    // Connection pool that every request draws its connection from.
    pool: Arc<ConnectionPool>,
    // Client configuration (server address, retry settings, request timeout).
    config: Arc<ClientConfig>,
    // Optional encryptor for client-side encryption; `None` until `with_encryptor`.
    encryptor: Option<Arc<FheEncryptor>>,
    // Optional query-result cache; `None` until `with_cache`.
    cache: Option<Arc<QueryCache>>,
}
201
202impl AmateRSClient {
203    /// Connect to an AmateRS server
204    ///
205    /// # Example
206    ///
207    /// ```no_run
208    /// use amaters_sdk_rust::AmateRSClient;
209    ///
210    /// # async fn example() -> anyhow::Result<()> {
211    /// let client = AmateRSClient::connect("http://localhost:50051").await?;
212    /// # Ok(())
213    /// # }
214    /// ```
215    pub async fn connect(addr: impl Into<String>) -> Result<Self> {
216        let config = ClientConfig::new(addr);
217        Self::connect_with_config(config).await
218    }
219
220    /// Connect with a custom configuration
221    ///
222    /// # Example
223    ///
224    /// ```no_run
225    /// use amaters_sdk_rust::{AmateRSClient, ClientConfig};
226    /// use std::time::Duration;
227    ///
228    /// # async fn example() -> anyhow::Result<()> {
229    /// let config = ClientConfig::new("http://localhost:50051")
230    ///     .with_connect_timeout(Duration::from_secs(5))
231    ///     .with_max_connections(20);
232    ///
233    /// let client = AmateRSClient::connect_with_config(config).await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub async fn connect_with_config(config: ClientConfig) -> Result<Self> {
238        info!("Connecting to AmateRS server at {}", config.server_addr);
239
240        let pool = ConnectionPool::new(config.clone());
241
242        // Test connection by getting one
243        let _conn = pool.get().await?;
244
245        info!("Successfully connected to AmateRS server");
246
247        Ok(Self {
248            pool: Arc::new(pool),
249            config: Arc::new(config),
250            encryptor: None,
251            cache: None,
252        })
253    }
254
255    /// Set the FHE encryptor for client-side encryption
256    pub fn with_encryptor(mut self, encryptor: FheEncryptor) -> Self {
257        self.encryptor = Some(Arc::new(encryptor));
258        self
259    }
260
    /// Get the encryptor (if set).
    ///
    /// Returns `None` when no encryptor has been installed with `with_encryptor`.
    pub fn encryptor(&self) -> Option<&Arc<FheEncryptor>> {
        self.encryptor.as_ref()
    }
265
266    /// Enable client-side query result caching with the given configuration.
267    ///
268    /// # Example
269    ///
270    /// ```no_run
271    /// use amaters_sdk_rust::{AmateRSClient, QueryCacheConfig};
272    /// use std::time::Duration;
273    ///
274    /// # async fn example() -> anyhow::Result<()> {
275    /// let client = AmateRSClient::connect("http://localhost:50051")
276    ///     .await?
277    ///     .with_cache(QueryCacheConfig::default().with_ttl(Duration::from_secs(120)));
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub fn with_cache(mut self, config: QueryCacheConfig) -> Self {
282        self.cache = Some(Arc::new(QueryCache::new(config)));
283        self
284    }
285
    /// Get a reference to the cache (if enabled).
    ///
    /// Returns `None` when caching has not been enabled with `with_cache`.
    pub fn cache(&self) -> Option<&Arc<QueryCache>> {
        self.cache.as_ref()
    }
290
291    /// Execute a query with retry logic
292    async fn execute_with_retry<F, Fut, T>(&self, operation: F) -> Result<T>
293    where
294        F: Fn(Connection) -> Fut,
295        Fut: std::future::Future<Output = Result<T>>,
296    {
297        let retry_config = &self.config.retry_config;
298        let mut attempt = 0;
299
300        loop {
301            attempt += 1;
302
303            // Get connection from pool
304            let conn = self.pool.get().await?;
305
306            // Try the operation
307            match operation(conn).await {
308                Ok(result) => return Ok(result),
309                Err(e) if e.is_retryable() && attempt <= retry_config.max_retries => {
310                    let backoff = retry_config.backoff_duration(attempt);
311                    warn!(
312                        "Operation failed (attempt {}), retrying after {:?}: {}",
313                        attempt, backoff, e
314                    );
315                    sleep(backoff).await;
316                }
317                Err(e) => return Err(e),
318            }
319        }
320    }
321
322    /// Set a key-value pair
323    ///
324    /// # Example
325    ///
326    /// ```no_run
327    /// use amaters_sdk_rust::AmateRSClient;
328    /// use amaters_core::{Key, CipherBlob};
329    ///
330    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
331    /// let key = Key::from_str("user:123");
332    /// let value = CipherBlob::new(vec![1, 2, 3, 4]);
333    ///
334    /// client.set("users", &key, &value).await?;
335    /// # Ok(())
336    /// # }
337    /// ```
338    pub async fn set(&self, collection: &str, key: &Key, value: &CipherBlob) -> Result<()> {
339        debug!("Set: collection={}, key={}", collection, key);
340
341        // Invalidate cache entry on write (if policy allows)
342        if let Some(ref cache) = self.cache {
343            if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
344                let cache_key = QueryCache::make_key(collection, key.as_bytes());
345                cache.invalidate(&cache_key);
346            }
347        }
348
349        let collection = collection.to_string();
350        let key = key.clone();
351        let value = value.clone();
352
353        self.execute_with_retry(move |conn| {
354            let collection = collection.clone();
355            let key = key.clone();
356            let value = value.clone();
357
358            async move {
359                use amaters_net::convert::{create_version, query_to_proto};
360                use amaters_net::proto::aql::QueryRequest;
361                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
362
363                let mut client = AqlServiceClient::new(conn.channel().clone());
364
365                let query = Query::Set {
366                    collection,
367                    key,
368                    value,
369                };
370
371                let proto_query = query_to_proto(&query)?;
372
373                let request = tonic::Request::new(QueryRequest {
374                    query: Some(proto_query),
375                    request_id: Some(uuid::Uuid::new_v4().to_string()),
376                    timeout_ms: Some(30000),
377                    transaction_id: None,
378                    version: Some(create_version()),
379                });
380
381                let response = client.execute_query(request).await?;
382
383                // Handle response, check for errors
384                match response.into_inner().response {
385                    Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
386                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
387                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
388                    ),
389                    None => Err(SdkError::OperationFailed(
390                        "Empty response from server".to_string(),
391                    )),
392                }
393            }
394        })
395        .await
396    }
397
398    /// Get a value by key
399    ///
400    /// Returns `None` if the key doesn't exist.
401    ///
402    /// # Example
403    ///
404    /// ```no_run
405    /// use amaters_sdk_rust::AmateRSClient;
406    /// use amaters_core::Key;
407    ///
408    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
409    /// let key = Key::from_str("user:123");
410    ///
411    /// if let Some(value) = client.get("users", &key).await? {
412    ///     println!("Found value: {} bytes", value.len());
413    /// }
414    /// # Ok(())
415    /// # }
416    /// ```
417    pub async fn get(&self, collection: &str, key: &Key) -> Result<Option<CipherBlob>> {
418        debug!("Get: collection={}, key={}", collection, key);
419
420        // Check cache first
421        if let Some(ref cache) = self.cache {
422            let cache_key = QueryCache::make_key(collection, key.as_bytes());
423            if let Some(cached_data) = cache.get(&cache_key) {
424                debug!("Cache hit for collection={}, key={}", collection, key);
425                return Ok(Some(CipherBlob::new(cached_data)));
426            }
427        }
428
429        let collection = collection.to_string();
430        let key = key.clone();
431        let cache = self.cache.clone();
432        let coll_for_cache = collection.clone();
433        let key_for_cache = key.clone();
434
435        let result = self
436            .execute_with_retry(move |conn| {
437                let collection = collection.clone();
438                let key = key.clone();
439
440                async move {
441                    use amaters_net::convert::{
442                        cipher_blob_from_proto, create_version, query_to_proto,
443                    };
444                    use amaters_net::proto::aql::QueryRequest;
445                    use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
446
447                    let mut client = AqlServiceClient::new(conn.channel().clone());
448
449                    let query = Query::Get { collection, key };
450
451                    let proto_query = query_to_proto(&query)?;
452
453                    let request = tonic::Request::new(QueryRequest {
454                        query: Some(proto_query),
455                        request_id: Some(uuid::Uuid::new_v4().to_string()),
456                        timeout_ms: Some(30000),
457                        transaction_id: None,
458                        version: Some(create_version()),
459                    });
460
461                    let response = client.execute_query(request).await?;
462
463                    // Handle response and extract SingleResult
464                    match response.into_inner().response {
465                        Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
466                            use amaters_net::proto::query::query_result::Result as QueryResultEnum;
467                            match result.result {
468                                Some(QueryResultEnum::Single(single)) => {
469                                    if let Some(value) = single.value {
470                                        Ok(Some(cipher_blob_from_proto(value)?))
471                                    } else {
472                                        Ok(None)
473                                    }
474                                }
475                                _ => Err(SdkError::OperationFailed(
476                                    "Expected single result".to_string(),
477                                )),
478                            }
479                        }
480                        Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
481                            SdkError::OperationFailed(format!("Server error: {}", e.message)),
482                        ),
483                        None => Err(SdkError::OperationFailed(
484                            "Empty response from server".to_string(),
485                        )),
486                    }
487                }
488            })
489            .await;
490
491        // Cache the result on success
492        if let Ok(Some(ref blob)) = result {
493            if let Some(ref c) = cache {
494                let ck = QueryCache::make_key(&coll_for_cache, key_for_cache.as_bytes());
495                c.put_with_collection(&ck, blob.to_vec(), Some(&coll_for_cache));
496            }
497        }
498
499        result
500    }
501
502    /// Delete a key
503    ///
504    /// # Example
505    ///
506    /// ```no_run
507    /// use amaters_sdk_rust::AmateRSClient;
508    /// use amaters_core::Key;
509    ///
510    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
511    /// let key = Key::from_str("user:123");
512    /// client.delete("users", &key).await?;
513    /// # Ok(())
514    /// # }
515    /// ```
516    pub async fn delete(&self, collection: &str, key: &Key) -> Result<()> {
517        debug!("Delete: collection={}, key={}", collection, key);
518
519        // Invalidate cache entry on write (if policy allows)
520        if let Some(ref cache) = self.cache {
521            if cache.invalidation_policy() == InvalidationPolicy::OnWrite {
522                let cache_key = QueryCache::make_key(collection, key.as_bytes());
523                cache.invalidate(&cache_key);
524            }
525        }
526
527        let collection = collection.to_string();
528        let key = key.clone();
529
530        self.execute_with_retry(move |conn| {
531            let collection = collection.clone();
532            let key = key.clone();
533
534            async move {
535                use amaters_net::convert::{create_version, query_to_proto};
536                use amaters_net::proto::aql::QueryRequest;
537                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
538
539                let mut client = AqlServiceClient::new(conn.channel().clone());
540
541                let query = Query::Delete { collection, key };
542
543                let proto_query = query_to_proto(&query)?;
544
545                let request = tonic::Request::new(QueryRequest {
546                    query: Some(proto_query),
547                    request_id: Some(uuid::Uuid::new_v4().to_string()),
548                    timeout_ms: Some(30000),
549                    transaction_id: None,
550                    version: Some(create_version()),
551                });
552
553                let response = client.execute_query(request).await?;
554
555                // Handle response, check for success
556                match response.into_inner().response {
557                    Some(amaters_net::proto::aql::query_response::Response::Result(_)) => Ok(()),
558                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
559                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
560                    ),
561                    None => Err(SdkError::OperationFailed(
562                        "Empty response from server".to_string(),
563                    )),
564                }
565            }
566        })
567        .await
568    }
569
570    /// Check if a key exists
571    ///
572    /// # Example
573    ///
574    /// ```no_run
575    /// use amaters_sdk_rust::AmateRSClient;
576    /// use amaters_core::Key;
577    ///
578    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
579    /// let key = Key::from_str("user:123");
580    ///
581    /// if client.contains("users", &key).await? {
582    ///     println!("Key exists");
583    /// }
584    /// # Ok(())
585    /// # }
586    /// ```
587    pub async fn contains(&self, collection: &str, key: &Key) -> Result<bool> {
588        debug!("Contains: collection={}, key={}", collection, key);
589
590        // Use get and check if result is Some
591        let result = self.get(collection, key).await?;
592        Ok(result.is_some())
593    }
594
595    /// Execute a query
596    ///
597    /// This is a lower-level method that executes arbitrary queries.
598    ///
599    /// # Example
600    ///
601    /// ```no_run
602    /// use amaters_sdk_rust::AmateRSClient;
603    /// use amaters_core::{Query, Key};
604    ///
605    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
606    /// let query = Query::Get {
607    ///     collection: "users".to_string(),
608    ///     key: Key::from_str("user:123"),
609    /// };
610    ///
611    /// client.execute_query(&query).await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub async fn execute_query(&self, query: &Query) -> Result<QueryResult> {
616        debug!("Executing query: {:?}", query);
617
618        let query = query.clone();
619
620        self.execute_with_retry(move |conn| {
621            let query = query.clone();
622
623            async move {
624                use amaters_net::convert::{
625                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
626                };
627                use amaters_net::proto::aql::QueryRequest;
628                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
629
630                let mut client = AqlServiceClient::new(conn.channel().clone());
631
632                let proto_query = query_to_proto(&query)?;
633
634                let request = tonic::Request::new(QueryRequest {
635                    query: Some(proto_query),
636                    request_id: Some(uuid::Uuid::new_v4().to_string()),
637                    timeout_ms: Some(30000),
638                    transaction_id: None,
639                    version: Some(create_version()),
640                });
641
642                let response = client.execute_query(request).await?;
643
644                // Handle response and convert to QueryResult
645                match response.into_inner().response {
646                    Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
647                        use amaters_net::proto::query::query_result::Result as QueryResultEnum;
648                        match result.result {
649                            Some(QueryResultEnum::Single(single)) => {
650                                let value = if let Some(v) = single.value {
651                                    Some(cipher_blob_from_proto(v)?)
652                                } else {
653                                    None
654                                };
655                                Ok(QueryResult::Single(value))
656                            }
657                            Some(QueryResultEnum::Multi(multi)) => {
658                                let mut values = Vec::new();
659                                for kv in multi.values {
660                                    let key = kv.key.ok_or_else(|| {
661                                        SdkError::OperationFailed(
662                                            "Missing key in result".to_string(),
663                                        )
664                                    })?;
665                                    let value = kv.value.ok_or_else(|| {
666                                        SdkError::OperationFailed(
667                                            "Missing value in result".to_string(),
668                                        )
669                                    })?;
670                                    values.push((
671                                        key_from_proto(key),
672                                        cipher_blob_from_proto(value)?,
673                                    ));
674                                }
675                                Ok(QueryResult::Multi(values))
676                            }
677                            Some(QueryResultEnum::Success(success)) => Ok(QueryResult::Success {
678                                affected_rows: success.affected_rows,
679                            }),
680                            None => Err(SdkError::OperationFailed(
681                                "Empty result from server".to_string(),
682                            )),
683                        }
684                    }
685                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
686                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
687                    ),
688                    None => Err(SdkError::OperationFailed(
689                        "Empty response from server".to_string(),
690                    )),
691                }
692            }
693        })
694        .await
695    }
696
697    /// Execute a batch of queries
698    ///
699    /// All queries are executed atomically (all succeed or all fail).
700    pub async fn execute_batch(&self, queries: Vec<Query>) -> Result<Vec<QueryResult>> {
701        debug!("Executing batch of {} queries", queries.len());
702
703        self.execute_with_retry(move |conn| {
704            let queries = queries.clone();
705
706            async move {
707                use amaters_net::convert::{
708                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
709                };
710                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
711                use amaters_net::proto::aql::{BatchRequest, IsolationLevel};
712
713                let mut client = AqlServiceClient::new(conn.channel().clone());
714
715                // Convert queries to proto
716                let mut proto_queries = Vec::new();
717                for query in &queries {
718                    proto_queries.push(query_to_proto(query)?);
719                }
720
721                let request = tonic::Request::new(BatchRequest {
722                    queries: proto_queries,
723                    request_id: Some(uuid::Uuid::new_v4().to_string()),
724                    timeout_ms: Some(60000), // 60 seconds for batch
725                    isolation_level: IsolationLevel::IsolationDefault as i32,
726                    version: Some(create_version()),
727                });
728
729                let response = client.execute_batch(request).await?;
730
731                // Handle response and convert results
732                match response.into_inner().response {
733                    Some(amaters_net::proto::aql::batch_response::Response::Results(
734                        batch_result,
735                    )) => {
736                        let mut results = Vec::new();
737                        for result in batch_result.results {
738                            use amaters_net::proto::query::query_result::Result as QueryResultEnum;
739                            let query_result = match result.result {
740                                Some(QueryResultEnum::Single(single)) => {
741                                    let value = if let Some(v) = single.value {
742                                        Some(cipher_blob_from_proto(v)?)
743                                    } else {
744                                        None
745                                    };
746                                    QueryResult::Single(value)
747                                }
748                                Some(QueryResultEnum::Multi(multi)) => {
749                                    let mut values = Vec::new();
750                                    for kv in multi.values {
751                                        let key = kv.key.ok_or_else(|| {
752                                            SdkError::OperationFailed(
753                                                "Missing key in result".to_string(),
754                                            )
755                                        })?;
756                                        let value = kv.value.ok_or_else(|| {
757                                            SdkError::OperationFailed(
758                                                "Missing value in result".to_string(),
759                                            )
760                                        })?;
761                                        values.push((
762                                            key_from_proto(key),
763                                            cipher_blob_from_proto(value)?,
764                                        ));
765                                    }
766                                    QueryResult::Multi(values)
767                                }
768                                Some(QueryResultEnum::Success(success)) => QueryResult::Success {
769                                    affected_rows: success.affected_rows,
770                                },
771                                None => {
772                                    return Err(SdkError::OperationFailed(
773                                        "Empty result from server".to_string(),
774                                    ));
775                                }
776                            };
777                            results.push(query_result);
778                        }
779                        Ok(results)
780                    }
781                    Some(amaters_net::proto::aql::batch_response::Response::Error(e)) => Err(
782                        SdkError::OperationFailed(format!("Batch error: {}", e.message)),
783                    ),
784                    None => Err(SdkError::OperationFailed(
785                        "Empty response from server".to_string(),
786                    )),
787                }
788            }
789        })
790        .await
791    }
792
    /// Get connection pool statistics.
    ///
    /// Returns whatever the underlying pool's `stats()` reports.
    pub fn pool_stats(&self) -> crate::connection::PoolStats {
        self.pool.stats()
    }
797
    /// Close all connections.
    ///
    /// Logs the shutdown and calls `close_all()` on the connection pool. The
    /// pool is held behind an `Arc`, so clones of this client share it and are
    /// affected as well.
    // NOTE(review): whether the pool can hand out new connections after
    // `close_all()` is not visible here — confirm in `ConnectionPool`.
    pub fn close(&self) {
        info!("Closing client");
        self.pool.close_all();
    }
803
804    /// Health check
805    ///
806    /// Returns `Ok(())` if the server is healthy.
807    pub async fn health_check(&self) -> Result<()> {
808        debug!("Performing health check");
809
810        let result = timeout(
811            self.config.request_timeout,
812            self.execute_with_retry(|conn| async move {
813                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
814                use amaters_net::proto::aql::{HealthCheckRequest, HealthStatus};
815
816                let mut client = AqlServiceClient::new(conn.channel().clone());
817
818                let request = tonic::Request::new(HealthCheckRequest { service: None });
819
820                let response = client.health_check(request).await?;
821                let health_response = response.into_inner();
822
823                if health_response.status == HealthStatus::HealthServing as i32 {
824                    Ok(())
825                } else {
826                    Err(SdkError::OperationFailed(format!(
827                        "Server unhealthy: {:?}",
828                        health_response.message
829                    )))
830                }
831            }),
832        )
833        .await;
834
835        match result {
836            Ok(Ok(())) => {
837                debug!("Health check passed");
838                Ok(())
839            }
840            Ok(Err(e)) => {
841                warn!("Health check failed: {}", e);
842                Err(e)
843            }
844            Err(_) => {
845                warn!("Health check timeout");
846                Err(SdkError::Timeout("health check timeout".to_string()))
847            }
848        }
849    }
850
851    /// Get server information
852    ///
853    /// Returns information about the server including version, capabilities, and uptime.
854    pub async fn server_info(&self) -> Result<ServerInfo> {
855        debug!("Getting server info");
856
857        self.execute_with_retry(|conn| async move {
858            use amaters_net::proto::aql::ServerInfoRequest;
859            use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
860
861            let mut client = AqlServiceClient::new(conn.channel().clone());
862
863            let request = tonic::Request::new(ServerInfoRequest {});
864
865            let response = client.get_server_info(request).await?;
866            let info = response.into_inner();
867
868            Ok(ServerInfo {
869                version: info.version.map(|v| (v.major, v.minor, v.patch)),
870                supported_versions: info
871                    .supported_versions
872                    .into_iter()
873                    .map(|v| (v.major, v.minor, v.patch))
874                    .collect(),
875                capabilities: info.capabilities,
876                uptime_seconds: info.uptime_seconds,
877            })
878        })
879        .await
880    }
881
882    /// Range query - retrieve keys in a range
883    ///
884    /// # Example
885    ///
886    /// ```no_run
887    /// use amaters_sdk_rust::AmateRSClient;
888    /// use amaters_core::Key;
889    ///
890    /// # async fn example(client: AmateRSClient) -> anyhow::Result<()> {
891    /// let start = Key::from_str("user:000");
892    /// let end = Key::from_str("user:999");
893    ///
894    /// let results = client.range("users", &start, &end).await?;
895    /// println!("Found {} keys in range", results.len());
896    /// # Ok(())
897    /// # }
898    /// ```
899    pub async fn range(
900        &self,
901        collection: &str,
902        start: &Key,
903        end: &Key,
904    ) -> Result<Vec<(Key, CipherBlob)>> {
905        debug!(
906            "Range: collection={}, start={}, end={}",
907            collection, start, end
908        );
909
910        let collection = collection.to_string();
911        let start = start.clone();
912        let end = end.clone();
913
914        self.execute_with_retry(move |conn| {
915            let collection = collection.clone();
916            let start = start.clone();
917            let end = end.clone();
918
919            async move {
920                use amaters_net::convert::{
921                    cipher_blob_from_proto, create_version, key_from_proto, query_to_proto,
922                };
923                use amaters_net::proto::aql::QueryRequest;
924                use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
925
926                let mut client = AqlServiceClient::new(conn.channel().clone());
927
928                let query = Query::Range {
929                    collection,
930                    start,
931                    end,
932                };
933
934                let proto_query = query_to_proto(&query)?;
935
936                let request = tonic::Request::new(QueryRequest {
937                    query: Some(proto_query),
938                    request_id: Some(uuid::Uuid::new_v4().to_string()),
939                    timeout_ms: Some(30000),
940                    transaction_id: None,
941                    version: Some(create_version()),
942                });
943
944                let response = client.execute_query(request).await?;
945
946                // Handle response and extract MultiResult
947                match response.into_inner().response {
948                    Some(amaters_net::proto::aql::query_response::Response::Result(result)) => {
949                        use amaters_net::proto::query::query_result::Result as QueryResultEnum;
950                        match result.result {
951                            Some(QueryResultEnum::Multi(multi)) => {
952                                let mut values = Vec::new();
953                                for kv in multi.values {
954                                    let key = kv.key.ok_or_else(|| {
955                                        SdkError::OperationFailed(
956                                            "Missing key in result".to_string(),
957                                        )
958                                    })?;
959                                    let value = kv.value.ok_or_else(|| {
960                                        SdkError::OperationFailed(
961                                            "Missing value in result".to_string(),
962                                        )
963                                    })?;
964                                    values.push((
965                                        key_from_proto(key),
966                                        cipher_blob_from_proto(value)?,
967                                    ));
968                                }
969                                Ok(values)
970                            }
971                            _ => Err(SdkError::OperationFailed(
972                                "Expected multi result for range query".to_string(),
973                            )),
974                        }
975                    }
976                    Some(amaters_net::proto::aql::query_response::Response::Error(e)) => Err(
977                        SdkError::OperationFailed(format!("Server error: {}", e.message)),
978                    ),
979                    None => Err(SdkError::OperationFailed(
980                        "Empty response from server".to_string(),
981                    )),
982                }
983            }
984        })
985        .await
986    }
987
988    // -----------------------------------------------------------------------
989    // Paginated range queries
990    // -----------------------------------------------------------------------
991
992    /// Range query with simple pagination.
993    ///
994    /// Returns the first `page_size` items in the range `[start, end)`. Use
995    /// the returned cursor to fetch subsequent pages via [`Self::range_with_cursor`].
996    pub async fn range_paginated(
997        &self,
998        collection: &str,
999        start: &Key,
1000        end: &Key,
1001        page_size: usize,
1002    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1003        let pagination = PaginationConfig::new(page_size);
1004        self.range_with_cursor(collection, start, end, &pagination)
1005            .await
1006    }
1007
1008    /// Range query with full cursor-based pagination.
1009    ///
1010    /// If `pagination.cursor` is `Some`, the scan resumes from the key
1011    /// encoded in the cursor (exclusive). Otherwise it starts from `start`.
1012    pub async fn range_with_cursor(
1013        &self,
1014        collection: &str,
1015        start: &Key,
1016        end: &Key,
1017        pagination: &PaginationConfig,
1018    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1019        let effective_start = if let Some(ref cursor_str) = pagination.cursor {
1020            decode_cursor(cursor_str)?
1021        } else {
1022            start.clone()
1023        };
1024
1025        // Fetch page_size + 1 items so we can detect "has_more"
1026        let all = self.range(collection, &effective_start, end).await?;
1027
1028        // If we resumed from a cursor the first item may be the cursor key
1029        // itself (inclusive range). Skip it so the page is exclusive of the
1030        // previous last item.
1031        let base_iter: Box<dyn Iterator<Item = (Key, CipherBlob)>> =
1032            if pagination.cursor.is_some() && !all.is_empty() && all[0].0 == effective_start {
1033                Box::new(all.into_iter().skip(1))
1034            } else {
1035                Box::new(all.into_iter())
1036            };
1037
1038        // Apply offset: skip `pagination.offset` additional items.
1039        let after_offset: Vec<(Key, CipherBlob)> = base_iter.skip(pagination.offset).collect();
1040
1041        let has_more = after_offset.len() > pagination.page_size;
1042        let items: Vec<(Key, CipherBlob)> = after_offset
1043            .into_iter()
1044            .take(pagination.page_size)
1045            .collect();
1046
1047        let next_cursor = if has_more {
1048            items.last().map(|(k, _)| encode_cursor(k))
1049        } else {
1050            None
1051        };
1052
1053        Ok(PaginatedResult {
1054            items,
1055            next_cursor,
1056            has_more,
1057            total_hint: None,
1058        })
1059    }
1060
1061    // -----------------------------------------------------------------------
1062    // Sorted range queries
1063    // -----------------------------------------------------------------------
1064
1065    /// Range query with results sorted according to `sort`.
1066    ///
1067    /// The full range is fetched from the server and then sorted client-side.
1068    pub async fn range_sorted(
1069        &self,
1070        collection: &str,
1071        start: &Key,
1072        end: &Key,
1073        sort: &SortConfig,
1074    ) -> Result<Vec<(Key, CipherBlob)>> {
1075        let mut results = self.range(collection, start, end).await?;
1076        sort_results(&mut results, sort);
1077        Ok(results)
1078    }
1079
1080    // -----------------------------------------------------------------------
1081    // Prefix scan
1082    // -----------------------------------------------------------------------
1083
1084    /// Scan keys with a given prefix, returning paginated results.
1085    ///
1086    /// This constructs a range `[prefix, prefix_end)` where `prefix_end` is
1087    /// the lexicographic successor of `prefix`, then delegates to
1088    /// [`Self::range_with_cursor`].
1089    pub async fn scan(
1090        &self,
1091        collection: &str,
1092        prefix: &Key,
1093        pagination: &PaginationConfig,
1094    ) -> Result<PaginatedResult<(Key, CipherBlob)>> {
1095        let start = prefix.clone();
1096        let end = prefix_end_key(prefix);
1097        self.range_with_cursor(collection, &start, &end, pagination)
1098            .await
1099    }
1100
1101    // -----------------------------------------------------------------------
1102    // Count
1103    // -----------------------------------------------------------------------
1104
1105    /// Count the number of keys in a collection.
1106    ///
1107    /// This performs a full range scan and counts the results. For very large
1108    /// collections a server-side count would be more efficient, but this
1109    /// provides a correct answer using the existing API surface.
1110    pub async fn count(&self, collection: &str) -> Result<usize> {
1111        debug!("Count: collection={}", collection);
1112
1113        // Use a full range scan (min key → max key)
1114        let start = Key::from_slice(&[0u8]);
1115        let end = Key::from_slice(&[0xFF; 32]);
1116        let results = self.range(collection, &start, &end).await?;
1117        Ok(results.len())
1118    }
1119
1120    // -----------------------------------------------------------------------
1121    // Offline constructor (test / stub use only)
1122    // -----------------------------------------------------------------------
1123
1124    /// Create a client that is **not** connected to any server.
1125    ///
1126    /// This constructor skips the connection-pool probe so it can be used in
1127    /// unit tests and other contexts where no live server is available.
1128    /// Operations that require a server connection (e.g. `get`, `set`, `stream_query`) will
1129    /// fail with a `SdkError::Connection` error if called on this client.
1130    ///
1131    /// # Note
1132    ///
1133    /// This method is intended for **testing and development** only.
1134    #[doc(hidden)]
1135    pub fn new_offline(config: ClientConfig) -> Self {
1136        Self {
1137            pool: Arc::new(ConnectionPool::new(config.clone())),
1138            config: Arc::new(config),
1139            encryptor: None,
1140            cache: None,
1141        }
1142    }
1143
1144    // -----------------------------------------------------------------------
1145    // Streaming API
1146    // -----------------------------------------------------------------------
1147
1148    /// Stream query results row by row using a real gRPC server-streaming RPC.
1149    ///
1150    /// Returns a [`QueryStream`] that implements [`futures::Stream`].  The
1151    /// stream is backed by a bounded mpsc channel (capacity =
1152    /// `config.buffer_size`) so the producer is automatically throttled when
1153    /// the consumer is slow.  Dropping the returned stream cancels the
1154    /// background task.
1155    ///
1156    /// The server currently supports `Range` and `Get` queries for streaming.
1157    /// Other query variants will be rejected by the server with a gRPC error
1158    /// that is forwarded to the stream as `Err(SdkError::Grpc(...))`.
1159    ///
1160    /// Returns `Err(SdkError::Connection(...))` if there is no live server
1161    /// connection available.
1162    pub async fn stream_query(&self, query: Query, config: StreamConfig) -> Result<QueryStream> {
1163        debug!("stream_query: query={:?}", query);
1164
1165        // Acquire a connection from the pool.  If the pool has no live
1166        // connections (e.g. offline client) this returns an error immediately.
1167        let conn = self.pool.get().await?;
1168
1169        let timeout_secs = config.timeout_secs;
1170        let (query_stream, sender) = QueryStream::new(&config);
1171        let cancel_token = sender.cancel_token();
1172
1173        // Build the proto request from the core Query.
1174        let proto_query = {
1175            use amaters_net::convert::query_to_proto;
1176            query_to_proto(&query)?
1177        };
1178
1179        let request = tonic::Request::new(amaters_net::proto::aql::QueryRequest {
1180            query: Some(proto_query),
1181            request_id: Some(uuid::Uuid::new_v4().to_string()),
1182            timeout_ms: timeout_secs.map(|s| (s * 1000) as u32),
1183            transaction_id: None,
1184            version: Some(amaters_net::convert::create_version()),
1185        });
1186
1187        // Start the server-streaming RPC.
1188        let mut grpc_client = {
1189            use amaters_net::proto::aql::aql_service_client::AqlServiceClient;
1190            AqlServiceClient::new(conn.channel().clone())
1191        };
1192
1193        let response_stream = grpc_client
1194            .execute_stream(request)
1195            .await
1196            .map_err(SdkError::Grpc)?
1197            .into_inner();
1198
1199        // Spawn background task: pump the gRPC stream into the mpsc channel.
1200        tokio::spawn(async move {
1201            // Hold the connection alive for the duration of the stream.
1202            let _conn = conn;
1203
1204            let mut pinned = std::pin::pin!(response_stream);
1205
1206            loop {
1207                // Wait for the next chunk while honouring cancellation.
1208                let item = tokio::select! {
1209                    biased;
1210                    _ = cancel_token.cancelled() => {
1211                        debug!("stream_query: cancelled by consumer");
1212                        break;
1213                    }
1214                    item = pinned.next() => item,
1215                };
1216
1217                match item {
1218                    // gRPC stream closed cleanly.
1219                    None => break,
1220
1221                    // gRPC transport or server error.
1222                    Some(Err(status)) => {
1223                        let _ = sender.send_error(SdkError::Grpc(status)).await;
1224                        break;
1225                    }
1226
1227                    // A StreamResponse message.
1228                    Some(Ok(response)) => {
1229                        use amaters_net::proto::aql::stream_response::Chunk;
1230
1231                        match response.chunk {
1232                            // Batched key-value pairs — what the server actually emits today.
1233                            Some(Chunk::Batch(batch)) => {
1234                                for kv in batch.values {
1235                                    if sender.is_cancelled() {
1236                                        return;
1237                                    }
1238                                    let key = kv.key.map(|k| k.data).unwrap_or_default();
1239                                    let value = kv.value.map(|v| v.data).unwrap_or_default();
1240                                    let row = Row::new(key, value);
1241                                    if !sender.send_row(row).await {
1242                                        return;
1243                                    }
1244                                }
1245                            }
1246
1247                            // Legacy single-item message.
1248                            Some(Chunk::Value(kv)) => {
1249                                if sender.is_cancelled() {
1250                                    return;
1251                                }
1252                                let key = kv.key.map(|k| k.data).unwrap_or_default();
1253                                let value = kv.value.map(|v| v.data).unwrap_or_default();
1254                                let row = Row::new(key, value);
1255                                if !sender.send_row(row).await {
1256                                    return;
1257                                }
1258                            }
1259
1260                            // End-of-stream marker — close cleanly.
1261                            Some(Chunk::End(_)) => break,
1262
1263                            // Server-side error embedded in the stream.
1264                            Some(Chunk::Error(e)) => {
1265                                let _ = sender
1266                                    .send_error(SdkError::OperationFailed(e.message))
1267                                    .await;
1268                                break;
1269                            }
1270
1271                            // Malformed/empty chunk — skip.
1272                            None => {}
1273                        }
1274                    }
1275                }
1276            }
1277            // Dropping `sender` closes the channel, which ends the QueryStream.
1278        });
1279
1280        Ok(query_stream)
1281    }
1282}
1283
1284// ---------------------------------------------------------------------------
1285// Free-standing helpers
1286// ---------------------------------------------------------------------------
1287
1288/// Sort a mutable slice of (Key, CipherBlob) pairs in-place according to `sort`.
1289fn sort_results(results: &mut [(Key, CipherBlob)], sort: &SortConfig) {
1290    match (sort.field, sort.order) {
1291        (SortField::Key, SortOrder::Ascending) => {
1292            results.sort_by(|a, b| a.0.cmp(&b.0));
1293        }
1294        (SortField::Key, SortOrder::Descending) => {
1295            results.sort_by(|a, b| b.0.cmp(&a.0));
1296        }
1297        (SortField::Value, SortOrder::Ascending) => {
1298            results.sort_by(|a, b| a.1.as_bytes().cmp(b.1.as_bytes()));
1299        }
1300        (SortField::Value, SortOrder::Descending) => {
1301            results.sort_by(|a, b| b.1.as_bytes().cmp(a.1.as_bytes()));
1302        }
1303        // Timestamp is approximated by key ordering (insertion-order proxy)
1304        (SortField::Timestamp, SortOrder::Ascending) => {
1305            results.sort_by(|a, b| a.0.cmp(&b.0));
1306        }
1307        (SortField::Timestamp, SortOrder::Descending) => {
1308            results.sort_by(|a, b| b.0.cmp(&a.0));
1309        }
1310    }
1311}
1312
1313/// Compute the lexicographic successor of `prefix` to define an exclusive
1314/// upper bound for prefix scans.
1315///
1316/// For example `"user:"` → `"user;"` (`:` + 1 = `;` in ASCII).
1317/// If the prefix is all `0xFF` bytes, returns a key with an extra `0xFF`
1318/// byte appended so the range is always valid.
1319fn prefix_end_key(prefix: &Key) -> Key {
1320    let mut bytes = prefix.to_vec();
1321    // Walk from the last byte backwards, incrementing the first byte < 0xFF.
1322    while let Some(last) = bytes.last_mut() {
1323        if *last < 0xFF {
1324            *last += 1;
1325            return Key::from_slice(&bytes);
1326        }
1327        bytes.pop();
1328    }
1329    // All bytes were 0xFF — extend with one more 0xFF.
1330    let mut extended = prefix.to_vec();
1331    extended.push(0xFF);
1332    Key::from_slice(&extended)
1333}
1334
/// Metadata describing a server, as returned by the client's `server_info` call.
#[derive(Clone, Debug)]
pub struct ServerInfo {
    /// Version of the server as `(major, minor, patch)`; `None` when the
    /// server did not report one.
    pub version: Option<(u32, u32, u32)>,
    /// Protocol versions the server supports, each as `(major, minor, patch)`.
    pub supported_versions: Vec<(u32, u32, u32)>,
    /// Capability identifiers advertised by the server.
    pub capabilities: Vec<String>,
    /// Time the server has been running, in seconds.
    pub uptime_seconds: u64,
}
1347
1348// ---------------------------------------------------------------------------
1349// PaginatedQueryBuilder
1350// ---------------------------------------------------------------------------
1351
1352/// Builder for constructing paginated and sorted queries.
1353///
1354/// Provides a fluent API on top of the core `QueryBuilder` with pagination
1355/// and sorting options.
1356#[derive(Debug, Clone)]
1357pub struct PaginatedQueryBuilder {
1358    collection: String,
1359    page_size: Option<usize>,
1360    cursor: Option<String>,
1361    sort: Option<SortConfig>,
1362    /// Alias for `page_size` — the maximum number of results to return.
1363    limit: Option<usize>,
1364    /// Number of items to skip (applied after cursor resume).
1365    offset: Option<usize>,
1366}
1367
1368impl PaginatedQueryBuilder {
1369    /// Create a new paginated query builder for a collection.
1370    pub fn new(collection: impl Into<String>) -> Self {
1371        Self {
1372            collection: collection.into(),
1373            page_size: None,
1374            cursor: None,
1375            sort: None,
1376            limit: None,
1377            offset: None,
1378        }
1379    }
1380
1381    /// Set the page size for pagination.
1382    #[must_use]
1383    pub fn page_size(mut self, size: usize) -> Self {
1384        self.page_size = Some(size);
1385        self
1386    }
1387
1388    /// Set the maximum number of results to return (alias for `page_size`).
1389    #[must_use]
1390    pub fn limit(mut self, n: usize) -> Self {
1391        self.limit = Some(n);
1392        self
1393    }
1394
1395    /// Set the number of items to skip (applied after any cursor resume).
1396    #[must_use]
1397    pub fn offset(mut self, n: usize) -> Self {
1398        self.offset = Some(n);
1399        self
1400    }
1401
1402    /// Set the cursor to resume pagination from.
1403    #[must_use]
1404    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
1405        self.cursor = Some(cursor.into());
1406        self
1407    }
1408
1409    /// Set the sort order for results.
1410    #[must_use]
1411    pub fn sort_by(mut self, field: SortField, order: SortOrder) -> Self {
1412        self.sort = Some(SortConfig::new(field, order));
1413        self
1414    }
1415
1416    /// Build a `PaginationConfig` from the current builder state.
1417    ///
1418    /// `limit` takes precedence over `page_size` when both are set.
1419    pub fn build_paginated(&self) -> PaginationConfig {
1420        let page_size = self.limit.or(self.page_size).unwrap_or(100);
1421        PaginationConfig {
1422            page_size,
1423            cursor: self.cursor.clone(),
1424            offset: self.offset.unwrap_or(0),
1425        }
1426    }
1427
1428    /// Get the collection name.
1429    pub fn collection(&self) -> &str {
1430        &self.collection
1431    }
1432
1433    /// Get the sort configuration, if set.
1434    pub fn sort_config(&self) -> Option<&SortConfig> {
1435        self.sort.as_ref()
1436    }
1437}
1438
1439/// Query execution result
1440#[derive(Debug, Clone)]
1441pub enum QueryResult {
1442    /// Single value result
1443    Single(Option<CipherBlob>),
1444    /// Multiple values result
1445    Multi(Vec<(Key, CipherBlob)>),
1446    /// Success result (no data)
1447    Success { affected_rows: u64 },
1448}
1449
1450#[cfg(test)]
1451mod tests {
1452    use super::*;
1453
1454    // --- existing tests ----------------------------------------------------
1455
1456    #[tokio::test]
1457    async fn test_retry_config() {
1458        let config = RetryConfig::default();
1459        assert_eq!(config.max_retries, 3);
1460
1461        let backoff = config.backoff_duration(1);
1462        assert!(backoff.as_millis() > 0);
1463    }
1464
1465    #[test]
1466    fn test_query_result() {
1467        let result = QueryResult::Success { affected_rows: 5 };
1468        match result {
1469            QueryResult::Success { affected_rows } => {
1470                assert_eq!(affected_rows, 5);
1471            }
1472            _ => panic!("expected Success"),
1473        }
1474    }
1475
1476    // --- pagination config tests -------------------------------------------
1477
1478    #[test]
1479    fn test_pagination_config_default() {
1480        let config = PaginationConfig::default();
1481        assert_eq!(config.page_size, 100);
1482        assert!(config.cursor.is_none());
1483    }
1484
1485    #[test]
1486    fn test_pagination_config_with_cursor() {
1487        let config = PaginationConfig::new(50).with_cursor("abc123");
1488        assert_eq!(config.page_size, 50);
1489        assert_eq!(config.cursor.as_deref(), Some("abc123"));
1490    }
1491
1492    // --- paginated result tests --------------------------------------------
1493
1494    #[test]
1495    fn test_paginated_result_has_more() {
1496        // Items less than page_size means no more pages
1497        let result: PaginatedResult<u8> = PaginatedResult {
1498            items: vec![1, 2, 3],
1499            next_cursor: None,
1500            has_more: false,
1501            total_hint: None,
1502        };
1503        assert!(!result.has_more);
1504        assert!(result.next_cursor.is_none());
1505        assert_eq!(result.items.len(), 3);
1506    }
1507
1508    #[test]
1509    fn test_paginated_result_with_more() {
1510        let result: PaginatedResult<u8> = PaginatedResult {
1511            items: vec![1, 2, 3],
1512            next_cursor: Some("cursor_xyz".to_string()),
1513            has_more: true,
1514            total_hint: Some(100),
1515        };
1516        assert!(result.has_more);
1517        assert_eq!(result.next_cursor.as_deref(), Some("cursor_xyz"));
1518        assert_eq!(result.total_hint, Some(100));
1519    }
1520
1521    // --- sort config tests -------------------------------------------------
1522
1523    #[test]
1524    fn test_sort_ascending() {
1525        let mut data = vec![
1526            (Key::from_str("c"), CipherBlob::new(vec![3])),
1527            (Key::from_str("a"), CipherBlob::new(vec![1])),
1528            (Key::from_str("b"), CipherBlob::new(vec![2])),
1529        ];
1530        let sort = SortConfig::new(SortField::Key, SortOrder::Ascending);
1531        sort_results(&mut data, &sort);
1532
1533        assert_eq!(data[0].0.to_string_lossy(), "a");
1534        assert_eq!(data[1].0.to_string_lossy(), "b");
1535        assert_eq!(data[2].0.to_string_lossy(), "c");
1536    }
1537
1538    #[test]
1539    fn test_sort_descending() {
1540        let mut data = vec![
1541            (Key::from_str("a"), CipherBlob::new(vec![1])),
1542            (Key::from_str("c"), CipherBlob::new(vec![3])),
1543            (Key::from_str("b"), CipherBlob::new(vec![2])),
1544        ];
1545        let sort = SortConfig::new(SortField::Key, SortOrder::Descending);
1546        sort_results(&mut data, &sort);
1547
1548        assert_eq!(data[0].0.to_string_lossy(), "c");
1549        assert_eq!(data[1].0.to_string_lossy(), "b");
1550        assert_eq!(data[2].0.to_string_lossy(), "a");
1551    }
1552
1553    #[test]
1554    fn test_sort_by_value_ascending() {
1555        let mut data = vec![
1556            (Key::from_str("x"), CipherBlob::new(vec![30])),
1557            (Key::from_str("y"), CipherBlob::new(vec![10])),
1558            (Key::from_str("z"), CipherBlob::new(vec![20])),
1559        ];
1560        let sort = SortConfig::new(SortField::Value, SortOrder::Ascending);
1561        sort_results(&mut data, &sort);
1562
1563        assert_eq!(data[0].1.to_vec(), vec![10]);
1564        assert_eq!(data[1].1.to_vec(), vec![20]);
1565        assert_eq!(data[2].1.to_vec(), vec![30]);
1566    }
1567
1568    // --- cursor encoding / decoding tests ----------------------------------
1569
1570    #[test]
1571    fn test_cursor_encoding() {
1572        let key = Key::from_str("user:999");
1573        let cursor = encode_cursor(&key);
1574
1575        // Should be non-empty and contain the separator
1576        assert!(!cursor.is_empty());
1577        assert!(cursor.contains('|'));
1578
1579        // Decode and verify round-trip
1580        let decoded = decode_cursor(&cursor).expect("decode should succeed");
1581        assert_eq!(decoded, key);
1582    }
1583
1584    #[test]
1585    fn test_cursor_encoding_binary_key() {
1586        let key = Key::from_slice(&[0x00, 0xFF, 0xAB, 0xCD]);
1587        let cursor = encode_cursor(&key);
1588        let decoded = decode_cursor(&cursor).expect("decode should succeed");
1589        assert_eq!(decoded, key);
1590    }
1591
1592    #[test]
1593    fn test_cursor_integrity() {
1594        let key = Key::from_str("original_key");
1595        let cursor = encode_cursor(&key);
1596
1597        // Tamper with the cursor by flipping a character in the key portion
1598        let mut tampered = cursor.clone();
1599        let bytes = unsafe { tampered.as_bytes_mut() };
1600        if !bytes.is_empty() {
1601            bytes[0] ^= 0x01; // flip a bit
1602        }
1603
1604        let result = decode_cursor(&tampered);
1605        assert!(result.is_err(), "tampered cursor should be rejected");
1606    }
1607
1608    #[test]
1609    fn test_cursor_malformed_no_separator() {
1610        let result = decode_cursor("noseparatorhere");
1611        assert!(result.is_err());
1612    }
1613
1614    #[test]
1615    fn test_cursor_malformed_empty() {
1616        let result = decode_cursor("|");
1617        // Both parts are empty — hash check will fail
1618        assert!(result.is_err());
1619    }
1620
1621    // --- prefix end key tests ----------------------------------------------
1622
1623    #[test]
1624    fn test_prefix_end_key() {
1625        let prefix = Key::from_str("user:");
1626        let end = prefix_end_key(&prefix);
1627        // "user:" → "user;" because ':' + 1 = ';'
1628        assert_eq!(end.to_string_lossy(), "user;");
1629    }
1630
1631    #[test]
1632    fn test_prefix_end_key_all_ff() {
1633        let prefix = Key::from_slice(&[0xFF, 0xFF]);
1634        let end = prefix_end_key(&prefix);
1635        // Should extend with one more 0xFF
1636        assert_eq!(end.to_vec(), vec![0xFF, 0xFF, 0xFF]);
1637    }
1638
1639    // --- scan prefix helper ------------------------------------------------
1640
1641    #[test]
1642    fn test_scan_with_prefix_key_generation() {
1643        // Verify that prefix_end_key produces a correct exclusive upper bound
1644        let prefix = Key::from_str("item:");
1645        let end = prefix_end_key(&prefix);
1646
1647        // Keys within the prefix should be < end
1648        let within = Key::from_str("item:abc");
1649        assert!(within < end);
1650
1651        // Keys outside the prefix should be >= end
1652        let outside = Key::from_str("item;abc");
1653        assert!(outside >= end);
1654    }
1655
1656    // --- query builder pagination tests ------------------------------------
1657
1658    #[test]
1659    fn test_query_builder_pagination() {
1660        let builder = PaginatedQueryBuilder::new("users")
1661            .page_size(25)
1662            .cursor("some_cursor");
1663
1664        let config = builder.build_paginated();
1665        assert_eq!(config.page_size, 25);
1666        assert_eq!(config.cursor.as_deref(), Some("some_cursor"));
1667        assert_eq!(builder.collection(), "users");
1668    }
1669
1670    #[test]
1671    fn test_query_builder_pagination_defaults() {
1672        let builder = PaginatedQueryBuilder::new("events");
1673        let config = builder.build_paginated();
1674        assert_eq!(config.page_size, 100);
1675        assert!(config.cursor.is_none());
1676    }
1677
1678    #[test]
1679    fn test_query_builder_sorting() {
1680        let builder = PaginatedQueryBuilder::new("logs")
1681            .sort_by(SortField::Timestamp, SortOrder::Descending)
1682            .page_size(50);
1683
1684        let sort = builder.sort_config().expect("sort should be set");
1685        assert_eq!(sort.field, SortField::Timestamp);
1686        assert_eq!(sort.order, SortOrder::Descending);
1687    }
1688
1689    #[test]
1690    fn test_query_builder_no_sorting() {
1691        let builder = PaginatedQueryBuilder::new("data");
1692        assert!(builder.sort_config().is_none());
1693    }
1694
1695    // --- hex encode/decode round-trip --------------------------------------
1696
1697    #[test]
1698    fn test_hex_encode_decode() {
1699        let original = vec![0x00, 0x0A, 0xFF, 0x42, 0x99];
1700        let encoded = hex_encode(&original);
1701        assert_eq!(encoded, "000aff4299");
1702        let decoded = hex_decode(&encoded).expect("decode should succeed");
1703        assert_eq!(decoded, original);
1704    }
1705
1706    #[test]
1707    fn test_hex_decode_odd_length() {
1708        let result = hex_decode("abc");
1709        assert!(result.is_err());
1710    }
1711
1712    #[test]
1713    fn test_hex_decode_invalid_chars() {
1714        let result = hex_decode("zzzz");
1715        assert!(result.is_err());
1716    }
1717
1718    // --- sort with timestamp field -----------------------------------------
1719
1720    #[test]
1721    fn test_sort_by_timestamp_ascending() {
1722        let mut data = vec![
1723            (Key::from_str("ts:003"), CipherBlob::new(vec![3])),
1724            (Key::from_str("ts:001"), CipherBlob::new(vec![1])),
1725            (Key::from_str("ts:002"), CipherBlob::new(vec![2])),
1726        ];
1727        let sort = SortConfig::new(SortField::Timestamp, SortOrder::Ascending);
1728        sort_results(&mut data, &sort);
1729
1730        assert_eq!(data[0].0.to_string_lossy(), "ts:001");
1731        assert_eq!(data[1].0.to_string_lossy(), "ts:002");
1732        assert_eq!(data[2].0.to_string_lossy(), "ts:003");
1733    }
1734}