use std::{convert::Infallible, error::Error, fmt, future::Future, pin::Pin, sync::Arc};
use crate::{readiness::ReadinessMode, Context};
/// Heap-allocated, pinned, type-erased future that is `Send + 'static` and
/// resolves to `T`.
///
/// This is the return type of [`SupervisedService::run`].
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServiceOutcome {
Completed,
Cancelled,
RequestedShutdown,
Error(ServiceError),
}
impl ServiceOutcome {
pub const fn completed() -> Self {
Self::Completed
}
pub const fn cancelled() -> Self {
Self::Cancelled
}
pub const fn requested_shutdown() -> Self {
Self::RequestedShutdown
}
pub fn failed(error: impl Into<ServiceError>) -> Self {
Self::Error(error.into())
}
}
/// Message-only error describing why a service failed.
///
/// The text is stored as an `Arc<str>`, so cloning a `ServiceError` shares the
/// existing allocation instead of copying the message.
///
/// This type intentionally does **not** implement [`std::error::Error`]:
/// doing so would make it match the blanket `IntoServiceError` impl for
/// `E: Error + Send + Sync + 'static` in addition to its own dedicated impl,
/// and the two impls would conflict.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServiceError {
    message: Arc<str>,
}

impl ServiceError {
    /// Creates an error from any value convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: Arc::from(message.into()),
        }
    }

    /// Creates an error from the `Display` text of a standard error.
    ///
    /// Only the top-level message is captured; the `source()` chain of
    /// `error` is not recorded.
    pub fn from_error(error: impl std::error::Error) -> Self {
        Self::new(error.to_string())
    }

    /// Returns the error message.
    pub fn message(&self) -> &str {
        &self.message
    }
}

impl fmt::Display for ServiceError {
    /// Writes the stored message verbatim.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(&self.message)
    }
}

impl From<String> for ServiceError {
    fn from(message: String) -> Self {
        Self::new(message)
    }
}

impl From<&str> for ServiceError {
    fn from(message: &str) -> Self {
        // Build the `Arc<str>` straight from the slice: routing through
        // `Self::new` would first allocate a temporary `String` and then copy
        // it again into the `Arc`.
        Self {
            message: Arc::from(message),
        }
    }
}
/// Conversion of an error value into a [`ServiceError`].
pub trait IntoServiceError {
    /// Consumes `self` and returns the corresponding [`ServiceError`].
    fn into_service_error(self) -> ServiceError;
}

/// Identity conversion: an existing [`ServiceError`] is returned untouched.
impl IntoServiceError for ServiceError {
    fn into_service_error(self) -> Self {
        self
    }
}

/// Every `Send + Sync + 'static` standard error converts through
/// [`ServiceError::from_error`].
impl<E: Error + Send + Sync + 'static> IntoServiceError for E {
    fn into_service_error(self) -> ServiceError {
        ServiceError::from_error(self)
    }
}
pub trait IntoServiceOutcome {
fn into_service_outcome(self) -> ServiceOutcome;
}
impl IntoServiceOutcome for ServiceOutcome {
fn into_service_outcome(self) -> ServiceOutcome {
self
}
}
impl IntoServiceOutcome for () {
fn into_service_outcome(self) -> ServiceOutcome {
ServiceOutcome::Completed
}
}
impl IntoServiceOutcome for Infallible {
fn into_service_outcome(self) -> ServiceOutcome {
match self {}
}
}
impl<T, E> IntoServiceOutcome for Result<T, E>
where
T: IntoServiceOutcome,
E: IntoServiceError,
{
fn into_service_outcome(self) -> ServiceOutcome {
match self {
Ok(outcome) => outcome.into_service_outcome(),
Err(error) => ServiceOutcome::Error(error.into_service_error()),
}
}
}
/// A named unit of work whose run future resolves to a [`ServiceOutcome`].
///
/// Implementors must be `Send + Sync + 'static`.
pub trait SupervisedService: Send + Sync + 'static {
/// Type parameter of the [`Context`] handed to [`run`](Self::run).
type Context: Clone + Send + Sync + 'static;
/// Returns the static name of this service.
fn name(&self) -> &'static str;
/// Returns the readiness mode of this service.
///
/// The default implementation returns [`ReadinessMode::Immediate`].
fn readiness(&self) -> ReadinessMode {
ReadinessMode::Immediate
}
/// Starts the service with `ctx` and returns a boxed future that resolves
/// to the service's [`ServiceOutcome`].
fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome>;
}
/// Combinator methods available on every sized [`SupervisedService`].
pub trait ServiceExt: Sized + SupervisedService {
    /// Wraps `self` in a [`WhenReady`]; same as calling the free function
    /// [`when_ready`].
    fn when_ready(self) -> WhenReady<Self> {
        when_ready::<Self>(self)
    }

