mod layout;
mod storage_impl;

#[cfg(test)]
mod tests;

use super::{ForkInfo, NOTIFY_CHANNEL_CAPACITY, ProducerState, StreamConfig, StreamState};
use crate::config::AcidBackend;
use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use redb::backends::InMemoryBackend;
use redb::{
    CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
    SetDurabilityError, StorageError as RedbStorageError, Table, TableDefinition, TableError,
    TransactionError,
};
use seahash::hash;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::broadcast;
use tracing::warn;

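// Key layout per shard: stream metadata is keyed by stream name; message
// payloads are keyed by (stream name, read sequence, byte offset) so that a
// range scan walks one stream's messages in order.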
const STREAMS: TableDefinition<&str, &[u8]> = TableDefinition::new("streams");
const MESSAGES: TableDefinition<(&str, u64, u64), &[u8]> = TableDefinition::new("messages");

const LAYOUT_FORMAT_VERSION: u32 = 1;
const HASH_POLICY: &str = "seahash-v1";
const STARTUP_RETRY_BACKOFF_MS: [u64; 3] = [10, 25, 50];

#[derive(Debug, Serialize, Deserialize)]
struct LayoutManifest {
    format_version: u32,
    shard_count: usize,
    hash_policy: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredStreamMeta {
    config: StreamConfig,
    closed: bool,
    next_read_seq: u64,
    next_byte_offset: u64,
    total_bytes: u64,
    created_at: DateTime<Utc>,
    #[serde(default)]
    updated_at: Option<DateTime<Utc>>,
    last_seq: Option<String>,
    producers: HashMap<String, ProducerState>,
    #[serde(default)]
    fork_info: Option<ForkInfo>,
    #[serde(default)]
    ref_count: u32,
    #[serde(default)]
    state: StreamState,
}

#[derive(Debug)]
struct AcidShard {
    db: Database,
}

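/// Sharded, redb-backed stream storage. Stream names are hashed to a shard,
/// the global byte budget is tracked in an atomic counter, and per-stream
/// reader wakeups fan out through broadcast channels.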
#[allow(clippy::module_name_repetitions)]
pub struct AcidStorage {
    shards: Vec<AcidShard>,
    shard_count: usize,
    total_bytes: AtomicU64,
    max_total_bytes: u64,
    max_stream_bytes: u64,
    notifiers: RwLock<HashMap<String, broadcast::Sender<()>>>,
}

impl AcidStorage {
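    /// Opens (or creates) the backend shards, then rebuilds the in-memory
    /// total-bytes counter from whatever state is already persisted.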
    pub fn new(
        root_dir: impl Into<PathBuf>,
        shard_count: usize,
        max_total_bytes: u64,
        max_stream_bytes: u64,
        backend: AcidBackend,
    ) -> Result<Self> {
        Self::validate_shard_count(shard_count)?;

        let shards = match backend {
            AcidBackend::File => Self::create_file_shards(&root_dir.into(), shard_count)?,
            AcidBackend::InMemory => Self::create_in_memory_shards(shard_count)?,
        };

        let storage = Self {
            shards,
            shard_count,
            total_bytes: AtomicU64::new(0),
            max_total_bytes,
            max_stream_bytes,
            notifiers: RwLock::new(HashMap::new()),
        };

        let total_bytes = storage.rebuild_state_from_disk()?;
        storage.total_bytes.store(total_bytes, Ordering::Release);

        Ok(storage)
    }

    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Acquire)
    }

    fn validate_shard_count(shard_count: usize) -> Result<()> {
        if !(1..=256).contains(&shard_count) {
            return Err(Error::Storage(format!(
                "acid shard count must be in range 1..=256, got {shard_count}"
            )));
        }
        if !shard_count.is_power_of_two() {
            return Err(Error::Storage(format!(
                "acid shard count must be a power of two, got {shard_count}"
            )));
        }
        Ok(())
    }

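    /// Builds a `"context: err"` detail string and lets the error type
    /// classify itself into a protocol [`Error`] via [`ClassifyError`].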
    fn storage_err<E: ClassifyError>(context: impl Into<String>, err: E) -> Error {
        let context = context.into();
        let detail = format!("{context}: {err}");
        err.into_storage_error(context, detail)
    }

    fn classify_redb_storage_error(
        context: String,
        err: &RedbStorageError,
        detail: String,
    ) -> Error {
        match err {
            RedbStorageError::Io(io_err) => {
                Error::classify_io_failure("acid", context, detail, io_err)
            }
            RedbStorageError::DatabaseClosed | RedbStorageError::PreviousIo => {
                Error::storage_unavailable("acid", context, detail)
            }
            RedbStorageError::ValueTooLarge(_) => {
                Error::storage_insufficient("acid", context, detail)
            }
            RedbStorageError::Corrupted(_) | RedbStorageError::LockPoisoned(_) => {
                Error::Storage(detail)
            }
            _ => {
                warn!(error = %err, "unhandled redb StorageError variant");
                Error::Storage(detail)
            }
        }
    }

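    /// Maps a stream name to a shard with seahash. Because the shard count is
    /// a power of two, `hash & (shard_count - 1)` is equivalent to
    /// `hash % shard_count`.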
    #[must_use]
    fn shard_index(&self, name: &str) -> usize {
        let hash_u64 = hash(name.as_bytes());
        let hash_usize = usize::try_from(hash_u64).unwrap_or_else(|_| {
            // On 32-bit targets the full u64 hash does not fit in usize, so
            // fall back to the low 32 bits before masking.
            let masked = hash_u64 & u64::from(u32::MAX);
            usize::try_from(masked).expect("masked hash value must fit in usize")
        });
        hash_usize & (self.shard_count - 1)
    }

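    /// Locates the shard holding `name`, checking the hashed shard first and
    /// falling back to a scan of the remaining shards (covering, for example,
    /// a stream persisted under a different shard count). Metadata present in
    /// more than one shard is reported as corruption.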
    fn find_stream_shard_index(&self, name: &str) -> Result<Option<usize>> {
        let hashed_idx = self.shard_index(name);
        if self.stream_exists_in_shard(hashed_idx, name)? {
            return Ok(Some(hashed_idx));
        }

        let mut found = None;

        for (idx, shard) in self.shards.iter().enumerate() {
            if idx == hashed_idx {
                continue;
            }

            let txn = shard
                .db
                .begin_read()
                .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
            let streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;

            if Self::read_stream_meta(&streams, name)?.is_some() && found.replace(idx).is_some() {
                return Err(Error::Storage(format!(
                    "stream metadata exists in multiple shards for {name}"
                )));
            }
        }

        Ok(found)
    }

    fn stream_exists_in_shard(&self, shard_idx: usize, name: &str) -> Result<bool> {
        let shard = &self.shards[shard_idx];
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;

        Ok(Self::read_stream_meta(&streams, name)?.is_some())
    }

    fn existing_shard_index(&self, name: &str) -> Result<usize> {
        self.find_stream_shard_index(name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))
    }

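    /// Reserves `bytes` against the global budget with a CAS loop, failing
    /// with `MemoryLimitExceeded` rather than letting the counter overshoot
    /// `max_total_bytes` or overflow.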
    fn reserve_total_bytes(&self, bytes: u64) -> Result<()> {
        if bytes == 0 {
            return Ok(());
        }

        if self
            .total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current
                    .checked_add(bytes)
                    .filter(|next| *next <= self.max_total_bytes)
            })
            .is_err()
        {
            return Err(Error::MemoryLimitExceeded);
        }
        Ok(())
    }

    fn rollback_total_bytes(&self, bytes: u64) {
        self.saturating_sub_total_bytes(bytes);
    }

    fn saturating_sub_total_bytes(&self, bytes: u64) {
        if bytes == 0 {
            return;
        }

        self.total_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            })
            .ok();
    }

    fn read_stream_meta<T>(streams: &T, name: &str) -> Result<Option<StoredStreamMeta>>
    where
        T: ReadableTable<&'static str, &'static [u8]>,
    {
        let payload = streams
            .get(name)
            .map_err(|e| Self::storage_err("failed to read stream metadata", e))?;

        if let Some(payload) = payload {
            let meta = serde_json::from_slice(payload.value())
                .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
            Ok(Some(meta))
        } else {
            Ok(None)
        }
    }

    fn write_stream_meta(
        streams: &mut Table<'_, &'static str, &'static [u8]>,
        name: &str,
        meta: &StoredStreamMeta,
    ) -> Result<()> {
        let payload = serde_json::to_vec(meta)
            .map_err(|e| Self::storage_err("failed to serialize stream metadata", e))?;
        streams
            .insert(name, payload.as_slice())
            .map_err(|e| Self::storage_err("failed to write stream metadata", e))?;
        Ok(())
    }

    fn delete_stream_messages(
        messages: &mut Table<'_, (&'static str, u64, u64), &'static [u8]>,
        name: &str,
    ) -> Result<()> {
        // Collect the keys first: removing entries while the range iterator is
        // alive would require a second mutable borrow of the table.
        let mut keys = Vec::new();
        let iter = messages
            .range((name, 0_u64, 0_u64)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to iterate stream messages", e))?;

        for item in iter {
            let (key, _) = item.map_err(|e| Self::storage_err("failed to read message key", e))?;
            let (_, read_seq, byte_offset) = key.value();
            keys.push((read_seq, byte_offset));
        }

        for (read_seq, byte_offset) in keys {
            messages
                .remove((name, read_seq, byte_offset))
                .map_err(|e| Self::storage_err("failed to delete message", e))?;
        }

        Ok(())
    }

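    /// Returns the broadcast sender used to wake readers blocked on `name`,
    /// creating it on first use.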
    fn notifier_sender(&self, name: &str) -> broadcast::Sender<()> {
        let mut guard = self.notifiers.write().expect("notifiers lock poisoned");
        guard
            .entry(name.to_string())
            .or_insert_with(|| {
                let (sender, _) = broadcast::channel(NOTIFY_CHANNEL_CAPACITY);
                sender
            })
            .clone()
    }

    fn notify_stream(&self, name: &str) {
        if let Some(sender) = self
            .notifiers
            .read()
            .expect("notifiers lock poisoned")
            .get(name)
        {
            let _ = sender.send(());
        }
    }

    fn drop_notifier(&self, name: &str) {
        self.notifiers
            .write()
            .expect("notifiers lock poisoned")
            .remove(name);
    }

    fn new_stream_meta(config: StreamConfig) -> StoredStreamMeta {
        StoredStreamMeta {
            config,
            closed: false,
            next_read_seq: 0,
            next_byte_offset: 0,
            total_bytes: 0,
            created_at: Utc::now(),
            updated_at: None,
            last_seq: None,
            producers: HashMap::new(),
            fork_info: None,
            ref_count: 0,
            state: StreamState::Active,
        }
    }

    fn batch_bytes(messages: &[Bytes]) -> u64 {
        messages
            .iter()
            .map(|m| u64::try_from(m.len()).unwrap_or(u64::MAX))
            .sum()
    }

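    /// Reads `name`'s messages from its shard starting at `from_offset`,
    /// stopping before `up_to` when a bound is supplied (used when stitching
    /// fork segments together).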
    fn read_messages_from_shard(
        &self,
        name: &str,
        from_offset: &Offset,
        up_to: Option<&Offset>,
    ) -> Result<Vec<Bytes>> {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
            (0_u64, 0_u64)
        } else {
            from_offset.parse_components().unwrap_or((0, 0))
        };

        let iter = message_table
            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to read shard message range", e))?;

        let mut messages = Vec::new();
        for item in iter {
            let (key, value) =
                item.map_err(|e| Self::storage_err("failed to read shard message", e))?;
            if let Some(bound) = up_to {
                let (_, read_seq, byte_offset) = key.value();
                let msg_offset = Offset::new(read_seq, byte_offset);
                if msg_offset >= *bound {
                    break;
                }
            }
            messages.push(Bytes::copy_from_slice(value.value()));
        }

        Ok(messages)
    }

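    /// Walks the fork chain upward from `parent_name`, decrementing each
    /// parent's `ref_count`. A tombstoned parent whose count reaches zero is
    /// physically deleted and the walk continues with its own parent;
    /// otherwise the decremented count is persisted and the walk stops.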
    fn cascade_delete_acid(&self, parent_name: &str) -> Result<()> {
        let mut current_parent = parent_name.to_string();
        loop {
            let Some(shard_idx) = self.find_stream_shard_index(&current_parent)? else {
                break;
            };
            let shard = &self.shards[shard_idx];
            let txn = Self::begin_write_txn(&shard.db)?;
            let mut streams = txn
                .open_table(STREAMS)
                .map_err(|e| Self::storage_err("failed to open streams table", e))?;

            let Some(mut meta) = Self::read_stream_meta(&streams, &current_parent)? else {
                break;
            };

            meta.ref_count = meta.ref_count.saturating_sub(1);

            if meta.state == StreamState::Tombstone && meta.ref_count == 0 {
                let fi = meta.fork_info.clone();
                let total_bytes = meta.total_bytes;

                let mut messages = txn
                    .open_table(MESSAGES)
                    .map_err(|e| Self::storage_err("failed to open messages table", e))?;
                Self::delete_stream_messages(&mut messages, &current_parent)?;
                drop(messages);

                streams
                    .remove(current_parent.as_str())
                    .map_err(|e| Self::storage_err("failed to remove tombstoned parent", e))?;
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit cascade delete", e))?;

                self.saturating_sub_total_bytes(total_bytes);
                self.drop_notifier(&current_parent);

                if let Some(fi) = fi {
                    current_parent = fi.source_name;
                } else {
                    break;
                }
            } else {
                Self::write_stream_meta(&mut streams, &current_parent, &meta)?;
                drop(streams);
                txn.commit()
                    .map_err(|e| Self::storage_err("failed to commit ref_count decrement", e))?;
                break;
            }
        }
        Ok(())
    }

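    /// Reads a stream with no fork ancestry: a single range scan over the
    /// shard's message table, rejecting offsets that cannot be resolved to
    /// concrete components.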
    fn read_non_forked_table_messages(
        &self,
        name: &str,
        from_offset: &Offset,
        shard_idx: usize,
    ) -> Result<Vec<Bytes>> {
        let (start_read_seq, start_byte_offset) = if from_offset.is_start() {
            (0_u64, 0_u64)
        } else {
            from_offset.parse_components().ok_or_else(|| {
                Error::InvalidOffset("non-concrete offset in read range".to_string())
            })?
        };

        let shard = &self.shards[shard_idx];
        let txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;

        let iter = message_table
            .range((name, start_read_seq, start_byte_offset)..=(name, u64::MAX, u64::MAX))
            .map_err(|e| Self::storage_err("failed to read stream range", e))?;

        let mut messages = Vec::new();
        for item in iter {
            let (_, value) =
                item.map_err(|e| Self::storage_err("failed to read stream message", e))?;
            messages.push(Bytes::copy_from_slice(value.value()));
        }

        Ok(messages)
    }

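    /// Reads a forked stream by replaying its ancestor segments (each one up
    /// to the offset where the next fork was taken) and then appending the
    /// fork's own messages from `fork_offset` onward.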
    fn collect_fork_chain_messages(
        &self,
        name: &str,
        from_offset: &Offset,
        fi: &ForkInfo,
    ) -> Result<Vec<Bytes>> {
        let mut all_messages: Vec<Bytes> = Vec::new();
        if from_offset.is_start() || *from_offset < fi.fork_offset {
            let plan = super::fork::build_read_plan(&fi.source_name, |segment_name| {
                let shard_idx = self.find_stream_shard_index(segment_name).ok().flatten()?;
                let shard = &self.shards[shard_idx];
                let txn = shard.db.begin_read().ok()?;
                let streams = txn.open_table(STREAMS).ok()?;
                let meta = Self::read_stream_meta(&streams, segment_name).ok()??;
                Some(meta.fork_info)
            });

            for (i, segment) in plan.iter().enumerate() {
                let effective_up_to = if i == plan.len() - 1 {
                    Some(&fi.fork_offset)
                } else {
                    segment.read_up_to.as_ref()
                };
                let effective_from = if i == 0 {
                    from_offset
                } else {
                    &Offset::start()
                };
                let segment_msgs =
                    self.read_messages_from_shard(&segment.name, effective_from, effective_up_to)?;
                all_messages.extend(segment_msgs);
            }
        }

        let fork_msgs = if from_offset.is_start() || *from_offset <= fi.fork_offset {
            self.read_messages_from_shard(name, &fi.fork_offset, None)?
        } else {
            self.read_messages_from_shard(name, from_offset, None)?
        };
        all_messages.extend(fork_msgs);

        Ok(all_messages)
    }

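    /// All writes go through this helper so that every transaction commits
    /// with `Durability::Immediate`.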
    fn begin_write_txn(db: &Database) -> Result<redb::WriteTransaction> {
        let mut txn = db
            .begin_write()
            .map_err(|e| Self::storage_err("failed to begin write transaction", e))?;
        txn.set_durability(Durability::Immediate)
            .map_err(|e| Self::storage_err("failed to set write durability", e))?;
        Ok(txn)
    }
}

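/// Converts a backend error into a protocol [`Error`], given the already
/// formatted `context` and `detail` strings from [`AcidStorage::storage_err`].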
trait ClassifyError: std::fmt::Display {
    fn into_storage_error(self, context: String, detail: String) -> Error;
}

impl ClassifyError for std::io::Error {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        Error::classify_io_failure("acid", context, detail, &self)
    }
}

impl ClassifyError for DatabaseError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            DatabaseError::DatabaseAlreadyOpen => {
                Error::storage_unavailable("acid", context, detail)
            }
            DatabaseError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            DatabaseError::RepairAborted | DatabaseError::UpgradeRequired(_) => {
                Error::Storage(detail)
            }
            _ => {
                warn!(error = %self, "unhandled redb DatabaseError variant");
                Error::Storage(detail)
            }
        }
    }
}

impl ClassifyError for TransactionError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            TransactionError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            TransactionError::ReadTransactionStillInUse(_) => Error::Storage(detail),
            _ => {
                warn!(error = %self, "unhandled redb TransactionError variant");
                Error::Storage(detail)
            }
        }
    }
}

impl ClassifyError for TableError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        match &self {
            TableError::Storage(storage_err) => {
                AcidStorage::classify_redb_storage_error(context, storage_err, detail)
            }
            TableError::TableTypeMismatch { .. }
            | TableError::TableIsMultimap(_)
            | TableError::TableIsNotMultimap(_)
            | TableError::TypeDefinitionChanged { .. }
            | TableError::TableDoesNotExist(_)
            | TableError::TableExists(_)
            | TableError::TableAlreadyOpen(_, _) => Error::Storage(detail),
            _ => {
                warn!(error = %self, "unhandled redb TableError variant");
                Error::Storage(detail)
            }
        }
    }
}

impl ClassifyError for CommitError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        if let CommitError::Storage(storage_err) = &self {
            AcidStorage::classify_redb_storage_error(context, storage_err, detail)
        } else {
            warn!(error = %self, "unhandled redb CommitError variant");
            Error::Storage(detail)
        }
    }
}

impl ClassifyError for RedbStorageError {
    fn into_storage_error(self, context: String, detail: String) -> Error {
        AcidStorage::classify_redb_storage_error(context, &self, detail)
    }
}

impl ClassifyError for SetDurabilityError {
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}

impl ClassifyError for serde_json::Error {
    fn into_storage_error(self, _context: String, detail: String) -> Error {
        Error::Storage(detail)
    }
}