1use super::{
8 CreateStreamResult, CreateWithDataResult, NOTIFY_CHANNEL_CAPACITY, ProducerAppendResult,
9 ProducerCheck, ProducerState, ReadResult, Storage, StreamConfig, StreamMetadata,
10};
11use crate::config::AcidBackend;
12use crate::protocol::error::{Error, Result};
13use crate::protocol::offset::Offset;
14use crate::protocol::producer::ProducerHeaders;
15use bytes::Bytes;
16use chrono::{DateTime, Utc};
17use redb::backends::InMemoryBackend;
18use redb::{
19 CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
20 SetDurabilityError, StorageError as RedbStorageError, Table, TableDefinition, TableError,
21 TransactionError,
22};
23use seahash::hash;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::fs;
27use std::path::{Path, PathBuf};
28use std::sync::RwLock;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31use tokio::sync::broadcast;
32use tracing::warn;
33
// redb table: stream name -> JSON-serialized `StoredStreamMeta`.
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
// redb table: (stream name, read sequence, byte offset) -> raw message payload.
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");

// Bumped whenever the on-disk layout changes incompatibly.
const LAYOUT_FORMAT_VERSION: u32 = 1;
// Name of the stream-name -> shard hashing scheme recorded in the layout manifest.
const HASH_POLICY: &str = "seahash-v1";
// Sleep durations (ms) between retries when opening a shard database at startup.
const STARTUP_RETRY_BACKOFF_MS: [u64; 3] = [10, 25, 50];
42
/// On-disk manifest (`layout.json`) pinning the shard layout so a restart
/// with different settings fails fast instead of mis-routing streams.
///
/// Field names are part of the persisted JSON format; do not rename.
#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    format_version: u32,
    shard_count: usize,
    hash_policy: String,
}
49
/// Per-stream record persisted as JSON in the `STREAMS` table.
///
/// Serialized with serde; field names are part of the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    // Creation-time configuration (compared on re-create attempts).
    config: StreamConfig,
    // True once the stream has been closed to further appends.
    closed: bool,
    // Sequence number the next appended message will take.
    next_read_seq: u64,
    // Byte offset at which the next appended message will start.
    next_byte_offset: u64,
    // Total payload bytes currently stored for this stream.
    total_bytes: u64,
    created_at: DateTime<Utc>,
    // Most recently accepted batch sequence token, if any.
    last_seq: Option<String>,
    // Idempotence state keyed by producer id.
    producers: HashMap<String, ProducerState>,
}
61
/// One redb database; streams are partitioned across shards by name hash.
#[derive(Debug)]
struct AcidShard {
    db: Database,
}
66
/// Durable, transactional `Storage` backend built on sharded redb databases.
///
/// Writes commit with immediate durability; a global atomic byte counter
/// enforces `max_total_bytes` across shards, and `max_stream_bytes` caps
/// each individual stream.
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    shards: Vec<AcidShard>,
    // Always a power of two in 1..=256 (validated in `new`).
    shard_count: usize,
    // Sum of `total_bytes` over all live streams; rebuilt at startup.
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    // Per-stream broadcast channels used to wake blocked readers.
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}
76
77impl AcidStorage {
    /// Builds the storage, creating shard databases under `root_dir` (file
    /// backend) or in memory, then rebuilds the global byte counter from
    /// persisted stream metadata.
    ///
    /// # Errors
    /// Fails on an invalid shard count, a layout-manifest mismatch, or any
    /// shard-database open/schema error.
    pub fn new(
        root_dir: impl Into<PathBuf>,
        shard_count: usize,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        backend: AcidBackend,
    ) -> Result<Self> {
        Self::validate_shard_count(shard_count)?;

        let shards = match backend {
            AcidBackend::File => Self::create_file_shards(&root_dir.into(), shard_count)?,
            AcidBackend::InMemory => Self::create_in_memory_shards(shard_count)?,
        };

        let storage = Self {
            shards,
            shard_count,
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            notifiers: RwLock::new(HashMap::new()),
        };

        // Startup scan: purge expired streams and recover live byte usage so
        // in-memory cap accounting matches what is on disk.
        let total_bytes = storage.rebuild_state_from_disk()?;
        storage.total_bytes.store(total_bytes, Ordering::Release);

        Ok(storage)
    }
122
123 fn create_file_shards(root_dir: &Path, shard_count: usize) -> Result<Vec<AcidShard>> {
124 let acid_dir = Self::acid_dir(root_dir);
125 fs::create_dir_all(&acid_dir).map_err(|e| {
126 Self::storage_err(
127 format!(
128 "failed to create acid storage directory {}",
129 acid_dir.display()
130 ),
131 e,
132 )
133 })?;
134
135 Self::load_or_create_layout(&acid_dir, shard_count)?;
136
137 let mut shards = Vec::with_capacity(shard_count);
138 for idx in 0..shard_count {
139 let shard_path = acid_dir.join(format!("shard_{idx:02x}.redb"));
140 let db = Self::open_shard_database(&shard_path)?;
141 Self::ensure_schema(&db)?;
142 shards.push(AcidShard { db });
143 }
144
145 Ok(shards)
146 }
147
148 fn create_in_memory_shards(shard_count: usize) -> Result<Vec<AcidShard>> {
149 let mut shards = Vec::with_capacity(shard_count);
150 for _ in 0..shard_count {
151 let db = Database::builder()
152 .create_with_backend(InMemoryBackend::new())
153 .map_err(|e| Self::storage_err("failed to create in-memory shard database", e))?;
154 Self::ensure_schema(&db)?;
155 shards.push(AcidShard { db });
156 }
157 Ok(shards)
158 }
159
    /// Snapshot of the global stored-payload byte counter.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }
165
166 fn validate_shard_count(shard_count: usize) -> Result<()> {
167 if !(1..=256).contains(&shard_count) {
168 return Err(Error::Storage(format!(
169 "acid shard count must be in range 1..=256, got {shard_count}"
170 )));
171 }
172 if !shard_count.is_power_of_two() {
173 return Err(Error::Storage(format!(
174 "acid shard count must be a power of two, got {shard_count}"
175 )));
176 }
177 Ok(())
178 }
179
    /// Formats `context: err` and delegates to the error's own classifier so
    /// each backend error maps onto the right storage error category.
    fn storage_err<E: ClassifyError>(context: impl Into<String>, err: E) -> Error {
        let context = context.into();
        let detail = format!("{context}: {err}");
        err.into_storage_error(context, detail)
    }
185
    /// Maps a redb `StorageError` onto the crate's error taxonomy.
    fn classify_redb_storage_error(
        context: String,
        err: &RedbStorageError,
        detail: String,
    ) -> Error {
        match err {
            // Underlying OS error: let the shared I/O classifier decide.
            RedbStorageError::Io(io_err) => {
                Error::classify_io_failure("acid", context, detail, io_err)
            }
            // Handle is gone or poisoned by a previous I/O failure.
            RedbStorageError::DatabaseClosed | RedbStorageError::PreviousIo => {
                Error::storage_unavailable("acid", context, detail)
            }
            // Value exceeds redb's size limits.
            RedbStorageError::ValueTooLarge(_) => {
                Error::storage_insufficient("acid", context, detail)
            }
            RedbStorageError::Corrupted(_) | RedbStorageError::LockPoisoned(_) => {
                Error::Storage(detail)
            }
            // redb's error enum is non-exhaustive; log unknown variants.
            _ => {
                warn!(error = %err, "unhandled redb StorageError variant");
                Error::Storage(detail)
            }
        }
    }
210
    /// Opens (or creates) a shard database, retrying transient failures with
    /// the fixed startup backoff schedule before giving up.
    fn open_shard_database(shard_path: &Path) -> Result<Database> {
        let context = format!("failed to open shard database {}", shard_path.display());
        let mut delays = STARTUP_RETRY_BACKOFF_MS.into_iter();

        loop {
            match Database::builder().create(shard_path) {
                Ok(db) => return Ok(db),
                Err(err) if Self::is_retryable_database_open(&err) => {
                    if let Some(delay_ms) = delays.next() {
                        std::thread::sleep(Duration::from_millis(delay_ms));
                        continue;
                    }
                    // Retry budget exhausted: surface the last error.
                    return Err(Self::storage_err(context, err));
                }
                Err(err) => return Err(Self::storage_err(context, err)),
            }
        }
    }
232
233 fn is_retryable_database_open(err: &DatabaseError) -> bool {
234 match err {
235 DatabaseError::DatabaseAlreadyOpen => true,
236 DatabaseError::Storage(RedbStorageError::Io(io_err)) => {
237 Error::is_retryable_io_error(io_err)
238 }
239 _ => false,
240 }
241 }
242
243 fn acid_dir(root_dir: &Path) -> PathBuf {
244 root_dir.join("acid")
245 }
246
247 fn layout_path(acid_dir: &Path) -> PathBuf {
248 acid_dir.join("layout.json")
249 }
250
    /// Validates the persisted layout manifest against the current settings,
    /// or writes a fresh one for a new data directory.
    ///
    /// Any mismatch (format version, shard count, hash policy) is fatal:
    /// re-routing names across a different shard layout would silently
    /// orphan existing streams.
    fn load_or_create_layout(acid_dir: &Path, shard_count: usize) -> Result<()> {
        let layout_path = Self::layout_path(acid_dir);
        if layout_path.exists() {
            let payload = fs::read(&layout_path).map_err(|e| {
                Self::storage_err(
                    format!("failed to read acid layout file {}", layout_path.display()),
                    e,
                )
            })?;
            let manifest: LayoutManifest = serde_json::from_slice(&payload).map_err(|e| {
                Self::storage_err(
                    format!("failed to parse acid layout file {}", layout_path.display()),
                    e,
                )
            })?;

            // Checked in order so the error names the first incompatibility.
            if manifest.format_version != LAYOUT_FORMAT_VERSION {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: format_version={}, expected={}",
                    manifest.format_version, LAYOUT_FORMAT_VERSION
                )));
            }
            if manifest.shard_count != shard_count {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: shard_count={}, expected={shard_count}",
                    manifest.shard_count
                )));
            }
            if manifest.hash_policy != HASH_POLICY {
                return Err(Error::Storage(format!(
                    "acid layout mismatch: hash_policy='{}', expected='{}'",
                    manifest.hash_policy, HASH_POLICY
                )));
            }
            return Ok(());
        }

        let manifest = LayoutManifest {
            format_version: LAYOUT_FORMAT_VERSION,
            shard_count,
            hash_policy: HASH_POLICY.to_string(),
        };
        let payload = serde_json::to_vec_pretty(&manifest)
            .map_err(|e| Self::storage_err("failed to serialize acid layout manifest", e))?;

        // Write via temp file + rename so a crash cannot leave a torn manifest.
        let tmp_path = acid_dir.join("layout.json.tmp");
        fs::write(&tmp_path, payload).map_err(|e| {
            Self::storage_err(
                format!("failed to write temp layout file {}", tmp_path.display()),
                e,
            )
        })?;
        fs::rename(&tmp_path, &layout_path).map_err(|e| {
            Self::storage_err(
                format!("failed to write layout file {}", layout_path.display()),
                e,
            )
        })?;

        Ok(())
    }
