durable_streams_server/storage/file.rs

//! File-backed storage using one append-only log per stream.
//!
//! This backend keeps an in-memory index for fast reads and stores stream data
//! beneath a caller-supplied root directory. It is a good fit when you want
//! local persistence without introducing an external database.
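//!
//! A minimal construction sketch (hedged: the cap values below are
//! illustrative, not recommended defaults):
//!
//! ```ignore
//! // 1 GiB global cap, 64 MiB per-stream cap, fsync disabled for throughput.
//! let storage = FileStorage::new("/var/lib/streams", 1 << 30, 64 << 20, false)?;
//! ```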

use super::{
    CreateStreamResult, CreateWithDataResult, ForkInfo, NOTIFY_CHANNEL_CAPACITY,
    ProducerAppendResult, ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig,
    StreamMetadata, StreamState,
};
use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use base64::Engine;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;
use tracing::warn;

/// Binary record header size: little-endian `u32` payload length.
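///
/// Example frame for a 3-byte payload `b"abc"` (illustrative):
/// `[03 00 00 00][61 62 63]`, a little-endian length header, then the payload.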
const RECORD_HEADER_BYTES: usize = 4;
const INITIAL_INDEX_CAPACITY: usize = 256;
const INITIAL_PRODUCERS_CAPACITY: usize = 8;

/// Re-issue a syscall when interrupted by a signal (`EINTR`).
///
/// Rust's standard library does not retry `fsync`, `rename`, or `mkdir` on
/// `EINTR`, so this wrapper handles the standard POSIX retry contract.
/// `EINTR` retries never sleep; the call is re-issued immediately, by
/// convention. Other transient errors (`WouldBlock`, `TimedOut`) propagate
/// immediately and become 503 via error classification.
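///
/// # Example
///
/// A hedged sketch of typical use within this module:
///
/// ```ignore
/// // Retry the rename until it completes or fails with a non-EINTR error.
/// let renamed = retry_on_eintr(|| std::fs::rename("meta.json.tmp", "meta.json"));
/// ```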
fn retry_on_eintr<T>(
    mut op: impl FnMut() -> std::result::Result<T, io::Error>,
) -> std::result::Result<T, io::Error> {
    loop {
        match op() {
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            result => return result,
        }
    }
}

#[derive(Debug, Clone)]
struct MessageIndex {
    offset: Offset,
    file_pos: u64,
    byte_len: u64,
}

#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    name: String,
    config: StreamConfig,
    closed: bool,
    created_at: DateTime<Utc>,
    #[serde(default)]
    updated_at: Option<DateTime<Utc>>,
    last_seq: Option<String>,
    producers: HashMap<String, ProducerState>,
    #[serde(default)]
    fork_info: Option<ForkInfo>,
    #[serde(default)]
    ref_count: u32,
    #[serde(default)]
    state: StreamState,
}

struct StreamEntry {
    config: StreamConfig,
    index: Vec<MessageIndex>,
    closed: bool,
    next_read_seq: u64,
    next_byte_offset: u64,
    total_bytes: u64,
    created_at: DateTime<Utc>,
    updated_at: Option<DateTime<Utc>>,
    producers: HashMap<String, ProducerState>,
    notify: broadcast::Sender<()>,
    last_seq: Option<String>,
    file: File,
    file_len: u64,
    dir: PathBuf,
    fork_info: Option<ForkInfo>,
    ref_count: u32,
    state: StreamState,
}

impl StreamEntry {
    fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        Self {
            config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir,
            fork_info: None,
            ref_count: 0,
            state: StreamState::Active,
        }
    }
}

/// High-throughput file-backed storage.
///
/// Design:
/// - One append-only log file per stream (`data.log`)
/// - In-memory offset/file index for fast reads
/// - Stream-level write lock serializes appends and preserves monotonic offsets
/// - Batched write per append call reduces syscall overhead
///
/// `sync_on_append = false` prioritizes throughput and may lose recently
/// appended data on crash. `sync_on_append = true` trades latency for stronger
/// durability semantics.
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    root_dir: PathBuf,
    root_dir_canonical: PathBuf,
    sync_on_append: bool,
}

impl FileStorage {
    /// Create or reopen a file-backed storage root.
    ///
    /// Existing streams under `root_dir` are discovered and indexed during
    /// startup so subsequent reads can serve offsets without rescanning files.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` if the root directory cannot be created or
    /// existing streams fail to load from disk.
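    ///
    /// # Examples
    ///
    /// A hedged end-to-end sketch; it assumes `StreamConfig` implements
    /// `Default` and that the default `content_type` accepts
    /// `application/octet-stream`, neither of which this file confirms.
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// let storage = FileStorage::new("/tmp/streams", 1 << 30, 64 << 20, true)?;
    /// storage.create_stream("events", StreamConfig::default())?;
    /// storage.append("events", Bytes::from_static(b"hello"), "application/octet-stream")?;
    /// let read = storage.read("events", &Offset::start())?;
    /// assert_eq!(read.messages.len(), 1);
    /// ```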
    pub fn new(
        root_dir: impl Into<PathBuf>,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        sync_on_append: bool,
    ) -> Result<Self> {
        let root_dir = root_dir.into();
        retry_on_eintr(|| fs::create_dir_all(&root_dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create storage directory",
                format!(
                    "failed to create storage directory {}: {e}",
                    root_dir.display()
                ),
                &e,
            )
        })?;

        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to canonicalize storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        let storage = Self {
            streams: RwLock::new(HashMap::new()),
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            root_dir,
            root_dir_canonical,
            sync_on_append,
        };
        storage.load_existing_streams()?;
        Ok(storage)
    }

    /// Return the currently tracked total payload bytes across all streams.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }

    /// Map a stream name to a directory path inside `root_dir`.
    ///
    /// Uses base64url encoding (alphabet `[A-Za-z0-9_-]`) so the output
    /// cannot contain path separators, but we verify containment anyway
    /// as defense in depth.
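    ///
    /// For example, the stream name `"abc"` maps to `<root>/YWJj`, since
    /// `BASE64_URL_SAFE_NO_PAD.encode("abc")` yields `"YWJj"`.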
    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());

        // Reject path traversal sequences and separators. Base64url encoding
        // (alphabet [A-Za-z0-9_-]) cannot produce these, but the explicit
        // checks act as defense in depth and satisfy static analysis (CodeQL
        // rust/path-injection).
        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
            return Err(Error::Storage(
                "encoded stream name contains path traversal characters".to_string(),
            ));
        }
        if !encoded
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(Error::Storage(
                "encoded stream directory contains invalid characters".to_string(),
            ));
        }

        let dir = self.root_dir.join(&encoded);
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "stream directory escapes storage root: {encoded}"
            )));
        }
        Ok(dir)
    }

    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        if dir.exists() {
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }

    fn data_log_path(dir: &Path) -> PathBuf {
        dir.join("data.log")
    }

    fn meta_path(dir: &Path) -> PathBuf {
        dir.join("meta.json")
    }

    fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
        self.validate_stream_dir(&entry.dir)?;
        let meta = StreamMeta {
            name: name.to_string(),
            config: entry.config.clone(),
            closed: entry.closed,
            created_at: entry.created_at,
            updated_at: entry.updated_at,
            last_seq: entry.last_seq.clone(),
            producers: entry.producers.clone(),
            fork_info: entry.fork_info.clone(),
            ref_count: entry.ref_count,
            state: entry.state,
        };

        let meta_path = Self::meta_path(&entry.dir);
        let tmp_path = entry.dir.join("meta.json.tmp");
        let payload = serde_json::to_vec(&meta)
            .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;

        retry_on_eintr(|| fs::write(&tmp_path, payload.as_slice())).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "write stream metadata temp file",
                format!(
                    "failed to write metadata temp file {}: {e}",
                    tmp_path.display()
                ),
                &e,
            )
        })?;

        retry_on_eintr(|| fs::rename(&tmp_path, &meta_path)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "replace stream metadata",
                format!(
                    "failed to atomically replace metadata {}: {e}",
                    meta_path.display()
                ),
                &e,
            )
        })?;

        Ok(())
    }

    fn open_stream_file(&self, dir: &Path) -> Result<File> {
        self.validate_stream_dir(dir)?;
        let path = Self::data_log_path(dir);
        retry_on_eintr(|| {
            OpenOptions::new()
                .create(true)
                .append(true)
                .read(true)
                .open(&path)
        })
        .map_err(|e| {
            Error::classify_io_failure(
                "file",
                "open stream log",
                format!("failed to open stream log {}: {e}", path.display()),
                &e,
            )
        })
    }

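    /// Scan `data.log` from the start and rebuild the in-memory index.
    ///
    /// Each frame is a little-endian `u32` length header followed by the
    /// payload. A trailing partial frame (for example, after a crash during
    /// an append) is truncated away so the log ends on a record boundary.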
    fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
        let mut index = Vec::new();
        let mut next_read_seq = 0u64;
        let mut next_byte_offset = 0u64;

        let file_len = file
            .metadata()
            .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
            .len();

        let mut cursor = 0u64;
        let mut header = [0u8; RECORD_HEADER_BYTES];

        while cursor < file_len {
            file.seek(SeekFrom::Start(cursor))
                .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;

            let read = file
                .read(&mut header)
                .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;

            if read == 0 {
                break;
            }

            if read < RECORD_HEADER_BYTES {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            let record_len = u64::from(u32::from_le_bytes(header));
            let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;

            if record_end > file_len {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            index.push(MessageIndex {
                offset: Offset::new(next_read_seq, next_byte_offset),
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: record_len,
            });

            next_read_seq += 1;
            next_byte_offset += record_len;
            cursor = record_end;
        }

        file.seek(SeekFrom::End(0))
            .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;

        Ok((index, next_read_seq, next_byte_offset))
    }

    fn rollback_total_bytes(&self, bytes: u64) {
        self.total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            })
            .ok();
    }

    fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
        let streams = self.streams.read().expect("streams lock poisoned");
        streams.get(name).map(Arc::clone)
    }

    fn hard_remove_stream(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        name: &str,
    ) -> Result<Option<ForkInfo>> {
        let Some(stream_arc) = streams.remove(name) else {
            return Ok(None);
        };
        let stream = stream_arc.read().expect("stream lock poisoned");
        let dir = stream.dir.clone();
        let total_bytes = stream.total_bytes;
        let fork_info = stream.fork_info.clone();
        drop(stream);

        self.remove_stream_dir(&dir)?;
        self.rollback_total_bytes(total_bytes);
        Ok(fork_info)
    }

    fn remove_for_recreate(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        name: &str,
    ) -> Result<()> {
        if let Some(fork_info) = self.hard_remove_stream(streams, name)? {
            self.cascade_delete(streams, &fork_info.source_name);
        }
        Ok(())
    }

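    /// Append a batch of framed messages to the stream's log file.
    ///
    /// Order of operations: reserve global capacity via a CAS loop, check
    /// the per-stream limit, write every frame with one `write_all` call,
    /// optionally `sync_data`, then update the in-memory index. Any failure
    /// rolls back the global byte reservation.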
    fn append_records(
        &self,
        name: &str,
        stream: &mut StreamEntry,
        messages: &[Bytes],
    ) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut payload_bytes = 0u64;
        let mut sizes = Vec::with_capacity(messages.len());

        for msg in messages {
            let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
            if len > u64::from(u32::MAX) {
                return Err(Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: "message too large for file record format".to_string(),
                });
            }
            payload_bytes += len;
            total_batch_bytes += len;
            sizes.push(len);
        }

        // Reserve global capacity first (preserves error precedence: global
        // limit takes priority over per-stream limit). Uses a CAS loop so
        // concurrent appends on different streams cannot exceed the cap.
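        // `fetch_update` returns `Err(previous)` when the closure yields
        // `None`, i.e. when the addition would overflow or push the total
        // past `max_total_bytes`; both cases map to `MemoryLimitExceeded`.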
        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(total_batch_bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }

        // Check per-stream limit; rollback global reservation on failure.
        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::StreamSizeLimitExceeded);
        }

        let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
        let mut write_buf =
            Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
        for msg in messages {
            let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
            write_buf.extend_from_slice(&len.to_le_bytes());
            write_buf.extend_from_slice(msg);
        }

        let before_len = stream.file_len;

        if let Err(e) = retry_on_eintr(|| stream.file.write_all(&write_buf)) {
            // Write errors may still leave partial bytes on disk; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to append stream log for {name}: {e}"
            )));
        }

        if self.sync_on_append
            && let Err(e) = retry_on_eintr(|| stream.file.sync_data())
        {
            // Data may be written even if fsync fails; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::classify_io_failure(
                "file",
                "sync stream log",
                format!("failed to sync stream log for {name}: {e}"),
                &e,
            ));
        }

        let mut cursor = before_len;
        for len in sizes {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.index.push(MessageIndex {
                offset,
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: len,
            });
            stream.next_read_seq += 1;
            stream.next_byte_offset += len;
            stream.total_bytes += len;
            cursor += RECORD_HEADER_BYTES as u64 + len;
        }
        stream.file_len = cursor;

        let _ = stream.notify.send(());
        Ok(())
    }

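    /// Read a contiguous run of records with a single coalesced I/O.
    ///
    /// Issues one positional read spanning every requested index entry, then
    /// hands out zero-copy `Bytes::slice` views, one per message.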
    fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
        if index_slice.is_empty() {
            return Ok(Vec::new());
        }

        let first_pos = index_slice[0].file_pos;
        let last = index_slice
            .last()
            .expect("index_slice non-empty due to early return");
        let read_end = last.file_pos + last.byte_len;
        let read_len = read_end.saturating_sub(first_pos);

        // Use positional read (pread) to avoid the shared file-offset race
        // that occurs when multiple readers `try_clone()` the same file
        // descriptor and `seek()` concurrently.
        let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_exact_at(&mut raw, first_pos)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;
            file.seek_read(&mut raw, first_pos)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut reader = file
                .try_clone()
                .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;
            reader
                .seek(SeekFrom::Start(first_pos))
                .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;
            reader
                .read_exact(&mut raw)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }

        let shared = Bytes::from(raw);
        let mut messages = Vec::with_capacity(index_slice.len());
        for idx in index_slice {
            let rel_start =
                usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
            let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
            messages.push(shared.slice(rel_start..rel_end));
        }

        Ok(messages)
    }

    fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
        self.validate_stream_dir(dir)?;
        retry_on_eintr(|| fs::remove_dir_all(dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "remove stream directory",
                format!("failed to remove stream directory {}: {e}", dir.display()),
                &e,
            )
        })
    }

    /// Walk up the fork chain after a hard-delete, decrementing `ref_count`s
    /// and garbage-collecting tombstoned ancestors with zero references.
    ///
    /// Must be called while holding the streams write lock.
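    ///
    /// For example, with a chain `root <- mid <- leaf` where `mid` is
    /// tombstoned with `ref_count == 1`, hard-deleting `leaf` drops `mid`'s
    /// count to zero, garbage-collects it, and then decrements `root`.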
    fn cascade_delete(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        parent_name: &str,
    ) {
        let mut current_parent = parent_name.to_string();
        loop {
            let Some(parent_arc) = streams.get(&current_parent) else {
                break;
            };
            let parent_arc = parent_arc.clone();
            let mut parent = parent_arc.write().expect("stream lock poisoned");
            parent.ref_count = parent.ref_count.saturating_sub(1);

            if parent.state == StreamState::Tombstone && parent.ref_count == 0 {
                // This parent can be garbage-collected
                let fi = parent.fork_info.clone();
                let dir = parent.dir.clone();
                let total = parent.total_bytes;
                // Release the lock before removing the entry from the map
                drop(parent);
                streams.remove(&current_parent);

                if let Err(e) = self.remove_stream_dir(&dir) {
                    warn!(%e, stream = current_parent.as_str(), "failed to remove tombstoned ancestor directory during cascade delete");
                } else {
                    self.rollback_total_bytes(total);
                }

                // Continue up the chain
                if let Some(fi) = fi {
                    current_parent = fi.source_name;
                } else {
                    break;
                }
            } else {
                // Parent still alive or has other children — persist ref_count update
                if let Err(e) = self.write_metadata_for(&current_parent, &parent) {
                    warn!(%e, stream = current_parent.as_str(), "failed to persist parent ref_count during cascade delete");
                }
                break;
            }
        }
    }

    /// Read messages from a source chain, following fork lineage upward.
    ///
    /// Reads all messages from `from_offset` up to (but not including) `up_to`
    /// across the full ancestor chain. This bypasses tombstone checks since
    /// source streams may be soft-deleted but their data must still be readable
    /// by forks.
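    ///
    /// Segments are replayed oldest ancestor first: each intermediate
    /// segment is bounded by its own fork point (`read_up_to`) and the
    /// direct source segment by `up_to`, so a fork of a fork yields the
    /// grandparent's prefix, then the parent's, in order.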
    fn read_source_chain(
        &self,
        source_name: &str,
        from_offset: &Offset,
        up_to: &Offset,
    ) -> Result<Vec<Bytes>> {
        let streams = self.streams.read().expect("streams lock poisoned");

        // Build the ancestor chain from source to root
        let plan = super::fork::build_read_plan(source_name, |n| {
            streams.get(n).map(|arc| {
                let s = arc.read().expect("stream lock poisoned");
                s.fork_info.clone()
            })
        });

        let mut all_messages: Vec<Bytes> = Vec::new();

        for (i, segment) in plan.iter().enumerate() {
            let Some(seg_arc) = streams.get(&segment.name) else {
                continue;
            };
            let seg_stream = seg_arc.read().expect("stream lock poisoned");

            // Determine the effective upper bound for this segment
            let effective_up_to = if i == plan.len() - 1 {
                // Last segment (the direct source): read up to `up_to`
                Some(up_to)
            } else {
                // Intermediate segment: read up to this segment's read_up_to
                segment.read_up_to.as_ref()
            };

            // Determine start offset for this segment
            let effective_from = if i == 0 {
                from_offset
            } else {
                &Offset::start()
            };

            // Find start index in this segment's index
            let start_idx = if effective_from.is_start() {
                0
            } else {
                match seg_stream
                    .index
                    .binary_search_by(|m| m.offset.cmp(effective_from))
                {
                    Ok(idx) | Err(idx) => idx,
                }
            };

            // Find end index based on upper bound
            let end_idx = if let Some(bound) = effective_up_to {
                match seg_stream.index.binary_search_by(|m| m.offset.cmp(bound)) {
                    Ok(idx) | Err(idx) => idx,
                }
            } else {
                seg_stream.index.len()
            };

            if start_idx < end_idx {
                let index_slice = &seg_stream.index[start_idx..end_idx];
                let msgs = Self::read_messages(&seg_stream.file, index_slice)?;
                all_messages.extend(msgs);
            }
        }

        Ok(all_messages)
    }

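    /// Discover and reload existing streams from `root_dir` at startup.
    ///
    /// For each stream directory, `meta.json` supplies configuration and
    /// producer state, while `data.log` is rescanned into the in-memory
    /// index and treated as the source of truth for message counts and
    /// offsets. Expired streams are removed rather than restored.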
    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            if !path.is_dir() {
                continue;
            }
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            let mut file = self.open_stream_file(&path)?;
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            // Reconcile: data.log is the source of truth for message count
            // and byte offsets. If meta.json is stale (e.g. crash before
            // metadata flush), log a warning so operators can investigate.
            let log_msg_count = index.len() as u64;
            let meta_has_data =
                meta.closed || !meta.producers.is_empty() || meta.last_seq.is_some();
            if log_msg_count == 0 && meta_has_data {
                warn!(
                    stream = meta.name,
                    "meta.json indicates activity but data.log has 0 messages; \
                     data.log is authoritative"
                );
            }

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let mut entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                updated_at: meta.updated_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
                fork_info: meta.fork_info,
                ref_count: meta.ref_count,
                state: meta.state,
            };

            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            super::cleanup_stale_producers(&mut entry.producers);

            // Re-persist metadata if the on-disk copy may be stale so that
            // future restarts see a consistent snapshot. Best-effort: a
            // failure here is non-fatal since the data.log remains correct.
            if let Err(e) = self.write_metadata_for(&meta.name, &entry) {
                warn!(
                    %e,
                    stream = meta.name,
                    "failed to re-persist reconciled metadata during recovery"
                );
            }
            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        self.total_bytes.store(restored_total, Ordering::Release);

        Ok(())
    }

    /// Read messages from a non-forked stream using the in-memory index.
    fn read_local_file_messages(
        stream: &StreamEntry,
        from_offset: &Offset,
        next_offset: Offset,
    ) -> Result<ReadResult> {
        let start_idx = if from_offset.is_start() {
            0
        } else {
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset,
            at_tail,
            closed: stream.closed,
        })
    }

    /// Read the local portion of a forked stream's messages from disk.
    fn read_fork_local_messages(
        stream: &StreamEntry,
        from_offset: &Offset,
        fork_offset: &Offset,
    ) -> Result<Vec<Bytes>> {
        if from_offset.is_start() || *from_offset <= *fork_offset {
            Self::read_messages(&stream.file, &stream.index)
        } else {
            let start_idx = match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            };
            Self::read_messages(&stream.file, &stream.index[start_idx..])
        }
    }

    /// Combine source chain messages with fork-local messages into a read result.
    fn assemble_fork_read(
        &self,
        from_offset: &Offset,
        fi: &super::ForkInfo,
        fork_local_messages: Vec<Bytes>,
        next_offset: Offset,
        closed: bool,
    ) -> Result<ReadResult> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            let source_messages =
                self.read_source_chain(&fi.source_name, from_offset, &fi.fork_offset)?;
            all_messages.extend(source_messages);
        }
        all_messages.extend(fork_local_messages);

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }
}

impl Storage for FileStorage {
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name)?;
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir.clone());

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }

    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        self.append_records(name, &mut stream, &[data])?;
        stream.updated_at = Some(Utc::now());
        if super::fork::renew_ttl(&mut stream.config) {
            self.write_metadata_for(name, &stream)?;
        }
        Ok(offset)
    }

    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        let seq_changed = pending_seq.is_some();
        self.append_records(name, &mut stream, &messages)?;
        stream.updated_at = Some(Utc::now());
        let ttl_renewed = super::fork::renew_ttl(&mut stream.config);
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if ttl_renewed || seq_changed {
            self.write_metadata_for(name, &stream)?;
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }

    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

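        // Fast path: with no TTL there is nothing to renew, so the read can
        // be served entirely under the shared lock. TTL renewal mutates the
        // config and persists metadata, which requires the write lock below.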
1065        let needs_ttl_renewal = {
1066            let stream = stream_arc.read().expect("stream lock poisoned");
1067            super::fork::check_stream_access(&stream.config, stream.state, name)?;
1068            stream.config.ttl_seconds.is_some()
1069        };
1070
1071        if !needs_ttl_renewal {
1072            let stream = stream_arc.read().expect("stream lock poisoned");
1073            let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
1074
1075            if from_offset.is_now() {
1076                return Ok(ReadResult {
1077                    messages: Vec::new(),
1078                    next_offset,
1079                    at_tail: true,
1080                    closed: stream.closed,
1081                });
1082            }
1083
1084            if stream.fork_info.is_none() {
1085                return Self::read_local_file_messages(&stream, from_offset, next_offset);
1086            }
1087
1088            let fi = stream.fork_info.clone().expect("checked above");
1089            let closed = stream.closed;
1090            let fork_local_messages =
1091                Self::read_fork_local_messages(&stream, from_offset, &fi.fork_offset)?;
1092            drop(stream);
1093
1094            return self.assemble_fork_read(
1095                from_offset,
1096                &fi,
1097                fork_local_messages,
1098                next_offset,
1099                closed,
1100            );
1101        }
1102
1103        let mut stream = stream_arc.write().expect("stream lock poisoned");
1104        super::fork::check_stream_access(&stream.config, stream.state, name)?;
1105
1106        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
1107        if from_offset.is_now() {
1108            super::fork::renew_ttl(&mut stream.config);
1109            self.write_metadata_for(name, &stream)?;
1110            return Ok(ReadResult {
1111                messages: Vec::new(),
1112                next_offset,
1113                at_tail: true,
1114                closed: stream.closed,
1115            });
1116        }
1117
1118        if stream.fork_info.is_none() {
1119            let result = Self::read_local_file_messages(&stream, from_offset, next_offset)?;
1120            super::fork::renew_ttl(&mut stream.config);
1121            self.write_metadata_for(name, &stream)?;
1122            return Ok(result);
1123        }
1124
1125        let fi = stream.fork_info.clone().expect("checked above");
1126        let closed = stream.closed;
1127        let fork_local_messages =
1128            Self::read_fork_local_messages(&stream, from_offset, &fi.fork_offset)?;
1129        super::fork::renew_ttl(&mut stream.config);
1130        self.write_metadata_for(name, &stream)?;
1131        drop(stream);
1132
1133        self.assemble_fork_read(from_offset, &fi, fork_local_messages, next_offset, closed)
1134    }
1135
1136    fn delete(&self, name: &str) -> Result<()> {
1137        let mut streams = self.streams.write().expect("streams lock poisoned");
1138
1139        let stream_arc = streams
1140            .get(name)
1141            .ok_or_else(|| Error::NotFound(name.to_string()))?
1142            .clone();
1143
1144        {
1145            let stream = stream_arc.read().expect("stream lock poisoned");
1146
1147            match super::fork::evaluate_delete(name, stream.state, stream.ref_count)? {
1148                super::fork::DeleteDisposition::Tombstone => {
1149                    drop(stream);
1150                    let mut stream_w = stream_arc.write().expect("stream lock poisoned");
1151                    stream_w.state = StreamState::Tombstone;
1152                    self.write_metadata_for(name, &stream_w)?;
1153                    return Ok(());
1154                }
1155                super::fork::DeleteDisposition::HardDelete => {}
1156            }
1157        }
1158
1159        let fork_info = self.hard_remove_stream(&mut streams, name)?;
1160
1161        if let Some(fi) = fork_info {
1162            self.cascade_delete(&mut streams, &fi.source_name);
1163        }
1164
1165        Ok(())
1166    }
1167
1168    fn head(&self, name: &str) -> Result<StreamMetadata> {
1169        let stream_arc = self
1170            .get_stream(name)
1171            .ok_or_else(|| Error::NotFound(name.to_string()))?;
1172
1173        let stream = stream_arc.read().expect("stream lock poisoned");
1174
1175        super::fork::check_stream_access(&stream.config, stream.state, name)?;
1176
1177        Ok(StreamMetadata {
1178            config: stream.config.clone(),
1179            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
1180            closed: stream.closed,
1181            total_bytes: stream.total_bytes,
1182            message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
1183            created_at: stream.created_at,
1184            updated_at: stream.updated_at,
1185        })
1186    }
1187
1188    fn close_stream(&self, name: &str) -> Result<()> {
1189        let stream_arc = self
1190            .get_stream(name)
1191            .ok_or_else(|| Error::NotFound(name.to_string()))?;
1192
1193        let mut stream = stream_arc.write().expect("stream lock poisoned");
1194
1195        super::fork::check_stream_access(&stream.config, stream.state, name)?;
1196
1197        stream.closed = true;
1198        stream.updated_at = Some(Utc::now());
1199        super::fork::renew_ttl(&mut stream.config);
1200        self.write_metadata_for(name, &stream)?;
1201
1202        let _ = stream.notify.send(());
1203        Ok(())
1204    }
1205
1206    fn append_with_producer(
1207        &self,
1208        name: &str,
1209        messages: Vec<Bytes>,
1210        content_type: &str,
1211        producer: &ProducerHeaders,
1212        should_close: bool,
1213        seq: Option<&str>,
1214    ) -> Result<ProducerAppendResult> {
1215        let stream_arc = self
1216            .get_stream(name)
1217            .ok_or_else(|| Error::NotFound(name.to_string()))?;
1218
1219        let mut stream = stream_arc.write().expect("stream lock poisoned");
1220
1221        super::fork::check_stream_access(&stream.config, stream.state, name)?;
1222
1223        super::cleanup_stale_producers(&mut stream.producers);
1224
1225        if !messages.is_empty() {
1226            super::validate_content_type(&stream.config.content_type, content_type)?;
1227        }
1228
1229        let now = Utc::now();
1230
1231        match super::check_producer(
1232            stream.producers.get(producer.id.as_str()),
1233            producer,
1234            stream.closed,
1235        )? {
1236            ProducerCheck::Accept => {}
1237            ProducerCheck::Duplicate { epoch, seq } => {
1238                return Ok(ProducerAppendResult::Duplicate {
1239                    epoch,
1240                    seq,
1241                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
1242                    closed: stream.closed,
1243                });
1244            }
1245        }
1246
1247        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
1248        self.append_records(name, &mut stream, &messages)?;
1249
1250        if let Some(new_seq) = pending_seq {
1251            stream.last_seq = Some(new_seq);
1252        }
1253        if should_close {
1254            stream.closed = true;
1255        }
1256
1257        stream.producers.insert(
1258            producer.id.clone(),
1259            ProducerState {
1260                epoch: producer.epoch,
1261                last_seq: producer.seq,
1262                updated_at: now,
1263            },
1264        );
1265        stream.updated_at = Some(now);
1266        super::fork::renew_ttl(&mut stream.config);
1267
1268        self.write_metadata_for(name, &stream)?;
1269
1270        Ok(ProducerAppendResult::Accepted {
1271            epoch: producer.epoch,
1272            seq: producer.seq,
1273            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
1274            closed: stream.closed,
1275        })
1276    }
1277
1278    fn create_stream_with_data(
1279        &self,
1280        name: &str,
1281        config: StreamConfig,
1282        messages: Vec<Bytes>,
1283        should_close: bool,
1284    ) -> Result<CreateWithDataResult> {
1285        let mut streams = self.streams.write().expect("streams lock poisoned");
1286
1287        if let Some(stream_arc) = streams.get(name) {
1288            let stream = stream_arc.read().expect("stream lock poisoned");
1289            match super::fork::evaluate_root_create(
1290                name,
1291                &stream.config,
1292                stream.state,
1293                stream.ref_count,
1294                &config,
1295            ) {
1296                super::fork::ExistingCreateDisposition::RemoveExpired => {
1297                    drop(stream);
1298                    self.remove_for_recreate(&mut streams, name)?;
1299                }
1300                super::fork::ExistingCreateDisposition::AlreadyExists => {
1301                    return Ok(CreateWithDataResult {
1302                        status: CreateStreamResult::AlreadyExists,
1303                        next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
1304                        closed: stream.closed,
1305                    });
1306                }
1307                super::fork::ExistingCreateDisposition::Conflict(err) => {
1308                    return Err(err);
1309                }
1310            }
1311        }
1312
1313        let dir = self.stream_dir_for_name(name)?;
1314        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
1315            Error::classify_io_failure(
1316                "file",
1317                "create stream directory",
1318                format!("failed to create stream directory {}: {e}", dir.display()),
1319                &e,
1320            )
1321        })?;
1322
1323        self.validate_stream_dir(&dir)?;
1324        let file = self.open_stream_file(&dir)?;
1325        let mut entry = StreamEntry::new(config, file, dir.clone());
1326
1327        if !messages.is_empty()
1328            && let Err(e) = self.append_records(name, &mut entry, &messages)
1329        {
1330            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
1331                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
1332            }
1333            return Err(e);
1334        }
1335        if should_close {
1336            entry.closed = true;
1337        }
1338
1339        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
1340        let closed = entry.closed;
1341
1342        if let Err(e) = self.write_metadata_for(name, &entry) {
1343            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
1344                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
1345            }
1346            return Err(e);
1347        }
1348        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));
1349
1350        Ok(CreateWithDataResult {
1351            status: CreateStreamResult::Created,
1352            next_offset,
1353            closed,
1354        })
1355    }
1356
1357    fn exists(&self, name: &str) -> bool {
1358        let streams = self.streams.read().expect("streams lock poisoned");
1359        if let Some(stream_arc) = streams.get(name) {
1360            let stream = stream_arc.read().expect("stream lock poisoned");
1361            !super::is_stream_expired(&stream.config) && stream.state == StreamState::Active
1362        } else {
1363            false
1364        }
1365    }
1366
1367    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1368        let stream_arc = self.get_stream(name)?;
1369        let stream = stream_arc.read().expect("stream lock poisoned");
1370
1371        if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
1372            return None;
1373        }
1374
1375        Some(stream.notify.subscribe())
1376    }
1377
1378    fn cleanup_expired_streams(&self) -> usize {
1379        let mut streams = self.streams.write().expect("streams lock poisoned");
1380        let mut expired = Vec::new();
1381
1382        for (name, stream_arc) in streams.iter() {
1383            let stream = stream_arc.read().expect("stream lock poisoned");
1384            if super::is_stream_expired(&stream.config) {
1385                expired.push((
1386                    name.clone(),
1387                    stream.total_bytes,
1388                    stream.dir.clone(),
1389                    stream.ref_count,
1390                    stream.fork_info.clone(),
1391                ));
1392            }
1393        }
1394
1395        let count = expired.len();
1396        for (name, _bytes, _dir, ref_count, _fork_info) in expired {
1397            match super::fork::evaluate_expired_cleanup(ref_count) {
1398                super::fork::DeleteDisposition::Tombstone => {
1399                    if let Some(arc) = streams.get(&name) {
1400                        let mut stream = arc.write().expect("stream lock poisoned");
1401                        stream.state = StreamState::Tombstone;
1402                        if let Err(e) = self.write_metadata_for(&name, &stream) {
1403                            warn!(%e, stream = name.as_str(), "failed to persist tombstone for expired stream");
1404                        }
1405                    }
1406                }
1407                super::fork::DeleteDisposition::HardDelete => {
1408                    if let Err(e) = self.remove_for_recreate(&mut streams, &name) {
1409                        warn!(%e, stream = name.as_str(), "failed to remove expired stream during cleanup");
1410                    }
1411                }
1412            }
1413        }
1414
1415        count
1416    }
1417
1418    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
1419        let streams = self.streams.read().expect("streams lock poisoned");
1420        let mut result = Vec::new();
1421        for (name, stream_arc) in streams.iter() {
1422            let stream = stream_arc.read().expect("stream lock poisoned");
1423            if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
1424                continue;
1425            }
1426            result.push((
1427                name.clone(),
1428                StreamMetadata {
1429                    config: stream.config.clone(),
1430                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
1431                    closed: stream.closed,
1432                    total_bytes: stream.total_bytes,
1433                    message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
1434                    created_at: stream.created_at,
1435                    updated_at: stream.updated_at,
1436                },
1437            ));
1438        }
1439        result.sort_by(|a, b| a.0.cmp(&b.0));
1440        Ok(result)
1441    }
1442
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        // Look up source stream
        let source_arc = streams
            .get(source_name)
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?
            .clone();

        let source = source_arc.read().expect("stream lock poisoned");

        super::fork::check_fork_source_access(&source.config, source.state, source_name)?;

        // Resolve fork offset (defaults to source tail)
        let source_next_offset = Offset::new(source.next_read_seq, source.next_byte_offset);
        let resolved_offset = super::fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        // Validate content type: fork must match source
        if !config
            .content_type
            .eq_ignore_ascii_case(&source.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }

        let fork_spec = super::fork::build_fork_create_spec(
            source_name,
            &source.config,
            &config,
            resolved_offset.clone(),
        );

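        // Drop the source read guard before going further: the source entry
        // is re-locked for writing below to bump its ref_count, and a second
        // lock on the same `RwLock` from this thread can deadlock.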
        drop(source);

        if let Some(existing_arc) = streams.get(name) {
            let existing = existing_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(existing);
                    self.remove_for_recreate(&mut streams, name)?;
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        // Extract fork offset components to initialize the fork entry
        let (fork_read_seq, fork_byte_offset) =
            resolved_offset.parse_components().unwrap_or((0, 0));

        // Create fork directory and files on disk
        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create fork directory",
                format!("failed to create fork directory {}: {e}", dir.display()),
                &e,
            )
        })?;
        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;

        // Create fork entry
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        let entry = StreamEntry {
            config: fork_spec.config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: config.created_closed,
            next_read_seq: fork_read_seq,
            next_byte_offset: fork_byte_offset,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir: dir.clone(),
            fork_info: Some(ForkInfo {
                source_name: fork_spec.source_name,
                fork_offset: resolved_offset,
            }),
            ref_count: 0,
            state: StreamState::Active,
        };

        // Write fork metadata to disk
        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned fork directory");
            }
            return Err(e);
        }

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        // Increment source ref_count and persist
        if let Some(source_arc) = streams.get(source_name) {
            let mut source = source_arc.write().expect("stream lock poisoned");
            source.ref_count += 1;
            if let Err(e) = self.write_metadata_for(source_name, &source) {
                warn!(%e, stream = source_name, "failed to persist source ref_count after fork creation");
            }
        }

        Ok(CreateStreamResult::Created)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;

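    /// Returns a unique temp directory per call (timestamp + pid + counter)
    /// so tests running in parallel never share on-disk state.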
    fn test_storage_dir() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
    }

    fn test_storage() -> FileStorage {
        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
            .expect("file storage should initialize")
    }

    #[test]
    fn test_delete_removes_files() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("test", config).unwrap();
        storage
            .append("test", Bytes::from("data"), "text/plain")
            .unwrap();

        let dir = storage.stream_dir_for_name("test").unwrap();
        assert!(dir.exists(), "stream directory should exist before delete");

        storage.delete("test").unwrap();
        assert!(
            !dir.exists(),
            "stream directory should be removed after delete"
        );
    }

    // Restore-from-disk and closed-stream durability tests live in
    // tests/crash_recovery.rs and the storage_backend_contract suite.

    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        // Append partial garbage to the data log (incomplete record header)
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        // Write a 2-byte partial header (less than the 4-byte record header)
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
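
    // A minimal end-to-end sketch of fork creation against the methods above;
    // deeper fork behavior (offset inheritance, ref-count bookkeeping on
    // delete) is left to the storage_backend_contract suite.
    #[test]
    fn test_create_fork_rejects_content_type_mismatch() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("src", config.clone()).unwrap();
        storage
            .append("src", Bytes::from("data"), "text/plain")
            .unwrap();

        // Forking without an explicit offset resolves to the source tail.
        let created = storage
            .create_fork("fork", "src", None, config.clone())
            .unwrap();
        assert!(matches!(created, CreateStreamResult::Created));

        // A fork whose content type differs from the source must be refused.
        let mismatched = StreamConfig::new("application/json".to_string());
        assert!(
            storage
                .create_fork("fork2", "src", None, mismatched)
                .is_err()
        );

        // Both streams remain visible and are listed in name order.
        let names: Vec<String> = storage
            .list_streams()
            .unwrap()
            .into_iter()
            .map(|(name, _)| name)
            .collect();
        assert_eq!(names, vec!["fork".to_string(), "src".to_string()]);
    }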
}