//! In-memory storage backend (`durable_streams_server/storage/memory.rs`).
1use super::{
2    CreateStreamResult, ForkInfo, Message, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3    ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata, StreamState,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::Utc;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, RwLock};
13use tokio::sync::broadcast;
14
/// Initial capacity reserved for each stream's message vector.
const INITIAL_MESSAGES_CAPACITY: usize = 256;
/// Initial capacity reserved for each stream's per-producer state map.
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
17
/// Internal stream entry
///
/// One of these exists per stream name; callers guard it with a per-stream
/// `RwLock` so operations on different streams do not contend.
struct StreamEntry {
    /// Stream configuration (TTL may be renewed in place via `renew_ttl`)
    config: StreamConfig,
    /// Append-ordered message log; offsets increase monotonically
    messages: Vec<Message>,
    /// Whether the stream has been closed to further appends
    closed: bool,
    /// Sequence number the next appended message will receive
    next_read_seq: u64,
    /// Byte offset the next appended message will start at
    next_byte_offset: u64,
    /// Total payload bytes held by this stream (per-stream limit accounting)
    total_bytes: u64,
    /// Creation timestamp
    created_at: chrono::DateTime<Utc>,
    /// Timestamp of the most recent mutation (`None` until first update)
    updated_at: Option<chrono::DateTime<Utc>>,
    /// Per-producer state for idempotent producer support
    producers: HashMap<String, ProducerState>,
    /// Broadcast sender for notifying long-poll/SSE subscribers
    notify: broadcast::Sender<()>,
    /// Last Stream-Seq value received (lexicographic ordering)
    last_seq: Option<String>,
    /// Fork lineage metadata (None for root streams)
    fork_info: Option<ForkInfo>,
    /// Number of forks that reference this stream as their source
    ref_count: u32,
    /// Lifecycle state (Active or Tombstone)
    state: StreamState,
}
41
42impl StreamEntry {
43    fn new(config: StreamConfig) -> Self {
44        // Stream starts open; the handler closes it after any initial appends.
45        // The `created_closed` flag in config is stored for idempotent checks only.
46        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
47        Self {
48            config,
49            messages: Vec::with_capacity(INITIAL_MESSAGES_CAPACITY),
50            closed: false,
51            next_read_seq: 0,
52            next_byte_offset: 0,
53            total_bytes: 0,
54            created_at: Utc::now(),
55            updated_at: None,
56            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
57            notify,
58            last_seq: None,
59            fork_info: None,
60            ref_count: 0,
61            state: StreamState::Active,
62        }
63    }
64}
65
/// In-memory storage implementation
///
/// Thread-safe storage with:
/// - `RwLock<HashMap>` for stream lookup (concurrent reads)
/// - Per-stream `RwLock` for exclusive write access (offset monotonicity)
/// - Memory limit enforcement (global and per-stream)
///
/// # Concurrency Model
///
/// Multiple readers can access different streams concurrently.
/// Appends to the same stream are serialized via `RwLock::write()`.
/// Appends to different streams can proceed concurrently.
pub struct InMemoryStorage {
    /// Stream name -> entry; the outer lock guards map membership only
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    /// Global payload-byte counter, updated atomically across all streams
    total_bytes: AtomicU64,
    /// Hard cap on payload bytes across all streams combined
    max_total_bytes: u64,
    /// Hard cap on payload bytes within any single stream
    max_stream_bytes: u64,
}
84
impl InMemoryStorage {
    /// Build an empty storage instance bounded by the given global and
    /// per-stream payload limits (in bytes).
    #[must_use]
    pub fn new(max_total_bytes: u64, max_stream_bytes: u64) -> Self {
        InMemoryStorage {
            streams: RwLock::new(HashMap::new()),
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
        }
    }

    /// Return the currently tracked total payload bytes across all streams.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }

    /// Atomically decrement the global byte counter, clamping at zero so a
    /// double-release can never underflow it.
    fn saturating_sub_total_bytes(&self, bytes: u64) {
        let _ = self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Look up a stream entry by name, cloning the `Arc` so the caller can
    /// release the map lock before locking the entry itself.
    fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
        self.streams
            .read()
            .expect("streams lock poisoned")
            .get(name)
            .cloned()
    }
115
116    fn hard_remove_stream(
117        &self,
118        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
119        name: &str,
120    ) -> Option<ForkInfo> {
121        let stream_arc = streams.remove(name)?;
122        let stream = stream_arc.read().expect("stream lock poisoned");
123        self.saturating_sub_total_bytes(stream.total_bytes);
124        stream.fork_info.clone()
125    }
126
127    fn remove_for_recreate(
128        &self,
129        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
130        name: &str,
131    ) {
132        if let Some(fork_info) = self.hard_remove_stream(streams, name) {
133            self.cascade_delete(streams, &fork_info.source_name);
134        }
135    }
136
137    /// Read messages from a stream's local storage (no fork traversal).
138    #[allow(clippy::unnecessary_wraps)]
139    fn read_local_messages(
140        stream: &StreamEntry,
141        from_offset: &Offset,
142        next_offset: Offset,
143    ) -> Result<ReadResult> {
144        let start_idx = if from_offset.is_start() {
145            0
146        } else {
147            match stream
148                .messages
149                .binary_search_by(|m| m.offset.cmp(from_offset))
150            {
151                Ok(idx) | Err(idx) => idx,
152            }
153        };
154
155        let messages: Vec<Bytes> = stream.messages[start_idx..]
156            .iter()
157            .map(|m| m.data.clone())
158            .collect();
159
160        let at_tail = start_idx + messages.len() >= stream.messages.len();
161
162        Ok(ReadResult {
163            messages,
164            next_offset,
165            at_tail,
166            closed: stream.closed,
167        })
168    }
169
    /// Walk up the fork chain after a hard-delete, decrementing `ref_count`s
    /// and garbage-collecting tombstoned ancestors with zero references.
    ///
    /// Must be called while holding the streams write lock.
    fn cascade_delete(
        &self,
        streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
        parent_name: &str,
    ) {
        let mut current_parent = parent_name.to_string();
        loop {
            let Some(parent_arc) = streams.get(&current_parent) else {
                break;
            };
            // Clone the Arc so the map borrow ends before we lock the entry.
            let parent_arc = parent_arc.clone();
            let mut parent = parent_arc.write().expect("stream lock poisoned");
            parent.ref_count = parent.ref_count.saturating_sub(1);

            if parent.state == StreamState::Tombstone && parent.ref_count == 0 {
                // This parent can be garbage-collected
                let fi = parent.fork_info.clone();
                self.saturating_sub_total_bytes(parent.total_bytes);
                // Release the entry lock before mutating the map.
                drop(parent);
                streams.remove(&current_parent);

                // Continue up the chain
                if let Some(fi) = fi {
                    current_parent = fi.source_name;
                } else {
                    break;
                }
            } else {
                // Still active or still referenced: the chain stops here.
                break;
            }
        }
    }
206
    /// Read messages from a source chain, following fork lineage upward.
    ///
    /// Reads all messages from `from_offset` up to (but not including) `up_to`
    /// across the full ancestor chain. This bypasses tombstone checks since
    /// source streams may be soft-deleted but their data must still be readable
    /// by forks.
    fn read_source_chain(
        &self,
        source_name: &str,
        from_offset: &Offset,
        up_to: &Offset,
    ) -> Vec<Bytes> {
        let streams = self.streams.read().expect("streams lock poisoned");

        // Build the ancestor chain from source to root
        let plan = super::fork::build_read_plan(source_name, |n| {
            streams.get(n).map(|arc| {
                let s = arc.read().expect("stream lock poisoned");
                s.fork_info.clone()
            })
        });

        let mut all_messages: Vec<Bytes> = Vec::new();

        for (i, segment) in plan.iter().enumerate() {
            // A segment named in the plan may no longer be present; skip it
            // rather than fail the whole read.
            let Some(seg_arc) = streams.get(&segment.name) else {
                continue;
            };
            let seg_stream = seg_arc.read().expect("stream lock poisoned");

            // Determine the effective upper bound for this segment
            let effective_up_to = if i == plan.len() - 1 {
                // Last segment (the direct source): read up to `up_to`
                Some(up_to)
            } else {
                // Intermediate segment: read up to this segment's read_up_to
                segment.read_up_to.as_ref()
            };

            // Determine start offset for this segment
            let effective_from = if i == 0 {
                from_offset
            } else {
                &Offset::start()
            };

            // Collect messages in range
            let start_idx = if effective_from.is_start() {
                0
            } else {
                // Ok: exact offset match; Err: first message past it.
                match seg_stream
                    .messages
                    .binary_search_by(|m| m.offset.cmp(effective_from))
                {
                    Ok(idx) | Err(idx) => idx,
                }
            };

            for msg in &seg_stream.messages[start_idx..] {
                if effective_up_to.is_some_and(|bound| msg.offset >= *bound) {
                    break;
                }
                all_messages.push(msg.data.clone());
            }
        }

        all_messages
    }
275
276    /// Commit messages to a stream, checking memory limits first.
277    ///
278    /// Caller must hold the stream write lock. Updates both stream-level
279    /// and global memory counters atomically.
280    fn commit_messages(&self, stream: &mut StreamEntry, messages: Vec<Bytes>) -> Result<()> {
281        if messages.is_empty() {
282            return Ok(());
283        }
284
285        let mut total_batch_bytes = 0u64;
286        let mut message_sizes = Vec::with_capacity(messages.len());
287        for data in &messages {
288            let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
289            message_sizes.push(byte_len);
290            total_batch_bytes += byte_len;
291        }
292
293        // Reserve global bytes atomically (global precedence before per-stream).
294        if self
295            .total_bytes
296            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
297                current
298                    .checked_add(total_batch_bytes)
299                    .filter(|next| *next <= self.max_total_bytes)
300            })
301            .is_err()
302        {
303            return Err(Error::MemoryLimitExceeded);
304        }
305        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
306            self.saturating_sub_total_bytes(total_batch_bytes);
307            return Err(Error::StreamSizeLimitExceeded);
308        }
309
310        for (data, byte_len) in messages.into_iter().zip(message_sizes) {
311            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
312            stream.next_read_seq += 1;
313            stream.next_byte_offset += byte_len;
314            stream.total_bytes += byte_len;
315            let message = Message::new(offset, data);
316            stream.messages.push(message);
317        }
318
319        // Notify long-poll/SSE subscribers that new data is available.
320        // Ignore errors (no active receivers is fine).
321        let _ = stream.notify.send(());
322
323        Ok(())
324    }
325
    /// Read messages from a forked stream by combining source chain with local data.
    ///
    /// The caller must have already extracted fork info and local messages from
    /// the stream entry (and dropped the lock if needed before calling this).
    fn assemble_fork_read(
        &self,
        name: &str,
        from_offset: &Offset,
        fi: &super::ForkInfo,
        fork_messages_data: Vec<Bytes>,
        next_offset: Offset,
        closed: bool,
    ) -> Result<ReadResult> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        // Pull ancestor data only when the requested range begins before the
        // fork point; everything at or after it lives in the fork itself.
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            let source_messages =
                self.read_source_chain(&fi.source_name, from_offset, &fi.fork_offset);
            all_messages.extend(source_messages);
        }

        if from_offset.is_start() || *from_offset <= fi.fork_offset {
            // The requested range covers the whole fork-local log: use the
            // snapshot the caller already took.
            all_messages.extend(fork_messages_data);
        } else {
            // The requested range starts inside the fork-local log: re-lock
            // the entry and slice from `from_offset` onward.
            let stream_arc = self
                .get_stream(name)
                .ok_or_else(|| Error::NotFound(name.to_string()))?;
            let stream = stream_arc.read().expect("stream lock poisoned");
            let start_idx = match stream
                .messages
                .binary_search_by(|m| m.offset.cmp(from_offset))
            {
                Ok(idx) | Err(idx) => idx,
            };
            let msgs: Vec<Bytes> = stream.messages[start_idx..]
                .iter()
                .map(|m| m.data.clone())
                .collect();
            all_messages.extend(msgs);
        }

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }
373}
374
impl Storage for InMemoryStorage {
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        // An existing entry under this name may be reusable (idempotent
        // create), expired (replace it), or conflicting (reject).
        if let Some(existing_arc) = streams.get(name) {
            let existing = existing_arc.read().expect("stream lock poisoned");
            let disposition = super::fork::evaluate_root_create(
                name,
                &existing.config,
                existing.state,
                existing.ref_count,
                &config,
            );
            match disposition {
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => return Err(err),
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(existing);
                    self.remove_for_recreate(&mut streams, name);
                }
            }
        }

        let fresh = StreamEntry::new(config);
        streams.insert(name.to_string(), Arc::new(RwLock::new(fresh)));
        Ok(CreateStreamResult::Created)
    }
