use std::error::Error;
use std::fmt::Debug;
#[async_trait::async_trait]
pub trait SupervisorBuilder: Sized {
type Handle: SupervisorHandle;
type Error: Error + Send + Sync + 'static;
async fn build(self) -> Result<Self::Handle, Self::Error>;
}
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
#[error("Failed to register writer: {0}")]
WriterRegistrationError(String),
#[error("Failed to create context: {0}")]
ContextCreationError(String),
#[error("Failed to spawn supervisor: {0}")]
SpawnError(String),
#[error("Channel creation failed")]
ChannelError,
#[error("Other error: {0}")]
Other(String),
}
#[async_trait::async_trait]
pub trait SupervisorHandle: Send + Sync {
type Event: Debug + Send + 'static;
type State: Clone + Debug + Send + Sync + 'static;
type Error: Error + Send + Sync + 'static;
async fn send_event(&self, event: Self::Event) -> Result<(), Self::Error>;
fn current_state(&self) -> Self::State;
async fn wait_for_completion(self) -> Result<(), Self::Error>;
}
#[derive(Debug, thiserror::Error)]
pub enum HandleError {
#[error("Supervisor is not running")]
SupervisorNotRunning,
#[error("Failed to send event: {0}")]
SendError(String),
#[error("Supervisor task panicked: {0}")]
SupervisorPanicked(String),
#[error("Supervisor task failed: {0}")]
SupervisorFailed(String),
}
pub struct ChannelBuilder<E, S> {
event_buffer: usize,
_phantom: std::marker::PhantomData<(E, S)>,
}
impl<E, S> Default for ChannelBuilder<E, S> {
fn default() -> Self {
Self {
event_buffer: 100,
_phantom: std::marker::PhantomData,
}
}
}
impl<E, S> ChannelBuilder<E, S>
where
E: Debug + Send + 'static,
S: Clone + Debug + Send + Sync + 'static,
{
pub fn new() -> Self {
Self::default()
}
pub fn with_event_buffer(mut self, size: usize) -> Self {
self.event_buffer = size;
self
}
pub fn build(self, initial_state: S) -> (EventSender<E>, EventReceiver<E>, StateWatcher<S>) {
let (event_tx, event_rx) = tokio::sync::mpsc::channel(self.event_buffer);
let (state_tx, state_rx) = tokio::sync::watch::channel(initial_state);
(
EventSender { tx: event_tx },
EventReceiver { rx: event_rx },
StateWatcher {
tx: state_tx,
rx: state_rx,
},
)
}
}
pub struct EventSender<E> {
tx: tokio::sync::mpsc::Sender<E>,
}
impl<E: Debug + Send + 'static> EventSender<E> {
pub async fn send(&self, event: E) -> Result<(), HandleError> {
self.tx
.send(event)
.await
.map_err(|_| HandleError::SupervisorNotRunning)
}
pub fn try_send(&self, event: E) -> Result<(), HandleError> {
self.tx
.try_send(event)
.map_err(|_| HandleError::SupervisorNotRunning)
}
}
impl<E> Clone for EventSender<E> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
pub struct EventReceiver<E> {
rx: tokio::sync::mpsc::Receiver<E>,
}
impl<E> EventReceiver<E> {
pub async fn recv(&mut self) -> Option<E> {
self.rx.recv().await
}
pub fn try_recv(&mut self) -> Result<E, tokio::sync::mpsc::error::TryRecvError> {
self.rx.try_recv()
}
}
pub struct StateWatcher<S> {
tx: tokio::sync::watch::Sender<S>,
rx: tokio::sync::watch::Receiver<S>,
}
impl<S: Clone> StateWatcher<S> {
pub fn update(&self, state: S) -> Result<(), HandleError> {
self.tx
.send(state)
.map_err(|_| HandleError::SupervisorNotRunning)
}
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<S> {
self.rx.clone()
}
pub fn current(&self) -> S {
self.rx.borrow().clone()
}
}
impl<S> Clone for StateWatcher<S> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
rx: self.rx.clone(),
}
}
}