1use super::{
2 CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
3 ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
4};
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use redb::{Database, Durability, ReadableDatabase, ReadableTable, Table, TableDefinition};
11use seahash::hash;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::RwLock;
17use std::sync::atomic::{AtomicU64, Ordering};
18use tokio::sync::broadcast;
19
// Stream metadata table: stream name -> JSON-serialized `StoredStreamMeta`.
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
// Message table: (stream name, read sequence, byte offset) -> raw payload bytes.
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");

// Version of the on-disk layout; reopening with a different version is rejected.
const LAYOUT_FORMAT_VERSION: u32 = 1;
// Identifier of the stream-name hashing scheme used for shard routing; persisted
// so an existing data directory is never reopened under a different policy.
const HASH_POLICY: &str = "seahash-v1";
25
/// Persisted description of the on-disk layout (`layout.json`).
///
/// Written once when the directory is created and validated on every open so
/// a data directory cannot be reused with an incompatible shard count or
/// hash policy (see `load_or_create_layout`).
#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    /// Layout format version; must equal `LAYOUT_FORMAT_VERSION`.
    format_version: u32,
    /// Number of shard databases the directory was created with.
    shard_count: usize,
    /// Name-hashing scheme identifier; must equal `HASH_POLICY`.
    hash_policy: String,
}
32
/// Per-stream metadata persisted as JSON under the stream's name in `STREAMS`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    /// Stream configuration (content type, expiry, ...).
    config: StreamConfig,
    /// True once the stream has been closed; closed streams reject appends.
    closed: bool,
    /// Sequence number the next appended message will get (equals message count).
    next_read_seq: u64,
    /// Byte offset at which the next appended message will start.
    next_byte_offset: u64,
    /// Total payload bytes currently stored for this stream.
    total_bytes: u64,
    /// Creation timestamp (UTC).
    created_at: DateTime<Utc>,
    /// Last accepted client `seq` token; enforced by `super::validate_seq`.
    last_seq: Option<String>,
    /// Idempotency state keyed by producer id; pruned by
    /// `super::cleanup_stale_producers` on producer appends.
    producers: HashMap<String, ProducerState>,
}
44
/// A single shard: one independent redb database file.
#[derive(Debug)]
struct AcidShard {
    db: Database,
}
49
/// Durable, sharded storage backed by one redb database per shard.
///
/// Streams are routed to a shard by hashing the stream name, so every
/// operation on a given stream touches exactly one database.
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    /// Shard databases, indexed by `shard_index(name)`.
    shards: Vec<AcidShard>,
    /// Number of shards; validated to be a power of two in 1..=256.
    shard_count: usize,
    /// Running total of stored payload bytes across all shards.
    total_bytes: AtomicU64,
    /// Global byte budget enforced via `reserve_total_bytes`.
    max_total_bytes: u64,
    /// Per-stream byte budget enforced on all append paths.
    max_stream_bytes: u64,
    /// Lazily created broadcast channels used to wake stream subscribers.
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}
59
60impl AcidStorage {
    /// Opens (or creates) the acid storage rooted at `root_dir`.
    ///
    /// Validates the shard count, writes or validates the layout manifest,
    /// opens every shard database, then rebuilds the in-memory byte total
    /// from disk (purging expired streams along the way).
    ///
    /// # Errors
    /// Returns `Error::Storage` for invalid shard counts, layout mismatches,
    /// or any filesystem/database failure.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        shard_count: usize,
        max_total_bytes: u64,
        max_stream_bytes: u64,
    ) -> Result<Self> {
        Self::validate_shard_count(shard_count)?;

        let root_dir = root_dir.into();
        let acid_dir = Self::acid_dir(&root_dir);
        fs::create_dir_all(&acid_dir).map_err(|e| {
            Self::storage_err(
                format!(
                    "failed to create acid storage directory {}",
                    acid_dir.display()
                ),
                e,
            )
        })?;

        // Refuses to reopen a directory whose manifest disagrees with the
        // requested shard count or hash policy (routing would change).
        Self::load_or_create_layout(&acid_dir, shard_count)?;

        let mut shards = Vec::with_capacity(shard_count);
        for idx in 0..shard_count {
            let shard_path = acid_dir.join(format!("shard_{idx:02x}.redb"));
            // `Database::create` opens the file if it already exists.
            let db = Database::create(&shard_path).map_err(|e| {
                Self::storage_err(
                    format!("failed to open shard database {}", shard_path.display()),
                    e,
                )
            })?;
            Self::ensure_schema(&db)?;
            shards.push(AcidShard { db });
        }

        let storage = Self {
            shards,
            shard_count,
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            notifiers: RwLock::new(HashMap::new()),
        };

        // Seed the global byte counter from whatever survived the startup
        // expiry sweep; persisted per-stream totals are authoritative.
        let total_bytes = storage.rebuild_state_from_disk()?;
        storage.total_bytes.store(total_bytes, Ordering::Release);

        Ok(storage)
    }
