use futures::{future::BoxFuture, Stream};
use serde::{Deserialize, Serialize};
use tower::layer::util::Identity;
use std::{fmt::Debug, pin::Pin};
use crate::{
data::Extensions, error::Error, poller::Poller, task::task_id::TaskId, worker::WorkerId,
Backend,
};
/// A request payload of type `T` bundled with an [`Extensions`] data store.
///
/// Only the payload takes part in (de)serialization: `data` is marked
/// `#[serde(skip)]`, so it is omitted when serializing and filled with its
/// `Default` value when deserializing. Anything stored in `data` (for example
/// the `TaskId` inserted by `Request::new`) is therefore not carried across a
/// serialize/deserialize round trip.
#[derive(Serialize, Debug, Deserialize, Clone)]
pub struct Request<T> {
/// The wrapped payload.
pub(crate) req: T,
/// Data attached to this request; never serialized.
#[serde(skip)]
pub(crate) data: Extensions,
}
impl<T> Request<T> {
pub fn new(req: T) -> Self {
let id = TaskId::new();
let mut data = Extensions::new();
data.insert(id);
Self::new_with_data(req, data)
}
pub fn new_with_data(req: T, data: Extensions) -> Self {
Self { req, data }
}
pub fn inner(&self) -> &T {
&self.req
}
pub fn take(self) -> T {
self.req
}
}
/// Dereferencing a [`Request`] yields its attached [`Extensions`], so the
/// container's `&self` methods can be called directly on the request.
impl<T> std::ops::Deref for Request<T> {
    type Target = Extensions;

    /// Returns a shared reference to the request's attached data.
    fn deref(&self) -> &Extensions {
        let Self { data, .. } = self;
        data
    }
}
impl<T> std::ops::DerefMut for Request<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
/// A pinned, heap-allocated [`Stream`] trait object that yields `T` items, is
/// `Send`, and is valid for the lifetime `'a`.
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
/// A `'static` [`BoxFuture`] that resolves to `T`.
pub type RequestFuture<T> = BoxFuture<'static, T>;
/// A `'static` [`BoxStream`] whose items are `Result<Option<T>, Error>`.
///
/// NOTE(review): `Ok(None)` presumably means "no request available right now"
/// rather than end-of-stream — confirm against the code that consumes it.
pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
/// Allows a plain [`RequestStream`] of [`Request`]s to act as a [`Backend`].
///
/// The stream is handed to the poller unchanged, the heartbeat is an empty
/// future, and the layer is tower's [`Identity`].
impl<T> Backend<Request<T>> for RequestStream<Request<T>> {
    type Stream = Self;
    type Layer = Identity;

    /// Wraps this stream in a [`Poller`]; the worker id is not used.
    fn poll(self, _worker: WorkerId) -> Poller<Self::Stream> {
        // Empty async block: the heartbeat future has no work to perform.
        let heartbeat = Box::pin(async {});
        let layer = Identity::new();
        Poller {
            stream: self,
            heartbeat,
            layer,
        }
    }
}