zenoh_protocol/network/
interest.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::{
15    fmt::{self, Debug},
16    ops::{Add, AddAssign, Sub, SubAssign},
17    sync::atomic::AtomicU32,
18};
19
20use crate::{common::imsg, core::WireExpr, network::Mapping};
21
22pub type InterestId = u32;
23
24pub mod flag {
25    pub const Z: u8 = 1 << 7; // 0x80 Extensions    if Z==1 then an extension will follow
26}
27
28/// The INTEREST message is sent to request the transmission of current and optionally future
29/// declarations of a given kind matching a target keyexpr. E.g., an interest could be
30/// sent to request the transmission of all current subscriptions matching `a/*`.
31///
32/// The behaviour of a INTEREST depends on the INTEREST MODE.
33///
34/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]:
35///
36/// ```text
37///     A                   B
38///     |     INTEREST      |
39///     |------------------>| -- Mode: Current
40///     |                   |    This is an Interest e.g. for subscriber declarations.
41///     |                   |
42///     |  DECL SUBSCRIBER  |
43///     |<------------------| -- With interest_id field set
44///     |  DECL SUBSCRIBER  |
45///     |<------------------| -- With interest_id field set
46///     |  DECL SUBSCRIBER  |
47///     |<------------------| -- With interest_id field set
48///     |                   |
49///     |     DECL FINAL    |
50///     |<------------------| -- With interest_id field set
51///     |                   |
52/// ```
53///
54/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]:
55///
56/// ```text
57///     A                   B
58///     |     INTEREST      |
59///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
60///     |                   |
61///     |  DECL SUBSCRIBER  |
62///     |<------------------| -- With interest_id field set
63///     |  DECL SUBSCRIBER  |
64///     |<------------------| -- With interest_id field set
65///     |  DECL SUBSCRIBER  |
66///     |<------------------| -- With interest_id field set
67///     |                   |
68///     |     DECL FINAL    |
69///     |<------------------| -- With interest_id field set
70///     |                   |
71///     |  DECL SUBSCRIBER  |
72///     |<------------------| -- With interest_id field not set
73///     | UNDECL SUBSCRIBER |
74///     |<------------------| -- With interest_id field not set
75///     |                   |
76///     |        ...        |
77///     |                   |
78///     | INTEREST FINAL    |
79///     |------------------>| -- Mode: Final
80///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
81///     |                   |
82/// ```
83///
84/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
85///
86/// ```text
87///     A                   B
88///     |     INTEREST      |
89///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
90///     |                   |
91///     |  DECL SUBSCRIBER  |
92///     |<------------------| -- With interest_id field not set
93///     | UNDECL SUBSCRIBER |
94///     |<------------------| -- With interest_id field not set
95///     |                   |
96///     |        ...        |
97///     |                   |
98///     | INTEREST FINAL    |
99///     |------------------>| -- Mode: Final
100///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
101///     |                   |
102/// ```
103///
104/// ```text
105/// Flags:
106/// - |: Mode           The mode of the interest*
107/// -/
108/// - Z: Extension      If Z==1 then at least one extension is present
109///
110/// 7 6 5 4 3 2 1 0
111/// +-+-+-+-+-+-+-+-+
112/// |Z|Mod|INTEREST |
113/// +-+-+-+---------+
114/// ~    id:z32     ~
115/// +---------------+
116/// |A|M|N|R|T|Q|S|K|  if Mod!=Final (*)
117/// +---------------+
118/// ~ key_scope:z16 ~  if Mod!=Final && R==1
119/// +---------------+
120/// ~  key_suffix   ~  if Mod!=Final && R==1 && N==1 -- <u8;z16>
121/// +---------------+
122/// ~  [int_exts]   ~  if Z==1
123/// +---------------+
124///
125/// Mode of declaration:
126/// - Mode 0b00: Final
127/// - Mode 0b01: Current
128/// - Mode 0b10: Future
129/// - Mode 0b11: CurrentFuture
130///
131/// (*) - if K==1 then the interest refers to key expressions
132///     - if S==1 then the interest refers to subscribers
133///     - if Q==1 then the interest refers to queryables
134///     - if T==1 then the interest refers to tokens
135///     - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
136///     - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
137///     - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
138///               If R==0 then M should be set to 0.
139///     - if A==1 then the replies SHOULD be aggregated
140/// ```
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct Interest {
143    pub id: InterestId,
144    pub mode: InterestMode,
145    pub options: InterestOptions,
146    pub wire_expr: Option<WireExpr<'static>>,
147    pub ext_qos: ext::QoSType,
148    pub ext_tstamp: Option<ext::TimestampType>,
149    pub ext_nodeid: ext::NodeIdType,
150}
151
152/// The resolution of a RequestId
153pub type DeclareRequestId = u32;
154pub type AtomicDeclareRequestId = AtomicU32;
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum InterestMode {
158    Final,
159    Current,
160    Future,
161    CurrentFuture,
162}
163
164impl InterestMode {
165    #[cfg(feature = "test")]
166    #[doc(hidden)]
167    pub fn rand() -> Self {
168        use rand::Rng;
169
170        let mut rng = rand::thread_rng();
171
172        match rng.gen_range(0..4) {
173            0 => InterestMode::Final,
174            1 => InterestMode::Current,
175            2 => InterestMode::Future,
176            3 => InterestMode::CurrentFuture,
177            _ => unreachable!(),
178        }
179    }
180}
181
182pub mod ext {
183    use crate::{
184        common::{ZExtZ64, ZExtZBuf},
185        zextz64, zextzbuf,
186    };
187
188    pub type QoS = zextz64!(0x1, false);
189    pub type QoSType = crate::network::ext::QoSType<{ QoS::ID }>;
190
191    pub type Timestamp = zextzbuf!(0x2, false);
192    pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;
193
194    pub type NodeId = zextz64!(0x3, true);
195    pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>;
196}
197
198impl Interest {
199    pub fn options(&self) -> u8 {
200        let mut interest = self.options;
201        if let Some(we) = self.wire_expr.as_ref() {
202            interest += InterestOptions::RESTRICTED;
203            if we.has_suffix() {
204                interest += InterestOptions::NAMED;
205            }
206            if let Mapping::Sender = we.mapping {
207                interest += InterestOptions::MAPPING;
208            }
209        }
210        interest.options
211    }
212
213    #[cfg(feature = "test")]
214    #[doc(hidden)]
215    pub fn rand() -> Self {
216        use rand::Rng;
217        let mut rng = rand::thread_rng();
218
219        let id = rng.gen::<InterestId>();
220        let mode = InterestMode::rand();
221        let options = if mode == InterestMode::Final {
222            InterestOptions::empty()
223        } else {
224            InterestOptions::rand()
225        };
226        let wire_expr = options.restricted().then_some(WireExpr::rand());
227        let ext_qos = ext::QoSType::rand();
228        let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
229        let ext_nodeid = ext::NodeIdType::rand();
230
231        Self {
232            id,
233            mode,
234            wire_expr,
235            options,
236            ext_qos,
237            ext_tstamp,
238            ext_nodeid,
239        }
240    }
241}
242
243#[repr(transparent)]
244#[derive(Clone, Copy)]
245pub struct InterestOptions {
246    options: u8,
247}
248
249impl InterestOptions {
250    // Flags
251    pub const KEYEXPRS: InterestOptions = InterestOptions::options(1);
252    pub const SUBSCRIBERS: InterestOptions = InterestOptions::options(1 << 1);
253    pub const QUERYABLES: InterestOptions = InterestOptions::options(1 << 2);
254    pub const TOKENS: InterestOptions = InterestOptions::options(1 << 3);
255    const RESTRICTED: InterestOptions = InterestOptions::options(1 << 4);
256    const NAMED: InterestOptions = InterestOptions::options(1 << 5);
257    const MAPPING: InterestOptions = InterestOptions::options(1 << 6);
258    pub const AGGREGATE: InterestOptions = InterestOptions::options(1 << 7);
259    pub const ALL: InterestOptions = InterestOptions::options(
260        InterestOptions::KEYEXPRS.options
261            | InterestOptions::SUBSCRIBERS.options
262            | InterestOptions::QUERYABLES.options
263            | InterestOptions::TOKENS.options,
264    );
265
266    const fn options(options: u8) -> Self {
267        Self { options }
268    }
269
270    pub const fn empty() -> Self {
271        Self { options: 0 }
272    }
273
274    pub const fn keyexprs(&self) -> bool {
275        imsg::has_flag(self.options, Self::KEYEXPRS.options)
276    }
277
278    pub const fn subscribers(&self) -> bool {
279        imsg::has_flag(self.options, Self::SUBSCRIBERS.options)
280    }
281
282    pub const fn queryables(&self) -> bool {
283        imsg::has_flag(self.options, Self::QUERYABLES.options)
284    }
285
286    pub const fn tokens(&self) -> bool {
287        imsg::has_flag(self.options, Self::TOKENS.options)
288    }
289
290    pub const fn restricted(&self) -> bool {
291        imsg::has_flag(self.options, Self::RESTRICTED.options)
292    }
293
294    pub const fn named(&self) -> bool {
295        imsg::has_flag(self.options, Self::NAMED.options)
296    }
297
298    pub const fn mapping(&self) -> bool {
299        imsg::has_flag(self.options, Self::MAPPING.options)
300    }
301
302    pub const fn aggregate(&self) -> bool {
303        imsg::has_flag(self.options, Self::AGGREGATE.options)
304    }
305
306    #[cfg(feature = "test")]
307    #[doc(hidden)]
308    pub fn rand() -> Self {
309        use rand::Rng;
310        let mut rng = rand::thread_rng();
311
312        let mut s = Self::empty();
313        if rng.gen_bool(0.5) {
314            s += InterestOptions::KEYEXPRS;
315        }
316        if rng.gen_bool(0.5) {
317            s += InterestOptions::SUBSCRIBERS;
318        }
319        if rng.gen_bool(0.5) {
320            s += InterestOptions::TOKENS;
321        }
322        if rng.gen_bool(0.5) {
323            s += InterestOptions::AGGREGATE;
324        }
325        s
326    }
327}
328
329impl PartialEq for InterestOptions {
330    fn eq(&self, other: &Self) -> bool {
331        self.keyexprs() == other.keyexprs()
332            && self.subscribers() == other.subscribers()
333            && self.queryables() == other.queryables()
334            && self.tokens() == other.tokens()
335            && self.aggregate() == other.aggregate()
336    }
337}
338
339impl Debug for InterestOptions {
340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341        write!(f, "Interest {{ ")?;
342        if self.keyexprs() {
343            write!(f, "K:Y, ")?;
344        } else {
345            write!(f, "K:N, ")?;
346        }
347        if self.subscribers() {
348            write!(f, "S:Y, ")?;
349        } else {
350            write!(f, "S:N, ")?;
351        }
352        if self.queryables() {
353            write!(f, "Q:Y, ")?;
354        } else {
355            write!(f, "Q:N, ")?;
356        }
357        if self.tokens() {
358            write!(f, "T:Y, ")?;
359        } else {
360            write!(f, "T:N, ")?;
361        }
362        if self.aggregate() {
363            write!(f, "A:Y")?;
364        } else {
365            write!(f, "A:N")?;
366        }
367        write!(f, " }}")?;
368        Ok(())
369    }
370}
371
372impl Eq for InterestOptions {}
373
374impl Add for InterestOptions {
375    type Output = Self;
376
377    #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
378    fn add(self, rhs: Self) -> Self::Output {
379        Self {
380            options: self.options | rhs.options,
381        }
382    }
383}
384
385impl AddAssign for InterestOptions {
386    #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
387    fn add_assign(&mut self, rhs: Self) {
388        self.options |= rhs.options;
389    }
390}
391
392impl Sub for InterestOptions {
393    type Output = Self;
394
395    fn sub(self, rhs: Self) -> Self::Output {
396        Self {
397            options: self.options & !rhs.options,
398        }
399    }
400}
401
402impl SubAssign for InterestOptions {
403    fn sub_assign(&mut self, rhs: Self) {
404        self.options &= !rhs.options;
405    }
406}
407
408impl From<u8> for InterestOptions {
409    fn from(options: u8) -> Self {
410        Self { options }
411    }
412}