1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7#[cfg(feature = "rpc")]
8pub use async_trait::async_trait;
9
// Operation codes. `FrameOp` and `FrameKind` use these values as their `u8`
// discriminants.

/// Opcode behind [`FrameOp::Nop`] and [`FrameKind::Nop`].
pub const OP_NOP: u8 = 0x00;
/// Opcode behind [`FrameOp::PublishTopic`] and [`FrameKind::Publish`].
pub const OP_PUBLISH: u8 = 0x01;
/// Opcode behind [`FrameOp::SubscribeTopic`].
pub const OP_SUBSCRIBE: u8 = 0x02;
/// Opcode behind [`FrameOp::UnsubscribeTopic`].
pub const OP_UNSUBSCRIBE: u8 = 0x03;
/// Opcode behind [`FrameOp::ExcludeTopic`].
pub const OP_EXCLUDE: u8 = 0x04;
/// Opcode behind [`FrameOp::UnexcludeTopic`].
pub const OP_UNEXCLUDE: u8 = 0x05;
/// Opcode behind [`FrameOp::PublishTopicFor`].
pub const OP_PUBLISH_FOR: u8 = 0x06;
/// Opcode behind [`FrameOp::Message`] and [`FrameKind::Message`].
pub const OP_MESSAGE: u8 = 0x12;
/// Opcode behind [`FrameOp::Broadcast`] and [`FrameKind::Broadcast`].
pub const OP_BROADCAST: u8 = 0x13;
/// Opcode behind [`FrameKind::Acknowledge`].
pub const OP_ACK: u8 = 0xFE;

/// Protocol version number.
///
/// NOTE(review): presumably exchanged while a connection is being set up —
/// confirm against the client/broker code.
pub const PROTOCOL_VERSION: u16 = 0x01;

/// Status byte that [`IntoBusRtResult`] treats as success; any other value is
/// converted into an [`Error`].
pub const RESPONSE_OK: u8 = 0x01;

/// Nine zero bytes.
///
/// NOTE(review): presumably the keep-alive frame sent over an idle
/// connection — confirm against the client/broker code.
pub const PING_FRAME: &[u8] = &[0, 0, 0, 0, 0, 0, 0, 0, 0];

// Error codes. `ErrorKind` uses these values as its `u8` discriminants.

/// Error code behind [`ErrorKind::NotRegistered`].
pub const ERR_CLIENT_NOT_REGISTERED: u8 = 0x71;
/// Error code behind [`ErrorKind::Data`].
pub const ERR_DATA: u8 = 0x72;
/// Error code behind [`ErrorKind::Io`].
pub const ERR_IO: u8 = 0x73;
/// Error code behind [`ErrorKind::Other`].
pub const ERR_OTHER: u8 = 0x74;
/// Error code behind [`ErrorKind::NotSupported`].
pub const ERR_NOT_SUPPORTED: u8 = 0x75;
/// Error code behind [`ErrorKind::Busy`].
pub const ERR_BUSY: u8 = 0x76;
/// Error code behind [`ErrorKind::NotDelivered`].
pub const ERR_NOT_DELIVERED: u8 = 0x77;
/// Error code behind [`ErrorKind::Timeout`].
pub const ERR_TIMEOUT: u8 = 0x78;
/// Error code behind [`ErrorKind::Access`].
pub const ERR_ACCESS: u8 = 0x79;

/// Single greeting byte (`0xEB`).
///
/// NOTE(review): presumably the first byte exchanged on a new connection —
/// confirm against the client/broker code.
pub const GREETINGS: [u8; 1] = [0xEB];
38
39pub const VERSION: &str = env!("CARGO_PKG_VERSION");
40
41pub static AUTHOR: &str = "(c) 2022 Bohemia Automation / Altertech";
42
/// Default timeout: one second.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
/// Default buffer TTL: ten microseconds.
///
/// NOTE(review): presumably how long buffered data may wait before it is
/// flushed — confirm against the code that consumes this value.
pub const DEFAULT_BUF_TTL: Duration = Duration::from_micros(10);
/// Default buffer size (8192).
pub const DEFAULT_BUF_SIZE: usize = 8192;

