1use super::{
2 AcidStorage, Bytes, ForkInfo, MESSAGES, Offset, ProducerState, Result, STREAMS,
3 StoredStreamMeta, StreamConfig, StreamState,
4};
5use crate::protocol::error::Error;
6use crate::protocol::producer::ProducerHeaders;
7use crate::storage::{
8 CreateStreamResult, CreateWithDataResult, ForkCreateSpec, ProducerAppendResult, ProducerCheck,
9 ReadResult, Storage, StreamMetadata, check_producer, cleanup_stale_producers, fork,
10 is_stream_expired, validate_content_type, validate_seq,
11};
12use chrono::Utc;
13use redb::{ReadableDatabase, ReadableTable};
14use std::collections::HashMap;
15use tokio::sync::broadcast;
16use tracing::warn;
17
/// Outcome of checking (and possibly removing) a stream that already exists
/// on a *different* shard than the fork's source, during fork creation.
enum CrossShardForkResult {
    /// Proceed with fork creation. Carries the byte count of an expired
    /// stream that was removed (0 if none) and, if that removed stream was
    /// itself a fork, its parent's name so the caller can cascade-delete.
    Continue(u64, Option<String>),
    /// An equivalent fork already exists; creation is a no-op.
    AlreadyExists,
}
25
26impl Storage for AcidStorage {
27 fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
28 let shard_idx = self
29 .find_stream_shard_index(name)?
30 .unwrap_or_else(|| self.shard_index(name));
31 let shard = &self.shards[shard_idx];
32 let txn = Self::begin_write_txn(&shard.db)?;
33 let mut streams = txn
34 .open_table(STREAMS)
35 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
36 let mut messages = txn
37 .open_table(MESSAGES)
38 .map_err(|e| Self::storage_err("failed to open messages table", e))?;
39
40 let mut removed_expired_bytes = 0_u64;
41 let mut removed_expired_parent = None;
42
43 if let Some(existing) = Self::read_stream_meta(&streams, name)? {
44 match fork::evaluate_root_create(
45 name,
46 &existing.config,
47 existing.state,
48 existing.ref_count,
49 &config,
50 ) {
51 fork::ExistingCreateDisposition::RemoveExpired => {
52 removed_expired_bytes = existing.total_bytes;
53 removed_expired_parent = existing.fork_info.map(|info| info.source_name);
54 Self::delete_stream_messages(&mut messages, name)?;
55 streams
56 .remove(name)
57 .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
58 }
59 fork::ExistingCreateDisposition::AlreadyExists => {
60 return Ok(CreateStreamResult::AlreadyExists);
61 }
62 fork::ExistingCreateDisposition::Conflict(err) => {
63 return Err(err);
64 }
65 }
66 }
67
68 let meta = Self::new_stream_meta(config);
69 Self::write_stream_meta(&mut streams, name, &meta)?;
70
71 drop(messages);
72 drop(streams);
73 txn.commit()
74 .map_err(|e| Self::storage_err("failed to commit create stream", e))?;
75
76 if removed_expired_bytes > 0 {
77 self.saturating_sub_total_bytes(removed_expired_bytes);
78 self.drop_notifier(name);
79 if let Some(parent) = removed_expired_parent {
80 self.cascade_delete_acid(&parent)?;
81 }
82 }
83
84 Ok(CreateStreamResult::Created)
85 }
86
    /// Appends a single message to `name`, returning the offset it was
    /// written at.
    ///
    /// Global capacity is reserved up front and rolled back if any step of
    /// the transactional append fails. Subscribers are notified only after
    /// a successful commit.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
        // Reserve against the global byte budget before touching the shard.
        self.reserve_total_bytes(message_bytes)?;

        // Immediately-invoked closure so any `?` failure below still reaches
        // the reservation rollback afterwards.
        let result = (|| {
            let shard = &self.shards[self.existing_shard_index(name)?];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut messages = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            // Rejects expired / tombstoned streams.
            fork::check_stream_access(&meta.config, meta.state, name)?;

            if meta.closed {
                return Err(Error::StreamClosed);
            }

            validate_content_type(&meta.config.content_type, content_type)?;

            // Per-stream size limit, checked against the post-append total.
            if meta.total_bytes + message_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            messages
                .insert(
                    (name, meta.next_read_seq, meta.next_byte_offset),
                    data.as_ref(),
                )
                .map_err(|e| Self::storage_err("failed to append message", e))?;

            meta.next_read_seq += 1;
            meta.next_byte_offset += message_bytes;
            meta.total_bytes += message_bytes;
            meta.updated_at = Some(Utc::now());
            // Writing counts as activity: extend the TTL if one is set.
            fork::renew_ttl(&mut meta.config);

            Self::write_stream_meta(&mut streams, name, &meta)?;

            // Tables borrow from the transaction; drop before commit.
            drop(messages);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit append", e))?;

            Ok(offset)
        })();

        if result.is_err() {
            // Nothing was persisted; release the reserved capacity.
            self.rollback_total_bytes(message_bytes);
            return result;
        }

        self.notify_stream(name);
        result
    }
