use super::super::{
supervision::{Connector, SupervisorHandle, SupervisorLoop},
CollectItem, CollectionGroupKey, ConnectionState, Driver, DriverError, DriverResult,
ExecuteResult, NGValue, NorthwardData, RuntimeAction, RuntimeDelta, RuntimeDevice,
RuntimeParameter, RuntimePoint, WriteResult,
};
use async_trait::async_trait;
use downcast_rs::{impl_downcast, DowncastSync};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::{watch, Mutex};
#[async_trait]
pub trait SouthwardHandle: DowncastSync + Send + Sync + 'static {
fn collection_group_key(&self, device: &dyn RuntimeDevice) -> Option<CollectionGroupKey> {
let _ = device;
None
}
fn collect_max_inflight(&self) -> usize {
1
}
async fn collect_data(&self, items: &[CollectItem]) -> DriverResult<Vec<NorthwardData>>;
async fn execute_action(
&self,
device: Arc<dyn RuntimeDevice>,
action: Arc<dyn RuntimeAction>,
parameters: Vec<(Arc<dyn RuntimeParameter>, NGValue)>,
) -> DriverResult<ExecuteResult>;
async fn write_point(
&self,
device: Arc<dyn RuntimeDevice>,
point: Arc<dyn RuntimePoint>,
value: &NGValue,
timeout_ms: Option<u64>,
) -> DriverResult<WriteResult>;
async fn apply_runtime_delta(&self, delta: RuntimeDelta) -> DriverResult<()> {
let _ = delta;
Ok(())
}
}
impl_downcast!(sync SouthwardHandle);
pub struct SupervisedDriver<C>
where
C: Connector,
C::Handle: SouthwardHandle,
{
supervisor: Arc<SupervisorLoop<C>>,
started: AtomicBool,
handle: Mutex<Option<SupervisorHandle>>,
collect_max_inflight_default: usize,
}
impl<C> SupervisedDriver<C>
where
C: Connector,
C::Handle: SouthwardHandle,
{
#[inline]
pub fn new(supervisor: SupervisorLoop<C>) -> Self {
Self::new_with_collect_max_inflight(supervisor, 1)
}
#[inline]
pub fn new_with_collect_max_inflight(
supervisor: SupervisorLoop<C>,
collect_max_inflight_default: usize,
) -> Self {
Self {
supervisor: Arc::new(supervisor),
started: AtomicBool::new(false),
handle: Mutex::new(None),
collect_max_inflight_default: collect_max_inflight_default.max(1),
}
}
#[inline]
fn load_handle(&self) -> DriverResult<Arc<C::Handle>> {
self.supervisor
.load_handle()
.ok_or(DriverError::ServiceUnavailable)
}
}
#[async_trait]
impl<C> Driver for SupervisedDriver<C>
where
C: Connector,
C::Handle: SouthwardHandle,
{
#[inline]
async fn start(&self) -> DriverResult<()> {
if self
.started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Ok(());
}
let h = Arc::clone(&self.supervisor).start();
*self.handle.lock().await = Some(h);
Ok(())
}
#[inline]
async fn stop(&self) -> DriverResult<()> {
if let Some(h) = self.handle.lock().await.take() {
h.stop();
}
Ok(())
}
#[inline]
fn collection_group_key(&self, device: &dyn RuntimeDevice) -> Option<CollectionGroupKey> {
self.supervisor
.load_handle()
.and_then(|h| h.collection_group_key(device))
}
#[inline]
fn collect_max_inflight(&self) -> usize {
self.supervisor
.load_handle()
.map(|h| h.collect_max_inflight())
.unwrap_or(self.collect_max_inflight_default)
}
#[inline]
async fn collect_data(&self, items: &[CollectItem]) -> DriverResult<Vec<NorthwardData>> {
let h = self.load_handle()?;
h.collect_data(items).await
}
#[inline]
async fn execute_action(
&self,
device: Arc<dyn RuntimeDevice>,
action: Arc<dyn RuntimeAction>,
parameters: Vec<(Arc<dyn RuntimeParameter>, NGValue)>,
) -> DriverResult<ExecuteResult> {
let h = self.load_handle()?;
h.execute_action(device, action, parameters).await
}
#[inline]
async fn write_point(
&self,
device: Arc<dyn RuntimeDevice>,
point: Arc<dyn RuntimePoint>,
value: &NGValue,
timeout_ms: Option<u64>,
) -> DriverResult<WriteResult> {
let h = self.load_handle()?;
h.write_point(device, point, value, timeout_ms).await
}
#[inline]
fn subscribe_connection_state(&self) -> watch::Receiver<Arc<ConnectionState>> {
self.supervisor.subscribe_state()
}
#[inline]
async fn apply_runtime_delta(&self, delta: RuntimeDelta) -> DriverResult<()> {
let h = self.load_handle()?;
h.apply_runtime_delta(delta).await
}
}