1use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15
16use crate::error::{ClientError, ErrorCode, Result};
17use crate::params::IntoParams;
18use crate::types::{BulkInsertResult, InsertResult, JsonValue, QueryResult, ValueOut};
19
20pub struct EmbeddedClient {
22 runtime: Arc<RedDBRuntime>,
23}
24
25impl std::fmt::Debug for EmbeddedClient {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28 }
29}
30
31impl EmbeddedClient {
32 pub fn open(path: PathBuf) -> Result<Self> {
34 let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36 Ok(Self {
37 runtime: Arc::new(runtime),
38 })
39 }
40
41 pub fn in_memory() -> Result<Self> {
44 let runtime = RedDBRuntime::in_memory()
45 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46 Ok(Self {
47 runtime: Arc::new(runtime),
48 })
49 }
50
51 pub fn query(&self, sql: &str) -> Result<QueryResult> {
52 let qr = self
53 .runtime
54 .execute_query(sql)
55 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56 Ok(map_query_result(&qr))
57 }
58
59 pub fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
63 let params = params.into_params();
64 if params.is_empty() {
65 return self.query(sql);
66 }
67 use reddb_server::storage::query::modes::parse_multi;
68 use reddb_server::storage::query::user_params;
69 let binds: Vec<SchemaValue> = params
70 .iter()
71 .cloned()
72 .map(crate::params::Value::into_schema_value)
73 .collect();
74 let parsed =
75 parse_multi(sql).map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
76 let bound = user_params::bind(&parsed, &binds)
77 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
78 let qr = self
79 .runtime
80 .execute_query_expr(bound)
81 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
82 Ok(map_query_result(&qr))
83 }
84
85 pub fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
89 self.query_with(sql, params)
90 }
91
92 pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
99 let object = payload.as_object().ok_or_else(|| {
100 ClientError::new(
101 ErrorCode::QueryError,
102 "insert payload must be a JSON object".to_string(),
103 )
104 })?;
105 let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
106 let row: Vec<SchemaValue> = object
107 .iter()
108 .map(|(_, v)| json_value_to_schema_value(v))
109 .collect();
110 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
111 self.runtime.as_ref(),
112 collection.to_string(),
113 Arc::new(column_names),
114 vec![row],
115 )
116 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
117 Ok(InsertResult {
118 affected: outputs.len() as u64,
119 id: outputs.first().map(|output| output.id.raw().to_string()),
120 })
121 }
122
123 pub fn bulk_insert(
134 &self,
135 collection: &str,
136 payloads: &[JsonValue],
137 ) -> Result<BulkInsertResult> {
138 if payloads.is_empty() {
139 return Ok(BulkInsertResult {
140 affected: 0,
141 ids: Vec::new(),
142 });
143 }
144
145 let objects: Vec<&[(String, JsonValue)]> = payloads
148 .iter()
149 .map(|p| {
150 p.as_object().ok_or_else(|| {
151 ClientError::new(
152 ErrorCode::QueryError,
153 "bulk_insert payloads must be JSON objects".to_string(),
154 )
155 })
156 })
157 .collect::<Result<_>>()?;
158
159 if uniform_schema(&objects) {
164 let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
165 let ncols = column_names.len();
166 let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
167 for obj in &objects {
168 let mut values = Vec::with_capacity(ncols);
169 for (_, v) in obj.iter() {
170 values.push(json_value_to_schema_value(v));
171 }
172 rows.push(values);
173 }
174 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
175 self.runtime.as_ref(),
176 collection.to_string(),
177 Arc::new(column_names),
178 rows,
179 )
180 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
181 let ids = outputs
182 .iter()
183 .map(|output| output.id.raw().to_string())
184 .collect();
185 return Ok(BulkInsertResult {
186 affected: outputs.len() as u64,
187 ids,
188 });
189 }
190
191 let mut affected = 0u64;
194 let mut ids = Vec::with_capacity(objects.len());
195 for object in &objects {
196 let payload = JsonValue::Object(object.to_vec());
197 let result = self.insert(collection, &payload)?;
198 affected += result.affected;
199 if let Some(id) = result.id {
200 ids.push(id);
201 }
202 }
203 Ok(BulkInsertResult { affected, ids })
204 }
205
206 pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
207 let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
208 let qr = self
209 .runtime
210 .execute_query(&sql)
211 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
212 Ok(qr.affected_rows)
213 }
214
215 pub fn close(&self) -> Result<()> {
216 self.runtime
217 .checkpoint()
218 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
219 }
220
221 pub fn version() -> &'static str {
222 env!("CARGO_PKG_VERSION")
223 }
224}
225
226fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
231 let Some((first, rest)) = objects.split_first() else {
232 return true;
233 };
234 let ncols = first.len();
235 for row in rest {
236 if row.len() != ncols {
237 return false;
238 }
239 for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
240 if k1 != k2 {
241 return false;
242 }
243 }
244 }
245 true
246}
247
248fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
257 match v {
258 JsonValue::Null => SchemaValue::Null,
259 JsonValue::Bool(b) => SchemaValue::Boolean(*b),
260 JsonValue::Number(n) => {
261 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
262 SchemaValue::Integer(*n as i64)
263 } else {
264 SchemaValue::Float(*n)
265 }
266 }
267 JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
268 JsonValue::Array(_) | JsonValue::Object(_) => {
269 SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
270 }
271 }
272}
273
274fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
275 let columns: Vec<String> = qr
276 .result
277 .records
278 .first()
279 .map(|r| {
280 let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
281 keys.sort();
282 keys
283 })
284 .unwrap_or_default();
285
286 let rows: Vec<Vec<(String, ValueOut)>> =
287 qr.result.records.iter().map(record_to_pairs).collect();
288
289 QueryResult {
290 statement: qr.statement_type.to_string(),
291 affected: qr.affected_rows,
292 columns,
293 rows,
294 }
295}
296
297fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
298 let mut entries: Vec<(&str, &SchemaValue)> =
299 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
300 entries.sort_by(|a, b| a.0.cmp(b.0));
301 entries
302 .into_iter()
303 .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
304 .collect()
305}
306
307fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
308 match v {
309 SchemaValue::Null => ValueOut::Null,
310 SchemaValue::Boolean(b) => ValueOut::Bool(*b),
311 SchemaValue::Integer(n) => ValueOut::Integer(*n),
312 SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
313 SchemaValue::Float(n) => ValueOut::Float(*n),
314 SchemaValue::BigInt(n) => ValueOut::Integer(*n),
315 SchemaValue::TimestampMs(n)
316 | SchemaValue::Timestamp(n)
317 | SchemaValue::Duration(n)
318 | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
319 SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
320 SchemaValue::Text(s) => ValueOut::String(s.to_string()),
321 SchemaValue::Email(s)
322 | SchemaValue::Url(s)
323 | SchemaValue::NodeRef(s)
324 | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
325 other => ValueOut::String(format!("{other}")),
326 }
327}