use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::error::{ArtifactError, LogError, QueueError, RegistryError};
/// A lease record associating a job with the worker that holds it.
///
/// `JobQueue::claim` yields a `Lease` together with the claimed `QueuedJob`.
/// The trait's `renew_lease`, `complete` and `fail` methods take a `lease_id`
/// string — presumably the `lease_id` stored here (confirm against implementations).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Lease {
/// Identifier of this lease.
pub lease_id: String,
/// Identifier of the leased job.
pub job_id: String,
/// Identifier of the workflow the job belongs to.
pub workflow_id: String,
/// Identifier of the worker holding the lease.
pub worker_id: String,
/// Lease time-to-live; seconds, going by the field name.
pub ttl_secs: u64,
/// When the lease was granted; milliseconds, going by the field name.
/// NOTE(review): presumably a Unix-epoch timestamp — confirm.
pub granted_at_ms: u64,
}
/// A job as it is stored in and handed out by a `JobQueue`.
///
/// Passed by value to `JobQueue::enqueue` and returned (with a `Lease`) from
/// `JobQueue::claim`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueuedJob {
/// Identifier of this job.
pub job_id: String,
/// Identifier of the workflow the job belongs to.
pub workflow_id: String,
/// Command string associated with the job.
pub command: String,
/// Labels required by the job.
/// NOTE(review): presumably matched against the `worker_labels` passed to
/// `JobQueue::claim` — confirm in the queue implementation.
pub required_labels: Vec<String>,
/// Retry limit and backoff configuration for this job.
pub retry_policy: RetryPolicy,
/// Attempt counter.
/// NOTE(review): whether the first attempt is 0 or 1 is not visible here.
pub attempt: u32,
/// Nested string map of upstream outputs; same shape as the value returned by
/// `ArtifactStore::get_upstream_outputs`.
/// NOTE(review): presumably keyed by upstream job id, then output name — confirm.
pub upstream_outputs: HashMap<String, HashMap<String, String>>,
/// When the job was enqueued; milliseconds, going by the field name.
pub enqueued_at_ms: u64,
/// Time before which the job is delayed; milliseconds, going by the field name.
/// `#[serde(default)]` makes this `0` when the field is absent from the
/// serialized input.
#[serde(default)]
pub delayed_until_ms: u64,
}
/// Retry configuration carried by each `QueuedJob`.
///
/// The `Default` implementation yields zero retries with `BackoffStrategy::None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetryPolicy {
/// Maximum number of retries.
pub max_retries: u32,
/// Strategy used to compute the delay between attempts.
pub backoff: BackoffStrategy,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 0,
backoff: BackoffStrategy::None,
}
}
}
/// How the delay between attempts is computed; see `BackoffStrategy::delay_ms`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BackoffStrategy {
/// No delay (`delay_ms` returns 0).
None,
/// The same delay of `delay_secs` seconds for every attempt.
Fixed { delay_secs: u64 },
/// `base_secs` multiplied by `2^attempt`, capped at `max_secs` seconds.
Exponential { base_secs: u64, max_secs: u64 },
}
impl BackoffStrategy {
    /// Returns the backoff delay, in milliseconds, for the given `attempt`.
    ///
    /// * `None` – always `0`.
    /// * `Fixed` – `delay_secs` converted to milliseconds; `attempt` is ignored.
    /// * `Exponential` – `base_secs * 2^attempt`, capped at `max_secs`, then
    ///   converted to milliseconds.
    ///
    /// Every arithmetic step saturates at `u64::MAX`, so very large configured
    /// values or attempt numbers never overflow (which would panic in debug
    /// builds and wrap around in release builds).
    pub fn delay_ms(&self, attempt: u32) -> u64 {
        const MS_PER_SEC: u64 = 1000;

        match *self {
            BackoffStrategy::None => 0,
            BackoffStrategy::Fixed { delay_secs } => delay_secs.saturating_mul(MS_PER_SEC),
            BackoffStrategy::Exponential {
                base_secs,
                max_secs,
            } => {
                // 2^attempt saturates for attempt >= 64; the cap below bounds it.
                let factor = 2u64.saturating_pow(attempt);
                base_secs
                    .saturating_mul(factor)
                    .min(max_secs)
                    .saturating_mul(MS_PER_SEC)
            }
        }
    }
}
/// Job lifecycle notifications.
///
/// Delivered through the `broadcast::Receiver` returned by `JobQueue::subscribe`
/// and returned as a list from `JobQueue::reap_expired_leases`. Unlike the
/// other data types in this file, this enum does not derive
/// `Serialize`/`Deserialize`.
#[derive(Clone, Debug)]
pub enum JobEvent {
/// A job is ready.
/// NOTE(review): presumably "ready to be claimed" — confirm where this is emitted.
Ready {
workflow_id: String,
job_id: String,
},
/// A job was started by the worker `worker_id`.
Started {
workflow_id: String,
job_id: String,
worker_id: String,
},
/// A job completed; `outputs` carries its string key/value outputs.
Completed {
workflow_id: String,
job_id: String,
outputs: HashMap<String, String>,
},
/// A job failed with `error`; `retryable` flags whether the failure is retryable.
Failed {
workflow_id: String,
job_id: String,
error: String,
retryable: bool,
},
/// A job was cancelled.
Cancelled {
workflow_id: String,
job_id: String,
},
/// The lease on a job expired while associated with the worker `worker_id`.
LeaseExpired {
workflow_id: String,
job_id: String,
worker_id: String,
},
}
/// One piece of log output from a job, as accepted by `LogSink::append` and
/// returned by `LogSink::get_all` / `LogSink::subscribe`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogChunk {
/// Identifier of the workflow the job belongs to.
pub workflow_id: String,
/// Identifier of the job that produced this chunk.
pub job_id: String,
/// Sequence number of the chunk.
/// NOTE(review): presumably orders chunks within a job — confirm.
pub sequence: u64,
/// The log text.
pub data: String,
/// Timestamp of the chunk; milliseconds, going by the field name.
pub timestamp_ms: u64,
/// Which output stream the chunk came from.
pub stream: LogStream,
}
/// The output stream a `LogChunk` belongs to.
///
/// With `rename_all = "snake_case"` the variants serialize as `"stdout"` and
/// `"stderr"`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogStream {
/// Standard output.
Stdout,
/// Standard error.
Stderr,
}
/// A worker's registry entry, as returned by `WorkerRegistry::list_workers`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkerInfo {
/// Identifier of the worker.
pub worker_id: String,
/// Labels attached to the worker.
pub labels: Vec<String>,
/// When the worker registered; milliseconds, going by the field name.
pub registered_at_ms: u64,
/// When the last heartbeat was recorded; milliseconds, going by the field name.
pub last_heartbeat_ms: u64,
/// The worker's current job, if any.
/// NOTE(review): presumably a job id (cf. `WorkerRegistry::mark_busy`) — confirm.
pub current_job: Option<String>,
/// Current status of the worker.
pub status: WorkerStatus,
}
/// Status of a worker as recorded in `WorkerInfo`.
///
/// With `rename_all = "snake_case"` the variants serialize as `"idle"`,
/// `"busy"` and `"offline"`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkerStatus {
/// The worker is idle.
Idle,
/// The worker is busy.
Busy,
/// The worker is offline.
Offline,
}
/// Abstraction over a job queue with lease-based claiming.
///
/// Implementors must be `Send + Sync + 'static`. Every asynchronous method
/// returns a `Send` future resolving to a `Result` whose error type is
/// `QueueError`. The behavioral notes below are derived from the signatures
/// only; the concrete semantics are defined by the implementations.
pub trait JobQueue: Send + Sync + 'static {
/// Adds `job` to the queue.
fn enqueue(&self, job: QueuedJob) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Attempts to claim a job on behalf of `worker_id`.
///
/// Resolves to `Some((job, lease))` when a job was claimed and `None` otherwise.
/// NOTE(review): presumably `worker_labels` is matched against
/// `QueuedJob::required_labels` and `lease_ttl` sets the lease lifetime — confirm.
fn claim(
&self,
worker_id: &str,
worker_labels: &[String],
lease_ttl: Duration,
) -> impl Future<Output = Result<Option<(QueuedJob, Lease)>, QueueError>> + Send;
/// Renews the lease `lease_id`, with `extend_by` as the extension duration.
fn renew_lease(
&self,
lease_id: &str,
extend_by: Duration,
) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Reports completion of the job held under `lease_id`, with its `outputs`.
fn complete(
&self,
lease_id: &str,
outputs: HashMap<String, String>,
) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Reports failure of the job held under `lease_id`.
///
/// `error` is the failure description and `retryable` flags whether the
/// failure is retryable.
fn fail(
&self,
lease_id: &str,
error: String,
retryable: bool,
) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Cancels the job `job_id` of workflow `workflow_id`.
fn cancel(
&self,
workflow_id: &str,
job_id: &str,
) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Cancels the workflow `workflow_id`.
/// NOTE(review): presumably cancels all of its jobs — confirm.
fn cancel_workflow(
&self,
workflow_id: &str,
) -> impl Future<Output = Result<(), QueueError>> + Send;
/// Resolves to whether the job `job_id` of workflow `workflow_id` is cancelled.
fn is_cancelled(
&self,
workflow_id: &str,
job_id: &str,
) -> impl Future<Output = Result<bool, QueueError>> + Send;
/// Reaps expired leases and resolves to the resulting list of `JobEvent`s.
/// NOTE(review): presumably `JobEvent::LeaseExpired` entries — confirm.
fn reap_expired_leases(&self)
-> impl Future<Output = Result<Vec<JobEvent>, QueueError>> + Send;
/// Returns a broadcast receiver of `JobEvent`s. This method is synchronous.
fn subscribe(&self) -> broadcast::Receiver<JobEvent>;
}
/// Abstraction over storage for job outputs (string key/value maps).
///
/// Implementors must be `Send + Sync + 'static`. Every method returns a `Send`
/// future resolving to a `Result` whose error type is `ArtifactError`.
pub trait ArtifactStore: Send + Sync + 'static {
/// Stores `outputs` for the job `job_id` of workflow `workflow_id`.
fn put_outputs(
&self,
workflow_id: &str,
job_id: &str,
outputs: HashMap<String, String>,
) -> impl Future<Output = Result<(), ArtifactError>> + Send;
/// Fetches the output map for the job `job_id` of workflow `workflow_id`.
fn get_outputs(
&self,
workflow_id: &str,
job_id: &str,
) -> impl Future<Output = Result<HashMap<String, String>, ArtifactError>> + Send;
/// Fetches outputs for several jobs of workflow `workflow_id` at once.
///
/// Resolves to a map of output maps.
/// NOTE(review): presumably keyed by the entries of `job_ids` — confirm.
fn get_upstream_outputs(
&self,
workflow_id: &str,
job_ids: &[String],
) -> impl Future<Output = Result<HashMap<String, HashMap<String, String>>, ArtifactError>> + Send;
}
/// Abstraction over a destination for job log output.
///
/// Implementors must be `Send + Sync + 'static`. The asynchronous methods
/// return `Send` futures resolving to a `Result` whose error type is `LogError`.
pub trait LogSink: Send + Sync + 'static {
/// Appends `chunk` to the sink.
fn append(&self, chunk: LogChunk) -> impl Future<Output = Result<(), LogError>> + Send;
/// Resolves to the list of log chunks for the job `job_id` of workflow
/// `workflow_id`.
fn get_all(
&self,
workflow_id: &str,
job_id: &str,
) -> impl Future<Output = Result<Vec<LogChunk>, LogError>> + Send;
/// Returns a broadcast receiver of `LogChunk`s for the given workflow and job.
/// This method is synchronous and infallible at the type level.
fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk>;
}
/// Abstraction over the registry of workers.
///
/// Implementors must be `Send + Sync + 'static`. Every method returns a `Send`
/// future resolving to a `Result` whose error type is `RegistryError`.
pub trait WorkerRegistry: Send + Sync + 'static {
/// Registers the worker `worker_id` with the given `labels`.
fn register(
&self,
worker_id: &str,
labels: &[String],
) -> impl Future<Output = Result<(), RegistryError>> + Send;
/// Records a heartbeat for the worker `worker_id`.
/// NOTE(review): presumably updates `WorkerInfo::last_heartbeat_ms` — confirm.
fn heartbeat(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
/// Removes the worker `worker_id` from the registry.
fn deregister(&self, worker_id: &str)
-> impl Future<Output = Result<(), RegistryError>> + Send;
/// Resolves to a list of `WorkerInfo` entries.
fn list_workers(&self) -> impl Future<Output = Result<Vec<WorkerInfo>, RegistryError>> + Send;
/// Marks the worker `worker_id` as busy with the job `job_id`.
/// NOTE(review): presumably sets `WorkerStatus::Busy` and `current_job` — confirm.
fn mark_busy(
&self,
worker_id: &str,
job_id: &str,
) -> impl Future<Output = Result<(), RegistryError>> + Send;
/// Marks the worker `worker_id` as idle.
fn mark_idle(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
}