114
115 #[must_use]
116 pub fn total_bytes(&self) -> u64 {
117 self.total_bytes.load(Ordering::Acquire)
118 }
119
120 fn validate_shard_count(shard_count: usize) -> Result<()> {
121 if !(1..=256).contains(&shard_count) {
122 return Err(Error::Storage(format!(
123 "acid shard count must be in range 1..=256, got {shard_count}"
124 )));
125 }
126 if !shard_count.is_power_of_two() {
127 return Err(Error::Storage(format!(
128 "acid shard count must be a power of two, got {shard_count}"
129 )));
130 }
131 Ok(())
132 }
133
134 fn storage_err(context: impl Into<String>, err: impl std::fmt::Display) -> Error {
135 Error::Storage(format!("{}: {err}", context.into()))
136 }
137
138 fn acid_dir(root_dir: &Path) -> PathBuf {
139 root_dir.join("acid")
140 }
141
142 fn layout_path(acid_dir: &Path) -> PathBuf {
143 acid_dir.join("layout.json")
144 }
145
    /// Validates an existing layout manifest or writes a fresh one.
    ///
    /// An existing manifest must match the current format version, the
    /// requested shard count, and the hash policy exactly; any mismatch is a
    /// hard error so stream-to-shard routing can never silently change for
    /// data already on disk.
    fn load_or_create_layout(acid_dir: &Path, shard_count: usize) -> Result<()> {
        let layout_path = Self::layout_path(acid_dir);
        if layout_path.exists() {
            let payload = fs::read(&layout_path).map_err(|e| {
                Self::storage_err(
                    format!("failed to read acid layout file {}", layout_path.display()),
                    e,
                )
            })?;
            let manifest: LayoutManifest = serde_json::from_slice(&payload).map_err(|e| {
                Self::storage_err(
                    format!("failed to parse acid layout file {}", layout_path.display()),
                    e,
                )
            })?;

            if manifest.format_version != LAYOUT_FORMAT_VERSION {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: format_version={}, expected={}",
                    manifest.format_version, LAYOUT_FORMAT_VERSION
                )));
            }
            if manifest.shard_count != shard_count {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: shard_count={}, expected={shard_count}",
                    manifest.shard_count
                )));
            }
            if manifest.hash_policy != HASH_POLICY {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: hash_policy='{}', expected='{}'",
                    manifest.hash_policy, HASH_POLICY
                )));
            }
            return Ok(());
        }

        let manifest = LayoutManifest {
            format_version: LAYOUT_FORMAT_VERSION,
            shard_count,
            hash_policy: HASH_POLICY.to_string(),
        };
        let payload = serde_json::to_vec_pretty(&manifest)
            .map_err(|e| Self::storage_err("failed to serialize acid layout manifest", e))?;

        // Write-then-rename so a crash mid-write never leaves a truncated
        // manifest in place of a valid one.
        let tmp_path = acid_dir.join("layout.json.tmp");
        fs::write(&tmp_path, payload).map_err(|e| {
            Self::storage_err(
                format!("failed to write temp layout file {}", tmp_path.display()),
                e,
            )
        })?;
        fs::rename(&tmp_path, &layout_path).map_err(|e| {
            Self::storage_err(
                format!("failed to write layout file {}", layout_path.display()),
                e,
            )
        })?;

        Ok(())
    }
