use crate::{FailedDetails, JobMetrics, JobState};
use derive_more::Debug;
use uuid::Uuid;
#[derive(Clone, Debug)]
pub enum EventParameters<R, P> {
Prioritized {
job_id: u64,
name: Option<String>,
priority: u64,
},
Added {
job_id: u64,
name: Option<String>,
},
WaitingToRun {
job_id: u64,
prev_state: Option<JobState>,
},
Delayed {
job_id: u64,
delay: Duration,
},
Active {
job_id: u64,
prev_state: Option<JobState>,
},
Completed {
job_id: u64,
job_metrics: JobMetrics,
expected_delay: Duration,
prev_state: Option<JobState>,
#[debug(skip)]
result: R,
},
Void,
Progress {
job_id: u64,
#[debug(skip)]
data: P,
},
Stalled {
job_id: u64,
prev_state: JobState,
},
Failed {
reason: FailedDetails,
job_id: u64,
prev_state: JobState,
},
Processing {
worker_id: Uuid,
job_id: u64,
status: JobState,
},
}
use serde::de::DeserializeOwned;
use std::{sync::Arc, time::Duration};
use typed_emitter::TypedEmitter;
pub type Emitter<R, P> = TypedEmitter<JobState, EventParameters<R, P>>;
pub type EventEmitter<R, P> = Arc<Emitter<R, P>>;
mod redis_events;
pub use redis_events::QueueStreamEvent;
use crate::KioResult;
impl<R: DeserializeOwned, P: DeserializeOwned> EventParameters<R, P> {
pub fn from_queue_event(event: QueueStreamEvent<R, P>) -> KioResult<Self> {
let job_state = event.event;
let job_id = event.job_id;
let parameter = match job_state {
JobState::Prioritized => Self::Prioritized {
job_id: event.job_id,
name: event.name,
priority: event.priority.unwrap_or_default(),
},
JobState::Wait if event.prev.is_none() => Self::Added {
job_id: event.job_id,
name: event.name,
},
JobState::Wait => Self::WaitingToRun {
job_id: event.job_id,
prev_state: event.prev,
},
JobState::Stalled => Self::Stalled {
job_id: event.job_id,
prev_state: event.prev.unwrap_or_default(),
},
JobState::Active => Self::Active {
job_id,
prev_state: event.prev,
},
JobState::Paused | JobState::Resumed | JobState::Obliterated => Self::Void,
JobState::Completed => {
let job_metrics = event.metrics.unwrap_or_default();
Self::Completed {
job_metrics,
job_id,
prev_state: event.prev,
expected_delay: Duration::from_millis(job_metrics.delay),
result: event.returned_value.expect("there is no result"),
}
}
JobState::Failed => Self::Failed {
reason: event.failed_reason.unwrap_or_default(),
job_id: event.job_id,
prev_state: event.prev.unwrap_or_default(),
},
JobState::Delayed => Self::Delayed {
job_id: event.job_id,
delay: Duration::from_millis(event.delay.unwrap_or_default()),
},
JobState::Progress => Self::Progress {
job_id: event.job_id,
data: event.progress_data.expect("expecting a value"),
},
JobState::Processing => Self::Processing {
worker_id: event.worker_id.unwrap_or_default(),
job_id: event.job_id,
status: event.prev.unwrap_or_default(),
},
};
Ok(parameter)
}
}