Skip to main content

durable_streams_server/storage/
acid.rs

1use super::{
2    CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3    ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use redb::{Database, Durability, ReadableDatabase, ReadableTable, Table, TableDefinition};
11use seahash::hash;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::RwLock;
17use std::sync::atomic::{AtomicU64, Ordering};
18use tokio::sync::broadcast;
19
// Stream metadata table: keyed by stream name, values are JSON-encoded `StoredStreamMeta`.
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
// Message payload table: keyed by (stream name, read sequence, byte offset).
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");
22
/// On-disk layout version; persisted in `layout.json` and checked on startup.
const LAYOUT_FORMAT_VERSION: u32 = 1;
/// Identifier of the stream-name hashing scheme used for shard selection.
const HASH_POLICY: &str = "seahash-v1";
25
/// Manifest persisted as `layout.json` describing how the shard directory
/// was laid out; validated on every startup so a database created with
/// different parameters is rejected instead of silently misread.
#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    format_version: u32,
    shard_count: usize,
    hash_policy: String,
}
32
/// Per-stream metadata stored as JSON in the `streams` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    config: StreamConfig,
    // True once the stream has been closed to further appends.
    closed: bool,
    // Read sequence number the next appended message will receive.
    next_read_seq: u64,
    // Byte offset at which the next appended message will start.
    next_byte_offset: u64,
    // Total payload bytes currently stored for this stream.
    total_bytes: u64,
    created_at: DateTime<Utc>,
    // Most recently accepted client-supplied batch sequence, if any.
    last_seq: Option<String>,
    // Idempotence state tracked per producer id.
    producers: HashMap<String, ProducerState>,
}
44
/// A single shard: one independent redb database file.
#[derive(Debug)]
struct AcidShard {
    db: Database,
}
49
/// Durable, transactional storage backend that spreads streams over a
/// fixed set of redb shard databases, selected by hashing the stream name.
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    shards: Vec<AcidShard>,
    // Power of two in 1..=256 (validated in `new`); enables mask-based lookup.
    shard_count: usize,
    // Cached sum of live stream bytes across all shards.
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    // Per-stream broadcast channels used to wake blocked readers.
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}
59
60impl AcidStorage {
    /// Opens (or creates) the sharded storage rooted at `root_dir/acid`,
    /// validating the persisted layout manifest and rebuilding the cached
    /// byte accounting from disk.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` if storage layout validation fails, shard
    /// databases cannot be opened, or on-disk recovery cannot complete.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        shard_count: usize,
        max_total_bytes: u64,
        max_stream_bytes: u64,
    ) -> Result<Self> {
        Self::validate_shard_count(shard_count)?;

        let root_dir = root_dir.into();
        let acid_dir = Self::acid_dir(&root_dir);
        fs::create_dir_all(&acid_dir).map_err(|e| {
            Self::storage_err(
                format!(
                    "failed to create acid storage directory {}",
                    acid_dir.display()
                ),
                e,
            )
        })?;

        // Reject databases created with a different shard count or hash policy.
        Self::load_or_create_layout(&acid_dir, shard_count)?;

        let mut shards = Vec::with_capacity(shard_count);
        for idx in 0..shard_count {
            let shard_path = acid_dir.join(format!("shard_{idx:02x}.redb"));
            // `Database::create` opens the file if it already exists.
            let db = Database::create(&shard_path).map_err(|e| {
                Self::storage_err(
                    format!("failed to open shard database {}", shard_path.display()),
                    e,
                )
            })?;
            Self::ensure_schema(&db)?;
            shards.push(AcidShard { db });
        }

        let storage = Self {
            shards,
            shard_count,
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            notifiers: RwLock::new(HashMap::new()),
        };

        // Purge expired streams and recompute the live byte total on disk.
        let total_bytes = storage.rebuild_state_from_disk()?;
        storage.total_bytes.store(total_bytes, Ordering::Release);

        Ok(storage)
    }
114
    /// Current sum of live stream payload bytes across all shards.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }
119
120    fn validate_shard_count(shard_count: usize) -> Result<()> {
121        if !(1..=256).contains(&shard_count) {
122            return Err(Error::Storage(format!(
123                "acid shard count must be in range 1..=256, got {shard_count}"
124            )));
125        }
126        if !shard_count.is_power_of_two() {
127            return Err(Error::Storage(format!(
128                "acid shard count must be a power of two, got {shard_count}"
129            )));
130        }
131        Ok(())
132    }
133
    /// Wraps a low-level error with human-readable context as `Error::Storage`.
    fn storage_err(context: impl Into<String>, err: impl std::fmt::Display) -> Error {
        Error::Storage(format!("{}: {err}", context.into()))
    }
