use std::sync::Arc;
#[cfg(any(test, feature = "testing"))]
use std::collections::HashMap;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::RwLock;
use time::OffsetDateTime;
use crate::{
error::EngineError,
ids::{DeadlineId, ProcessId, StreamId, TenantId},
version::WorkflowId,
};
#[allow(clippy::struct_field_names)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Deadline {
deadline_id: DeadlineId,
stream_id: StreamId,
process_id: ProcessId,
tenant_id: TenantId,
workflow_id: WorkflowId,
label: Box<str>,
due_at: OffsetDateTime,
created_at: OffsetDateTime,
}
impl Deadline {
#[must_use]
pub fn new(
stream_id: StreamId,
process_id: ProcessId,
tenant_id: TenantId,
workflow_id: WorkflowId,
label: impl Into<Box<str>>,
due_at: OffsetDateTime,
) -> Self {
Self {
deadline_id: DeadlineId::new(),
stream_id,
process_id,
tenant_id,
workflow_id,
label: label.into(),
due_at,
created_at: OffsetDateTime::now_utc(),
}
}
#[must_use]
pub fn is_due(&self, now: OffsetDateTime) -> bool {
self.due_at <= now
}
#[must_use]
pub fn deadline_id(&self) -> DeadlineId {
self.deadline_id
}
#[must_use]
pub fn stream_id(&self) -> &StreamId {
&self.stream_id
}
#[must_use]
pub fn process_id(&self) -> ProcessId {
self.process_id
}
#[must_use]
pub fn tenant_id(&self) -> TenantId {
self.tenant_id
}
#[must_use]
pub fn workflow_id(&self) -> &WorkflowId {
&self.workflow_id
}
#[must_use]
pub fn label(&self) -> &str {
&self.label
}
#[must_use]
pub fn due_at(&self) -> OffsetDateTime {
self.due_at
}
#[must_use]
pub fn created_at(&self) -> OffsetDateTime {
self.created_at
}
}
#[derive(Debug, Clone)]
pub struct DueNowResult {
pub deadlines: Vec<Deadline>,
pub has_more: bool,
}
#[allow(async_fn_in_trait)]
pub trait DeadlineStore: Send + Sync {
async fn register(&self, deadline: &Deadline) -> Result<(), EngineError>;
async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError>;
async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError>;
async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError>;
async fn len(&self) -> Result<usize, EngineError>;
async fn is_empty(&self) -> Result<bool, EngineError> {
Ok(self.len().await? == 0)
}
async fn overdue_count(&self) -> Result<usize, EngineError> {
const LIMIT: usize = 10_000;
let result = self.due_now(LIMIT).await?;
Ok(if result.has_more {
LIMIT
} else {
result.deadlines.len()
})
}
}
impl<S: DeadlineStore> DeadlineStore for Arc<S> {
async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
self.as_ref().register(deadline).await
}
async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
self.as_ref().cancel(id).await
}
async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
self.as_ref().due_now(limit).await
}
async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
self.as_ref().for_stream(stream_id).await
}
async fn len(&self) -> Result<usize, EngineError> {
self.as_ref().len().await
}
async fn overdue_count(&self) -> Result<usize, EngineError> {
self.as_ref().overdue_count().await
}
}
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopDeadlineStore discards all deadlines silently — use a persistent DeadlineStore in production"]
pub struct NoopDeadlineStore;
#[cfg(any(test, feature = "testing"))]
impl DeadlineStore for NoopDeadlineStore {
async fn register(&self, _deadline: &Deadline) -> Result<(), EngineError> {
Ok(())
}
async fn cancel(&self, _id: DeadlineId) -> Result<(), EngineError> {
Ok(())
}
async fn due_now(&self, _limit: usize) -> Result<DueNowResult, EngineError> {
Ok(DueNowResult {
deadlines: Vec::new(),
has_more: false,
})
}
async fn for_stream(&self, _stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
Ok(Vec::new())
}
async fn len(&self) -> Result<usize, EngineError> {
Ok(0)
}
}
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryDeadlineStore {
inner: Arc<RwLock<HashMap<DeadlineId, Deadline>>>,
}
#[cfg(any(test, feature = "testing"))]
impl InMemoryDeadlineStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn is_empty(&self) -> bool {
self.inner.read().await.is_empty()
}
}
#[cfg(any(test, feature = "testing"))]
impl DeadlineStore for InMemoryDeadlineStore {
async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
self.inner
.write()
.await
.insert(deadline.deadline_id, deadline.clone());
Ok(())
}
async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
self.inner.write().await.remove(&id);
Ok(())
}
async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
let now = OffsetDateTime::now_utc();
let map = self.inner.read().await;
let mut due: Vec<_> = map.values().filter(|d| d.is_due(now)).cloned().collect();
due.sort_by_key(|d| d.due_at);
let has_more = due.len() > limit;
due.truncate(limit);
Ok(DueNowResult {
deadlines: due,
has_more,
})
}
async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
let map = self.inner.read().await;
Ok(map
.values()
.filter(|d| &d.stream_id == stream_id)
.cloned()
.collect())
}
async fn len(&self) -> Result<usize, EngineError> {
Ok(self.inner.read().await.len())
}
}
#[cfg(test)]
mod tests {
use super::*;
use time::Duration;
fn make_deadline(due_at: OffsetDateTime) -> Deadline {
Deadline::new(
StreamId::new("process/test"),
ProcessId::new(),
TenantId::new(),
WorkflowId::new("test-workflow", "FV2025-10-01"),
"aperak-response-window",
due_at,
)
}
#[tokio::test]
async fn register_and_cancel() {
let store = InMemoryDeadlineStore::new();
let d = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
let id = d.deadline_id;
store.register(&d).await.unwrap();
assert_eq!(store.len().await.unwrap(), 1);
store.cancel(id).await.unwrap();
assert!(store.is_empty().await);
}
#[tokio::test]
async fn due_now_only_returns_overdue() {
let store = InMemoryDeadlineStore::new();
let past = make_deadline(OffsetDateTime::now_utc() - Duration::seconds(1));
let future = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
store.register(&past).await.unwrap();
store.register(&future).await.unwrap();
let due = store.due_now(100).await.unwrap();
assert_eq!(due.deadlines.len(), 1);
assert_eq!(due.deadlines[0].label.as_ref(), "aperak-response-window");
assert!(!due.has_more);
}
#[tokio::test]
async fn due_now_ordered_soonest_first() {
let store = InMemoryDeadlineStore::new();
let t1 = OffsetDateTime::now_utc() - Duration::seconds(60);
let t2 = OffsetDateTime::now_utc() - Duration::seconds(10);
let t3 = OffsetDateTime::now_utc() - Duration::seconds(1);
store.register(&make_deadline(t3)).await.unwrap();
store.register(&make_deadline(t1)).await.unwrap();
store.register(&make_deadline(t2)).await.unwrap();
let due = store.due_now(10).await.unwrap();
assert_eq!(due.deadlines.len(), 3);
assert!(due.deadlines[0].due_at <= due.deadlines[1].due_at);
assert!(due.deadlines[1].due_at <= due.deadlines[2].due_at);
assert!(!due.has_more);
}
#[tokio::test]
async fn for_stream_filters_by_stream() {
let store = InMemoryDeadlineStore::new();
let stream1 = StreamId::new("process/aaa");
let stream2 = StreamId::new("process/bbb");
let d1 = Deadline::new(
stream1.clone(),
ProcessId::new(),
TenantId::new(),
WorkflowId::new("test-workflow", "FV2025-10-01"),
"label",
OffsetDateTime::now_utc() + Duration::days(1),
);
let d2 = Deadline::new(
stream2.clone(),
ProcessId::new(),
TenantId::new(),
WorkflowId::new("test-workflow", "FV2025-10-01"),
"label",
OffsetDateTime::now_utc() + Duration::days(1),
);
store.register(&d1).await.unwrap();
store.register(&d2).await.unwrap();
let for1 = store.for_stream(&stream1).await.unwrap();
assert_eq!(for1.len(), 1);
assert_eq!(for1[0].stream_id, stream1);
}
#[tokio::test]
async fn register_upserts_on_same_id() {
let store = InMemoryDeadlineStore::new();
let mut d = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
store.register(&d).await.unwrap();
let new_due = OffsetDateTime::now_utc() + Duration::days(10);
d.due_at = new_due;
store.register(&d).await.unwrap();
assert_eq!(
store.len().await.unwrap(),
1,
"upsert must not create a duplicate"
);
let found = store.for_stream(&d.stream_id).await.unwrap();
assert_eq!(found[0].due_at, new_due);
}
#[tokio::test]
async fn noop_store_succeeds_silently() {
let store = NoopDeadlineStore;
let d = make_deadline(OffsetDateTime::now_utc() - Duration::seconds(1));
store.register(&d).await.unwrap();
assert!(store.due_now(10).await.unwrap().deadlines.is_empty());
assert!(store.is_empty().await.unwrap());
}
#[tokio::test]
async fn clone_shares_state() {
let store1 = InMemoryDeadlineStore::new();
let store2 = store1.clone();
let d = make_deadline(OffsetDateTime::now_utc() + Duration::days(1));
store1.register(&d).await.unwrap();
assert_eq!(store2.len().await.unwrap(), 1);
}
#[tokio::test]
async fn due_now_has_more_signals_truncation() {
let store = InMemoryDeadlineStore::new();
let past = OffsetDateTime::now_utc() - Duration::seconds(1);
for _ in 0..5 {
store.register(&make_deadline(past)).await.unwrap();
}
let r = store.due_now(3).await.unwrap();
assert_eq!(r.deadlines.len(), 3);
assert!(r.has_more);
let r2 = store.due_now(10).await.unwrap();
assert_eq!(r2.deadlines.len(), 5);
assert!(!r2.has_more);
}
}