Skip to main content

zenoh_protocol/network/
interest.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::{
15    fmt::{self, Debug, Display},
16    hash::{Hash, Hasher},
17    ops::{Add, AddAssign, Sub, SubAssign},
18    sync::atomic::AtomicU32,
19};
20
21use crate::{common::imsg, core::WireExpr, network::Mapping};
22
23pub type InterestId = u32;
24
25pub mod flag {
26    pub const Z: u8 = 1 << 7; // 0x80 Extensions    if Z==1 then an extension will follow
27}
28
29/// The INTEREST message is sent to request the transmission of current and optionally future
30/// declarations of a given kind matching a target keyexpr. E.g., an interest could be
31/// sent to request the transmission of all current subscriptions matching `a/*`.
32///
33/// The behaviour of a INTEREST depends on the INTEREST MODE.
34///
35/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]:
36///
37/// ```text
38///     A                   B
39///     |     INTEREST      |
40///     |------------------>| -- Mode: Current
41///     |                   |    This is an Interest e.g. for subscriber declarations.
42///     |                   |
43///     |  DECL SUBSCRIBER  |
44///     |<------------------| -- With interest_id field set
45///     |  DECL SUBSCRIBER  |
46///     |<------------------| -- With interest_id field set
47///     |  DECL SUBSCRIBER  |
48///     |<------------------| -- With interest_id field set
49///     |                   |
50///     |     DECL FINAL    |
51///     |<------------------| -- With interest_id field set
52///     |                   |
53/// ```
54///
55/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]:
56///
57/// ```text
58///     A                   B
59///     |     INTEREST      |
60///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
61///     |                   |
62///     |  DECL SUBSCRIBER  |
63///     |<------------------| -- With interest_id field set
64///     |  DECL SUBSCRIBER  |
65///     |<------------------| -- With interest_id field set
66///     |  DECL SUBSCRIBER  |
67///     |<------------------| -- With interest_id field set
68///     |                   |
69///     |     DECL FINAL    |
70///     |<------------------| -- With interest_id field set
71///     |                   |
72///     |  DECL SUBSCRIBER  |
73///     |<------------------| -- With interest_id field not set
74///     | UNDECL SUBSCRIBER |
75///     |<------------------| -- With interest_id field not set
76///     |                   |
77///     |        ...        |
78///     |                   |
79///     | INTEREST FINAL    |
80///     |------------------>| -- Mode: Final
81///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
82///     |                   |
83/// ```
84///
85/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
86///
87/// ```text
88///     A                   B
89///     |     INTEREST      |
90///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
91///     |                   |
92///     |  DECL SUBSCRIBER  |
93///     |<------------------| -- With interest_id field not set
94///     | UNDECL SUBSCRIBER |
95///     |<------------------| -- With interest_id field not set
96///     |                   |
97///     |        ...        |
98///     |                   |
99///     | INTEREST FINAL    |
100///     |------------------>| -- Mode: Final
101///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
102///     |                   |
103/// ```
104///
105/// ```text
106/// Flags:
107/// - |: Mode           The mode of the interest*
108/// -/
109/// - Z: Extension      If Z==1 then at least one extension is present
110///
111/// 7 6 5 4 3 2 1 0
112/// +-+-+-+-+-+-+-+-+
113/// |Z|Mod|INTEREST |
114/// +-+-+-+---------+
115/// ~    id:z32     ~
116/// +---------------+
117/// |A|M|N|R|T|Q|S|K|  if Mod!=Final (*)
118/// +---------------+
119/// ~ key_scope:z16 ~  if Mod!=Final && R==1
120/// +---------------+
121/// ~  key_suffix   ~  if Mod!=Final && R==1 && N==1 -- <u8;z16>
122/// +---------------+
123/// ~  [int_exts]   ~  if Z==1
124/// +---------------+
125///
126/// Mode of declaration:
127/// - Mode 0b00: Final
128/// - Mode 0b01: Current
129/// - Mode 0b10: Future
130/// - Mode 0b11: CurrentFuture
131///
132/// (*) - if K==1 then the interest refers to key expressions
133///     - if S==1 then the interest refers to subscribers
134///     - if Q==1 then the interest refers to queryables
135///     - if T==1 then the interest refers to tokens
136///     - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
137///     - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
138///     - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
139///               If R==0 then M should be set to 0.
140///     - if A==1 then the replies SHOULD be aggregated
141/// ```
142#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct Interest {
144    pub id: InterestId,
145    pub mode: InterestMode,
146    pub options: InterestOptions,
147    pub wire_expr: Option<WireExpr<'static>>,
148    pub ext_qos: ext::QoSType,
149    pub ext_tstamp: Option<ext::TimestampType>,
150    pub ext_nodeid: ext::NodeIdType,
151}
152
153/// The resolution of a RequestId
154pub type DeclareRequestId = u32;
155pub type AtomicDeclareRequestId = AtomicU32;
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
158pub enum InterestMode {
159    Final,
160    Current,
161    Future,
162    CurrentFuture,
163}
164
165impl InterestMode {
166    #[cfg(feature = "test")]
167    #[doc(hidden)]
168    pub fn rand() -> Self {
169        use rand::Rng;
170
171        let mut rng = rand::thread_rng();
172
173        match rng.gen_range(0..4) {
174            0 => InterestMode::Final,
175            1 => InterestMode::Current,
176            2 => InterestMode::Future,
177            3 => InterestMode::CurrentFuture,
178            _ => unreachable!(),
179        }
180    }
181
182    pub fn is_future(&self) -> bool {
183        self == &InterestMode::Future || self == &InterestMode::CurrentFuture
184    }
185
186    pub fn is_current(&self) -> bool {
187        self == &InterestMode::Current || self == &InterestMode::CurrentFuture
188    }
189}
190
191pub mod ext {
192    use crate::{zextz64, zextzbuf};
193
194    pub type QoS = zextz64!(0x1, false);
195    pub type QoSType = crate::network::ext::QoSType<{ QoS::ID }>;
196
197    pub type Timestamp = zextzbuf!(0x2, false);
198    pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;
199
200    pub type NodeId = zextz64!(0x3, true);
201    pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>;
202}
203
204impl Interest {
205    pub fn options(&self) -> u8 {
206        let mut interest = self.options;
207        if let Some(we) = self.wire_expr.as_ref() {
208            interest += InterestOptions::RESTRICTED;
209            if we.has_suffix() {
210                interest += InterestOptions::NAMED;
211            } else {
212                interest -= InterestOptions::NAMED;
213            }
214            if let Mapping::Sender = we.mapping {
215                interest += InterestOptions::MAPPING;
216            } else {
217                interest -= InterestOptions::MAPPING;
218            }
219        } else {
220            interest -= InterestOptions::RESTRICTED;
221        }
222        interest.options
223    }
224
225    #[cfg(feature = "test")]
226    #[doc(hidden)]
227    pub fn rand() -> Self {
228        use rand::Rng;
229        let mut rng = rand::thread_rng();
230
231        let id = rng.gen::<InterestId>();
232        let mode = InterestMode::rand();
233        let options = if mode == InterestMode::Final {
234            InterestOptions::empty()
235        } else {
236            InterestOptions::rand()
237        };
238        let wire_expr = options.restricted().then_some(WireExpr::rand());
239        let ext_qos = ext::QoSType::rand();
240        let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
241        let ext_nodeid = ext::NodeIdType::rand();
242
243        Self {
244            id,
245            mode,
246            wire_expr,
247            options,
248            ext_qos,
249            ext_tstamp,
250            ext_nodeid,
251        }
252    }
253}
254
255#[repr(transparent)]
256#[derive(Clone, Copy)]
257pub struct InterestOptions {
258    options: u8,
259}
260
261impl InterestOptions {
262    // Flags
263    pub const KEYEXPRS: InterestOptions = InterestOptions::options(1);
264    pub const SUBSCRIBERS: InterestOptions = InterestOptions::options(1 << 1);
265    pub const QUERYABLES: InterestOptions = InterestOptions::options(1 << 2);
266    pub const TOKENS: InterestOptions = InterestOptions::options(1 << 3);
267    const RESTRICTED: InterestOptions = InterestOptions::options(1 << 4);
268    const NAMED: InterestOptions = InterestOptions::options(1 << 5);
269    const MAPPING: InterestOptions = InterestOptions::options(1 << 6);
270    pub const AGGREGATE: InterestOptions = InterestOptions::options(1 << 7);
271    pub const ALL: InterestOptions = InterestOptions::options(
272        InterestOptions::KEYEXPRS.options
273            | InterestOptions::SUBSCRIBERS.options
274            | InterestOptions::QUERYABLES.options
275            | InterestOptions::TOKENS.options,
276    );
277
278    const fn options(options: u8) -> Self {
279        Self { options }
280    }
281
282    pub const fn empty() -> Self {
283        Self { options: 0 }
284    }
285
286    pub const fn keyexprs(&self) -> bool {
287        imsg::has_flag(self.options, Self::KEYEXPRS.options)
288    }
289
290    pub const fn subscribers(&self) -> bool {
291        imsg::has_flag(self.options, Self::SUBSCRIBERS.options)
292    }
293
294    pub const fn queryables(&self) -> bool {
295        imsg::has_flag(self.options, Self::QUERYABLES.options)
296    }
297
298    pub const fn tokens(&self) -> bool {
299        imsg::has_flag(self.options, Self::TOKENS.options)
300    }
301
302    pub const fn restricted(&self) -> bool {
303        imsg::has_flag(self.options, Self::RESTRICTED.options)
304    }
305
306    pub const fn named(&self) -> bool {
307        imsg::has_flag(self.options, Self::NAMED.options)
308    }
309
310    pub const fn mapping(&self) -> bool {
311        imsg::has_flag(self.options, Self::MAPPING.options)
312    }
313
314    pub const fn aggregate(&self) -> bool {
315        imsg::has_flag(self.options, Self::AGGREGATE.options)
316    }
317
318    #[cfg(feature = "test")]
319    #[doc(hidden)]
320    pub fn rand() -> Self {
321        use rand::Rng;
322        let mut rng = rand::thread_rng();
323
324        let mut s = Self::empty();
325        if rng.gen_bool(0.5) {
326            s += InterestOptions::KEYEXPRS;
327        }
328        if rng.gen_bool(0.5) {
329            s += InterestOptions::SUBSCRIBERS;
330        }
331        if rng.gen_bool(0.5) {
332            s += InterestOptions::TOKENS;
333        }
334        if rng.gen_bool(0.5) {
335            s += InterestOptions::AGGREGATE;
336        }
337        s
338    }
339}
340
341impl PartialEq for InterestOptions {
342    fn eq(&self, other: &Self) -> bool {
343        self.keyexprs() == other.keyexprs()
344            && self.subscribers() == other.subscribers()
345            && self.queryables() == other.queryables()
346            && self.tokens() == other.tokens()
347            && self.aggregate() == other.aggregate()
348    }
349}
350
351impl Hash for InterestOptions {
352    fn hash<H: Hasher>(&self, state: &mut H) {
353        self.keyexprs().hash(state);
354        self.subscribers().hash(state);
355        self.queryables().hash(state);
356        self.tokens().hash(state);
357        self.aggregate().hash(state);
358    }
359}
360
361impl Debug for InterestOptions {
362    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363        write!(f, "Interest {{ ")?;
364        if self.keyexprs() {
365            write!(f, "K:Y, ")?;
366        } else {
367            write!(f, "K:N, ")?;
368        }
369        if self.subscribers() {
370            write!(f, "S:Y, ")?;
371        } else {
372            write!(f, "S:N, ")?;
373        }
374        if self.queryables() {
375            write!(f, "Q:Y, ")?;
376        } else {
377            write!(f, "Q:N, ")?;
378        }
379        if self.tokens() {
380            write!(f, "T:Y, ")?;
381        } else {
382            write!(f, "T:N, ")?;
383        }
384        if self.aggregate() {
385            write!(f, "A:Y")?;
386        } else {
387            write!(f, "A:N")?;
388        }
389        write!(f, " }}")?;
390        Ok(())
391    }
392}
393
394impl Display for InterestOptions {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        let mut option_strs = alloc::vec::Vec::with_capacity(5);
397
398        if self.keyexprs() {
399            option_strs.push("K");
400        }
401
402        if self.subscribers() {
403            option_strs.push("S");
404        }
405
406        if self.queryables() {
407            option_strs.push("Q");
408        }
409
410        if self.tokens() {
411            option_strs.push("T");
412        }
413
414        if self.aggregate() {
415            option_strs.push("A");
416        }
417
418        write!(f, "{{{}}}", option_strs.join(" "))
419    }
420}
421
422impl Eq for InterestOptions {}
423
424impl Add for InterestOptions {
425    type Output = Self;
426
427    #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
428    fn add(self, rhs: Self) -> Self::Output {
429        Self {
430            options: self.options | rhs.options,
431        }
432    }
433}
434
435impl AddAssign for InterestOptions {
436    #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
437    fn add_assign(&mut self, rhs: Self) {
438        self.options |= rhs.options;
439    }
440}
441
442impl Sub for InterestOptions {
443    type Output = Self;
444
445    fn sub(self, rhs: Self) -> Self::Output {
446        Self {
447            options: self.options & !rhs.options,
448        }
449    }
450}
451
452impl SubAssign for InterestOptions {
453    fn sub_assign(&mut self, rhs: Self) {
454        self.options &= !rhs.options;
455    }
456}
457
458impl From<u8> for InterestOptions {
459    fn from(options: u8) -> Self {
460        Self { options }
461    }
462}