// durable_streams_server/storage/acid.rs
1//! Crash-resilient redb-backed storage with sharded databases.
2//!
3//! This backend stores stream metadata and messages in redb tables and uses a
4//! stable hash-based shard layout so a stream always maps to the same database
5//! file after restarts.
6
7use super::{
8    CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
9    ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
10};
11use crate::config::AcidBackend;
12use crate::protocol::error::{Error, Result};
13use crate::protocol::offset::Offset;
14use crate::protocol::producer::ProducerHeaders;
15use bytes::Bytes;
16use chrono::{DateTime, Utc};
17use redb::backends::InMemoryBackend;
18use redb::{
19    CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
20    SetDurabilityError, StorageError as RedbStorageError, Table, TableDefinition, TableError,
21    TransactionError,
22};
23use seahash::hash;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::fs;
27use std::path::{Path, PathBuf};
28use std::sync::RwLock;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31use tokio::sync::broadcast;
32use tracing::warn;
33
/// Stream metadata table: stream name -> JSON-encoded [`StoredStreamMeta`].
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
/// Message table: (stream name, read_seq, byte_offset) -> raw payload bytes.
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");

/// Version stamp written to `layout.json`; bumped on incompatible layout changes.
const LAYOUT_FORMAT_VERSION: u32 = 1;
/// Identifier of the stream-to-shard hash function recorded in the manifest.
const HASH_POLICY: &str = "seahash-v1";
/// Retry backoff for startup-only operations (shard database open).
/// Not used on the request path — transient errors there propagate as 503.
const STARTUP_RETRY_BACKOFF_MS: [u64; 3] = [10, 25, 50];
42
/// On-disk manifest (`layout.json`) pinning the parameters that determine the
/// shard layout. A mismatch on reopen aborts startup rather than risk reading
/// shard files with the wrong stream-to-shard mapping.
#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    /// Must equal [`LAYOUT_FORMAT_VERSION`] for the build opening the root.
    format_version: u32,
    /// Number of shard database files; fixed for the lifetime of the root.
    shard_count: usize,
    /// Must equal [`HASH_POLICY`]; names the hash used by `shard_index`.
    hash_policy: String,
}
49
/// Per-stream metadata record, stored as JSON in the `streams` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    /// Stream configuration supplied at creation time.
    config: StreamConfig,
    /// True once the stream no longer accepts appends.
    closed: bool,
    /// Sequence number the next appended message will receive.
    next_read_seq: u64,
    /// Byte offset at which the next appended message will start.
    next_byte_offset: u64,
    /// Total payload bytes currently stored for this stream.
    total_bytes: u64,
    /// Creation timestamp (used with the config for expiry checks).
    created_at: DateTime<Utc>,
    /// Last accepted sequence header, if any (see `validate_seq`).
    last_seq: Option<String>,
    /// Per-producer state — presumably keyed by producer id; confirm against
    /// the ProducerHeaders handling elsewhere in this module.
    producers: HashMap<String, ProducerState>,
}
61
/// One shard: a single redb database file owning a fixed subset of streams.
#[derive(Debug)]
struct AcidShard {
    /// Handle to this shard's redb database.
    db: Database,
}
66
/// Sharded, crash-resilient storage backend built on redb.
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    /// One database per shard; indexed by `shard_index(name)`.
    shards: Vec<AcidShard>,
    /// Always a power of two in 1..=256 (see `validate_shard_count`).
    shard_count: usize,
    /// Running total of payload bytes across all live streams.
    total_bytes: AtomicU64,
    /// Global byte quota enforced by `reserve_total_bytes`.
    max_total_bytes: u64,
    /// Per-stream byte quota enforced at append time.
    max_stream_bytes: u64,
    /// Per-stream broadcast channels; `notify_stream` sends on them after a
    /// successful append so waiters can observe new data.
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}
76
77impl AcidStorage {
78    /// Create or reopen an ACID storage root.
79    ///
80    /// When `backend` is [`AcidBackend::File`] the backend stores its files
81    /// beneath `<root>/acid`, validates a layout manifest, and rebuilds
82    /// aggregate state from disk before serving requests.
83    ///
84    /// When `backend` is [`AcidBackend::InMemory`] the `root_dir` is ignored
85    /// and each shard uses a redb [`InMemoryBackend`]. ACID transaction
86    /// semantics still apply but all data is lost on shutdown.
87    ///
88    /// # Errors
89    ///
90    /// Returns `Error::Storage` if storage layout validation fails, shard
91    /// databases cannot be opened, or on-disk recovery cannot complete.
92    pub fn new(
93        root_dir: impl Into<PathBuf>,
94        shard_count: usize,
95        max_total_bytes: u64,
96        max_stream_bytes: u64,
97        backend: AcidBackend,
98    ) -> Result<Self> {
99        Self::validate_shard_count(shard_count)?;
100
101        let shards = match backend {
102            AcidBackend::File => Self::create_file_shards(&root_dir.into(), shard_count)?,
103            AcidBackend::InMemory => Self::create_in_memory_shards(shard_count)?,
104        };
105
106        let storage = Self {
107            shards,
108            shard_count,
109            total_bytes: AtomicU64::new(0),
110            max_total_bytes,
111            max_stream_bytes,
112            notifiers: RwLock::new(HashMap::new()),
113        };
114
115        // File backend: rebuilds aggregate state from persisted data.
116        // In-memory backend: databases start empty so this returns 0.
117        let total_bytes = storage.rebuild_state_from_disk()?;
118        storage.total_bytes.store(total_bytes, Ordering::Release);
119
120        Ok(storage)
121    }
122
123    fn create_file_shards(root_dir: &Path, shard_count: usize) -> Result<Vec<AcidShard>> {
124        let acid_dir = Self::acid_dir(root_dir);
125        fs::create_dir_all(&acid_dir).map_err(|e| {
126            Self::storage_err(
127                format!(
128                    "failed to create acid storage directory {}",
129                    acid_dir.display()
130                ),
131                e,
132            )
133        })?;
134
135        Self::load_or_create_layout(&acid_dir, shard_count)?;
136
137        let mut shards = Vec::with_capacity(shard_count);
138        for idx in 0..shard_count {
139            let shard_path = acid_dir.join(format!("shard_{idx:02x}.redb"));
140            let db = Self::open_shard_database(&shard_path)?;
141            Self::ensure_schema(&db)?;
142            shards.push(AcidShard { db });
143        }
144
145        Ok(shards)
146    }
147
148    fn create_in_memory_shards(shard_count: usize) -> Result<Vec<AcidShard>> {
149        let mut shards = Vec::with_capacity(shard_count);
150        for _ in 0..shard_count {
151            let db = Database::builder()
152                .create_with_backend(InMemoryBackend::new())
153                .map_err(|e| Self::storage_err("failed to create in-memory shard database", e))?;
154            Self::ensure_schema(&db)?;
155            shards.push(AcidShard { db });
156        }
157        Ok(shards)
158    }
159
    /// Return the currently tracked total payload bytes across all streams.
    ///
    /// Reads the atomic counter maintained by `reserve_total_bytes` /
    /// `saturating_sub_total_bytes`.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }
165
166    fn validate_shard_count(shard_count: usize) -> Result<()> {
167        if !(1..=256).contains(&shard_count) {
168            return Err(Error::Storage(format!(
169                "acid shard count must be in range 1..=256, got {shard_count}"
170            )));
171        }
172        if !shard_count.is_power_of_two() {
173            return Err(Error::Storage(format!(
174                "acid shard count must be a power of two, got {shard_count}"
175            )));
176        }
177        Ok(())
178    }
179
    /// Build an [`Error`] from an operation description plus a classifiable
    /// source error.
    ///
    /// The `detail` string ("context: source") is what ultimately surfaces in
    /// the returned error; [`ClassifyError`] decides which variant carries it.
    fn storage_err<E: ClassifyError>(context: impl Into<String>, err: E) -> Error {
        let context = context.into();
        let detail = format!("{context}: {err}");
        err.into_storage_error(context, detail)
    }
185
    /// Map a redb [`RedbStorageError`] onto the crate's error taxonomy.
    ///
    /// IO errors get the fine-grained transient/fatal split from
    /// `Error::classify_io_failure`; closed/previously-failed databases become
    /// "unavailable"; `ValueTooLarge` becomes "insufficient storage";
    /// corruption and lock poisoning fall through to a generic storage error.
    fn classify_redb_storage_error(
        context: String,
        err: &RedbStorageError,
        detail: String,
    ) -> Error {
        match err {
            RedbStorageError::Io(io_err) => {
                Error::classify_io_failure("acid", context, detail, io_err)
            }
            RedbStorageError::DatabaseClosed | RedbStorageError::PreviousIo => {
                Error::storage_unavailable("acid", context, detail)
            }
            RedbStorageError::ValueTooLarge(_) => {
                Error::storage_insufficient("acid", context, detail)
            }
            RedbStorageError::Corrupted(_) | RedbStorageError::LockPoisoned(_) => {
                Error::Storage(detail)
            }
            _ => {
                // Non-exhaustive enum: log unknown variants so new redb
                // releases surface here instead of silently mapping to 500.
                warn!(error = %err, "unhandled redb StorageError variant");
                Error::Storage(detail)
            }
        }
    }
210
211    /// Open a shard database with retry. This is startup-only code — the
212    /// `thread::sleep` backoff is acceptable here since it runs once per shard
213    /// during initialization, never on the request path.
214    fn open_shard_database(shard_path: &Path) -> Result<Database> {
215        let context = format!("failed to open shard database {}", shard_path.display());
216        let mut delays = STARTUP_RETRY_BACKOFF_MS.into_iter();
217
218        loop {
219            match Database::builder().create(shard_path) {
220                Ok(db) => return Ok(db),
221                Err(err) if Self::is_retryable_database_open(&err) => {
222                    if let Some(delay_ms) = delays.next() {
223                        std::thread::sleep(Duration::from_millis(delay_ms));
224                        continue;
225                    }
226                    return Err(Self::storage_err(context, err));
227                }
228                Err(err) => return Err(Self::storage_err(context, err)),
229            }
230        }
231    }
232
233    fn is_retryable_database_open(err: &DatabaseError) -> bool {
234        match err {
235            DatabaseError::DatabaseAlreadyOpen => true,
236            DatabaseError::Storage(RedbStorageError::Io(io_err)) => {
237                Error::is_retryable_io_error(io_err)
238            }
239            _ => false,
240        }
241    }
242
    /// Directory under the storage root that holds all acid-backend files.
    fn acid_dir(root_dir: &Path) -> PathBuf {
        root_dir.join("acid")
    }

    /// Path of the JSON layout manifest inside the acid directory.
    fn layout_path(acid_dir: &Path) -> PathBuf {
        acid_dir.join("layout.json")
    }
250
251    fn load_or_create_layout(acid_dir: &Path, shard_count: usize) -> Result<()> {
252        let layout_path = Self::layout_path(acid_dir);
253        if layout_path.exists() {
254            let payload = fs::read(&layout_path).map_err(|e| {
255                Self::storage_err(
256                    format!("failed to read acid layout file {}", layout_path.display()),
257                    e,
258                )
259            })?;
260            let manifest: LayoutManifest = serde_json::from_slice(&payload).map_err(|e| {
261                Self::storage_err(
262                    format!("failed to parse acid layout file {}", layout_path.display()),
263                    e,
264                )
265            })?;
266
267            if manifest.format_version != LAYOUT_FORMAT_VERSION {
268                return Err(Error::Storage(format!(
269                    "acid layout mismatch: format_version={}, expected={}",
270                    manifest.format_version, LAYOUT_FORMAT_VERSION
271                )));
272            }
273            if manifest.shard_count != shard_count {
274                return Err(Error::Storage(format!(
275                    "acid layout mismatch: shard_count={}, expected={shard_count}",
276                    manifest.shard_count
277                )));
278            }
279            if manifest.hash_policy != HASH_POLICY {
280                return Err(Error::Storage(format!(
281                    "acid layout mismatch: hash_policy='{}', expected='{}'",
282                    manifest.hash_policy, HASH_POLICY
283                )));
284            }
285            return Ok(());
286        }
287
288        let manifest = LayoutManifest {
289            format_version: LAYOUT_FORMAT_VERSION,
290            shard_count,
291            hash_policy: HASH_POLICY.to_string(),
292        };
293        let payload = serde_json::to_vec_pretty(&manifest)
294            .map_err(|e| Self::storage_err("failed to serialize acid layout manifest", e))?;
295
296        let tmp_path = acid_dir.join("layout.json.tmp");
297        fs::write(&tmp_path, payload).map_err(|e| {
298            Self::storage_err(
299                format!("failed to write temp layout file {}", tmp_path.display()),
300                e,
301            )
302        })?;
303        fs::rename(&tmp_path, &layout_path).map_err(|e| {
304            Self::storage_err(
305                format!("failed to write layout file {}", layout_path.display()),
306                e,
307            )
308        })?;
309
310        Ok(())
311    }
312
313    #[must_use]
314    fn shard_index(&self, name: &str) -> usize {
315        let hash_u64 = hash(name.as_bytes());
316        let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
317            let masked = hash_u64 & u64::from(u32::MAX);
318            usize::try_from(masked).expect("masked hash value must fit in usize")
319        });
320        hash_usize & (self.shard_count - 1)
321    }
322
    /// Borrow the shard that owns `name`.
    fn shard(&self, name: &str) -> &AcidShard {
        &self.shards[self.shard_index(name)]
    }
