1#![warn(clippy::all)]
47
48use std::path::PathBuf;
49use std::sync::{Arc, Barrier};
50use std::time::Duration;
51
52use kora_core::command::{Command, CommandResponse};
53use kora_core::shard::{ShardEngine, ShardStore, WalRecord, WalWriter};
54use kora_doc::{
55 CollectionConfig, CollectionInfo, DictionaryInfo, DocEngine, DocError, DocMutation, IndexType,
56 InsertResult, SetResult, StorageInfo,
57};
58use kora_storage::shard_storage::ShardStorage;
59use kora_storage::wal::{SyncPolicy, WalEntry};
60use parking_lot::{Mutex, RwLock};
61use serde_json::Value;
62
63#[cfg(feature = "server")]
64use tokio::task::JoinHandle;
65
66pub type DocCollectionConfig = CollectionConfig;
68pub type DocCollectionInfo = CollectionInfo;
70pub type DocSetResult = SetResult;
72pub type DocUpdateMutation = DocMutation;
74pub type DocDictionaryInfo = DictionaryInfo;
76pub type DocStorageInfo = StorageInfo;
78pub type DocInsertResult = InsertResult;
80
81pub struct Config {
86 pub shard_count: usize,
91 pub data_dir: Option<PathBuf>,
95 pub wal_sync: SyncPolicy,
97}
98
99impl Default for Config {
100 fn default() -> Self {
101 Self {
102 shard_count: std::thread::available_parallelism()
103 .map(|n| n.get())
104 .unwrap_or(4),
105 data_dir: None,
106 wal_sync: SyncPolicy::EverySecond,
107 }
108 }
109}
110
111pub struct Database {
135 engine: Arc<ShardEngine>,
136 doc_engine: Arc<RwLock<DocEngine>>,
137 doc_wal: Option<Mutex<Box<dyn WalWriter>>>,
138}
139
140impl Database {
141 #[allow(clippy::type_complexity)]
146 pub fn open(config: Config) -> Self {
147 let doc_engine = Arc::new(RwLock::new(DocEngine::new()));
148
149 let (engine, doc_wal) = match config.data_dir {
150 Some(ref data_dir) => {
151 let mut wal_writers: Vec<Option<Box<dyn WalWriter>>> =
152 Vec::with_capacity(config.shard_count);
153 let mut recovery_fns: Vec<Box<dyn FnOnce(usize, &mut ShardStore) + Send>> =
154 Vec::with_capacity(config.shard_count);
155 let barrier = Arc::new(Barrier::new(config.shard_count + 1));
156
157 for i in 0..config.shard_count {
158 match ShardStorage::open_with_config(
159 i as u16,
160 data_dir,
161 config.wal_sync,
162 true,
163 true,
164 0,
165 ) {
166 Ok(storage) => {
167 let doc_eng = doc_engine.clone();
168 let shard_id = i as u16;
169 let barrier_clone = barrier.clone();
170 recovery_fns.push(Box::new(move |_idx, store| {
171 recover_embedded_shard(shard_id, &storage, store, &doc_eng);
172 barrier_clone.wait();
173 }));
174 let storage2 = ShardStorage::open_with_config(
175 i as u16,
176 data_dir,
177 config.wal_sync,
178 true,
179 true,
180 0,
181 )
182 .expect("failed to reopen shard storage for WAL writing");
183 wal_writers.push(Some(Box::new(storage2)));
184 }
185 Err(e) => {
186 tracing::error!("Failed to open shard {} storage: {}", i, e);
187 wal_writers.push(None);
188 let barrier_clone = barrier.clone();
189 recovery_fns.push(Box::new(move |_, _| {
190 barrier_clone.wait();
191 }));
192 }
193 }
194 }
195
196 let doc_wal_writer: Option<Mutex<Box<dyn WalWriter>>> =
197 match ShardStorage::open_with_config(
198 0,
199 data_dir,
200 config.wal_sync,
201 true,
202 true,
203 0,
204 ) {
205 Ok(storage) => Some(Mutex::new(Box::new(storage))),
206 Err(e) => {
207 tracing::error!("Failed to open doc WAL writer: {}", e);
208 None
209 }
210 };
211
212 let engine = Arc::new(ShardEngine::new_with_recovery(
213 config.shard_count,
214 wal_writers,
215 Some(recovery_fns),
216 ));
217 barrier.wait();
218
219 (engine, doc_wal_writer)
220 }
221 None => (Arc::new(ShardEngine::new(config.shard_count)), None),
222 };
223
224 Self {
225 engine,
226 doc_engine,
227 doc_wal,
228 }
229 }
230
231 pub fn engine(&self) -> &ShardEngine {
233 &self.engine
234 }
235
236 pub fn shared_engine(&self) -> Arc<ShardEngine> {
241 self.engine.clone()
242 }
243
244 #[must_use]
248 pub fn shared_doc_engine(&self) -> Arc<RwLock<DocEngine>> {
249 self.doc_engine.clone()
250 }
251
252 pub fn doc_create_collection(
256 &self,
257 collection: &str,
258 config: DocCollectionConfig,
259 ) -> Result<u16, DocError> {
260 self.doc_engine
261 .write()
262 .create_collection(collection, config)
263 }
264
265 pub fn doc_drop_collection(&self, collection: &str) -> bool {
267 self.doc_engine.write().drop_collection(collection)
268 }
269
270 #[must_use]
272 pub fn doc_collection_info(&self, collection: &str) -> Option<DocCollectionInfo> {
273 self.doc_engine.read().collection_info(collection)
274 }
275
276 pub fn doc_dictionary_info(&self, collection: &str) -> Result<DocDictionaryInfo, DocError> {
278 self.doc_engine.read().dictionary_info(collection)
279 }
280
281 pub fn doc_storage_info(&self, collection: &str) -> Result<DocStorageInfo, DocError> {
283 self.doc_engine.read().storage_info(collection)
284 }
285
286 pub fn doc_set(
288 &self,
289 collection: &str,
290 doc_id: &str,
291 json: &Value,
292 ) -> Result<DocSetResult, DocError> {
293 let result = self.doc_engine.write().set(collection, doc_id, json)?;
294 self.append_doc_wal(WalRecord::DocSet {
295 collection: collection.as_bytes().to_vec(),
296 doc_id: doc_id.as_bytes().to_vec(),
297 json: serde_json::to_vec(json).unwrap_or_default(),
298 });
299 Ok(result)
300 }
301
302 pub fn doc_insert(&self, collection: &str, json: &Value) -> Result<DocInsertResult, DocError> {
306 let result = self.doc_engine.write().insert(collection, json)?;
307 self.append_doc_wal(WalRecord::DocSet {
308 collection: collection.as_bytes().to_vec(),
309 doc_id: result.id.as_bytes().to_vec(),
310 json: serde_json::to_vec(json).unwrap_or_default(),
311 });
312 Ok(result)
313 }
314
315 pub fn doc_mset(
317 &self,
318 collection: &str,
319 entries: &[(&str, &Value)],
320 ) -> Result<Vec<DocSetResult>, DocError> {
321 let mut engine = self.doc_engine.write();
322 let results: Result<Vec<DocSetResult>, DocError> = entries
323 .iter()
324 .map(|(doc_id, json)| engine.set(collection, doc_id, json))
325 .collect();
326 drop(engine);
327 if let Ok(ref _res) = results {
328 for (doc_id, json) in entries {
329 self.append_doc_wal(WalRecord::DocSet {
330 collection: collection.as_bytes().to_vec(),
331 doc_id: doc_id.as_bytes().to_vec(),
332 json: serde_json::to_vec(json).unwrap_or_default(),
333 });
334 }
335 }
336 results
337 }
338
339 pub fn doc_get(
344 &self,
345 collection: &str,
346 doc_id: &str,
347 projection: Option<&[&str]>,
348 ) -> Result<Option<Value>, DocError> {
349 self.doc_engine.read().get(collection, doc_id, projection)
350 }
351
352 pub fn doc_mget(
357 &self,
358 collection: &str,
359 doc_ids: &[&str],
360 ) -> Result<Vec<Option<Value>>, DocError> {
361 let engine = self.doc_engine.read();
362 doc_ids
363 .iter()
364 .map(|doc_id| engine.get(collection, doc_id, None))
365 .collect()
366 }
367
368 pub fn doc_update(
373 &self,
374 collection: &str,
375 doc_id: &str,
376 mutations: &[DocUpdateMutation],
377 ) -> Result<bool, DocError> {
378 let mut engine = self.doc_engine.write();
379 let updated = engine.update(collection, doc_id, mutations)?;
380 if updated {
381 if let Ok(Some(doc)) = engine.get(collection, doc_id, None) {
382 drop(engine);
383 self.append_doc_wal(WalRecord::DocSet {
384 collection: collection.as_bytes().to_vec(),
385 doc_id: doc_id.as_bytes().to_vec(),
386 json: serde_json::to_vec(&doc).unwrap_or_default(),
387 });
388 }
389 }
390 Ok(updated)
391 }
392
393 pub fn doc_del(&self, collection: &str, doc_id: &str) -> Result<bool, DocError> {
395 let deleted = self.doc_engine.write().del(collection, doc_id)?;
396 if deleted {
397 self.append_doc_wal(WalRecord::DocDel {
398 collection: collection.as_bytes().to_vec(),
399 doc_id: doc_id.as_bytes().to_vec(),
400 });
401 }
402 Ok(deleted)
403 }
404
405 pub fn doc_exists(&self, collection: &str, doc_id: &str) -> Result<bool, DocError> {
407 self.doc_engine.read().exists(collection, doc_id)
408 }
409
410 pub fn doc_create_index(
412 &self,
413 collection: &str,
414 field: &str,
415 index_type: &str,
416 ) -> Result<(), DocError> {
417 let idx_type = parse_index_type_str(index_type)?;
418 self.doc_engine
419 .write()
420 .create_index(collection, field, idx_type)
421 }
422
423 pub fn doc_drop_index(&self, collection: &str, field: &str) -> Result<(), DocError> {
425 self.doc_engine.write().drop_index(collection, field)
426 }
427
428 pub fn doc_indexes(&self, collection: &str) -> Result<Vec<(String, String)>, DocError> {
433 let indexes = self.doc_engine.read().indexes(collection)?;
434 Ok(indexes
435 .into_iter()
436 .map(|(path, idx_type)| {
437 let type_name = match idx_type {
438 IndexType::Hash => "hash",
439 IndexType::Sorted => "sorted",
440 IndexType::Array => "array",
441 IndexType::Unique => "unique",
442 };
443 (path, type_name.to_string())
444 })
445 .collect())
446 }
447
448 #[allow(clippy::too_many_arguments)]
450 pub fn doc_find(
451 &self,
452 collection: &str,
453 where_clause: &str,
454 projection: Option<&[&str]>,
455 limit: Option<usize>,
456 offset: usize,
457 order_by: Option<&str>,
458 order_desc: bool,
459 ) -> Result<Vec<Value>, DocError> {
460 self.doc_engine.read().find(
461 collection,
462 where_clause,
463 projection,
464 limit,
465 offset,
466 order_by,
467 order_desc,
468 )
469 }
470
471 pub fn doc_count(&self, collection: &str, where_clause: &str) -> Result<u64, DocError> {
473 self.doc_engine.read().count(collection, where_clause)
474 }
475
476 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
480 match self.engine.dispatch_blocking(Command::Get {
481 key: key.as_bytes().to_vec(),
482 }) {
483 CommandResponse::BulkString(v) => Some(v),
484 _ => None,
485 }
486 }
487
488 pub fn set(&self, key: &str, value: &[u8]) {
490 self.engine.dispatch_blocking(Command::Set {
491 key: key.as_bytes().to_vec(),
492 value: value.to_vec(),
493 ex: None,
494 px: None,
495 nx: false,
496 xx: false,
497 });
498 }
499
500 pub fn set_ex(&self, key: &str, value: &[u8], ttl: Duration) {
502 self.engine.dispatch_blocking(Command::Set {
503 key: key.as_bytes().to_vec(),
504 value: value.to_vec(),
505 ex: Some(ttl.as_secs()),
506 px: None,
507 nx: false,
508 xx: false,
509 });
510 }
511
512 pub fn del(&self, key: &str) -> bool {
514 matches!(
515 self.engine.dispatch_blocking(Command::Del {
516 keys: vec![key.as_bytes().to_vec()],
517 }),
518 CommandResponse::Integer(n) if n > 0
519 )
520 }
521
522 pub fn exists(&self, key: &str) -> bool {
524 matches!(
525 self.engine.dispatch_blocking(Command::Exists {
526 keys: vec![key.as_bytes().to_vec()],
527 }),
528 CommandResponse::Integer(n) if n > 0
529 )
530 }
531
532 pub fn incr(&self, key: &str) -> Result<i64, String> {
534 match self.engine.dispatch_blocking(Command::Incr {
535 key: key.as_bytes().to_vec(),
536 }) {
537 CommandResponse::Integer(n) => Ok(n),
538 CommandResponse::Error(e) => Err(e),
539 _ => Err("unexpected response".into()),
540 }
541 }
542
543 pub fn getset(&self, key: &str, value: &[u8]) -> Option<Vec<u8>> {
545 match self.engine.dispatch_blocking(Command::GetSet {
546 key: key.as_bytes().to_vec(),
547 value: value.to_vec(),
548 }) {
549 CommandResponse::BulkString(v) => Some(v),
550 _ => None,
551 }
552 }
553
554 pub fn append(&self, key: &str, value: &[u8]) -> i64 {
556 match self.engine.dispatch_blocking(Command::Append {
557 key: key.as_bytes().to_vec(),
558 value: value.to_vec(),
559 }) {
560 CommandResponse::Integer(n) => n,
561 _ => 0,
562 }
563 }
564
565 pub fn strlen(&self, key: &str) -> i64 {
567 match self.engine.dispatch_blocking(Command::Strlen {
568 key: key.as_bytes().to_vec(),
569 }) {
570 CommandResponse::Integer(n) => n,
571 _ => 0,
572 }
573 }
574
575 pub fn decr(&self, key: &str) -> Result<i64, String> {
577 match self.engine.dispatch_blocking(Command::Decr {
578 key: key.as_bytes().to_vec(),
579 }) {
580 CommandResponse::Integer(n) => Ok(n),
581 CommandResponse::Error(e) => Err(e),
582 _ => Err("unexpected response".into()),
583 }
584 }
585
586 pub fn incrby(&self, key: &str, delta: i64) -> Result<i64, String> {
588 match self.engine.dispatch_blocking(Command::IncrBy {
589 key: key.as_bytes().to_vec(),
590 delta,
591 }) {
592 CommandResponse::Integer(n) => Ok(n),
593 CommandResponse::Error(e) => Err(e),
594 _ => Err("unexpected response".into()),
595 }
596 }
597
598 pub fn decrby(&self, key: &str, delta: i64) -> Result<i64, String> {
600 match self.engine.dispatch_blocking(Command::DecrBy {
601 key: key.as_bytes().to_vec(),
602 delta,
603 }) {
604 CommandResponse::Integer(n) => Ok(n),
605 CommandResponse::Error(e) => Err(e),
606 _ => Err("unexpected response".into()),
607 }
608 }
609
610 pub fn mget(&self, keys: &[&str]) -> Vec<Option<Vec<u8>>> {
614 let cmd_keys: Vec<Vec<u8>> = keys.iter().map(|k| k.as_bytes().to_vec()).collect();
615 match self
616 .engine
617 .dispatch_blocking(Command::MGet { keys: cmd_keys })
618 {
619 CommandResponse::Array(items) => items
620 .into_iter()
621 .map(|r| match r {
622 CommandResponse::BulkString(v) => Some(v),
623 _ => None,
624 })
625 .collect(),
626 _ => vec![None; keys.len()],
627 }
628 }
629
630 pub fn mset(&self, entries: &[(&str, &[u8])]) {
632 let cmd_entries: Vec<(Vec<u8>, Vec<u8>)> = entries
633 .iter()
634 .map(|(k, v)| (k.as_bytes().to_vec(), v.to_vec()))
635 .collect();
636 self.engine.dispatch_blocking(Command::MSet {
637 entries: cmd_entries,
638 });
639 }
640
641 pub fn setnx(&self, key: &str, value: &[u8]) -> bool {
645 matches!(
646 self.engine.dispatch_blocking(Command::SetNx {
647 key: key.as_bytes().to_vec(),
648 value: value.to_vec(),
649 }),
650 CommandResponse::Integer(1)
651 )
652 }
653
654 pub fn expire(&self, key: &str, seconds: u64) -> bool {
656 matches!(
657 self.engine.dispatch_blocking(Command::Expire {
658 key: key.as_bytes().to_vec(),
659 seconds,
660 }),
661 CommandResponse::Integer(1)
662 )
663 }
664
665 pub fn persist(&self, key: &str) -> bool {
669 matches!(
670 self.engine.dispatch_blocking(Command::Persist {
671 key: key.as_bytes().to_vec(),
672 }),
673 CommandResponse::Integer(1)
674 )
675 }
676
677 pub fn ttl(&self, key: &str) -> Option<i64> {
681 match self.engine.dispatch_blocking(Command::Ttl {
682 key: key.as_bytes().to_vec(),
683 }) {
684 CommandResponse::Integer(n) if n >= 0 => Some(n),
685 _ => None,
686 }
687 }
688
689 pub fn key_type(&self, key: &str) -> String {
691 match self.engine.dispatch_blocking(Command::Type {
692 key: key.as_bytes().to_vec(),
693 }) {
694 CommandResponse::SimpleString(s) => s,
695 _ => "none".into(),
696 }
697 }
698
699 pub fn keys(&self, pattern: &str) -> Vec<Vec<u8>> {
701 match self.engine.dispatch_blocking(Command::Keys {
702 pattern: pattern.to_string(),
703 }) {
704 CommandResponse::Array(items) => items
705 .into_iter()
706 .filter_map(|r| match r {
707 CommandResponse::BulkString(v) => Some(v),
708 _ => None,
709 })
710 .collect(),
711 _ => vec![],
712 }
713 }
714
715 pub fn lpush(&self, key: &str, values: &[&[u8]]) -> i64 {
719 match self.engine.dispatch_blocking(Command::LPush {
720 key: key.as_bytes().to_vec(),
721 values: values.iter().map(|v| v.to_vec()).collect(),
722 }) {
723 CommandResponse::Integer(n) => n,
724 _ => 0,
725 }
726 }
727
728 pub fn rpush(&self, key: &str, values: &[&[u8]]) -> i64 {
730 match self.engine.dispatch_blocking(Command::RPush {
731 key: key.as_bytes().to_vec(),
732 values: values.iter().map(|v| v.to_vec()).collect(),
733 }) {
734 CommandResponse::Integer(n) => n,
735 _ => 0,
736 }
737 }
738
739 pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Vec<Vec<u8>> {
743 match self.engine.dispatch_blocking(Command::LRange {
744 key: key.as_bytes().to_vec(),
745 start,
746 stop,
747 }) {
748 CommandResponse::Array(items) => items
749 .into_iter()
750 .filter_map(|r| match r {
751 CommandResponse::BulkString(v) => Some(v),
752 _ => None,
753 })
754 .collect(),
755 _ => vec![],
756 }
757 }
758
759 pub fn lpop(&self, key: &str) -> Option<Vec<u8>> {
761 match self.engine.dispatch_blocking(Command::LPop {
762 key: key.as_bytes().to_vec(),
763 }) {
764 CommandResponse::BulkString(v) => Some(v),
765 _ => None,
766 }
767 }
768
769 pub fn rpop(&self, key: &str) -> Option<Vec<u8>> {
771 match self.engine.dispatch_blocking(Command::RPop {
772 key: key.as_bytes().to_vec(),
773 }) {
774 CommandResponse::BulkString(v) => Some(v),
775 _ => None,
776 }
777 }
778
779 pub fn llen(&self, key: &str) -> i64 {
781 match self.engine.dispatch_blocking(Command::LLen {
782 key: key.as_bytes().to_vec(),
783 }) {
784 CommandResponse::Integer(n) => n,
785 _ => 0,
786 }
787 }
788
789 pub fn lindex(&self, key: &str, index: i64) -> Option<Vec<u8>> {
791 match self.engine.dispatch_blocking(Command::LIndex {
792 key: key.as_bytes().to_vec(),
793 index,
794 }) {
795 CommandResponse::BulkString(v) => Some(v),
796 _ => None,
797 }
798 }
799
800 pub fn hset(&self, key: &str, field: &str, value: &[u8]) {
804 self.engine.dispatch_blocking(Command::HSet {
805 key: key.as_bytes().to_vec(),
806 fields: vec![(field.as_bytes().to_vec(), value.to_vec())],
807 });
808 }
809
810 pub fn hget(&self, key: &str, field: &str) -> Option<Vec<u8>> {
812 match self.engine.dispatch_blocking(Command::HGet {
813 key: key.as_bytes().to_vec(),
814 field: field.as_bytes().to_vec(),
815 }) {
816 CommandResponse::BulkString(v) => Some(v),
817 _ => None,
818 }
819 }
820
821 pub fn hdel(&self, key: &str, fields: &[&str]) -> i64 {
823 match self.engine.dispatch_blocking(Command::HDel {
824 key: key.as_bytes().to_vec(),
825 fields: fields.iter().map(|f| f.as_bytes().to_vec()).collect(),
826 }) {
827 CommandResponse::Integer(n) => n,
828 _ => 0,
829 }
830 }
831
832 pub fn hgetall(&self, key: &str) -> Vec<(Vec<u8>, Vec<u8>)> {
834 match self.engine.dispatch_blocking(Command::HGetAll {
835 key: key.as_bytes().to_vec(),
836 }) {
837 CommandResponse::Array(items) => {
838 let mut result = Vec::new();
839 let mut iter = items.into_iter();
840 while let (
841 Some(CommandResponse::BulkString(k)),
842 Some(CommandResponse::BulkString(v)),
843 ) = (iter.next(), iter.next())
844 {
845 result.push((k, v));
846 }
847 result
848 }
849 _ => vec![],
850 }
851 }
852
853 pub fn hlen(&self, key: &str) -> i64 {
855 match self.engine.dispatch_blocking(Command::HLen {
856 key: key.as_bytes().to_vec(),
857 }) {
858 CommandResponse::Integer(n) => n,
859 _ => 0,
860 }
861 }
862
863 pub fn hexists(&self, key: &str, field: &str) -> bool {
865 matches!(
866 self.engine.dispatch_blocking(Command::HExists {
867 key: key.as_bytes().to_vec(),
868 field: field.as_bytes().to_vec(),
869 }),
870 CommandResponse::Integer(1)
871 )
872 }
873
874 pub fn hincrby(&self, key: &str, field: &str, delta: i64) -> Result<i64, String> {
876 match self.engine.dispatch_blocking(Command::HIncrBy {
877 key: key.as_bytes().to_vec(),
878 field: field.as_bytes().to_vec(),
879 delta,
880 }) {
881 CommandResponse::Integer(n) => Ok(n),
882 CommandResponse::Error(e) => Err(e),
883 _ => Err("unexpected response".into()),
884 }
885 }
886
887 pub fn sadd(&self, key: &str, members: &[&[u8]]) -> i64 {
891 match self.engine.dispatch_blocking(Command::SAdd {
892 key: key.as_bytes().to_vec(),
893 members: members.iter().map(|m| m.to_vec()).collect(),
894 }) {
895 CommandResponse::Integer(n) => n,
896 _ => 0,
897 }
898 }
899
900 pub fn smembers(&self, key: &str) -> Vec<Vec<u8>> {
902 match self.engine.dispatch_blocking(Command::SMembers {
903 key: key.as_bytes().to_vec(),
904 }) {
905 CommandResponse::Array(items) => items
906 .into_iter()
907 .filter_map(|r| match r {
908 CommandResponse::BulkString(v) => Some(v),
909 _ => None,
910 })
911 .collect(),
912 _ => vec![],
913 }
914 }
915
916 pub fn srem(&self, key: &str, members: &[&[u8]]) -> i64 {
918 match self.engine.dispatch_blocking(Command::SRem {
919 key: key.as_bytes().to_vec(),
920 members: members.iter().map(|m| m.to_vec()).collect(),
921 }) {
922 CommandResponse::Integer(n) => n,
923 _ => 0,
924 }
925 }
926
927 pub fn sismember(&self, key: &str, member: &[u8]) -> bool {
929 matches!(
930 self.engine.dispatch_blocking(Command::SIsMember {
931 key: key.as_bytes().to_vec(),
932 member: member.to_vec(),
933 }),
934 CommandResponse::Integer(1)
935 )
936 }
937
938 pub fn scard(&self, key: &str) -> i64 {
940 match self.engine.dispatch_blocking(Command::SCard {
941 key: key.as_bytes().to_vec(),
942 }) {
943 CommandResponse::Integer(n) => n,
944 _ => 0,
945 }
946 }
947
948 pub fn db_size(&self) -> i64 {
952 match self.engine.dispatch_blocking(Command::DbSize) {
953 CommandResponse::Integer(n) => n,
954 _ => 0,
955 }
956 }
957
958 pub fn flush_db(&self) {
960 self.engine.dispatch_blocking(Command::FlushDb);
961 }
962
963 pub fn vector_set(&self, index: &str, dim: usize, vector: &[f32]) -> Result<u64, String> {
969 match self.engine.dispatch_blocking(Command::VecSet {
970 key: index.as_bytes().to_vec(),
971 dimensions: dim,
972 vector: vector.to_vec(),
973 }) {
974 CommandResponse::Integer(id) => Ok(id as u64),
975 CommandResponse::Error(e) => Err(e),
976 _ => Err("unexpected response".into()),
977 }
978 }
979
980 pub fn vector_search(
984 &self,
985 index: &str,
986 query: &[f32],
987 k: usize,
988 ) -> Result<Vec<(u64, f32)>, String> {
989 match self.engine.dispatch_blocking(Command::VecQuery {
990 key: index.as_bytes().to_vec(),
991 k,
992 vector: query.to_vec(),
993 }) {
994 CommandResponse::Array(items) => {
995 let mut results = Vec::with_capacity(items.len());
996 for item in items {
997 if let CommandResponse::Array(pair) = item {
998 if pair.len() == 2 {
999 if let (
1000 CommandResponse::Integer(id),
1001 CommandResponse::BulkString(dist_bytes),
1002 ) = (&pair[0], &pair[1])
1003 {
1004 let dist: f32 = String::from_utf8_lossy(dist_bytes)
1005 .parse()
1006 .unwrap_or(f32::MAX);
1007 results.push((*id as u64, dist));
1008 }
1009 }
1010 }
1011 }
1012 Ok(results)
1013 }
1014 CommandResponse::Error(e) => Err(e),
1015 _ => Err("unexpected response".into()),
1016 }
1017 }
1018
1019 pub fn vector_del(&self, index: &str) -> Result<bool, String> {
1021 match self.engine.dispatch_blocking(Command::VecDel {
1022 key: index.as_bytes().to_vec(),
1023 }) {
1024 CommandResponse::Integer(n) => Ok(n > 0),
1025 CommandResponse::Error(e) => Err(e),
1026 _ => Err("unexpected response".into()),
1027 }
1028 }
1029
1030 fn append_doc_wal(&self, record: WalRecord) {
1031 if let Some(ref wal) = self.doc_wal {
1032 wal.lock().append(&record);
1033 }
1034 }
1035
/// Starts a Kora network server on `addr` as a background Tokio task.
///
/// The server is configured with one worker per shard of this database. It returns the
/// task handle plus a `watch` sender; sending `true` on it is presumably the shutdown
/// signal (the receiver is passed to `KoraServer::run` — confirm). Server errors are
/// logged, not returned.
///
/// Must be called from inside a Tokio runtime (`tokio::spawn` panics otherwise). The
/// current implementation never returns `Err`.
///
/// NOTE(review): only `ServerConfig` is handed to `KoraServer::new`, not `self.engine`.
/// Confirm whether the listener actually serves this database's data.
#[cfg(feature = "server")]
pub fn start_listener(
    &self,
    addr: &str,
) -> Result<(JoinHandle<()>, tokio::sync::watch::Sender<bool>), String> {
    let server_config = kora_server::ServerConfig {
        bind_address: addr.to_owned(),
        worker_count: self.engine.shard_count(),
        ..Default::default()
    };
    let server = kora_server::KoraServer::new(server_config);
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    let handle = tokio::spawn(async move {
        let outcome = server.run(shutdown_rx).await;
        if let Err(e) = outcome {
            tracing::error!("Hybrid server error: {}", e);
        }
    });

    Ok((handle, shutdown_tx))
}
1069}
1070
1071fn parse_index_type_str(raw: &str) -> Result<IndexType, DocError> {
1072 match raw.to_ascii_lowercase().as_str() {
1073 "hash" => Ok(IndexType::Hash),
1074 "sorted" => Ok(IndexType::Sorted),
1075 "array" => Ok(IndexType::Array),
1076 "unique" => Ok(IndexType::Unique),
1077 _ => Err(DocError::InvalidMutation(format!(
1078 "unknown index type '{}' (expected hash|sorted|array|unique)",
1079 raw
1080 ))),
1081 }
1082}
1083
1084fn recover_embedded_shard(
1085 shard_id: u16,
1086 storage: &ShardStorage,
1087 store: &mut ShardStore,
1088 doc_engine: &RwLock<DocEngine>,
1089) {
1090 match storage.rdb_load() {
1091 Ok(entries) if !entries.is_empty() => {
1092 for entry in &entries {
1093 let cmd = rdb_entry_to_restore_command(entry);
1094 store.execute(cmd);
1095 }
1096 tracing::info!(
1097 "Shard {} recovered {} entries from RDB",
1098 shard_id,
1099 entries.len()
1100 );
1101 }
1102 Ok(_) => {}
1103 Err(e) => tracing::error!("Shard {} RDB load failed: {}", shard_id, e),
1104 }
1105
1106 match storage.wal_replay(|entry| match entry {
1107 WalEntry::DocSet {
1108 collection,
1109 doc_id,
1110 json,
1111 } => {
1112 let col = String::from_utf8_lossy(&collection);
1113 let did = String::from_utf8_lossy(&doc_id);
1114 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&json) {
1115 let mut engine = doc_engine.write();
1116 let _ = engine.create_collection(&col, Default::default());
1117 let _ = engine.set(&col, &did, &value);
1118 }
1119 }
1120 WalEntry::DocDel { collection, doc_id } => {
1121 let col = String::from_utf8_lossy(&collection);
1122 let did = String::from_utf8_lossy(&doc_id);
1123 let mut engine = doc_engine.write();
1124 let _ = engine.del(&col, &did);
1125 }
1126 WalEntry::VecSet {
1127 key,
1128 dimensions,
1129 vector,
1130 } => {
1131 let floats: Vec<f32> = vector
1132 .chunks_exact(4)
1133 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
1134 .collect();
1135 store.execute(Command::VecSet {
1136 key,
1137 dimensions,
1138 vector: floats,
1139 });
1140 }
1141 WalEntry::VecDel { key } => {
1142 store.execute(Command::VecDel { key });
1143 }
1144 other => {
1145 store.execute(wal_entry_to_restore_command(other));
1146 }
1147 }) {
1148 Ok(count) if count > 0 => {
1149 tracing::info!("Shard {} replayed {} WAL entries", shard_id, count);
1150 }
1151 Ok(_) => {}
1152 Err(e) => tracing::error!("Shard {} WAL replay failed: {}", shard_id, e),
1153 }
1154}
1155
1156fn rdb_entry_to_restore_command(entry: &kora_storage::rdb::RdbEntry) -> Command {
1157 use kora_storage::rdb::RdbValue;
1158 match &entry.value {
1159 RdbValue::String(v) => Command::Set {
1160 key: entry.key.clone(),
1161 value: v.clone(),
1162 ex: None,
1163 px: entry.ttl_ms,
1164 nx: false,
1165 xx: false,
1166 },
1167 RdbValue::Int(n) => Command::Set {
1168 key: entry.key.clone(),
1169 value: n.to_string().into_bytes(),
1170 ex: None,
1171 px: entry.ttl_ms,
1172 nx: false,
1173 xx: false,
1174 },
1175 RdbValue::List(items) => Command::RPush {
1176 key: entry.key.clone(),
1177 values: items.clone(),
1178 },
1179 RdbValue::Set(members) => Command::SAdd {
1180 key: entry.key.clone(),
1181 members: members.clone(),
1182 },
1183 RdbValue::Hash(fields) => Command::HSet {
1184 key: entry.key.clone(),
1185 fields: fields.clone(),
1186 },
1187 }
1188}
1189
1190fn wal_entry_to_restore_command(entry: WalEntry) -> Command {
1191 match entry {
1192 WalEntry::Set { key, value, ttl_ms } => Command::Set {
1193 key,
1194 value,
1195 ex: None,
1196 px: ttl_ms,
1197 nx: false,
1198 xx: false,
1199 },
1200 WalEntry::Del { key } => Command::Del { keys: vec![key] },
1201 WalEntry::Expire { key, ttl_ms } => Command::PExpire {
1202 key,
1203 millis: ttl_ms,
1204 },
1205 WalEntry::LPush { key, values } => Command::LPush { key, values },
1206 WalEntry::RPush { key, values } => Command::RPush { key, values },
1207 WalEntry::HSet { key, fields } => Command::HSet { key, fields },
1208 WalEntry::SAdd { key, members } => Command::SAdd { key, members },
1209 WalEntry::FlushDb => Command::FlushDb,
1210 _ => unreachable!("doc/vec entries handled separately"),
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use serde_json::json;
1217
1218 use super::*;
1219
1220 #[test]
1221 fn test_basic_set_get() {
1222 let db = Database::open(Config {
1223 shard_count: 2,
1224 ..Config::default()
1225 });
1226 db.set("hello", b"world");
1227 assert_eq!(db.get("hello"), Some(b"world".to_vec()));
1228 assert_eq!(db.get("nonexistent"), None);
1229 }
1230
1231 #[test]
1232 fn test_del() {
1233 let db = Database::open(Config {
1234 shard_count: 2,
1235 ..Config::default()
1236 });
1237 db.set("k", b"v");
1238 assert!(db.del("k"));
1239 assert!(!db.del("k"));
1240 assert_eq!(db.get("k"), None);
1241 }
1242
1243 #[test]
1244 fn test_incr() {
1245 let db = Database::open(Config {
1246 shard_count: 2,
1247 ..Config::default()
1248 });
1249 assert_eq!(db.incr("counter").unwrap(), 1);
1250 assert_eq!(db.incr("counter").unwrap(), 2);
1251 assert_eq!(db.incr("counter").unwrap(), 3);
1252 }
1253
1254 #[test]
1255 fn test_list_operations() {
1256 let db = Database::open(Config {
1257 shard_count: 2,
1258 ..Config::default()
1259 });
1260 db.rpush("list", &[b"a", b"b", b"c"]);
1261 let items = db.lrange("list", 0, -1);
1262 assert_eq!(items, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
1263 }
1264
1265 #[test]
1266 fn test_hash_operations() {
1267 let db = Database::open(Config {
1268 shard_count: 2,
1269 ..Config::default()
1270 });
1271 db.hset("user", "name", b"Alice");
1272 assert_eq!(db.hget("user", "name"), Some(b"Alice".to_vec()));
1273 assert_eq!(db.hget("user", "age"), None);
1274 }
1275
1276 #[test]
1277 fn test_set_operations() {
1278 let db = Database::open(Config {
1279 shard_count: 2,
1280 ..Config::default()
1281 });
1282 db.sadd("tags", &[b"rust", b"cache", b"rust"]);
1283 let members = db.smembers("tags");
1284 assert_eq!(members.len(), 2); }
1286
1287 #[test]
1288 fn test_db_size_and_flush() {
1289 let db = Database::open(Config {
1290 shard_count: 2,
1291 ..Config::default()
1292 });
1293 db.set("a", b"1");
1294 db.set("b", b"2");
1295 assert_eq!(db.db_size(), 2);
1296 db.flush_db();
1297 assert_eq!(db.db_size(), 0);
1298 }
1299
1300 #[test]
1301 fn test_concurrent_access() {
1302 let db = std::sync::Arc::new(Database::open(Config {
1303 shard_count: 4,
1304 ..Config::default()
1305 }));
1306 let mut handles = vec![];
1307 for t in 0..4 {
1308 let db = db.clone();
1309 handles.push(std::thread::spawn(move || {
1310 for i in 0..100 {
1311 let key = format!("t{}:k{}", t, i);
1312 let val = format!("v{}", i);
1313 db.set(&key, val.as_bytes());
1314 assert_eq!(db.get(&key), Some(val.into_bytes()));
1315 }
1316 }));
1317 }
1318 for h in handles {
1319 h.join().unwrap();
1320 }
1321 }
1322
1323 #[test]
1324 fn test_vector_set_search_del() {
1325 let db = Database::open(Config {
1326 shard_count: 2,
1327 ..Config::default()
1328 });
1329
1330 let v1 = vec![1.0f32, 0.0, 0.0, 0.0];
1331 let v2 = vec![0.0f32, 1.0, 0.0, 0.0];
1332 let v3 = vec![1.0f32, 1.0, 0.0, 0.0];
1333
1334 let id1 = db.vector_set("my_idx", 4, &v1).unwrap();
1335 let id2 = db.vector_set("my_idx", 4, &v2).unwrap();
1336 let id3 = db.vector_set("my_idx", 4, &v3).unwrap();
1337 assert_ne!(id1, id2);
1338 assert_ne!(id2, id3);
1339
1340 let results = db.vector_search("my_idx", &v1, 3).unwrap();
1341 assert!(!results.is_empty());
1342 assert!(
1343 results[0].1 < 0.001,
1344 "first result should be near-exact match"
1345 );
1346
1347 assert!(db.vector_del("my_idx").unwrap());
1348 assert!(!db.vector_del("my_idx").unwrap());
1349
1350 let results = db.vector_search("my_idx", &v1, 3).unwrap();
1351 assert!(results.is_empty());
1352 }
1353
1354 #[test]
1355 fn test_vector_dimension_mismatch() {
1356 let db = Database::open(Config {
1357 shard_count: 2,
1358 ..Config::default()
1359 });
1360 db.vector_set("idx", 3, &[1.0, 2.0, 3.0]).unwrap();
1361 let result = db.vector_set("idx", 5, &[1.0, 2.0, 3.0, 4.0, 5.0]);
1362 assert!(result.is_err());
1363 }
1364
1365 #[test]
1366 fn test_vector_search_nonexistent_index() {
1367 let db = Database::open(Config {
1368 shard_count: 2,
1369 ..Config::default()
1370 });
1371 let results = db.vector_search("nonexistent", &[1.0, 2.0], 5).unwrap();
1372 assert!(results.is_empty());
1373 }
1374
1375 #[test]
1376 fn test_document_crud() {
1377 let db = Database::open(Config {
1378 shard_count: 2,
1379 ..Config::default()
1380 });
1381 db.doc_create_collection("users", DocCollectionConfig::default())
1382 .expect("collection should be created");
1383
1384 let set = db
1385 .doc_set(
1386 "users",
1387 "doc:1",
1388 &json!({
1389 "name": "Augustus",
1390 "age": 30,
1391 "address": {"city": "Accra"}
1392 }),
1393 )
1394 .expect("set should succeed");
1395 assert!(set.created);
1396 assert!(db
1397 .doc_exists("users", "doc:1")
1398 .expect("exists should succeed"));
1399
1400 let full = db
1401 .doc_get("users", "doc:1", None)
1402 .expect("get should succeed")
1403 .expect("document should exist");
1404 assert_eq!(
1405 full,
1406 json!({
1407 "name": "Augustus",
1408 "age": 30,
1409 "address": {"city": "Accra"}
1410 })
1411 );
1412
1413 let projection = db
1414 .doc_get("users", "doc:1", Some(&["name", "address.city"]))
1415 .expect("projected get should succeed")
1416 .expect("document should exist");
1417 assert_eq!(
1418 projection,
1419 json!({"name": "Augustus", "address": {"city": "Accra"}})
1420 );
1421
1422 assert!(db.doc_del("users", "doc:1").expect("delete should succeed"));
1423 assert!(!db
1424 .doc_exists("users", "doc:1")
1425 .expect("exists should succeed"));
1426 }
1427
1428 #[test]
1429 fn test_document_collection_drop() {
1430 let db = Database::open(Config {
1431 shard_count: 2,
1432 ..Config::default()
1433 });
1434 db.doc_create_collection("users", DocCollectionConfig::default())
1435 .expect("collection should be created");
1436 db.doc_set("users", "doc:1", &json!({"name": "Augustus"}))
1437 .expect("set should succeed");
1438
1439 let info = db
1440 .doc_collection_info("users")
1441 .expect("collection info should exist");
1442 assert_eq!(info.doc_count, 1);
1443
1444 assert!(db.doc_drop_collection("users"));
1445 assert!(!db.doc_drop_collection("users"));
1446 assert!(db.doc_collection_info("users").is_none());
1447 }
1448
1449 #[test]
1450 fn test_document_batch_ops() {
1451 let db = Database::open(Config {
1452 shard_count: 2,
1453 ..Config::default()
1454 });
1455 db.doc_create_collection("users", DocCollectionConfig::default())
1456 .expect("collection should be created");
1457
1458 let first = json!({"name": "Augustus", "age": 30});
1459 let second = json!({"name": "Ada", "age": 28});
1460 let results = db
1461 .doc_mset("users", &[("doc:1", &first), ("doc:2", &second)])
1462 .expect("mset should succeed");
1463 assert_eq!(results.len(), 2);
1464 assert!(results[0].created);
1465 assert!(results[1].created);
1466
1467 let docs = db
1468 .doc_mget("users", &["doc:1", "doc:2", "doc:missing"])
1469 .expect("mget should succeed");
1470 assert_eq!(docs, vec![Some(first), Some(second), None]);
1471
1472 let dictinfo = db
1473 .doc_dictionary_info("users")
1474 .expect("dictionary info should succeed");
1475 assert_eq!(dictinfo.collection_name, "users");
1476 assert!(dictinfo.dictionary_entries >= 1);
1477
1478 let storage = db
1479 .doc_storage_info("users")
1480 .expect("storage info should succeed");
1481 assert_eq!(storage.collection_name, "users");
1482 assert_eq!(storage.doc_count, 2);
1483 assert!(storage.total_packed_bytes > 0);
1484 }
1485
1486 #[test]
1487 fn test_document_update_ops() {
1488 let db = Database::open(Config {
1489 shard_count: 2,
1490 ..Config::default()
1491 });
1492 db.doc_create_collection("users", DocCollectionConfig::default())
1493 .expect("collection should be created");
1494 db.doc_set(
1495 "users",
1496 "doc:1",
1497 &json!({
1498 "name": "Augustus",
1499 "score": 10,
1500 "address": {"city": "Accra"},
1501 "tags": ["rust", "systems", "rust"]
1502 }),
1503 )
1504 .expect("set should succeed");
1505
1506 let updated = db
1507 .doc_update(
1508 "users",
1509 "doc:1",
1510 &[
1511 DocUpdateMutation::Set {
1512 path: "address.city".to_string(),
1513 value: json!("London"),
1514 },
1515 DocUpdateMutation::Incr {
1516 path: "score".to_string(),
1517 delta: 0.5,
1518 },
1519 DocUpdateMutation::Push {
1520 path: "tags".to_string(),
1521 value: json!("cache"),
1522 },
1523 DocUpdateMutation::Pull {
1524 path: "tags".to_string(),
1525 value: json!("rust"),
1526 },
1527 ],
1528 )
1529 .expect("update should succeed");
1530 assert!(updated);
1531
1532 let doc = db
1533 .doc_get("users", "doc:1", None)
1534 .expect("get should succeed")
1535 .expect("doc should exist");
1536 assert_eq!(
1537 doc,
1538 json!({
1539 "name": "Augustus",
1540 "score": 10.5,
1541 "address": {"city": "London"},
1542 "tags": ["systems", "cache"]
1543 })
1544 );
1545 }
1546
1547 #[test]
1548 fn test_persistence_survives_restart() {
1549 let dir = tempfile::TempDir::new().unwrap();
1550 let data_dir = dir.path().to_path_buf();
1551
1552 {
1553 let db = Database::open(Config {
1554 shard_count: 2,
1555 data_dir: Some(data_dir.clone()),
1556 wal_sync: SyncPolicy::EveryWrite,
1557 });
1558 db.set("greeting", b"hello world");
1559 db.set("counter", b"42");
1560 db.hset("user:1", "name", b"Augustus");
1561 db.hset("user:1", "city", b"Accra");
1562 db.rpush("tasks", &[b"task-1", b"task-2"]);
1563 db.sadd("tags", &[b"rust", b"cache"]);
1564 }
1565
1566 {
1567 let db = Database::open(Config {
1568 shard_count: 2,
1569 data_dir: Some(data_dir.clone()),
1570 wal_sync: SyncPolicy::EveryWrite,
1571 });
1572 assert_eq!(db.get("greeting"), Some(b"hello world".to_vec()));
1573 assert_eq!(db.get("counter"), Some(b"42".to_vec()));
1574 assert_eq!(db.hget("user:1", "name"), Some(b"Augustus".to_vec()));
1575 assert_eq!(db.hget("user:1", "city"), Some(b"Accra".to_vec()));
1576 assert_eq!(
1577 db.lrange("tasks", 0, -1),
1578 vec![b"task-1".to_vec(), b"task-2".to_vec()]
1579 );
1580 let members = db.smembers("tags");
1581 assert_eq!(members.len(), 2);
1582 assert!(db.sismember("tags", b"rust"));
1583 assert!(db.sismember("tags", b"cache"));
1584 }
1585 }
1586
1587 #[test]
1588 fn test_persistence_doc_survives_restart() {
1589 let dir = tempfile::TempDir::new().unwrap();
1590 let data_dir = dir.path().to_path_buf();
1591
1592 {
1593 let db = Database::open(Config {
1594 shard_count: 1,
1595 data_dir: Some(data_dir.clone()),
1596 wal_sync: SyncPolicy::EveryWrite,
1597 });
1598 db.doc_create_collection("users", DocCollectionConfig::default())
1599 .unwrap();
1600 db.doc_set(
1601 "users",
1602 "user:1",
1603 &json!({"name": "Augustus", "city": "Accra"}),
1604 )
1605 .unwrap();
1606 }
1607
1608 {
1609 let db = Database::open(Config {
1610 shard_count: 1,
1611 data_dir: Some(data_dir.clone()),
1612 wal_sync: SyncPolicy::EveryWrite,
1613 });
1614 let doc = db
1615 .doc_get("users", "user:1", None)
1616 .expect("get should succeed")
1617 .expect("doc should exist");
1618 assert_eq!(doc["name"], "Augustus");
1619 assert_eq!(doc["city"], "Accra");
1620 }
1621 }
1622}