137
138    fn acid_dir(root_dir: &Path) -> PathBuf {
139        root_dir.join("acid")
140    }
141
142    fn layout_path(acid_dir: &Path) -> PathBuf {
143        acid_dir.join("layout.json")
144    }
145
146    fn load_or_create_layout(acid_dir: &Path, shard_count: usize) -> Result<()> {
147        let layout_path = Self::layout_path(acid_dir);
148        if layout_path.exists() {
149            let payload = fs::read(&layout_path).map_err(|e| {
150                Self::storage_err(
151                    format!("failed to read acid layout file {}", layout_path.display()),
152                    e,
153                )
154            })?;
155            let manifest: LayoutManifest = serde_json::from_slice(&payload).map_err(|e| {
156                Self::storage_err(
157                    format!("failed to parse acid layout file {}", layout_path.display()),
158                    e,
159                )
160            })?;
161
162            if manifest.format_version != LAYOUT_FORMAT_VERSION {
163                return Err(Error::Storage(format!(
164                    "acid layout mismatch: format_version={}, expected={}",
165                    manifest.format_version, LAYOUT_FORMAT_VERSION
166                )));
167            }
168            if manifest.shard_count != shard_count {
169                return Err(Error::Storage(format!(
170                    "acid layout mismatch: shard_count={}, expected={shard_count}",
171                    manifest.shard_count
172                )));
173            }
174            if manifest.hash_policy != HASH_POLICY {
175                return Err(Error::Storage(format!(
176                    "acid layout mismatch: hash_policy='{}', expected='{}'",
177                    manifest.hash_policy, HASH_POLICY
178                )));
179            }
180            return Ok(());
181        }
182
183        let manifest = LayoutManifest {
184            format_version: LAYOUT_FORMAT_VERSION,
185            shard_count,
186            hash_policy: HASH_POLICY.to_string(),
187        };
188        let payload = serde_json::to_vec_pretty(&manifest)
189            .map_err(|e| Self::storage_err("failed to serialize acid layout manifest", e))?;
190
191        let tmp_path = acid_dir.join("layout.json.tmp");
192        fs::write(&tmp_path, payload).map_err(|e| {
193            Self::storage_err(
194                format!("failed to write temp layout file {}", tmp_path.display()),
195                e,
196            )
197        })?;
198        fs::rename(&tmp_path, &layout_path).map_err(|e| {
199            Self::storage_err(
200                format!("failed to write layout file {}", layout_path.display()),
201                e,
202            )
203        })?;
204
205        Ok(())
206    }
207
208    #[must_use]
209    fn shard_index(&self, name: &str) -> usize {
210        let hash_u64 = hash(name.as_bytes());
211        let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
212            let masked = hash_u64 & u64::from(u32::MAX);
213            usize::try_from(masked).expect("masked hash value must fit in usize")
214        });
215        hash_usize & (self.shard_count - 1)
216    }
217
    /// Shard database responsible for the named stream.
    fn shard(&self, name: &str) -> &AcidShard {
        &self.shards[self.shard_index(name)]
    }
221
    /// Atomically reserves `bytes` against the global storage budget.
    ///
    /// Uses a CAS loop (`fetch_update`) so concurrent reservations never
    /// overshoot `max_total_bytes`; the reservation fails — leaving the
    /// counter untouched — when the addition would overflow or exceed the
    /// budget. Callers must undo via `rollback_total_bytes` if the
    /// subsequent write does not land.
    fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
        if bytes == 0 {
            return Ok(());
        }

        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }
        Ok(())
    }
240
    /// Releases a reservation previously made by `reserve_total_bytes`.
    fn rollback_total_bytes(&self, bytes: u64) {
        self.saturating_sub_total_bytes(bytes);
    }
244
    /// Subtracts `bytes` from the global counter, clamping at zero so
    /// over-subtraction cannot wrap the counter around.
    fn saturating_sub_total_bytes(&self, bytes: u64) {
        if bytes == 0 {
            return;
        }

        // The closure always returns `Some`, so `fetch_update` cannot fail;
        // `.ok()` just discards the previous value.
        self.total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            })
            .ok();
    }
