Skip to main content

durable_streams_server/storage/acid/
storage_impl.rs

1use super::{
2    AcidStorage, Bytes, ForkInfo, MESSAGES, Offset, ProducerState, Result, STREAMS,
3    StoredStreamMeta, StreamConfig, StreamState,
4};
5use crate::protocol::error::Error;
6use crate::protocol::producer::ProducerHeaders;
7use crate::storage::{
8    CreateStreamResult, CreateWithDataResult, ForkCreateSpec, ProducerAppendResult, ProducerCheck,
9    ReadResult, Storage, StreamMetadata, check_producer, cleanup_stale_producers, fork,
10    is_stream_expired, validate_content_type, validate_seq,
11};
12use chrono::Utc;
13use redb::{ReadableDatabase, ReadableTable};
14use std::collections::HashMap;
15use tokio::sync::broadcast;
16use tracing::warn;
17
/// Result of checking for an existing fork on a cross-shard database.
///
/// Produced before creating a fork whose name may already live on a
/// different shard than the source stream; tells the caller whether
/// fork creation may proceed.
enum CrossShardForkResult {
    /// Proceed with creation; carries `(removed_expired_bytes, removed_parent)`.
    ///
    /// The first field is the byte total of an expired stream that was removed
    /// to make way for the fork (0 when nothing was removed); the second is
    /// that removed stream's own fork parent, if any, which needs follow-up
    /// cascade cleanup after the commit.
    Continue(u64, Option<String>),
    /// The fork already exists; the caller should return `AlreadyExists`.
    AlreadyExists,
}
25
26impl Storage for AcidStorage {
    /// Creates stream `name` with `config`.
    ///
    /// Picks the shard that already holds `name` (so re-creation stays on the
    /// same shard) or hashes the name to choose one. An existing entry is
    /// evaluated by `fork::evaluate_root_create`, which may direct us to
    /// replace an expired stream, report `AlreadyExists`, or surface a
    /// configuration conflict.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let shard_idx = self
            .find_stream_shard_index(name)?
            .unwrap_or_else(|| self.shard_index(name));
        let shard = &self.shards[shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // Bookkeeping for a replaced expired stream; settled only after commit.
        let mut removed_expired_bytes = 0_u64;
        let mut removed_expired_parent = None;

        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            match fork::evaluate_root_create(
                name,
                &existing.config,
                existing.state,
                existing.ref_count,
                &config,
            ) {
                fork::ExistingCreateDisposition::RemoveExpired => {
                    removed_expired_bytes = existing.total_bytes;
                    removed_expired_parent = existing.fork_info.map(|info| info.source_name);
                    Self::delete_stream_messages(&mut messages, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                }
                fork::ExistingCreateDisposition::AlreadyExists => {
                    // Early return drops the uncommitted txn, aborting it.
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        let meta = Self::new_stream_meta(config);
        Self::write_stream_meta(&mut streams, name, &meta)?;

        // Table handles borrow the txn; they must be dropped before commit.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create stream", e))?;

        // Adjust global accounting only after the commit succeeded.
        if removed_expired_bytes > 0 {
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
            // NOTE(review): the replaced stream may itself have been a fork;
            // cascade_delete_acid presumably releases the parent's reference —
            // confirm against its definition.
            if let Some(parent) = removed_expired_parent {
                self.cascade_delete_acid(&parent)?;
            }
        }

        Ok(CreateStreamResult::Created)
    }
86
87    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
88        let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
89        self.reserve_total_bytes(message_bytes)?;
90
91        let result = (|| {
92            let shard = &self.shards[self.existing_shard_index(name)?];
93            let txn = Self::begin_write_txn(&shard.db)?;
94            let mut streams = txn
95                .open_table(STREAMS)
96                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
97            let mut messages = txn
98                .open_table(MESSAGES)
99                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
100
101            let mut meta = Self::read_stream_meta(&streams, name)?
102                .ok_or_else(|| Error::NotFound(name.to_string()))?;
103
104            fork::check_stream_access(&meta.config, meta.state, name)?;
105
106            if meta.closed {
107                return Err(Error::StreamClosed);
108            }
109
110            validate_content_type(&meta.config.content_type, content_type)?;
111
112            if meta.total_bytes + message_bytes > self.max_stream_bytes {
113                return Err(Error::StreamSizeLimitExceeded);
114            }
115
116            let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
117            messages
118                .insert(
119                    (name, meta.next_read_seq, meta.next_byte_offset),
120                    data.as_ref(),
121                )
122                .map_err(|e| Self::storage_err("failed to append message", e))?;
123
124            meta.next_read_seq += 1;
125            meta.next_byte_offset += message_bytes;
126            meta.total_bytes += message_bytes;
127            meta.updated_at = Some(Utc::now());
128            fork::renew_ttl(&mut meta.config);
129
130            Self::write_stream_meta(&mut streams, name, &meta)?;
131
132            drop(messages);
133            drop(streams);
134            txn.commit()
135                .map_err(|e| Self::storage_err("failed to commit append", e))?;
136
137            Ok(offset)
138        })();
139
140        if result.is_err() {
141            self.rollback_total_bytes(message_bytes);
142            return result;
143        }
144
145        self.notify_stream(name);
146        result
147    }
148
149    fn batch_append(
150        &self,
151        name: &str,
152        messages: Vec<Bytes>,
153        content_type: &str,
154        seq: Option<&str>,
155    ) -> Result<Offset> {
156        if messages.is_empty() {
157            return Err(Error::InvalidHeader {
158                header: "Content-Length".to_string(),
159                reason: "batch cannot be empty".to_string(),
160            });
161        }
162
163        let batch_bytes = Self::batch_bytes(&messages);
164        self.reserve_total_bytes(batch_bytes)?;
165
166        let result = (|| {
167            let shard = &self.shards[self.existing_shard_index(name)?];
168            let txn = Self::begin_write_txn(&shard.db)?;
169            let mut streams = txn
170                .open_table(STREAMS)
171                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
172            let mut message_table = txn
173                .open_table(MESSAGES)
174                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
175
176            let mut meta = Self::read_stream_meta(&streams, name)?
177                .ok_or_else(|| Error::NotFound(name.to_string()))?;
178
179            fork::check_stream_access(&meta.config, meta.state, name)?;
180
181            if meta.closed {
182                return Err(Error::StreamClosed);
183            }
184
185            validate_content_type(&meta.config.content_type, content_type)?;
186            let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;
187
188            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
189                return Err(Error::StreamSizeLimitExceeded);
190            }
191
192            for data in &messages {
193                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
194                message_table
195                    .insert(
196                        (name, meta.next_read_seq, meta.next_byte_offset),
197                        data.as_ref(),
198                    )
199                    .map_err(|e| Self::storage_err("failed to append batch message", e))?;
200                meta.next_read_seq += 1;
201                meta.next_byte_offset += len;
202                meta.total_bytes += len;
203            }
204
205            if let Some(new_seq) = pending_seq {
206                meta.last_seq = Some(new_seq);
207            }
208            meta.updated_at = Some(Utc::now());
209            fork::renew_ttl(&mut meta.config);
210
211            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
212            Self::write_stream_meta(&mut streams, name, &meta)?;
213
214            drop(message_table);
215            drop(streams);
216            txn.commit()
217                .map_err(|e| Self::storage_err("failed to commit batch append", e))?;
218
219            Ok(next_offset)
220        })();
221
222        if result.is_err() {
223            self.rollback_total_bytes(batch_bytes);
224            return result;
225        }
226
227        self.notify_stream(name);
228        result
229    }
230
231    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
232        let shard_idx = self.existing_shard_index(name)?;
233        let needs_ttl_renewal = {
234            let shard = &self.shards[shard_idx];
235            let txn = shard
236                .db
237                .begin_read()
238                .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
239            let streams = txn
240                .open_table(STREAMS)
241                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
242            let meta = Self::read_stream_meta(&streams, name)?
243                .ok_or_else(|| Error::NotFound(name.to_string()))?;
244            fork::check_stream_access(&meta.config, meta.state, name)?;
245            meta.config.ttl_seconds.is_some()
246        };
247
248        if !needs_ttl_renewal {
249            return self.read_without_ttl_renewal(name, from_offset, shard_idx);
250        }
251
252        self.read_with_ttl_renewal(name, from_offset, shard_idx)
253    }
254
    /// Deletes stream `name`, honoring fork reference counts.
    ///
    /// `fork::evaluate_delete` chooses between a soft delete (mark the
    /// metadata `Tombstone`, keep messages) and a hard delete (remove
    /// messages and metadata). After a hard delete commits, global byte
    /// accounting and the notifier are cleaned up and, if this stream was a
    /// fork, the parent gets a cascading cleanup.
    fn delete(&self, name: &str) -> Result<()> {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        match fork::evaluate_delete(name, meta.state, meta.ref_count)? {
            fork::DeleteDisposition::Tombstone => {
                // Soft delete: keep the row (and its message data) but mark it
                // tombstoned; no byte accounting changes.
                let mut updated = meta;
                updated.state = StreamState::Tombstone;
                Self::write_stream_meta(&mut streams, name, &updated)?;
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit soft delete", e))?;
                return Ok(());
            }
            fork::DeleteDisposition::HardDelete => {}
        }

        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        Self::delete_stream_messages(&mut messages, name)?;
        drop(messages);

        streams
            .remove(name)
            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;

        // Capture everything post-commit cleanup needs before dropping tables.
        let fork_info = meta.fork_info.clone();
        let total_bytes = meta.total_bytes;

        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit delete", e))?;

        // Adjust global accounting only after the commit succeeded.
        self.saturating_sub_total_bytes(total_bytes);
        self.drop_notifier(name);

        // This stream may have been a fork; let the parent clean up too.
        if let Some(fi) = fork_info {
            self.cascade_delete_acid(&fi.source_name)?;
        }

        Ok(())
    }
304
305    fn head(&self, name: &str) -> Result<StreamMetadata> {
306        let shard = &self.shards[self.existing_shard_index(name)?];
307        let txn = shard
308            .db
309            .begin_read()
310            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
311
312        let streams = txn
313            .open_table(STREAMS)
314            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
315
316        let meta = Self::read_stream_meta(&streams, name)?
317            .ok_or_else(|| Error::NotFound(name.to_string()))?;
318
319        fork::check_stream_access(&meta.config, meta.state, name)?;
320
321        Ok(StreamMetadata {
322            config: meta.config,
323            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
324            closed: meta.closed,
325            total_bytes: meta.total_bytes,
326            message_count: meta.next_read_seq,
327            created_at: meta.created_at,
328            updated_at: meta.updated_at,
329        })
330    }
331
332    fn close_stream(&self, name: &str) -> Result<()> {
333        let shard = &self.shards[self.existing_shard_index(name)?];
334        let txn = Self::begin_write_txn(&shard.db)?;
335        let mut streams = txn
336            .open_table(STREAMS)
337            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
338
339        let mut meta = Self::read_stream_meta(&streams, name)?
340            .ok_or_else(|| Error::NotFound(name.to_string()))?;
341
342        fork::check_stream_access(&meta.config, meta.state, name)?;
343
344        meta.closed = true;
345        meta.updated_at = Some(Utc::now());
346        fork::renew_ttl(&mut meta.config);
347        Self::write_stream_meta(&mut streams, name, &meta)?;
348
349        drop(streams);
350        txn.commit()
351            .map_err(|e| Self::storage_err("failed to commit close stream", e))?;
352
353        self.notify_stream(name);
354        Ok(())
355    }
356
357    fn append_with_producer(
358        &self,
359        name: &str,
360        messages: Vec<Bytes>,
361        content_type: &str,
362        producer: &ProducerHeaders,
363        should_close: bool,
364        seq: Option<&str>,
365    ) -> Result<ProducerAppendResult> {
366        let batch_bytes = Self::batch_bytes(&messages);
367        self.reserve_total_bytes(batch_bytes)?;
368
369        let result = (|| {
370            let shard = &self.shards[self.existing_shard_index(name)?];
371            let txn = Self::begin_write_txn(&shard.db)?;
372            let mut streams = txn
373                .open_table(STREAMS)
374                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
375            let mut message_table = txn
376                .open_table(MESSAGES)
377                .map_err(|e| Self::storage_err("failed to open messages table", e))?;
378
379            let mut meta = Self::read_stream_meta(&streams, name)?
380                .ok_or_else(|| Error::NotFound(name.to_string()))?;
381
382            fork::check_stream_access(&meta.config, meta.state, name)?;
383
384            cleanup_stale_producers(&mut meta.producers);
385
386            if !messages.is_empty() {
387                validate_content_type(&meta.config.content_type, content_type)?;
388            }
389
390            match check_producer(
391                meta.producers.get(producer.id.as_str()),
392                producer,
393                meta.closed,
394            )? {
395                ProducerCheck::Accept => {}
396                ProducerCheck::Duplicate { epoch, seq } => {
397                    return Ok(ProducerAppendResult::Duplicate {
398                        epoch,
399                        seq,
400                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
401                        closed: meta.closed,
402                    });
403                }
404            }
405
406            let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;
407
408            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
409                return Err(Error::StreamSizeLimitExceeded);
410            }
411
412            for data in &messages {
413                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
414                message_table
415                    .insert(
416                        (name, meta.next_read_seq, meta.next_byte_offset),
417                        data.as_ref(),
418                    )
419                    .map_err(|e| Self::storage_err("failed to append producer message", e))?;
420                meta.next_read_seq += 1;
421                meta.next_byte_offset += len;
422                meta.total_bytes += len;
423            }
424
425            if let Some(new_seq) = pending_seq {
426                meta.last_seq = Some(new_seq);
427            }
428            if should_close {
429                meta.closed = true;
430            }
431
432            let now = Utc::now();
433            meta.producers.insert(
434                producer.id.clone(),
435                ProducerState {
436                    epoch: producer.epoch,
437                    last_seq: producer.seq,
438                    updated_at: now,
439                },
440            );
441            meta.updated_at = Some(now);
442            fork::renew_ttl(&mut meta.config);
443
444            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
445            let closed = meta.closed;
446
447            Self::write_stream_meta(&mut streams, name, &meta)?;
448            drop(message_table);
449            drop(streams);
450            txn.commit()
451                .map_err(|e| Self::storage_err("failed to commit producer append", e))?;
452
453            Ok(ProducerAppendResult::Accepted {
454                epoch: producer.epoch,
455                seq: producer.seq,
456                next_offset,
457                closed,
458            })
459        })();
460
461        if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
462            self.rollback_total_bytes(batch_bytes);
463        }
464
465        if result.is_ok() && (!messages.is_empty() || should_close) {
466            self.notify_stream(name);
467        }
468
469        result
470    }
471
    /// Atomically creates stream `name` and writes an initial batch of
    /// messages, optionally closing the stream in the same transaction.
    ///
    /// Existing-entry handling mirrors `create_stream` (replace expired,
    /// `AlreadyExists`, or conflict); message data is seeded via
    /// `write_initial_messages`. The global byte reservation is taken only
    /// once creation is known to proceed and is rolled back if the
    /// transaction then fails.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let batch_bytes = Self::batch_bytes(&messages);

        // State threaded out of the closure for post-commit bookkeeping.
        let mut reserved = false;
        let mut removed_expired_bytes = 0_u64;
        let mut removed_expired_parent = None;

        let result = (|| {
            // Prefer the shard already holding this name; otherwise hash it.
            let shard_idx = self
                .find_stream_shard_index(name)?
                .unwrap_or_else(|| self.shard_index(name));
            let shard = &self.shards[shard_idx];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            if let Some(existing) = Self::read_stream_meta(&streams, name)? {
                match fork::evaluate_root_create(
                    name,
                    &existing.config,
                    existing.state,
                    existing.ref_count,
                    &config,
                ) {
                    fork::ExistingCreateDisposition::RemoveExpired => {
                        removed_expired_bytes = existing.total_bytes;
                        removed_expired_parent =
                            existing.fork_info.clone().map(|info| info.source_name);
                        Self::delete_stream_messages(&mut message_table, name)?;
                        streams
                            .remove(name)
                            .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                    }
                    fork::ExistingCreateDisposition::AlreadyExists => {
                        // Report the live stream's current position rather
                        // than failing the create.
                        return Ok(CreateWithDataResult {
                            status: CreateStreamResult::AlreadyExists,
                            next_offset: Offset::new(
                                existing.next_read_seq,
                                existing.next_byte_offset,
                            ),
                            closed: existing.closed,
                        });
                    }
                    fork::ExistingCreateDisposition::Conflict(err) => {
                        return Err(err);
                    }
                }
            }

            // Reserve only when there is payload; a failure here aborts the
            // txn (it is dropped), which also undoes any expired removal above.
            if batch_bytes > 0 {
                self.reserve_total_bytes(batch_bytes)?;
                reserved = true;
            }

            let mut meta = Self::new_stream_meta(config);
            Self::write_initial_messages(
                name,
                &messages,
                batch_bytes,
                self.max_stream_bytes,
                &mut meta,
                &mut message_table,
            )?;

            if should_close {
                meta.closed = true;
            }

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;

            Ok(CreateWithDataResult {
                status: CreateStreamResult::Created,
                next_offset,
                closed,
            })
        })();

        // Release the reservation if we took one and then failed.
        if result.is_err() && reserved {
            self.rollback_total_bytes(batch_bytes);
        }

        if result.is_ok() {
            // Settle accounting for a replaced expired stream post-commit.
            if removed_expired_bytes > 0 {
                self.saturating_sub_total_bytes(removed_expired_bytes);
                self.drop_notifier(name);
                if let Some(parent) = removed_expired_parent {
                    self.cascade_delete_acid(&parent)?;
                }
            }
            // NOTE(review): this also notifies on the AlreadyExists path when
            // `messages` is non-empty even though nothing was written; appears
            // to be a harmless spurious wakeup — confirm intent.
            if should_close || !messages.is_empty() {
                self.notify_stream(name);
            }
        }

        result
    }