207
208 #[must_use]
209 fn shard_index(&self, name: &str) -> usize {
210 let hash_u64 = hash(name.as_bytes());
211 let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
212 let masked = hash_u64 & u64::from(u32::MAX);
213 usize::try_from(masked).expect("masked hash value must fit in usize")
214 });
215 hash_usize & (self.shard_count - 1)
216 }
217
218 fn shard(&self, name: &str) -> &AcidShard {
219 &self.shards[self.shard_index(name)]
220 }
221
222 fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
223 if bytes == 0 {
224 return Ok(());
225 }
226
227 if self
228 .total_bytes
229 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
230 current
231 .checked_add(bytes)
232 .filter(|next| *next <= self.max_total_bytes)
233 })
234 .is_err()
235 {
236 return Err(Error::MemoryLimitExceeded);
237 }
238 Ok(())
239 }
240
241 fn rollback_total_bytes(&self, bytes: u64) {
242 self.saturating_sub_total_bytes(bytes);
243 }
244
245 fn saturating_sub_total_bytes(&self, bytes: u64) {
246 if bytes == 0 {
247 return;
248 }
249
250 self.total_bytes
251 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
252 Some(current.saturating_sub(bytes))
253 })
254 .ok();
255 }
256
257 fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
258 where
259 T: ReadableTable<&'static str, &'static [u8]>,
260 {
261 let payload = streams
262 .get(name)
263 .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
264
265 if let Some(payload) = payload {
266 let meta = serde_json::from_slice(payload.value())
267 .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
268 Ok(Some(meta))
269 } else {
270 Ok(None)
271 }
272 }
273
274 fn write_stream_meta(
275 streams: &mut Table<'_, &'static str, &'static [u8]>,
276 name: &str,
277 meta: &StoredStreamMeta,
278 ) -> Result<()> {
279 let payload = serde_json::to_vec(meta)
280 .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
281 streams
282 .insert(name, payload.as_slice())
283 .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
284 Ok(())
285 }
286
287 fn delete_stream_messages(
288 messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
289 name: &str,
290 ) -> Result<()> {
291 let mut keys = Vec::new();
294 let iter = messages
295 .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
296 .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;
297
298 for item in iter {
299 let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
300 let (_, read_seq, byte_offset) = key.value();
301 keys.push((read_seq, byte_offset));
302 }
303
304 for (read_seq, byte_offset) in keys {
305 messages
306 .remove((name, read_seq, byte_offset))
307 .map_err(|e| Self::storage_err("failed to delete message", e))?;
308 }
309
310 Ok(())
311 }
312
313 fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
314 if let Some(sender) = self
315 .notifiers
316 .read()
317 .expect("notifiers lock poisoned")
318 .get(name)
319 {
320 return sender.clone();
321 }
322
323 let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
324 guard
325 .entry(name.to_string())
326 .or_insert_with(|| {
327 let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
328 sender
329 })
330 .clone()
331 }
332
333 fn notify_stream(&self, name: &str) {
334 if let Some(sender) = self
335 .notifiers
336 .read()
337 .expect("notifiers lock poisoned")
338 .get(name)
339 {
340 let _ = sender.send(());
341 }
342 }
343
344 fn drop_notifier(&self, name: &str) {
345 self.notifiers
346 .write()
347 .expect("notifiers lock poisoned")
348 .remove(name);
349 }
350
351 fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
352 StoredStreamMeta {
353 config,
354 closed: false,
355 next_read_seq: 0,
356 next_byte_offset: 0,
357 total_bytes: 0,
358 created_at: Utc::now(),
359 last_seq: None,
360 producers: HashMap::new(),
361 }
362 }
363
364 fn batch_bytes(messages: &[Bytes]) -> u64 {
365 messages
366 .iter()
367 .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
368 .sum()
369 }
370
371 fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
372 let mut txn = db
373 .begin_write()
374 .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
375 txn.set_durability(Durability::Immediate)
376 .map_err(|e| Self::storage_err("failed to set write durability", e))?;
377 Ok(txn)
378 }
379
380 fn ensure_schema(db: &Database) -> Result<()> {
381 let txn = Self::begin_write_txn(db)?;
382 let streams = txn
383 .open_table(STREAMS)
384 .map_err(|e| Self::storage_err("failed to initialize streams table", e))?;
385 let messages = txn
386 .open_table(MESSAGES)
387 .map_err(|e| Self::storage_err("failed to initialize messages table", e))?;
388 drop(messages);
389 drop(streams);
390 txn.commit()
391 .map_err(|e| Self::storage_err("failed to commit schema initialization", e))?;
392 Ok(())
393 }
394
395 fn rebuild_state_from_disk(&self) -> Result<u64> {
396 let mut total = 0_u64;
397 for shard in &self.shards {
398 total = total.saturating_add(self.rebuild_shard(shard)?);
399 }
400 Ok(total)
401 }
402
    /// Scans one shard: sums the bytes of live streams and purges expired ones.
    ///
    /// Performs a read-only pass first and only opens a write transaction
    /// when at least one expired stream must be removed.
    fn rebuild_shard(&self, shard: &AcidShard) -> Result<u64> {
        let read_txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut live_bytes = 0_u64;
        let mut expired_names = Vec::new();

        {
            // Inner scope ends the iterator's borrow of `streams`.
            let iter = streams
                .iter()
                .map_err(|e| Self::storage_err("failed to iterate stream metadata", e))?;
            for item in iter {
                let (key, value) =
                    item.map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
                let stream_name = key.value().to_string();
                let meta: StoredStreamMeta = serde_json::from_slice(value.value())
                    .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
                if super::is_stream_expired(&meta.config) {
                    expired_names.push(stream_name);
                } else {
                    live_bytes = live_bytes.saturating_add(meta.total_bytes);
                }
            }
        }

        // Release the read transaction before starting the write transaction.
        drop(streams);
        drop(read_txn);

        if expired_names.is_empty() {
            return Ok(live_bytes);
        }

        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        for name in &expired_names {
            Self::delete_stream_messages(&mut messages, name)?;
            streams
                .remove(name.as_str())
                .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            // Harmless at startup (no subscribers yet), but keeps the notifier
            // map consistent with on-disk state.
            self.drop_notifier(name);
        }

        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit startup cleanup", e))?;

        Ok(live_bytes)
    }