/// Default queue size (8192).
pub const DEFAULT_QUEUE_SIZE: usize = 8192;

/// Separator after which [`FrameData::primary_sender`] cuts the sender name:
/// everything before the first `%%` is the primary name.
pub const SECONDARY_SEP: &str = "%%";
50
51#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
70pub type OpConfirm = Option<tokio::sync::oneshot::Receiver<Result<(), Error>>>;
71pub type Frame = Arc<FrameData>;
72pub type EventChannel = async_channel::Receiver<Frame>;
73
// Lock primitives behind the synchronous channels: taken from `rtsc::pi` when
// the `rt` feature is enabled and from `parking_lot` otherwise.
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), feature = "rt"))]
type RawMutex = rtsc::pi::RawMutex;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), feature = "rt"))]
type Condvar = rtsc::pi::Condvar;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), not(feature = "rt")))]
type RawMutex = parking_lot::RawMutex;
#[cfg(all(any(feature = "ipc-sync", feature = "rpc-sync"), not(feature = "rt")))]
type Condvar = parking_lot::Condvar;

/// Receiving half of an `rtsc` channel that carries [`Frame`]s.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
pub type SyncEventChannel = rtsc::channel::Receiver<Frame, RawMutex, Condvar>;
/// Sending half matching [`SyncEventChannel`]; private to the crate root.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
type SyncEventSender = rtsc::channel::Sender<Frame, RawMutex, Condvar>;