585
586    fn exists(&self, name: &str) -> bool {
587        let Ok(Some(shard_idx)) = self.find_stream_shard_index(name) else {
588            return false;
589        };
590        let shard = &self.shards[shard_idx];
591        let Ok(txn) = shard.db.begin_read() else {
592            return false;
593        };
594        let Ok(streams) = txn.open_table(STREAMS) else {
595            return false;
596        };
597
598        match Self::read_stream_meta(&streams, name) {
599            Ok(Some(meta)) => !is_stream_expired(&meta.config) && meta.state == StreamState::Active,
600            _ => false,
601        }
602    }
603
604    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
605        let shard_idx = self.find_stream_shard_index(name).ok().flatten()?;
606        let shard = &self.shards[shard_idx];
607        let txn = shard.db.begin_read().ok()?;
608        let streams = txn.open_table(STREAMS).ok()?;
609        let meta = Self::read_stream_meta(&streams, name).ok()??;
610
611        if is_stream_expired(&meta.config) || meta.state == StreamState::Tombstone {
612            return None;
613        }
614
615        Some(self.notifier_sender(name).subscribe())
616    }
617
    /// Best-effort sweep that removes or tombstones expired streams.
    ///
    /// Per shard: a read pass collects candidate names, then a write pass
    /// re-checks each candidate (its TTL may have been renewed, or it may be
    /// gone) and applies the fork-aware disposition. Individual failures are
    /// skipped rather than propagated so one bad entry cannot stall the
    /// sweep. Returns the number of streams tombstoned or hard-deleted.
    fn cleanup_expired_streams(&self) -> usize {
        let mut total_removed = 0;

        for shard in &self.shards {
            // Read pass: collect names whose TTL has elapsed.
            let Ok(read_txn) = shard.db.begin_read() else {
                continue;
            };
            let Ok(streams_table) = read_txn.open_table(STREAMS) else {
                continue;
            };
            let Ok(iter) = streams_table.iter() else {
                continue;
            };

            let mut candidates: Vec<String> = Vec::new();
            for item in iter {
                let Ok((key, value)) = item else {
                    continue;
                };
                let name = key.value().to_string();
                // Unparseable metadata is skipped, not treated as expired.
                let Ok(meta) = serde_json::from_slice::<StoredStreamMeta>(value.value()) else {
                    continue;
                };
                if is_stream_expired(&meta.config) {
                    candidates.push(name);
                }
            }

            drop(streams_table);
            drop(read_txn);

            if candidates.is_empty() {
                continue;
            }

            // Write pass: re-validate each candidate under the write txn.
            let Ok(txn) = Self::begin_write_txn(&shard.db) else {
                continue;
            };
            let Ok(mut streams) = txn.open_table(STREAMS) else {
                continue;
            };
            let Ok(mut messages) = txn.open_table(MESSAGES) else {
                continue;
            };

            // (name, reclaimed bytes, fork parent) for post-commit bookkeeping.
            let mut committed = Vec::new();
            for name in &candidates {
                let meta = streams
                    .get(name.as_str())
                    .ok()
                    .flatten()
                    .and_then(|v| serde_json::from_slice::<StoredStreamMeta>(v.value()).ok());
                let Some(meta) = meta else { continue };
                // The TTL may have been renewed since the read pass.
                if !is_stream_expired(&meta.config) {
                    continue;
                }

                match fork::evaluate_expired_cleanup(meta.ref_count) {
                    fork::DeleteDisposition::Tombstone => {
                        // Still referenced: keep the row, mark it tombstoned.
                        let mut updated = meta.clone();
                        updated.state = StreamState::Tombstone;
                        let payload = serde_json::to_vec(&updated).ok();
                        if let Some(payload) = payload {
                            let _ = streams.insert(name.as_str(), payload.as_slice());
                        }
                        // 0 bytes reclaimed; message data is left in place.
                        committed.push((name.clone(), 0, None));
                    }
                    fork::DeleteDisposition::HardDelete => {
                        let _ = Self::delete_stream_messages(&mut messages, name);
                        let _ = streams.remove(name.as_str());
                        committed.push((
                            name.clone(),
                            meta.total_bytes,
                            meta.fork_info.map(|info| info.source_name),
                        ));
                    }
                }
            }

            drop(messages);
            drop(streams);

            match txn.commit() {
                Ok(()) => {
                    let committed_len = committed.len();
                    // Global accounting only after a successful commit.
                    for (name, bytes, parent) in committed {
                        self.rollback_total_bytes(bytes);
                        self.drop_notifier(&name);
                        if let Some(parent) = parent {
                            // Best-effort cascade; errors deliberately ignored.
                            let _ = self.cascade_delete_acid(&parent);
                        }
                    }
                    total_removed += committed_len;
                }
                Err(e) => {
                    warn!(%e, "failed to commit expired stream cleanup");
                }
            }
        }

        total_removed
    }
