pub mod buffer;
pub mod codec;
pub mod downlink;
pub mod envelope;
pub mod extension;
pub mod log;
pub mod mapping;
pub(crate) mod model;
pub mod payload;
pub mod probe;
pub mod runtime_api;
pub mod supervised;
pub mod template;
pub(crate) mod types;
use crate::{
supervision::{NoopObserverFactory, ObserverFactory},
ConnectionState, ExtensionStore, NorthwardResult,
};
use async_trait::async_trait;
use downcast_rs::{impl_downcast, DowncastSync};
use envelope::EnvelopeKind;
use model::{
AlarmData, AttributeData, ClientRpcResponse, Command, DeviceConnectedData,
DeviceDisconnectedData, ServerRpcResponse, TelemetryData, WritePoint, WritePointResponse,
};
use runtime_api::NorthwardRuntimeApi;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, watch};
#[derive(Clone)]
pub struct NorthwardInitContext {
pub extension_store: Arc<dyn ExtensionStore>,
pub app_id: i32,
pub app_name: String,
pub config: Arc<dyn PluginConfig>,
pub events_tx: mpsc::Sender<NorthwardEvent>,
pub runtime: Arc<dyn NorthwardRuntimeApi>,
pub retry_policy: crate::RetryPolicy,
pub observer_factory: Arc<dyn ObserverFactory>,
}
impl NorthwardInitContext {
#[inline]
pub fn with_noop_observer(mut self) -> Self {
self.observer_factory = Arc::new(NoopObserverFactory);
self
}
}
#[macro_export]
macro_rules! ng_plugin_factory {
(name = $name:expr, description = $description:expr, plugin_type = $plugin_type:expr, component = $component:ty, metadata_fn = $metadata_fn:path, model_convert = $model_convert:ty $(, channel_capacity = $cap:expr)? $(,)?) => {
struct __NgComponentPluginFactory {
model_convert: $model_convert,
}
impl __NgComponentPluginFactory {
#[inline]
fn new() -> Self {
Self {
model_convert: <$model_convert as ::core::default::Default>::default(),
}
}
}
impl ::core::default::Default for __NgComponentPluginFactory {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl $crate::PluginFactory for __NgComponentPluginFactory {
fn create_plugin(
&self,
ctx: $crate::NorthwardInitContext,
) -> $crate::NorthwardResult<Box<dyn $crate::Plugin>> {
fn __assert_handle_is_northward_handle<H: $crate::NorthwardHandle>() {}
__assert_handle_is_northward_handle::<<$component as $crate::supervision::Connector>::Handle>();
fn __assert_init_ctx_is_northward_init_context<C>()
where
C: $crate::supervision::Connector<InitContext = $crate::NorthwardInitContext>,
{
}
__assert_init_ctx_is_northward_init_context::<$component>();
use $crate::export::tracing::info_span;
let span = info_span!(
"northward-plugin",
app_id = ctx.app_id,
plugin_type = $plugin_type
);
let observer = ctx.observer_factory.create_northward(
$crate::supervision::NorthwardObserverLabels {
app_id: ctx.app_id,
plugin_kind: ::std::sync::Arc::<str>::from($plugin_type),
}
);
let retry_policy = ctx.retry_policy;
let connector = <$component as $crate::supervision::Connector>::new(ctx)?;
let params = $crate::supervision::SupervisorParams {
retry_policy,
reconnect_queue: 8,
};
let (loop_, _state_rx) = $crate::supervision::SupervisorLoop::new_with_span(
connector,
params,
observer,
span,
);
let plugin = $crate::SupervisedPlugin::new(loop_);
Ok(Box::new(plugin))
}
fn convert_plugin_config(
&self,
config: $crate::export::serde_json::Value,
) -> $crate::NorthwardResult<std::sync::Arc<dyn $crate::PluginConfig>> {
<$model_convert as $crate::supervision::converter::NorthwardModelConverter>::convert_plugin_config(
&self.model_convert,
config,
)
}
}
$crate::ng_plugin_factory!(
@core name = $name,
description = Some($description),
plugin_type = $plugin_type,
factory_ty = __NgComponentPluginFactory,
metadata_fn = $metadata_fn,
channel_capacity = 1024 $(+ $cap * 0 + $cap)?
);
};
(name = $name:expr, plugin_type = $plugin_type:expr, component = $component:ty, metadata_fn = $metadata_fn:path, model_convert = $model_convert:ty $(, channel_capacity = $cap:expr)? $(,)?) => {
struct __NgComponentPluginFactory {
model_convert: $model_convert,
}
impl __NgComponentPluginFactory {
#[inline]
fn new() -> Self {
Self {
model_convert: <$model_convert as ::core::default::Default>::default(),
}
}
}
impl ::core::default::Default for __NgComponentPluginFactory {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl $crate::PluginFactory for __NgComponentPluginFactory {
fn create_plugin(
&self,
ctx: $crate::NorthwardInitContext,
) -> $crate::NorthwardResult<Box<dyn $crate::Plugin>> {
fn __assert_handle_is_northward_handle<H: $crate::NorthwardHandle>() {}
__assert_handle_is_northward_handle::<<$component as $crate::supervision::Connector>::Handle>();
fn __assert_init_ctx_is_northward_init_context<C>()
where
C: $crate::supervision::Connector<InitContext = $crate::NorthwardInitContext>,
{
}
__assert_init_ctx_is_northward_init_context::<$component>();
use $crate::export::tracing::info_span;
let span = info_span!(
"northward-plugin",
app_id = ctx.app_id,
plugin_type = $plugin_type
);
let observer = ctx.observer_factory.create_northward(
$crate::supervision::NorthwardObserverLabels {
app_id: ctx.app_id,
plugin_kind: ::std::sync::Arc::<str>::from($plugin_type),
}
);
let retry_policy = ctx.retry_policy;
let connector = <$component as $crate::supervision::Connector>::new(ctx)?;
let params = $crate::supervision::SupervisorParams {
retry_policy,
reconnect_queue: 8,
};
let (loop_, _state_rx) = $crate::supervision::SupervisorLoop::new_with_span(
connector,
params,
observer,
span,
);
let plugin = $crate::SupervisedPlugin::new(loop_);
Ok(Box::new(plugin))
}
fn convert_plugin_config(
&self,
config: $crate::export::serde_json::Value,
) -> $crate::NorthwardResult<std::sync::Arc<dyn $crate::PluginConfig>> {
<$model_convert as $crate::supervision::model_convert::NorthwardModelConverter>::convert_plugin_config(
&self.model_convert,
config,
)
}
}
$crate::ng_plugin_factory!(
@core name = $name,
description = None,
plugin_type = $plugin_type,
factory_ty = __NgComponentPluginFactory,
metadata_fn = $metadata_fn,
channel_capacity = 1024 $(+ $cap * 0 + $cap)?
);
};
(@core name = $name:expr, description = $desc_opt:expr, plugin_type = $plugin_type:expr, factory_ty = $factory:ty, metadata_fn = $metadata_fn:path, channel_capacity = $cap:expr) => {
#[no_mangle]
pub extern "C" fn ng_plugin_api_version() -> u32 {
$crate::sdk::sdk_api_version()
}
#[no_mangle]
pub extern "C" fn ng_plugin_sdk_version() -> *const ::std::os::raw::c_char {
static SDK_VER: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $crate::ffi::cstring_sanitized($crate::sdk::SDK_VERSION))
};
SDK_VER.as_ptr()
}
#[no_mangle]
pub extern "C" fn ng_plugin_version() -> *const ::std::os::raw::c_char {
static VER: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $crate::ffi::cstring_sanitized(env!("CARGO_PKG_VERSION")))
};
VER.as_ptr()
}
#[no_mangle]
pub extern "C" fn ng_plugin_type() -> *const ::std::os::raw::c_char {
static TYPE_STR: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $crate::ffi::cstring_sanitized($plugin_type))
};
TYPE_STR.as_ptr()
}
#[no_mangle]
pub extern "C" fn ng_plugin_name() -> *const ::std::os::raw::c_char {
static NAME_STR: $crate::export::once_cell::sync::Lazy<::std::ffi::CString> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $crate::ffi::cstring_sanitized($name))
};
NAME_STR.as_ptr()
}
#[no_mangle]
pub extern "C" fn ng_plugin_description() -> *const ::std::os::raw::c_char {
static DESC_STR: $crate::export::once_cell::sync::Lazy<Option<::std::ffi::CString>> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $desc_opt.map($crate::ffi::cstring_sanitized))
};
match DESC_STR.as_ref() {
Some(c) => c.as_ptr(),
None => ::std::ptr::null(),
}
}
#[doc(hidden)]
pub static NG_PLUGIN_METADATA_JSON: $crate::export::once_cell::sync::Lazy<Vec<u8>> = {
use $crate::export::once_cell::sync::Lazy;
use $crate::export::serde_json;
Lazy::new(|| {
let md: $crate::PluginConfigSchemas = $metadata_fn();
serde_json::to_vec(&md).unwrap_or_else(|_| ::std::vec::Vec::new())
})
};
#[no_mangle]
pub unsafe extern "C" fn ng_plugin_metadata_json_ptr(
out_ptr: *mut *const u8,
out_len: *mut usize,
) {
$crate::ffi::write_slice_ptr_len(out_ptr, out_len, &NG_PLUGIN_METADATA_JSON);
}
#[no_mangle]
pub extern "C" fn ng_plugin_set_log_sink(sink: $crate::log::LogSinkV1) -> u32 {
$crate::northward::log::set_log_sink(sink)
}
#[no_mangle]
pub extern "C" fn ng_plugin_set_max_level(level: u8) -> u32 {
$crate::northward::log::set_max_level(level)
}
#[no_mangle]
pub extern "C" fn create_plugin_factory() -> *mut dyn $crate::PluginFactory {
let inner: Box<dyn $crate::PluginFactory> =
Box::new(<$factory as ::core::default::Default>::default());
let rt_handle = NG_RUNTIME.as_ref().map(|rt| rt.handle().clone());
let wrapper: Box<dyn $crate::PluginFactory> =
Box::new($crate::ffi::RuntimeAwarePluginFactory::new(inner, $cap, rt_handle));
Box::into_raw(wrapper)
}
#[doc(hidden)]
pub static NG_RUNTIME: $crate::export::once_cell::sync::Lazy<Option<tokio::runtime::Runtime>> = {
use $crate::export::once_cell::sync::Lazy;
Lazy::new(|| $crate::ffi::build_runtime(concat!($plugin_type, "-plugin")))
};
#[no_mangle]
pub extern "C" fn ng_plugin_init_tracing(debug: bool) {
let handle = NG_RUNTIME
.as_ref()
.map(|rt| rt.handle().clone())
.or_else(|| tokio::runtime::Handle::try_current().ok());
if let Some(h) = handle {
$crate::northward::log::init_plugin_tracing(h, debug);
}
}
};
}
impl_downcast!(sync NorthwardPublisher);
impl_downcast!(sync PluginFactory);
impl_downcast!(sync Plugin);
impl_downcast!(sync PluginConfig);
pub trait NorthwardPublisher: DowncastSync + Send + Sync + Debug {
fn try_publish(&self, data: Arc<NorthwardData>) -> NorthwardResult<()>;
}
pub trait PluginFactory: DowncastSync + Send + Sync {
fn create_plugin(&self, ctx: NorthwardInitContext) -> NorthwardResult<Box<dyn Plugin>>;
fn convert_plugin_config(
&self,
config: serde_json::Value,
) -> NorthwardResult<Arc<dyn PluginConfig>>;
}
pub trait PluginConfig: DowncastSync + Send + Sync + Debug {}
#[async_trait]
pub trait Plugin: DowncastSync + Send + Sync {
async fn start(&self) -> NorthwardResult<()>;
fn subscribe_connection_state(&self) -> watch::Receiver<Arc<ConnectionState>>;
async fn process_data(&self, data: Arc<NorthwardData>) -> NorthwardResult<()>;
async fn stop(&self) -> NorthwardResult<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NorthwardData {
DeviceConnected(DeviceConnectedData),
DeviceDisconnected(DeviceDisconnectedData),
Telemetry(TelemetryData),
Attributes(AttributeData),
Alarm(AlarmData),
RpcResponse(ClientRpcResponse),
WritePointResponse(WritePointResponse),
}
impl NorthwardData {
#[inline]
pub fn envelope_kind(&self) -> EnvelopeKind {
match self {
NorthwardData::DeviceConnected(_) => EnvelopeKind::DeviceConnected,
NorthwardData::DeviceDisconnected(_) => EnvelopeKind::DeviceDisconnected,
NorthwardData::Telemetry(_) => EnvelopeKind::Telemetry,
NorthwardData::Attributes(_) => EnvelopeKind::Attributes,
NorthwardData::Alarm(_) => EnvelopeKind::Alarm,
NorthwardData::RpcResponse(_) => EnvelopeKind::RpcResponse,
NorthwardData::WritePointResponse(_) => EnvelopeKind::WritePointResponse,
}
}
}
impl NorthwardData {
pub fn device_id(&self) -> i32 {
match self {
NorthwardData::DeviceConnected(data) => data.device_id,
NorthwardData::DeviceDisconnected(data) => data.device_id,
NorthwardData::Telemetry(data) => data.device_id,
NorthwardData::Attributes(data) => data.device_id,
NorthwardData::Alarm(data) => data.device_id,
NorthwardData::RpcResponse(data) => data.device_id,
NorthwardData::WritePointResponse(data) => data.device_id,
}
}
}
pub type EventReceiver = broadcast::Receiver<NorthwardEvent>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NorthwardEvent {
RpcResponseReceived(ServerRpcResponse),
CommandReceived(Command),
WritePoint(WritePoint),
}
impl NorthwardEvent {
#[inline]
pub fn envelope_kind(&self) -> EnvelopeKind {
match self {
NorthwardEvent::RpcResponseReceived(_) => EnvelopeKind::RpcResponseReceived,
NorthwardEvent::CommandReceived(_) => EnvelopeKind::CommandReceived,
NorthwardEvent::WritePoint(_) => EnvelopeKind::WritePoint,
}
}
}