use crate::{Id, StepLog, StepNumber, WorkflowMessage, WorkflowState};
use parking_lot::Mutex;
use std::sync::Arc;
/// Boxed callback that [`MessageSender::send`] invokes with a reference to the
/// message being dispatched.
///
/// The `Send + Sync` bounds are what allow a handler to be stored in a sender
/// that is moved to, or shared between, threads.
pub type EventHandler = Box<dyn Fn(&WorkflowMessage) + Send + Sync>;
/// Dispatches workflow messages to a list of registered handlers.
///
/// The handler list sits behind an `Arc`, so the derived `Clone` only copies
/// the pointer: every clone shares the same list, and a handler registered
/// through one clone is seen by all of them.
#[derive(Clone)]
pub struct MessageSender {
/// Shared, mutex-protected handlers, kept in the order they were registered.
handlers: Arc<Mutex<Vec<EventHandler>>>,
}
impl MessageSender {
    /// Creates a sender with an empty handler list.
    pub fn new() -> Self {
        Self {
            handlers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Appends `handler` to the shared handler list.
    ///
    /// A shared borrow is sufficient because the list is only ever modified
    /// through the mutex. Callers that hold a `&mut MessageSender` can still
    /// call this unchanged.
    pub fn register(&self, handler: EventHandler) {
        self.handlers.lock().push(handler);
    }

    /// Calls every registered handler with `message`, in registration order.
    ///
    /// The handler list stays locked for the whole dispatch. A handler must
    /// therefore not call [`register`](Self::register) or `send` on this
    /// sender (or any clone of it): `parking_lot::Mutex` is not reentrant, so
    /// such a call would deadlock.
    ///
    /// # Errors
    ///
    /// This implementation has no failure path and always returns `Ok(())`;
    /// the `Result` return type is kept so callers do not need to change.
    pub fn send(&self, message: &WorkflowMessage) -> anyhow::Result<()> {
        let handlers = self.handlers.lock();
        for handler in handlers.iter() {
            handler(message);
        }
        Ok(())
    }
}
/// Emits messages on behalf of one workflow run through a [`MessageSender`].
#[derive(Clone)]
pub struct WorkflowMessageSender {
/// Id of the workflow run; copied into emitted messages and into job senders
/// created from this value.
workflow_run_id: Id,
/// Dispatcher that delivers the messages to the registered handlers.
sender: MessageSender,
}
/// Emits messages on behalf of one job of a workflow run through a
/// [`MessageSender`].
#[derive(Clone)]
pub struct JobMessageSender {
/// Id of the workflow run the job belongs to.
workflow_run_id: Id,
/// Id of the job; copied into emitted messages and into step senders created
/// from this value.
job_id: Id,
/// Dispatcher that delivers the messages to the registered handlers.
sender: MessageSender,
}
/// Emits state updates and log lines on behalf of one step of a job through a
/// [`MessageSender`].
#[derive(Clone)]
pub struct StepMessageSender {
/// Id of the workflow run the step belongs to.
workflow_run_id: Id,
/// Id of the job the step belongs to.
job_id: Id,
/// Number of the step; sent as the `number` field of emitted messages.
step_number: StepNumber,
/// Dispatcher that delivers the messages to the registered handlers.
sender: MessageSender,
}
impl WorkflowMessageSender {
pub fn new(workflow_run_id: Id, sender: MessageSender) -> Self {
Self {
workflow_run_id,
sender,
}
}
pub fn update_workflow_state(&self, state: WorkflowState) {
self.send(WorkflowMessage::WorkflowStateUpdated {
id: self.workflow_run_id.clone(),
state,
});
}
pub fn create_job_message_sender(&self, job_id: Id) -> JobMessageSender {
JobMessageSender::new(self.workflow_run_id.clone(), job_id, self.sender.clone())
}
fn send(&self, message: WorkflowMessage) {
if let Err(err) = self.sender.send(&message) {
log::error!("Failed to send workflow message: {}", err.to_string());
}
}
}
impl JobMessageSender {
pub fn new(workflow_run_id: Id, job_id: Id, sender: MessageSender) -> Self {
Self {
workflow_run_id,
job_id,
sender,
}
}
pub fn update_job_state(&self, state: WorkflowState) {
self.send(WorkflowMessage::JobStateUpdated {
workflow_run_id: self.workflow_run_id.clone(),
job_id: self.job_id.clone(),
state,
});
}
pub fn create_step_message_sender(&self, step_number: StepNumber) -> StepMessageSender {
StepMessageSender::new(
self.workflow_run_id.clone(),
self.job_id.clone(),
step_number,
self.sender.clone(),
)
}
fn send(&self, message: WorkflowMessage) {
if let Err(err) = self.sender.send(&message) {
log::error!("Failed to send job message: {}", err.to_string());
}
}
}
impl StepMessageSender {
pub fn new(
workflow_run_id: Id,
job_id: Id,
step_number: StepNumber,
sender: MessageSender,
) -> Self {
Self {
workflow_run_id,
job_id,
step_number,
sender,
}
}
pub fn update_step_state(&self, state: WorkflowState) {
self.send(WorkflowMessage::StepStateUpdated {
workflow_run_id: self.workflow_run_id.clone(),
job_id: self.job_id.clone(),
number: self.step_number,
state,
});
}
pub fn send_log(&self, log: StepLog) {
self.send(WorkflowMessage::Log {
workflow_run_id: self.workflow_run_id.clone(),
job_id: self.job_id.clone(),
number: self.step_number,
message: log.message,
log_type: log.log_type,
time: log.time,
});
}
fn send(&self, message: WorkflowMessage) {
if let Err(err) = self.sender.send(&message) {
log::error!("Failed to send step message: {}", err.to_string());
}
}
}