use std::{error::Error, fmt::Debug, marker::PhantomData};
use futures::Stream;
use tower::{
layer::util::{Identity, Stack},
Layer, Service, ServiceBuilder,
};
use crate::{
context::HasJobContext,
job::Job,
job_fn::{job_fn, JobFn},
request::JobRequest,
worker::{ready::ReadyWorker, HeartBeat, Worker, WorkerId},
};
/// Builder that accumulates the pieces of a worker before it is built.
///
/// The type parameters track, at compile time, the job type (`Job`), the
/// type of the job source (`Source`) and the middleware stack
/// (`Middleware`) configured so far.
pub struct WorkerBuilder<Job, Source, Middleware> {
/// Identifier of the worker; created from the name passed to `new`.
pub(crate) id: WorkerId,
/// Zero-sized marker that records the job type without storing a value.
pub(crate) job: PhantomData<Job>,
/// Middleware stack; applied to the service when the worker is built.
pub(crate) layer: ServiceBuilder<Middleware>,
/// Job source; handed to the built worker as its stream.
pub(crate) source: Source,
/// Boxed `HeartBeat` objects that are handed over to the built worker.
pub(crate) beats: Vec<Box<dyn HeartBeat + Send>>,
}
/// Diagnostic formatting that places no `Debug` bound on the type parameters.
impl<Job, Source, Middleware> std::fmt::Debug for WorkerBuilder<Job, Source, Middleware> {
    /// Writes the worker id, the type names of the job, middleware and source
    /// parameters, and the number of stored heartbeats.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The generic parts are reported by type name only, so none of them
        // has to implement `Debug` itself.
        let job_type = std::any::type_name::<Job>();
        let middleware_type = std::any::type_name::<Middleware>();
        let source_type = std::any::type_name::<Source>();
        let heartbeat_count = self.beats.len();

        let mut repr = f.debug_struct("WorkerBuilder");
        repr.field("id", &self.id);
        repr.field("job", &job_type);
        repr.field("layer", &middleware_type);
        repr.field("source", &source_type);
        repr.field("beats", &heartbeat_count);
        repr.finish()
    }
}
impl WorkerBuilder<(), (), Identity> {
    /// Creates an empty builder for a worker called `name`.
    ///
    /// The returned builder has no job type and no source yet (both are `()`),
    /// an empty `ServiceBuilder` as its middleware stack and no heartbeats.
    pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), Identity> {
        let id = WorkerId::new(name);
        Self {
            id,
            job: PhantomData,
            layer: ServiceBuilder::new(),
            source: (),
            beats: Vec::new(),
        }
    }
}
impl<J, S, M> WorkerBuilder<J, S, M> {
    /// Sets `stream` as the job source of the worker.
    ///
    /// Any previously configured source is replaced, and the builder's job
    /// type becomes `NJ`, the job type carried by the stream's requests.
    /// The id, middleware stack and heartbeats are carried over unchanged.
    pub fn stream<NS: Stream<Item = Result<Option<JobRequest<NJ>>, E>>, E, NJ>(
        self,
        stream: NS,
    ) -> WorkerBuilder<NJ, NS, M> {
        WorkerBuilder {
            job: PhantomData,
            layer: self.layer,
            source: stream,
            id: self.id,
            beats: self.beats,
        }
    }

    /// Sets the job source from a factory that is given this worker's id.
    ///
    /// `stream` is called exactly once with a reference to the builder's
    /// [`WorkerId`]; the stream it returns becomes the new source. Because it
    /// is called only once, any `FnOnce` is accepted, so the closure may move
    /// captured state into the stream it returns.
    pub fn with_stream<
        NS: FnOnce(&WorkerId) -> ST,
        NJ,
        E,
        ST: Stream<Item = Result<Option<JobRequest<NJ>>, E>>,
    >(
        self,
        stream: NS,
    ) -> WorkerBuilder<NJ, ST, M> {
        // Build the source while the id is still owned by `self`; the id is
        // moved into the new builder afterwards.
        let source = stream(&self.id);
        WorkerBuilder {
            job: PhantomData,
            layer: self.layer,
            source,
            id: self.id,
            beats: self.beats,
        }
    }
}
impl<Job, Stream, Serv> WorkerBuilder<Job, Stream, Serv> {
pub fn middleware<NewService>(
self,
f: impl Fn(ServiceBuilder<Serv>) -> ServiceBuilder<NewService>,
) -> WorkerBuilder<Job, Stream, NewService> {
let middleware = f(self.layer);
WorkerBuilder {
job: self.job,
layer: middleware,
id: self.id,
source: self.source,
beats: self.beats,
}
}
pub fn layer<U>(self, layer: U) -> WorkerBuilder<Job, Stream, Stack<U, Serv>>
where
Serv: Layer<U>,
{
WorkerBuilder {
job: self.job,
source: self.source,
layer: self.layer.layer(layer),
id: self.id,
beats: self.beats,
}
}
}
impl<J, S, M, Ser, E, Request> WorkerFactory<J, Ser> for WorkerBuilder<J, S, M>
where
S: Stream<Item = Result<Option<Request>, E>> + Send + 'static + Unpin,
J: Job + Send + 'static,
M: Layer<Ser>,
<M as Layer<Ser>>::Service: Service<Request> + Send + 'static,
E: Sync + Send + 'static + Error,
Request: Send + HasJobContext,
<<M as Layer<Ser>>::Service as Service<Request>>::Future: std::marker::Send,
Ser: Service<Request>,
<Ser as Service<Request>>::Error: Debug,
<<M as Layer<Ser>>::Service as Service<Request>>::Error: std::fmt::Debug,
<<M as Layer<Ser>>::Service as Service<Request>>::Future: 'static,
{
type Worker = ReadyWorker<S, <M as Layer<Ser>>::Service>;
fn build(self, service: Ser) -> ReadyWorker<S, <M as Layer<Ser>>::Service> {
ReadyWorker {
id: self.id,
stream: self.source,
service: self.layer.service(service),
beats: self.beats,
}
}
}
/// Something that can be turned into a worker for jobs of type `J` when
/// given a service of type `S`.
pub trait WorkerFactory<J, S> {
/// The worker type produced by [`WorkerFactory::build`].
type Worker: Worker<J>;
/// Consumes the factory and builds a worker around `service`.
fn build(self, service: S) -> Self::Worker;
}
/// Something that can be turned into a worker for jobs of type `J` when
/// given a value `f` of type `F` instead of a ready-made service.
///
/// The blanket implementation in this file wraps `f` with `job_fn` and
/// delegates to `WorkerFactory::build`.
pub trait WorkerFactoryFn<J, F> {
/// The worker type produced by [`WorkerFactoryFn::build_fn`].
type Worker: Worker<J>;
/// Consumes the factory and builds a worker from `f`.
fn build_fn(self, f: F) -> Self::Worker;
}
/// Every factory that accepts a `JobFn<F>` service can also be built directly
/// from an `F`.
impl<J, W, F> WorkerFactoryFn<J, F> for W
where
    W: WorkerFactory<J, JobFn<F>>,
{
    type Worker = <W as WorkerFactory<J, JobFn<F>>>::Worker;

    /// Wraps `f` with `job_fn` and builds the worker around the result.
    fn build_fn(self, f: F) -> Self::Worker {
        let service = job_fn(f);
        <W as WorkerFactory<J, JobFn<F>>>::build(self, service)
    }
}