Skip to main content

reddb_client/
embedded.rs

1//! Embedded backend — wraps the in-process RedDB engine.
2//!
3//! Compiled only when the `embedded` Cargo feature is enabled (default).
4//! When `embedded` is off, this module does not exist and the
5//! [`crate::Reddb`] enum will refuse to construct an embedded variant
6//! at runtime with a clear `FEATURE_DISABLED` error.
7
8use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15
16use crate::error::{ClientError, ErrorCode, Result};
17use crate::params::IntoParams;
18use crate::types::{BulkInsertResult, InsertResult, JsonValue, QueryResult, ValueOut};
19
/// In-process handle to a RedDB engine.
///
/// Every operation on this client is forwarded to the wrapped
/// [`RedDBRuntime`]. Build one with [`EmbeddedClient::open`] (persistent)
/// or [`EmbeddedClient::in_memory`] (ephemeral).
pub struct EmbeddedClient {
    /// The engine instance this client drives, held behind an `Arc`.
    runtime: Arc<RedDBRuntime>,
}
24
25impl std::fmt::Debug for EmbeddedClient {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28    }
29}
30
31impl EmbeddedClient {
32    /// Open a persistent database at `path`.
33    pub fn open(path: PathBuf) -> Result<Self> {
34        let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36        Ok(Self {
37            runtime: Arc::new(runtime),
38        })
39    }
40
41    /// Open an ephemeral, tempfile-backed database. Equivalent to
42    /// `connect("memory://")`.
43    pub fn in_memory() -> Result<Self> {
44        let runtime = RedDBRuntime::in_memory()
45            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46        Ok(Self {
47            runtime: Arc::new(runtime),
48        })
49    }
50
51    pub fn query(&self, sql: &str) -> Result<QueryResult> {
52        let qr = self
53            .runtime
54            .execute_query(sql)
55            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56        Ok(map_query_result(&qr))
57    }
58
59    /// Parameterized embedded query — see [`crate::Reddb::query_with`].
60    /// Empty `params` short-circuits to the legacy `execute_query` fast
61    /// path so the parameter-less hot path pays zero overhead.
62    pub fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
63        let params = params.into_params();
64        if params.is_empty() {
65            return self.query(sql);
66        }
67        use reddb_server::storage::query::modes::parse_multi;
68        use reddb_server::storage::query::user_params;
69        let binds: Vec<SchemaValue> = params
70            .iter()
71            .cloned()
72            .map(crate::params::Value::into_schema_value)
73            .collect();
74        let parsed =
75            parse_multi(sql).map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
76        let bound = user_params::bind(&parsed, &binds)
77            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
78        let qr = self
79            .runtime
80            .execute_query_expr(bound)
81            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
82        Ok(map_query_result(&qr))
83    }
84
85    /// Parameterized execution for INSERT / UPDATE / DELETE. RedDB uses the
86    /// same result envelope for DML as for SELECT, so this forwards to
87    /// [`Self::query_with`].
88    pub fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
89        self.query_with(sql, params)
90    }
91
92    /// Single-row insert. Routes through the same
93    /// `runtime.create_rows_batch_columnar` port that
94    /// [`Self::bulk_insert`] uses (#110), passing a one-row batch.
95    /// Skips `build_insert_sql` + `execute_query`, so this hot
96    /// autocommit path pays zero SQL build / lex / parse / plan cost
97    /// when the collection carries no contract.
98    pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
99        let object = payload.as_object().ok_or_else(|| {
100            ClientError::new(
101                ErrorCode::QueryError,
102                "insert payload must be a JSON object".to_string(),
103            )
104        })?;
105        let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
106        let row: Vec<SchemaValue> = object
107            .iter()
108            .map(|(_, v)| json_value_to_schema_value(v))
109            .collect();
110        let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
111            self.runtime.as_ref(),
112            collection.to_string(),
113            Arc::new(column_names),
114            vec![row],
115        )
116        .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
117        let rid = outputs.first().map(|output| output.id.raw().to_string());
118        Ok(InsertResult {
119            affected: outputs.len() as u64,
120            rid: rid.clone(),
121            id: rid,
122        })
123    }
124
125    /// Routes through `runtime.create_rows_batch_columnar`, which fast-paths
126    /// to the prevalidated columnar kernel when the collection carries no
127    /// contract — same shape `MSG_BULK_INSERT_BINARY` already uses on the
128    /// wire path. Result: one WAL append per batch instead of one per row,
129    /// no per-row SQL build / lex / parse / plan, and no per-row `(String,
130    /// Value)` tuple materialisation when the collection is contract-free.
131    ///
132    /// Heterogeneous payloads (rows with differing key sets) fall back to
133    /// the per-row `execute_query` path so existing semantics are preserved
134    /// for callers that mix shapes.
135    pub fn bulk_insert(
136        &self,
137        collection: &str,
138        payloads: &[JsonValue],
139    ) -> Result<BulkInsertResult> {
140        if payloads.is_empty() {
141            return Ok(BulkInsertResult {
142                affected: 0,
143                rids: Vec::new(),
144                ids: Vec::new(),
145            });
146        }
147
148        // Validate every payload is a JSON object up-front. Mirrors the
149        // old loop's error contract.
150        let objects: Vec<&[(String, JsonValue)]> = payloads
151            .iter()
152            .map(|p| {
153                p.as_object().ok_or_else(|| {
154                    ClientError::new(
155                        ErrorCode::QueryError,
156                        "bulk_insert payloads must be JSON objects".to_string(),
157                    )
158                })
159            })
160            .collect::<Result<_>>()?;
161
162        // Columnar fast path requires a uniform schema (same column names in
163        // the same order across every row). When that holds we pay zero
164        // per-row SQL build / lex / parse cost. When it doesn't we fall back
165        // to the per-row loop so heterogeneous workloads stay correct.
166        if uniform_schema(&objects) {
167            let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
168            let ncols = column_names.len();
169            let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
170            for obj in &objects {
171                let mut values = Vec::with_capacity(ncols);
172                for (_, v) in obj.iter() {
173                    values.push(json_value_to_schema_value(v));
174                }
175                rows.push(values);
176            }
177            let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
178                self.runtime.as_ref(),
179                collection.to_string(),
180                Arc::new(column_names),
181                rows,
182            )
183            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
184            let rids: Vec<String> = outputs
185                .iter()
186                .map(|output| output.id.raw().to_string())
187                .collect();
188            return Ok(BulkInsertResult {
189                affected: outputs.len() as u64,
190                rids: rids.clone(),
191                ids: rids,
192            });
193        }
194
195        // Fallback: heterogeneous shapes. Per-row inserts retain existing
196        // semantics for mixed-key payloads while still surfacing generated ids.
197        let mut affected = 0u64;
198        let mut ids = Vec::with_capacity(objects.len());
199        for object in &objects {
200            let payload = JsonValue::Object(object.to_vec());
201            let result = self.insert(collection, &payload)?;
202            affected += result.affected;
203            if let Some(id) = result.id {
204                ids.push(id);
205            }
206        }
207        Ok(BulkInsertResult {
208            affected,
209            rids: ids.clone(),
210            ids,
211        })
212    }
213
214    pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
215        let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
216        let qr = self
217            .runtime
218            .execute_query(&sql)
219            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
220        Ok(qr.affected_rows)
221    }
222
223    pub fn close(&self) -> Result<()> {
224        self.runtime
225            .checkpoint()
226            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
227    }
228
229    pub fn version() -> &'static str {
230        env!("CARGO_PKG_VERSION")
231    }
232}
233
234/// True when every row carries the same column names in the same order.
235/// The columnar `create_rows_batch_columnar` port requires one shared
236/// `Arc<Vec<String>>` schema, so heterogeneous payloads have to fall
237/// back to the per-row path.
238fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
239    let Some((first, rest)) = objects.split_first() else {
240        return true;
241    };
242    let ncols = first.len();
243    for row in rest {
244        if row.len() != ncols {
245            return false;
246        }
247        for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
248            if k1 != k2 {
249                return false;
250            }
251        }
252    }
253    true
254}
255
256/// Best-effort `JsonValue` → `SchemaValue` coercion for the columnar
257/// fast path. The contract-free branch in `create_rows_batch_columnar`
258/// stores values without column-type normalisation, so the choice here
259/// only affects what the row reads back as. The mapping mirrors what
260/// `value_to_sql_literal` would have produced through the SQL parser:
261/// integers stay integers, fractional numbers stay floats, arrays /
262/// objects are JSON-encoded into a `Text` value (the parser would have
263/// quoted them too, so the on-disk shape matches).
264fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
265    match v {
266        JsonValue::Null => SchemaValue::Null,
267        JsonValue::Bool(b) => SchemaValue::Boolean(*b),
268        JsonValue::Number(n) => {
269            if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
270                SchemaValue::Integer(*n as i64)
271            } else {
272                SchemaValue::Float(*n)
273            }
274        }
275        JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
276        JsonValue::Array(_) | JsonValue::Object(_) => {
277            SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
278        }
279    }
280}
281
282fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
283    let columns: Vec<String> = qr
284        .result
285        .records
286        .first()
287        .map(|r| {
288            let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
289            keys.sort();
290            keys
291        })
292        .unwrap_or_default();
293
294    let rows: Vec<Vec<(String, ValueOut)>> =
295        qr.result.records.iter().map(record_to_pairs).collect();
296
297    QueryResult {
298        statement: qr.statement_type.to_string(),
299        affected: qr.affected_rows,
300        columns,
301        rows,
302    }
303}
304
305fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
306    let mut entries: Vec<(&str, &SchemaValue)> =
307        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
308    entries.sort_by(|a, b| a.0.cmp(b.0));
309    entries
310        .into_iter()
311        .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
312        .collect()
313}
314
315fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
316    match v {
317        SchemaValue::Null => ValueOut::Null,
318        SchemaValue::Boolean(b) => ValueOut::Bool(*b),
319        SchemaValue::Integer(n) => ValueOut::Integer(*n),
320        SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
321        SchemaValue::Float(n) => ValueOut::Float(*n),
322        SchemaValue::BigInt(n) => ValueOut::Integer(*n),
323        SchemaValue::TimestampMs(n)
324        | SchemaValue::Timestamp(n)
325        | SchemaValue::Duration(n)
326        | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
327        SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
328        SchemaValue::Text(s) => ValueOut::String(s.to_string()),
329        SchemaValue::Email(s)
330        | SchemaValue::Url(s)
331        | SchemaValue::NodeRef(s)
332        | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
333        other => ValueOut::String(format!("{other}")),
334    }
335}