1use super::{
8 CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
9 ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
10};
11use crate::protocol::error::{Error, Result};
12use crate::protocol::offset::Offset;
13use crate::protocol::producer::ProducerHeaders;
14use base64::Engine;
15use bytes::Bytes;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, Read, Seek, SeekFrom, Write};
21use std::path::{Path, PathBuf};
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::{Arc, RwLock};
24use tokio::sync::broadcast;
25use tracing::warn;
26
/// Size of the little-endian `u32` length prefix stored before each record in `data.log`.
const RECORD_HEADER_BYTES: usize = 4;
/// Initial capacity for a stream's in-memory message index.
const INITIAL_INDEX_CAPACITY: usize = 256;
/// Initial capacity for a stream's per-producer state map.
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
31
/// Runs `op` until it yields any outcome other than `ErrorKind::Interrupted`.
///
/// POSIX syscalls may fail with EINTR when a signal arrives; the idiomatic
/// response is simply to retry the call. Any other result — success or a real
/// error — is returned to the caller unchanged.
fn retry_on_eintr<T>(
    mut op: impl FnMut() -> std::result::Result<T, io::Error>,
) -> std::result::Result<T, io::Error> {
    loop {
        let outcome = op();
        let interrupted = matches!(&outcome, Err(err) if err.kind() == io::ErrorKind::Interrupted);
        if !interrupted {
            return outcome;
        }
    }
}
49
/// In-memory index entry locating one record inside a stream's `data.log`.
#[derive(Debug, Clone)]
struct MessageIndex {
    // Logical position (read sequence + cumulative payload bytes) of the message.
    offset: Offset,
    // Byte position of the payload in the log file, just past the length header.
    file_pos: u64,
    // Payload length in bytes (excludes the length header).
    byte_len: u64,
}
56
/// Durable per-stream metadata, persisted as JSON in `meta.json`.
///
/// Message payloads live in `data.log`; this file carries only configuration
/// and control state that must survive a restart.
#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    // Client-visible stream name (the directory name is its base64url form).
    name: String,
    config: StreamConfig,
    // Whether the stream has been closed to further appends.
    closed: bool,
    created_at: DateTime<Utc>,
    // Last accepted batch sequence token, if any.
    last_seq: Option<String>,
    // Idempotence state keyed by producer id.
    producers: HashMap<String, ProducerState>,
}
66
/// In-memory state for one open stream; guarded by a per-stream `RwLock`.
struct StreamEntry {
    config: StreamConfig,
    // One entry per message, in append order.
    index: Vec<MessageIndex>,
    // Once set, no further appends are accepted.
    closed: bool,
    // Read sequence the next appended message will receive.
    next_read_seq: u64,
    // Cumulative payload byte offset the next message will start at.
    next_byte_offset: u64,
    // Total payload bytes stored in this stream (excludes record headers).
    total_bytes: u64,
    created_at: DateTime<Utc>,
    // Idempotence state keyed by producer id.
    producers: HashMap<String, ProducerState>,
    // Wakes subscribers after appends and on close.
    notify: broadcast::Sender<()>,
    // Last accepted batch sequence token, if any.
    last_seq: Option<String>,
    // Handle to `data.log`, opened in append+read mode.
    file: File,
    // Cached physical length of `data.log`, including record headers.
    file_len: u64,
    // This stream's directory under the storage root.
    dir: PathBuf,
}
82
83impl StreamEntry {
84 fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
85 let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
86 let file_len = file.metadata().map_or(0, |m| m.len());
87 Self {
88 config,
89 index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
90 closed: false,
91 next_read_seq: 0,
92 next_byte_offset: 0,
93 total_bytes: 0,
94 created_at: Utc::now(),
95 producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
96 notify,
97 last_seq: None,
98 file,
99 file_len,
100 dir,
101 }
102 }
103}
104
/// File-backed `Storage` implementation.
///
/// Each stream owns a directory under `root_dir` containing a length-prefixed
/// record log (`data.log`) and JSON metadata (`meta.json`).
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    // Stream name -> shared entry; the inner lock guards the file and index.
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    // Global payload-byte counter across all streams.
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    root_dir: PathBuf,
    // Canonicalized root, used to detect symlink escapes.
    root_dir_canonical: PathBuf,
    // When true, fsync the log after every append.
    sync_on_append: bool,
}
126
127impl FileStorage {
    /// Creates a `FileStorage` rooted at `root_dir`.
    ///
    /// Ensures the root directory exists, canonicalizes it (used later for
    /// symlink-escape checks), then restores any streams already on disk.
    ///
    /// # Errors
    /// Fails if the root directory cannot be created or canonicalized, or if
    /// recovery of existing streams fails.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        sync_on_append: bool,
    ) -> Result<Self> {
        let root_dir = root_dir.into();
        retry_on_eintr(|| fs::create_dir_all(&root_dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create storage directory",
                format!(
                    "failed to create storage directory {}: {e}",
                    root_dir.display()
                ),
                &e,
            )
        })?;

        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to canonicalize storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        let storage = Self {
            streams: RwLock::new(HashMap::new()),
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            root_dir,
            root_dir_canonical,
            sync_on_append,
        };
        // Recovery also seeds `total_bytes` from the restored streams.
        storage.load_existing_streams()?;
        Ok(storage)
    }
