1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Numeric identifiers, one per network message kind declared in this module.
///
/// All values lie in the range `0x19..=0x1f`.
// NOTE(review): presumably these are the wire-level message IDs written into
// the message header by the codec — confirm against the codec implementation.
pub mod id {
    /// Identifier associated with `Interest` messages.
    pub const INTEREST: u8 = 0x19;
    /// Identifier associated with `ResponseFinal` messages.
    pub const RESPONSE_FINAL: u8 = 0x1a;
    /// Identifier associated with `Response` messages.
    pub const RESPONSE: u8 = 0x1b;
    /// Identifier associated with `Request` messages.
    pub const REQUEST: u8 = 0x1c;
    /// Identifier associated with `Push` messages.
    pub const PUSH: u8 = 0x1d;
    /// Identifier associated with `Declare` messages.
    pub const DECLARE: u8 = 0x1e;
    /// Identifier associated with `Oam` messages.
    pub const OAM: u8 = 0x1f;
}
46
/// Two-valued mapping selector stored as a `u8` (`Receiver = 0`, `Sender = 1`).
///
/// `Receiver` is the default variant.
// NOTE(review): presumably tells which side's key-expression mapping table a
// numeric expression id refers to — confirm against `WireExpr` usage.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum Mapping {
    #[default]
    Receiver = 0,
    Sender = 1,
}

impl Mapping {
    /// Constant equivalent of [`Default::default`] (usable in `const` contexts).
    pub const DEFAULT: Self = Self::Receiver;

