Skip to main content

durable_streams_server/storage/
file.rs

1use super::{
2    CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3    ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use base64::Engine;
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::fs::{self, File, OpenOptions};
14use std::io::{Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::sync::{Arc, RwLock};
17use tokio::sync::broadcast;
18use tracing::warn;
19
/// Binary record header size: little-endian `u32` payload length.
const RECORD_HEADER_BYTES: usize = 4;
/// Initial capacity for a stream's in-memory message index.
const INITIAL_INDEX_CAPACITY: usize = 256;
/// Initial capacity for a stream's producer-state map.
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
24
/// In-memory index entry locating one record inside a stream's `data.log`.
#[derive(Debug, Clone)]
struct MessageIndex {
    // Logical offset (read sequence + logical byte position) of this record.
    offset: Offset,
    // Absolute file position of the record payload (just past the 4-byte header).
    file_pos: u64,
    // Payload length in bytes (excludes the record header).
    byte_len: u64,
}
31
/// On-disk stream metadata, persisted as JSON in `meta.json`.
#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    // Original (unencoded) stream name.
    name: String,
    // Stream configuration as supplied at creation time.
    config: StreamConfig,
    // Whether the stream has been closed to further appends.
    closed: bool,
    // Creation timestamp (UTC).
    created_at: DateTime<Utc>,
    // Last accepted stream sequence value, if any.
    last_seq: Option<String>,
    // Per-producer state keyed by producer id.
    producers: HashMap<String, ProducerState>,
}
41
/// In-memory state for one open stream, guarded by a per-stream `RwLock`.
struct StreamEntry {
    config: StreamConfig,
    // Offset/file-position index for every record currently in `data.log`.
    index: Vec<MessageIndex>,
    closed: bool,
    // Sequence number the next appended record will receive.
    next_read_seq: u64,
    // Logical byte offset the next appended record will start at.
    next_byte_offset: u64,
    // Sum of payload bytes stored in this stream (excludes record headers).
    total_bytes: u64,
    created_at: DateTime<Utc>,
    producers: HashMap<String, ProducerState>,
    // Broadcast channel used to wake readers blocked at the tail.
    notify: broadcast::Sender<()>,
    last_seq: Option<String>,
    // Append handle for the stream's `data.log`.
    file: File,
    // Cached physical length of `data.log` (headers + payloads).
    file_len: u64,
    // Directory holding `data.log` and `meta.json`.
    dir: PathBuf,
}
57
58impl StreamEntry {
59    fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
60        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
61        let file_len = file.metadata().map_or(0, |m| m.len());
62        Self {
63            config,
64            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
65            closed: false,
66            next_read_seq: 0,
67            next_byte_offset: 0,
68            total_bytes: 0,
69            created_at: Utc::now(),
70            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
71            notify,
72            last_seq: None,
73            file,
74            file_len,
75            dir,
76        }
77    }
78}
79
/// High-throughput file-backed storage.
///
/// Design:
/// - One append-only log file per stream (`data.log`)
/// - In-memory offset/file index for fast reads
/// - Stream-level write lock serializes appends and preserves monotonic offsets
/// - Batched write per append call reduces syscall overhead
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    // Map of stream name -> shared per-stream state.
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    // Global payload-byte counter across all streams.
    total_bytes: RwLock<u64>,
    // Hard cap on total payload bytes across all streams.
    max_total_bytes: u64,
    // Hard cap on payload bytes per stream.
    max_stream_bytes: u64,
    // Storage root as configured (may be relative).
    root_dir: PathBuf,
    // Canonicalized storage root, used for symlink-safe containment checks.
    root_dir_canonical: PathBuf,
    // When true, fsync the log after every append.
    sync_on_append: bool,
}
97
98impl FileStorage {
99    /// # Errors
100    ///
101    /// Returns `Error::Storage` if the root directory cannot be created or
102    /// existing streams fail to load from disk.
103    pub fn new(
104        root_dir: impl Into<PathBuf>,
105        max_total_bytes: u64,
106        max_stream_bytes: u64,
107        sync_on_append: bool,
108    ) -> Result<Self> {
109        let root_dir = root_dir.into();
110        fs::create_dir_all(&root_dir).map_err(|e| {
111            Error::Storage(format!(
112                "failed to create storage directory {}: {e}",
113                root_dir.display()
114            ))
115        })?;
116
117        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
118            Error::Storage(format!(
119                "failed to canonicalize storage directory {}: {e}",
120                root_dir.display()
121            ))
122        })?;
123
124        let storage = Self {
125            streams: RwLock::new(HashMap::new()),
126            total_bytes: RwLock::new(0),
127            max_total_bytes,
128            max_stream_bytes,
129            root_dir,
130            root_dir_canonical,
131            sync_on_append,
132        };
133        storage.load_existing_streams()?;
134        Ok(storage)
135    }
136
137    /// # Panics
138    ///
139    /// Panics if the internal `total_bytes` lock is poisoned.
140    #[must_use]
141    pub fn total_bytes(&self) -> u64 {
142        *self.total_bytes.read().expect("total_bytes lock poisoned")
143    }
144
145    /// Map a stream name to a directory path inside `root_dir`.
146    ///
147    /// Uses base64url encoding (alphabet `[A-Za-z0-9_-]`) so the output
148    /// cannot contain path separators, but we verify containment anyway
149    /// as defense in depth.
150    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
151        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());
152
153        // Reject path traversal sequences and separators. Base64url encoding
154        // (alphabet [A-Za-z0-9_-]) cannot produce these, but the explicit
155        // checks act as defense in depth and satisfy static analysis (CodeQL
156        // rust/path-injection).
157        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
158            return Err(Error::Storage(
159                "encoded stream name contains path traversal characters".to_string(),
160            ));
161        }
162        if !encoded
163            .chars()
164            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
165        {
166            return Err(Error::Storage(
167                "encoded stream directory contains invalid characters".to_string(),
168            ));
169        }
170
171        let dir = self.root_dir.join(&encoded);
172        if !dir.starts_with(&self.root_dir) {
173            return Err(Error::Storage(format!(
174                "stream directory escapes storage root: {encoded}"
175            )));
176        }
177        Ok(dir)
178    }
179
    /// Defense-in-depth check that `dir` is a safe stream directory: it must
    /// be a direct child of the storage root, must not be a symlink, and (if
    /// it already exists) must canonicalize to a path inside the canonical
    /// storage root.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` describing the first failed check or any
    /// filesystem stat/canonicalize failure.
    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        // Lexical containment check first (cheap, no filesystem access).
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        // Require exactly one path component below the root: stream
        // directories never nest.
        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        if dir.exists() {
            // symlink_metadata does not follow links, so a symlinked stream
            // directory is detected rather than silently traversed.
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            // Resolve symlinks in ancestor components too: the canonical
            // path must stay under the canonical storage root.
            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }
237
238    fn data_log_path(dir: &Path) -> PathBuf {
239        dir.join("data.log")
240    }
241
242    fn meta_path(dir: &Path) -> PathBuf {
243        dir.join("meta.json")
244    }
245
246    fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
247        self.validate_stream_dir(&entry.dir)?;
248        let meta = StreamMeta {
249            name: name.to_string(),
250            config: entry.config.clone(),
251            closed: entry.closed,
252            created_at: entry.created_at,
253            last_seq: entry.last_seq.clone(),
254            producers: entry.producers.clone(),
255        };
256
257        let meta_path = Self::meta_path(&entry.dir);
258        let tmp_path = entry.dir.join("meta.json.tmp");
259        let payload = serde_json::to_vec(&meta)
260            .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;
261
262        fs::write(&tmp_path, payload).map_err(|e| {
263            Error::Storage(format!(
264                "failed to write metadata temp file {}: {e}",
265                tmp_path.display()
266            ))
267        })?;
268
269        fs::rename(&tmp_path, &meta_path).map_err(|e| {
270            Error::Storage(format!(
271                "failed to atomically replace metadata {}: {e}",
272                meta_path.display()
273            ))
274        })?;
275
276        Ok(())
277    }
278
279    fn open_stream_file(&self, dir: &Path) -> Result<File> {
280        self.validate_stream_dir(dir)?;
281        let path = Self::data_log_path(dir);
282        OpenOptions::new()
283            .create(true)
284            .append(true)
285            .read(true)
286            .open(&path)
287            .map_err(|e| {
288                Error::Storage(format!("failed to open stream log {}: {e}", path.display()))
289            })
290    }
291
292    fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
293        let mut index = Vec::new();
294        let mut next_read_seq = 0u64;
295        let mut next_byte_offset = 0u64;
296
297        let file_len = file
298            .metadata()
299            .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
300            .len();
301
302        let mut cursor = 0u64;
303        let mut header = [0u8; RECORD_HEADER_BYTES];
304
305        while cursor < file_len {
306            file.seek(SeekFrom::Start(cursor))
307                .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;
308
309            let read = file
310                .read(&mut header)
311                .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;
312
313            if read == 0 {
314                break;
315            }
316
317            if read < RECORD_HEADER_BYTES {
318                file.set_len(cursor).map_err(|e| {
319                    Error::Storage(format!("failed to truncate partial record: {e}"))
320                })?;
321                break;
322            }
323
324            let record_len = u64::from(u32::from_le_bytes(header));
325            let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;
326
327            if record_end > file_len {
328                file.set_len(cursor).map_err(|e| {
329                    Error::Storage(format!("failed to truncate partial record: {e}"))
330                })?;
331                break;
332            }
333
334            index.push(MessageIndex {
335                offset: Offset::new(next_read_seq, next_byte_offset),
336                file_pos: cursor + RECORD_HEADER_BYTES as u64,
337                byte_len: record_len,
338            });
339
340            next_read_seq += 1;
341            next_byte_offset += record_len;
342            cursor = record_end;
343        }
344
345        file.seek(SeekFrom::End(0))
346            .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;
347
348        Ok((index, next_read_seq, next_byte_offset))
349    }
350
351    fn rollback_total_bytes(&self, bytes: u64) {
352        let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
353        *total = total.saturating_sub(bytes);
354    }
355
356    fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
357        let streams = self.streams.read().expect("streams lock poisoned");
358        streams.get(name).map(Arc::clone)
359    }
360
    /// Append a batch of payloads to the stream's log as length-prefixed
    /// records, enforcing the global and per-stream byte limits.
    ///
    /// Caller must hold the stream's write lock. The in-memory index and
    /// logical offsets are advanced only after the disk write (and optional
    /// fsync) succeeds; on failure the reserved global bytes are rolled back.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidHeader` for oversized messages,
    /// `Error::MemoryLimitExceeded` / `Error::StreamSizeLimitExceeded` on
    /// limit violations, and `Error::Storage` on I/O failure.
    fn append_records(
        &self,
        name: &str,
        stream: &mut StreamEntry,
        messages: &[Bytes],
    ) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut payload_bytes = 0u64;
        let mut sizes = Vec::with_capacity(messages.len());

        // Validate sizes up front: each record length must fit the u32
        // on-disk header before anything is reserved or written.
        for msg in messages {
            let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
            if len > u64::from(u32::MAX) {
                return Err(Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: "message too large for file record format".to_string(),
                });
            }
            payload_bytes += len;
            total_batch_bytes += len;
            sizes.push(len);
        }

        // Check-and-reserve global limit atomically under a single write lock
        // to prevent concurrent appends on different streams from exceeding it.
        // Global check comes first to preserve error precedence.
        {
            let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
            if *total + total_batch_bytes > self.max_total_bytes {
                return Err(Error::MemoryLimitExceeded);
            }
            if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }
            *total += total_batch_bytes;
        }

        // Build one contiguous buffer (header + payload per record) so the
        // whole batch goes down in a single write call.
        let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
        let mut write_buf =
            Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
        for msg in messages {
            let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
            write_buf.extend_from_slice(&len.to_le_bytes());
            write_buf.extend_from_slice(msg);
        }

        let before_len = stream.file_len;

        if let Err(e) = stream.file.write_all(&write_buf) {
            // Write errors may still leave partial bytes on disk; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to append stream log for {name}: {e}"
            )));
        }

        if self.sync_on_append
            && let Err(e) = stream.file.sync_data()
        {
            // Data may be written even if fsync fails; refresh cached length.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to sync stream log for {name}: {e}"
            )));
        }

        // Write succeeded: publish the records in the in-memory index and
        // advance the stream's logical offsets and byte accounting.
        let mut cursor = before_len;
        for len in sizes {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.index.push(MessageIndex {
                offset,
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: len,
            });
            stream.next_read_seq += 1;
            stream.next_byte_offset += len;
            stream.total_bytes += len;
            cursor += RECORD_HEADER_BYTES as u64 + len;
        }
        stream.file_len = cursor;

        // Wake readers waiting at the tail; a send error just means nobody
        // is currently listening.
        let _ = stream.notify.send(());
        Ok(())
    }
