1use super::{
2 CreateStreamResult, ForkInfo, Message, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3 ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata, StreamState,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::Utc;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, RwLock};
13use tokio::sync::broadcast;
14
/// Initial capacity for a stream's message vector, to avoid early regrowth.
const INITIAL_MESSAGES_CAPACITY: usize = 256;
/// Initial capacity for a stream's per-producer state map.
const INITIAL_PRODUCERS_CAPACITY: usize = 8;
17
/// In-memory state for a single stream; shared behind `Arc<RwLock<_>>`.
struct StreamEntry {
    /// Stream configuration (content type, TTL, ...).
    config: StreamConfig,
    /// Messages appended so far, in offset order.
    messages: Vec<Message>,
    /// True once the stream has been closed to further appends.
    closed: bool,
    /// Sequence number the next appended message will receive.
    next_read_seq: u64,
    /// Byte offset at which the next appended message will start.
    next_byte_offset: u64,
    /// Total payload bytes currently held by this stream.
    total_bytes: u64,
    created_at: chrono::DateTime<Utc>,
    /// Set on mutations (append/close); `None` until the first update.
    updated_at: Option<chrono::DateTime<Utc>>,
    /// Per-producer idempotency state, keyed by producer id.
    producers: HashMap<String, ProducerState>,
    /// Broadcast channel used to wake readers waiting for new data.
    notify: broadcast::Sender<()>,
    /// Last accepted batch sequence token, if any.
    last_seq: Option<String>,
    /// Present when this stream was forked from another stream.
    fork_info: Option<ForkInfo>,
    /// Number of live forks that reference this stream as their source.
    ref_count: u32,
    /// Active vs. tombstoned (deleted-but-still-referenced).
    state: StreamState,
}
41
42impl StreamEntry {
43 fn new(config: StreamConfig) -> Self {
44 let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
47 Self {
48 config,
49 messages: Vec::with_capacity(INITIAL_MESSAGES_CAPACITY),
50 closed: false,
51 next_read_seq: 0,
52 next_byte_offset: 0,
53 total_bytes: 0,
54 created_at: Utc::now(),
55 updated_at: None,
56 producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
57 notify,
58 last_seq: None,
59 fork_info: None,
60 ref_count: 0,
61 state: StreamState::Active,
62 }
63 }
64}
65
/// Thread-safe in-memory [`Storage`] backend with global and per-stream
/// byte budgets.
pub struct InMemoryStorage {
    /// Stream name -> shared, individually locked stream entry.
    streams: RwLock<HashMap<String, Arc<RwLock<StreamEntry>>>>,
    /// Bytes currently held across all streams; kept in sync on append/delete.
    total_bytes: AtomicU64,
    /// Hard cap on `total_bytes` across all streams.
    max_total_bytes: u64,
    /// Hard cap on any single stream's `total_bytes`.
    max_stream_bytes: u64,
}
84
85impl InMemoryStorage {
86 #[must_use]
88 pub fn new(max_total_bytes: u64, max_stream_bytes: u64) -> Self {
89 Self {
90 streams: RwLock::new(HashMap::new()),
91 total_bytes: AtomicU64::new(0),
92 max_total_bytes,
93 max_stream_bytes,
94 }
95 }
96
97 #[must_use]
99 pub fn total_bytes(&self) -> u64 {
100 self.total_bytes.load(Ordering::Acquire)
101 }
102
103 fn saturating_sub_total_bytes(&self, bytes: u64) {
104 self.total_bytes
105 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
106 Some(current.saturating_sub(bytes))
107 })
108 .ok();
109 }
110
111 fn get_stream(&self, name: &str) -> Option<Arc<RwLock<StreamEntry>>> {
112 let streams = self.streams.read().expect("streams lock poisoned");
113 streams.get(name).map(Arc::clone)
114 }
115
116 fn hard_remove_stream(
117 &self,
118 streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
119 name: &str,
120 ) -> Option<ForkInfo> {
121 let stream_arc = streams.remove(name)?;
122 let stream = stream_arc.read().expect("stream lock poisoned");
123 self.saturating_sub_total_bytes(stream.total_bytes);
124 stream.fork_info.clone()
125 }
126
127 fn remove_for_recreate(
128 &self,
129 streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
130 name: &str,
131 ) {
132 if let Some(fork_info) = self.hard_remove_stream(streams, name) {
133 self.cascade_delete(streams, &fork_info.source_name);
134 }
135 }
136
137 #[allow(clippy::unnecessary_wraps)]
139 fn read_local_messages(
140 stream: &StreamEntry,
141 from_offset: &Offset,
142 next_offset: Offset,
143 ) -> Result<ReadResult> {
144 let start_idx = if from_offset.is_start() {
145 0
146 } else {
147 match stream
148 .messages
149 .binary_search_by(|m| m.offset.cmp(from_offset))
150 {
151 Ok(idx) | Err(idx) => idx,
152 }
153 };
154
155 let messages: Vec<Bytes> = stream.messages[start_idx..]
156 .iter()
157 .map(|m| m.data.clone())
158 .collect();
159
160 let at_tail = start_idx + messages.len() >= stream.messages.len();
161
162 Ok(ReadResult {
163 messages,
164 next_offset,
165 at_tail,
166 closed: stream.closed,
167 })
168 }
169
170 fn cascade_delete(
175 &self,
176 streams: &mut HashMap<String, Arc<RwLock<StreamEntry>>>,
177 parent_name: &str,
178 ) {
179 let mut current_parent = parent_name.to_string();
180 loop {
181 let Some(parent_arc) = streams.get(¤t_parent) else {
182 break;
183 };
184 let parent_arc = parent_arc.clone();
185 let mut parent = parent_arc.write().expect("stream lock poisoned");
186 parent.ref_count = parent.ref_count.saturating_sub(1);
187
188 if parent.state == StreamState::Tombstone && parent.ref_count == 0 {
189 let fi = parent.fork_info.clone();
191 self.saturating_sub_total_bytes(parent.total_bytes);
192 drop(parent);
193 streams.remove(¤t_parent);
194
195 if let Some(fi) = fi {
197 current_parent = fi.source_name;
198 } else {
199 break;
200 }
201 } else {
202 break;
203 }
204 }
205 }
206
    /// Collect message payloads by walking a fork's source chain.
    ///
    /// Builds a read plan via `fork::build_read_plan`, then for each segment
    /// copies payloads starting at `from_offset` (first segment only) and
    /// stopping at that segment's fork point — or at `up_to` for the final
    /// segment. Segments missing from the map are silently skipped.
    fn read_source_chain(
        &self,
        source_name: &str,
        from_offset: &Offset,
        up_to: &Offset,
    ) -> Vec<Bytes> {
        let streams = self.streams.read().expect("streams lock poisoned");

        // The plan resolver only needs each stream's fork linkage.
        let plan = super::fork::build_read_plan(source_name, |n| {
            streams.get(n).map(|arc| {
                let s = arc.read().expect("stream lock poisoned");
                s.fork_info.clone()
            })
        });

        let mut all_messages: Vec<Bytes> = Vec::new();

        for (i, segment) in plan.iter().enumerate() {
            let Some(seg_arc) = streams.get(&segment.name) else {
                continue;
            };
            let seg_stream = seg_arc.read().expect("stream lock poisoned");

            // Last segment is bounded by the caller's limit; earlier ones
            // stop where the next stream in the chain forked off.
            let effective_up_to = if i == plan.len() - 1 {
                Some(up_to)
            } else {
                segment.read_up_to.as_ref()
            };

            // Only the first segment starts at the caller's offset.
            let effective_from = if i == 0 {
                from_offset
            } else {
                &Offset::start()
            };

            let start_idx = if effective_from.is_start() {
                0
            } else {
                match seg_stream
                    .messages
                    .binary_search_by(|m| m.offset.cmp(effective_from))
                {
                    Ok(idx) | Err(idx) => idx,
                }
            };

            for msg in &seg_stream.messages[start_idx..] {
                if effective_up_to.is_some_and(|bound| msg.offset >= *bound) {
                    break;
                }
                all_messages.push(msg.data.clone());
            }
        }

        all_messages
    }
