pub(crate) mod codec;
pub mod log;
pub(crate) mod model;
pub mod probe;
pub mod supervised;
pub mod transport;
pub(crate) mod types;
pub(crate) mod validation;
pub mod wire;
use crate::{ConnectionState, DriverResult, NGValue, NorthwardData, Transform};
use async_trait::async_trait;
use downcast_rs::{impl_downcast, DowncastSync};
use model::{
ActionModel, ChannelModel, ConnectionPolicy, DeviceModel, PointModel, SouthwardInitContext,
};
use std::{fmt, fmt::Debug, sync::Arc};
use tokio::sync::watch;
use types::{AccessMode, CollectionType, DataPointType, DataType, ReportType, Status};
/// Value returned by [`Driver::execute_action`].
#[derive(Debug, Clone)]
pub struct ExecuteResult {
/// Outcome tag for the action (see [`ExecuteOutcome`]).
pub outcome: ExecuteOutcome,
/// Optional JSON payload accompanying the outcome; its schema is not defined in this file.
pub payload: Option<serde_json::Value>,
}
/// Outcome tag carried in [`ExecuteResult::outcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecuteOutcome {
/// NOTE(review): presumably the action had finished when the call returned — confirm with driver implementations.
Completed,
/// NOTE(review): presumably the action was accepted for later execution — confirm with driver implementations.
Queued,
}
/// Generates the factory type and the `extern "C"` entry points of a southward driver crate.
///
/// # Arguments
///
/// * `name` – driver name, exported through `ng_driver_name`.
/// * `description` – optional; when the argument is omitted `ng_driver_description` returns a
///   null pointer.
/// * `driver_type` – driver type string. It is also passed to `concat!`, so it must be a
///   string literal.
/// * `component` – type implementing `$crate::supervision::Connector` whose `Handle` implements
///   `$crate::SouthwardHandle` and whose `InitContext` is `$crate::SouthwardInitContext`
///   (both are checked at compile time inside `create_driver`).
/// * `metadata_fn` – path to a function returning `$crate::DriverSchemas`; its result is
///   serialized to JSON once and exposed through `ng_driver_metadata_json_ptr`.
/// * `model_convert` – type implementing `Default` and `SouthwardModelConverter`; all
///   `convert_runtime_*` calls are forwarded to it.
/// * `channel_capacity` – optional, defaults to `100`; forwarded to
///   `$crate::ffi::RuntimeAwareDriverFactory::new`.
/// * `collect_max_inflight` – optional, defaults to `1`; forwarded to
///   `$crate::SupervisedDriver::new_with_collect_max_inflight`.
///
/// The expansion defines fixed item names (`__NgComponentDriverFactory`, `NG_RUNTIME`,
/// `NG_DRIVER_METADATA_JSON`) and `#[no_mangle]` symbols, so it can be invoked only once per
/// crate.
///
/// The `@core` arm is an implementation detail and is not meant to be called directly.
#[macro_export]
macro_rules! ng_driver_factory {
    // Public arm: driver WITH a description.
    (
        name = $name:expr,
        description = $description:expr,
        driver_type = $driver_type:expr,
        component = $component:ty,
        metadata_fn = $metadata_fn:path,
        model_convert = $model_convert:ty
        $(, channel_capacity = $cap:expr)?
        $(, collect_max_inflight = $collect_max_inflight:expr)?
        $(,)?
    ) => {
        struct __NgComponentDriverFactory {
            model_convert: $model_convert,
        }

        impl __NgComponentDriverFactory {
            #[inline]
            fn new() -> Self {
                Self {
                    model_convert: <$model_convert as ::core::default::Default>::default(),
                }
            }
        }

        impl ::core::default::Default for __NgComponentDriverFactory {
            #[inline]
            fn default() -> Self {
                Self::new()
            }
        }

        impl $crate::DriverFactory for __NgComponentDriverFactory {
            fn create_driver(
                &self,
                ctx: $crate::SouthwardInitContext,
            ) -> $crate::DriverResult<Box<dyn $crate::Driver>> {
                // Compile-time checks: fail the build early when `$component` does not
                // satisfy the southward contract.
                fn __assert_handle_is_southward_handle<H: $crate::SouthwardHandle>() {}
                __assert_handle_is_southward_handle::<<$component as $crate::supervision::Connector>::Handle>();
                fn __assert_init_ctx_is_southward_init_context<C>()
                where
                    C: $crate::supervision::Connector<InitContext = $crate::SouthwardInitContext>,
                {
                }
                __assert_init_ctx_is_southward_init_context::<$component>();

                use $crate::export::tracing::info_span;
                let span = info_span!(
                    "southward-driver",
                    channel_id = ctx.channel_id,
                    driver_type = $driver_type
                );
                let observer = ctx.observer_factory.create_southward(
                    $crate::supervision::SouthwardObserverLabels {
                        channel_id: ctx.channel_id,
                        driver_kind: ::std::sync::Arc::<str>::from($driver_type),
                    }
                );
                // Read the backoff policy before `ctx` is moved into the connector.
                let retry_policy = ctx.runtime_channel.connection_policy().backoff.clone();
                let connector = <$component as $crate::supervision::Connector>::new(ctx)?;
                let params = $crate::supervision::SupervisorParams {
                    retry_policy,
                    reconnect_queue: 8,
                };
                let (loop_, _state_rx) = $crate::supervision::SupervisorLoop::new_with_span(
                    connector,
                    params,
                    observer,
                    span,
                );
                // Default of 1, shadowed by the caller-supplied value when present.
                let collect_max_inflight: usize = 1usize;
                $(let collect_max_inflight: usize = $collect_max_inflight;)?
                let driver = $crate::SupervisedDriver::new_with_collect_max_inflight(loop_, collect_max_inflight);
                Ok(Box::new(driver))
            }

            // NOTE(review): this arm resolves `SouthwardModelConverter` through
            // `supervision::converter`, while the arm without `description` uses
            // `supervision::model_convert` — confirm both paths exist, or unify them.
            fn convert_runtime_channel(
                &self,
                channel: $crate::ChannelModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeChannel>> {
                <$model_convert as $crate::supervision::converter::SouthwardModelConverter>::convert_runtime_channel(
                    &self.model_convert,
                    channel,
                )
            }

            fn convert_runtime_device(
                &self,
                device: $crate::DeviceModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeDevice>> {
                <$model_convert as $crate::supervision::converter::SouthwardModelConverter>::convert_runtime_device(
                    &self.model_convert,
                    device,
                )
            }

            fn convert_runtime_point(
                &self,
                point: $crate::PointModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimePoint>> {
                <$model_convert as $crate::supervision::converter::SouthwardModelConverter>::convert_runtime_point(
                    &self.model_convert,
                    point,
                )
            }

            fn convert_runtime_action(
                &self,
                action: $crate::ActionModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeAction>> {
                <$model_convert as $crate::supervision::converter::SouthwardModelConverter>::convert_runtime_action(
                    &self.model_convert,
                    action,
                )
            }
        }

        $crate::ng_driver_factory!(
            @core name = $name,
            description = Some($description),
            driver_type = $driver_type,
            factory_ty = __NgComponentDriverFactory,
            metadata_fn = $metadata_fn,
            // Default capacity is 100; a caller-supplied `channel_capacity` shadows
            // (replaces) the default instead of being added to it.
            channel_capacity = {
                let __ng_channel_capacity = 100;
                $(let __ng_channel_capacity = $cap;)?
                __ng_channel_capacity
            }
        );
    };

    // Public arm: driver WITHOUT a description (`ng_driver_description` returns null).
    (
        name = $name:expr,
        driver_type = $driver_type:expr,
        component = $component:ty,
        metadata_fn = $metadata_fn:path,
        model_convert = $model_convert:ty
        $(, channel_capacity = $cap:expr)?
        $(, collect_max_inflight = $collect_max_inflight:expr)?
        $(,)?
    ) => {
        struct __NgComponentDriverFactory {
            model_convert: $model_convert,
        }

        impl __NgComponentDriverFactory {
            #[inline]
            fn new() -> Self {
                Self {
                    model_convert: <$model_convert as ::core::default::Default>::default(),
                }
            }
        }

        impl ::core::default::Default for __NgComponentDriverFactory {
            #[inline]
            fn default() -> Self {
                Self::new()
            }
        }

        impl $crate::DriverFactory for __NgComponentDriverFactory {
            fn create_driver(
                &self,
                ctx: $crate::SouthwardInitContext,
            ) -> $crate::DriverResult<Box<dyn $crate::Driver>> {
                // Compile-time checks: fail the build early when `$component` does not
                // satisfy the southward contract.
                fn __assert_handle_is_southward_handle<H: $crate::SouthwardHandle>() {}
                __assert_handle_is_southward_handle::<<$component as $crate::supervision::Connector>::Handle>();
                fn __assert_init_ctx_is_southward_init_context<C>()
                where
                    C: $crate::supervision::Connector<InitContext = $crate::SouthwardInitContext>,
                {
                }
                __assert_init_ctx_is_southward_init_context::<$component>();

                use $crate::export::tracing::info_span;
                let span = info_span!(
                    "southward-driver",
                    channel_id = ctx.channel_id,
                    driver_type = $driver_type
                );
                let observer = ctx.observer_factory.create_southward(
                    $crate::supervision::SouthwardObserverLabels {
                        channel_id: ctx.channel_id,
                        driver_kind: ::std::sync::Arc::<str>::from($driver_type),
                    }
                );
                // Read the backoff policy before `ctx` is moved into the connector.
                let retry_policy = ctx.runtime_channel.connection_policy().backoff.clone();
                let connector = <$component as $crate::supervision::Connector>::new(ctx)?;
                let params = $crate::supervision::SupervisorParams {
                    retry_policy,
                    reconnect_queue: 8,
                };
                let (loop_, _state_rx) = $crate::supervision::SupervisorLoop::new_with_span(
                    connector,
                    params,
                    observer,
                    span,
                );
                // Default of 1, shadowed by the caller-supplied value when present.
                let collect_max_inflight: usize = 1usize;
                $(let collect_max_inflight: usize = $collect_max_inflight;)?
                let driver = $crate::SupervisedDriver::new_with_collect_max_inflight(loop_, collect_max_inflight);
                Ok(Box::new(driver))
            }

            // NOTE(review): this arm resolves `SouthwardModelConverter` through
            // `supervision::model_convert`, while the arm with `description` uses
            // `supervision::converter` — confirm both paths exist, or unify them.
            fn convert_runtime_channel(
                &self,
                channel: $crate::ChannelModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeChannel>> {
                <$model_convert as $crate::supervision::model_convert::SouthwardModelConverter>::convert_runtime_channel(
                    &self.model_convert,
                    channel,
                )
            }

            fn convert_runtime_device(
                &self,
                device: $crate::DeviceModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeDevice>> {
                <$model_convert as $crate::supervision::model_convert::SouthwardModelConverter>::convert_runtime_device(
                    &self.model_convert,
                    device,
                )
            }

            fn convert_runtime_point(
                &self,
                point: $crate::PointModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimePoint>> {
                <$model_convert as $crate::supervision::model_convert::SouthwardModelConverter>::convert_runtime_point(
                    &self.model_convert,
                    point,
                )
            }

            fn convert_runtime_action(
                &self,
                action: $crate::ActionModel,
            ) -> $crate::DriverResult<std::sync::Arc<dyn $crate::RuntimeAction>> {
                <$model_convert as $crate::supervision::model_convert::SouthwardModelConverter>::convert_runtime_action(
                    &self.model_convert,
                    action,
                )
            }
        }

        $crate::ng_driver_factory!(
            @core name = $name,
            description = None,
            driver_type = $driver_type,
            factory_ty = __NgComponentDriverFactory,
            metadata_fn = $metadata_fn,
            // Default capacity is 100; a caller-supplied `channel_capacity` shadows
            // (replaces) the default instead of being added to it.
            channel_capacity = {
                let __ng_channel_capacity = 100;
                $(let __ng_channel_capacity = $cap;)?
                __ng_channel_capacity
            }
        );
    };

    // Internal arm: emits the exported symbols shared by both public arms.
    (
        @core name = $name:expr,
        description = $desc_opt:expr,
        driver_type = $driver_type:expr,
        factory_ty = $factory:ty,
        metadata_fn = $metadata_fn:path,
        channel_capacity = $cap:expr
    ) => {
        // Returns the value of `$crate::sdk::sdk_api_version()`.
        #[no_mangle]
        pub extern "C" fn ng_driver_api_version() -> u32 {
            $crate::sdk::sdk_api_version()
        }

        // The string getters below hand out pointers into lazily-initialized statics,
        // so the returned pointers stay valid for as long as the library is loaded.
        #[no_mangle]
        pub extern "C" fn ng_driver_sdk_version() -> *const ::std::os::raw::c_char {
            static SDK_VER: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
                use $crate::export::once_cell::sync::Lazy;
                Lazy::new(|| $crate::ffi::cstring_sanitized($crate::sdk::SDK_VERSION))
            };
            SDK_VER.as_ptr()
        }

        // `CARGO_PKG_VERSION` is resolved in the crate that invokes the macro.
        #[no_mangle]
        pub extern "C" fn ng_driver_version() -> *const ::std::os::raw::c_char {
            static VER: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
                use $crate::export::once_cell::sync::Lazy;
                Lazy::new(|| $crate::ffi::cstring_sanitized(env!("CARGO_PKG_VERSION")))
            };
            VER.as_ptr()
        }

        #[no_mangle]
        pub extern "C" fn ng_driver_type() -> *const ::std::os::raw::c_char {
            static TYPE_STR: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
                use $crate::export::once_cell::sync::Lazy;
                Lazy::new(|| $crate::ffi::cstring_sanitized($driver_type))
            };
            TYPE_STR.as_ptr()
        }

        #[no_mangle]
        pub extern "C" fn ng_driver_name() -> *const ::std::os::raw::c_char {
            static NAME_STR: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
                use $crate::export::once_cell::sync::Lazy;
                Lazy::new(|| $crate::ffi::cstring_sanitized($name))
            };
            NAME_STR.as_ptr()
        }

        // Returns a null pointer when the driver was declared without a description.
        #[no_mangle]
        pub extern "C" fn ng_driver_description() -> *const ::std::os::raw::c_char {
            static DESC_STR: $crate::export::once_cell::sync::Lazy<Option<::std::ffi::CString>> = {
                use $crate::export::once_cell::sync::Lazy;
                Lazy::new(|| $desc_opt.map($crate::ffi::cstring_sanitized))
            };
            match DESC_STR.as_ref() {
                Some(c) => c.as_ptr(),
                None => ::std::ptr::null(),
            }
        }

        // Driver metadata serialized to JSON on first access. A serialization failure
        // is swallowed and yields an empty byte vector.
        #[doc(hidden)]
        pub static NG_DRIVER_METADATA_JSON: $crate::export::once_cell::sync::Lazy<Vec<u8>> = {
            use $crate::export::once_cell::sync::Lazy;
            use $crate::export::serde_json;
            Lazy::new(|| {
                let md: $crate::DriverSchemas = $metadata_fn();
                serde_json::to_vec(&md).unwrap_or_else(|_| ::std::vec::Vec::new())
            })
        };

        // Publishes the metadata JSON through the two out-parameters.
        // NOTE(review): pointer validity requirements are defined by
        // `$crate::ffi::write_slice_ptr_len`, which is not visible here — presumably both
        // pointers must be valid for writes; confirm.
        #[no_mangle]
        pub unsafe extern "C" fn ng_driver_metadata_json_ptr(
            out_ptr: *mut *const u8,
            out_len: *mut usize,
        ) {
            $crate::ffi::write_slice_ptr_len(out_ptr, out_len, &NG_DRIVER_METADATA_JSON);
        }

        #[no_mangle]
        pub extern "C" fn ng_driver_set_log_sink(sink: $crate::log::LogSinkV1) -> u32 {
            $crate::southward::log::set_log_sink(sink)
        }

        #[no_mangle]
        pub extern "C" fn ng_driver_set_max_level(level: u8) -> u32 {
            $crate::southward::log::set_max_level(level)
        }

        #[no_mangle]
        pub extern "C" fn ng_driver_get_max_level() -> u8 {
            $crate::southward::log::get_max_level()
        }

        // Builds the default factory, wraps it in `RuntimeAwareDriverFactory` together
        // with the channel capacity and the runtime handle (if a runtime exists), and
        // returns it as a raw pointer obtained from `Box::into_raw`.
        #[no_mangle]
        pub extern "C" fn create_driver_factory() -> *mut dyn $crate::DriverFactory {
            let inner: Box<dyn $crate::DriverFactory> =
                Box::new(<$factory as ::core::default::Default>::default());
            let rt_handle = NG_RUNTIME.as_ref().map(|rt| rt.handle().clone());
            let wrapper: Box<dyn $crate::DriverFactory> =
                Box::new($crate::ffi::RuntimeAwareDriverFactory::new(inner, $cap, rt_handle));
            Box::into_raw(wrapper)
        }

        // Runtime created lazily by `$crate::ffi::build_runtime`; `None` when that
        // helper returns `None`.
        #[doc(hidden)]
        pub static NG_RUNTIME: $crate::export::once_cell::sync::Lazy<Option<tokio::runtime::Runtime>> = {
            use $crate::export::once_cell::sync::Lazy;
            Lazy::new(|| $crate::ffi::build_runtime(concat!($driver_type, "-driver")))
        };

        // Does nothing when no runtime is available.
        #[no_mangle]
        pub extern "C" fn ng_driver_init_tracing(debug: bool) {
            let Some(rt) = NG_RUNTIME.as_ref() else {
                return;
            };
            $crate::log::init_driver_tracing(rt.handle().clone(), debug);
        }
    };
}
// Generate downcasting support for every driver-facing trait object. The `sync` form is
// used because each of these traits has `DowncastSync` as a supertrait.
impl_downcast!(sync DriverFactory);
impl_downcast!(sync Driver);
impl_downcast!(sync RuntimeChannel);
impl_downcast!(sync RuntimeDevice);
impl_downcast!(sync RuntimePoint);
impl_downcast!(sync RuntimeAction);
impl_downcast!(sync RuntimeParameter);
impl_downcast!(sync DriverConfig);
/// Change set handed to [`Driver::apply_runtime_delta`].
#[derive(Debug, Clone)]
pub enum RuntimeDelta {
/// Device-level changes.
DevicesChanged {
/// Devices that were added.
added: Vec<Arc<dyn RuntimeDevice>>,
/// Devices that were updated.
updated: Vec<Arc<dyn RuntimeDevice>>,
/// Devices that were removed.
removed: Vec<Arc<dyn RuntimeDevice>>,
/// Devices paired with a [`Status`] (presumably the new status — confirm with the producer).
status_changed: Vec<(Arc<dyn RuntimeDevice>, Status)>,
},
/// Point-level changes scoped to a single device.
PointsChanged {
/// Device the listed points belong to.
device: Arc<dyn RuntimeDevice>,
/// Points that were added.
added: Vec<Arc<dyn RuntimePoint>>,
/// Points that were updated.
updated: Vec<Arc<dyn RuntimePoint>>,
/// Points that were removed.
removed: Vec<Arc<dyn RuntimePoint>>,
},
/// Action-level changes scoped to a single device.
ActionsChanged {
/// Device the listed actions belong to.
device: Arc<dyn RuntimeDevice>,
/// Actions that were added.
added: Vec<Arc<dyn RuntimeAction>>,
/// Actions that were updated.
updated: Vec<Arc<dyn RuntimeAction>>,
/// Actions that were removed.
removed: Vec<Arc<dyn RuntimeAction>>,
},
}
/// Factory that builds [`Driver`] instances and converts configuration models into the
/// runtime trait objects used by drivers.
///
/// The `ng_driver_factory!` macro generates an implementation of this trait and exports it
/// as a raw trait-object pointer through `create_driver_factory`.
#[async_trait]
pub trait DriverFactory: DowncastSync + Send + Sync {
/// Creates a driver from the given initialization context.
fn create_driver(&self, ctx: SouthwardInitContext) -> DriverResult<Box<dyn Driver>>;
/// Converts a [`ChannelModel`] into a [`RuntimeChannel`] trait object.
fn convert_runtime_channel(
&self,
channel: ChannelModel,
) -> DriverResult<Arc<dyn RuntimeChannel>>;
/// Converts a [`DeviceModel`] into a [`RuntimeDevice`] trait object.
fn convert_runtime_device(&self, device: DeviceModel) -> DriverResult<Arc<dyn RuntimeDevice>>;
/// Converts a [`PointModel`] into a [`RuntimePoint`] trait object.
fn convert_runtime_point(&self, point: PointModel) -> DriverResult<Arc<dyn RuntimePoint>>;
/// Converts an [`ActionModel`] into a [`RuntimeAction`] trait object.
fn convert_runtime_action(&self, action: ActionModel) -> DriverResult<Arc<dyn RuntimeAction>>;
}
/// Southward driver interface: lifecycle, data collection, action execution, point writes,
/// connection-state observation and runtime reconfiguration.
#[async_trait]
pub trait Driver: DowncastSync + Send + Sync {
/// Starts the driver.
async fn start(&self) -> DriverResult<()>;
/// Stops the driver.
async fn stop(&self) -> DriverResult<()>;
/// Optional grouping key for `_device`. The default implementation returns `None`.
///
/// NOTE(review): how callers use the key (e.g. to batch devices) is not visible here.
#[inline]
fn collection_group_key(&self, _device: &dyn RuntimeDevice) -> Option<CollectionGroupKey> {
None
}
/// Collects data for the given items (each a device plus a slice of points) and returns
/// the resulting [`NorthwardData`] values.
async fn collect_data(&self, items: &[CollectItem]) -> DriverResult<Vec<NorthwardData>>;
/// Defaults to `1`.
///
/// NOTE(review): presumably the maximum number of concurrent `collect_data` calls —
/// confirm against the scheduler.
#[inline]
fn collect_max_inflight(&self) -> usize {
1
}
/// Executes `action` on `device` with the supplied parameter/value pairs.
async fn execute_action(
&self,
device: Arc<dyn RuntimeDevice>,
action: Arc<dyn RuntimeAction>,
parameters: Vec<(Arc<dyn RuntimeParameter>, NGValue)>,
) -> DriverResult<ExecuteResult>;
/// Writes `value` to `point` on `device`, with an optional timeout in milliseconds.
async fn write_point(
&self,
device: Arc<dyn RuntimeDevice>,
point: Arc<dyn RuntimePoint>,
value: &NGValue,
timeout_ms: Option<u64>,
) -> DriverResult<WriteResult>;
/// Returns a `watch` receiver carrying the driver's [`ConnectionState`].
fn subscribe_connection_state(&self) -> watch::Receiver<Arc<ConnectionState>>;
/// Applies a runtime change set. The default implementation ignores the delta and
/// returns `Ok(())`.
async fn apply_runtime_delta(&self, _delta: RuntimeDelta) -> DriverResult<()> {
Ok(())
}
}
/// Value returned by [`Driver::write_point`].
#[derive(Debug, Clone)]
pub struct WriteResult {
/// Outcome tag for the write (see [`WriteOutcome`]).
pub outcome: WriteOutcome,
/// Optional value reported alongside the outcome (presumably the value that actually took
/// effect — confirm with driver implementations).
pub applied_value: Option<NGValue>,
}
/// Outcome tag carried in [`WriteResult::outcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteOutcome {
/// NOTE(review): presumably the write had taken effect when the call returned — confirm with driver implementations.
Applied,
/// NOTE(review): presumably the write was accepted for later delivery — confirm with driver implementations.
Queued,
}
/// Marker trait for driver-specific configuration objects; exposed through
/// [`RuntimeChannel::config`] and downcastable to the concrete type.
pub trait DriverConfig: DowncastSync + Send + Sync + Debug {}
/// Read-only runtime view of a channel, as seen by a driver.
pub trait RuntimeChannel: DowncastSync + Send + Sync + Debug {
    /// Numeric identifier of the channel.
    fn id(&self) -> i32;

    /// Name of the channel.
    fn name(&self) -> &str;

    /// Identifier of the driver this channel is bound to.
    fn driver_id(&self) -> i32;

    /// Collection type configured for the channel.
    fn collection_type(&self) -> CollectionType;

    /// Report type configured for the channel.
    fn report_type(&self) -> ReportType;

    /// Optional period; the unit is not defined in this file.
    fn period(&self) -> Option<u32>;

    /// Current status of the channel.
    fn status(&self) -> Status;

    /// Connection policy of the channel.
    fn connection_policy(&self) -> &ConnectionPolicy;

    /// Driver-specific configuration, to be downcast by the driver.
    fn config(&self) -> &dyn DriverConfig;

    /// Returns `true` exactly when [`collection_type`](Self::collection_type) is
    /// [`CollectionType::Collection`].
    #[inline]
    fn collectable(&self) -> bool {
        match self.collection_type() {
            CollectionType::Collection => true,
            _ => false,
        }
    }
}
/// Read-only runtime view of a device, as seen by a driver.
pub trait RuntimeDevice: DowncastSync + Send + Sync + Debug {
/// Numeric identifier of the device.
fn id(&self) -> i32;
/// Name of the device.
fn device_name(&self) -> &str;
/// Type string of the device.
fn device_type(&self) -> &str;
/// Identifier of the channel the device belongs to.
fn channel_id(&self) -> i32;
/// Current status of the device.
fn status(&self) -> Status;
}
/// Read-only runtime view of a data point, as seen by a driver.
pub trait RuntimePoint: DowncastSync + Send + Sync + Debug {
    /// Numeric identifier of the point.
    fn id(&self) -> i32;

    /// Identifier of the device the point belongs to.
    fn device_id(&self) -> i32;

    /// Name of the point.
    fn name(&self) -> &str;

    /// Key of the point.
    fn key(&self) -> &str;

    /// Kind of data point.
    fn r#type(&self) -> DataPointType;

    /// Declared data type of the point.
    fn data_type(&self) -> DataType;

    /// Access mode of the point.
    fn access_mode(&self) -> AccessMode;

    /// Optional unit string.
    fn unit(&self) -> Option<&str>;

    /// Optional lower bound.
    fn min_value(&self) -> Option<f64>;

    /// Optional upper bound.
    fn max_value(&self) -> Option<f64>;

    /// Transform attached to the point.
    fn transform(&self) -> &Transform;

    /// Data type on the wire; by default the same as [`data_type`](Self::data_type).
    #[inline]
    fn wire_data_type(&self) -> DataType {
        self.data_type()
    }

    /// Data type obtained by asking the transform to resolve the logical type from
    /// [`data_type`](Self::data_type).
    #[inline]
    fn logical_data_type(&self) -> DataType {
        let transform = self.transform();
        transform.resolve_logical_datatype(self.data_type())
    }

    /// `true` when the access mode is `Read` or `ReadWrite`.
    #[inline]
    fn readable(&self) -> bool {
        match self.access_mode() {
            AccessMode::Read | AccessMode::ReadWrite => true,
            _ => false,
        }
    }

    /// `true` when the access mode is `Write` or `ReadWrite`.
    #[inline]
    fn writable(&self) -> bool {
        match self.access_mode() {
            AccessMode::Write | AccessMode::ReadWrite => true,
            _ => false,
        }
    }
}
/// Read-only runtime view of an action input parameter.
pub trait RuntimeParameter: DowncastSync + Send + Sync + Debug {
    /// Name of the parameter.
    fn name(&self) -> &str;

    /// Key of the parameter.
    fn key(&self) -> &str;

    /// Declared data type of the parameter.
    fn data_type(&self) -> DataType;

    /// Whether the parameter is required.
    fn required(&self) -> bool;

    /// Optional default value as JSON.
    fn default_value(&self) -> Option<serde_json::Value>;

    /// Optional upper bound.
    fn max_value(&self) -> Option<f64>;

    /// Optional lower bound.
    fn min_value(&self) -> Option<f64>;

    /// Transform attached to the parameter.
    fn transform(&self) -> &Transform;

    /// Data type on the wire; by default the same as [`data_type`](Self::data_type).
    #[inline]
    fn wire_data_type(&self) -> DataType {
        self.data_type()
    }

    /// Data type obtained by asking the transform to resolve the logical type from
    /// [`data_type`](Self::data_type).
    #[inline]
    fn logical_data_type(&self) -> DataType {
        let transform = self.transform();
        transform.resolve_logical_datatype(self.data_type())
    }
}
/// Read-only runtime view of an action, as seen by a driver.
pub trait RuntimeAction: DowncastSync + Send + Sync + Debug {
/// Numeric identifier of the action.
fn id(&self) -> i32;
/// Name of the action.
fn name(&self) -> &str;
/// Identifier of the device the action belongs to.
fn device_id(&self) -> i32;
/// Command string of the action.
fn command(&self) -> &str;
/// Input parameters of the action, returned as an owned vector of shared trait objects.
fn input_parameters(&self) -> Vec<Arc<dyn RuntimeParameter>>;
}
/// Fixed-size, 16-byte grouping key.
///
/// Layout: bytes `0..4` hold a `kind` tag in big-endian order; bytes `4..16` hold a
/// 12-byte payload whose interpretation depends on the constructor used.
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct CollectionGroupKey(pub [u8; 16]);

impl CollectionGroupKey {
    /// Builds a key from a single `u64`.
    ///
    /// Bytes `4..8` stay zero and `v` occupies bytes `8..16` in big-endian order.
    #[inline]
    pub fn from_u64(kind: u32, v: u64) -> Self {
        let mut bytes = [0u8; 16];
        let (front, back) = bytes.split_at_mut(8);
        front[..4].copy_from_slice(&kind.to_be_bytes());
        back.copy_from_slice(&v.to_be_bytes());
        Self(bytes)
    }

    /// Builds a key from two values, keeping only the low 48 bits of each.
    ///
    /// `a` occupies bytes `4..10` and `b` bytes `10..16`, both big-endian. The upper
    /// 16 bits of `a` and `b` are discarded.
    #[inline]
    pub fn from_pair_u64(kind: u32, a: u64, b: u64) -> Self {
        /// Big-endian encoding of the low 48 bits of `v`.
        #[inline]
        fn low48_be(v: u64) -> [u8; 6] {
            let wide = v.to_be_bytes();
            let mut narrow = [0u8; 6];
            // The two most significant bytes are dropped.
            narrow.copy_from_slice(&wide[2..]);
            narrow
        }

        let mut payload = [0u8; 12];
        payload[..6].copy_from_slice(&low48_be(a));
        payload[6..].copy_from_slice(&low48_be(b));
        Self::from_bytes(kind, payload)
    }

    /// Builds a key from a raw 12-byte payload.
    #[inline]
    pub fn from_bytes(kind: u32, payload: [u8; 12]) -> Self {
        let mut bytes = [0u8; 16];
        let (tag, body) = bytes.split_at_mut(4);
        tag.copy_from_slice(&kind.to_be_bytes());
        body.copy_from_slice(&payload);
        Self(bytes)
    }

    /// Builds a key from the first 12 bytes of a 128-bit hash; the last 4 bytes of
    /// `hash` are not used.
    #[inline]
    pub fn from_hash128(kind: u32, hash: [u8; 16]) -> Self {
        let mut bytes = [0u8; 16];
        bytes[..4].copy_from_slice(&kind.to_be_bytes());
        bytes[4..].copy_from_slice(&hash[..12]);
        Self(bytes)
    }

    /// Returns the `kind` tag stored in bytes `0..4`.
    #[inline]
    pub fn kind(&self) -> u32 {
        let mut tag = [0u8; 4];
        tag.copy_from_slice(&self.0[..4]);
        u32::from_be_bytes(tag)
    }
}

impl fmt::Debug for CollectionGroupKey {
    /// Formats as `CollectionGroupKey(kind=0x<8 hex digits>, payload=0x<24 hex digits>)`
    /// using upper-case hexadecimal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (_, payload) = self.0.split_at(4);
        write!(
            f,
            "CollectionGroupKey(kind=0x{:08X}, payload=0x",
            self.kind()
        )?;
        payload
            .iter()
            .try_for_each(|byte| write!(f, "{:02X}", byte))?;
        f.write_str(")")
    }
}
/// One element of the slice passed to [`Driver::collect_data`]: a device together with a
/// shared slice of points (presumably the points to collect from that device — confirm
/// with the caller).
pub type CollectItem = (Arc<dyn RuntimeDevice>, Arc<[Arc<dyn RuntimePoint>]>);