1use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15
16use crate::error::{ClientError, ErrorCode, Result};
17use crate::params::IntoParams;
18use crate::types::{BulkInsertResult, InsertResult, JsonValue, QueryResult, ValueOut};
19
20pub struct EmbeddedClient {
22 runtime: Arc<RedDBRuntime>,
23}
24
25impl std::fmt::Debug for EmbeddedClient {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28 }
29}
30
31impl EmbeddedClient {
32 pub fn open(path: PathBuf) -> Result<Self> {
34 let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36 Ok(Self {
37 runtime: Arc::new(runtime),
38 })
39 }
40
41 pub fn in_memory() -> Result<Self> {
44 let runtime = RedDBRuntime::in_memory()
45 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46 Ok(Self {
47 runtime: Arc::new(runtime),
48 })
49 }
50
51 pub fn query(&self, sql: &str) -> Result<QueryResult> {
52 let qr = self
53 .runtime
54 .execute_query(sql)
55 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56 Ok(map_query_result(&qr))
57 }
58
59 pub fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
63 let params = params.into_params();
64 if params.is_empty() {
65 return self.query(sql);
66 }
67 use reddb_server::storage::query::modes::parse_multi;
68 use reddb_server::storage::query::user_params;
69 let binds: Vec<SchemaValue> = params
70 .iter()
71 .cloned()
72 .map(crate::params::Value::into_schema_value)
73 .collect();
74 let parsed =
75 parse_multi(sql).map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
76 let bound = user_params::bind(&parsed, &binds)
77 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
78 let qr = self
79 .runtime
80 .execute_query_expr(bound)
81 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
82 Ok(map_query_result(&qr))
83 }
84
85 pub fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
89 self.query_with(sql, params)
90 }
91
92 pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
99 let object = payload.as_object().ok_or_else(|| {
100 ClientError::new(
101 ErrorCode::QueryError,
102 "insert payload must be a JSON object".to_string(),
103 )
104 })?;
105 let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
106 let row: Vec<SchemaValue> = object
107 .iter()
108 .map(|(_, v)| json_value_to_schema_value(v))
109 .collect();
110 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
111 self.runtime.as_ref(),
112 collection.to_string(),
113 Arc::new(column_names),
114 vec![row],
115 )
116 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
117 let rid = outputs.first().map(|output| output.id.raw().to_string());
118 Ok(InsertResult {
119 affected: outputs.len() as u64,
120 rid: rid.clone(),
121 id: rid,
122 })
123 }
124
125 pub fn bulk_insert(
136 &self,
137 collection: &str,
138 payloads: &[JsonValue],
139 ) -> Result<BulkInsertResult> {
140 if payloads.is_empty() {
141 return Ok(BulkInsertResult {
142 affected: 0,
143 rids: Vec::new(),
144 ids: Vec::new(),
145 });
146 }
147
148 let objects: Vec<&[(String, JsonValue)]> = payloads
151 .iter()
152 .map(|p| {
153 p.as_object().ok_or_else(|| {
154 ClientError::new(
155 ErrorCode::QueryError,
156 "bulk_insert payloads must be JSON objects".to_string(),
157 )
158 })
159 })
160 .collect::<Result<_>>()?;
161
162 if uniform_schema(&objects) {
167 let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
168 let ncols = column_names.len();
169 let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
170 for obj in &objects {
171 let mut values = Vec::with_capacity(ncols);
172 for (_, v) in obj.iter() {
173 values.push(json_value_to_schema_value(v));
174 }
175 rows.push(values);
176 }
177 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
178 self.runtime.as_ref(),
179 collection.to_string(),
180 Arc::new(column_names),
181 rows,
182 )
183 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
184 let rids: Vec<String> = outputs
185 .iter()
186 .map(|output| output.id.raw().to_string())
187 .collect();
188 return Ok(BulkInsertResult {
189 affected: outputs.len() as u64,
190 rids: rids.clone(),
191 ids: rids,
192 });
193 }
194
195 let mut affected = 0u64;
198 let mut ids = Vec::with_capacity(objects.len());
199 for object in &objects {
200 let payload = JsonValue::Object(object.to_vec());
201 let result = self.insert(collection, &payload)?;
202 affected += result.affected;
203 if let Some(id) = result.id {
204 ids.push(id);
205 }
206 }
207 Ok(BulkInsertResult {
208 affected,
209 rids: ids.clone(),
210 ids,
211 })
212 }
213
214 pub fn delete(&self, collection: &str, rid: &str) -> Result<u64> {
215 let rid = rid.replace('\'', "''");
216 let sql = format!("DELETE FROM {collection} WHERE rid = '{rid}'");
217 let qr = self
218 .runtime
219 .execute_query(&sql)
220 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
221 Ok(qr.affected_rows)
222 }
223
224 pub fn close(&self) -> Result<()> {
225 self.runtime
226 .checkpoint()
227 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
228 }
229
230 pub fn version() -> &'static str {
231 env!("CARGO_PKG_VERSION")
232 }
233}
234
235fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
240 let Some((first, rest)) = objects.split_first() else {
241 return true;
242 };
243 let ncols = first.len();
244 for row in rest {
245 if row.len() != ncols {
246 return false;
247 }
248 for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
249 if k1 != k2 {
250 return false;
251 }
252 }
253 }
254 true
255}
256
257fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
266 match v {
267 JsonValue::Null => SchemaValue::Null,
268 JsonValue::Bool(b) => SchemaValue::Boolean(*b),
269 JsonValue::Number(n) => {
270 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
271 SchemaValue::Integer(*n as i64)
272 } else {
273 SchemaValue::Float(*n)
274 }
275 }
276 JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
277 JsonValue::Array(_) | JsonValue::Object(_) => {
278 SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
279 }
280 }
281}
282
283fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
284 let columns: Vec<String> = qr
285 .result
286 .records
287 .first()
288 .map(|r| {
289 let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
290 keys.sort();
291 keys
292 })
293 .unwrap_or_default();
294
295 let rows: Vec<Vec<(String, ValueOut)>> =
296 qr.result.records.iter().map(record_to_pairs).collect();
297
298 QueryResult {
299 statement: qr.statement_type.to_string(),
300 affected: qr.affected_rows,
301 columns,
302 rows,
303 }
304}
305
306fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
307 let mut entries: Vec<(&str, &SchemaValue)> =
308 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
309 entries.sort_by(|a, b| a.0.cmp(b.0));
310 entries
311 .into_iter()
312 .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
313 .collect()
314}
315
316fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
317 match v {
318 SchemaValue::Null => ValueOut::Null,
319 SchemaValue::Boolean(b) => ValueOut::Bool(*b),
320 SchemaValue::Integer(n) => ValueOut::Integer(*n),
321 SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
322 SchemaValue::Float(n) => ValueOut::Float(*n),
323 SchemaValue::BigInt(n) => ValueOut::Integer(*n),
324 SchemaValue::TimestampMs(n)
325 | SchemaValue::Timestamp(n)
326 | SchemaValue::Duration(n)
327 | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
328 SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
329 SchemaValue::Text(s) => ValueOut::String(s.to_string()),
330 SchemaValue::Email(s)
331 | SchemaValue::Url(s)
332 | SchemaValue::NodeRef(s)
333 | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
334 other => ValueOut::String(format!("{other}")),
335 }
336}