    /// Returns `Sender` or `Receiver` with equal probability (test helper).
    #[cfg(feature = "test")]
    #[doc(hidden)]
    pub fn rand() -> Self {
        use rand::Rng;

        match rand::thread_rng().gen_bool(0.5) {
            true => Self::Sender,
            false => Self::Receiver,
        }
    }
}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum NetworkBody {
75 Push(Push),
76 Request(Request),
77 Response(Response),
78 ResponseFinal(ResponseFinal),
79 Interest(Interest),
80 Declare(Declare),
81 OAM(Oam),
82}
83
84#[derive(Debug, Copy, Clone, PartialEq, Eq)]
85pub enum NetworkBodyRef<'a> {
86 Push(&'a Push),
87 Request(&'a Request),
88 Response(&'a Response),
89 ResponseFinal(&'a ResponseFinal),
90 Interest(&'a Interest),
91 Declare(&'a Declare),
92 OAM(&'a Oam),
93}
94
95#[derive(Debug, PartialEq, Eq)]
96pub enum NetworkBodyMut<'a> {
97 Push(&'a mut Push),
98 Request(&'a mut Request),
99 Response(&'a mut Response),
100 ResponseFinal(&'a mut ResponseFinal),
101 Interest(&'a mut Interest),
102 Declare(&'a mut Declare),
103 OAM(&'a mut Oam),
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct NetworkMessage {
108 pub body: NetworkBody,
109 pub reliability: Reliability,
110}
111
112#[derive(Debug, Copy, Clone, PartialEq, Eq)]
113pub struct NetworkMessageRef<'a> {
114 pub body: NetworkBodyRef<'a>,
115 pub reliability: Reliability,
116}
117
118#[derive(Debug, PartialEq, Eq)]
119pub struct NetworkMessageMut<'a> {
120 pub body: NetworkBodyMut<'a>,
121 pub reliability: Reliability,
122}
123
124pub trait NetworkMessageExt {
125 #[doc(hidden)]
126 fn body(&self) -> NetworkBodyRef<'_>;
127
128 #[doc(hidden)]
129 fn reliability(&self) -> Reliability;
130
131 #[inline]
132 fn is_reliable(&self) -> bool {
133 self.reliability() == Reliability::Reliable
134 }
135
136 #[inline]
137 fn is_express(&self) -> bool {
138 match self.body() {
139 NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
140 NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
141 NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
142 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
143 NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
144 NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
145 NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
146 }
147 }
148
149 #[inline]
150 fn congestion_control(&self) -> CongestionControl {
151 match self.body() {
152 NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
153 NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
154 NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
155 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
156 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
157 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
158 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
159 }
160 }
161
162 #[inline]
163 #[cfg(feature = "shared-memory")]
164 fn is_shm(&self) -> bool {
165 use crate::zenoh::{PushBody, RequestBody, ResponseBody};
166
167 match self.body() {
168 NetworkBodyRef::Push(Push { payload, .. }) => match payload {
169 PushBody::Put(p) => p.ext_shm.is_some(),
170 PushBody::Del(_) => false,
171 },
172 NetworkBodyRef::Request(Request { payload, .. }) => match payload {
173 RequestBody::Query(b) => b.ext_body.as_ref().is_some_and(|b| b.ext_shm.is_some()),
174 },
175 NetworkBodyRef::Response(Response { payload, .. }) => match payload {
176 ResponseBody::Reply(b) => match &b.payload {
177 PushBody::Put(p) => p.ext_shm.is_some(),
178 PushBody::Del(_) => false,
179 },
180 ResponseBody::Err(e) => e.ext_shm.is_some(),
181 },
182 NetworkBodyRef::ResponseFinal(_)
183 | NetworkBodyRef::Interest(_)
184 | NetworkBodyRef::Declare(_)
185 | NetworkBodyRef::OAM(_) => false,
186 }
187 }
188
189 #[inline]
190 fn is_droppable(&self) -> bool {
191 !self.is_reliable() || self.congestion_control() == CongestionControl::Drop
192 }
193
194 #[inline]
195 fn priority(&self) -> Priority {
196 match self.body() {
197 NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
198 NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
199 NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
200 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
201 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
202 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
203 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
204 }
205 }
206
207 #[inline]
208 fn wire_expr(&self) -> Option<&WireExpr<'_>> {
209 match &self.body() {
210 NetworkBodyRef::Push(m) => Some(&m.wire_expr),
211 NetworkBodyRef::Request(m) => Some(&m.wire_expr),
212 NetworkBodyRef::Response(m) => Some(&m.wire_expr),
213 NetworkBodyRef::ResponseFinal(_) => None,
214 NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
215 NetworkBodyRef::Declare(m) => match &m.body {
216 DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
217 DeclareBody::UndeclareKeyExpr(_) => None,
218 DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
219 DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
220 DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
221 DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
222 DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
223 DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
224 DeclareBody::DeclareFinal(_) => None,
225 },
226 NetworkBodyRef::OAM(_) => None,
227 }
228 }
229
230 #[inline]
231 fn as_ref(&self) -> NetworkMessageRef<'_> {
232 NetworkMessageRef {
233 body: self.body(),
234 reliability: self.reliability(),
235 }
236 }
237
238 #[inline]
239 fn to_owned(&self) -> NetworkMessage {
240 NetworkMessage {
241 body: match self.body() {
242 NetworkBodyRef::Push(msg) => NetworkBody::Push(msg.clone()),
243 NetworkBodyRef::Request(msg) => NetworkBody::Request(msg.clone()),
244 NetworkBodyRef::Response(msg) => NetworkBody::Response(msg.clone()),
245 NetworkBodyRef::ResponseFinal(msg) => NetworkBody::ResponseFinal(msg.clone()),
246 NetworkBodyRef::Interest(msg) => NetworkBody::Interest(msg.clone()),
247 NetworkBodyRef::Declare(msg) => NetworkBody::Declare(msg.clone()),
248 NetworkBodyRef::OAM(msg) => NetworkBody::OAM(msg.clone()),
249 },
250 reliability: self.reliability(),
251 }
252 }
253}
254
255impl NetworkMessageExt for NetworkMessage {
256 fn body(&self) -> NetworkBodyRef<'_> {
257 match &self.body {
258 NetworkBody::Push(body) => NetworkBodyRef::Push(body),
259 NetworkBody::Request(body) => NetworkBodyRef::Request(body),
260 NetworkBody::Response(body) => NetworkBodyRef::Response(body),
261 NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
262 NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
263 NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
264 NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
265 }
266 }
267
268 fn reliability(&self) -> Reliability {
269 self.reliability
270 }
271}
272
273impl NetworkMessageExt for NetworkMessageRef<'_> {
274 fn body(&self) -> NetworkBodyRef<'_> {
275 self.body
276 }
277
278 fn reliability(&self) -> Reliability {
279 self.reliability
280 }
281}
282
283impl NetworkMessageExt for NetworkMessageMut<'_> {
284 fn body(&self) -> NetworkBodyRef<'_> {
285 match &self.body {
286 NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
287 NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
288 NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
289 NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
290 NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
291 NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
292 NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
293 }
294 }
295
296 fn reliability(&self) -> Reliability {
297 self.reliability
298 }
299}
300
301impl NetworkMessage {
302 #[cfg(feature = "test")]
303 #[doc(hidden)]
304 pub fn rand() -> Self {
305 use rand::Rng;
306
307 let mut rng = rand::thread_rng();
308
309 let body = match rng.gen_range(0..6) {
310 0 => NetworkBody::Push(Push::rand()),
311 1 => NetworkBody::Request(Request::rand()),
312 2 => NetworkBody::Response(Response::rand()),
313 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
314 4 => NetworkBody::Declare(Declare::rand()),
315 5 => NetworkBody::OAM(Oam::rand()),
316 _ => unreachable!(),
317 };
318
319 body.into()
320 }
321
322 #[inline]
323 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
324 let body = match &mut self.body {
325 NetworkBody::Push(body) => NetworkBodyMut::Push(body),
326 NetworkBody::Request(body) => NetworkBodyMut::Request(body),
327 NetworkBody::Response(body) => NetworkBodyMut::Response(body),
328 NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
329 NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
330 NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
331 NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
332 };
333 NetworkMessageMut {
334 body,
335 reliability: self.reliability,
336 }
337 }
338}
339
340impl NetworkMessageMut<'_> {
341 #[inline]
342 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
343 let body = match &mut self.body {
344 NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
345 NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
346 NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
347 NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
348 NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
349 NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
350 NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
351 };
352 NetworkMessageMut {
353 body,
354 reliability: self.reliability,
355 }
356 }
357}
358
359impl fmt::Display for NetworkMessageRef<'_> {
360 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
361 match &self.body {
362 NetworkBodyRef::OAM(_) => write!(f, "OAM"),
363 NetworkBodyRef::Push(_) => write!(f, "Push"),
364 NetworkBodyRef::Request(_) => write!(f, "Request"),
365 NetworkBodyRef::Response(_) => write!(f, "Response"),
366 NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
367 NetworkBodyRef::Interest(_) => write!(f, "Interest"),
368 NetworkBodyRef::Declare(_) => write!(f, "Declare"),
369 }
370 }
371}
372
373impl fmt::Display for NetworkMessage {
374 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375 self.as_ref().fmt(f)
376 }
377}
378
379impl fmt::Display for NetworkMessageMut<'_> {
380 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381 self.as_ref().fmt(f)
382 }
383}
384
385impl From<NetworkBody> for NetworkMessage {
386 #[inline]
387 fn from(body: NetworkBody) -> Self {
388 Self {
389 body,
390 reliability: Reliability::DEFAULT,
391 }
392 }
393}
394
/// Test-only shortcut: wraps a [`Push`] into a [`NetworkMessage`] through
/// `From<NetworkBody>`.
#[cfg(feature = "test")]
impl From<Push> for NetworkMessage {
    fn from(push: Push) -> Self {
        Self::from(NetworkBody::Push(push))
    }
}
401
402pub mod ext {
404 use core::fmt;
405
406 use crate::{
407 common::{imsg, ZExtZ64},
408 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
409 };
410
/// QoS settings packed into a single byte.
///
/// The const parameter `ID` only distinguishes instantiations at the type
/// level; it is not stored.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct QoSType<const ID: u8> {
    /// Raw packed QoS byte.
    inner: u8,
}
430
431 impl<const ID: u8> QoSType<{ ID }> {
432 const P_MASK: u8 = 0b00000111;
433 const D_FLAG: u8 = 0b00001000;
434 const E_FLAG: u8 = 0b00010000;
435 const F_FLAG: u8 = 0b00100000;
436
437 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
438
439 pub const DECLARE: Self =
440 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
441 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
442 pub const REQUEST: Self =
443 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
444 pub const RESPONSE: Self = Self::new(
445 Priority::DEFAULT,
446 CongestionControl::DEFAULT_RESPONSE,
447 false,
448 );
449 pub const RESPONSE_FINAL: Self = Self::new(
450 Priority::DEFAULT,
451 CongestionControl::DEFAULT_RESPONSE,
452 false,
453 );
454 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
455
456 pub const fn new(
457 priority: Priority,
458 congestion_control: CongestionControl,
459 is_express: bool,
460 ) -> Self {
461 let mut inner = priority as u8;
462 match congestion_control {
463 CongestionControl::Block => inner |= Self::D_FLAG,
464 #[cfg(feature = "unstable")]
465 CongestionControl::BlockFirst => inner |= Self::F_FLAG,
466 _ => {}
467 }
468 if is_express {
469 inner |= Self::E_FLAG;
470 }
471 Self { inner }
472 }
473
474 pub fn set_priority(&mut self, priority: Priority) {
475 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
476 }
477
478 pub const fn get_priority(&self) -> Priority {
479 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
480 }
481
482 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
483 match cctrl {
484 CongestionControl::Block => {
485 self.inner = imsg::set_flag(self.inner, Self::D_FLAG);
486 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
487 }
488 CongestionControl::Drop => {
489 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
490 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
491 }
492 #[cfg(feature = "unstable")]
493 CongestionControl::BlockFirst => {
494 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
495 self.inner = imsg::set_flag(self.inner, Self::F_FLAG);
496 }
497 }
498 }
499
500 pub const fn get_congestion_control(&self) -> CongestionControl {
501 match (
502 imsg::has_flag(self.inner, Self::D_FLAG),
503 imsg::has_flag(self.inner, Self::F_FLAG),
504 ) {
505 (false, false) => CongestionControl::Drop,
506 #[cfg(feature = "unstable")]
507 (false, true) => CongestionControl::BlockFirst,
508 #[cfg(not(feature = "unstable"))]
509 (false, true) => CongestionControl::Drop,
510 (true, _) => CongestionControl::Block,
511 }
512 }
513
514 pub fn set_is_express(&mut self, is_express: bool) {
515 match is_express {
516 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
517 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
518 }
519 }
520
521 pub const fn is_express(&self) -> bool {
522 imsg::has_flag(self.inner, Self::E_FLAG)
523 }
524
525 #[cfg(feature = "test")]
526 #[doc(hidden)]
527 pub fn rand() -> Self {
528 use rand::Rng;
529 let mut rng = rand::thread_rng();
530
531 let inner: u8 = rng.gen();
532 Self { inner }
533 }
534 }
535
536 impl<const ID: u8> Default for QoSType<{ ID }> {
537 fn default() -> Self {
538 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
539 }
540 }
541
542 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
543 fn from(ext: ZExtZ64<{ ID }>) -> Self {
544 Self {
545 inner: ext.value as u8,
546 }
547 }
548 }
549
550 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
551 fn from(ext: QoSType<{ ID }>) -> Self {
552 ZExtZ64::new(ext.inner as u64)
553 }
554 }
555
556 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
557 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
558 f.debug_struct("QoS")
559 .field("priority", &self.get_priority())
560 .field("congestion", &self.get_congestion_control())
561 .field("express", &self.is_express())
562 .finish()
563 }
564 }
565
566 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
575 pub struct TimestampType<const ID: u8> {
576 pub timestamp: uhlc::Timestamp,
577 }
578
579 impl<const ID: u8> TimestampType<{ ID }> {
580 #[cfg(feature = "test")]
581 #[doc(hidden)]
582 pub fn rand() -> Self {
583 use rand::Rng;
584 let mut rng = rand::thread_rng();
585
586 let time = uhlc::NTP64(rng.gen());
587 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
588 let timestamp = uhlc::Timestamp::new(time, id);
589 Self { timestamp }
590 }
591 }
592
/// Extension payload carrying a 16-bit node id.
///
/// The const parameter `ID` only distinguishes instantiations at the type
/// level; it is not stored.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NodeIdType<const ID: u8> {
    /// The node id value.
    pub node_id: u16,
}