455
    /// Read the payloads for a contiguous slice of index entries using a
    /// single seek + bulk read, then hand out zero-copy sub-slices of the
    /// shared buffer.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` if the handle cannot be cloned or the read
    /// fails.
    fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
        if index_slice.is_empty() {
            return Ok(Vec::new());
        }

        // Clone the handle so this read gets its own cursor and does not
        // disturb the shared append position.
        let mut reader = file
            .try_clone()
            .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;

        let first_pos = index_slice[0].file_pos;
        let last = index_slice
            .last()
            .expect("index_slice non-empty due early return");
        let read_end = last.file_pos + last.byte_len;
        let read_len = read_end.saturating_sub(first_pos);

        reader
            .seek(SeekFrom::Start(first_pos))
            .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;

        // One bulk read spanning every requested record (interior record
        // headers included, since records are contiguous on disk).
        let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
        reader
            .read_exact(&mut raw)
            .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;

        // Slice each payload out of the shared buffer without copying.
        let shared = Bytes::from(raw);
        let mut messages = Vec::with_capacity(index_slice.len());
        for idx in index_slice {
            let rel_start =
                usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
            let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
            messages.push(shared.slice(rel_start..rel_end));
        }

        Ok(messages)
    }
492
493    fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
494        self.validate_stream_dir(dir)?;
495        fs::remove_dir_all(dir).map_err(|e| {
496            Error::Storage(format!(
497                "failed to remove stream directory {}: {e}",
498                dir.display()
499            ))
500        })
501    }
502
    /// Restore all persisted streams from disk at startup.
    ///
    /// Skips non-directories, entries that fail path validation, and
    /// directories without a `meta.json`. Expired streams are deleted on
    /// sight. Each stream's index is rebuilt by scanning its log (which also
    /// truncates torn tail records), and the global byte counter is reset to
    /// the restored total.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` if the root cannot be listed or a candidate
    /// stream's metadata/log cannot be read.
    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            if !path.is_dir() {
                continue;
            }
            // Ignore foreign/unsafe directories instead of failing startup.
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            // A directory without metadata was never fully created; skip it.
            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            // Rebuild the offset index by scanning the log; re-stat after
            // the scan because rebuild may have truncated a torn record.
            let mut file = self.open_stream_file(&path)?;
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
            };

            // Drop streams whose TTL elapsed while the server was down.
            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
        *total = restored_total;

        Ok(())
    }
