1use super::{
2 CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3 ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use base64::Engine;
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::fs::{self, File, OpenOptions};
14use std::io::{Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::sync::{Arc, RwLock};
17use tokio::sync::broadcast;
18use tracing::warn;
19
/// Size of the little-endian `u32` length prefix stored before each record.
const RECORD_HEADER_BYTES: usize = 4;
/// Initial capacity of a new stream's in-memory message index.
const INITIAL_INDEX_CAPACITY: usize = 256;
/// Initial capacity of a new stream's producer-state map.
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
24
/// In-memory locator for one record inside a stream's `data.log`.
#[derive(Debug, Clone)]
struct MessageIndex {
    // Logical offset (sequence number + cumulative payload byte offset).
    offset: Offset,
    // Byte position of the payload in the log file (just past the length prefix).
    file_pos: u64,
    // Payload length in bytes (excludes the length prefix).
    byte_len: u64,
}
31
/// On-disk metadata persisted as `meta.json` alongside each stream's log.
///
/// The message data itself lives in `data.log`; the index is rebuilt from the
/// log on restart, so only config/lifecycle state is stored here.
#[derive(Debug, Serialize, Deserialize)]
struct StreamMeta {
    name: String,
    config: StreamConfig,
    closed: bool,
    created_at: DateTime<Utc>,
    // Most recent client-supplied batch sequence token, if any.
    last_seq: Option<String>,
    // Per-producer idempotency state keyed by producer id.
    producers: HashMap<String, ProducerState>,
}
41
/// Live, in-memory state for one stream plus its open log file handle.
struct StreamEntry {
    config: StreamConfig,
    // Locator for every record currently in `data.log`, in append order.
    index: Vec<MessageIndex>,
    // True once the stream has been closed to further appends.
    closed: bool,
    // Sequence number the next appended record will receive.
    next_read_seq: u64,
    // Cumulative payload bytes; byte offset the next record will receive.
    next_byte_offset: u64,
    // Total payload bytes stored (excludes record length prefixes).
    total_bytes: u64,
    created_at: DateTime<Utc>,
    // Per-producer idempotency state keyed by producer id.
    producers: HashMap<String, ProducerState>,
    // Wakes blocked readers when data arrives or the stream closes.
    notify: broadcast::Sender<()>,
    // Most recent client-supplied batch sequence token, if any.
    last_seq: Option<String>,
    // Append-mode handle to this stream's `data.log`.
    file: File,
    // Cached on-disk length of `data.log` (headers + payloads).
    file_len: u64,
    // Directory holding this stream's `data.log` and `meta.json`.
    dir: PathBuf,
}
57
impl StreamEntry {
    /// Builds a fresh, empty entry around an already-opened log file.
    ///
    /// `file_len` is seeded from the file's current on-disk size, but the
    /// index and counters start at zero — callers restoring an existing log
    /// rebuild those separately (see `FileStorage::rebuild_index`).
    fn new(config: StreamConfig, file: File, dir: PathBuf) -> Self {
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let file_len = file.metadata().map_or(0, |m| m.len());
        Self {
            config,
            index: Vec::with_capacity(INITIAL_INDEX_CAPACITY),
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            file,
            file_len,
            dir,
        }
    }
}
79
/// Filesystem-backed `Storage` implementation: one directory per stream, each
/// holding a length-prefixed `data.log` plus a `meta.json`.
#[allow(clippy::module_name_repetitions)]
pub struct FileStorage {
    // All live streams, keyed by stream name.
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    // Sum of payload bytes across all streams; enforces `max_total_bytes`.
    total_bytes: RwLock<u64>,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    // Storage root as supplied by the caller.
    root_dir: PathBuf,
    // Canonicalized root, used to detect symlink escapes.
    root_dir_canonical: PathBuf,
    // When true, `sync_data` after every append before acknowledging.
    sync_on_append: bool,
}
97
98impl FileStorage {
    /// Creates the storage root (if missing) and restores any streams already
    /// present on disk.
    ///
    /// # Errors
    /// Returns `Error::Storage` if the root directory cannot be created or
    /// canonicalized, or if restoring an existing stream fails.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        sync_on_append: bool,
    ) -> Result<Self> {
        let root_dir = root_dir.into();
        fs::create_dir_all(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to create storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        // Canonical form is kept so symlinked stream dirs can be detected later.
        let root_dir_canonical = fs::canonicalize(&root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to canonicalize storage directory {}: {e}",
                root_dir.display()
            ))
        })?;

        let storage = Self {
            streams: RwLock::new(HashMap::new()),
            total_bytes: RwLock::new(0),
            max_total_bytes,
            max_stream_bytes,
            root_dir,
            root_dir_canonical,
            sync_on_append,
        };
        // Pick up streams left behind by a previous process before serving.
        storage.load_existing_streams()?;
        Ok(storage)
    }
