use std::{
borrow::Cow,
collections::HashMap,
fmt::Display,
marker::PhantomData,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::{Duration, OffsetDateTime};
use crate::{timeout::TimeoutPolicy, UnixNanos};
/// Selector stored in the `worker_selector` field of a [`TaskDefinition`].
///
/// It wraps a single `kind` string. Any value convertible into a `String`
/// can be turned into a selector through the blanket `From` implementation.
///
/// NOTE(review): how `kind` is matched against workers is decided outside
/// this file — confirm with the scheduler/worker code.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
#[must_use]
pub struct WorkerSelector {
/// The worker kind, held either as a borrowed `'static` string or as an owned one.
pub kind: Cow<'static, str>,
}
impl<T> From<T> for WorkerSelector
where
T: Into<String>,
{
fn from(value: T) -> Self {
Self {
kind: value.into().into(),
}
}
}
/// Definition of a task: its target time, the worker it is addressed to,
/// its payload and associated metadata.
///
/// `T` is a compile-time marker only (held as `PhantomData`) and defaults to
/// `()`; it is never serialized.
#[derive(Debug, Serialize, Deserialize)]
#[must_use]
pub struct TaskDefinition<T = ()> {
/// Target time. The builder methods fill it with nanoseconds since the
/// Unix epoch.
pub target: UnixNanos,
/// Selector naming the kind of worker for this task.
pub worker_selector: WorkerSelector,
/// Raw payload bytes.
pub data: Vec<u8>,
/// Format tag accompanying `data`.
/// NOTE(review): presumably describes how `data` is encoded — confirm.
pub data_format: TaskDataFormat,
/// JSON-valued labels keyed by name; empty when missing from the input.
#[serde(default)]
pub labels: HashMap<String, Value>,
/// Timeout policy; `TimeoutPolicy::default()` when missing from the input.
#[serde(default)]
pub timeout: TimeoutPolicy,
// Zero-sized type marker: skipped by serde in both directions and
// recreated through `Default` when deserializing.
#[doc(hidden)]
#[serde(default, skip)]
pub _task_type: PhantomData<T>,
}
/// Hand-written so that cloning does not require `T: Clone`; `T` only ever
/// appears inside `PhantomData`.
impl<T> Clone for TaskDefinition<T> {
    /// Produces a field-by-field copy with a fresh type marker.
    fn clone(&self) -> Self {
        // Destructuring makes the compiler flag this impl if a field is added.
        let Self {
            target,
            worker_selector,
            data,
            data_format,
            labels,
            timeout,
            _task_type: _,
        } = self;

        Self {
            target: *target,
            worker_selector: worker_selector.clone(),
            data: data.clone(),
            data_format: *data_format,
            labels: labels.clone(),
            timeout: *timeout,
            _task_type: PhantomData,
        }
    }
}
/// Builder-style helpers; every method consumes the definition and returns
/// the updated value.
impl<T> TaskDefinition<T> {
    /// Replaces the timeout policy with `timeout`.
    pub fn with_timeout(mut self, timeout: impl Into<TimeoutPolicy>) -> Self {
        self.timeout = timeout.into();
        self
    }

    /// Sets the target to the Unix epoch (`UnixNanos(0)`).
    ///
    /// NOTE(review): presumably a target in the past means "run as soon as
    /// possible" — confirm against the scheduler.
    pub fn immediate(mut self) -> Self {
        self.target = UnixNanos(0);
        self
    }

    /// Sets the target to the given date-time.
    ///
    /// Instants before the Unix epoch are clamped to `0`, and instants that
    /// do not fit into `u64` nanoseconds saturate at `u64::MAX`.
    pub fn at(mut self, target: OffsetDateTime) -> Self {
        // After clamping negatives the conversion can only fail on overflow.
        let nanos = target.unix_timestamp_nanos().max(0);
        self.target = UnixNanos(u64::try_from(nanos).unwrap_or(u64::MAX));
        self
    }

    /// Sets the target to an explicit `UnixNanos` value.
    pub fn at_unix(mut self, target: UnixNanos) -> Self {
        self.target = target;
        self
    }

    /// Sets the target to `UnixNanos::now()`.
    pub fn now(mut self) -> Self {
        self.target = UnixNanos::now();
        self
    }

    /// Sets the target to the current system time plus `duration`.
    ///
    /// Negative durations are treated as zero. Both the delay and the
    /// resulting timestamp saturate at `u64::MAX` nanoseconds instead of
    /// wrapping around, so a very large delay can never produce a target in
    /// the past. A system clock set before the Unix epoch is treated as the
    /// epoch itself, mirroring how [`Self::at`] clamps pre-epoch instants.
    pub fn after(mut self, duration: Duration) -> Self {
        let delay = u64::try_from(duration.whole_nanoseconds().max(0)).unwrap_or(u64::MAX);

        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        // `as_nanos` yields a `u128`; convert with saturation rather than a
        // truncating cast.
        let now = u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX);

        self.target = UnixNanos(now.saturating_add(delay));
        self
    }

    /// Replaces the worker selector.
    pub fn with_worker_selector(mut self, selector: impl Into<WorkerSelector>) -> Self {
        self.worker_selector = selector.into();
        self
    }

    /// Inserts (or overwrites) the label `name` with the JSON form of `value`.
    ///
    /// # Panics
    ///
    /// Panics if `value` cannot be converted with `serde_json::to_value`.
    pub fn with_label(mut self, name: &str, value: impl Serialize) -> Self {
        self.labels
            .insert(name.into(), serde_json::to_value(value).unwrap());
        self
    }

    /// Changes only the type marker from `T` to `U`; all data is moved over
    /// unchanged.
    pub fn cast<U>(self) -> TaskDefinition<U> {
        TaskDefinition {
            target: self.target,
            worker_selector: self.worker_selector,
            data: self.data,
            data_format: self.data_format,
            labels: self.labels,
            timeout: self.timeout,
            _task_type: PhantomData,
        }
    }
}
/// Status of a task.
///
/// Each variant has a lowercase textual form (`"pending"`, `"ready"`, ...)
/// produced by `as_str` and accepted by the `FromStr` implementation.
/// `Succeeded`, `Failed` and `Cancelled` are the states for which
/// `is_finished` returns `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskStatus {
/// Textual form: `"pending"`.
Pending,
/// Textual form: `"ready"`.
Ready,
/// Textual form: `"started"`.
Started,
/// Textual form: `"succeeded"`; a finished state.
Succeeded,
/// Textual form: `"failed"`; a finished state.
Failed,
/// Textual form: `"cancelled"`; a finished state.
Cancelled,
}
/// Parses the lowercase status names produced by `TaskStatus::as_str`.
impl FromStr for TaskStatus {
    type Err = UnexpectedValueError;