326
327    fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
328        if bytes == 0 {
329            return Ok(());
330        }
331
332        if self
333            .total_bytes
334            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
335                current
336                    .checked_add(bytes)
337                    .filter(|next| *next <= self.max_total_bytes)
338            })
339            .is_err()
340        {
341            return Err(Error::MemoryLimitExceeded);
342        }
343        Ok(())
344    }
345
    /// Undo a prior [`Self::reserve_total_bytes`] after a failed write.
    fn rollback_total_bytes(&self, bytes: u64) {
        self.saturating_sub_total_bytes(bytes);
    }
349
350    fn saturating_sub_total_bytes(&self, bytes: u64) {
351        if bytes == 0 {
352            return;
353        }
354
355        self.total_bytes
356            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
357                Some(current.saturating_sub(bytes))
358            })
359            .ok();
360    }
361
362    fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
363    where
364        T: ReadableTable<&'static str, &'static [u8]>,
365    {
366        let payload = streams
367            .get(name)
368            .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
369
370        if let Some(payload) = payload {
371            let meta = serde_json::from_slice(payload.value())
372                .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
373            Ok(Some(meta))
374        } else {
375            Ok(None)
376        }
377    }
378
379    fn write_stream_meta(
380        streams: &mut Table<'_, &'static str, &'static [u8]>,
381        name: &str,
382        meta: &StoredStreamMeta,
383    ) -> Result<()> {
384        let payload = serde_json::to_vec(meta)
385            .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
386        streams
387            .insert(name, payload.as_slice())
388            .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
389        Ok(())
390    }
391
392    fn delete_stream_messages(
393        messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
394        name: &str,
395    ) -> Result<()> {
396        // redb table iterators cannot be safely mutated in-place; we must collect
397        // keys first, then remove in a second pass.
398        let mut keys = Vec::new();
399        let iter = messages
400            .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
401            .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;
402
403        for item in iter {
404            let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
405            let (_, read_seq, byte_offset) = key.value();
406            keys.push((read_seq, byte_offset));
407        }
408
409        for (read_seq, byte_offset) in keys {
410            messages
411                .remove((name, read_seq, byte_offset))
412                .map_err(|e| Self::storage_err("failed to delete message", e))?;
413        }
414
415        Ok(())
416    }
417
418    fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
419        // Use a single write lock with entry() to avoid a TOCTOU race where
420        // drop_notifier() could remove the sender between a read-lock miss
421        // and the subsequent write-lock acquire.
422        let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
423        guard
424            .entry(name.to_string())
425            .or_insert_with(|| {
426                let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
427                sender
428            })
429            .clone()
430    }
431
432    fn notify_stream(&self, name: &str) {
433        if let Some(sender) = self
434            .notifiers
435            .read()
436            .expect("notifiers lock poisoned")
437            .get(name)
438        {
439            let _ = sender.send(());
440        }
441    }
442
    /// Discard the broadcast channel of a removed or expired stream.
    fn drop_notifier(&self, name: &str) {
        self.notifiers
            .write()
            .expect("notifiers lock poisoned")
            .remove(name);
    }
