use std::future::Future;
use std::time::Duration;
use super::error::{CapabilityError, ErrorCategory};
use crate::context::Context;
use crate::experience_store::{EventQuery, ExperienceEventEnvelope, TimeRange};
/// Failure modes reported by the store traits declared in this module.
///
/// The [`std::fmt::Display`] implementation renders each variant as a single
/// human-readable line that embeds the variant's payload.
#[derive(Debug, Clone)]
pub enum StoreError {
    /// The store is unavailable.
    Unavailable {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// A serialization step failed.
    SerializationFailed {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// An event with the same identifier already exists.
    Conflict {
        /// Identifier of the event that already exists.
        event_id: String,
    },
    /// The supplied query is invalid.
    InvalidQuery {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// Store auth failed.
    AuthFailed {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// The caller was rate limited.
    RateLimited {
        /// Delay carried with the rate-limit error and shown in its message.
        retry_after: Duration,
    },
    /// A store operation timed out.
    Timeout {
        /// Time that had passed when the operation timed out.
        elapsed: Duration,
        /// Deadline that applied to the operation.
        deadline: Duration,
    },
    /// The requested item was not found.
    NotFound {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// An invariant was violated.
    InvariantViolation {
        /// Detail appended to the rendered message.
        message: String,
    },
    /// An internal store error occurred.
    Internal {
        /// Detail appended to the rendered message.
        message: String,
    },
}

impl std::fmt::Display for StoreError {
    /// Writes a one-line description of the error; durations are rendered
    /// with their `Debug` formatting.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unavailable { message: detail } => write!(f, "store unavailable: {}", detail),
            Self::SerializationFailed { message: detail } => {
                write!(f, "serialization failed: {}", detail)
            }
            Self::Conflict { event_id: id } => write!(f, "event already exists: {}", id),
            Self::InvalidQuery { message: detail } => write!(f, "invalid query: {}", detail),
            Self::AuthFailed { message: detail } => write!(f, "store auth failed: {}", detail),
            Self::RateLimited { retry_after: wait } => {
                write!(f, "rate limited, retry after {:?}", wait)
            }
            Self::Timeout {
                elapsed: spent,
                deadline: limit,
            } => write!(
                f,
                "store operation timed out after {:?} (deadline: {:?})",
                spent, limit
            ),
            Self::NotFound { message: detail } => write!(f, "not found: {}", detail),
            Self::InvariantViolation { message: detail } => {
                write!(f, "invariant violation: {}", detail)
            }
            Self::Internal { message: detail } => write!(f, "internal store error: {}", detail),
        }
    }
}

// No `source()` override: the variants carry no underlying error value.
impl std::error::Error for StoreError {}
impl CapabilityError for StoreError {
    /// Maps each variant onto an [`ErrorCategory`].
    ///
    /// `SerializationFailed` and `InvalidQuery` both map to `InvalidInput`;
    /// every other variant has its own category.
    fn category(&self) -> ErrorCategory {
        match self {
            Self::SerializationFailed { .. } | Self::InvalidQuery { .. } => {
                ErrorCategory::InvalidInput
            }
            Self::Unavailable { .. } => ErrorCategory::Unavailable,
            Self::Conflict { .. } => ErrorCategory::Conflict,
            Self::AuthFailed { .. } => ErrorCategory::Auth,
            Self::RateLimited { .. } => ErrorCategory::RateLimit,
            Self::Timeout { .. } => ErrorCategory::Timeout,
            Self::NotFound { .. } => ErrorCategory::NotFound,
            Self::InvariantViolation { .. } => ErrorCategory::InvariantViolation,
            Self::Internal { .. } => ErrorCategory::Internal,
        }
    }

    /// Returns `true` for `Unavailable`, `RateLimited` and `Timeout`, and
    /// `false` for every other variant.
    fn is_transient(&self) -> bool {
        // Listed exhaustively (no `_` arm) so a new variant forces a decision here.
        match self {
            Self::Unavailable { .. } | Self::RateLimited { .. } | Self::Timeout { .. } => true,
            Self::SerializationFailed { .. }
            | Self::Conflict { .. }
            | Self::InvalidQuery { .. }
            | Self::AuthFailed { .. }
            | Self::NotFound { .. }
            | Self::InvariantViolation { .. }
            | Self::Internal { .. } => false,
        }
    }

    /// Returns `true` for `Unavailable`, `RateLimited`, `Timeout` and
    /// `Conflict`, and `false` for every other variant.
    ///
    /// Note that `Conflict` is retryable here even though `is_transient`
    /// reports it as non-transient.
    fn is_retryable(&self) -> bool {
        // Listed exhaustively (no `_` arm) so a new variant forces a decision here.
        match self {
            Self::Unavailable { .. }
            | Self::RateLimited { .. }
            | Self::Timeout { .. }
            | Self::Conflict { .. } => true,
            Self::SerializationFailed { .. }
            | Self::InvalidQuery { .. }
            | Self::AuthFailed { .. }
            | Self::NotFound { .. }
            | Self::InvariantViolation { .. }
            | Self::Internal { .. } => false,
        }
    }

    /// Returns the delay carried by `RateLimited`; `None` for all other variants.
    fn retry_after(&self) -> Option<Duration> {
        if let Self::RateLimited { retry_after } = self {
            Some(*retry_after)
        } else {
            None
        }
    }
}
/// Cursor describing a position in a replay and whether more data remains.
#[derive(Debug, Clone)]
pub struct ReplayCursor {
    /// Position token; the empty string for cursors built by
    /// [`ReplayCursor::start`] and [`ReplayCursor::end`].
    pub position: String,
    /// `true` for cursors built by `start`/`at`, `false` for `end`.
    pub has_more: bool,
}

impl ReplayCursor {
    /// Cursor with an empty position and `has_more` set.
    #[must_use]
    pub fn start() -> Self {
        Self::at(String::new())
    }

    /// Cursor at the given `position` with `has_more` set.
    #[must_use]
    pub fn at(position: impl Into<String>) -> Self {
        let position = position.into();
        Self {
            position,
            has_more: true,
        }
    }

    /// Cursor with an empty position and `has_more` cleared.
    #[must_use]
    pub fn end() -> Self {
        Self {
            position: String::new(),
            has_more: false,
        }
    }
}
/// A batch of replayed events together with a [`ReplayCursor`].
///
/// This is the success value produced by `ExperienceReplayer::replay`.
#[derive(Debug, Clone)]
pub struct ReplayBatch {
/// Event envelopes contained in this batch.
pub events: Vec<ExperienceEventEnvelope>,
/// Cursor returned alongside the events.
/// NOTE(review): presumably meant to be passed to the next `replay` call — confirm with implementors.
pub cursor: ReplayCursor,
}
/// Optional filters and sizing for a replay request.
///
/// Every field defaults to `None`; use the `with_*` builders to set them.
#[derive(Debug, Clone, Default)]
pub struct ReplayOptions {
    /// Batch size; becomes `EventQuery::limit` in [`ReplayOptions::to_event_query`].
    pub batch_size: Option<usize>,
    /// Optional time range filter.
    pub time_range: Option<TimeRange>,
    /// Optional tenant identifier filter.
    pub tenant_id: Option<String>,
    /// Optional correlation identifier filter.
    pub correlation_id: Option<String>,
}

impl ReplayOptions {
    /// Creates options with every field unset (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the options with `batch_size` set to `size`.
    #[must_use]
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: Some(size),
            ..self
        }
    }

    /// Returns the options with `time_range` set to `range`.
    #[must_use]
    pub fn with_time_range(self, range: TimeRange) -> Self {
        Self {
            time_range: Some(range),
            ..self
        }
    }

    /// Returns the options with `tenant_id` set.
    #[must_use]
    pub fn with_tenant(self, tenant_id: impl Into<String>) -> Self {
        Self {
            tenant_id: Some(tenant_id.into()),
            ..self
        }
    }

    /// Returns the options with `correlation_id` set.
    #[must_use]
    pub fn with_correlation(self, correlation_id: impl Into<String>) -> Self {
        Self {
            correlation_id: Some(correlation_id.into()),
            ..self
        }
    }

    /// Builds an [`EventQuery`] from these options.
    ///
    /// Tenant, time range and correlation id are cloned across, `batch_size`
    /// becomes `limit`, `kinds` is left empty and `chain_id` is `None`.
    #[must_use]
    pub fn to_event_query(&self) -> EventQuery {
        // Destructure so that adding a field to `ReplayOptions` is flagged here.
        let Self {
            batch_size,
            time_range,
            tenant_id,
            correlation_id,
        } = self;
        EventQuery {
            tenant_id: tenant_id.clone(),
            time_range: time_range.clone(),
            kinds: Vec::new(),
            correlation_id: correlation_id.clone(),
            chain_id: None,
            limit: *batch_size,
        }
    }
}
/// Appends experience events to a store.
///
/// The returned future type is a generic associated type, so implementors
/// name their own concrete future rather than boxing it.
pub trait ExperienceAppender: Send + Sync {
/// Future returned by [`ExperienceAppender::append`]; it is `Send` and may
/// borrow from `self` and the input slice for `'a`.
type AppendFut<'a>: Future<Output = Result<(), StoreError>> + Send + 'a
where
Self: 'a;
/// Appends `events`, resolving to `Ok(())` or a [`StoreError`].
fn append<'a>(&'a self, events: &'a [ExperienceEventEnvelope]) -> Self::AppendFut<'a>;
}
/// Reads experience events back from a store, either as cursor-driven
/// replay batches or as a one-shot query.
pub trait ExperienceReplayer: Send + Sync {
/// Future returned by [`ExperienceReplayer::replay`]; `Send`, may borrow for `'a`.
type ReplayFut<'a>: Future<Output = Result<ReplayBatch, StoreError>> + Send + 'a
where
Self: 'a;
/// Future returned by [`ExperienceReplayer::query`]; `Send`, may borrow for `'a`.
type QueryFut<'a>: Future<Output = Result<Vec<ExperienceEventEnvelope>, StoreError>> + Send + 'a
where
Self: 'a;
/// Replays events for `options` at `cursor`, resolving to a
/// [`ReplayBatch`] or a [`StoreError`].
fn replay<'a>(
&'a self,
options: &'a ReplayOptions,
cursor: &'a ReplayCursor,
) -> Self::ReplayFut<'a>;
/// Runs `query`, resolving to the matching event envelopes or a [`StoreError`].
fn query<'a>(&'a self, query: &'a EventQuery) -> Self::QueryFut<'a>;
}
/// Loads and saves [`Context`] values keyed by a scope identifier.
pub trait ContextStore: Send + Sync {
/// Future returned by [`ContextStore::load_context`]; `Send`, may borrow for `'a`.
type LoadFut<'a>: Future<Output = Result<Option<Context>, StoreError>> + Send + 'a
where
Self: 'a;
/// Future returned by [`ContextStore::save_context`]; `Send`, may borrow for `'a`.
type SaveFut<'a>: Future<Output = Result<(), StoreError>> + Send + 'a
where
Self: 'a;
/// Loads the context for `scope_id`, resolving to an optional [`Context`].
/// NOTE(review): `None` presumably means nothing is stored for the scope — confirm with implementors.
fn load_context<'a>(&'a self, scope_id: &'a str) -> Self::LoadFut<'a>;
/// Saves `context` under `scope_id`, resolving to `Ok(())` or a [`StoreError`].
fn save_context<'a>(&'a self, scope_id: &'a str, context: &'a Context) -> Self::SaveFut<'a>;
}
/// Pinned, boxed, `Send` future yielding `T`; the return type of the
/// `Dyn*` trait methods in this module.
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Boxed-future counterpart of [`ExperienceAppender`].
///
/// It has no associated types; its method returns a [`BoxFuture`] instead.
pub trait DynExperienceAppender: Send + Sync {
/// Appends `events`, returning a boxed future that resolves to `Ok(())`
/// or a [`StoreError`].
fn append<'a>(
&'a self,
events: &'a [ExperienceEventEnvelope],
) -> BoxFuture<'a, Result<(), StoreError>>;
}
/// Blanket impl: every [`ExperienceAppender`] is also a
/// [`DynExperienceAppender`], with its future boxed and pinned.
impl<T> DynExperienceAppender for T
where
    T: ExperienceAppender,
{
    fn append<'a>(
        &'a self,
        events: &'a [ExperienceEventEnvelope],
    ) -> BoxFuture<'a, Result<(), StoreError>> {
        // Fully qualified: both traits expose `append`, and this must call
        // the `ExperienceAppender` one rather than this method.
        let pending = <T as ExperienceAppender>::append(self, events);
        Box::pin(pending)
    }
}
/// Boxed-future counterpart of [`ExperienceReplayer`].
///
/// It has no associated types; its methods return [`BoxFuture`]s instead.
pub trait DynExperienceReplayer: Send + Sync {
/// Replays events for `options` at `cursor`, returning a boxed future
/// that resolves to a [`ReplayBatch`] or a [`StoreError`].
fn replay<'a>(
&'a self,
options: &'a ReplayOptions,
cursor: &'a ReplayCursor,
) -> BoxFuture<'a, Result<ReplayBatch, StoreError>>;
/// Runs `query`, returning a boxed future that resolves to the matching
/// event envelopes or a [`StoreError`].
fn query<'a>(
&'a self,
query: &'a EventQuery,
) -> BoxFuture<'a, Result<Vec<ExperienceEventEnvelope>, StoreError>>;
}
/// Blanket impl: every [`ExperienceReplayer`] is also a
/// [`DynExperienceReplayer`], with its futures boxed and pinned.
impl<T> DynExperienceReplayer for T
where
    T: ExperienceReplayer,
{
    fn replay<'a>(
        &'a self,
        options: &'a ReplayOptions,
        cursor: &'a ReplayCursor,
    ) -> BoxFuture<'a, Result<ReplayBatch, StoreError>> {
        // Fully qualified so the call targets `ExperienceReplayer::replay`,
        // not this same-named method.
        let pending = <T as ExperienceReplayer>::replay(self, options, cursor);
        Box::pin(pending)
    }

    fn query<'a>(
        &'a self,
        query: &'a EventQuery,
    ) -> BoxFuture<'a, Result<Vec<ExperienceEventEnvelope>, StoreError>> {
        // Fully qualified for the same reason as `replay` above.
        let pending = <T as ExperienceReplayer>::query(self, query);
        Box::pin(pending)
    }
}
/// Boxed-future counterpart of [`ContextStore`].
///
/// It has no associated types; its methods return [`BoxFuture`]s instead.
pub trait DynContextStore: Send + Sync {
/// Loads the context for `scope_id`, returning a boxed future that
/// resolves to an optional [`Context`] or a [`StoreError`].
fn load_context<'a>(
&'a self,
scope_id: &'a str,
) -> BoxFuture<'a, Result<Option<Context>, StoreError>>;
/// Saves `context` under `scope_id`, returning a boxed future that
/// resolves to `Ok(())` or a [`StoreError`].
fn save_context<'a>(
&'a self,
scope_id: &'a str,
context: &'a Context,
) -> BoxFuture<'a, Result<(), StoreError>>;
}
/// Blanket impl: every [`ContextStore`] is also a [`DynContextStore`],
/// with its futures boxed and pinned.
impl<T> DynContextStore for T
where
    T: ContextStore,
{
    fn load_context<'a>(
        &'a self,
        scope_id: &'a str,
    ) -> BoxFuture<'a, Result<Option<Context>, StoreError>> {
        // Fully qualified so the call targets `ContextStore::load_context`,
        // not this same-named method.
        let pending = <T as ContextStore>::load_context(self, scope_id);
        Box::pin(pending)
    }

    fn save_context<'a>(
        &'a self,
        scope_id: &'a str,
        context: &'a Context,
    ) -> BoxFuture<'a, Result<(), StoreError>> {
        // Fully qualified for the same reason as `load_context` above.
        let pending = <T as ContextStore>::save_context(self, scope_id, context);
        Box::pin(pending)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Fixture builders: one per error variant exercised by the tests below.

    fn unavailable() -> StoreError {
        StoreError::Unavailable {
            message: "test".to_string(),
        }
    }

    fn conflict(event_id: &str) -> StoreError {
        StoreError::Conflict {
            event_id: event_id.to_string(),
        }
    }

    fn serialization_failed() -> StoreError {
        StoreError::SerializationFailed {
            message: "test".to_string(),
        }
    }

    fn invariant_violation() -> StoreError {
        StoreError::InvariantViolation {
            message: "test".to_string(),
        }
    }

    fn auth_failed() -> StoreError {
        StoreError::AuthFailed {
            message: "test".to_string(),
        }
    }

    fn rate_limited() -> StoreError {
        StoreError::RateLimited {
            retry_after: Duration::from_secs(60),
        }
    }

    fn timed_out() -> StoreError {
        StoreError::Timeout {
            elapsed: Duration::from_secs(30),
            deadline: Duration::from_secs(30),
        }
    }

    /// The rendered message embeds the conflicting event id.
    #[test]
    fn store_error_display() {
        let rendered = conflict("evt-123").to_string();
        assert!(rendered.contains("evt-123"));
    }

    /// Spot-checks the variant-to-category mapping.
    #[test]
    fn store_error_category_classification() {
        let cases = [
            (unavailable(), ErrorCategory::Unavailable),
            (conflict("test"), ErrorCategory::Conflict),
            (invariant_violation(), ErrorCategory::InvariantViolation),
            (rate_limited(), ErrorCategory::RateLimit),
        ];
        for (error, expected) in cases {
            assert_eq!(error.category(), expected, "category of {:?}", error);
        }
    }

    /// Unavailable, rate-limited and timeout errors are transient; the rest checked here are not.
    #[test]
    fn store_error_transient_classification() {
        for error in [unavailable(), rate_limited(), timed_out()] {
            assert!(error.is_transient(), "{:?} should be transient", error);
        }
        for error in [
            conflict("test"),
            serialization_failed(),
            invariant_violation(),
        ] {
            assert!(!error.is_transient(), "{:?} should not be transient", error);
        }
    }

    /// Conflict joins the transient variants as retryable; the rest checked here are not.
    #[test]
    fn store_error_retryable_classification() {
        for error in [unavailable(), rate_limited(), timed_out(), conflict("test")] {
            assert!(error.is_retryable(), "{:?} should be retryable", error);
        }
        for error in [serialization_failed(), invariant_violation(), auth_failed()] {
            assert!(!error.is_retryable(), "{:?} should not be retryable", error);
        }
    }

    /// Only the rate-limited variant reports a retry delay.
    #[test]
    fn store_error_retry_after() {
        assert_eq!(
            rate_limited().retry_after(),
            Some(Duration::from_secs(60))
        );
        assert_eq!(unavailable().retry_after(), None);
    }

    /// Each cursor factory sets `position` and `has_more` as expected.
    #[test]
    fn replay_cursor_factories() {
        let ReplayCursor { position, has_more } = ReplayCursor::start();
        assert!(has_more);
        assert!(position.is_empty());

        let ReplayCursor { position, has_more } = ReplayCursor::at("pos-123");
        assert!(has_more);
        assert_eq!(position, "pos-123");

        let finished = ReplayCursor::end();
        assert!(!finished.has_more);
    }

    /// Builder methods populate the matching fields.
    #[test]
    fn replay_options_builder() {
        let built = ReplayOptions::new()
            .with_batch_size(100)
            .with_tenant("tenant-1")
            .with_correlation("corr-1");
        assert_eq!(built.batch_size, Some(100));
        assert_eq!(built.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(built.correlation_id.as_deref(), Some("corr-1"));
    }

    /// Batch size and tenant carry over into the derived query.
    #[test]
    fn replay_options_to_event_query() {
        let derived = ReplayOptions::new()
            .with_batch_size(50)
            .with_tenant("tenant-2")
            .to_event_query();
        assert_eq!(derived.limit, Some(50));
        assert_eq!(derived.tenant_id.as_deref(), Some("tenant-2"));
    }
}