1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Numeric identifiers of the network messages, one constant per message kind.
///
/// NOTE(review): presumably these are the ids written in the message header on
/// the wire — confirm against the codec before relying on that.
pub mod id {
    /// Identifier of the `Interest` message.
    pub const INTEREST: u8 = 0x19;
    /// Identifier of the `ResponseFinal` message.
    pub const RESPONSE_FINAL: u8 = 0x1a;
    /// Identifier of the `Response` message.
    pub const RESPONSE: u8 = 0x1b;
    /// Identifier of the `Request` message.
    pub const REQUEST: u8 = 0x1c;
    /// Identifier of the `Push` message.
    pub const PUSH: u8 = 0x1d;
    /// Identifier of the `Declare` message.
    pub const DECLARE: u8 = 0x1e;
    /// Identifier of the `Oam` message.
    pub const OAM: u8 = 0x1f;
}
46
/// Two-valued selector stored as a `u8`: `Receiver` (0, the default) or `Sender` (1).
///
/// NOTE(review): presumably designates which side's key-expression mapping
/// applies — confirm against the users of this type.
#[repr(u8)]
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Mapping {
    /// Discriminant `0`; this is the default variant.
    #[default]
    Receiver = 0,
    /// Discriminant `1`.
    Sender = 1,
}

impl Mapping {
    /// Constant counterpart of [`Default::default`], usable in `const` contexts.
    pub const DEFAULT: Self = Self::Receiver;