463}
464
465impl Storage for AcidStorage {
    /// Creates stream `name`, replacing any expired remnant with the same name.
    ///
    /// Re-creating an existing live stream is idempotent when the config
    /// matches (`AlreadyExists`); a different config yields `ConfigMismatch`.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let mut removed_expired_bytes = 0_u64;

        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            if super::is_stream_expired(&existing.config) {
                // Purge the expired stream inside the same transaction that
                // recreates it, so the replacement is atomic.
                removed_expired_bytes = existing.total_bytes;
                Self::delete_stream_messages(&mut messages, name)?;
                streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            } else if existing.config == config {
                // Early return drops the uncommitted transaction; nothing was
                // written, so the implicit abort is harmless.
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let meta = Self::new_stream_meta(config);
        Self::write_stream_meta(&mut streams, name, &meta)?;

        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create stream", e))?;

        // Accounting and notifier cleanup happen only after a durable commit.
        if removed_expired_bytes > 0 {
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
        }

        Ok(CreateStreamResult::Created)
    }
507
508 fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
509 let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
510 self.reserve_total_bytes(message_bytes)?;
511
512 let result = (|| {
513 let shard = self.shard(name);
514 let txn = Self::begin_write_txn(&shard.db)?;
515 let mut streams = txn
516 .open_table(STREAMS)
517 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
518 let mut messages = txn
519 .open_table(MESSAGES)
520 .map_err(|e| Self::storage_err("failed to open messages table", e))?;
521
522 let mut meta = Self::read_stream_meta(&streams, name)?
523 .ok_or_else(|| Error::NotFound(name.to_string()))?;
524
525 if super::is_stream_expired(&meta.config) {
526 return Err(Error::StreamExpired);
527 }
528 if meta.closed {
529 return Err(Error::StreamClosed);
530 }
531
532 super::validate_content_type(&meta.config.content_type, content_type)?;
533
534 if meta.total_bytes + message_bytes > self.max_stream_bytes {
535 return Err(Error::StreamSizeLimitExceeded);
536 }
537
538 let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
539 messages
540 .insert(
541 (name, meta.next_read_seq, meta.next_byte_offset),
542 data.as_ref(),
543 )
544 .map_err(|e| Self::storage_err("failed to append message", e))?;
545
546 meta.next_read_seq += 1;
547 meta.next_byte_offset += message_bytes;
548 meta.total_bytes += message_bytes;
549
550 Self::write_stream_meta(&mut streams, name, &meta)?;
551
552 drop(messages);
553 drop(streams);
554 txn.commit()
555 .map_err(|e| Self::storage_err("failed to commit append", e))?;
556
557 Ok(offset)
558 })();
559
560 if result.is_err() {
561 self.rollback_total_bytes(message_bytes);
562 return result;
563 }
564
565 self.notify_stream(name);
566 result
567 }
568
569 fn batch_append(
570 &self,
571 name: &str,
572 messages: Vec<Bytes>,
573 content_type: &str,
574 seq: Option<&str>,
575 ) -> Result<Offset> {
576 if messages.is_empty() {
577 return Err(Error::InvalidHeader {
578 header: "Content-Length".to_string(),
579 reason: "batch cannot be empty".to_string(),
580 });
581 }
582
583 let batch_bytes = Self::batch_bytes(&messages);
584 self.reserve_total_bytes(batch_bytes)?;
585
586 let result = (|| {
587 let shard = self.shard(name);
588 let txn = Self::begin_write_txn(&shard.db)?;
589 let mut streams = txn
590 .open_table(STREAMS)
591 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
592 let mut message_table = txn
593 .open_table(MESSAGES)
594 .map_err(|e| Self::storage_err("failed to open messages table", e))?;
595
596 let mut meta = Self::read_stream_meta(&streams, name)?
597 .ok_or_else(|| Error::NotFound(name.to_string()))?;
598
599 if super::is_stream_expired(&meta.config) {
600 return Err(Error::StreamExpired);
601 }
602 if meta.closed {
603 return Err(Error::StreamClosed);
604 }
605
606 super::validate_content_type(&meta.config.content_type, content_type)?;
607 let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;
608
609 if meta.total_bytes + batch_bytes > self.max_stream_bytes {
610 return Err(Error::StreamSizeLimitExceeded);
611 }
612
613 for data in &messages {
614 let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
615 message_table
616 .insert(
617 (name, meta.next_read_seq, meta.next_byte_offset),
618 data.as_ref(),
619 )
620 .map_err(|e| Self::storage_err("failed to append batch message", e))?;
621 meta.next_read_seq += 1;
622 meta.next_byte_offset += len;
623 meta.total_bytes += len;
624 }
625
626 if let Some(new_seq) = pending_seq {
627 meta.last_seq = Some(new_seq);
628 }
629
630 let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
631 Self::write_stream_meta(&mut streams, name, &meta)?;
632
633 drop(message_table);
634 drop(streams);
635 txn.commit()
636 .map_err(|e| Self::storage_err("failed to commit batch append", e))?;
637
638 Ok(next_offset)
639 })();
640
641 if result.is_err() {
642 self.rollback_total_bytes(batch_bytes);
643 return result;
644 }
645
646 self.notify_stream(name);
647 result
648 }
649
    /// Reads all messages from `from_offset` through the current tail.
    ///
    /// The snapshot is taken by a single read transaction, so the returned
    /// `next_offset` matches the end of the returned messages; because the
    /// range always runs to the tail of the snapshot, `at_tail` is true on
    /// every successful return.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let shard = self.shard(name);
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        // "now" means: skip history, report only the current tail position.
        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                at_tail: true,
                closed: meta.closed,
            });
        }

        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
            (0_u64, 0_u64)
        } else {
            from_offset.parse_components().ok_or_else(|| {
                Error::InvalidOffset("non-concrete offset in read range".to_string())
            })?
        };

        // Inclusive range from the requested position through this stream's
        // largest possible key; other streams are excluded by the name prefix.
        let iter = message_table
            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to read stream range", e))?;

        let mut messages = Vec::new();
        for item in iter {
            let (_, value) =
                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
            messages.push(Bytes::copy_from_slice(value.value()));
        }

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
            at_tail: true,
            closed: meta.closed,
        })
    }
