use crate::{
action::Actions,
blob_encoding::BlobMetadata,
states::{
BlobIsInputOrOutput, JobFullState, JobFullStateWithData, JobStateAndWorkerId,
JobStateForDAG, JobStatus, UnnamespacedJobType, UnnamespacedWorkerSpec, WorkerId,
},
JobId,
};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
/// File-local `Result` alias whose error type defaults to `crate::Error`
/// when the second type parameter is omitted.
type Result<T, E = crate::Error> = anyhow::Result<T, E>;
/// Description of a single job, as accepted in bulk by
/// [`JobbyStore::submit_jobs`].
///
/// All fields are crate-private. The notes on individual fields that start
/// with "presumably" are inferred from names only and should be confirmed
/// against the store implementations.
#[derive(Debug)]
pub struct SubmittableJob {
    /// Job type, in its un-namespaced form.
    pub(crate) job_type: UnnamespacedJobType,
    /// Identifier of the job.
    pub(crate) id: JobId,
    /// Blob metadata paired with a raw byte buffer — presumably the job's
    /// encoded input; confirm against callers.
    pub(crate) data: (BlobMetadata, Vec<u8>),
    /// Presumably the number of attempts permitted for this job — TODO confirm
    /// (including how zero or negative values are treated).
    pub(crate) allowed_attempts: i64,
    /// Optional id of another job — presumably one this job depends on.
    pub(crate) dependency: Option<JobId>,
    /// Optional job type — presumably the type of `dependency`. The two
    /// options are independent here; nothing in this struct forces them to be
    /// set together.
    pub(crate) dependency_type: Option<UnnamespacedJobType>,
    /// Optional duration — presumably how long until the job expires; verify
    /// against the store.
    pub(crate) expiry: Option<Duration>,
}
/// Description of a single job, as accepted in bulk by
/// [`JobbyStore::backfill_jobs`].
///
/// Unlike [`SubmittableJob`], the data payload is optional and the struct
/// carries an explicit `ready_at` timestamp.
#[derive(Debug)]
pub struct BackfillableJob {
    /// Job type, in its un-namespaced form.
    pub(crate) job_type: UnnamespacedJobType,
    /// Identifier of the job.
    pub(crate) id: JobId,
    /// Optional blob metadata paired with a raw byte buffer — presumably the
    /// job's encoded input when present; confirm against callers.
    pub(crate) data: Option<(BlobMetadata, Vec<u8>)>,
    /// Wall-clock timestamp — presumably the moment the job becomes eligible
    /// to run; TODO confirm.
    pub(crate) ready_at: SystemTime,
}
/// Outcome value passed as `status` to
/// [`JobbyStore::handle_job_completion`].
#[derive(Debug)]
pub enum HandleJobCompletionStatus {
    /// Successful outcome, carrying an [`Actions`] value.
    Success(Actions),
    /// Failed outcome, carrying a `String` — presumably a human-readable
    /// error description; confirm against callers.
    Failure(String),
}
/// Storage interface for jobs.
///
/// Every method is `async`, takes `&self`, and returns this file's `Result`
/// alias (error type `crate::Error` by default).
///
/// NOTE(review): this trait only declares signatures. The per-method
/// descriptions below restate what the names and types show; the precise
/// semantics are defined by each implementation and should be confirmed there.
#[async_trait::async_trait]
pub trait JobbyStore {
    /// Handles completion of the job identified by `job_type` and `job_id`.
    ///
    /// `worker_id` names a worker and `status` carries either a success value
    /// ([`Actions`]) or a failure string.
    async fn handle_job_completion(
        &self,
        job_type: UnnamespacedJobType,
        job_id: JobId,
        worker_id: WorkerId,
        status: HandleJobCompletionStatus,
    ) -> Result<()>;

    /// Submits a batch of jobs to the store.
    async fn submit_jobs(&self, jobs: Vec<SubmittableJob>) -> Result<()>;

    /// Fetches and claims jobs, returning one [`JobStateAndWorkerId`] per job.
    async fn fetch_and_claim_jobs(&self) -> Result<Vec<JobStateAndWorkerId>>;

