1use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, bounded, unbounded};
2use std::sync::Arc;
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use parking_lot::Mutex;
7
8use crate::auth_service::AuthService;
9use crate::command::Command;
10use crate::compactor::{CompactionConfig, Compactor};
11use crate::config::StoreConfig;
12use crate::crypto::{AesGcmEncryptor, Argon2SecretHasher};
13use crate::error::{Error, Result};
14use crate::id_generator::IdGenerator;
15use crate::state::{ApplyOutcome, ConcurrentKvState, KvState};
16use crate::storage::Storage;
17
18pub trait KvEngine: Send + Sync {
20 fn submit(&self, command: Command) -> Result<ApplyOutcome>;
22 fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
24 where
25 I: IntoIterator<Item = Command>,
26 {
27 let mut outcomes = Vec::new();
28 for command in commands {
29 outcomes.push(self.submit(command)?);
30 }
31 Ok(outcomes)
32 }
33 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
35 fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
37}
38
39pub trait SnapshotEngine {
41 fn snapshot_state(&self) -> KvState;
43 fn latest_version(&self) -> u64;
45}
46
47#[derive(Debug)]
48struct EngineShared {
49 storage: Arc<Storage>,
50 state: Arc<ConcurrentKvState>,
51 version_gen: IdGenerator,
52 compactor: Compactor,
53 compaction_interval: Duration,
54 last_compaction: Mutex<Instant>,
55}
56
57impl EngineShared {
58 fn new(
59 storage: Arc<Storage>,
60 state: Arc<ConcurrentKvState>,
61 version_gen: IdGenerator,
62 compactor: Compactor,
63 compaction_interval: Duration,
64 ) -> Self {
65 Self {
66 storage,
67 state,
68 version_gen,
69 compactor,
70 compaction_interval,
71 last_compaction: Mutex::new(Instant::now()),
72 }
73 }
74
75 fn next_version(&self) -> u64 {
76 self.version_gen.next()
77 }
78
79 fn len(&self) -> usize {
80 self.state.len()
81 }
82
83 fn read_with<F, R>(&self, reader: F) -> R
84 where
85 F: FnOnce(&KvState) -> R,
86 {
87 self.state.read_with(reader)
88 }
89
90 fn apply_single(&self, command: Command) -> Result<ApplyOutcome> {
91 let mut guard = self.state.lock_entry(command.key());
92 let outcome = guard.evaluate(&command);
93 let result = match outcome {
94 ApplyOutcome::Applied | ApplyOutcome::Removed => {
95 self.storage.apply(&command)?;
96 let applied = guard.apply(&command);
97 debug_assert!(matches!(
98 applied,
99 ApplyOutcome::Applied | ApplyOutcome::Removed
100 ));
101 Ok(applied)
102 }
103 ApplyOutcome::IgnoredStale => Ok(ApplyOutcome::IgnoredStale),
104 };
105
106 let mutated = matches!(result, Ok(ApplyOutcome::Applied | ApplyOutcome::Removed));
107 drop(guard);
108 if mutated {
109 self.maybe_run_compaction()?;
110 }
111 result
112 }
113
114 fn apply_batch(&self, commands: Vec<Command>) -> Result<Vec<ApplyOutcome>> {
115 let mut outcomes = Vec::with_capacity(commands.len());
116 let mut mutated = false;
117
118 for command in commands {
119 let mut guard = self.state.lock_entry(command.key());
120 let outcome = guard.evaluate(&command);
121 match outcome {
122 ApplyOutcome::Applied | ApplyOutcome::Removed => {
123 self.storage.apply(&command)?;
124 let applied = guard.apply(&command);
125 debug_assert!(matches!(
126 applied,
127 ApplyOutcome::Applied | ApplyOutcome::Removed
128 ));
129 if matches!(applied, ApplyOutcome::Applied | ApplyOutcome::Removed) {
130 mutated = true;
131 }
132 outcomes.push(applied);
133 }
134 ApplyOutcome::IgnoredStale => outcomes.push(ApplyOutcome::IgnoredStale),
135 }
136 }
137
138 if mutated {
139 self.maybe_run_compaction()?;
140 }
141
142 Ok(outcomes)
143 }
144
145 fn maybe_run_compaction(&self) -> Result<()> {
146 if self.compaction_interval.is_zero() {
147 return Ok(());
148 }
149
150 let now = Instant::now();
151 {
152 let last = self.last_compaction.lock();
153 if now.duration_since(*last) < self.compaction_interval {
154 return Ok(());
155 }
156 }
157
158 self.compactor.run_once()?;
159 *self.last_compaction.lock() = now;
160 Ok(())
161 }
162
163 fn run_compaction_now(&self) -> Result<()> {
164 self.compactor.run_once()?;
165 *self.last_compaction.lock() = Instant::now();
166 Ok(())
167 }
168}
169
170enum WorkItem {
171 Command {
172 command: Command,
173 responder: Sender<Result<ApplyOutcome>>,
174 },
175 Batch {
176 commands: Vec<Command>,
177 responder: Sender<Result<Vec<ApplyOutcome>>>,
178 },
179}
180
181#[derive(Debug)]
183pub struct SingleNodeEngine {
184 shared: Arc<EngineShared>,
185 dispatcher: Option<Sender<WorkItem>>,
186 workers: Vec<JoinHandle<()>>,
187 compaction_signal: Option<Sender<()>>,
188 compaction_worker: Option<JoinHandle<()>>,
189}
190
191impl SingleNodeEngine {
192 pub fn new() -> Result<Self> {
194 Self::with_config(StoreConfig::default())
195 }
196
197 pub fn with_config(config: StoreConfig) -> Result<Self> {
199 let worker_threads = config.worker_threads;
200 let storage = Arc::new(Storage::new(&config)?);
201 let (mut state, mut max_version) = match storage.load_snapshot()? {
202 Some((state, version)) => (state, version),
203 None => (KvState::new(), 0u64),
204 };
205 storage.replay(|command| {
206 if command.version() > max_version {
207 max_version = max_version.max(command.version());
208 state.apply(&command);
209 }
210 Ok(())
211 })?;
212 let start_version = max_version
213 .checked_add(1)
214 .ok_or(Error::Unimplemented("engine::version_overflow"))?;
215 let compactor_config = CompactionConfig {
216 min_bytes: config.max_segment_size,
217 emit_snapshot: config.emit_snapshot_after_compaction,
218 ..CompactionConfig::default()
219 };
220 let compactor = Compactor::new(Arc::clone(&storage), compactor_config);
221
222 let concurrent_state = Arc::new(ConcurrentKvState::from(state));
223
224 let shared = Arc::new(EngineShared::new(
225 storage,
226 Arc::clone(&concurrent_state),
227 IdGenerator::new(start_version.max(1)),
228 compactor,
229 config.compaction_interval,
230 ));
231
232 let (dispatcher, workers) = if worker_threads == 0 {
233 (None, Vec::new())
234 } else {
235 let (task_tx, task_rx) = unbounded::<WorkItem>();
236 let workers = spawn_workers(worker_threads, Arc::clone(&shared), task_rx);
237 (Some(task_tx), workers)
238 };
239
240 let (shutdown_tx, shutdown_rx) = unbounded::<()>();
241 let compaction_worker = spawn_compaction_worker(Arc::clone(&shared), shutdown_rx);
242 let compaction_signal = compaction_worker.as_ref().map(|_| shutdown_tx);
243
244 Ok(Self {
245 shared,
246 dispatcher,
247 workers,
248 compaction_signal,
249 compaction_worker,
250 })
251 }
252
253 fn next_version(&self) -> u64 {
254 self.shared.next_version()
255 }
256
257 pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ApplyOutcome> {
259 let version = self.next_version();
260 let command = Command::Set {
261 key,
262 value,
263 version,
264 timestamp: current_timestamp(),
265 };
266 self.dispatch_command(command)
267 }
268
269 pub fn delete(&self, key: Vec<u8>) -> Result<ApplyOutcome> {
271 let version = self.next_version();
272 let command = Command::Delete {
273 key,
274 version,
275 timestamp: current_timestamp(),
276 };
277 self.dispatch_command(command)
278 }
279
280 pub fn len(&self) -> usize {
282 self.shared.len()
283 }
284
285 pub fn flush(&self) -> Result<()> {
287 self.shared.storage.sync()
288 }
289
290 pub fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
292 where
293 I: IntoIterator<Item = Command>,
294 {
295 let collected: Vec<Command> = commands.into_iter().collect();
296 self.dispatch_batch(collected)
297 }
298
299 pub fn read_with<F, R>(&self, reader: F) -> R
301 where
302 F: FnOnce(&KvState) -> R,
303 {
304 self.shared.read_with(reader)
305 }
306
307 pub fn run_compaction_now(&self) -> Result<()> {
309 self.shared.run_compaction_now()
310 }
311
312 pub fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
314 let entries = self.shared.storage.get_prefix(prefix);
315 let mut results = Vec::with_capacity(entries.len());
316
317 for (key, entry) in entries {
318 if entry.is_tombstone {
319 continue;
320 }
321
322 if let Some(value) = self.shared.state.get(&key) {
324 results.push((key, value));
325 continue;
326 }
327
328 if let Some(command) = self.shared.storage.fetch_command(&entry)? {
330 if let Command::Set { key, value, version, timestamp } = command {
331 let mut guard = self.shared.state.lock_entry(&key);
332 guard.apply(&Command::Set {
333 key: key.clone(),
334 value: value.clone(),
335 version,
336 timestamp,
337 });
338 results.push((key, value));
339 }
340 }
341 }
342
343 Ok(results)
344 }
345
346 pub fn into_argon2_hasher_aes_gcm_auth_service(
348 self,
349 master_key: [u8; 32],
350 ) -> (
351 Arc<SingleNodeEngine>,
352 AuthService<Arc<SingleNodeEngine>, Argon2SecretHasher, AesGcmEncryptor>,
353 ) {
354 let engine = Arc::new(self);
355 let auth = AuthService::new(
356 Arc::clone(&engine),
357 Argon2SecretHasher::default(),
358 AesGcmEncryptor::new(master_key),
359 );
360 (engine, auth)
361 }
362
363 fn dispatch_command(&self, command: Command) -> Result<ApplyOutcome> {
364 if let Some(dispatcher) = &self.dispatcher {
365 let (tx, rx) = bounded(1);
366 dispatcher
367 .send(WorkItem::Command {
368 command,
369 responder: tx,
370 })
371 .map_err(|_| Error::Invariant("engine dispatcher unavailable"))?;
372 rx.recv()
373 .map_err(|_| Error::Invariant("engine worker terminated"))?
374 } else {
375 self.shared.apply_single(command)
376 }
377 }
378
379 fn dispatch_batch(&self, commands: Vec<Command>) -> Result<Vec<ApplyOutcome>> {
380 if let Some(dispatcher) = &self.dispatcher {
381 let (tx, rx) = bounded(1);
382 dispatcher
383 .send(WorkItem::Batch {
384 commands,
385 responder: tx,
386 })
387 .map_err(|_| Error::Invariant("engine dispatcher unavailable"))?;
388 rx.recv()
389 .map_err(|_| Error::Invariant("engine worker terminated"))?
390 } else {
391 self.shared.apply_batch(commands)
392 }
393 }
394 #[cfg(test)]
395 pub(crate) fn test_next_version(&self) -> u64 {
396 self.next_version()
397 }
398
399 #[cfg(test)]
400 pub(crate) fn storage_for_test(&self) -> Arc<Storage> {
401 Arc::clone(&self.shared.storage)
402 }
403
404 #[cfg(test)]
405 pub(crate) fn clear_state_for_test(&self) {
406 self.shared.state.clear_for_test()
407 }
408}
409
410impl Drop for SingleNodeEngine {
411 fn drop(&mut self) {
412 if let Some(dispatcher) = self.dispatcher.take() {
413 drop(dispatcher);
414 }
415
416 for handle in self.workers.drain(..) {
417 let _ = handle.join();
418 }
419
420 if let Some(signal) = self.compaction_signal.take() {
421 let _ = signal.send(());
422 }
423
424 if let Some(handle) = self.compaction_worker.take() {
425 let _ = handle.join();
426 }
427 }
428}
429
430impl KvEngine for SingleNodeEngine {
431 fn submit(&self, command: Command) -> Result<ApplyOutcome> {
432 self.dispatch_command(command)
433 }
434
435 fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
436 where
437 I: IntoIterator<Item = Command>,
438 {
439 let collected: Vec<Command> = commands.into_iter().collect();
440 self.dispatch_batch(collected)
441 }
442
443 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
444 if let Some(value) = self.shared.state.get(key) {
445 return Ok(Some(value));
446 }
447
448 let entry = match self.shared.storage.lookup(key) {
449 Some(entry) => entry,
450 None => return Ok(None),
451 };
452
453 if entry.is_tombstone {
454 let mut guard = self.shared.state.lock_entry(key);
455 guard.apply(&Command::Delete {
456 key: key.to_vec(),
457 version: entry.version,
458 timestamp: 0,
459 });
460 return Ok(None);
461 }
462
463 let command = match self.shared.storage.fetch_command(&entry)? {
464 Some(command) => command,
465 None => return Ok(None),
466 };
467
468 let command_timestamp = command.timestamp();
469 match &command {
470 Command::Set {
471 key,
472 value,
473 version,
474 ..
475 } => {
476 let mut guard = self.shared.state.lock_entry(key);
477 let cloned_value = value.clone();
478 guard.apply(&Command::Set {
479 key: key.clone(),
480 value: value.clone(),
481 version: *version,
482 timestamp: command_timestamp,
483 });
484 Ok(Some(cloned_value))
485 }
486 Command::Delete { key, version, .. } => {
487 let mut guard = self.shared.state.lock_entry(key);
488 guard.apply(&Command::Delete {
489 key: key.clone(),
490 version: *version,
491 timestamp: command_timestamp,
492 });
493 Ok(None)
494 }
495 }
496 }
497
498 fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
499 self.get_prefix(prefix)
500 }
501}
502
503impl<E> KvEngine for Arc<E>
504where
505 E: KvEngine,
506{
507 fn submit(&self, command: Command) -> Result<ApplyOutcome> {
508 (**self).submit(command)
509 }
510
511 fn submit_batch<I>(&self, commands: I) -> Result<Vec<ApplyOutcome>>
512 where
513 I: IntoIterator<Item = Command>,
514 {
515 (**self).submit_batch(commands)
516 }
517
518 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
519 (**self).get(key)
520 }
521
522 fn get_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
523 (**self).get_prefix(prefix)
524 }
525}
526
527impl SnapshotEngine for SingleNodeEngine {
528 fn snapshot_state(&self) -> KvState {
529 self.shared.storage.state_snapshot()
530 }
531
532 fn latest_version(&self) -> u64 {
533 self.shared.storage.latest_version()
534 }
535}
536
537fn spawn_workers(
539 count: usize,
540 shared: Arc<EngineShared>,
541 task_rx: Receiver<WorkItem>,
542) -> Vec<JoinHandle<()>> {
543 let count = count.max(1);
544 let mut handles = Vec::with_capacity(count);
545 for index in 0..count {
546 let worker_rx = task_rx.clone();
547 let worker_shared = Arc::clone(&shared);
548 let handle = thread::Builder::new()
549 .name(format!("hightower-engine-worker-{index}"))
550 .spawn(move || worker_loop(worker_shared, worker_rx))
551 .expect("failed to spawn engine worker");
552 handles.push(handle);
553 }
554 drop(task_rx);
555 handles
556}
557
558fn worker_loop(shared: Arc<EngineShared>, task_rx: Receiver<WorkItem>) {
559 while let Ok(item) = task_rx.recv() {
560 match item {
561 WorkItem::Command { command, responder } => {
562 let result = shared.apply_single(command);
563 let _ = responder.send(result);
564 }
565 WorkItem::Batch {
566 commands,
567 responder,
568 } => {
569 let result = shared.apply_batch(commands);
570 let _ = responder.send(result);
571 }
572 }
573 }
574}
575
576fn spawn_compaction_worker(
578 shared: Arc<EngineShared>,
579 shutdown: Receiver<()>,
580) -> Option<JoinHandle<()>> {
581 if shared.compaction_interval.is_zero() {
582 return None;
583 }
584
585 let interval = shared.compaction_interval;
586 Some(
587 thread::Builder::new()
588 .name("hightower-compactor".into())
589 .spawn(move || {
590 loop {
591 match shutdown.recv_timeout(interval) {
592 Ok(_) | Err(RecvTimeoutError::Disconnected) => break,
593 Err(RecvTimeoutError::Timeout) => {
594 let _ = shared.maybe_run_compaction();
595 }
596 }
597 }
598 })
599 .expect("failed to spawn compaction worker"),
600 )
601}
602
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
610
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::tempdir;

    /// Builds a config that stores data under `dir` and uses two workers.
    fn temp_config(dir: &std::path::Path) -> StoreConfig {
        let mut cfg = StoreConfig::default();
        cfg.data_dir = dir.join("engine-data").to_string_lossy().into_owned();
        cfg.worker_threads = 2;
        cfg
    }

    /// Opens an engine over a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the engine is
    /// in use; bind it to a named variable such as `_temp`, not to `_`.
    fn open_temp_engine() -> (tempfile::TempDir, SingleNodeEngine) {
        let temp = tempdir().unwrap();
        let engine = SingleNodeEngine::with_config(temp_config(temp.path())).unwrap();
        (temp, engine)
    }

    #[test]
    fn put_and_get_via_engine() {
        let (_temp, engine) = open_temp_engine();

        engine.put(b"alpha".to_vec(), b"beta".to_vec()).unwrap();

        assert_eq!(engine.get(b"alpha").unwrap(), Some(b"beta".to_vec()));
        assert_eq!(engine.len(), 1);
    }

    #[test]
    fn delete_removes_key() {
        let (_temp, engine) = open_temp_engine();

        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();
        engine.delete(b"key".to_vec()).unwrap();

        assert!(engine.get(b"key").unwrap().is_none());
    }

    #[test]
    fn persists_across_reopen() {
        let temp = tempdir().unwrap();
        let cfg = temp_config(temp.path());

        let first = SingleNodeEngine::with_config(cfg.clone()).unwrap();
        first.put(b"persist".to_vec(), b"value".to_vec()).unwrap();
        // Shut the first engine down completely before reopening the same directory.
        drop(first);

        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
        assert_eq!(reopened.get(b"persist").unwrap(), Some(b"value".to_vec()));
    }

    #[test]
    fn get_reads_from_storage_on_cache_miss() {
        let (_temp, engine) = open_temp_engine();
        engine.put(b"alpha".to_vec(), b"beta".to_vec()).unwrap();

        // Empty the in-memory state so the read has to go to storage.
        engine.clear_state_for_test();

        assert_eq!(engine.get(b"alpha").unwrap(), Some(b"beta".to_vec()));
        // The read must have repopulated the in-memory state.
        assert_eq!(engine.len(), 1);
    }

    #[test]
    fn flush_propagates_to_storage() {
        let (_temp, engine) = open_temp_engine();

        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();

        engine.flush().unwrap();
    }

    #[test]
    fn submit_batch_applies_multiple_commands() {
        let (_temp, engine) = open_temp_engine();

        let make_set = |key: &[u8], value: &[u8], timestamp: i64| Command::Set {
            key: key.to_vec(),
            value: value.to_vec(),
            version: engine.test_next_version(),
            timestamp,
        };
        let commands = vec![make_set(b"a", b"1", 1), make_set(b"b", b"2", 2)];

        let outcomes = engine.submit_batch(commands).unwrap();

        assert!(
            outcomes
                .iter()
                .all(|outcome| matches!(outcome, ApplyOutcome::Applied))
        );
        assert_eq!(engine.get(b"a").unwrap(), Some(b"1".to_vec()));
        assert_eq!(engine.get(b"b").unwrap(), Some(b"2".to_vec()));
    }

    #[test]
    fn submit_batch_skips_stale_commands() {
        let (_temp, engine) = open_temp_engine();
        engine.put(b"k".to_vec(), b"v1".to_vec()).unwrap();

        let batch = vec![
            // Deliberately low, hard-coded version: expected to be ignored.
            Command::Set {
                key: b"k".to_vec(),
                value: b"old".to_vec(),
                version: 1,
                timestamp: 10,
            },
            Command::Set {
                key: b"k".to_vec(),
                value: b"v2".to_vec(),
                version: engine.test_next_version(),
                timestamp: 11,
            },
        ];

        let outcomes = engine.submit_batch(batch).unwrap();

        assert!(matches!(outcomes[0], ApplyOutcome::IgnoredStale));
        assert!(matches!(outcomes[1], ApplyOutcome::Applied));
        assert_eq!(engine.get(b"k").unwrap(), Some(b"v2".to_vec()));
    }

    #[test]
    fn read_with_provides_consistent_snapshot() {
        let (_temp, engine) = open_temp_engine();
        engine.put(b"snap".to_vec(), b"value".to_vec()).unwrap();

        let seen = engine.read_with(|state| match state.get(b"snap") {
            Some(bytes) => bytes.to_vec(),
            None => Vec::new(),
        });

        assert_eq!(seen, b"value".to_vec());
    }

    #[test]
    fn run_compaction_now_merges_segments_and_creates_snapshot() {
        let temp = tempdir().unwrap();
        let mut cfg = temp_config(temp.path());
        // Tiny segments so a handful of writes seals at least one of them.
        cfg.max_segment_size = 64;
        // Zero interval: compaction only runs when requested explicitly.
        cfg.compaction_interval = Duration::from_secs(0);
        cfg.emit_snapshot_after_compaction = true;
        let engine = SingleNodeEngine::with_config(cfg.clone()).unwrap();

        for i in 0..6 {
            let key = format!("key{i}").into_bytes();
            engine.put(key, vec![b'x'; 16]).unwrap();
        }

        let sealed_before = engine.storage_for_test().sealed_segments_snapshot();
        assert!(sealed_before.len() >= 1);

        engine.run_compaction_now().unwrap();

        let sealed_after = engine.storage_for_test().sealed_segments_snapshot();
        assert!(sealed_after.len() <= sealed_before.len());

        let snapshot_path = std::path::Path::new(&cfg.data_dir).join("snapshot.bin");
        assert!(snapshot_path.exists());

        drop(engine);

        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
        assert!(reopened.get(b"key0").unwrap().is_some());
    }

    #[test]
    fn into_argon2_hasher_aes_gcm_auth_service_returns_handles() {
        let (_temp, engine) = open_temp_engine();

        let (engine, auth) = engine.into_argon2_hasher_aes_gcm_auth_service([7u8; 32]);

        // The auth service must be usable on its own ...
        let user = auth.create_user("bundle", "secret").unwrap();
        assert!(auth.verify_password("bundle", "secret").unwrap());

        let (record, token) = auth.create_api_key(&user.user_id, None).unwrap();
        assert!(token.starts_with(&record.key_id));
        assert!(auth.authenticate_api_key(&token).unwrap().is_some());

        // ... and the returned engine handle must still accept reads and writes.
        engine.put(b"key".to_vec(), b"value".to_vec()).unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value".to_vec()));
    }

    #[test]
    fn concurrent_submitters_share_workers() {
        /// Key/value pair written by submitter `worker` on iteration `idx`.
        fn pair(worker: u32, idx: u32) -> (Vec<u8>, Vec<u8>) {
            (
                format!("k-{worker}-{idx}").into_bytes(),
                format!("v-{worker}-{idx}").into_bytes(),
            )
        }

        let temp = tempdir().unwrap();
        let mut cfg = temp_config(temp.path());
        cfg.compaction_interval = Duration::from_secs(0);
        let engine = Arc::new(SingleNodeEngine::with_config(cfg).unwrap());

        let submitters: Vec<_> = (0..4)
            .map(|worker| {
                let engine = Arc::clone(&engine);
                thread::spawn(move || {
                    for idx in 0..25 {
                        let (key, value) = pair(worker, idx);
                        engine.put(key, value).unwrap();
                    }
                })
            })
            .collect();

        for submitter in submitters {
            submitter.join().unwrap();
        }

        for worker in 0..4 {
            for idx in 0..25 {
                let (key, expected) = pair(worker, idx);
                assert_eq!(engine.get(&key).unwrap(), Some(expected));
            }
        }
    }

    #[test]
    fn get_prefix_returns_matching_keys() {
        let (_temp, engine) = open_temp_engine();

        let seed: [(&[u8], &[u8]); 4] = [
            (b"app:user:1", b"alice"),
            (b"app:user:2", b"bob"),
            (b"app:session:1", b"s1"),
            (b"other:key", b"value"),
        ];
        for (key, value) in seed {
            engine.put(key.to_vec(), value.to_vec()).unwrap();
        }

        let results = engine.get_prefix(b"app:user:").unwrap();
        assert_eq!(results.len(), 2);

        let mut keys: Vec<Vec<u8>> = results.iter().map(|(key, _)| key.clone()).collect();
        keys.sort();
        assert_eq!(keys[0], b"app:user:1");
        assert_eq!(keys[1], b"app:user:2");

        let values: Vec<Vec<u8>> = results.iter().map(|(_, value)| value.clone()).collect();
        assert!(values.contains(&b"alice".to_vec()));
        assert!(values.contains(&b"bob".to_vec()));
    }

    #[test]
    fn get_prefix_excludes_deleted_keys() {
        let (_temp, engine) = open_temp_engine();

        engine
            .put(b"prefix:key1".to_vec(), b"value1".to_vec())
            .unwrap();
        engine
            .put(b"prefix:key2".to_vec(), b"value2".to_vec())
            .unwrap();
        engine.delete(b"prefix:key1".to_vec()).unwrap();

        let results = engine.get_prefix(b"prefix:").unwrap();

        assert_eq!(results.len(), 1);
        let (key, value) = &results[0];
        assert_eq!(*key, b"prefix:key2");
        assert_eq!(*value, b"value2");
    }

    #[test]
    fn get_prefix_with_empty_prefix_returns_all() {
        let (_temp, engine) = open_temp_engine();

        engine.put(b"a".to_vec(), b"1".to_vec()).unwrap();
        engine.put(b"b".to_vec(), b"2".to_vec()).unwrap();
        engine.put(b"c".to_vec(), b"3".to_vec()).unwrap();

        assert_eq!(engine.get_prefix(b"").unwrap().len(), 3);
    }

    #[test]
    fn get_prefix_persists_across_reopen() {
        let temp = tempdir().unwrap();
        let cfg = temp_config(temp.path());

        let first = SingleNodeEngine::with_config(cfg.clone()).unwrap();
        first.put(b"persist:1".to_vec(), b"v1".to_vec()).unwrap();
        first.put(b"persist:2".to_vec(), b"v2".to_vec()).unwrap();
        first.put(b"other".to_vec(), b"v3".to_vec()).unwrap();
        // Shut the first engine down completely before reopening the same directory.
        drop(first);

        let reopened = SingleNodeEngine::with_config(cfg).unwrap();
        let results = reopened.get_prefix(b"persist:").unwrap();
        assert_eq!(results.len(), 2);

        let keys: Vec<Vec<u8>> = results.iter().map(|(key, _)| key.clone()).collect();
        assert!(keys.contains(&b"persist:1".to_vec()));
        assert!(keys.contains(&b"persist:2".to_vec()));
    }
}