use crate::actors::connector;
use anyhow::Error;
use async_trait::async_trait;
use futures::Future;
use meio::Action;
use rill_protocol::flow::core::{ActionEnvelope, Flow, FlowMode};
use rill_protocol::io::provider::{Description, Path, ProviderProtocol};
use rill_protocol::io::transport::Direction;
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use tokio::sync::mpsc;
#[derive(Debug)]
pub(crate) struct EventEnvelope<T: Flow> {
pub direction: Option<Direction<ProviderProtocol>>,
pub event: T::Event,
}
pub(crate) enum ControlEvent<T> {
Flush,
AttachCallback { callback: BoxedCallback<T> },
DetachCallback,
}
impl<T: Flow> Action for EventEnvelope<T> {}
pub(crate) type DataSender<T> = mpsc::UnboundedSender<EventEnvelope<T>>;
pub(crate) type DataReceiver<T> = mpsc::UnboundedReceiver<EventEnvelope<T>>;
pub(crate) type ControlSender<T> = mpsc::UnboundedSender<ControlEvent<T>>;
pub(crate) type ControlReceiver<T> = mpsc::UnboundedReceiver<ControlEvent<T>>;
pub type ActionSender<T> = mpsc::UnboundedSender<ActionEnvelope<T>>;
pub type ActionReceiver<T> = mpsc::UnboundedReceiver<ActionEnvelope<T>>;
pub fn channel<T: Flow>() -> (ActionSender<T>, ActionReceiver<T>) {
mpsc::unbounded_channel()
}
pub(crate) struct TracerOperator<T: Flow> {
pub mode: TracerMode<T>,
pub control_rx: Option<ControlReceiver<T>>,
}
pub(crate) enum TracerMode<T: Flow> {
Push {
state: T,
receiver: Option<DataReceiver<T>>,
},
Pull {
state: Weak<Mutex<T>>,
interval: Option<Duration>,
},
}
#[derive(Debug)]
enum InnerMode<T: Flow> {
Push {
sender: DataSender<T>,
},
Pull {
state: Arc<Mutex<T>>,
},
}
impl<T: Flow> Clone for InnerMode<T> {
fn clone(&self) -> Self {
match self {
Self::Push { sender } => Self::Push {
sender: sender.clone(),
},
Self::Pull { state } => Self::Pull {
state: state.clone(),
},
}
}
}
#[derive(Debug)]
pub struct Tracer<T: Flow> {
description: Arc<Description>,
control_tx: ControlSender<T>,
mode: InnerMode<T>,
}
impl<T: Flow> Clone for Tracer<T> {
fn clone(&self) -> Self {
Self {
description: self.description.clone(),
control_tx: self.control_tx.clone(),
mode: self.mode.clone(),
}
}
}
impl<T: Flow> PartialEq for Tracer<T> {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.description, &other.description)
}
}
impl<T: Flow> Eq for Tracer<T> {}
impl<T: Flow> Tracer<T> {
pub fn new(state: T, path: Path, mode: FlowMode) -> Self {
match mode {
FlowMode::Realtime => Self::new_push(state, path),
FlowMode::Throttle { ms } => {
Self::new_pull(state, path, Some(Duration::from_millis(ms)))
}
FlowMode::FlushOnly => Self::new_pull(state, path, None),
}
}
pub fn new_push(state: T, path: Path) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let mode = TracerMode::Push {
state,
receiver: Some(rx),
};
let inner_mode = InnerMode::Push { sender: tx };
Self::new_inner(path, inner_mode, mode)
}
pub fn new_pull(state: T, path: Path, interval: Option<Duration>) -> Self {
let state = Arc::new(Mutex::new(state));
let mode = TracerMode::Pull {
state: Arc::downgrade(&state),
interval,
};
let inner_mode = InnerMode::Pull { state };
Self::new_inner(path, inner_mode, mode)
}
fn new_inner(path: Path, inner_mode: InnerMode<T>, mode: TracerMode<T>) -> Self {
let (control_tx, control_rx) = mpsc::unbounded_channel();
let operator = TracerOperator {
mode,
control_rx: Some(control_rx),
};
let stream_type = T::stream_type();
let description = Description { path, stream_type };
log::trace!("Creating Tracer with path: {}", description.path);
let description = Arc::new(description);
let this = Tracer {
description: description.clone(),
control_tx,
mode: inner_mode,
};
if let Err(err) = connector::DISTRIBUTOR.register_tracer(description, operator) {
log::error!(
"Can't register a Tracer. The worker can be terminated already: {}",
err
);
}
this
}
pub fn path(&self) -> &Path {
&self.description.path
}
pub fn description(&self) -> &Description {
&self.description
}
pub fn send(&self, event: T::Event, direction: Option<Direction<ProviderProtocol>>) {
match &self.mode {
InnerMode::Push { sender, .. } => {
let envelope = EventEnvelope { direction, event };
if let Err(err) = sender.send(envelope) {
log::error!("Can't transfer data to sender of {}: {}", self.path(), err);
}
}
InnerMode::Pull { state, .. } => match state.lock() {
Ok(ref mut state) => {
T::apply(state, event);
}
Err(err) => {
log::error!(
"Can't lock the mutex to apply the changes of {}: {}",
self.path(),
err
);
}
},
}
}
pub fn flush(&self) {
let event = ControlEvent::Flush;
if let Err(err) = self.control_tx.send(event) {
log::error!("Can't send a flush event to {}: {}", self.path(), err);
}
}
pub fn sync_callback<F>(&self, callback: F)
where
F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
F: Send + Sync + 'static,
{
let sync_callback = SyncCallback::new(callback);
let callback = Box::new(sync_callback);
let event = ControlEvent::AttachCallback { callback };
if let Err(err) = self.control_tx.send(event) {
log::error!("Can't attach the callback from {}: {}", self.path(), err);
}
}
pub fn async_callback<F, Fut>(&self, callback: F)
where
F: Fn(ActionEnvelope<T>) -> Fut,
F: Send + Sync + 'static,
Fut: Future<Output = Result<(), Error>>,
Fut: Send + 'static,
{
let async_callback = AsyncCallback::new(callback);
let callback = Box::new(async_callback);
let event = ControlEvent::AttachCallback { callback };
if let Err(err) = self.control_tx.send(event) {
log::error!("Can't attach the callback from {}: {}", self.path(), err);
}
}
pub fn detach_callback(&self) {
let event = ControlEvent::DetachCallback;
if let Err(err) = self.control_tx.send(event) {
log::error!("Can't detach the callback from {}: {}", self.path(), err);
}
}
}
#[async_trait]
pub trait ActionCallback<T: Flow>: Send + 'static {
async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error>;
}
pub type BoxedCallback<T> = Box<dyn ActionCallback<T>>;
struct SyncCallback<F> {
func: Arc<F>,
}
impl<F> SyncCallback<F> {
fn new(func: F) -> Self {
Self {
func: Arc::new(func),
}
}
}
#[async_trait]
impl<T, F> ActionCallback<T> for SyncCallback<F>
where
T: Flow,
F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
F: Send + Sync + 'static,
{
async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
let func = self.func.clone();
tokio::task::spawn_blocking(move || func(envelope))
.await
.map_err(Error::from)
.and_then(std::convert::identity)
}
}
use std::marker::PhantomData;
struct AsyncCallback<F, Fut> {
func: F,
fut: PhantomData<Fut>,
}
impl<F, Fut> AsyncCallback<F, Fut> {
fn new(func: F) -> Self {
Self {
func,
fut: PhantomData,
}
}
}
#[async_trait]
impl<T, F, Fut> ActionCallback<T> for AsyncCallback<F, Fut>
where
T: Flow,
F: Fn(ActionEnvelope<T>) -> Fut,
F: Send + Sync + 'static,
Fut: Future<Output = Result<(), Error>>,
Fut: Send + 'static,
{
async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
(self.func)(envelope).await
}
}