Skip to main content

ng_gateway_sdk/northward/
model.rs

1use super::types::{AlarmSeverity, DropPolicy, TargetType};
2use crate::{AccessMode, DataType, NGValue, PointValue, Transform};
3use bytes::Bytes;
4use chrono::{DateTime, Duration, Utc};
5use sea_orm::FromJsonQueryResult;
6use serde::{Deserialize, Serialize};
7use std::{collections::HashMap, fmt::Debug, sync::Arc};
8use uuid::Uuid;
9
10/// Command message received from platform
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Command {
13    /// Command identifier
14    pub command_id: String,
15    /// Command type/name
16    pub key: String,
17    /// Target type
18    pub target_type: TargetType,
19    /// Device identifier
20    pub device_id: Option<i32>,
21    /// Device name
22    pub device_name: Option<String>,
23    /// Command parameters
24    pub params: Option<serde_json::Value>,
25    /// Command expiration time
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub timeout_ms: Option<u64>,
28    /// Timestamp when command was issued
29    pub timestamp: DateTime<Utc>,
30}
31
32impl Command {
33    /// Create new command
34    pub fn new(
35        command_id: String,
36        key: String,
37        target_type: TargetType,
38        device_id: i32,
39        device_name: String,
40        params: serde_json::Value,
41    ) -> Self {
42        Self {
43            command_id,
44            key,
45            target_type,
46            device_id: Some(device_id),
47            device_name: Some(device_name),
48            params: Some(params),
49            timeout_ms: None,
50            timestamp: Utc::now(),
51        }
52    }
53
54    /// Set command timeout
55    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
56        self.timeout_ms = Some(timeout_ms);
57        self
58    }
59
60    #[inline]
61    /// Check if command has expired
62    pub fn is_expired(&self) -> bool {
63        if let Some(expires_at) = self.timeout_ms {
64            Utc::now() > self.timestamp + Duration::milliseconds(expires_at as i64)
65        } else {
66            false
67        }
68    }
69}
70
71/// Write-point request (control-plane): a northward plugin asks Gateway to write a single point.
72///
73/// # Notes
74/// - `point_id` is the stable primary key in gateway runtime.
75/// - `value` uses `NGValue` to avoid JSON allocations on the hot path.
76/// - `timeout_ms` is an overall upper bound that gateway may split across queueing + driver I/O.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct WritePoint {
79    /// Request identifier for correlating async response.
80    pub request_id: String,
81    /// Target point identifier.
82    pub point_id: i32,
83    /// Value to write.
84    pub value: NGValue,
85    /// Timestamp when the request was created at the plugin boundary.
86    pub timestamp: DateTime<Utc>,
87    /// Optional overall timeout in milliseconds.
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub timeout_ms: Option<u64>,
90}
91
92/// Write-point response (Gateway -> northward plugin).
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct WritePointResponse {
95    /// Correlates to `WritePoint.request_id`.
96    pub request_id: String,
97    /// Target point identifier.
98    pub point_id: i32,
99    /// Target device identifier (for routing/logging convenience).
100    pub device_id: i32,
101    /// Target device name (stable snapshot at response time).
102    ///
103    /// # Notes
104    /// - This is provided for northward plugins so they can publish platform-facing
105    ///   payloads without performing extra runtime lookups.
106    /// - For hot paths, `Arc<str>` is used to keep clones cheap.
107    #[serde(with = "arc_str_serde")]
108    pub device_name: Arc<str>,
109    /// Point key used for platform payload mapping (stable snapshot at response time).
110    ///
111    /// # Notes
112    /// - `point_key` is the semantic identifier used by most northward protocols (e.g. MQTT JSON keys).
113    /// - This is included to support best-practice "reported state" publishing without additional lookups.
114    #[serde(with = "arc_str_serde")]
115    pub point_key: Arc<str>,
116    /// Unified status.
117    pub status: WritePointStatus,
118    /// Optional error details when failed.
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub error: Option<WritePointError>,
121    /// Applied value (optional). Gateway may echo the requested value on success.
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub applied_value: Option<NGValue>,
124    /// Completion time generated by gateway.
125    pub completed_at: DateTime<Utc>,
126}
127
128impl WritePointResponse {
129    #[inline]
130    pub fn success(
131        request_id: String,
132        point_id: i32,
133        device_id: i32,
134        device_name: Arc<str>,
135        point_key: Arc<str>,
136        applied_value: Option<NGValue>,
137        completed_at: DateTime<Utc>,
138    ) -> Self {
139        Self {
140            request_id,
141            point_id,
142            device_id,
143            device_name,
144            point_key,
145            status: WritePointStatus::Success,
146            error: None,
147            applied_value,
148            completed_at,
149        }
150    }
151
152    #[inline]
153    pub fn failed(
154        request_id: String,
155        point_id: i32,
156        device_id: i32,
157        device_name: Arc<str>,
158        point_key: Arc<str>,
159        error: WritePointError,
160        completed_at: DateTime<Utc>,
161    ) -> Self {
162        Self {
163            request_id,
164            point_id,
165            device_id,
166            device_name,
167            point_key,
168            status: WritePointStatus::Failed,
169            error: Some(error),
170            applied_value: None,
171            completed_at,
172        }
173    }
174}
175
176#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
177pub enum WritePointStatus {
178    Success,
179    Failed,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct WritePointError {
184    pub kind: WritePointErrorKind,
185    pub message: String,
186}
187
188impl WritePointError {
189    /// Create a [`WritePointError`] with a kind and a human-readable message.
190    ///
191    /// This helper is designed for hot-path code where we want to construct errors
192    /// without introducing additional allocation patterns beyond the final `String`.
193    #[inline]
194    pub fn new(kind: WritePointErrorKind, message: impl Into<String>) -> Self {
195        Self {
196            kind,
197            message: message.into(),
198        }
199    }
200}
201
202#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
203pub enum WritePointErrorKind {
204    // ===== Gateway validation =====
205    NotFound,
206    NotWriteable,
207    TypeMismatch,
208    OutOfRange,
209    NotConnected,
210    // ===== Queueing/serialization =====
211    QueueTimeout,
212    // ===== Driver execution =====
213    DriverError,
214}
215
216/// Device connected data message
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct DeviceConnectedData {
219    /// Device identifier
220    pub device_id: i32,
221    /// Device name
222    pub device_name: String,
223    /// Device type
224    pub device_type: String,
225}
226
227/// Device disconnected data message
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct DeviceDisconnectedData {
230    /// Device identifier
231    pub device_id: i32,
232    /// Device name
233    pub device_name: String,
234    /// Device type
235    pub device_type: String,
236}
237
238/// Telemetry data message
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct TelemetryData {
241    /// Device identifier
242    pub device_id: i32,
243    /// Device name
244    pub device_name: String,
245    /// Timestamp of the data
246    pub timestamp: DateTime<Utc>,
247    /// Telemetry values as point-id keyed updates.
248    ///
249    /// `point_id` is the primary key for all hot-path operations.
250    pub values: Vec<PointValue>,
251    /// Additional metadata
252    #[serde(default)]
253    pub metadata: HashMap<String, serde_json::Value>,
254}
255
256impl TelemetryData {
257    /// Create new telemetry data
258    pub fn new(device_id: i32, device_name: impl Into<String>, values: Vec<PointValue>) -> Self {
259        Self {
260            device_id,
261            device_name: device_name.into(),
262            timestamp: Utc::now(),
263            values,
264            metadata: HashMap::new(),
265        }
266    }
267
268    /// Add metadata to the telemetry data
269    pub fn with_metadata(mut self, metadata: HashMap<String, serde_json::Value>) -> Self {
270        self.metadata = metadata;
271        self
272    }
273
274    /// Serialize to JSON bytes with zero-copy optimization
275    pub fn to_json_bytes(&self) -> Result<Bytes, serde_json::Error> {
276        let json = serde_json::to_vec(self)?;
277        Ok(Bytes::from(json))
278    }
279}
280
281/// Attribute data message
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct AttributeData {
284    /// Device identifier
285    pub device_id: i32,
286    /// Device name
287    pub device_name: String,
288    /// Timestamp of the attributes
289    pub timestamp: DateTime<Utc>,
290    /// Client-side attributes (point-id keyed updates).
291    #[serde(default)]
292    pub client_attributes: Vec<PointValue>,
293    /// Shared attributes (point-id keyed updates).
294    #[serde(default)]
295    pub shared_attributes: Vec<PointValue>,
296    /// Server-side attributes (point-id keyed updates).
297    #[serde(default)]
298    pub server_attributes: Vec<PointValue>,
299}
300
301impl AttributeData {
302    /// Create new attribute data with client attributes
303    pub fn new_client_attributes(
304        device_id: i32,
305        device_name: impl Into<String>,
306        attributes: Vec<PointValue>,
307    ) -> Self {
308        Self {
309            device_id,
310            device_name: device_name.into(),
311            timestamp: Utc::now(),
312            client_attributes: attributes,
313            shared_attributes: Vec::new(),
314            server_attributes: Vec::new(),
315        }
316    }
317
318    /// Create new attribute data with shared attributes
319    pub fn new_shared_attributes(
320        device_id: i32,
321        device_name: impl Into<String>,
322        attributes: Vec<PointValue>,
323    ) -> Self {
324        Self {
325            device_id,
326            device_name: device_name.into(),
327            timestamp: Utc::now(),
328            client_attributes: Vec::new(),
329            shared_attributes: attributes,
330            server_attributes: Vec::new(),
331        }
332    }
333
334    /// Serialize to JSON bytes
335    pub fn to_json_bytes(&self) -> Result<Bytes, serde_json::Error> {
336        let json = serde_json::to_vec(self)?;
337        Ok(Bytes::from(json))
338    }
339}
340
341/// RPC request message
342#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct RpcRequest {
344    /// Target type
345    pub target_type: TargetType,
346    /// Request identifier
347    pub request_id: Uuid,
348    /// Device identifier
349    pub device_id: i32,
350    /// Device name
351    pub device_name: String,
352    /// Request method
353    pub method: String,
354    /// Request parameters
355    pub params: Option<serde_json::Value>,
356}
357
358/// Server RPC response message
359#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct ServerRpcResponse {
361    /// Request identifier this response corresponds to
362    /// NOTE: Use String to align with platform-specific request id formats (e.g. ThingsBoard numeric ids)
363    pub request_id: String,
364    /// Target type
365    pub target_type: TargetType,
366    /// Response result (success)
367    #[serde(skip_serializing_if = "Option::is_none")]
368    pub result: Option<serde_json::Value>,
369    /// Error information (failure)
370    #[serde(skip_serializing_if = "Option::is_none")]
371    pub error: Option<String>,
372    /// Timestamp of the response
373    pub timestamp: DateTime<Utc>,
374}
375
376/// Client RPC response message
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct ClientRpcResponse {
379    /// Device identifier
380    pub device_id: i32,
381    /// Device name
382    #[serde(skip_serializing_if = "Option::is_none")]
383    pub device_name: Option<String>,
384    /// Request identifier this response corresponds to
385    /// NOTE: Use String to align with platform-specific request id formats (e.g. ThingsBoard numeric ids)
386    pub request_id: String,
387    /// Target type
388    pub target_type: TargetType,
389    /// Response result (success)
390    #[serde(skip_serializing_if = "Option::is_none")]
391    pub result: Option<serde_json::Value>,
392    /// Error information (failure)
393    #[serde(skip_serializing_if = "Option::is_none")]
394    pub error: Option<String>,
395    /// Timestamp of the response
396    pub timestamp: DateTime<Utc>,
397}
398
399impl ClientRpcResponse {
400    /// Create successful RPC response
401    pub fn success(
402        request_id: String,
403        target_type: TargetType,
404        device_id: i32,
405        device_name: Option<String>,
406        result: serde_json::Value,
407    ) -> Self {
408        Self {
409            request_id,
410            target_type,
411            device_id,
412            device_name,
413            result: Some(result),
414            error: None,
415            timestamp: Utc::now(),
416        }
417    }
418
419    /// Create error RPC response
420    pub fn error(
421        request_id: String,
422        target_type: TargetType,
423        device_id: i32,
424        device_name: Option<String>,
425        error: String,
426    ) -> Self {
427        Self {
428            request_id,
429            target_type,
430            device_id,
431            device_name,
432            result: None,
433            error: Some(error),
434            timestamp: Utc::now(),
435        }
436    }
437
438    pub fn is_success(&self) -> bool {
439        self.error.is_none()
440    }
441
442    /// Serialize to JSON bytes
443    pub fn to_json_bytes(&self) -> Result<Bytes, serde_json::Error> {
444        let json = serde_json::to_vec(self)?;
445        Ok(Bytes::from(json))
446    }
447}
448
449/// Alarm data message
450#[derive(Debug, Clone, Serialize, Deserialize)]
451pub struct AlarmData {
452    /// Device identifier
453    pub device_id: i32,
454    /// Device name
455    pub device_name: String,
456    /// Alarm type identifier
457    pub alarm_type: String,
458    /// Alarm severity level
459    pub severity: AlarmSeverity,
460    /// Human-readable alarm message
461    pub message: String,
462    /// Additional alarm details
463    pub details: HashMap<String, serde_json::Value>,
464    /// Timestamp when alarm was triggered
465    pub timestamp: DateTime<Utc>,
466    /// Whether the alarm has been cleared
467    pub cleared: bool,
468}
469
470impl AlarmData {
471    /// Create new alarm
472    pub fn new(
473        device_id: i32,
474        device_name: String,
475        alarm_type: String,
476        severity: AlarmSeverity,
477        message: String,
478    ) -> Self {
479        Self {
480            device_id,
481            device_name,
482            alarm_type,
483            severity,
484            message,
485            details: HashMap::new(),
486            timestamp: Utc::now(),
487            cleared: false,
488        }
489    }
490
491    /// Mark alarm as cleared
492    pub fn clear(mut self) -> Self {
493        self.cleared = true;
494        self
495    }
496
497    /// Add details to the alarm
498    pub fn with_details(mut self, details: HashMap<String, serde_json::Value>) -> Self {
499        self.details = details;
500        self
501    }
502}
503
504#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
505#[serde(rename_all = "camelCase")]
506pub struct QueuePolicy {
507    #[serde(default = "QueuePolicy::default_capacity")]
508    pub capacity: u32,
509    #[serde(default = "QueuePolicy::default_drop_policy")]
510    pub drop_policy: DropPolicy,
511    #[serde(default = "QueuePolicy::default_block_duration_ms")]
512    pub block_duration: u64,
513    /// Enable buffer queue for unconnected state
514    #[serde(default = "QueuePolicy::default_buffer_enabled")]
515    pub buffer_enabled: bool,
516    /// Buffer queue capacity (max number of items to buffer)
517    #[serde(default = "QueuePolicy::default_buffer_capacity")]
518    pub buffer_capacity: u32,
519    /// Buffer expiration time in milliseconds (0 means no expiration)
520    #[serde(default = "QueuePolicy::default_buffer_expire_ms")]
521    pub buffer_expire_ms: u64,
522}
523
524impl QueuePolicy {
525    fn default_capacity() -> u32 {
526        1000
527    }
528
529    fn default_drop_policy() -> DropPolicy {
530        DropPolicy::Discard
531    }
532
533    fn default_block_duration_ms() -> u64 {
534        1000
535    }
536
537    fn default_buffer_enabled() -> bool {
538        true
539    }
540
541    fn default_buffer_capacity() -> u32 {
542        1000
543    }
544
545    fn default_buffer_expire_ms() -> u64 {
546        300_000 // 5 minutes
547    }
548}
549
550impl sea_orm::IntoActiveValue<QueuePolicy> for QueuePolicy {
551    fn into_active_value(self) -> sea_orm::ActiveValue<QueuePolicy> {
552        sea_orm::ActiveValue::Set(self)
553    }
554}
555
556/// Serde adapter for `Arc<str>` fields.
557///
558/// `Arc<str>` does not implement serde traits by default, but we want the type for
559/// low-allocation clones on hot paths. This adapter serializes it as a normal string.
560mod arc_str_serde {
561    use serde::{Deserialize, Deserializer, Serializer};
562    use std::sync::Arc;
563
564    pub fn serialize<S>(v: &Arc<str>, serializer: S) -> Result<S::Ok, S::Error>
565    where
566        S: Serializer,
567    {
568        serializer.serialize_str(v.as_ref())
569    }
570
571    pub fn deserialize<'de, D>(deserializer: D) -> Result<Arc<str>, D::Error>
572    where
573        D: Deserializer<'de>,
574    {
575        let s = String::deserialize(deserializer)?;
576        Ok(Arc::<str>::from(s))
577    }
578
579    pub mod option {
580        use super::*;
581        use serde::Deserialize;
582
583        pub fn serialize<S>(v: &Option<Arc<str>>, serializer: S) -> Result<S::Ok, S::Error>
584        where
585            S: Serializer,
586        {
587            match v {
588                Some(s) => serializer.serialize_some(s.as_ref()),
589                None => serializer.serialize_none(),
590            }
591        }
592
593        pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Arc<str>>, D::Error>
594        where
595            D: Deserializer<'de>,
596        {
597            let opt = Option::<String>::deserialize(deserializer)?;
598            Ok(opt.map(Arc::<str>::from))
599        }
600    }
601}
602
603/// Point metadata snapshot for northward consumption.
604///
605/// This struct is intended to be:
606/// - **Read-only** for plugins
607/// - **Cheap to clone** via `Arc<PointMeta>`
608/// - **Stable** across core internal refactors
609///
610/// All strings are stored as `Arc<str>` to reduce cloning cost.
611#[derive(Clone, Debug, Serialize, Deserialize)]
612pub struct PointMeta {
613    /// Point identifier (primary key).
614    pub point_id: i32,
615    /// Channel identifier.
616    pub channel_id: i32,
617    /// Channel name.
618    #[serde(with = "arc_str_serde")]
619    pub channel_name: Arc<str>,
620    /// Device identifier.
621    pub device_id: i32,
622    /// Device name.
623    #[serde(with = "arc_str_serde")]
624    pub device_name: Arc<str>,
625    /// Point display name.
626    #[serde(with = "arc_str_serde")]
627    pub point_name: Arc<str>,
628    /// Point key used for protocol encoding and UI display.
629    #[serde(with = "arc_str_serde")]
630    pub point_key: Arc<str>,
631    /// Strong data type definition for this point.
632    pub data_type: DataType,
633    /// Access mode for read/write validation.
634    pub access_mode: AccessMode,
635    /// Unit of measurement (optional).
636    #[serde(default, with = "arc_str_serde::option")]
637    pub unit: Option<Arc<str>>,
638    /// Minimum allowed engineering value (optional).
639    pub min_value: Option<f64>,
640    /// Maximum allowed engineering value (optional).
641    pub max_value: Option<f64>,
642    /// Logical-layer transform rules for this point.
643    ///
644    /// This is always present; identity semantics are defined by `Transform`.
645    #[serde(default)]
646    pub transform: Transform,
647    /// Human-readable description (optional).
648    #[serde(default, with = "arc_str_serde::option")]
649    pub description: Option<Arc<str>>,
650}
651
652impl PointMeta {
653    /// Get the wire data type (protocol-level, memory-layout semantics).
654    ///
655    /// For `PointMeta`, this is the configured `data_type` persisted in the gateway.
656    #[inline]
657    pub fn wire_data_type(&self) -> DataType {
658        self.data_type
659    }
660
661    /// Get the logical data type (northward-facing semantics).
662    ///
663    /// This is derived from `transform.transform_data_type` and falls back to the
664    /// wire data type when not configured.
665    #[inline]
666    pub fn logical_data_type(&self) -> DataType {
667        self.transform.resolve_logical_datatype(self.data_type)
668    }
669
670    #[inline]
671    pub fn readable(&self) -> bool {
672        matches!(self.access_mode, AccessMode::Read | AccessMode::ReadWrite)
673    }
674
675    #[inline]
676    pub fn writable(&self) -> bool {
677        matches!(self.access_mode, AccessMode::Write | AccessMode::ReadWrite)
678    }
679}