// durable_streams_server/storage/acid/mod.rs
1//! Crash-resilient redb-backed storage with sharded databases.
2//!
3//! This backend stores stream metadata and messages in redb tables and uses a
4//! stable hash-based shard layout so a stream always maps to the same database
5//! file after restarts.
6
7mod layout;
8mod storage_impl;
9
10#[cfg(test)]
11mod tests;
12
13use super::{ForkInfo, NOTIFY_CHANNEL_CAPACITY, ProducerState, StreamConfig, StreamState};
14use crate::config::AcidBackend;
15use crate::protocol::error::{Error, Result};
16use crate::protocol::offset::Offset;
17use bytes::Bytes;
18use chrono::{DateTime, Utc};
19use redb::backends::InMemoryBackend;
20use redb::{
21    CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
22    SetDurabilityError, StorageError as RedbStorageError, Table, TableDefinition, TableError,
23    TransactionError,
24};
25use seahash::hash;
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::fs;
29use std::path::{Path, PathBuf};
30use std::sync::RwLock;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::time::Duration;
33use tokio::sync::broadcast;
34use tracing::warn;
35
/// Per-shard table of stream metadata keyed by stream name; values are
/// JSON-serialized [`StoredStreamMeta`] records.
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
/// Per-shard table of message payloads keyed by (stream name, read_seq,
/// byte_offset); tuple key ordering makes a range scan yield one stream's
/// messages in key order.
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");