275
    /// Append a batch of payloads to `stream`, enforcing byte budgets.
    ///
    /// Reserves the whole batch against the global counter first, then
    /// checks the per-stream cap, rolling the global reservation back on
    /// failure. On success, offsets are assigned sequentially and waiting
    /// readers are notified.
    ///
    /// # Errors
    /// `MemoryLimitExceeded` or `StreamSizeLimitExceeded` when a cap would
    /// be crossed; in both cases the stream itself is left unmodified.
    fn commit_messages(&self, stream: &mut StreamEntry, messages: Vec<Bytes>) -> Result<()> {
        if messages.is_empty() {
            return Ok(());
        }

        let mut total_batch_bytes = 0u64;
        let mut message_sizes = Vec::with_capacity(messages.len());
        for data in &messages {
            let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
            message_sizes.push(byte_len);
            total_batch_bytes += byte_len;
        }

        // Atomically reserve the batch against the global budget.
        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(total_batch_bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }
        if stream.total_bytes + total_batch_bytes > self.max_stream_bytes {
            // Undo the global reservation taken above.
            self.saturating_sub_total_bytes(total_batch_bytes);
            return Err(Error::StreamSizeLimitExceeded);
        }

        for (data, byte_len) in messages.into_iter().zip(message_sizes) {
            let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
            stream.next_read_seq += 1;
            stream.next_byte_offset += byte_len;
            stream.total_bytes += byte_len;
            let message = Message::new(offset, data);
            stream.messages.push(message);
        }

        // Best-effort wake-up; the send only fails with no subscribers.
        let _ = stream.notify.send(());

        Ok(())
    }