312
313 #[must_use]
314 fn shard_index(&self, name: &str) -> usize {
315 let hash_u64 = hash(name.as_bytes());
316 let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
317 let masked = hash_u64 & u64::from(u32::MAX);
318 usize::try_from(masked).expect("masked hash value must fit in usize")
319 });
320 hash_usize & (self.shard_count - 1)
321 }
322
323 fn shard(&self, name: &str) -> &AcidShard {
324 &self.shards[self.shard_index(name)]
325 }
326
327 fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
328 if bytes == 0 {
329 return Ok(());
330 }
331
332 if self
333 .total_bytes
334 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
335 current
336 .checked_add(bytes)
337 .filter(|next| *next <= self.max_total_bytes)
338 })
339 .is_err()
340 {
341 return Err(Error::MemoryLimitExceeded);
342 }
343 Ok(())
344 }
345
    /// Releases a prior reservation after a failed (or deduplicated) write.
    fn rollback_total_bytes(&self, bytes: u64) {
        self.saturating_sub_total_bytes(bytes);
    }
349
350 fn saturating_sub_total_bytes(&self, bytes: u64) {
351 if bytes == 0 {
352 return;
353 }
354
355 self.total_bytes
356 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
357 Some(current.saturating_sub(bytes))
358 })
359 .ok();
360 }
361
362 fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
363 where
364 T: ReadableTable<&'static str, &'static [u8]>,
365 {
366 let payload = streams
367 .get(name)
368 .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
369
370 if let Some(payload) = payload {
371 let meta = serde_json::from_slice(payload.value())
372 .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
373 Ok(Some(meta))
374 } else {
375 Ok(None)
376 }
377 }
378
379 fn write_stream_meta(
380 streams: &mut Table<'_, &'static str, &'static [u8]>,
381 name: &str,
382 meta: &StoredStreamMeta,
383 ) -> Result<()> {
384 let payload = serde_json::to_vec(meta)
385 .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
386 streams
387 .insert(name, payload.as_slice())
388 .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
389 Ok(())
390 }
391
    /// Removes every message row belonging to `name`.
    ///
    /// Keys are collected in a first pass and removed in a second because
    /// the range iterator borrows the table, so rows cannot be deleted
    /// while it is live.
    fn delete_stream_messages(
        messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
        name: &str,
    ) -> Result<()> {
        let mut keys = Vec::new();
        // Full key range for this stream: all (seq, offset) pairs.
        let iter = messages
            .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;

        for item in iter {
            let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
            let (_, read_seq, byte_offset) = key.value();
            keys.push((read_seq, byte_offset));
        }

        for (read_seq, byte_offset) in keys {
            messages
                .remove((name, read_seq, byte_offset))
                .map_err(|e| Self::storage_err("failed to delete message", e))?;
        }

        Ok(())
    }