582}
583
584impl Storage for FileStorage {
    /// Create a stream, or report `AlreadyExists` when one with an identical
    /// config is present. An existing expired stream is reaped first and then
    /// recreated; an existing stream with a different config is a
    /// `ConfigMismatch` error.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Reap the expired stream (map entry, global counter, disk)
                // and fall through to create a fresh one below.
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
                *total = total.saturating_sub(stream_bytes);
                drop(total);

                self.remove_stream_dir(&dir)?;
            } else if stream.config == config {
                // Idempotent create: identical config is not an error.
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        fs::create_dir_all(&dir).map_err(|e| {
            Error::Storage(format!(
                "failed to create stream directory {}: {e}",
                dir.display()
            ))
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir);

        // Persist metadata before publishing the stream to other callers.
        self.write_metadata_for(name, &entry)?;
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }
626
627    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
628        let stream_arc = self
629            .get_stream(name)
630            .ok_or_else(|| Error::NotFound(name.to_string()))?;
631
632        let mut stream = stream_arc.write().expect("stream lock poisoned");
633
634        if super::is_stream_expired(&stream.config) {
635            return Err(Error::StreamExpired);
636        }
637
638        if stream.closed {
639            return Err(Error::StreamClosed);
640        }
641
642        super::validate_content_type(&stream.config.content_type, content_type)?;
643
644        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
645        self.append_records(name, &mut stream, &[data])?;
646        // Plain appends do not mutate persisted metadata fields (closed/seq/producers).
647        Ok(offset)
648    }
649
    /// Append a non-empty batch of messages atomically, optionally guarded
    /// by a stream `seq` value; returns the next offset after the batch.
    ///
    /// # Errors
    ///
    /// Rejects empty batches (`InvalidHeader`), missing streams (`NotFound`),
    /// expired/closed streams, content-type mismatches, and invalid seq
    /// values, plus `Error::Storage` on I/O failure.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        // Validate the seq BEFORE writing so a rejected seq leaves the log
        // untouched; only record the new seq after the append succeeds.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
            // Stream-Seq changed, persist metadata best-effort.
            if let Err(e) = self.write_metadata_for(name, &stream) {
                warn!(%e, stream = name, "metadata persist failed after batch append");
            }
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }
692
    /// Read all messages at or after `from_offset`.
    ///
    /// A `now` offset returns no messages (tail position only); a `start`
    /// offset reads from the beginning. The result also carries the stream's
    /// next offset, whether the read reached the tail, and the closed flag.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                at_tail: true,
                closed: stream.closed,
            });
        }

        // Binary search over the sorted index: an exact hit reads from that
        // record; a miss reads from the insertion point, i.e. the first
        // record past the requested offset.
        let start_idx = if from_offset.is_start() {
            0
        } else {
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            at_tail,
            closed: stream.closed,
        })
    }
