use std::sync::Arc;
use futures_util::future::BoxFuture;
use crate::jobs::manager::{JobManager, JobManagerError, JobMetaSource};
use crate::jobs::publisher::JobEventPublisher;
use crate::jobs::runtime_worker::JobCancellationToken;
use crate::jobs::types::{Job, JobContext, JobLogEntry, JobLogLevel, JobProgress};
/// Shared callback invoked to send a worker heartbeat.
///
/// Every call yields a boxed `'static` future resolving to `Ok(())`, or to an
/// `Err` carrying a `String` description of what went wrong. The `Send + Sync`
/// bounds let the hook be shared between threads behind the `Arc`.
type HeartbeatHook =
    Arc<dyn Fn() -> BoxFuture<'static, Result<(), String>> + Send + Sync>;
/// Errors produced by runtime operations performed through an `ActiveJob`.
#[derive(Debug, thiserror::Error)]
pub enum ActiveJobRuntimeError {
    /// The heartbeat hook returned an error; the payload is the hook's own
    /// error string.
    #[error("failed to send worker heartbeat: {0}")]
    Heartbeat(String),
}
/// Handle bundling a job with the manager, cancellation token and heartbeat
/// hook needed to interact with it.
///
/// `P` and `M` are the type parameters of the wrapped `JobManager`; methods
/// that emit events require `P: JobEventPublisher` and `M: JobMetaSource`.
#[derive(Clone)]
pub struct ActiveJob<P, M> {
    /// Manager through which progress and log events are emitted.
    manager: JobManager<P, M>,
    /// The job this handle refers to.
    job: Job,
    /// Token queried to find out whether cancellation was requested.
    cancellation: JobCancellationToken,
    /// Callback invoked by `heartbeat()`.
    heartbeat: HeartbeatHook,
}
impl<P, M> ActiveJob<P, M> {
    /// Assembles a handle from its four parts; crate-internal constructor.
    pub(crate) fn new(
        manager: JobManager<P, M>,
        job: Job,
        cancellation: JobCancellationToken,
        heartbeat: HeartbeatHook,
    ) -> Self {
        Self { manager, job, cancellation, heartbeat }
    }

    /// Borrows the job held by this handle.
    pub fn job(&self) -> &Job {
        let Self { job, .. } = self;
        job
    }

    /// Borrows the `context` field of the held job.
    pub fn context(&self) -> &JobContext {
        &self.job().context
    }

    /// Reports whether the cancellation token says the job is cancelled.
    pub fn is_cancelled(&self) -> bool {
        let token = &self.cancellation;
        token.is_cancelled()
    }

    /// Returns a clone of the cancellation token held by this handle.
    pub fn cancellation_token(&self) -> JobCancellationToken {
        let token = &self.cancellation;
        token.clone()
    }

    /// Invokes the heartbeat hook and awaits the future it returns.
    ///
    /// # Errors
    ///
    /// Returns [`ActiveJobRuntimeError::Heartbeat`] wrapping the hook's error
    /// string when the hook's future resolves to `Err`.
    pub async fn heartbeat(&self) -> Result<(), ActiveJobRuntimeError> {
        let pending = (self.heartbeat)();
        let outcome = pending.await;
        outcome.map_err(ActiveJobRuntimeError::Heartbeat)
    }
}
impl<P, M> ActiveJob<P, M>
where
    P: JobEventPublisher,
    M: JobMetaSource,
{
    /// Emits a progress event for this job through the manager.
    ///
    /// The event carries `current` and `total` (both wrapped in `Some`), the
    /// optional `message`, and no `step`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the manager's `emit_progress` returns.
    pub async fn update_progress(
        &self,
        current: u64,
        total: u64,
        message: Option<String>,
    ) -> Result<(), JobManagerError<P::Error>> {
        let progress = JobProgress {
            step: None,
            message,
            current: Some(current),
            total: Some(total),
        };
        self.manager.emit_progress(&self.job, progress).await
    }

    /// Emits a log entry for this job through the manager.
    ///
    /// The entry is stamped with the manager's `now_iso()` value and carries
    /// the given `level` and the `message` converted into a `String`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the manager's `emit_log` returns.
    pub async fn log(
        &self,
        level: JobLogLevel,
        message: impl Into<String>,
    ) -> Result<(), JobManagerError<P::Error>> {
        // Timestamp is read before the message is converted, as fields are
        // evaluated in the order written.
        let entry = JobLogEntry {
            timestamp: self.manager.now_iso(),
            level,
            message: message.into(),
        };
        self.manager.emit_log(&self.job, entry).await
    }
}