use super::builder::{EventSender, HandleError, StateWatcher, SupervisorHandle};
use std::fmt::Debug;
use std::marker::PhantomData;
use tokio::task::JoinHandle;
/// Builder that collects the components needed to construct a supervisor handle.
///
/// Every component starts out unset (`None`) and is supplied through the
/// `with_*` methods. The `build_*` methods return an error if any component
/// is still missing.
pub struct HandleBuilder<E, S> {
/// Event sender for events of type `E`; `None` until `with_event_sender` is called.
event_sender: Option<EventSender<E>>,
/// State watcher for state of type `S`; `None` until `with_state_watcher` is called.
state_watcher: Option<StateWatcher<S>>,
/// Join handle of the already-spawned supervisor task; `None` until
/// `with_supervisor_task` is called.
supervisor_task: Option<JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>>,
/// Zero-sized marker for `E` and `S`.
// NOTE(review): both parameters already appear in the fields above, so this
// marker looks redundant — confirm it is not needed for variance/auto-trait
// reasons before removing it.
_phantom: PhantomData<(E, S)>,
}
impl<E, S> Default for HandleBuilder<E, S> {
fn default() -> Self {
Self {
event_sender: None,
state_watcher: None,
supervisor_task: None,
_phantom: PhantomData,
}
}
}
impl<E, S> HandleBuilder<E, S>
where
    E: Debug + Send + 'static,
    S: Clone + Debug + Send + Sync + 'static,
{
    /// Creates a builder with every component unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Stores `sender` as the event sender, replacing any earlier value.
    pub fn with_event_sender(self, sender: EventSender<E>) -> Self {
        Self {
            event_sender: Some(sender),
            ..self
        }
    }

    /// Stores `watcher` as the state watcher, replacing any earlier value.
    pub fn with_state_watcher(self, watcher: StateWatcher<S>) -> Self {
        Self {
            state_watcher: Some(watcher),
            ..self
        }
    }

    /// Stores `task` as the supervisor task handle, replacing any earlier value.
    pub fn with_supervisor_task(
        self,
        task: JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
    ) -> Self {
        Self {
            supervisor_task: Some(task),
            ..self
        }
    }

    /// Builds a [`StandardHandle`] from the collected components.
    ///
    /// # Errors
    ///
    /// Returns a static message naming the first missing component, checked in
    /// the order: event sender, state watcher, supervisor task.
    pub fn build_standard(self) -> Result<StandardHandle<E, S>, &'static str> {
        // Same validation as `build_custom`; only the final constructor differs.
        self.build_custom(|event_sender, state_watcher, task| StandardHandle {
            event_sender,
            state_watcher,
            supervisor_task: Some(task),
        })
    }

    /// Builds an arbitrary handle type by passing the collected components to
    /// `constructor`.
    ///
    /// `constructor` is invoked only when all three components are present.
    ///
    /// # Errors
    ///
    /// Returns a static message naming the first missing component, checked in
    /// the order: event sender, state watcher, supervisor task.
    pub fn build_custom<H, F>(self, constructor: F) -> Result<H, &'static str>
    where
        F: FnOnce(
            EventSender<E>,
            StateWatcher<S>,
            JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
        ) -> H,
    {
        // Arm order encodes the priority of the error messages.
        match (self.event_sender, self.state_watcher, self.supervisor_task) {
            (None, _, _) => Err("Event sender is required"),
            (_, None, _) => Err("State watcher is required"),
            (_, _, None) => Err("Supervisor task is required"),
            (Some(sender), Some(watcher), Some(task)) => Ok(constructor(sender, watcher, task)),
        }
    }
}
/// Handle produced by `HandleBuilder::build_standard`, bundling an event
/// sender, a state watcher, and the supervisor task's join handle.
pub struct StandardHandle<E, S> {
/// Used by `send_event` to deliver events of type `E`.
event_sender: EventSender<E>,
/// Source of the current state and of new state receivers.
state_watcher: StateWatcher<S>,
/// Join handle of the supervisor task; wrapped in `Option` so that
/// `wait_for_completion` can `take()` it before awaiting.
supervisor_task: Option<JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>>,
}
impl<E, S> StandardHandle<E, S>
where
    E: Debug + Send + 'static,
    S: Clone + Debug + Send + Sync + 'static,
{
    /// Returns a new `watch` receiver obtained from the state watcher's
    /// `subscribe()`.
    pub fn state_receiver(&self) -> tokio::sync::watch::Receiver<S> {
        let watcher = &self.state_watcher;
        watcher.subscribe()
    }

    /// Reports whether a supervisor task handle is held and that task has not
    /// finished yet.
    ///
    /// Returns `false` when no task handle is present.
    pub fn is_running(&self) -> bool {
        match &self.supervisor_task {
            Some(task) => !task.is_finished(),
            None => false,
        }
    }
}
#[async_trait::async_trait]
impl<E, S> SupervisorHandle for StandardHandle<E, S>
where
    E: Debug + Send + 'static,
    S: Clone + Debug + Send + Sync + 'static,
{
    type Event = E;
    type State = S;
    type Error = HandleError;

    /// Forwards `event` to the event sender and returns its result.
    async fn send_event(&self, event: Self::Event) -> Result<(), Self::Error> {
        self.event_sender.send(event).await
    }

    /// Returns the state watcher's current value.
    fn current_state(&self) -> Self::State {
        self.state_watcher.current()
    }

    /// Consumes the handle and waits for the supervisor task to end.
    ///
    /// # Errors
    ///
    /// * `HandleError::SupervisorNotRunning` if no task handle is held.
    /// * `HandleError::SupervisorFailed` if the task returned an error.
    /// * `HandleError::SupervisorPanicked` if joining the task failed.
    async fn wait_for_completion(mut self) -> Result<(), Self::Error> {
        // Guard clause: nothing to wait for without a join handle.
        let task = match self.supervisor_task.take() {
            Some(task) => task,
            None => return Err(HandleError::SupervisorNotRunning),
        };

        // NOTE(review): `self` (including its event sender) stays alive while
        // the task is awaited — confirm the supervisor does not wait for every
        // sender to be dropped before it exits.
        match task.await {
            // NOTE(review): tokio also reports a cancelled/aborted task as a
            // `JoinError`, so cancellation surfaces as `SupervisorPanicked` too.
            Err(join_err) => Err(HandleError::SupervisorPanicked(join_err.to_string())),
            Ok(outcome) => {
                outcome.map_err(|failure| HandleError::SupervisorFailed(failure.to_string()))
            }
        }
    }
}
/// Builder that carries the name under which a supervisor task is spawned.
///
/// The type parameter `S` is used only as a marker; no value of type `S` is
/// stored.
pub struct SupervisorTaskBuilder<S> {
    /// Task name, included in the log lines emitted by `spawn`.
    name: String,
    /// Zero-sized marker for the otherwise unused type parameter `S`.
    _phantom: PhantomData<S>,
}
impl<S> SupervisorTaskBuilder<S>
where
S: Send + 'static,
{
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
_phantom: std::marker::PhantomData,
}
}
pub fn spawn<F, Fut>(
self,
supervisor_fn: F,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send
+ 'static,
{
let name = self.name;
let name_clone = name.clone();
let name_clone2 = name.clone();
let name_clone3 = name.clone();
let name_clone4 = name.clone();
let is_pipeline = name.contains("pipeline");
tracing::debug!(
"🚀 SupervisorTaskBuilder::spawn called for {} (is_pipeline: {})",
name,
is_pipeline
);
tracing::trace!("🔵 About to call supervisor_fn() for {}", name_clone);
let future = supervisor_fn();
tracing::trace!("🟢 supervisor_fn() returned future for {}", name_clone);
tracing::trace!("📦 Future size: {} bytes", std::mem::size_of_val(&future));
tracing::trace!("⚡ About to call tokio::spawn for {}", name_clone2);
let wrapped_future = async move {
tracing::trace!("🎯 WRAPPER: Task {} started executing!", name_clone3);
let result = future.await;
match &result {
Ok(_) => {
tracing::debug!("✅ Supervisor task {} completed successfully", name_clone3);
}
Err(e) => {
tracing::error!("❌ Supervisor task {} failed: {}", name_clone3, e);
}
}
tracing::trace!("🎯 WRAPPER: Task {} completed!", name_clone3);
result
};
let handle = tokio::spawn(wrapped_future);
tracing::trace!("⚡ tokio::spawn returned for {}", name_clone2);
tracing::debug!(
"🚀 SupervisorTaskBuilder::spawn returning handle for {}",
name_clone4
);
handle
}
}