1use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15
16use crate::error::{ClientError, ErrorCode, Result};
17use crate::params::IntoParams;
18use crate::types::{BulkInsertResult, InsertResult, JsonValue, QueryResult, ValueOut};
19
/// Client that talks to a `RedDBRuntime` living in the current process.
///
/// Every method on this type delegates to the wrapped runtime; the handle is
/// reference-counted via `Arc`.
pub struct EmbeddedClient {
    /// Shared handle to the in-process runtime that executes all operations.
    runtime: Arc<RedDBRuntime>,
}
24
25impl std::fmt::Debug for EmbeddedClient {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28 }
29}
30
31impl EmbeddedClient {
32 pub fn open(path: PathBuf) -> Result<Self> {
34 let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36 Ok(Self {
37 runtime: Arc::new(runtime),
38 })
39 }
40
41 pub fn in_memory() -> Result<Self> {
44 let runtime = RedDBRuntime::in_memory()
45 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46 Ok(Self {
47 runtime: Arc::new(runtime),
48 })
49 }
50
51 pub fn query(&self, sql: &str) -> Result<QueryResult> {
52 let qr = self
53 .runtime
54 .execute_query(sql)
55 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56 Ok(map_query_result(&qr))
57 }
58
59 pub fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
63 let params = params.into_params();
64 if params.is_empty() {
65 return self.query(sql);
66 }
67 use reddb_server::storage::query::modes::parse_multi;
68 use reddb_server::storage::query::user_params;
69 let binds: Vec<SchemaValue> = params
70 .iter()
71 .cloned()
72 .map(crate::params::Value::into_schema_value)
73 .collect();
74 let parsed =
75 parse_multi(sql).map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
76 let bound = user_params::bind(&parsed, &binds)
77 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
78 let qr = self
79 .runtime
80 .execute_query_expr(bound)
81 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
82 Ok(map_query_result(&qr))
83 }
84
85 pub fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
89 self.query_with(sql, params)
90 }
91
92 pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
99 let object = payload.as_object().ok_or_else(|| {
100 ClientError::new(
101 ErrorCode::QueryError,
102 "insert payload must be a JSON object".to_string(),
103 )
104 })?;
105 let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
106 let row: Vec<SchemaValue> = object
107 .iter()
108 .map(|(_, v)| json_value_to_schema_value(v))
109 .collect();
110 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
111 self.runtime.as_ref(),
112 collection.to_string(),
113 Arc::new(column_names),
114 vec![row],
115 )
116 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
117 let rid = outputs.first().map(|output| output.id.raw().to_string());
118 Ok(InsertResult {
119 affected: outputs.len() as u64,
120 rid: rid.clone(),
121 id: rid,
122 })
123 }
124
125 pub fn bulk_insert(
136 &self,
137 collection: &str,
138 payloads: &[JsonValue],
139 ) -> Result<BulkInsertResult> {
140 if payloads.is_empty() {
141 return Ok(BulkInsertResult {
142 affected: 0,
143 rids: Vec::new(),
144 ids: Vec::new(),
145 });
146 }
147
148 let objects: Vec<&[(String, JsonValue)]> = payloads
151 .iter()
152 .map(|p| {
153 p.as_object().ok_or_else(|| {
154 ClientError::new(
155 ErrorCode::QueryError,
156 "bulk_insert payloads must be JSON objects".to_string(),
157 )
158 })
159 })
160 .collect::<Result<_>>()?;
161
162 if uniform_schema(&objects) {
167 let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
168 let ncols = column_names.len();
169 let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
170 for obj in &objects {
171 let mut values = Vec::with_capacity(ncols);
172 for (_, v) in obj.iter() {
173 values.push(json_value_to_schema_value(v));
174 }
175 rows.push(values);
176 }
177 let outputs = reddb_server::RuntimeEntityPort::create_rows_batch_columnar_with_outputs(
178 self.runtime.as_ref(),
179 collection.to_string(),
180 Arc::new(column_names),
181 rows,
182 )
183 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
184 let rids: Vec<String> = outputs
185 .iter()
186 .map(|output| output.id.raw().to_string())
187 .collect();
188 return Ok(BulkInsertResult {
189 affected: outputs.len() as u64,
190 rids: rids.clone(),
191 ids: rids,
192 });
193 }
194
195 let mut affected = 0u64;
198 let mut ids = Vec::with_capacity(objects.len());
199 for object in &objects {
200 let payload = JsonValue::Object(object.to_vec());
201 let result = self.insert(collection, &payload)?;
202 affected += result.affected;
203 if let Some(id) = result.id {
204 ids.push(id);
205 }
206 }
207 Ok(BulkInsertResult {
208 affected,
209 rids: ids.clone(),
210 ids,
211 })
212 }
213
214 pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
215 let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
216 let qr = self
217 .runtime
218 .execute_query(&sql)
219 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
220 Ok(qr.affected_rows)
221 }
222
223 pub fn close(&self) -> Result<()> {
224 self.runtime
225 .checkpoint()
226 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
227 }
228
229 pub fn version() -> &'static str {
230 env!("CARGO_PKG_VERSION")
231 }
232}
233
234fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
239 let Some((first, rest)) = objects.split_first() else {
240 return true;
241 };
242 let ncols = first.len();
243 for row in rest {
244 if row.len() != ncols {
245 return false;
246 }
247 for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
248 if k1 != k2 {
249 return false;
250 }
251 }
252 }
253 true
254}
255
256fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
265 match v {
266 JsonValue::Null => SchemaValue::Null,
267 JsonValue::Bool(b) => SchemaValue::Boolean(*b),
268 JsonValue::Number(n) => {
269 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
270 SchemaValue::Integer(*n as i64)
271 } else {
272 SchemaValue::Float(*n)
273 }
274 }
275 JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
276 JsonValue::Array(_) | JsonValue::Object(_) => {
277 SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
278 }
279 }
280}
281
282fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
283 let columns: Vec<String> = qr
284 .result
285 .records
286 .first()
287 .map(|r| {
288 let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
289 keys.sort();
290 keys
291 })
292 .unwrap_or_default();
293
294 let rows: Vec<Vec<(String, ValueOut)>> =
295 qr.result.records.iter().map(record_to_pairs).collect();
296
297 QueryResult {
298 statement: qr.statement_type.to_string(),
299 affected: qr.affected_rows,
300 columns,
301 rows,
302 }
303}
304
305fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
306 let mut entries: Vec<(&str, &SchemaValue)> =
307 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
308 entries.sort_by(|a, b| a.0.cmp(b.0));
309 entries
310 .into_iter()
311 .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
312 .collect()
313}
314
315fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
316 match v {
317 SchemaValue::Null => ValueOut::Null,
318 SchemaValue::Boolean(b) => ValueOut::Bool(*b),
319 SchemaValue::Integer(n) => ValueOut::Integer(*n),
320 SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
321 SchemaValue::Float(n) => ValueOut::Float(*n),
322 SchemaValue::BigInt(n) => ValueOut::Integer(*n),
323 SchemaValue::TimestampMs(n)
324 | SchemaValue::Timestamp(n)
325 | SchemaValue::Duration(n)
326 | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
327 SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
328 SchemaValue::Text(s) => ValueOut::String(s.to_string()),
329 SchemaValue::Email(s)
330 | SchemaValue::Url(s)
331 | SchemaValue::NodeRef(s)
332 | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
333 other => ValueOut::String(format!("{other}")),
334 }
335}