1use std::path::PathBuf;
9use std::sync::Arc;
10
11use reddb_server::api::RedDBOptions;
12use reddb_server::runtime::RedDBRuntime;
13use reddb_server::storage::query::unified::UnifiedRecord;
14use reddb_server::storage::schema::Value as SchemaValue;
15use reddb_server::RuntimeEntityPort;
16
17use crate::error::{ClientError, ErrorCode, Result};
18use crate::types::{InsertResult, JsonValue, QueryResult, ValueOut};
19
20pub struct EmbeddedClient {
22 runtime: Arc<RedDBRuntime>,
23}
24
25impl std::fmt::Debug for EmbeddedClient {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("EmbeddedClient").finish_non_exhaustive()
28 }
29}
30
31impl EmbeddedClient {
32 pub fn open(path: PathBuf) -> Result<Self> {
34 let runtime = RedDBRuntime::with_options(RedDBOptions::persistent(path))
35 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
36 Ok(Self {
37 runtime: Arc::new(runtime),
38 })
39 }
40
41 pub fn in_memory() -> Result<Self> {
44 let runtime = RedDBRuntime::in_memory()
45 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))?;
46 Ok(Self {
47 runtime: Arc::new(runtime),
48 })
49 }
50
51 pub fn query(&self, sql: &str) -> Result<QueryResult> {
52 let qr = self
53 .runtime
54 .execute_query(sql)
55 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
56 Ok(map_query_result(&qr))
57 }
58
59 pub fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
66 let object = payload.as_object().ok_or_else(|| {
67 ClientError::new(
68 ErrorCode::QueryError,
69 "insert payload must be a JSON object".to_string(),
70 )
71 })?;
72 let column_names: Vec<String> = object.iter().map(|(k, _)| k.clone()).collect();
73 let row: Vec<SchemaValue> = object
74 .iter()
75 .map(|(_, v)| json_value_to_schema_value(v))
76 .collect();
77 let count = self
78 .runtime
79 .create_rows_batch_columnar(collection.to_string(), Arc::new(column_names), vec![row])
80 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
81 Ok(InsertResult {
82 affected: count as u64,
83 id: None,
84 })
85 }
86
87 pub fn bulk_insert(&self, collection: &str, payloads: &[JsonValue]) -> Result<u64> {
98 if payloads.is_empty() {
99 return Ok(0);
100 }
101
102 let objects: Vec<&[(String, JsonValue)]> = payloads
105 .iter()
106 .map(|p| {
107 p.as_object().ok_or_else(|| {
108 ClientError::new(
109 ErrorCode::QueryError,
110 "bulk_insert payloads must be JSON objects".to_string(),
111 )
112 })
113 })
114 .collect::<Result<_>>()?;
115
116 if uniform_schema(&objects) {
121 let column_names: Vec<String> = objects[0].iter().map(|(k, _)| k.clone()).collect();
122 let ncols = column_names.len();
123 let mut rows: Vec<Vec<SchemaValue>> = Vec::with_capacity(objects.len());
124 for obj in &objects {
125 let mut values = Vec::with_capacity(ncols);
126 for (_, v) in obj.iter() {
127 values.push(json_value_to_schema_value(v));
128 }
129 rows.push(values);
130 }
131 let count = self
132 .runtime
133 .create_rows_batch_columnar(collection.to_string(), Arc::new(column_names), rows)
134 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
135 return Ok(count as u64);
136 }
137
138 let mut total = 0u64;
141 for object in &objects {
142 let sql = build_insert_sql(collection, object);
143 let qr = self
144 .runtime
145 .execute_query(&sql)
146 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
147 total += qr.affected_rows;
148 }
149 Ok(total)
150 }
151
152 pub fn delete(&self, collection: &str, id: &str) -> Result<u64> {
153 let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
154 let qr = self
155 .runtime
156 .execute_query(&sql)
157 .map_err(|e| ClientError::new(ErrorCode::QueryError, e.to_string()))?;
158 Ok(qr.affected_rows)
159 }
160
161 pub fn close(&self) -> Result<()> {
162 self.runtime
163 .checkpoint()
164 .map_err(|e| ClientError::new(ErrorCode::IoError, e.to_string()))
165 }
166
167 pub fn version() -> &'static str {
168 env!("CARGO_PKG_VERSION")
169 }
170}
171
172fn uniform_schema(objects: &[&[(String, JsonValue)]]) -> bool {
177 let Some((first, rest)) = objects.split_first() else {
178 return true;
179 };
180 let ncols = first.len();
181 for row in rest {
182 if row.len() != ncols {
183 return false;
184 }
185 for ((k1, _), (k2, _)) in first.iter().zip(row.iter()) {
186 if k1 != k2 {
187 return false;
188 }
189 }
190 }
191 true
192}
193
194fn json_value_to_schema_value(v: &JsonValue) -> SchemaValue {
203 match v {
204 JsonValue::Null => SchemaValue::Null,
205 JsonValue::Bool(b) => SchemaValue::Boolean(*b),
206 JsonValue::Number(n) => {
207 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
208 SchemaValue::Integer(*n as i64)
209 } else {
210 SchemaValue::Float(*n)
211 }
212 }
213 JsonValue::String(s) => SchemaValue::Text(std::sync::Arc::from(s.as_str())),
214 JsonValue::Array(_) | JsonValue::Object(_) => {
215 SchemaValue::Text(std::sync::Arc::from(v.to_json_string()))
216 }
217 }
218}
219
220fn build_insert_sql(collection: &str, object: &[(String, JsonValue)]) -> String {
221 let mut cols = Vec::new();
222 let mut vals = Vec::new();
223 for (k, v) in object {
224 cols.push(k.clone());
225 vals.push(value_to_sql_literal(v));
226 }
227 format!(
228 "INSERT INTO {collection} ({}) VALUES ({})",
229 cols.join(", "),
230 vals.join(", "),
231 )
232}
233
234fn value_to_sql_literal(v: &JsonValue) -> String {
235 match v {
236 JsonValue::Null => "NULL".to_string(),
237 JsonValue::Bool(b) => b.to_string(),
238 JsonValue::Number(n) => {
239 if n.fract() == 0.0 {
240 format!("{}", *n as i64)
241 } else {
242 n.to_string()
243 }
244 }
245 JsonValue::String(s) => format!("'{}'", s.replace('\'', "''")),
246 JsonValue::Array(_) | JsonValue::Object(_) => {
247 format!("'{}'", v.to_json_string().replace('\'', "''"))
248 }
249 }
250}
251
252fn map_query_result(qr: &reddb_server::runtime::RuntimeQueryResult) -> QueryResult {
253 let columns: Vec<String> = qr
254 .result
255 .records
256 .first()
257 .map(|r| {
258 let mut keys: Vec<String> = r.column_names().iter().map(|k| k.to_string()).collect();
259 keys.sort();
260 keys
261 })
262 .unwrap_or_default();
263
264 let rows: Vec<Vec<(String, ValueOut)>> =
265 qr.result.records.iter().map(record_to_pairs).collect();
266
267 QueryResult {
268 statement: qr.statement_type.to_string(),
269 affected: qr.affected_rows,
270 columns,
271 rows,
272 }
273}
274
275fn record_to_pairs(record: &UnifiedRecord) -> Vec<(String, ValueOut)> {
276 let mut entries: Vec<(&str, &SchemaValue)> =
277 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
278 entries.sort_by(|a, b| a.0.cmp(b.0));
279 entries
280 .into_iter()
281 .map(|(k, v)| (k.to_string(), schema_value_to_value_out(v)))
282 .collect()
283}
284
285fn schema_value_to_value_out(v: &SchemaValue) -> ValueOut {
286 match v {
287 SchemaValue::Null => ValueOut::Null,
288 SchemaValue::Boolean(b) => ValueOut::Bool(*b),
289 SchemaValue::Integer(n) => ValueOut::Integer(*n),
290 SchemaValue::UnsignedInteger(n) => ValueOut::Integer(*n as i64),
291 SchemaValue::Float(n) => ValueOut::Float(*n),
292 SchemaValue::BigInt(n) => ValueOut::Integer(*n),
293 SchemaValue::TimestampMs(n)
294 | SchemaValue::Timestamp(n)
295 | SchemaValue::Duration(n)
296 | SchemaValue::Decimal(n) => ValueOut::Integer(*n),
297 SchemaValue::Password(_) | SchemaValue::Secret(_) => ValueOut::String("***".to_string()),
298 SchemaValue::Text(s) => ValueOut::String(s.to_string()),
299 SchemaValue::Email(s)
300 | SchemaValue::Url(s)
301 | SchemaValue::NodeRef(s)
302 | SchemaValue::EdgeRef(s) => ValueOut::String(s.clone()),
303 other => ValueOut::String(format!("{other}")),
304 }
305}