148
    /// Atomically appends a batch of messages to `name`, returning the
    /// stream's next offset after the whole batch.
    ///
    /// Rejects empty batches. The optional client sequence token `seq` is
    /// validated against the stream's last stored token before any message
    /// is written, and persisted only when the batch commits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let batch_bytes = Self::batch_bytes(&messages);
        // Reserve against the global byte budget before touching the shard.
        self.reserve_total_bytes(batch_bytes)?;

        // Immediately-invoked closure so any `?` failure below still reaches
        // the reservation rollback afterwards.
        let result = (|| {
            let shard = &self.shards[self.existing_shard_index(name)?];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            // Rejects expired / tombstoned streams.
            fork::check_stream_access(&meta.config, meta.state, name)?;

            if meta.closed {
                return Err(Error::StreamClosed);
            }

            validate_content_type(&meta.config.content_type, content_type)?;
            let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;

            // Per-stream size limit, checked against the post-batch total.
            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            for data in &messages {
                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                message_table
                    .insert(
                        (name, meta.next_read_seq, meta.next_byte_offset),
                        data.as_ref(),
                    )
                    .map_err(|e| Self::storage_err("failed to append batch message", e))?;
                meta.next_read_seq += 1;
                meta.next_byte_offset += len;
                meta.total_bytes += len;
            }

            // Persist the new client sequence token only after all inserts.
            if let Some(new_seq) = pending_seq {
                meta.last_seq = Some(new_seq);
            }
            meta.updated_at = Some(Utc::now());
            // Writing counts as activity: extend the TTL if one is set.
            fork::renew_ttl(&mut meta.config);

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            Self::write_stream_meta(&mut streams, name, &meta)?;

            // Tables borrow from the transaction; drop before commit.
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit batch append", e))?;

            Ok(next_offset)
        })();

        if result.is_err() {
            // Nothing was persisted; release the reserved capacity.
            self.rollback_total_bytes(batch_bytes);
            return result;
        }

        self.notify_stream(name);
        result
    }
230
231 fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
232 let shard_idx = self.existing_shard_index(name)?;
233 let needs_ttl_renewal = {
234 let shard = &self.shards[shard_idx];
235 let txn = shard
236 .db
237 .begin_read()
238 .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
239 let streams = txn
240 .open_table(STREAMS)
241 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
242 let meta = Self::read_stream_meta(&streams, name)?
243 .ok_or_else(|| Error::NotFound(name.to_string()))?;
244 fork::check_stream_access(&meta.config, meta.state, name)?;
245 meta.config.ttl_seconds.is_some()
246 };
247
248 if !needs_ttl_renewal {
249 return self.read_without_ttl_renewal(name, from_offset, shard_idx);
250 }
251
252 self.read_with_ttl_renewal(name, from_offset, shard_idx)
253 }
254
    /// Deletes stream `name`.
    ///
    /// Based on `fork::evaluate_delete`, the stream is either soft-deleted
    /// (tombstoned, when something still references it) or hard-deleted:
    /// messages and metadata removed, global accounting reduced, and — if
    /// the stream was itself a fork — the delete cascaded to its source.
    fn delete(&self, name: &str) -> Result<()> {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        match fork::evaluate_delete(name, meta.state, meta.ref_count)? {
            fork::DeleteDisposition::Tombstone => {
                // Soft delete: keep messages so dependents can still read,
                // but flip the state so the stream is hidden from callers.
                let mut updated = meta;
                updated.state = StreamState::Tombstone;
                Self::write_stream_meta(&mut streams, name, &updated)?;
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit soft delete", e))?;
                return Ok(());
            }
            fork::DeleteDisposition::HardDelete => {}
        }

        // Hard delete: remove all payloads, then the metadata row.
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        Self::delete_stream_messages(&mut messages, name)?;
        drop(messages);

        streams
            .remove(name)
            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;

        // Capture what post-commit bookkeeping needs before dropping tables.
        let fork_info = meta.fork_info.clone();
        let total_bytes = meta.total_bytes;

        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit delete", e))?;

        // Only after a durable commit: fix accounting and notifier state.
        self.saturating_sub_total_bytes(total_bytes);
        self.drop_notifier(name);

        if let Some(fi) = fork_info {
            // This stream was a fork: release its hold on the source.
            self.cascade_delete_acid(&fi.source_name)?;
        }

        Ok(())
    }
