Skip to main content

reddb_client/
lib.rs

1//! Official Rust client for [RedDB](https://github.com/reddb-io/reddb).
2//!
3//! One connection-string API. Pick your backend at runtime:
4//!
5//! ```no_run
6//! use reddb_client::{Reddb, JsonValue};
7//!
8//! # async fn run() -> reddb_client::Result<()> {
9//! // Embedded: opens the engine in-process, no network.
10//! let db = Reddb::connect("memory://").await?;
11//! db.insert("users", &JsonValue::object([("name", JsonValue::string("Alice"))])).await?;
12//! let result = db.query("SELECT * FROM users").await?;
13//! println!("{} rows", result.rows.len());
14//! db.close().await?;
15//! # Ok(())
16//! # }
17//! ```
18//!
19//! Accepted URIs:
20//!
21//! | URI                       | Backend                              | Status |
22//! |---------------------------|--------------------------------------|--------|
23//! | `memory://`               | Ephemeral in-memory                  | ✅    |
24//! | `file:///abs/path`        | Embedded engine on disk              | ✅    |
25//! | `grpc://host:port`        | Remote tonic client                  | ✅    |
26//! | `red://host:port`         | Remote tonic client (default port 5050) | ✅    |
27//! | `http://host:port`        | REST client                          | ✅    |
28//!
29//! ## Cargo features
30//!
31//! - `embedded` (default) — pulls the entire RedDB engine in-process.
32//! - `grpc` — opt-in remote client over tonic. Pulls the engine for
33//!   its `RedDBClient` type today; a thin proto-only client is tracked
34//!   in `PLAN_DRIVERS.md`.
35//! - `http` — REST client.
36//! - `redwire` — RedWire native TCP client (no engine dep).
37//!
38//! ## Internal connector
39//!
40//! The crate also hosts the gRPC connector + REPL used by the
41//! `red` and `red_client` binaries via the [`connector`] module.
42//! That layer is intentionally lighter than the published [`Reddb`]
43//! API: it speaks tonic + ureq + serde_json only and never pulls
44//! the engine in. It is exposed at the crate root as
45//! [`RedDBClient`] and [`repl`] for back-compat with the previous
46//! `reddb-client-internal` crate.
47
48#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod params;
55pub mod topology;
56pub mod types;
57
58#[cfg(feature = "embedded")]
59pub mod embedded;
60
61#[cfg(feature = "grpc")]
62pub mod grpc;
63
64#[cfg(feature = "grpc")]
65pub mod router;
66
67#[cfg(feature = "redwire")]
68pub mod redwire;
69
70#[cfg(feature = "http")]
71pub mod http;
72
73pub use error::{ClientError, ErrorCode, Result};
74pub use params::{IntoParams, IntoValue, Value};
75pub use types::{BulkInsertResult, InsertResult, JsonValue, KvWatchEvent, QueryResult, ValueOut};
76
77// Back-compat re-exports for the previous `reddb-client-internal`
78// crate. Workspace consumers (`reddb-server::rpc_stdio`, the `red`
79// bin's REPL launcher, the `red_client` bin) import these paths
80// directly.
81pub use connector::{
82    repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
83    RedDBClient,
84};
85
86use connect::Target;
87
88/// Top-level client handle. Use [`Reddb::connect`] to get one.
89#[derive(Debug)]
90pub enum Reddb {
91    #[cfg(feature = "embedded")]
92    Embedded(embedded::EmbeddedClient),
93    #[cfg(feature = "grpc")]
94    Grpc(grpc::GrpcClient),
95    #[cfg(feature = "http")]
96    Http(http::HttpClient),
97    /// Constructed when a feature gate would have produced a real
98    /// variant but the feature is disabled. Every method on this
99    /// variant returns a `FEATURE_DISABLED` error so build-time
100    /// configuration bugs surface as runtime errors with a clear
101    /// remediation, not as missing trait impls.
102    Unavailable(&'static str),
103}
104
105impl Reddb {
106    /// Open a connection. The backend is selected from the URI scheme.
107    pub async fn connect(uri: &str) -> Result<Self> {
108        let target = connect::parse(uri)?;
109        match target {
110            Target::Memory => {
111                #[cfg(feature = "embedded")]
112                {
113                    embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
114                }
115                #[cfg(not(feature = "embedded"))]
116                {
117                    Err(ClientError::feature_disabled("embedded"))
118                }
119            }
120            Target::File { path } => {
121                #[cfg(feature = "embedded")]
122                {
123                    embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
124                }
125                #[cfg(not(feature = "embedded"))]
126                {
127                    let _ = path;
128                    Err(ClientError::feature_disabled("embedded"))
129                }
130            }
131            Target::Grpc { endpoint } => {
132                #[cfg(feature = "grpc")]
133                {
134                    grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
135                }
136                #[cfg(not(feature = "grpc"))]
137                {
138                    let _ = endpoint;
139                    Err(ClientError::feature_disabled("grpc"))
140                }
141            }
142            Target::GrpcCluster {
143                primary,
144                replicas,
145                force_primary,
146            } => {
147                #[cfg(feature = "grpc")]
148                {
149                    grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
150                        .await
151                        .map(Reddb::Grpc)
152                }
153                #[cfg(not(feature = "grpc"))]
154                {
155                    let _ = (primary, replicas, force_primary);
156                    Err(ClientError::feature_disabled("grpc"))
157                }
158            }
159            Target::Http { base_url } => {
160                #[cfg(feature = "http")]
161                {
162                    http::HttpClient::connect(http::HttpOptions::new(base_url))
163                        .await
164                        .map(Reddb::Http)
165                }
166                #[cfg(not(feature = "http"))]
167                {
168                    let _ = base_url;
169                    Err(ClientError::feature_disabled("http"))
170                }
171            }
172        }
173    }
174
175    pub async fn query(&self, sql: &str) -> Result<QueryResult> {
176        match self {
177            #[cfg(feature = "embedded")]
178            Reddb::Embedded(c) => c.query(sql),
179            #[cfg(feature = "grpc")]
180            Reddb::Grpc(c) => c.query(sql).await,
181            #[cfg(feature = "http")]
182            Reddb::Http(c) => c.query(sql).await,
183            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
184        }
185    }
186
187    /// Parameterized query — `$N` placeholders in `sql` are bound to
188    /// `params[N-1]`. Empty params is equivalent to [`Self::query`].
189    ///
190    /// Native type mapping (driver-side, [`IntoValue`]):
191    ///
192    /// | Rust                    | Engine `Value` variant |
193    /// |-------------------------|------------------------|
194    /// | `i8..i64` / `u8..u32`   | `Integer` (i64)        |
195    /// | `bool`                  | `Boolean`              |
196    /// | `f32` / `f64`           | `Float` (f64)          |
197    /// | `&str` / `String`       | `Text`                 |
198    /// | `Vec<u8>` / `&[u8]`     | `Blob`                 |
199    /// | `Vec<f32>` / `&[f32]`   | `Vector`               |
200    /// | `Option<T>`             | `Null` when `None`     |
201    /// | `serde_json::Value`     | `Json`                 |
202    /// | [`Value::Timestamp`]    | `Timestamp` (seconds)  |
203    /// | [`Value::Uuid`]         | `Uuid` (16 raw bytes)  |
204    ///
205    /// Today the [`Reddb::Embedded`], [`Reddb::Grpc`], and [`Reddb::Http`]
206    /// transports carry parameters end-to-end.
207    pub async fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
208        let values = params.into_params();
209        match self {
210            #[cfg(feature = "embedded")]
211            Reddb::Embedded(c) => c.query_with(sql, values),
212            #[cfg(feature = "grpc")]
213            Reddb::Grpc(c) => c.query_with(sql, &values).await,
214            #[cfg(feature = "http")]
215            Reddb::Http(c) => c.query_with(sql, &values).await,
216            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
217        }
218    }
219
220    /// Parameterized execution for DML statements. This is an alias for
221    /// [`Self::query_with`] because RedDB returns one query result envelope for
222    /// SELECT and DML.
223    pub async fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
224        self.query_with(sql, params).await
225    }
226
227    pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
228        match self {
229            #[cfg(feature = "embedded")]
230            Reddb::Embedded(c) => c.insert(collection, payload),
231            #[cfg(feature = "grpc")]
232            Reddb::Grpc(c) => c.insert(collection, payload).await,
233            #[cfg(feature = "http")]
234            Reddb::Http(c) => c.insert(collection, payload).await,
235            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
236        }
237    }
238
239    pub async fn bulk_insert(
240        &self,
241        collection: &str,
242        payloads: &[JsonValue],
243    ) -> Result<BulkInsertResult> {
244        match self {
245            #[cfg(feature = "embedded")]
246            Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
247            #[cfg(feature = "grpc")]
248            Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
249            #[cfg(feature = "http")]
250            Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
251            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
252        }
253    }
254
255    pub async fn delete(&self, collection: &str, id: &str) -> Result<u64> {
256        match self {
257            #[cfg(feature = "embedded")]
258            Reddb::Embedded(c) => c.delete(collection, id),
259            #[cfg(feature = "grpc")]
260            Reddb::Grpc(c) => c.delete(collection, id).await,
261            #[cfg(feature = "http")]
262            Reddb::Http(c) => c.delete(collection, id).await,
263            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
264        }
265    }
266
267    pub async fn close(&self) -> Result<()> {
268        match self {
269            #[cfg(feature = "embedded")]
270            Reddb::Embedded(c) => c.close(),
271            #[cfg(feature = "grpc")]
272            Reddb::Grpc(c) => c.close().await,
273            #[cfg(feature = "http")]
274            Reddb::Http(c) => c.close().await,
275            Reddb::Unavailable(_) => Ok(()),
276        }
277    }
278
279    pub fn kv(&self) -> KvClient<'_> {
280        KvClient {
281            db: self,
282            collection: "kv_default",
283        }
284    }
285
286    pub fn config(&self) -> ConfigClient<'_> {
287        self.config_collection("red.config")
288    }
289
290    pub fn vault(&self) -> VaultClient<'_> {
291        self.vault_collection("red.vault")
292    }
293
294    pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
295        ConfigClient {
296            db: self,
297            collection,
298        }
299    }
300
301    pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
302        VaultClient {
303            db: self,
304            collection,
305        }
306    }
307}
308
309#[derive(Debug)]
310pub struct KvClient<'a> {
311    db: &'a Reddb,
312    collection: &'static str,
313}
314
315impl<'a> KvClient<'a> {
316    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
317        let tag_clause = if tags.is_empty() {
318            String::new()
319        } else {
320            format!(
321                " TAGS [{}]",
322                tags.iter()
323                    .map(|tag| kv_tag_literal(tag))
324                    .collect::<Vec<_>>()
325                    .join(", ")
326            )
327        };
328        self.db
329            .query(&format!(
330                "KV PUT {}.{} = {}{}",
331                kv_identifier(self.collection),
332                kv_identifier(key),
333                kv_value_literal(&value),
334                tag_clause
335            ))
336            .await
337    }
338
339    pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
340        let result = self
341            .db
342            .query(&format!(
343                "INVALIDATE TAGS [{}] FROM {}",
344                tags.iter()
345                    .map(|tag| kv_tag_literal(tag))
346                    .collect::<Vec<_>>()
347                    .join(", "),
348                kv_identifier(self.collection)
349            ))
350            .await?;
351        Ok(result.affected)
352    }
353
354    pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
355        self.watch_from_lsn(key, None).await
356    }
357
358    pub async fn watch_from_lsn(
359        &self,
360        key: &str,
361        from_lsn: Option<u64>,
362    ) -> Result<Vec<KvWatchEvent>> {
363        #[cfg(not(feature = "http"))]
364        {
365            let _ = key;
366            let _ = from_lsn;
367            let _ = self.collection;
368        }
369        match self.db {
370            #[cfg(feature = "http")]
371            Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
372            #[cfg(feature = "embedded")]
373            Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
374            #[cfg(feature = "grpc")]
375            Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
376            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
377        }
378    }
379
380    pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
381        self.watch_prefix_from_lsn(prefix, None).await
382    }
383
384    pub async fn watch_prefix_from_lsn(
385        &self,
386        prefix: &str,
387        from_lsn: Option<u64>,
388    ) -> Result<Vec<KvWatchEvent>> {
389        let key = format!("{prefix}.*");
390        self.watch_from_lsn(&key, from_lsn).await
391    }
392}
393
394#[derive(Debug)]
395pub struct ConfigClient<'a> {
396    db: &'a Reddb,
397    collection: &'a str,
398}
399
400impl<'a> ConfigClient<'a> {
401    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
402        let mut sql = format!(
403            "PUT CONFIG {} {} = {}",
404            kv_identifier(self.collection),
405            kv_identifier(key),
406            kv_value_literal(&value)
407        );
408        append_tag_clause(&mut sql, tags);
409        self.db.query(&sql).await
410    }
411
412    pub async fn put_secret_ref(
413        &self,
414        key: &str,
415        vault_collection: &str,
416        vault_key: &str,
417        tags: &[&str],
418    ) -> Result<QueryResult> {
419        let mut sql = format!(
420            "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
421            kv_identifier(self.collection),
422            kv_identifier(key),
423            kv_identifier(vault_collection),
424            kv_identifier(vault_key)
425        );
426        append_tag_clause(&mut sql, tags);
427        self.db.query(&sql).await
428    }
429
430    pub async fn get(&self, key: &str) -> Result<QueryResult> {
431        self.db
432            .query(&format!(
433                "GET CONFIG {} {}",
434                kv_identifier(self.collection),
435                kv_identifier(key)
436            ))
437            .await
438    }
439
440    pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
441        self.db
442            .query(&format!(
443                "RESOLVE CONFIG {} {}",
444                kv_identifier(self.collection),
445                kv_identifier(key)
446            ))
447            .await
448    }
449}
450
451#[derive(Debug)]
452pub struct VaultClient<'a> {
453    db: &'a Reddb,
454    collection: &'a str,
455}
456
457impl<'a> VaultClient<'a> {
458    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
459        let mut sql = format!(
460            "VAULT PUT {}.{} = {}",
461            kv_identifier(self.collection),
462            kv_identifier(key),
463            kv_value_literal(&value)
464        );
465        append_tag_clause(&mut sql, tags);
466        self.db.query(&sql).await
467    }
468
469    pub async fn get(&self, key: &str) -> Result<QueryResult> {
470        self.db
471            .query(&format!(
472                "VAULT GET {}.{}",
473                kv_identifier(self.collection),
474                kv_identifier(key)
475            ))
476            .await
477    }
478
479    pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
480        self.db
481            .query(&format!(
482                "UNSEAL VAULT {}.{}",
483                kv_identifier(self.collection),
484                kv_identifier(key)
485            ))
486            .await
487    }
488}
489
490fn append_tag_clause(sql: &mut String, tags: &[&str]) {
491    if tags.is_empty() {
492        return;
493    }
494    sql.push_str(" TAGS [");
495    sql.push_str(
496        &tags
497            .iter()
498            .map(|tag| kv_tag_literal(tag))
499            .collect::<Vec<_>>()
500            .join(", "),
501    );
502    sql.push(']');
503}
504
505fn kv_identifier(value: &str) -> String {
506    value
507        .chars()
508        .map(|ch| {
509            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' {
510                ch
511            } else {
512                '_'
513            }
514        })
515        .collect()
516}
517
518fn kv_value_literal(value: &JsonValue) -> String {
519    match value {
520        JsonValue::Null => "NULL".to_string(),
521        JsonValue::Bool(value) => value.to_string(),
522        JsonValue::Number(value) => value.to_string(),
523        JsonValue::String(value) => format!("'{}'", value.replace('\'', "''")),
524        JsonValue::Array(_) | JsonValue::Object(_) => {
525            format!("'{}'", value.to_json_string().replace('\'', "''"))
526        }
527    }
528}
529
530fn kv_tag_literal(value: &str) -> String {
531    format!("'{}'", value.replace('\'', "''"))
532}
533
534/// Crate version (matches the engine version when published in lockstep).
535pub fn version() -> &'static str {
536    env!("CARGO_PKG_VERSION")
537}