406
407    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
408        let stream_arc = self
409            .get_stream(name)
410            .ok_or_else(|| Error::NotFound(name.to_string()))?;
411
412        let mut stream = stream_arc.write().expect("stream lock poisoned");
413
414        super::fork::check_stream_access(&stream.config, stream.state, name)?;
415
416        if stream.closed {
417            return Err(Error::StreamClosed);
418        }
419
420        super::validate_content_type(&stream.config.content_type, content_type)?;
421
422        let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
423
424        if self
425            .total_bytes
426            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
427                current
428                    .checked_add(byte_len)
429                    .filter(|next| *next <= self.max_total_bytes)
430            })
431            .is_err()
432        {
433            return Err(Error::MemoryLimitExceeded);
434        }
435
436        if stream.total_bytes + byte_len > self.max_stream_bytes {
437            self.saturating_sub_total_bytes(byte_len);
438            return Err(Error::StreamSizeLimitExceeded);
439        }
440
441        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
442
443        stream.next_read_seq += 1;
444        stream.next_byte_offset += byte_len;
445        stream.total_bytes += byte_len;
446
447        let message = Message::new(offset.clone(), data);
448        stream.messages.push(message);
449        stream.updated_at = Some(Utc::now());
450        super::fork::renew_ttl(&mut stream.config);
451        let _ = stream.notify.send(());
452
453        Ok(offset)
454    }
455
456    fn batch_append(
457        &self,
458        name: &str,
459        messages: Vec<Bytes>,
460        content_type: &str,
461        seq: Option<&str>,
462    ) -> Result<Offset> {
463        if messages.is_empty() {
464            return Err(Error::InvalidHeader {
465                header: "Content-Length".to_string(),
466                reason: "batch cannot be empty".to_string(),
467            });
468        }
469
470        let stream_arc = self
471            .get_stream(name)
472            .ok_or_else(|| Error::NotFound(name.to_string()))?;
473
474        let mut stream = stream_arc.write().expect("stream lock poisoned");
475
476        super::fork::check_stream_access(&stream.config, stream.state, name)?;
477
478        if stream.closed {
479            return Err(Error::StreamClosed);
480        }
481
482        super::validate_content_type(&stream.config.content_type, content_type)?;
483
484        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;
485
486        self.commit_messages(&mut stream, messages)?;
487        if let Some(new_seq) = pending_seq {
488            stream.last_seq = Some(new_seq);
489        }
490        stream.updated_at = Some(Utc::now());
491        super::fork::renew_ttl(&mut stream.config);
492
493        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
494    }
495
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        // Streams without a TTL are served entirely under read locks; a
        // TTL-bearing stream needs a write lock at the end to renew its TTL.
        let needs_ttl_renewal = {
            let stream = stream_arc.read().expect("stream lock poisoned");
            super::fork::check_stream_access(&stream.config, stream.state, name)?;
            stream.config.ttl_seconds.is_some()
        };

        if !needs_ttl_renewal {
            let stream = stream_arc.read().expect("stream lock poisoned");
            let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);

            // A "now" offset returns no history, just the current tail.
            if from_offset.is_now() {
                return Ok(ReadResult {
                    messages: Vec::new(),
                    next_offset,
                    at_tail: true,
                    closed: stream.closed,
                });
            }

            if stream.fork_info.is_none() {
                return Self::read_local_messages(&stream, from_offset, next_offset);
            }

            // Fork path: snapshot local data, then release the entry lock
            // before traversing the source chain (which takes the map lock).
            let fi = stream.fork_info.clone().expect("checked above");
            let closed = stream.closed;
            let fork_messages_data: Vec<Bytes> =
                stream.messages.iter().map(|m| m.data.clone()).collect();
            drop(stream);

            return self.assemble_fork_read(
                name,
                from_offset,
                &fi,
                fork_messages_data,
                next_offset,
                closed,
            );
        }

        // TTL path: take the write lock and re-check access, since the state
        // may have changed between dropping the read lock and arriving here.
        let mut stream = stream_arc.write().expect("stream lock poisoned");
        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        let result = if from_offset.is_now() {
            ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: stream.closed,
            }
        } else if stream.fork_info.is_none() {
            Self::read_local_messages(&stream, from_offset, next_offset)?
        } else {
            let fi = stream.fork_info.clone().expect("checked above");
            let closed = stream.closed;
            let fork_messages_data: Vec<Bytes> =
                stream.messages.iter().map(|m| m.data.clone()).collect();
            // assemble_fork_read needs the entry lock released; re-acquire
            // afterwards so the TTL renewal below still runs under the lock.
            drop(stream);

            let result = self.assemble_fork_read(
                name,
                from_offset,
                &fi,
                fork_messages_data,
                next_offset,
                closed,
            )?;

            stream = stream_arc.write().expect("stream lock poisoned");
            result
        };

        super::fork::renew_ttl(&mut stream.config);
        Ok(result)
    }
