Skip to main content

reddb_client/
embedded.rs

1//! Embedded backend — wraps the in-process RedDB engine.
2//!
3//! Compiled only when the `embedded` Cargo feature is enabled (default).
4//! When `embedded` is off, this module does not exist and the
5//! [`crate::Reddb`] enum will refuse to construct an embedded variant
6//! at runtime with a clear `FEATURE_DISABLED` error.
7
8use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15
16use crate::error::{ClientError, ErrorCode, Result};
17use crate::params::IntoParams;
18use crate::types::{BulkInsertResult, InsertResult, JsonValue, QueryResult, ValueOut};
19
20/// In-process handle to a RedDB engine.
21pub struct EmbeddedClient {
22    runtime: Arc<RedDBRuntime>,
23}
24
25impl std::fmt::Debug for EmbeddedClient {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28    }
29}
30
31impl EmbeddedClient {
32    /// Open a persistent database at `path`.
33    pub fn open(path: PathBuf) -> Result<Self> {
34        let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36        Ok(Self {
37            runtime: Arc::new(runtime),
38        })
39    }
40
41    /// Open an ephemeral, tempfile-backed database. Equivalent to
42    /// `connect("memory://")`.
43    pub fn in_memory() -> Result<Self> {
44        let runtime = RedDBRuntime::in_memory()
45            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46        Ok(Self {
47            runtime: Arc::new(runtime),
48        })
49    }
50
51    pub fn query(&self, sql: &str) -> Result<QueryResult> {
52        let qr = self
53            .runtime
54            .execute_query(sql)
55            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56        Ok(map_query_result(&qr))
57    }
58
59    /// Parameterized embedded query — see [`crate::Reddb::query_with`].
60    /// Empty `params` short-circuits to the legacy `execute_query` fast
61    /// path so the parameter-less hot path pays zero overhead.
62    pub fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
63        let params = params.into_params();
64        if params.is_empty() {
65            return self.query(sql);
66        }
67        use reddb_server::storage::query::modes::parse_multi;
68        use reddb_server::storage::query::user_params;
69        let binds: Vec<SchemaValue> = params
70            .iter()
71            .cloned()
72            .map(crate::params::Value::into_schema_value)
73            .collect();
74        let parsed =
75            parse_multi(sql).map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
76        let bound = user_params::bind(&parsed, &binds)
77            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
78        let qr = self
79            .runtime
80            .execute_query_expr(bound)
81            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
82        Ok(map_query_result(&qr))
83    }
84
85    /// Parameterized execution for INSERT / UPDATE / DELETE. RedDB uses the
86    /// same result envelope for DML as for SELECT, so this forwards to
87    /// [`Self::query_with`].
88    pub fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
89        self.query_with(sql, params)
90    }
91
92    /// Single-row insert. Routes through the same
93    /// `runtime.create_rows_batch_columnar` port that
94    /// [`Self::bulk_insert`] uses (#110), passing a one-row batch.
95    /// Skips `build_insert_sql` + `execute_query`, so this hot
96    /// autocommit path pays zero SQL build / lex / parse / plan cost
97    /// when the collection carries no contract.
98    pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
99        let object = payload.as_object().ok_or_else(|| {
100            ClientError::new(
101                ErrorCode::QueryError,
102                "insert payload must be a JSON object".to_string(),
103            )
104        })?;
105        let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
106        let row: Vec<SchemaValue> = object
107            .iter()
108            .map(|(_, v)| json_value_to_schema_value(v))
109            .collect();
110        let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
111            self.runtime.as_ref(),
112            collection.to_string(),
113            Arc::new(column_names),
114            vec![row],
115        )
116        .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
117        Ok(InsertResult {
118            affected: outputs.len() as u64,
119            id: outputs.first().map(|output| output.id.raw().to_string()),
120        })
121    }
122
123    /// Routes through `runtime.create_rows_batch_columnar`, which fast-paths
124    /// to the prevalidated columnar kernel when the collection carries no
125    /// contract — same shape `MSG_BULK_INSERT_BINARY` already uses on the
126    /// wire path. Result: one WAL append per batch instead of one per row,
127    /// no per-row SQL build / lex / parse / plan, and no per-row `(String,
128    /// Value)` tuple materialisation when the collection is contract-free.
129    ///
130    /// Heterogeneous payloads (rows with differing key sets) fall back to
131    /// the per-row `execute_query` path so existing semantics are preserved
132    /// for callers that mix shapes.
133    pub fn bulk_insert(
134        &self,
135        collection: &str,
136        payloads: &[JsonValue],
137    ) -> Result<BulkInsertResult> {
138        if payloads.is_empty() {
139            return Ok(BulkInsertResult {
140                affected: 0,
141                ids: Vec::new(),
142            });
143        }
144
145        // Validate every payload is a JSON object up-front. Mirrors the
146        // old loop's error contract.
147        let objects: Vec<&[(String, JsonValue)]> = payloads
148            .iter()
149            .map(|p| {
150                p.as_object().ok_or_else(|| {
151                    ClientError::new(
152                        ErrorCode::QueryError,
153                        "bulk_insert payloads must be JSON objects".to_string(),
154                    )
155                })
156            })
157            .collect::<Result<_>>()?;
158
159        // Columnar fast path requires a uniform schema (same column names in
160        // the same order across every row). When that holds we pay zero
161        // per-row SQL build / lex / parse cost. When it doesn't we fall back
162        // to the per-row loop so heterogeneous workloads stay correct.
163        if uniform_schema(&objects) {
164            let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
165            let ncols = column_names.len();
166            let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
167            for obj in &objects {
168                let mut values = Vec::with_capacity(ncols);
169                for (_, v) in obj.iter() {
170                    values.push(json_value_to_schema_value(v));
171                }
172                rows.push(values);
173            }
174            let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
175                self.runtime.as_ref(),
176                collection.to_string(),
177                Arc::new(column_names),
178                rows,
179            )
180            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
181            let ids = outputs
182                .iter()
183                .map(|output| output.id.raw().to_string())
184                .collect();
185            return Ok(BulkInsertResult {
186                affected: outputs.len() as u64,
187                ids,
188            });
189        }
190
191        // Fallback: heterogeneous shapes. Per-row inserts retain existing
192        // semantics for mixed-key payloads while still surfacing generated ids.
193        let mut affected = 0u64;
194        let mut ids = Vec::with_capacity(objects.len());
195        for object in &objects {
196            let payload = JsonValue::Object(object.to_vec());
197            let result = self.insert(collection, &payload)?;
198            affected += result.affected;
199            if let Some(id) = result.id {
200                ids.push(id);
201            }
202        }
203        Ok(BulkInsertResult { affected, ids })
204    }
205
206    pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
207        let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
208        let qr = self
209            .runtime
210            .execute_query(&sql)
211            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
212        Ok(qr.affected_rows)
213    }
214
215    pub fn close(&self) -> Result<()> {
216        self.runtime
217            .checkpoint()
218            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
219    }
220
221    pub fn version() -> &'static str {
222        env!("CARGO_PKG_VERSION")
223    }
224}
225
226/// True when every row carries the same column names in the same order.
227/// The columnar `create_rows_batch_columnar` port requires one shared
228/// `Arc<Vec<String>>` schema, so heterogeneous payloads have to fall
229/// back to the per-row path.
230fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
231    let Some((first, rest)) = objects.split_first() else {
232        return true;
233    };
234    let ncols = first.len();
235    for row in rest {
236        if row.len() != ncols {
237            return false;
238        }
239        for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
240            if k1 != k2 {
241                return false;
242            }
243        }
244    }
245    true
246}
247
248/// Best-effort `JsonValue` → `SchemaValue` coercion for the columnar
249/// fast path. The contract-free branch in `create_rows_batch_columnar`
250/// stores values without column-type normalisation, so the choice here
251/// only affects what the row reads back as. The mapping mirrors what
252/// `value_to_sql_literal` would have produced through the SQL parser:
253/// integers stay integers, fractional numbers stay floats, arrays /
254/// objects are JSON-encoded into a `Text` value (the parser would have
255/// quoted them too, so the on-disk shape matches).
256fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
257    match v {
258        JsonValue::Null => SchemaValue::Null,
259        JsonValue::Bool(b) => SchemaValue::Boolean(*b),
260        JsonValue::Number(n) => {
261            if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
262                SchemaValue::Integer(*n as i64)
263            } else {
264                SchemaValue::Float(*n)
265            }
266        }
267        JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
268        JsonValue::Array(_) | JsonValue::Object(_) => {
269            SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
270        }
271    }
272}
273
274fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
275    let columns: Vec<String> = qr
276        .result
277        .records
278        .first()
279        .map(|r| {
280            let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
281            keys.sort();
282            keys
283        })
284        .unwrap_or_default();
285
286    let rows: Vec<Vec<(String, ValueOut)>> =
287        qr.result.records.iter().map(record_to_pairs).collect();
288
289    QueryResult {
290        statement: qr.statement_type.to_string(),
291        affected: qr.affected_rows,
292        columns,
293        rows,
294    }
295}
296
297fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
298    let mut entries: Vec<(&str, &SchemaValue)> =
299        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
300    entries.sort_by(|a, b| a.0.cmp(b.0));
301    entries
302        .into_iter()
303        .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
304        .collect()
305}
306
307fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
308    match v {
309        SchemaValue::Null => ValueOut::Null,
310        SchemaValue::Boolean(b) => ValueOut::Bool(*b),
311        SchemaValue::Integer(n) => ValueOut::Integer(*n),
312        SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
313        SchemaValue::Float(n) => ValueOut::Float(*n),
314        SchemaValue::BigInt(n) => ValueOut::Integer(*n),
315        SchemaValue::TimestampMs(n)
316        | SchemaValue::Timestamp(n)
317        | SchemaValue::Duration(n)
318        | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
319        SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
320        SchemaValue::Text(s) => ValueOut::String(s.to_string()),
321        SchemaValue::Email(s)
322        | SchemaValue::Url(s)
323        | SchemaValue::NodeRef(s)
324        | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
325        other => ValueOut::String(format!("{other}")),
326    }
327}