elbus/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7pub const OP_NOP: u8 = 0x00;
8pub const OP_PUBLISH: u8 = 0x01;
9pub const OP_SUBSCRIBE: u8 = 0x02;
10pub const OP_UNSUBSCRIBE: u8 = 0x03;
11pub const OP_MESSAGE: u8 = 0x12;
12pub const OP_BROADCAST: u8 = 0x13;
13pub const OP_ACK: u8 = 0xFE;
14
15pub const PROTOCOL_VERSION: u16 = 0x01;
16
17pub const RESPONSE_OK: u8 = 0x01;
18
19pub const PING_FRAME: &[u8] = &[0, 0, 0, 0, 0, 0, 0, 0, 0];
20
21pub const ERR_CLIENT_NOT_REGISTERED: u8 = 0x71;
22pub const ERR_DATA: u8 = 0x72;
23pub const ERR_IO: u8 = 0x73;
24pub const ERR_OTHER: u8 = 0x74;
25pub const ERR_NOT_SUPPORTED: u8 = 0x75;
26pub const ERR_BUSY: u8 = 0x76;
27pub const ERR_NOT_DELIVERED: u8 = 0x77;
28pub const ERR_TIMEOUT: u8 = 0x78;
29pub const ERR_ACCESS: u8 = 0x79;
30
31pub const GREETINGS: [u8; 1] = [0xEB];
32
33pub const VERSION: &str = env!("CARGO_PKG_VERSION");
34
35pub static AUTHOR: &str = "(c) 2022 Bohemia Automation / Altertech";
36
37pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
38pub const DEFAULT_BUF_TTL: Duration = Duration::from_micros(10);
39pub const DEFAULT_BUF_SIZE: usize = 8192;
40
41pub const DEFAULT_QUEUE_SIZE: usize = 8192;
42
43pub const SECONDARY_SEP: &str = "%%";
44
45/// When a frame is sent, methods do not wait for the result, but they return OpConfirm type to let
46/// the sender get the result if required.
47///
48/// When the frame is sent with QoS "processed", the Option contains Receiver<Result>
49///
50/// Example:
51///
52/// ```rust,ignore
53/// use elbus::QoS;
54///
55/// let result = client.send("target", payload, QoS::Processed).await.unwrap(); // get send result
56/// let confirm = result.unwrap(); // get OpConfirm
57/// let op_result = confirm.await.unwrap(); // receive the operation result
58/// match op_result {
59///     Ok(_) => { /* the server has confirmed that it had processed the message */ }
60///     Err(e) => { /* the server has returned an error */ }
61/// }
62/// ```
63pub type OpConfirm = Option<tokio::sync::oneshot::Receiver<Result<(), Error>>>;
64pub type Frame = Arc<FrameData>;
65pub type EventChannel = async_channel::Receiver<Frame>;
66
67#[derive(Debug, Eq, PartialEq, Copy, Clone)]
68#[repr(u8)]
69pub enum ErrorKind {
70    NotRegistered = ERR_CLIENT_NOT_REGISTERED,
71    NotSupported = ERR_NOT_SUPPORTED,
72    Io = ERR_IO,
73    Timeout = ERR_TIMEOUT,
74    Data = ERR_DATA,
75    Busy = ERR_BUSY,
76    NotDelivered = ERR_NOT_DELIVERED,
77    Access = ERR_ACCESS,
78    Other = ERR_OTHER,
79    Eof = 0xff,
80}
81
82impl From<u8> for ErrorKind {
83    fn from(code: u8) -> Self {
84        match code {
85            ERR_CLIENT_NOT_REGISTERED => ErrorKind::NotRegistered,
86            ERR_NOT_SUPPORTED => ErrorKind::NotSupported,
87            ERR_IO => ErrorKind::Io,
88            ERR_DATA => ErrorKind::Data,
89            ERR_BUSY => ErrorKind::Busy,
90            ERR_NOT_DELIVERED => ErrorKind::NotDelivered,
91            ERR_ACCESS => ErrorKind::Access,
92            _ => ErrorKind::Other,
93        }
94    }
95}
96
97impl fmt::Display for ErrorKind {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(
100            f,
101            "{}",
102            match self {
103                ErrorKind::NotRegistered => "Client not registered",
104                ErrorKind::NotSupported => "Feature not supported",
105                ErrorKind::Io => "I/O Error",
106                ErrorKind::Timeout => "Timeout",
107                ErrorKind::Data => "Data Error",
108                ErrorKind::Busy => "Busy",
109                ErrorKind::NotDelivered => "Frame not delivered",
110                ErrorKind::Other => "Error",
111                ErrorKind::Access => "Access denied",
112                ErrorKind::Eof => "Eof",
113            }
114        )
115    }
116}
117
118#[derive(Debug)]
119pub struct Error {
120    kind: ErrorKind,
121    message: Option<String>,
122}
123
124impl fmt::Display for Error {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        if let Some(ref message) = self.message {
127            write!(f, "{}: {}", self.kind, message)
128        } else {
129            write!(f, "{}", self.kind)
130        }
131    }
132}
133
134impl Error {
135    #[inline]
136    pub fn new(kind: ErrorKind, message: Option<impl fmt::Display>) -> Self {
137        Self {
138            kind,
139            message: message.map(|m| m.to_string()),
140        }
141    }
142    #[inline]
143    pub fn io(e: impl fmt::Display) -> Self {
144        Self {
145            kind: ErrorKind::Io,
146            message: Some(e.to_string()),
147        }
148    }
149    #[inline]
150    pub fn data(e: impl fmt::Display) -> Self {
151        Self {
152            kind: ErrorKind::Data,
153            message: Some(e.to_string()),
154        }
155    }
156    #[inline]
157    pub fn access(e: impl fmt::Display) -> Self {
158        Self {
159            kind: ErrorKind::Access,
160            message: Some(e.to_string()),
161        }
162    }
163    #[inline]
164    pub fn not_supported(e: impl fmt::Display) -> Self {
165        Self {
166            kind: ErrorKind::NotSupported,
167            message: Some(e.to_string()),
168        }
169    }
170    #[inline]
171    pub fn not_registered() -> Self {
172        Self {
173            kind: ErrorKind::NotRegistered,
174            message: None,
175        }
176    }
177    #[inline]
178    pub fn not_delivered() -> Self {
179        Self {
180            kind: ErrorKind::NotDelivered,
181            message: None,
182        }
183    }
184    #[inline]
185    pub fn timeout() -> Self {
186        Self {
187            kind: ErrorKind::Timeout,
188            message: None,
189        }
190    }
191    #[inline]
192    pub fn busy(e: impl fmt::Display) -> Self {
193        Self {
194            kind: ErrorKind::Busy,
195            message: Some(e.to_string()),
196        }
197    }
198    #[inline]
199    pub fn kind(&self) -> ErrorKind {
200        self.kind
201    }
202}
203
204pub trait IntoElbusResult {
205    fn to_elbus_result(self) -> Result<(), Error>;
206}
207
208impl IntoElbusResult for u8 {
209    #[inline]
210    fn to_elbus_result(self) -> Result<(), Error> {
211        if self == RESPONSE_OK {
212            Ok(())
213        } else {
214            Err(Error {
215                kind: self.into(),
216                message: None,
217            })
218        }
219    }
220}
221
222impl From<tokio::time::error::Elapsed> for Error {
223    fn from(_e: tokio::time::error::Elapsed) -> Error {
224        Error::timeout()
225    }
226}
227
228impl From<std::io::Error> for Error {
229    fn from(e: std::io::Error) -> Error {
230        if e.kind() == std::io::ErrorKind::UnexpectedEof
231            || e.kind() == std::io::ErrorKind::BrokenPipe
232            || e.kind() == std::io::ErrorKind::ConnectionReset
233        {
234            Error {
235                kind: ErrorKind::Eof,
236                message: None,
237            }
238        } else {
239            Error::io(e)
240        }
241    }
242}
243
244impl From<&std::io::Error> for Error {
245    fn from(e: &std::io::Error) -> Error {
246        if e.kind() == std::io::ErrorKind::UnexpectedEof
247            || e.kind() == std::io::ErrorKind::BrokenPipe
248            || e.kind() == std::io::ErrorKind::ConnectionReset
249        {
250            Error {
251                kind: ErrorKind::Eof,
252                message: None,
253            }
254        } else {
255            Error::io(e)
256        }
257    }
258}
259
260impl From<std::str::Utf8Error> for Error {
261    fn from(e: std::str::Utf8Error) -> Error {
262        Error::data(e)
263    }
264}
265
266impl From<std::array::TryFromSliceError> for Error {
267    fn from(e: std::array::TryFromSliceError) -> Error {
268        Error::data(e)
269    }
270}
271
272impl<T> From<async_channel::SendError<T>> for Error {
273    fn from(_e: async_channel::SendError<T>) -> Error {
274        Error {
275            kind: ErrorKind::Eof,
276            message: None,
277        }
278    }
279}
280
281impl From<tokio::sync::oneshot::error::RecvError> for Error {
282    fn from(_e: tokio::sync::oneshot::error::RecvError) -> Error {
283        Error {
284            kind: ErrorKind::Eof,
285            message: None,
286        }
287    }
288}
289
290#[derive(Debug, Eq, PartialEq, Copy, Clone)]
291#[repr(u8)]
292pub enum FrameOp {
293    Nop = OP_NOP,
294    Message = OP_MESSAGE,
295    Broadcast = OP_BROADCAST,
296    PublishTopic = OP_PUBLISH,
297    SubscribeTopic = OP_SUBSCRIBE,
298    UnsubscribeTopic = OP_UNSUBSCRIBE,
299}
300
301impl TryFrom<u8> for FrameOp {
302    type Error = Error;
303    fn try_from(tp: u8) -> Result<Self, Error> {
304        match tp {
305            OP_NOP => Ok(FrameOp::Nop),
306            OP_MESSAGE => Ok(FrameOp::Message),
307            OP_BROADCAST => Ok(FrameOp::Broadcast),
308            OP_PUBLISH => Ok(FrameOp::PublishTopic),
309            OP_SUBSCRIBE => Ok(FrameOp::SubscribeTopic),
310            OP_UNSUBSCRIBE => Ok(FrameOp::UnsubscribeTopic),
311            _ => Err(Error::data(format!("Invalid frame type: {}", tp))),
312        }
313    }
314}
315
316#[derive(Debug, Copy, Clone)]
317#[repr(u8)]
318pub enum QoS {
319    No = 0,
320    Processed = 1,
321    Realtime = 2,
322    RealtimeProcessed = 3,
323}
324
325impl QoS {
326    #[inline]
327    pub fn is_realtime(self) -> bool {
328        self as u8 & 0b10 != 0
329    }
330    #[inline]
331    pub fn needs_ack(self) -> bool {
332        self as u8 & 0b1 != 0
333    }
334}
335
336impl TryFrom<u8> for QoS {
337    type Error = Error;
338    fn try_from(q: u8) -> Result<Self, Error> {
339        match q {
340            0 => Ok(QoS::No),
341            1 => Ok(QoS::Processed),
342            2 => Ok(QoS::Realtime),
343            3 => Ok(QoS::RealtimeProcessed),
344            _ => Err(Error::data(format!("Invalid QoS: {}", q))),
345        }
346    }
347}
348
349#[derive(Debug, Eq, PartialEq, Copy, Clone)]
350#[repr(u8)]
351pub enum FrameKind {
352    Prepared = 0xff,
353    Message = OP_MESSAGE,
354    Broadcast = OP_BROADCAST,
355    Publish = OP_PUBLISH,
356    Acknowledge = OP_ACK,
357    Nop = OP_NOP,
358}
359
360impl TryFrom<u8> for FrameKind {
361    type Error = Error;
362    fn try_from(code: u8) -> Result<Self, Self::Error> {
363        match code {
364            OP_MESSAGE => Ok(FrameKind::Message),
365            OP_BROADCAST => Ok(FrameKind::Broadcast),
366            OP_PUBLISH => Ok(FrameKind::Publish),
367            OP_ACK => Ok(FrameKind::Acknowledge),
368            OP_NOP => Ok(FrameKind::Nop),
369            _ => Err(Error::data(format!("Invalid frame type: {:x}", code))),
370        }
371    }
372}
373
374#[derive(Debug)]
375pub struct FrameData {
376    kind: FrameKind,
377    sender: Option<String>,
378    topic: Option<String>,
379    header: Option<Vec<u8>>, // zero-copy payload prefix
380    buf: Vec<u8>,
381    payload_pos: usize,
382    realtime: bool,
383}
384
385impl FrameData {
386    #[inline]
387    pub fn new(
388        kind: FrameKind,
389        sender: Option<String>,
390        topic: Option<String>,
391        header: Option<Vec<u8>>,
392        buf: Vec<u8>,
393        payload_pos: usize,
394        realtime: bool,
395    ) -> Self {
396        Self {
397            kind,
398            sender,
399            topic,
400            header,
401            buf,
402            payload_pos,
403            realtime,
404        }
405    }
406    #[inline]
407    pub fn new_nop() -> Self {
408        Self {
409            kind: FrameKind::Nop,
410            sender: None,
411            topic: None,
412            header: None,
413            buf: Vec::new(),
414            payload_pos: 0,
415            realtime: false,
416        }
417    }
418    #[inline]
419    pub fn kind(&self) -> FrameKind {
420        self.kind
421    }
422    /// # Panics
423    ///
424    /// Will panic if called for a prepared frame
425    #[inline]
426    pub fn sender(&self) -> &str {
427        self.sender.as_ref().unwrap()
428    }
429    /// # Panics
430    ///
431    /// Will panic if called for a prepared frame
432    #[inline]
433    pub fn primary_sender(&self) -> &str {
434        let primary_sender = self.sender.as_ref().unwrap();
435        if let Some(pos) = primary_sender.find(SECONDARY_SEP) {
436            &primary_sender[..pos]
437        } else {
438            primary_sender
439        }
440    }
441    /// Filled for pub/sub communications
442    #[inline]
443    pub fn topic(&self) -> Option<&str> {
444        self.topic.as_deref()
445    }
446    /// To keep zero-copy model, frames contain the full incoming buffer + actual payload position.
447    /// Use this method to get the actual call payload.
448    #[inline]
449    pub fn payload(&self) -> &[u8] {
450        &self.buf[self.payload_pos..]
451    }
452    /// The header can be used by certain implementations (e.g. the default RPC layer) to
453    /// keep zero-copy model. The header is None for IPC communications, but filled for
454    /// inter-thread ones. A custom layer should use/parse the header to avoid unnecessary payload
455    /// copy
456    #[inline]
457    pub fn header(&self) -> Option<&[u8]> {
458        self.header.as_deref()
459    }
460    #[inline]
461    pub fn is_realtime(&self) -> bool {
462        self.realtime
463    }
464}
465
466pub mod borrow;
467pub mod common;
468pub mod tools {
469    #[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
470    pub mod pubsub;
471}
472
473#[cfg(feature = "broker")]
474pub mod broker;
475#[cfg(feature = "ipc")]
476pub mod ipc;
477#[cfg(feature = "rpc")]
478pub mod rpc;
479
480#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
481pub mod client;
482#[cfg(any(feature = "broker", feature = "ipc"))]
483pub mod comm;