576
    fn delete(&self, name: &str) -> Result<()> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        let stream_arc = streams
            .get(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?
            .clone();

        {
            let stream = stream_arc.read().expect("stream lock poisoned");

            match super::fork::evaluate_delete(name, stream.state, stream.ref_count)? {
                super::fork::DeleteDisposition::Tombstone => {
                    // Still referenced by forks: soft-delete only. The
                    // read -> write upgrade is safe here because we hold the
                    // streams map write lock, which blocks new lookups.
                    drop(stream);
                    let mut stream_w = stream_arc.write().expect("stream lock poisoned");
                    stream_w.state = StreamState::Tombstone;
                    return Ok(());
                }
                super::fork::DeleteDisposition::HardDelete => {}
            }
        }

        // Unreferenced: remove outright, then propagate ref-count decrements
        // (and possible garbage collection) up the fork ancestry.
        let fork_info = self.hard_remove_stream(&mut streams, name);

        if let Some(fi) = fork_info {
            self.cascade_delete(&mut streams, &fi.source_name);
        }

        Ok(())
    }
607
608    fn head(&self, name: &str) -> Result<StreamMetadata> {
609        let stream_arc = self
610            .get_stream(name)
611            .ok_or_else(|| Error::NotFound(name.to_string()))?;
612
613        let stream = stream_arc.read().expect("stream lock poisoned");
614
615        super::fork::check_stream_access(&stream.config, stream.state, name)?;
616
617        Ok(StreamMetadata {
618            config: stream.config.clone(),
619            next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
620            closed: stream.closed,
621            total_bytes: stream.total_bytes,
622            message_count: u64::try_from(stream.messages.len()).unwrap_or(u64::MAX),
623            created_at: stream.created_at,
624            updated_at: stream.updated_at,
625        })
626    }
627
628    fn close_stream(&self, name: &str) -> Result<()> {
629        let stream_arc = self
630            .get_stream(name)
631            .ok_or_else(|| Error::NotFound(name.to_string()))?;
632
633        let mut stream = stream_arc.write().expect("stream lock poisoned");
634
635        super::fork::check_stream_access(&stream.config, stream.state, name)?;
636
637        stream.closed = true;
638        stream.updated_at = Some(Utc::now());
639        super::fork::renew_ttl(&mut stream.config);
640
641        let _ = stream.notify.send(());
642
643        Ok(())
644    }
645
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        // Drop stale producer records before running the dedupe check.
        super::cleanup_stale_producers(&mut stream.producers);

        // Content type only matters when there is a payload; an empty batch
        // may still be used to close the stream below.
        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(stream.producers.get(&producer.id), producer, stream.closed)? {
            ProducerCheck::Accept => {}
            // Idempotent replay: report the stored epoch/seq and the current
            // tail without committing anything.
            ProducerCheck::Duplicate { epoch, seq } => {
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        // Validate Stream-Seq before committing; record it only on success.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;

        self.commit_messages(&mut stream, messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }

        if should_close {
            stream.closed = true;
        }

        stream.updated_at = Some(now);

        // Remember this producer's latest epoch/seq for future dedupe checks.
        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );
        super::fork::renew_ttl(&mut stream.config);

        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        let closed = stream.closed;

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset,
            closed,
        })
    }
