1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Identifier constants for the network message kinds declared in this module.
///
/// NOTE(review): these are presumably the on-wire message ids and may need to
/// stay disjoint from ids defined at other protocol layers — confirm before
/// changing any value.
pub mod id {
    /// Identifier of the OAM message.
    pub const OAM: u8 = 0x1f;
    /// Identifier of the Declare message.
    pub const DECLARE: u8 = 0x1e;
    /// Identifier of the Push message.
    pub const PUSH: u8 = 0x1d;
    /// Identifier of the Request message.
    pub const REQUEST: u8 = 0x1c;
    /// Identifier of the Response message.
    pub const RESPONSE: u8 = 0x1b;
    /// Identifier of the ResponseFinal message.
    pub const RESPONSE_FINAL: u8 = 0x1a;
    /// Identifier of the Interest message.
    pub const INTEREST: u8 = 0x19;
}
46
/// Two-valued mapping selector stored as a `u8`.
///
/// `Receiver` (discriminant 0) is the default, `Sender` has discriminant 1.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum Mapping {
    #[default]
    Receiver = 0,
    Sender = 1,
}

impl Mapping {
    /// Same value as `Mapping::default()`, usable in `const` contexts.
    pub const DEFAULT: Self = Self::Receiver;

    /// Picks one of the two variants with equal probability (test helper).
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        match rand::thread_rng().gen_bool(0.5) {
            true => Self::Sender,
            false => Self::Receiver,
        }
    }
}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum NetworkBody {
74 Push(Push),
75 Request(Request),
76 Response(Response),
77 ResponseFinal(ResponseFinal),
78 Interest(Interest),
79 Declare(Declare),
80 OAM(Oam),
81}
82
83#[derive(Debug, Copy, Clone, PartialEq, Eq)]
84pub enum NetworkBodyRef<'a> {
85 Push(&'a Push),
86 Request(&'a Request),
87 Response(&'a Response),
88 ResponseFinal(&'a ResponseFinal),
89 Interest(&'a Interest),
90 Declare(&'a Declare),
91 OAM(&'a Oam),
92}
93
94#[derive(Debug, PartialEq, Eq)]
95pub enum NetworkBodyMut<'a> {
96 Push(&'a mut Push),
97 Request(&'a mut Request),
98 Response(&'a mut Response),
99 ResponseFinal(&'a mut ResponseFinal),
100 Interest(&'a mut Interest),
101 Declare(&'a mut Declare),
102 OAM(&'a mut Oam),
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct NetworkMessage {
107 pub body: NetworkBody,
108 pub reliability: Reliability,
109 #[cfg(feature = "stats")]
110 pub size: Option<core::num::NonZeroUsize>,
111}
112
113#[derive(Debug, Copy, Clone, PartialEq, Eq)]
114pub struct NetworkMessageRef<'a> {
115 pub body: NetworkBodyRef<'a>,
116 pub reliability: Reliability,
117 #[cfg(feature = "stats")]
118 pub size: Option<core::num::NonZeroUsize>,
119}
120
121#[derive(Debug, PartialEq, Eq)]
122pub struct NetworkMessageMut<'a> {
123 pub body: NetworkBodyMut<'a>,
124 pub reliability: Reliability,
125 #[cfg(feature = "stats")]
126 pub size: Option<core::num::NonZeroUsize>,
127}
128
129pub trait NetworkMessageExt {
130 #[doc(hidden)]
131 fn body(&self) -> NetworkBodyRef;
132 #[doc(hidden)]
133 fn reliability(&self) -> Reliability;
134 #[cfg(feature = "stats")]
135 fn size(&self) -> Option<core::num::NonZeroUsize>;
136
137 #[inline]
138 fn is_reliable(&self) -> bool {
139 self.reliability() == Reliability::Reliable
140 }
141
142 #[inline]
143 fn is_express(&self) -> bool {
144 match self.body() {
145 NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
146 NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
147 NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
148 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
149 NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
150 NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
151 NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
152 }
153 }
154
155 #[inline]
156 fn is_droppable(&self) -> bool {
157 if !self.is_reliable() {
158 return true;
159 }
160
161 let cc = match self.body() {
162 NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
163 NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
164 NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
165 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
166 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
167 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
168 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
169 };
170
171 cc == CongestionControl::Drop
172 }
173
174 #[inline]
175 fn priority(&self) -> Priority {
176 match self.body() {
177 NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
178 NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
179 NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
180 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
181 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
182 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
183 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
184 }
185 }
186
187 #[inline]
188 fn wire_expr(&self) -> Option<&WireExpr> {
189 match &self.body() {
190 NetworkBodyRef::Push(m) => Some(&m.wire_expr),
191 NetworkBodyRef::Request(m) => Some(&m.wire_expr),
192 NetworkBodyRef::Response(m) => Some(&m.wire_expr),
193 NetworkBodyRef::ResponseFinal(_) => None,
194 NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
195 NetworkBodyRef::Declare(m) => match &m.body {
196 DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
197 DeclareBody::UndeclareKeyExpr(_) => None,
198 DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
199 DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
200 DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
201 DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
202 DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
203 DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
204 DeclareBody::DeclareFinal(_) => None,
205 },
206 NetworkBodyRef::OAM(_) => None,
207 }
208 }
209
210 #[inline]
211 fn as_ref(&self) -> NetworkMessageRef {
212 NetworkMessageRef {
213 body: self.body(),
214 reliability: self.reliability(),
215 #[cfg(feature = "stats")]
216 size: self.size(),
217 }
218 }
219}
220
221impl NetworkMessageExt for NetworkMessage {
222 fn body(&self) -> NetworkBodyRef {
223 match &self.body {
224 NetworkBody::Push(body) => NetworkBodyRef::Push(body),
225 NetworkBody::Request(body) => NetworkBodyRef::Request(body),
226 NetworkBody::Response(body) => NetworkBodyRef::Response(body),
227 NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
228 NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
229 NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
230 NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
231 }
232 }
233
234 fn reliability(&self) -> Reliability {
235 self.reliability
236 }
237
238 #[cfg(feature = "stats")]
239 fn size(&self) -> Option<core::num::NonZeroUsize> {
240 self.size
241 }
242}
243
244impl NetworkMessageExt for NetworkMessageRef<'_> {
245 fn body(&self) -> NetworkBodyRef {
246 self.body
247 }
248
249 fn reliability(&self) -> Reliability {
250 self.reliability
251 }
252
253 #[cfg(feature = "stats")]
254 fn size(&self) -> Option<core::num::NonZeroUsize> {
255 self.size
256 }
257}
258
259impl NetworkMessageExt for NetworkMessageMut<'_> {
260 fn body(&self) -> NetworkBodyRef {
261 match &self.body {
262 NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
263 NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
264 NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
265 NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
266 NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
267 NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
268 NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
269 }
270 }
271
272 fn reliability(&self) -> Reliability {
273 self.reliability
274 }
275
276 #[cfg(feature = "stats")]
277 fn size(&self) -> Option<core::num::NonZeroUsize> {
278 self.size
279 }
280}
281
282impl NetworkMessage {
283 #[cfg(feature = "test")]
284 pub fn rand() -> Self {
285 use rand::Rng;
286
287 let mut rng = rand::thread_rng();
288
289 let body = match rng.gen_range(0..6) {
290 0 => NetworkBody::Push(Push::rand()),
291 1 => NetworkBody::Request(Request::rand()),
292 2 => NetworkBody::Response(Response::rand()),
293 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
294 4 => NetworkBody::Declare(Declare::rand()),
295 5 => NetworkBody::OAM(Oam::rand()),
296 _ => unreachable!(),
297 };
298
299 body.into()
300 }
301
302 #[inline]
303 pub fn as_mut(&mut self) -> NetworkMessageMut {
304 let body = match &mut self.body {
305 NetworkBody::Push(body) => NetworkBodyMut::Push(body),
306 NetworkBody::Request(body) => NetworkBodyMut::Request(body),
307 NetworkBody::Response(body) => NetworkBodyMut::Response(body),
308 NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
309 NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
310 NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
311 NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
312 };
313 NetworkMessageMut {
314 body,
315 reliability: self.reliability,
316 #[cfg(feature = "stats")]
317 size: self.size,
318 }
319 }
320}
321
322impl NetworkMessageMut<'_> {
323 #[inline]
324 pub fn as_mut(&mut self) -> NetworkMessageMut {
325 let body = match &mut self.body {
326 NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
327 NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
328 NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
329 NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
330 NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
331 NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
332 NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
333 };
334 NetworkMessageMut {
335 body,
336 reliability: self.reliability,
337 #[cfg(feature = "stats")]
338 size: self.size,
339 }
340 }
341}
342
343impl fmt::Display for NetworkMessageRef<'_> {
344 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
345 match &self.body {
346 NetworkBodyRef::OAM(_) => write!(f, "OAM"),
347 NetworkBodyRef::Push(_) => write!(f, "Push"),
348 NetworkBodyRef::Request(_) => write!(f, "Request"),
349 NetworkBodyRef::Response(_) => write!(f, "Response"),
350 NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
351 NetworkBodyRef::Interest(_) => write!(f, "Interest"),
352 NetworkBodyRef::Declare(_) => write!(f, "Declare"),
353 }
354 }
355}
356
357impl fmt::Display for NetworkMessage {
358 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
359 self.as_ref().fmt(f)
360 }
361}
362
363impl fmt::Display for NetworkMessageMut<'_> {
364 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365 self.as_ref().fmt(f)
366 }
367}
368
369impl From<NetworkBody> for NetworkMessage {
370 #[inline]
371 fn from(body: NetworkBody) -> Self {
372 Self {
373 body,
374 reliability: Reliability::DEFAULT,
375 #[cfg(feature = "stats")]
376 size: None,
377 }
378 }
379}
380
381impl From<Declare> for NetworkMessage {
382 fn from(declare: Declare) -> Self {
383 NetworkBody::Declare(declare).into()
384 }
385}
386
387impl From<Push> for NetworkMessage {
388 fn from(push: Push) -> Self {
389 NetworkBody::Push(push).into()
390 }
391}
392
393impl From<Request> for NetworkMessage {
394 fn from(request: Request) -> Self {
395 NetworkBody::Request(request).into()
396 }
397}
398
399impl From<Response> for NetworkMessage {
400 fn from(response: Response) -> Self {
401 NetworkBody::Response(response).into()
402 }
403}
404
405impl From<ResponseFinal> for NetworkMessage {
406 fn from(final_response: ResponseFinal) -> Self {
407 NetworkBody::ResponseFinal(final_response).into()
408 }
409}
410
411pub mod ext {
413 use core::fmt;
414
415 use crate::{
416 common::{imsg, ZExtZ64},
417 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
418 };
419
420 #[repr(transparent)]
434 #[derive(Clone, Copy, PartialEq, Eq)]
435 pub struct QoSType<const ID: u8> {
436 inner: u8,
437 }
438
439 impl<const ID: u8> QoSType<{ ID }> {
440 const P_MASK: u8 = 0b00000111;
441 const D_FLAG: u8 = 0b00001000;
442 const E_FLAG: u8 = 0b00010000;
443
444 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
445
446 pub const DECLARE: Self =
447 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
448 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
449 pub const REQUEST: Self =
450 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
451 pub const RESPONSE: Self = Self::new(
452 Priority::DEFAULT,
453 CongestionControl::DEFAULT_RESPONSE,
454 false,
455 );
456 pub const RESPONSE_FINAL: Self = Self::new(
457 Priority::DEFAULT,
458 CongestionControl::DEFAULT_RESPONSE,
459 false,
460 );
461 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
462
463 pub const fn new(
464 priority: Priority,
465 congestion_control: CongestionControl,
466 is_express: bool,
467 ) -> Self {
468 let mut inner = priority as u8;
469 if let CongestionControl::Block = congestion_control {
470 inner |= Self::D_FLAG;
471 }
472 if is_express {
473 inner |= Self::E_FLAG;
474 }
475 Self { inner }
476 }
477
478 pub fn set_priority(&mut self, priority: Priority) {
479 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
480 }
481
482 pub const fn get_priority(&self) -> Priority {
483 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
484 }
485
486 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
487 match cctrl {
488 CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
489 CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
490 }
491 }
492
493 pub const fn get_congestion_control(&self) -> CongestionControl {
494 match imsg::has_flag(self.inner, Self::D_FLAG) {
495 true => CongestionControl::Block,
496 false => CongestionControl::Drop,
497 }
498 }
499
500 pub fn set_is_express(&mut self, is_express: bool) {
501 match is_express {
502 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
503 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
504 }
505 }
506
507 pub const fn is_express(&self) -> bool {
508 imsg::has_flag(self.inner, Self::E_FLAG)
509 }
510
511 #[cfg(feature = "test")]
512 pub fn rand() -> Self {
513 use rand::Rng;
514 let mut rng = rand::thread_rng();
515
516 let inner: u8 = rng.gen();
517 Self { inner }
518 }
519 }
520
521 impl<const ID: u8> Default for QoSType<{ ID }> {
522 fn default() -> Self {
523 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
524 }
525 }
526
527 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
528 fn from(ext: ZExtZ64<{ ID }>) -> Self {
529 Self {
530 inner: ext.value as u8,
531 }
532 }
533 }
534
535 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
536 fn from(ext: QoSType<{ ID }>) -> Self {
537 ZExtZ64::new(ext.inner as u64)
538 }
539 }
540
541 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
542 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
543 f.debug_struct("QoS")
544 .field("priority", &self.get_priority())
545 .field("congestion", &self.get_congestion_control())
546 .field("express", &self.is_express())
547 .finish()
548 }
549 }
550
551 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
560 pub struct TimestampType<const ID: u8> {
561 pub timestamp: uhlc::Timestamp,
562 }
563
564 impl<const ID: u8> TimestampType<{ ID }> {
565 #[cfg(feature = "test")]
566 pub fn rand() -> Self {
567 use rand::Rng;
568 let mut rng = rand::thread_rng();
569
570 let time = uhlc::NTP64(rng.gen());
571 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
572 let timestamp = uhlc::Timestamp::new(time, id);
573 Self { timestamp }
574 }
575 }
576
577 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
586 pub struct NodeIdType<const ID: u8> {
587 pub node_id: u16,
588 }
589
590 impl<const ID: u8> NodeIdType<{ ID }> {
591 pub const DEFAULT: Self = Self { node_id: 0 };
593
594 #[cfg(feature = "test")]
595 pub fn rand() -> Self {
596 use rand::Rng;
597 let mut rng = rand::thread_rng();
598 let node_id = rng.gen();
599 Self { node_id }
600 }
601 }
602
603 impl<const ID: u8> Default for NodeIdType<{ ID }> {
604 fn default() -> Self {
605 Self::DEFAULT
606 }
607 }
608
609 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
610 fn from(ext: ZExtZ64<{ ID }>) -> Self {
611 Self {
612 node_id: ext.value as u16,
613 }
614 }
615 }
616
617 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
618 fn from(ext: NodeIdType<{ ID }>) -> Self {
619 ZExtZ64::new(ext.node_id as u64)
620 }
621 }
622
623 #[derive(Debug, Clone, PartialEq, Eq)]
634 pub struct EntityGlobalIdType<const ID: u8> {
635 pub zid: ZenohIdProto,
636 pub eid: EntityId,
637 }
638
639 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
640 #[cfg(feature = "test")]
641 pub fn rand() -> Self {
642 use rand::Rng;
643 let mut rng = rand::thread_rng();
644
645 let zid = ZenohIdProto::rand();
646 let eid: EntityId = rng.gen();
647 Self { zid, eid }
648 }
649 }
650}