706
    /// Deletes a stream and all of its messages in one transaction, then
    /// releases its bytes from the global budget and drops its notifier.
    fn delete(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // Read metadata first so the byte total survives the removal below.
        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        Self::delete_stream_messages(&mut messages, name)?;
        streams
            .remove(name)
            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;

        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit delete", e))?;

        // Accounting is adjusted only after the delete is durable.
        self.saturating_sub_total_bytes(meta.total_bytes);
        self.drop_notifier(name);
        Ok(())
    }
734
735 fn head(&self, name: &str) -> Result<StreamMetadata> {
736 let shard = self.shard(name);
737 let txn = shard
738 .db
739 .begin_read()
740 .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
741
742 let streams = txn
743 .open_table(STREAMS)
744 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
745
746 let meta = Self::read_stream_meta(&streams, name)?
747 .ok_or_else(|| Error::NotFound(name.to_string()))?;
748
749 if super::is_stream_expired(&meta.config) {
750 return Err(Error::StreamExpired);
751 }
752
753 Ok(StreamMetadata {
754 config: meta.config,
755 next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
756 closed: meta.closed,
757 total_bytes: meta.total_bytes,
758 message_count: meta.next_read_seq,
759 created_at: meta.created_at,
760 })
761 }
762
    /// Marks a stream as closed so further appends are rejected, then wakes
    /// subscribers so they can observe the closed state.
    ///
    /// Closing an already-closed stream is a no-op that still notifies.
    fn close_stream(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        meta.closed = true;
        Self::write_stream_meta(&mut streams, name, &meta)?;

        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit close stream", e))?;

        // Notify only after the closed flag is durable.
        self.notify_stream(name);
        Ok(())
    }