/// Optional `oneshot` receiver that yields a `Result<(), Error>`; the
/// counterpart of `OpConfirm` for the synchronous features.
#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
pub type SyncOpConfirm = Option<oneshot::Receiver<Result<(), Error>>>;
90
91#[derive(Debug, Eq, PartialEq, Copy, Clone)]
92#[repr(u8)]
93pub enum ErrorKind {
94 NotRegistered = ERR_CLIENT_NOT_REGISTERED,
95 NotSupported = ERR_NOT_SUPPORTED,
96 Io = ERR_IO,
97 Timeout = ERR_TIMEOUT,
98 Data = ERR_DATA,
99 Busy = ERR_BUSY,
100 NotDelivered = ERR_NOT_DELIVERED,
101 Access = ERR_ACCESS,
102 Other = ERR_OTHER,
103 Eof = 0xff,
104}
105
106impl From<u8> for ErrorKind {
107 fn from(code: u8) -> Self {
108 match code {
109 ERR_CLIENT_NOT_REGISTERED => ErrorKind::NotRegistered,
110 ERR_NOT_SUPPORTED => ErrorKind::NotSupported,
111 ERR_IO => ErrorKind::Io,
112 ERR_DATA => ErrorKind::Data,
113 ERR_BUSY => ErrorKind::Busy,
114 ERR_NOT_DELIVERED => ErrorKind::NotDelivered,
115 ERR_ACCESS => ErrorKind::Access,
116 _ => ErrorKind::Other,
117 }
118 }
119}
120
121impl fmt::Display for ErrorKind {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 write!(
124 f,
125 "{}",
126 match self {
127 ErrorKind::NotRegistered => "Client not registered",
128 ErrorKind::NotSupported => "Feature not supported",
129 ErrorKind::Io => "I/O Error",
130 ErrorKind::Timeout => "Timeout",
131 ErrorKind::Data => "Data Error",
132 ErrorKind::Busy => "Busy",
133 ErrorKind::NotDelivered => "Frame not delivered",
134 ErrorKind::Other => "Error",
135 ErrorKind::Access => "Access denied",
136 ErrorKind::Eof => "Eof",
137 }
138 )
139 }
140}
141
142#[derive(Debug)]
143pub struct Error {
144 kind: ErrorKind,
145 message: Option<String>,
146}
147
148impl fmt::Display for Error {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 if let Some(ref message) = self.message {
151 write!(f, "{}: {}", self.kind, message)
152 } else {
153 write!(f, "{}", self.kind)
154 }
155 }
156}
157
158impl std::error::Error for Error {}
159
160impl Error {
161 #[inline]
162 pub fn new(kind: ErrorKind, message: Option<impl fmt::Display>) -> Self {
163 Self {
164 kind,
165 message: message.map(|m| m.to_string()),
166 }
167 }
168 #[inline]
169 pub fn io(e: impl fmt::Display) -> Self {
170 Self {
171 kind: ErrorKind::Io,
172 message: Some(e.to_string()),
173 }
174 }
175 #[inline]
176 pub fn data(e: impl fmt::Display) -> Self {
177 Self {
178 kind: ErrorKind::Data,
179 message: Some(e.to_string()),
180 }
181 }
182 #[inline]
183 pub fn access(e: impl fmt::Display) -> Self {
184 Self {
185 kind: ErrorKind::Access,
186 message: Some(e.to_string()),
187 }
188 }
189 #[inline]
190 pub fn not_supported(e: impl fmt::Display) -> Self {
191 Self {
192 kind: ErrorKind::NotSupported,
193 message: Some(e.to_string()),
194 }
195 }
196 #[inline]
197 pub fn not_registered() -> Self {
198 Self {
199 kind: ErrorKind::NotRegistered,
200 message: None,
201 }
202 }
203 #[inline]
204 pub fn not_delivered() -> Self {
205 Self {
206 kind: ErrorKind::NotDelivered,
207 message: None,
208 }
209 }
210 #[inline]
211 pub fn timeout() -> Self {
212 Self {
213 kind: ErrorKind::Timeout,
214 message: None,
215 }
216 }
217 #[inline]
218 pub fn busy(e: impl fmt::Display) -> Self {
219 Self {
220 kind: ErrorKind::Busy,
221 message: Some(e.to_string()),
222 }
223 }
224 #[inline]
225 pub fn kind(&self) -> ErrorKind {
226 self.kind
227 }
228}
229
230pub trait IntoBusRtResult {
231 fn to_busrt_result(self) -> Result<(), Error>;
232}
233
234impl IntoBusRtResult for u8 {
235 #[inline]
236 fn to_busrt_result(self) -> Result<(), Error> {
237 if self == RESPONSE_OK {
238 Ok(())
239 } else {
240 Err(Error {
241 kind: self.into(),
242 message: None,
243 })
244 }
245 }
246}
247
#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
impl From<tokio::time::error::Elapsed> for Error {
    /// An elapsed `tokio` timeout becomes a message-less
    /// [`ErrorKind::Timeout`] error.
    fn from(_: tokio::time::error::Elapsed) -> Self {
        Self::timeout()
    }
}
254
255impl From<std::io::Error> for Error {
256 fn from(e: std::io::Error) -> Error {
257 if e.kind() == std::io::ErrorKind::UnexpectedEof
258 || e.kind() == std::io::ErrorKind::BrokenPipe
259 || e.kind() == std::io::ErrorKind::ConnectionReset
260 {
261 Error {
262 kind: ErrorKind::Eof,
263 message: None,
264 }
265 } else {
266 Error::io(e)
267 }
268 }
269}
270
271impl From<&std::io::Error> for Error {
272 fn from(e: &std::io::Error) -> Error {
273 if e.kind() == std::io::ErrorKind::UnexpectedEof
274 || e.kind() == std::io::ErrorKind::BrokenPipe
275 || e.kind() == std::io::ErrorKind::ConnectionReset
276 {
277 Error {
278 kind: ErrorKind::Eof,
279 message: None,
280 }
281 } else {
282 Error::io(e)
283 }
284 }
285}
286
287impl From<std::str::Utf8Error> for Error {
288 fn from(e: std::str::Utf8Error) -> Error {
289 Error::data(e)
290 }
291}
292
293impl From<std::array::TryFromSliceError> for Error {
294 fn from(e: std::array::TryFromSliceError) -> Error {
295 Error::data(e)
296 }
297}
298
299impl<T> From<async_channel::SendError<T>> for Error {
300 fn from(_e: async_channel::SendError<T>) -> Error {
301 Error {
302 kind: ErrorKind::Eof,
303 message: None,
304 }
305 }
306}
307
308#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
309impl From<tokio::sync::oneshot::error::RecvError> for Error {
310 fn from(_e: tokio::sync::oneshot::error::RecvError) -> Error {
311 Error {
312 kind: ErrorKind::Eof,
313 message: None,
314 }
315 }
316}
317
318#[derive(Debug, Eq, PartialEq, Copy, Clone)]
319#[repr(u8)]
320pub enum FrameOp {
321 Nop = OP_NOP,
322 Message = OP_MESSAGE,
323 Broadcast = OP_BROADCAST,
324 PublishTopic = OP_PUBLISH,
325 PublishTopicFor = OP_PUBLISH_FOR,
326 SubscribeTopic = OP_SUBSCRIBE,
327 UnsubscribeTopic = OP_UNSUBSCRIBE,
328 ExcludeTopic = OP_EXCLUDE,
329 UnexcludeTopic = OP_UNEXCLUDE,
332}
333
334impl TryFrom<u8> for FrameOp {
335 type Error = Error;
336 fn try_from(tp: u8) -> Result<Self, Error> {
337 match tp {
338 OP_NOP => Ok(FrameOp::Nop),
339 OP_MESSAGE => Ok(FrameOp::Message),
340 OP_BROADCAST => Ok(FrameOp::Broadcast),
341 OP_PUBLISH => Ok(FrameOp::PublishTopic),
342 OP_PUBLISH_FOR => Ok(FrameOp::PublishTopicFor),
343 OP_SUBSCRIBE => Ok(FrameOp::SubscribeTopic),
344 OP_UNSUBSCRIBE => Ok(FrameOp::UnsubscribeTopic),
345 OP_EXCLUDE => Ok(FrameOp::ExcludeTopic),
346 OP_UNEXCLUDE => Ok(FrameOp::UnexcludeTopic),
347 _ => Err(Error::data(format!("Invalid frame type: {}", tp))),
348 }
349 }
350}
351
/// Quality-of-service level, encoded as a two-bit value.
///
/// Bit 0 is the flag read by [`QoS::needs_ack`], bit 1 the flag read by
/// [`QoS::is_realtime`]. Derives `Eq`/`PartialEq` like the other protocol
/// enums in this file so levels can be compared directly.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
#[repr(u8)]
pub enum QoS {
    /// `0`: neither flag set.
    No = 0,
    /// `1`: acknowledgement flag set.
    Processed = 1,
    /// `2`: realtime flag set.
    Realtime = 2,
    /// `3`: both flags set.
    RealtimeProcessed = 3,
}
360
361impl QoS {
362 #[inline]
363 pub fn is_realtime(self) -> bool {
364 self as u8 & 0b10 != 0
365 }
366 #[inline]
367 pub fn needs_ack(self) -> bool {
368 self as u8 & 0b1 != 0
369 }
370}
371
372impl TryFrom<u8> for QoS {
373 type Error = Error;
374 fn try_from(q: u8) -> Result<Self, Error> {
375 match q {
376 0 => Ok(QoS::No),
377 1 => Ok(QoS::Processed),
378 2 => Ok(QoS::Realtime),
379 3 => Ok(QoS::RealtimeProcessed),
380 _ => Err(Error::data(format!("Invalid QoS: {}", q))),
381 }
382 }
383}
384
385#[derive(Debug, Eq, PartialEq, Copy, Clone)]
386#[repr(u8)]
387pub enum FrameKind {
388 Prepared = 0xff,
389 Message = OP_MESSAGE,
390 Broadcast = OP_BROADCAST,
391 Publish = OP_PUBLISH,
392 Acknowledge = OP_ACK,
393 Nop = OP_NOP,
394}
395
396impl TryFrom<u8> for FrameKind {
397 type Error = Error;
398 fn try_from(code: u8) -> Result<Self, Self::Error> {
399 match code {
400 OP_MESSAGE => Ok(FrameKind::Message),
401 OP_BROADCAST => Ok(FrameKind::Broadcast),
402 OP_PUBLISH => Ok(FrameKind::Publish),
403 OP_ACK => Ok(FrameKind::Acknowledge),
404 OP_NOP => Ok(FrameKind::Nop),
405 _ => Err(Error::data(format!("Invalid frame type: {:x}", code))),
406 }
407 }
408}
409
410#[derive(Debug)]
411pub struct FrameData {
412 kind: FrameKind,
413 sender: Option<String>,
414 topic: Option<String>,
415 header: Option<Vec<u8>>, buf: Vec<u8>,
417 payload_pos: usize,
418 realtime: bool,
419}
420
421impl FrameData {
422 #[inline]
423 pub fn new(
424 kind: FrameKind,
425 sender: Option<String>,
426 topic: Option<String>,
427 header: Option<Vec<u8>>,
428 buf: Vec<u8>,
429 payload_pos: usize,
430 realtime: bool,
431 ) -> Self {
432 Self {
433 kind,
434 sender,
435 topic,
436 header,
437 buf,
438 payload_pos,
439 realtime,
440 }
441 }
442 #[inline]
443 pub fn new_nop() -> Self {
444 Self {
445 kind: FrameKind::Nop,
446 sender: None,
447 topic: None,
448 header: None,
449 buf: Vec::new(),
450 payload_pos: 0,
451 realtime: false,
452 }
453 }
454 #[inline]
455 pub fn kind(&self) -> FrameKind {
456 self.kind
457 }
458 #[inline]
462 pub fn sender(&self) -> &str {
463 self.sender.as_ref().unwrap()
464 }
465 #[inline]
469 pub fn primary_sender(&self) -> &str {
470 let primary_sender = self.sender.as_ref().unwrap();
471 if let Some(pos) = primary_sender.find(SECONDARY_SEP) {
472 &primary_sender[..pos]
473 } else {
474 primary_sender
475 }
476 }
477 #[inline]
479 pub fn topic(&self) -> Option<&str> {
480 self.topic.as_deref()
481 }
482 #[inline]
485 pub fn payload(&self) -> &[u8] {
486 &self.buf[self.payload_pos..]
487 }
488 #[inline]
493 pub fn header(&self) -> Option<&[u8]> {
494 self.header.as_deref()
495 }
496 #[inline]
497 pub fn is_realtime(&self) -> bool {
498 self.realtime
499 }
500}
501
502pub mod borrow;
503pub mod common;
504pub mod tools {
505 #[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
506 pub mod pubsub;
507}
508
509#[cfg(feature = "broker")]
510pub mod broker;
511#[cfg(feature = "cursors")]
512pub mod cursors;
513#[cfg(feature = "ipc")]
514pub mod ipc;
515#[cfg(any(feature = "rpc", feature = "rpc-sync"))]
516pub mod rpc;
517#[cfg(any(feature = "ipc-sync", feature = "rpc-sync"))]
518pub mod sync;
519
520#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
521pub mod client;
522#[cfg(any(feature = "broker", feature = "ipc"))]
523pub mod comm;
524
/// Expands to `$crate::borrow::Cow::Borrowed(&[])`: a borrowed, empty slice
/// wrapped in the crate's `borrow::Cow`.
#[macro_export]
macro_rules! empty_payload {
    () => {
        $crate::borrow::Cow::Borrowed(&[])
    };
}