use std::collections::{HashMap, VecDeque};
use std::sync::{Mutex, MutexGuard};
use aion_core::{Payload, WorkflowId};
use crate::engine_seam::{
EngineHandle, EngineSeamError, WorkflowMailboxMessage, WorkflowResidency,
};
/// A signal captured by [`SignalResumeHandoff::defer`] and held until
/// [`SignalResumeHandoff::deliver_deferred`] forwards it to the workflow's
/// resident process as a `WorkflowMailboxMessage::SignalReceived`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DeferredSignal {
    /// Signal name, forwarded unchanged as the mailbox message's `name`.
    name: String,
    /// Signal payload, forwarded unchanged as the mailbox message's `payload`.
    payload: Payload,
}
/// Buffers signals for workflows that cannot receive them yet and replays
/// them, in the order they were deferred, once
/// [`SignalResumeHandoff::deliver_deferred`] finds the workflow resident.
///
/// All state lives behind a single [`Mutex`]; every method takes `&self`.
#[derive(Debug, Default)]
pub struct SignalResumeHandoff {
    /// Per-workflow FIFO queues of signals still awaiting delivery.
    deferred: Mutex<HashMap<WorkflowId, VecDeque<DeferredSignal>>>,
}
impl SignalResumeHandoff {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn defer(
&self,
workflow_id: WorkflowId,
name: String,
payload: Payload,
) -> Result<(), SignalResumeError> {
self.state()?
.entry(workflow_id)
.or_default()
.push_back(DeferredSignal { name, payload });
Ok(())
}
pub fn deliver_deferred(
&self,
engine: &dyn EngineHandle,
workflow_id: &WorkflowId,
) -> Result<usize, SignalResumeError> {
let process = match engine
.resolve_workflow(workflow_id)
.map_err(SignalResumeError::Resolve)?
{
WorkflowResidency::Resident(process) => process,
WorkflowResidency::NonResident => {
return Err(SignalResumeError::NonResident {
workflow_id: workflow_id.clone(),
});
}
WorkflowResidency::Terminal => {
return Err(SignalResumeError::Terminal {
workflow_id: workflow_id.clone(),
});
}
WorkflowResidency::Unknown => {
return Err(SignalResumeError::Unknown {
workflow_id: workflow_id.clone(),
});
}
};
let mut delivered = 0;
let mut state = self.state()?;
let Some(queue) = state.get_mut(workflow_id) else {
return Ok(0);
};
while let Some(signal) = queue.pop_front() {
if let Err(error) = engine.deliver_workflow_message(
process,
WorkflowMailboxMessage::SignalReceived {
name: signal.name,
payload: signal.payload,
},
) {
tracing::warn!(
workflow_id = %workflow_id,
process = process.pid(),
error = %error,
"dropping already-recorded deferred signal after mailbox delivery failure"
);
continue;
}
delivered += 1;
}
state.remove(workflow_id);
Ok(delivered)
}
pub fn pending_count(&self, workflow_id: &WorkflowId) -> Result<usize, SignalResumeError> {
Ok(self.state()?.get(workflow_id).map_or(0, VecDeque::len))
}
fn state(
&self,
) -> Result<MutexGuard<'_, HashMap<WorkflowId, VecDeque<DeferredSignal>>>, SignalResumeError>
{
self.deferred.lock().map_err(|_| SignalResumeError::State {
reason: "deferred signal registry lock was poisoned".to_owned(),
})
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum SignalResumeError {
#[error("workflow {workflow_id} is not resident")]
NonResident {
workflow_id: WorkflowId,
},
#[error("workflow {workflow_id} is terminal")]
Terminal {
workflow_id: WorkflowId,
},
#[error("workflow {workflow_id} is unknown")]
Unknown {
workflow_id: WorkflowId,
},
#[error("workflow resolution failed: {0}")]
Resolve(EngineSeamError),
#[error("deferred signal delivery failed: {0}")]
Deliver(EngineSeamError),
#[error("deferred signal registry failed: {reason}")]
State {
reason: String,
},
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use aion_core::{ContentType, Payload, WorkflowId};

    use super::{SignalResumeError, SignalResumeHandoff};
    use crate::engine_seam::test_support::{DeliveredWorkflowMessage, FakeEngineHandle};
    use crate::engine_seam::{EngineSeamError, WorkflowProcessHandle, WorkflowResidency};

    /// Signals deferred before the workflow became resident are delivered in
    /// FIFO order, and a second drain finds nothing left.
    #[test]
    fn deferred_signals_deliver_in_order_exactly_once() -> Result<(), Box<dyn std::error::Error>> {
        let engine = Arc::new(FakeEngineHandle::new());
        let handoff = SignalResumeHandoff::new();
        let workflow_id = WorkflowId::new_v4();
        let process = WorkflowProcessHandle::new(41);
        let first = payload(b"{\"n\":1}".to_vec());
        let second = payload(b"{\"n\":2}".to_vec());
        handoff.defer(workflow_id.clone(), "first".to_owned(), first.clone())?;
        handoff.defer(workflow_id.clone(), "second".to_owned(), second.clone())?;
        engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
        assert_eq!(handoff.deliver_deferred(engine.as_ref(), &workflow_id)?, 2);
        assert_eq!(handoff.deliver_deferred(engine.as_ref(), &workflow_id)?, 0);
        assert_eq!(
            engine.delivered_messages()?,
            vec![
                (
                    process,
                    DeliveredWorkflowMessage::SignalReceived {
                        name: "first".to_owned(),
                        payload: first,
                    },
                ),
                (
                    process,
                    DeliveredWorkflowMessage::SignalReceived {
                        name: "second".to_owned(),
                        payload: second,
                    },
                ),
            ]
        );
        assert_eq!(handoff.pending_count(&workflow_id)?, 0);
        Ok(())
    }

    /// A mailbox failure drops only the affected signal; later signals are
    /// still delivered and the queue ends up empty.
    #[test]
    fn failed_delivery_is_logged_dropped_and_subsequent_signals_continue()
    -> Result<(), Box<dyn std::error::Error>> {
        let engine = Arc::new(FakeEngineHandle::new());
        let handoff = SignalResumeHandoff::new();
        let workflow_id = WorkflowId::new_v4();
        let process = WorkflowProcessHandle::new(42);
        let first = payload(b"{\"n\":1}".to_vec());
        let second = payload(b"{\"n\":2}".to_vec());
        engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
        engine.push_delivery_response(Err(EngineSeamError::Delivery {
            reason: "mailbox unavailable".to_owned(),
        }))?;
        handoff.defer(workflow_id.clone(), "first".to_owned(), first.clone())?;
        handoff.defer(workflow_id.clone(), "second".to_owned(), second.clone())?;
        assert_eq!(handoff.deliver_deferred(engine.as_ref(), &workflow_id)?, 1);
        assert_eq!(handoff.pending_count(&workflow_id)?, 0);
        assert_eq!(
            engine.delivered_messages()?,
            vec![(
                process,
                DeliveredWorkflowMessage::SignalReceived {
                    name: "second".to_owned(),
                    payload: second,
                },
            )]
        );
        Ok(())
    }

    /// When the workflow is not resident, delivery fails with `NonResident`,
    /// nothing reaches the engine, and the deferred signal stays queued.
    #[test]
    fn non_resident_workflow_keeps_signals_pending() -> Result<(), Box<dyn std::error::Error>> {
        let engine = Arc::new(FakeEngineHandle::new());
        let handoff = SignalResumeHandoff::new();
        let workflow_id = WorkflowId::new_v4();
        handoff.defer(
            workflow_id.clone(),
            "first".to_owned(),
            payload(b"{\"n\":1}".to_vec()),
        )?;
        engine.set_residency(workflow_id.clone(), WorkflowResidency::NonResident)?;
        assert_eq!(
            handoff.deliver_deferred(engine.as_ref(), &workflow_id),
            Err(SignalResumeError::NonResident {
                workflow_id: workflow_id.clone(),
            })
        );
        assert_eq!(handoff.pending_count(&workflow_id)?, 1);
        assert!(engine.delivered_messages()?.is_empty());
        Ok(())
    }

    /// Builds a JSON payload from raw bytes.
    fn payload(bytes: Vec<u8>) -> Payload {
        Payload::new(ContentType::Json, bytes)
    }
}