136
137 #[must_use]
141 pub fn total_bytes(&self) -> u64 {
142 *self.total_bytes.read().expect("total_bytes lock poisoned")
143 }
144
    /// Maps a stream name to its on-disk directory under the storage root.
    ///
    /// The name is URL-safe base64 encoded (no padding), which by
    /// construction cannot contain `/`, `\`, or `..`; the explicit checks
    /// below are defense-in-depth in case the encoding ever changes.
    fn stream_dir_for_name(&self, name: &str) -> Result<PathBuf> {
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode(name.as_bytes());

        if encoded.contains("..") || encoded.contains('/') || encoded.contains('\\') {
            return Err(Error::Storage(
                "encoded stream name contains path traversal characters".to_string(),
            ));
        }
        // Restrict to the URL-safe base64 alphabet exactly.
        if !encoded
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(Error::Storage(
                "encoded stream directory contains invalid characters".to_string(),
            ));
        }

        let dir = self.root_dir.join(&encoded);
        // Final belt-and-braces containment check on the joined path.
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "stream directory escapes storage root: {encoded}"
            )));
        }
        Ok(dir)
    }
179
    /// Ensures `dir` is a direct, non-symlinked child of the storage root.
    ///
    /// Rejects: paths outside the root, paths more than one level deep,
    /// symlinks, non-directories, and existing paths whose canonical form
    /// resolves outside the canonical root (symlink escape).
    fn validate_stream_dir(&self, dir: &Path) -> Result<()> {
        if !dir.starts_with(&self.root_dir) {
            return Err(Error::Storage(format!(
                "path escapes storage root: {}",
                dir.display()
            )));
        }

        let rel = dir.strip_prefix(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to validate storage path {}: {e}",
                dir.display()
            ))
        })?;
        // Stream dirs live directly under the root — exactly one component.
        if rel.components().count() != 1 {
            return Err(Error::Storage(format!(
                "invalid stream path depth: {}",
                dir.display()
            )));
        }

        // The deeper checks only apply once something exists at the path.
        if dir.exists() {
            // symlink_metadata does not follow links, so symlinks are visible.
            let metadata = fs::symlink_metadata(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to stat stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if metadata.file_type().is_symlink() {
                return Err(Error::Storage(format!(
                    "stream directory cannot be a symlink: {}",
                    dir.display()
                )));
            }
            if !metadata.is_dir() {
                return Err(Error::Storage(format!(
                    "stream path is not a directory: {}",
                    dir.display()
                )));
            }

            // Resolve the real location and confirm it is still inside the
            // canonicalized root (catches symlinked parents).
            let canonical = fs::canonicalize(dir).map_err(|e| {
                Error::Storage(format!(
                    "failed to canonicalize stream directory {}: {e}",
                    dir.display()
                ))
            })?;
            if !canonical.starts_with(&self.root_dir_canonical) {
                return Err(Error::Storage(format!(
                    "stream directory resolves outside storage root: {}",
                    dir.display()
                )));
            }
        }

        Ok(())
    }
237
    /// Path of the record log inside a stream directory.
    fn data_log_path(dir: &Path) -> PathBuf {
        dir.join("data.log")
    }

    /// Path of the persisted metadata inside a stream directory.
    fn meta_path(dir: &Path) -> PathBuf {
        dir.join("meta.json")
    }