720
721    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
722        let mut result = Vec::new();
723
724        for shard in &self.shards {
725            let read_txn = shard
726                .db
727                .begin_read()
728                .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
729            let streams_table = read_txn
730                .open_table(STREAMS)
731                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
732            let iter = streams_table
733                .iter()
734                .map_err(|e| Self::storage_err("failed to iterate streams", e))?;
735
736            for item in iter {
737                let (key, value) =
738                    item.map_err(|e| Self::storage_err("failed to read stream entry", e))?;
739                let name = key.value().to_string();
740                let meta: StoredStreamMeta = serde_json::from_slice(value.value())
741                    .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
742
743                if is_stream_expired(&meta.config) || meta.state == StreamState::Tombstone {
744                    continue;
745                }
746
747                result.push((
748                    name,
749                    StreamMetadata {
750                        config: meta.config,
751                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
752                        closed: meta.closed,
753                        total_bytes: meta.total_bytes,
754                        message_count: meta.next_read_seq,
755                        created_at: meta.created_at,
756                        updated_at: meta.updated_at,
757                    },
758                ));
759            }
760        }
761
762        result.sort_by(|a, b| a.0.cmp(&b.0));
763        Ok(result)
764    }
765
    /// Create a fork stream `name` branching off `source_name` at
    /// `fork_offset` (when `None`, the offset is resolved from the source's
    /// current tail by `fork::resolve_fork_offset`).
    ///
    /// The fork's metadata is written on the *source's* shard (see the write
    /// transaction below), so the fork insert and the source `ref_count`
    /// bump commit atomically. A pre-existing stream with the fork's name may
    /// live on a different shard; that case is resolved first via
    /// `remove_cross_shard_existing_fork`.
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        // Phase 1: validate the request against a read-only snapshot of the
        // source shard (cheap; no write lock taken yet).
        let source_shard_idx = self.existing_shard_index(source_name)?;
        let source_shard = &self.shards[source_shard_idx];
        let source_read_txn = source_shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let source_read_streams = source_read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let source_meta = Self::read_stream_meta(&source_read_streams, source_name)?
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?;
        fork::check_fork_source_access(&source_meta.config, source_meta.state, source_name)?;

        // Resolve the fork point against the source's tail as of this
        // snapshot.
        // NOTE(review): the resolved offset is not re-validated inside the
        // write transaction below; confirm that concurrent changes to the
        // source between the two transactions are tolerated by design.
        let source_next_offset =
            Offset::new(source_meta.next_read_seq, source_meta.next_byte_offset);
        let resolved_offset = fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        // Forks must carry the same content type as their source
        // (case-insensitive comparison).
        if !config
            .content_type
            .eq_ignore_ascii_case(&source_meta.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source_meta.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }

        let fork_spec = fork::build_fork_create_spec(
            source_name,
            &source_meta.config,
            &config,
            resolved_offset.clone(),
        );

        // Phase 2: if a stream with the fork's name already exists on a
        // DIFFERENT shard, deal with it first (expired → removed in its own
        // committed transaction; live → AlreadyExists; incompatible → error).
        let (mut removed_expired_bytes, mut removed_expired_parent) =
            match self.remove_cross_shard_existing_fork(name, source_shard_idx, &fork_spec)? {
                CrossShardForkResult::Continue(bytes, parent) => (bytes, parent),
                CrossShardForkResult::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
            };

        // Phase 3: single write transaction on the source's shard covering
        // the same-shard duplicate check, the fork insert, and the source
        // ref-count increment.
        let shard = &self.shards[source_shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // Re-read the source under the write transaction: the phase-1
        // snapshot may be stale (e.g. the source was deleted meanwhile).
        let mut source_meta = Self::read_stream_meta(&streams, source_name)?
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?;

        // Handle an existing stream with the fork's name on the SAME shard.
        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            match fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                fork::ExistingCreateDisposition::RemoveExpired => {
                    // Expired stream occupies the name: purge it inside this
                    // transaction and remember what to clean up post-commit.
                    removed_expired_bytes = existing.total_bytes;
                    removed_expired_parent =
                        existing.fork_info.clone().map(|info| info.source_name);
                    Self::delete_stream_messages(&mut messages, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                }
                fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        // Insert the fork and pin the source by bumping its ref_count, then
        // commit. Tables must be dropped before the transaction can commit.
        let fork_meta = Self::build_fork_stored_meta(&fork_spec, &config, &resolved_offset);
        Self::write_stream_meta(&mut streams, name, &fork_meta)?;
        source_meta.ref_count += 1;
        Self::write_stream_meta(&mut streams, source_name, &source_meta)?;
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create fork", e))?;

        // Post-commit bookkeeping: adjust global byte accounting, cascade
        // cleanup of the removed stream's parent, and (re)create notifier.
        self.cleanup_expired_and_notify(name, removed_expired_bytes, removed_expired_parent)?;

        Ok(CreateStreamResult::Created)
    }
868}
869
870/// Private helpers extracted from long `Storage` trait methods.
/// Private helpers extracted from long `Storage` trait methods.
impl AcidStorage {
    /// Read path when the stream has no TTL: only read-only transactions are
    /// taken, so concurrent writers are never blocked.
    ///
    /// Returns the messages from `from_offset` onward, the stream's current
    /// tail offset, and its `closed` flag. `at_tail` is always reported as
    /// `true` by this path.
    fn read_without_ttl_renewal(
        &self,
        name: &str,
        from_offset: &Offset,
        shard_idx: usize,
    ) -> Result<ReadResult> {
        let shard = &self.shards[shard_idx];
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);