417
    /// Returns the broadcast sender for `name`, lazily creating one.
    ///
    /// The write lock is taken unconditionally so get-or-create is atomic;
    /// the channel is only allocated when the entry is missing.
    fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
        let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
        guard
            .entry(name.to_string())
            .or_insert_with(|| {
                let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
                sender
            })
            .clone()
    }
431
432 fn notify_stream(&self, name: &str) {
433 if let Some(sender) = self
434 .notifiers
435 .read()
436 .expect("notifiers lock poisoned")
437 .get(name)
438 {
439 let _ = sender.send(());
440 }
441 }
442
443 fn drop_notifier(&self, name: &str) {
444 self.notifiers
445 .write()
446 .expect("notifiers lock poisoned")
447 .remove(name);
448 }
449
450 fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
451 StoredStreamMeta {
452 config,
453 closed: false,
454 next_read_seq: 0,
455 next_byte_offset: 0,
456 total_bytes: 0,
457 created_at: Utc::now(),
458 last_seq: None,
459 producers: HashMap::new(),
460 }
461 }
462
463 fn batch_bytes(messages: &[Bytes]) -> u64 {
464 messages
465 .iter()
466 .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
467 .sum()
468 }
469
    /// Begins a write transaction with `Durability::Immediate` so every
    /// commit is persisted before it is acknowledged — the durability half
    /// of this backend's ACID guarantee.
    fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
        let mut txn = db
            .begin_write()
            .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
        txn.set_durability(Durability::Immediate)
            .map_err(|e| Self::storage_err("failed to set write durability", e))?;
        Ok(txn)
    }
478
479 fn ensure_schema(db: &Database) -> Result<()> {
480 let txn = Self::begin_write_txn(db)?;
481 let streams = txn
482 .open_table(STREAMS)
483 .map_err(|e| Self::storage_err("failed to initialize streams table", e))?;
484 let messages = txn
485 .open_table(MESSAGES)
486 .map_err(|e| Self::storage_err("failed to initialize messages table", e))?;
487 drop(messages);
488 drop(streams);
489 txn.commit()
490 .map_err(|e| Self::storage_err("failed to commit schema initialization", e))?;
491 Ok(())
492 }
493
494 fn rebuild_state_from_disk(&self) -> Result<u64> {
495 let mut total = 0_u64;
496 for shard in &self.shards {
497 total = total.saturating_add(self.rebuild_shard(shard)?);
498 }
499 Ok(total)
500 }
501
    /// Startup scan of one shard: sums `total_bytes` of live streams and
    /// purges streams whose retention has expired.
    ///
    /// Runs a read pass first, then (only if needed) a write pass; the read
    /// transaction and its table handle must be dropped before the write
    /// transaction on the same database can proceed.
    fn rebuild_shard(&self, shard: &AcidShard) -> Result<u64> {
        let read_txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut live_bytes = 0_u64;
        let mut expired_names = Vec::new();

        {
            let iter = streams
                .iter()
                .map_err(|e| Self::storage_err("failed to iterate stream metadata", e))?;
            for item in iter {
                let (key, value) =
                    item.map_err(|e| Self::storage_err("failed to read stream metadata", e))?;
                let stream_name = key.value().to_string();
                let meta: StoredStreamMeta = serde_json::from_slice(value.value())
                    .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
                if super::is_stream_expired(&meta.config) {
                    expired_names.push(stream_name);
                } else {
                    live_bytes = live_bytes.saturating_add(meta.total_bytes);
                }
            }
        }

        // Release the read transaction before opening the write transaction.
        drop(streams);
        drop(read_txn);

        if expired_names.is_empty() {
            return Ok(live_bytes);
        }

        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        for name in &expired_names {
            Self::delete_stream_messages(&mut messages, name)?;
            streams
                .remove(name.as_str())
                .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            self.drop_notifier(name);
        }

        // Table handles borrow the transaction; drop them before committing.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit startup cleanup", e))?;

        Ok(live_bytes)
    }
