1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
35pub mod id {
36 pub const OAM: u8 = 0x1f;
39 pub const DECLARE: u8 = 0x1e;
40 pub const PUSH: u8 = 0x1d;
41 pub const REQUEST: u8 = 0x1c;
42 pub const RESPONSE: u8 = 0x1b;
43 pub const RESPONSE_FINAL: u8 = 0x1a;
44 pub const INTEREST: u8 = 0x19;
45}
46
47#[repr(u8)]
48#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
49pub enum Mapping {
50 #[default]
51 Receiver = 0,
52 Sender = 1,
53}
54
55impl Mapping {
56 pub const DEFAULT: Self = Self::Receiver;
57
58 #[cfg(feature = "test")]
59 pub fn rand() -> Self {
60 use rand::Rng;
61
62 let mut rng = rand::thread_rng();
63 if rng.gen_bool(0.5) {
64 Mapping::Sender
65 } else {
66 Mapping::Receiver
67 }
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum NetworkBody {
74 Push(Push),
75 Request(Request),
76 Response(Response),
77 ResponseFinal(ResponseFinal),
78 Interest(Interest),
79 Declare(Declare),
80 OAM(Oam),
81}
82
83#[derive(Debug, Copy, Clone, PartialEq, Eq)]
84pub enum NetworkBodyRef<'a> {
85 Push(&'a Push),
86 Request(&'a Request),
87 Response(&'a Response),
88 ResponseFinal(&'a ResponseFinal),
89 Interest(&'a Interest),
90 Declare(&'a Declare),
91 OAM(&'a Oam),
92}
93
94#[derive(Debug, PartialEq, Eq)]
95pub enum NetworkBodyMut<'a> {
96 Push(&'a mut Push),
97 Request(&'a mut Request),
98 Response(&'a mut Response),
99 ResponseFinal(&'a mut ResponseFinal),
100 Interest(&'a mut Interest),
101 Declare(&'a mut Declare),
102 OAM(&'a mut Oam),
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct NetworkMessage {
107 pub body: NetworkBody,
108 pub reliability: Reliability,
109}
110
111#[derive(Debug, Copy, Clone, PartialEq, Eq)]
112pub struct NetworkMessageRef<'a> {
113 pub body: NetworkBodyRef<'a>,
114 pub reliability: Reliability,
115}
116
117#[derive(Debug, PartialEq, Eq)]
118pub struct NetworkMessageMut<'a> {
119 pub body: NetworkBodyMut<'a>,
120 pub reliability: Reliability,
121}
122
123pub trait NetworkMessageExt {
124 #[doc(hidden)]
125 fn body(&self) -> NetworkBodyRef<'_>;
126
127 #[doc(hidden)]
128 fn reliability(&self) -> Reliability;
129
130 #[inline]
131 fn is_reliable(&self) -> bool {
132 self.reliability() == Reliability::Reliable
133 }
134
135 #[inline]
136 fn is_express(&self) -> bool {
137 match self.body() {
138 NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
139 NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
140 NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
141 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
142 NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
143 NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
144 NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
145 }
146 }
147
148 #[inline]
149 fn congestion_control(&self) -> CongestionControl {
150 match self.body() {
151 NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
152 NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
153 NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
154 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
155 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
156 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
157 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
158 }
159 }
160
161 #[inline]
162 fn is_droppable(&self) -> bool {
163 !self.is_reliable() || self.congestion_control() == CongestionControl::Drop
164 }
165
166 #[inline]
167 fn priority(&self) -> Priority {
168 match self.body() {
169 NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
170 NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
171 NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
172 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
173 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
174 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
175 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
176 }
177 }
178
179 #[inline]
180 fn wire_expr(&self) -> Option<&WireExpr<'_>> {
181 match &self.body() {
182 NetworkBodyRef::Push(m) => Some(&m.wire_expr),
183 NetworkBodyRef::Request(m) => Some(&m.wire_expr),
184 NetworkBodyRef::Response(m) => Some(&m.wire_expr),
185 NetworkBodyRef::ResponseFinal(_) => None,
186 NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
187 NetworkBodyRef::Declare(m) => match &m.body {
188 DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
189 DeclareBody::UndeclareKeyExpr(_) => None,
190 DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
191 DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
192 DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
193 DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
194 DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
195 DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
196 DeclareBody::DeclareFinal(_) => None,
197 },
198 NetworkBodyRef::OAM(_) => None,
199 }
200 }
201
202 #[inline]
203 fn as_ref(&self) -> NetworkMessageRef<'_> {
204 NetworkMessageRef {
205 body: self.body(),
206 reliability: self.reliability(),
207 }
208 }
209
210 #[inline]
211 fn to_owned(&self) -> NetworkMessage {
212 NetworkMessage {
213 body: match self.body() {
214 NetworkBodyRef::Push(msg) => NetworkBody::Push(msg.clone()),
215 NetworkBodyRef::Request(msg) => NetworkBody::Request(msg.clone()),
216 NetworkBodyRef::Response(msg) => NetworkBody::Response(msg.clone()),
217 NetworkBodyRef::ResponseFinal(msg) => NetworkBody::ResponseFinal(msg.clone()),
218 NetworkBodyRef::Interest(msg) => NetworkBody::Interest(msg.clone()),
219 NetworkBodyRef::Declare(msg) => NetworkBody::Declare(msg.clone()),
220 NetworkBodyRef::OAM(msg) => NetworkBody::OAM(msg.clone()),
221 },
222 reliability: self.reliability(),
223 }
224 }
225}
226
227impl NetworkMessageExt for NetworkMessage {
228 fn body(&self) -> NetworkBodyRef<'_> {
229 match &self.body {
230 NetworkBody::Push(body) => NetworkBodyRef::Push(body),
231 NetworkBody::Request(body) => NetworkBodyRef::Request(body),
232 NetworkBody::Response(body) => NetworkBodyRef::Response(body),
233 NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
234 NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
235 NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
236 NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
237 }
238 }
239
240 fn reliability(&self) -> Reliability {
241 self.reliability
242 }
243}
244
245impl NetworkMessageExt for NetworkMessageRef<'_> {
246 fn body(&self) -> NetworkBodyRef<'_> {
247 self.body
248 }
249
250 fn reliability(&self) -> Reliability {
251 self.reliability
252 }
253}
254
255impl NetworkMessageExt for NetworkMessageMut<'_> {
256 fn body(&self) -> NetworkBodyRef<'_> {
257 match &self.body {
258 NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
259 NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
260 NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
261 NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
262 NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
263 NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
264 NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
265 }
266 }
267
268 fn reliability(&self) -> Reliability {
269 self.reliability
270 }
271}
272
273impl NetworkMessage {
274 #[cfg(feature = "test")]
275 pub fn rand() -> Self {
276 use rand::Rng;
277
278 let mut rng = rand::thread_rng();
279
280 let body = match rng.gen_range(0..6) {
281 0 => NetworkBody::Push(Push::rand()),
282 1 => NetworkBody::Request(Request::rand()),
283 2 => NetworkBody::Response(Response::rand()),
284 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
285 4 => NetworkBody::Declare(Declare::rand()),
286 5 => NetworkBody::OAM(Oam::rand()),
287 _ => unreachable!(),
288 };
289
290 body.into()
291 }
292
293 #[inline]
294 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
295 let body = match &mut self.body {
296 NetworkBody::Push(body) => NetworkBodyMut::Push(body),
297 NetworkBody::Request(body) => NetworkBodyMut::Request(body),
298 NetworkBody::Response(body) => NetworkBodyMut::Response(body),
299 NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
300 NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
301 NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
302 NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
303 };
304 NetworkMessageMut {
305 body,
306 reliability: self.reliability,
307 }
308 }
309}
310
311impl NetworkMessageMut<'_> {
312 #[inline]
313 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
314 let body = match &mut self.body {
315 NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
316 NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
317 NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
318 NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
319 NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
320 NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
321 NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
322 };
323 NetworkMessageMut {
324 body,
325 reliability: self.reliability,
326 }
327 }
328}
329
330impl fmt::Display for NetworkMessageRef<'_> {
331 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
332 match &self.body {
333 NetworkBodyRef::OAM(_) => write!(f, "OAM"),
334 NetworkBodyRef::Push(_) => write!(f, "Push"),
335 NetworkBodyRef::Request(_) => write!(f, "Request"),
336 NetworkBodyRef::Response(_) => write!(f, "Response"),
337 NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
338 NetworkBodyRef::Interest(_) => write!(f, "Interest"),
339 NetworkBodyRef::Declare(_) => write!(f, "Declare"),
340 }
341 }
342}
343
344impl fmt::Display for NetworkMessage {
345 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
346 self.as_ref().fmt(f)
347 }
348}
349
350impl fmt::Display for NetworkMessageMut<'_> {
351 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352 self.as_ref().fmt(f)
353 }
354}
355
356impl From<NetworkBody> for NetworkMessage {
357 #[inline]
358 fn from(body: NetworkBody) -> Self {
359 Self {
360 body,
361 reliability: Reliability::DEFAULT,
362 }
363 }
364}
365
366#[cfg(feature = "test")]
367impl From<Push> for NetworkMessage {
368 fn from(push: Push) -> Self {
369 NetworkBody::Push(push).into()
370 }
371}
372
373pub mod ext {
375 use core::fmt;
376
377 use crate::{
378 common::{imsg, ZExtZ64},
379 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
380 };
381
382 #[repr(transparent)]
397 #[derive(Clone, Copy, PartialEq, Eq)]
398 pub struct QoSType<const ID: u8> {
399 inner: u8,
400 }
401
402 impl<const ID: u8> QoSType<{ ID }> {
403 const P_MASK: u8 = 0b00000111;
404 const D_FLAG: u8 = 0b00001000;
405 const E_FLAG: u8 = 0b00010000;
406 const F_FLAG: u8 = 0b00100000;
407
408 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
409
410 pub const DECLARE: Self =
411 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
412 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
413 pub const REQUEST: Self =
414 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
415 pub const RESPONSE: Self = Self::new(
416 Priority::DEFAULT,
417 CongestionControl::DEFAULT_RESPONSE,
418 false,
419 );
420 pub const RESPONSE_FINAL: Self = Self::new(
421 Priority::DEFAULT,
422 CongestionControl::DEFAULT_RESPONSE,
423 false,
424 );
425 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
426
427 pub const fn new(
428 priority: Priority,
429 congestion_control: CongestionControl,
430 is_express: bool,
431 ) -> Self {
432 let mut inner = priority as u8;
433 match congestion_control {
434 CongestionControl::Block => inner |= Self::D_FLAG,
435 #[cfg(feature = "unstable")]
436 CongestionControl::BlockFirst => inner |= Self::F_FLAG,
437 _ => {}
438 }
439 if is_express {
440 inner |= Self::E_FLAG;
441 }
442 Self { inner }
443 }
444
445 pub fn set_priority(&mut self, priority: Priority) {
446 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
447 }
448
449 pub const fn get_priority(&self) -> Priority {
450 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
451 }
452
453 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
454 match cctrl {
455 CongestionControl::Block => {
456 self.inner = imsg::set_flag(self.inner, Self::D_FLAG);
457 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
458 }
459 CongestionControl::Drop => {
460 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
461 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
462 }
463 #[cfg(feature = "unstable")]
464 CongestionControl::BlockFirst => {
465 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
466 self.inner = imsg::set_flag(self.inner, Self::F_FLAG);
467 }
468 }
469 }
470
471 pub const fn get_congestion_control(&self) -> CongestionControl {
472 match (
473 imsg::has_flag(self.inner, Self::D_FLAG),
474 imsg::has_flag(self.inner, Self::F_FLAG),
475 ) {
476 (false, false) => CongestionControl::Drop,
477 #[cfg(feature = "unstable")]
478 (false, true) => CongestionControl::BlockFirst,
479 #[cfg(not(feature = "unstable"))]
480 (false, true) => CongestionControl::Drop,
481 (true, _) => CongestionControl::Block,
482 }
483 }
484
485 pub fn set_is_express(&mut self, is_express: bool) {
486 match is_express {
487 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
488 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
489 }
490 }
491
492 pub const fn is_express(&self) -> bool {
493 imsg::has_flag(self.inner, Self::E_FLAG)
494 }
495
496 #[cfg(feature = "test")]
497 pub fn rand() -> Self {
498 use rand::Rng;
499 let mut rng = rand::thread_rng();
500
501 let inner: u8 = rng.gen();
502 Self { inner }
503 }
504 }
505
506 impl<const ID: u8> Default for QoSType<{ ID }> {
507 fn default() -> Self {
508 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
509 }
510 }
511
512 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
513 fn from(ext: ZExtZ64<{ ID }>) -> Self {
514 Self {
515 inner: ext.value as u8,
516 }
517 }
518 }
519
520 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
521 fn from(ext: QoSType<{ ID }>) -> Self {
522 ZExtZ64::new(ext.inner as u64)
523 }
524 }
525
526 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
527 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
528 f.debug_struct("QoS")
529 .field("priority", &self.get_priority())
530 .field("congestion", &self.get_congestion_control())
531 .field("express", &self.is_express())
532 .finish()
533 }
534 }
535
536 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
545 pub struct TimestampType<const ID: u8> {
546 pub timestamp: uhlc::Timestamp,
547 }
548
549 impl<const ID: u8> TimestampType<{ ID }> {
550 #[cfg(feature = "test")]
551 pub fn rand() -> Self {
552 use rand::Rng;
553 let mut rng = rand::thread_rng();
554
555 let time = uhlc::NTP64(rng.gen());
556 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
557 let timestamp = uhlc::Timestamp::new(time, id);
558 Self { timestamp }
559 }
560 }
561
562 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
571 pub struct NodeIdType<const ID: u8> {
572 pub node_id: u16,
573 }
574
575 impl<const ID: u8> NodeIdType<{ ID }> {
576 pub const DEFAULT: Self = Self { node_id: 0 };
578
579 #[cfg(feature = "test")]
580 pub fn rand() -> Self {
581 use rand::Rng;
582 let mut rng = rand::thread_rng();
583 let node_id = rng.gen();
584 Self { node_id }
585 }
586 }
587
588 impl<const ID: u8> Default for NodeIdType<{ ID }> {
589 fn default() -> Self {
590 Self::DEFAULT
591 }
592 }
593
594 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
595 fn from(ext: ZExtZ64<{ ID }>) -> Self {
596 Self {
597 node_id: ext.value as u16,
598 }
599 }
600 }
601
602 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
603 fn from(ext: NodeIdType<{ ID }>) -> Self {
604 ZExtZ64::new(ext.node_id as u64)
605 }
606 }
607
608 #[derive(Debug, Clone, PartialEq, Eq)]
619 pub struct EntityGlobalIdType<const ID: u8> {
620 pub zid: ZenohIdProto,
621 pub eid: EntityId,
622 }
623
624 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
625 #[cfg(feature = "test")]
626 pub fn rand() -> Self {
627 use rand::Rng;
628 let mut rng = rand::thread_rng();
629
630 let zid = ZenohIdProto::rand();
631 let eid: EntityId = rng.gen();
632 Self { zid, eid }
633 }
634 }
635}