256
257    fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
258    where
259        T: ReadableTable<&'static str, &'static [u8]>,
260    {
261        let payload = streams
262            .get(name)
263            .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
264
265        if let Some(payload) = payload {
266            let meta = serde_json::from_slice(payload.value())
267                .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
268            Ok(Some(meta))
269        } else {
270            Ok(None)
271        }
272    }
273
    /// Serializes `meta` as JSON and upserts it under `name` within the
    /// caller's open write transaction.
    fn write_stream_meta(
        streams: &mut Table<'_, &'static str, &'static [u8]>,
        name: &str,
        meta: &StoredStreamMeta,
    ) -> Result<()> {
        let payload = serde_json::to_vec(meta)
            .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
        streams
            .insert(name, payload.as_slice())
            .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
        Ok(())
    }
286
    /// Removes every message row belonging to `name` within the caller's
    /// open write transaction.
    fn delete_stream_messages(
        messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
        name: &str,
    ) -> Result<()> {
        // redb table iterators cannot be safely mutated in-place; we must collect
        // keys first, then remove in a second pass.
        let mut keys = Vec::new();
        // The full (0, 0)..=(MAX, MAX) key range for this name covers every
        // message the stream ever stored.
        let iter = messages
            .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;

        for item in iter {
            let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
            let (_, read_seq, byte_offset) = key.value();
            keys.push((read_seq, byte_offset));
        }

        for (read_seq, byte_offset) in keys {
            messages
                .remove((name, read_seq, byte_offset))
                .map_err(|e| Self::storage_err("failed to delete message", e))?;
        }

        Ok(())
    }
312
    /// Returns the broadcast sender for `name`, creating it on demand.
    ///
    /// Fast path takes only the read lock; on miss we re-check under the
    /// write lock via `entry`, so two racing callers still end up sharing
    /// a single channel.
    fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
        if let Some(sender) = self
            .notifiers
            .read()
            .expect("notifiers lock poisoned")
            .get(name)
        {
            return sender.clone();
        }

        let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
        guard
            .entry(name.to_string())
            .or_insert_with(|| {
                let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
                sender
            })
            .clone()
    }
332
333    fn notify_stream(&self, name: &str) {
334        if let Some(sender) = self
335            .notifiers
336            .read()
337            .expect("notifiers lock poisoned")
338            .get(name)
339        {
340            let _ = sender.send(());
341        }
342    }
343
344    fn drop_notifier(&self, name: &str) {
345        self.notifiers
346            .write()
347            .expect("notifiers lock poisoned")
348            .remove(name);
349    }
350
    /// Fresh metadata for a newly created stream: empty, open, and
    /// timestamped now.
    fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
        StoredStreamMeta {
            config,
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            last_seq: None,
            producers: HashMap::new(),
        }
    }
363
364    fn batch_bytes(messages: &[Bytes]) -> u64 {
365        messages
366            .iter()
367            .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
368            .sum()
369    }
370
    /// Starts a write transaction with `Durability::Immediate` so commits
    /// are persisted to disk before they return.
    fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
        let mut txn = db
            .begin_write()
            .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
        txn.set_durability(Durability::Immediate)
            .map_err(|e| Self::storage_err("failed to set write durability", e))?;
        Ok(txn)
    }
379
    /// Opens (and thereby creates) both tables once at startup so later
    /// transactions never race on first-time table creation.
    fn ensure_schema(db: &Database) -> Result<()> {
        let txn = Self::begin_write_txn(db)?;
        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to initialize streams table", e))?;
        let messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to initialize messages table", e))?;
        // Table handles must be dropped before the transaction can commit.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit schema initialization", e))?;
        Ok(())
    }
394
395    fn rebuild_state_from_disk(&self) -> Result<u64> {
396        let mut total = 0_u64;
397        for shard in &self.shards {
398            total = total.saturating_add(self.rebuild_shard(shard)?);
399        }
400        Ok(total)
401    }
402
    /// Scans one shard, summing live stream bytes and deleting streams
    /// whose retention has expired.
    fn rebuild_shard(&self, shard: &AcidShard) -> Result<u64> {
        let read_txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut live_bytes = 0_u64;
        let mut expired_names = Vec::new();

        // First pass (read-only): classify every stream in the shard.
        {
            let iter = streams
                .iter()
                .map_err(|e| Self::storage_err("failed to iterate stream metadata", e))?;
            for item in iter {
                let (key, value) =
                    item.map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
                let stream_name = key.value().to_string();
                let meta: StoredStreamMeta = serde_json::from_slice(value.value())
                    .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
                if super::is_stream_expired(&meta.config) {
                    expired_names.push(stream_name);
                } else {
                    live_bytes = live_bytes.saturating_add(meta.total_bytes);
                }
            }
        }

        drop(streams);
        drop(read_txn);

        if expired_names.is_empty() {
            return Ok(live_bytes);
        }

        // Second pass (write): purge all expired streams in one transaction.
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        for name in &expired_names {
            Self::delete_stream_messages(&mut messages, name)?;
            streams
                .remove(name.as_str())
                .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            self.drop_notifier(name);
        }

        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit startup cleanup", e))?;

        Ok(live_bytes)
    }
