use std::time::Duration;
use crate::envelope::{ServiceAction, ServiceMessage};
use crate::link::Link;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
#[allow(unused_variables)]
pub trait Service: Sized + Send + 'static {
fn started(&mut self, ctx: &mut ServiceContext<Self>) {}
fn start(self) -> Link<Self> {
let ctx = ServiceContext::new();
let link = ctx.link();
ctx.spawn(self);
link
}
fn create<F>(action: F) -> Link<Self>
where
F: FnOnce(&mut ServiceContext<Self>) -> Self,
{
let mut ctx = ServiceContext::new();
let service = action(&mut ctx);
let link = ctx.link();
ctx.spawn(service);
link
}
fn stopping(&mut self) {}
}
pub struct ServiceContext<S: Service> {
rx: mpsc::UnboundedReceiver<ServiceMessage<S>>,
link: Link<S>,
}
impl<S> ServiceContext<S>
where
S: Service,
{
pub(crate) fn new() -> ServiceContext<S> {
let (tx, rx) = mpsc::unbounded_channel();
let link = Link(tx);
ServiceContext { rx, link }
}
pub(crate) fn spawn(mut self, mut service: S) {
tokio::spawn(async move {
service.started(&mut self);
self.process(&mut service).await;
service.stopping();
});
}
async fn process(&mut self, service: &mut S) {
while let Some(msg) = self.rx.recv().await {
let action = msg.handle(service, self);
match action {
ServiceAction::Stop => break,
ServiceAction::Continue => continue,
ServiceAction::Execute(fut) => fut.await,
}
}
}
pub fn stop(&mut self) {
self.rx.close()
}
pub fn link(&self) -> Link<S> {
self.link.clone()
}
pub fn shared_link(&self) -> &Link<S> {
&self.link
}
pub fn run_later<F>(&self, duration: Duration, action: F) -> JoinHandle<()>
where
F: FnOnce(&mut S, &mut ServiceContext<S>) + Send + 'static,
{
let link = self.link();
tokio::spawn(async move {
sleep(duration).await;
let _ = link.do_exec(action);
})
}
pub fn run_interval<F>(&self, duration: Duration, action: F) -> JoinHandle<()>
where
F: FnOnce(&mut S, &mut ServiceContext<S>) + Clone + Send + 'static,
{
let link = self.link();
tokio::spawn(async move {
loop {
sleep(duration).await;
if link.do_exec(action.clone()).is_err() {
break;
}
}
})
}
}