562}
563
/// Converts a backend-specific error into the crate's `Error`, given the
/// human-readable `context` and the pre-formatted `detail` string.
trait ClassifyError: std::fmt::Display {
    fn into_storage_error(self, context: String, detail: String) -> Error;
}
573
impl ClassifyError for std::io::Error {
    // Raw I/O errors go straight to the shared classifier (retryable vs fatal).
    fn into_storage_error(self, context: String, detail: String) -> Error {
        Error::classify_io_failure("acid", context, detail, &self)
    }
}
579
impl ClassifyError for DatabaseError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Another handle still holds the database: temporary condition.
            DatabaseError::DatabaseAlreadyOpen => {
                Error::storage_unavailable("acid", context, detail)
            }
            // Delegate wrapped storage errors to the common classifier.
            DatabaseError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            DatabaseError::RepairAborted | DatabaseError::UpgradeRequired(_) => {
                Error::Storage(detail)
            }
            // redb's error enum is non-exhaustive; log unknown variants.
            _ => {
                warn!(error = %self, "unhandled redb DatabaseError variant");
                Error::Storage(detail)
            }
        }
    }
}
599
impl ClassifyError for TransactionError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Delegate wrapped storage errors to the common classifier.
            TransactionError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            TransactionError::ReadTransactionStillInUse(_) => Error::Storage(detail),
            // redb's error enum is non-exhaustive; log unknown variants.
            _ => {
                warn!(error = %self, "unhandled redb TransactionError variant");
                Error::Storage(detail)
            }
        }
    }
}
614
impl ClassifyError for TableError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            // Delegate wrapped storage errors to the common classifier.
            TableError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            // Schema/usage mistakes: these indicate a code bug, reported as a
            // plain storage error rather than a retryable condition.
            TableError::TableTypeMismatch { .. }
            | TableError::TableIsMultimap(_)
            | TableError::TableIsNotMultimap(_)
            | TableError::TypeDefinitionChanged { .. }
            | TableError::TableDoesNotExist(_)
            | TableError::TableExists(_)
            | TableError::TableAlreadyOpen(_, _) => Error::Storage(detail),
            // redb's error enum is non-exhaustive; log unknown variants.
            _ => {
                warn!(error = %self, "unhandled redb TableError variant");
                Error::Storage(detail)
            }
        }
    }
}
635
636impl ClassifyError for CommitError {
637 fn into_storage_error(self, context: String, detail: String) -> Error {
638 if let CommitError::Storage(storage_err) = &self {
639 AcidStorage::classify_redb_storage_error(context, storage_err, detail)
640 } else {
641 warn!(error = %self, "unhandled redb CommitError variant");
642 Error::Storage(detail)
643 }
644 }
645}
646
impl ClassifyError for RedbStorageError {
    // Bare storage errors go straight to the common classifier.
    fn into_storage_error(self, context: String, detail: String) -> Error {
        AcidStorage::classify_redb_storage_error(context, &self, detail)
    }
}
652
impl ClassifyError for SetDurabilityError {
    // No finer classification available; report the formatted detail as-is.
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}
658
impl ClassifyError for serde_json::Error {
    // Serialization failures indicate corrupt or incompatible metadata.
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}
664
665impl Storage for AcidStorage {
    /// Creates `name` with `config`.
    ///
    /// Returns `AlreadyExists` when a live stream with an identical config
    /// is present, `Error::ConfigMismatch` when the configs differ. An
    /// expired stream under the same name is purged inside the same
    /// transaction before the new stream is written; early returns drop the
    /// transaction, aborting it.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let mut removed_expired_bytes = 0_u64;

        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            if super::is_stream_expired(&existing.config) {
                removed_expired_bytes = existing.total_bytes;
                Self::delete_stream_messages(&mut messages, name)?;
                streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            } else if existing.config == config {
                return Ok(CreateStreamResult::AlreadyExists);
            } else {
                return Err(Error::ConfigMismatch);
            }
        }

        let meta = Self::new_stream_meta(config);
        Self::write_stream_meta(&mut streams, name, &meta)?;

        // Table handles borrow the transaction; drop them before committing.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create stream", e))?;

        // Adjust accounting only after the purge is durably committed.
        if removed_expired_bytes > 0 {
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
        }

        Ok(CreateStreamResult::Created)
    }
707
    /// Appends one message, returning the offset it was written at.
    ///
    /// Global capacity is reserved up front and rolled back if any step
    /// fails; early returns inside the closure drop the write transaction,
    /// which aborts it. Readers are notified only after a durable commit.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
        let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
        self.reserve_total_bytes(message_bytes)?;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut messages = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            if super::is_stream_expired(&meta.config) {
                return Err(Error::StreamExpired);
            }
            if meta.closed {
                return Err(Error::StreamClosed);
            }

            super::validate_content_type(&meta.config.content_type, content_type)?;

            // Per-stream size cap, checked before any row is written.
            if meta.total_bytes + message_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            messages
                .insert(
                    (name, meta.next_read_seq, meta.next_byte_offset),
                    data.as_ref(),
                )
                .map_err(|e| Self::storage_err("failed to append message", e))?;

            meta.next_read_seq += 1;
            meta.next_byte_offset += message_bytes;
            meta.total_bytes += message_bytes;

            Self::write_stream_meta(&mut streams, name, &meta)?;

            // Table handles borrow the transaction; drop them before commit.
            drop(messages);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit append", e))?;

            Ok(offset)
        })();

        if result.is_err() {
            // Nothing was committed: release the reserved capacity.
            self.rollback_total_bytes(message_bytes);
            return result;
        }

        self.notify_stream(name);
        result
    }