/// Version stamp recorded in the layout manifest (see [`LayoutManifest`]).
const LAYOUT_FORMAT_VERSION: u32 = 1;
/// Identifier of the stream-name hashing scheme recorded in the manifest.
const HASH_POLICY: &str = "seahash-v1";
/// Retry backoff for startup-only operations (shard database open).
/// Not used on the request path — transient errors there propagate as 503.
const STARTUP_RETRY_BACKOFF_MS: [u64; 3] = [10, 25, 50];
44
/// On-disk manifest describing the shard layout, validated when a file-backed
/// root is reopened (see [`AcidStorage::new`]) so a root is not reopened with
/// an incompatible shard count or hash scheme.
#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    // Compare against LAYOUT_FORMAT_VERSION; bump on incompatible changes.
    format_version: u32,
    // Number of shard databases; fixed for the lifetime of the root.
    shard_count: usize,
    // Name of the hash scheme used for shard routing (currently HASH_POLICY).
    hash_policy: String,
}
51
/// Per-stream record persisted (JSON-serialized) in the STREAMS table.
///
/// Fields marked `#[serde(default)]` tolerate records written before the
/// field existed; they deserialize to the type's default.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    // Configuration captured at stream creation.
    config: StreamConfig,
    closed: bool,
    // Next message's sequence number (starts at 0 for a new stream).
    next_read_seq: u64,
    // Byte offset at which the next message payload begins.
    next_byte_offset: u64,
    // Total payload bytes currently stored for this stream; folded into the
    // in-memory aggregate on delete (see cascade_delete_acid).
    total_bytes: u64,
    created_at: DateTime<Utc>,
    #[serde(default)]
    updated_at: Option<DateTime<Utc>>,
    // Last producer sequence accepted, if any — presumably for dedup; confirm
    // against the append path in storage_impl.
    last_seq: Option<String>,
    // Per-producer state keyed by producer id.
    producers: HashMap<String, ProducerState>,
    // Present when this stream was forked from another stream.
    #[serde(default)]
    fork_info: Option<ForkInfo>,
    // Number of live references (forks) to this stream; when a Tombstone
    // stream reaches 0 it is physically deleted (see cascade_delete_acid).
    #[serde(default)]
    ref_count: u32,
    // Lifecycle state; new streams start Active (see new_stream_meta).
    #[serde(default)]
    state: StreamState,
}
71
/// A single shard: one redb database holding its own STREAMS and MESSAGES tables.
#[derive(Debug)]
struct AcidShard {
    db: Database,
}
76
/// Sharded, crash-resilient storage backend built on redb.
///
/// Streams are routed to shards by a stable hash of their name (see
/// [`AcidStorage::shard_index`]) so the mapping survives restarts.
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    // One redb database per shard, indexed by shard_index(name).
    shards: Vec<AcidShard>,
    // Always a power of two in 1..=256 (enforced by validate_shard_count).
    shard_count: usize,
    // Aggregate payload bytes across all streams; rebuilt from disk on open
    // and kept current by reserve/rollback/saturating_sub helpers.
    total_bytes: AtomicU64,
    // Global byte cap enforced by reserve_total_bytes.
    max_total_bytes: u64,
    // Per-stream byte cap. NOTE(review): enforcement is not in this chunk —
    // presumably in storage_impl; confirm.
    max_stream_bytes: u64,
    // Lazily created broadcast channels used to wake readers of a stream.
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}
86
87impl AcidStorage {
88    /// Create or reopen an ACID storage root.
89    ///
90    /// When `backend` is [`AcidBackend::File`] the backend stores its files
91    /// beneath `<root>/acid`, validates a layout manifest, and rebuilds
92    /// aggregate state from disk before serving requests.
93    ///
94    /// When `backend` is [`AcidBackend::InMemory`] the `root_dir` is ignored
95    /// and each shard uses a redb [`InMemoryBackend`]. ACID transaction
96    /// semantics still apply but all data is lost on shutdown.
97    ///
98    /// # Errors
99    ///
100    /// Returns `Error::Storage` if storage layout validation fails, shard
101    /// databases cannot be opened, or on-disk recovery cannot complete.
102    pub fn new(
103        root_dir: impl Into<PathBuf>,
104        shard_count: usize,
105        max_total_bytes: u64,
106        max_stream_bytes: u64,
107        backend: AcidBackend,
108    ) -> Result<Self> {
109        Self::validate_shard_count(shard_count)?;
110
111        let shards = match backend {
112            AcidBackend::File => Self::create_file_shards(&root_dir.into(), shard_count)?,
113            AcidBackend::InMemory => Self::create_in_memory_shards(shard_count)?,
114        };
115
116        let storage = Self {
117            shards,
118            shard_count,
119            total_bytes: AtomicU64::new(0),
120            max_total_bytes,
121            max_stream_bytes,
122            notifiers: RwLock::new(HashMap::new()),
123        };
124
125        let total_bytes = storage.rebuild_state_from_disk()?;
126        storage.total_bytes.store(total_bytes, Ordering::Release);
127
128        Ok(storage)
129    }
130
    /// Return the currently tracked total payload bytes across all streams.
    ///
    /// The Acquire load pairs with the Release store in `new` and the AcqRel
    /// updates in the reserve/rollback helpers.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }
136
137    fn validate_shard_count(shard_count: usize) -> Result<()> {
138        if !(1..=256).contains(&shard_count) {
139            return Err(Error::Storage(format!(
140                "acid shard count must be in range 1..=256, got {shard_count}"
141            )));
142        }
143        if !shard_count.is_power_of_two() {
144            return Err(Error::Storage(format!(
145                "acid shard count must be a power of two, got {shard_count}"
146            )));
147        }
148        Ok(())
149    }
150
    /// Build an [`Error`] from any classifiable backend error, prefixing the
    /// human-readable `context` onto the error's own message and delegating
    /// the variant choice to the error type's [`ClassifyError`] impl.
    fn storage_err<E: ClassifyError>(context: impl Into<String>, err: E) -> Error {
        let context = context.into();
        let detail = format!("{context}: {err}");
        err.into_storage_error(context, detail)
    }
156
    /// Map a redb [`RedbStorageError`] onto the protocol error taxonomy:
    /// transient conditions become "unavailable", capacity conditions become
    /// "insufficient", and everything else a generic storage error (see the
    /// [`ClassifyError`] trait docs for the HTTP-status intent).
    fn classify_redb_storage_error(
        context: String,
        err: &RedbStorageError,
        detail: String,
    ) -> Error {
        match err {
            // Raw I/O errors get finer-grained classification elsewhere.
            RedbStorageError::Io(io_err) => {
                Error::classify_io_failure("acid", context, detail, io_err)
            }
            // Unusable right now, but possibly recoverable after a reopen.
            RedbStorageError::DatabaseClosed | RedbStorageError::PreviousIo => {
                Error::storage_unavailable("acid", context, detail)
            }
            // A value exceeded redb's size limit: a capacity problem.
            RedbStorageError::ValueTooLarge(_) => {
                Error::storage_insufficient("acid", context, detail)
            }
            // Corruption / poisoned locks are not retryable: generic 500.
            RedbStorageError::Corrupted(_) | RedbStorageError::LockPoisoned(_) => {
                Error::Storage(detail)
            }
            // redb may grow variants; log so new ones get explicit mappings.
            _ => {
                warn!(error = %err, "unhandled redb StorageError variant");
                Error::Storage(detail)
            }
        }
    }
181
182    #[must_use]
183    fn shard_index(&self, name: &str) -> usize {
184        let hash_u64 = hash(name.as_bytes());
185        let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
186            let masked = hash_u64 & u64::from(u32::MAX);
187            usize::try_from(masked).expect("masked hash value must fit in usize")
188        });
189        hash_usize & (self.shard_count - 1)
190    }
191
192    fn find_stream_shard_index(&self, name: &str) -> Result<Option<usize>> {
193        let hashed_idx = self.shard_index(name);
194        if self.stream_exists_in_shard(hashed_idx, name)? {
195            return Ok(Some(hashed_idx));
196        }
197
198        let mut found = None;
199
200        for (idx, shard) in self.shards.iter().enumerate() {
201            if idx == hashed_idx {
202                continue;
203            }
204
205            let txn = shard
206                .db
207                .begin_read()
208                .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
209            let streams = txn
210                .open_table(STREAMS)
211                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
212
213            if Self::read_stream_meta(&streams, name)?.is_some() && found.replace(idx).is_some() {
214                return Err(Error::Storage(format!(
215                    "stream metadata exists in multiple shards for {name}"
216                )));
217            }
218        }
219
220        Ok(found)
221    }
222
223    fn stream_exists_in_shard(&self, shard_idx: usize, name: &str) -> Result<bool> {
224        let shard = &self.shards[shard_idx];
225        let txn = shard
226            .db
227            .begin_read()
228            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
229        let streams = txn
230            .open_table(STREAMS)
231            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
232
233        Ok(Self::read_stream_meta(&streams, name)?.is_some())
234    }
235
236    fn existing_shard_index(&self, name: &str) -> Result<usize> {
237        self.find_stream_shard_index(name)?
238            .ok_or_else(|| Error::NotFound(name.to_string()))
239    }
240
241    fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
242        if bytes == 0 {
243            return Ok(());
244        }
245
246        if self
247            .total_bytes
248            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
249                current
250                    .checked_add(bytes)
251                    .filter(|next| *next <= self.max_total_bytes)
252            })
253            .is_err()
254        {
255            return Err(Error::MemoryLimitExceeded);
256        }
257        Ok(())
258    }
259
    /// Undo a prior `reserve_total_bytes` after a failed write; saturating
    /// subtraction means a stray double-rollback cannot underflow the counter.
    fn rollback_total_bytes(&self, bytes: u64) {
        self.saturating_sub_total_bytes(bytes);
    }