732
733    fn delete(&self, name: &str) -> Result<()> {
734        let mut streams = self.streams.write().expect("streams lock poisoned");
735
736        if let Some(stream_arc) = streams.remove(name) {
737            let stream = stream_arc.read().expect("stream lock poisoned");
738            let dir = stream.dir.clone();
739            let stream_bytes = stream.total_bytes;
740            drop(stream);
741
742            let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
743            *total = total.saturating_sub(stream_bytes);
744            drop(total);
745
746            self.remove_stream_dir(&dir)?;
747            Ok(())
748        } else {
749            Err(Error::NotFound(name.to_string()))
750        }
751    }
752
753    fn head(&self, name: &str) -> Result<StreamMetadata> {
754        let stream_arc = self
755            .get_stream(name)
756            .ok_or_else(|| Error::NotFound(name.to_string()))?;
757
758        let stream = stream_arc.read().expect("stream lock poisoned");
759
760        if super::is_stream_expired(&stream.config) {
761            return Err(Error::StreamExpired);
762        }
763
764        Ok(StreamMetadata {
765            config: stream.config.clone(),
766            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
767            closed: stream.closed,
768            total_bytes: stream.total_bytes,
769            message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
770            created_at: stream.created_at,
771        })
772    }
773
774    fn close_stream(&self, name: &str) -> Result<()> {
775        let stream_arc = self
776            .get_stream(name)
777            .ok_or_else(|| Error::NotFound(name.to_string()))?;
778
779        let mut stream = stream_arc.write().expect("stream lock poisoned");
780
781        if super::is_stream_expired(&stream.config) {
782            return Err(Error::StreamExpired);
783        }
784
785        stream.closed = true;
786        self.write_metadata_for(name, &stream)?;
787
788        let _ = stream.notify.send(());
789        Ok(())
790    }
791
    /// Append on behalf of an identified producer with epoch/seq dedup.
    ///
    /// Duplicate submissions (per `check_producer`) are acknowledged without
    /// re-appending. On acceptance the producer's state is recorded, and the
    /// stream may additionally be closed (`should_close`). A metadata
    /// persist failure after the data is committed is logged, not returned.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        // Evict stale producer entries before the producer check below.
        super::cleanup_stale_producers(&mut stream.producers);

        // An empty batch may still advance producer state / close the
        // stream, so content type is only validated when data is present.
        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(
            stream.producers.get(producer.id.as_str()),
            producer,
            stream.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                // Retransmission: acknowledge with current state, no write.
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;

        // Apply seq/close state only after the append has succeeded.
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if should_close {
            stream.closed = true;
        }

        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );

        // Data is committed to the log; metadata write failure is non-fatal
        if let Err(e) = self.write_metadata_for(name, &stream) {
            warn!(%e, stream = name, "metadata persist failed after committed producer append");
        }

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
        })
    }