325
    /// Build a `ReadResult` for a forked stream by splicing source-chain
    /// history (up to the fork point) with the fork's own messages.
    ///
    /// Cases by `from_offset`:
    /// * start or before the fork point: source history plus all local
    ///   messages (`fork_messages_data`);
    /// * exactly at the fork point: local messages only;
    /// * past the fork point: re-read the fork's own log from that offset.
    ///
    /// NOTE(review): `at_tail` is hard-coded to `true` — consistent with all
    /// paths reading to the end, but worth confirming against callers.
    fn assemble_fork_read(
        &self,
        name: &str,
        from_offset: &Offset,
        fi: &super::ForkInfo,
        fork_messages_data: Vec<Bytes>,
        next_offset: Offset,
        closed: bool,
    ) -> Result<ReadResult> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            let source_messages =
                self.read_source_chain(&fi.source_name, from_offset, &fi.fork_offset);
            all_messages.extend(source_messages);
        }

        if from_offset.is_start() || *from_offset <= fi.fork_offset {
            all_messages.extend(fork_messages_data);
        } else {
            // Reading past the fork point: the pre-collected snapshot starts
            // at the fork point, so re-read the fork's log from the offset.
            let stream_arc = self
                .get_stream(name)
                .ok_or_else(|| Error::NotFound(name.to_string()))?;
            let stream = stream_arc.read().expect("stream lock poisoned");
            let start_idx = match stream
                .messages
                .binary_search_by(|m| m.offset.cmp(from_offset))
            {
                Ok(idx) | Err(idx) => idx,
            };
            let msgs: Vec<Bytes> = stream.messages[start_idx..]
                .iter()
                .map(|m| m.data.clone())
                .collect();
            all_messages.extend(msgs);
        }

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }
373}
374
375impl Storage for InMemoryStorage {
    /// Create a new root (non-fork) stream.
    ///
    /// When a stream with this name already exists,
    /// `fork::evaluate_root_create` decides among removing an expired entry
    /// and recreating, reporting `AlreadyExists`, or returning a conflict
    /// error.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    // Release the entry's read lock before mutating the map.
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name);
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let entry = StreamEntry::new(config);
        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(CreateStreamResult::Created)
    }
406
    /// Append a single message to a stream, returning its assigned offset.
    ///
    /// Mirrors the budget discipline of `commit_messages`: the global byte
    /// budget is reserved atomically first, then the per-stream cap is
    /// checked, rolling the reservation back on failure.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);

        // Atomically reserve the payload against the global budget.
        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(byte_len)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }

        if stream.total_bytes + byte_len > self.max_stream_bytes {
            // Undo the global reservation taken above.
            self.saturating_sub_total_bytes(byte_len);
            return Err(Error::StreamSizeLimitExceeded);
        }

        let offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);

        stream.next_read_seq += 1;
        stream.next_byte_offset += byte_len;
        stream.total_bytes += byte_len;

        let message = Message::new(offset.clone(), data);
        stream.messages.push(message);
        stream.updated_at = Some(Utc::now());
        super::fork::renew_ttl(&mut stream.config);
        // Best-effort reader wake-up.
        let _ = stream.notify.send(());

        Ok(offset)
    }
455
    /// Append a batch of messages with optional sequence-token validation.
    ///
    /// NOTE(review): returns the stream's *next* offset after the batch (the
    /// tail), whereas `append` returns the offset of the written message —
    /// presumably intentional batch semantics; confirm against the protocol.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        if stream.closed {
            return Err(Error::StreamClosed);
        }

        super::validate_content_type(&stream.config.content_type, content_type)?;

        // Validate the caller-supplied sequence token before mutating state.
        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;

        self.commit_messages(&mut stream, messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }
        stream.updated_at = Some(Utc::now());
        super::fork::renew_ttl(&mut stream.config);

        Ok(Offset::new(stream.next_read_seq, stream.next_byte_offset))
    }