304
305 fn head(&self, name: &str) -> Result<StreamMetadata> {
306 let shard = &self.shards[self.existing_shard_index(name)?];
307 let txn = shard
308 .db
309 .begin_read()
310 .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
311
312 let streams = txn
313 .open_table(STREAMS)
314 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
315
316 let meta = Self::read_stream_meta(&streams, name)?
317 .ok_or_else(|| Error::NotFound(name.to_string()))?;
318
319 fork::check_stream_access(&meta.config, meta.state, name)?;
320
321 Ok(StreamMetadata {
322 config: meta.config,
323 next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
324 closed: meta.closed,
325 total_bytes: meta.total_bytes,
326 message_count: meta.next_read_seq,
327 created_at: meta.created_at,
328 updated_at: meta.updated_at,
329 })
330 }
331
332 fn close_stream(&self, name: &str) -> Result<()> {
333 let shard = &self.shards[self.existing_shard_index(name)?];
334 let txn = Self::begin_write_txn(&shard.db)?;
335 let mut streams = txn
336 .open_table(STREAMS)
337 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
338
339 let mut meta = Self::read_stream_meta(&streams, name)?
340 .ok_or_else(|| Error::NotFound(name.to_string()))?;
341
342 fork::check_stream_access(&meta.config, meta.state, name)?;
343
344 meta.closed = true;
345 meta.updated_at = Some(Utc::now());
346 fork::renew_ttl(&mut meta.config);
347 Self::write_stream_meta(&mut streams, name, &meta)?;
348
349 drop(streams);
350 txn.commit()
351 .map_err(|e| Self::storage_err("failed to commit close stream", e))?;
352
353 self.notify_stream(name);
354 Ok(())
355 }
356
    /// Idempotent, producer-fenced append used by the producer protocol.
    ///
    /// `producer` carries an (id, epoch, seq) triple; `check_producer`
    /// decides whether to accept the write or report it as a duplicate
    /// replay, in which case nothing is written and the current tail is
    /// returned. An empty `messages` batch is allowed (e.g. a pure close);
    /// `should_close` closes the stream within the same transaction.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let batch_bytes = Self::batch_bytes(&messages);
        // Reserve against the global byte budget before touching the shard.
        self.reserve_total_bytes(batch_bytes)?;

        // Immediately-invoked closure so any `?` failure below still reaches
        // the reservation rollback afterwards.
        let result = (|| {
            let shard = &self.shards[self.existing_shard_index(name)?];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            // Rejects expired / tombstoned streams.
            fork::check_stream_access(&meta.config, meta.state, name)?;

            // Evict idle producer records before fencing so a stale entry
            // cannot block a returning producer.
            cleanup_stale_producers(&mut meta.producers);

            // Content type only matters when there is an actual payload.
            if !messages.is_empty() {
                validate_content_type(&meta.config.content_type, content_type)?;
            }

            match check_producer(
                meta.producers.get(producer.id.as_str()),
                producer,
                meta.closed,
            )? {
                ProducerCheck::Accept => {}
                ProducerCheck::Duplicate { epoch, seq } => {
                    // Replay of an already-applied request: report the
                    // current tail without writing anything.
                    return Ok(ProducerAppendResult::Duplicate {
                        epoch,
                        seq,
                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                        closed: meta.closed,
                    });
                }
            }

            let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;

            // Per-stream size limit, checked against the post-batch total.
            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            for data in &messages {
                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                message_table
                    .insert(
                        (name, meta.next_read_seq, meta.next_byte_offset),
                        data.as_ref(),
                    )
                    .map_err(|e| Self::storage_err("failed to append producer message", e))?;
                meta.next_read_seq += 1;
                meta.next_byte_offset += len;
                meta.total_bytes += len;
            }

            if let Some(new_seq) = pending_seq {
                meta.last_seq = Some(new_seq);
            }
            if should_close {
                meta.closed = true;
            }

            // Record the producer's latest (epoch, seq) for future fencing
            // and duplicate detection.
            let now = Utc::now();
            meta.producers.insert(
                producer.id.clone(),
                ProducerState {
                    epoch: producer.epoch,
                    last_seq: producer.seq,
                    updated_at: now,
                },
            );
            meta.updated_at = Some(now);
            fork::renew_ttl(&mut meta.config);

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            // Tables borrow from the transaction; drop before commit.
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit producer append", e))?;

            Ok(ProducerAppendResult::Accepted {
                epoch: producer.epoch,
                seq: producer.seq,
                next_offset,
                closed,
            })
        })();

        // Duplicates wrote nothing, so their reservation is released too.
        if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
            self.rollback_total_bytes(batch_bytes);
        }

        // Only wake subscribers when something observable happened.
        if result.is_ok() && (!messages.is_empty() || should_close) {
            self.notify_stream(name);
        }

        result
    }