    /// Wraps `self` in an [`UntilCancelled`]; same as calling the free
    /// function [`until_cancelled`].
    fn until_cancelled(self) -> UntilCancelled<Self> {
        until_cancelled::<Self>(self)
    }
}

/// Blanket impl: every [`SupervisedService`] gets the combinators for free.
impl<S: SupervisedService> ServiceExt for S {}
/// A service backed by a plain function or closure; see `service_fn`.
///
/// `C` is the context type the closure expects. No value of `C` is stored:
/// the `PhantomData<fn() -> C>` marker only records the type.
pub struct FnService<C, F> {
    name: &'static str,
    run: F,
    marker: std::marker::PhantomData<fn() -> C>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would also
// demand `C: Clone`, even though `C` only ever appears inside `PhantomData`.
// Only the stored closure actually has to be cloneable.
impl<C, F: Clone> Clone for FnService<C, F> {
    fn clone(&self) -> Self {
        Self {
            name: self.name,
            run: self.run.clone(),
            marker: std::marker::PhantomData,
        }
    }
}

impl<C, F> FnService<C, F> {
    /// Returns the static name this service was created with.
    pub fn name(&self) -> &'static str {
        self.name
    }
}
/// Builds an [`FnService`] called `name` from the function or closure `run`.
///
/// `run` receives a `Context<C>` and returns a future whose output converts
/// into a `ServiceOutcome` through `IntoServiceOutcome`.
pub fn service_fn<C, F, Fut, O>(name: &'static str, run: F) -> FnService<C, F>
where
    C: Clone + Send + Sync + 'static,
    F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = O> + Send + 'static,
    O: IntoServiceOutcome + Send + 'static,
{
    // Zero-sized marker tying the service to its context type `C`.
    let marker = std::marker::PhantomData;
    FnService { name, run, marker }
}
/// Runs the wrapped closure and converts its output into a `ServiceOutcome`.
impl<C, F, Fut, O> SupervisedService for FnService<C, F>
where
    C: Clone + Send + Sync + 'static,
    F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = O> + Send + 'static,
    O: IntoServiceOutcome + Send + 'static,
{
    type Context = C;

    fn name(&self) -> &'static str {
        self.name
    }

    fn run(&self, ctx: Context<C>) -> BoxFuture<ServiceOutcome> {
        // The closure itself is invoked right here, before the returned future
        // is first polled; only awaiting its future is deferred.
        let pending = (self.run)(ctx);
        Box::pin(async move {
            let output = pending.await;
            output.into_service_outcome()
        })
    }
}
/// Wraps `service` in a [`WhenReady`], placing it behind an `Arc`.
pub fn when_ready<S: SupervisedService>(service: S) -> WhenReady<S> {
    let service = Arc::new(service);
    WhenReady { service }
}
#[derive(Clone)]
pub struct WhenReady<S>
where
S: SupervisedService,
{
service: Arc<S>,
}
/// Delegates `name` and `run` to the wrapped service, but always reports
/// `ReadinessMode::Explicit` regardless of what the wrapped service reports.
impl<S: SupervisedService> SupervisedService for WhenReady<S> {
    type Context = S::Context;

    fn name(&self) -> &'static str {
        S::name(&self.service)
    }

    fn readiness(&self) -> ReadinessMode {
        // Overrides the inner service's readiness mode on purpose.
        ReadinessMode::Explicit
    }

    fn run(&self, ctx: Context<S::Context>) -> BoxFuture<ServiceOutcome> {
        S::run(&self.service, ctx)
    }
}
/// Wraps `service` in an [`UntilCancelled`], placing it behind an `Arc`.
pub fn until_cancelled<S: SupervisedService>(service: S) -> UntilCancelled<S> {
    let service = Arc::new(service);
    UntilCancelled { service }
}
#[derive(Clone)]
pub struct UntilCancelled<S>
where
S: SupervisedService,
{
service: Arc<S>,
}
/// Delegates `name` and `readiness` to the wrapped service and races its run
/// future against the context's cancellation token.
impl<S: SupervisedService> SupervisedService for UntilCancelled<S> {
    type Context = S::Context;

    fn name(&self) -> &'static str {
        S::name(&self.service)
    }

    fn readiness(&self) -> ReadinessMode {
        S::readiness(&self.service)
    }

    fn run(&self, ctx: Context<S::Context>) -> BoxFuture<ServiceOutcome> {
        let inner = Arc::clone(&self.service);
        // Grab an owned token before `ctx` is moved into the future below.
        let cancellation = ctx.token().clone();
        Box::pin(async move {
            // The inner service is only started once this future is polled.
            // When the token yields no outcome (`None`), report `Cancelled`.
            cancellation
                .run_until_cancelled_owned(inner.run(ctx))
                .await
                .unwrap_or(ServiceOutcome::Cancelled)
        })
    }
}