463}
464
465impl Storage for AcidStorage {
466    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
467        let shard = self.shard(name);
468        let txn = Self::begin_write_txn(&shard.db)?;
469        let mut streams = txn
470            .open_table(STREAMS)
471            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
472        let mut messages = txn
473            .open_table(MESSAGES)
474            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
475
476        let mut removed_expired_bytes = 0_u64;
477
478        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
479            if super::is_stream_expired(&existing.config) {
480                removed_expired_bytes = existing.total_bytes;
481                Self::delete_stream_messages(&mut messages, name)?;
482                streams
483                    .remove(name)
484                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
485            } else if existing.config == config {
486                return Ok(CreateStreamResult::AlreadyExists);
487            } else {
488                return Err(Error::ConfigMismatch);
489            }
490        }
491
492        let meta = Self::new_stream_meta(config);
493        Self::write_stream_meta(&mut streams, name, &meta)?;
494
495        drop(messages);
496        drop(streams);
497        txn.commit()
498            .map_err(|e| Self::storage_err("failed to commit create stream", e))?;
499
500        if removed_expired_bytes > 0 {
501            self.saturating_sub_total_bytes(removed_expired_bytes);
502            self.drop_notifier(name);
503        }
504
505        Ok(CreateStreamResult::Created)
506    }
507
508    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
509        let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
510        self.reserve_total_bytes(message_bytes)?;
511
512        let result = (|| {
513            let shard = self.shard(name);
514            let txn = Self::begin_write_txn(&shard.db)?;
515            let mut streams = txn
516                .open_table(STREAMS)
517                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
518            let mut messages = txn
519                .open_table(MESSAGES)
520                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
521
522            let mut meta = Self::read_stream_meta(&streams, name)?
523                .ok_or_else(|| Error::NotFound(name.to_string()))?;
524
525            if super::is_stream_expired(&meta.config) {
526                return Err(Error::StreamExpired);
527            }
528            if meta.closed {
529                return Err(Error::StreamClosed);
530            }
531
532            super::validate_content_type(&meta.config.content_type, content_type)?;
533
534            if meta.total_bytes + message_bytes > self.max_stream_bytes {
535                return Err(Error::StreamSizeLimitExceeded);
536            }
537
538            let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
539            messages
540                .insert(
541                    (name, meta.next_read_seq, meta.next_byte_offset),
542                    data.as_ref(),
543                )
544                .map_err(|e| Self::storage_err("failed to append message", e))?;
545
546            meta.next_read_seq += 1;
547            meta.next_byte_offset += message_bytes;
548            meta.total_bytes += message_bytes;
549
550            Self::write_stream_meta(&mut streams, name, &meta)?;
551
552            drop(messages);
553            drop(streams);
554            txn.commit()
555                .map_err(|e| Self::storage_err("failed to commit append", e))?;
556
557            Ok(offset)
558        })();
559
560        if result.is_err() {
561            self.rollback_total_bytes(message_bytes);
562            return result;
563        }
564
565        self.notify_stream(name);
566        result
567    }
568
569    fn batch_append(
570        &self,
571        name: &str,
572        messages: Vec<Bytes>,
573        content_type: &str,
574        seq: Option<&str>,
575    ) -> Result<Offset> {
576        if messages.is_empty() {
577            return Err(Error::InvalidHeader {
578                header: "Content-Length".to_string(),
579                reason: "batch cannot be empty".to_string(),
580            });
581        }
582
583        let batch_bytes = Self::batch_bytes(&messages);
584        self.reserve_total_bytes(batch_bytes)?;
585
586        let result = (|| {
587            let shard = self.shard(name);
588            let txn = Self::begin_write_txn(&shard.db)?;
589            let mut streams = txn
590                .open_table(STREAMS)
591                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
592            let mut message_table = txn
593                .open_table(MESSAGES)
594                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
595
596            let mut meta = Self::read_stream_meta(&streams, name)?
597                .ok_or_else(|| Error::NotFound(name.to_string()))?;
598
599            if super::is_stream_expired(&meta.config) {
600                return Err(Error::StreamExpired);
601            }
602            if meta.closed {
603                return Err(Error::StreamClosed);
604            }
605
606            super::validate_content_type(&meta.config.content_type, content_type)?;
607            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;
608
609            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
610                return Err(Error::StreamSizeLimitExceeded);
611            }
612
613            for data in &messages {
614                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
615                message_table
616                    .insert(
617                        (name, meta.next_read_seq, meta.next_byte_offset),
618                        data.as_ref(),
619                    )
620                    .map_err(|e| Self::storage_err("failed to append batch message", e))?;
621                meta.next_read_seq += 1;
622                meta.next_byte_offset += len;
623                meta.total_bytes += len;
624            }
625
626            if let Some(new_seq) = pending_seq {
627                meta.last_seq = Some(new_seq);
628            }
629
630            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
631            Self::write_stream_meta(&mut streams, name, &meta)?;
632
633            drop(message_table);
634            drop(streams);
635            txn.commit()
636                .map_err(|e| Self::storage_err("failed to commit batch append", e))?;
637
638            Ok(next_offset)
639        })();
640
641        if result.is_err() {
642            self.rollback_total_bytes(batch_bytes);
643            return result;
644        }
645
646        self.notify_stream(name);
647        result
648    }
649
    /// Reads every message from `from_offset` to the current tail within a
    /// single consistent read snapshot.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let shard = self.shard(name);
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        // A "now" offset means the caller only wants the current tail
        // position, not any payloads.
        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                at_tail: true,
                closed: meta.closed,
            });
        }

        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
            (0_u64, 0_u64)
        } else {
            from_offset.parse_components().ok_or_else(|| {
                Error::InvalidOffset("non-concrete offset in read range".to_string())
            })?
        };

        // Scan from the requested position to the end of this stream's key
        // space within the snapshot.
        let iter = message_table
            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to read stream range", e))?;

        let mut messages = Vec::new();
        for item in iter {
            let (_, value) =
                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
            messages.push(Bytes::copy_from_slice(value.value()));
        }

        // `at_tail` is true by construction: the range above runs to the end
        // of the snapshot, so everything up to `next_offset` was returned.
        Ok(ReadResult {
            messages,
            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
            at_tail: true,
            closed: meta.closed,
        })
    }