245
246 fn write_metadata_for(&self, name: &str, entry: &StreamEntry) -> Result<()> {
247 self.validate_stream_dir(&entry.dir)?;
248 let meta = StreamMeta {
249 name: name.to_string(),
250 config: entry.config.clone(),
251 closed: entry.closed,
252 created_at: entry.created_at,
253 last_seq: entry.last_seq.clone(),
254 producers: entry.producers.clone(),
255 };
256
257 let meta_path = Self::meta_path(&entry.dir);
258 let tmp_path = entry.dir.join("meta.json.tmp");
259 let payload = serde_json::to_vec(&meta)
260 .map_err(|e| Error::Storage(format!("failed to serialize stream metadata: {e}")))?;
261
262 fs::write(&tmp_path, payload).map_err(|e| {
263 Error::Storage(format!(
264 "failed to write metadata temp file {}: {e}",
265 tmp_path.display()
266 ))
267 })?;
268
269 fs::rename(&tmp_path, &meta_path).map_err(|e| {
270 Error::Storage(format!(
271 "failed to atomically replace metadata {}: {e}",
272 meta_path.display()
273 ))
274 })?;
275
276 Ok(())
277 }
278
    /// Opens (creating if needed) the stream's `data.log` in append+read
    /// mode, after validating the directory against path escapes.
    ///
    /// Append mode means all writes land at the end of the file regardless
    /// of the handle's current cursor position.
    fn open_stream_file(&self, dir: &Path) -> Result<File> {
        self.validate_stream_dir(dir)?;
        let path = Self::data_log_path(dir);
        OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(&path)
            .map_err(|e| {
                Error::Storage(format!("failed to open stream log {}: {e}", path.display()))
            })
    }
