busrt/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7#[cfg(feature = "rpc")]
8pub use async_trait::async_trait;
9
// ---- frame operation codes ----

/// Operation code of a no-op frame (see [`FrameOp::Nop`], [`FrameKind::Nop`]).
pub const OP_NOP: u8 = 0x00;
/// Operation code for publishing to a topic (see [`FrameOp::PublishTopic`]).
pub const OP_PUBLISH: u8 = 0x01;
/// Operation code for subscribing to a topic (see [`FrameOp::SubscribeTopic`]).
pub const OP_SUBSCRIBE: u8 = 0x02;
/// Operation code for unsubscribing from a topic (see [`FrameOp::UnsubscribeTopic`]).
pub const OP_UNSUBSCRIBE: u8 = 0x03;
/// Operation code for excluding a topic (see [`FrameOp::ExcludeTopic`]).
pub const OP_EXCLUDE: u8 = 0x04;
/// Operation code reverting an exclude (see [`FrameOp::UnexcludeTopic`]).
pub const OP_UNEXCLUDE: u8 = 0x05;
/// Operation code of the "publish for" operation (see [`FrameOp::PublishTopicFor`]).
pub const OP_PUBLISH_FOR: u8 = 0x06;
/// Operation code of a direct message (see [`FrameOp::Message`]).
pub const OP_MESSAGE: u8 = 0x12;
/// Operation code of a broadcast (see [`FrameOp::Broadcast`]).
pub const OP_BROADCAST: u8 = 0x13;
/// Operation code of an acknowledgement (see [`FrameKind::Acknowledge`]).
pub const OP_ACK: u8 = 0xFE;

/// Protocol version number.
pub const PROTOCOL_VERSION: u16 = 0x01;

/// Response code meaning success; any other code is converted into an error by
/// [`IntoBusRtResult`].
pub const RESPONSE_OK: u8 = 0x01;

/// Ping frame: nine zero bytes.
pub const PING_FRAME: &[u8] = &[0; 9];

// ---- error codes (the discriminants of [`ErrorKind`]) ----

/// Error code of [`ErrorKind::NotRegistered`].
pub const ERR_CLIENT_NOT_REGISTERED: u8 = 0x71;
/// Error code of [`ErrorKind::Data`].
pub const ERR_DATA: u8 = 0x72;
/// Error code of [`ErrorKind::Io`].
pub const ERR_IO: u8 = 0x73;
/// Error code of [`ErrorKind::Other`].
pub const ERR_OTHER: u8 = 0x74;
/// Error code of [`ErrorKind::NotSupported`].
pub const ERR_NOT_SUPPORTED: u8 = 0x75;
/// Error code of [`ErrorKind::Busy`].
pub const ERR_BUSY: u8 = 0x76;
/// Error code of [`ErrorKind::NotDelivered`].
pub const ERR_NOT_DELIVERED: u8 = 0x77;
/// Error code of [`ErrorKind::Timeout`].
pub const ERR_TIMEOUT: u8 = 0x78;
/// Error code of [`ErrorKind::Access`].
pub const ERR_ACCESS: u8 = 0x79;

/// Single-byte greeting marker (`0xEB`).
// NOTE(review): presumably exchanged when a connection is opened - confirm against the
// ipc/broker modules.
pub const GREETINGS: [u8; 1] = [0xEB];
38
39pub const VERSION: &str = env!("CARGO_PKG_VERSION");
40
41pub static AUTHOR: &str = "(c) 2022 Bohemia Automation / Altertech";
42
/// Default timeout: one second.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
/// Default buffer TTL: ten microseconds.
// NOTE(review): presumably the time a write buffer may wait before being flushed - confirm
// against the client implementations.
pub const DEFAULT_BUF_TTL: Duration = Duration::from_micros(10);
/// Default buffer size (8192).
pub const DEFAULT_BUF_SIZE: usize = 8192;

/// Default queue size (8192).
pub const DEFAULT_QUEUE_SIZE: usize = 8192;