787
788 fn append_with_producer(
789 &self,
790 name: &str,
791 messages: Vec<Bytes>,
792 content_type: &str,
793 producer: &ProducerHeaders,
794 should_close: bool,
795 seq: Option<&str>,
796 ) -> Result<ProducerAppendResult> {
797 let batch_bytes = Self::batch_bytes(&messages);
798 self.reserve_total_bytes(batch_bytes)?;
799
800 let result = (|| {
801 let shard = self.shard(name);
802 let txn = Self::begin_write_txn(&shard.db)?;
803 let mut streams = txn
804 .open_table(STREAMS)
805 .map_err(|e| Self::storage_err("failed to open streams table", e))?;
806 let mut message_table = txn
807 .open_table(MESSAGES)
808 .map_err(|e| Self::storage_err("failed to open messages table", e))?;
809
810 let mut meta = Self::read_stream_meta(&streams, name)?
811 .ok_or_else(|| Error::NotFound(name.to_string()))?;
812
813 if super::is_stream_expired(&meta.config) {
814 return Err(Error::StreamExpired);
815 }
816
817 super::cleanup_stale_producers(&mut meta.producers);
818
819 if !messages.is_empty() {
820 super::validate_content_type(&meta.config.content_type, content_type)?;
821 }
822
823 match super::check_producer(
824 meta.producers.get(producer.id.as_str()),
825 producer,
826 meta.closed,
827 )? {
828 ProducerCheck::Accept => {}
829 ProducerCheck::Duplicate { epoch, seq } => {
830 return Ok(ProducerAppendResult::Duplicate {
831 epoch,
832 seq,
833 next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
834 closed: meta.closed,
835 });
836 }
837 }
838
839 let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;
840
841 if meta.total_bytes + batch_bytes > self.max_stream_bytes {
842 return Err(Error::StreamSizeLimitExceeded);
843 }
844
845 for data in &messages {
846 let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
847 message_table
848 .insert(
849 (name, meta.next_read_seq, meta.next_byte_offset),
850 data.as_ref(),
851 )
852 .map_err(|e| Self::storage_err("failed to append producer message", e))?;
853 meta.next_read_seq += 1;
854 meta.next_byte_offset += len;
855 meta.total_bytes += len;
856 }
857
858 if let Some(new_seq) = pending_seq {
859 meta.last_seq = Some(new_seq);
860 }
861 if should_close {
862 meta.closed = true;
863 }
864
865 meta.producers.insert(
866 producer.id.clone(),
867 ProducerState {
868 epoch: producer.epoch,
869 last_seq: producer.seq,
870 updated_at: Utc::now(),
871 },
872 );
873
874 let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
875 let closed = meta.closed;
876
877 Self::write_stream_meta(&mut streams, name, &meta)?;
878 drop(message_table);
879 drop(streams);
880 txn.commit()
881 .map_err(|e| Self::storage_err("failed to commit producer append", e))?;
882
883 Ok(ProducerAppendResult::Accepted {
884 epoch: producer.epoch,
885 seq: producer.seq,
886 next_offset,
887 closed,
888 })
889 })();
890
891 if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
892 self.rollback_total_bytes(batch_bytes);
893 }
894
895 if result.is_ok() && (!messages.is_empty() || should_close) {
896 self.notify_stream(name);
897 }
898
899 result
900 }
901
    /// Atomically creates a stream, appends an initial batch, and optionally
    /// closes it — all in one transaction.
    ///
    /// An existing live stream with a matching config short-circuits to
    /// `AlreadyExists` (the supplied messages are NOT appended); an expired
    /// remnant with the same name is purged and replaced. The global byte
    /// reservation happens only after the existence check, so `reserved`
    /// tracks whether a rollback is owed on failure.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let batch_bytes = Self::batch_bytes(&messages);

        let mut reserved = false;
        let mut removed_expired_bytes = 0_u64;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            if let Some(existing) = Self::read_stream_meta(&streams, name)? {
                if super::is_stream_expired(&existing.config) {
                    removed_expired_bytes = existing.total_bytes;
                    Self::delete_stream_messages(&mut message_table, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                } else if existing.config == config {
                    // Nothing is written on this path; the txn aborts on drop.
                    return Ok(CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset: Offset::new(existing.next_read_seq, existing.next_byte_offset),
                        closed: existing.closed,
                    });
                } else {
                    return Err(Error::ConfigMismatch);
                }
            }

            // Reserve only once we know we will actually write the batch.
            if batch_bytes > 0 {
                self.reserve_total_bytes(batch_bytes)?;
                reserved = true;
            }

            let mut meta = Self::new_stream_meta(config);

            if batch_bytes > 0 {
                // meta.total_bytes is 0 here, so this sum cannot overflow.
                if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                    return Err(Error::StreamSizeLimitExceeded);
                }
                for data in &messages {
                    let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                    message_table
                        .insert(
                            (name, meta.next_read_seq, meta.next_byte_offset),
                            data.as_ref(),
                        )
                        .map_err(|e| {
                            Self::storage_err("failed to append create-with-data message", e)
                        })?;
                    meta.next_read_seq += 1;
                    meta.next_byte_offset += len;
                    meta.total_bytes += len;
                }
            }

            if should_close {
                meta.closed = true;
            }

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;

            Ok(CreateWithDataResult {
                status: CreateStreamResult::Created,
                next_offset,
                closed,
            })
        })();

        if result.is_err() && reserved {
            self.rollback_total_bytes(batch_bytes);
        }

        if result.is_ok() {
            if removed_expired_bytes > 0 {
                self.saturating_sub_total_bytes(removed_expired_bytes);
                self.drop_notifier(name);
            }
            // NOTE(review): this also fires on the AlreadyExists path, where
            // nothing was appended — confirm the spurious wakeup is intended.
            if should_close || !messages.is_empty() {
                self.notify_stream(name);
            }
        }

        result
    }