291
292 fn rebuild_index(file: &mut File) -> Result<(Vec<MessageIndex>, u64, u64)> {
293 let mut index = Vec::new();
294 let mut next_read_seq = 0u64;
295 let mut next_byte_offset = 0u64;
296
297 let file_len = file
298 .metadata()
299 .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
300 .len();
301
302 let mut cursor = 0u64;
303 let mut header = [0u8; RECORD_HEADER_BYTES];
304
305 while cursor < file_len {
306 file.seek(SeekFrom::Start(cursor))
307 .map_err(|e| Error::Storage(format!("failed to seek stream log: {e}")))?;
308
309 let read = file
310 .read(&mut header)
311 .map_err(|e| Error::Storage(format!("failed to read stream log header: {e}")))?;
312
313 if read == 0 {
314 break;
315 }
316
317 if read < RECORD_HEADER_BYTES {
318 file.set_len(cursor).map_err(|e| {
319 Error::Storage(format!("failed to truncate partial record: {e}"))
320 })?;
321 break;
322 }
323
324 let record_len = u64::from(u32::from_le_bytes(header));
325 let record_end = cursor + RECORD_HEADER_BYTES as u64 + record_len;
326
327 if record_end > file_len {
328 file.set_len(cursor).map_err(|e| {
329 Error::Storage(format!("failed to truncate partial record: {e}"))
330 })?;
331 break;
332 }
333
334 index.push(MessageIndex {
335 offset: Offset::new(next_read_seq, next_byte_offset),
336 file_pos: cursor + RECORD_HEADER_BYTES as u64,
337 byte_len: record_len,
338 });
339
340 next_read_seq += 1;
341 next_byte_offset += record_len;
342 cursor = record_end;
343 }
344
345 file.seek(SeekFrom::End(0))
346 .map_err(|e| Error::Storage(format!("failed to seek end of stream log: {e}")))?;
347
348 Ok((index, next_read_seq, next_byte_offset))
349 }
350
351 fn rollback_total_bytes(&self, bytes: u64) {
352 let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
353 *total = total.saturating_sub(bytes);
354 }
355
356 fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
357 let streams = self.streams.read().expect("streams lock poisoned");
358 streams.get(name).map(Arc::clone)
359 }
360
361 fn append_records(
362 &self,
363 name: &str,
364 stream: &mut StreamEntry,
365 messages: &[Bytes],
366 ) -> Result<()> {
367 if messages.is_empty() {
368 return Ok(());
369 }
370
371 let mut total_batch_bytes = 0u64;
372 let mut payload_bytes = 0u64;
373 let mut sizes = Vec::with_capacity(messages.len());
374
375 for msg in messages {
376 let len = u64::try_from(msg.len()).unwrap_or(u64::MAX);
377 if len > u64::from(u32::MAX) {
378 return Err(Error::InvalidHeader {
379 header: "Content-Length".to_string(),
380 reason: "message too large for file record format".to_string(),
381 });
382 }
383 payload_bytes += len;
384 total_batch_bytes += len;
385 sizes.push(len);
386 }
387
388 {
392 let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
393 if *total + total_batch_bytes > self.max_total_bytes {
394 return Err(Error::MemoryLimitExceeded);
395 }
396 if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
397 return Err(Error::StreamSizeLimitExceeded);
398 }
399 *total += total_batch_bytes;
400 }
401
402 let wire_overhead = RECORD_HEADER_BYTES.saturating_mul(messages.len());
403 let mut write_buf =
404 Vec::with_capacity(usize::try_from(payload_bytes).unwrap_or(0) + wire_overhead);
405 for msg in messages {
406 let len = u32::try_from(msg.len()).unwrap_or(u32::MAX);
407 write_buf.extend_from_slice(&len.to_le_bytes());
408 write_buf.extend_from_slice(msg);
409 }
410
411 let before_len = stream.file_len;
412
413 if let Err(e) = stream.file.write_all(&write_buf) {
414 if let Ok(m) = stream.file.metadata() {
416 stream.file_len = m.len();
417 }
418 self.rollback_total_bytes(total_batch_bytes);
419 return Err(Error::Storage(format!(
420 "failed to append stream log for {name}: {e}"
421 )));
422 }
423
424 if self.sync_on_append
425 && let Err(e) = stream.file.sync_data()
426 {
427 if let Ok(m) = stream.file.metadata() {
429 stream.file_len = m.len();
430 }
431 self.rollback_total_bytes(total_batch_bytes);
432 return Err(Error::Storage(format!(
433 "failed to sync stream log for {name}: {e}"
434 )));
435 }
436
437 let mut cursor = before_len;
438 for len in sizes {
439 let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
440 stream.index.push(MessageIndex {
441 offset,
442 file_pos: cursor + RECORD_HEADER_BYTES as u64,
443 byte_len: len,
444 });
445 stream.next_read_seq += 1;
446 stream.next_byte_offset += len;
447 stream.total_bytes += len;
448 cursor += RECORD_HEADER_BYTES as u64 + len;
449 }
450 stream.file_len = cursor;
451
452 let _ = stream.notify.send(());
453 Ok(())
454 }
455
    /// Reads the payloads described by `index_slice` with one contiguous read.
    ///
    /// Index entries are built in append order, so the records are adjacent
    /// on disk: a single read spanning the first payload to the end of the
    /// last covers them all, and each message becomes a zero-copy slice of
    /// the shared buffer. The interleaved 4-byte record headers are simply
    /// skipped by slicing relative to each entry's `file_pos`.
    ///
    /// The handle is cloned because seeking needs `&mut File` and we only
    /// hold `&File`. Note `try_clone` shares the underlying cursor with the
    /// original handle; that is safe here because every read seeks first and
    /// writes go through append mode (always land at end-of-file).
    fn read_messages(file: &File, index_slice: &[MessageIndex]) -> Result<Vec<Bytes>> {
        if index_slice.is_empty() {
            return Ok(Vec::new());
        }

        let mut reader = file
            .try_clone()
            .map_err(|e| Error::Storage(format!("failed to clone stream file handle: {e}")))?;

        let first_pos = index_slice[0].file_pos;
        let last = index_slice
            .last()
            .expect("index_slice non-empty due early return");
        let read_end = last.file_pos + last.byte_len;
        let read_len = read_end.saturating_sub(first_pos);

        reader
            .seek(SeekFrom::Start(first_pos))
            .map_err(|e| Error::Storage(format!("failed to seek message data: {e}")))?;

        let mut raw = vec![0u8; usize::try_from(read_len).unwrap_or(usize::MAX)];
        reader
            .read_exact(&mut raw)
            .map_err(|e| Error::Storage(format!("failed to read message data: {e}")))?;

        // Bytes::slice keeps one shared allocation; each message is a view.
        let shared = Bytes::from(raw);
        let mut messages = Vec::with_capacity(index_slice.len());
        for idx in index_slice {
            let rel_start =
                usize::try_from(idx.file_pos.saturating_sub(first_pos)).unwrap_or(usize::MAX);
            let rel_end = rel_start + usize::try_from(idx.byte_len).unwrap_or(usize::MAX);
            messages.push(shared.slice(rel_start..rel_end));
        }

        Ok(messages)
    }
492
    /// Recursively deletes a stream directory after re-validating that it is
    /// a legitimate child of the storage root.
    fn remove_stream_dir(&self, dir: &Path) -> Result<()> {
        self.validate_stream_dir(dir)?;
        fs::remove_dir_all(dir).map_err(|e| {
            Error::Storage(format!(
                "failed to remove stream directory {}: {e}",
                dir.display()
            ))
        })
    }
