hightower_kv/
engine.rs

1use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, bounded, unbounded};
2use std::sync::Arc;
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use parking_lot::Mutex;
7
8use crate::auth_service::AuthService;
9use crate::command::Command;
10use crate::compactor::{CompactionConfig, Compactor};
11use crate::config::StoreConfig;
12use crate::crypto::{AesGcmEncryptor, Argon2SecretHasher};
13use crate::error::{Error, Result};
14use crate::id_generator::IdGenerator;
15use crate::state::{ApplyOutcome, ConcurrentKvState, KvState};
16use crate::storage::Storage;
17
18/// Core key-value engine trait for command submission and retrieval.
19pub trait KvEngine: Send + Sync {
20    /// Submits a single command for execution.
21    fn submit(&self, command: Command) -> Result<ApplyOutcome>;
22    /// Submits a batch of commands for execution.
23    fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
24    where
25        I: IntoIterator<Item = Command>,
26    {
27        let mut outcomes = Vec::new();
28        for command in commands {
29            outcomes.push(self.submit(command)?);
30        }
31        Ok(outcomes)
32    }
33    /// Retrieves the value for a key if it exists.
34    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
35    /// Retrieves all key-value pairs with the given prefix.
36    fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
37}
38
39/// Trait for engines that can produce state snapshots.
40pub trait SnapshotEngine {
41    /// Returns a snapshot of the current key-value state.
42    fn snapshot_state(&self) -> KvState;
43    /// Returns the latest version number in the store.
44    fn latest_version(&self) -> u64;
45}
46
47#[derive(Debug)]
48struct EngineShared {
49    storage: Arc<Storage>,
50    state: Arc<ConcurrentKvState>,
51    version_gen: IdGenerator,
52    compactor: Compactor,
53    compaction_interval: Duration,
54    last_compaction: Mutex<Instant>,
55}
56
57impl EngineShared {
58    fn new(
59        storage: Arc<Storage>,
60        state: Arc<ConcurrentKvState>,
61        version_gen: IdGenerator,
62        compactor: Compactor,
63        compaction_interval: Duration,
64    ) -> Self {
65        Self {
66            storage,
67            state,
68            version_gen,
69            compactor,
70            compaction_interval,
71            last_compaction: Mutex::new(Instant::now()),
72        }
73    }
74
75    fn next_version(&self) -> u64 {
76        self.version_gen.next()
77    }
78
79    fn len(&self) -> usize {
80        self.state.len()
81    }
82
83    fn read_with<F, R>(&self, reader: F) -> R
84    where
85        F: FnOnce(&KvState) -> R,
86    {
87        self.state.read_with(reader)
88    }
89
90    fn apply_single(&self, command: Command) -> Result<ApplyOutcome> {
91        let mut guard = self.state.lock_entry(command.key());
92        let outcome = guard.evaluate(&command);
93        let result = match outcome {
94            ApplyOutcome::Applied | ApplyOutcome::Removed => {
95                self.storage.apply(&command)?;
96                let applied = guard.apply(&command);
97                debug_assert!(matches!(
98                    applied,
99                    ApplyOutcome::Applied | ApplyOutcome::Removed
100                ));
101                Ok(applied)
102            }
103            ApplyOutcome::IgnoredStale => Ok(ApplyOutcome::IgnoredStale),
104        };
105
106        let mutated = matches!(result, Ok(ApplyOutcome::Applied | ApplyOutcome::Removed));
107        drop(guard);
108        if mutated {
109            self.maybe_run_compaction()?;
110        }
111        result
112    }
113
114    fn apply_batch(&self, commands: Vec<Command>) -> Result<Vec<ApplyOutcome>> {
115        let mut outcomes = Vec::with_capacity(commands.len());
116        let mut mutated = false;
117
118        for command in commands {
119            let mut guard = self.state.lock_entry(command.key());
120            let outcome = guard.evaluate(&command);
121            match outcome {
122                ApplyOutcome::Applied | ApplyOutcome::Removed => {
123                    self.storage.apply(&command)?;
124                    let applied = guard.apply(&command);
125                    debug_assert!(matches!(
126                        applied,
127                        ApplyOutcome::Applied | ApplyOutcome::Removed
128                    ));
129                    if matches!(applied, ApplyOutcome::Applied | ApplyOutcome::Removed) {
130                        mutated = true;
131                    }
132                    outcomes.push(applied);
133                }
134                ApplyOutcome::IgnoredStale => outcomes.push(ApplyOutcome::IgnoredStale),
135            }
136        }
137
138        if mutated {
139            self.maybe_run_compaction()?;
140        }
141
142        Ok(outcomes)
143    }
144
145    fn maybe_run_compaction(&self) -> Result<()> {
146        if self.compaction_interval.is_zero() {
147            return Ok(());
148        }
149
150        let now = Instant::now();
151        {
152            let last = self.last_compaction.lock();
153            if now.duration_since(*last) < self.compaction_interval {
154                return Ok(());
155            }
156        }
157
158        self.compactor.run_once()?;
159        *self.last_compaction.lock() = now;
160        Ok(())
161    }
162
163    fn run_compaction_now(&self) -> Result<()> {
164        self.compactor.run_once()?;
165        *self.last_compaction.lock() = Instant::now();
166        Ok(())
167    }
168}
169
170enum WorkItem {
171    Command {
172        command: Command,
173        responder: Sender<Result<ApplyOutcome>>,
174    },
175    Batch {
176        commands: Vec<Command>,
177        responder: Sender<Result<Vec<ApplyOutcome>>>,
178    },
179}
180
181/// Single-node key-value engine with worker threads and automatic compaction.
182#[derive(Debug)]
183pub struct SingleNodeEngine {
184    shared: Arc<EngineShared>,
185    dispatcher: Option<Sender<WorkItem>>,
186    workers: Vec<JoinHandle<()>>,
187    compaction_signal: Option<Sender<()>>,
188    compaction_worker: Option<JoinHandle<()>>,
189}
190
191impl SingleNodeEngine {
192    /// Creates a new engine with default configuration.
193    pub fn new() -> Result<Self> {
194        Self::with_config(StoreConfig::default())
195    }
196
197    /// Creates a new engine with the specified configuration.
198    pub fn with_config(config: StoreConfig) -> Result<Self> {
199        let worker_threads = config.worker_threads;
200        let storage = Arc::new(Storage::new(&config)?);
201        let (mut state, mut max_version) = match storage.load_snapshot()? {
202            Some((state, version)) => (state, version),
203            None => (KvState::new(), 0u64),
204        };
205        storage.replay(|command| {
206            if command.version() > max_version {
207                max_version = max_version.max(command.version());
208                state.apply(&command);
209            }
210            Ok(())
211        })?;
212        let start_version = max_version
213            .checked_add(1)
214            .ok_or(Error::Unimplemented("engine::version_overflow"))?;
215        let compactor_config = CompactionConfig {
216            min_bytes: config.max_segment_size,
217            emit_snapshot: config.emit_snapshot_after_compaction,
218            ..CompactionConfig::default()
219        };
220        let compactor = Compactor::new(Arc::clone(&storage), compactor_config);
221
222        let concurrent_state = Arc::new(ConcurrentKvState::from(state));
223
224        let shared = Arc::new(EngineShared::new(
225            storage,
226            Arc::clone(&concurrent_state),
227            IdGenerator::new(start_version.max(1)),
228            compactor,
229            config.compaction_interval,
230        ));
231
232        let (dispatcher, workers) = if worker_threads == 0 {
233            (None, Vec::new())
234        } else {
235            let (task_tx, task_rx) = unbounded::<WorkItem>();
236            let workers = spawn_workers(worker_threads, Arc::clone(&shared), task_rx);
237            (Some(task_tx), workers)
238        };
239
240        let (shutdown_tx, shutdown_rx) = unbounded::<()>();
241        let compaction_worker = spawn_compaction_worker(Arc::clone(&shared), shutdown_rx);
242        let compaction_signal = compaction_worker.as_ref().map(|_| shutdown_tx);
243
244        Ok(Self {
245            shared,
246            dispatcher,
247            workers,
248            compaction_signal,
249            compaction_worker,
250        })
251    }
252
253    fn next_version(&self) -> u64 {
254        self.shared.next_version()
255    }
256
257    /// Inserts or updates a key-value pair.
258    pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ApplyOutcome> {
259        let version = self.next_version();
260        let command = Command::Set {
261            key,
262            value,
263            version,
264            timestamp: current_timestamp(),
265        };
266        self.dispatch_command(command)
267    }
268
269    /// Deletes a key from the store.
270    pub fn delete(&self, key: Vec<u8>) -> Result<ApplyOutcome> {
271        let version = self.next_version();
272        let command = Command::Delete {
273            key,
274            version,
275            timestamp: current_timestamp(),
276        };
277        self.dispatch_command(command)
278    }
279
280    /// Returns the number of keys in the store.
281    pub fn len(&self) -> usize {
282        self.shared.len()
283    }
284
285    /// Flushes pending writes to disk.
286    pub fn flush(&self) -> Result<()> {
287        self.shared.storage.sync()
288    }
289
290    /// Submits a batch of commands for execution.
291    pub fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
292    where
293        I: IntoIterator<Item = Command>,
294    {
295        let collected: Vec<Command> = commands.into_iter().collect();
296        self.dispatch_batch(collected)
297    }
298
299    /// Executes a read operation with a consistent snapshot of the state.
300    pub fn read_with<F, R>(&self, reader: F) -> R
301    where
302        F: FnOnce(&KvState) -> R,
303    {
304        self.shared.read_with(reader)
305    }
306
307    /// Triggers an immediate compaction run.
308    pub fn run_compaction_now(&self) -> Result<()> {
309        self.shared.run_compaction_now()
310    }
311
312    /// Retrieves all key-value pairs with the given prefix.
313    pub fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
314        let entries = self.shared.storage.get_prefix(prefix);
315        let mut results = Vec::with_capacity(entries.len());
316
317        for (key, entry) in entries {
318            if entry.is_tombstone {
319                continue;
320            }
321
322            // Try to get from state first
323            if let Some(value) = self.shared.state.get(&key) {
324                results.push((key, value));
325                continue;
326            }
327
328            // Fetch from storage if not in state
329            if let Some(command) = self.shared.storage.fetch_command(&entry)? {
330                if let Command::Set { key, value, version, timestamp } = command {
331                    let mut guard = self.shared.state.lock_entry(&key);
332                    guard.apply(&Command::Set {
333                        key: key.clone(),
334                        value: value.clone(),
335                        version,
336                        timestamp,
337                    });
338                    results.push((key, value));
339                }
340            }
341        }
342
343        Ok(results)
344    }
345
346    /// Wraps the engine in an `Arc` and constructs an `AuthService` configured with Argon2 hashing and AES-GCM encryption.
347    pub fn into_argon2_hasher_aes_gcm_auth_service(
348        self,
349        master_key: [u8; 32],
350    ) -> (
351        Arc<SingleNodeEngine>,
352        AuthService<Arc<SingleNodeEngine>, Argon2SecretHasher, AesGcmEncryptor>,
353    ) {
354        let engine = Arc::new(self);
355        let auth = AuthService::new(
356            Arc::clone(&engine),
357            Argon2SecretHasher::default(),
358            AesGcmEncryptor::new(master_key),
359        );
360        (engine, auth)
361    }
362
363    fn dispatch_command(&self, command: Command) -> Result<ApplyOutcome> {
364        if let Some(dispatcher) = &self.dispatcher {
365            let (tx, rx) = bounded(1);
366            dispatcher
367                .send(WorkItem::Command {
368                    command,
369                    responder: tx,
370                })
371                .map_err(|_| Error::Invariant("engine dispatcher unavailable"))?;
372            rx.recv()
373                .map_err(|_| Error::Invariant("engine worker terminated"))?
374        } else {
375            self.shared.apply_single(command)
376        }
377    }
378
379    fn dispatch_batch(&self, commands: Vec<Command>) -> Result<Vec<ApplyOutcome>> {
380        if let Some(dispatcher) = &self.dispatcher {
381            let (tx, rx) = bounded(1);
382            dispatcher
383                .send(WorkItem::Batch {
384                    commands,
385                    responder: tx,
386                })
387                .map_err(|_| Error::Invariant("engine dispatcher unavailable"))?;
388            rx.recv()
389                .map_err(|_| Error::Invariant("engine worker terminated"))?
390        } else {
391            self.shared.apply_batch(commands)
392        }
393    }
394    #[cfg(test)]
395    pub(crate) fn test_next_version(&self) -> u64 {
396        self.next_version()
397    }
398
399    #[cfg(test)]
400    pub(crate) fn storage_for_test(&self) -> Arc<Storage> {
401        Arc::clone(&self.shared.storage)
402    }
403
404    #[cfg(test)]
405    pub(crate) fn clear_state_for_test(&self) {
406        self.shared.state.clear_for_test()
407    }
408}
409
410impl Drop for SingleNodeEngine {
411    fn drop(&mut self) {
412        if let Some(dispatcher) = self.dispatcher.take() {
413            drop(dispatcher);
414        }
415
416        for handle in self.workers.drain(..) {
417            let _ = handle.join();
418        }
419
420        if let Some(signal) = self.compaction_signal.take() {
421            let _ = signal.send(());
422        }
423
424        if let Some(handle) = self.compaction_worker.take() {
425            let _ = handle.join();
426        }
427    }
428}
429
430impl KvEngine for SingleNodeEngine {
431    fn submit(&self, command: Command) -> Result<ApplyOutcome> {
432        self.dispatch_command(command)
433    }
434
435    fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
436    where
437        I: IntoIterator<Item = Command>,
438    {
439        let collected: Vec<Command> = commands.into_iter().collect();
440        self.dispatch_batch(collected)
441    }
442
443    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
444        if let Some(value) = self.shared.state.get(key) {
445            return Ok(Some(value));
446        }
447
448        let entry = match self.shared.storage.lookup(key) {
449            Some(entry) => entry,
450            None => return Ok(None),
451        };
452
453        if entry.is_tombstone {
454            let mut guard = self.shared.state.lock_entry(key);
455            guard.apply(&Command::Delete {
456                key: key.to_vec(),
457                version: entry.version,
458                timestamp: 0,
459            });
460            return Ok(None);
461        }
462
463        let command = match self.shared.storage.fetch_command(&entry)? {
464            Some(command) => command,
465            None => return Ok(None),
466        };
467
468        let command_timestamp = command.timestamp();
469        match &command {
470            Command::Set {
471                key,
472                value,
473                version,
474                ..
475            } => {
476                let mut guard = self.shared.state.lock_entry(key);
477                let cloned_value = value.clone();
478                guard.apply(&Command::Set {
479                    key: key.clone(),
480                    value: value.clone(),
481                    version: *version,
482                    timestamp: command_timestamp,
483                });
484                Ok(Some(cloned_value))
485            }
486            Command::Delete { key, version, .. } => {
487                let mut guard = self.shared.state.lock_entry(key);
488                guard.apply(&Command::Delete {
489                    key: key.clone(),
490                    version: *version,
491                    timestamp: command_timestamp,
492                });
493                Ok(None)
494            }
495        }
496    }
497
498    fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
499        self.get_prefix(prefix)
500    }
501}
502
503impl<E> KvEngine for Arc<E>
504where
505    E: KvEngine,
506{
507    fn submit(&self, command: Command) -> Result<ApplyOutcome> {
508        (**self).submit(command)
509    }
510
511    fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
512    where
513        I: IntoIterator<Item = Command>,
514    {
515        (**self).submit_batch(commands)
516    }
517
518    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
519        (**self).get(key)
520    }
521
522    fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
523        (**self).get_prefix(prefix)
524    }
525}
526
527impl SnapshotEngine for SingleNodeEngine {
528    fn snapshot_state(&self) -> KvState {
529        self.shared.storage.state_snapshot()
530    }
531
532    fn latest_version(&self) -> u64 {
533        self.shared.storage.latest_version()
534    }
535}
536
537/// Spawns worker threads to process commands concurrently.
538fn spawn_workers(
539    count: usize,
540    shared: Arc<EngineShared>,
541    task_rx: Receiver<WorkItem>,
542) -> Vec<JoinHandle<()>> {
543    let count = count.max(1);
544    let mut handles = Vec::with_capacity(count);
545    for index in 0..count {
546        let worker_rx = task_rx.clone();
547        let worker_shared = Arc::clone(&shared);
548        let handle = thread::Builder::new()
549            .name(format!("hightower-engine-worker-{index}"))
550            .spawn(move || worker_loop(worker_shared, worker_rx))
551            .expect("failed to spawn engine worker");
552        handles.push(handle);
553    }
554    drop(task_rx);
555    handles
556}
557
558fn worker_loop(shared: Arc<EngineShared>, task_rx: Receiver<WorkItem>) {
559    while let Ok(item) = task_rx.recv() {
560        match item {
561            WorkItem::Command { command, responder } => {
562                let result = shared.apply_single(command);
563                let _ = responder.send(result);
564            }
565            WorkItem::Batch {
566                commands,
567                responder,
568            } => {
569                let result = shared.apply_batch(commands);
570                let _ = responder.send(result);
571            }
572        }
573    }
574}
575
576/// Spawns a background worker to run periodic compaction.
577fn spawn_compaction_worker(
578    shared: Arc<EngineShared>,
579    shutdown: Receiver<()>,
580) -> Option<JoinHandle<()>> {
581    if shared.compaction_interval.is_zero() {
582        return None;
583    }
584
585    let interval = shared.compaction_interval;
586    Some(
587        thread::Builder::new()
588            .name("hightower-compactor".into())
589            .spawn(move || {
590                loop {
591                    match shutdown.recv_timeout(interval) {
592                        Ok(_) | Err(RecvTimeoutError::Disconnected) => break,
593                        Err(RecvTimeoutError::Timeout) => {
594                            let _ = shared.maybe_run_compaction();
595                        }
596                    }
597                }
598            })
599            .expect("failed to spawn compaction worker"),
600    )
601}
602
603/// Returns the current Unix timestamp in seconds.
604fn current_timestamp() -> i64 {
605    SystemTime::now()
606        .duration_since(UNIX_EPOCH)
607        .map(|dur| dur.as_secs() as i64)
608        .unwrap_or(0)
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use std::sync::Arc;
615    use std::thread;
616    use std::time::Duration;
617    use tempfile::tempdir;
618
619    fn temp_config(dir: &std::path::Path) -> StoreConfig {
620        let mut cfg = StoreConfig::default();
621        cfg.data_dir = dir.join("engine-data").to_string_lossy().into_owned();
622        cfg.worker_threads = 2;
623        cfg
624    }
625
626    #[test]
627    fn put_and_get_via_engine() {
628        let temp = tempdir().unwrap();
629        let cfg = temp_config(temp.path());
630        let engine = SingleNodeEngine::with_config(cfg).unwrap();
631        engine.put(b"alpha".to_vec(), b"beta".to_vec()).unwrap();
632        let fetched = engine.get(b"alpha").unwrap();
633        assert_eq!(fetched, Some(b"beta".to_vec()));
634        assert_eq!(engine.len(), 1);
635    }
636
637    #[test]
638    fn delete_removes_key() {
639        let temp = tempdir().unwrap();
640        let cfg = temp_config(temp.path());
641        let engine = SingleNodeEngine::with_config(cfg).unwrap();
642        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();
643        engine.delete(b"key".to_vec()).unwrap();
644        assert!(engine.get(b"key").unwrap().is_none());
645    }
646
647    #[test]
648    fn persists_across_reopen() {
649        let temp = tempdir().unwrap();
650        let cfg = temp_config(temp.path());
651        {
652            let engine = SingleNodeEngine::with_config(cfg.clone()).unwrap();
653            engine.put(b"persist".to_vec(), b"value".to_vec()).unwrap();
654        }
655        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
656        let value = reopened.get(b"persist").unwrap();
657        assert_eq!(value, Some(b"value".to_vec()));
658    }
659
660    #[test]
661    fn get_reads_from_storage_on_cache_miss() {
662        let temp = tempdir().unwrap();
663        let cfg = temp_config(temp.path());
664        let engine = SingleNodeEngine::with_config(cfg).unwrap();
665        engine.put(b"alpha".to_vec(), b"beta".to_vec()).unwrap();
666
667        engine.clear_state_for_test();
668
669        let fetched = engine.get(b"alpha").unwrap();
670        assert_eq!(fetched, Some(b"beta".to_vec()));
671        assert_eq!(engine.len(), 1);
672    }
673
674    #[test]
675    fn flush_propagates_to_storage() {
676        let temp = tempdir().unwrap();
677        let cfg = temp_config(temp.path());
678        let engine = SingleNodeEngine::with_config(cfg).unwrap();
679        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();
680        engine.flush().unwrap();
681    }
682
683    #[test]
684    fn submit_batch_applies_multiple_commands() {
685        let temp = tempdir().unwrap();
686        let cfg = temp_config(temp.path());
687        let engine = SingleNodeEngine::with_config(cfg).unwrap();
688
689        let commands = vec![
690            Command::Set {
691                key: b"a".to_vec(),
692                value: b"1".to_vec(),
693                version: engine.test_next_version(),
694                timestamp: 1,
695            },
696            Command::Set {
697                key: b"b".to_vec(),
698                value: b"2".to_vec(),
699                version: engine.test_next_version(),
700                timestamp: 2,
701            },
702        ];
703
704        let outcomes = engine.submit_batch(commands.clone()).unwrap();
705        assert!(
706            outcomes
707                .iter()
708                .all(|outcome| matches!(outcome, ApplyOutcome::Applied))
709        );
710        assert_eq!(engine.get(b"a").unwrap(), Some(b"1".to_vec()));
711        assert_eq!(engine.get(b"b").unwrap(), Some(b"2".to_vec()));
712    }
713
714    #[test]
715    fn submit_batch_skips_stale_commands() {
716        let temp = tempdir().unwrap();
717        let cfg = temp_config(temp.path());
718        let engine = SingleNodeEngine::with_config(cfg).unwrap();
719        engine.put(b"k".to_vec(), b"v1".to_vec()).unwrap();
720
721        let stale = Command::Set {
722            key: b"k".to_vec(),
723            value: b"old".to_vec(),
724            version: 1,
725            timestamp: 10,
726        };
727        let fresh = Command::Set {
728            key: b"k".to_vec(),
729            value: b"v2".to_vec(),
730            version: engine.test_next_version(),
731            timestamp: 11,
732        };
733
734        let outcomes = engine.submit_batch(vec![stale, fresh]).unwrap();
735        assert!(matches!(outcomes[0], ApplyOutcome::IgnoredStale));
736        assert!(matches!(outcomes[1], ApplyOutcome::Applied));
737        assert_eq!(engine.get(b"k").unwrap(), Some(b"v2".to_vec()));
738    }
739
740    #[test]
741    fn read_with_provides_consistent_snapshot() {
742        let temp = tempdir().unwrap();
743        let cfg = temp_config(temp.path());
744        let engine = SingleNodeEngine::with_config(cfg).unwrap();
745        engine.put(b"snap".to_vec(), b"value".to_vec()).unwrap();
746
747        let snapshot = engine.read_with(|state| {
748            state
749                .get(b"snap")
750                .map(|bytes| bytes.to_vec())
751                .unwrap_or_default()
752        });
753
754        assert_eq!(snapshot, b"value".to_vec());
755    }
756
757    #[test]
758    fn run_compaction_now_merges_segments_and_creates_snapshot() {
759        let temp = tempdir().unwrap();
760        let mut cfg = temp_config(temp.path());
761        cfg.max_segment_size = 64;
762        cfg.compaction_interval = Duration::from_secs(0);
763        cfg.emit_snapshot_after_compaction = true;
764        let engine = SingleNodeEngine::with_config(cfg.clone()).unwrap();
765
766        for i in 0..6 {
767            engine
768                .put(format!("key{i}").into_bytes(), vec![b'x'; 16])
769                .unwrap();
770        }
771
772        let storage_before = engine.storage_for_test();
773        let sealed_before = storage_before.sealed_segments_snapshot();
774        assert!(sealed_before.len() >= 1);
775
776        engine.run_compaction_now().unwrap();
777
778        let storage_after = engine.storage_for_test();
779        let sealed_after = storage_after.sealed_segments_snapshot();
780        assert!(sealed_after.len() <= sealed_before.len());
781
782        let snapshot_path = std::path::Path::new(&cfg.data_dir).join("snapshot.bin");
783        assert!(snapshot_path.exists());
784
785        drop(engine);
786
787        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
788        assert!(reopened.get(b"key0").unwrap().is_some());
789    }
790
791    #[test]
792    fn into_argon2_hasher_aes_gcm_auth_service_returns_handles() {
793        let temp = tempdir().unwrap();
794        let cfg = temp_config(temp.path());
795        let engine = SingleNodeEngine::with_config(cfg).unwrap();
796
797        let (engine, auth) = engine.into_argon2_hasher_aes_gcm_auth_service([7u8; 32]);
798
799        let user = auth.create_user("bundle", "secret").unwrap();
800        assert!(auth.verify_password("bundle", "secret").unwrap());
801
802        let (record, token) = auth.create_api_key(&user.user_id, None).unwrap();
803        assert!(token.starts_with(&record.key_id));
804        assert!(auth.authenticate_api_key(&token).unwrap().is_some());
805
806        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();
807        assert_eq!(engine.get(b"key").unwrap(), Some(b"value".to_vec()));
808    }
809
810    #[test]
811    fn concurrent_submitters_share_workers() {
812        let temp = tempdir().unwrap();
813        let mut cfg = temp_config(temp.path());
814        cfg.compaction_interval = Duration::from_secs(0);
815        let engine = Arc::new(SingleNodeEngine::with_config(cfg).unwrap());
816
817        let threads: Vec<_> = (0..4)
818            .map(|worker| {
819                let engine = Arc::clone(&engine);
820                thread::spawn(move || {
821                    for idx in 0..25 {
822                        let key = format!("k-{worker}-{idx}").into_bytes();
823                        let value = format!("v-{worker}-{idx}").into_bytes();
824                        engine.put(key, value).unwrap();
825                    }
826                })
827            })
828            .collect();
829
830        for handle in threads {
831            handle.join().unwrap();
832        }
833
834        for worker in 0..4 {
835            for idx in 0..25 {
836                let key = format!("k-{worker}-{idx}").into_bytes();
837                let expected = format!("v-{worker}-{idx}").into_bytes();
838                assert_eq!(engine.get(&key).unwrap(), Some(expected));
839            }
840        }
841    }
842
843    #[test]
844    fn get_prefix_returns_matching_keys() {
845        let temp = tempdir().unwrap();
846        let cfg = temp_config(temp.path());
847        let engine = SingleNodeEngine::with_config(cfg).unwrap();
848
849        engine.put(b"app:user:1".to_vec(), b"alice".to_vec()).unwrap();
850        engine.put(b"app:user:2".to_vec(), b"bob".to_vec()).unwrap();
851        engine.put(b"app:session:1".to_vec(), b"s1".to_vec()).unwrap();
852        engine.put(b"other:key".to_vec(), b"value".to_vec()).unwrap();
853
854        let results = engine.get_prefix(b"app:user:").unwrap();
855        assert_eq!(results.len(), 2);
856
857        let mut keys: Vec<Vec<u8>> = results.iter().map(|(k, _)| k.clone()).collect();
858        keys.sort();
859        assert_eq!(keys[0], b"app:user:1");
860        assert_eq!(keys[1], b"app:user:2");
861
862        let values: Vec<Vec<u8>> = results.iter().map(|(_, v)| v.clone()).collect();
863        assert!(values.contains(&b"alice".to_vec()));
864        assert!(values.contains(&b"bob".to_vec()));
865    }
866
867    #[test]
868    fn get_prefix_excludes_deleted_keys() {
869        let temp = tempdir().unwrap();
870        let cfg = temp_config(temp.path());
871        let engine = SingleNodeEngine::with_config(cfg).unwrap();
872
873        engine.put(b"prefix:key1".to_vec(), b"value1".to_vec()).unwrap();
874        engine.put(b"prefix:key2".to_vec(), b"value2".to_vec()).unwrap();
875        engine.delete(b"prefix:key1".to_vec()).unwrap();
876
877        let results = engine.get_prefix(b"prefix:").unwrap();
878        assert_eq!(results.len(), 1);
879        assert_eq!(results[0].0, b"prefix:key2");
880        assert_eq!(results[0].1, b"value2");
881    }
882
883    #[test]
884    fn get_prefix_with_empty_prefix_returns_all() {
885        let temp = tempdir().unwrap();
886        let cfg = temp_config(temp.path());
887        let engine = SingleNodeEngine::with_config(cfg).unwrap();
888
889        engine.put(b"a".to_vec(), b"1".to_vec()).unwrap();
890        engine.put(b"b".to_vec(), b"2".to_vec()).unwrap();
891        engine.put(b"c".to_vec(), b"3".to_vec()).unwrap();
892
893        let results = engine.get_prefix(b"").unwrap();
894        assert_eq!(results.len(), 3);
895    }
896
897    #[test]
898    fn get_prefix_persists_across_reopen() {
899        let temp = tempdir().unwrap();
900        let cfg = temp_config(temp.path());
901
902        {
903            let engine = SingleNodeEngine::with_config(cfg.clone()).unwrap();
904            engine.put(b"persist:1".to_vec(), b"v1".to_vec()).unwrap();
905            engine.put(b"persist:2".to_vec(), b"v2".to_vec()).unwrap();
906            engine.put(b"other".to_vec(), b"v3".to_vec()).unwrap();
907        }
908
909        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
910        let results = reopened.get_prefix(b"persist:").unwrap();
911        assert_eq!(results.len(), 2);
912
913        let keys: Vec<Vec<u8>> = results.iter().map(|(k, _)| k.clone()).collect();
914        assert!(keys.contains(&b"persist:1".to_vec()));
915        assert!(keys.contains(&b"persist:2".to_vec()));
916    }
917}