impl<const ID: u8> NodeIdType<{ ID }> {
    /// The default value: node id `0`.
    pub const DEFAULT: Self = NodeIdType { node_id: 0 };

    /// Builds a value with a random node id (test helper).
    #[cfg(feature = "test")]
    #[doc(hidden)]
    pub fn rand() -> Self {
        use rand::Rng;

        Self {
            node_id: rand::thread_rng().gen(),
        }
    }
}

impl<const ID: u8> Default for NodeIdType<{ ID }> {
    /// Returns [`NodeIdType::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
625
626 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
627 fn from(ext: ZExtZ64<{ ID }>) -> Self {
628 Self {
629 node_id: ext.value as u16,
630 }
631 }
632 }
633
634 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
635 fn from(ext: NodeIdType<{ ID }>) -> Self {
636 ZExtZ64::new(ext.node_id as u64)
637 }
638 }
639
640 #[derive(Debug, Clone, PartialEq, Eq)]
651 pub struct EntityGlobalIdType<const ID: u8> {
652 pub zid: ZenohIdProto,
653 pub eid: EntityId,
654 }
655
656 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
657 #[cfg(feature = "test")]
658 #[doc(hidden)]
659 pub fn rand() -> Self {
660 use rand::Rng;
661 let mut rng = rand::thread_rng();
662
663 let zid = ZenohIdProto::rand();
664 let eid: EntityId = rng.gen();
665 Self { zid, eid }
666 }
667 }
668}