263
264    fn saturating_sub_total_bytes(&self, bytes: u64) {
265        if bytes == 0 {
266            return;
267        }
268
269        self.total_bytes
270            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
271                Some(current.saturating_sub(bytes))
272            })
273            .ok();
274    }
275
276    fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
277    where
278        T: ReadableTable<&'static str, &'static [u8]>,
279    {
280        let payload = streams
281            .get(name)
282            .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
283
284        if let Some(payload) = payload {
285            let meta = serde_json::from_slice(payload.value())
286                .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
287            Ok(Some(meta))
288        } else {
289            Ok(None)
290        }
291    }
292
293    fn write_stream_meta(
294        streams: &mut Table<'_, &'static str, &'static [u8]>,
295        name: &str,
296        meta: &StoredStreamMeta,
297    ) -> Result<()> {
298        let payload = serde_json::to_vec(meta)
299            .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
300        streams
301            .insert(name, payload.as_slice())
302            .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
303        Ok(())
304    }
305
306    fn delete_stream_messages(
307        messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
308        name: &str,
309    ) -> Result<()> {
310        let mut keys = Vec::new();
311        let iter = messages
312            .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
313            .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;
314
315        for item in iter {
316            let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
317            let (_, read_seq, byte_offset) = key.value();
318            keys.push((read_seq, byte_offset));
319        }
320
321        for (read_seq, byte_offset) in keys {
322            messages
323                .remove((name, read_seq, byte_offset))
324                .map_err(|e| Self::storage_err("failed to delete message", e))?;
325        }
326
327        Ok(())
328    }
329
330    fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
331        let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
332        guard
333            .entry(name.to_string())
334            .or_insert_with(|| {
335                let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
336                sender
337            })
338            .clone()
339    }
340
341    fn notify_stream(&self, name: &str) {
342        if let Some(sender) = self
343            .notifiers
344            .read()
345            .expect("notifiers lock poisoned")
346            .get(name)
347        {
348            let _ = sender.send(());
349        }
350    }
351
352    fn drop_notifier(&self, name: &str) {
353        self.notifiers
354            .write()
355            .expect("notifiers lock poisoned")
356            .remove(name);
357    }
358
359    fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
360        StoredStreamMeta {
361            config,
362            closed: false,
363            next_read_seq: 0,
364            next_byte_offset: 0,
365            total_bytes: 0,
366            created_at: Utc::now(),
367            updated_at: None,
368            last_seq: None,
369            producers: HashMap::new(),
370            fork_info: None,
371            ref_count: 0,
372            state: StreamState::Active,
373        }
374    }
375
376    fn batch_bytes(messages: &[Bytes]) -> u64 {
377        messages
378            .iter()
379            .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
380            .sum()
381    }
382
383    /// Read messages from a stream's shard within a given offset range.
384    ///
385    /// Returns messages with offsets `>= from_offset` and `< up_to` (if
386    /// `up_to` is `Some`). Used by fork read stitching to pull ancestor
387    /// messages without fork/tombstone validation.
388    fn read_messages_from_shard(
389        &self,
390        name: &str,
391        from_offset: &Offset,
392        up_to: Option<&Offset>,
393    ) -> Result<Vec<Bytes>> {
394        let shard = &self.shards[self.existing_shard_index(name)?];
395        let txn = shard
396            .db
397            .begin_read()
398            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
399        let message_table = txn
400            .open_table(MESSAGES)
401            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
402
403        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
404            (0_u64, 0_u64)
405        } else {
406            from_offset.parse_components().unwrap_or((0, 0))
407        };
408
409        let iter = message_table
410            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
411            .map_err(|e| Self::storage_err("failed to read shard message range", e))?;
412
413        let mut messages = Vec::new();
414        for item in iter {
415            let (key, value) =
416                item.map_err(|e| Self::storage_err("failed to read shard message", e))?;
417            if let Some(bound) = up_to {
418                let (_, read_seq, byte_offset) = key.value();
419                let msg_offset = Offset::new(read_seq, byte_offset);
420                if msg_offset >= *bound {
421                    break;
422                }
423            }
424            messages.push(Bytes::copy_from_slice(value.value()));
425        }
426
427        Ok(messages)
428    }
429
    /// Walk the fork ancestry starting at `parent_name`, decrementing each
    /// ancestor's `ref_count` and physically deleting any tombstoned ancestor
    /// that reaches zero references. The cascade continues upward because
    /// deleting one ancestor releases its own reference on *its* parent.
    ///
    /// Each hop runs in its own immediate-durability write transaction, so a
    /// crash mid-cascade leaves a consistent prefix of the work committed.
    fn cascade_delete_acid(&self, parent_name: &str) -> Result<()> {
        let mut current_parent = parent_name.to_string();
        loop {
            // Ancestor no longer present in any shard: nothing more to unwind.
            let Some(shard_idx) = self.find_stream_shard_index(&current_parent)? else {
                break;
            };
            let shard = &self.shards[shard_idx];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;

            // Metadata vanished between the shard lookup and this transaction.
            let Some(mut meta) = Self::read_stream_meta(&streams, &current_parent)? else {
                break;
            };

            // Saturating: a stray extra decrement must not underflow.
            meta.ref_count = meta.ref_count.saturating_sub(1);

            if meta.state == StreamState::Tombstone && meta.ref_count == 0 {
                // Capture what is needed after the commit.
                let fi = meta.fork_info.clone();
                let total_bytes = meta.total_bytes;

                let mut messages = txn
                    .open_table(MESSAGES)
                    .map_err(|e| Self::storage_err("failed to open messages table", e))?;
                Self::delete_stream_messages(&mut messages, &current_parent)?;
                drop(messages);

                streams
                    .remove(current_parent.as_str())
                    .map_err(|e| Self::storage_err("failed to remove tombstoned parent", e))?;
                // Table handles must be dropped before the transaction commits.
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit cascade delete", e))?;

                // Only adjust in-memory aggregates once the commit succeeded.
                self.saturating_sub_total_bytes(total_bytes);
                self.drop_notifier(&current_parent);

                // Follow the chain into this ancestor's own parent, if any.
                if let Some(fi) = fi {
                    current_parent = fi.source_name;
                } else {
                    break;
                }
            } else {
                // Still referenced (or not tombstoned): persist the decrement
                // and stop — ancestors further up keep their references.
                Self::write_stream_meta(&mut streams, &current_parent, &meta)?;
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit ref_count decrement", e))?;
                break;
            }
        }
        Ok(())
    }