175
    /// Current total payload bytes stored across all streams.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }
181
    /// Maps an arbitrary stream name to a filesystem-safe directory path.
    ///
    /// The name is base64url-encoded (no padding) so any byte sequence becomes
    /// a single safe path component; the checks below are defense-in-depth in
    /// case the encoding ever misbehaves.
    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());

        // base64url output should never contain these, but verify anyway.
        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
            return Err(Error::Storage(
                "encoded stream name contains path traversal characters".to_string(),
            ));
        }
        // Allow-list the exact base64url alphabet.
        if !encoded
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(Error::Storage(
                "encoded stream directory contains invalid characters".to_string(),
            ));
        }

        let dir = self.root_dir.join(&encoded);
        // Final lexical containment check before handing the path out.
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "stream directory escapes storage root: {encoded}"
            )));
        }
        Ok(dir)
    }
216
    /// Rejects stream directory paths that could escape the storage root.
    ///
    /// Checks, in order: lexical containment under `root_dir`, exactly one
    /// path component below the root, and — for paths that already exist —
    /// not a symlink, actually a directory, and canonical resolution inside
    /// the canonical root.
    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        // Streams live exactly one level below the root; anything deeper (or
        // the root itself) is malformed.
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        if dir.exists() {
            // symlink_metadata does not follow links, so a symlinked stream
            // directory is detected rather than silently traversed.
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            // NOTE(review): exists/stat/canonicalize are separate syscalls, so
            // this validation is best-effort against concurrent FS changes.
            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }
274
275 fn data_log_path(dir: &Path) -> PathBuf {
276 dir.join("data.log")
277 }
278
279 fn meta_path(dir: &Path) -> PathBuf {
280 dir.join("meta.json")
281 }
282
    /// Serializes and atomically persists a stream's `meta.json`.
    ///
    /// Writes the JSON to `meta.json.tmp`, then renames it over the target so
    /// a reader never observes a half-written file.
    ///
    /// NOTE(review): the temp file is not fsynced before the rename, so the
    /// newest metadata can be lost on power failure; recovery treats
    /// `data.log` as authoritative (see `load_existing_streams`).
    fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
        self.validate_stream_dir(&entry.dir)?;
        let meta = StreamMeta {
            name: name.to_string(),
            config: entry.config.clone(),
            closed: entry.closed,
            created_at: entry.created_at,
            last_seq: entry.last_seq.clone(),
            producers: entry.producers.clone(),
        };

        let meta_path = Self::meta_path(&entry.dir);
        let tmp_path = entry.dir.join("meta.json.tmp");
        let payload = serde_json::to_vec(&meta)
            .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;

        retry_on_eintr(|| fs::write(&tmp_path, payload.as_slice())).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "write stream metadata temp file",
                format!(
                    "failed to write metadata temp file {}: {e}",
                    tmp_path.display()
                ),
                &e,
            )
        })?;

        // rename() replaces the destination atomically on POSIX filesystems.
        retry_on_eintr(|| fs::rename(&tmp_path, &meta_path)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "replace stream metadata",
                format!(
                    "failed to atomically replace metadata {}: {e}",
                    meta_path.display()
                ),
                &e,
            )
        })?;

        Ok(())
    }