706
    /// Removes a stream and all of its messages, then releases its bytes
    /// from the global budget and discards its notifier.
    fn delete(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        Self::delete_stream_messages(&mut messages, name)?;
        streams
            .remove(name)
            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;

        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit delete", e))?;

        // Adjust in-memory accounting only after the durable commit.
        self.saturating_sub_total_bytes(meta.total_bytes);
        self.drop_notifier(name);
        Ok(())
    }
734
    /// Returns a snapshot of the stream's metadata without touching
    /// message payloads.
    fn head(&self, name: &str) -> Result<StreamMetadata> {
        let shard = self.shard(name);
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        Ok(StreamMetadata {
            config: meta.config,
            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
            closed: meta.closed,
            total_bytes: meta.total_bytes,
            // Sequence numbers start at 0 and are dense, so the next read
            // sequence equals the stored message count.
            message_count: meta.next_read_seq,
            created_at: meta.created_at,
        })
    }
762
    /// Marks the stream closed to further appends, then notifies readers
    /// so tailing consumers observe the close promptly.
    fn close_stream(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        meta.closed = true;
        Self::write_stream_meta(&mut streams, name, &meta)?;

        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit close stream", e))?;

        self.notify_stream(name);
        Ok(())
    }
787
    /// Idempotent producer append: deduplicates on the producer's
    /// (id, epoch, seq), optionally closing the stream in the same
    /// transaction. A duplicate retransmission returns the current stream
    /// state without writing anything.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let batch_bytes = Self::batch_bytes(&messages);
        // Reserve the global budget up front; released below on error or
        // duplicate since neither path stores any bytes.
        self.reserve_total_bytes(batch_bytes)?;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            if super::is_stream_expired(&meta.config) {
                return Err(Error::StreamExpired);
            }

            // Forget idle producers before the duplicate check.
            super::cleanup_stale_producers(&mut meta.producers);

            // An empty batch can still be meaningful (e.g. close-only), so
            // the content type is validated only when payloads are present.
            if !messages.is_empty() {
                super::validate_content_type(&meta.config.content_type, content_type)?;
            }

            match super::check_producer(
                meta.producers.get(producer.id.as_str()),
                producer,
                meta.closed,
            )? {
                ProducerCheck::Accept => {}
                ProducerCheck::Duplicate { epoch, seq } => {
                    // Retransmission: report current state without writing.
                    return Ok(ProducerAppendResult::Duplicate {
                        epoch,
                        seq,
                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                        closed: meta.closed,
                    });
                }
            }

            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;

            // NOTE(review): unlike `reserve_total_bytes` this addition is
            // unchecked; with extreme configured limits it could wrap in
            // release builds — consider `checked_add` as in the other
            // append paths.
            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            for data in &messages {
                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                message_table
                    .insert(
                        (name, meta.next_read_seq, meta.next_byte_offset),
                        data.as_ref(),
                    )
                    .map_err(|e| Self::storage_err("failed to append producer message", e))?;
                meta.next_read_seq += 1;
                meta.next_byte_offset += len;
                meta.total_bytes += len;
            }

            if let Some(new_seq) = pending_seq {
                meta.last_seq = Some(new_seq);
            }
            if should_close {
                meta.closed = true;
            }

            // Record the producer's latest (epoch, seq) for future
            // duplicate detection.
            meta.producers.insert(
                producer.id.clone(),
                ProducerState {
                    epoch: producer.epoch,
                    last_seq: producer.seq,
                    updated_at: Utc::now(),
                },
            );

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit producer append", e))?;

            Ok(ProducerAppendResult::Accepted {
                epoch: producer.epoch,
                seq: producer.seq,
                next_offset,
                closed,
            })
        })();

        // Duplicates wrote nothing, so their reservation is released just
        // like an error's.
        if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
            self.rollback_total_bytes(batch_bytes);
        }

        // Notify only when something observable changed (new data or close).
        if result.is_ok() && (!messages.is_empty() || should_close) {
            self.notify_stream(name);
        }

        result
    }