483
484    /// Read messages from the MESSAGES table for a non-forked stream, starting
485    /// from the given offset. Opens its own read transaction.
486    fn read_non_forked_table_messages(
487        &self,
488        name: &str,
489        from_offset: &Offset,
490        shard_idx: usize,
491    ) -> Result<Vec<Bytes>> {
492        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
493            (0_u64, 0_u64)
494        } else {
495            from_offset.parse_components().ok_or_else(|| {
496                Error::InvalidOffset("non-concrete offset in read range".to_string())
497            })?
498        };
499
500        let shard = &self.shards[shard_idx];
501        let txn = shard
502            .db
503            .begin_read()
504            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
505        let message_table = txn
506            .open_table(MESSAGES)
507            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
508
509        let iter = message_table
510            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
511            .map_err(|e| Self::storage_err("failed to read stream range", e))?;
512
513        let mut messages = Vec::new();
514        for item in iter {
515            let (_, value) =
516                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
517            messages.push(Bytes::copy_from_slice(value.value()));
518        }
519
520        Ok(messages)
521    }
522
    /// Traverse the fork chain and collect all messages for a forked stream read.
    ///
    /// Ancestor segments are stitched first (each bounded by its recorded
    /// `read_up_to`, the final one by this stream's `fork_offset`), followed by
    /// this stream's own messages. NOTE(review): assumes `build_read_plan`
    /// returns segments ordered so the last entry is the nearest ancestor —
    /// confirm against the fork module.
    fn collect_fork_chain_messages(
        &self,
        name: &str,
        from_offset: &Offset,
        fi: &ForkInfo,
    ) -> Result<Vec<Bytes>> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        // Ancestor data is only needed when the read begins before the fork point.
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            // The closure resolves each segment's own fork_info; any lookup
            // failure is treated as "no parent" (errors become None).
            let plan = super::fork::build_read_plan(&fi.source_name, |segment_name| {
                let shard_idx = self.find_stream_shard_index(segment_name).ok().flatten()?;
                let shard = &self.shards[shard_idx];
                let txn = shard.db.begin_read().ok()?;
                let streams = txn.open_table(STREAMS).ok()?;
                let meta = Self::read_stream_meta(&streams, segment_name).ok()??;
                Some(meta.fork_info)
            });

            for (i, segment) in plan.iter().enumerate() {
                // Final segment is cut at this stream's fork offset; earlier
                // ones at their own recorded bound.
                let effective_up_to = if i == plan.len() - 1 {
                    Some(&fi.fork_offset)
                } else {
                    segment.read_up_to.as_ref()
                };
                // Only the first segment honors the caller's starting offset.
                let effective_from = if i == 0 {
                    from_offset
                } else {
                    &Offset::start()
                };
                let segment_msgs =
                    self.read_messages_from_shard(&segment.name, effective_from, effective_up_to)?;
                all_messages.extend(segment_msgs);
            }
        }

        // Append this stream's own messages after the stitched ancestors.
        let fork_msgs = if from_offset.is_start() || *from_offset <= fi.fork_offset {
            self.read_messages_from_shard(name, &fi.fork_offset, None)?
        } else {
            self.read_messages_from_shard(name, from_offset, None)?
        };
        all_messages.extend(fork_msgs);

        Ok(all_messages)
    }
