Skip to main content

dwbase_storage_sled/
lib.rs

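//! Sled-backed append-only storage engine for DWBase.
//!
//! Atoms are immutable and written once; replay is driven by a per-world log that records
//! append order. Storage keys:
//! - `world/{world}/atoms/{atom_id}` -> serialized `Atom` (prefixed with `ENC1` when encrypted)
//! - `world/{world}/log/{seq}` -> serialized `LogFrame` holding the atom id plus the CRC32 and
//!   length of the stored atom bytes (sequence is zero-padded decimal for ordering)
//! - `idx/atom/{atom_id}` -> serialized `AtomIndexEntry` (world and log sequence of the atom)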
7
8use std::{path::PathBuf, sync::Arc};
9
10use aes_gcm::{
11    aead::{Aead, AeadCore, KeyInit, OsRng},
12    Aes256Gcm, Nonce,
13};
14use crc32fast::Hasher as Crc32;
15use dwbase_core::{Atom, AtomId, WorldKey};
16use dwbase_engine::{AtomFilter, DwbaseError, Result, StorageEngine, StorageStats};
17use rmp_serde::{decode, encode};
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20use sled::IVec;
21use thiserror::Error;
22
/// Number of decimal digits that log sequence numbers are zero-padded to inside log keys.
const LOG_SEQ_WIDTH: usize = 20;
/// Marker bytes placed in front of every encrypted atom payload.
const ENC_MAGIC: &[u8] = b"ENC1";
25
/// Options used when opening a `SledStorage`.
#[derive(Debug, Clone)]
pub struct SledConfig {
    /// Location handed to `sled::open`.
    pub path: PathBuf,
    /// When set, the database is flushed at the end of each append.
    pub flush_on_write: bool,
    /// When set, atoms are encrypted before they are stored.
    pub encryption_enabled: bool,
    /// Identifier of the key requested from the key provider; required when encryption is on.
    pub key_id: Option<String>,
}

impl SledConfig {
    /// Builds a configuration for `path` with flush-on-write enabled and encryption disabled.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        Self {
            path,
            flush_on_write: true,
            encryption_enabled: false,
            key_id: None,
        }
    }
}
44
45pub trait KeyProvider: Send + Sync {
46    fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>>;
47}
48
/// In-memory key provider backed by a map from key id to key bytes.
#[derive(Debug, Clone, Default)]
pub struct DummyKeyProvider {
    keys: std::collections::HashMap<String, Vec<u8>>,
}

impl DummyKeyProvider {
    /// Registers `key_bytes` under `key_id`, replacing any earlier entry, and returns the provider.
    pub fn with_key(self, key_id: impl Into<String>, key_bytes: [u8; 32]) -> Self {
        let mut keys = self.keys;
        keys.insert(key_id.into(), Vec::from(key_bytes));
        Self { keys }
    }
}
60
61impl KeyProvider for DummyKeyProvider {
62    fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>> {
63        self.keys
64            .get(key_id)
65            .cloned()
66            .ok_or_else(|| DwbaseError::InvalidInput(format!("missing key for id {key_id}")))
67    }
68}
69
70#[derive(Debug, Default)]
71pub struct EnvKeyProvider;
72
73impl KeyProvider for EnvKeyProvider {
74    fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>> {
75        let env_key = format!(
76            "DWBASE_KEY_{}",
77            key_id.to_ascii_uppercase().replace('-', "_")
78        );
79        let raw = std::env::var(&env_key).map_err(|_| {
80            DwbaseError::InvalidInput(format!("env var {env_key} missing for key id {key_id}"))
81        })?;
82        hex::decode(raw.trim()).map_err(|e| DwbaseError::InvalidInput(e.to_string()))
83    }
84}
85
86/// Sled-backed implementation of `StorageEngine`.
87pub struct SledStorage {
88    db: sled::Db,
89    flush_on_write: bool,
90    encryption_enabled: bool,
91    key_id: Option<String>,
92    key_provider: Arc<dyn KeyProvider>,
93}
94
95#[derive(Debug, Serialize, Deserialize)]
96struct AtomIndexEntry {
97    world: WorldKey,
98    seq: u64,
99}
100
101#[derive(Debug, Serialize, Deserialize)]
102struct LogFrame {
103    atom_id: AtomId,
104    checksum: u32,
105    len: u64,
106}
107
108#[derive(Debug, Serialize, Deserialize)]
109struct EncryptedBlob {
110    key_id: String,
111    nonce: [u8; 12],
112    cipher: Vec<u8>,
113}
114
115#[derive(Debug, Error)]
116enum StorageError {
117    #[error("sled error: {0}")]
118    Sled(#[from] sled::Error),
119    #[error("serialization error: {0}")]
120    Encode(#[from] encode::Error),
121    #[error("deserialization error: {0}")]
122    Decode(#[from] decode::Error),
123    #[error("utf8 error: {0}")]
124    Utf8(#[from] std::string::FromUtf8Error),
125    #[error("str utf8 error: {0}")]
126    StrUtf8(#[from] std::str::Utf8Error),
127}
128
129impl From<StorageError> for DwbaseError {
130    fn from(err: StorageError) -> Self {
131        DwbaseError::Storage(err.to_string())
132    }
133}
134
135impl SledStorage {
136    pub fn open(config: SledConfig, key_provider: Arc<dyn KeyProvider>) -> Result<Self> {
137        if config.encryption_enabled && config.key_id.is_none() {
138            return Err(DwbaseError::InvalidInput(
139                "encryption enabled but key_id missing".into(),
140            ));
141        }
142        let db = sled::open(&config.path).map_err(StorageError::from)?;
143        let storage = Self {
144            db,
145            flush_on_write: config.flush_on_write,
146            encryption_enabled: config.encryption_enabled,
147            key_id: config.key_id.clone(),
148            key_provider,
149        };
150        storage.repair_logs()?;
151        // Best-effort index build to avoid legacy scans.
152        let _ = storage.rebuild_index();
153        Ok(storage)
154    }
155
156    pub fn open_with_env(config: SledConfig) -> Result<Self> {
157        Self::open(config, Arc::new(EnvKeyProvider))
158    }
159
160    fn key_from_bytes(bytes: Vec<u8>) -> Result<[u8; 32]> {
161        if bytes.len() != 32 {
162            return Err(DwbaseError::InvalidInput(
163                "encryption key must be 32 bytes (AES-256-GCM)".into(),
164            ));
165        }
166        let mut key = [0u8; 32];
167        key.copy_from_slice(&bytes);
168        Ok(key)
169    }
170
171    fn derive_data_key(master: [u8; 32], key_id: &str, nonce: &[u8]) -> aes_gcm::Key<Aes256Gcm> {
172        let mut hasher = Sha256::new();
173        hasher.update(master);
174        hasher.update(key_id.as_bytes());
175        hasher.update(nonce);
176        let derived = hasher.finalize();
177        let mut key = [0u8; 32];
178        key.copy_from_slice(&derived);
179        *aes_gcm::Key::<Aes256Gcm>::from_slice(&key)
180    }
181
182    fn encrypt_bytes(&self, plain: &[u8]) -> Result<Vec<u8>> {
183        let key_id = self
184            .key_id
185            .as_ref()
186            .ok_or_else(|| DwbaseError::InvalidInput("key_id missing".into()))?;
187        let key_bytes = self.key_provider.key_bytes(key_id)?;
188        let master = Self::key_from_bytes(key_bytes)?;
189        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
190        let mut nonce_bytes = [0u8; 12];
191        nonce_bytes.copy_from_slice(nonce.as_slice());
192        let data_key = Self::derive_data_key(master, key_id, &nonce_bytes);
193        let cipher = Aes256Gcm::new(&data_key)
194            .encrypt(Nonce::from_slice(&nonce_bytes), plain)
195            .map_err(|e| DwbaseError::Storage(e.to_string()))?;
196        let blob = EncryptedBlob {
197            key_id: key_id.clone(),
198            nonce: nonce_bytes,
199            cipher,
200        };
201        let mut out = ENC_MAGIC.to_vec();
202        let blob_bytes = rmp_serde::to_vec(&blob).map_err(StorageError::from)?;
203        out.extend(blob_bytes);
204        Ok(out)
205    }
206
207    fn decrypt_bytes(&self, bytes: &[u8]) -> Result<Vec<u8>> {
208        let blob: EncryptedBlob = rmp_serde::from_slice(bytes).map_err(StorageError::from)?;
209        let key_bytes = self.key_provider.key_bytes(&blob.key_id)?;
210        let master = Self::key_from_bytes(key_bytes)?;
211        let data_key = Self::derive_data_key(master, &blob.key_id, &blob.nonce);
212        Aes256Gcm::new(&data_key)
213            .decrypt(Nonce::from_slice(&blob.nonce), blob.cipher.as_ref())
214            .map_err(|e| DwbaseError::Storage(e.to_string()))
215    }
216
217    fn atom_key(world: &WorldKey, atom_id: &AtomId) -> Vec<u8> {
218        format!("world/{}/atoms/{}", world.0, atom_id.0).into_bytes()
219    }
220
221    fn log_prefix(world: &WorldKey) -> Vec<u8> {
222        format!("world/{}/log/", world.0).into_bytes()
223    }
224
225    fn log_key(world: &WorldKey, seq: u64) -> Vec<u8> {
226        format!(
227            "world/{}/log/{:0width$}",
228            world.0,
229            seq,
230            width = LOG_SEQ_WIDTH
231        )
232        .into_bytes()
233    }
234
235    fn next_seq(&self, world: &WorldKey) -> Result<u64> {
236        let prefix = Self::log_prefix(world);
237        let mut iter = self.db.scan_prefix(prefix);
238        let last = iter
239            .next_back()
240            .and_then(|res| res.ok())
241            .and_then(|(key, _)| Self::seq_from_log_key(&key));
242        Ok(last.map_or(0, |n| n + 1))
243    }
244
245    fn seq_from_log_key(key: &IVec) -> Option<u64> {
246        let s = std::str::from_utf8(key.as_ref()).ok()?;
247        let seq_part = s.rsplit('/').next()?;
248        seq_part.parse().ok()
249    }
250
251    fn decode_frame(&self, bytes: &[u8]) -> Result<LogFrame> {
252        rmp_serde::from_slice(bytes)
253            .map_err(StorageError::from)
254            .map_err(Into::into)
255    }
256
257    fn repair_logs(&self) -> Result<()> {
258        let mut worlds = std::collections::HashSet::new();
259        for entry in self.db.scan_prefix("world/") {
260            let (key, _) = entry.map_err(StorageError::from)?;
261            if let Some(world) = Self::world_from_key(&key) {
262                worlds.insert(world);
263            }
264        }
265        for world in worlds {
266            self.repair_world_log(&world)?;
267        }
268        Ok(())
269    }
270
271    fn world_from_key(key: &IVec) -> Option<WorldKey> {
272        let s = std::str::from_utf8(key.as_ref()).ok()?;
273        let mut parts = s.split('/');
274        let first = parts.next()?;
275        let world = parts.next()?;
276        if first == "world" {
277            Some(WorldKey(world.to_string()))
278        } else {
279            None
280        }
281    }
282
283    fn repair_world_log(&self, world: &WorldKey) -> Result<()> {
284        let prefix = Self::log_prefix(world);
285        let mut _last_good: Option<u64> = None;
286        let mut bad_seq: Option<u64> = None;
287        for entry in self.db.scan_prefix(prefix.clone()) {
288            let (key, val) = entry.map_err(StorageError::from)?;
289            let seq = match Self::seq_from_log_key(&key) {
290                Some(s) => s,
291                None => continue,
292            };
293            let frame = match self.decode_frame(val.as_ref()) {
294                Ok(f) => f,
295                Err(_) => {
296                    bad_seq = Some(seq);
297                    break;
298                }
299            };
300            let atom_bytes = match self.db.get(Self::atom_key(world, &frame.atom_id)) {
301                Ok(Some(b)) => b,
302                _ => {
303                    bad_seq = Some(seq);
304                    break;
305                }
306            };
307            let checksum = Self::checksum(atom_bytes.as_ref());
308            if checksum != frame.checksum {
309                bad_seq = Some(seq);
310                break;
311            }
312            _last_good = Some(seq);
313        }
314
315        if let Some(bad) = bad_seq {
316            let mut to_delete = Vec::new();
317            for entry in self.db.scan_prefix(prefix.clone()) {
318                let (key, _) = entry.map_err(StorageError::from)?;
319                if let Some(seq) = Self::seq_from_log_key(&key) {
320                    if seq >= bad {
321                        to_delete.push(key);
322                    }
323                }
324            }
325            for key in to_delete {
326                let _ = self.db.remove(key);
327            }
328            eprintln!("repair: truncated log for {} at seq {}", world.0, bad);
329        }
330        Ok(())
331    }
332
333    fn encode_atom(&self, atom: &Atom) -> Result<Vec<u8>> {
334        let plain = rmp_serde::to_vec(atom).map_err(StorageError::from)?;
335        if self.encryption_enabled {
336            self.encrypt_bytes(&plain)
337        } else {
338            Ok(plain)
339        }
340    }
341
342    fn decode_atom(&self, bytes: &[u8]) -> Result<Atom> {
343        if bytes.starts_with(ENC_MAGIC) {
344            let decrypted = self.decrypt_bytes(&bytes[ENC_MAGIC.len()..])?;
345            return rmp_serde::from_slice(&decrypted)
346                .map_err(StorageError::from)
347                .map_err(Into::into);
348        }
349        rmp_serde::from_slice(bytes)
350            .map_err(StorageError::from)
351            .map_err(Into::into)
352    }
353
354    fn flush_if_needed(&self) -> Result<()> {
355        if self.flush_on_write {
356            self.db.flush().map_err(StorageError::from)?;
357        }
358        Ok(())
359    }
360
361    fn checksum(bytes: &[u8]) -> u32 {
362        let mut h = Crc32::new();
363        h.update(bytes);
364        h.finalize()
365    }
366
367    /// Append a batch of atoms to the same world, returning their ids.
368    pub fn append_atoms(&self, world: &WorldKey, atoms: Vec<Atom>) -> Result<Vec<AtomId>> {
369        let mut seq = self.next_seq(world)?;
370        let mut ids = Vec::with_capacity(atoms.len());
371        for atom in atoms {
372            let atom_world = atom.world().clone();
373            if &atom_world != world {
374                return Err(DwbaseError::InvalidInput(format!(
375                    "atom world {} does not match target world {}",
376                    atom_world.0, world.0
377                )));
378            }
379            let id = atom.id().clone();
380            let bytes = self.encode_atom(&atom)?;
381            let frame = LogFrame {
382                atom_id: id.clone(),
383                checksum: Self::checksum(&bytes),
384                len: bytes.len() as u64,
385            };
386            let frame_bytes = rmp_serde::to_vec(&frame).map_err(StorageError::from)?;
387            self.db
388                .insert(Self::atom_key(world, &id), bytes)
389                .map_err(StorageError::from)?;
390            self.db
391                .insert(Self::log_key(world, seq), frame_bytes)
392                .map_err(StorageError::from)?;
393            self.record_index(world, &id, seq)?;
394            seq += 1;
395            ids.push(id);
396        }
397        self.flush_if_needed()?;
398        Ok(ids)
399    }
400
401    fn atom_from_store(&self, id: &AtomId, world: &WorldKey) -> Result<Option<Atom>> {
402        let key = Self::atom_key(world, id);
403        let bytes = match self.db.get(key).map_err(StorageError::from)? {
404            Some(b) => b,
405            None => return Ok(None),
406        };
407        self.decode_atom(bytes.as_ref()).map(Some)
408    }
409
410    fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
411        if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
412            return false;
413        }
414        if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
415            return false;
416        }
417        if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
418            return false;
419        }
420        if let Some(since) = &filter.since {
421            if atom.timestamp().0 < since.0 {
422                return false;
423            }
424        }
425        if let Some(until) = &filter.until {
426            if atom.timestamp().0 > until.0 {
427                return false;
428            }
429        }
430        true
431    }
432
433    #[cfg(test)]
434    fn corrupt_log_entry(&self, world: &WorldKey, seq: u64, bytes: &[u8]) {
435        let _ = self.db.insert(Self::log_key(world, seq), bytes);
436    }
437
438    fn atom_index_key(id: &AtomId) -> Vec<u8> {
439        format!("idx/atom/{}", id.0).into_bytes()
440    }
441
442    fn index_entry(world: &WorldKey, seq: u64) -> Result<Vec<u8>> {
443        let entry = AtomIndexEntry {
444            world: world.clone(),
445            seq,
446        };
447        rmp_serde::to_vec(&entry)
448            .map_err(StorageError::from)
449            .map_err(Into::into)
450    }
451
452    fn decode_index(bytes: &[u8]) -> Result<AtomIndexEntry> {
453        rmp_serde::from_slice(bytes)
454            .map_err(StorageError::from)
455            .map_err(Into::into)
456    }
457
458    fn record_index(&self, world: &WorldKey, id: &AtomId, seq: u64) -> Result<()> {
459        let key = Self::atom_index_key(id);
460        let val = Self::index_entry(world, seq)?;
461        self.db.insert(key, val).map_err(StorageError::from)?;
462        Ok(())
463    }
464
465    fn clear_index(&self) -> Result<()> {
466        let mut to_delete = Vec::new();
467        for entry in self.db.scan_prefix("idx/atom/") {
468            let (k, _) = entry.map_err(StorageError::from)?;
469            to_delete.push(k);
470        }
471        for k in to_delete {
472            let _ = self.db.remove(k);
473        }
474        Ok(())
475    }
476
477    /// Rebuild the atom id index from existing worlds/logs; returns entries written.
478    pub fn rebuild_index(&self) -> Result<u64> {
479        self.clear_index()?;
480        let mut written = 0u64;
481        for world in self.worlds()? {
482            let prefix = Self::log_prefix(&world);
483            for entry in self.db.scan_prefix(prefix) {
484                let (key, val) = entry.map_err(StorageError::from)?;
485                let seq = match Self::seq_from_log_key(&key) {
486                    Some(s) => s,
487                    None => continue,
488                };
489                let frame = match self.decode_frame(val.as_ref()) {
490                    Ok(f) => f,
491                    Err(_) => continue,
492                };
493                self.record_index(&world, &frame.atom_id, seq)?;
494                written += 1;
495            }
496        }
497        Ok(written)
498    }
499}
500
501impl StorageEngine for SledStorage {
502    fn append(&self, atom: Atom) -> Result<()> {
503        let world = atom.world().clone();
504        self.append_atoms(&world, vec![atom])?;
505        Ok(())
506    }
507
508    fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
509        let mut out = Vec::with_capacity(ids.len());
510        for id in ids {
511            let mut found = None;
512            if let Some(idx_bytes) = self
513                .db
514                .get(Self::atom_index_key(id))
515                .map_err(StorageError::from)?
516            {
517                if let Ok(entry) = Self::decode_index(idx_bytes.as_ref()) {
518                    if let Some(atom) = self.atom_from_store(id, &entry.world)? {
519                        found = Some(atom);
520                    }
521                }
522            }
523            if found.is_none() {
524                // World is encoded in atom entry path, but not in id; legacy fallback scans worlds.
525                for world_atom in self.db.scan_prefix(b"world/") {
526                    let (key, _) = world_atom.map_err(StorageError::from)?;
527                    if !key.ends_with(format!("/atoms/{}", id.0).as_bytes()) {
528                        continue;
529                    }
530                    let s = match std::str::from_utf8(key.as_ref()) {
531                        Ok(v) => v,
532                        Err(_) => continue,
533                    };
534                    let parts: Vec<_> = s.split('/').collect();
535                    let world = WorldKey(parts.get(1).unwrap_or(&"").to_string());
536                    found = self.atom_from_store(id, &world)?;
537                    if found.is_some() {
538                        break;
539                    }
540                }
541            }
542            if let Some(atom) = found {
543                out.push(atom);
544            }
545        }
546        Ok(out)
547    }
548
549    fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>> {
550        if let Some(f_world) = &filter.world {
551            if f_world != world {
552                return Ok(Vec::new());
553            }
554        }
555        let prefix = Self::log_prefix(world);
556        let mut results = Vec::new();
557        for entry in self.db.scan_prefix(prefix) {
558            let (_, frame_bytes) = entry.map_err(StorageError::from)?;
559            let frame = match self.decode_frame(frame_bytes.as_ref()) {
560                Ok(f) => f,
561                Err(_) => continue,
562            };
563            let atom_id = frame.atom_id;
564            if let Some(atom) = self.atom_from_store(&atom_id, world)? {
565                if Self::matches_filter(&atom, filter) {
566                    results.push(atom);
567                    if let Some(limit) = filter.limit {
568                        if results.len() >= limit {
569                            break;
570                        }
571                    }
572                }
573            }
574        }
575        Ok(results)
576    }
577
578    fn stats(&self, world: &WorldKey) -> Result<StorageStats> {
579        let mut atom_count = 0usize;
580        let mut vector_count = 0usize;
581        let prefix = format!("world/{}/atoms/", world.0);
582        for entry in self.db.scan_prefix(prefix.as_bytes()) {
583            let (_, bytes) = entry.map_err(StorageError::from)?;
584            atom_count += 1;
585            if let Ok(atom) = self.decode_atom(bytes.as_ref()) {
586                if atom.vector().is_some() {
587                    vector_count += 1;
588                }
589            }
590        }
591        Ok(StorageStats {
592            atom_count,
593            vector_count,
594        })
595    }
596
597    fn list_ids_in_window(
598        &self,
599        world: &WorldKey,
600        window: &dwbase_engine::TimeWindow,
601    ) -> Result<Vec<AtomId>> {
602        let mut ids = Vec::new();
603        let prefix = Self::log_prefix(world);
604        for entry in self.db.scan_prefix(prefix) {
605            let (_key, val) = entry.map_err(StorageError::from)?;
606            let frame = match self.decode_frame(val.as_ref()) {
607                Ok(f) => f,
608                Err(_) => continue,
609            };
610            if let Some(atom) = self.atom_from_store(&frame.atom_id, world)? {
611                let ts = dwbase_core::Timestamp::new(atom.timestamp().0.clone());
612                if let Ok(dt) = time::OffsetDateTime::parse(
613                    ts.0.as_str(),
614                    &time::format_description::well_known::Rfc3339,
615                ) {
616                    let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
617                    if ms >= window.start_ms && ms <= window.end_ms {
618                        ids.push(frame.atom_id.clone());
619                    }
620                }
621            }
622        }
623        Ok(ids)
624    }
625
626    fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize> {
627        let mut removed = 0usize;
628        for id in ids {
629            let _ = self.db.remove(Self::atom_key(world, id));
630            let _ = self.db.remove(Self::atom_index_key(id));
631        }
632        let prefix = Self::log_prefix(world);
633        for entry in self.db.scan_prefix(prefix.clone()) {
634            let (key, val) = entry.map_err(StorageError::from)?;
635            let frame = match self.decode_frame(val.as_ref()) {
636                Ok(f) => f,
637                Err(_) => continue,
638            };
639            if ids.contains(&frame.atom_id) {
640                let _ = self.db.remove(key);
641                removed += 1;
642            }
643        }
644        Ok(removed)
645    }
646
647    fn worlds(&self) -> Result<Vec<WorldKey>> {
648        let mut worlds = std::collections::HashSet::new();
649        for entry in self.db.scan_prefix("world/") {
650            let (key, _) = entry.map_err(StorageError::from)?;
651            if let Some(w) = Self::world_from_key(&key) {
652                worlds.insert(w);
653            }
654        }
655        Ok(worlds.into_iter().collect())
656    }
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use dwbase_core::{AtomKind, Importance, Timestamp, WorkerKey};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// (id, timestamp, importance) of the atoms the `w1` fixtures are built from.
    const W1_ATOMS: [(&str, &str, f32); 3] = [
        ("a1", "2024-01-01T00:00:00Z", 0.4),
        ("a2", "2024-01-01T00:00:01Z", 0.6),
        ("a3", "2024-01-01T00:00:02Z", 0.7),
    ];

    /// Builds an observation atom carrying flag `f1` and label `l1`.
    fn sample_atom(id: &str, world: &str, ts: &str, importance: f32) -> Atom {
        Atom::builder(
            AtomId::new(id),
            WorldKey::new(world),
            WorkerKey::new("worker"),
            AtomKind::Observation,
            Timestamp::new(ts),
            Importance::new(importance).unwrap(),
            r#"{"hello":"world"}"#,
        )
        .add_flag("f1")
        .add_label("l1")
        .build()
    }

    /// First `count` fixture atoms of world `w1`.
    fn w1_atoms(count: usize) -> Vec<Atom> {
        W1_ATOMS
            .iter()
            .take(count)
            .map(|&(id, ts, importance)| sample_atom(id, "w1", ts, importance))
            .collect()
    }

    /// Opens an unencrypted store inside `dir`.
    fn open_plain(dir: &TempDir) -> SledStorage {
        SledStorage::open(
            SledConfig::new(dir.path()),
            Arc::new(DummyKeyProvider::default()),
        )
        .unwrap()
    }

    /// Fresh unencrypted store; the `TempDir` must outlive the store.
    fn new_store() -> (SledStorage, TempDir) {
        let tmp = TempDir::new().unwrap();
        let storage = open_plain(&tmp);
        (storage, tmp)
    }

    /// Configuration with encryption enabled under key id `k1`.
    fn encrypted_config(dir: &TempDir) -> SledConfig {
        let mut cfg = SledConfig::new(dir.path());
        cfg.encryption_enabled = true;
        cfg.key_id = Some("k1".into());
        cfg
    }

    #[test]
    fn append_and_replay_preserves_order() {
        let (storage, _tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(2)).unwrap();

        let replayed = storage
            .scan(&world, &AtomFilter::default())
            .expect("replay");
        let ids: Vec<AtomId> = replayed.iter().map(|atom| atom.id().clone()).collect();
        assert_eq!(ids, vec![AtomId::new("a1"), AtomId::new("a2")]);
    }

    #[test]
    fn get_by_ids_returns_atoms() {
        let (storage, _tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(2)).unwrap();

        let atoms = storage
            .get_by_ids(&[AtomId::new("a2"), AtomId::new("a1")])
            .unwrap();
        let ids: Vec<AtomId> = atoms.iter().map(|atom| atom.id().clone()).collect();
        assert_eq!(ids, vec![AtomId::new("a2"), AtomId::new("a1")]);
    }

    #[test]
    fn get_by_ids_prefers_indexed_world() {
        let (storage, _tmp) = new_store();
        let first = sample_atom("dup", "a", "2024-01-01T00:00:00Z", 0.1);
        let second = sample_atom("dup", "z", "2024-01-01T00:00:01Z", 0.2);
        storage
            .append_atoms(&WorldKey::new("a"), vec![first])
            .unwrap();
        storage
            .append_atoms(&WorldKey::new("z"), vec![second.clone()])
            .unwrap();

        let atoms = storage.get_by_ids(&[AtomId::new("dup")]).unwrap();
        assert_eq!(atoms.len(), 1);
        assert_eq!(atoms[0].world(), second.world());
    }

    #[test]
    fn replay_with_limit_and_filters() {
        let (storage, _tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(3)).unwrap();

        let filter = AtomFilter {
            world: None,
            kinds: vec![AtomKind::Observation],
            labels: vec!["l1".to_string()],
            flags: vec!["f1".to_string()],
            since: Some(Timestamp::new("2024-01-01T00:00:01Z")),
            until: None,
            limit: Some(1),
        };
        let replayed = storage.scan(&world, &filter).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), &AtomId::new("a2"));
    }

    #[test]
    fn rebuild_index_populates_entries() {
        let (storage, _tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(2)).unwrap();
        let a1_index_key = SledStorage::atom_index_key(&AtomId::new("a1"));

        storage.clear_index().unwrap();
        assert!(storage.db.get(&a1_index_key).unwrap().is_none());

        let rebuilt = storage.rebuild_index().unwrap();
        assert_eq!(rebuilt, 2);

        let entry_bytes = storage
            .db
            .get(&a1_index_key)
            .unwrap()
            .expect("rebuilt index entry");
        let entry = SledStorage::decode_index(entry_bytes.as_ref()).unwrap();
        assert_eq!(entry.world, world);
        assert_eq!(entry.seq, 0);
    }

    #[test]
    fn delete_atoms_clears_index_entries() {
        let (storage, _tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(2)).unwrap();

        let removed = storage.delete_atoms(&world, &[AtomId::new("a1")]).unwrap();
        assert_eq!(removed, 1);
        assert!(storage
            .db
            .get(SledStorage::atom_index_key(&AtomId::new("a1")))
            .unwrap()
            .is_none());
        let atoms = storage.get_by_ids(&[AtomId::new("a1")]).unwrap();
        assert!(atoms.is_empty());
    }

    #[test]
    fn detects_and_truncates_corrupt_log_tail() {
        let (storage, tmp) = new_store();
        let world = WorldKey::new("w1");
        storage.append_atoms(&world, w1_atoms(2)).unwrap();

        // Corrupt the last log frame.
        storage.corrupt_log_entry(&world, 1, b"badframe");
        drop(storage);

        let reopened = open_plain(&tmp);
        let replayed = reopened
            .scan(&world, &AtomFilter::default())
            .expect("scan after repair");
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), &AtomId::new("a1"));
    }

    #[test]
    fn encrypted_roundtrip_and_on_disk_ciphertext() {
        let tmp = TempDir::new().unwrap();
        let cfg = encrypted_config(&tmp);
        let provider = DummyKeyProvider::default().with_key("k1", [7u8; 32]);

        let storage =
            SledStorage::open(cfg.clone(), Arc::new(provider.clone())).expect("open encrypted");
        let world = WorldKey::new("w1");
        let atom = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        storage.append_atoms(&world, vec![atom.clone()]).unwrap();

        // Validate ciphertext on disk.
        let stored = storage
            .db
            .get(SledStorage::atom_key(&world, atom.id()))
            .unwrap()
            .expect("stored");
        assert!(
            stored.starts_with(ENC_MAGIC),
            "encrypted payload should be prefixed with magic"
        );
        drop(storage);

        // Re-open and read back with the same key.
        let reopened = SledStorage::open(cfg, Arc::new(provider)).expect("reopen");
        let replayed = reopened.scan(&world, &AtomFilter::default()).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), atom.id());
    }

    #[test]
    fn encrypted_read_with_wrong_key_fails() {
        let tmp = TempDir::new().unwrap();
        let cfg = encrypted_config(&tmp);
        let provider = DummyKeyProvider::default().with_key("k1", [8u8; 32]);

        let storage = SledStorage::open(cfg.clone(), Arc::new(provider)).expect("open encrypted");
        let world = WorldKey::new("w1");
        let atom = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        storage.append_atoms(&world, vec![atom]).unwrap();
        drop(storage);

        let wrong_provider = DummyKeyProvider::default().with_key("k1", [9u8; 32]);
        let reopened = SledStorage::open(cfg, Arc::new(wrong_provider)).expect("reopen");
        let err = reopened.scan(&world, &AtomFilter::default()).unwrap_err();
        assert!(
            format!("{err:?}").contains("Storage"),
            "expected storage error when decrypting with wrong key"
        );
    }
}