502
    /// Restores all streams found under the storage root at startup.
    ///
    /// For each valid child directory with a `meta.json`: parse the metadata,
    /// open the log, rebuild the index (truncating torn tail records), and
    /// register the stream. Expired streams are deleted instead of loaded.
    /// Entries that fail validation or lack metadata are silently skipped;
    /// metadata parse and I/O errors abort startup. Finally, the global byte
    /// counter is reset to the sum of restored stream sizes.
    fn load_existing_streams(&self) -> Result<()> {
        let entries = fs::read_dir(&self.root_dir).map_err(|e| {
            Error::Storage(format!(
                "failed to read storage directory {}: {e}",
                self.root_dir.display()
            ))
        })?;

        let mut streams_map = self.streams.write().expect("streams lock poisoned");
        let mut restored_total = 0u64;

        for dir_entry in entries {
            let dir_entry = dir_entry
                .map_err(|e| Error::Storage(format!("failed to inspect storage entry: {e}")))?;
            let path = dir_entry.path();
            // Ignore stray files and anything that fails containment checks.
            if !path.is_dir() {
                continue;
            }
            if self.validate_stream_dir(&path).is_err() {
                continue;
            }

            // A directory without metadata was never a committed stream.
            let meta_path = Self::meta_path(&path);
            if !meta_path.exists() {
                continue;
            }

            let meta_payload = fs::read(&meta_path).map_err(|e| {
                Error::Storage(format!(
                    "failed to read stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;
            let meta: StreamMeta = serde_json::from_slice(&meta_payload).map_err(|e| {
                Error::Storage(format!(
                    "failed to parse stream metadata {}: {e}",
                    meta_path.display()
                ))
            })?;

            let mut file = self.open_stream_file(&path)?;
            // Rebuild counters from the log itself; metadata only carries
            // config/lifecycle state.
            let (index, next_read_seq, next_byte_offset) = Self::rebuild_index(&mut file)?;
            let total_bytes = next_byte_offset;
            // Re-stat after rebuild: a torn tail may have been truncated.
            let file_len = file
                .metadata()
                .map_err(|e| Error::Storage(format!("failed to stat stream log: {e}")))?
                .len();

            let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
            let entry = StreamEntry {
                config: meta.config,
                index,
                closed: meta.closed,
                next_read_seq,
                next_byte_offset,
                total_bytes,
                created_at: meta.created_at,
                producers: meta.producers,
                notify,
                last_seq: meta.last_seq,
                file,
                file_len,
                dir: path,
            };

            // Streams that expired while the process was down are purged.
            if super::is_stream_expired(&entry.config) {
                self.remove_stream_dir(&entry.dir)?;
                continue;
            }

            restored_total = restored_total.saturating_add(entry.total_bytes);
            streams_map.insert(meta.name, Arc::new(RwLock::new(entry)));
        }

        let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
        *total = restored_total;

        Ok(())
    }
582}
583
584impl Storage for FileStorage {
    /// Creates a stream, first evicting an expired stream of the same name.
    ///
    /// If the name already exists with an identical config this reports
    /// `AlreadyExists`; with a different config it fails with
    /// `Error::ConfigMismatch`. The `streams` write lock is held for the
    /// whole operation so creation is atomic with respect to other calls.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Evict the expired stream and fall through to re-create it.
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
                *total = total.saturating_sub(stream_bytes);
                drop(total);

                self.remove_stream_dir(&dir)?;
            } else if stream.config == config {
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        fs::create_dir_all(&dir).map_err(|e| {
            Error::Storage(format!(
                "failed to create stream directory {}: {e}",
                dir.display()
            ))
        })?;

        // Re-validate now that the directory exists (symlink checks apply).
        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let entry = StreamEntry::new(config, file, dir);

        // Persist metadata before exposing the stream to other callers.
        self.write_metadata_for(name, &entry)?;
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }
626
    /// Appends a single message, returning the offset assigned to it.
    ///
    /// Fails if the stream is missing, expired, closed, or the content type
    /// does not match the stream's config.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        // Capture the message's own offset before the append advances the
        // stream counters.
        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        self.append_records(name, &mut stream, &[data])?;
        Ok(offset)
    }
649
    /// Appends a non-empty batch, returning the offset *after* the batch.
    ///
    /// `seq` is an optional client-supplied ordering token validated against
    /// the stream's last accepted token before anything is written. A
    /// metadata persist failure after the data is committed is only logged —
    /// the append itself already succeeded.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        // Validate the ordering token first so a rejected batch writes nothing.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
            if let Err(e) = self.write_metadata_for(name, &stream) {
                warn!(%e, stream = name, "metadata persist failed after batch append");
            }
        }

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }
692
    /// Reads all messages at or after `from_offset`.
    ///
    /// A `now` offset returns no messages and the current tail; a `start`
    /// offset reads from the beginning. The index is sorted (offsets are
    /// assigned monotonically at append time), so a binary search locates
    /// the first matching entry — on a miss, `Err(idx)` is the insertion
    /// point, i.e. the first entry past the requested offset.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        if from_offset.is_now() {
            // "now" means: skip history, just report the current tail.
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                at_tail: true,
                closed: stream.closed,
            });
        }

        let start_idx = if from_offset.is_start() {
            0
        } else {
            match stream.index.binary_search_by(|m| m.offset.cmp(from_offset)) {
                Ok(idx) | Err(idx) => idx,
            }
        };

        let index_slice = &stream.index[start_idx..];
        let messages = Self::read_messages(&stream.file, index_slice)?;
        let at_tail = start_idx + messages.len() >= stream.index.len();

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            at_tail,
            closed: stream.closed,
        })
    }