866
    /// Creates a stream and appends `messages` to it in one operation,
    /// optionally marking it closed, all under a single map write lock so
    /// concurrent creates of the same name serialize.
    ///
    /// If the name is already taken:
    /// - an *expired* existing stream is evicted (in-memory entry, global
    ///   byte accounting, and on-disk directory) and creation proceeds as
    ///   if the name were free;
    /// - a live stream with an *identical* config makes the call idempotent:
    ///   returns `AlreadyExists` with the current tail offset and closed flag;
    /// - a live stream with a *different* config fails with
    ///   `Error::ConfigMismatch`.
    ///
    /// # Errors
    /// Propagates failures from directory removal/creation/validation, the
    /// record append, and the metadata write. NOTE(review): if an error
    /// occurs after `fs::create_dir_all`, the new directory is left on disk
    /// without being registered in memory — confirm that startup recovery
    /// tolerates such an orphaned directory.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Evict the expired stream. Copy out what we need, then
                // release the per-stream guard before removing the map entry
                // (the Arc it guards is about to be dropped).
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                // Give back the evicted stream's bytes to the global budget;
                // saturating_sub guards against accounting drift underflowing.
                let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
                *total = total.saturating_sub(stream_bytes);
                drop(total);

                self.remove_stream_dir(&dir)?;
            } else if stream.config == config {
                // Same config: idempotent create — report the existing tail.
                return Ok(CreateWithDataResult {
                    status: CreateStreamResult::AlreadyExists,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        // Name is free (or was just evicted): materialize the backing
        // directory and data file before publishing anything in memory.
        let dir = self.stream_dir_for_name(name)?;
        fs::create_dir_all(&dir).map_err(|e| {
            Error::Storage(format!(
                "failed to create stream directory {}: {e}",
                dir.display()
            ))
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let mut entry = StreamEntry::new(config, file, dir);

        if !messages.is_empty() {
            self.append_records(name, &mut entry, &messages)?;
        }
        if should_close {
            entry.closed = true;
        }

        // Capture the result fields before `entry` is moved into the map.
        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        // Persist metadata first; only a fully persisted stream becomes
        // visible to other callers via the map insert.
        self.write_metadata_for(name, &entry)?;
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }
932
933    fn exists(&self, name: &str) -> bool {
934        let streams = self.streams.read().expect("streams lock poisoned");
935        if let Some(stream_arc) = streams.get(name) {
936            let stream = stream_arc.read().expect("stream lock poisoned");
937            !super::is_stream_expired(&stream.config)
938        } else {
939            false
940        }
941    }
942
943    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
944        let stream_arc = self.get_stream(name)?;
945        let stream = stream_arc.read().expect("stream lock poisoned");
946
947        if super::is_stream_expired(&stream.config) {
948            return None;
949        }
950
951        Some(stream.notify.subscribe())
952    }
953}
954
955#[cfg(test)]
956mod tests {
957    use super::*;
958    use base64::Engine;
959
960    fn test_storage_dir() -> PathBuf {
961        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
962        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
963        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
964        let pid = std::process::id();
965        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
966    }
967
968    fn test_storage() -> FileStorage {
969        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
970            .expect("file storage should initialize")
971    }
972
973    #[test]
974    fn test_delete_removes_files() {
975        let storage = test_storage();
976        let config = StreamConfig::new("text/plain".to_string());
977        storage.create_stream("test", config).unwrap();
978        storage
979            .append("test", Bytes::from("data"), "text/plain")
980            .unwrap();
981
982        let dir = storage.stream_dir_for_name("test").unwrap();
983        assert!(dir.exists(), "stream directory should exist before delete");
984
985        storage.delete("test").unwrap();
986        assert!(
987            !dir.exists(),
988            "stream directory should be removed after delete"
989        );
990    }
991
    /// A second `FileStorage` opened over the same root must recover
    /// previously appended records in order.
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        // Inner scope: the first instance is dropped (releasing its file
        // handles) before the same root is reopened below.
        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false)
                .expect("file storage should initialize");
            storage
                .create_stream("events", config.clone())
                .expect("stream should be created");
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .expect("append should succeed");
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .expect("append should succeed");
        }

        // Fresh instance over the same directory: state must come purely
        // from what was persisted on disk.
        let restored =
            FileStorage::new(root, 1024 * 1024, 100 * 1024, false).expect("restore should work");

        let read = restored
            .read("events", &Offset::start())
            .expect("read should succeed");

        // Both records survive, in append order.
        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }
1022
    /// The closed flag must be persisted: after a restart the stream is
    /// still closed and rejects further appends.
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        // First instance: create, append once, close, then drop at scope end.
        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        // Restored instance must see both the closed flag and the record.
        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        // A closed stream rejects appends with the dedicated error.
        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }
1047
    /// Recovery must tolerate a torn write at the tail of the data log: a
    /// partial record header is discarded and the intact records before it
    /// remain readable.
    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        // Write one valid record, then drop the storage so the log file is
        // released before we corrupt it directly.
        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        // Append partial garbage to the data log (incomplete record header)
        // NOTE(review): this path construction (url-safe no-pad base64 of the
        // stream name + "data.log") mirrors stream_dir_for_name — keep in sync.
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        // Write a 2-byte partial header (less than the 4-byte record header)
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        // Recovery should keep only the complete record.
        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
1074}