/// Separator after which the secondary part of a sender name starts; see
/// [`FrameData::primary_sender`], which returns everything before its first occurrence.
pub const SECONDARY_SEP: &str = "%%";
50
51/// When a frame is sent, methods do not wait for the result, but they return OpConfirm type to let
52/// the sender get the result if required.
53///
54/// When the frame is sent with QoS "processed", the Option contains Receiver<Result>
55///
56/// Example:
57///
58/// ```rust,ignore
59/// use busrt::QoS;
60///
61/// let result = client.send("target", payload, QoS::Processed).await.unwrap(); // get send result
62/// let confirm = result.unwrap(); // get OpConfirm
63/// let op_result = confirm.await.unwrap(); // receive the operation result
64/// match op_result {
65///     Ok(_) => { /* the server has confirmed that it had processed the message */ }
66///     Err(e) => { /* the server has returned an error */ }
67/// }
68/// ```
69#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
70pub type OpConfirm = Option<tokio::sync::oneshot::Receiver<Result<(), Error>>>;
71pub type Frame = Arc<FrameData>;
72pub type EventChannel = async_channel::Receiver<Frame>;
73
// Locking primitives behind the sync channels: `rtsc::pi` when the "rt" feature is enabled,
// `parking_lot` otherwise.
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), feature = "rt"))]
type RawMutex = rtsc::pi::RawMutex;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), feature = "rt"))]
type Condvar = rtsc::pi::Condvar;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), not(feature = "rt")))]
type RawMutex = parking_lot::RawMutex;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), not(feature = "rt")))]
type Condvar = parking_lot::Condvar;

/// Receiving side of a synchronous channel which delivers [`Frame`]s.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
pub type SyncEventChannel = rtsc::channel::Receiver<Frame, RawMutex, Condvar>;
// Sending side of the same channel, crate-private.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
type SyncEventSender = rtsc::channel::Sender<Frame, RawMutex, Condvar>;