768
    /// Appends a batch atomically, returning the offset following the batch.
    ///
    /// The optional `seq` token is validated against the stream's last
    /// accepted token before anything is written; either every message in
    /// the batch commits or none does. Capacity is reserved up front and
    /// rolled back on any failure.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Err(Error::InvalidHeader {
                header: "Content-Length".to_string(),
                reason: "batch cannot be empty".to_string(),
            });
        }

        let batch_bytes = Self::batch_bytes(&messages);
        self.reserve_total_bytes(batch_bytes)?;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            if super::is_stream_expired(&meta.config) {
                return Err(Error::StreamExpired);
            }
            if meta.closed {
                return Err(Error::StreamClosed);
            }

            super::validate_content_type(&meta.config.content_type, content_type)?;
            // Validated now, but only recorded after the batch is written.
            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;

            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            for data in &messages {
                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                message_table
                    .insert(
                        (name, meta.next_read_seq, meta.next_byte_offset),
                        data.as_ref(),
                    )
                    .map_err(|e| Self::storage_err("failed to append batch message", e))?;
                meta.next_read_seq += 1;
                meta.next_byte_offset += len;
                meta.total_bytes += len;
            }

            if let Some(new_seq) = pending_seq {
                meta.last_seq = Some(new_seq);
            }

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            Self::write_stream_meta(&mut streams, name, &meta)?;

            // Table handles borrow the transaction; drop them before commit.
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit batch append", e))?;

            Ok(next_offset)
        })();

        if result.is_err() {
            // Transaction aborted: release the reserved capacity.
            self.rollback_total_bytes(batch_bytes);
            return result;
        }

        self.notify_stream(name);
        result
    }
849
    /// Reads all messages from `from_offset` through the current tail.
    ///
    /// An `is_now` offset short-circuits to an empty result positioned at
    /// the tail. Because the range scan always runs to the end of the
    /// stream, the result reports `at_tail: true` in every case.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
        let shard = self.shard(name);
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        if from_offset.is_now() {
            return Ok(ReadResult {
                messages: Vec::new(),
                next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                at_tail: true,
                closed: meta.closed,
            });
        }

        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
            (0_u64, 0_u64)
        } else {
            from_offset.parse_components().ok_or_else(|| {
                Error::InvalidOffset("non-concrete offset in read range".to_string())
            })?
        };

        let iter = message_table
            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to read stream range", e))?;

        // Payloads are copied out so the result outlives the read transaction.
        let mut messages = Vec::new();
        for item in iter {
            let (_, value) =
                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
            messages.push(Bytes::copy_from_slice(value.value()));
        }

        Ok(ReadResult {
            messages,
            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
            at_tail: true,
            closed: meta.closed,
        })
    }
906
    /// Deletes the stream and all of its messages in one transaction.
    ///
    /// Global byte accounting and the notifier are adjusted only after the
    /// deletion is durably committed.
    fn delete(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        // Read metadata first so the committed size can be released below.
        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        Self::delete_stream_messages(&mut messages, name)?;
        streams
            .remove(name)
            .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;

        // Table handles borrow the transaction; drop them before committing.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit delete", e))?;

        self.saturating_sub_total_bytes(meta.total_bytes);
        self.drop_notifier(name);
        Ok(())
    }
934
    /// Returns stream metadata without touching message payloads.
    ///
    /// `next_read_seq` doubles as the count of messages ever appended, so it
    /// is reported as `message_count`.
    fn head(&self, name: &str) -> Result<StreamMetadata> {
        let shard = self.shard(name);
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;

        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        Ok(StreamMetadata {
            config: meta.config,
            next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
            closed: meta.closed,
            total_bytes: meta.total_bytes,
            message_count: meta.next_read_seq,
            created_at: meta.created_at,
        })
    }
962
    /// Marks the stream closed to further appends and wakes waiting readers
    /// so they can observe the closed flag. Idempotent for already-closed
    /// streams.
    fn close_stream(&self, name: &str) -> Result<()> {
        let shard = self.shard(name);
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;

        if super::is_stream_expired(&meta.config) {
            return Err(Error::StreamExpired);
        }

        meta.closed = true;
        Self::write_stream_meta(&mut streams, name, &meta)?;

        // The table handle borrows the transaction; drop it before commit.
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit close stream", e))?;

        self.notify_stream(name);
        Ok(())
    }
987
    /// Idempotent producer append: writes `messages`, optionally closes the
    /// stream, and records the producer's (epoch, seq) for dedup.
    ///
    /// A replayed request is detected by `check_producer` and answered with
    /// `Duplicate` without writing anything; the up-front capacity
    /// reservation is rolled back both on error and on duplicates, since in
    /// neither case were bytes actually stored. Readers are notified only
    /// when something observable changed (new messages or a close).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult> {
        let batch_bytes = Self::batch_bytes(&messages);
        self.reserve_total_bytes(batch_bytes)?;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            let mut meta = Self::read_stream_meta(&streams, name)?
                .ok_or_else(|| Error::NotFound(name.to_string()))?;

            if super::is_stream_expired(&meta.config) {
                return Err(Error::StreamExpired);
            }

            // Evict producer entries that have gone stale before dedup checks.
            super::cleanup_stale_producers(&mut meta.producers);

            // An empty batch may still be a close request; only validate the
            // content type when there is a payload.
            if !messages.is_empty() {
                super::validate_content_type(&meta.config.content_type, content_type)?;
            }

            match super::check_producer(
                meta.producers.get(producer.id.as_str()),
                producer,
                meta.closed,
            )? {
                ProducerCheck::Accept => {}
                ProducerCheck::Duplicate { epoch, seq } => {
                    // Replay: answer with current state, write nothing.
                    return Ok(ProducerAppendResult::Duplicate {
                        epoch,
                        seq,
                        next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                        closed: meta.closed,
                    });
                }
            }

            let pending_seq = super::validate_seq(meta.last_seq.as_deref(), seq)?;

            if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                return Err(Error::StreamSizeLimitExceeded);
            }

            for data in &messages {
                let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                message_table
                    .insert(
                        (name, meta.next_read_seq, meta.next_byte_offset),
                        data.as_ref(),
                    )
                    .map_err(|e| Self::storage_err("failed to append producer message", e))?;
                meta.next_read_seq += 1;
                meta.next_byte_offset += len;
                meta.total_bytes += len;
            }

            if let Some(new_seq) = pending_seq {
                meta.last_seq = Some(new_seq);
            }
            if should_close {
                meta.closed = true;
            }

            // Record the producer's latest (epoch, seq) for future dedup.
            meta.producers.insert(
                producer.id.clone(),
                ProducerState {
                    epoch: producer.epoch,
                    last_seq: producer.seq,
                    updated_at: Utc::now(),
                },
            );

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            // Table handles borrow the transaction; drop them before commit.
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit producer append", e))?;

            Ok(ProducerAppendResult::Accepted {
                epoch: producer.epoch,
                seq: producer.seq,
                next_offset,
                closed,
            })
        })();

        // Errors aborted the transaction and duplicates wrote nothing, so the
        // reservation must be released in both cases.
        if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
            self.rollback_total_bytes(batch_bytes);
        }

        if result.is_ok() && (!messages.is_empty() || should_close) {
            self.notify_stream(name);
        }

        result
    }
