use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::storage::{ForkCreateSpec, ForkInfo, StreamConfig, StreamState};
use chrono::{DateTime, Utc};
pub(crate) fn check_stream_access(
config: &StreamConfig,
state: StreamState,
name: &str,
) -> Result<()> {
if state == StreamState::Tombstone {
return Err(Error::StreamGone(name.to_string()));
}
if super::shared::is_stream_expired(config) {
return Err(Error::StreamExpired);
}
Ok(())
}
pub(crate) fn check_fork_source_access(
config: &StreamConfig,
state: StreamState,
name: &str,
) -> Result<()> {
if state == StreamState::Tombstone {
return Err(Error::ForkFromTombstone(name.to_string()));
}
if super::shared::is_stream_expired(config) {
return Err(Error::StreamExpired);
}
Ok(())
}
pub(crate) fn resolve_fork_ttl(
source_config: &StreamConfig,
fork_ttl: Option<u64>,
fork_expires_at: Option<DateTime<Utc>>,
) -> (Option<u64>, Option<DateTime<Utc>>) {
if let Some(ttl) = fork_ttl {
let expires_at =
Utc::now() + chrono::Duration::seconds(i64::try_from(ttl).unwrap_or(i64::MAX));
return (Some(ttl), Some(expires_at));
}
if let Some(expires_at) = fork_expires_at {
return (None, Some(expires_at));
}
if let Some(ttl) = source_config.ttl_seconds {
let expires_at =
Utc::now() + chrono::Duration::seconds(i64::try_from(ttl).unwrap_or(i64::MAX));
return (Some(ttl), Some(expires_at));
}
if let Some(expires_at) = source_config.expires_at {
return (None, Some(expires_at));
}
(None, None)
}
#[must_use]
pub(crate) fn build_fork_config(
source_config: &StreamConfig,
requested_config: &StreamConfig,
) -> StreamConfig {
let (fork_ttl, fork_expires_at) = resolve_fork_ttl(
source_config,
requested_config.ttl_seconds,
requested_config.expires_at,
);
let mut config = StreamConfig::new(source_config.content_type.clone());
if let Some(ttl) = fork_ttl {
config = config.with_ttl(ttl);
}
if let Some(expires_at) = fork_expires_at {
config = config.with_expires_at(expires_at);
}
if requested_config.created_closed {
config = config.with_created_closed(true);
}
config
}
#[must_use]
pub(crate) fn build_fork_create_spec(
source_name: &str,
source_config: &StreamConfig,
requested_config: &StreamConfig,
fork_offset: Offset,
) -> ForkCreateSpec {
ForkCreateSpec {
source_name: source_name.to_string(),
fork_offset,
config: build_fork_config(source_config, requested_config),
}
}
pub(crate) fn resolve_fork_offset(
requested: Option<&Offset>,
source_next_offset: &Offset,
) -> Result<Offset> {
match requested {
Some(offset) => {
if offset > source_next_offset {
Err(Error::ForkOffsetBeyondTail)
} else {
Ok(offset.clone())
}
}
None => Ok(source_next_offset.clone()),
}
}
pub(crate) enum ExistingCreateDisposition {
RemoveExpired,
AlreadyExists,
Conflict(Error),
}
#[must_use]
pub(crate) fn evaluate_root_create(
name: &str,
existing_config: &StreamConfig,
existing_state: StreamState,
existing_ref_count: u32,
requested_config: &StreamConfig,
) -> ExistingCreateDisposition {
if super::shared::is_stream_expired(existing_config) {
if existing_ref_count > 0 {
return ExistingCreateDisposition::Conflict(Error::StreamPathBlocked(name.to_string()));
}
return ExistingCreateDisposition::RemoveExpired;
}
if existing_state == StreamState::Tombstone {
return ExistingCreateDisposition::Conflict(Error::StreamPathBlocked(name.to_string()));
}
if existing_config == requested_config {
ExistingCreateDisposition::AlreadyExists
} else {
ExistingCreateDisposition::Conflict(Error::ConfigMismatch)
}
}
#[must_use]
pub(crate) fn evaluate_fork_create(
name: &str,
existing_config: &StreamConfig,
existing_fork_info: Option<&ForkInfo>,
existing_state: StreamState,
existing_ref_count: u32,
requested_spec: &ForkCreateSpec,
) -> ExistingCreateDisposition {
if super::shared::is_stream_expired(existing_config) {
if existing_ref_count > 0 {
return ExistingCreateDisposition::Conflict(Error::StreamPathBlocked(name.to_string()));
}
return ExistingCreateDisposition::RemoveExpired;
}
if existing_state == StreamState::Tombstone {
return ExistingCreateDisposition::Conflict(Error::StreamPathBlocked(name.to_string()));
}
let expected_fork_info = ForkInfo {
source_name: requested_spec.source_name.clone(),
fork_offset: requested_spec.fork_offset.clone(),
};
if existing_config == &requested_spec.config && existing_fork_info == Some(&expected_fork_info)
{
ExistingCreateDisposition::AlreadyExists
} else {
ExistingCreateDisposition::Conflict(Error::ConfigMismatch)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DeleteDisposition {
Tombstone,
HardDelete,
}
pub(crate) fn evaluate_delete(
name: &str,
state: StreamState,
ref_count: u32,
) -> Result<DeleteDisposition> {
if state == StreamState::Tombstone {
return Err(Error::StreamGone(name.to_string()));
}
if ref_count > 0 {
Ok(DeleteDisposition::Tombstone)
} else {
Ok(DeleteDisposition::HardDelete)
}
}
#[must_use]
pub(crate) fn evaluate_expired_cleanup(ref_count: u32) -> DeleteDisposition {
if ref_count > 0 {
DeleteDisposition::Tombstone
} else {
DeleteDisposition::HardDelete
}
}
pub(crate) fn renew_ttl(config: &mut StreamConfig) -> bool {
if let Some(ttl) = config.ttl_seconds {
config.expires_at =
Some(Utc::now() + chrono::Duration::seconds(i64::try_from(ttl).unwrap_or(i64::MAX)));
true
} else {
false
}
}
pub(crate) struct ReadSegment {
pub name: String,
pub read_up_to: Option<Offset>,
}
pub(crate) fn build_read_plan(
name: &str,
lookup: impl Fn(&str) -> Option<Option<ForkInfo>>,
) -> Vec<ReadSegment> {
let mut chain: Vec<(String, Option<Offset>)> = Vec::new();
let mut current = name.to_string();
loop {
if let Some(Some(fork_info)) = lookup(¤t) {
chain.push((current.clone(), Some(fork_info.fork_offset.clone())));
current.clone_from(&fork_info.source_name);
} else {
chain.push((current, None));
break;
}
}
chain.reverse();
let len = chain.len();
let mut segments = Vec::with_capacity(len);
for i in 0..len {
let (ref seg_name, _) = chain[i];
let read_up_to = if i + 1 < len {
chain[i + 1].1.clone()
} else {
None
};
segments.push(ReadSegment {
name: seg_name.clone(),
read_up_to,
});
}
segments
}