durable_streams_server/storage/file.rs

//! File-backed storage using one append-only log per stream.
//!
//! This backend keeps an in-memory index for fast reads and stores stream data
//! beneath a caller-supplied root directory. It is a good fit when you want
//! local persistence without introducing an external database.
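//!
//! # Example
//!
//! A minimal round trip, mirroring the tests at the bottom of this file. The
//! `use` paths are assumptions about this crate's public re-exports and may
//! need adjusting:
//!
//! ```no_run
//! use bytes::Bytes;
//! use durable_streams_server::protocol::offset::Offset;
//! use durable_streams_server::storage::file::FileStorage;
//! use durable_streams_server::storage::{Storage, StreamConfig};
//!
//! let storage = FileStorage::new("/tmp/ds-root", 1024 * 1024, 100 * 1024, false)
//!     .expect("file storage should initialize");
//! storage
//!     .create_stream("events", StreamConfig::new("text/plain".to_string()))
//!     .expect("stream should be created");
//! storage
//!     .append("events", Bytes::from("event-1"), "text/plain")
//!     .expect("append should succeed");
//! let read = storage
//!     .read("events", &Offset::start())
//!     .expect("read should succeed");
//! assert_eq!(read.messages.len(), 1);
//! ```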

use super::{
    CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
    ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
};
use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use base64::Engine;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;
use tracing::warn;

/// Binary record header size: little-endian `u32` payload length.
const RECORD_HEADER_BYTES: usize = 4;
const INITIAL_INDEX_CAPACITY: usize = 256;
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
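
// On-disk record layout in `data.log`, a sketch of the format written by
// `append_records` and recovered by `rebuild_index` below:
//
//   +---------------------+---------------------+
//   | payload len: u32 LE | payload (len bytes) |
//   +---------------------+---------------------+
//
// Records are concatenated back-to-back; offsets are derived by walking the
// headers, and a torn record at the tail is truncated during recovery.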

/// Re-issue a syscall when interrupted by a signal (`EINTR`).
///
/// Rust's standard library does not retry `fsync`, `rename`, or `mkdir` on
/// `EINTR`, so this wrapper handles the standard POSIX retry contract.
/// Retries never sleep: by convention, `EINTR` retries are immediate. Other
/// transient errors (`WouldBlock`, `TimedOut`) propagate immediately and
/// become 503 via error classification.
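///
/// A typical call site, as used throughout this file:
/// `retry_on_eintr(|| fs::create_dir_all(&dir))`.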
fn retry_on_eintr<T>(
    mut op: impl FnMut() -> std::result::Result<T, io::Error>,
) -> std::result::Result<T, io::Error> {
    loop {
        match op() {
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            result => return result,
        }
    }
}

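/// Maps one message's logical offset to the byte range of its payload in
/// `data.log`.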
#[derive(Debug, Clone)]
struct MessageIndex {
    offset: Offset,
    file_pos: u64,
    byte_len: u64,
}

#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    name: String,
    config: StreamConfig,
    closed: bool,
    created_at: DateTime<Utc>,
    last_seq: Option<String>,
    producers: HashMap<String, ProducerState>,
}

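/// In-memory state for a single stream: the offset index, producer
/// bookkeeping, and the open append handle for `data.log`.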
struct StreamEntry {
    config: StreamConfig,
    index: Vec<MessageIndex>,
    closed: bool,
    next_read_seq: u64,
    next_byte_offset: u64,
    total_bytes: u64,
    created_at: DateTime<Utc>,
    producers: HashMap<String, ProducerState>,
    notify: broadcast::Sender<()>,
    last_seq: Option<String>,
    file: File,
    file_len: u64,
    dir: PathBuf,
}

impl StreamEntry {
    fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        Self {
            config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir,
        }
    }
}

/// High-throughput file-backed storage.
///
/// Design:
/// - One append-only log file per stream (`data.log`)
/// - In-memory offset/file index for fast reads
/// - Stream-level write lock serializes appends and preserves monotonic offsets
/// - Batched write per append call reduces syscall overhead
///
/// `sync_on_append = false` prioritizes throughput and may lose recently
/// appended data on crash. `sync_on_append = true` trades latency for stronger
/// durability semantics.
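///
/// # Example
///
/// A construction sketch showing the durable configuration
/// (`sync_on_append = true`); the path, limits, and import path are
/// illustrative:
///
/// ```no_run
/// # use durable_streams_server::storage::file::FileStorage;
/// // 1 GiB total cap, 64 MiB per stream, fsync after every append.
/// let storage = FileStorage::new("/var/lib/ds", 1 << 30, 64 << 20, true)
///     .expect("storage root should initialize");
/// ```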
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    root_dir: PathBuf,
    root_dir_canonical: PathBuf,
    sync_on_append: bool,
}

impl FileStorage {
    /// Create or reopen a file-backed storage root.
    ///
    /// Existing streams under `root_dir` are discovered and indexed during
    /// startup so subsequent reads can serve offsets without rescanning files.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` if the root directory cannot be created or
    /// existing streams fail to load from disk.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        sync_on_append: bool,
    ) -> Result<Self> {
        let root_dir = root_dir.into();
        retry_on_eintr(|| fs::create_dir_all(&root_dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create storage directory",
                format!(
                    "failed to create storage directory {}: {e}",
                    root_dir.display()
                ),
                &e,
            )
        })?;

        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to canonicalize storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        let storage = Self {
            streams: RwLock::new(HashMap::new()),
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            root_dir,
            root_dir_canonical,
            sync_on_append,
        };
        storage.load_existing_streams()?;
        Ok(storage)
    }

    /// Return the currently tracked total payload bytes across all streams.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }

    /// Map a stream name to a directory path inside `root_dir`.
    ///
    /// Uses base64url encoding (alphabet `[A-Za-z0-9_-]`) so the output
    /// cannot contain path separators, but we verify containment anyway
    /// as defense in depth.
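    ///
    /// For example, the stream name `"events"` encodes to the directory
    /// name `"ZXZlbnRz"` (see the test at the bottom of this file).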
    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());

        // Reject path traversal sequences and separators. Base64url encoding
        // (alphabet [A-Za-z0-9_-]) cannot produce these, but the explicit
        // checks act as defense in depth and satisfy static analysis (CodeQL
        // rust/path-injection).
        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
            return Err(Error::Storage(
                "encoded stream name contains path traversal characters".to_string(),
            ));
        }
        if !encoded
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(Error::Storage(
                "encoded stream directory contains invalid characters".to_string(),
            ));
        }

        let dir = self.root_dir.join(&encoded);
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "stream directory escapes storage root: {encoded}"
            )));
        }
        Ok(dir)
    }

    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        if dir.exists() {
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }

    fn data_log_path(dir: &Path) -> PathBuf {
        dir.join("data.log")
    }

    fn meta_path(dir: &Path) -> PathBuf {
        dir.join("meta.json")
    }

    fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
        self.validate_stream_dir(&entry.dir)?;
        let meta = StreamMeta {
            name: name.to_string(),
            config: entry.config.clone(),
            closed: entry.closed,
            created_at: entry.created_at,
            last_seq: entry.last_seq.clone(),
            producers: entry.producers.clone(),
        };

        let meta_path = Self::meta_path(&entry.dir);
        let tmp_path = entry.dir.join("meta.json.tmp");
        let payload = serde_json::to_vec(&meta)
            .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;

        retry_on_eintr(|| fs::write(&tmp_path, payload.as_slice())).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "write stream metadata temp file",
                format!(
                    "failed to write metadata temp file {}: {e}",
                    tmp_path.display()
                ),
                &e,
            )
        })?;

        retry_on_eintr(|| fs::rename(&tmp_path, &meta_path)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "replace stream metadata",
                format!(
                    "failed to atomically replace metadata {}: {e}",
                    meta_path.display()
                ),
                &e,
            )
        })?;

        Ok(())
    }

    fn open_stream_file(&self, dir: &Path) -> Result<File> {
        self.validate_stream_dir(dir)?;
        let path = Self::data_log_path(dir);
        retry_on_eintr(|| {
            OpenOptions::new()
                .create(true)
                .append(true)
                .read(true)
                .open(&path)
        })
        .map_err(|e| {
            Error::classify_io_failure(
                "file",
                "open stream log",
                format!("failed to open stream log {}: {e}", path.display()),
                &e,
            )
        })
    }

    fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
        let mut index = Vec::new();
        let mut next_read_seq = 0u64;
        let mut next_byte_offset = 0u64;

        let file_len = file
            .metadata()
            .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
            .len();

        let mut cursor = 0u64;
        let mut header = [0u8; RECORD_HEADER_BYTES];

        while cursor < file_len {
            file.seek(SeekFrom::Start(cursor))
                .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;

            let read = file
                .read(&mut header)
                .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;

            if read == 0 {
                break;
            }

            if read < RECORD_HEADER_BYTES {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            let record_len = u64::from(u32::from_le_bytes(header));
            let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;

            if record_end > file_len {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            index.push(MessageIndex {
                offset: Offset::new(next_read_seq, next_byte_offset),
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: record_len,
            });

            next_read_seq += 1;
            next_byte_offset += record_len;
            cursor = record_end;
        }

        file.seek(SeekFrom::End(0))
            .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;

        Ok((index, next_read_seq, next_byte_offset))
    }

    fn rollback_total_bytes(&self, bytes: u64) {
        self.total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            })
            .ok();
    }

    fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
        let streams = self.streams.read().expect("streams lock poisoned");
        streams.get(name).map(Arc::clone)
    }

    fn append_records(
        &self,
        name: &str,
        stream: &mut StreamEntry,
        messages: &[Bytes],
    ) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut payload_bytes = 0u64;
        let mut sizes = Vec::with_capacity(messages.len());

        for msg in messages {
            let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
            if len > u64::from(u32::MAX) {
                return Err(Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: "message too large for file record format".to_string(),
                });
            }
            payload_bytes += len;
            total_batch_bytes += len;
            sizes.push(len);
        }

        // Reserve global capacity first (preserves error precedence: global
        // limit takes priority over per-stream limit). Uses a CAS loop so
        // concurrent appends on different streams cannot exceed the cap.
        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(total_batch_bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }

        // Check per-stream limit; rollback global reservation on failure.
        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::StreamSizeLimitExceeded);
        }

        let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
        let mut write_buf =
            Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
        for msg in messages {
            let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
            write_buf.extend_from_slice(&len.to_le_bytes());
            write_buf.extend_from_slice(msg);
        }

        let before_len = stream.file_len;

        if let Err(e) = retry_on_eintr(|| stream.file.write_all(&write_buf)) {
            // Write errors may still leave partial bytes on disk; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to append stream log for {name}: {e}"
            )));
        }

        if self.sync_on_append
            && let Err(e) = retry_on_eintr(|| stream.file.sync_data())
        {
            // Data may be written even if fsync fails; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::classify_io_failure(
                "file",
                "sync stream log",
                format!("failed to sync stream log for {name}: {e}"),
                &e,
            ));
        }

        let mut cursor = before_len;
        for len in sizes {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.index.push(MessageIndex {
                offset,
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: len,
            });
            stream.next_read_seq += 1;
            stream.next_byte_offset += len;
            stream.total_bytes += len;
            cursor += RECORD_HEADER_BYTES as u64 + len;
        }
        stream.file_len = cursor;

        let _ = stream.notify.send(());
        Ok(())
    }

    fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
        if index_slice.is_empty() {
            return Ok(Vec::new());
        }

        let first_pos = index_slice[0].file_pos;
        let last = index_slice
            .last()
            .expect("index_slice non-empty due to early return");
        let read_end = last.file_pos + last.byte_len;
        let read_len = read_end.saturating_sub(first_pos);

        // Use positional read (pread) to avoid the shared file-offset race
        // that occurs when multiple readers `try_clone()` the same file
        // descriptor and `seek()` concurrently.
        let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_exact_at(&mut raw, first_pos)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;
            // `seek_read` may return fewer bytes than requested, so loop
            // until the buffer is full (mirrors `read_exact_at` on unix).
            let mut filled = 0;
            while filled < raw.len() {
                let n = file
                    .seek_read(&mut raw[filled..], first_pos + filled as u64)
                    .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
                if n == 0 {
                    return Err(Error::Storage(
                        "unexpected end of stream log while reading message data".to_string(),
                    ));
                }
                filled += n;
            }
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut reader = file
                .try_clone()
                .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;
            reader
                .seek(SeekFrom::Start(first_pos))
                .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;
            reader
                .read_exact(&mut raw)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }

        let shared = Bytes::from(raw);
        let mut messages = Vec::with_capacity(index_slice.len());
        for idx in index_slice {
            let rel_start =
                usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
            let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
            messages.push(shared.slice(rel_start..rel_end));
        }

        Ok(messages)
    }

    fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
        self.validate_stream_dir(dir)?;
        retry_on_eintr(|| fs::remove_dir_all(dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "remove stream directory",
                format!("failed to remove stream directory {}: {e}", dir.display()),
                &e,
            )
        })
    }

    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            if !path.is_dir() {
                continue;
            }
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            let mut file = self.open_stream_file(&path)?;
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            // Reconcile: data.log is the source of truth for message count
            // and byte offsets. If meta.json is stale (e.g. crash before
            // metadata flush), log a warning so operators can investigate.
            let log_msg_count = index.len() as u64;
            let meta_has_data =
                meta.closed || !meta.producers.is_empty() || meta.last_seq.is_some();
            if log_msg_count == 0 && meta_has_data {
                warn!(
                    stream = meta.name,
                    "meta.json indicates activity but data.log has 0 messages; \
                     data.log is authoritative"
                );
            }

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let mut entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
            };

            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            super::cleanup_stale_producers(&mut entry.producers);

            // Re-persist metadata if the on-disk copy may be stale so that
            // future restarts see a consistent snapshot. Best-effort: a
            // failure here is non-fatal since the data.log remains correct.
            if let Err(e) = self.write_metadata_for(&meta.name, &entry) {
                warn!(
                    %e,
                    stream = meta.name,
                    "failed to re-persist reconciled metadata during recovery"
                );
            }
            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        self.total_bytes.store(restored_total, Ordering::Release);

        Ok(())
    }
}

impl Storage for FileStorage {
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                self.remove_stream_dir(&dir)?;
                self.rollback_total_bytes(stream_bytes);
            } else if stream.config == config {
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir.clone());

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }

    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        self.append_records(name, &mut stream, &[data])?;
        // Plain appends do not mutate persisted metadata fields (closed/seq/producers).
        Ok(offset)
    }

    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
            // Stream-Seq changed; persist metadata best-effort.
            if let Err(e) = self.write_metadata_for(name, &stream) {
                warn!(%e, stream = name, "metadata persist failed after batch append");
            }
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }

    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                at_tail: true,
                closed: stream.closed,
            });
        }

        let start_idx = if from_offset.is_start() {
            0
        } else {
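            // binary_search yields Ok(i) on an exact offset match and Err(i)
            // with the insertion point otherwise; either way `i` is the first
            // message at or after `from_offset`.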
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            at_tail,
            closed: stream.closed,
        })
    }

    fn delete(&self, name: &str) -> Result<()> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.remove(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            let dir = stream.dir.clone();
            let stream_bytes = stream.total_bytes;
            drop(stream);

            if let Err(e) = self.remove_stream_dir(&dir) {
                // Re-insert into map — the stream still exists on disk.
                streams.insert(name.to_string(), stream_arc);
                return Err(e);
            }
            self.rollback_total_bytes(stream_bytes);
            Ok(())
        } else {
            Err(Error::NotFound(name.to_string()))
        }
    }

    fn head(&self, name: &str) -> Result<StreamMetadata> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        Ok(StreamMetadata {
            config: stream.config.clone(),
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
            total_bytes: stream.total_bytes,
            message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
            created_at: stream.created_at,
        })
    }

    fn close_stream(&self, name: &str) -> Result<()> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        stream.closed = true;
        self.write_metadata_for(name, &stream)?;

        let _ = stream.notify.send(());
        Ok(())
    }

    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        super::cleanup_stale_producers(&mut stream.producers);

        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(
            stream.producers.get(producer.id.as_str()),
            producer,
            stream.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;

        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if should_close {
            stream.closed = true;
        }

        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );

        // Data is committed to the log; metadata write failure is non-fatal.
        if let Err(e) = self.write_metadata_for(name, &stream) {
            warn!(%e, stream = name, "metadata persist failed after committed producer append");
        }

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
        })
    }

    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                self.remove_stream_dir(&dir)?;
                self.rollback_total_bytes(stream_bytes);
            } else if stream.config == config {
                return Ok(CreateWithDataResult {
                    status: CreateStreamResult::AlreadyExists,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let mut entry = StreamEntry::new(config, file, dir.clone());

        if !messages.is_empty()
            && let Err(e) = self.append_records(name, &mut entry, &messages)
        {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }

    fn exists(&self, name: &str) -> bool {
        let streams = self.streams.read().expect("streams lock poisoned");
        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            !super::is_stream_expired(&stream.config)
        } else {
            false
        }
    }

    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
        let stream_arc = self.get_stream(name)?;
        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return None;
        }

        Some(stream.notify.subscribe())
    }

    fn cleanup_expired_streams(&self) -> usize {
        let mut streams = self.streams.write().expect("streams lock poisoned");
        let mut expired = Vec::new();

        for (name, stream_arc) in streams.iter() {
            let stream = stream_arc.read().expect("stream lock poisoned");
            if super::is_stream_expired(&stream.config) {
                expired.push((name.clone(), stream.total_bytes, stream.dir.clone()));
            }
        }

        let count = expired.len();
        for (name, bytes, dir) in &expired {
            streams.remove(name);
            if let Err(e) = self.remove_stream_dir(dir) {
                warn!(%e, stream = name.as_str(), "failed to remove expired stream directory");
            } else {
                self.rollback_total_bytes(*bytes);
            }
        }

        count
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;

    fn test_storage_dir() -> PathBuf {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
    }

    fn test_storage() -> FileStorage {
        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
            .expect("file storage should initialize")
    }

    #[test]
    fn test_delete_removes_files() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("test", config).unwrap();
        storage
            .append("test", Bytes::from("data"), "text/plain")
            .unwrap();

        let dir = storage.stream_dir_for_name("test").unwrap();
        assert!(dir.exists(), "stream directory should exist before delete");

        storage.delete("test").unwrap();
        assert!(
            !dir.exists(),
            "stream directory should be removed after delete"
        );
    }

    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false)
                .expect("file storage should initialize");
            storage
                .create_stream("events", config.clone())
                .expect("stream should be created");
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .expect("append should succeed");
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .expect("append should succeed");
        }

        let restored =
            FileStorage::new(root, 1024 * 1024, 100 * 1024, false).expect("restore should work");

        let read = restored
            .read("events", &Offset::start())
            .expect("read should succeed");

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        // Append partial garbage to the data log (incomplete record header)
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        // Write a 2-byte partial header (less than the 4-byte record header)
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
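
    // Checks the documented name-to-directory mapping:
    // base64url("events") == "ZXZlbnRz".
    #[test]
    fn test_stream_dir_uses_base64url_encoding() {
        let storage = test_storage();
        let dir = storage
            .stream_dir_for_name("events")
            .expect("encoding should succeed");
        assert!(dir.ends_with("ZXZlbnRz"));
    }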
}