    /// Returns the matching status, or an [`UnexpectedValueError`] carrying
    /// the rejected input. Matching is exact and case-sensitive.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "pending" => Ok(Self::Pending),
            "ready" => Ok(Self::Ready),
            "started" => Ok(Self::Started),
            "succeeded" => Ok(Self::Succeeded),
            "failed" => Ok(Self::Failed),
            "cancelled" => Ok(Self::Cancelled),
            other => Err(UnexpectedValueError(other.to_string())),
        }
    }
}
impl TaskStatus {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
TaskStatus::Pending => "pending",
TaskStatus::Ready => "ready",
TaskStatus::Started => "started",
TaskStatus::Succeeded => "succeeded",
TaskStatus::Failed => "failed",
TaskStatus::Cancelled => "cancelled",
}
}
#[must_use]
pub fn is_finished(&self) -> bool {
matches!(
self,
TaskStatus::Succeeded | TaskStatus::Failed | TaskStatus::Cancelled
)
}
}
/// Format tag used by the `data_format` field of `TaskDefinition`.
///
/// Serialized by serde in `snake_case` (`"unknown"`, `"message_pack"`,
/// `"json"`), which matches the strings used by `as_str` and `FromStr`.
/// The default is `Unknown`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskDataFormat {
/// Default variant; textual form `"unknown"`.
#[default]
Unknown,
/// Textual form `"message_pack"`.
MessagePack,
/// Textual form `"json"`.
Json,
}
impl TaskDataFormat {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
TaskDataFormat::Unknown => "unknown",
TaskDataFormat::MessagePack => "message_pack",
TaskDataFormat::Json => "json",
}
}
}
/// Parses the `snake_case` format names produced by `TaskDataFormat::as_str`.
impl FromStr for TaskDataFormat {
    type Err = UnexpectedValueError;

    /// Returns the matching format, or an [`UnexpectedValueError`] carrying
    /// the rejected input. Matching is exact and case-sensitive.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "unknown" => Ok(Self::Unknown),
            "message_pack" => Ok(Self::MessagePack),
            "json" => Ok(Self::Json),
            other => Err(UnexpectedValueError(other.to_string())),
        }
    }
}
/// Error returned by the `FromStr` implementations in this file when the
/// input is not one of the accepted names; it carries the rejected input.
#[derive(Debug)]
pub struct UnexpectedValueError(String);

/// Renders as `unexpected value: <input>`.
impl Display for UnexpectedValueError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(value) = self;
        f.write_fmt(format_args!("unexpected value: {}", value))
    }
}

impl std::error::Error for UnexpectedValueError {}