567
    /// Begin a write transaction with [`Durability::Immediate`], so commits
    /// are flushed to stable storage before returning.
    fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
        let mut txn = db
            .begin_write()
            .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
        txn.set_durability(Durability::Immediate)
            .map_err(|e| Self::storage_err("failed to set write durability", e))?;
        Ok(txn)
    }
576}
577
/// Type-safe error classification trait for redb and IO error types.
///
/// Replaces the previous `Any`-based downcasting dispatcher with compile-time
/// dispatch. Each implementation maps the concrete error type into the correct
/// [`Error`] variant so that transient failures become 503, capacity failures
/// become 507, and everything else maps to a generic 500.
trait ClassifyError: std::fmt::Display {
    /// Consume the error and map it — given the human-readable `context` and
    /// the preformatted `detail` ("context: err") — into an [`Error`] variant.
    fn into_storage_error(self, context: String, detail: String) -> Error;
}
587
/// Raw I/O errors delegate to the shared I/O-failure classifier.
impl ClassifyError for std::io::Error {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        Error::classify_io_failure("acid", context, detail, &self)
    }
}
593
impl ClassifyError for DatabaseError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Another handle holds the database: transient, report unavailable.
            DatabaseError::DatabaseAlreadyOpen => {
                Error::storage_unavailable("acid", context, detail)
            }
            // Delegate nested storage errors to the shared classifier.
            DatabaseError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // Needs operator intervention: plain storage error.
            DatabaseError::RepairAborted | DatabaseError::UpgradeRequired(_) => {
                Error::Storage(detail)
            }
            // Future redb variants: log so they get explicit mappings later.
            _ => {
                warn!(error = %self, "unhandled redb DatabaseError variant");
                Error::Storage(detail)
            }
        }
    }
}
613
impl ClassifyError for TransactionError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Delegate nested storage errors to the shared classifier.
            TransactionError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // A read transaction pinning the write path: generic storage error.
            TransactionError::ReadTransactionStillInUse(_) => Error::Storage(detail),
            // Future redb variants: log so they get explicit mappings later.
            _ => {
                warn!(error = %self, "unhandled redb TransactionError variant");
                Error::Storage(detail)
            }
        }
    }
}
628
impl ClassifyError for TableError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Delegate nested storage errors to the shared classifier.
            TableError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // Schema/usage mismatches are programming or corruption errors,
            // never transient: map them all to a generic storage error.
            TableError::TableTypeMismatch { .. }
            | TableError::TableIsMultimap(_)
            | TableError::TableIsNotMultimap(_)
            | TableError::TypeDefinitionChanged { .. }
            | TableError::TableDoesNotExist(_)
            | TableError::TableExists(_)
            | TableError::TableAlreadyOpen(_, _) => Error::Storage(detail),
            // Future redb variants: log so they get explicit mappings later.
            _ => {
                warn!(error = %self, "unhandled redb TableError variant");
                Error::Storage(detail)
            }
        }
    }
}
649
650impl ClassifyError for CommitError {
651    fn into_storage_error(self, context: String, detail: String) -> Error {
652        if let CommitError::Storage(storage_err) = &self {
653            AcidStorage::classify_redb_storage_error(context, storage_err, detail)
654        } else {
655            warn!(error = %self, "unhandled redb CommitError variant");
656            Error::Storage(detail)
657        }
658    }
659}
660
/// Bare storage errors go straight through the shared classifier.
impl ClassifyError for RedbStorageError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        AcidStorage::classify_redb_storage_error(context, &self, detail)
    }
}
666
/// Durability-setting failures map to a generic storage error; `context` is
/// unused because `detail` already embeds it ("context: err").
impl ClassifyError for SetDurabilityError {
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}
672
/// Metadata (de)serialization failures are not transient; map them to a
/// generic storage error (`detail` already embeds the context).
impl ClassifyError for serde_json::Error {
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}