716
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<super::CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        // Same existing-name handling as `create_stream`, except the
        // AlreadyExists answer also reports the current tail and closed flag.
        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name);
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
                    let closed = stream.closed;
                    return Ok(super::CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset,
                        closed,
                    });
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let mut entry = StreamEntry::new(config);

        // Seed initial data before the entry becomes visible in the map; the
        // map write lock makes create + append + close atomic to observers.
        if !messages.is_empty() {
            self.commit_messages(&mut entry, messages)?;
        }

        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(super::CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }
775
776    fn exists(&self, name: &str) -> bool {
777        let streams = self.streams.read().expect("streams lock poisoned");
778        if let Some(stream_arc) = streams.get(name) {
779            let stream = stream_arc.read().expect("stream lock poisoned");
780            !super::is_stream_expired(&stream.config) && stream.state == StreamState::Active
781        } else {
782            false
783        }
784    }
785
786    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
787        let stream_arc = self.get_stream(name)?;
788        let stream = stream_arc.read().expect("stream lock poisoned");
789
790        if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
791            return None;
792        }
793
794        Some(stream.notify.subscribe())
795    }
796
797    fn cleanup_expired_streams(&self) -> usize {
798        let mut streams = self.streams.write().expect("streams lock poisoned");
799        let mut expired = Vec::new();
800
801        for (name, stream_arc) in streams.iter() {
802            let stream = stream_arc.read().expect("stream lock poisoned");
803            if super::is_stream_expired(&stream.config) {
804                expired.push((name.clone(), stream.ref_count));
805            }
806        }
807
808        let removed_count = expired.len();
809        for (name, ref_count) in expired {
810            match super::fork::evaluate_expired_cleanup(ref_count) {
811                super::fork::DeleteDisposition::Tombstone => {
812                    if let Some(stream_arc) = streams.get(&name) {
813                        let mut stream = stream_arc.write().expect("stream lock poisoned");
814                        stream.state = StreamState::Tombstone;
815                    }
816                }
817                super::fork::DeleteDisposition::HardDelete => {
818                    self.remove_for_recreate(&mut streams, &name);
819                }
820            }
821        }
822
823        removed_count
824    }
825
826    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
827        let streams = self.streams.read().expect("streams lock poisoned");
828        let mut result = Vec::new();
829        for (name, stream_arc) in streams.iter() {
830            let stream = stream_arc.read().expect("stream lock poisoned");
831            if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
832                continue;
833            }
834            result.push((
835                name.clone(),
836                StreamMetadata {
837                    config: stream.config.clone(),
838                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
839                    closed: stream.closed,
840                    total_bytes: stream.total_bytes,
841                    message_count: u64::try_from(stream.messages.len()).unwrap_or(u64::MAX),
842                    created_at: stream.created_at,
843                    updated_at: stream.updated_at,
844                },
845            ));
846        }
847        result.sort_by(|a, b| a.0.cmp(&b.0));
848        Ok(result)
849    }
850
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        // Look up source stream
        let source_arc = streams
            .get(source_name)
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?
            .clone();

        let source = source_arc.read().expect("stream lock poisoned");

        super::fork::check_fork_source_access(&source.config, source.state, source_name)?;

        // Resolve fork offset (defaults to source tail)
        let source_next_offset = Offset::new(source.next_read_seq, source.next_byte_offset);
        let resolved_offset = super::fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        // A fork must share its source's content type (case-insensitive).
        if !config
            .content_type
            .eq_ignore_ascii_case(&source.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }
        let fork_spec = super::fork::build_fork_create_spec(
            source_name,
            &source.config,
            &config,
            resolved_offset.clone(),
        );

        // Release the source entry lock; the map write lock still prevents
        // concurrent structural changes for the rest of this call.
        drop(source);

        // Existing-name handling mirrors `create_stream` but uses the
        // fork-aware evaluation.
        if let Some(existing_arc) = streams.get(name) {
            let existing = existing_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(existing);
                    self.remove_for_recreate(&mut streams, name);
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        // Extract fork offset components to initialize the fork entry
        let (fork_read_seq, fork_byte_offset) =
            resolved_offset.parse_components().unwrap_or((0, 0));

        // The fork begins numbering at the fork point so its offsets stay
        // continuous with the source history it shares.
        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let entry = StreamEntry {
            config: fork_spec.config,
            messages: Vec::with_capacity(INITIAL_MESSAGES_CAPACITY),
            closed: config.created_closed,
            next_read_seq: fork_read_seq,
            next_byte_offset: fork_byte_offset,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            fork_info: Some(ForkInfo {
                source_name: fork_spec.source_name,
                fork_offset: resolved_offset,
            }),
            ref_count: 0,
            state: StreamState::Active,
        };

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        // Increment source ref_count
        if let Some(source_arc) = streams.get(source_name) {
            let mut source = source_arc.write().expect("stream lock poisoned");
            source.ref_count += 1;
        }

        Ok(CreateStreamResult::Created)
    }
950}
951
952// Concurrent producer and Storage trait contract tests live in the
953// integration test suite (storage_backend_contract, concurrent_stress).