449
    /// Fresh metadata for a newly created stream: open, empty, offsets at zero.
    fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
        StoredStreamMeta {
            config,
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            last_seq: None,
            producers: HashMap::new(),
        }
    }
462
463    fn batch_bytes(messages: &[Bytes]) -> u64 {
464        messages
465            .iter()
466            .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
467            .sum()
468    }
469
    /// Start a write transaction with immediate durability.
    fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
        let mut txn = db
            .begin_write()
            .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
        // Durability::Immediate makes commit() persist the transaction before
        // returning — the property that gives this backend crash resilience.
        txn.set_durability(Durability::Immediate)
            .map_err(|e| Self::storage_err("failed to set write durability", e))?;
        Ok(txn)
    }
478
479    fn ensure_schema(db: &Database) -> Result<()> {
480        let txn = Self::begin_write_txn(db)?;
481        let streams = txn
482            .open_table(STREAMS)
483            .map_err(|e| Self::storage_err("failed to initialize streams table", e))?;
484        let messages = txn
485            .open_table(MESSAGES)
486            .map_err(|e| Self::storage_err("failed to initialize messages table", e))?;
487        drop(messages);
488        drop(streams);
489        txn.commit()
490            .map_err(|e| Self::storage_err("failed to commit schema initialization", e))?;
491        Ok(())
492    }
493
494    fn rebuild_state_from_disk(&self) -> Result<u64> {
495        let mut total = 0_u64;
496        for shard in &self.shards {
497            total = total.saturating_add(self.rebuild_shard(shard)?);
498        }
499        Ok(total)
500    }
501
    /// Scan one shard: sum the bytes of live streams and purge expired ones.
    ///
    /// Runs in two phases because a table cannot be mutated while it is being
    /// iterated: a read transaction collects totals and expired names, then a
    /// separate write transaction deletes the expired streams.
    fn rebuild_shard(&self, shard: &AcidShard) -> Result<u64> {
        let read_txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut live_bytes = 0_u64;
        let mut expired_names = Vec::new();

        {
            let iter = streams
                .iter()
                .map_err(|e| Self::storage_err("failed to iterate stream metadata", e))?;
            for item in iter {
                let (key, value) =
                    item.map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
                let stream_name = key.value().to_string();
                let meta: StoredStreamMeta = serde_json::from_slice(value.value())
                    .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
                if super::is_stream_expired(&meta.config) {
                    expired_names.push(stream_name);
                } else {
                    // Only live streams count toward the recovered total.
                    live_bytes = live_bytes.saturating_add(meta.total_bytes);
                }
            }
        }

        // Release the read transaction before beginning the write transaction.
        drop(streams);
        drop(read_txn);

        if expired_names.is_empty() {
            return Ok(live_bytes);
        }

        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        for name in &expired_names {
            Self::delete_stream_messages(&mut messages, name)?;
            streams
                .remove(name.as_str())
                .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            // NOTE(review): during startup the notifier map is still empty, so
            // this remove is a no-op; it only matters if a rebuild ever runs
            // on a live instance.
            self.drop_notifier(name);
        }

        // Table guards must be dropped before the transaction can commit.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit startup cleanup", e))?;

        Ok(live_bytes)
    }