    /// Picks `Sender` or `Receiver` with equal probability (test builds only).
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        match rand::thread_rng().gen_bool(0.5) {
            true => Self::Sender,
            false => Self::Receiver,
        }
    }
}
70
/// Owned body of a network message: exactly one of the supported message kinds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkBody {
    /// Body holding a [`Push`] message.
    Push(Push),
    /// Body holding a [`Request`] message.
    Request(Request),
    /// Body holding a [`Response`] message.
    Response(Response),
    /// Body holding a [`ResponseFinal`] message.
    ResponseFinal(ResponseFinal),
    /// Body holding an [`Interest`] message.
    Interest(Interest),
    /// Body holding a [`Declare`] message.
    Declare(Declare),
    /// Body holding an [`Oam`] message.
    OAM(Oam),
}
82
/// Shared-borrow counterpart of [`NetworkBody`]: each variant holds a `&'a`
/// reference to the corresponding message instead of owning it.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NetworkBodyRef<'a> {
    /// Borrowed [`Push`] message.
    Push(&'a Push),
    /// Borrowed [`Request`] message.
    Request(&'a Request),
    /// Borrowed [`Response`] message.
    Response(&'a Response),
    /// Borrowed [`ResponseFinal`] message.
    ResponseFinal(&'a ResponseFinal),
    /// Borrowed [`Interest`] message.
    Interest(&'a Interest),
    /// Borrowed [`Declare`] message.
    Declare(&'a Declare),
    /// Borrowed [`Oam`] message.
    OAM(&'a Oam),
}
93
/// Exclusive-borrow counterpart of [`NetworkBody`]: each variant holds a
/// `&'a mut` reference to the corresponding message.
///
/// Unlike [`NetworkBodyRef`] this type is neither `Clone` nor `Copy`.
#[derive(Debug, PartialEq, Eq)]
pub enum NetworkBodyMut<'a> {
    /// Mutably borrowed [`Push`] message.
    Push(&'a mut Push),
    /// Mutably borrowed [`Request`] message.
    Request(&'a mut Request),
    /// Mutably borrowed [`Response`] message.
    Response(&'a mut Response),
    /// Mutably borrowed [`ResponseFinal`] message.
    ResponseFinal(&'a mut ResponseFinal),
    /// Mutably borrowed [`Interest`] message.
    Interest(&'a mut Interest),
    /// Mutably borrowed [`Declare`] message.
    Declare(&'a mut Declare),
    /// Mutably borrowed [`Oam`] message.
    OAM(&'a mut Oam),
}
104
/// Owned network message: a [`NetworkBody`] paired with its [`Reliability`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
    /// The message payload.
    pub body: NetworkBody,
    /// Reliability associated with this message.
    pub reliability: Reliability,
}
110
/// Borrowed view of a network message: a [`NetworkBodyRef`] paired with a copy
/// of the message's [`Reliability`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NetworkMessageRef<'a> {
    /// Shared reference to the message payload.
    pub body: NetworkBodyRef<'a>,
    /// Reliability associated with this message (held by value).
    pub reliability: Reliability,
}
116
/// Mutable view of a network message: a [`NetworkBodyMut`] paired with a copy
/// of the message's [`Reliability`].
///
/// Only the body is borrowed; `reliability` is a value, so changing it here
/// does not write back to the message this view was created from.
#[derive(Debug, PartialEq, Eq)]
pub struct NetworkMessageMut<'a> {
    /// Exclusive reference to the message payload.
    pub body: NetworkBodyMut<'a>,
    /// Reliability associated with this message (held by value).
    pub reliability: Reliability,
}
122
123pub trait NetworkMessageExt {
124 #[doc(hidden)]
125 fn body(&self) -> NetworkBodyRef;
126 #[doc(hidden)]
127 fn reliability(&self) -> Reliability;
128
129 #[inline]
130 fn is_reliable(&self) -> bool {
131 self.reliability() == Reliability::Reliable
132 }
133
134 #[inline]
135 fn is_express(&self) -> bool {
136 match self.body() {
137 NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
138 NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
139 NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
140 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
141 NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
142 NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
143 NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
144 }
145 }
146
147 #[inline]
148 fn is_droppable(&self) -> bool {
149 if !self.is_reliable() {
150 return true;
151 }
152
153 let cc = match self.body() {
154 NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
155 NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
156 NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
157 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
158 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
159 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
160 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
161 };
162
163 cc == CongestionControl::Drop
164 }
165
166 #[inline]
167 fn priority(&self) -> Priority {
168 match self.body() {
169 NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
170 NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
171 NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
172 NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
173 NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
174 NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
175 NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
176 }
177 }
178
179 #[inline]
180 fn wire_expr(&self) -> Option<&WireExpr> {
181 match &self.body() {
182 NetworkBodyRef::Push(m) => Some(&m.wire_expr),
183 NetworkBodyRef::Request(m) => Some(&m.wire_expr),
184 NetworkBodyRef::Response(m) => Some(&m.wire_expr),
185 NetworkBodyRef::ResponseFinal(_) => None,
186 NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
187 NetworkBodyRef::Declare(m) => match &m.body {
188 DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
189 DeclareBody::UndeclareKeyExpr(_) => None,
190 DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
191 DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
192 DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
193 DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
194 DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
195 DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
196 DeclareBody::DeclareFinal(_) => None,
197 },
198 NetworkBodyRef::OAM(_) => None,
199 }
200 }
201
202 #[inline]
203 fn as_ref(&self) -> NetworkMessageRef {
204 NetworkMessageRef {
205 body: self.body(),
206 reliability: self.reliability(),
207 }
208 }
209}
210
211impl NetworkMessageExt for NetworkMessage {
212 fn body(&self) -> NetworkBodyRef {
213 match &self.body {
214 NetworkBody::Push(body) => NetworkBodyRef::Push(body),
215 NetworkBody::Request(body) => NetworkBodyRef::Request(body),
216 NetworkBody::Response(body) => NetworkBodyRef::Response(body),
217 NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
218 NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
219 NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
220 NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
221 }
222 }
223
224 fn reliability(&self) -> Reliability {
225 self.reliability
226 }
227}
228
229impl NetworkMessageExt for NetworkMessageRef<'_> {
230 fn body(&self) -> NetworkBodyRef {
231 self.body
232 }
233
234 fn reliability(&self) -> Reliability {
235 self.reliability
236 }
237}
238
239impl NetworkMessageExt for NetworkMessageMut<'_> {
240 fn body(&self) -> NetworkBodyRef {
241 match &self.body {
242 NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
243 NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
244 NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
245 NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
246 NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
247 NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
248 NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
249 }
250 }
251
252 fn reliability(&self) -> Reliability {
253 self.reliability
254 }
255}
256
257impl NetworkMessage {
258 #[cfg(feature = "test")]
259 pub fn rand() -> Self {
260 use rand::Rng;
261
262 let mut rng = rand::thread_rng();
263
264 let body = match rng.gen_range(0..6) {
265 0 => NetworkBody::Push(Push::rand()),
266 1 => NetworkBody::Request(Request::rand()),
267 2 => NetworkBody::Response(Response::rand()),
268 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
269 4 => NetworkBody::Declare(Declare::rand()),
270 5 => NetworkBody::OAM(Oam::rand()),
271 _ => unreachable!(),
272 };
273
274 body.into()
275 }
276
277 #[inline]
278 pub fn as_mut(&mut self) -> NetworkMessageMut {
279 let body = match &mut self.body {
280 NetworkBody::Push(body) => NetworkBodyMut::Push(body),
281 NetworkBody::Request(body) => NetworkBodyMut::Request(body),
282 NetworkBody::Response(body) => NetworkBodyMut::Response(body),
283 NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
284 NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
285 NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
286 NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
287 };
288 NetworkMessageMut {
289 body,
290 reliability: self.reliability,
291 }
292 }
293}
294
295impl NetworkMessageMut<'_> {
296 #[inline]
297 pub fn as_mut(&mut self) -> NetworkMessageMut {
298 let body = match &mut self.body {
299 NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
300 NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
301 NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
302 NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
303 NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
304 NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
305 NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
306 };
307 NetworkMessageMut {
308 body,
309 reliability: self.reliability,
310 }
311 }
312}
313
314impl fmt::Display for NetworkMessageRef<'_> {
315 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
316 match &self.body {
317 NetworkBodyRef::OAM(_) => write!(f, "OAM"),
318 NetworkBodyRef::Push(_) => write!(f, "Push"),
319 NetworkBodyRef::Request(_) => write!(f, "Request"),
320 NetworkBodyRef::Response(_) => write!(f, "Response"),
321 NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
322 NetworkBodyRef::Interest(_) => write!(f, "Interest"),
323 NetworkBodyRef::Declare(_) => write!(f, "Declare"),
324 }
325 }
326}
327
328impl fmt::Display for NetworkMessage {
329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330 self.as_ref().fmt(f)
331 }
332}
333
334impl fmt::Display for NetworkMessageMut<'_> {
335 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
336 self.as_ref().fmt(f)
337 }
338}
339
340impl From<NetworkBody> for NetworkMessage {
341 #[inline]
342 fn from(body: NetworkBody) -> Self {
343 Self {
344 body,
345 reliability: Reliability::DEFAULT,
346 }
347 }
348}
349
350impl From<Declare> for NetworkMessage {
351 fn from(declare: Declare) -> Self {
352 NetworkBody::Declare(declare).into()
353 }
354}
355
356impl From<Push> for NetworkMessage {
357 fn from(push: Push) -> Self {
358 NetworkBody::Push(push).into()
359 }
360}
361
362impl From<Request> for NetworkMessage {
363 fn from(request: Request) -> Self {
364 NetworkBody::Request(request).into()
365 }
366}
367
368impl From<Response> for NetworkMessage {
369 fn from(response: Response) -> Self {
370 NetworkBody::Response(response).into()
371 }
372}
373
374impl From<ResponseFinal> for NetworkMessage {
375 fn from(final_response: ResponseFinal) -> Self {
376 NetworkBody::ResponseFinal(final_response).into()
377 }
378}
379
380pub mod ext {
382 use core::fmt;
383
384 use crate::{
385 common::{imsg, ZExtZ64},
386 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
387 };
388
389 #[repr(transparent)]
403 #[derive(Clone, Copy, PartialEq, Eq)]
404 pub struct QoSType<const ID: u8> {
405 inner: u8,
406 }
407
408 impl<const ID: u8> QoSType<{ ID }> {
409 const P_MASK: u8 = 0b00000111;
410 const D_FLAG: u8 = 0b00001000;
411 const E_FLAG: u8 = 0b00010000;
412
413 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
414
415 pub const DECLARE: Self =
416 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
417 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
418 pub const REQUEST: Self =
419 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
420 pub const RESPONSE: Self = Self::new(
421 Priority::DEFAULT,
422 CongestionControl::DEFAULT_RESPONSE,
423 false,
424 );
425 pub const RESPONSE_FINAL: Self = Self::new(
426 Priority::DEFAULT,
427 CongestionControl::DEFAULT_RESPONSE,
428 false,
429 );
430 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
431
432 pub const fn new(
433 priority: Priority,
434 congestion_control: CongestionControl,
435 is_express: bool,
436 ) -> Self {
437 let mut inner = priority as u8;
438 if let CongestionControl::Block = congestion_control {
439 inner |= Self::D_FLAG;
440 }
441 if is_express {
442 inner |= Self::E_FLAG;
443 }
444 Self { inner }
445 }
446
447 pub fn set_priority(&mut self, priority: Priority) {
448 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
449 }
450
451 pub const fn get_priority(&self) -> Priority {
452 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
453 }
454
455 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
456 match cctrl {
457 CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
458 CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
459 }
460 }
461
462 pub const fn get_congestion_control(&self) -> CongestionControl {
463 match imsg::has_flag(self.inner, Self::D_FLAG) {
464 true => CongestionControl::Block,
465 false => CongestionControl::Drop,
466 }
467 }
468
469 pub fn set_is_express(&mut self, is_express: bool) {
470 match is_express {
471 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
472 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
473 }
474 }
475
476 pub const fn is_express(&self) -> bool {
477 imsg::has_flag(self.inner, Self::E_FLAG)
478 }
479
480 #[cfg(feature = "test")]
481 pub fn rand() -> Self {
482 use rand::Rng;
483 let mut rng = rand::thread_rng();
484
485 let inner: u8 = rng.gen();
486 Self { inner }
487 }
488 }
489
490 impl<const ID: u8> Default for QoSType<{ ID }> {
491 fn default() -> Self {
492 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
493 }
494 }
495
496 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
497 fn from(ext: ZExtZ64<{ ID }>) -> Self {
498 Self {
499 inner: ext.value as u8,
500 }
501 }
502 }
503
504 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
505 fn from(ext: QoSType<{ ID }>) -> Self {
506 ZExtZ64::new(ext.inner as u64)
507 }
508 }
509
510 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
511 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
512 f.debug_struct("QoS")
513 .field("priority", &self.get_priority())
514 .field("congestion", &self.get_congestion_control())
515 .field("express", &self.is_express())
516 .finish()
517 }
518 }
519
520 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
529 pub struct TimestampType<const ID: u8> {
530 pub timestamp: uhlc::Timestamp,
531 }
532
533 impl<const ID: u8> TimestampType<{ ID }> {
534 #[cfg(feature = "test")]
535 pub fn rand() -> Self {
536 use rand::Rng;
537 let mut rng = rand::thread_rng();
538
539 let time = uhlc::NTP64(rng.gen());
540 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
541 let timestamp = uhlc::Timestamp::new(time, id);
542 Self { timestamp }
543 }
544 }
545
546 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
555 pub struct NodeIdType<const ID: u8> {
556 pub node_id: u16,
557 }
558
559 impl<const ID: u8> NodeIdType<{ ID }> {
560 pub const DEFAULT: Self = Self { node_id: 0 };
562
563 #[cfg(feature = "test")]
564 pub fn rand() -> Self {
565 use rand::Rng;
566 let mut rng = rand::thread_rng();
567 let node_id = rng.gen();
568 Self { node_id }
569 }
570 }
571
572 impl<const ID: u8> Default for NodeIdType<{ ID }> {
573 fn default() -> Self {
574 Self::DEFAULT
575 }
576 }
577
578 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
579 fn from(ext: ZExtZ64<{ ID }>) -> Self {
580 Self {
581 node_id: ext.value as u16,
582 }
583 }
584 }
585
586 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
587 fn from(ext: NodeIdType<{ ID }>) -> Self {
588 ZExtZ64::new(ext.node_id as u64)
589 }
590 }
591
592 #[derive(Debug, Clone, PartialEq, Eq)]
603 pub struct EntityGlobalIdType<const ID: u8> {
604 pub zid: ZenohIdProto,
605 pub eid: EntityId,
606 }
607
608 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
609 #[cfg(feature = "test")]
610 pub fn rand() -> Self {
611 use rand::Rng;
612 let mut rng = rand::thread_rng();
613
614 let zid = ZenohIdProto::rand();
615 let eid: EntityId = rng.gen();
616 Self { zid, eid }
617 }
618 }
619}