325
326 fn open_stream_file(&self, dir: &Path) -> Result<File> {
327 self.validate_stream_dir(dir)?;
328 let path = Self::data_log_path(dir);
329 retry_on_eintr(|| {
330 OpenOptions::new()
331 .create(true)
332 .append(true)
333 .read(true)
334 .open(&path)
335 })
336 .map_err(|e| {
337 Error::classify_io_failure(
338 "file",
339 "open stream log",
340 format!("failed to open stream log {}: {e}", path.display()),
341 &e,
342 )
343 })
344 }
345
346 fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
347 let mut index = Vec::new();
348 let mut next_read_seq = 0u64;
349 let mut next_byte_offset = 0u64;
350
351 let file_len = file
352 .metadata()
353 .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
354 .len();
355
356 let mut cursor = 0u64;
357 let mut header = [0u8; RECORD_HEADER_BYTES];
358
359 while cursor < file_len {
360 file.seek(SeekFrom::Start(cursor))
361 .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;
362
363 let read = file
364 .read(&mut header)
365 .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;
366
367 if read == 0 {
368 break;
369 }
370
371 if read < RECORD_HEADER_BYTES {
372 file.set_len(cursor).map_err(|e| {
373 Error::Storage(format!("failed to truncate partial record: {e}"))
374 })?;
375 break;
376 }
377
378 let record_len = u64::from(u32::from_le_bytes(header));
379 let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;
380
381 if record_end > file_len {
382 file.set_len(cursor).map_err(|e| {
383 Error::Storage(format!("failed to truncate partial record: {e}"))
384 })?;
385 break;
386 }
387
388 index.push(MessageIndex {
389 offset: Offset::new(next_read_seq, next_byte_offset),
390 file_pos: cursor + RECORD_HEADER_BYTES as u64,
391 byte_len: record_len,
392 });
393
394 next_read_seq += 1;
395 next_byte_offset += record_len;
396 cursor = record_end;
397 }
398
399 file.seek(SeekFrom::End(0))
400 .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;
401
402 Ok((index, next_read_seq, next_byte_offset))
403 }
404
405 fn rollback_total_bytes(&self, bytes: u64) {
406 self.total_bytes
407 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
408 Some(current.saturating_sub(bytes))
409 })
410 .ok();
411 }
412
413 fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
414 let streams = self.streams.read().expect("streams lock poisoned");
415 streams.get(name).map(Arc::clone)
416 }
417
    /// Appends a batch of messages to the stream's log and index.
    ///
    /// Order matters here: the global byte budget is reserved first (atomic
    /// check-and-add), then the per-stream limit is checked (rolling back the
    /// global reservation on failure), then the whole batch is written with a
    /// single `write_all`. The in-memory index is only updated after the write
    /// (and optional fsync) succeeds, so readers never see unwritten data.
    fn append_records(
        &self,
        name: &str,
        stream: &mut StreamEntry,
        messages: &[Bytes],
    ) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut payload_bytes = 0u64;
        let mut sizes = Vec::with_capacity(messages.len());

        for msg in messages {
            let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
            // The record header is a u32, so larger payloads cannot be framed.
            if len > u64::from(u32::MAX) {
                return Err(Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: "message too large for file record format".to_string(),
                });
            }
            payload_bytes += len;
            total_batch_bytes += len;
            sizes.push(len);
        }

        // Reserve the batch against the global limit atomically; fetch_update
        // fails only when the closure returns None (limit would be exceeded).
        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(total_batch_bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }

        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
            // Undo the global reservation made above.
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::StreamSizeLimitExceeded);
        }

        // Frame the whole batch into one buffer so it goes out in one write.
        let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
        let mut write_buf =
            Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
        for msg in messages {
            let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
            write_buf.extend_from_slice(&len.to_le_bytes());
            write_buf.extend_from_slice(msg);
        }

        let before_len = stream.file_len;

        if let Err(e) = retry_on_eintr(|| stream.file.write_all(&write_buf)) {
            // Resync the cached length: the write may have partially landed.
            // NOTE(review): partial bytes remain in the log; recovery's
            // sequential scan could misparse records appended after them —
            // consider truncating back to `before_len` here.
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::Storage(format!(
                "failed to append stream log for {name}: {e}"
            )));
        }

        if self.sync_on_append
            && let Err(e) = retry_on_eintr(|| stream.file.sync_data())
        {
            if let Ok(m) = stream.file.metadata() {
                stream.file_len = m.len();
            }
            self.rollback_total_bytes(total_batch_bytes);
            return Err(Error::classify_io_failure(
                "file",
                "sync stream log",
                format!("failed to sync stream log for {name}: {e}"),
                &e,
            ));
        }

        // Write is durable (enough): publish the new records to the index.
        let mut cursor = before_len;
        for len in sizes {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.index.push(MessageIndex {
                offset,
                file_pos: cursor + RECORD_HEADER_BYTES as u64,
                byte_len: len,
            });
            stream.next_read_seq += 1;
            stream.next_byte_offset += len;
            stream.total_bytes += len;
            cursor += RECORD_HEADER_BYTES as u64 + len;
        }
        stream.file_len = cursor;

        // Wake subscribers; an error just means there are no receivers.
        let _ = stream.notify.send(());
        Ok(())
    }
