Skip to main content

reddb_client/
embedded.rs

1//! Embedded backend — wraps the in-process RedDB engine.
2//!
3//! Compiled only when the `embedded` Cargo feature is enabled (default).
4//! When `embedded` is off, this module does not exist and the
5//! [`crate::Reddb`] enum will refuse to construct an embedded variant
6//! at runtime with a clear `FEATURE_DISABLED` error.
7
8use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15use reddb_server::RuntimeEntityPort;
16
17use crate::error::{ClientError, ErrorCode, Result};
18use crate::types::{InsertResult, JsonValue, QueryResult, ValueOut};
19
/// In-process handle to a RedDB engine.
///
/// The handle's only state is a shared reference to the engine runtime;
/// every method on the client forwards to it.
pub struct EmbeddedClient {
    /// Engine runtime that all client operations are delegated to.
    runtime: Arc<RedDBRuntime>,
}
24
25impl std::fmt::Debug for EmbeddedClient {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28    }
29}
30
31impl EmbeddedClient {
32    /// Open a persistent database at `path`.
33    pub fn open(path: PathBuf) -> Result<Self> {
34        let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36        Ok(Self {
37            runtime: Arc::new(runtime),
38        })
39    }
40
41    /// Open an ephemeral, tempfile-backed database. Equivalent to
42    /// `connect("memory://")`.
43    pub fn in_memory() -> Result<Self> {
44        let runtime = RedDBRuntime::in_memory()
45            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46        Ok(Self {
47            runtime: Arc::new(runtime),
48        })
49    }
50
51    pub fn query(&self, sql: &str) -> Result<QueryResult> {
52        let qr = self
53            .runtime
54            .execute_query(sql)
55            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56        Ok(map_query_result(&qr))
57    }
58
59    /// Single-row insert. Routes through the same
60    /// `runtime.create_rows_batch_columnar` port that
61    /// [`Self::bulk_insert`] uses (#110), passing a one-row batch.
62    /// Skips `build_insert_sql` + `execute_query`, so this hot
63    /// autocommit path pays zero SQL build / lex / parse / plan cost
64    /// when the collection carries no contract.
65    pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
66        let object = payload.as_object().ok_or_else(|| {
67            ClientError::new(
68                ErrorCode::QueryError,
69                "insert payload must be a JSON object".to_string(),
70            )
71        })?;
72        let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
73        let row: Vec<SchemaValue> = object
74            .iter()
75            .map(|(_, v)| json_value_to_schema_value(v))
76            .collect();
77        let count = self
78            .runtime
79            .create_rows_batch_columnar(collection.to_string(), Arc::new(column_names), vec![row])
80            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
81        Ok(InsertResult {
82            affected: count as u64,
83            id: None,
84        })
85    }
86
87    /// Routes through `runtime.create_rows_batch_columnar`, which fast-paths
88    /// to the prevalidated columnar kernel when the collection carries no
89    /// contract — same shape `MSG_BULK_INSERT_BINARY` already uses on the
90    /// wire path. Result: one WAL append per batch instead of one per row,
91    /// no per-row SQL build / lex / parse / plan, and no per-row `(String,
92    /// Value)` tuple materialisation when the collection is contract-free.
93    ///
94    /// Heterogeneous payloads (rows with differing key sets) fall back to
95    /// the per-row `execute_query` path so existing semantics are preserved
96    /// for callers that mix shapes.
97    pub fn bulk_insert(&self, collection: &str, payloads: &[JsonValue]) -> Result<u64> {
98        if payloads.is_empty() {
99            return Ok(0);
100        }
101
102        // Validate every payload is a JSON object up-front. Mirrors the
103        // old loop's error contract.
104        let objects: Vec<&[(String, JsonValue)]> = payloads
105            .iter()
106            .map(|p| {
107                p.as_object().ok_or_else(|| {
108                    ClientError::new(
109                        ErrorCode::QueryError,
110                        "bulk_insert payloads must be JSON objects".to_string(),
111                    )
112                })
113            })
114            .collect::<Result<_>>()?;
115
116        // Columnar fast path requires a uniform schema (same column names in
117        // the same order across every row). When that holds we pay zero
118        // per-row SQL build / lex / parse cost. When it doesn't we fall back
119        // to the per-row loop so heterogeneous workloads stay correct.
120        if uniform_schema(&objects) {
121            let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
122            let ncols = column_names.len();
123            let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
124            for obj in &objects {
125                let mut values = Vec::with_capacity(ncols);
126                for (_, v) in obj.iter() {
127                    values.push(json_value_to_schema_value(v));
128                }
129                rows.push(values);
130            }
131            let count = self
132                .runtime
133                .create_rows_batch_columnar(collection.to_string(), Arc::new(column_names), rows)
134                .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
135            return Ok(count as u64);
136        }
137
138        // Fallback: heterogeneous shapes. Per-row `execute_query` retains
139        // the old behaviour for mixed-key payloads.
140        let mut total = 0u64;
141        for object in &objects {
142            let sql = build_insert_sql(collection, object);
143            let qr = self
144                .runtime
145                .execute_query(&sql)
146                .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
147            total += qr.affected_rows;
148        }
149        Ok(total)
150    }
151
152    pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
153        let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
154        let qr = self
155            .runtime
156            .execute_query(&sql)
157            .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
158        Ok(qr.affected_rows)
159    }
160
161    pub fn close(&self) -> Result<()> {
162        self.runtime
163            .checkpoint()
164            .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
165    }
166
167    pub fn version() -> &'static str {
168        env!("CARGO_PKG_VERSION")
169    }
170}
171
172/// True when every row carries the same column names in the same order.
173/// The columnar `create_rows_batch_columnar` port requires one shared
174/// `Arc<Vec<String>>` schema, so heterogeneous payloads have to fall
175/// back to the per-row path.
176fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
177    let Some((first, rest)) = objects.split_first() else {
178        return true;
179    };
180    let ncols = first.len();
181    for row in rest {
182        if row.len() != ncols {
183            return false;
184        }
185        for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
186            if k1 != k2 {
187                return false;
188            }
189        }
190    }
191    true
192}
193
194/// Best-effort `JsonValue` → `SchemaValue` coercion for the columnar
195/// fast path. The contract-free branch in `create_rows_batch_columnar`
196/// stores values without column-type normalisation, so the choice here
197/// only affects what the row reads back as. The mapping mirrors what
198/// `value_to_sql_literal` would have produced through the SQL parser:
199/// integers stay integers, fractional numbers stay floats, arrays /
200/// objects are JSON-encoded into a `Text` value (the parser would have
201/// quoted them too, so the on-disk shape matches).
202fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
203    match v {
204        JsonValue::Null => SchemaValue::Null,
205        JsonValue::Bool(b) => SchemaValue::Boolean(*b),
206        JsonValue::Number(n) => {
207            if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
208                SchemaValue::Integer(*n as i64)
209            } else {
210                SchemaValue::Float(*n)
211            }
212        }
213        JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
214        JsonValue::Array(_) | JsonValue::Object(_) => {
215            SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
216        }
217    }
218}
219
220fn build_insert_sql(collection: &str, object: &[(String, JsonValue)]) -> String {
221    let mut cols = Vec::new();
222    let mut vals = Vec::new();
223    for (k, v) in object {
224        cols.push(k.clone());
225        vals.push(value_to_sql_literal(v));
226    }
227    format!(
228        "INSERT INTO {collection} ({}) VALUES ({})",
229        cols.join(", "),
230        vals.join(", "),
231    )
232}
233
234fn value_to_sql_literal(v: &JsonValue) -> String {
235    match v {
236        JsonValue::Null => "NULL".to_string(),
237        JsonValue::Bool(b) => b.to_string(),
238        JsonValue::Number(n) => {
239            if n.fract() == 0.0 {
240                format!("{}", *n as i64)
241            } else {
242                n.to_string()
243            }
244        }
245        JsonValue::String(s) => format!("'{}'", s.replace('\'', "''")),
246        JsonValue::Array(_) | JsonValue::Object(_) => {
247            format!("'{}'", v.to_json_string().replace('\'', "''"))
248        }
249    }
250}
251
252fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
253    let columns: Vec<String> = qr
254        .result
255        .records
256        .first()
257        .map(|r| {
258            let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
259            keys.sort();
260            keys
261        })
262        .unwrap_or_default();
263
264    let rows: Vec<Vec<(String, ValueOut)>> =
265        qr.result.records.iter().map(record_to_pairs).collect();
266
267    QueryResult {
268        statement: qr.statement_type.to_string(),
269        affected: qr.affected_rows,
270        columns,
271        rows,
272    }
273}
274
275fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
276    let mut entries: Vec<(&str, &SchemaValue)> =
277        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
278    entries.sort_by(|a, b| a.0.cmp(b.0));
279    entries
280        .into_iter()
281        .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
282        .collect()
283}
284
285fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
286    match v {
287        SchemaValue::Null => ValueOut::Null,
288        SchemaValue::Boolean(b) => ValueOut::Bool(*b),
289        SchemaValue::Integer(n) => ValueOut::Integer(*n),
290        SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
291        SchemaValue::Float(n) => ValueOut::Float(*n),
292        SchemaValue::BigInt(n) => ValueOut::Integer(*n),
293        SchemaValue::TimestampMs(n)
294        | SchemaValue::Timestamp(n)
295        | SchemaValue::Duration(n)
296        | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
297        SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
298        SchemaValue::Text(s) => ValueOut::String(s.to_string()),
299        SchemaValue::Email(s)
300        | SchemaValue::Url(s)
301        | SchemaValue::NodeRef(s)
302        | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
303        other => ValueOut::String(format!("{other}")),
304    }
305}