/// Synchronous counterpart of `OpConfirm`: an optional `oneshot` receiver of the operation
/// result.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
pub type SyncOpConfirm = Option<oneshot::Receiver<Result<(), Error>>>;
90
91#[derive(Debug, Eq, PartialEq, Copy, Clone)]
92#[repr(u8)]
93pub enum ErrorKind {
94    NotRegistered = ERR_CLIENT_NOT_REGISTERED,
95    NotSupported = ERR_NOT_SUPPORTED,
96    Io = ERR_IO,
97    Timeout = ERR_TIMEOUT,
98    Data = ERR_DATA,
99    Busy = ERR_BUSY,
100    NotDelivered = ERR_NOT_DELIVERED,
101    Access = ERR_ACCESS,
102    Other = ERR_OTHER,
103    Eof = 0xff,
104}
105
106impl From<u8> for ErrorKind {
107    fn from(code: u8) -> Self {
108        match code {
109            ERR_CLIENT_NOT_REGISTERED => ErrorKind::NotRegistered,
110            ERR_NOT_SUPPORTED => ErrorKind::NotSupported,
111            ERR_IO => ErrorKind::Io,
112            ERR_DATA => ErrorKind::Data,
113            ERR_BUSY => ErrorKind::Busy,
114            ERR_NOT_DELIVERED => ErrorKind::NotDelivered,
115            ERR_ACCESS => ErrorKind::Access,
116            _ => ErrorKind::Other,
117        }
118    }
119}
120
121impl fmt::Display for ErrorKind {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        write!(
124            f,
125            "{}",
126            match self {
127                ErrorKind::NotRegistered => "Client not registered",
128                ErrorKind::NotSupported => "Feature not supported",
129                ErrorKind::Io => "I/O Error",
130                ErrorKind::Timeout => "Timeout",
131                ErrorKind::Data => "Data Error",
132                ErrorKind::Busy => "Busy",
133                ErrorKind::NotDelivered => "Frame not delivered",
134                ErrorKind::Other => "Error",
135                ErrorKind::Access => "Access denied",
136                ErrorKind::Eof => "Eof",
137            }
138        )
139    }
140}
141
142#[derive(Debug)]
143pub struct Error {
144    kind: ErrorKind,
145    message: Option<String>,
146}
147
148impl fmt::Display for Error {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        if let Some(ref message) = self.message {
151            write!(f, "{}: {}", self.kind, message)
152        } else {
153            write!(f, "{}", self.kind)
154        }
155    }
156}
157
158impl std::error::Error for Error {}
159
160impl Error {
161    #[inline]
162    pub fn new(kind: ErrorKind, message: Option<impl fmt::Display>) -> Self {
163        Self {
164            kind,
165            message: message.map(|m| m.to_string()),
166        }
167    }
168    #[inline]
169    pub fn io(e: impl fmt::Display) -> Self {
170        Self {
171            kind: ErrorKind::Io,
172            message: Some(e.to_string()),
173        }
174    }
175    #[inline]
176    pub fn data(e: impl fmt::Display) -> Self {
177        Self {
178            kind: ErrorKind::Data,
179            message: Some(e.to_string()),
180        }
181    }
182    #[inline]
183    pub fn access(e: impl fmt::Display) -> Self {
184        Self {
185            kind: ErrorKind::Access,
186            message: Some(e.to_string()),
187        }
188    }
189    #[inline]
190    pub fn not_supported(e: impl fmt::Display) -> Self {
191        Self {
192            kind: ErrorKind::NotSupported,
193            message: Some(e.to_string()),
194        }
195    }
196    #[inline]
197    pub fn not_registered() -> Self {
198        Self {
199            kind: ErrorKind::NotRegistered,
200            message: None,
201        }
202    }
203    #[inline]
204    pub fn not_delivered() -> Self {
205        Self {
206            kind: ErrorKind::NotDelivered,
207            message: None,
208        }
209    }
210    #[inline]
211    pub fn timeout() -> Self {
212        Self {
213            kind: ErrorKind::Timeout,
214            message: None,
215        }
216    }
217    #[inline]
218    pub fn busy(e: impl fmt::Display) -> Self {
219        Self {
220            kind: ErrorKind::Busy,
221            message: Some(e.to_string()),
222        }
223    }
224    #[inline]
225    pub fn kind(&self) -> ErrorKind {
226        self.kind
227    }
228}
229
230pub trait IntoBusRtResult {
231    fn to_busrt_result(self) -> Result<(), Error>;
232}
233
234impl IntoBusRtResult for u8 {
235    #[inline]
236    fn to_busrt_result(self) -> Result<(), Error> {
237        if self == RESPONSE_OK {
238            Ok(())
239        } else {
240            Err(Error {
241                kind: self.into(),
242                message: None,
243            })
244        }
245    }
246}
247
#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
impl From<tokio::time::error::Elapsed> for Error {
    /// An elapsed tokio timeout becomes a timeout error; the source value is discarded.
    fn from(_: tokio::time::error::Elapsed) -> Self {
        Self::timeout()
    }
}
254
255impl From<std::io::Error> for Error {
256    fn from(e: std::io::Error) -> Error {
257        if e.kind() == std::io::ErrorKind::UnexpectedEof
258            || e.kind() == std::io::ErrorKind::BrokenPipe
259            || e.kind() == std::io::ErrorKind::ConnectionReset
260        {
261            Error {
262                kind: ErrorKind::Eof,
263                message: None,
264            }
265        } else {
266            Error::io(e)
267        }
268    }
269}
270
271impl From<&std::io::Error> for Error {
272    fn from(e: &std::io::Error) -> Error {
273        if e.kind() == std::io::ErrorKind::UnexpectedEof
274            || e.kind() == std::io::ErrorKind::BrokenPipe
275            || e.kind() == std::io::ErrorKind::ConnectionReset
276        {
277            Error {
278                kind: ErrorKind::Eof,
279                message: None,
280            }
281        } else {
282            Error::io(e)
283        }
284    }
285}
286
287impl From<std::str::Utf8Error> for Error {
288    fn from(e: std::str::Utf8Error) -> Error {
289        Error::data(e)
290    }
291}
292
293impl From<std::array::TryFromSliceError> for Error {
294    fn from(e: std::array::TryFromSliceError) -> Error {
295        Error::data(e)
296    }
297}
298
299impl<T> From<async_channel::SendError<T>> for Error {
300    fn from(_e: async_channel::SendError<T>) -> Error {
301        Error {
302            kind: ErrorKind::Eof,
303            message: None,
304        }
305    }
306}
307
308#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
309impl From<tokio::sync::oneshot::error::RecvError> for Error {
310    fn from(_e: tokio::sync::oneshot::error::RecvError) -> Error {
311        Error {
312            kind: ErrorKind::Eof,
313            message: None,
314        }
315    }
316}
317
318#[derive(Debug, Eq, PartialEq, Copy, Clone)]
319#[repr(u8)]
320pub enum FrameOp {
321    Nop = OP_NOP,
322    Message = OP_MESSAGE,
323    Broadcast = OP_BROADCAST,
324    PublishTopic = OP_PUBLISH,
325    PublishTopicFor = OP_PUBLISH_FOR,
326    SubscribeTopic = OP_SUBSCRIBE,
327    UnsubscribeTopic = OP_UNSUBSCRIBE,
328    ExcludeTopic = OP_EXCLUDE,
329    // not include but unexclude as it's clearly an opposite operation to exclude
330    // while include may be confused with subscribe
331    UnexcludeTopic = OP_UNEXCLUDE,
332}
333
334impl TryFrom<u8> for FrameOp {
335    type Error = Error;
336    fn try_from(tp: u8) -> Result<Self, Error> {
337        match tp {
338            OP_NOP => Ok(FrameOp::Nop),
339            OP_MESSAGE => Ok(FrameOp::Message),
340            OP_BROADCAST => Ok(FrameOp::Broadcast),
341            OP_PUBLISH => Ok(FrameOp::PublishTopic),
342            OP_PUBLISH_FOR => Ok(FrameOp::PublishTopicFor),
343            OP_SUBSCRIBE => Ok(FrameOp::SubscribeTopic),
344            OP_UNSUBSCRIBE => Ok(FrameOp::UnsubscribeTopic),
345            OP_EXCLUDE => Ok(FrameOp::ExcludeTopic),
346            OP_UNEXCLUDE => Ok(FrameOp::UnexcludeTopic),
347            _ => Err(Error::data(format!("Invalid frame type: {}", tp))),
348        }
349    }
350}
351
352#[derive(Debug, Copy, Clone)]
353#[repr(u8)]
354pub enum QoS {
355    No = 0,
356    Processed = 1,
357    Realtime = 2,
358    RealtimeProcessed = 3,
359}
360
361impl QoS {
362    #[inline]
363    pub fn is_realtime(self) -> bool {
364        self as u8 & 0b10 != 0
365    }
366    #[inline]
367    pub fn needs_ack(self) -> bool {
368        self as u8 & 0b1 != 0
369    }
370}
371
372impl TryFrom<u8> for QoS {
373    type Error = Error;
374    fn try_from(q: u8) -> Result<Self, Error> {
375        match q {
376            0 => Ok(QoS::No),
377            1 => Ok(QoS::Processed),
378            2 => Ok(QoS::Realtime),
379            3 => Ok(QoS::RealtimeProcessed),
380            _ => Err(Error::data(format!("Invalid QoS: {}", q))),
381        }
382    }
383}
384
385#[derive(Debug, Eq, PartialEq, Copy, Clone)]
386#[repr(u8)]
387pub enum FrameKind {
388    Prepared = 0xff,
389    Message = OP_MESSAGE,
390    Broadcast = OP_BROADCAST,
391    Publish = OP_PUBLISH,
392    Acknowledge = OP_ACK,
393    Nop = OP_NOP,
394}
395
396impl TryFrom<u8> for FrameKind {
397    type Error = Error;
398    fn try_from(code: u8) -> Result<Self, Self::Error> {
399        match code {
400            OP_MESSAGE => Ok(FrameKind::Message),
401            OP_BROADCAST => Ok(FrameKind::Broadcast),
402            OP_PUBLISH => Ok(FrameKind::Publish),
403            OP_ACK => Ok(FrameKind::Acknowledge),
404            OP_NOP => Ok(FrameKind::Nop),
405            _ => Err(Error::data(format!("Invalid frame type: {:x}", code))),
406        }
407    }
408}
409
410#[derive(Debug)]
411pub struct FrameData {
412    kind: FrameKind,
413    sender: Option<String>,
414    topic: Option<String>,
415    header: Option<Vec<u8>>, // zero-copy payload prefix
416    buf: Vec<u8>,
417    payload_pos: usize,
418    realtime: bool,
419}
420
421impl FrameData {
422    #[inline]
423    pub fn new(
424        kind: FrameKind,
425        sender: Option<String>,
426        topic: Option<String>,
427        header: Option<Vec<u8>>,
428        buf: Vec<u8>,
429        payload_pos: usize,
430        realtime: bool,
431    ) -> Self {
432        Self {
433            kind,
434            sender,
435            topic,
436            header,
437            buf,
438            payload_pos,
439            realtime,
440        }
441    }
442    #[inline]
443    pub fn new_nop() -> Self {
444        Self {
445            kind: FrameKind::Nop,
446            sender: None,
447            topic: None,
448            header: None,
449            buf: Vec::new(),
450            payload_pos: 0,
451            realtime: false,
452        }
453    }
454    #[inline]
455    pub fn kind(&self) -> FrameKind {
456        self.kind
457    }
458    /// # Panics
459    ///
460    /// Will panic if called for a prepared frame
461    #[inline]
462    pub fn sender(&self) -> &str {
463        self.sender.as_ref().unwrap()
464    }
465    /// # Panics
466    ///
467    /// Will panic if called for a prepared frame
468    #[inline]
469    pub fn primary_sender(&self) -> &str {
470        let primary_sender = self.sender.as_ref().unwrap();
471        if let Some(pos) = primary_sender.find(SECONDARY_SEP) {
472            &primary_sender[..pos]
473        } else {
474            primary_sender
475        }
476    }
477    /// Filled for pub/sub communications
478    #[inline]
479    pub fn topic(&self) -> Option<&str> {
480        self.topic.as_deref()
481    }
482    /// To keep zero-copy model, frames contain the full incoming buffer + actual payload position.
483    /// Use this method to get the actual call payload.
484    #[inline]
485    pub fn payload(&self) -> &[u8] {
486        &self.buf[self.payload_pos..]
487    }
488    /// The header can be used by certain implementations (e.g. the default RPC layer) to
489    /// keep zero-copy model. The header is None for IPC communications, but filled for
490    /// inter-thread ones. A custom layer should use/parse the header to avoid unnecessary payload
491    /// copy
492    #[inline]
493    pub fn header(&self) -> Option<&[u8]> {
494        self.header.as_deref()
495    }
496    #[inline]
497    pub fn is_realtime(&self) -> bool {
498        self.realtime
499    }
500}
501
502pub mod borrow;
503pub mod common;
504pub mod tools {
505    #[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
506    pub mod pubsub;
507}
508
509#[cfg(feature = "broker")]
510pub mod broker;
511#[cfg(feature = "cursors")]
512pub mod cursors;
513#[cfg(feature = "ipc")]
514pub mod ipc;
515#[cfg(any(feature = "rpc", feature = "rpc-sync"))]
516pub mod rpc;
517#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
518pub mod sync;
519
520#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
521pub mod client;
522#[cfg(any(feature = "broker", feature = "ipc"))]
523pub mod comm;
524
/// Expands to an empty borrowed payload: `$crate::borrow::Cow::Borrowed(&[])`.
///
/// Takes no arguments.
#[macro_export]
macro_rules! empty_payload {
    () => {
        $crate::borrow::Cow::Borrowed(&[])
    };
}
530}