471
472 fn create_stream_with_data(
473 &self,
474 name: &str,
475 config: StreamConfig,
476 messages: Vec<Bytes>,
477 should_close: bool,
478 ) -> Result<CreateWithDataResult> {
479 let batch_bytes = Self::batch_bytes(&messages);
480
481 let mut reserved = false;
482 let mut removed_expired_bytes = 0_u64;
483 let mut removed_expired_parent = None;
484
485 let result = (|| {
486 let shard_idx = self
487 .find_stream_shard_index(name)?
488 .unwrap_or_else(|| self.shard_index(name));
489 let shard = &self.shards[shard_idx];
490 let txn = Self::begin_write_txn(&shard.db)?;
491 let mut streams = txn
492 .open_table(STREAMS)
493 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
494 let mut message_table = txn
495 .open_table(MESSAGES)
496 .map_err(|e| Self::storage_err("failed to open messages table", e))?;
497
498 if let Some(existing) = Self::read_stream_meta(&streams, name)? {
499 match fork::evaluate_root_create(
500 name,
501 &existing.config,
502 existing.state,
503 existing.ref_count,
504 &config,
505 ) {
506 fork::ExistingCreateDisposition::RemoveExpired => {
507 removed_expired_bytes = existing.total_bytes;
508 removed_expired_parent =
509 existing.fork_info.clone().map(|info| info.source_name);
510 Self::delete_stream_messages(&mut message_table, name)?;
511 streams
512 .remove(name)
513 .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
514 }
515 fork::ExistingCreateDisposition::AlreadyExists => {
516 return Ok(CreateWithDataResult {
517 status: CreateStreamResult::AlreadyExists,
518 next_offset: Offset::new(
519 existing.next_read_seq,
520 existing.next_byte_offset,
521 ),
522 closed: existing.closed,
523 });
524 }
525 fork::ExistingCreateDisposition::Conflict(err) => {
526 return Err(err);
527 }
528 }
529 }
530
531 if batch_bytes > 0 {
532 self.reserve_total_bytes(batch_bytes)?;
533 reserved = true;
534 }
535
536 let mut meta = Self::new_stream_meta(config);
537 Self::write_initial_messages(
538 name,
539 &messages,
540 batch_bytes,
541 self.max_stream_bytes,
542 &mut meta,
543 &mut message_table,
544 )?;
545
546 if should_close {
547 meta.closed = true;
548 }
549
550 let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
551 let closed = meta.closed;
552
553 Self::write_stream_meta(&mut streams, name, &meta)?;
554 drop(message_table);
555 drop(streams);
556 txn.commit()
557 .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;
558
559 Ok(CreateWithDataResult {
560 status: CreateStreamResult::Created,
561 next_offset,
562 closed,
563 })
564 })();
565
566 if result.is_err() && reserved {
567 self.rollback_total_bytes(batch_bytes);
568 }
569
570 if result.is_ok() {
571 if removed_expired_bytes > 0 {
572 self.saturating_sub_total_bytes(removed_expired_bytes);
573 self.drop_notifier(name);
574 if let Some(parent) = removed_expired_parent {
575 self.cascade_delete_acid(&parent)?;
576 }
577 }
578 if should_close || !messages.is_empty() {
579 self.notify_stream(name);
580 }
581 }
582
583 result
584 }
585
586 fn exists(&self, name: &str) -> bool {
587 let Ok(Some(shard_idx)) = self.find_stream_shard_index(name) else {
588 return false;
589 };
590 let shard = &self.shards[shard_idx];
591 let Ok(txn) = shard.db.begin_read() else {
592 return false;
593 };
594 let Ok(streams) = txn.open_table(STREAMS) else {
595 return false;
596 };
597
598 match Self::read_stream_meta(&streams, name) {
599 Ok(Some(meta)) => !is_stream_expired(&meta.config) && meta.state == StreamState::Active,
600 _ => false,
601 }
602 }
603
604 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
605 let shard_idx = self.find_stream_shard_index(name).ok().flatten()?;
606 let shard = &self.shards[shard_idx];
607 let txn = shard.db.begin_read().ok()?;
608 let streams = txn.open_table(STREAMS).ok()?;
609 let meta = Self::read_stream_meta(&streams, name).ok()??;
610
611 if is_stream_expired(&meta.config) || meta.state == StreamState::Tombstone {
612 return None;
613 }
614
615 Some(self.notifier_sender(name).subscribe())
616 }
617
    /// Best-effort background sweep over all shards that tombstones or
    /// hard-deletes expired streams, returning how many were handled.
    ///
    /// Most errors are deliberately swallowed: a shard that cannot be
    /// scanned or committed this round is simply retried on the next sweep.
    fn cleanup_expired_streams(&self) -> usize {
        let mut total_removed = 0;

        for shard in &self.shards {
            // Phase 1: collect candidate names under a cheap read txn.
            let Ok(read_txn) = shard.db.begin_read() else {
                continue;
            };
            let Ok(streams_table) = read_txn.open_table(STREAMS) else {
                continue;
            };
            let Ok(iter) = streams_table.iter() else {
                continue;
            };

            let mut candidates: Vec<String> = Vec::new();
            for item in iter {
                let Ok((key, value)) = item else {
                    continue;
                };
                let name = key.value().to_string();
                let Ok(meta) = serde_json::from_slice::<StoredStreamMeta>(value.value()) else {
                    continue;
                };
                if is_stream_expired(&meta.config) {
                    candidates.push(name);
                }
            }

            drop(streams_table);
            drop(read_txn);

            if candidates.is_empty() {
                continue;
            }

            // Phase 2: re-check each candidate inside a write txn, since it
            // may have been recreated/renewed between the two transactions.
            let Ok(txn) = Self::begin_write_txn(&shard.db) else {
                continue;
            };
            let Ok(mut streams) = txn.open_table(STREAMS) else {
                continue;
            };
            let Ok(mut messages) = txn.open_table(MESSAGES) else {
                continue;
            };

            // Post-commit work per stream: (name, bytes_removed, parent).
            let mut committed = Vec::new();
            for name in &candidates {
                let meta = streams
                    .get(name.as_str())
                    .ok()
                    .flatten()
                    .and_then(|v| serde_json::from_slice::<StoredStreamMeta>(v.value()).ok());
                let Some(meta) = meta else { continue };
                if !is_stream_expired(&meta.config) {
                    continue;
                }

                match fork::evaluate_expired_cleanup(meta.ref_count) {
                    fork::DeleteDisposition::Tombstone => {
                        // Still referenced: keep data, hide the stream.
                        let mut updated = meta.clone();
                        updated.state = StreamState::Tombstone;
                        let payload = serde_json::to_vec(&updated).ok();
                        if let Some(payload) = payload {
                            let _ = streams.insert(name.as_str(), payload.as_slice());
                        }
                        // NOTE(review): tombstoned streams are counted in
                        // total_removed and get their notifier dropped below
                        // even though they still exist — confirm intended.
                        committed.push((name.clone(), 0, None));
                    }
                    fork::DeleteDisposition::HardDelete => {
                        let _ = Self::delete_stream_messages(&mut messages, name);
                        let _ = streams.remove(name.as_str());
                        committed.push((
                            name.clone(),
                            meta.total_bytes,
                            meta.fork_info.map(|info| info.source_name),
                        ));
                    }
                }
            }

            drop(messages);
            drop(streams);

            match txn.commit() {
                Ok(()) => {
                    let committed_len = committed.len();
                    for (name, bytes, parent) in committed {
                        // rollback_total_bytes is used here as a plain
                        // "subtract from global accounting" operation.
                        self.rollback_total_bytes(bytes);
                        self.drop_notifier(&name);
                        if let Some(parent) = parent {
                            let _ = self.cascade_delete_acid(&parent);
                        }
                    }
                    total_removed += committed_len;
                }
                Err(e) => {
                    warn!(%e, "failed to commit expired stream cleanup");
                }
            }
        }

        total_removed
    }
