use std::{
mem,
sync::mpsc::{Receiver, channel},
thread::{self, JoinHandle},
};
use crate::{Result, ServiceApp};
/// A named service whose body is a one-shot function, paired with a callback
/// that asks that function to stop.
///
/// `F` is the service function and `R` is the receiving end of whatever
/// channel carries the stop request; the struct itself places no bounds on
/// either, the `impl` blocks add what they need.
pub struct BaseService<F, R> {
    /// Name reported through `ServiceApp::name` and used in log messages.
    name: String,
    /// The service body. `Some` until `start` moves it onto the worker thread.
    service_fn: Option<F>,
    /// Callback invoked by `stop` to request shutdown.
    sender_fn: Box<dyn Fn() -> Result<()> + Send>,
    /// Value handed to `service_fn` as its first argument. `Some` until
    /// `start` moves it onto the worker thread.
    receiver: Option<R>,
    /// Join handle of the worker thread; set by `start`, consumed by `stop`.
    handle: Option<JoinHandle<Result<()>>>,
    /// Flag forwarded unchanged as the second argument of `service_fn`; its
    /// interpretation is left to that function.
    is_service: bool,
}
impl<F, R> BaseService<F, R>
where
    F: FnOnce(R, bool) -> Result<()>,
{
    /// Assembles a service from its parts without starting it.
    ///
    /// * `name` - name of the service.
    /// * `service_fn` - the service body; it later receives `receiver` and
    ///   `is_service` as its two arguments.
    /// * `is_service` - flag passed through to `service_fn`.
    /// * `sender_fn` - callback used to request shutdown.
    /// * `receiver` - value passed to `service_fn` as its first argument.
    ///
    /// The returned value has no join handle yet.
    pub fn new(
        name: impl Into<String>,
        service_fn: F,
        is_service: bool,
        sender_fn: impl Fn() -> Result<()> + Send + 'static,
        receiver: R,
    ) -> Self {
        let name = name.into();
        // Box the callback so its concrete closure type does not have to
        // appear as another type parameter of the struct.
        let sender_fn: Box<dyn Fn() -> Result<()> + Send> = Box::new(sender_fn);
        Self {
            name,
            service_fn: Some(service_fn),
            sender_fn,
            receiver: Some(receiver),
            handle: None,
            is_service,
        }
    }
}
impl<F> BaseService<F, Receiver<()>>
where
    F: FnOnce(Receiver<()>, bool) -> Result<()>,
{
    /// Builds a service whose stop request travels over a freshly created
    /// `std::sync::mpsc` channel of `()`.
    ///
    /// The receiving end is stored for `service_fn`; the sending end is
    /// captured by the stop callback, which sends a single `()` each time it
    /// is invoked and fails if that send fails.
    pub fn new_sync(name: impl Into<String>, service_fn: F, is_service: bool) -> Self {
        let (stop_tx, stop_rx) = channel::<()>();
        let signal_stop = move || -> Result<()> {
            stop_tx.send(())?;
            Ok(())
        };
        Self::new(name, service_fn, is_service, signal_stop, stop_rx)
    }
}
#[cfg(feature = "tokio")]
impl<F> BaseService<F, tokio::sync::mpsc::Receiver<()>>
where
    F: FnOnce(tokio::sync::mpsc::Receiver<()>, bool) -> Result<()>,
{
    /// Builds a service whose stop request travels over a freshly created
    /// `tokio::sync::mpsc` channel of `()` with a buffer of one message.
    ///
    /// The receiving end is stored for `service_fn`; the sending end is
    /// captured by the stop callback.
    ///
    /// The callback never blocks. If a stop request is already queued and
    /// not yet consumed it reports success, because the pending message
    /// already carries the request. It fails only when the channel is
    /// closed, with the same `SendError<()>` a plain send would produce.
    pub fn new_tokio(name: impl Into<String>, service_fn: F, is_service: bool) -> Self {
        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        // `try_send` is used instead of `blocking_send`: the latter waits
        // indefinitely while the single slot is occupied and panics when
        // called from within an asynchronous execution context.
        let sender = move || -> Result<()> {
            match sender.try_send(()) {
                Ok(()) | Err(tokio::sync::mpsc::error::TrySendError::Full(())) => Ok(()),
                Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
                    Err(tokio::sync::mpsc::error::SendError(()).into())
                }
            }
        };
        Self::new(name, service_fn, is_service, sender, receiver)
    }
}
impl<F, R> ServiceApp for BaseService<F, R>
where
    F: FnOnce(R, bool) -> Result<()> + Send + 'static,
    R: Send + 'static,
{
    /// Returns the name given at construction.
    fn name(&self) -> &str {
        &self.name
    }

    /// Runs the service function on a newly spawned thread.
    ///
    /// The receiver and the service function are moved out of `self`, the
    /// thread calls `service_fn(receiver, is_service)`, and the join handle
    /// is kept for `stop`.
    ///
    /// # Errors
    ///
    /// Returns "Receiver not found" or "Service function not found" when the
    /// corresponding part has already been taken, which is the case after an
    /// earlier `start`.
    fn start(&mut self) -> Result<()> {
        tracing::info!("Starting service '{}'...", self.name);
        let receiver = mem::take(&mut self.receiver).ok_or("Receiver not found")?;
        let is_service = self.is_service;
        let service_fn = mem::take(&mut self.service_fn).ok_or("Service function not found")?;
        self.handle = Some(thread::spawn(move || service_fn(receiver, is_service)));
        Ok(())
    }

    /// Requests shutdown through the stop callback and waits for the worker
    /// thread, returning the thread's own result.
    ///
    /// # Errors
    ///
    /// * The error returned by the service function, if it failed.
    /// * "Error joining thread" if the worker thread panicked.
    /// * The stop callback's error if the request could not be delivered and
    ///   the thread did not report a failure of its own.
    fn stop(&mut self) -> Result<()> {
        tracing::info!("Stopping service '{}'...", self.name);
        // Remember the outcome instead of returning at once: a failed signal
        // often means the thread has already exited, and in that case its
        // result explains more than the signalling error does.
        let signalled = (self.sender_fn)();
        if let Some(handle) = mem::take(&mut self.handle) {
            if signalled.is_err() && !handle.is_finished() {
                // The request was not delivered and the thread is still
                // running, so joining could block indefinitely. Keep the
                // handle for a later attempt and report the failure.
                self.handle = Some(handle);
                return signalled;
            }
            handle.join().map_err(|_| "Error joining thread")??;
        }
        signalled?;
        tracing::info!("Service '{}' is shut down.", self.name);
        Ok(())
    }
}