use super::{
    CreateStreamResult, CreateWithDataResult, ForkInfo, NOTIFY_CHANNEL_CAPACITY,
    ProducerAppendResult, ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig,
    StreamMetadata, StreamState,
};
use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use base64::Engine;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;
use tracing::warn;

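// On-disk record format: each message in `data.log` is a 4-byte little-endian
// length prefix followed by the raw payload bytes.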
const RECORD_HEADER_BYTES: usize = 4;
const INITIAL_INDEX_CAPACITY: usize = 256;
const INITIAL_PRODUCERS_CAPACITY: usize = 8;

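/// Retries an I/O operation for as long as it fails with
/// `ErrorKind::Interrupted` (EINTR), returning the first non-interrupted
/// result.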
fn retry_on_eintr<T>(
    mut op: impl FnMut() -> std::result::Result<T, io::Error>,
) -> std::result::Result<T, io::Error> {
    loop {
        match op() {
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            result => return result,
        }
    }
}

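/// In-memory index entry mapping a message's logical offset to the byte range
/// that holds its payload in `data.log`.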
#[derive(Debug, Clone)]
struct MessageIndex {
    offset: Offset,
    file_pos: u64,
    byte_len: u64,
}

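/// Stream state persisted as `meta.json`. Fields tagged `#[serde(default)]`
/// keep metadata written by older versions readable.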
#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    name: String,
    config: StreamConfig,
    closed: bool,
    created_at: DateTime<Utc>,
    #[serde(default)]
    updated_at: Option<DateTime<Utc>>,
    last_seq: Option<String>,
    producers: HashMap<String, ProducerState>,
    #[serde(default)]
    fork_info: Option<ForkInfo>,
    #[serde(default)]
    ref_count: u32,
    #[serde(default)]
    state: StreamState,
}

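/// Live, in-memory state for one stream, owning its open log file handle.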
struct StreamEntry {
    config: StreamConfig,
    index: Vec<MessageIndex>,
    closed: bool,
    next_read_seq: u64,
    next_byte_offset: u64,
    total_bytes: u64,
    created_at: DateTime<Utc>,
    updated_at: Option<DateTime<Utc>>,
    producers: HashMap<String, ProducerState>,
    notify: broadcast::Sender<()>,
    last_seq: Option<String>,
    file: File,
    file_len: u64,
    dir: PathBuf,
    fork_info: Option<ForkInfo>,
    ref_count: u32,
    state: StreamState,
}

impl StreamEntry {
    fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        Self {
            config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir,
            fork_info: None,
            ref_count: 0,
            state: StreamState::Active,
        }
    }
}

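/// File-backed [`Storage`] implementation. Each stream lives in its own
/// directory under `root_dir` (named by the URL-safe base64 encoding of the
/// stream name) holding an append-only `data.log` plus a `meta.json`.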
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    root_dir: PathBuf,
    root_dir_canonical: PathBuf,
    sync_on_append: bool,
}

impl FileStorage {
    pub fn new(
        root_dir: impl Into<PathBuf>,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        sync_on_append: bool,
    ) -> Result<Self> {
        let root_dir = root_dir.into();
        retry_on_eintr(|| fs::create_dir_all(&root_dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create storage directory",
                format!(
                    "failed to create storage directory {}: {e}",
                    root_dir.display()
                ),
                &e,
            )
        })?;

        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to canonicalize storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        let storage = Self {
            streams: RwLock::new(HashMap::new()),
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            root_dir,
            root_dir_canonical,
            sync_on_append,
        };
        storage.load_existing_streams()?;
        Ok(storage)
    }

    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }

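    /// Maps a stream name to its directory under the storage root using
    /// URL-safe, unpadded base64, then re-checks the encoded result for path
    /// traversal as defense in depth.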
    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());

        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
            return Err(Error::Storage(
                "encoded stream name contains path traversal characters".to_string(),
            ));
        }
        if !encoded
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(Error::Storage(
                "encoded stream directory contains invalid characters".to_string(),
            ));
        }

        let dir = self.root_dir.join(&encoded);
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "stream directory escapes storage root: {encoded}"
            )));
        }
        Ok(dir)
    }

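    /// Rejects paths outside the storage root, paths nested deeper than one
    /// level, and existing paths that are symlinks, non-directories, or that
    /// canonicalize outside the root.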
    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        if dir.exists() {
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }

    fn data_log_path(dir: &Path) -> PathBuf {
        dir.join("data.log")
    }

    fn meta_path(dir: &Path) -> PathBuf {
        dir.join("meta.json")
    }

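    /// Persists stream metadata atomically: the JSON payload is written to a
    /// temp file and then renamed over `meta.json`.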
    fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
        self.validate_stream_dir(&entry.dir)?;
        let meta = StreamMeta {
            name: name.to_string(),
            config: entry.config.clone(),
            closed: entry.closed,
            created_at: entry.created_at,
            updated_at: entry.updated_at,
            last_seq: entry.last_seq.clone(),
            producers: entry.producers.clone(),
            fork_info: entry.fork_info.clone(),
            ref_count: entry.ref_count,
            state: entry.state,
        };

        let meta_path = Self::meta_path(&entry.dir);
        let tmp_path = entry.dir.join("meta.json.tmp");
        let payload = serde_json::to_vec(&meta)
            .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;

        retry_on_eintr(|| fs::write(&tmp_path, payload.as_slice())).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "write stream metadata temp file",
                format!(
                    "failed to write metadata temp file {}: {e}",
                    tmp_path.display()
                ),
                &e,
            )
        })?;

        retry_on_eintr(|| fs::rename(&tmp_path, &meta_path)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "replace stream metadata",
                format!(
                    "failed to atomically replace metadata {}: {e}",
                    meta_path.display()
                ),
                &e,
            )
        })?;

        Ok(())
    }

    fn open_stream_file(&self, dir: &Path) -> Result<File> {
        self.validate_stream_dir(dir)?;
        let path = Self::data_log_path(dir);
        retry_on_eintr(|| {
            OpenOptions::new()
                .create(true)
                .append(true)
                .read(true)
                .open(&path)
        })
        .map_err(|e| {
            Error::classify_io_failure(
                "file",
                "open stream log",
                format!("failed to open stream log {}: {e}", path.display()),
                &e,
            )
        })
    }

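    /// Rebuilds the in-memory index by scanning `data.log` record by record.
    /// A trailing partial record (e.g. after a crash mid-append) is truncated
    /// away; everything before it is kept. Returns the index plus the next
    /// read sequence number and byte offset.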
    fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
        let mut index = Vec::new();
        let mut next_read_seq = 0u64;
        let mut next_byte_offset = 0u64;

        let file_len = file
            .metadata()
            .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
            .len();

        let mut cursor = 0u64;
        let mut header = [0u8; RECORD_HEADER_BYTES];

        while cursor < file_len {
            file.seek(SeekFrom::Start(cursor))
                .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;

            let read = file
                .read(&mut header)
                .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;

            if read == 0 {
                break;
            }

            if read < RECORD_HEADER_BYTES {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            let record_len = u64::from(u32::from_le_bytes(header));
            let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;

            if record_end > file_len {
                file.set_len(cursor).map_err(|e| {
                    Error::Storage(format!("failed to truncate partial record: {e}"))
                })?;
                break;
            }

            index.push(MessageIndex {
                offset: Offset::new(next_read_seq, next_byte_offset),
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: record_len,
            });

            next_read_seq += 1;
            next_byte_offset += record_len;
            cursor = record_end;
        }

        file.seek(SeekFrom::End(0))
            .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;

        Ok((index, next_read_seq, next_byte_offset))
    }

    fn rollback_total_bytes(&self, bytes: u64) {
        self.total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            })
            .ok();
    }

    fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
        let streams = self.streams.read().expect("streams lock poisoned");
        streams.get(name).map(Arc::clone)
    }

    fn hard_remove_stream(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        name: &str,
    ) -> Result<Option<ForkInfo>> {
        let Some(stream_arc) = streams.remove(name) else {
            return Ok(None);
        };
        let stream = stream_arc.read().expect("stream lock poisoned");
        let dir = stream.dir.clone();
        let total_bytes = stream.total_bytes;
        let fork_info = stream.fork_info.clone();
        drop(stream);

        self.remove_stream_dir(&dir)?;
        self.rollback_total_bytes(total_bytes);
        Ok(fork_info)
    }

    fn remove_for_recreate(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        name: &str,
    ) -> Result<()> {
        if let Some(fork_info) = self.hard_remove_stream(streams, name)? {
            self.cascade_delete(streams, &fork_info.source_name);
        }
        Ok(())
    }

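    /// Appends a batch to the stream log. Global capacity is reserved first
    /// via an atomic `fetch_update` on `total_bytes`; the per-stream limit is
    /// checked next, and any failure after the reservation rolls the global
    /// counter back.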
    fn append_records(
        &self,
        name: &str,
        stream: &mut StreamEntry,
        messages: &[Bytes],
    ) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut payload_bytes = 0u64;
        let mut sizes = Vec::with_capacity(messages.len());

        for msg in messages {
            let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
            if len > u64::from(u32::MAX) {
                return Err(Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: "message too large for file record format".to_string(),
                });
            }
            payload_bytes += len;
            total_batch_bytes += len;
            sizes.push(len);
        }

        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(total_batch_bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }

        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::StreamSizeLimitExceeded);
        }

        let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
        let mut write_buf =
            Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
        for msg in messages {
            let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
            write_buf.extend_from_slice(&len.to_le_bytes());
            write_buf.extend_from_slice(msg);
        }

        let before_len = stream.file_len;

        if let Err(e) = retry_on_eintr(|| stream.file.write_all(&write_buf)) {
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to append stream log for {name}: {e}"
            )));
        }

        if self.sync_on_append
            && let Err(e) = retry_on_eintr(|| stream.file.sync_data())
        {
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::classify_io_failure(
                "file",
                "sync stream log",
                format!("failed to sync stream log for {name}: {e}"),
                &e,
            ));
        }

        let mut cursor = before_len;
        for len in sizes {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.index.push(MessageIndex {
                offset,
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: len,
            });
            stream.next_read_seq += 1;
            stream.next_byte_offset += len;
            stream.total_bytes += len;
            cursor += RECORD_HEADER_BYTES as u64 + len;
        }
        stream.file_len = cursor;

        let _ = stream.notify.send(());
        Ok(())
    }

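    /// Reads a contiguous run of indexed messages with one positioned read
    /// covering the whole byte range, then hands out zero-copy `Bytes::slice`
    /// views into the shared buffer.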
    fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
        if index_slice.is_empty() {
            return Ok(Vec::new());
        }

        let first_pos = index_slice[0].file_pos;
        let last = index_slice
            .last()
            .expect("index_slice non-empty due to early return");
        let read_end = last.file_pos + last.byte_len;
        let read_len = read_end.saturating_sub(first_pos);

        let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_exact_at(&mut raw, first_pos)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;
            file.seek_read(&mut raw, first_pos)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut reader = file
                .try_clone()
                .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;
            reader
                .seek(SeekFrom::Start(first_pos))
                .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;
            reader
                .read_exact(&mut raw)
                .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
        }

        let shared = Bytes::from(raw);
        let mut messages = Vec::with_capacity(index_slice.len());
        for idx in index_slice {
            let rel_start =
                usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
            let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
            messages.push(shared.slice(rel_start..rel_end));
        }

        Ok(messages)
    }

    fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
        self.validate_stream_dir(dir)?;
        retry_on_eintr(|| fs::remove_dir_all(dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "remove stream directory",
                format!("failed to remove stream directory {}: {e}", dir.display()),
                &e,
            )
        })
    }

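    /// Walks the fork ancestry chain upward, decrementing each parent's
    /// `ref_count`. Tombstoned ancestors whose count reaches zero are removed
    /// from disk and the walk continues to their own source.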
    fn cascade_delete(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        parent_name: &str,
    ) {
        let mut current_parent = parent_name.to_string();
        loop {
            let Some(parent_arc) = streams.get(&current_parent) else {
                break;
            };
            let parent_arc = parent_arc.clone();
            let mut parent = parent_arc.write().expect("stream lock poisoned");
            parent.ref_count = parent.ref_count.saturating_sub(1);

            if parent.state == StreamState::Tombstone && parent.ref_count == 0 {
                let fi = parent.fork_info.clone();
                let dir = parent.dir.clone();
                let total = parent.total_bytes;
                drop(parent);
                streams.remove(&current_parent);

                if let Err(e) = self.remove_stream_dir(&dir) {
                    warn!(%e, stream = current_parent.as_str(), "failed to remove tombstoned ancestor directory during cascade delete");
                } else {
                    self.rollback_total_bytes(total);
                }

                if let Some(fi) = fi {
                    current_parent = fi.source_name;
                } else {
                    break;
                }
            } else {
                if let Err(e) = self.write_metadata_for(&current_parent, &parent) {
                    warn!(%e, stream = current_parent.as_str(), "failed to persist parent ref_count during cascade delete");
                }
                break;
            }
        }
    }

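    /// Collects messages along a fork's source chain, oldest ancestor first,
    /// bounded below by `from_offset` on the first segment and above by each
    /// segment's fork point (or `up_to` on the last segment).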
    fn read_source_chain(
        &self,
        source_name: &str,
        from_offset: &Offset,
        up_to: &Offset,
    ) -> Result<Vec<Bytes>> {
        let streams = self.streams.read().expect("streams lock poisoned");

        let plan = super::fork::build_read_plan(source_name, |n| {
            streams.get(n).map(|arc| {
                let s = arc.read().expect("stream lock poisoned");
                s.fork_info.clone()
            })
        });

        let mut all_messages: Vec<Bytes> = Vec::new();

        for (i, segment) in plan.iter().enumerate() {
            let Some(seg_arc) = streams.get(&segment.name) else {
                continue;
            };
            let seg_stream = seg_arc.read().expect("stream lock poisoned");

            let effective_up_to = if i == plan.len() - 1 {
                Some(up_to)
            } else {
                segment.read_up_to.as_ref()
            };

            let effective_from = if i == 0 {
                from_offset
            } else {
                &Offset::start()
            };

            let start_idx = if effective_from.is_start() {
                0
            } else {
                match seg_stream
                    .index
                    .binary_search_by(|m| m.offset.cmp(effective_from))
                {
                    Ok(idx) | Err(idx) => idx,
                }
            };

            let end_idx = if let Some(bound) = effective_up_to {
                match seg_stream.index.binary_search_by(|m| m.offset.cmp(bound)) {
                    Ok(idx) | Err(idx) => idx,
                }
            } else {
                seg_stream.index.len()
            };

            if start_idx < end_idx {
                let index_slice = &seg_stream.index[start_idx..end_idx];
                let msgs = Self::read_messages(&seg_stream.file, index_slice)?;
                all_messages.extend(msgs);
            }
        }

        Ok(all_messages)
    }

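    /// Restores all streams found under the storage root at startup. The
    /// data log is authoritative: the index is rebuilt from `data.log`, while
    /// `meta.json` supplies configuration and producer state.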
    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            if !path.is_dir() {
                continue;
            }
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            let mut file = self.open_stream_file(&path)?;
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            let log_msg_count = index.len() as u64;
            let meta_has_data =
                meta.closed || !meta.producers.is_empty() || meta.last_seq.is_some();
            if log_msg_count == 0 && meta_has_data {
                warn!(
                    stream = meta.name,
                    "meta.json indicates activity but data.log has 0 messages; \
                     data.log is authoritative"
                );
            }

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let mut entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                updated_at: meta.updated_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
                fork_info: meta.fork_info,
                ref_count: meta.ref_count,
                state: meta.state,
            };

            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            super::cleanup_stale_producers(&mut entry.producers);

            if let Err(e) = self.write_metadata_for(&meta.name, &entry) {
                warn!(
                    %e,
                    stream = meta.name,
                    "failed to re-persist reconciled metadata during recovery"
                );
            }
            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        self.total_bytes.store(restored_total, Ordering::Release);

        Ok(())
    }

    fn read_local_file_messages(
        stream: &StreamEntry,
        from_offset: &Offset,
        next_offset: Offset,
    ) -> Result<ReadResult> {
        let start_idx = if from_offset.is_start() {
            0
        } else {
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset,
            at_tail,
            closed: stream.closed,
        })
    }

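    /// Returns the fork's locally appended messages. Offsets at or before the
    /// fork point mean "all local messages"; later offsets binary-search into
    /// the local index.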
    fn read_fork_local_messages(
        stream: &StreamEntry,
        from_offset: &Offset,
        fork_offset: &Offset,
    ) -> Result<Vec<Bytes>> {
        if from_offset.is_start() || *from_offset <= *fork_offset {
            Self::read_messages(&stream.file, &stream.index)
        } else {
            let start_idx = match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            };
            Self::read_messages(&stream.file, &stream.index[start_idx..])
        }
    }

    fn assemble_fork_read(
        &self,
        from_offset: &Offset,
        fi: &super::ForkInfo,
        fork_local_messages: Vec<Bytes>,
        next_offset: Offset,
        closed: bool,
    ) -> Result<ReadResult> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            let source_messages =
                self.read_source_chain(&fi.source_name, from_offset, &fi.fork_offset)?;
            all_messages.extend(source_messages);
        }
        all_messages.extend(fork_local_messages);

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }
}


impl Storage for FileStorage {
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name)?;
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir.clone());

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }

    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        self.append_records(name, &mut stream, &[data])?;
        stream.updated_at = Some(Utc::now());
        if super::fork::renew_ttl(&mut stream.config) {
            self.write_metadata_for(name, &stream)?;
        }
        Ok(offset)
    }

    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        let seq_changed = pending_seq.is_some();
        self.append_records(name, &mut stream, &messages)?;
        stream.updated_at = Some(Utc::now());
        let ttl_renewed = super::fork::renew_ttl(&mut stream.config);
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if ttl_renewed || seq_changed {
            self.write_metadata_for(name, &stream)?;
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }

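    /// Serves a read. Streams without a TTL take a read-lock fast path;
    /// TTL'd streams take the write lock so the TTL can be renewed and
    /// persisted as part of the same read.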
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let needs_ttl_renewal = {
            let stream = stream_arc.read().expect("stream lock poisoned");
            super::fork::check_stream_access(&stream.config, stream.state, name)?;
            stream.config.ttl_seconds.is_some()
        };

        if !needs_ttl_renewal {
            let stream = stream_arc.read().expect("stream lock poisoned");
            let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);

            if from_offset.is_now() {
                return Ok(ReadResult {
                    messages: Vec::new(),
                    next_offset,
                    at_tail: true,
                    closed: stream.closed,
                });
            }

            if stream.fork_info.is_none() {
                return Self::read_local_file_messages(&stream, from_offset, next_offset);
            }

            let fi = stream.fork_info.clone().expect("checked above");
            let closed = stream.closed;
            let fork_local_messages =
                Self::read_fork_local_messages(&stream, from_offset, &fi.fork_offset)?;
            drop(stream);

            return self.assemble_fork_read(
                from_offset,
                &fi,
                fork_local_messages,
                next_offset,
                closed,
            );
        }

        let mut stream = stream_arc.write().expect("stream lock poisoned");
        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        if from_offset.is_now() {
            super::fork::renew_ttl(&mut stream.config);
            self.write_metadata_for(name, &stream)?;
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: stream.closed,
            });
        }

        if stream.fork_info.is_none() {
            let result = Self::read_local_file_messages(&stream, from_offset, next_offset)?;
            super::fork::renew_ttl(&mut stream.config);
            self.write_metadata_for(name, &stream)?;
            return Ok(result);
        }

        let fi = stream.fork_info.clone().expect("checked above");
        let closed = stream.closed;
        let fork_local_messages =
            Self::read_fork_local_messages(&stream, from_offset, &fi.fork_offset)?;
        super::fork::renew_ttl(&mut stream.config);
        self.write_metadata_for(name, &stream)?;
        drop(stream);

        self.assemble_fork_read(from_offset, &fi, fork_local_messages, next_offset, closed)
    }

    fn delete(&self, name: &str) -> Result<()> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        let stream_arc = streams
            .get(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?
            .clone();

        {
            let stream = stream_arc.read().expect("stream lock poisoned");

            match super::fork::evaluate_delete(name, stream.state, stream.ref_count)? {
                super::fork::DeleteDisposition::Tombstone => {
                    drop(stream);
                    let mut stream_w = stream_arc.write().expect("stream lock poisoned");
                    stream_w.state = StreamState::Tombstone;
                    self.write_metadata_for(name, &stream_w)?;
                    return Ok(());
                }
                super::fork::DeleteDisposition::HardDelete => {}
            }
        }

        let fork_info = self.hard_remove_stream(&mut streams, name)?;

        if let Some(fi) = fork_info {
            self.cascade_delete(&mut streams, &fi.source_name);
        }

        Ok(())
    }

    fn head(&self, name: &str) -> Result<StreamMetadata> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        Ok(StreamMetadata {
            config: stream.config.clone(),
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
            total_bytes: stream.total_bytes,
            message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
            created_at: stream.created_at,
            updated_at: stream.updated_at,
        })
    }

    fn close_stream(&self, name: &str) -> Result<()> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        stream.closed = true;
        stream.updated_at = Some(Utc::now());
        super::fork::renew_ttl(&mut stream.config);
        self.write_metadata_for(name, &stream)?;

        let _ = stream.notify.send(());
        Ok(())
    }

    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        super::cleanup_stale_producers(&mut stream.producers);

        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(
            stream.producers.get(producer.id.as_str()),
            producer,
            stream.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;

        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if should_close {
            stream.closed = true;
        }

        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );
        stream.updated_at = Some(now);
        super::fork::renew_ttl(&mut stream.config);

        self.write_metadata_for(name, &stream)?;

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
        })
    }

    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name)?;
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                        closed: stream.closed,
                    });
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let mut entry = StreamEntry::new(config, file, dir.clone());

        if !messages.is_empty()
            && let Err(e) = self.append_records(name, &mut entry, &messages)
        {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }

    fn exists(&self, name: &str) -> bool {
        let streams = self.streams.read().expect("streams lock poisoned");
        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            !super::is_stream_expired(&stream.config) && stream.state == StreamState::Active
        } else {
            false
        }
    }

    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
        let stream_arc = self.get_stream(name)?;
        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
            return None;
        }

        Some(stream.notify.subscribe())
    }

    fn cleanup_expired_streams(&self) -> usize {
        let mut streams = self.streams.write().expect("streams lock poisoned");
        let mut expired = Vec::new();

        for (name, stream_arc) in streams.iter() {
            let stream = stream_arc.read().expect("stream lock poisoned");
            if super::is_stream_expired(&stream.config) {
                expired.push((
                    name.clone(),
                    stream.total_bytes,
                    stream.dir.clone(),
                    stream.ref_count,
                    stream.fork_info.clone(),
                ));
            }
        }

        let count = expired.len();
        for (name, _bytes, _dir, ref_count, _fork_info) in expired {
            match super::fork::evaluate_expired_cleanup(ref_count) {
                super::fork::DeleteDisposition::Tombstone => {
                    if let Some(arc) = streams.get(&name) {
                        let mut stream = arc.write().expect("stream lock poisoned");
                        stream.state = StreamState::Tombstone;
                        if let Err(e) = self.write_metadata_for(&name, &stream) {
                            warn!(%e, stream = name.as_str(), "failed to persist tombstone for expired stream");
                        }
                    }
                }
                super::fork::DeleteDisposition::HardDelete => {
                    if let Err(e) = self.remove_for_recreate(&mut streams, &name) {
                        warn!(%e, stream = name.as_str(), "failed to remove expired stream during cleanup");
                    }
                }
            }
        }

        count
    }

    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
        let streams = self.streams.read().expect("streams lock poisoned");
        let mut result = Vec::new();
        for (name, stream_arc) in streams.iter() {
            let stream = stream_arc.read().expect("stream lock poisoned");
            if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
                continue;
            }
            result.push((
                name.clone(),
                StreamMetadata {
                    config: stream.config.clone(),
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                    total_bytes: stream.total_bytes,
                    message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
                    created_at: stream.created_at,
                    updated_at: stream.updated_at,
                },
            ));
        }
        result.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(result)
    }

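    /// Creates a fork of `source_name` at `fork_offset` without copying data:
    /// the fork stores a `ForkInfo` pointing at its source, reads stitch the
    /// source chain in front of fork-local messages, and the source's
    /// `ref_count` is incremented to keep it alive.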
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        let source_arc = streams
            .get(source_name)
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?
            .clone();

        let source = source_arc.read().expect("stream lock poisoned");

        super::fork::check_fork_source_access(&source.config, source.state, source_name)?;

        let source_next_offset = Offset::new(source.next_read_seq, source.next_byte_offset);
        let resolved_offset = super::fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        if !config
            .content_type
            .eq_ignore_ascii_case(&source.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }

        let fork_spec = super::fork::build_fork_create_spec(
            source_name,
            &source.config,
            &config,
            resolved_offset.clone(),
        );

        drop(source);

        if let Some(existing_arc) = streams.get(name) {
            let existing = existing_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(existing);
                    self.remove_for_recreate(&mut streams, name)?;
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let (fork_read_seq, fork_byte_offset) =
            resolved_offset.parse_components().unwrap_or((0, 0));

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create fork directory",
                format!("failed to create fork directory {}: {e}", dir.display()),
                &e,
            )
        })?;
        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;

        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        let entry = StreamEntry {
            config: fork_spec.config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: config.created_closed,
            next_read_seq: fork_read_seq,
            next_byte_offset: fork_byte_offset,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir: dir.clone(),
            fork_info: Some(ForkInfo {
                source_name: fork_spec.source_name,
                fork_offset: resolved_offset,
            }),
            ref_count: 0,
            state: StreamState::Active,
        };

        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned fork directory");
            }
            return Err(e);
        }

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        if let Some(source_arc) = streams.get(source_name) {
            let mut source = source_arc.write().expect("stream lock poisoned");
            source.ref_count += 1;
            if let Err(e) = self.write_metadata_for(source_name, &source) {
                warn!(%e, stream = source_name, "failed to persist source ref_count after fork creation");
            }
        }

        Ok(CreateStreamResult::Created)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;

    fn test_storage_dir() -> PathBuf {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
    }

    fn test_storage() -> FileStorage {
        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
            .expect("file storage should initialize")
    }

    #[test]
    fn test_delete_removes_files() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("test", config).unwrap();
        storage
            .append("test", Bytes::from("data"), "text/plain")
            .unwrap();

        let dir = storage.stream_dir_for_name("test").unwrap();
        assert!(dir.exists(), "stream directory should exist before delete");

        storage.delete("test").unwrap();
        assert!(
            !dir.exists(),
            "stream directory should be removed after delete"
        );
    }

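    // A minimal sketch of the directory-name mapping: stream names become
    // URL-safe, unpadded base64 directories directly under the storage root.
    // The stream name "a/b?c" is an arbitrary example chosen to exercise
    // characters that are unsafe in paths.
    #[test]
    fn test_stream_dir_uses_url_safe_base64() {
        let storage = test_storage();
        let dir = storage.stream_dir_for_name("a/b?c").unwrap();
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("a/b?c".as_bytes());
        assert_eq!(
            dir.file_name().and_then(|n| n.to_str()),
            Some(encoded.as_str())
        );
        assert!(dir.starts_with(&storage.root_dir));
    }
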
    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
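
    // A small sketch of the EINTR retry helper: the closure fails twice with
    // `ErrorKind::Interrupted` and succeeds on the third call. The attempt
    // counts here are illustrative, not derived from any real syscall.
    #[test]
    fn test_retry_on_eintr_retries_until_uninterrupted() {
        let mut attempts = 0u32;
        let result = retry_on_eintr(|| {
            attempts += 1;
            if attempts < 3 {
                Err(io::Error::new(io::ErrorKind::Interrupted, "eintr"))
            } else {
                Ok(attempts)
            }
        });
        assert_eq!(result.unwrap(), 3);
    }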
}