use futures::{future::BoxFuture, Stream};
use serde::{Deserialize, Serialize};
use tower::layer::util::Identity;
use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
use crate::{
backend::Backend,
data::Extensions,
error::Error,
poller::Poller,
task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
worker::{Context, Worker},
};
/// A unit of work: the task arguments together with the metadata in [`Parts`].
///
/// `Args` is the payload type and `Ctx` is the type stored in
/// [`Parts::context`].
#[derive(Serialize, Debug, Deserialize, Clone, Default)]
pub struct Request<Args, Ctx> {
/// The payload of the request.
pub args: Args,
/// Metadata carried alongside the payload.
pub parts: Parts<Ctx>,
}
/// Metadata attached to a [`Request`].
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal or destructure it without `..`.
#[non_exhaustive]
#[derive(Serialize, Debug, Deserialize, Clone, Default)]
pub struct Parts<Ctx> {
/// Identifier of the task.
pub task_id: TaskId,
/// Extension data attached to the request.
///
/// Skipped by serde: it is never serialized and is filled with
/// `Extensions::default()` on deserialization.
#[serde(skip)]
pub data: Extensions,
/// Attempt information for the task (presumably retry tracking; see
/// [`Attempt`] to confirm).
pub attempt: Attempt,
/// Context value of type `Ctx`.
pub context: Ctx,
/// Optional namespace.
///
/// Skipped by serde, so it is always `None` right after deserialization.
#[serde(skip)]
pub namespace: Option<Namespace>,
}
/// Constructors that have to synthesize a context and therefore need
/// `Ctx: Default`.
impl<T, Ctx: Default> Request<T, Ctx> {
    /// Creates a request for `args` with empty extensions and `Ctx::default()`
    /// as its context.
    pub fn new(args: T) -> Self {
        Self::new_with_data(args, Extensions::default(), Ctx::default())
    }
}

/// Constructors and accessors that work for any context type.
///
/// None of these need to invent a `Ctx`, so they deliberately carry no
/// `Ctx: Default` bound; contexts without a sensible default can still be
/// wrapped in a [`Request`] and taken apart again.
impl<T, Ctx> Request<T, Ctx> {
    /// Assembles a request from `args` and already-built `parts`.
    pub fn new_with_parts(args: T, parts: Parts<Ctx>) -> Self {
        Self { args, parts }
    }

    /// Creates a request for `req` with the supplied context and empty
    /// extensions; every other part takes its default value.
    pub fn new_with_ctx(req: T, ctx: Ctx) -> Self {
        Self::new_with_data(req, Extensions::default(), ctx)
    }

    /// Creates a request for `req` with the supplied extensions and context;
    /// every other part takes its default value.
    pub fn new_with_data(req: T, data: Extensions, ctx: Ctx) -> Self {
        Self {
            args: req,
            // Fields are spelled out instead of using `..Default::default()`:
            // the update syntax would build (and throw away) a whole
            // `Parts<Ctx>`, which is the only reason a `Ctx: Default` bound
            // would be needed here.
            parts: Parts {
                task_id: Default::default(),
                data,
                attempt: Default::default(),
                context: ctx,
                namespace: None,
            },
        }
    }

    /// Consumes the request and returns its arguments and parts.
    pub fn take_parts(self) -> (T, Parts<Ctx>) {
        (self.args, self.parts)
    }
}
/// Lets a [`Request`] be used directly as a shared reference to the
/// [`Extensions`] stored in its parts.
impl<T, Ctx> std::ops::Deref for Request<T, Ctx> {
    type Target = Extensions;

    /// Borrows the extension data held in `self.parts.data`.
    fn deref(&self) -> &Extensions {
        let parts = &self.parts;
        &parts.data
    }
}
impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.parts.data
}
}
/// The state a job can be in.
///
/// Variant meanings below are read off the variant names; confirm the exact
/// transitions against the storage backends that write them.
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
pub enum State {
/// Not yet started. When deserializing, the name `"Latest"` is accepted as
/// an alias for this variant.
#[serde(alias = "Latest")]
Pending,
/// Scheduled (presumably to run at a later time).
Scheduled,
/// Currently running.
Running,
/// Finished (presumably successfully).
Done,
/// Failed.
Failed,
/// Killed.
Killed,
}
impl Default for State {
fn default() -> Self {
State::Pending
}
}
impl FromStr for State {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"Pending" | "Latest" => Ok(State::Pending),
"Running" => Ok(State::Running),
"Done" => Ok(State::Done),
"Failed" => Ok(State::Failed),
"Killed" => Ok(State::Killed),
"Scheduled" => Ok(State::Scheduled),
_ => Err(Error::MissingData("Invalid Job state".to_string())),
}
}
}
impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
State::Pending => write!(f, "Pending"),
State::Running => write!(f, "Running"),
State::Done => write!(f, "Done"),
State::Failed => write!(f, "Failed"),
State::Killed => write!(f, "Killed"),
State::Scheduled => write!(f, "Scheduled"),
}
}
}
/// A pinned, heap-allocated, `Send` stream of `T` items that lives for `'a`.
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
/// A `'static` boxed future that resolves to `T`.
pub type RequestFuture<T> = BoxFuture<'static, T>;
/// A `'static` boxed stream whose items are `Result<Option<T>, Error>`.
///
/// NOTE(review): `Ok(None)` presumably means "nothing available on this
/// poll"; confirm against the consumer of the stream.
pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
/// Allows a plain [`RequestStream`] to be used as a [`Backend`].
///
/// The stream is handed to the poller unchanged, with no extra layer and no
/// heartbeat work.
impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx>> {
// The stream is used as-is, so the backend's stream type is the type itself.
type Stream = Self;
// `Identity` is tower's no-op layer: services are passed through unwrapped.
type Layer = Identity;
/// Wraps `self` in a [`Poller`].
///
/// The worker reference is not used, and neither is the `Svc` type parameter.
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
Poller {
stream: self,
// `pending()` never resolves, so the heartbeat future never completes.
heartbeat: Box::pin(futures::future::pending()),
layer: Identity::new(),
_priv: (),
}
}
}