use crate::protocol::error::{Error, Result};
use crate::protocol::producer::ProducerHeaders;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use super::StreamConfig;
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
pub epoch: u64,
pub last_seq: u64,
pub updated_at: DateTime<Utc>,
}
pub(crate) enum ProducerCheck {
Accept,
Duplicate { epoch: u64, seq: u64 },
}
pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
config
.expires_at
.is_some_and(|expires_at| Utc::now() >= expires_at)
}
pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
if !request_ct.eq_ignore_ascii_case(stream_ct) {
return Err(Error::ContentTypeMismatch {
expected: stream_ct.to_string(),
actual: request_ct.to_string(),
});
}
Ok(())
}
pub(crate) fn validate_seq(
last_seq: Option<&str>,
new_seq: Option<&str>,
) -> Result<Option<String>> {
if let Some(new) = new_seq {
if let Some(last) = last_seq
&& new <= last
{
return Err(Error::SeqOrderingViolation {
last: last.to_string(),
received: new.to_string(),
});
}
return Ok(Some(new.to_string()));
}
Ok(None)
}
pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
let cutoff = Utc::now()
- chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
.expect("7 days fits in TimeDelta");
producers.retain(|_, state| state.updated_at > cutoff);
}
pub(crate) fn check_producer(
existing: Option<&ProducerState>,
producer: &ProducerHeaders,
stream_closed: bool,
) -> Result<ProducerCheck> {
if let Some(state) = existing {
if producer.epoch < state.epoch {
return Err(Error::EpochFenced {
current: state.epoch,
received: producer.epoch,
});
}
if producer.epoch == state.epoch && producer.seq <= state.last_seq {
return Ok(ProducerCheck::Duplicate {
epoch: state.epoch,
seq: state.last_seq,
});
}
if stream_closed {
return Err(Error::StreamClosed);
}
if producer.epoch > state.epoch {
if producer.seq != 0 {
return Err(Error::InvalidProducerState(
"new epoch must start at seq 0".to_string(),
));
}
} else if producer.seq > state.last_seq + 1 {
return Err(Error::SequenceGap {
expected: state.last_seq + 1,
actual: producer.seq,
});
}
} else {
if stream_closed {
return Err(Error::StreamClosed);
}
if producer.seq != 0 {
return Err(Error::SequenceGap {
expected: 0,
actual: producer.seq,
});
}
}
Ok(ProducerCheck::Accept)
}