1pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24 Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25 DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability};
34
/// Numeric `u8` identifiers, one per kind of network message.
///
/// The values cover the contiguous range `0x19..=0x1f` and are all distinct.
pub mod id {
    /// Identifier for `Interest`.
    pub const INTEREST: u8 = 0x19;
    /// Identifier for `ResponseFinal`.
    pub const RESPONSE_FINAL: u8 = 0x1A;
    /// Identifier for `Response`.
    pub const RESPONSE: u8 = 0x1B;
    /// Identifier for `Request`.
    pub const REQUEST: u8 = 0x1C;
    /// Identifier for `Push`.
    pub const PUSH: u8 = 0x1D;
    /// Identifier for `Declare`.
    pub const DECLARE: u8 = 0x1E;
    /// Identifier for `OAM`.
    pub const OAM: u8 = 0x1F;
}
46
/// Two-valued selector stored as a `u8`: [`Mapping::Receiver`] is `0` (the default) and
/// [`Mapping::Sender`] is `1`.
#[repr(u8)]
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Mapping {
    #[default]
    Receiver = 0,
    Sender = 1,
}

impl Mapping {
    /// Same value as [`Default::default`], usable in `const` contexts.
    pub const DEFAULT: Self = Self::Receiver;

    /// Returns `Sender` or `Receiver`, each with probability one half.
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        let pick_sender = rand::thread_rng().gen_bool(0.5);
        if pick_sender {
            Self::Sender
        } else {
            Self::Receiver
        }
    }
}
70
/// The payload carried by a [`NetworkMessage`]: exactly one of the message kinds
/// re-exported by this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkBody {
    Push(Push),
    Request(Request),
    Response(Response),
    ResponseFinal(ResponseFinal),
    Interest(Interest),
    Declare(Declare),
    OAM(Oam),
}
82
/// A [`NetworkBody`] together with the reliability it is associated with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
    /// The message payload.
    pub body: NetworkBody,
    /// Reliability of this message; `is_reliable()` compares it to `Reliability::Reliable`.
    pub reliability: Reliability,
    /// Optional non-zero size, only present when the `stats` feature is enabled.
    /// NOTE(review): presumably the encoded size of the message — confirm against the
    /// code that populates it.
    #[cfg(feature = "stats")]
    pub size: Option<core::num::NonZeroUsize>,
}
90
91impl NetworkMessage {
92 #[cfg(feature = "test")]
93 pub fn rand() -> Self {
94 use rand::Rng;
95
96 let mut rng = rand::thread_rng();
97
98 let body = match rng.gen_range(0..6) {
99 0 => NetworkBody::Push(Push::rand()),
100 1 => NetworkBody::Request(Request::rand()),
101 2 => NetworkBody::Response(Response::rand()),
102 3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
103 4 => NetworkBody::Declare(Declare::rand()),
104 5 => NetworkBody::OAM(Oam::rand()),
105 _ => unreachable!(),
106 };
107
108 body.into()
109 }
110
111 #[inline]
112 pub fn is_reliable(&self) -> bool {
113 self.reliability == Reliability::Reliable
114 }
115
116 #[inline]
117 pub fn is_express(&self) -> bool {
118 match &self.body {
119 NetworkBody::Push(msg) => msg.ext_qos.is_express(),
120 NetworkBody::Request(msg) => msg.ext_qos.is_express(),
121 NetworkBody::Response(msg) => msg.ext_qos.is_express(),
122 NetworkBody::ResponseFinal(msg) => msg.ext_qos.is_express(),
123 NetworkBody::Interest(msg) => msg.ext_qos.is_express(),
124 NetworkBody::Declare(msg) => msg.ext_qos.is_express(),
125 NetworkBody::OAM(msg) => msg.ext_qos.is_express(),
126 }
127 }
128
129 #[inline]
130 pub fn is_droppable(&self) -> bool {
131 if !self.is_reliable() {
132 return true;
133 }
134
135 let cc = match &self.body {
136 NetworkBody::Push(msg) => msg.ext_qos.get_congestion_control(),
137 NetworkBody::Request(msg) => msg.ext_qos.get_congestion_control(),
138 NetworkBody::Response(msg) => msg.ext_qos.get_congestion_control(),
139 NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
140 NetworkBody::Interest(msg) => msg.ext_qos.get_congestion_control(),
141 NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(),
142 NetworkBody::OAM(msg) => msg.ext_qos.get_congestion_control(),
143 };
144
145 cc == CongestionControl::Drop
146 }
147
148 #[inline]
149 pub fn priority(&self) -> Priority {
150 match &self.body {
151 NetworkBody::Push(msg) => msg.ext_qos.get_priority(),
152 NetworkBody::Request(msg) => msg.ext_qos.get_priority(),
153 NetworkBody::Response(msg) => msg.ext_qos.get_priority(),
154 NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_priority(),
155 NetworkBody::Interest(msg) => msg.ext_qos.get_priority(),
156 NetworkBody::Declare(msg) => msg.ext_qos.get_priority(),
157 NetworkBody::OAM(msg) => msg.ext_qos.get_priority(),
158 }
159 }
160}
161
162impl fmt::Display for NetworkMessage {
163 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164 use NetworkBody::*;
165 match &self.body {
166 OAM(_) => write!(f, "OAM"),
167 Push(_) => write!(f, "Push"),
168 Request(_) => write!(f, "Request"),
169 Response(_) => write!(f, "Response"),
170 ResponseFinal(_) => write!(f, "ResponseFinal"),
171 Interest(_) => write!(f, "Interest"),
172 Declare(_) => write!(f, "Declare"),
173 }
174 }
175}
176
177impl From<NetworkBody> for NetworkMessage {
178 #[inline]
179 fn from(body: NetworkBody) -> Self {
180 Self {
181 body,
182 reliability: Reliability::DEFAULT,
183 #[cfg(feature = "stats")]
184 size: None,
185 }
186 }
187}
188
189impl From<Declare> for NetworkMessage {
190 fn from(declare: Declare) -> Self {
191 NetworkBody::Declare(declare).into()
192 }
193}
194
195impl From<Push> for NetworkMessage {
196 fn from(push: Push) -> Self {
197 NetworkBody::Push(push).into()
198 }
199}
200
201impl From<Request> for NetworkMessage {
202 fn from(request: Request) -> Self {
203 NetworkBody::Request(request).into()
204 }
205}
206
207impl From<Response> for NetworkMessage {
208 fn from(response: Response) -> Self {
209 NetworkBody::Response(response).into()
210 }
211}
212
213impl From<ResponseFinal> for NetworkMessage {
214 fn from(final_response: ResponseFinal) -> Self {
215 NetworkBody::ResponseFinal(final_response).into()
216 }
217}
218
219pub mod ext {
221 use core::fmt;
222
223 use crate::{
224 common::{imsg, ZExtZ64},
225 core::{CongestionControl, EntityId, Priority, ZenohIdProto},
226 };
227
    /// QoS settings packed into a single byte.
    ///
    /// As read and written by this type's accessors: bits 0-2 hold the priority, bit 3 is
    /// set for `CongestionControl::Block` (clear for `Drop`), and bit 4 is the express flag.
    /// The `ID` parameter is the same one carried by the `ZExtZ64<ID>` extension this type
    /// converts to and from.
    #[repr(transparent)]
    #[derive(Clone, Copy, PartialEq, Eq)]
    pub struct QoSType<const ID: u8> {
        // Raw packed byte; all eight bits take part in equality comparison.
        inner: u8,
    }
246
247 impl<const ID: u8> QoSType<{ ID }> {
248 const P_MASK: u8 = 0b00000111;
249 const D_FLAG: u8 = 0b00001000;
250 const E_FLAG: u8 = 0b00010000;
251
252 pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
253
254 pub const DECLARE: Self =
255 Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
256 pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
257 pub const REQUEST: Self =
258 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
259 pub const RESPONSE: Self = Self::new(
260 Priority::DEFAULT,
261 CongestionControl::DEFAULT_RESPONSE,
262 false,
263 );
264 pub const RESPONSE_FINAL: Self = Self::new(
265 Priority::DEFAULT,
266 CongestionControl::DEFAULT_RESPONSE,
267 false,
268 );
269 pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
270
271 pub const fn new(
272 priority: Priority,
273 congestion_control: CongestionControl,
274 is_express: bool,
275 ) -> Self {
276 let mut inner = priority as u8;
277 if let CongestionControl::Block = congestion_control {
278 inner |= Self::D_FLAG;
279 }
280 if is_express {
281 inner |= Self::E_FLAG;
282 }
283 Self { inner }
284 }
285
286 pub fn set_priority(&mut self, priority: Priority) {
287 self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
288 }
289
290 pub const fn get_priority(&self) -> Priority {
291 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
292 }
293
294 pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
295 match cctrl {
296 CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
297 CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
298 }
299 }
300
301 pub const fn get_congestion_control(&self) -> CongestionControl {
302 match imsg::has_flag(self.inner, Self::D_FLAG) {
303 true => CongestionControl::Block,
304 false => CongestionControl::Drop,
305 }
306 }
307
308 pub fn set_is_express(&mut self, is_express: bool) {
309 match is_express {
310 true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
311 false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
312 }
313 }
314
315 pub const fn is_express(&self) -> bool {
316 imsg::has_flag(self.inner, Self::E_FLAG)
317 }
318
319 #[cfg(feature = "test")]
320 pub fn rand() -> Self {
321 use rand::Rng;
322 let mut rng = rand::thread_rng();
323
324 let inner: u8 = rng.gen();
325 Self { inner }
326 }
327 }
328
329 impl<const ID: u8> Default for QoSType<{ ID }> {
330 fn default() -> Self {
331 Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
332 }
333 }
334
335 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
336 fn from(ext: ZExtZ64<{ ID }>) -> Self {
337 Self {
338 inner: ext.value as u8,
339 }
340 }
341 }
342
343 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
344 fn from(ext: QoSType<{ ID }>) -> Self {
345 ZExtZ64::new(ext.inner as u64)
346 }
347 }
348
349 impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
350 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
351 f.debug_struct("QoS")
352 .field("priority", &self.get_priority())
353 .field("congestion", &self.get_congestion_control())
354 .field("express", &self.is_express())
355 .finish()
356 }
357 }
358
    /// Wrapper around a `uhlc::Timestamp`, parameterised by the const `ID`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct TimestampType<const ID: u8> {
        /// The wrapped timestamp.
        pub timestamp: uhlc::Timestamp,
    }
