1use std::{path::PathBuf, sync::Arc};
9
10use aes_gcm::{
11 aead::{Aead, AeadCore, KeyInit, OsRng},
12 Aes256Gcm, Nonce,
13};
14use crc32fast::Hasher as Crc32;
15use dwbase_core::{Atom, AtomId, WorldKey};
16use dwbase_engine::{AtomFilter, DwbaseError, Result, StorageEngine, StorageStats};
17use rmp_serde::{decode, encode};
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20use sled::IVec;
21use thiserror::Error;
22
/// Zero-padded decimal width of the sequence number embedded in log keys.
///
/// Twenty digits is enough to print any `u64`, so the fixed-width keys compare
/// lexicographically in the same order as their numeric sequence values.
const LOG_SEQ_WIDTH: usize = 20;
/// Magic prefix written in front of every encrypted atom payload.
const ENC_MAGIC: &[u8] = b"ENC1";
25
/// Settings used when opening the sled-backed storage.
#[derive(Debug, Clone)]
pub struct SledConfig {
    /// Filesystem location handed to `sled::open`.
    pub path: PathBuf,
    /// When `true`, the database is flushed after a write batch.
    pub flush_on_write: bool,
    /// When `true`, atom payloads are encrypted before they are stored.
    pub encryption_enabled: bool,
    /// Identifier of the key requested from the key provider; must be set when
    /// encryption is enabled.
    pub key_id: Option<String>,
}

impl SledConfig {
    /// Creates a configuration for `path` with flushing enabled and encryption disabled.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        Self {
            path,
            key_id: None,
            encryption_enabled: false,
            flush_on_write: true,
        }
    }
}
44
/// Source of raw encryption key material, looked up by key identifier.
///
/// Implementations must be `Send + Sync` so they can be shared behind an `Arc`.
pub trait KeyProvider: Send + Sync {
    /// Returns the raw bytes of the key registered under `key_id`, or an error
    /// when no such key is available.
    fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>>;
}
48
/// In-memory key provider backed by a map from key id to key bytes.
#[derive(Debug, Clone, Default)]
pub struct DummyKeyProvider {
    keys: std::collections::HashMap<String, Vec<u8>>,
}

