use crate::{
supervision::{Connector, SupervisorHandle, SupervisorLoop},
ConnectionState, NorthwardData, NorthwardError, NorthwardResult, Plugin,
};
use async_trait::async_trait;
use downcast_rs::{impl_downcast, DowncastSync};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::{watch, Mutex};
/// Handle that [`SupervisedPlugin`] forwards northward data to.
///
/// `SupervisedPlugin` requires its connector's `Handle` type to implement this
/// trait. Implementors must be `Send + Sync + 'static` and support downcasting
/// through `DowncastSync`.
#[async_trait]
pub trait NorthwardHandle: DowncastSync + Send + Sync + 'static {
/// Processes one shared [`NorthwardData`] item.
///
/// What "processing" means, and which errors are returned, is up to the
/// implementor.
async fn process_data(&self, data: Arc<NorthwardData>) -> NorthwardResult<()>;
/// Probes the handle and reports a [`std::time::Duration`] on success.
///
/// NOTE(review): the duration is presumably a round-trip latency — confirm
/// against implementors.
///
/// The default implementation always fails with
/// [`NorthwardError::NotConnected`]; implementors that support pinging must
/// override it.
async fn ping(&self) -> NorthwardResult<std::time::Duration> {
Err(NorthwardError::NotConnected)
}
}
// Generates the `downcast-rs` downcasting helpers for `dyn NorthwardHandle`;
// the `sync` form is the one that pairs with the `DowncastSync` supertrait.
impl_downcast!(sync NorthwardHandle);
/// Adapter that exposes a [`SupervisorLoop`] through the [`Plugin`] trait.
///
/// `C` is the connector driven by the supervisor; its `Handle` type must
/// implement [`NorthwardHandle`] so data can be forwarded to it.
pub struct SupervisedPlugin<C>
where
C: Connector,
C::Handle: NorthwardHandle,
{
/// Shared supervisor loop; it is cloned (as an `Arc`) when the loop is started
/// and queried for the current handle and connection state.
supervisor: Arc<SupervisorLoop<C>>,
/// Set to `true` by the first `start` call so that later calls are no-ops.
started: AtomicBool,
/// Handle returned by starting the supervisor; taken out again by `stop`.
/// `None` until `start` has stored it and after `stop` has consumed it.
handle: Mutex<Option<SupervisorHandle>>,
}
impl<C> SupervisedPlugin<C>
where
    C: Connector,
    C::Handle: NorthwardHandle,
{
    /// Builds a plugin around `supervisor`.
    ///
    /// The supervisor is moved into an `Arc`; the plugin begins in the
    /// "not started" state with no supervisor handle stored.
    #[inline]
    pub fn new(supervisor: SupervisorLoop<C>) -> Self {
        let shared = Arc::new(supervisor);
        Self {
            supervisor: shared,
            started: AtomicBool::new(false),
            handle: Mutex::new(None),
        }
    }

    /// Fetches the handle currently published by the supervisor.
    ///
    /// # Errors
    ///
    /// Returns [`NorthwardError::NotConnected`] when the supervisor has no
    /// handle to offer.
    #[inline]
    fn load_handle(&self) -> NorthwardResult<Arc<C::Handle>> {
        match self.supervisor.load_handle() {
            Some(active) => Ok(active),
            None => Err(NorthwardError::NotConnected),
        }
    }
}
#[async_trait]
impl<C> Plugin for SupervisedPlugin<C>
where
    C: Connector,
    C::Handle: NorthwardHandle,
{
    /// Starts the supervisor loop the first time it is called.
    ///
    /// Subsequent calls return `Ok(())` without starting anything.
    ///
    /// The handle slot is locked *before* the `started` flag is flipped and
    /// stays locked until the supervisor handle has been stored. A concurrent
    /// [`stop`](Plugin::stop) therefore either runs entirely before the
    /// supervisor is started or sees the stored handle; it can never slip in
    /// between "flag set" and "handle stored" and leave the supervisor running.
    async fn start(&self) -> NorthwardResult<()> {
        let mut slot = self.handle.lock().await;
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            // Already started once; nothing to do.
            return Ok(());
        }
        // `SupervisorLoop::start` consumes an `Arc`, so hand it a clone and
        // keep our own reference for state and handle queries.
        *slot = Some(Arc::clone(&self.supervisor).start());
        Ok(())
    }

    /// Returns a watch receiver for the supervisor's connection state.
    fn subscribe_connection_state(&self) -> watch::Receiver<Arc<ConnectionState>> {
        self.supervisor.subscribe_state()
    }

    /// Forwards `data` to the handle currently published by the supervisor.
    ///
    /// # Errors
    ///
    /// Returns [`NorthwardError::NotConnected`] when no handle is available,
    /// otherwise whatever the handle's `process_data` returns.
    async fn process_data(&self, data: Arc<NorthwardData>) -> NorthwardResult<()> {
        let active = self.load_handle()?;
        active.process_data(data).await
    }

    /// Stops the supervisor if a handle is stored; otherwise does nothing.
    ///
    /// The stored handle is taken out of its slot, so repeated calls are
    /// harmless. The `started` flag is not reset here, which means a later
    /// `start` call remains a no-op.
    async fn stop(&self) -> NorthwardResult<()> {
        if let Some(running) = self.handle.lock().await.take() {
            running.stop();
        }
        Ok(())
    }
}