562}
563
/// Type-safe error classification trait for redb and IO error types.
///
/// Replaces the previous `Any`-based downcasting dispatcher with compile-time
/// dispatch. Each implementation maps the concrete error type into the correct
/// [`Error`] variant so that transient failures become 503, capacity failures
/// become 507, and everything else maps to a generic 500.
trait ClassifyError: std::fmt::Display {
    /// Consume the error and produce the crate-level [`Error`].
    ///
    /// `context` describes the failed operation; `detail` is the pre-built
    /// "context: source" string that carries the human-readable payload.
    fn into_storage_error(self, context: String, detail: String) -> Error;
}
573
impl ClassifyError for std::io::Error {
    // IO failures get the fine-grained transient/fatal classification.
    fn into_storage_error(self, context: String, detail: String) -> Error {
        Error::classify_io_failure("acid", context, detail, &self)
    }
}
579
impl ClassifyError for DatabaseError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Another handle holds the database — transient (and treated as
            // retryable by `is_retryable_database_open`).
            DatabaseError::DatabaseAlreadyOpen => {
                Error::storage_unavailable("acid", context, detail)
            }
            // Defer to the shared redb storage-error classifier.
            DatabaseError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // Unrecoverable database states map to a generic storage error.
            DatabaseError::RepairAborted | DatabaseError::UpgradeRequired(_) => {
                Error::Storage(detail)
            }
            _ => {
                warn!(error = %self, "unhandled redb DatabaseError variant");
                Error::Storage(detail)
            }
        }
    }
}
599
impl ClassifyError for TransactionError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Defer to the shared redb storage-error classifier.
            TransactionError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // A lingering read transaction is a programming error, not a
            // transient condition — generic storage error.
            TransactionError::ReadTransactionStillInUse(_) => Error::Storage(detail),
            _ => {
                warn!(error = %self, "unhandled redb TransactionError variant");
                Error::Storage(detail)
            }
        }
    }
}
614
impl ClassifyError for TableError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Defer to the shared redb storage-error classifier.
            TableError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // Schema-misuse variants indicate a bug or incompatible data, not
            // a transient condition — generic storage error.
            TableError::TableTypeMismatch { .. }
            | TableError::TableIsMultimap(_)
            | TableError::TableIsNotMultimap(_)
            | TableError::TypeDefinitionChanged { .. }
            | TableError::TableDoesNotExist(_)
            | TableError::TableExists(_)
            | TableError::TableAlreadyOpen(_, _) => Error::Storage(detail),
            _ => {
                warn!(error = %self, "unhandled redb TableError variant");
                Error::Storage(detail)
            }
        }
    }
}
635
636impl ClassifyError for CommitError {
637    fn into_storage_error(self, context: String, detail: String) -> Error {
638        if let CommitError::Storage(storage_err) = &self {
639            AcidStorage::classify_redb_storage_error(context, storage_err, detail)
640        } else {
641            warn!(error = %self, "unhandled redb CommitError variant");
642            Error::Storage(detail)
643        }
644    }
645}
646
impl ClassifyError for RedbStorageError {
    // Delegate straight to the shared redb storage-error classifier.
    fn into_storage_error(self, context: String, detail: String) -> Error {
        AcidStorage::classify_redb_storage_error(context, &self, detail)
    }
}
652
impl ClassifyError for SetDurabilityError {
    // Always a generic storage error; the detail string already carries the
    // context, so the separate context argument is unused.
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}
658
impl ClassifyError for serde_json::Error {
    // (De)serialization failures are never transient — generic storage error.
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}
664
665impl Storage for AcidStorage {
    /// Create a stream, or report on an existing stream with the same name.
    ///
    /// Runs entirely inside one write transaction. An expired stream with the
    /// same name is purged and recreated; a live stream with an identical
    /// config is an idempotent success (`AlreadyExists`); a live stream with
    /// a different config is a `ConfigMismatch` error. Early returns drop the
    /// uncommitted transaction, leaving the shard untouched.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let mut removed_expired_bytes = 0_u64;

        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            if super::is_stream_expired(&existing.config) {
                // Replace the expired stream; remember its size so the global
                // counter can be adjusted once the deletion is durable.
                removed_expired_bytes = existing.total_bytes;
                Self::delete_stream_messages(&mut messages, name)?;
                streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            } else if existing.config == config {
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let meta = Self::new_stream_meta(config);
        Self::write_stream_meta(&mut streams, name, &meta)?;

        // Table guards must be dropped before the transaction can commit.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create stream", e))?;

        // Adjust accounting only after a successful commit.
        if removed_expired_bytes > 0 {
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
        }

        Ok(CreateStreamResult::Created)
    }
707
708    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
709        let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
710        self.reserve_total_bytes(message_bytes)?;
711
712        let result = (|| {
713            let shard = self.shard(name);
714            let txn = Self::begin_write_txn(&shard.db)?;
715            let mut streams = txn
716                .open_table(STREAMS)
717                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
718            let mut messages = txn
719                .open_table(MESSAGES)
720                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
721
722            let mut meta = Self::read_stream_meta(&streams, name)?
723                .ok_or_else(|| Error::NotFound(name.to_string()))?;
724
725            if super::is_stream_expired(&meta.config) {
726                return Err(Error::StreamExpired);
727            }
728            if meta.closed {
729                return Err(Error::StreamClosed);
730            }
731
732            super::validate_content_type(&meta.config.content_type, content_type)?;
733
734            if meta.total_bytes + message_bytes > self.max_stream_bytes {
735                return Err(Error::StreamSizeLimitExceeded);
736            }
737
738            let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
739            messages
740                .insert(
741                    (name, meta.next_read_seq, meta.next_byte_offset),
742                    data.as_ref(),
743                )
744                .map_err(|e| Self::storage_err("failed to append message", e))?;
745
746            meta.next_read_seq += 1;
747            meta.next_byte_offset += message_bytes;
748            meta.total_bytes += message_bytes;
749
750            Self::write_stream_meta(&mut streams, name, &meta)?;
751
752            drop(messages);
753            drop(streams);
754            txn.commit()
755                .map_err(|e| Self::storage_err("failed to commit append", e))?;
756
757            Ok(offset)
758        })();
759
760        if result.is_err() {
761            self.rollback_total_bytes(message_bytes);
762            return result;
763        }
764
765        self.notify_stream(name);
766        result
767    }
768
769    fn batch_append(
770        &self,
771        name: &str,
772        messages: Vec<Bytes>,
773        content_type: &str,
774        seq: Option<&str>,
775    ) -> Result<Offset> {
776        if messages.is_empty() {
777            return Err(Error::InvalidHeader {
778                header: "Content-Length".to_string(),
779                reason: "batch cannot be empty".to_string(),
780            });
781        }
782
783        let batch_bytes = Self::batch_bytes(&messages);
784        self.reserve_total_bytes(batch_bytes)?;
785
786        let result = (|| {
787            let shard = self.shard(name);
788            let txn = Self::begin_write_txn(&shard.db)?;
789            let mut streams = txn
790                .open_table(STREAMS)
791                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
792            let mut message_table = txn
793                .open_table(MESSAGES)
794                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
795
796            let mut meta = Self::read_stream_meta(&streams, name)?
797                .ok_or_else(|| Error::NotFound(name.to_string()))?;
798
799            if super::is_stream_expired(&meta.config) {
800                return Err(Error::StreamExpired);
801            }
802            if meta.closed {
803                return Err(Error::StreamClosed);
804            }
805
806            super::validate_content_type(&meta.config.content_type, content_type)?;
807            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;
808
809            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
810                return Err(Error::StreamSizeLimitExceeded);
811            }
812
813            for data in &messages {
814                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
815                message_table
816                    .insert(
817                        (name, meta.next_read_seq, meta.next_byte_offset),
818                        data.as_ref(),
819                    )
820                    .map_err(|e| Self::storage_err("failed to append batch message", e))?;
821                meta.next_read_seq += 1;
822                meta.next_byte_offset += len;
823                meta.total_bytes += len;
824            }
825
826            if let Some(new_seq) = pending_seq {
827                meta.last_seq = Some(new_seq);
828            }
829
830            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
831            Self::write_stream_meta(&mut streams, name, &meta)?;
832
833            drop(message_table);
834            drop(streams);
835            txn.commit()
836                .map_err(|e| Self::storage_err("failed to commit batch append", e))?;
837
838            Ok(next_offset)
839        })();
840
841        if result.is_err() {
842            self.rollback_total_bytes(batch_bytes);
843            return result;
844        }
845
846        self.notify_stream(name);
847        result
848    }
849
850    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
851        let shard = self.shard(name);
852        let txn = shard
853            .db
854            .begin_read()
855            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
856
857        let streams = txn
858            .open_table(STREAMS)
859            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
860        let message_table = txn
861            .open_table(MESSAGES)
862            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
863
864        let meta = Self::read_stream_meta(&streams, name)?
865            .ok_or_else(|| Error::NotFound(name.to_string()))?;
866
867        if super::is_stream_expired(&meta.config) {
868            return Err(Error::StreamExpired);
869        }
870
871        if from_offset.is_now() {
872            return Ok(ReadResult {
873                messages: Vec::new(),
874                next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
875                at_tail: true,
876                closed: meta.closed,
877            });
878        }
879
880        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
881            (0_u64, 0_u64)
882        } else {
883            from_offset.parse_components().ok_or_else(|| {
884                Error::InvalidOffset("non-concrete offset in read range".to_string())
885            })?
886        };
887
888        let iter = message_table
889            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
890            .map_err(|e| Self::storage_err("failed to read stream range", e))?;
891
892        let mut messages = Vec::new();
893        for item in iter {
894            let (_, value) =
895                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
896            messages.push(Bytes::copy_from_slice(value.value()));
897        }
898
899        Ok(ReadResult {
900            messages,
901            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
902            at_tail: true,
903            closed: meta.closed,
904        })
905    }
906
907    fn delete(&self, name: &str) -> Result<()> {
908        let shard = self.shard(name);
909        let txn = Self::begin_write_txn(&shard.db)?;
910        let mut streams = txn
911            .open_table(STREAMS)
912            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
913        let mut messages = txn
914            .open_table(MESSAGES)
915            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
916
917        let meta = Self::read_stream_meta(&streams, name)?
918            .ok_or_else(|| Error::NotFound(name.to_string()))?;
919
920        Self::delete_stream_messages(&mut messages, name)?;
921        streams
922            .remove(name)
923            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;
924
925        drop(messages);
926        drop(streams);
927        txn.commit()
928            .map_err(|e| Self::storage_err("failed to commit delete", e))?;
929
930        self.saturating_sub_total_bytes(meta.total_bytes);
931        self.drop_notifier(name);
932        Ok(())
933    }
934
935    fn head(&self, name: &str) -> Result<StreamMetadata> {
936        let shard = self.shard(name);
937        let txn = shard
938            .db
939            .begin_read()
940            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
941
942        let streams = txn
943            .open_table(STREAMS)
944            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
945
946        let meta = Self::read_stream_meta(&streams, name)?
947            .ok_or_else(|| Error::NotFound(name.to_string()))?;
948
949        if super::is_stream_expired(&meta.config) {
950            return Err(Error::StreamExpired);
951        }
952
953        Ok(StreamMetadata {
954            config: meta.config,
955            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
956            closed: meta.closed,
957            total_bytes: meta.total_bytes,
958            message_count: meta.next_read_seq,
959            created_at: meta.created_at,
960        })
961    }
962
963    fn close_stream(&self, name: &str) -> Result<()> {
964        let shard = self.shard(name);
965        let txn = Self::begin_write_txn(&shard.db)?;
966        let mut streams = txn
967            .open_table(STREAMS)
968            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
969
970        let mut meta = Self::read_stream_meta(&streams, name)?
971            .ok_or_else(|| Error::NotFound(name.to_string()))?;
972
973        if super::is_stream_expired(&meta.config) {
974            return Err(Error::StreamExpired);
975        }
976
977        meta.closed = true;
978        Self::write_stream_meta(&mut streams, name, &meta)?;
979
980        drop(streams);
981        txn.commit()
982            .map_err(|e| Self::storage_err("failed to commit close stream", e))?;
983
984        self.notify_stream(name);
985        Ok(())
986    }
987
988    fn append_with_producer(
989        &self,
990        name: &str,
991        messages: Vec<Bytes>,
992        content_type: &str,
993        producer: &ProducerHeaders,
994        should_close: bool,
995        seq: Option<&str>,
996    ) -> Result<ProducerAppendResult> {
997        let batch_bytes = Self::batch_bytes(&messages);
998        self.reserve_total_bytes(batch_bytes)?;
999
1000        let result = (|| {
1001            let shard = self.shard(name);
1002            let txn = Self::begin_write_txn(&shard.db)?;
1003            let mut streams = txn
1004                .open_table(STREAMS)
1005                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
1006            let mut message_table = txn
1007                .open_table(MESSAGES)
1008                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
1009
1010            let mut meta = Self::read_stream_meta(&streams, name)?
1011                .ok_or_else(|| Error::NotFound(name.to_string()))?;
1012
1013            if super::is_stream_expired(&meta.config) {
1014                return Err(Error::StreamExpired);
1015            }
1016
1017            super::cleanup_stale_producers(&mut meta.producers);
1018
1019            if !messages.is_empty() {
1020                super::validate_content_type(&meta.config.content_type, content_type)?;
1021            }
1022
1023            match super::check_producer(
1024                meta.producers.get(producer.id.as_str()),
1025                producer,
1026                meta.closed,
1027            )? {
1028                ProducerCheck::Accept => {}
1029                ProducerCheck::Duplicate { epoch, seq } => {
1030                    return Ok(ProducerAppendResult::Duplicate {
1031                        epoch,
1032                        seq,
1033                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
1034                        closed: meta.closed,
1035                    });
1036                }
1037            }
1038
1039            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;
1040
1041            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
1042                return Err(Error::StreamSizeLimitExceeded);
1043            }
1044
1045            for data in &messages {
1046                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
1047                message_table
1048                    .insert(
1049                        (name, meta.next_read_seq, meta.next_byte_offset),
1050                        data.as_ref(),
1051                    )
1052                    .map_err(|e| Self::storage_err("failed to append producer message", e))?;
1053                meta.next_read_seq += 1;
1054                meta.next_byte_offset += len;
1055                meta.total_bytes += len;
1056            }
1057
1058            if let Some(new_seq) = pending_seq {
1059                meta.last_seq = Some(new_seq);
1060            }
1061            if should_close {
1062                meta.closed = true;
1063            }
1064
1065            meta.producers.insert(
1066                producer.id.clone(),
1067                ProducerState {
1068                    epoch: producer.epoch,
1069                    last_seq: producer.seq,
1070                    updated_at: Utc::now(),
1071                },
1072            );
1073
1074            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
1075            let closed = meta.closed;
1076
1077            Self::write_stream_meta(&mut streams, name, &meta)?;
1078            drop(message_table);
1079            drop(streams);
1080            txn.commit()
1081                .map_err(|e| Self::storage_err("failed to commit producer append", e))?;
1082
1083            Ok(ProducerAppendResult::Accepted {
1084                epoch: producer.epoch,
1085                seq: producer.seq,
1086                next_offset,
1087                closed,
1088            })
1089        })();
1090
1091        if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
1092            self.rollback_total_bytes(batch_bytes);
1093        }
1094
1095        if result.is_ok() && (!messages.is_empty() || should_close) {
1096            self.notify_stream(name);
1097        }
1098
1099        result
1100    }
1101
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        // Atomically create a stream and seed it with an initial batch.
        // Idempotent: re-creating with an identical config returns
        // `AlreadyExists` without touching the existing data; a differing
        // config is a `ConfigMismatch` error.
        let batch_bytes = Self::batch_bytes(&messages);

        // Tracks whether the global byte cap was charged, so the failure
        // path below knows whether a rollback is owed.
        let mut reserved = false;
        // Bytes held by a same-named expired stream this call replaces;
        // subtracted from the global counter only after a successful commit.
        let mut removed_expired_bytes = 0_u64;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            if let Some(existing) = Self::read_stream_meta(&streams, name)? {
                if super::is_stream_expired(&existing.config) {
                    // Expired stream in the way: purge it inside this same
                    // transaction, then fall through to create fresh.
                    removed_expired_bytes = existing.total_bytes;
                    Self::delete_stream_messages(&mut message_table, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                } else if existing.config == config {
                    // Identical config: idempotent success, nothing written.
                    return Ok(CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset: Offset::new(existing.next_read_seq, existing.next_byte_offset),
                        closed: existing.closed,
                    });
                } else {
                    return Err(Error::ConfigMismatch);
                }
            }

            // Reserve only after the AlreadyExists/ConfigMismatch early
            // returns, so those paths never owe a rollback.
            if batch_bytes > 0 {
                self.reserve_total_bytes(batch_bytes)?;
                reserved = true;
            }

            let mut meta = Self::new_stream_meta(config);

            if batch_bytes > 0 {
                if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                    return Err(Error::StreamSizeLimitExceeded);
                }
                for data in &messages {
                    let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                    message_table
                        .insert(
                            (name, meta.next_read_seq, meta.next_byte_offset),
                            data.as_ref(),
                        )
                        .map_err(|e| {
                            Self::storage_err("failed to append create-with-data message", e)
                        })?;
                    meta.next_read_seq += 1;
                    meta.next_byte_offset += len;
                    meta.total_bytes += len;
                }
            }

            if should_close {
                meta.closed = true;
            }

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;

            Ok(CreateWithDataResult {
                status: CreateStreamResult::Created,
                next_offset,
                closed,
            })
        })();

        // On failure the transaction aborted, so any reservation is returned;
        // the expired stream (if any) was NOT actually removed, so its bytes
        // stay counted.
        if result.is_err() && reserved {
            self.rollback_total_bytes(batch_bytes);
        }

        if result.is_ok() {
            if removed_expired_bytes > 0 {
                // The replaced expired stream is durably gone: release its
                // bytes and discard its stale notifier.
                self.saturating_sub_total_bytes(removed_expired_bytes);
                self.drop_notifier(name);
            }
            if should_close || !messages.is_empty() {
                self.notify_stream(name);
            }
        }

        result
    }