1101
    /// Atomically create stream `name` and seed it with an initial batch of
    /// `messages`, optionally closing the stream in the same transaction.
    ///
    /// Behavior when the stream already exists:
    /// - expired per `super::is_stream_expired` → the old stream's messages
    ///   and metadata are deleted inside this transaction and creation
    ///   proceeds as if fresh;
    /// - same config → returns `AlreadyExists` with the current head offset
    ///   (no data is written, nothing is reserved);
    /// - different config → `Error::ConfigMismatch`.
    ///
    /// Byte accounting: the global counter is reserved up front for the whole
    /// batch and rolled back if any later step (per-stream cap check, insert,
    /// commit) fails. The expired stream's bytes are subtracted from the
    /// global counter only *after* a successful commit, as is the notifier
    /// teardown/notification — side effects never run for a failed txn.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult> {
        let batch_bytes = Self::batch_bytes(&messages);

        // Set inside the closure; read afterwards to decide on rollback /
        // post-commit accounting without re-entering the transaction.
        let mut reserved = false;
        let mut removed_expired_bytes = 0_u64;

        let result = (|| {
            let shard = self.shard(name);
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;
            let mut message_table = txn
                .open_table(MESSAGES)
                .map_err(|e| Self::storage_err("failed to open messages table", e))?;

            if let Some(existing) = Self::read_stream_meta(&streams, name)? {
                if super::is_stream_expired(&existing.config) {
                    // Record how many bytes the dead stream was charged for;
                    // subtracted from the global counter only after commit.
                    removed_expired_bytes = existing.total_bytes;
                    Self::delete_stream_messages(&mut message_table, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                } else if existing.config == config {
                    // Idempotent create: same config means success without
                    // touching the existing data.
                    return Ok(CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset: Offset::new(existing.next_read_seq, existing.next_byte_offset),
                        closed: existing.closed,
                    });
                } else {
                    return Err(Error::ConfigMismatch);
                }
            }

            // Reserve against the global cap before writing anything so a
            // concurrent writer cannot push the total over the limit.
            if batch_bytes > 0 {
                self.reserve_total_bytes(batch_bytes)?;
                reserved = true;
            }

            let mut meta = Self::new_stream_meta(config);

            if batch_bytes > 0 {
                // Per-stream cap check (meta.total_bytes is 0 for a fresh
                // stream; kept in the sum for uniformity with append paths).
                if meta.total_bytes + batch_bytes > self.max_stream_bytes {
                    return Err(Error::StreamSizeLimitExceeded);
                }
                for data in &messages {
                    let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
                    message_table
                        .insert(
                            (name, meta.next_read_seq, meta.next_byte_offset),
                            data.as_ref(),
                        )
                        .map_err(|e| {
                            Self::storage_err("failed to append create-with-data message", e)
                        })?;
                    meta.next_read_seq += 1;
                    meta.next_byte_offset += len;
                    meta.total_bytes += len;
                }
            }

            if should_close {
                meta.closed = true;
            }

            let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
            let closed = meta.closed;

            Self::write_stream_meta(&mut streams, name, &meta)?;
            // redb requires all open tables to be dropped before commit.
            drop(message_table);
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;

            Ok(CreateWithDataResult {
                status: CreateStreamResult::Created,
                next_offset,
                closed,
            })
        })();

        // Undo the global-cap reservation if anything after it failed.
        if result.is_err() && reserved {
            self.rollback_total_bytes(batch_bytes);
        }

        if result.is_ok() {
            if removed_expired_bytes > 0 {
                // The expired stream is gone on disk; release its bytes and
                // drop its (now stale) notifier channel.
                self.saturating_sub_total_bytes(removed_expired_bytes);
                self.drop_notifier(name);
            }
            // Wake subscribers only when something observable happened.
            if should_close || !messages.is_empty() {
                self.notify_stream(name);
            }
        }

        result
    }
