1use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::{
17 error::{KeraDbError, Result},
18 ffi::{get_ffi, DbHandle, KeraDbFfi},
19 results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult},
20 vector::{
21 MetadataFilter, VectorCollectionInfo, VectorCollectionStats,
22 VectorConfig, VectorDocument, VectorSearchResult,
23 },
24};
25
26pub fn matches_filter(doc: &Value, filter: &Value) -> bool {
35 let filter_obj = match filter.as_object() {
36 Some(o) => o,
37 None => return true,
38 };
39
40 for (key, value) in filter_obj {
41 match key.as_str() {
42 "$and" => {
44 if let Some(conditions) = value.as_array() {
45 if !conditions.iter().all(|c| matches_filter(doc, c)) {
46 return false;
47 }
48 }
49 }
50 "$or" => {
51 if let Some(conditions) = value.as_array() {
52 if !conditions.iter().any(|c| matches_filter(doc, c)) {
53 return false;
54 }
55 }
56 }
57 field => {
59 let doc_val = &doc[field];
60 if let Some(ops) = value.as_object() {
61 for (op, op_val) in ops {
62 let matched = match op.as_str() {
63 "$eq" => doc_val == op_val,
64 "$ne" => doc_val != op_val,
65 "$gt" => cmp_values(doc_val, op_val) == Some(std::cmp::Ordering::Greater),
66 "$gte" => matches!(
67 cmp_values(doc_val, op_val),
68 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
69 ),
70 "$lt" => cmp_values(doc_val, op_val) == Some(std::cmp::Ordering::Less),
71 "$lte" => matches!(
72 cmp_values(doc_val, op_val),
73 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
74 ),
75 "$in" => {
76 if let Some(arr) = op_val.as_array() {
77 arr.contains(doc_val)
78 } else {
79 false
80 }
81 }
82 "$nin" => {
83 if let Some(arr) = op_val.as_array() {
84 !arr.contains(doc_val)
85 } else {
86 true
87 }
88 }
89 _ => true, };
91 if !matched {
92 return false;
93 }
94 }
95 } else {
96 if doc_val != value {
98 return false;
99 }
100 }
101 }
102 }
103 }
104 true
105}
106
107fn cmp_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
110 match (a.as_f64(), b.as_f64()) {
111 (Some(av), Some(bv)) => av.partial_cmp(&bv),
112 _ => None,
113 }
114}
115
116pub fn apply_update(doc: &Value, update: &Value) -> Value {
120 let doc_obj = match doc.as_object() {
121 Some(o) => o,
122 None => return doc.clone(),
123 };
124 let update_obj = match update.as_object() {
125 Some(o) => o,
126 None => return doc.clone(),
127 };
128
129 let has_operators = update_obj.keys().any(|k| k.starts_with('$'));
130
131 if !has_operators {
132 let mut result = update_obj.clone();
134 if let Some(id) = doc_obj.get("_id") {
135 result.insert("_id".to_owned(), id.clone());
136 }
137 return Value::Object(result);
138 }
139
140 let mut result = doc_obj.clone();
141
142 for (op, fields) in update_obj {
143 match op.as_str() {
144 "$set" => {
145 if let Some(obj) = fields.as_object() {
146 for (k, v) in obj {
147 result.insert(k.clone(), v.clone());
148 }
149 }
150 }
151 "$unset" => {
152 if let Some(obj) = fields.as_object() {
153 for k in obj.keys() {
154 result.remove(k);
155 }
156 }
157 }
158 "$inc" => {
159 if let Some(obj) = fields.as_object() {
160 for (k, v) in obj {
161 let current = result.get(k).and_then(|x| x.as_f64()).unwrap_or(0.0);
162 let delta = v.as_f64().unwrap_or(0.0);
163 let new_val = if current.fract() == 0.0 && delta.fract() == 0.0 {
165 json!((current + delta) as i64)
166 } else {
167 json!(current + delta)
168 };
169 result.insert(k.clone(), new_val);
170 }
171 }
172 }
173 "$push" => {
174 if let Some(obj) = fields.as_object() {
175 for (k, v) in obj {
176 let arr = result
177 .entry(k.clone())
178 .or_insert_with(|| Value::Array(vec![]));
179 if let Some(a) = arr.as_array_mut() {
180 a.push(v.clone());
181 }
182 }
183 }
184 }
185 _ => {}
186 }
187 }
188
189 Value::Object(result)
190}
191
192pub struct Cursor {
199 documents: Vec<Value>,
200 limit: Option<usize>,
201 skip: usize,
202}
203
204impl Cursor {
205 pub fn new(documents: Vec<Value>) -> Self {
207 Self {
208 documents,
209 limit: None,
210 skip: 0,
211 }
212 }
213
214 pub fn limit(mut self, count: usize) -> Self {
216 self.limit = Some(count);
217 self
218 }
219
220 pub fn skip(mut self, count: usize) -> Self {
222 self.skip = count;
223 self
224 }
225
226 pub fn all(self) -> Vec<Value> {
228 let docs = &self.documents[self.skip.min(self.documents.len())..];
229 match self.limit {
230 Some(n) => docs.iter().take(n).cloned().collect(),
231 None => docs.to_vec(),
232 }
233 }
234
235 pub fn first(self) -> Option<Value> {
237 self.limit(1).all().into_iter().next()
238 }
239}
240
241impl IntoIterator for Cursor {
242 type Item = Value;
243 type IntoIter = std::vec::IntoIter<Value>;
244
245 fn into_iter(self) -> Self::IntoIter {
246 self.all().into_iter()
247 }
248}
249
250pub struct Collection {
259 db: DbHandle,
260 name: String,
261 ffi: Arc<KeraDbFfi>,
262}
263
264unsafe impl Send for Collection {}
267
268impl Collection {
269 fn new(db: DbHandle, name: impl Into<String>, ffi: Arc<KeraDbFfi>) -> Self {
270 Self {
271 db,
272 name: name.into(),
273 ffi,
274 }
275 }
276
277 pub fn name(&self) -> &str {
279 &self.name
280 }
281
282 pub fn insert_one(&self, document: Value) -> Result<InsertOneResult> {
296 let json_data = document.to_string();
297 let c_collection = KeraDbFfi::to_cstring(&self.name)?;
298 let c_json = KeraDbFfi::to_cstring(&json_data)?;
299
300 let ptr =
301 unsafe { (self.ffi.fn_insert)(self.db, c_collection.as_ptr(), c_json.as_ptr()) };
302 if ptr.is_null() {
303 return Err(KeraDbError::Native(self.ffi.last_error()));
304 }
305 let id = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
306 Ok(InsertOneResult::new(id))
307 }
308
309 pub fn insert_many(&self, documents: Vec<Value>) -> Result<InsertManyResult> {
311 let mut ids = Vec::with_capacity(documents.len());
312 for doc in documents {
313 ids.push(self.insert_one(doc)?.inserted_id);
314 }
315 Ok(InsertManyResult::new(ids))
316 }
317
318 pub fn find_one(&self, filter: Option<&Value>) -> Result<Option<Value>> {
327 match filter {
328 None => {
329 let docs = self.find(None)?.limit(1).all();
330 Ok(docs.into_iter().next())
331 }
332 Some(f) => {
333 if let Some(id) = f.get("_id").and_then(|v| v.as_str()) {
334 let c_coll = KeraDbFfi::to_cstring(&self.name)?;
336 let c_id = KeraDbFfi::to_cstring(id)?;
337 let ptr = unsafe {
338 (self.ffi.fn_find_by_id)(self.db, c_coll.as_ptr(), c_id.as_ptr())
339 };
340 if ptr.is_null() {
341 return Ok(None);
342 }
343 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
344 let doc: Value = serde_json::from_str(&json_str)?;
345 return Ok(Some(doc));
346 }
347 for doc in self.find(None)? {
349 if matches_filter(&doc, f) {
350 return Ok(Some(doc));
351 }
352 }
353 Ok(None)
354 }
355 }
356 }
357
358 pub fn find(&self, filter: Option<&Value>) -> Result<Cursor> {
362 let c_coll = KeraDbFfi::to_cstring(&self.name)?;
363 let ptr = unsafe { (self.ffi.fn_find_all)(self.db, c_coll.as_ptr(), -1, -1) };
364
365 if ptr.is_null() {
366 return Ok(Cursor::new(vec![]));
367 }
368 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
369 let docs: Vec<Value> = serde_json::from_str(&json_str)?;
370
371 let filtered = match filter {
372 Some(f) => docs.into_iter().filter(|d| matches_filter(d, f)).collect(),
373 None => docs,
374 };
375 Ok(Cursor::new(filtered))
376 }
377
378 pub fn update_one(&self, filter: &Value, update: &Value) -> Result<UpdateResult> {
386 let doc = match self.find_one(Some(filter))? {
387 Some(d) => d,
388 None => return Ok(UpdateResult::new(0, 0)),
389 };
390
391 let mut updated = apply_update(&doc, update);
392
393 let id = match updated.as_object_mut().and_then(|o| o.remove("_id")) {
395 Some(Value::String(s)) => s,
396 Some(other) => other.to_string().trim_matches('"').to_owned(),
397 None => return Err(KeraDbError::Other("Document missing _id".into())),
398 };
399
400 let json_data = updated.to_string();
401 let c_coll = KeraDbFfi::to_cstring(&self.name)?;
402 let c_id = KeraDbFfi::to_cstring(&id)?;
403 let c_json = KeraDbFfi::to_cstring(&json_data)?;
404
405 let ptr = unsafe {
406 (self.ffi.fn_update)(self.db, c_coll.as_ptr(), c_id.as_ptr(), c_json.as_ptr())
407 };
408 if ptr.is_null() {
409 return Err(KeraDbError::Native(self.ffi.last_error()));
410 }
411 unsafe { self.ffi.free_string(ptr) };
412 Ok(UpdateResult::new(1, 1))
413 }
414
415 pub fn update_many(&self, filter: &Value, update: &Value) -> Result<UpdateResult> {
417 let docs = self.find(Some(filter))?.all();
418 let matched = docs.len();
419 let mut modified = 0;
420 for doc in docs {
421 let id_filter = json!({"_id": doc["_id"]});
422 self.update_one(&id_filter, update)?;
423 modified += 1;
424 }
425 Ok(UpdateResult::new(matched, modified))
426 }
427
428 pub fn delete_one(&self, filter: &Value) -> Result<DeleteResult> {
434 let doc = match self.find_one(Some(filter))? {
435 Some(d) => d,
436 None => return Ok(DeleteResult::new(0)),
437 };
438
439 let id = doc["_id"].as_str().unwrap_or("").to_owned();
440 let c_coll = KeraDbFfi::to_cstring(&self.name)?;
441 let c_id = KeraDbFfi::to_cstring(&id)?;
442
443 let result = unsafe { (self.ffi.fn_delete)(self.db, c_coll.as_ptr(), c_id.as_ptr()) };
444 Ok(DeleteResult::new(if result != 0 { 1 } else { 0 }))
445 }
446
447 pub fn delete_many(&self, filter: &Value) -> Result<DeleteResult> {
449 let docs = self.find(Some(filter))?.all();
450 let mut deleted = 0;
451 for doc in docs {
452 let id_filter = json!({"_id": doc["_id"]});
453 deleted += self.delete_one(&id_filter)?.deleted_count;
454 }
455 Ok(DeleteResult::new(deleted))
456 }
457
458 pub fn count_documents(&self, filter: Option<&Value>) -> Result<usize> {
464 if let Some(f) = filter {
465 return Ok(self.find(Some(f))?.all().len());
466 }
467 let c_coll = KeraDbFfi::to_cstring(&self.name)?;
468 let count = unsafe { (self.ffi.fn_count)(self.db, c_coll.as_ptr()) };
469 Ok(count.max(0) as usize)
470 }
471}
472
473pub struct Database {
481 db: DbHandle,
482 ffi: Arc<KeraDbFfi>,
483}
484
485unsafe impl Send for Database {}
486
487impl Database {
488 fn new(db: DbHandle, ffi: Arc<KeraDbFfi>) -> Self {
489 Self { db, ffi }
490 }
491
492 pub fn collection(&self, name: &str) -> Collection {
495 Collection::new(self.db, name, Arc::clone(&self.ffi))
496 }
497
498 pub fn list_collection_names(&self) -> Result<Vec<String>> {
500 let ptr = unsafe { (self.ffi.fn_list_collections)(self.db) };
501 if ptr.is_null() {
502 return Ok(vec![]);
503 }
504 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
505 let raw: Vec<Value> = serde_json::from_str(&json_str)?;
506 Ok(raw
507 .into_iter()
508 .filter_map(|v| v.get(0).and_then(|n| n.as_str()).map(String::from))
509 .collect())
510 }
511}
512
513pub struct Client {
534 pub(crate) db: DbHandle,
535 ffi: Arc<KeraDbFfi>,
536 closed: bool,
537}
538
539unsafe impl Send for Client {}
540
541impl Client {
542 fn open(path: &str) -> Result<Self> {
544 let ffi = get_ffi()?;
545 let c_path = KeraDbFfi::to_cstring(path)?;
546
547 let db = unsafe { (ffi.fn_open)(c_path.as_ptr()) };
549 let db = if db.is_null() {
550 let db = unsafe { (ffi.fn_create)(c_path.as_ptr()) };
551 if db.is_null() {
552 return Err(KeraDbError::Native(ffi.last_error()));
553 }
554 db
555 } else {
556 db
557 };
558
559 Ok(Self {
560 db,
561 ffi,
562 closed: false,
563 })
564 }
565
566 pub fn database(&self) -> Database {
571 Database::new(self.db, Arc::clone(&self.ffi))
572 }
573
574 pub fn sync(&self) -> Result<()> {
576 if self.closed {
577 return Err(KeraDbError::Closed);
578 }
579 unsafe { (self.ffi.fn_sync)(self.db) };
580 Ok(())
581 }
582
583 pub fn close(&mut self) {
585 if !self.closed {
586 unsafe { (self.ffi.fn_close)(self.db) };
587 self.closed = true;
588 }
589 }
590
591 fn require_vector_fn<T>(&self, f: Option<T>, fname: &str) -> Result<T> {
596 f.ok_or_else(|| {
597 KeraDbError::Other(format!(
598 "Vector function '{}' is not available in this build of KeraDB",
599 fname
600 ))
601 })
602 }
603
604 pub fn create_vector_collection(&self, name: &str, config: &VectorConfig) -> Result<()> {
606 let f = self.require_vector_fn(
607 self.ffi.fn_create_vector_collection,
608 "keradb_create_vector_collection",
609 )?;
610 let c_name = KeraDbFfi::to_cstring(name)?;
611 let c_cfg = KeraDbFfi::to_cstring(&config.to_json())?;
612 let ptr = unsafe { f(self.db, c_name.as_ptr(), c_cfg.as_ptr()) };
613 if ptr.is_null() {
614 return Err(KeraDbError::Native(self.ffi.last_error()));
615 }
616 unsafe { self.ffi.free_string(ptr) };
617 Ok(())
618 }
619
620 pub fn list_vector_collections(&self) -> Result<Vec<VectorCollectionInfo>> {
622 let f = self.require_vector_fn(
623 self.ffi.fn_list_vector_collections,
624 "keradb_list_vector_collections",
625 )?;
626 let ptr = unsafe { f(self.db) };
627 if ptr.is_null() {
628 return Ok(vec![]);
629 }
630 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
631 let raw: Vec<Value> = serde_json::from_str(&json_str)?;
632 Ok(raw
633 .into_iter()
634 .filter_map(|v| {
635 let name = v.get("Name").or_else(|| v.get("name"))?.as_str()?.to_owned();
636 let count = v
637 .get("Count")
638 .or_else(|| v.get("count"))
639 .and_then(|x| x.as_u64())
640 .unwrap_or(0) as usize;
641 Some(VectorCollectionInfo { name, count })
642 })
643 .collect())
644 }
645
646 pub fn drop_vector_collection(&self, name: &str) -> Result<bool> {
648 let f = self.require_vector_fn(
649 self.ffi.fn_drop_vector_collection,
650 "keradb_drop_vector_collection",
651 )?;
652 let c_name = KeraDbFfi::to_cstring(name)?;
653 let result = unsafe { f(self.db, c_name.as_ptr()) };
654 Ok(result != 0)
655 }
656
657 pub fn insert_vector(
661 &self,
662 collection: &str,
663 embedding: &[f32],
664 metadata: Option<&Value>,
665 ) -> Result<u64> {
666 let f = self.require_vector_fn(self.ffi.fn_insert_vector, "keradb_insert_vector")?;
667 let embedding_json = serde_json::to_string(embedding)?;
668 let meta_json = match metadata {
669 Some(m) => serde_json::to_string(m)?,
670 None => "{}".to_owned(),
671 };
672 let c_coll = KeraDbFfi::to_cstring(collection)?;
673 let c_emb = KeraDbFfi::to_cstring(&embedding_json)?;
674 let c_meta = KeraDbFfi::to_cstring(&meta_json)?;
675
676 let ptr = unsafe { f(self.db, c_coll.as_ptr(), c_emb.as_ptr(), c_meta.as_ptr()) };
677 if ptr.is_null() {
678 return Err(KeraDbError::Native(self.ffi.last_error()));
679 }
680 let id_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
681 id_str
682 .parse::<u64>()
683 .map_err(|e| KeraDbError::Other(format!("Failed to parse vector ID: {}", e)))
684 }
685
686 pub fn insert_text(
688 &self,
689 collection: &str,
690 text: &str,
691 metadata: Option<&Value>,
692 ) -> Result<u64> {
693 let f = self.require_vector_fn(self.ffi.fn_insert_text, "keradb_insert_text")?;
694 let meta_json = match metadata {
695 Some(m) => serde_json::to_string(m)?,
696 None => "{}".to_owned(),
697 };
698 let c_coll = KeraDbFfi::to_cstring(collection)?;
699 let c_text = KeraDbFfi::to_cstring(text)?;
700 let c_meta = KeraDbFfi::to_cstring(&meta_json)?;
701
702 let ptr = unsafe { f(self.db, c_coll.as_ptr(), c_text.as_ptr(), c_meta.as_ptr()) };
703 if ptr.is_null() {
704 return Err(KeraDbError::Native(self.ffi.last_error()));
705 }
706 let id_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
707 id_str
708 .parse::<u64>()
709 .map_err(|e| KeraDbError::Other(format!("Failed to parse vector ID: {}", e)))
710 }
711
712 pub fn vector_search(
714 &self,
715 collection: &str,
716 query: &[f32],
717 k: usize,
718 ) -> Result<Vec<VectorSearchResult>> {
719 let f = self.require_vector_fn(self.ffi.fn_vector_search, "keradb_vector_search")?;
720 let query_json = serde_json::to_string(query)?;
721 let c_coll = KeraDbFfi::to_cstring(collection)?;
722 let c_query = KeraDbFfi::to_cstring(&query_json)?;
723
724 let ptr = unsafe { f(self.db, c_coll.as_ptr(), c_query.as_ptr(), k as i32) };
725 if ptr.is_null() {
726 return Err(KeraDbError::Native(self.ffi.last_error()));
727 }
728 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
729 let raw: Vec<Value> = serde_json::from_str(&json_str)?;
730 Ok(raw
731 .iter()
732 .filter_map(VectorSearchResult::from_value)
733 .collect())
734 }
735
736 pub fn vector_search_text(
738 &self,
739 collection: &str,
740 query: &str,
741 k: usize,
742 ) -> Result<Vec<VectorSearchResult>> {
743 let f =
744 self.require_vector_fn(self.ffi.fn_vector_search_text, "keradb_vector_search_text")?;
745 let c_coll = KeraDbFfi::to_cstring(collection)?;
746 let c_query = KeraDbFfi::to_cstring(query)?;
747
748 let ptr = unsafe { f(self.db, c_coll.as_ptr(), c_query.as_ptr(), k as i32) };
749 if ptr.is_null() {
750 return Err(KeraDbError::Native(self.ffi.last_error()));
751 }
752 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
753 let raw: Vec<Value> = serde_json::from_str(&json_str)?;
754 Ok(raw
755 .iter()
756 .filter_map(VectorSearchResult::from_value)
757 .collect())
758 }
759
760 pub fn vector_search_filtered(
762 &self,
763 collection: &str,
764 query: &[f32],
765 k: usize,
766 filter: &MetadataFilter,
767 ) -> Result<Vec<VectorSearchResult>> {
768 let f = self.require_vector_fn(
769 self.ffi.fn_vector_search_filtered,
770 "keradb_vector_search_filtered",
771 )?;
772 let query_json = serde_json::to_string(query)?;
773 let c_coll = KeraDbFfi::to_cstring(collection)?;
774 let c_query = KeraDbFfi::to_cstring(&query_json)?;
775 let c_filter = KeraDbFfi::to_cstring(&filter.to_json())?;
776
777 let ptr =
778 unsafe { f(self.db, c_coll.as_ptr(), c_query.as_ptr(), k as i32, c_filter.as_ptr()) };
779 if ptr.is_null() {
780 return Err(KeraDbError::Native(self.ffi.last_error()));
781 }
782 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
783 let raw: Vec<Value> = serde_json::from_str(&json_str)?;
784 Ok(raw
785 .iter()
786 .filter_map(VectorSearchResult::from_value)
787 .collect())
788 }
789
790 pub fn get_vector(&self, collection: &str, id: u64) -> Result<Option<VectorDocument>> {
792 let f = self.require_vector_fn(self.ffi.fn_get_vector, "keradb_get_vector")?;
793 let c_coll = KeraDbFfi::to_cstring(collection)?;
794 let ptr = unsafe { f(self.db, c_coll.as_ptr(), id) };
795 if ptr.is_null() {
796 return Ok(None);
797 }
798 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
799 let v: Value = serde_json::from_str(&json_str)?;
800 Ok(VectorDocument::from_value(&v))
801 }
802
803 pub fn delete_vector(&self, collection: &str, id: u64) -> Result<bool> {
805 let f = self.require_vector_fn(self.ffi.fn_delete_vector, "keradb_delete_vector")?;
806 let c_coll = KeraDbFfi::to_cstring(collection)?;
807 let result = unsafe { f(self.db, c_coll.as_ptr(), id) };
808 Ok(result != 0)
809 }
810
811 pub fn vector_stats(&self, collection: &str) -> Result<VectorCollectionStats> {
813 let f = self.require_vector_fn(self.ffi.fn_vector_stats, "keradb_vector_stats")?;
814 let c_coll = KeraDbFfi::to_cstring(collection)?;
815 let ptr = unsafe { f(self.db, c_coll.as_ptr()) };
816 if ptr.is_null() {
817 return Err(KeraDbError::Native(self.ffi.last_error()));
818 }
819 let json_str = unsafe { self.ffi.c_str_to_string_and_free(ptr) }?;
820 let v: Value = serde_json::from_str(&json_str)?;
821 VectorCollectionStats::from_value(&v)
822 .ok_or_else(|| KeraDbError::Other("Failed to parse vector stats".into()))
823 }
824
825 pub fn has_vector_support(&self) -> bool {
827 self.ffi.has_vector_support
828 }
829}
830
831impl Drop for Client {
832 fn drop(&mut self) {
833 self.close();
834 }
835}
836
837pub fn connect(path: &str) -> Result<Client> {
851 Client::open(path)
852}