720
721 fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
722 let mut result = Vec::new();
723
724 for shard in &self.shards {
725 let read_txn = shard
726 .db
727 .begin_read()
728 .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
729 let streams_table = read_txn
730 .open_table(STREAMS)
731 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
732 let iter = streams_table
733 .iter()
734 .map_err(|e| Self::storage_err("failed to iterate streams", e))?;
735
736 for item in iter {
737 let (key, value) =
738 item.map_err(|e| Self::storage_err("failed to read stream entry", e))?;
739 let name = key.value().to_string();
740 let meta: StoredStreamMeta = serde_json::from_slice(value.value())
741 .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
742
743 if is_stream_expired(&meta.config) || meta.state == StreamState::Tombstone {
744 continue;
745 }
746
747 result.push((
748 name,
749 StreamMetadata {
750 config: meta.config,
751 next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
752 closed: meta.closed,
753 total_bytes: meta.total_bytes,
754 message_count: meta.next_read_seq,
755 created_at: meta.created_at,
756 updated_at: meta.updated_at,
757 },
758 ));
759 }
760 }
761
762 result.sort_by(|a, b| a.0.cmp(&b.0));
763 Ok(result)
764 }
765
    /// Creates a fork of `source_name` named `name`, anchored at
    /// `fork_offset` (defaulting to the source's current tail).
    ///
    /// The fork is placed on the same shard as its source so that a single
    /// transaction can create the fork and bump the source's ref count. A
    /// pre-existing stream with the fork's name on a *different* shard is
    /// handled first via `remove_cross_shard_existing_fork`.
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult> {
        // Validate the source and resolve the fork point under a read txn.
        let source_shard_idx = self.existing_shard_index(source_name)?;
        let source_shard = &self.shards[source_shard_idx];
        let source_read_txn = source_shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let source_read_streams = source_read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let source_meta = Self::read_stream_meta(&source_read_streams, source_name)?
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?;
        fork::check_fork_source_access(&source_meta.config, source_meta.state, source_name)?;

        let source_next_offset =
            Offset::new(source_meta.next_read_seq, source_meta.next_byte_offset);
        let resolved_offset = fork::resolve_fork_offset(fork_offset, &source_next_offset)?;

        // A fork must keep its source's content type (case-insensitive).
        if !config
            .content_type
            .eq_ignore_ascii_case(&source_meta.config.content_type)
        {
            return Err(Error::ContentTypeMismatch {
                expected: source_meta.config.content_type.clone(),
                actual: config.content_type.clone(),
            });
        }

        let fork_spec = fork::build_fork_create_spec(
            source_name,
            &source_meta.config,
            &config,
            resolved_offset.clone(),
        );

        // If a stream with this name lives on a different shard, resolve it
        // (remove if expired) before creating on the source's shard.
        let (mut removed_expired_bytes, mut removed_expired_parent) =
            match self.remove_cross_shard_existing_fork(name, source_shard_idx, &fork_spec)? {
                CrossShardForkResult::Continue(bytes, parent) => (bytes, parent),
                CrossShardForkResult::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
            };

        let shard = &self.shards[source_shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // Re-read the source inside the write txn; it may have changed.
        let mut source_meta = Self::read_stream_meta(&streams, source_name)?
            .ok_or_else(|| Error::NotFound(source_name.to_string()))?;

        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            match fork::evaluate_fork_create(
                name,
                &existing.config,
                existing.fork_info.as_ref(),
                existing.state,
                existing.ref_count,
                &fork_spec,
            ) {
                fork::ExistingCreateDisposition::RemoveExpired => {
                    removed_expired_bytes = existing.total_bytes;
                    removed_expired_parent =
                        existing.fork_info.clone().map(|info| info.source_name);
                    Self::delete_stream_messages(&mut messages, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                }
                // NOTE(review): if the cross-shard removal above already
                // committed (removed_expired_bytes > 0) and we return here,
                // those bytes are never subtracted from global accounting —
                // confirm this path is unreachable or fix the bookkeeping.
                fork::ExistingCreateDisposition::AlreadyExists => {
                    return Ok(CreateStreamResult::AlreadyExists);
                }
                fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }

        // Create the fork and take a reference on the source atomically.
        let fork_meta = Self::build_fork_stored_meta(&fork_spec, &config, &resolved_offset);
        Self::write_stream_meta(&mut streams, name, &fork_meta)?;
        source_meta.ref_count += 1;
        Self::write_stream_meta(&mut streams, source_name, &source_meta)?;
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create fork", e))?;

        self.cleanup_expired_and_notify(name, removed_expired_bytes, removed_expired_parent)?;

        Ok(CreateStreamResult::Created)
    }
868}
869
870impl AcidStorage {
    /// Read path for streams without a TTL: uses only read-only
    /// transactions.
    ///
    /// Access checking is the caller's responsibility (`read` validates the
    /// stream before dispatching here). `at_tail` is always reported as
    /// `true` because reads return everything up to the current tail.
    fn read_without_ttl_renewal(
        &self,
        name: &str,
        from_offset: &Offset,
        shard_idx: usize,
    ) -> Result<ReadResult> {
        let shard = &self.shards[shard_idx];
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);

        // "now" means: skip history, position the reader at the tail.
        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: meta.closed,
            });
        }

        if meta.fork_info.is_none() {
            // Plain stream: all of its messages live in this shard's table.
            // Release the transaction before the helper opens its own.
            drop(streams);
            drop(txn);
            let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;

            return Ok(ReadResult {
                messages,
                next_offset,
                at_tail: true,
                closed: meta.closed,
            });
        }

        // Fork: stitch together messages from the parent chain plus the
        // fork's own table entries.
        let fi = meta.fork_info.clone().expect("checked above");
        let closed = meta.closed;
        drop(streams);
        drop(txn);

        let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;

        Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        })
    }