1005
1006 fn exists(&self, name: &str) -> bool {
1007 let shard = self.shard(name);
1008 let Ok(txn) = shard.db.begin_read() else {
1009 return false;
1010 };
1011 let Ok(streams) = txn.open_table(STREAMS) else {
1012 return false;
1013 };
1014
1015 match Self::read_stream_meta(&streams, name) {
1016 Ok(Some(meta)) => !super::is_stream_expired(&meta.config),
1017 _ => false,
1018 }
1019 }
1020
1021 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1022 let shard = self.shard(name);
1023 let txn = shard.db.begin_read().ok()?;
1024 let streams = txn.open_table(STREAMS).ok()?;
1025 let meta = Self::read_stream_meta(&streams, name).ok()??;
1026
1027 if super::is_stream_expired(&meta.config) {
1028 return None;
1029 }
1030
1031 Some(self.notifier_sender(name).subscribe())
1032 }
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037 use super::*;
1038 use chrono::Duration;
1039 use std::sync::Arc;
1040 use std::sync::atomic::{AtomicU64, Ordering};
1041 use std::thread;
1042
    // Produces a unique temp directory per call (timestamp + pid + counter)
    // so parallel tests never share on-disk state.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-acid-storage-test-{stamp}-{pid}-{seq}"))
    }
1050
    // Fresh storage: 16 shards, 1 MiB global budget, 100 KiB per stream.
    fn test_storage() -> AcidStorage {
        AcidStorage::new(test_storage_dir(), 16, 1024 * 1024, 100 * 1024)
            .expect("acid storage should initialize")
    }
1055
    // Convenience constructor for producer idempotency headers.
    fn producer(id: &str, epoch: u64, seq: u64) -> ProducerHeaders {
        ProducerHeaders {
            id: id.to_string(),
            epoch,
            seq,
        }
    }
1063
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        // Scope drops the first instance so the shard files are released
        // before the directory is reopened.
        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage.create_stream("events", cfg.clone()).unwrap();
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .unwrap();
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .unwrap();
        }

        // A fresh instance over the same root must see the committed data.
        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let read = restored.read("events", &Offset::start()).unwrap();

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }
1087
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        // Write and close within a scope so the files are released first.
        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage.create_stream("s", cfg.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        // The closed flag must survive a restart and keep rejecting appends.
        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }
1112
    #[test]
    fn test_restart_preserves_producer_state() {
        let root = test_storage_dir();

        // First instance accepts the producer's (epoch 0, seq 0) delivery.
        {
            let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
            storage
                .create_stream("s", StreamConfig::new("text/plain".to_string()))
                .unwrap();
            let result = storage
                .append_with_producer(
                    "s",
                    vec![Bytes::from("x")],
                    "text/plain",
                    &producer("p1", 0, 0),
                    false,
                    None,
                )
                .unwrap();
            assert!(matches!(result, ProducerAppendResult::Accepted { .. }));
        }

        // After restart the same delivery must be recognized as a duplicate,
        // proving the producer dedup state was persisted with the stream.
        let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
        let dup = restored
            .append_with_producer(
                "s",
                vec![Bytes::from("x")],
                "text/plain",
                &producer("p1", 0, 0),
                false,
                None,
            )
            .unwrap();
        assert!(matches!(dup, ProducerAppendResult::Duplicate { .. }));
    }
