use super::{
AcidStorage, Bytes, ForkInfo, MESSAGES, Offset, ProducerState, Result, STREAMS,
StoredStreamMeta, StreamConfig, StreamState,
};
use crate::protocol::error::Error;
use crate::protocol::producer::ProducerHeaders;
use crate::storage::{
CreateStreamResult, CreateWithDataResult, ForkCreateSpec, ProducerAppendResult, ProducerCheck,
ReadResult, Storage, StreamMetadata, check_producer, cleanup_stale_producers, fork,
is_stream_expired, validate_content_type, validate_seq,
};
use chrono::Utc;
use redb::{ReadableDatabase, ReadableTable};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tracing::warn;
/// Outcome of resolving a pre-existing stream that lives on a *different*
/// shard than a fork's source (see `remove_cross_shard_existing_fork`).
enum CrossShardForkResult {
    /// Proceed with fork creation. Carries the byte total of any expired
    /// stream removed during resolution (0 if none) and, if that stream was
    /// itself a fork, its parent's name for post-commit cascade bookkeeping.
    Continue(u64, Option<String>),
    /// An equivalent live fork already exists; report `AlreadyExists`.
    AlreadyExists,
}
impl Storage for AcidStorage {
/// Creates a new root (non-fork) stream named `name`.
///
/// If a stream with this name already exists, `fork::evaluate_root_create`
/// decides the outcome: an expired stream is removed and replaced, a live
/// one yields `AlreadyExists`, and an incompatible one is a conflict error.
fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult> {
    // Reuse the shard that already holds this name (if any) so a replaced
    // stream stays on the same shard; otherwise hash the name to a shard.
    let shard_idx = self
        .find_stream_shard_index(name)?
        .unwrap_or_else(|| self.shard_index(name));
    let shard = &self.shards[shard_idx];
    let txn = Self::begin_write_txn(&shard.db)?;
    let mut streams = txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let mut messages = txn
        .open_table(MESSAGES)
        .map_err(|e| Self::storage_err("failed to open messages table", e))?;
    // Deferred bookkeeping for an expired stream removed inside the txn;
    // applied only after a successful commit.
    let mut removed_expired_bytes = 0_u64;
    let mut removed_expired_parent = None;
    if let Some(existing) = Self::read_stream_meta(&streams, name)? {
        match fork::evaluate_root_create(
            name,
            &existing.config,
            existing.state,
            existing.ref_count,
            &config,
        ) {
            fork::ExistingCreateDisposition::RemoveExpired => {
                removed_expired_bytes = existing.total_bytes;
                // If the expired stream was a fork, remember its parent so
                // the parent's reference can be unwound after commit.
                removed_expired_parent = existing.fork_info.map(|info| info.source_name);
                Self::delete_stream_messages(&mut messages, name)?;
                streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            }
            fork::ExistingCreateDisposition::AlreadyExists => {
                // Early return drops the uncommitted txn, aborting it.
                return Ok(CreateStreamResult::AlreadyExists);
            }
            fork::ExistingCreateDisposition::Conflict(err) => {
                return Err(err);
            }
        }
    }
    let meta = Self::new_stream_meta(config);
    Self::write_stream_meta(&mut streams, name, &meta)?;
    // Tables borrow the transaction; drop them before committing.
    drop(messages);
    drop(streams);
    txn.commit()
        .map_err(|e| Self::storage_err("failed to commit create stream", e))?;
    if removed_expired_bytes > 0 {
        // Post-commit cleanup for the replaced expired stream.
        self.saturating_sub_total_bytes(removed_expired_bytes);
        self.drop_notifier(name);
        if let Some(parent) = removed_expired_parent {
            self.cascade_delete_acid(&parent)?;
        }
    }
    Ok(CreateStreamResult::Created)
}
/// Appends one message to stream `name` and returns the offset it was
/// written at. The message's size is reserved against the storage-wide
/// byte budget first and the reservation is rolled back on any failure.
fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset> {
    let message_bytes = u64::try_from(data.len()).unwrap_or(u64::MAX);
    self.reserve_total_bytes(message_bytes)?;
    // Closure scopes the transactional work so the single rollback path
    // below covers every error exit.
    let result = (|| {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut messages = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;
        if meta.closed {
            return Err(Error::StreamClosed);
        }
        validate_content_type(&meta.config.content_type, content_type)?;
        // Per-stream size cap, separate from the global budget.
        if meta.total_bytes + message_bytes > self.max_stream_bytes {
            return Err(Error::StreamSizeLimitExceeded);
        }
        let offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        // Messages are keyed by (stream name, sequence, byte offset).
        messages
            .insert(
                (name, meta.next_read_seq, meta.next_byte_offset),
                data.as_ref(),
            )
            .map_err(|e| Self::storage_err("failed to append message", e))?;
        meta.next_read_seq += 1;
        meta.next_byte_offset += message_bytes;
        meta.total_bytes += message_bytes;
        meta.updated_at = Some(Utc::now());
        // Activity extends the TTL, if one is configured.
        fork::renew_ttl(&mut meta.config);
        Self::write_stream_meta(&mut streams, name, &meta)?;
        // Tables borrow the transaction; drop them before committing.
        drop(messages);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit append", e))?;
        Ok(offset)
    })();
    if result.is_err() {
        // Nothing was committed; release the reservation.
        self.rollback_total_bytes(message_bytes);
        return result;
    }
    // Wake subscribers only after a successful commit.
    self.notify_stream(name);
    result
}
/// Appends a non-empty batch of messages atomically and returns the
/// stream's next offset after the batch. `seq` is an optional client
/// sequence token checked against the last stored one for ordering.
fn batch_append(
    &self,
    name: &str,
    messages: Vec<Bytes>,
    content_type: &str,
    seq: Option<&str>,
) -> Result<Offset> {
    // Empty batches are rejected before any reservation is made.
    if messages.is_empty() {
        return Err(Error::InvalidHeader {
            header: "Content-Length".to_string(),
            reason: "batch cannot be empty".to_string(),
        });
    }
    let batch_bytes = Self::batch_bytes(&messages);
    // Reserve against the storage-wide byte budget before any writes.
    self.reserve_total_bytes(batch_bytes)?;
    // Closure scopes the transactional work so the single rollback path
    // below covers every error exit.
    let result = (|| {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;
        if meta.closed {
            return Err(Error::StreamClosed);
        }
        validate_content_type(&meta.config.content_type, content_type)?;
        // Validate the sequence token now; apply it to meta only after
        // every message insert has succeeded.
        let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;
        if meta.total_bytes + batch_bytes > self.max_stream_bytes {
            return Err(Error::StreamSizeLimitExceeded);
        }
        for data in &messages {
            let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
            // Messages are keyed by (stream name, sequence, byte offset).
            message_table
                .insert(
                    (name, meta.next_read_seq, meta.next_byte_offset),
                    data.as_ref(),
                )
                .map_err(|e| Self::storage_err("failed to append batch message", e))?;
            meta.next_read_seq += 1;
            meta.next_byte_offset += len;
            meta.total_bytes += len;
        }
        if let Some(new_seq) = pending_seq {
            meta.last_seq = Some(new_seq);
        }
        meta.updated_at = Some(Utc::now());
        // Activity extends the TTL, if one is configured.
        fork::renew_ttl(&mut meta.config);
        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        Self::write_stream_meta(&mut streams, name, &meta)?;
        // Tables borrow the transaction; drop them before committing.
        drop(message_table);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit batch append", e))?;
        Ok(next_offset)
    })();
    if result.is_err() {
        // Nothing was committed; release the reservation.
        self.rollback_total_bytes(batch_bytes);
        return result;
    }
    // Wake subscribers only after a successful commit.
    self.notify_stream(name);
    result
}
/// Dispatches a read to the TTL-renewing or plain read path, depending on
/// whether the stream has a TTL configured.
fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult> {
    let shard_idx = self.existing_shard_index(name)?;
    // Peek at the stream's config under a cheap read transaction to decide
    // whether this read must also renew the TTL (which needs a write txn).
    let ttl_enabled = {
        let txn = self.shards[shard_idx]
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;
        meta.config.ttl_seconds.is_some()
    };
    if ttl_enabled {
        self.read_with_ttl_renewal(name, from_offset, shard_idx)
    } else {
        self.read_without_ttl_renewal(name, from_offset, shard_idx)
    }
}
/// Deletes stream `name`. A stream still referenced by forks is
/// soft-deleted (state flipped to `Tombstone`) so fork chains can keep
/// reading its messages; otherwise messages and metadata are removed.
/// If the deleted stream was itself a fork, deletion cascades to its
/// parent after commit.
fn delete(&self, name: &str) -> Result<()> {
    let shard = &self.shards[self.existing_shard_index(name)?];
    let txn = Self::begin_write_txn(&shard.db)?;
    let mut streams = txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let meta = Self::read_stream_meta(&streams, name)?
        .ok_or_else(|| Error::NotFound(name.to_string()))?;
    match fork::evaluate_delete(name, meta.state, meta.ref_count)? {
        fork::DeleteDisposition::Tombstone => {
            // Keep messages and metadata; only the state changes.
            let mut updated = meta;
            updated.state = StreamState::Tombstone;
            Self::write_stream_meta(&mut streams, name, &updated)?;
            drop(streams);
            txn.commit()
                .map_err(|e| Self::storage_err("failed to commit soft delete", e))?;
            return Ok(());
        }
        fork::DeleteDisposition::HardDelete => {}
    }
    let mut messages = txn
        .open_table(MESSAGES)
        .map_err(|e| Self::storage_err("failed to open messages table", e))?;
    Self::delete_stream_messages(&mut messages, name)?;
    drop(messages);
    streams
        .remove(name)
        .map_err(|e| Self::storage_err("failed to remove stream metadata", e))?;
    // Capture what post-commit bookkeeping needs before dropping tables.
    let fork_info = meta.fork_info.clone();
    let total_bytes = meta.total_bytes;
    drop(streams);
    txn.commit()
        .map_err(|e| Self::storage_err("failed to commit delete", e))?;
    // Post-commit: release the global byte budget and the notifier.
    self.saturating_sub_total_bytes(total_bytes);
    self.drop_notifier(name);
    if let Some(fi) = fork_info {
        self.cascade_delete_acid(&fi.source_name)?;
    }
    Ok(())
}
fn head(&self, name: &str) -> Result<StreamMetadata> {
let shard = &self.shards[self.existing_shard_index(name)?];
let txn = shard
.db
.begin_read()
.map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
let streams = txn
.open_table(STREAMS)
.map_err(|e| Self::storage_err("failed to open streams table", e))?;
let meta = Self::read_stream_meta(&streams, name)?
.ok_or_else(|| Error::NotFound(name.to_string()))?;
fork::check_stream_access(&meta.config, meta.state, name)?;
Ok(StreamMetadata {
config: meta.config,
next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
closed: meta.closed,
total_bytes: meta.total_bytes,
message_count: meta.next_read_seq,
created_at: meta.created_at,
updated_at: meta.updated_at,
})
}
/// Marks stream `name` as closed. Closing counts as activity, so the TTL
/// is renewed in the same transaction, and subscribers are notified after
/// the commit succeeds.
fn close_stream(&self, name: &str) -> Result<()> {
    let shard_idx = self.existing_shard_index(name)?;
    let txn = Self::begin_write_txn(&self.shards[shard_idx].db)?;
    // Inner scope releases the table borrow before the commit below.
    {
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;
        meta.closed = true;
        meta.updated_at = Some(Utc::now());
        fork::renew_ttl(&mut meta.config);
        Self::write_stream_meta(&mut streams, name, &meta)?;
    }
    txn.commit()
        .map_err(|e| Self::storage_err("failed to commit close stream", e))?;
    self.notify_stream(name);
    Ok(())
}
/// Idempotent append on behalf of a producer. A batch whose producer
/// id/epoch/seq was already recorded is acknowledged as `Duplicate`
/// without rewriting any data. `should_close` closes the stream in the
/// same transaction; an empty batch is allowed for close-only requests.
fn append_with_producer(
    &self,
    name: &str,
    messages: Vec<Bytes>,
    content_type: &str,
    producer: &ProducerHeaders,
    should_close: bool,
    seq: Option<&str>,
) -> Result<ProducerAppendResult> {
    let batch_bytes = Self::batch_bytes(&messages);
    self.reserve_total_bytes(batch_bytes)?;
    // Closure scopes the transactional work; the reservation is rolled
    // back below whenever nothing was actually written.
    let result = (|| {
        let shard = &self.shards[self.existing_shard_index(name)?];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::check_stream_access(&meta.config, meta.state, name)?;
        // Drop producer records that have been idle too long.
        cleanup_stale_producers(&mut meta.producers);
        // Close-only requests carry no payload, so the content type is
        // only validated when there is data.
        if !messages.is_empty() {
            validate_content_type(&meta.config.content_type, content_type)?;
        }
        match check_producer(
            meta.producers.get(producer.id.as_str()),
            producer,
            meta.closed,
        )? {
            ProducerCheck::Accept => {}
            ProducerCheck::Duplicate { epoch, seq } => {
                // Replay of an already-applied batch: acknowledge with the
                // stored position. The txn is dropped (aborted) on return.
                return Ok(ProducerAppendResult::Duplicate {
                    epoch,
                    seq,
                    next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                    closed: meta.closed,
                });
            }
        }
        let pending_seq = validate_seq(meta.last_seq.as_deref(), seq)?;
        if meta.total_bytes + batch_bytes > self.max_stream_bytes {
            return Err(Error::StreamSizeLimitExceeded);
        }
        for data in &messages {
            let len = u64::try_from(data.len()).unwrap_or(u64::MAX);
            message_table
                .insert(
                    (name, meta.next_read_seq, meta.next_byte_offset),
                    data.as_ref(),
                )
                .map_err(|e| Self::storage_err("failed to append producer message", e))?;
            meta.next_read_seq += 1;
            meta.next_byte_offset += len;
            meta.total_bytes += len;
        }
        if let Some(new_seq) = pending_seq {
            meta.last_seq = Some(new_seq);
        }
        if should_close {
            meta.closed = true;
        }
        // Record the producer's latest epoch/seq for future dedup checks.
        let now = Utc::now();
        meta.producers.insert(
            producer.id.clone(),
            ProducerState {
                epoch: producer.epoch,
                last_seq: producer.seq,
                updated_at: now,
            },
        );
        meta.updated_at = Some(now);
        fork::renew_ttl(&mut meta.config);
        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        let closed = meta.closed;
        Self::write_stream_meta(&mut streams, name, &meta)?;
        // Tables borrow the transaction; drop them before committing.
        drop(message_table);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit producer append", e))?;
        Ok(ProducerAppendResult::Accepted {
            epoch: producer.epoch,
            seq: producer.seq,
            next_offset,
            closed,
        })
    })();
    // Duplicates also release the reservation: their bytes were never
    // written, only acknowledged.
    if result.is_err() || matches!(result, Ok(ProducerAppendResult::Duplicate { .. })) {
        self.rollback_total_bytes(batch_bytes);
    }
    // Notify only when something observable changed (data or closing).
    if result.is_ok() && (!messages.is_empty() || should_close) {
        self.notify_stream(name);
    }
    result
}
/// Creates stream `name` and writes an initial batch of messages in one
/// transaction, optionally leaving the stream closed. Existing-stream
/// handling mirrors `create_stream`, except that an existing live stream
/// returns its current position instead of a bare status.
fn create_stream_with_data(
    &self,
    name: &str,
    config: StreamConfig,
    messages: Vec<Bytes>,
    should_close: bool,
) -> Result<CreateWithDataResult> {
    let batch_bytes = Self::batch_bytes(&messages);
    // Set only once the global byte reservation succeeds, so the failure
    // path knows whether a rollback is owed.
    let mut reserved = false;
    // Deferred bookkeeping for an expired stream replaced inside the txn.
    let mut removed_expired_bytes = 0_u64;
    let mut removed_expired_parent = None;
    let result = (|| {
        // Reuse the shard that already holds this name (if any) so a
        // replaced stream stays on the same shard; otherwise hash.
        let shard_idx = self
            .find_stream_shard_index(name)?
            .unwrap_or_else(|| self.shard_index(name));
        let shard = &self.shards[shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut message_table = txn
            .open_table(MESSAGES)
            .map_err(|e| Self::storage_err("failed to open messages table", e))?;
        if let Some(existing) = Self::read_stream_meta(&streams, name)? {
            match fork::evaluate_root_create(
                name,
                &existing.config,
                existing.state,
                existing.ref_count,
                &config,
            ) {
                fork::ExistingCreateDisposition::RemoveExpired => {
                    removed_expired_bytes = existing.total_bytes;
                    removed_expired_parent =
                        existing.fork_info.clone().map(|info| info.source_name);
                    Self::delete_stream_messages(&mut message_table, name)?;
                    streams
                        .remove(name)
                        .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
                }
                fork::ExistingCreateDisposition::AlreadyExists => {
                    // Report the existing stream's position; the txn is
                    // dropped (aborted) on return.
                    return Ok(CreateWithDataResult {
                        status: CreateStreamResult::AlreadyExists,
                        next_offset: Offset::new(
                            existing.next_read_seq,
                            existing.next_byte_offset,
                        ),
                        closed: existing.closed,
                    });
                }
                fork::ExistingCreateDisposition::Conflict(err) => {
                    return Err(err);
                }
            }
        }
        // Reserve only after the existing-stream dispositions above, so
        // an AlreadyExists outcome never holds a reservation.
        if batch_bytes > 0 {
            self.reserve_total_bytes(batch_bytes)?;
            reserved = true;
        }
        let mut meta = Self::new_stream_meta(config);
        Self::write_initial_messages(
            name,
            &messages,
            batch_bytes,
            self.max_stream_bytes,
            &mut meta,
            &mut message_table,
        )?;
        if should_close {
            meta.closed = true;
        }
        let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
        let closed = meta.closed;
        Self::write_stream_meta(&mut streams, name, &meta)?;
        // Tables borrow the transaction; drop them before committing.
        drop(message_table);
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit create stream with data", e))?;
        Ok(CreateWithDataResult {
            status: CreateStreamResult::Created,
            next_offset,
            closed,
        })
    })();
    if result.is_err() && reserved {
        self.rollback_total_bytes(batch_bytes);
    }
    if result.is_ok() {
        if removed_expired_bytes > 0 {
            // Post-commit cleanup for the replaced expired stream.
            self.saturating_sub_total_bytes(removed_expired_bytes);
            self.drop_notifier(name);
            if let Some(parent) = removed_expired_parent {
                self.cascade_delete_acid(&parent)?;
            }
        }
        // Notify only when something observable happened.
        if should_close || !messages.is_empty() {
            self.notify_stream(name);
        }
    }
    result
}
/// Best-effort existence probe: `true` only for a non-expired stream in
/// the `Active` state. Any storage failure is reported as "not found"
/// rather than surfaced as an error.
fn exists(&self, name: &str) -> bool {
    let shard_idx = match self.find_stream_shard_index(name) {
        Ok(Some(idx)) => idx,
        _ => return false,
    };
    let Ok(txn) = self.shards[shard_idx].db.begin_read() else {
        return false;
    };
    let Ok(streams) = txn.open_table(STREAMS) else {
        return false;
    };
    matches!(
        Self::read_stream_meta(&streams, name),
        Ok(Some(meta)) if !is_stream_expired(&meta.config) && meta.state == StreamState::Active
    )
}
/// Returns a broadcast receiver for stream notifications, or `None` when
/// the stream is missing, expired, tombstoned, or storage access fails.
fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>> {
    let shard_idx = self.find_stream_shard_index(name).ok().flatten()?;
    let txn = self.shards[shard_idx].db.begin_read().ok()?;
    let streams = txn.open_table(STREAMS).ok()?;
    let meta = Self::read_stream_meta(&streams, name).ok()??;
    // Only live, non-expired streams are subscribable.
    let live = !is_stream_expired(&meta.config) && meta.state != StreamState::Tombstone;
    if live {
        Some(self.notifier_sender(name).subscribe())
    } else {
        None
    }
}
/// Best-effort sweep that removes (or tombstones) expired streams on
/// every shard and returns the number of streams processed. Any shard
/// that fails to scan or commit is skipped until the next sweep.
fn cleanup_expired_streams(&self) -> usize {
    let mut total_removed = 0;
    for shard in &self.shards {
        // Phase 1: collect candidate names under a cheap read txn.
        let Ok(read_txn) = shard.db.begin_read() else {
            continue;
        };
        let Ok(streams_table) = read_txn.open_table(STREAMS) else {
            continue;
        };
        let Ok(iter) = streams_table.iter() else {
            continue;
        };
        let mut candidates: Vec<String> = Vec::new();
        for item in iter {
            let Ok((key, value)) = item else {
                continue;
            };
            let name = key.value().to_string();
            let Ok(meta) = serde_json::from_slice::<StoredStreamMeta>(value.value()) else {
                continue;
            };
            if is_stream_expired(&meta.config) {
                candidates.push(name);
            }
        }
        drop(streams_table);
        drop(read_txn);
        if candidates.is_empty() {
            continue;
        }
        // Phase 2: re-validate each candidate under a write txn — its TTL
        // may have been renewed since the scan above.
        let Ok(txn) = Self::begin_write_txn(&shard.db) else {
            continue;
        };
        let Ok(mut streams) = txn.open_table(STREAMS) else {
            continue;
        };
        let Ok(mut messages) = txn.open_table(MESSAGES) else {
            continue;
        };
        // (name, freed bytes, fork parent) for post-commit bookkeeping.
        let mut committed = Vec::new();
        for name in &candidates {
            let meta = streams
                .get(name.as_str())
                .ok()
                .flatten()
                .and_then(|v| serde_json::from_slice::<StoredStreamMeta>(v.value()).ok());
            let Some(meta) = meta else { continue };
            if !is_stream_expired(&meta.config) {
                continue;
            }
            match fork::evaluate_expired_cleanup(meta.ref_count) {
                fork::DeleteDisposition::Tombstone => {
                    // Still referenced by forks: keep messages, flip state.
                    let mut updated = meta.clone();
                    updated.state = StreamState::Tombstone;
                    let payload = serde_json::to_vec(&updated).ok();
                    if let Some(payload) = payload {
                        let _ = streams.insert(name.as_str(), payload.as_slice());
                    }
                    committed.push((name.clone(), 0, None));
                }
                fork::DeleteDisposition::HardDelete => {
                    let _ = Self::delete_stream_messages(&mut messages, name);
                    let _ = streams.remove(name.as_str());
                    committed.push((
                        name.clone(),
                        meta.total_bytes,
                        meta.fork_info.map(|info| info.source_name),
                    ));
                }
            }
        }
        drop(messages);
        drop(streams);
        match txn.commit() {
            Ok(()) => {
                let committed_len = committed.len();
                for (name, bytes, parent) in committed {
                    // NOTE(review): other delete paths release freed bytes
                    // via saturating_sub_total_bytes — confirm
                    // rollback_total_bytes is equivalent here.
                    self.rollback_total_bytes(bytes);
                    self.drop_notifier(&name);
                    // Cascade is best-effort during cleanup.
                    if let Some(parent) = parent {
                        let _ = self.cascade_delete_acid(&parent);
                    }
                }
                total_removed += committed_len;
            }
            Err(e) => {
                warn!(%e, "failed to commit expired stream cleanup");
            }
        }
    }
    total_removed
}
/// Lists every live stream across all shards as `(name, metadata)`
/// pairs, sorted by name. Expired and tombstoned streams are skipped;
/// any storage or parse failure aborts the listing with an error.
fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>> {
    let mut result = Vec::new();
    for shard in &self.shards {
        let read_txn = shard
            .db
            .begin_read()
            .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
        let streams_table = read_txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let iter = streams_table
            .iter()
            .map_err(|e| Self::storage_err("failed to iterate streams", e))?;
        for item in iter {
            let (key, value) =
                item.map_err(|e| Self::storage_err("failed to read stream entry", e))?;
            let name = key.value().to_string();
            let meta: StoredStreamMeta = serde_json::from_slice(value.value())
                .map_err(|e| Self::storage_err("failed to parse stream metadata", e))?;
            if is_stream_expired(&meta.config) || meta.state == StreamState::Tombstone {
                continue;
            }
            // Project the stored record into the public metadata shape.
            result.push((
                name,
                StreamMetadata {
                    config: meta.config,
                    next_offset: Offset::new(meta.next_read_seq, meta.next_byte_offset),
                    closed: meta.closed,
                    total_bytes: meta.total_bytes,
                    message_count: meta.next_read_seq,
                    created_at: meta.created_at,
                    updated_at: meta.updated_at,
                },
            ));
        }
    }
    // Stable, shard-independent ordering for callers.
    result.sort_by(|a, b| a.0.cmp(&b.0));
    Ok(result)
}
/// Creates stream `name` as a fork of `source_name` at `fork_offset`
/// (resolved against the source's current position). The fork must keep
/// the source's content type. The fork's metadata and the source's
/// incremented ref_count are written in one transaction on the source's
/// shard; a clashing stream on a different shard is resolved first in a
/// separate transaction.
fn create_fork(
    &self,
    name: &str,
    source_name: &str,
    fork_offset: Option<&Offset>,
    config: StreamConfig,
) -> Result<CreateStreamResult> {
    let source_shard_idx = self.existing_shard_index(source_name)?;
    let source_shard = &self.shards[source_shard_idx];
    // First pass: read-only validation of the source stream.
    let source_read_txn = source_shard
        .db
        .begin_read()
        .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
    let source_read_streams = source_read_txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let source_meta = Self::read_stream_meta(&source_read_streams, source_name)?
        .ok_or_else(|| Error::NotFound(source_name.to_string()))?;
    fork::check_fork_source_access(&source_meta.config, source_meta.state, source_name)?;
    let source_next_offset =
        Offset::new(source_meta.next_read_seq, source_meta.next_byte_offset);
    let resolved_offset = fork::resolve_fork_offset(fork_offset, &source_next_offset)?;
    // Fork and source content types must match (case-insensitive).
    if !config
        .content_type
        .eq_ignore_ascii_case(&source_meta.config.content_type)
    {
        return Err(Error::ContentTypeMismatch {
            expected: source_meta.config.content_type.clone(),
            actual: config.content_type.clone(),
        });
    }
    let fork_spec = fork::build_fork_create_spec(
        source_name,
        &source_meta.config,
        &config,
        resolved_offset.clone(),
    );
    // A same-named stream on another shard is handled (removed if expired,
    // or reported as AlreadyExists) before the main write txn begins.
    let (mut removed_expired_bytes, mut removed_expired_parent) =
        match self.remove_cross_shard_existing_fork(name, source_shard_idx, &fork_spec)? {
            CrossShardForkResult::Continue(bytes, parent) => (bytes, parent),
            CrossShardForkResult::AlreadyExists => {
                return Ok(CreateStreamResult::AlreadyExists);
            }
        };
    // The fork is stored on the source's shard so both records share one
    // write transaction.
    let shard = &self.shards[source_shard_idx];
    let txn = Self::begin_write_txn(&shard.db)?;
    let mut streams = txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let mut messages = txn
        .open_table(MESSAGES)
        .map_err(|e| Self::storage_err("failed to open messages table", e))?;
    // Re-read the source inside the write txn: it may have changed since
    // the read-only validation above.
    let mut source_meta = Self::read_stream_meta(&streams, source_name)?
        .ok_or_else(|| Error::NotFound(source_name.to_string()))?;
    if let Some(existing) = Self::read_stream_meta(&streams, name)? {
        match fork::evaluate_fork_create(
            name,
            &existing.config,
            existing.fork_info.as_ref(),
            existing.state,
            existing.ref_count,
            &fork_spec,
        ) {
            fork::ExistingCreateDisposition::RemoveExpired => {
                removed_expired_bytes = existing.total_bytes;
                removed_expired_parent =
                    existing.fork_info.clone().map(|info| info.source_name);
                Self::delete_stream_messages(&mut messages, name)?;
                streams
                    .remove(name)
                    .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            }
            fork::ExistingCreateDisposition::AlreadyExists => {
                // Early return drops the uncommitted txn, aborting it.
                return Ok(CreateStreamResult::AlreadyExists);
            }
            fork::ExistingCreateDisposition::Conflict(err) => {
                return Err(err);
            }
        }
    }
    // Write the fork and bump the source's reference count atomically.
    let fork_meta = Self::build_fork_stored_meta(&fork_spec, &config, &resolved_offset);
    Self::write_stream_meta(&mut streams, name, &fork_meta)?;
    source_meta.ref_count += 1;
    Self::write_stream_meta(&mut streams, source_name, &source_meta)?;
    drop(messages);
    drop(streams);
    txn.commit()
        .map_err(|e| Self::storage_err("failed to commit create fork", e))?;
    self.cleanup_expired_and_notify(name, removed_expired_bytes, removed_expired_parent)?;
    Ok(CreateStreamResult::Created)
}
}
impl AcidStorage {
/// Read path for streams without a TTL — no write transaction is needed.
/// A `from_offset.is_now()` request returns no messages, only the current
/// tail position. Non-forked streams read directly from this shard's
/// message table; forked streams stitch the parent chain together.
fn read_without_ttl_renewal(
    &self,
    name: &str,
    from_offset: &Offset,
    shard_idx: usize,
) -> Result<ReadResult> {
    let shard = &self.shards[shard_idx];
    let txn = shard
        .db
        .begin_read()
        .map_err(|e| Self::storage_err("failed to begin read transaction", e))?;
    let streams = txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let meta = Self::read_stream_meta(&streams, name)?
        .ok_or_else(|| Error::NotFound(name.to_string()))?;
    let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
    // "now" means tail-only: report the position, skip the messages.
    if from_offset.is_now() {
        return Ok(ReadResult {
            messages: Vec::new(),
            next_offset,
            at_tail: true,
            closed: meta.closed,
        });
    }
    if meta.fork_info.is_none() {
        // Release this txn before the helper opens its own.
        drop(streams);
        drop(txn);
        let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;
        return Ok(ReadResult {
            messages,
            next_offset,
            at_tail: true,
            closed: meta.closed,
        });
    }
    let fi = meta.fork_info.clone().expect("checked above");
    let closed = meta.closed;
    // Release this txn: the fork chain may span other shards.
    drop(streams);
    drop(txn);
    let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;
    Ok(ReadResult {
        messages: all_messages,
        next_offset,
        at_tail: true,
        closed,
    })
}
/// Read path for streams with a TTL: performed under a write transaction
/// so the TTL can be renewed as part of the read. For forked streams the
/// write txn is released while the fork chain is collected (the chain may
/// span other shards), then a fresh txn renews the TTL — the stream can
/// change in that window; the response reflects the first snapshot.
fn read_with_ttl_renewal(
    &self,
    name: &str,
    from_offset: &Offset,
    shard_idx: usize,
) -> Result<ReadResult> {
    let shard = &self.shards[shard_idx];
    let txn = Self::begin_write_txn(&shard.db)?;
    let mut streams = txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let mut meta = Self::read_stream_meta(&streams, name)?
        .ok_or_else(|| Error::NotFound(name.to_string()))?;
    fork::check_stream_access(&meta.config, meta.state, name)?;
    let next_offset = Offset::new(meta.next_read_seq, meta.next_byte_offset);
    let result = if from_offset.is_now() {
        // "now" means tail-only: position without messages.
        ReadResult {
            messages: Vec::new(),
            next_offset,
            at_tail: true,
            closed: meta.closed,
        }
    } else if meta.fork_info.is_none() {
        let messages = self.read_non_forked_table_messages(name, from_offset, shard_idx)?;
        ReadResult {
            messages,
            next_offset,
            at_tail: true,
            closed: meta.closed,
        }
    } else {
        let fi = meta.fork_info.clone().expect("checked above");
        let closed = meta.closed;
        // Release the write txn before walking the fork chain.
        drop(streams);
        drop(txn);
        let all_messages = self.collect_fork_chain_messages(name, from_offset, &fi)?;
        // Reacquire a write txn solely to renew the TTL.
        let shard = &self.shards[shard_idx];
        let txn = Self::begin_write_txn(&shard.db)?;
        let mut streams = txn
            .open_table(STREAMS)
            .map_err(|e| Self::storage_err("failed to open streams table", e))?;
        let mut meta = Self::read_stream_meta(&streams, name)?
            .ok_or_else(|| Error::NotFound(name.to_string()))?;
        fork::renew_ttl(&mut meta.config);
        Self::write_stream_meta(&mut streams, name, &meta)?;
        drop(streams);
        txn.commit()
            .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;
        return Ok(ReadResult {
            messages: all_messages,
            next_offset,
            at_tail: true,
            closed,
        });
    };
    // Non-fork paths renew the TTL under the original transaction.
    fork::renew_ttl(&mut meta.config);
    Self::write_stream_meta(&mut streams, name, &meta)?;
    drop(streams);
    txn.commit()
        .map_err(|e| Self::storage_err("failed to commit ttl renewal", e))?;
    Ok(result)
}
/// Writes the initial message batch of a freshly created stream into
/// `message_table`, advancing `meta`'s sequence, byte-offset and size
/// counters as it goes. `batch_bytes` must be the precomputed total of
/// all payload lengths; a zero total is a no-op.
fn write_initial_messages(
    name: &str,
    messages: &[Bytes],
    batch_bytes: u64,
    max_stream_bytes: u64,
    meta: &mut StoredStreamMeta,
    message_table: &mut redb::Table<'_, (&str, u64, u64), &[u8]>,
) -> Result<()> {
    // Nothing to write for an empty payload.
    if batch_bytes == 0 {
        return Ok(());
    }
    // Enforce the per-stream size cap before touching the table.
    if meta.total_bytes + batch_bytes > max_stream_bytes {
        return Err(Error::StreamSizeLimitExceeded);
    }
    for payload in messages {
        let payload_len = u64::try_from(payload.len()).unwrap_or(u64::MAX);
        // Messages are keyed by (stream name, sequence, byte offset).
        let key = (name, meta.next_read_seq, meta.next_byte_offset);
        message_table
            .insert(key, payload.as_ref())
            .map_err(|e| Self::storage_err("failed to append create-with-data message", e))?;
        meta.next_read_seq += 1;
        meta.next_byte_offset += payload_len;
        meta.total_bytes += payload_len;
    }
    Ok(())
}
/// Resolves a pre-existing stream named `name` that lives on a different
/// shard than the fork's source, before fork creation begins.
///
/// Returns `Continue(bytes, parent)` when creation should proceed: `bytes`
/// is the size of an expired stream removed here (0 if none) and `parent`
/// is that stream's fork parent, if any, so the caller can unwind its
/// reference after the fork commits. Returns `AlreadyExists` when an
/// equivalent live fork is already present.
///
/// Early returns before `commit` drop the write transaction, which aborts
/// it without persisting anything. (The previous
/// `#[allow(clippy::type_complexity)]` was removed: the function now
/// returns the simple `CrossShardForkResult` enum, so the lint no longer
/// applies.)
fn remove_cross_shard_existing_fork(
    &self,
    name: &str,
    source_shard_idx: usize,
    fork_spec: &ForkCreateSpec,
) -> Result<CrossShardForkResult> {
    // No stream with this name anywhere: nothing to resolve.
    let Some(existing_shard_idx) = self.find_stream_shard_index(name)? else {
        return Ok(CrossShardForkResult::Continue(0, None));
    };
    // Same shard as the source: the caller's own write transaction will
    // observe and handle the existing entry itself.
    if existing_shard_idx == source_shard_idx {
        return Ok(CrossShardForkResult::Continue(0, None));
    }
    let existing_shard = &self.shards[existing_shard_idx];
    let existing_txn = Self::begin_write_txn(&existing_shard.db)?;
    let mut existing_streams = existing_txn
        .open_table(STREAMS)
        .map_err(|e| Self::storage_err("failed to open streams table", e))?;
    let mut existing_messages = existing_txn
        .open_table(MESSAGES)
        .map_err(|e| Self::storage_err("failed to open messages table", e))?;
    let Some(existing) = Self::read_stream_meta(&existing_streams, name)? else {
        return Ok(CrossShardForkResult::Continue(0, None));
    };
    match fork::evaluate_fork_create(
        name,
        &existing.config,
        existing.fork_info.as_ref(),
        existing.state,
        existing.ref_count,
        fork_spec,
    ) {
        fork::ExistingCreateDisposition::RemoveExpired => {
            // Remove the expired stream on its own shard and report its
            // size and parent for the caller's post-commit bookkeeping.
            let removed_bytes = existing.total_bytes;
            let removed_parent = existing.fork_info.clone().map(|info| info.source_name);
            Self::delete_stream_messages(&mut existing_messages, name)?;
            existing_streams
                .remove(name)
                .map_err(|e| Self::storage_err("failed to remove expired stream", e))?;
            drop(existing_messages);
            drop(existing_streams);
            existing_txn.commit().map_err(|e| {
                Self::storage_err("failed to commit expired cross-shard fork removal", e)
            })?;
            Ok(CrossShardForkResult::Continue(
                removed_bytes,
                removed_parent,
            ))
        }
        fork::ExistingCreateDisposition::AlreadyExists => {
            Ok(CrossShardForkResult::AlreadyExists)
        }
        fork::ExistingCreateDisposition::Conflict(err) => Err(err),
    }
}
/// Builds the stored metadata record for a brand-new fork. The fork is
/// logically positioned at the resolved source offset but owns no bytes
/// of its own yet (`total_bytes` starts at 0).
fn build_fork_stored_meta(
    fork_spec: &ForkCreateSpec,
    config: &StreamConfig,
    resolved_offset: &Offset,
) -> StoredStreamMeta {
    // Unparseable offsets fall back to the stream origin (0, 0).
    let (start_seq, start_byte_offset) =
        resolved_offset.parse_components().unwrap_or((0, 0));
    let fork_info = ForkInfo {
        source_name: fork_spec.source_name.clone(),
        fork_offset: resolved_offset.clone(),
    };
    StoredStreamMeta {
        config: fork_spec.config.clone(),
        closed: config.created_closed,
        next_read_seq: start_seq,
        next_byte_offset: start_byte_offset,
        total_bytes: 0,
        created_at: Utc::now(),
        updated_at: None,
        last_seq: None,
        producers: HashMap::new(),
        fork_info: Some(fork_info),
        ref_count: 0,
        state: StreamState::Active,
    }
}
/// Post-commit bookkeeping after a create path replaced an expired
/// stream: releases the replaced stream's bytes from the global budget,
/// drops its notifier, cascades to its fork parent if it had one, and
/// finally touches the notifier for the (re)created stream.
fn cleanup_expired_and_notify(
    &self,
    name: &str,
    removed_expired_bytes: u64,
    removed_expired_parent: Option<String>,
) -> Result<()> {
    if removed_expired_bytes > 0 {
        self.saturating_sub_total_bytes(removed_expired_bytes);
        self.drop_notifier(name);
        // The expired stream may itself have been a fork; unwind the
        // parent's reference.
        if let Some(parent_name) = removed_expired_parent {
            self.cascade_delete_acid(&parent_name)?;
        }
    }
    self.notifier_sender(name);
    Ok(())
}
}