901
902    fn create_stream_with_data(
903        &self,
904        name: &str,
905        config: StreamConfig,
906        messages: Vec<Bytes>,
907        should_close: bool,
908    ) -> Result<CreateWithDataResult> {
909        let batch_bytes = Self::batch_bytes(&messages);
910
911        let mut reserved = false;
912        let mut removed_expired_bytes = 0_u64;
913
914        let result = (|| {
915            let shard = self.shard(name);
916            let txn = Self::begin_write_txn(&shard.db)?;
917            let mut streams = txn
918                .open_table(STREAMS)
919                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
920            let mut message_table = txn
921                .open_table(MESSAGES)
922                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
923
924            if let Some(existing) = Self::read_stream_meta(&streams, name)? {
925                if super::is_stream_expired(&existing.config) {
926                    removed_expired_bytes = existing.total_bytes;
927                    Self::delete_stream_messages(&mut message_table, name)?;
928                    streams
929                        .remove(name)
930                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
931                } else if existing.config == config {
932                    return Ok(CreateWithDataResult {
933                        status: CreateStreamResult::AlreadyExists,
934                        next_offset: Offset::new(existing.next_read_seq, existing.next_byte_offset),
935                        closed: existing.closed,
936                    });
937                } else {
938                    return Err(Error::ConfigMismatch);
939                }
940            }
941
942            if batch_bytes > 0 {
943                self.reserve_total_bytes(batch_bytes)?;
944                reserved = true;
945            }
946
947            let mut meta = Self::new_stream_meta(config);
948
949            if batch_bytes > 0 {
950                if meta.total_bytes + batch_bytes > self.max_stream_bytes {
951                    return Err(Error::StreamSizeLimitExceeded);
952                }
953                for data in &messages {
954                    let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
955                    message_table
956                        .insert(
957                            (name, meta.next_read_seq, meta.next_byte_offset),
958                            data.as_ref(),
959                        )
960                        .map_err(|e| {
961                            Self::storage_err("failed to append create-with-data message", e)
962                        })?;
963                    meta.next_read_seq += 1;
964                    meta.next_byte_offset += len;
965                    meta.total_bytes += len;
966                }
967            }
968
969            if should_close {
970                meta.closed = true;
971            }
972
973            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
974            let closed = meta.closed;
975
976            Self::write_stream_meta(&mut streams, name, &meta)?;
977            drop(message_table);
978            drop(streams);
979            txn.commit()
980                .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;
981
982            Ok(CreateWithDataResult {
983                status: CreateStreamResult::Created,
984                next_offset,
985                closed,
986            })
987        })();
988
989        if result.is_err() && reserved {
990            self.rollback_total_bytes(batch_bytes);
991        }
992
993        if result.is_ok() {
994            if removed_expired_bytes > 0 {
995                self.saturating_sub_total_bytes(removed_expired_bytes);
996                self.drop_notifier(name);
997            }
998            if should_close || !messages.is_empty() {
999                self.notify_stream(name);
1000            }
1001        }
1002
1003        result
1004    }
1005
1006    fn exists(&self, name: &str) -> bool {
1007        let shard = self.shard(name);
1008        let Ok(txn) = shard.db.begin_read() else {
1009            return false;
1010        };
1011        let Ok(streams) = txn.open_table(STREAMS) else {
1012            return false;
1013        };
1014
1015        match Self::read_stream_meta(&streams, name) {
1016            Ok(Some(meta)) => !super::is_stream_expired(&meta.config),
1017            _ => false,
1018        }
1019    }
1020
1021    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1022        let shard = self.shard(name);
1023        let txn = shard.db.begin_read().ok()?;
1024        let streams = txn.open_table(STREAMS).ok()?;
1025        let meta = Self::read_stream_meta(&streams, name).ok()??;
1026
1027        if super::is_stream_expired(&meta.config) {
1028            return None;
1029        }
1030
1031        Some(self.notifier_sender(name).subscribe())
1032    }
1033}
1034
// Unit tests for `AcidStorage`: crash-restart recovery, shard routing,
// expiry purging, global byte-cap enforcement under concurrency, and
// fail-fast behavior on tampered/corrupted on-disk state.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;

    // Builds a unique temp directory per call: nanosecond timestamp + pid +
    // a process-local counter, so concurrent test runs never collide.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-acid-storage-test-{stamp}-{pid}-{seq}"))
    }

    // Fresh storage with 16 shards, 1 MiB global cap, 100 KiB per-stream cap.
    fn test_storage() -> AcidStorage {
        AcidStorage::new(test_storage_dir(), 16, 1024 * 1024, 100 * 1024)
            .expect("acid storage should initialize")
    }

    // Convenience constructor for producer headers used in idempotency tests.
    fn producer(id: &str, epoch: u64, seq: u64) -> ProducerHeaders {
        ProducerHeaders {
            id: id.to_string(),
            epoch,
            seq,
        }
    }

    // Appended messages must survive a drop/reopen cycle in order.
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage.create_stream("events", cfg.clone()).unwrap();
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .unwrap();
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .unwrap();
        }

        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let read = restored.read("events", &Offset::start()).unwrap();

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    // The closed flag must persist: a reopened closed stream rejects appends.
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage.create_stream("s", cfg.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    // Producer dedup state (epoch/seq) must survive a restart, so the same
    // (id, epoch, seq) replayed after reopen is detected as a duplicate.
    #[test]
    fn test_restart_preserves_producer_state() {
        let root = test_storage_dir();

        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage
                .create_stream("s", StreamConfig::new("text/plain".to_string()))
                .unwrap();
            let result = storage
                .append_with_producer(
                    "s",
                    vec![Bytes::from("x")],
                    "text/plain",
                    &producer("p1", 0, 0),
                    false,
                    None,
                )
                .unwrap();
            assert!(matches!(result, ProducerAppendResult::Accepted { .. }));
        }

        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let dup = restored
            .append_with_producer(
                "s",
                vec![Bytes::from("x")],
                "text/plain",
                &producer("p1", 0, 0),
                false,
                None,
            )
            .unwrap();
        assert!(matches!(dup, ProducerAppendResult::Duplicate { .. }));
    }

    // The same stream name must always hash to the same shard.
    #[test]
    fn test_shard_routing_same_stream_is_stable() {
        let storage = test_storage();
        let a = storage.shard_index("same-stream");
        let b = storage.shard_index("same-stream");
        assert_eq!(a, b);
    }

    // Sanity check that the hash actually spreads streams across shards
    // (256 names landing in a single shard would indicate a broken hash).
    #[test]
    fn test_shard_distribution_uses_multiple_shards() {
        let storage = test_storage();
        let mut seen = std::collections::HashSet::new();
        for i in 0..256 {
            seen.insert(storage.shard_index(&format!("stream-{i}")));
        }
        assert!(seen.len() > 1);
    }

    // A stream whose TTL elapsed while the process was down must be purged
    // on startup. NOTE(review): timing-sensitive — relies on a 50 ms expiry
    // plus a 100 ms sleep; could flake on a heavily loaded machine.
    #[test]
    fn test_startup_purges_expired_streams() {
        let root = test_storage_dir();
        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            let expires = Utc::now() + Duration::milliseconds(50);
            let cfg = StreamConfig::new("text/plain".to_string()).with_expires_at(expires);
            storage.create_stream("expiring", cfg).unwrap();
            storage
                .append("expiring", Bytes::from("x"), "text/plain")
                .unwrap();
        }

        std::thread::sleep(std::time::Duration::from_millis(100));

        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        assert!(!restored.exists("expiring"));
        assert!(matches!(
            restored.read("expiring", &Offset::start()),
            Err(Error::NotFound(_) | Error::StreamExpired)
        ));
    }

    // 8 threads each append 40 bytes against a 120-byte global cap across
    // multiple shards; at most 3 appends may succeed and the shared counter
    // must never exceed the cap.
    #[test]
    fn test_global_cap_strict_under_concurrency() {
        let storage = Arc::new(AcidStorage::new(test_storage_dir(), 16, 120, 120).unwrap());
        let shard_count = (0..8)
            .map(|i| storage.shard_index(&format!("s-{i}")))
            .collect::<std::collections::HashSet<_>>()
            .len();
        assert!(
            shard_count > 1,
            "test streams must span multiple shards to validate cross-shard cap behavior"
        );

        for i in 0..8 {
            storage
                .create_stream(
                    &format!("s-{i}"),
                    StreamConfig::new("text/plain".to_string()),
                )
                .unwrap();
        }

        let mut handles = Vec::new();
        for i in 0..8 {
            let storage = Arc::clone(&storage);
            handles.push(thread::spawn(move || {
                storage.append(&format!("s-{i}"), Bytes::from(vec![0_u8; 40]), "text/plain")
            }));
        }

        for h in handles {
            // Individual appends may fail (cap exhausted); only panics matter.
            let _ = h.join().unwrap();
        }

        assert!(storage.total_bytes() <= 120);
    }

    // Reopening with a different shard count than the persisted layout
    // manifest must fail fast instead of silently mis-routing streams.
    #[test]
    fn test_layout_manifest_mismatch_fails_fast() {
        let root = test_storage_dir();
        let first = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024);
        assert!(first.is_ok());

        let mismatch = AcidStorage::new(root, 8, 1024 * 1024, 100 * 1024);
        assert!(matches!(mismatch, Err(Error::Storage(_))));
    }

    // An unparseable layout manifest must abort startup.
    #[test]
    fn test_layout_manifest_invalid_json_fails_fast() {
        let root = test_storage_dir();
        let acid_dir = root.join("acid");
        fs::create_dir_all(&acid_dir).unwrap();
        fs::write(acid_dir.join("layout.json"), b"{invalid-json").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // A manifest recorded with a different hash policy must abort startup
    // (stream-to-shard routing would otherwise silently change).
    #[test]
    fn test_layout_manifest_hash_policy_mismatch_fails_fast() {
        let root = test_storage_dir();
        let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
        drop(storage);

        let layout_path = root.join("acid").join("layout.json");
        let mut layout: serde_json::Value =
            serde_json::from_slice(&fs::read(&layout_path).unwrap()).unwrap();
        layout["hash_policy"] = serde_json::Value::String("tampered-hash-policy".to_string());
        fs::write(layout_path, serde_json::to_vec_pretty(&layout).unwrap()).unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Undecodable stream metadata (written here via a direct table write)
    // must be detected during the startup scan, not at first use.
    #[test]
    fn test_corrupted_stream_metadata_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();

        // Simulate on-disk corruption of stream metadata.
        let shard_idx = storage.shard_index("s");
        let txn = AcidStorage::begin_write_txn(&storage.shards[shard_idx].db).unwrap();
        let mut streams = txn.open_table(STREAMS).unwrap();
        let corrupt = b"{not-json".to_vec();
        streams.insert("s", corrupt.as_slice()).unwrap();
        drop(streams);
        txn.commit().unwrap();
        drop(storage);

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Overwriting a shard file with garbage must make startup fail fast
    // rather than reopen with partial/invalid data.
    #[test]
    fn test_tampered_shard_file_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();
        let shard_idx = storage.shard_index("s");
        drop(storage);

        let shard_path = root
            .join("acid")
            .join(format!("shard_{shard_idx:02x}.redb"));
        fs::write(&shard_path, b"not-a-valid-redb-file").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }
}