930
    /// Read path for streams with a TTL: runs under a write transaction so
    /// the read itself renews the TTL (reading counts as activity).
    ///
    /// For forks the first write transaction is dropped (implicitly
    /// aborted) while the fork chain is collected, then a fresh write
    /// transaction performs the TTL renewal.
    fn read_with_ttl_renewal(
        &self,
        name: &str,
        from_offset: &Offset,
        shard_idx: usize,
    ) -> Result<ReadResult> {
        let shard = &self.shards[shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;

        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        let result = if from_offset.is_now() {
            // "now": skip history, position the reader at the tail.
            ReadResult {
                messages: Vec::new(),
                next_offset,
                at_tail: true,
                closed: meta.closed,
            }
        } else if meta.fork_info.is_none() {
            let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;

            ReadResult {
                messages,
                next_offset,
                at_tail: true,
                closed: meta.closed,
            }
        } else {
            // Fork: release the write txn before walking the parent chain
            // (which may touch other shards), then reopen to renew the TTL.
            let fi = meta.fork_info.clone().expect("checked above");
            let closed = meta.closed;
            drop(streams);
            drop(txn);

            let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;

            let shard = &self.shards[shard_idx];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;
            fork::renew_ttl(&mut meta.config);
            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;

            // The fork path committed its own renewal; return directly so
            // the shared renewal below does not run on a dropped txn.
            return Ok(ReadResult {
                messages: all_messages,
                next_offset,
                at_tail: true,
                closed,
            });
        };

        // Non-fork paths: renew the TTL in the original transaction.
        fork::renew_ttl(&mut meta.config);
        Self::write_stream_meta(&mut streams, name, &meta)?;
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;

        Ok(result)
    }
1001
1002 fn write_initial_messages(
1004 name: &str,
1005 messages: &[Bytes],
1006 batch_bytes: u64,
1007 max_stream_bytes: u64,
1008 meta: &mut StoredStreamMeta,
1009 message_table: &mut redb::Table<'_, (&str, u64, u64), &[u8]>,
1010 ) -> Result<()> {
1011 if batch_bytes == 0 {
1012 return Ok(());
1013 }
1014 if meta.total_bytes + batch_bytes > max_stream_bytes {
1015 return Err(Error::StreamSizeLimitExceeded);
1016 }
1017 for data in messages {
1018 let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
1019 message_table
1020 .insert(
1021 (name, meta.next_read_seq, meta.next_byte_offset),
1022 data.as_ref(),
1023 )
1024 .map_err(|e| Self::storage_err("failed to append create-with-data message", e))?;
1025 meta.next_read_seq += 1;
1026 meta.next_byte_offset += len;
1027 meta.total_bytes += len;
1028 }
1029 Ok(())
1030 }
1031
    /// Handles a stream named like the new fork that already lives on a
    /// *different* shard than the fork's source.
    ///
    /// Returns `Continue(removed_bytes, removed_parent)` when fork creation
    /// may proceed (after removing an expired stream if necessary),
    /// `AlreadyExists` when an equivalent fork is already present, or the
    /// conflict error otherwise. Early `Continue(0, None)` returns leave
    /// the write transaction uncommitted, which implicitly aborts it.
    #[allow(clippy::type_complexity)]
    fn remove_cross_shard_existing_fork(
        &self,
        name: &str,
        source_shard_idx: usize,
        fork_spec: &ForkCreateSpec,
    ) -> Result<CrossShardForkResult> {
        // No stream with this name anywhere: nothing to do.
        let Some(existing_shard_idx) = self.find_stream_shard_index(name)? else {
            return Ok(CrossShardForkResult::Continue(0, None));
        };
        // Same shard as the source: create_fork's own write txn handles it.
        if existing_shard_idx == source_shard_idx {
            return Ok(CrossShardForkResult::Continue(0, None));
        }

        let existing_shard = &self.shards[existing_shard_idx];
        let existing_txn = Self::begin_write_txn(&existing_shard.db)?;
        let mut existing_streams = existing_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut existing_messages = existing_txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // The stream may have been deleted between the lookup and this txn.
        let Some(existing) = Self::read_stream_meta(&existing_streams, name)? else {
            return Ok(CrossShardForkResult::Continue(0, None));
        };

        match fork::evaluate_fork_create(
            name,
            &existing.config,
            existing.fork_info.as_ref(),
            existing.state,
            existing.ref_count,
            fork_spec,
        ) {
            fork::ExistingCreateDisposition::RemoveExpired => {
                // Remove the expired stream from its shard and report its
                // byte count (and parent, if it was a fork) to the caller,
                // which reconciles accounting after the fork commits.
                let removed_bytes = existing.total_bytes;
                let removed_parent = existing.fork_info.clone().map(|info| info.source_name);
                Self::delete_stream_messages(&mut existing_messages, name)?;
                existing_streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                drop(existing_messages);
                drop(existing_streams);
                existing_txn.commit().map_err(|e| {
                    Self::storage_err("failed to commit expired cross-shard fork removal", e)
                })?;
                Ok(CrossShardForkResult::Continue(
                    removed_bytes,
                    removed_parent,
                ))
            }
            fork::ExistingCreateDisposition::AlreadyExists => {
                Ok(CrossShardForkResult::AlreadyExists)
            }
            fork::ExistingCreateDisposition::Conflict(err) => Err(err),
        }
    }
1093
1094 fn build_fork_stored_meta(
1096 fork_spec: &ForkCreateSpec,
1097 config: &StreamConfig,
1098 resolved_offset: &Offset,
1099 ) -> StoredStreamMeta {
1100 let (fork_read_seq, fork_byte_offset) =
1101 resolved_offset.parse_components().unwrap_or((0, 0));
1102 StoredStreamMeta {
1103 config: fork_spec.config.clone(),
1104 closed: config.created_closed,
1105 next_read_seq: fork_read_seq,
1106 next_byte_offset: fork_byte_offset,
1107 total_bytes: 0,
1108 created_at: Utc::now(),
1109 updated_at: None,
1110 last_seq: None,
1111 producers: HashMap::new(),
1112 fork_info: Some(ForkInfo {
1113 source_name: fork_spec.source_name.clone(),
1114 fork_offset: resolved_offset.clone(),
1115 }),
1116 ref_count: 0,
1117 state: StreamState::Active,
1118 }
1119 }
1120
1121 fn cleanup_expired_and_notify(
1123 &self,
1124 name: &str,
1125 removed_expired_bytes: u64,
1126 removed_expired_parent: Option<String>,
1127 ) -> Result<()> {
1128 if removed_expired_bytes > 0 {
1129 self.saturating_sub_total_bytes(removed_expired_bytes);
1130 self.drop_notifier(name);
1131 if let Some(parent) = removed_expired_parent {
1132 self.cascade_delete_acid(&parent)?;
1133 }
1134 }
1135 self.notifier_sender(name);
1136 Ok(())
1137 }
1138}