1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Numeric (`u8`) identifier constants, one per body kind listed in
/// `NetworkBody`.
pub mod id {
    /// Identifier constant named after the `OAM` body kind.
    pub const OAM: u8 = 0x1F;
    /// Identifier constant named after the `Declare` body kind.
    pub const DECLARE: u8 = 0x1E;
    /// Identifier constant named after the `Push` body kind.
    pub const PUSH: u8 = 0x1D;
    /// Identifier constant named after the `Request` body kind.
    pub const REQUEST: u8 = 0x1C;
    /// Identifier constant named after the `Response` body kind.
    pub const RESPONSE: u8 = 0x1B;
    /// Identifier constant named after the `ResponseFinal` body kind.
    pub const RESPONSE_FINAL: u8 = 0x1A;
    /// Identifier constant named after the `Interest` body kind.
    pub const INTEREST: u8 = 0x19;
}
46
/// Two-valued mapping selector stored as a single byte.
///
/// `Receiver` (discriminant `0`) is the default; `Sender` has discriminant `1`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum Mapping {
    /// Default variant, discriminant `0`.
    #[default]
    Receiver = 0,
    /// Discriminant `1`.
    Sender = 1,
}
54
55impl Mapping {
56 pub const DEFAULT: Self = Self::Receiver;
57
58 #[cfg(feature = "test")]
59 #[doc(hidden)]
60 pub fn rand() -> Self {
61 use rand::Rng;
62
63 let mut rng = rand::thread_rng();
64 if rng.gen_bool(0.5) {
65 Mapping::Sender
66 } else {
67 Mapping::Receiver
68 }
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum NetworkBody {
75 Push(Push),
76 Request(Request),
77 Response(Response),
78 ResponseFinal(ResponseFinal),
79 Interest(Interest),
80 Declare(Declare),
81 OAM(Oam),
82}
83
84#[derive(Debug, Copy, Clone, PartialEq, Eq)]
85pub enum NetworkBodyRef<'a> {
86 Push(&'a Push),
87 Request(&'a Request),
88 Response(&'a Response),
89 ResponseFinal(&'a ResponseFinal),
90 Interest(&'a Interest),
91 Declare(&'a Declare),
92 OAM(&'a Oam),
93}
94
95#[derive(Debug, PartialEq, Eq)]
96pub enum NetworkBodyMut<'a> {
97 Push(&'a mut Push),
98 Request(&'a mut Request),
99 Response(&'a mut Response),
100 ResponseFinal(&'a mut ResponseFinal),
101 Interest(&'a mut Interest),
102 Declare(&'a mut Declare),
103 OAM(&'a mut Oam),
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct NetworkMessage {
108 pub body: NetworkBody,
109 pub reliability: Reliability,
110}
111
112#[derive(Debug, Copy, Clone, PartialEq, Eq)]
113pub struct NetworkMessageRef<'a> {
114 pub body: NetworkBodyRef<'a>,
115 pub reliability: Reliability,
116}
117
118#[derive(Debug, PartialEq, Eq)]
119pub struct NetworkMessageMut<'a> {
120 pub body: NetworkBodyMut<'a>,
121 pub reliability: Reliability,
122}
123
124pub trait NetworkMessageExt {
125 #[doc(hidden)]
126 fn body(&self) -> NetworkBodyRef<'_>;
127
128 #[doc(hidden)]
129 fn reliability(&self) -> Reliability;
130
131 #[inline]
132 fn is_reliable(&self) -> bool {
133 self.reliability() == Reliability::Reliable
134 }
135
136 #[inline]
137 fn is_express(&self) -> bool {
138 match self.body() {
139 NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
140 NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
141 NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
142 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
143 NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
144 NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
145 NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
146 }
147 }
148
149 #[inline]
150 fn congestion_control(&self) -> CongestionControl {
151 match self.body() {
152 NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
153 NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
154 NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
155 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
156 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
157 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
158 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
159 }
160 }
161
162 #[inline]
163 fn is_droppable(&self) -> bool {
164 !self.is_reliable() || self.congestion_control() == CongestionControl::Drop
165 }
166
167 #[inline]
168 fn priority(&self) -> Priority {
169 match self.body() {
170 NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
171 NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
172 NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
173 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
174 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
175 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
176 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
177 }
178 }
179
180 #[inline]
181 fn wire_expr(&self) -> Option<&WireExpr<'_>> {
182 match &self.body() {
183 NetworkBodyRef::Push(m) => Some(&m.wire_expr),
184 NetworkBodyRef::Request(m) => Some(&m.wire_expr),
185 NetworkBodyRef::Response(m) => Some(&m.wire_expr),
186 NetworkBodyRef::ResponseFinal(_) => None,
187 NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
188 NetworkBodyRef::Declare(m) => match &m.body {
189 DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
190 DeclareBody::UndeclareKeyExpr(_) => None,
191 DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
192 DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
193 DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
194 DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
195 DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
196 DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
197 DeclareBody::DeclareFinal(_) => None,
198 },
199 NetworkBodyRef::OAM(_) => None,
200 }
201 }
202
203 #[inline]
204 fn as_ref(&self) -> NetworkMessageRef<'_> {
205 NetworkMessageRef {
206 body: self.body(),
207 reliability: self.reliability(),
208 }
209 }
210
211 #[inline]
212 fn to_owned(&self) -> NetworkMessage {
213 NetworkMessage {
214 body: match self.body() {
215 NetworkBodyRef::Push(msg) => NetworkBody::Push(msg.clone()),
216 NetworkBodyRef::Request(msg) => NetworkBody::Request(msg.clone()),
217 NetworkBodyRef::Response(msg) => NetworkBody::Response(msg.clone()),
218 NetworkBodyRef::ResponseFinal(msg) => NetworkBody::ResponseFinal(msg.clone()),
219 NetworkBodyRef::Interest(msg) => NetworkBody::Interest(msg.clone()),
220 NetworkBodyRef::Declare(msg) => NetworkBody::Declare(msg.clone()),
221 NetworkBodyRef::OAM(msg) => NetworkBody::OAM(msg.clone()),
222 },
223 reliability: self.reliability(),
224 }
225 }
226}
227
228impl NetworkMessageExt for NetworkMessage {
229 fn body(&self) -> NetworkBodyRef<'_> {
230 match &self.body {
231 NetworkBody::Push(body) => NetworkBodyRef::Push(body),
232 NetworkBody::Request(body) => NetworkBodyRef::Request(body),
233 NetworkBody::Response(body) => NetworkBodyRef::Response(body),
234 NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
235 NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
236 NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
237 NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
238 }
239 }
240
241 fn reliability(&self) -> Reliability {
242 self.reliability
243 }
244}
245
246impl NetworkMessageExt for NetworkMessageRef<'_> {
247 fn body(&self) -> NetworkBodyRef<'_> {
248 self.body
249 }
250
251 fn reliability(&self) -> Reliability {
252 self.reliability
253 }
254}
255
256impl NetworkMessageExt for NetworkMessageMut<'_> {
257 fn body(&self) -> NetworkBodyRef<'_> {
258 match &self.body {
259 NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
260 NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
261 NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
262 NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
263 NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
264 NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
265 NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
266 }
267 }
268
269 fn reliability(&self) -> Reliability {
270 self.reliability
271 }
272}
273
274impl NetworkMessage {
275 #[cfg(feature = "test")]
276 #[doc(hidden)]
277 pub fn rand() -> Self {
278 use rand::Rng;
279
280 let mut rng = rand::thread_rng();
281
282 let body = match rng.gen_range(0..6) {
283 0 => NetworkBody::Push(Push::rand()),
284 1 => NetworkBody::Request(Request::rand()),
285 2 => NetworkBody::Response(Response::rand()),
286 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
287 4 => NetworkBody::Declare(Declare::rand()),
288 5 => NetworkBody::OAM(Oam::rand()),
289 _ => unreachable!(),
290 };
291
292 body.into()
293 }
294
295 #[inline]
296 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
297 let body = match &mut self.body {
298 NetworkBody::Push(body) => NetworkBodyMut::Push(body),
299 NetworkBody::Request(body) => NetworkBodyMut::Request(body),
300 NetworkBody::Response(body) => NetworkBodyMut::Response(body),
301 NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
302 NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
303 NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
304 NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
305 };
306 NetworkMessageMut {
307 body,
308 reliability: self.reliability,
309 }
310 }
311}
312
313impl NetworkMessageMut<'_> {
314 #[inline]
315 pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
316 let body = match &mut self.body {
317 NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
318 NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
319 NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
320 NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
321 NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
322 NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
323 NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
324 };
325 NetworkMessageMut {
326 body,
327 reliability: self.reliability,
328 }
329 }
330}
331
332impl fmt::Display for NetworkMessageRef<'_> {
333 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
334 match &self.body {
335 NetworkBodyRef::OAM(_) => write!(f, "OAM"),
336 NetworkBodyRef::Push(_) => write!(f, "Push"),
337 NetworkBodyRef::Request(_) => write!(f, "Request"),
338 NetworkBodyRef::Response(_) => write!(f, "Response"),
339 NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
340 NetworkBodyRef::Interest(_) => write!(f, "Interest"),
341 NetworkBodyRef::Declare(_) => write!(f, "Declare"),
342 }
343 }
344}
345
346impl fmt::Display for NetworkMessage {
347 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
348 self.as_ref().fmt(f)
349 }
350}
351
352impl fmt::Display for NetworkMessageMut<'_> {
353 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
354 self.as_ref().fmt(f)
355 }
356}
357
358impl From<NetworkBody> for NetworkMessage {
359 #[inline]
360 fn from(body: NetworkBody) -> Self {
361 Self {
362 body,
363 reliability: Reliability::DEFAULT,
364 }
365 }
366}
367
/// Test-only shortcut: wraps a [`Push`] in `NetworkBody::Push` and converts
/// that body into a message.
#[cfg(feature = "test")]
impl From<Push> for NetworkMessage {
    fn from(push: Push) -> Self {
        Self::from(NetworkBody::Push(push))
    }
}
374
375pub mod ext {
377 use core::fmt;
378
379 use crate::{
380 common::{imsg, ZExtZ64},
381 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
382 };
383
384 #[repr(transparent)]
399 #[derive(Clone, Copy, PartialEq, Eq)]
400 pub struct QoSType<const ID: u8> {
401 inner: u8,
402 }
403
404 impl<const ID: u8> QoSType<{ ID }> {
405 const P_MASK: u8 = 0b00000111;
406 const D_FLAG: u8 = 0b00001000;
407 const E_FLAG: u8 = 0b00010000;
408 const F_FLAG: u8 = 0b00100000;
409
410 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
411
412 pub const DECLARE: Self =
413 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
414 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
415 pub const REQUEST: Self =
416 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
417 pub const RESPONSE: Self = Self::new(
418 Priority::DEFAULT,
419 CongestionControl::DEFAULT_RESPONSE,
420 false,
421 );
422 pub const RESPONSE_FINAL: Self = Self::new(
423 Priority::DEFAULT,
424 CongestionControl::DEFAULT_RESPONSE,
425 false,
426 );
427 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
428
429 pub const fn new(
430 priority: Priority,
431 congestion_control: CongestionControl,
432 is_express: bool,
433 ) -> Self {
434 let mut inner = priority as u8;
435 match congestion_control {
436 CongestionControl::Block => inner |= Self::D_FLAG,
437 #[cfg(feature = "unstable")]
438 CongestionControl::BlockFirst => inner |= Self::F_FLAG,
439 _ => {}
440 }
441 if is_express {
442 inner |= Self::E_FLAG;
443 }
444 Self { inner }
445 }
446
447 pub fn set_priority(&mut self, priority: Priority) {
448 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
449 }
450
451 pub const fn get_priority(&self) -> Priority {
452 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
453 }
454
455 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
456 match cctrl {
457 CongestionControl::Block => {
458 self.inner = imsg::set_flag(self.inner, Self::D_FLAG);
459 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
460 }
461 CongestionControl::Drop => {
462 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
463 self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
464 }
465 #[cfg(feature = "unstable")]
466 CongestionControl::BlockFirst => {
467 self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
468 self.inner = imsg::set_flag(self.inner, Self::F_FLAG);
469 }
470 }
471 }
472
473 pub const fn get_congestion_control(&self) -> CongestionControl {
474 match (
475 imsg::has_flag(self.inner, Self::D_FLAG),
476 imsg::has_flag(self.inner, Self::F_FLAG),
477 ) {
478 (false, false) => CongestionControl::Drop,
479 #[cfg(feature = "unstable")]
480 (false, true) => CongestionControl::BlockFirst,
481 #[cfg(not(feature = "unstable"))]
482 (false, true) => CongestionControl::Drop,
483 (true, _) => CongestionControl::Block,
484 }
485 }
486
487 pub fn set_is_express(&mut self, is_express: bool) {
488 match is_express {
489 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
490 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
491 }
492 }
493
494 pub const fn is_express(&self) -> bool {
495 imsg::has_flag(self.inner, Self::E_FLAG)
496 }
497
498 #[cfg(feature = "test")]
499 #[doc(hidden)]
500 pub fn rand() -> Self {
501 use rand::Rng;
502 let mut rng = rand::thread_rng();
503
504 let inner: u8 = rng.gen();
505 Self { inner }
506 }
507 }
508
509 impl<const ID: u8> Default for QoSType<{ ID }> {
510 fn default() -> Self {
511 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
512 }
513 }
514
515 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
516 fn from(ext: ZExtZ64<{ ID }>) -> Self {
517 Self {
518 inner: ext.value as u8,
519 }
520 }
521 }
522
523 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
524 fn from(ext: QoSType<{ ID }>) -> Self {
525 ZExtZ64::new(ext.inner as u64)
526 }
527 }
528
529 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
530 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
531 f.debug_struct("QoS")
532 .field("priority", &self.get_priority())
533 .field("congestion", &self.get_congestion_control())
534 .field("express", &self.is_express())
535 .finish()
536 }
537 }
538
539 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
548 pub struct TimestampType<const ID: u8> {
549 pub timestamp: uhlc::Timestamp,
550 }
551
552 impl<const ID: u8> TimestampType<{ ID }> {
553 #[cfg(feature = "test")]
554 #[doc(hidden)]
555 pub fn rand() -> Self {
556 use rand::Rng;
557 let mut rng = rand::thread_rng();
558
559 let time = uhlc::NTP64(rng.gen());
560 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
561 let timestamp = uhlc::Timestamp::new(time, id);
562 Self { timestamp }
563 }
564 }
565
566 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
575 pub struct NodeIdType<const ID: u8> {
576 pub node_id: u16,
577 }
578
579 impl<const ID: u8> NodeIdType<{ ID }> {
580 pub const DEFAULT: Self = Self { node_id: 0 };
582
583 #[cfg(feature = "test")]
584 #[doc(hidden)]
585 pub fn rand() -> Self {
586 use rand::Rng;
587 let mut rng = rand::thread_rng();
588 let node_id = rng.gen();
589 Self { node_id }
590 }
591 }
592
593 impl<const ID: u8> Default for NodeIdType<{ ID }> {
594 fn default() -> Self {
595 Self::DEFAULT
596 }
597 }
598
599 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
600 fn from(ext: ZExtZ64<{ ID }>) -> Self {
601 Self {
602 node_id: ext.value as u16,
603 }
604 }
605 }
606
607 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
608 fn from(ext: NodeIdType<{ ID }>) -> Self {
609 ZExtZ64::new(ext.node_id as u64)
610 }
611 }
612
613 #[derive(Debug, Clone, PartialEq, Eq)]
624 pub struct EntityGlobalIdType<const ID: u8> {
625 pub zid: ZenohIdProto,
626 pub eid: EntityId,
627 }
628
629 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
630 #[cfg(feature = "test")]
631 #[doc(hidden)]
632 pub fn rand() -> Self {
633 use rand::Rng;
634 let mut rng = rand::thread_rng();
635
636 let zid = ZenohIdProto::rand();
637 let eid: EntityId = rng.gen();
638 Self { zid, eid }
639 }
640 }
641}