1148
1149 #[test]
1150 fn test_shard_routing_same_stream_is_stable() {
1151 let storage = test_storage();
1152 let a = storage.shard_index("same-stream");
1153 let b = storage.shard_index("same-stream");
1154 assert_eq!(a, b);
1155 }
1156
1157 #[test]
1158 fn test_shard_distribution_uses_multiple_shards() {
1159 let storage = test_storage();
1160 let mut seen = std::collections::HashSet::new();
1161 for i in 0..256 {
1162 seen.insert(storage.shard_index(&format!("stream-{i}")));
1163 }
1164 assert!(seen.len() > 1);
1165 }
1166
1167 #[test]
1168 fn test_startup_purges_expired_streams() {
1169 let root = test_storage_dir();
1170 {
1171 let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
1172 let expires = Utc::now() + Duration::milliseconds(50);
1173 let cfg = StreamConfig::new("text/plain".to_string()).with_expires_at(expires);
1174 storage.create_stream("expiring", cfg).unwrap();
1175 storage
1176 .append("expiring", Bytes::from("x"), "text/plain")
1177 .unwrap();
1178 }
1179
1180 std::thread::sleep(std::time::Duration::from_millis(100));
1181
1182 let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024).unwrap();
1183 assert!(!restored.exists("expiring"));
1184 assert!(matches!(
1185 restored.read("expiring", &Offset::start()),
1186 Err(Error::NotFound(_) | Error::StreamExpired)
1187 ));
1188 }
1189
1190 #[test]
1191 fn test_global_cap_strict_under_concurrency() {
1192 let storage = Arc::new(AcidStorage::new(test_storage_dir(), 16, 120, 120).unwrap());
1193 let shard_count = (0..8)
1194 .map(|i| storage.shard_index(&format!("s-{i}")))
1195 .collect::<std::collections::HashSet<_>>()
1196 .len();
1197 assert!(
1198 shard_count > 1,
1199 "test streams must span multiple shards to validate cross-shard cap behavior"
1200 );
1201
1202 for i in 0..8 {
1203 storage
1204 .create_stream(
1205 &format!("s-{i}"),
1206 StreamConfig::new("text/plain".to_string()),
1207 )
1208 .unwrap();
1209 }
1210
1211 let mut handles = Vec::new();
1212 for i in 0..8 {
1213 let storage = Arc::clone(&storage);
1214 handles.push(thread::spawn(move || {
1215 storage.append(&format!("s-{i}"), Bytes::from(vec![0_u8; 40]), "text/plain")
1216 }));
1217 }
1218
1219 for h in handles {
1220 let _ = h.join().unwrap();
1221 }
1222
1223 assert!(storage.total_bytes() <= 120);
1224 }
1225
1226 #[test]
1227 fn test_layout_manifest_mismatch_fails_fast() {
1228 let root = test_storage_dir();
1229 let first = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024);
1230 assert!(first.is_ok());
1231
1232 let mismatch = AcidStorage::new(root, 8, 1024 * 1024, 100 * 1024);
1233 assert!(matches!(mismatch, Err(Error::Storage(_))));
1234 }
1235
1236 #[test]
1237 fn test_layout_manifest_invalid_json_fails_fast() {
1238 let root = test_storage_dir();
1239 let acid_dir = root.join("acid");
1240 fs::create_dir_all(&acid_dir).unwrap();
1241 fs::write(acid_dir.join("layout.json"), b"{invalid-json").unwrap();
1242
1243 let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
1244 assert!(matches!(reopened, Err(Error::Storage(_))));
1245 }
1246
1247 #[test]
1248 fn test_layout_manifest_hash_policy_mismatch_fails_fast() {
1249 let root = test_storage_dir();
1250 let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
1251 drop(storage);
1252
1253 let layout_path = root.join("acid").join("layout.json");
1254 let mut layout: serde_json::Value =
1255 serde_json::from_slice(&fs::read(&layout_path).unwrap()).unwrap();
1256 layout["hash_policy"] = serde_json::Value::String("tampered-hash-policy".to_string());
1257 fs::write(layout_path, serde_json::to_vec_pretty(&layout).unwrap()).unwrap();
1258
1259 let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
1260 assert!(matches!(reopened, Err(Error::Storage(_))));
1261 }
1262
1263 #[test]
1264 fn test_corrupted_stream_metadata_fails_fast_on_startup() {
1265 let root = test_storage_dir();
1266 let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
1267 storage
1268 .create_stream("s", StreamConfig::new("text/plain".to_string()))
1269 .unwrap();
1270 storage
1271 .append("s", Bytes::from("payload"), "text/plain")
1272 .unwrap();
1273
1274 let shard_idx = storage.shard_index("s");
1276 let txn = AcidStorage::begin_write_txn(&storage.shards[shard_idx].db).unwrap();
1277 let mut streams = txn.open_table(STREAMS).unwrap();
1278 let corrupt = b"{not-json".to_vec();
1279 streams.insert("s", corrupt.as_slice()).unwrap();
1280 drop(streams);
1281 txn.commit().unwrap();
1282 drop(storage);
1283
1284 let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
1285 assert!(matches!(reopened, Err(Error::Storage(_))));
1286 }
1287
1288 #[test]
1289 fn test_tampered_shard_file_fails_fast_on_startup() {
1290 let root = test_storage_dir();
1291 let storage = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024).unwrap();
1292 storage
1293 .create_stream("s", StreamConfig::new("text/plain".to_string()))
1294 .unwrap();
1295 storage
1296 .append("s", Bytes::from("payload"), "text/plain")
1297 .unwrap();
1298 let shard_idx = storage.shard_index("s");
1299 drop(storage);
1300
1301 let shard_path = root
1302 .join("acid")
1303 .join(format!("shard_{shard_idx:02x}.redb"));
1304 fs::write(&shard_path, b"not-a-valid-redb-file").unwrap();
1305
1306 let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024);
1307 assert!(matches!(reopened, Err(Error::Storage(_))));
1308 }
1309}