Skip to main content

reddb_client/
lib.rs

1//! Official Rust client for [RedDB](https://github.com/reddb-io/reddb).
2//!
3//! One connection-string API. Pick your backend at runtime:
4//!
5//! ```no_run
6//! use reddb_client::{Reddb, JsonValue};
7//!
8//! # async fn run() -> reddb_client::Result<()> {
9//! // Embedded: opens the engine in-process, no network.
10//! let db = Reddb::connect("memory://").await?;
11//! db.insert("users", &JsonValue::object([("name", JsonValue::string("Alice"))])).await?;
12//! let result = db.query("SELECT * FROM users").await?;
13//! println!("{} rows", result.rows.len());
14//! db.close().await?;
15//! # Ok(())
16//! # }
17//! ```
18//!
19//! Accepted URIs:
20//!
21//! | URI                       | Backend                              | Status |
22//! |---------------------------|--------------------------------------|--------|
23//! | `memory://`               | Ephemeral in-memory                  | ✅    |
24//! | `file:///abs/path`        | Embedded engine on disk              | ✅    |
25//! | `grpc://host:port`        | Remote tonic client                  | ✅    |
26//! | `red://host:port`         | Remote tonic client (default port 5050) | ✅    |
27//! | `http://host:port`        | REST client                          | ✅    |
28//!
29//! ## Cargo features
30//!
31//! - `embedded` (default) — pulls the entire RedDB engine in-process.
32//! - `grpc` — opt-in remote client over tonic. Pulls the engine for
33//!   its `RedDBClient` type today; a thin proto-only client is tracked
34//!   in `PLAN_DRIVERS.md`.
35//! - `http` — REST client.
36//! - `redwire` — RedWire native TCP client (no engine dep).
37//!
38//! ## Internal connector
39//!
40//! The crate also hosts the gRPC connector + REPL used by the
41//! `red` and `red_client` binaries via the [`connector`] module.
42//! That layer is intentionally lighter than the published [`Reddb`]
43//! API: it speaks tonic + ureq + serde_json only and never pulls
44//! the engine in. It is exposed at the crate root as
45//! [`RedDBClient`] and [`repl`] for back-compat with the previous
46//! `reddb-client-internal` crate.
47
48#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod params;
55pub mod topology;
56pub mod types;
57
58#[cfg(feature = "embedded")]
59pub mod embedded;
60
61#[cfg(feature = "grpc")]
62pub mod grpc;
63
64#[cfg(feature = "grpc")]
65pub mod router;
66
67#[cfg(feature = "redwire")]
68pub mod redwire;
69
70#[cfg(feature = "http")]
71pub mod http;
72
73pub use error::{ClientError, ErrorCode, Result};
74pub use params::{IntoParams, IntoValue, Value};
75pub use types::{
76    BulkInsertResult, DeleteResult, DocumentItem, ExistsResult, InsertResult, JsonValue, KvItem,
77    KvWatchEvent, ListOptions, ListResult, QueryResult, Row, ValueOut,
78};
79
80// Back-compat re-exports for the previous `reddb-client-internal`
81// crate. Workspace consumers (`reddb-server::rpc_stdio`, the `red`
82// bin's REPL launcher, the `red_client` bin) import these paths
83// directly.
84pub use connector::{
85    repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
86    RedDBClient,
87};
88
89use connect::Target;
90
/// Handle to an open RedDB connection. Obtain one through [`Reddb::connect`].
///
/// Each transport variant wraps the client for one backend and only exists
/// when the matching Cargo feature is compiled in.
#[derive(Debug)]
pub enum Reddb {
    /// In-process engine; produced for in-memory and file targets.
    #[cfg(feature = "embedded")]
    Embedded(embedded::EmbeddedClient),
    /// gRPC client; produced for single-endpoint and cluster gRPC targets.
    #[cfg(feature = "grpc")]
    Grpc(grpc::GrpcClient),
    /// REST client; produced for HTTP targets.
    #[cfg(feature = "http")]
    Http(http::HttpClient),
    /// Stand-in for a backend whose Cargo feature is disabled; the payload
    /// is the name reported in the error.
    ///
    /// Query and write methods on this variant return a feature-disabled
    /// error carrying that name, while [`Reddb::close`] succeeds as a no-op.
    /// This turns a build-configuration mistake into a runtime error with a
    /// clear remediation instead of a missing trait impl.
    Unavailable(&'static str),
}
107
108impl Reddb {
109    /// Open a connection. The backend is selected from the URI scheme.
110    pub async fn connect(uri: &str) -> Result<Self> {
111        let target = connect::parse(uri)?;
112        match target {
113            Target::Memory => {
114                #[cfg(feature = "embedded")]
115                {
116                    embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
117                }
118                #[cfg(not(feature = "embedded"))]
119                {
120                    Err(ClientError::feature_disabled("embedded"))
121                }
122            }
123            Target::File { path } => {
124                #[cfg(feature = "embedded")]
125                {
126                    embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
127                }
128                #[cfg(not(feature = "embedded"))]
129                {
130                    let _ = path;
131                    Err(ClientError::feature_disabled("embedded"))
132                }
133            }
134            Target::Grpc { endpoint } => {
135                #[cfg(feature = "grpc")]
136                {
137                    grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
138                }
139                #[cfg(not(feature = "grpc"))]
140                {
141                    let _ = endpoint;
142                    Err(ClientError::feature_disabled("grpc"))
143                }
144            }
145            Target::GrpcCluster {
146                primary,
147                replicas,
148                force_primary,
149            } => {
150                #[cfg(feature = "grpc")]
151                {
152                    grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
153                        .await
154                        .map(Reddb::Grpc)
155                }
156                #[cfg(not(feature = "grpc"))]
157                {
158                    let _ = (primary, replicas, force_primary);
159                    Err(ClientError::feature_disabled("grpc"))
160                }
161            }
162            Target::Http { base_url } => {
163                #[cfg(feature = "http")]
164                {
165                    http::HttpClient::connect(http::HttpOptions::new(base_url))
166                        .await
167                        .map(Reddb::Http)
168                }
169                #[cfg(not(feature = "http"))]
170                {
171                    let _ = base_url;
172                    Err(ClientError::feature_disabled("http"))
173                }
174            }
175        }
176    }
177
178    pub async fn query(&self, sql: &str) -> Result<QueryResult> {
179        match self {
180            #[cfg(feature = "embedded")]
181            Reddb::Embedded(c) => c.query(sql),
182            #[cfg(feature = "grpc")]
183            Reddb::Grpc(c) => c.query(sql).await,
184            #[cfg(feature = "http")]
185            Reddb::Http(c) => c.query(sql).await,
186            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
187        }
188    }
189
190    /// Parameterized query — `$N` placeholders in `sql` are bound to
191    /// `params[N-1]`. Empty params is equivalent to [`Self::query`].
192    ///
193    /// Native type mapping (driver-side, [`IntoValue`]):
194    ///
195    /// | Rust                    | Engine `Value` variant |
196    /// |-------------------------|------------------------|
197    /// | `i8..i64` / `u8..u32`   | `Integer` (i64)        |
198    /// | `bool`                  | `Boolean`              |
199    /// | `f32` / `f64`           | `Float` (f64)          |
200    /// | `&str` / `String`       | `Text`                 |
201    /// | `Vec<u8>` / `&[u8]`     | `Blob`                 |
202    /// | `Vec<f32>` / `&[f32]`   | `Vector`               |
203    /// | `Option<T>`             | `Null` when `None`     |
204    /// | `serde_json::Value`     | `Json`                 |
205    /// | [`Value::Timestamp`]    | `Timestamp` (seconds)  |
206    /// | [`Value::Uuid`]         | `Uuid` (16 raw bytes)  |
207    ///
208    /// Today the [`Reddb::Embedded`], [`Reddb::Grpc`], and [`Reddb::Http`]
209    /// transports carry parameters end-to-end.
210    pub async fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
211        let values = params.into_params();
212        match self {
213            #[cfg(feature = "embedded")]
214            Reddb::Embedded(c) => c.query_with(sql, values),
215            #[cfg(feature = "grpc")]
216            Reddb::Grpc(c) => c.query_with(sql, &values).await,
217            #[cfg(feature = "http")]
218            Reddb::Http(c) => c.query_with(sql, &values).await,
219            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
220        }
221    }
222
223    /// Parameterized execution for DML statements. This is an alias for
224    /// [`Self::query_with`] because RedDB returns one query result envelope for
225    /// SELECT and DML.
226    pub async fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
227        self.query_with(sql, params).await
228    }
229
230    pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
231        match self {
232            #[cfg(feature = "embedded")]
233            Reddb::Embedded(c) => c.insert(collection, payload),
234            #[cfg(feature = "grpc")]
235            Reddb::Grpc(c) => c.insert(collection, payload).await,
236            #[cfg(feature = "http")]
237            Reddb::Http(c) => c.insert(collection, payload).await,
238            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
239        }
240    }
241
242    pub async fn bulk_insert(
243        &self,
244        collection: &str,
245        payloads: &[JsonValue],
246    ) -> Result<BulkInsertResult> {
247        match self {
248            #[cfg(feature = "embedded")]
249            Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
250            #[cfg(feature = "grpc")]
251            Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
252            #[cfg(feature = "http")]
253            Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
254            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
255        }
256    }
257
258    pub async fn delete(&self, collection: &str, rid: &str) -> Result<u64> {
259        match self {
260            #[cfg(feature = "embedded")]
261            Reddb::Embedded(c) => c.delete(collection, rid),
262            #[cfg(feature = "grpc")]
263            Reddb::Grpc(c) => c.delete(collection, rid).await,
264            #[cfg(feature = "http")]
265            Reddb::Http(c) => c.delete(collection, rid).await,
266            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
267        }
268    }
269
270    pub fn documents(&self) -> DocumentClient<'_> {
271        DocumentClient { db: self }
272    }
273
274    pub fn queue(&self) -> QueueClient<'_> {
275        QueueClient { db: self }
276    }
277
278    pub fn kv_collection<'a>(&'a self, collection: &'a str) -> KvClient<'a> {
279        KvClient {
280            db: self,
281            collection,
282        }
283    }
284
285    pub async fn begin(&self) -> Result<QueryResult> {
286        self.query("BEGIN").await
287    }
288
289    pub async fn commit(&self) -> Result<QueryResult> {
290        self.query("COMMIT").await
291    }
292
293    pub async fn rollback(&self) -> Result<QueryResult> {
294        self.query("ROLLBACK").await
295    }
296
297    pub async fn close(&self) -> Result<()> {
298        match self {
299            #[cfg(feature = "embedded")]
300            Reddb::Embedded(c) => c.close(),
301            #[cfg(feature = "grpc")]
302            Reddb::Grpc(c) => c.close().await,
303            #[cfg(feature = "http")]
304            Reddb::Http(c) => c.close().await,
305            Reddb::Unavailable(_) => Ok(()),
306        }
307    }
308
309    pub fn kv(&self) -> KvClient<'_> {
310        KvClient {
311            db: self,
312            collection: "kv_default",
313        }
314    }
315
316    pub fn config(&self) -> ConfigClient<'_> {
317        self.config_collection("red.config")
318    }
319
320    pub fn vault(&self) -> VaultClient<'_> {
321        self.vault_collection("red.vault")
322    }
323
324    pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
325        ConfigClient {
326            db: self,
327            collection,
328        }
329    }
330
331    pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
332        VaultClient {
333            db: self,
334            collection,
335        }
336    }
337}
338
339#[derive(Debug)]
340pub struct DocumentClient<'a> {
341    db: &'a Reddb,
342}
343
344impl<'a> DocumentClient<'a> {
345    pub async fn insert(&self, collection: &str, body: &JsonValue) -> Result<DocumentItem> {
346        ensure_json_object("document body", body)?;
347        let collection = sql_identifier(collection)?;
348        self.db
349            .query(&format!("CREATE DOCUMENT IF NOT EXISTS {collection}"))
350            .await?;
351        let result = self
352            .db
353            .query(&format!(
354                "INSERT INTO {collection} DOCUMENT (body) VALUES ({}) RETURNING *",
355                json_text_literal(body)
356            ))
357            .await?;
358        document_item_from_first_row(result)
359    }
360
361    pub async fn get(&self, collection: &str, rid: &str) -> Result<DocumentItem> {
362        let collection = sql_identifier(collection)?;
363        let result = self
364            .db
365            .query(&format!(
366                "SELECT * FROM {collection} WHERE rid = {} LIMIT 1",
367                sql_string_literal(rid)
368            ))
369            .await?;
370        document_item_from_first_row(result)
371    }
372
373    pub async fn list(&self, collection: &str, options: ListOptions<'_>) -> Result<ListResult> {
374        let collection = sql_identifier(collection)?;
375        let result = self
376            .db
377            .query(&select_sql(&collection, "*", &options))
378            .await?;
379        Ok(ListResult {
380            affected: result.affected,
381            items: result.rows,
382        })
383    }
384
385    pub async fn filter(&self, collection: &str, filter: &str) -> Result<ListResult> {
386        self.list(collection, ListOptions::new().filter(filter))
387            .await
388    }
389
390    pub async fn patch(
391        &self,
392        collection: &str,
393        rid: &str,
394        patch: &JsonValue,
395    ) -> Result<DocumentItem> {
396        let entries = patch.as_object().ok_or_else(|| {
397            ClientError::new(
398                ErrorCode::InvalidArgument,
399                "document patch must be a JSON object",
400            )
401        })?;
402        if entries.is_empty() {
403            return Err(ClientError::new(
404                ErrorCode::InvalidArgument,
405                "document patch must contain at least one field",
406            ));
407        }
408        let collection = sql_identifier(collection)?;
409        let assignments = entries
410            .iter()
411            .map(|(field, value)| {
412                Ok(format!(
413                    "{} = {}",
414                    sql_identifier(field)?,
415                    json_value_literal(value)
416                ))
417            })
418            .collect::<Result<Vec<_>>>()?;
419        let result = self
420            .db
421            .query(&format!(
422                "UPDATE {collection} DOCUMENTS SET {} WHERE rid = {} RETURNING *",
423                assignments.join(", "),
424                sql_string_literal(rid)
425            ))
426            .await?;
427        document_item_from_first_row(result)
428    }
429
430    pub async fn delete(&self, collection: &str, rid: &str) -> Result<DeleteResult> {
431        let collection = sql_identifier(collection)?;
432        let result = self
433            .db
434            .query(&format!(
435                "DELETE FROM {collection} WHERE rid = {}",
436                sql_string_literal(rid)
437            ))
438            .await?;
439        Ok(DeleteResult {
440            affected: result.affected,
441            deleted: result.affected > 0,
442        })
443    }
444}
445
446#[derive(Debug)]
447pub struct QueueClient<'a> {
448    db: &'a Reddb,
449}
450
451impl<'a> QueueClient<'a> {
452    pub async fn create(&self, queue: &str) -> Result<QueryResult> {
453        self.db
454            .query(&format!(
455                "CREATE QUEUE IF NOT EXISTS {}",
456                sql_identifier(queue)?
457            ))
458            .await
459    }
460
461    pub async fn push(&self, queue: &str, value: &JsonValue) -> Result<QueryResult> {
462        self.db
463            .query(&format!(
464                "QUEUE PUSH {} {}",
465                sql_identifier(queue)?,
466                json_value_literal(value)
467            ))
468            .await
469    }
470
471    pub async fn peek(&self, queue: &str, limit: Option<u64>) -> Result<ListResult> {
472        let mut sql = format!("QUEUE PEEK {}", sql_identifier(queue)?);
473        if let Some(limit) = limit {
474            sql.push_str(&format!(" {limit}"));
475        }
476        let result = self.db.query(&sql).await?;
477        Ok(ListResult {
478            affected: result.affected,
479            items: result.rows,
480        })
481    }
482
483    pub async fn pop(&self, queue: &str) -> Result<ListResult> {
484        let result = self
485            .db
486            .query(&format!("QUEUE POP {}", sql_identifier(queue)?))
487            .await?;
488        Ok(ListResult {
489            affected: result.affected,
490            items: result.rows,
491        })
492    }
493
494    pub async fn len(&self, queue: &str) -> Result<u64> {
495        let result = self
496            .db
497            .query(&format!("QUEUE LEN {}", sql_identifier(queue)?))
498            .await?;
499        row_value(&result.rows, "len")
500            .and_then(value_as_u64)
501            .ok_or_else(|| ClientError::new(ErrorCode::InvalidResponse, "QUEUE LEN missing len"))
502    }
503
504    pub async fn purge(&self, queue: &str) -> Result<DeleteResult> {
505        let result = self
506            .db
507            .query(&format!("QUEUE PURGE {}", sql_identifier(queue)?))
508            .await?;
509        Ok(DeleteResult {
510            affected: result.affected,
511            deleted: result.affected > 0,
512        })
513    }
514}
515
516#[derive(Debug)]
517pub struct KvClient<'a> {
518    db: &'a Reddb,
519    collection: &'a str,
520}
521
522impl<'a> KvClient<'a> {
523    pub async fn set(&self, key: &str, value: JsonValue) -> Result<QueryResult> {
524        self.put(key, value, &[]).await
525    }
526
527    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
528        let tag_clause = if tags.is_empty() {
529            String::new()
530        } else {
531            format!(
532                " TAGS [{}]",
533                tags.iter()
534                    .map(|tag| kv_tag_literal(tag))
535                    .collect::<Vec<_>>()
536                    .join(", ")
537            )
538        };
539        self.db
540            .query(&format!(
541                "KV PUT {}.{} = {}{}",
542                kv_collection_identifier(self.collection)?,
543                kv_path_segment(key),
544                json_value_literal(&value),
545                tag_clause
546            ))
547            .await
548    }
549
550    pub async fn get(&self, key: &str) -> Result<Option<KvItem>> {
551        let result = self
552            .db
553            .query(&format!(
554                "KV GET {}.{}",
555                kv_collection_identifier(self.collection)?,
556                kv_path_segment(key)
557            ))
558            .await?;
559        Ok(kv_item_from_rows(&result.rows))
560    }
561
562    pub async fn exists(&self, key: &str) -> Result<ExistsResult> {
563        Ok(ExistsResult {
564            exists: self.get(key).await?.is_some(),
565        })
566    }
567
568    pub async fn delete(&self, key: &str) -> Result<DeleteResult> {
569        let result = self
570            .db
571            .query(&format!(
572                "KV DELETE {}.{}",
573                kv_collection_identifier(self.collection)?,
574                kv_path_segment(key)
575            ))
576            .await?;
577        Ok(DeleteResult {
578            affected: result.affected,
579            deleted: result.affected > 0,
580        })
581    }
582
583    pub async fn list(&self, options: ListOptions<'_>) -> Result<ListResult> {
584        let collection = sql_identifier(self.collection)?;
585        let result = self
586            .db
587            .query(&select_sql(&collection, "key, value", &options))
588            .await?;
589        Ok(ListResult {
590            affected: result.affected,
591            items: result.rows,
592        })
593    }
594
595    pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
596        let result = self
597            .db
598            .query(&format!(
599                "INVALIDATE TAGS [{}] FROM {}",
600                tags.iter()
601                    .map(|tag| kv_tag_literal(tag))
602                    .collect::<Vec<_>>()
603                    .join(", "),
604                kv_collection_identifier(self.collection)?
605            ))
606            .await?;
607        Ok(result.affected)
608    }
609
610    pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
611        self.watch_from_lsn(key, None).await
612    }
613
614    pub async fn watch_from_lsn(
615        &self,
616        key: &str,
617        from_lsn: Option<u64>,
618    ) -> Result<Vec<KvWatchEvent>> {
619        #[cfg(not(feature = "http"))]
620        {
621            let _ = key;
622            let _ = from_lsn;
623            let _ = self.collection;
624        }
625        match self.db {
626            #[cfg(feature = "http")]
627            Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
628            #[cfg(feature = "embedded")]
629            Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
630            #[cfg(feature = "grpc")]
631            Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
632            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
633        }
634    }
635
636    pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
637        self.watch_prefix_from_lsn(prefix, None).await
638    }
639
640    pub async fn watch_prefix_from_lsn(
641        &self,
642        prefix: &str,
643        from_lsn: Option<u64>,
644    ) -> Result<Vec<KvWatchEvent>> {
645        let key = format!("{prefix}.*");
646        self.watch_from_lsn(&key, from_lsn).await
647    }
648}
649
650#[derive(Debug)]
651pub struct ConfigClient<'a> {
652    db: &'a Reddb,
653    collection: &'a str,
654}
655
656impl<'a> ConfigClient<'a> {
657    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
658        let mut sql = format!(
659            "PUT CONFIG {} {} = {}",
660            kv_collection_identifier(self.collection)?,
661            kv_path_segment(key),
662            json_value_literal(&value)
663        );
664        append_tag_clause(&mut sql, tags);
665        self.db.query(&sql).await
666    }
667
668    pub async fn put_secret_ref(
669        &self,
670        key: &str,
671        vault_collection: &str,
672        vault_key: &str,
673        tags: &[&str],
674    ) -> Result<QueryResult> {
675        let mut sql = format!(
676            "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
677            kv_collection_identifier(self.collection)?,
678            kv_path_segment(key),
679            kv_collection_identifier(vault_collection)?,
680            kv_path_segment(vault_key)
681        );
682        append_tag_clause(&mut sql, tags);
683        self.db.query(&sql).await
684    }
685
686    pub async fn get(&self, key: &str) -> Result<QueryResult> {
687        self.db
688            .query(&format!(
689                "GET CONFIG {} {}",
690                kv_collection_identifier(self.collection)?,
691                kv_path_segment(key)
692            ))
693            .await
694    }
695
696    pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
697        self.db
698            .query(&format!(
699                "RESOLVE CONFIG {} {}",
700                kv_collection_identifier(self.collection)?,
701                kv_path_segment(key)
702            ))
703            .await
704    }
705}
706
707#[derive(Debug)]
708pub struct VaultClient<'a> {
709    db: &'a Reddb,
710    collection: &'a str,
711}
712
713impl<'a> VaultClient<'a> {
714    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
715        let mut sql = format!(
716            "VAULT PUT {}.{} = {}",
717            kv_collection_identifier(self.collection)?,
718            kv_path_segment(key),
719            json_value_literal(&value)
720        );
721        append_tag_clause(&mut sql, tags);
722        self.db.query(&sql).await
723    }
724
725    pub async fn get(&self, key: &str) -> Result<QueryResult> {
726        self.db
727            .query(&format!(
728                "VAULT GET {}.{}",
729                kv_collection_identifier(self.collection)?,
730                kv_path_segment(key)
731            ))
732            .await
733    }
734
735    pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
736        self.db
737            .query(&format!(
738                "UNSEAL VAULT {}.{}",
739                kv_collection_identifier(self.collection)?,
740                kv_path_segment(key)
741            ))
742            .await
743    }
744}
745
746fn append_tag_clause(sql: &mut String, tags: &[&str]) {
747    if tags.is_empty() {
748        return;
749    }
750    sql.push_str(" TAGS [");
751    sql.push_str(
752        &tags
753            .iter()
754            .map(|tag| kv_tag_literal(tag))
755            .collect::<Vec<_>>()
756            .join(", "),
757    );
758    sql.push(']');
759}
760
761fn sql_identifier(value: &str) -> Result<String> {
762    let mut chars = value.chars();
763    let Some(first) = chars.next() else {
764        return Err(ClientError::new(
765            ErrorCode::InvalidArgument,
766            "identifier must not be empty",
767        ));
768    };
769    if !(first.is_ascii_alphabetic() || first == '_') {
770        return Err(ClientError::new(
771            ErrorCode::InvalidArgument,
772            format!("invalid identifier `{value}`"),
773        ));
774    }
775    if chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
776        Ok(value.to_string())
777    } else {
778        Err(ClientError::new(
779            ErrorCode::InvalidArgument,
780            format!("invalid identifier `{value}`"),
781        ))
782    }
783}
784
785fn kv_collection_identifier(value: &str) -> Result<String> {
786    let mut parts = Vec::new();
787    for part in value.split('.') {
788        parts.push(sql_identifier(part)?);
789    }
790    Ok(parts.join("."))
791}
792
793fn kv_path_segment(value: &str) -> String {
794    if is_plain_path_segment(value) {
795        value.to_string()
796    } else {
797        sql_string_literal(value)
798    }
799}
800
/// Returns true when `value` can be written unquoted: it starts with an
/// ASCII letter or `_` and continues with ASCII letters, digits, `_` or `-`.
/// The empty string is not plain.
fn is_plain_path_segment(value: &str) -> bool {
    let mut rest = value.chars();
    match rest.next() {
        Some(head) if head == '_' || head.is_ascii_alphabetic() => {
            rest.all(|ch| matches!(ch, '_' | '-') || ch.is_ascii_alphanumeric())
        }
        _ => false,
    }
}
809
810fn json_value_literal(value: &JsonValue) -> String {
811    match value {
812        JsonValue::Null => "NULL".to_string(),
813        JsonValue::Bool(value) => value.to_string(),
814        JsonValue::Number(value) => value.to_string(),
815        JsonValue::String(value) => sql_string_literal(value),
816        JsonValue::Array(_) | JsonValue::Object(_) => value.to_json_string(),
817    }
818}
819
820fn json_text_literal(value: &JsonValue) -> String {
821    sql_string_literal(&value.to_json_string())
822}
823
824fn kv_tag_literal(value: &str) -> String {
825    sql_string_literal(value)
826}
827
/// Wraps `value` in single quotes, doubling every embedded single quote.
fn sql_string_literal(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
831
832fn ensure_json_object(name: &str, value: &JsonValue) -> Result<()> {
833    if value.as_object().is_some() {
834        Ok(())
835    } else {
836        Err(ClientError::new(
837            ErrorCode::InvalidArgument,
838            format!("{name} must be a JSON object"),
839        ))
840    }
841}
842
843fn select_sql(collection: &str, columns: &str, options: &ListOptions<'_>) -> String {
844    let mut sql = format!("SELECT {columns} FROM {collection}");
845    if let Some(filter) = options.filter {
846        sql.push_str(" WHERE ");
847        sql.push_str(filter);
848    }
849    if let Some(order_by) = options.order_by {
850        sql.push_str(" ORDER BY ");
851        sql.push_str(order_by);
852    }
853    if let Some(limit) = options.limit {
854        sql.push_str(&format!(" LIMIT {limit}"));
855    }
856    sql
857}
858
859fn document_item_from_first_row(result: QueryResult) -> Result<DocumentItem> {
860    let Some(row) = result.rows.into_iter().next() else {
861        return Err(ClientError::new(ErrorCode::NotFound, "document not found"));
862    };
863    let rid = row
864        .iter()
865        .find(|(column, _)| column == "rid")
866        .and_then(|(_, value)| value_as_string(value))
867        .ok_or_else(|| ClientError::new(ErrorCode::InvalidResponse, "document row missing rid"))?;
868    Ok(DocumentItem { rid, fields: row })
869}
870
871fn kv_item_from_rows(rows: &[Row]) -> Option<KvItem> {
872    let row = rows.first()?;
873    let value = row
874        .iter()
875        .find(|(column, _)| column == "value")
876        .map(|(_, value)| value.clone())?;
877    let rid = row
878        .iter()
879        .find(|(column, _)| column == "rid")
880        .map(|(_, value)| value);
881    if matches!(rid, Some(ValueOut::Null)) && value == ValueOut::Null {
882        return None;
883    }
884    let collection = row
885        .iter()
886        .find(|(column, _)| column == "collection")
887        .and_then(|(_, value)| value_as_string(value))
888        .unwrap_or_default();
889    let key = row
890        .iter()
891        .find(|(column, _)| column == "key")
892        .and_then(|(_, value)| value_as_string(value))
893        .unwrap_or_default();
894    Some(KvItem {
895        collection,
896        key,
897        value,
898    })
899}
900
901fn row_value<'a>(rows: &'a [Row], column: &str) -> Option<&'a ValueOut> {
902    rows.first()?
903        .iter()
904        .find(|(name, _)| name == column)
905        .map(|(_, value)| value)
906}
907
908fn value_as_string(value: &ValueOut) -> Option<String> {
909    match value {
910        ValueOut::String(value) => Some(value.clone()),
911        ValueOut::Integer(value) => Some(value.to_string()),
912        _ => None,
913    }
914}
915
916fn value_as_u64(value: &ValueOut) -> Option<u64> {
917    match value {
918        ValueOut::Integer(value) => (*value).try_into().ok(),
919        ValueOut::Float(value) if *value >= 0.0 => Some(*value as u64),
920        ValueOut::String(value) => value.parse().ok(),
921        _ => None,
922    }
923}
924
925/// Crate version (matches the engine version when published in lockstep).
926pub fn version() -> &'static str {
927    env!("CARGO_PKG_VERSION")
928}