371
372 impl<const ID: u8> TimestampType<{ ID }> {
373 #[cfg(feature = "test")]
374 pub fn rand() -> Self {
375 use rand::Rng;
376 let mut rng = rand::thread_rng();
377
378 let time = uhlc::NTP64(rng.gen());
379 let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
380 let timestamp = uhlc::Timestamp::new(time, id);
381 Self { timestamp }
382 }
383 }
384
385 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
394 pub struct NodeIdType<const ID: u8> {
395 pub node_id: u16,
396 }
397
398 impl<const ID: u8> NodeIdType<{ ID }> {
399 pub const DEFAULT: Self = Self { node_id: 0 };
401
402 #[cfg(feature = "test")]
403 pub fn rand() -> Self {
404 use rand::Rng;
405 let mut rng = rand::thread_rng();
406 let node_id = rng.gen();
407 Self { node_id }
408 }
409 }
410
411 impl<const ID: u8> Default for NodeIdType<{ ID }> {
412 fn default() -> Self {
413 Self::DEFAULT
414 }
415 }
416
417 impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
418 fn from(ext: ZExtZ64<{ ID }>) -> Self {
419 Self {
420 node_id: ext.value as u16,
421 }
422 }
423 }
424
425 impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
426 fn from(ext: NodeIdType<{ ID }>) -> Self {
427 ZExtZ64::new(ext.node_id as u64)
428 }
429 }
430
    /// Pairs a `ZenohIdProto` with an `EntityId`, parameterised by the const `ID`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct EntityGlobalIdType<const ID: u8> {
        /// The Zenoh id component.
        pub zid: ZenohIdProto,
        /// The entity id component.
        pub eid: EntityId,
    }
446
447 impl<const ID: u8> EntityGlobalIdType<{ ID }> {
448 #[cfg(feature = "test")]
449 pub fn rand() -> Self {
450 use rand::Rng;
451 let mut rng = rand::thread_rng();
452
453 let zid = ZenohIdProto::rand();
454 let eid: EntityId = rng.gen();
455 Self { zid, eid }
456 }
457 }
458}