1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7pub const OP_NOP: u8 = 0x00;
8pub const OP_PUBLISH: u8 = 0x01;
9pub const OP_SUBSCRIBE: u8 = 0x02;
10pub const OP_UNSUBSCRIBE: u8 = 0x03;
11pub const OP_MESSAGE: u8 = 0x12;
12pub const OP_BROADCAST: u8 = 0x13;
13pub const OP_ACK: u8 = 0xFE;
14
15pub const PROTOCOL_VERSION: u16 = 0x01;
16
17pub const RESPONSE_OK: u8 = 0x01;
18
19pub const PING_FRAME: &[u8] = &[0, 0, 0, 0, 0, 0, 0, 0, 0];
20
21pub const ERR_CLIENT_NOT_REGISTERED: u8 = 0x71;
22pub const ERR_DATA: u8 = 0x72;
23pub const ERR_IO: u8 = 0x73;
24pub const ERR_OTHER: u8 = 0x74;
25pub const ERR_NOT_SUPPORTED: u8 = 0x75;
26pub const ERR_BUSY: u8 = 0x76;
27pub const ERR_NOT_DELIVERED: u8 = 0x77;
28pub const ERR_TIMEOUT: u8 = 0x78;
29pub const ERR_ACCESS: u8 = 0x79;
30
31pub const GREETINGS: [u8; 1] = [0xEB];
32
33pub const VERSION: &str = env!("CARGO_PKG_VERSION");
34
35pub static AUTHOR: &str = "(c) 2022 Bohemia Automation / Altertech";
36
37pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
38pub const DEFAULT_BUF_TTL: Duration = Duration::from_micros(10);
39pub const DEFAULT_BUF_SIZE: usize = 8192;
40
41pub const DEFAULT_QUEUE_SIZE: usize = 8192;
42
43pub const SECONDARY_SEP: &str = "%%";
44
45pub type OpConfirm = Option<tokio::sync::oneshot::Receiver<Result<(), Error>>>;
64pub type Frame = Arc<FrameData>;
65pub type EventChannel = async_channel::Receiver<Frame>;
66
67#[derive(Debug, Eq, PartialEq, Copy, Clone)]
68#[repr(u8)]
69pub enum ErrorKind {
70 NotRegistered = ERR_CLIENT_NOT_REGISTERED,
71 NotSupported = ERR_NOT_SUPPORTED,
72 Io = ERR_IO,
73 Timeout = ERR_TIMEOUT,
74 Data = ERR_DATA,
75 Busy = ERR_BUSY,
76 NotDelivered = ERR_NOT_DELIVERED,
77 Access = ERR_ACCESS,
78 Other = ERR_OTHER,
79 Eof = 0xff,
80}
81
82impl From<u8> for ErrorKind {
83 fn from(code: u8) -> Self {
84 match code {
85 ERR_CLIENT_NOT_REGISTERED => ErrorKind::NotRegistered,
86 ERR_NOT_SUPPORTED => ErrorKind::NotSupported,
87 ERR_IO => ErrorKind::Io,
88 ERR_DATA => ErrorKind::Data,
89 ERR_BUSY => ErrorKind::Busy,
90 ERR_NOT_DELIVERED => ErrorKind::NotDelivered,
91 ERR_ACCESS => ErrorKind::Access,
92 _ => ErrorKind::Other,
93 }
94 }
95}
96
97impl fmt::Display for ErrorKind {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(
100 f,
101 "{}",
102 match self {
103 ErrorKind::NotRegistered => "Client not registered",
104 ErrorKind::NotSupported => "Feature not supported",
105 ErrorKind::Io => "I/O Error",
106 ErrorKind::Timeout => "Timeout",
107 ErrorKind::Data => "Data Error",
108 ErrorKind::Busy => "Busy",
109 ErrorKind::NotDelivered => "Frame not delivered",
110 ErrorKind::Other => "Error",
111 ErrorKind::Access => "Access denied",
112 ErrorKind::Eof => "Eof",
113 }
114 )
115 }
116}
117
118#[derive(Debug)]
119pub struct Error {
120 kind: ErrorKind,
121 message: Option<String>,
122}
123
124impl fmt::Display for Error {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 if let Some(ref message) = self.message {
127 write!(f, "{}: {}", self.kind, message)
128 } else {
129 write!(f, "{}", self.kind)
130 }
131 }
132}
133
134impl Error {
135 #[inline]
136 pub fn new(kind: ErrorKind, message: Option<impl fmt::Display>) -> Self {
137 Self {
138 kind,
139 message: message.map(|m| m.to_string()),
140 }
141 }
142 #[inline]
143 pub fn io(e: impl fmt::Display) -> Self {
144 Self {
145 kind: ErrorKind::Io,
146 message: Some(e.to_string()),
147 }
148 }
149 #[inline]
150 pub fn data(e: impl fmt::Display) -> Self {
151 Self {
152 kind: ErrorKind::Data,
153 message: Some(e.to_string()),
154 }
155 }
156 #[inline]
157 pub fn access(e: impl fmt::Display) -> Self {
158 Self {
159 kind: ErrorKind::Access,
160 message: Some(e.to_string()),
161 }
162 }
163 #[inline]
164 pub fn not_supported(e: impl fmt::Display) -> Self {
165 Self {
166 kind: ErrorKind::NotSupported,
167 message: Some(e.to_string()),
168 }
169 }
170 #[inline]
171 pub fn not_registered() -> Self {
172 Self {
173 kind: ErrorKind::NotRegistered,
174 message: None,
175 }
176 }
177 #[inline]
178 pub fn not_delivered() -> Self {
179 Self {
180 kind: ErrorKind::NotDelivered,
181 message: None,
182 }
183 }
184 #[inline]
185 pub fn timeout() -> Self {
186 Self {
187 kind: ErrorKind::Timeout,
188 message: None,
189 }
190 }
191 #[inline]
192 pub fn busy(e: impl fmt::Display) -> Self {
193 Self {
194 kind: ErrorKind::Busy,
195 message: Some(e.to_string()),
196 }
197 }
198 #[inline]
199 pub fn kind(&self) -> ErrorKind {
200 self.kind
201 }
202}
203
204pub trait IntoElbusResult {
205 fn to_elbus_result(self) -> Result<(), Error>;
206}
207
208impl IntoElbusResult for u8 {
209 #[inline]
210 fn to_elbus_result(self) -> Result<(), Error> {
211 if self == RESPONSE_OK {
212 Ok(())
213 } else {
214 Err(Error {
215 kind: self.into(),
216 message: None,
217 })
218 }
219 }
220}
221
222impl From<tokio::time::error::Elapsed> for Error {
223 fn from(_e: tokio::time::error::Elapsed) -> Error {
224 Error::timeout()
225 }
226}
227
228impl From<std::io::Error> for Error {
229 fn from(e: std::io::Error) -> Error {
230 if e.kind() == std::io::ErrorKind::UnexpectedEof
231 || e.kind() == std::io::ErrorKind::BrokenPipe
232 || e.kind() == std::io::ErrorKind::ConnectionReset
233 {
234 Error {
235 kind: ErrorKind::Eof,
236 message: None,
237 }
238 } else {
239 Error::io(e)
240 }
241 }
242}
243
244impl From<&std::io::Error> for Error {
245 fn from(e: &std::io::Error) -> Error {
246 if e.kind() == std::io::ErrorKind::UnexpectedEof
247 || e.kind() == std::io::ErrorKind::BrokenPipe
248 || e.kind() == std::io::ErrorKind::ConnectionReset
249 {
250 Error {
251 kind: ErrorKind::Eof,
252 message: None,
253 }
254 } else {
255 Error::io(e)
256 }
257 }
258}
259
260impl From<std::str::Utf8Error> for Error {
261 fn from(e: std::str::Utf8Error) -> Error {
262 Error::data(e)
263 }
264}
265
266impl From<std::array::TryFromSliceError> for Error {
267 fn from(e: std::array::TryFromSliceError) -> Error {
268 Error::data(e)
269 }
270}
271
272impl<T> From<async_channel::SendError<T>> for Error {
273 fn from(_e: async_channel::SendError<T>) -> Error {
274 Error {
275 kind: ErrorKind::Eof,
276 message: None,
277 }
278 }
279}
280
281impl From<tokio::sync::oneshot::error::RecvError> for Error {
282 fn from(_e: tokio::sync::oneshot::error::RecvError) -> Error {
283 Error {
284 kind: ErrorKind::Eof,
285 message: None,
286 }
287 }
288}
289
290#[derive(Debug, Eq, PartialEq, Copy, Clone)]
291#[repr(u8)]
292pub enum FrameOp {
293 Nop = OP_NOP,
294 Message = OP_MESSAGE,
295 Broadcast = OP_BROADCAST,
296 PublishTopic = OP_PUBLISH,
297 SubscribeTopic = OP_SUBSCRIBE,
298 UnsubscribeTopic = OP_UNSUBSCRIBE,
299}
300
301impl TryFrom<u8> for FrameOp {
302 type Error = Error;
303 fn try_from(tp: u8) -> Result<Self, Error> {
304 match tp {
305 OP_NOP => Ok(FrameOp::Nop),
306 OP_MESSAGE => Ok(FrameOp::Message),
307 OP_BROADCAST => Ok(FrameOp::Broadcast),
308 OP_PUBLISH => Ok(FrameOp::PublishTopic),
309 OP_SUBSCRIBE => Ok(FrameOp::SubscribeTopic),
310 OP_UNSUBSCRIBE => Ok(FrameOp::UnsubscribeTopic),
311 _ => Err(Error::data(format!("Invalid frame type: {}", tp))),
312 }
313 }
314}
315
316#[derive(Debug, Copy, Clone)]
317#[repr(u8)]
318pub enum QoS {
319 No = 0,
320 Processed = 1,
321 Realtime = 2,
322 RealtimeProcessed = 3,
323}
324
325impl QoS {
326 #[inline]
327 pub fn is_realtime(self) -> bool {
328 self as u8 & 0b10 != 0
329 }
330 #[inline]
331 pub fn needs_ack(self) -> bool {
332 self as u8 & 0b1 != 0
333 }
334}
335
336impl TryFrom<u8> for QoS {
337 type Error = Error;
338 fn try_from(q: u8) -> Result<Self, Error> {
339 match q {
340 0 => Ok(QoS::No),
341 1 => Ok(QoS::Processed),
342 2 => Ok(QoS::Realtime),
343 3 => Ok(QoS::RealtimeProcessed),
344 _ => Err(Error::data(format!("Invalid QoS: {}", q))),
345 }
346 }
347}
348
349#[derive(Debug, Eq, PartialEq, Copy, Clone)]
350#[repr(u8)]
351pub enum FrameKind {
352 Prepared = 0xff,
353 Message = OP_MESSAGE,
354 Broadcast = OP_BROADCAST,
355 Publish = OP_PUBLISH,
356 Acknowledge = OP_ACK,
357 Nop = OP_NOP,
358}
359
360impl TryFrom<u8> for FrameKind {
361 type Error = Error;
362 fn try_from(code: u8) -> Result<Self, Self::Error> {
363 match code {
364 OP_MESSAGE => Ok(FrameKind::Message),
365 OP_BROADCAST => Ok(FrameKind::Broadcast),
366 OP_PUBLISH => Ok(FrameKind::Publish),
367 OP_ACK => Ok(FrameKind::Acknowledge),
368 OP_NOP => Ok(FrameKind::Nop),
369 _ => Err(Error::data(format!("Invalid frame type: {:x}", code))),
370 }
371 }
372}
373
374#[derive(Debug)]
375pub struct FrameData {
376 kind: FrameKind,
377 sender: Option<String>,
378 topic: Option<String>,
379 header: Option<Vec<u8>>, buf: Vec<u8>,
381 payload_pos: usize,
382 realtime: bool,
383}
384
385impl FrameData {
386 #[inline]
387 pub fn new(
388 kind: FrameKind,
389 sender: Option<String>,
390 topic: Option<String>,
391 header: Option<Vec<u8>>,
392 buf: Vec<u8>,
393 payload_pos: usize,
394 realtime: bool,
395 ) -> Self {
396 Self {
397 kind,
398 sender,
399 topic,
400 header,
401 buf,
402 payload_pos,
403 realtime,
404 }
405 }
406 #[inline]
407 pub fn new_nop() -> Self {
408 Self {
409 kind: FrameKind::Nop,
410 sender: None,
411 topic: None,
412 header: None,
413 buf: Vec::new(),
414 payload_pos: 0,
415 realtime: false,
416 }
417 }
418 #[inline]
419 pub fn kind(&self) -> FrameKind {
420 self.kind
421 }
422 #[inline]
426 pub fn sender(&self) -> &str {
427 self.sender.as_ref().unwrap()
428 }
429 #[inline]
433 pub fn primary_sender(&self) -> &str {
434 let primary_sender = self.sender.as_ref().unwrap();
435 if let Some(pos) = primary_sender.find(SECONDARY_SEP) {
436 &primary_sender[..pos]
437 } else {
438 primary_sender
439 }
440 }
441 #[inline]
443 pub fn topic(&self) -> Option<&str> {
444 self.topic.as_deref()
445 }
446 #[inline]
449 pub fn payload(&self) -> &[u8] {
450 &self.buf[self.payload_pos..]
451 }
452 #[inline]
457 pub fn header(&self) -> Option<&[u8]> {
458 self.header.as_deref()
459 }
460 #[inline]
461 pub fn is_realtime(&self) -> bool {
462 self.realtime
463 }
464}
465
466pub mod borrow;
467pub mod common;
468pub mod tools {
469 #[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
470 pub mod pubsub;
471}
472
473#[cfg(feature = "broker")]
474pub mod broker;
475#[cfg(feature = "ipc")]
476pub mod ipc;
477#[cfg(feature = "rpc")]
478pub mod rpc;
479
480#[cfg(any(feature = "rpc", feature = "broker", feature = "ipc"))]
481pub mod client;
482#[cfg(any(feature = "broker", feature = "ipc"))]
483pub mod comm;