Skip to main content

reddb_client/
lib.rs

1//! Official Rust client for [RedDB](https://github.com/reddb-io/reddb).
2//!
3//! One connection-string API. Pick your backend at runtime:
4//!
5//! ```no_run
6//! use reddb_client::{Reddb, JsonValue};
7//!
8//! # async fn run() -> reddb_client::Result<()> {
9//! // Embedded: opens the engine in-process, no network.
10//! let db = Reddb::connect("memory://").await?;
11//! db.insert("users", &JsonValue::object([("name", JsonValue::string("Alice"))])).await?;
12//! let result = db.query("SELECT * FROM users").await?;
13//! println!("{} rows", result.rows.len());
14//! db.close().await?;
15//! # Ok(())
16//! # }
17//! ```
18//!
19//! Accepted URIs:
20//!
21//! | URI                       | Backend                              | Status |
22//! |---------------------------|--------------------------------------|--------|
23//! | `memory://`               | Ephemeral in-memory                  | ✅    |
24//! | `file:///abs/path`        | Embedded engine on disk              | ✅    |
25//! | `grpc://host:port`        | Remote tonic client                  | ✅    |
26//! | `red://host:port`         | Remote tonic client (default port 5050) | ✅    |
27//! | `http://host:port`        | REST client                          | ✅    |
28//!
29//! ## Cargo features
30//!
31//! - `embedded` (default) — pulls the entire RedDB engine in-process.
32//! - `grpc` — opt-in remote client over tonic. Pulls the engine for
33//!   its `RedDBClient` type today; a thin proto-only client is tracked
34//!   in `PLAN_DRIVERS.md`.
35//! - `http` — REST client.
36//! - `redwire` — RedWire native TCP client (no engine dep).
37//!
38//! ## Internal connector
39//!
40//! The crate also hosts the gRPC connector + REPL used by the
41//! `red` and `red_client` binaries via the [`connector`] module.
42//! That layer is intentionally lighter than the published [`Reddb`]
43//! API: it speaks tonic + ureq + serde_json only and never pulls
44//! the engine in. It is exposed at the crate root as
45//! [`RedDBClient`] and [`repl`] for back-compat with the previous
46//! `reddb-client-internal` crate.
47
48#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod topology;
55pub mod types;
56
57#[cfg(feature = "embedded")]
58pub mod embedded;
59
60#[cfg(feature = "grpc")]
61pub mod grpc;
62
63#[cfg(feature = "grpc")]
64pub mod router;
65
66#[cfg(feature = "redwire")]
67pub mod redwire;
68
69#[cfg(feature = "http")]
70pub mod http;
71
72pub use error::{ClientError, ErrorCode, Result};
73pub use types::{InsertResult, JsonValue, KvWatchEvent, QueryResult, ValueOut};
74
75// Back-compat re-exports for the previous `reddb-client-internal`
76// crate. Workspace consumers (`reddb-server::rpc_stdio`, the `red`
77// bin's REPL launcher, the `red_client` bin) import these paths
78// directly.
79pub use connector::{
80    repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
81    RedDBClient,
82};
83
84use connect::Target;
85
/// Top-level client handle. Use [`Reddb::connect`] to get one.
///
/// Each backend variant is only compiled in when its Cargo feature is
/// enabled, so the set of variants depends on the build configuration.
#[derive(Debug)]
pub enum Reddb {
    /// Client backed by `embedded::EmbeddedClient` (`embedded` feature).
    #[cfg(feature = "embedded")]
    Embedded(embedded::EmbeddedClient),
    /// Client backed by `grpc::GrpcClient` (`grpc` feature).
    #[cfg(feature = "grpc")]
    Grpc(grpc::GrpcClient),
    /// Client backed by `http::HttpClient` (`http` feature).
    #[cfg(feature = "http")]
    Http(http::HttpClient),
    /// Placeholder for a backend whose feature gate is disabled; the
    /// payload names the missing feature. The data methods on this
    /// variant (`query`, `insert`, `bulk_insert`, `delete`) return the
    /// error built by `ClientError::feature_disabled`, so build-time
    /// configuration bugs surface as runtime errors with a clear
    /// remediation, not as missing trait impls. `close` succeeds.
    ///
    /// NOTE(review): `Reddb::connect` in this file returns the
    /// feature-disabled error directly and does not construct this
    /// variant — confirm whether any other code path does.
    Unavailable(&'static str),
}
102
103impl Reddb {
104    /// Open a connection. The backend is selected from the URI scheme.
105    pub async fn connect(uri: &str) -> Result<Self> {
106        let target = connect::parse(uri)?;
107        match target {
108            Target::Memory => {
109                #[cfg(feature = "embedded")]
110                {
111                    embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
112                }
113                #[cfg(not(feature = "embedded"))]
114                {
115                    Err(ClientError::feature_disabled("embedded"))
116                }
117            }
118            Target::File { path } => {
119                #[cfg(feature = "embedded")]
120                {
121                    embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
122                }
123                #[cfg(not(feature = "embedded"))]
124                {
125                    let _ = path;
126                    Err(ClientError::feature_disabled("embedded"))
127                }
128            }
129            Target::Grpc { endpoint } => {
130                #[cfg(feature = "grpc")]
131                {
132                    grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
133                }
134                #[cfg(not(feature = "grpc"))]
135                {
136                    let _ = endpoint;
137                    Err(ClientError::feature_disabled("grpc"))
138                }
139            }
140            Target::GrpcCluster {
141                primary,
142                replicas,
143                force_primary,
144            } => {
145                #[cfg(feature = "grpc")]
146                {
147                    grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
148                        .await
149                        .map(Reddb::Grpc)
150                }
151                #[cfg(not(feature = "grpc"))]
152                {
153                    let _ = (primary, replicas, force_primary);
154                    Err(ClientError::feature_disabled("grpc"))
155                }
156            }
157            Target::Http { base_url } => {
158                #[cfg(feature = "http")]
159                {
160                    http::HttpClient::connect(http::HttpOptions::new(base_url))
161                        .await
162                        .map(Reddb::Http)
163                }
164                #[cfg(not(feature = "http"))]
165                {
166                    let _ = base_url;
167                    Err(ClientError::feature_disabled("http"))
168                }
169            }
170        }
171    }
172
173    pub async fn query(&self, sql: &str) -> Result<QueryResult> {
174        match self {
175            #[cfg(feature = "embedded")]
176            Reddb::Embedded(c) => c.query(sql),
177            #[cfg(feature = "grpc")]
178            Reddb::Grpc(c) => c.query(sql).await,
179            #[cfg(feature = "http")]
180            Reddb::Http(c) => c.query(sql).await,
181            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
182        }
183    }
184
185    pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
186        match self {
187            #[cfg(feature = "embedded")]
188            Reddb::Embedded(c) => c.insert(collection, payload),
189            #[cfg(feature = "grpc")]
190            Reddb::Grpc(c) => c.insert(collection, payload).await,
191            #[cfg(feature = "http")]
192            Reddb::Http(c) => c.insert(collection, payload).await,
193            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
194        }
195    }
196
197    pub async fn bulk_insert(&self, collection: &str, payloads: &[JsonValue]) -> Result<u64> {
198        match self {
199            #[cfg(feature = "embedded")]
200            Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
201            #[cfg(feature = "grpc")]
202            Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
203            #[cfg(feature = "http")]
204            Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
205            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
206        }
207    }
208
209    pub async fn delete(&self, collection: &str, id: &str) -> Result<u64> {
210        match self {
211            #[cfg(feature = "embedded")]
212            Reddb::Embedded(c) => c.delete(collection, id),
213            #[cfg(feature = "grpc")]
214            Reddb::Grpc(c) => c.delete(collection, id).await,
215            #[cfg(feature = "http")]
216            Reddb::Http(c) => c.delete(collection, id).await,
217            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
218        }
219    }
220
221    pub async fn close(&self) -> Result<()> {
222        match self {
223            #[cfg(feature = "embedded")]
224            Reddb::Embedded(c) => c.close(),
225            #[cfg(feature = "grpc")]
226            Reddb::Grpc(c) => c.close().await,
227            #[cfg(feature = "http")]
228            Reddb::Http(c) => c.close().await,
229            Reddb::Unavailable(_) => Ok(()),
230        }
231    }
232
233    pub fn kv(&self) -> KvClient<'_> {
234        KvClient {
235            db: self,
236            collection: "kv_default",
237        }
238    }
239
240    pub fn config(&self) -> ConfigClient<'_> {
241        self.config_collection("red.config")
242    }
243
244    pub fn vault(&self) -> VaultClient<'_> {
245        self.vault_collection("red.vault")
246    }
247
248    pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
249        ConfigClient {
250            db: self,
251            collection,
252        }
253    }
254
255    pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
256        VaultClient {
257            db: self,
258            collection,
259        }
260    }
261}
262
263#[derive(Debug)]
264pub struct KvClient<'a> {
265    db: &'a Reddb,
266    collection: &'static str,
267}
268
269impl<'a> KvClient<'a> {
270    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
271        let tag_clause = if tags.is_empty() {
272            String::new()
273        } else {
274            format!(
275                " TAGS [{}]",
276                tags.iter()
277                    .map(|tag| kv_tag_literal(tag))
278                    .collect::<Vec<_>>()
279                    .join(", ")
280            )
281        };
282        self.db
283            .query(&format!(
284                "KV PUT {}.{} = {}{}",
285                kv_identifier(self.collection),
286                kv_identifier(key),
287                kv_value_literal(&value),
288                tag_clause
289            ))
290            .await
291    }
292
293    pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
294        let result = self
295            .db
296            .query(&format!(
297                "INVALIDATE TAGS [{}] FROM {}",
298                tags.iter()
299                    .map(|tag| kv_tag_literal(tag))
300                    .collect::<Vec<_>>()
301                    .join(", "),
302                kv_identifier(self.collection)
303            ))
304            .await?;
305        Ok(result.affected)
306    }
307
308    pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
309        self.watch_from_lsn(key, None).await
310    }
311
312    pub async fn watch_from_lsn(
313        &self,
314        key: &str,
315        from_lsn: Option<u64>,
316    ) -> Result<Vec<KvWatchEvent>> {
317        #[cfg(not(feature = "http"))]
318        {
319            let _ = key;
320            let _ = from_lsn;
321            let _ = self.collection;
322        }
323        match self.db {
324            #[cfg(feature = "http")]
325            Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
326            #[cfg(feature = "embedded")]
327            Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
328            #[cfg(feature = "grpc")]
329            Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
330            Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
331        }
332    }
333
334    pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
335        self.watch_prefix_from_lsn(prefix, None).await
336    }
337
338    pub async fn watch_prefix_from_lsn(
339        &self,
340        prefix: &str,
341        from_lsn: Option<u64>,
342    ) -> Result<Vec<KvWatchEvent>> {
343        let key = format!("{prefix}.*");
344        self.watch_from_lsn(&key, from_lsn).await
345    }
346}
347
348#[derive(Debug)]
349pub struct ConfigClient<'a> {
350    db: &'a Reddb,
351    collection: &'a str,
352}
353
354impl<'a> ConfigClient<'a> {
355    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
356        let mut sql = format!(
357            "PUT CONFIG {} {} = {}",
358            kv_identifier(self.collection),
359            kv_identifier(key),
360            kv_value_literal(&value)
361        );
362        append_tag_clause(&mut sql, tags);
363        self.db.query(&sql).await
364    }
365
366    pub async fn put_secret_ref(
367        &self,
368        key: &str,
369        vault_collection: &str,
370        vault_key: &str,
371        tags: &[&str],
372    ) -> Result<QueryResult> {
373        let mut sql = format!(
374            "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
375            kv_identifier(self.collection),
376            kv_identifier(key),
377            kv_identifier(vault_collection),
378            kv_identifier(vault_key)
379        );
380        append_tag_clause(&mut sql, tags);
381        self.db.query(&sql).await
382    }
383
384    pub async fn get(&self, key: &str) -> Result<QueryResult> {
385        self.db
386            .query(&format!(
387                "GET CONFIG {} {}",
388                kv_identifier(self.collection),
389                kv_identifier(key)
390            ))
391            .await
392    }
393
394    pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
395        self.db
396            .query(&format!(
397                "RESOLVE CONFIG {} {}",
398                kv_identifier(self.collection),
399                kv_identifier(key)
400            ))
401            .await
402    }
403}
404
405#[derive(Debug)]
406pub struct VaultClient<'a> {
407    db: &'a Reddb,
408    collection: &'a str,
409}
410
411impl<'a> VaultClient<'a> {
412    pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
413        let mut sql = format!(
414            "VAULT PUT {}.{} = {}",
415            kv_identifier(self.collection),
416            kv_identifier(key),
417            kv_value_literal(&value)
418        );
419        append_tag_clause(&mut sql, tags);
420        self.db.query(&sql).await
421    }
422
423    pub async fn get(&self, key: &str) -> Result<QueryResult> {
424        self.db
425            .query(&format!(
426                "VAULT GET {}.{}",
427                kv_identifier(self.collection),
428                kv_identifier(key)
429            ))
430            .await
431    }
432
433    pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
434        self.db
435            .query(&format!(
436                "UNSEAL VAULT {}.{}",
437                kv_identifier(self.collection),
438                kv_identifier(key)
439            ))
440            .await
441    }
442}
443
444fn append_tag_clause(sql: &mut String, tags: &[&str]) {
445    if tags.is_empty() {
446        return;
447    }
448    sql.push_str(" TAGS [");
449    sql.push_str(
450        &tags
451            .iter()
452            .map(|tag| kv_tag_literal(tag))
453            .collect::<Vec<_>>()
454            .join(", "),
455    );
456    sql.push(']');
457}
458
/// Sanitize `value` for use as an identifier in a generated statement.
///
/// ASCII letters, ASCII digits, `_` and `.` are kept; every other
/// character (including each non-ASCII character) becomes a single `_`.
/// The mapping is lossy: distinct inputs can produce the same identifier.
fn kv_identifier(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
471
472fn kv_value_literal(value: &JsonValue) -> String {
473    match value {
474        JsonValue::Null => "NULL".to_string(),
475        JsonValue::Bool(value) => value.to_string(),
476        JsonValue::Number(value) => value.to_string(),
477        JsonValue::String(value) => format!("'{}'", value.replace('\'', "''")),
478        JsonValue::Array(_) | JsonValue::Object(_) => {
479            format!("'{}'", value.to_json_string().replace('\'', "''"))
480        }
481    }
482}
483
/// Render `value` as a single-quoted literal, doubling every embedded
/// single quote.
fn kv_tag_literal(value: &str) -> String {
    // Splitting on the quote and re-joining with a doubled quote is the same
    // transformation as replacing each quote with two.
    let escaped = value.split('\'').collect::<Vec<_>>().join("''");
    format!("'{}'", escaped)
}
487
488/// Crate version (matches the engine version when published in lockstep).
489pub fn version() -> &'static str {
490    env!("CARGO_PKG_VERSION")
491}