495
    /// Read messages from `name` starting at `from_offset`.
    ///
    /// Streams without a TTL are served entirely under read locks. Streams
    /// with a TTL take the write lock so `renew_ttl` can run after the
    /// read. `now` offsets return an empty result at the tail; forked
    /// streams delegate to `assemble_fork_read`, which requires the
    /// per-stream lock to be dropped first (it re-locks the streams map).
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        // Peek under a read lock to decide which locking path is needed.
        let needs_ttl_renewal = {
            let stream = stream_arc.read().expect("stream lock poisoned");
            super::fork::check_stream_access(&stream.config, stream.state, name)?;
            stream.config.ttl_seconds.is_some()
        };

        if !needs_ttl_renewal {
            let stream = stream_arc.read().expect("stream lock poisoned");
            let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);

            if from_offset.is_now() {
                return Ok(ReadResult {
                    messages: Vec::new(),
                    next_offset,
                    at_tail: true,
                    closed: stream.closed,
                });
            }

            if stream.fork_info.is_none() {
                return Self::read_local_messages(&stream, from_offset, next_offset);
            }

            // Fork path: snapshot what we need, then release the stream lock
            // before walking the source chain.
            let fi = stream.fork_info.clone().expect("checked above");
            let closed = stream.closed;
            let fork_messages_data: Vec<Bytes> =
                stream.messages.iter().map(|m| m.data.clone()).collect();
            drop(stream);

            return self.assemble_fork_read(
                name,
                from_offset,
                &fi,
                fork_messages_data,
                next_offset,
                closed,
            );
        }

        let mut stream = stream_arc.write().expect("stream lock poisoned");
        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        let result = if from_offset.is_now() {
            ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: stream.closed,
            }
        } else if stream.fork_info.is_none() {
            Self::read_local_messages(&stream, from_offset, next_offset)?
        } else {
            let fi = stream.fork_info.clone().expect("checked above");
            let closed = stream.closed;
            let fork_messages_data: Vec<Bytes> =
                stream.messages.iter().map(|m| m.data.clone()).collect();
            drop(stream);

            let result = self.assemble_fork_read(
                name,
                from_offset,
                &fi,
                fork_messages_data,
                next_offset,
                closed,
            )?;

            // Re-acquire the write lock dropped above so TTL renewal can
            // run. NOTE(review): the stream may change between the drop and
            // this re-lock; the result is a snapshot of the earlier state.
            stream = stream_arc.write().expect("stream lock poisoned");
            result
        };

        super::fork::renew_ttl(&mut stream.config);
        Ok(result)
    }
576
    /// Delete a stream by name.
    ///
    /// Depending on `fork::evaluate_delete`, the stream is either
    /// tombstoned (kept in the map so fork history stays readable) or
    /// hard-deleted; a hard-deleted fork also unwinds reference counts
    /// along its source chain.
    fn delete(&self, name: &str) -> Result<()> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        let stream_arc = streams
            .get(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?
            .clone();

        {
            let stream = stream_arc.read().expect("stream lock poisoned");

            match super::fork::evaluate_delete(name, stream.state, stream.ref_count)? {
                super::fork::DeleteDisposition::Tombstone => {
                    // Upgrade to a write lock on the entry; the outer
                    // `streams` write lock is held across the upgrade.
                    drop(stream);
                    let mut stream_w = stream_arc.write().expect("stream lock poisoned");
                    stream_w.state = StreamState::Tombstone;
                    return Ok(());
                }
                super::fork::DeleteDisposition::HardDelete => {}
            }
        }

        let fork_info = self.hard_remove_stream(&mut streams, name);

        if let Some(fi) = fork_info {
            self.cascade_delete(&mut streams, &fi.source_name);
        }

        Ok(())
    }
