zenoh_protocol/network/
interest.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::{
15    fmt::{self, Debug},
16    ops::{Add, AddAssign, Sub, SubAssign},
17    sync::atomic::AtomicU32,
18};
19
20use crate::{common::imsg, core::WireExpr, network::Mapping};
21
22pub type InterestId = u32;
23
24pub mod flag {
25    pub const Z: u8 = 1 << 7; // 0x80 Extensions    if Z==1 then an extension will follow
26}
27
28/// The INTEREST message is sent to request the transmission of current and optionally future
29/// declarations of a given kind matching a target keyexpr. E.g., an interest could be
30/// sent to request the transmission of all current subscriptions matching `a/*`.
31///
32/// The behaviour of a INTEREST depends on the INTEREST MODE.
33///
34/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]:
35///
36/// ```text
37///     A                   B
38///     |     INTEREST      |
39///     |------------------>| -- Mode: Current
40///     |                   |    This is an Interest e.g. for subscriber declarations.
41///     |                   |
42///     |  DECL SUBSCRIBER  |
43///     |<------------------| -- With interest_id field set
44///     |  DECL SUBSCRIBER  |
45///     |<------------------| -- With interest_id field set
46///     |  DECL SUBSCRIBER  |
47///     |<------------------| -- With interest_id field set
48///     |                   |
49///     |     DECL FINAL    |
50///     |<------------------| -- With interest_id field set
51///     |                   |
52/// ```
53///
54/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]:
55///
56/// ```text
57///     A                   B
58///     |     INTEREST      |
59///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
60///     |                   |
61///     |  DECL SUBSCRIBER  |
62///     |<------------------| -- With interest_id field set
63///     |  DECL SUBSCRIBER  |
64///     |<------------------| -- With interest_id field set
65///     |  DECL SUBSCRIBER  |
66///     |<------------------| -- With interest_id field set
67///     |                   |
68///     |     DECL FINAL    |
69///     |<------------------| -- With interest_id field set
70///     |                   |
71///     |  DECL SUBSCRIBER  |
72///     |<------------------| -- With interest_id field not set
73///     | UNDECL SUBSCRIBER |
74///     |<------------------| -- With interest_id field not set
75///     |                   |
76///     |        ...        |
77///     |                   |
78///     | INTEREST FINAL    |
79///     |------------------>| -- Mode: Final
80///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
81///     |                   |
82/// ```
83///
84/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
85///
86/// ```text
87///     A                   B
88///     |     INTEREST      |
89///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
90///     |                   |
91///     |  DECL SUBSCRIBER  |
92///     |<------------------| -- With interest_id field not set
93///     | UNDECL SUBSCRIBER |
94///     |<------------------| -- With interest_id field not set
95///     |                   |
96///     |        ...        |
97///     |                   |
98///     | INTEREST FINAL    |
99///     |------------------>| -- Mode: Final
100///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
101///     |                   |
102/// ```
103///
104/// ```text
105/// Flags:
106/// - |: Mode           The mode of the interest*
107/// -/
108/// - Z: Extension      If Z==1 then at least one extension is present
109///
110/// 7 6 5 4 3 2 1 0
111/// +-+-+-+-+-+-+-+-+
112/// |Z|Mod|INTEREST |
113/// +-+-+-+---------+
114/// ~    id:z32     ~
115/// +---------------+
116/// |A|M|N|R|T|Q|S|K|  if Mod!=Final (*)
117/// +---------------+
118/// ~ key_scope:z16 ~  if Mod!=Final && R==1
119/// +---------------+
120/// ~  key_suffix   ~  if Mod!=Final && R==1 && N==1 -- <u8;z16>
121/// +---------------+
122/// ~  [int_exts]   ~  if Z==1
123/// +---------------+
124///
125/// Mode of declaration:
126/// - Mode 0b00: Final
127/// - Mode 0b01: Current
128/// - Mode 0b10: Future
129/// - Mode 0b11: CurrentFuture
130///
131/// (*) - if K==1 then the interest refers to key expressions
132///     - if S==1 then the interest refers to subscribers
133///     - if Q==1 then the interest refers to queryables
134///     - if T==1 then the interest refers to tokens
135///     - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
136///     - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
137///     - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
138///               If R==0 then M should be set to 0.
139///     - if A==1 then the replies SHOULD be aggregated
140/// ```
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct Interest {
143    pub id: InterestId,
144    pub mode: InterestMode,
145    pub options: InterestOptions,
146    pub wire_expr: Option<WireExpr<'static>>,
147    pub ext_qos: ext::QoSType,
148    pub ext_tstamp: Option<ext::TimestampType>,
149    pub ext_nodeid: ext::NodeIdType,
150}
151
152/// The resolution of a RequestId
153pub type DeclareRequestId = u32;
154pub type AtomicDeclareRequestId = AtomicU32;
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum InterestMode {
158    Final,
159    Current,
160    Future,
161    CurrentFuture,
162}
163
164impl InterestMode {
165    #[cfg(feature = "test")]
166    #[doc(hidden)]
167    pub fn rand() -> Self {
168        use rand::Rng;
169
170        let mut rng = rand::thread_rng();
171
172        match rng.gen_range(0..4) {
173            0 => InterestMode::Final,
174            1 => InterestMode::Current,
175            2 => InterestMode::Future,
176            3 => InterestMode::CurrentFuture,
177            _ => unreachable!(),
178        }
179    }
180}
181
182pub mod ext {
183    use crate::{
184        common::{ZExtZ64, ZExtZBuf},
185        zextz64, zextzbuf,
186    };
187
188    pub type QoS = zextz64!(0x1, false);
189    pub type QoSType = crate::network::ext::QoSType<{ QoS::ID }>;
190
191    pub type Timestamp = zextzbuf!(0x2, false);
192    pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;
193
194    pub type NodeId = zextz64!(0x3, true);
195    pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>;
196}
197
198impl Interest {
199    pub fn options(&self) -> u8 {
200        let mut interest = self.options;
201        if let Some(we) = self.wire_expr.as_ref() {
202            interest += InterestOptions::RESTRICTED;
203            if we.has_suffix() {
204                interest += InterestOptions::NAMED;
205            } else {
206                interest -= InterestOptions::NAMED;
207            }
208            if let Mapping::Sender = we.mapping {
209                interest += InterestOptions::MAPPING;
210            } else {
211                interest -= InterestOptions::MAPPING;
212            }
213        } else {
214            interest -= InterestOptions::RESTRICTED;
215        }
216        interest.options
217    }
218
219    #[cfg(feature = "test")]
220    #[doc(hidden)]
221    pub fn rand() -> Self {
222        use rand::Rng;
223        let mut rng = rand::thread_rng();
224
225        let id = rng.gen::<InterestId>();
226        let mode = InterestMode::rand();
227        let options = if mode == InterestMode::Final {
228            InterestOptions::empty()
229        } else {
230            InterestOptions::rand()
231        };
232        let wire_expr = options.restricted().then_some(WireExpr::rand());
233        let ext_qos = ext::QoSType::rand();
234        let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
235        let ext_nodeid = ext::NodeIdType::rand();
236
237        Self {
238            id,
239            mode,
240            wire_expr,
241            options,
242            ext_qos,
243            ext_tstamp,
244            ext_nodeid,
245        }
246    }
247}
248
249#[repr(transparent)]
250#[derive(Clone, Copy)]
251pub struct InterestOptions {
252    options: u8,
253}
254
255impl InterestOptions {
256    // Flags
257    pub const KEYEXPRS: InterestOptions = InterestOptions::options(1);
258    pub const SUBSCRIBERS: InterestOptions = InterestOptions::options(1 << 1);
259    pub const QUERYABLES: InterestOptions = InterestOptions::options(1 << 2);
260    pub const TOKENS: InterestOptions = InterestOptions::options(1 << 3);
261    const RESTRICTED: InterestOptions = InterestOptions::options(1 << 4);
262    const NAMED: InterestOptions = InterestOptions::options(1 << 5);
263    const MAPPING: InterestOptions = InterestOptions::options(1 << 6);
264    pub const AGGREGATE: InterestOptions = InterestOptions::options(1 << 7);
265    pub const ALL: InterestOptions = InterestOptions::options(
266        InterestOptions::KEYEXPRS.options
267            | InterestOptions::SUBSCRIBERS.options
268            | InterestOptions::QUERYABLES.options
269            | InterestOptions::TOKENS.options,
270    );
271
272    const fn options(options: u8) -> Self {
273        Self { options }
274    }
275
276    pub const fn empty() -> Self {
277        Self { options: 0 }
278    }
279
280    pub const fn keyexprs(&self) -> bool {
281        imsg::has_flag(self.options, Self::KEYEXPRS.options)
282    }
283
284    pub const fn subscribers(&self) -> bool {
285        imsg::has_flag(self.options, Self::SUBSCRIBERS.options)
286    }
287
288    pub const fn queryables(&self) -> bool {
289        imsg::has_flag(self.options, Self::QUERYABLES.options)
290    }
291
292    pub const fn tokens(&self) -> bool {
293        imsg::has_flag(self.options, Self::TOKENS.options)
294    }
295
296    pub const fn restricted(&self) -> bool {
297        imsg::has_flag(self.options, Self::RESTRICTED.options)
298    }
299
300    pub const fn named(&self) -> bool {
301        imsg::has_flag(self.options, Self::NAMED.options)
302    }
303
304    pub const fn mapping(&self) -> bool {
305        imsg::has_flag(self.options, Self::MAPPING.options)
306    }
307
308    pub const fn aggregate(&self) -> bool {
309        imsg::has_flag(self.options, Self::AGGREGATE.options)
310    }
311
312    #[cfg(feature = "test")]
313    #[doc(hidden)]
314    pub fn rand() -> Self {
315        use rand::Rng;
316        let mut rng = rand::thread_rng();
317
318        let mut s = Self::empty();
319        if rng.gen_bool(0.5) {
320            s += InterestOptions::KEYEXPRS;
321        }
322        if rng.gen_bool(0.5) {
323            s += InterestOptions::SUBSCRIBERS;
324        }
325        if rng.gen_bool(0.5) {
326            s += InterestOptions::TOKENS;
327        }
328        if rng.gen_bool(0.5) {
329            s += InterestOptions::AGGREGATE;
330        }
331        s
332    }
333}
334
335impl PartialEq for InterestOptions {
336    fn eq(&self, other: &Self) -> bool {
337        self.keyexprs() == other.keyexprs()
338            && self.subscribers() == other.subscribers()
339            && self.queryables() == other.queryables()
340            && self.tokens() == other.tokens()
341            && self.aggregate() == other.aggregate()
342    }
343}
344
345impl Debug for InterestOptions {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        write!(f, "Interest {{ ")?;
348        if self.keyexprs() {
349            write!(f, "K:Y, ")?;
350        } else {
351            write!(f, "K:N, ")?;
352        }
353        if self.subscribers() {
354            write!(f, "S:Y, ")?;
355        } else {
356            write!(f, "S:N, ")?;
357        }
358        if self.queryables() {
359            write!(f, "Q:Y, ")?;
360        } else {
361            write!(f, "Q:N, ")?;
362        }
363        if self.tokens() {
364            write!(f, "T:Y, ")?;
365        } else {
366            write!(f, "T:N, ")?;
367        }
368        if self.aggregate() {
369            write!(f, "A:Y")?;
370        } else {
371            write!(f, "A:N")?;
372        }
373        write!(f, " }}")?;
374        Ok(())
375    }
376}
377
378impl Eq for InterestOptions {}
379
380impl Add for InterestOptions {
381    type Output = Self;
382
383    #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
384    fn add(self, rhs: Self) -> Self::Output {
385        Self {
386            options: self.options | rhs.options,
387        }
388    }
389}
390
391impl AddAssign for InterestOptions {
392    #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
393    fn add_assign(&mut self, rhs: Self) {
394        self.options |= rhs.options;
395    }
396}
397
398impl Sub for InterestOptions {
399    type Output = Self;
400
401    fn sub(self, rhs: Self) -> Self::Output {
402        Self {
403            options: self.options & !rhs.options,
404        }
405    }
406}
407
408impl SubAssign for InterestOptions {
409    fn sub_assign(&mut self, rhs: Self) {
410        self.options &= !rhs.options;
411    }
412}
413
414impl From<u8> for InterestOptions {
415    fn from(options: u8) -> Self {
416        Self { options }
417    }
418}