    /// Fetches the data blob of a job, selecting input or output via
    /// `is_input`.
    ///
    /// Returns `None` inside the `Ok` when there is nothing to return;
    /// otherwise blob metadata, raw bytes and an `i64`.
    /// NOTE(review): the meaning of the trailing `i64` is not visible here.
    async fn fetch_job_data(
        &self,
        job_type: UnnamespacedJobType,
        id: JobId,
        is_input: BlobIsInputOrOutput,
    ) -> Result<Option<(BlobMetadata, Vec<u8>, i64)>>;

    /// Start-up hook for the store; what it prepares is implementation-defined.
    async fn startup(&self) -> Result<()>;

    /// Registers the given worker specifications with the store.
    async fn register_workers(&self, workers: Vec<UnnamespacedWorkerSpec>) -> Result<()>;

    /// Backfills a batch of jobs into the store.
    async fn backfill_jobs(&self, jobs: Vec<BackfillableJob>) -> Result<()>;

    /// Expires jobs; takes no selector, so which jobs are affected is decided
    /// by the implementation.
    async fn expire_jobs(&self) -> Result<()>;

    /// Expires the single job identified by `job_type` and `job_id`.
    ///
    /// NOTE(review): how `duration` is interpreted (e.g. delay from now) is
    /// not visible here.
    async fn expire_job(
        &self,
        job_type: UnnamespacedJobType,
        job_id: JobId,
        duration: Duration,
    ) -> Result<()>;

    /// Reverses expiry for the job identified by `job_type` and `job_id`.
    async fn unexpire_job(&self, job_type: UnnamespacedJobType, job_id: JobId) -> Result<()>;

    /// Returns job-status metrics, optionally restricted to one `job_type`.
    ///
    /// The result is a two-level map: an `i64` key to a map from
    /// [`JobStatus`] to an `i64`.
    /// NOTE(review): presumably the inner values are counts; the meaning of
    /// the outer `i64` key is not visible here.
    async fn job_status_metrics(
        &self,
        job_type: Option<UnnamespacedJobType>,
    ) -> Result<HashMap<i64, HashMap<JobStatus, i64>>>;

    /// Returns timing metrics for `job_type` as a 4-tuple of `i64`.
    ///
    /// NOTE(review): the meaning and unit of each tuple element are not
    /// visible here.
    async fn job_timing_metrics(
        &self,
        job_type: UnnamespacedJobType,
    ) -> Result<(i64, i64, i64, i64)>;

    /// Returns sizing metrics for `job_type`: one entry per row, each tagged
    /// with [`BlobIsInputOrOutput`] and carrying two `i64` values.
    ///
    /// NOTE(review): the meaning of the two `i64` values is not visible here.
    async fn job_sizing_metrics(
        &self,
        job_type: UnnamespacedJobType,
    ) -> Result<Vec<(BlobIsInputOrOutput, i64, i64)>>;

    /// Returns recent jobs of `job_type` filtered by `status`.
    ///
    /// `limit` is presumably the maximum number of entries returned — TODO
    /// confirm.
    async fn job_type_recent_jobs(
        &self,
        job_type: UnnamespacedJobType,
        limit: usize,
        status: JobStatus,
    ) -> Result<Vec<JobFullState>>;

    /// Returns the full state, including data, of the job identified by
    /// `job_type` and `job_id`.
    async fn job_full_state(
        &self,
        job_type: UnnamespacedJobType,
        job_id: JobId,
    ) -> Result<JobFullStateWithData>;

    /// Marks the job identified by `job_type` and `job_id` as ready.
    async fn mark_job_ready(&self, job_type: UnnamespacedJobType, job_id: JobId) -> Result<()>;

    /// Forces the job identified by `job_type` and `job_id` to run now.
    async fn force_run_job_now(&self, job_type: UnnamespacedJobType, job_id: JobId) -> Result<()>;

    /// Searches jobs by `prefix`, optionally restricted to one `job_type`.
    ///
    /// Each hit is an `(i64, String, JobStatus)` tuple.
    /// NOTE(review): which field `prefix` is matched against, and the meaning
    /// of the `i64` and `String` elements, are not visible here.
    async fn search_jobs(
        &self,
        prefix: String,
        job_type: Option<UnnamespacedJobType>,
    ) -> Result<Vec<(i64, String, JobStatus)>>;

    /// Fetches the DAG associated with the job identified by `job_type` and
    /// `job_id`, as a list of [`JobStateForDAG`] entries.
    async fn fetch_dag(
        &self,
        job_type: UnnamespacedJobType,
        job_id: JobId,
    ) -> Result<Vec<JobStateForDAG>>;
}