        // "now" offset: the caller only wants the tail position, no payload.
        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: meta.closed,
            });
        }

        // Non-forked stream: all its messages live in this shard's MESSAGES
        // table. Release the snapshot before the helper takes its own.
        if meta.fork_info.is_none() {
            drop(streams);
            drop(txn);
            let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;

            return Ok(ReadResult {
                messages,
                next_offset,
                at_tail: true,
                closed: meta.closed,
            });
        }

        // Forked stream: messages before the fork point come from ancestor
        // streams (possibly on other shards), so drop this transaction before
        // walking the chain.
        let fi = meta.fork_info.clone().expect("checked above");
        let closed = meta.closed;
        drop(streams);
        drop(txn);

        let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }

    /// Read path when the stream has a TTL that needs renewal: a write
    /// transaction is required so the TTL can be refreshed alongside the read.
    ///
    /// For forked streams the write transaction is dropped before walking the
    /// fork chain (which may read other shards) and a fresh one is taken just
    /// for the TTL renewal.
    /// NOTE(review): that second transaction re-reads the meta but does not
    /// re-run `check_stream_access`; confirm a state change between the two
    /// transactions is acceptable here.
    fn read_with_ttl_renewal(
        &self,
        name: &str,
        from_offset: &Offset,
        shard_idx: usize,
    ) -> Result<ReadResult> {
        let shard = &self.shards[shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;

        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        let result = if from_offset.is_now() {
            // Tail-only request: no payload, just report the tail position.
            ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: meta.closed,
            }
        } else if meta.fork_info.is_none() {
            // Non-forked: read directly from this shard's MESSAGES table
            // while still holding the write transaction.
            let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;

            ReadResult {
                messages,
                next_offset,
                at_tail: true,
                closed: meta.closed,
            }
        } else {
            // Forked: release the write transaction before the (potentially
            // cross-shard) chain walk, then reacquire it only to renew the
            // TTL, and return early — the shared renewal at the bottom would
            // otherwise use the already-dropped `streams`/`txn`.
            let fi = meta.fork_info.clone().expect("checked above");
            let closed = meta.closed;
            drop(streams);
            drop(txn);

            let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;

            let shard = &self.shards[shard_idx];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;
            fork::renew_ttl(&mut meta.config);
            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;

            return Ok(ReadResult {
                messages: all_messages,
                next_offset,
                at_tail: true,
                closed,
            });
        };

        // Non-fork paths: renew the TTL and commit under the original
        // write transaction.
        fork::renew_ttl(&mut meta.config);
        Self::write_stream_meta(&mut streams, name, &meta)?;
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;

        Ok(result)
    }

    /// Write initial messages into the MESSAGES table during stream creation.
    ///
    /// Enforces `max_stream_bytes` against the whole batch up front, then
    /// appends each payload and advances `meta`'s sequence/byte counters.
    /// A no-op when `batch_bytes` is 0.
    fn write_initial_messages(
        name: &str,
        messages: &[Bytes],
        batch_bytes: u64,
        max_stream_bytes: u64,
        meta: &mut StoredStreamMeta,
        message_table: &mut redb::Table<'_, (&str, u64, u64), &[u8]>,
    ) -> Result<()> {
        if batch_bytes == 0 {
            return Ok(());
        }
        // Reject the whole batch if it would push the stream past its cap.
        if meta.total_bytes + batch_bytes > max_stream_bytes {
            return Err(Error::StreamSizeLimitExceeded);
        }
        for data in messages {
            // Saturate rather than fail on a usize→u64 conversion; only
            // reachable on platforms where usize is wider than u64.
            let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
            // Key is (stream name, sequence number, byte offset).
            message_table
                .insert(
                    (name, meta.next_read_seq, meta.next_byte_offset),
                    data.as_ref(),
                )
                .map_err(|e| Self::storage_err("failed to append create-with-data message", e))?;
            meta.next_read_seq += 1;
            meta.next_byte_offset += len;
            meta.total_bytes += len;
        }
        Ok(())
    }

    /// Check and remove an existing fork that lives on a different shard than
    /// the source stream.
    ///
    /// Returns `Continue(removed_bytes, removed_parent)` when creation may
    /// proceed (zero/`None` if nothing was removed), or `AlreadyExists` when
    /// a live equivalent fork occupies the name. An incompatible existing
    /// stream yields the conflict error from `fork::evaluate_fork_create`.
    /// The expired-stream removal is committed here, in its own transaction
    /// on the existing stream's shard.
    #[allow(clippy::type_complexity)]
    fn remove_cross_shard_existing_fork(
        &self,
        name: &str,
        source_shard_idx: usize,
        fork_spec: &ForkCreateSpec,
    ) -> Result<CrossShardForkResult> {
        // No stream with this name anywhere, or it lives on the source's
        // shard (the caller's write transaction handles that case).
        let Some(existing_shard_idx) = self.find_stream_shard_index(name)? else {
            return Ok(CrossShardForkResult::Continue(0, None));
        };
        if existing_shard_idx == source_shard_idx {
            return Ok(CrossShardForkResult::Continue(0, None));
        }

        let existing_shard = &self.shards[existing_shard_idx];
        let existing_txn = Self::begin_write_txn(&existing_shard.db)?;
        let mut existing_streams = existing_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut existing_messages = existing_txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // The stream may have disappeared between the shard lookup and this
        // transaction; that's fine, proceed with creation.
        let Some(existing) = Self::read_stream_meta(&existing_streams, name)? else {
            return Ok(CrossShardForkResult::Continue(0, None));
        };

        match fork::evaluate_fork_create(
            name,
            &existing.config,
            existing.fork_info.as_ref(),
            existing.state,
            existing.ref_count,
            fork_spec,
        ) {
            fork::ExistingCreateDisposition::RemoveExpired => {
                // Purge the expired stream and report what was freed so the
                // caller can adjust accounting and cascade-delete the parent.
                let removed_bytes = existing.total_bytes;
                let removed_parent = existing.fork_info.clone().map(|info| info.source_name);
                Self::delete_stream_messages(&mut existing_messages, name)?;
                existing_streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                drop(existing_messages);
                drop(existing_streams);
                existing_txn.commit().map_err(|e| {
                    Self::storage_err("failed to commit expired cross-shard fork removal", e)
                })?;
                Ok(CrossShardForkResult::Continue(
                    removed_bytes,
                    removed_parent,
                ))
            }
            fork::ExistingCreateDisposition::AlreadyExists => {
                Ok(CrossShardForkResult::AlreadyExists)
            }
            fork::ExistingCreateDisposition::Conflict(err) => Err(err),
        }
    }

    /// Construct the `StoredStreamMeta` for a new fork stream.
    ///
    /// The fork starts empty (`total_bytes: 0`) but its read position begins
    /// at the resolved fork offset, so reads before that point fall through
    /// to the parent chain via `fork_info`.
    fn build_fork_stored_meta(
        fork_spec: &ForkCreateSpec,
        config: &StreamConfig,
        resolved_offset: &Offset,
    ) -> StoredStreamMeta {
        // NOTE(review): an unparsable offset silently falls back to (0, 0);
        // confirm `resolved_offset` is always parseable here (it came from
        // `fork::resolve_fork_offset`).
        let (fork_read_seq, fork_byte_offset) =
            resolved_offset.parse_components().unwrap_or((0, 0));
        StoredStreamMeta {
            config: fork_spec.config.clone(),
            closed: config.created_closed,
            next_read_seq: fork_read_seq,
            next_byte_offset: fork_byte_offset,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            last_seq: None,
            producers: HashMap::new(),
            fork_info: Some(ForkInfo {
                source_name: fork_spec.source_name.clone(),
                fork_offset: resolved_offset.clone(),
            }),
            ref_count: 0,
            state: StreamState::Active,
        }
    }

    /// Clean up expired stream bytes and notify after a successful create/fork.
    ///
    /// When an expired stream was replaced (`removed_expired_bytes > 0`),
    /// subtracts its bytes from the global total, drops its stale notifier,
    /// and cascade-deletes its parent if it was itself a fork. Always ensures
    /// a notifier exists for `name` afterwards.
    fn cleanup_expired_and_notify(
        &self,
        name: &str,
        removed_expired_bytes: u64,
        removed_expired_parent: Option<String>,
    ) -> Result<()> {
        if removed_expired_bytes > 0 {
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
            if let Some(parent) = removed_expired_parent {
                self.cascade_delete_acid(&parent)?;
            }
        }
        self.notifier_sender(name);
        Ok(())
    }
}