Skip to main content

kora_embedded/
lib.rs

1//! # kora-embedded
2//!
3//! Embeddable library mode for the Kōra cache engine.
4//!
5//! This crate provides [`Database`], a high-level API for using Kōra as an
6//! in-process library rather than a standalone network server. It wraps the
7//! same multi-threaded [`ShardEngine`](kora_core::shard::ShardEngine) that
8//! powers `kora-server`, but eliminates all network overhead: commands are
9//! serialised into the engine's internal `Command` enum and dispatched through
10//! per-shard channels directly from the calling thread.
11//!
12//! ## How it works
13//!
14//! Each [`Database`] instance owns a [`ShardEngine`](kora_core::shard::ShardEngine)
15//! with a configurable number of shard worker threads. Keys are hashed to a
16//! shard, and the corresponding command is sent over an `mpsc` channel to that
17//! shard's worker. The calling thread blocks on a `oneshot` channel until the
18//! worker replies, giving synchronous semantics with lock-free, shared-nothing
19//! execution on the data path.
20//!
21//! ## Embedded vs. server mode
22//!
23//! | Aspect | `kora-embedded` | `kora-server` |
24//! |--------|----------------|---------------|
25//! | Transport | Direct function calls | TCP / Unix socket (RESP2) |
26//! | Latency | Sub-microsecond dispatch | Network round-trip |
27//! | Deployment | Linked into your binary | Separate process |
28//! | Protocol | Rust types (`Command` / `CommandResponse`) | Wire-compatible RESP2 |
29//!
30//! ## Hybrid mode
31//!
32//! With the `server` Cargo feature enabled, [`Database::start_listener`] spawns
33//! a TCP server alongside the embedded instance, allowing external clients to
34//! connect while the host process retains direct API access.
35//!
36//! ## Quick start
37//!
38//! ```rust
39//! use kora_embedded::{Config, Database};
40//!
41//! let db = Database::open(Config::default());
42//! db.set("greeting", b"hello world");
43//! assert_eq!(db.get("greeting"), Some(b"hello world".to_vec()));
44//! ```
45
46#![warn(clippy::all)]
47
48use std::path::PathBuf;
49use std::sync::{Arc, Barrier};
50use std::time::Duration;
51
52use kora_core::command::{Command, CommandResponse};
53use kora_core::shard::{ShardEngine, ShardStore, WalRecord, WalWriter};
54use kora_doc::{
55    CollectionConfig, CollectionInfo, DictionaryInfo, DocEngine, DocError, DocMutation, IndexType,
56    InsertResult, SetResult, StorageInfo,
57};
58use kora_storage::shard_storage::ShardStorage;
59use kora_storage::wal::{SyncPolicy, WalEntry};
60use parking_lot::{Mutex, RwLock};
61use serde_json::Value;
62
63#[cfg(feature = "server")]
64use tokio::task::JoinHandle;
65
66/// Configuration for creating or updating a document collection.
67pub type DocCollectionConfig = CollectionConfig;
68/// Metadata snapshot for a document collection (document count, field count, etc.).
69pub type DocCollectionInfo = CollectionInfo;
70/// Outcome of a [`Database::doc_set`] call, indicating whether the document was created or replaced.
71pub type DocSetResult = SetResult;
72/// A single field-level mutation applied by [`Database::doc_update`].
73pub type DocUpdateMutation = DocMutation;
74/// Statistics about a collection's internal dictionary (term counts, memory usage).
75pub type DocDictionaryInfo = DictionaryInfo;
76/// Statistics about a collection's packed storage layer (byte counts, compaction state).
77pub type DocStorageInfo = StorageInfo;
78/// Result of a [`Database::doc_insert`] call with an auto-generated document ID.
79pub type DocInsertResult = InsertResult;
80
81/// Configuration for opening an embedded [`Database`].
82///
83/// The default configuration sets `shard_count` to the number of available
84/// hardware threads, which is a good starting point for most workloads.
85pub struct Config {
86    /// Number of shard worker threads to spawn.
87    ///
88    /// Each shard owns an independent key-space partition and runs on its own
89    /// OS thread, so this value controls both parallelism and memory partitioning.
90    pub shard_count: usize,
91    /// Optional data directory for persistence. When set, WAL and RDB snapshots
92    /// are stored under `{data_dir}/shard-{N}/`. Data is recovered automatically
93    /// on startup.
94    pub data_dir: Option<PathBuf>,
95    /// WAL sync policy (only used when `data_dir` is set).
96    pub wal_sync: SyncPolicy,
97}
98
99impl Default for Config {
100    fn default() -> Self {
101        Self {
102            shard_count: std::thread::available_parallelism()
103                .map(|n| n.get())
104                .unwrap_or(4),
105            data_dir: None,
106            wal_sync: SyncPolicy::EverySecond,
107        }
108    }
109}
110
111/// An embedded Kōra database instance.
112///
113/// `Database` is the primary entry point for the embedded API. It owns a
114/// multi-threaded [`ShardEngine`](kora_core::shard::ShardEngine) and a
115/// [`DocEngine`](kora_doc::DocEngine), exposing typed methods for key-value,
116/// list, hash, set, vector, and document operations.
117///
118/// All methods are safe to call from multiple threads simultaneously.
119/// Key-value operations route through per-shard channels and block the caller
120/// until the owning shard worker completes the command. Document operations
121/// are coordinated by a `RwLock` over the in-memory document engine.
122///
123/// # Examples
124///
125/// ```rust
126/// use kora_embedded::{Config, Database};
127/// use std::sync::Arc;
128///
129/// let db = Arc::new(Database::open(Config { shard_count: 4, ..Config::default() }));
130/// db.set("counter", b"0");
131/// db.incr("counter").unwrap();
132/// assert_eq!(db.get("counter"), Some(b"1".to_vec()));
133/// ```
134pub struct Database {
135    engine: Arc<ShardEngine>,
136    doc_engine: Arc<RwLock<DocEngine>>,
137    doc_wal: Option<Mutex<Box<dyn WalWriter>>>,
138}
139
140impl Database {
141    /// Open a new database with the given configuration.
142    ///
143    /// This spawns `config.shard_count` background worker threads that remain
144    /// alive for the lifetime of the returned `Database`.
145    #[allow(clippy::type_complexity)]
146    pub fn open(config: Config) -> Self {
147        let doc_engine = Arc::new(RwLock::new(DocEngine::new()));
148
149        let (engine, doc_wal) = match config.data_dir {
150            Some(ref data_dir) => {
151                let mut wal_writers: Vec<Option<Box<dyn WalWriter>>> =
152                    Vec::with_capacity(config.shard_count);
153                let mut recovery_fns: Vec<Box<dyn FnOnce(usize, &mut ShardStore) + Send>> =
154                    Vec::with_capacity(config.shard_count);
155                let barrier = Arc::new(Barrier::new(config.shard_count + 1));
156
157                for i in 0..config.shard_count {
158                    match ShardStorage::open_with_config(
159                        i as u16,
160                        data_dir,
161                        config.wal_sync,
162                        true,
163                        true,
164                        0,
165                    ) {
166                        Ok(storage) => {
167                            let doc_eng = doc_engine.clone();
168                            let shard_id = i as u16;
169                            let barrier_clone = barrier.clone();
170                            recovery_fns.push(Box::new(move |_idx, store| {
171                                recover_embedded_shard(shard_id, &storage, store, &doc_eng);
172                                barrier_clone.wait();
173                            }));
174                            let storage2 = ShardStorage::open_with_config(
175                                i as u16,
176                                data_dir,
177                                config.wal_sync,
178                                true,
179                                true,
180                                0,
181                            )
182                            .expect("failed to reopen shard storage for WAL writing");
183                            wal_writers.push(Some(Box::new(storage2)));
184                        }
185                        Err(e) => {
186                            tracing::error!("Failed to open shard {} storage: {}", i, e);
187                            wal_writers.push(None);
188                            let barrier_clone = barrier.clone();
189                            recovery_fns.push(Box::new(move |_, _| {
190                                barrier_clone.wait();
191                            }));
192                        }
193                    }
194                }
195
196                let doc_wal_writer: Option<Mutex<Box<dyn WalWriter>>> =
197                    match ShardStorage::open_with_config(
198                        0,
199                        data_dir,
200                        config.wal_sync,
201                        true,
202                        true,
203                        0,
204                    ) {
205                        Ok(storage) => Some(Mutex::new(Box::new(storage))),
206                        Err(e) => {
207                            tracing::error!("Failed to open doc WAL writer: {}", e);
208                            None
209                        }
210                    };
211
212                let engine = Arc::new(ShardEngine::new_with_recovery(
213                    config.shard_count,
214                    wal_writers,
215                    Some(recovery_fns),
216                ));
217                barrier.wait();
218
219                (engine, doc_wal_writer)
220            }
221            None => (Arc::new(ShardEngine::new(config.shard_count)), None),
222        };
223
224        Self {
225            engine,
226            doc_engine,
227            doc_wal,
228        }
229    }
230
231    /// Return a reference to the underlying [`ShardEngine`](kora_core::shard::ShardEngine).
232    pub fn engine(&self) -> &ShardEngine {
233        &self.engine
234    }
235
236    /// Return a shared handle to the underlying [`ShardEngine`](kora_core::shard::ShardEngine).
237    ///
238    /// Useful when integrating with components that need their own `Arc` clone,
239    /// such as hybrid server mode or custom command dispatch layers.
240    pub fn shared_engine(&self) -> Arc<ShardEngine> {
241        self.engine.clone()
242    }
243
244    /// Return a shared handle to the embedded [`DocEngine`](kora_doc::DocEngine).
245    ///
246    /// Callers are responsible for acquiring the inner `RwLock` appropriately.
247    #[must_use]
248    pub fn shared_doc_engine(&self) -> Arc<RwLock<DocEngine>> {
249        self.doc_engine.clone()
250    }
251
252    // ─── Document operations ─────────────────────────────────────
253
254    /// Create a document collection.
255    pub fn doc_create_collection(
256        &self,
257        collection: &str,
258        config: DocCollectionConfig,
259    ) -> Result<u16, DocError> {
260        self.doc_engine
261            .write()
262            .create_collection(collection, config)
263    }
264
265    /// Drop a document collection and all documents in it.
266    pub fn doc_drop_collection(&self, collection: &str) -> bool {
267        self.doc_engine.write().drop_collection(collection)
268    }
269
270    /// Return collection metadata, or `None` if the collection does not exist.
271    #[must_use]
272    pub fn doc_collection_info(&self, collection: &str) -> Option<DocCollectionInfo> {
273        self.doc_engine.read().collection_info(collection)
274    }
275
276    /// Return dictionary statistics for a collection.
277    pub fn doc_dictionary_info(&self, collection: &str) -> Result<DocDictionaryInfo, DocError> {
278        self.doc_engine.read().dictionary_info(collection)
279    }
280
281    /// Return packed storage statistics for a collection.
282    pub fn doc_storage_info(&self, collection: &str) -> Result<DocStorageInfo, DocError> {
283        self.doc_engine.read().storage_info(collection)
284    }
285
286    /// Insert or replace one JSON document.
287    pub fn doc_set(
288        &self,
289        collection: &str,
290        doc_id: &str,
291        json: &Value,
292    ) -> Result<DocSetResult, DocError> {
293        let result = self.doc_engine.write().set(collection, doc_id, json)?;
294        self.append_doc_wal(WalRecord::DocSet {
295            collection: collection.as_bytes().to_vec(),
296            doc_id: doc_id.as_bytes().to_vec(),
297            json: serde_json::to_vec(json).unwrap_or_default(),
298        });
299        Ok(result)
300    }
301
302    /// Insert a document with an auto-generated ID.
303    ///
304    /// Returns the generated ID and insert metadata.
305    pub fn doc_insert(&self, collection: &str, json: &Value) -> Result<DocInsertResult, DocError> {
306        let result = self.doc_engine.write().insert(collection, json)?;
307        self.append_doc_wal(WalRecord::DocSet {
308            collection: collection.as_bytes().to_vec(),
309            doc_id: result.id.as_bytes().to_vec(),
310            json: serde_json::to_vec(json).unwrap_or_default(),
311        });
312        Ok(result)
313    }
314
315    /// Insert or replace multiple JSON documents in one collection.
316    pub fn doc_mset(
317        &self,
318        collection: &str,
319        entries: &[(&str, &Value)],
320    ) -> Result<Vec<DocSetResult>, DocError> {
321        let mut engine = self.doc_engine.write();
322        let results: Result<Vec<DocSetResult>, DocError> = entries
323            .iter()
324            .map(|(doc_id, json)| engine.set(collection, doc_id, json))
325            .collect();
326        drop(engine);
327        if let Ok(ref _res) = results {
328            for (doc_id, json) in entries {
329                self.append_doc_wal(WalRecord::DocSet {
330                    collection: collection.as_bytes().to_vec(),
331                    doc_id: doc_id.as_bytes().to_vec(),
332                    json: serde_json::to_vec(json).unwrap_or_default(),
333                });
334            }
335        }
336        results
337    }
338
339    /// Read one JSON document, optionally projecting a subset of fields.
340    ///
341    /// `projection` accepts dot-separated paths (e.g. `"address.city"`) to return
342    /// only the requested fields. Pass `None` to return the full document.
343    pub fn doc_get(
344        &self,
345        collection: &str,
346        doc_id: &str,
347        projection: Option<&[&str]>,
348    ) -> Result<Option<Value>, DocError> {
349        self.doc_engine.read().get(collection, doc_id, projection)
350    }
351
352    /// Read multiple JSON documents from a collection in one call.
353    ///
354    /// Missing documents appear as `None` in the returned vector, preserving
355    /// positional correspondence with `doc_ids`.
356    pub fn doc_mget(
357        &self,
358        collection: &str,
359        doc_ids: &[&str],
360    ) -> Result<Vec<Option<Value>>, DocError> {
361        let engine = self.doc_engine.read();
362        doc_ids
363            .iter()
364            .map(|doc_id| engine.get(collection, doc_id, None))
365            .collect()
366    }
367
368    /// Apply one or more field-level mutations to an existing document.
369    ///
370    /// Returns `Ok(true)` when the document existed and was rewritten, `Ok(false)` when the
371    /// target document is missing.
372    pub fn doc_update(
373        &self,
374        collection: &str,
375        doc_id: &str,
376        mutations: &[DocUpdateMutation],
377    ) -> Result<bool, DocError> {
378        let mut engine = self.doc_engine.write();
379        let updated = engine.update(collection, doc_id, mutations)?;
380        if updated {
381            if let Ok(Some(doc)) = engine.get(collection, doc_id, None) {
382                drop(engine);
383                self.append_doc_wal(WalRecord::DocSet {
384                    collection: collection.as_bytes().to_vec(),
385                    doc_id: doc_id.as_bytes().to_vec(),
386                    json: serde_json::to_vec(&doc).unwrap_or_default(),
387                });
388            }
389        }
390        Ok(updated)
391    }
392
393    /// Delete a document. Returns `Ok(true)` if the document existed.
394    pub fn doc_del(&self, collection: &str, doc_id: &str) -> Result<bool, DocError> {
395        let deleted = self.doc_engine.write().del(collection, doc_id)?;
396        if deleted {
397            self.append_doc_wal(WalRecord::DocDel {
398                collection: collection.as_bytes().to_vec(),
399                doc_id: doc_id.as_bytes().to_vec(),
400            });
401        }
402        Ok(deleted)
403    }
404
405    /// Check whether a document exists in a collection.
406    pub fn doc_exists(&self, collection: &str, doc_id: &str) -> Result<bool, DocError> {
407        self.doc_engine.read().exists(collection, doc_id)
408    }
409
410    /// Create a secondary index on a collection field.
411    pub fn doc_create_index(
412        &self,
413        collection: &str,
414        field: &str,
415        index_type: &str,
416    ) -> Result<(), DocError> {
417        let idx_type = parse_index_type_str(index_type)?;
418        self.doc_engine
419            .write()
420            .create_index(collection, field, idx_type)
421    }
422
423    /// Drop a secondary index from a collection field.
424    pub fn doc_drop_index(&self, collection: &str, field: &str) -> Result<(), DocError> {
425        self.doc_engine.write().drop_index(collection, field)
426    }
427
428    /// List all secondary indexes for a collection.
429    ///
430    /// Returns `(field_path, index_type_name)` pairs where `index_type_name`
431    /// is one of `"hash"`, `"sorted"`, `"array"`, or `"unique"`.
432    pub fn doc_indexes(&self, collection: &str) -> Result<Vec<(String, String)>, DocError> {
433        let indexes = self.doc_engine.read().indexes(collection)?;
434        Ok(indexes
435            .into_iter()
436            .map(|(path, idx_type)| {
437                let type_name = match idx_type {
438                    IndexType::Hash => "hash",
439                    IndexType::Sorted => "sorted",
440                    IndexType::Array => "array",
441                    IndexType::Unique => "unique",
442                };
443                (path, type_name.to_string())
444            })
445            .collect())
446    }
447
448    /// Find documents matching a WHERE clause with optional projection, limit, offset, and sorting.
449    #[allow(clippy::too_many_arguments)]
450    pub fn doc_find(
451        &self,
452        collection: &str,
453        where_clause: &str,
454        projection: Option<&[&str]>,
455        limit: Option<usize>,
456        offset: usize,
457        order_by: Option<&str>,
458        order_desc: bool,
459    ) -> Result<Vec<Value>, DocError> {
460        self.doc_engine.read().find(
461            collection,
462            where_clause,
463            projection,
464            limit,
465            offset,
466            order_by,
467            order_desc,
468        )
469    }
470
471    /// Count documents matching a WHERE clause.
472    pub fn doc_count(&self, collection: &str, where_clause: &str) -> Result<u64, DocError> {
473        self.doc_engine.read().count(collection, where_clause)
474    }
475
476    // ─── String operations ───────────────────────────────────────
477
478    /// Return the value stored at `key`, or `None` if the key does not exist.
479    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
480        match self.engine.dispatch_blocking(Command::Get {
481            key: key.as_bytes().to_vec(),
482        }) {
483            CommandResponse::BulkString(v) => Some(v),
484            _ => None,
485        }
486    }
487
488    /// Store a key-value pair, overwriting any existing value.
489    pub fn set(&self, key: &str, value: &[u8]) {
490        self.engine.dispatch_blocking(Command::Set {
491            key: key.as_bytes().to_vec(),
492            value: value.to_vec(),
493            ex: None,
494            px: None,
495            nx: false,
496            xx: false,
497        });
498    }
499
500    /// Store a key-value pair that expires after `ttl`.
501    pub fn set_ex(&self, key: &str, value: &[u8], ttl: Duration) {
502        self.engine.dispatch_blocking(Command::Set {
503            key: key.as_bytes().to_vec(),
504            value: value.to_vec(),
505            ex: Some(ttl.as_secs()),
506            px: None,
507            nx: false,
508            xx: false,
509        });
510    }
511
512    /// Delete a key. Returns `true` if the key existed.
513    pub fn del(&self, key: &str) -> bool {
514        matches!(
515            self.engine.dispatch_blocking(Command::Del {
516                keys: vec![key.as_bytes().to_vec()],
517            }),
518            CommandResponse::Integer(n) if n > 0
519        )
520    }
521
522    /// Check whether `key` exists in the store.
523    pub fn exists(&self, key: &str) -> bool {
524        matches!(
525            self.engine.dispatch_blocking(Command::Exists {
526                keys: vec![key.as_bytes().to_vec()],
527            }),
528            CommandResponse::Integer(n) if n > 0
529        )
530    }
531
532    /// Atomically increment a key's integer value by 1, returning the new value.
533    pub fn incr(&self, key: &str) -> Result<i64, String> {
534        match self.engine.dispatch_blocking(Command::Incr {
535            key: key.as_bytes().to_vec(),
536        }) {
537            CommandResponse::Integer(n) => Ok(n),
538            CommandResponse::Error(e) => Err(e),
539            _ => Err("unexpected response".into()),
540        }
541    }
542
543    /// Atomically set `key` to `value` and return the previous value, if any.
544    pub fn getset(&self, key: &str, value: &[u8]) -> Option<Vec<u8>> {
545        match self.engine.dispatch_blocking(Command::GetSet {
546            key: key.as_bytes().to_vec(),
547            value: value.to_vec(),
548        }) {
549            CommandResponse::BulkString(v) => Some(v),
550            _ => None,
551        }
552    }
553
554    /// Append `value` to the string stored at `key`, returning the new byte length.
555    pub fn append(&self, key: &str, value: &[u8]) -> i64 {
556        match self.engine.dispatch_blocking(Command::Append {
557            key: key.as_bytes().to_vec(),
558            value: value.to_vec(),
559        }) {
560            CommandResponse::Integer(n) => n,
561            _ => 0,
562        }
563    }
564
565    /// Get the length of the string value stored at key.
566    pub fn strlen(&self, key: &str) -> i64 {
567        match self.engine.dispatch_blocking(Command::Strlen {
568            key: key.as_bytes().to_vec(),
569        }) {
570            CommandResponse::Integer(n) => n,
571            _ => 0,
572        }
573    }
574
575    /// Atomically decrement a key's integer value by 1, returning the new value.
576    pub fn decr(&self, key: &str) -> Result<i64, String> {
577        match self.engine.dispatch_blocking(Command::Decr {
578            key: key.as_bytes().to_vec(),
579        }) {
580            CommandResponse::Integer(n) => Ok(n),
581            CommandResponse::Error(e) => Err(e),
582            _ => Err("unexpected response".into()),
583        }
584    }
585
586    /// Atomically increment a key's integer value by `delta`, returning the new value.
587    pub fn incrby(&self, key: &str, delta: i64) -> Result<i64, String> {
588        match self.engine.dispatch_blocking(Command::IncrBy {
589            key: key.as_bytes().to_vec(),
590            delta,
591        }) {
592            CommandResponse::Integer(n) => Ok(n),
593            CommandResponse::Error(e) => Err(e),
594            _ => Err("unexpected response".into()),
595        }
596    }
597
598    /// Atomically decrement a key's integer value by `delta`, returning the new value.
599    pub fn decrby(&self, key: &str, delta: i64) -> Result<i64, String> {
600        match self.engine.dispatch_blocking(Command::DecrBy {
601            key: key.as_bytes().to_vec(),
602            delta,
603        }) {
604            CommandResponse::Integer(n) => Ok(n),
605            CommandResponse::Error(e) => Err(e),
606            _ => Err("unexpected response".into()),
607        }
608    }
609
610    /// Return the values of multiple keys in a single call.
611    ///
612    /// Missing keys appear as `None`, preserving positional correspondence with `keys`.
613    pub fn mget(&self, keys: &[&str]) -> Vec<Option<Vec<u8>>> {
614        let cmd_keys: Vec<Vec<u8>> = keys.iter().map(|k| k.as_bytes().to_vec()).collect();
615        match self
616            .engine
617            .dispatch_blocking(Command::MGet { keys: cmd_keys })
618        {
619            CommandResponse::Array(items) => items
620                .into_iter()
621                .map(|r| match r {
622                    CommandResponse::BulkString(v) => Some(v),
623                    _ => None,
624                })
625                .collect(),
626            _ => vec![None; keys.len()],
627        }
628    }
629
630    /// Store multiple key-value pairs in a single call.
631    pub fn mset(&self, entries: &[(&str, &[u8])]) {
632        let cmd_entries: Vec<(Vec<u8>, Vec<u8>)> = entries
633            .iter()
634            .map(|(k, v)| (k.as_bytes().to_vec(), v.to_vec()))
635            .collect();
636        self.engine.dispatch_blocking(Command::MSet {
637            entries: cmd_entries,
638        });
639    }
640
641    /// Store a key-value pair only if the key does not already exist.
642    ///
643    /// Returns `true` if the key was set, `false` if it already existed.
644    pub fn setnx(&self, key: &str, value: &[u8]) -> bool {
645        matches!(
646            self.engine.dispatch_blocking(Command::SetNx {
647                key: key.as_bytes().to_vec(),
648                value: value.to_vec(),
649            }),
650            CommandResponse::Integer(1)
651        )
652    }
653
654    /// Set a time-to-live on `key`. Returns `true` if the key exists.
655    pub fn expire(&self, key: &str, seconds: u64) -> bool {
656        matches!(
657            self.engine.dispatch_blocking(Command::Expire {
658                key: key.as_bytes().to_vec(),
659                seconds,
660            }),
661            CommandResponse::Integer(1)
662        )
663    }
664
665    /// Remove the time-to-live on `key`, making it persistent.
666    ///
667    /// Returns `true` if the key existed and had a TTL.
668    pub fn persist(&self, key: &str) -> bool {
669        matches!(
670            self.engine.dispatch_blocking(Command::Persist {
671                key: key.as_bytes().to_vec(),
672            }),
673            CommandResponse::Integer(1)
674        )
675    }
676
677    /// Return the remaining time-to-live (in seconds) for `key`.
678    ///
679    /// Returns `None` if the key does not exist or has no TTL set.
680    pub fn ttl(&self, key: &str) -> Option<i64> {
681        match self.engine.dispatch_blocking(Command::Ttl {
682            key: key.as_bytes().to_vec(),
683        }) {
684            CommandResponse::Integer(n) if n >= 0 => Some(n),
685            _ => None,
686        }
687    }
688
689    /// Return the data type of the value stored at `key` (e.g. `"string"`, `"list"`, `"none"`).
690    pub fn key_type(&self, key: &str) -> String {
691        match self.engine.dispatch_blocking(Command::Type {
692            key: key.as_bytes().to_vec(),
693        }) {
694            CommandResponse::SimpleString(s) => s,
695            _ => "none".into(),
696        }
697    }
698
699    /// Find all keys matching a glob pattern.
700    pub fn keys(&self, pattern: &str) -> Vec<Vec<u8>> {
701        match self.engine.dispatch_blocking(Command::Keys {
702            pattern: pattern.to_string(),
703        }) {
704            CommandResponse::Array(items) => items
705                .into_iter()
706                .filter_map(|r| match r {
707                    CommandResponse::BulkString(v) => Some(v),
708                    _ => None,
709                })
710                .collect(),
711            _ => vec![],
712        }
713    }
714
715    // ─── List operations ─────────────────────────────────────────
716
717    /// Prepend one or more values to a list, returning the new length.
718    pub fn lpush(&self, key: &str, values: &[&[u8]]) -> i64 {
719        match self.engine.dispatch_blocking(Command::LPush {
720            key: key.as_bytes().to_vec(),
721            values: values.iter().map(|v| v.to_vec()).collect(),
722        }) {
723            CommandResponse::Integer(n) => n,
724            _ => 0,
725        }
726    }
727
728    /// Append one or more values to a list, returning the new length.
729    pub fn rpush(&self, key: &str, values: &[&[u8]]) -> i64 {
730        match self.engine.dispatch_blocking(Command::RPush {
731            key: key.as_bytes().to_vec(),
732            values: values.iter().map(|v| v.to_vec()).collect(),
733        }) {
734            CommandResponse::Integer(n) => n,
735            _ => 0,
736        }
737    }
738
739    /// Return a contiguous range of elements from a list.
740    ///
741    /// Negative indices count from the end (`-1` is the last element).
742    pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Vec<Vec<u8>> {
743        match self.engine.dispatch_blocking(Command::LRange {
744            key: key.as_bytes().to_vec(),
745            start,
746            stop,
747        }) {
748            CommandResponse::Array(items) => items
749                .into_iter()
750                .filter_map(|r| match r {
751                    CommandResponse::BulkString(v) => Some(v),
752                    _ => None,
753                })
754                .collect(),
755            _ => vec![],
756        }
757    }
758
759    /// Remove and return the first element of a list.
760    pub fn lpop(&self, key: &str) -> Option<Vec<u8>> {
761        match self.engine.dispatch_blocking(Command::LPop {
762            key: key.as_bytes().to_vec(),
763        }) {
764            CommandResponse::BulkString(v) => Some(v),
765            _ => None,
766        }
767    }
768
769    /// Remove and return the last element of a list.
770    pub fn rpop(&self, key: &str) -> Option<Vec<u8>> {
771        match self.engine.dispatch_blocking(Command::RPop {
772            key: key.as_bytes().to_vec(),
773        }) {
774            CommandResponse::BulkString(v) => Some(v),
775            _ => None,
776        }
777    }
778
779    /// Return the number of elements in a list.
780    pub fn llen(&self, key: &str) -> i64 {
781        match self.engine.dispatch_blocking(Command::LLen {
782            key: key.as_bytes().to_vec(),
783        }) {
784            CommandResponse::Integer(n) => n,
785            _ => 0,
786        }
787    }
788
789    /// Return the element at `index` in a list, or `None` if out of range.
790    pub fn lindex(&self, key: &str, index: i64) -> Option<Vec<u8>> {
791        match self.engine.dispatch_blocking(Command::LIndex {
792            key: key.as_bytes().to_vec(),
793            index,
794        }) {
795            CommandResponse::BulkString(v) => Some(v),
796            _ => None,
797        }
798    }
799
800    // ─── Hash operations ─────────────────────────────────────────
801
802    /// Set a field in a hash, creating the hash if it does not exist.
803    pub fn hset(&self, key: &str, field: &str, value: &[u8]) {
804        self.engine.dispatch_blocking(Command::HSet {
805            key: key.as_bytes().to_vec(),
806            fields: vec![(field.as_bytes().to_vec(), value.to_vec())],
807        });
808    }
809
810    /// Return the value of a hash field, or `None` if the field or hash does not exist.
811    pub fn hget(&self, key: &str, field: &str) -> Option<Vec<u8>> {
812        match self.engine.dispatch_blocking(Command::HGet {
813            key: key.as_bytes().to_vec(),
814            field: field.as_bytes().to_vec(),
815        }) {
816            CommandResponse::BulkString(v) => Some(v),
817            _ => None,
818        }
819    }
820
821    /// Remove one or more fields from a hash, returning the number of fields removed.
822    pub fn hdel(&self, key: &str, fields: &[&str]) -> i64 {
823        match self.engine.dispatch_blocking(Command::HDel {
824            key: key.as_bytes().to_vec(),
825            fields: fields.iter().map(|f| f.as_bytes().to_vec()).collect(),
826        }) {
827            CommandResponse::Integer(n) => n,
828            _ => 0,
829        }
830    }
831
832    /// Return all field-value pairs from a hash.
833    pub fn hgetall(&self, key: &str) -> Vec<(Vec<u8>, Vec<u8>)> {
834        match self.engine.dispatch_blocking(Command::HGetAll {
835            key: key.as_bytes().to_vec(),
836        }) {
837            CommandResponse::Array(items) => {
838                let mut result = Vec::new();
839                let mut iter = items.into_iter();
840                while let (
841                    Some(CommandResponse::BulkString(k)),
842                    Some(CommandResponse::BulkString(v)),
843                ) = (iter.next(), iter.next())
844                {
845                    result.push((k, v));
846                }
847                result
848            }
849            _ => vec![],
850        }
851    }
852
853    /// Return the number of fields in a hash.
854    pub fn hlen(&self, key: &str) -> i64 {
855        match self.engine.dispatch_blocking(Command::HLen {
856            key: key.as_bytes().to_vec(),
857        }) {
858            CommandResponse::Integer(n) => n,
859            _ => 0,
860        }
861    }
862
863    /// Check whether a field exists in a hash.
864    pub fn hexists(&self, key: &str, field: &str) -> bool {
865        matches!(
866            self.engine.dispatch_blocking(Command::HExists {
867                key: key.as_bytes().to_vec(),
868                field: field.as_bytes().to_vec(),
869            }),
870            CommandResponse::Integer(1)
871        )
872    }
873
874    /// Atomically increment a hash field's integer value by `delta`, returning the new value.
875    pub fn hincrby(&self, key: &str, field: &str, delta: i64) -> Result<i64, String> {
876        match self.engine.dispatch_blocking(Command::HIncrBy {
877            key: key.as_bytes().to_vec(),
878            field: field.as_bytes().to_vec(),
879            delta,
880        }) {
881            CommandResponse::Integer(n) => Ok(n),
882            CommandResponse::Error(e) => Err(e),
883            _ => Err("unexpected response".into()),
884        }
885    }
886
887    // ─── Set operations ──────────────────────────────────────────
888
889    /// Add one or more members to a set, returning the number of new members added.
890    pub fn sadd(&self, key: &str, members: &[&[u8]]) -> i64 {
891        match self.engine.dispatch_blocking(Command::SAdd {
892            key: key.as_bytes().to_vec(),
893            members: members.iter().map(|m| m.to_vec()).collect(),
894        }) {
895            CommandResponse::Integer(n) => n,
896            _ => 0,
897        }
898    }
899
900    /// Return all members of a set.
901    pub fn smembers(&self, key: &str) -> Vec<Vec<u8>> {
902        match self.engine.dispatch_blocking(Command::SMembers {
903            key: key.as_bytes().to_vec(),
904        }) {
905            CommandResponse::Array(items) => items
906                .into_iter()
907                .filter_map(|r| match r {
908                    CommandResponse::BulkString(v) => Some(v),
909                    _ => None,
910                })
911                .collect(),
912            _ => vec![],
913        }
914    }
915
916    /// Remove one or more members from a set, returning the number of members removed.
917    pub fn srem(&self, key: &str, members: &[&[u8]]) -> i64 {
918        match self.engine.dispatch_blocking(Command::SRem {
919            key: key.as_bytes().to_vec(),
920            members: members.iter().map(|m| m.to_vec()).collect(),
921        }) {
922            CommandResponse::Integer(n) => n,
923            _ => 0,
924        }
925    }
926
927    /// Check whether `member` belongs to the set stored at `key`.
928    pub fn sismember(&self, key: &str, member: &[u8]) -> bool {
929        matches!(
930            self.engine.dispatch_blocking(Command::SIsMember {
931                key: key.as_bytes().to_vec(),
932                member: member.to_vec(),
933            }),
934            CommandResponse::Integer(1)
935        )
936    }
937
938    /// Return the number of members in a set.
939    pub fn scard(&self, key: &str) -> i64 {
940        match self.engine.dispatch_blocking(Command::SCard {
941            key: key.as_bytes().to_vec(),
942        }) {
943            CommandResponse::Integer(n) => n,
944            _ => 0,
945        }
946    }
947
948    // ─── Server operations ───────────────────────────────────────
949
950    /// Return the total number of keys across all shards.
951    pub fn db_size(&self) -> i64 {
952        match self.engine.dispatch_blocking(Command::DbSize) {
953            CommandResponse::Integer(n) => n,
954            _ => 0,
955        }
956    }
957
958    /// Remove all keys from every shard.
959    pub fn flush_db(&self) {
960        self.engine.dispatch_blocking(Command::FlushDb);
961    }
962
963    // ─── Vector operations ──────────────────────────────────────
964
965    /// Insert a vector into a named index, returning the vector ID.
966    ///
967    /// Creates the index if it does not exist.
968    pub fn vector_set(&self, index: &str, dim: usize, vector: &[f32]) -> Result<u64, String> {
969        match self.engine.dispatch_blocking(Command::VecSet {
970            key: index.as_bytes().to_vec(),
971            dimensions: dim,
972            vector: vector.to_vec(),
973        }) {
974            CommandResponse::Integer(id) => Ok(id as u64),
975            CommandResponse::Error(e) => Err(e),
976            _ => Err("unexpected response".into()),
977        }
978    }
979
980    /// Search for the K nearest neighbors of a query vector.
981    ///
982    /// Returns a list of `(id, distance)` pairs sorted by distance.
983    pub fn vector_search(
984        &self,
985        index: &str,
986        query: &[f32],
987        k: usize,
988    ) -> Result<Vec<(u64, f32)>, String> {
989        match self.engine.dispatch_blocking(Command::VecQuery {
990            key: index.as_bytes().to_vec(),
991            k,
992            vector: query.to_vec(),
993        }) {
994            CommandResponse::Array(items) => {
995                let mut results = Vec::with_capacity(items.len());
996                for item in items {
997                    if let CommandResponse::Array(pair) = item {
998                        if pair.len() == 2 {
999                            if let (
1000                                CommandResponse::Integer(id),
1001                                CommandResponse::BulkString(dist_bytes),
1002                            ) = (&pair[0], &pair[1])
1003                            {
1004                                let dist: f32 = String::from_utf8_lossy(dist_bytes)
1005                                    .parse()
1006                                    .unwrap_or(f32::MAX);
1007                                results.push((*id as u64, dist));
1008                            }
1009                        }
1010                    }
1011                }
1012                Ok(results)
1013            }
1014            CommandResponse::Error(e) => Err(e),
1015            _ => Err("unexpected response".into()),
1016        }
1017    }
1018
1019    /// Delete an entire vector index. Returns true if it existed.
1020    pub fn vector_del(&self, index: &str) -> Result<bool, String> {
1021        match self.engine.dispatch_blocking(Command::VecDel {
1022            key: index.as_bytes().to_vec(),
1023        }) {
1024            CommandResponse::Integer(n) => Ok(n > 0),
1025            CommandResponse::Error(e) => Err(e),
1026            _ => Err("unexpected response".into()),
1027        }
1028    }
1029
1030    fn append_doc_wal(&self, record: WalRecord) {
1031        if let Some(ref wal) = self.doc_wal {
1032            wal.lock().append(&record);
1033        }
1034    }
1035
1036    // ─── Hybrid mode ────────────────────────────────────────────
1037
1038    /// Start a TCP listener on `addr` for hybrid embedded + network mode.
1039    ///
1040    /// Returns a `JoinHandle` for the background server task and a
1041    /// `watch::Sender` that shuts the server down when `true` is sent.
1042    ///
1043    /// **Note:** the server creates its own shard stores -- data is not shared
1044    /// with the embedded `Database` key-value store. This is intended for
1045    /// scenarios where external clients need independent access.
1046    ///
1047    /// Requires the `server` Cargo feature.
1048    #[cfg(feature = "server")]
1049    pub fn start_listener(
1050        &self,
1051        addr: &str,
1052    ) -> Result<(JoinHandle<()>, tokio::sync::watch::Sender<bool>), String> {
1053        let config = kora_server::ServerConfig {
1054            bind_address: addr.to_string(),
1055            worker_count: self.engine.shard_count(),
1056            ..Default::default()
1057        };
1058        let server = kora_server::KoraServer::new(config);
1059        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1060
1061        let handle = tokio::spawn(async move {
1062            if let Err(e) = server.run(shutdown_rx).await {
1063                tracing::error!("Hybrid server error: {}", e);
1064            }
1065        });
1066
1067        Ok((handle, shutdown_tx))
1068    }
1069}
1070
1071fn parse_index_type_str(raw: &str) -> Result<IndexType, DocError> {
1072    match raw.to_ascii_lowercase().as_str() {
1073        "hash" => Ok(IndexType::Hash),
1074        "sorted" => Ok(IndexType::Sorted),
1075        "array" => Ok(IndexType::Array),
1076        "unique" => Ok(IndexType::Unique),
1077        _ => Err(DocError::InvalidMutation(format!(
1078            "unknown index type '{}' (expected hash|sorted|array|unique)",
1079            raw
1080        ))),
1081    }
1082}
1083
1084fn recover_embedded_shard(
1085    shard_id: u16,
1086    storage: &ShardStorage,
1087    store: &mut ShardStore,
1088    doc_engine: &RwLock<DocEngine>,
1089) {
1090    match storage.rdb_load() {
1091        Ok(entries) if !entries.is_empty() => {
1092            for entry in &entries {
1093                let cmd = rdb_entry_to_restore_command(entry);
1094                store.execute(cmd);
1095            }
1096            tracing::info!(
1097                "Shard {} recovered {} entries from RDB",
1098                shard_id,
1099                entries.len()
1100            );
1101        }
1102        Ok(_) => {}
1103        Err(e) => tracing::error!("Shard {} RDB load failed: {}", shard_id, e),
1104    }
1105
1106    match storage.wal_replay(|entry| match entry {
1107        WalEntry::DocSet {
1108            collection,
1109            doc_id,
1110            json,
1111        } => {
1112            let col = String::from_utf8_lossy(&collection);
1113            let did = String::from_utf8_lossy(&doc_id);
1114            if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&json) {
1115                let mut engine = doc_engine.write();
1116                let _ = engine.create_collection(&col, Default::default());
1117                let _ = engine.set(&col, &did, &value);
1118            }
1119        }
1120        WalEntry::DocDel { collection, doc_id } => {
1121            let col = String::from_utf8_lossy(&collection);
1122            let did = String::from_utf8_lossy(&doc_id);
1123            let mut engine = doc_engine.write();
1124            let _ = engine.del(&col, &did);
1125        }
1126        WalEntry::VecSet {
1127            key,
1128            dimensions,
1129            vector,
1130        } => {
1131            let floats: Vec<f32> = vector
1132                .chunks_exact(4)
1133                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
1134                .collect();
1135            store.execute(Command::VecSet {
1136                key,
1137                dimensions,
1138                vector: floats,
1139            });
1140        }
1141        WalEntry::VecDel { key } => {
1142            store.execute(Command::VecDel { key });
1143        }
1144        other => {
1145            store.execute(wal_entry_to_restore_command(other));
1146        }
1147    }) {
1148        Ok(count) if count > 0 => {
1149            tracing::info!("Shard {} replayed {} WAL entries", shard_id, count);
1150        }
1151        Ok(_) => {}
1152        Err(e) => tracing::error!("Shard {} WAL replay failed: {}", shard_id, e),
1153    }
1154}
1155
1156fn rdb_entry_to_restore_command(entry: &kora_storage::rdb::RdbEntry) -> Command {
1157    use kora_storage::rdb::RdbValue;
1158    match &entry.value {
1159        RdbValue::String(v) => Command::Set {
1160            key: entry.key.clone(),
1161            value: v.clone(),
1162            ex: None,
1163            px: entry.ttl_ms,
1164            nx: false,
1165            xx: false,
1166        },
1167        RdbValue::Int(n) => Command::Set {
1168            key: entry.key.clone(),
1169            value: n.to_string().into_bytes(),
1170            ex: None,
1171            px: entry.ttl_ms,
1172            nx: false,
1173            xx: false,
1174        },
1175        RdbValue::List(items) => Command::RPush {
1176            key: entry.key.clone(),
1177            values: items.clone(),
1178        },
1179        RdbValue::Set(members) => Command::SAdd {
1180            key: entry.key.clone(),
1181            members: members.clone(),
1182        },
1183        RdbValue::Hash(fields) => Command::HSet {
1184            key: entry.key.clone(),
1185            fields: fields.clone(),
1186        },
1187    }
1188}
1189
1190fn wal_entry_to_restore_command(entry: WalEntry) -> Command {
1191    match entry {
1192        WalEntry::Set { key, value, ttl_ms } => Command::Set {
1193            key,
1194            value,
1195            ex: None,
1196            px: ttl_ms,
1197            nx: false,
1198            xx: false,
1199        },
1200        WalEntry::Del { key } => Command::Del { keys: vec![key] },
1201        WalEntry::Expire { key, ttl_ms } => Command::PExpire {
1202            key,
1203            millis: ttl_ms,
1204        },
1205        WalEntry::LPush { key, values } => Command::LPush { key, values },
1206        WalEntry::RPush { key, values } => Command::RPush { key, values },
1207        WalEntry::HSet { key, fields } => Command::HSet { key, fields },
1208        WalEntry::SAdd { key, members } => Command::SAdd { key, members },
1209        WalEntry::FlushDb => Command::FlushDb,
1210        _ => unreachable!("doc/vec entries handled separately"),
1211    }
1212}
1213
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Open a database with two shards and otherwise default configuration,
    /// the setup shared by most tests below.
    fn open_two_shards() -> Database {
        Database::open(Config {
            shard_count: 2,
            ..Config::default()
        })
    }

    /// A stored value reads back; an unknown key yields `None`.
    #[test]
    fn test_basic_set_get() {
        let db = open_two_shards();

        db.set("hello", b"world");

        assert_eq!(db.get("hello"), Some(b"world".to_vec()));
        assert_eq!(db.get("nonexistent"), None);
    }

    /// Deleting reports whether the key existed and removes the value.
    #[test]
    fn test_del() {
        let db = open_two_shards();
        db.set("k", b"v");

        assert!(db.del("k"));
        assert!(!db.del("k"));
        assert_eq!(db.get("k"), None);
    }

    /// `incr` starts at 1 for a fresh key and counts upwards.
    #[test]
    fn test_incr() {
        let db = open_two_shards();

        for expected in 1..=3 {
            assert_eq!(db.incr("counter").unwrap(), expected);
        }
    }

    /// Pushed elements come back from `lrange` in insertion order.
    #[test]
    fn test_list_operations() {
        let db = open_two_shards();

        db.rpush("list", &[b"a", b"b", b"c"]);

        let expected = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
        assert_eq!(db.lrange("list", 0, -1), expected);
    }

    /// Hash fields read back; a missing field yields `None`.
    #[test]
    fn test_hash_operations() {
        let db = open_two_shards();

        db.hset("user", "name", b"Alice");

        assert_eq!(db.hget("user", "name"), Some(b"Alice".to_vec()));
        assert_eq!(db.hget("user", "age"), None);
    }

    /// Duplicate members are stored once.
    #[test]
    fn test_set_operations() {
        let db = open_two_shards();

        db.sadd("tags", &[b"rust", b"cache", b"rust"]);

        // "rust" was added twice but must only be counted once.
        assert_eq!(db.smembers("tags").len(), 2);
    }

    /// `db_size` counts keys and `flush_db` empties the database.
    #[test]
    fn test_db_size_and_flush() {
        let db = open_two_shards();
        db.set("a", b"1");
        db.set("b", b"2");
        assert_eq!(db.db_size(), 2);

        db.flush_db();

        assert_eq!(db.db_size(), 0);
    }

    /// Four threads write and read back their own keys through one shared
    /// four-shard database.
    #[test]
    fn test_concurrent_access() {
        let db = std::sync::Arc::new(Database::open(Config {
            shard_count: 4,
            ..Config::default()
        }));

        let workers: Vec<_> = (0..4)
            .map(|t| {
                let db = db.clone();
                std::thread::spawn(move || {
                    for i in 0..100 {
                        let key = format!("t{}:k{}", t, i);
                        let val = format!("v{}", i);
                        db.set(&key, val.as_bytes());
                        assert_eq!(db.get(&key), Some(val.into_bytes()));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// Vectors can be inserted, searched and their index deleted.
    #[test]
    fn test_vector_set_search_del() {
        let db = open_two_shards();

        let x_axis = [1.0f32, 0.0, 0.0, 0.0];
        let y_axis = [0.0f32, 1.0, 0.0, 0.0];
        let diagonal = [1.0f32, 1.0, 0.0, 0.0];

        let id1 = db.vector_set("my_idx", 4, &x_axis).unwrap();
        let id2 = db.vector_set("my_idx", 4, &y_axis).unwrap();
        let id3 = db.vector_set("my_idx", 4, &diagonal).unwrap();
        assert_ne!(id1, id2);
        assert_ne!(id2, id3);

        let hits = db.vector_search("my_idx", &x_axis, 3).unwrap();
        assert!(!hits.is_empty());
        assert!(
            hits[0].1 < 0.001,
            "first result should be near-exact match"
        );

        assert!(db.vector_del("my_idx").unwrap());
        assert!(!db.vector_del("my_idx").unwrap());

        let hits = db.vector_search("my_idx", &x_axis, 3).unwrap();
        assert!(hits.is_empty());
    }

    /// Inserting a vector with a different dimension into an existing index fails.
    #[test]
    fn test_vector_dimension_mismatch() {
        let db = open_two_shards();
        db.vector_set("idx", 3, &[1.0, 2.0, 3.0]).unwrap();

        let mismatched = db.vector_set("idx", 5, &[1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(mismatched.is_err());
    }

    /// Searching an index that was never created succeeds with no results.
    #[test]
    fn test_vector_search_nonexistent_index() {
        let db = open_two_shards();

        let hits = db.vector_search("nonexistent", &[1.0, 2.0], 5).unwrap();

        assert!(hits.is_empty());
    }

    /// Create, read (full and projected) and delete a document.
    #[test]
    fn test_document_crud() {
        let db = open_two_shards();
        db.doc_create_collection("users", DocCollectionConfig::default())
            .expect("collection should be created");

        let original = json!({
            "name": "Augustus",
            "age": 30,
            "address": {"city": "Accra"}
        });

        let outcome = db
            .doc_set("users", "doc:1", &original)
            .expect("set should succeed");
        assert!(outcome.created);
        assert!(db
            .doc_exists("users", "doc:1")
            .expect("exists should succeed"));

        let full = db
            .doc_get("users", "doc:1", None)
            .expect("get should succeed")
            .expect("document should exist");
        assert_eq!(full, original);

        let projected = db
            .doc_get("users", "doc:1", Some(&["name", "address.city"]))
            .expect("projected get should succeed")
            .expect("document should exist");
        assert_eq!(
            projected,
            json!({"name": "Augustus", "address": {"city": "Accra"}})
        );

        assert!(db.doc_del("users", "doc:1").expect("delete should succeed"));
        assert!(!db
            .doc_exists("users", "doc:1")
            .expect("exists should succeed"));
    }

    /// Dropping a collection removes it; a second drop reports `false`.
    #[test]
    fn test_document_collection_drop() {
        let db = open_two_shards();
        db.doc_create_collection("users", DocCollectionConfig::default())
            .expect("collection should be created");
        db.doc_set("users", "doc:1", &json!({"name": "Augustus"}))
            .expect("set should succeed");

        let info = db
            .doc_collection_info("users")
            .expect("collection info should exist");
        assert_eq!(info.doc_count, 1);

        assert!(db.doc_drop_collection("users"));
        assert!(!db.doc_drop_collection("users"));
        assert!(db.doc_collection_info("users").is_none());
    }

    /// Batch set/get plus the dictionary and storage introspection calls.
    #[test]
    fn test_document_batch_ops() {
        let db = open_two_shards();
        db.doc_create_collection("users", DocCollectionConfig::default())
            .expect("collection should be created");

        let first = json!({"name": "Augustus", "age": 30});
        let second = json!({"name": "Ada", "age": 28});

        let outcomes = db
            .doc_mset("users", &[("doc:1", &first), ("doc:2", &second)])
            .expect("mset should succeed");
        assert_eq!(outcomes.len(), 2);
        assert!(outcomes[0].created);
        assert!(outcomes[1].created);

        let fetched = db
            .doc_mget("users", &["doc:1", "doc:2", "doc:missing"])
            .expect("mget should succeed");
        assert_eq!(fetched, vec![Some(first), Some(second), None]);

        let dictionary = db
            .doc_dictionary_info("users")
            .expect("dictionary info should succeed");
        assert_eq!(dictionary.collection_name, "users");
        assert!(dictionary.dictionary_entries >= 1);

        let storage = db
            .doc_storage_info("users")
            .expect("storage info should succeed");
        assert_eq!(storage.collection_name, "users");
        assert_eq!(storage.doc_count, 2);
        assert!(storage.total_packed_bytes > 0);
    }

    /// Set / incr / push / pull mutations applied through a single update call.
    #[test]
    fn test_document_update_ops() {
        let db = open_two_shards();
        db.doc_create_collection("users", DocCollectionConfig::default())
            .expect("collection should be created");

        let initial = json!({
            "name": "Augustus",
            "score": 10,
            "address": {"city": "Accra"},
            "tags": ["rust", "systems", "rust"]
        });
        db.doc_set("users", "doc:1", &initial)
            .expect("set should succeed");

        let mutations = [
            DocUpdateMutation::Set {
                path: "address.city".to_string(),
                value: json!("London"),
            },
            DocUpdateMutation::Incr {
                path: "score".to_string(),
                delta: 0.5,
            },
            DocUpdateMutation::Push {
                path: "tags".to_string(),
                value: json!("cache"),
            },
            DocUpdateMutation::Pull {
                path: "tags".to_string(),
                value: json!("rust"),
            },
        ];
        let updated = db
            .doc_update("users", "doc:1", &mutations)
            .expect("update should succeed");
        assert!(updated);

        let after = db
            .doc_get("users", "doc:1", None)
            .expect("get should succeed")
            .expect("doc should exist");
        assert_eq!(
            after,
            json!({
                "name": "Augustus",
                "score": 10.5,
                "address": {"city": "London"},
                "tags": ["systems", "cache"]
            })
        );
    }

    /// Key-value data written with a data directory is visible after the
    /// database is dropped and reopened on the same directory.
    #[test]
    fn test_persistence_survives_restart() {
        let dir = tempfile::TempDir::new().unwrap();
        let data_dir = dir.path().to_path_buf();
        let open = || {
            Database::open(Config {
                shard_count: 2,
                data_dir: Some(data_dir.clone()),
                wal_sync: SyncPolicy::EveryWrite,
            })
        };

        let writer = open();
        writer.set("greeting", b"hello world");
        writer.set("counter", b"42");
        writer.hset("user:1", "name", b"Augustus");
        writer.hset("user:1", "city", b"Accra");
        writer.rpush("tasks", &[b"task-1", b"task-2"]);
        writer.sadd("tags", &[b"rust", b"cache"]);
        // Close the first instance before reopening the directory.
        drop(writer);

        let reader = open();
        assert_eq!(reader.get("greeting"), Some(b"hello world".to_vec()));
        assert_eq!(reader.get("counter"), Some(b"42".to_vec()));
        assert_eq!(reader.hget("user:1", "name"), Some(b"Augustus".to_vec()));
        assert_eq!(reader.hget("user:1", "city"), Some(b"Accra".to_vec()));
        assert_eq!(
            reader.lrange("tasks", 0, -1),
            vec![b"task-1".to_vec(), b"task-2".to_vec()]
        );
        assert_eq!(reader.smembers("tags").len(), 2);
        assert!(reader.sismember("tags", b"rust"));
        assert!(reader.sismember("tags", b"cache"));
    }

    /// A document written with a data directory is visible after the database
    /// is dropped and reopened on the same directory.
    #[test]
    fn test_persistence_doc_survives_restart() {
        let dir = tempfile::TempDir::new().unwrap();
        let data_dir = dir.path().to_path_buf();
        let open = || {
            Database::open(Config {
                shard_count: 1,
                data_dir: Some(data_dir.clone()),
                wal_sync: SyncPolicy::EveryWrite,
            })
        };

        let writer = open();
        writer
            .doc_create_collection("users", DocCollectionConfig::default())
            .unwrap();
        writer
            .doc_set(
                "users",
                "user:1",
                &json!({"name": "Augustus", "city": "Accra"}),
            )
            .unwrap();
        // Close the first instance before reopening the directory.
        drop(writer);

        let reader = open();
        let doc = reader
            .doc_get("users", "user:1", None)
            .expect("get should succeed")
            .expect("doc should exist");
        assert_eq!(doc["name"], "Augustus");
        assert_eq!(doc["city"], "Accra");
    }
}