522
523 fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
524 if index_slice.is_empty() {
525 return Ok(Vec::new());
526 }
527
528 let first_pos = index_slice[0].file_pos;
529 let last = index_slice
530 .last()
531 .expect("index_slice non-empty due early return");
532 let read_end = last.file_pos + last.byte_len;
533 let read_len = read_end.saturating_sub(first_pos);
534
535 let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
539 #[cfg(unix)]
540 {
541 use std::os::unix::fs::FileExt;
542 file.read_exact_at(&mut raw, first_pos)
543 .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
544 }
545 #[cfg(windows)]
546 {
547 use std::os::windows::fs::FileExt;
548 file.seek_read(&mut raw, first_pos)
549 .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
550 }
551 #[cfg(not(any(unix, windows)))]
552 {
553 let mut reader = file
554 .try_clone()
555 .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;
556 reader
557 .seek(SeekFrom::Start(first_pos))
558 .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;
559 reader
560 .read_exact(&mut raw)
561 .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;
562 }
563
564 let shared = Bytes::from(raw);
565 let mut messages = Vec::with_capacity(index_slice.len());
566 for idx in index_slice {
567 let rel_start =
568 usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
569 let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
570 messages.push(shared.slice(rel_start..rel_end));
571 }
572
573 Ok(messages)
574 }
575
576 fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
577 self.validate_stream_dir(dir)?;
578 retry_on_eintr(|| fs::remove_dir_all(dir)).map_err(|e| {
579 Error::classify_io_failure(
580 "file",
581 "remove stream directory",
582 format!("failed to remove stream directory {}: {e}", dir.display()),
583 &e,
584 )
585 })
586 }
587
    /// Restores all streams found under the storage root at startup.
    ///
    /// For each valid stream directory with a `meta.json`: parse the metadata,
    /// rebuild the index from `data.log` (truncating any torn tail record),
    /// drop expired streams, prune stale producers, re-persist the reconciled
    /// metadata (best-effort), and register the entry. Finally seeds the
    /// global byte counter from the restored totals.
    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            // Skip stray files and anything failing the path-safety checks.
            if !path.is_dir() {
                continue;
            }
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            // No meta.json means the directory is not a (complete) stream.
            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            let mut file = self.open_stream_file(&path)?;
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            // Flag a metadata/log mismatch; the log always wins.
            let log_msg_count = index.len() as u64;
            let meta_has_data =
                meta.closed || !meta.producers.is_empty() || meta.last_seq.is_some();
            if log_msg_count == 0 && meta_has_data {
                warn!(
                    stream = meta.name,
                    "meta.json indicates activity but data.log has 0 messages; \
                     data.log is authoritative"
                );
            }

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let mut entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
            };

            // Expired streams are deleted rather than restored.
            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            super::cleanup_stale_producers(&mut entry.producers);

            // Best-effort: recovery proceeds even if re-persisting fails.
            if let Err(e) = self.write_metadata_for(&meta.name, &entry) {
                warn!(
                    %e,
                    stream = meta.name,
                    "failed to re-persist reconciled metadata during recovery"
                );
            }
            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        self.total_bytes.store(restored_total, Ordering::Release);

        Ok(())
    }
692}
693
694impl Storage for FileStorage {
    /// Creates a stream, or reports an existing one.
    ///
    /// An existing stream with an identical config is idempotently accepted;
    /// a differing config is a `ConfigMismatch`. An expired existing stream is
    /// deleted first and then recreated fresh.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Drop the expired stream and fall through to re-create it.
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                self.remove_stream_dir(&dir)?;
                self.rollback_total_bytes(stream_bytes);
            } else if stream.config == config {
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir.clone());

        // Persist metadata before publishing; clean up the directory if it fails.
        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }
740
741 fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
742 let stream_arc = self
743 .get_stream(name)
744 .ok_or_else(|| Error::NotFound(name.to_string()))?;
745
746 let mut stream = stream_arc.write().expect("stream lock poisoned");
747
748 if super::is_stream_expired(&stream.config) {
749 return Err(Error::StreamExpired);
750 }
751
752 if stream.closed {
753 return Err(Error::StreamClosed);
754 }
755
756 super::validate_content_type(&stream.config.content_type, content_type)?;
757
758 let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
759 self.append_records(name, &mut stream, &[data])?;
760 Ok(offset)
762 }
763
    /// Appends a batch of messages, optionally fenced by a sequence token.
    ///
    /// `seq` is validated against the stream's last accepted token *before*
    /// any write, so a rejected batch has no side effects. Returns the tail
    /// offset after the batch. Metadata persistence of the new token is
    /// best-effort (logged on failure) since the data is already committed.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        // Validate the token first; only commit it after a successful append.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
            if let Err(e) = self.write_metadata_for(name, &stream) {
                warn!(%e, stream = name, "metadata persist failed after batch append");
            }
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }
806
    /// Reads all messages at or after `from_offset` in one snapshot.
    ///
    /// A "now" offset returns no history, just the current tail position.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                at_tail: true,
                closed: stream.closed,
            });
        }

        // Locate the first message at or after `from_offset`: Err(idx) from
        // the binary search is the insertion point, i.e. the next newer entry.
        let start_idx = if from_offset.is_start() {
            0
        } else {
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        // Everything from start_idx onward was returned, so this snapshot is
        // at the tail.
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            at_tail,
            closed: stream.closed,
        })
    }
846
847 fn delete(&self, name: &str) -> Result<()> {
848 let mut streams = self.streams.write().expect("streams lock poisoned");
849
850 if let Some(stream_arc) = streams.remove(name) {
851 let stream = stream_arc.read().expect("stream lock poisoned");
852 let dir = stream.dir.clone();
853 let stream_bytes = stream.total_bytes;
854 drop(stream);
855
856 if let Err(e) = self.remove_stream_dir(&dir) {
857 streams.insert(name.to_string(), stream_arc);
859 return Err(e);
860 }
861 self.rollback_total_bytes(stream_bytes);
862 Ok(())
863 } else {
864 Err(Error::NotFound(name.to_string()))
865 }
866 }
867
868 fn head(&self, name: &str) -> Result<StreamMetadata> {
869 let stream_arc = self
870 .get_stream(name)
871 .ok_or_else(|| Error::NotFound(name.to_string()))?;
872
873 let stream = stream_arc.read().expect("stream lock poisoned");
874
875 if super::is_stream_expired(&stream.config) {
876 return Err(Error::StreamExpired);
877 }
878
879 Ok(StreamMetadata {
880 config: stream.config.clone(),
881 next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
882 closed: stream.closed,
883 total_bytes: stream.total_bytes,
884 message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
885 created_at: stream.created_at,
886 })
887 }
888
889 fn close_stream(&self, name: &str) -> Result<()> {
890 let stream_arc = self
891 .get_stream(name)
892 .ok_or_else(|| Error::NotFound(name.to_string()))?;
893
894 let mut stream = stream_arc.write().expect("stream lock poisoned");
895
896 if super::is_stream_expired(&stream.config) {
897 return Err(Error::StreamExpired);
898 }
899
900 stream.closed = true;
901 self.write_metadata_for(name, &stream)?;
902
903 let _ = stream.notify.send(());
904 Ok(())
905 }
906
    /// Appends a (possibly empty) batch on behalf of an idempotent producer.
    ///
    /// Duplicate requests (same producer epoch/seq, per `check_producer`) are
    /// acknowledged without re-appending. Content type is only validated when
    /// there are messages, so an empty batch can act as a control request
    /// (e.g. close-only). Producer state is recorded after a successful
    /// append; metadata persistence is best-effort since data is committed.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        // Drop producers that have gone stale before consulting the table.
        super::cleanup_stale_producers(&mut stream.producers);

        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(
            stream.producers.get(producer.id.as_str()),
            producer,
            stream.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                // Already applied: acknowledge with the current tail state.
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        // Validate the batch token before writing anything.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;

        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if should_close {
            stream.closed = true;
        }

        // Record the producer's latest epoch/seq for future dedupe.
        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );

        if let Err(e) = self.write_metadata_for(name, &stream) {
            warn!(%e, stream = name, "metadata persist failed after committed producer append");
        }

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
        })
    }