607
608 fn head(&self, name: &str) -> Result<StreamMetadata> {
609 let stream_arc = self
610 .get_stream(name)
611 .ok_or_else(|| Error::NotFound(name.to_string()))?;
612
613 let stream = stream_arc.read().expect("stream lock poisoned");
614
615 super::fork::check_stream_access(&stream.config, stream.state, name)?;
616
617 Ok(StreamMetadata {
618 config: stream.config.clone(),
619 next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
620 closed: stream.closed,
621 total_bytes: stream.total_bytes,
622 message_count: u64::try_from(stream.messages.len()).unwrap_or(u64::MAX),
623 created_at: stream.created_at,
624 updated_at: stream.updated_at,
625 })
626 }
627
628 fn close_stream(&self, name: &str) -> Result<()> {
629 let stream_arc = self
630 .get_stream(name)
631 .ok_or_else(|| Error::NotFound(name.to_string()))?;
632
633 let mut stream = stream_arc.write().expect("stream lock poisoned");
634
635 super::fork::check_stream_access(&stream.config, stream.state, name)?;
636
637 stream.closed = true;
638 stream.updated_at = Some(Utc::now());
639 super::fork::renew_ttl(&mut stream.config);
640
641 let _ = stream.notify.send(());
642
643 Ok(())
644 }
645
    /// Append on behalf of an identified producer, with deduplication.
    ///
    /// Stale producer records are pruned first; the producer's epoch/seq is
    /// then checked against recorded state, and a duplicate submission
    /// short-circuits with the current tail instead of re-appending. On
    /// acceptance the batch is committed, the stream optionally closed, and
    /// the producer's latest epoch/seq recorded.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let stream_arc = self
            .get_stream(name)
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let mut stream = stream_arc.write().expect("stream lock poisoned");

        super::fork::check_stream_access(&stream.config, stream.state, name)?;

        super::cleanup_stale_producers(&mut stream.producers);

        // An empty batch is allowed (e.g. a close-only request); skip the
        // content-type check in that case.
        if !messages.is_empty() {
            super::validate_content_type(&stream.config.content_type, content_type)?;
        }

        let now = Utc::now();

        match super::check_producer(stream.producers.get(&producer.id), producer, stream.closed)? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                // Idempotent replay: report the recorded state and the tail.
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
                    closed: stream.closed,
                });
            }
        }

        let pending_seq = super::validate_seq(stream.last_seq.as_deref(), seq)?;

        self.commit_messages(&mut stream, messages)?;
        if let Some(new_seq) = pending_seq {
            stream.last_seq = Some(new_seq);
        }

        if should_close {
            stream.closed = true;
        }

        stream.updated_at = Some(now);

        // Record the producer's latest state only after the commit succeeds.
        stream.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );
        super::fork::renew_ttl(&mut stream.config);

        let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
        let closed = stream.closed;

        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset,
            closed,
        })
    }
716
    /// Atomically create a stream seeded with initial messages, optionally
    /// closing it in the same step.
    ///
    /// Existing entries are handled as in `create_stream`, except that
    /// `AlreadyExists` also reports the current tail offset and closed
    /// flag. The seeded entry is inserted into the map only after the
    /// messages commit, so a budget failure leaves no partial stream.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<super::CreateWithDataResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        if let Some(stream_arc) = streams.get(name) {
            let stream = stream_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_root_create(
                name,
                &stream.config,
                stream.state,
                stream.ref_count,
                &config,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(stream);
                    self.remove_for_recreate(&mut streams, name);
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    let next_offset = Offset::new(stream.next_read_seq, stream.next_byte_offset);
                    let closed = stream.closed;
                    return Ok(super::CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset,
                        closed,
                    });
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let mut entry = StreamEntry::new(config);

        if !messages.is_empty() {
            self.commit_messages(&mut entry, messages)?;
        }

        if should_close {
            entry.closed = true;
        }

        let next_offset = Offset::new(entry.next_read_seq, entry.next_byte_offset);
        let closed = entry.closed;

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        Ok(super::CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    }
775
776 fn exists(&self, name: &str) -> bool {
777 let streams = self.streams.read().expect("streams lock poisoned");
778 if let Some(stream_arc) = streams.get(name) {
779 let stream = stream_arc.read().expect("stream lock poisoned");
780 !super::is_stream_expired(&stream.config) && stream.state == StreamState::Active
781 } else {
782 false
783 }
784 }
785
786 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
787 let stream_arc = self.get_stream(name)?;
788 let stream = stream_arc.read().expect("stream lock poisoned");
789
790 if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
791 return None;
792 }
793
794 Some(stream.notify.subscribe())
795 }
796
    /// Scan all streams and clean up expired ones.
    ///
    /// Expired streams still referenced by forks are tombstoned; the rest
    /// are hard-deleted, unwinding their own source chains.
    ///
    /// NOTE(review): the returned count includes streams that were only
    /// tombstoned, not physically removed — confirm callers expect that.
    fn cleanup_expired_streams(&self) -> usize {
        let mut streams = self.streams.write().expect("streams lock poisoned");
        let mut expired = Vec::new();

        // Collect first so the map can be mutated in a second pass.
        for (name, stream_arc) in streams.iter() {
            let stream = stream_arc.read().expect("stream lock poisoned");
            if super::is_stream_expired(&stream.config) {
                expired.push((name.clone(), stream.ref_count));
            }
        }

        let removed_count = expired.len();
        for (name, ref_count) in expired {
            match super::fork::evaluate_expired_cleanup(ref_count) {
                super::fork::DeleteDisposition::Tombstone => {
                    if let Some(stream_arc) = streams.get(&name) {
                        let mut stream = stream_arc.write().expect("stream lock poisoned");
                        stream.state = StreamState::Tombstone;
                    }
                }
                super::fork::DeleteDisposition::HardDelete => {
                    self.remove_for_recreate(&mut streams, &name);
                }
            }
        }

        removed_count
    }