732
733 fn delete(&self, name: &str) -> Result<()> {
734 let mut streams = self.streams.write().expect("streams lock poisoned");
735
736 if let Some(stream_arc) = streams.remove(name) {
737 let stream = stream_arc.read().expect("stream lock poisoned");
738 let dir = stream.dir.clone();
739 let stream_bytes = stream.total_bytes;
740 drop(stream);
741
742 let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
743 *total = total.saturating_sub(stream_bytes);
744 drop(total);
745
746 self.remove_stream_dir(&dir)?;
747 Ok(())
748 } else {
749 Err(Error::NotFound(name.to_string()))
750 }
751 }
752
    /// Returns a snapshot of the stream's metadata (config, tail offset,
    /// closed flag, stored bytes, message count, creation time).
    fn head(&self, name: &str) -> Result<StreamMetadata> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let stream = stream_arc.read().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        Ok(StreamMetadata {
            config: stream.config.clone(),
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
            total_bytes: stream.total_bytes,
            message_count: u64::try_from(stream.index.len()).unwrap_or(u64::MAX),
            created_at: stream.created_at,
        })
    }
773
    /// Marks the stream closed to further appends, persists the flag, and
    /// wakes blocked readers so they can observe the closure.
    fn close_stream(&self, name: &str) -> Result<()> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        stream.closed = true;
        self.write_metadata_for(name, &stream)?;

        // Best-effort wake-up; no receivers is fine.
        let _ = stream.notify.send(());
        Ok(())
    }
791
    /// Idempotent producer append: a duplicate (epoch, seq) submission is
    /// acknowledged without re-appending its data.
    ///
    /// Order of operations matters: expiry check, stale-producer cleanup,
    /// content-type check (only when there is data), producer check (which
    /// may short-circuit with `Duplicate` or reject via its error path),
    /// batch-seq validation, append, then state updates. A metadata persist
    /// failure after the data is committed is logged, not returned, because
    /// the append itself already succeeded.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        if super::is_stream_expired(&stream.config) {
            return Err(Error::StreamExpired);
        }

        super::cleanup_stale_producers(&mut stream.producers);

        // An empty submission can still advance producer state / close the
        // stream, so only validate the content type when data is present.
        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(
            stream.producers.get(producer.id.as_str()),
            producer,
            stream.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                // Already applied: acknowledge with the current tail.
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
        self.append_records(name, &mut stream, &messages)?;

        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        if should_close {
            stream.closed = true;
        }

        // Record the producer's latest (epoch, seq) for future dedup.
        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );

        if let Err(e) = self.write_metadata_for(name, &stream) {
            warn!(%e, stream = name, "metadata persist failed after committed producer append");
        }

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
            closed: stream.closed,
        })
    }
