use super::{Group, Recorder};
use crate::tracers::tracer::{ActionReceiver, BoxedCallback};
use anyhow::Error;
use async_trait::async_trait;
use meio::{Context, IdOf, LiteTask, TaskEliminated, TaskError};
use rill_protocol::flow::core;
use rill_protocol::io::provider::Description;
use std::sync::Arc;
use tokio::sync::mpsc;
impl<T: core::Flow> Recorder<T> {
pub(super) fn attach_callback(&mut self, callback: BoxedCallback<T>, ctx: &mut Context<Self>) {
let (tx, rx) = mpsc::unbounded_channel();
self.callback = Some(tx);
let worker = CallbackWorker {
description: self.description.clone(),
receiver: rx,
callback,
};
ctx.spawn_task(worker, (), Group::Callback);
}
pub(super) fn detach_callback(&mut self, _ctx: &mut Context<Self>) {
self.callback.take();
}
}
pub struct CallbackWorker<T: core::Flow> {
description: Arc<Description>,
receiver: ActionReceiver<T>,
callback: BoxedCallback<T>,
}
#[async_trait]
impl<T: core::Flow> LiteTask for CallbackWorker<T> {
type Output = ();
async fn interruptable_routine(mut self) -> Result<Self::Output, Error> {
while let Some(envelope) = self.receiver.recv().await {
if let Err(err) = self.callback.handle_activity(envelope).await {
log::error!("Callback of {} failed: {}", self.description.path, err);
}
}
Ok(())
}
}
#[async_trait]
impl<T: core::Flow> TaskEliminated<CallbackWorker<T>, ()> for Recorder<T> {
async fn handle(
&mut self,
_id: IdOf<CallbackWorker<T>>,
_tag: (),
_result: Result<(), TaskError>,
_ctx: &mut Context<Self>,
) -> Result<(), Error> {
Ok(())
}
}