825
826 fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
827 let streams = self.streams.read().expect("streams lock poisoned");
828 let mut result = Vec::new();
829 for (name, stream_arc) in streams.iter() {
830 let stream = stream_arc.read().expect("stream lock poisoned");
831 if super::is_stream_expired(&stream.config) || stream.state == StreamState::Tombstone {
832 continue;
833 }
834 result.push((
835 name.clone(),
836 StreamMetadata {
837 config: stream.config.clone(),
838 next_offset: Offset::new(stream.next_read_seq, stream.next_byte_offset),
839 closed: stream.closed,
840 total_bytes: stream.total_bytes,
841 message_count: u64::try_from(stream.messages.len()).unwrap_or(u64::MAX),
842 created_at: stream.created_at,
843 updated_at: stream.updated_at,
844 },
845 ));
846 }
847 result.sort_by(|a, b| a.0.cmp(&b.0));
848 Ok(result)
849 }
850
    /// Create a fork of `source_name` at `fork_offset` (defaulting to the
    /// source's current tail).
    ///
    /// Validates source access, content-type compatibility, and any
    /// existing stream under `name` (an expired one is replaced). The new
    /// entry starts its counters at the resolved fork offset, records its
    /// fork linkage, and increments the source's `ref_count` last.
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        let mut streams = self.streams.write().expect("streams lock poisoned");

        let source_arc = streams
            .get(source_name)
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?
            .clone();

        let source = source_arc.read().expect("stream lock poisoned");

        super::fork::check_fork_source_access(&source.config, source.state, source_name)?;

        // The default fork point is the source's current tail.
        let source_next_offset = Offset::new(source.next_read_seq, source.next_byte_offset);
        let resolved_offset = super::fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        if !config
            .content_type
            .eq_ignore_ascii_case(&source.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }
        let fork_spec = super::fork::build_fork_create_spec(
            source_name,
            &source.config,
            &config,
            resolved_offset.clone(),
        );

        drop(source);

        if let Some(existing_arc) = streams.get(name) {
            let existing = existing_arc.read().expect("stream lock poisoned");
            match super::fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                super::fork::ExistingCreateDisposition::RemoveExpired => {
                    drop(existing);
                    self.remove_for_recreate(&mut streams, name);
                }
                super::fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                super::fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        // The fork's counters continue from the fork point, not from zero.
        let (fork_read_seq, fork_byte_offset) =
            resolved_offset.parse_components().unwrap_or((0, 0));

        let (notify, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
        let entry = StreamEntry {
            config: fork_spec.config,
            messages: Vec::with_capacity(INITIAL_MESSAGES_CAPACITY),
            // NOTE(review): reads `created_closed` from the caller's config
            // rather than `fork_spec.config` — confirm they always agree.
            closed: config.created_closed,
            next_read_seq: fork_read_seq,
            next_byte_offset: fork_byte_offset,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            producers: HashMap::with_capacity(INITIAL_PRODUCERS_CAPACITY),
            notify,
            last_seq: None,
            fork_info: Some(ForkInfo {
                source_name: fork_spec.source_name,
                fork_offset: resolved_offset,
            }),
            ref_count: 0,
            state: StreamState::Active,
        };

        streams.insert(name.to_string(), Arc::new(RwLock::new(entry)));

        // The source should still be present (the map write lock was held
        // throughout); keep its fork reference count in sync.
        if let Some(source_arc) = streams.get(source_name) {
            let mut source = source_arc.write().expect("stream lock poisoned");
            source.ref_count += 1;
        }

        Ok(CreateStreamResult::Created)
    }
950}
951
952