1205
1206    fn exists(&self, name: &str) -> bool {
1207        let shard = self.shard(name);
1208        let Ok(txn) = shard.db.begin_read() else {
1209            return false;
1210        };
1211        let Ok(streams) = txn.open_table(STREAMS) else {
1212            return false;
1213        };
1214
1215        match Self::read_stream_meta(&streams, name) {
1216            Ok(Some(meta)) => !super::is_stream_expired(&meta.config),
1217            _ => false,
1218        }
1219    }
1220
1221    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1222        let shard = self.shard(name);
1223        let txn = shard.db.begin_read().ok()?;
1224        let streams = txn.open_table(STREAMS).ok()?;
1225        let meta = Self::read_stream_meta(&streams, name).ok()??;
1226
1227        if super::is_stream_expired(&meta.config) {
1228            return None;
1229        }
1230
1231        Some(self.notifier_sender(name).subscribe())
1232    }
1233
1234    fn cleanup_expired_streams(&self) -> usize {
1235        let mut total_removed = 0;
1236
1237        for shard in &self.shards {
1238            // Read pass: find candidate expired stream names.
1239            let Ok(read_txn) = shard.db.begin_read() else {
1240                continue;
1241            };
1242            let Ok(streams_table) = read_txn.open_table(STREAMS) else {
1243                continue;
1244            };
1245            let Ok(iter) = streams_table.iter() else {
1246                continue;
1247            };
1248
1249            let mut candidates: Vec<String> = Vec::new();
1250            for item in iter {
1251                let Ok((key, value)) = item else {
1252                    continue;
1253                };
1254                let name = key.value().to_string();
1255                let Ok(meta) = serde_json::from_slice::<StoredStreamMeta>(value.value()) else {
1256                    continue;
1257                };
1258                if super::is_stream_expired(&meta.config) {
1259                    candidates.push(name);
1260                }
1261            }
1262
1263            drop(streams_table);
1264            drop(read_txn);
1265
1266            if candidates.is_empty() {
1267                continue;
1268            }
1269
1270            // Write pass: re-verify expiration and delete confirmed streams.
1271            let Ok(txn) = Self::begin_write_txn(&shard.db) else {
1272                continue;
1273            };
1274            let Ok(mut streams) = txn.open_table(STREAMS) else {
1275                continue;
1276            };
1277            let Ok(mut messages) = txn.open_table(MESSAGES) else {
1278                continue;
1279            };
1280
1281            let mut committed = Vec::new();
1282            for name in &candidates {
1283                // Re-check expiration inside the write transaction to avoid a
1284                // TOCTOU race with concurrent creates.
1285                let meta = streams
1286                    .get(name.as_str())
1287                    .ok()
1288                    .flatten()
1289                    .and_then(|v| serde_json::from_slice::<StoredStreamMeta>(v.value()).ok());
1290                let Some(meta) = meta else { continue };
1291                if !super::is_stream_expired(&meta.config) {
1292                    continue;
1293                }
1294
1295                let _ = Self::delete_stream_messages(&mut messages, name);
1296                let _ = streams.remove(name.as_str());
1297                committed.push((name.clone(), meta.total_bytes));
1298            }
1299
1300            drop(messages);
1301            drop(streams);
1302
1303            match txn.commit() {
1304                Ok(()) => {
1305                    for (name, bytes) in &committed {
1306                        self.rollback_total_bytes(*bytes);
1307                        self.drop_notifier(name);
1308                    }
1309                    total_removed += committed.len();
1310                }
1311                Err(e) => {
1312                    warn!(%e, "failed to commit expired stream cleanup");
1313                }
1314            }
1315        }
1316
1317        total_removed
1318    }
1319}
1320
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;

    // Builds a unique temp directory per call (timestamp + pid + counter) so
    // tests never share on-disk state, even across processes.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-acid-storage-test-{stamp}-{pid}-{seq}"))
    }

    // Standard file-backed fixture: 16 shards, 1 MiB global cap, 100 KiB
    // per-stream cap.
    fn test_storage() -> AcidStorage {
        AcidStorage::new(
            test_storage_dir(),
            16,
            1024 * 1024,
            100 * 1024,
            AcidBackend::File,
        )
        .expect("acid storage should initialize")
    }

    // Shorthand for producer headers used in idempotency tests.
    fn producer(id: &str, epoch: u64, seq: u64) -> ProducerHeaders {
        ProducerHeaders {
            id: id.to_string(),
            epoch,
            seq,
        }
    }

    // Appended messages must survive a full drop/reopen cycle.
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage.create_stream("events", cfg.clone()).unwrap();
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .unwrap();
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .unwrap();
        } // drop flushes and releases the shard databases

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let read = restored.read("events", &Offset::start()).unwrap();

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    // The closed flag must persist across restart and keep rejecting appends.
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage.create_stream("s", cfg.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    // Producer dedup state lives in stream metadata, so a replayed
    // (epoch, seq) must still be detected after a restart.
    #[test]
    fn test_restart_preserves_producer_state() {
        let root = test_storage_dir();

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage
                .create_stream("s", StreamConfig::new("text/plain".to_string()))
                .unwrap();
            let result = storage
                .append_with_producer(
                    "s",
                    vec![Bytes::from("x")],
                    "text/plain",
                    &producer("p1", 0, 0),
                    false,
                    None,
                )
                .unwrap();
            assert!(matches!(result, ProducerAppendResult::Accepted { .. }));
        }

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let dup = restored
            .append_with_producer(
                "s",
                vec![Bytes::from("x")],
                "text/plain",
                &producer("p1", 0, 0),
                false,
                None,
            )
            .unwrap();
        assert!(matches!(dup, ProducerAppendResult::Duplicate { .. }));
    }

    // The same stream name must always hash to the same shard.
    #[test]
    fn test_shard_routing_same_stream_is_stable() {
        let storage = test_storage();
        let a = storage.shard_index("same-stream");
        let b = storage.shard_index("same-stream");
        assert_eq!(a, b);
    }

    // Sanity check that the hash actually spreads streams across shards.
    #[test]
    fn test_shard_distribution_uses_multiple_shards() {
        let storage = test_storage();
        let mut seen = std::collections::HashSet::new();
        for i in 0..256 {
            seen.insert(storage.shard_index(&format!("stream-{i}")));
        }
        assert!(seen.len() > 1);
    }

    // A stream that expired while the server was down must be gone (or
    // reported expired) after reopen.
    #[test]
    fn test_startup_purges_expired_streams() {
        let root = test_storage_dir();
        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            let expires = Utc::now() + Duration::milliseconds(50);
            let cfg = StreamConfig::new("text/plain".to_string()).with_expires_at(expires);
            storage.create_stream("expiring", cfg).unwrap();
            storage
                .append("expiring", Bytes::from("x"), "text/plain")
                .unwrap();
        }

        // Wait past the 50 ms expiry before reopening.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        assert!(!restored.exists("expiring"));
        assert!(matches!(
            restored.read("expiring", &Offset::start()),
            Err(Error::NotFound(_) | Error::StreamExpired)
        ));
    }

    // Concurrent appends across shards must never overshoot the global cap:
    // with a 120-byte cap, at most three of the eight 40-byte appends win.
    #[test]
    fn test_global_cap_strict_under_concurrency() {
        let storage = Arc::new(
            AcidStorage::new(test_storage_dir(), 16, 120, 120, AcidBackend::File).unwrap(),
        );
        let shard_count = (0..8)
            .map(|i| storage.shard_index(&format!("s-{i}")))
            .collect::<std::collections::HashSet<_>>()
            .len();
        assert!(
            shard_count > 1,
            "test streams must span multiple shards to validate cross-shard cap behavior"
        );

        for i in 0..8 {
            storage
                .create_stream(
                    &format!("s-{i}"),
                    StreamConfig::new("text/plain".to_string()),
                )
                .unwrap();
        }

        let mut handles = Vec::new();
        for i in 0..8 {
            let storage = Arc::clone(&storage);
            handles.push(thread::spawn(move || {
                storage.append(&format!("s-{i}"), Bytes::from(vec![0_u8; 40]), "text/plain")
            }));
        }

        for h in handles {
            // Individual appends may fail (cap exceeded); only the total matters.
            let _ = h.join().unwrap();
        }

        assert!(storage.total_bytes() <= 120);
    }

    // Reopening with a different shard count must be refused, since it would
    // silently reroute streams to the wrong database files.
    #[test]
    fn test_layout_manifest_mismatch_fails_fast() {
        let root = test_storage_dir();
        let first = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(first.is_ok());

        let mismatch = AcidStorage::new(root, 8, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(mismatch, Err(Error::Storage(_))));
    }

    // A corrupt layout manifest must abort startup rather than guess a layout.
    #[test]
    fn test_layout_manifest_invalid_json_fails_fast() {
        let root = test_storage_dir();
        let acid_dir = root.join("acid");
        fs::create_dir_all(&acid_dir).unwrap();
        fs::write(acid_dir.join("layout.json"), b"{invalid-json").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // A manifest recorded under an unknown hash policy must abort startup.
    #[test]
    fn test_layout_manifest_hash_policy_mismatch_fails_fast() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        drop(storage);

        // Tamper with the persisted hash policy field directly on disk.
        let layout_path = root.join("acid").join("layout.json");
        let mut layout: serde_json::Value =
            serde_json::from_slice(&fs::read(&layout_path).unwrap()).unwrap();
        layout["hash_policy"] = serde_json::Value::String("tampered-hash-policy".to_string());
        fs::write(layout_path, serde_json::to_vec_pretty(&layout).unwrap()).unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Undecodable stream metadata found during startup must fail the whole
    // open, not silently drop the stream.
    #[test]
    fn test_corrupted_stream_metadata_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();

        // Simulate on-disk corruption of stream metadata.
        let shard_idx = storage.shard_index("s");
        let txn = AcidStorage::begin_write_txn(&storage.shards[shard_idx].db).unwrap();
        let mut streams = txn.open_table(STREAMS).unwrap();
        let corrupt = b"{not-json".to_vec();
        streams.insert("s", corrupt.as_slice()).unwrap();
        drop(streams);
        txn.commit().unwrap();
        drop(storage);

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // A shard file overwritten with garbage must fail the open, not be
    // silently recreated empty.
    #[test]
    fn test_tampered_shard_file_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();
        let shard_idx = storage.shard_index("s");
        drop(storage);

        let shard_path = root
            .join("acid")
            .join(format!("shard_{shard_idx:02x}.redb"));
        fs::write(&shard_path, b"not-a-valid-redb-file").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // The in-memory backend must support the same create/append/read flow.
    #[test]
    fn test_in_memory_backend_create_append_read() {
        let storage = AcidStorage::new(
            test_storage_dir(),
            4,
            1024 * 1024,
            100 * 1024,
            AcidBackend::InMemory,
        )
        .expect("in-memory acid storage should initialize");

        let cfg = StreamConfig::new("text/plain".to_string());
        storage.create_stream("s", cfg).unwrap();
        storage
            .append("s", Bytes::from("hello"), "text/plain")
            .unwrap();
        storage
            .append("s", Bytes::from("world"), "text/plain")
            .unwrap();

        let read = storage.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("hello"));
        assert_eq!(read.messages[1], Bytes::from("world"));

        let meta = storage.head("s").unwrap();
        assert_eq!(meta.message_count, 2);
        assert_eq!(meta.total_bytes, 10);
    }

    // The in-memory backend enforces the same byte caps, and a rejected
    // append must leave the byte counter unchanged.
    #[test]
    fn test_in_memory_backend_global_cap() {
        let storage = AcidStorage::new(test_storage_dir(), 4, 50, 50, AcidBackend::InMemory)
            .expect("in-memory acid storage should initialize");

        let cfg = StreamConfig::new("text/plain".to_string());
        storage.create_stream("s", cfg).unwrap();
        storage
            .append("s", Bytes::from(vec![0_u8; 40]), "text/plain")
            .unwrap();

        let result = storage.append("s", Bytes::from(vec![0_u8; 20]), "text/plain");
        assert!(result.is_err());
        assert_eq!(storage.total_bytes(), 40);
    }
}