981
    /// Creates a stream and appends an initial batch in one operation.
    ///
    /// Mirrors `create_stream` for the existing/expired/mismatch cases; on any
    /// failure after the directory is created, the directory is cleaned up so
    /// no orphan is left behind.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Drop the expired stream and fall through to re-create it.
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                self.remove_stream_dir(&dir)?;
                self.rollback_total_bytes(stream_bytes);
            } else if stream.config == config {
                // Idempotent create: report the existing tail, append nothing.
                return Ok(CreateWithDataResult {
                    status: CreateStreamResult::AlreadyExists,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        retry_on_eintr(|| fs::create_dir_all(&dir)).map_err(|e| {
            Error::classify_io_failure(
                "file",
                "create stream directory",
                format!("failed to create stream directory {}: {e}", dir.display()),
                &e,
            )
        })?;

        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let mut entry = StreamEntry::new(config, file, dir.clone());

        if !messages.is_empty()
            && let Err(e) = self.append_records(name, &mut entry, &messages)
        {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        // Persist metadata before publishing; clean up the directory if it fails.
        if let Err(e) = self.write_metadata_for(name, &entry) {
            if let Err(cleanup_err) = self.remove_stream_dir(&dir) {
                warn!(%cleanup_err, stream = name, "failed to clean up orphaned stream directory");
            }
            return Err(e);
        }
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }
1056
1057 fn exists(&self, name: &str) -> bool {
1058 let streams = self.streams.read().expect("streams lock poisoned");
1059 if let Some(stream_arc) = streams.get(name) {
1060 let stream = stream_arc.read().expect("stream lock poisoned");
1061 !super::is_stream_expired(&stream.config)
1062 } else {
1063 false
1064 }
1065 }
1066
1067 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1068 let stream_arc = self.get_stream(name)?;
1069 let stream = stream_arc.read().expect("stream lock poisoned");
1070
1071 if super::is_stream_expired(&stream.config) {
1072 return None;
1073 }
1074
1075 Some(stream.notify.subscribe())
1076 }
1077
1078 fn cleanup_expired_streams(&self) -> usize {
1079 let mut streams = self.streams.write().expect("streams lock poisoned");
1080 let mut expired = Vec::new();
1081
1082 for (name, stream_arc) in streams.iter() {
1083 let stream = stream_arc.read().expect("stream lock poisoned");
1084 if super::is_stream_expired(&stream.config) {
1085 expired.push((name.clone(), stream.total_bytes, stream.dir.clone()));
1086 }
1087 }
1088
1089 let count = expired.len();
1090 for (name, bytes, dir) in &expired {
1091 streams.remove(name);
1092 if let Err(e) = self.remove_stream_dir(dir) {
1093 warn!(%e, stream = name.as_str(), "failed to remove expired stream directory");
1094 } else {
1095 self.rollback_total_bytes(*bytes);
1096 }
1097 }
1098
1099 count
1100 }
1101}
1102
#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;

    /// Builds a unique temp-directory path per call (timestamp + pid +
    /// counter) so concurrent tests and processes never collide.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
    }

    /// Fresh storage with small limits and fsync disabled for test speed.
    fn test_storage() -> FileStorage {
        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
            .expect("file storage should initialize")
    }

    // delete() must remove the stream's directory from disk, not just the map.
    #[test]
    fn test_delete_removes_files() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("test", config).unwrap();
        storage
            .append("test", Bytes::from("data"), "text/plain")
            .unwrap();

        let dir = storage.stream_dir_for_name("test").unwrap();
        assert!(dir.exists(), "stream directory should exist before delete");

        storage.delete("test").unwrap();
        assert!(
            !dir.exists(),
            "stream directory should be removed after delete"
        );
    }

    // A new FileStorage over an existing root must recover streams and data.
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        // Scope drops the first instance (and its file handles) before recovery.
        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false)
                .expect("file storage should initialize");
            storage
                .create_stream("events", config.clone())
                .expect("stream should be created");
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .expect("append should succeed");
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .expect("append should succeed");
        }

        let restored =
            FileStorage::new(root, 1024 * 1024, 100 * 1024, false).expect("restore should work");

        let read = restored
            .read("events", &Offset::start())
            .expect("read should succeed");

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    // The closed flag must survive a restart and keep rejecting appends.
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    // A torn tail record (simulated by appending garbage bytes) must be
    // truncated during recovery, preserving the intact records before it.
    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        // Corrupt the log directly: two stray bytes cannot form a full header.
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
}
1222}