1205
1206 fn exists(&self, name: &str) -> bool {
1207 let shard = self.shard(name);
1208 let Ok(txn) = shard.db.begin_read() else {
1209 return false;
1210 };
1211 let Ok(streams) = txn.open_table(STREAMS) else {
1212 return false;
1213 };
1214
1215 match Self::read_stream_meta(&streams, name) {
1216 Ok(Some(meta)) => !super::is_stream_expired(&meta.config),
1217 _ => false,
1218 }
1219 }
1220
1221 fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
1222 let shard = self.shard(name);
1223 let txn = shard.db.begin_read().ok()?;
1224 let streams = txn.open_table(STREAMS).ok()?;
1225 let meta = Self::read_stream_meta(&streams, name).ok()??;
1226
1227 if super::is_stream_expired(&meta.config) {
1228 return None;
1229 }
1230
1231 Some(self.notifier_sender(name).subscribe())
1232 }
1233
1234 fn cleanup_expired_streams(&self) -> usize {
1235 let mut total_removed = 0;
1236
1237 for shard in &self.shards {
1238 let Ok(read_txn) = shard.db.begin_read() else {
1240 continue;
1241 };
1242 let Ok(streams_table) = read_txn.open_table(STREAMS) else {
1243 continue;
1244 };
1245 let Ok(iter) = streams_table.iter() else {
1246 continue;
1247 };
1248
1249 let mut candidates: Vec<String> = Vec::new();
1250 for item in iter {
1251 let Ok((key, value)) = item else {
1252 continue;
1253 };
1254 let name = key.value().to_string();
1255 let Ok(meta) = serde_json::from_slice::<StoredStreamMeta>(value.value()) else {
1256 continue;
1257 };
1258 if super::is_stream_expired(&meta.config) {
1259 candidates.push(name);
1260 }
1261 }
1262
1263 drop(streams_table);
1264 drop(read_txn);
1265
1266 if candidates.is_empty() {
1267 continue;
1268 }
1269
1270 let Ok(txn) = Self::begin_write_txn(&shard.db) else {
1272 continue;
1273 };
1274 let Ok(mut streams) = txn.open_table(STREAMS) else {
1275 continue;
1276 };
1277 let Ok(mut messages) = txn.open_table(MESSAGES) else {
1278 continue;
1279 };
1280
1281 let mut committed = Vec::new();
1282 for name in &candidates {
1283 let meta = streams
1286 .get(name.as_str())
1287 .ok()
1288 .flatten()
1289 .and_then(|v| serde_json::from_slice::<StoredStreamMeta>(v.value()).ok());
1290 let Some(meta) = meta else { continue };
1291 if !super::is_stream_expired(&meta.config) {
1292 continue;
1293 }
1294
1295 let _ = Self::delete_stream_messages(&mut messages, name);
1296 let _ = streams.remove(name.as_str());
1297 committed.push((name.clone(), meta.total_bytes));
1298 }
1299
1300 drop(messages);
1301 drop(streams);
1302
1303 match txn.commit() {
1304 Ok(()) => {
1305 for (name, bytes) in &committed {
1306 self.rollback_total_bytes(*bytes);
1307 self.drop_notifier(name);
1308 }
1309 total_removed += committed.len();
1310 }
1311 Err(e) => {
1312 warn!(%e, "failed to commit expired stream cleanup");
1313 }
1314 }
1315 }
1316
1317 total_removed
1318 }
1319}
1320
// Tests cover both redb backends (file-backed and in-memory): restart and
// restore behavior, shard routing, expiry purging, byte-cap accounting, and
// fail-fast startup on corrupted layout/metadata/shard files.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;

    // Unique temp directory per call: wall-clock nanos + pid + process-local
    // counter, so concurrently running tests never share a storage root.
    fn test_storage_dir() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("ds-acid-storage-test-{stamp}-{pid}-{seq}"))
    }

    // Standard file-backed fixture: 16 shards, 1 MiB global cap, 100 KiB
    // per-stream cap.
    fn test_storage() -> AcidStorage {
        AcidStorage::new(
            test_storage_dir(),
            16,
            1024 * 1024,
            100 * 1024,
            AcidBackend::File,
        )
        .expect("acid storage should initialize")
    }

    // Shorthand for building producer headers.
    fn producer(id: &str, epoch: u64, seq: u64) -> ProducerHeaders {
        ProducerHeaders {
            id: id.to_string(),
            epoch,
            seq,
        }
    }

    // Messages appended before shutdown are readable after reopening the
    // same root directory.
    #[test]
    fn test_restore_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage.create_stream("events", cfg.clone()).unwrap();
            storage
                .append("events", Bytes::from("event-1"), "text/plain")
                .unwrap();
            storage
                .append("events", Bytes::from("event-2"), "text/plain")
                .unwrap();
        }

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let read = restored.read("events", &Offset::start()).unwrap();

        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("event-1"));
        assert_eq!(read.messages[1], Bytes::from("event-2"));
    }

    // The closed flag and message count survive a restart, and appends to a
    // restored closed stream are rejected with StreamClosed.
    #[test]
    fn test_restore_closed_stream_from_disk() {
        let root = test_storage_dir();
        let cfg = StreamConfig::new("text/plain".to_string());

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage.create_stream("s", cfg.clone()).unwrap();
            storage
                .append("s", Bytes::from("data"), "text/plain")
                .unwrap();
            storage.close_stream("s").unwrap();
        }

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let meta = restored.head("s").unwrap();
        assert!(meta.closed);
        assert_eq!(meta.message_count, 1);

        assert!(matches!(
            restored.append("s", Bytes::from("more"), "text/plain"),
            Err(Error::StreamClosed)
        ));
    }

    // A producer (id, epoch, seq) accepted before shutdown is detected as a
    // duplicate when replayed after a restart.
    #[test]
    fn test_restart_preserves_producer_state() {
        let root = test_storage_dir();

        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            storage
                .create_stream("s", StreamConfig::new("text/plain".to_string()))
                .unwrap();
            let result = storage
                .append_with_producer(
                    "s",
                    vec![Bytes::from("x")],
                    "text/plain",
                    &producer("p1", 0, 0),
                    false,
                    None,
                )
                .unwrap();
            assert!(matches!(result, ProducerAppendResult::Accepted { .. }));
        }

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        let dup = restored
            .append_with_producer(
                "s",
                vec![Bytes::from("x")],
                "text/plain",
                &producer("p1", 0, 0),
                false,
                None,
            )
            .unwrap();
        assert!(matches!(dup, ProducerAppendResult::Duplicate { .. }));
    }

    // The same stream name always routes to the same shard index.
    #[test]
    fn test_shard_routing_same_stream_is_stable() {
        let storage = test_storage();
        let a = storage.shard_index("same-stream");
        let b = storage.shard_index("same-stream");
        assert_eq!(a, b);
    }

    // Hashing 256 distinct names should land on more than one shard.
    #[test]
    fn test_shard_distribution_uses_multiple_shards() {
        let storage = test_storage();
        let mut seen = std::collections::HashSet::new();
        for i in 0..256 {
            seen.insert(storage.shard_index(&format!("stream-{i}")));
        }
        assert!(seen.len() > 1);
    }

    // A stream whose expiry passed while the process was down is purged
    // during startup and reads/exists treat it as gone.
    #[test]
    fn test_startup_purges_expired_streams() {
        let root = test_storage_dir();
        {
            let storage =
                AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
                    .unwrap();
            let expires = Utc::now() + Duration::milliseconds(50);
            let cfg = StreamConfig::new("text/plain".to_string()).with_expires_at(expires);
            storage.create_stream("expiring", cfg).unwrap();
            storage
                .append("expiring", Bytes::from("x"), "text/plain")
                .unwrap();
        }

        // Let the 50ms expiry elapse before reopening.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let restored =
            AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        assert!(!restored.exists("expiring"));
        assert!(matches!(
            restored.read("expiring", &Offset::start()),
            Err(Error::NotFound(_) | Error::StreamExpired)
        ));
    }

    // With a 120-byte global cap and eight 40-byte appends racing across
    // shards, accounting never exceeds the cap.
    #[test]
    fn test_global_cap_strict_under_concurrency() {
        let storage = Arc::new(
            AcidStorage::new(test_storage_dir(), 16, 120, 120, AcidBackend::File).unwrap(),
        );
        // Precondition: the test only proves cross-shard behavior if the
        // eight names actually span more than one shard.
        let shard_count = (0..8)
            .map(|i| storage.shard_index(&format!("s-{i}")))
            .collect::<std::collections::HashSet<_>>()
            .len();
        assert!(
            shard_count > 1,
            "test streams must span multiple shards to validate cross-shard cap behavior"
        );

        for i in 0..8 {
            storage
                .create_stream(
                    &format!("s-{i}"),
                    StreamConfig::new("text/plain".to_string()),
                )
                .unwrap();
        }

        let mut handles = Vec::new();
        for i in 0..8 {
            let storage = Arc::clone(&storage);
            handles.push(thread::spawn(move || {
                storage.append(&format!("s-{i}"), Bytes::from(vec![0_u8; 40]), "text/plain")
            }));
        }

        // Individual appends may fail (cap exceeded); only the invariant on
        // the global counter matters.
        for h in handles {
            let _ = h.join().unwrap();
        }

        assert!(storage.total_bytes() <= 120);
    }

    // Reopening with a different shard count than the persisted layout
    // manifest must fail instead of silently re-routing streams.
    #[test]
    fn test_layout_manifest_mismatch_fails_fast() {
        let root = test_storage_dir();
        let first = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(first.is_ok());

        let mismatch = AcidStorage::new(root, 8, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(mismatch, Err(Error::Storage(_))));
    }

    // An unparseable layout.json aborts startup with a storage error.
    #[test]
    fn test_layout_manifest_invalid_json_fails_fast() {
        let root = test_storage_dir();
        let acid_dir = root.join("acid");
        fs::create_dir_all(&acid_dir).unwrap();
        fs::write(acid_dir.join("layout.json"), b"{invalid-json").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Tampering with the persisted hash_policy field aborts startup — shard
    // routing depends on the hash algorithm recorded at creation time.
    #[test]
    fn test_layout_manifest_hash_policy_mismatch_fails_fast() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        drop(storage);

        let layout_path = root.join("acid").join("layout.json");
        let mut layout: serde_json::Value =
            serde_json::from_slice(&fs::read(&layout_path).unwrap()).unwrap();
        layout["hash_policy"] = serde_json::Value::String("tampered-hash-policy".to_string());
        fs::write(layout_path, serde_json::to_vec_pretty(&layout).unwrap()).unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Overwriting a stream's metadata with non-JSON inside its shard makes
    // the next startup fail instead of loading corrupt state.
    #[test]
    fn test_corrupted_stream_metadata_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();

        // Corrupt the stream's metadata row in-place via a raw transaction.
        let shard_idx = storage.shard_index("s");
        let txn = AcidStorage::begin_write_txn(&storage.shards[shard_idx].db).unwrap();
        let mut streams = txn.open_table(STREAMS).unwrap();
        let corrupt = b"{not-json".to_vec();
        streams.insert("s", corrupt.as_slice()).unwrap();
        drop(streams);
        txn.commit().unwrap();
        drop(storage);

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Replacing a shard's redb file with garbage bytes makes startup fail.
    #[test]
    fn test_tampered_shard_file_fails_fast_on_startup() {
        let root = test_storage_dir();
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        storage
            .create_stream("s", StreamConfig::new("text/plain".to_string()))
            .unwrap();
        storage
            .append("s", Bytes::from("payload"), "text/plain")
            .unwrap();
        let shard_idx = storage.shard_index("s");
        drop(storage);

        let shard_path = root
            .join("acid")
            .join(format!("shard_{shard_idx:02x}.redb"));
        fs::write(&shard_path, b"not-a-valid-redb-file").unwrap();

        let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
        assert!(matches!(reopened, Err(Error::Storage(_))));
    }

    // Basic create/append/read round-trip on the in-memory backend,
    // including head() metadata (2 messages, 10 total bytes).
    #[test]
    fn test_in_memory_backend_create_append_read() {
        let storage = AcidStorage::new(
            test_storage_dir(),
            4,
            1024 * 1024,
            100 * 1024,
            AcidBackend::InMemory,
        )
        .expect("in-memory acid storage should initialize");

        let cfg = StreamConfig::new("text/plain".to_string());
        storage.create_stream("s", cfg).unwrap();
        storage
            .append("s", Bytes::from("hello"), "text/plain")
            .unwrap();
        storage
            .append("s", Bytes::from("world"), "text/plain")
            .unwrap();

        let read = storage.read("s", &Offset::start()).unwrap();
        assert_eq!(read.messages.len(), 2);
        assert_eq!(read.messages[0], Bytes::from("hello"));
        assert_eq!(read.messages[1], Bytes::from("world"));

        let meta = storage.head("s").unwrap();
        assert_eq!(meta.message_count, 2);
        assert_eq!(meta.total_bytes, 10);
    }

    // The in-memory backend enforces the global byte cap: a second append
    // that would exceed 50 bytes fails and leaves the counter at 40.
    #[test]
    fn test_in_memory_backend_global_cap() {
        let storage = AcidStorage::new(test_storage_dir(), 4, 50, 50, AcidBackend::InMemory)
            .expect("in-memory acid storage should initialize");

        let cfg = StreamConfig::new("text/plain".to_string());
        storage.create_stream("s", cfg).unwrap();
        storage
            .append("s", Bytes::from(vec![0_u8; 40]), "text/plain")
            .unwrap();

        let result = storage.append("s", Bytes::from(vec![0_u8; 20]), "text/plain");
        assert!(result.is_err());
        assert_eq!(storage.total_bytes(), 40);
    }
}