866
    /// Creates a stream and appends its initial messages in one atomic step
    /// (the `streams` write lock is held throughout).
    ///
    /// Mirrors `create_stream`'s handling of existing names: expired streams
    /// are evicted and re-created; a matching config reports `AlreadyExists`
    /// (without appending the supplied data); a mismatched config fails.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");

            if super::is_stream_expired(&stream.config) {
                // Evict the expired stream and fall through to re-create it.
                let stream_bytes = stream.total_bytes;
                let dir = stream.dir.clone();
                drop(stream);
                streams.remove(name);

                let mut total = self.total_bytes.write().expect("total_bytes lock poisoned");
                *total = total.saturating_sub(stream_bytes);
                drop(total);

                self.remove_stream_dir(&dir)?;
            } else if stream.config == config {
                return Ok(CreateWithDataResult {
                    status: CreateStreamResult::AlreadyExists,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let dir = self.stream_dir_for_name(name)?;
        fs::create_dir_all(&dir).map_err(|e| {
            Error::Storage(format!(
                "failed to create stream directory {}: {e}",
                dir.display()
            ))
        })?;

        // Re-validate now that the directory exists (symlink checks apply).
        self.validate_stream_dir(&dir)?;
        let file = self.open_stream_file(&dir)?;
        let mut entry = StreamEntry::new(config, file, dir);

        if !messages.is_empty() {
            self.append_records(name, &mut entry, &messages)?;
        }
        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        // Persist metadata before exposing the stream to other callers.
        self.write_metadata_for(name, &entry)?;
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }
932
933 fn exists(&self, name: &str) -> bool {
934 let streams = self.streams.read().expect("streams lock poisoned");
935 if let Some(stream_arc) = streams.get(name) {
936 let stream = stream_arc.read().expect("stream lock poisoned");
937 !super::is_stream_expired(&stream.config)
938 } else {
939 false
940 }
941 }
942
943 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
944 let stream_arc = self.get_stream(name)?;
945 let stream = stream_arc.read().expect("stream lock poisoned");
946
947 if super::is_stream_expired(&stream.config) {
948 return None;
949 }
950
951 Some(stream.notify.subscribe())
952 }
953}
954
#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;

    /// Builds a unique temp-directory path per call: timestamp + pid +
    /// process-wide counter, so parallel tests never collide.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-file-storage-test-{stamp}-{pid}-{seq}"))
    }

    /// Fresh storage with small limits and no fsync (fast tests).
    fn test_storage() -> FileStorage {
        FileStorage::new(test_storage_dir(), 1024 * 1024, 100 * 1024, false)
            .expect("file storage should initialize")
    }

    #[test]
    fn test_delete_removes_files() {
        let storage = test_storage();
        let config = StreamConfig::new("text/plain".to_string());
        storage.create_stream("test", config).unwrap();
        storage
            .append("test", Bytes::from("data"), "text/plain")
            .unwrap();

        let dir = storage.stream_dir_for_name("test").unwrap();
        assert!(dir.exists(), "stream directory should exist before delete");

        // delete() must remove the on-disk directory, not just the registry entry.
        storage.delete("test").unwrap();
        assert!(
            !dir.exists(),
            "stream directory should be removed after delete"
        );
    }

    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        // First instance writes two messages, then is dropped (simulated restart).
        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false)
                .expect("file storage should initialize");
            storage
                .create_stream("events", config.clone())
                .expect("stream should be created");
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .expect("append should succeed");
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .expect("append should succeed");
        }

        // Second instance over the same root must rebuild the index from disk.
        let restored =
            FileStorage::new(root, 1024 * 1024, 100 * 1024, false).expect("restore should work");

        let read = restored
            .read("events", &Offset::start())
            .expect("read should succeed");

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        // The closed flag is persisted in meta.json and must survive restart.
        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    #[test]
    fn test_partial_record_truncation_on_recovery() {
        let root = test_storage_dir();
        let config = StreamConfig::new("text/plain".to_string());

        {
            let storage = FileStorage::new(root.clone(), 1024 * 1024, 100 * 1024, false).unwrap();
            storage.create_stream("s", config.clone()).unwrap();
            storage
                .append("s", Bytes::from("good"), "text/plain")
                .unwrap();
        }

        // Simulate a torn write: append 2 garbage bytes (less than a full
        // 4-byte record header) directly to the log.
        let encoded = base64::prelude::BASE64_URL_SAFE_NO_PAD.encode("s".as_bytes());
        let log_path = root.join(&encoded).join("data.log");
        let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
        f.write_all(&[0xFF, 0xFF]).unwrap();
        drop(f);

        // Recovery must truncate the torn tail and keep the valid record.
        let restored = FileStorage::new(root, 1024 * 1024, 100 * 1024, false).unwrap();
        let read = restored.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 1);
        assert_eq!(read.messages[0], Bytes::from("good"));
    }
}
1074}