impl DummyKeyProvider {
    /// Registers `key_bytes` under `key_id` and hands the provider back for chaining.
    ///
    /// Registering the same id twice replaces the earlier key.
    pub fn with_key(mut self, key_id: impl Into<String>, key_bytes: [u8; 32]) -> Self {
        let id: String = key_id.into();
        self.keys.insert(id, Vec::from(key_bytes));
        self
    }
}
60
61impl KeyProvider for DummyKeyProvider {
62 fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>> {
63 self.keys
64 .get(key_id)
65 .cloned()
66 .ok_or_else(|| DwbaseError::InvalidInput(format!("missing key for id {key_id}")))
67 }
68}
69
70#[derive(Debug, Default)]
71pub struct EnvKeyProvider;
72
73impl KeyProvider for EnvKeyProvider {
74 fn key_bytes(&self, key_id: &str) -> Result<Vec<u8>> {
75 let env_key = format!(
76 "DWBASE_KEY_{}",
77 key_id.to_ascii_uppercase().replace('-', "_")
78 );
79 let raw = std::env::var(&env_key).map_err(|_| {
80 DwbaseError::InvalidInput(format!("env var {env_key} missing for key id {key_id}"))
81 })?;
82 hex::decode(raw.trim()).map_err(|e| DwbaseError::InvalidInput(e.to_string()))
83 }
84}
85
/// Storage engine that keeps atoms, a per-world append log and an id index in
/// a single sled database.
///
/// Key layout used by the methods of this type:
/// - `world/{world}/atoms/{atom_id}` holds the encoded atom payload,
/// - `world/{world}/log/{seq}` holds the log frame for that sequence number,
/// - `idx/atom/{atom_id}` holds the world and sequence the atom was logged under.
pub struct SledStorage {
    /// Underlying sled database handle.
    db: sled::Db,
    /// Whether writes are followed by an explicit flush.
    flush_on_write: bool,
    /// Whether newly written atoms are encrypted.
    encryption_enabled: bool,
    /// Key id used when encrypting new atoms.
    key_id: Option<String>,
    /// Source of master key bytes for encryption and decryption.
    key_provider: Arc<dyn KeyProvider>,
}
94
/// Value stored under `idx/atom/{atom_id}`: where an atom was last logged.
#[derive(Debug, Serialize, Deserialize)]
struct AtomIndexEntry {
    /// World the atom belongs to.
    world: WorldKey,
    /// Log sequence number of the atom inside that world.
    seq: u64,
}
100
/// One entry of a world's append log, describing the atom written at that sequence.
#[derive(Debug, Serialize, Deserialize)]
struct LogFrame {
    /// Id of the atom this frame refers to.
    atom_id: AtomId,
    /// CRC32 of the atom bytes as stored (after optional encryption).
    checksum: u32,
    /// Length in bytes of the stored atom payload.
    len: u64,
}
107
/// Serialized envelope of an encrypted payload; stored after the `ENC1` magic prefix.
#[derive(Debug, Serialize, Deserialize)]
struct EncryptedBlob {
    /// Id of the master key the payload was encrypted under.
    key_id: String,
    /// Nonce used for AES-GCM and mixed into the derived data key.
    nonce: [u8; 12],
    /// AES-256-GCM output for the plaintext.
    cipher: Vec<u8>,
}
114
/// Internal error type that collects the low-level failures of this module.
///
/// It is converted into `DwbaseError::Storage` before leaving the module.
#[derive(Debug, Error)]
enum StorageError {
    /// Failure reported by sled.
    #[error("sled error: {0}")]
    Sled(#[from] sled::Error),
    /// MessagePack serialization failure.
    #[error("serialization error: {0}")]
    Encode(#[from] encode::Error),
    /// MessagePack deserialization failure.
    #[error("deserialization error: {0}")]
    Decode(#[from] decode::Error),
    /// Owned byte buffer was not valid UTF-8.
    #[error("utf8 error: {0}")]
    Utf8(#[from] std::string::FromUtf8Error),
    /// Borrowed byte slice was not valid UTF-8.
    #[error("str utf8 error: {0}")]
    StrUtf8(#[from] std::str::Utf8Error),
}
128
129impl From<StorageError> for DwbaseError {
130 fn from(err: StorageError) -> Self {
131 DwbaseError::Storage(err.to_string())
132 }
133}
134
135impl SledStorage {
136 pub fn open(config: SledConfig, key_provider: Arc<dyn KeyProvider>) -> Result<Self> {
137 if config.encryption_enabled && config.key_id.is_none() {
138 return Err(DwbaseError::InvalidInput(
139 "encryption enabled but key_id missing".into(),
140 ));
141 }
142 let db = sled::open(&config.path).map_err(StorageError::from)?;
143 let storage = Self {
144 db,
145 flush_on_write: config.flush_on_write,
146 encryption_enabled: config.encryption_enabled,
147 key_id: config.key_id.clone(),
148 key_provider,
149 };
150 storage.repair_logs()?;
151 let _ = storage.rebuild_index();
153 Ok(storage)
154 }
155
156 pub fn open_with_env(config: SledConfig) -> Result<Self> {
157 Self::open(config, Arc::new(EnvKeyProvider))
158 }
159
160 fn key_from_bytes(bytes: Vec<u8>) -> Result<[u8; 32]> {
161 if bytes.len() != 32 {
162 return Err(DwbaseError::InvalidInput(
163 "encryption key must be 32 bytes (AES-256-GCM)".into(),
164 ));
165 }
166 let mut key = [0u8; 32];
167 key.copy_from_slice(&bytes);
168 Ok(key)
169 }
170
171 fn derive_data_key(master: [u8; 32], key_id: &str, nonce: &[u8]) -> aes_gcm::Key<Aes256Gcm> {
172 let mut hasher = Sha256::new();
173 hasher.update(master);
174 hasher.update(key_id.as_bytes());
175 hasher.update(nonce);
176 let derived = hasher.finalize();
177 let mut key = [0u8; 32];
178 key.copy_from_slice(&derived);
179 *aes_gcm::Key::<Aes256Gcm>::from_slice(&key)
180 }
181
182 fn encrypt_bytes(&self, plain: &[u8]) -> Result<Vec<u8>> {
183 let key_id = self
184 .key_id
185 .as_ref()
186 .ok_or_else(|| DwbaseError::InvalidInput("key_id missing".into()))?;
187 let key_bytes = self.key_provider.key_bytes(key_id)?;
188 let master = Self::key_from_bytes(key_bytes)?;
189 let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
190 let mut nonce_bytes = [0u8; 12];
191 nonce_bytes.copy_from_slice(nonce.as_slice());
192 let data_key = Self::derive_data_key(master, key_id, &nonce_bytes);
193 let cipher = Aes256Gcm::new(&data_key)
194 .encrypt(Nonce::from_slice(&nonce_bytes), plain)
195 .map_err(|e| DwbaseError::Storage(e.to_string()))?;
196 let blob = EncryptedBlob {
197 key_id: key_id.clone(),
198 nonce: nonce_bytes,
199 cipher,
200 };
201 let mut out = ENC_MAGIC.to_vec();
202 let blob_bytes = rmp_serde::to_vec(&blob).map_err(StorageError::from)?;
203 out.extend(blob_bytes);
204 Ok(out)
205 }
206
207 fn decrypt_bytes(&self, bytes: &[u8]) -> Result<Vec<u8>> {
208 let blob: EncryptedBlob = rmp_serde::from_slice(bytes).map_err(StorageError::from)?;
209 let key_bytes = self.key_provider.key_bytes(&blob.key_id)?;
210 let master = Self::key_from_bytes(key_bytes)?;
211 let data_key = Self::derive_data_key(master, &blob.key_id, &blob.nonce);
212 Aes256Gcm::new(&data_key)
213 .decrypt(Nonce::from_slice(&blob.nonce), blob.cipher.as_ref())
214 .map_err(|e| DwbaseError::Storage(e.to_string()))
215 }
216
217 fn atom_key(world: &WorldKey, atom_id: &AtomId) -> Vec<u8> {
218 format!("world/{}/atoms/{}", world.0, atom_id.0).into_bytes()
219 }
220
221 fn log_prefix(world: &WorldKey) -> Vec<u8> {
222 format!("world/{}/log/", world.0).into_bytes()
223 }
224
225 fn log_key(world: &WorldKey, seq: u64) -> Vec<u8> {
226 format!(
227 "world/{}/log/{:0width$}",
228 world.0,
229 seq,
230 width = LOG_SEQ_WIDTH
231 )
232 .into_bytes()
233 }
234
235 fn next_seq(&self, world: &WorldKey) -> Result<u64> {
236 let prefix = Self::log_prefix(world);
237 let mut iter = self.db.scan_prefix(prefix);
238 let last = iter
239 .next_back()
240 .and_then(|res| res.ok())
241 .and_then(|(key, _)| Self::seq_from_log_key(&key));
242 Ok(last.map_or(0, |n| n + 1))
243 }
244
245 fn seq_from_log_key(key: &IVec) -> Option<u64> {
246 let s = std::str::from_utf8(key.as_ref()).ok()?;
247 let seq_part = s.rsplit('/').next()?;
248 seq_part.parse().ok()
249 }
250
251 fn decode_frame(&self, bytes: &[u8]) -> Result<LogFrame> {
252 rmp_serde::from_slice(bytes)
253 .map_err(StorageError::from)
254 .map_err(Into::into)
255 }
256
257 fn repair_logs(&self) -> Result<()> {
258 let mut worlds = std::collections::HashSet::new();
259 for entry in self.db.scan_prefix("world/") {
260 let (key, _) = entry.map_err(StorageError::from)?;
261 if let Some(world) = Self::world_from_key(&key) {
262 worlds.insert(world);
263 }
264 }
265 for world in worlds {
266 self.repair_world_log(&world)?;
267 }
268 Ok(())
269 }
270
271 fn world_from_key(key: &IVec) -> Option<WorldKey> {
272 let s = std::str::from_utf8(key.as_ref()).ok()?;
273 let mut parts = s.split('/');
274 let first = parts.next()?;
275 let world = parts.next()?;
276 if first == "world" {
277 Some(WorldKey(world.to_string()))
278 } else {
279 None
280 }
281 }
282
283 fn repair_world_log(&self, world: &WorldKey) -> Result<()> {
284 let prefix = Self::log_prefix(world);
285 let mut _last_good: Option<u64> = None;
286 let mut bad_seq: Option<u64> = None;
287 for entry in self.db.scan_prefix(prefix.clone()) {
288 let (key, val) = entry.map_err(StorageError::from)?;
289 let seq = match Self::seq_from_log_key(&key) {
290 Some(s) => s,
291 None => continue,
292 };
293 let frame = match self.decode_frame(val.as_ref()) {
294 Ok(f) => f,
295 Err(_) => {
296 bad_seq = Some(seq);
297 break;
298 }
299 };
300 let atom_bytes = match self.db.get(Self::atom_key(world, &frame.atom_id)) {
301 Ok(Some(b)) => b,
302 _ => {
303 bad_seq = Some(seq);
304 break;
305 }
306 };
307 let checksum = Self::checksum(atom_bytes.as_ref());
308 if checksum != frame.checksum {
309 bad_seq = Some(seq);
310 break;
311 }
312 _last_good = Some(seq);
313 }
314
315 if let Some(bad) = bad_seq {
316 let mut to_delete = Vec::new();
317 for entry in self.db.scan_prefix(prefix.clone()) {
318 let (key, _) = entry.map_err(StorageError::from)?;
319 if let Some(seq) = Self::seq_from_log_key(&key) {
320 if seq >= bad {
321 to_delete.push(key);
322 }
323 }
324 }
325 for key in to_delete {
326 let _ = self.db.remove(key);
327 }
328 eprintln!("repair: truncated log for {} at seq {}", world.0, bad);
329 }
330 Ok(())
331 }
332
333 fn encode_atom(&self, atom: &Atom) -> Result<Vec<u8>> {
334 let plain = rmp_serde::to_vec(atom).map_err(StorageError::from)?;
335 if self.encryption_enabled {
336 self.encrypt_bytes(&plain)
337 } else {
338 Ok(plain)
339 }
340 }
341
342 fn decode_atom(&self, bytes: &[u8]) -> Result<Atom> {
343 if bytes.starts_with(ENC_MAGIC) {
344 let decrypted = self.decrypt_bytes(&bytes[ENC_MAGIC.len()..])?;
345 return rmp_serde::from_slice(&decrypted)
346 .map_err(StorageError::from)
347 .map_err(Into::into);
348 }
349 rmp_serde::from_slice(bytes)
350 .map_err(StorageError::from)
351 .map_err(Into::into)
352 }
353
354 fn flush_if_needed(&self) -> Result<()> {
355 if self.flush_on_write {
356 self.db.flush().map_err(StorageError::from)?;
357 }
358 Ok(())
359 }
360
361 fn checksum(bytes: &[u8]) -> u32 {
362 let mut h = Crc32::new();
363 h.update(bytes);
364 h.finalize()
365 }
366
367 pub fn append_atoms(&self, world: &WorldKey, atoms: Vec<Atom>) -> Result<Vec<AtomId>> {
369 let mut seq = self.next_seq(world)?;
370 let mut ids = Vec::with_capacity(atoms.len());
371 for atom in atoms {
372 let atom_world = atom.world().clone();
373 if &atom_world != world {
374 return Err(DwbaseError::InvalidInput(format!(
375 "atom world {} does not match target world {}",
376 atom_world.0, world.0
377 )));
378 }
379 let id = atom.id().clone();
380 let bytes = self.encode_atom(&atom)?;
381 let frame = LogFrame {
382 atom_id: id.clone(),
383 checksum: Self::checksum(&bytes),
384 len: bytes.len() as u64,
385 };
386 let frame_bytes = rmp_serde::to_vec(&frame).map_err(StorageError::from)?;
387 self.db
388 .insert(Self::atom_key(world, &id), bytes)
389 .map_err(StorageError::from)?;
390 self.db
391 .insert(Self::log_key(world, seq), frame_bytes)
392 .map_err(StorageError::from)?;
393 self.record_index(world, &id, seq)?;
394 seq += 1;
395 ids.push(id);
396 }
397 self.flush_if_needed()?;
398 Ok(ids)
399 }
400
401 fn atom_from_store(&self, id: &AtomId, world: &WorldKey) -> Result<Option<Atom>> {
402 let key = Self::atom_key(world, id);
403 let bytes = match self.db.get(key).map_err(StorageError::from)? {
404 Some(b) => b,
405 None => return Ok(None),
406 };
407 self.decode_atom(bytes.as_ref()).map(Some)
408 }
409
410 fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
411 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
412 return false;
413 }
414 if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
415 return false;
416 }
417 if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
418 return false;
419 }
420 if let Some(since) = &filter.since {
421 if atom.timestamp().0 < since.0 {
422 return false;
423 }
424 }
425 if let Some(until) = &filter.until {
426 if atom.timestamp().0 > until.0 {
427 return false;
428 }
429 }
430 true
431 }
432
433 #[cfg(test)]
434 fn corrupt_log_entry(&self, world: &WorldKey, seq: u64, bytes: &[u8]) {
435 let _ = self.db.insert(Self::log_key(world, seq), bytes);
436 }
437
438 fn atom_index_key(id: &AtomId) -> Vec<u8> {
439 format!("idx/atom/{}", id.0).into_bytes()
440 }
441
442 fn index_entry(world: &WorldKey, seq: u64) -> Result<Vec<u8>> {
443 let entry = AtomIndexEntry {
444 world: world.clone(),
445 seq,
446 };
447 rmp_serde::to_vec(&entry)
448 .map_err(StorageError::from)
449 .map_err(Into::into)
450 }
451
452 fn decode_index(bytes: &[u8]) -> Result<AtomIndexEntry> {
453 rmp_serde::from_slice(bytes)
454 .map_err(StorageError::from)
455 .map_err(Into::into)
456 }
457
458 fn record_index(&self, world: &WorldKey, id: &AtomId, seq: u64) -> Result<()> {
459 let key = Self::atom_index_key(id);
460 let val = Self::index_entry(world, seq)?;
461 self.db.insert(key, val).map_err(StorageError::from)?;
462 Ok(())
463 }
464
465 fn clear_index(&self) -> Result<()> {
466 let mut to_delete = Vec::new();
467 for entry in self.db.scan_prefix("idx/atom/") {
468 let (k, _) = entry.map_err(StorageError::from)?;
469 to_delete.push(k);
470 }
471 for k in to_delete {
472 let _ = self.db.remove(k);
473 }
474 Ok(())
475 }
476
477 pub fn rebuild_index(&self) -> Result<u64> {
479 self.clear_index()?;
480 let mut written = 0u64;
481 for world in self.worlds()? {
482 let prefix = Self::log_prefix(&world);
483 for entry in self.db.scan_prefix(prefix) {
484 let (key, val) = entry.map_err(StorageError::from)?;
485 let seq = match Self::seq_from_log_key(&key) {
486 Some(s) => s,
487 None => continue,
488 };
489 let frame = match self.decode_frame(val.as_ref()) {
490 Ok(f) => f,
491 Err(_) => continue,
492 };
493 self.record_index(&world, &frame.atom_id, seq)?;
494 written += 1;
495 }
496 }
497 Ok(written)
498 }
499}
500
501impl StorageEngine for SledStorage {
502 fn append(&self, atom: Atom) -> Result<()> {
503 let world = atom.world().clone();
504 self.append_atoms(&world, vec![atom])?;
505 Ok(())
506 }
507
508 fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
509 let mut out = Vec::with_capacity(ids.len());
510 for id in ids {
511 let mut found = None;
512 if let Some(idx_bytes) = self
513 .db
514 .get(Self::atom_index_key(id))
515 .map_err(StorageError::from)?
516 {
517 if let Ok(entry) = Self::decode_index(idx_bytes.as_ref()) {
518 if let Some(atom) = self.atom_from_store(id, &entry.world)? {
519 found = Some(atom);
520 }
521 }
522 }
523 if found.is_none() {
524 for world_atom in self.db.scan_prefix(b"world/") {
526 let (key, _) = world_atom.map_err(StorageError::from)?;
527 if !key.ends_with(format!("/atoms/{}", id.0).as_bytes()) {
528 continue;
529 }
530 let s = match std::str::from_utf8(key.as_ref()) {
531 Ok(v) => v,
532 Err(_) => continue,
533 };
534 let parts: Vec<_> = s.split('/').collect();
535 let world = WorldKey(parts.get(1).unwrap_or(&"").to_string());
536 found = self.atom_from_store(id, &world)?;
537 if found.is_some() {
538 break;
539 }
540 }
541 }
542 if let Some(atom) = found {
543 out.push(atom);
544 }
545 }
546 Ok(out)
547 }
548
549 fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>> {
550 if let Some(f_world) = &filter.world {
551 if f_world != world {
552 return Ok(Vec::new());
553 }
554 }
555 let prefix = Self::log_prefix(world);
556 let mut results = Vec::new();
557 for entry in self.db.scan_prefix(prefix) {
558 let (_, frame_bytes) = entry.map_err(StorageError::from)?;
559 let frame = match self.decode_frame(frame_bytes.as_ref()) {
560 Ok(f) => f,
561 Err(_) => continue,
562 };
563 let atom_id = frame.atom_id;
564 if let Some(atom) = self.atom_from_store(&atom_id, world)? {
565 if Self::matches_filter(&atom, filter) {
566 results.push(atom);
567 if let Some(limit) = filter.limit {
568 if results.len() >= limit {
569 break;
570 }
571 }
572 }
573 }
574 }
575 Ok(results)
576 }
577
578 fn stats(&self, world: &WorldKey) -> Result<StorageStats> {
579 let mut atom_count = 0usize;
580 let mut vector_count = 0usize;
581 let prefix = format!("world/{}/atoms/", world.0);
582 for entry in self.db.scan_prefix(prefix.as_bytes()) {
583 let (_, bytes) = entry.map_err(StorageError::from)?;
584 atom_count += 1;
585 if let Ok(atom) = self.decode_atom(bytes.as_ref()) {
586 if atom.vector().is_some() {
587 vector_count += 1;
588 }
589 }
590 }
591 Ok(StorageStats {
592 atom_count,
593 vector_count,
594 })
595 }
596
597 fn list_ids_in_window(
598 &self,
599 world: &WorldKey,
600 window: &dwbase_engine::TimeWindow,
601 ) -> Result<Vec<AtomId>> {
602 let mut ids = Vec::new();
603 let prefix = Self::log_prefix(world);
604 for entry in self.db.scan_prefix(prefix) {
605 let (_key, val) = entry.map_err(StorageError::from)?;
606 let frame = match self.decode_frame(val.as_ref()) {
607 Ok(f) => f,
608 Err(_) => continue,
609 };
610 if let Some(atom) = self.atom_from_store(&frame.atom_id, world)? {
611 let ts = dwbase_core::Timestamp::new(atom.timestamp().0.clone());
612 if let Ok(dt) = time::OffsetDateTime::parse(
613 ts.0.as_str(),
614 &time::format_description::well_known::Rfc3339,
615 ) {
616 let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
617 if ms >= window.start_ms && ms <= window.end_ms {
618 ids.push(frame.atom_id.clone());
619 }
620 }
621 }
622 }
623 Ok(ids)
624 }
625
626 fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize> {
627 let mut removed = 0usize;
628 for id in ids {
629 let _ = self.db.remove(Self::atom_key(world, id));
630 let _ = self.db.remove(Self::atom_index_key(id));
631 }
632 let prefix = Self::log_prefix(world);
633 for entry in self.db.scan_prefix(prefix.clone()) {
634 let (key, val) = entry.map_err(StorageError::from)?;
635 let frame = match self.decode_frame(val.as_ref()) {
636 Ok(f) => f,
637 Err(_) => continue,
638 };
639 if ids.contains(&frame.atom_id) {
640 let _ = self.db.remove(key);
641 removed += 1;
642 }
643 }
644 Ok(removed)
645 }
646
647 fn worlds(&self) -> Result<Vec<WorldKey>> {
648 let mut worlds = std::collections::HashSet::new();
649 for entry in self.db.scan_prefix("world/") {
650 let (key, _) = entry.map_err(StorageError::from)?;
651 if let Some(w) = Self::world_from_key(&key) {
652 worlds.insert(w);
653 }
654 }
655 Ok(worlds.into_iter().collect())
656 }
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use dwbase_core::{AtomKind, Importance, Timestamp, WorkerKey};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds an observation atom carrying the flag `f1` and the label `l1`.
    fn sample_atom(id: &str, world: &str, ts: &str, importance: f32) -> Atom {
        Atom::builder(
            AtomId::new(id),
            WorldKey::new(world),
            WorkerKey::new("worker"),
            AtomKind::Observation,
            Timestamp::new(ts),
            Importance::new(importance).unwrap(),
            r#"{"hello":"world"}"#,
        )
        .add_flag("f1")
        .add_label("l1")
        .build()
    }

    /// Opens an unencrypted store in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so it outlives the store.
    fn new_store() -> (SledStorage, TempDir) {
        let dir = TempDir::new().unwrap();
        let config = SledConfig::new(dir.path());
        let store = SledStorage::open(config, Arc::new(DummyKeyProvider::default())).unwrap();
        (store, dir)
    }

    #[test]
    fn append_and_replay_preserves_order() {
        let (store, _dir) = new_store();
        let w1 = WorldKey::new("w1");
        let first = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        let second = sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6);

        store
            .append_atoms(&w1, vec![first.clone(), second.clone()])
            .unwrap();

        let replayed = store.scan(&w1, &AtomFilter::default()).expect("replay");
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].id(), first.id());
        assert_eq!(replayed[1].id(), second.id());
    }

    #[test]
    fn get_by_ids_returns_atoms() {
        let (store, _dir) = new_store();
        let w1 = WorldKey::new("w1");
        let first = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        let second = sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6);
        store
            .append_atoms(&w1, vec![first.clone(), second.clone()])
            .unwrap();

        // Results follow the order of the requested ids, not the log order.
        let wanted = [AtomId::new("a2"), AtomId::new("a1")];
        let atoms = store.get_by_ids(&wanted).unwrap();
        assert_eq!(atoms.len(), 2);
        assert_eq!(atoms[0].id(), second.id());
        assert_eq!(atoms[1].id(), first.id());
    }

    #[test]
    fn get_by_ids_prefers_indexed_world() {
        let (store, _dir) = new_store();
        let world_a = WorldKey::new("a");
        let world_z = WorldKey::new("z");
        let in_a = sample_atom("dup", "a", "2024-01-01T00:00:00Z", 0.1);
        let in_z = sample_atom("dup", "z", "2024-01-01T00:00:01Z", 0.2);
        store.append_atoms(&world_a, vec![in_a]).unwrap();
        store.append_atoms(&world_z, vec![in_z.clone()]).unwrap();

        // The index was last written for world "z", so that copy wins.
        let atoms = store.get_by_ids(&[AtomId::new("dup")]).unwrap();
        assert_eq!(atoms.len(), 1);
        assert_eq!(atoms[0].world(), in_z.world());
    }

    #[test]
    fn replay_with_limit_and_filters() {
        let (store, _dir) = new_store();
        let w1 = WorldKey::new("w1");
        let batch = vec![
            sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4),
            sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6),
            sample_atom("a3", "w1", "2024-01-01T00:00:02Z", 0.7),
        ];
        store.append_atoms(&w1, batch).unwrap();

        let filter = AtomFilter {
            world: None,
            kinds: vec![AtomKind::Observation],
            labels: vec!["l1".to_string()],
            flags: vec!["f1".to_string()],
            since: Some(Timestamp::new("2024-01-01T00:00:01Z")),
            until: None,
            limit: Some(1),
        };
        let replayed = store.scan(&w1, &filter).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), &AtomId::new("a2"));
    }

    #[test]
    fn rebuild_index_populates_entries() {
        let (store, _dir) = new_store();
        let w1 = WorldKey::new("w1");
        let batch = vec![
            sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4),
            sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6),
        ];
        store.append_atoms(&w1, batch).unwrap();

        let a1_index_key = SledStorage::atom_index_key(&AtomId::new("a1"));

        store.clear_index().unwrap();
        assert!(store.db.get(&a1_index_key).unwrap().is_none());

        let rebuilt = store.rebuild_index().unwrap();
        assert_eq!(rebuilt, 2);

        let entry_bytes = store
            .db
            .get(&a1_index_key)
            .unwrap()
            .expect("rebuilt index entry");
        let entry = SledStorage::decode_index(entry_bytes.as_ref()).unwrap();
        assert_eq!(entry.world, w1);
        assert_eq!(entry.seq, 0);
    }

    #[test]
    fn delete_atoms_clears_index_entries() {
        let (store, _dir) = new_store();
        let w1 = WorldKey::new("w1");
        let batch = vec![
            sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4),
            sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6),
        ];
        store.append_atoms(&w1, batch).unwrap();

        let removed = store.delete_atoms(&w1, &[AtomId::new("a1")]).unwrap();
        assert_eq!(removed, 1);
        assert!(store
            .db
            .get(SledStorage::atom_index_key(&AtomId::new("a1")))
            .unwrap()
            .is_none());
        let atoms = store.get_by_ids(&[AtomId::new("a1")]).unwrap();
        assert!(atoms.is_empty());
    }

    #[test]
    fn detects_and_truncates_corrupt_log_tail() {
        let (store, dir) = new_store();
        let w1 = WorldKey::new("w1");
        let batch = vec![
            sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4),
            sample_atom("a2", "w1", "2024-01-01T00:00:01Z", 0.6),
        ];
        store.append_atoms(&w1, batch).unwrap();

        store.corrupt_log_entry(&w1, 1, b"badframe");
        // Close the database so reopening runs the repair pass.
        drop(store);

        let reopened = SledStorage::open(
            SledConfig::new(dir.path()),
            Arc::new(DummyKeyProvider::default()),
        )
        .unwrap();
        let replayed = reopened
            .scan(&w1, &AtomFilter::default())
            .expect("scan after repair");
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), &AtomId::new("a1"));
    }

    #[test]
    fn encrypted_roundtrip_and_on_disk_ciphertext() {
        let dir = TempDir::new().unwrap();
        let mut cfg = SledConfig::new(dir.path());
        cfg.encryption_enabled = true;
        cfg.key_id = Some("k1".into());
        let provider = DummyKeyProvider::default().with_key("k1", [7u8; 32]);

        let store =
            SledStorage::open(cfg.clone(), Arc::new(provider.clone())).expect("open encrypted");
        let w1 = WorldKey::new("w1");
        let atom = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        store.append_atoms(&w1, vec![atom.clone()]).unwrap();

        // Inspect the raw stored bytes, bypassing decode_atom.
        let stored = store
            .db
            .get(SledStorage::atom_key(&w1, atom.id()))
            .unwrap()
            .expect("stored");
        assert!(
            stored.starts_with(ENC_MAGIC),
            "encrypted payload should be prefixed with magic"
        );
        drop(store);

        let reopened = SledStorage::open(cfg, Arc::new(provider)).expect("reopen");
        let replayed = reopened.scan(&w1, &AtomFilter::default()).unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id(), atom.id());
    }

    #[test]
    fn encrypted_read_with_wrong_key_fails() {
        let dir = TempDir::new().unwrap();
        let mut cfg = SledConfig::new(dir.path());
        cfg.encryption_enabled = true;
        cfg.key_id = Some("k1".into());
        let right_provider = DummyKeyProvider::default().with_key("k1", [8u8; 32]);

        let store =
            SledStorage::open(cfg.clone(), Arc::new(right_provider)).expect("open encrypted");
        let w1 = WorldKey::new("w1");
        let atom = sample_atom("a1", "w1", "2024-01-01T00:00:00Z", 0.4);
        store.append_atoms(&w1, vec![atom]).unwrap();
        drop(store);

        let wrong_provider = DummyKeyProvider::default().with_key("k1", [9u8; 32]);
        let reopened = SledStorage::open(cfg, Arc::new(wrong_provider)).expect("reopen");
        let err = reopened.scan(&w1, &AtomFilter::default()).unwrap_err();
        assert!(
            format!("{err:?}").contains("Storage"),
            "expected storage error when decrypting with wrong key"
        );
    }
}