zenoh_protocol/network/
interest.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::{
15    fmt::{self, Debug},
16    ops::{Add, AddAssign, Sub, SubAssign},
17    sync::atomic::AtomicU32,
18};
19
20use crate::{common::imsg, core::WireExpr, network::Mapping};
21
/// Identifier of an [`Interest`]; it is the `id:z32` field of the INTEREST message.
pub type InterestId = u32;
23
/// Header flags of the INTEREST message.
pub mod flag {
    /// Bit 7 (`0x80`), Extensions: if `Z == 1` then an extension will follow.
    pub const Z: u8 = 1 << 7;
}
27
28/// The INTEREST message is sent to request the transmission of current and optionally future
29/// declarations of a given kind matching a target keyexpr. E.g., an interest could be
30/// sent to request the transmission of all current subscriptions matching `a/*`.
31///
/// The behaviour of an INTEREST depends on the INTEREST MODE.
33///
34/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]:
35///
36/// ```text
37///     A                   B
38///     |     INTEREST      |
39///     |------------------>| -- Mode: Current
40///     |                   |    This is an Interest e.g. for subscriber declarations.
41///     |                   |
42///     |  DECL SUBSCRIBER  |
43///     |<------------------| -- With interest_id field set
44///     |  DECL SUBSCRIBER  |
45///     |<------------------| -- With interest_id field set
46///     |  DECL SUBSCRIBER  |
47///     |<------------------| -- With interest_id field set
48///     |                   |
49///     |     DECL FINAL    |
50///     |<------------------| -- With interest_id field set
51///     |                   |
52/// ```
53///
54/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]:
55///
56/// ```text
57///     A                   B
58///     |     INTEREST      |
59///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
60///     |                   |
61///     |  DECL SUBSCRIBER  |
62///     |<------------------| -- With interest_id field set
63///     |  DECL SUBSCRIBER  |
64///     |<------------------| -- With interest_id field set
65///     |  DECL SUBSCRIBER  |
66///     |<------------------| -- With interest_id field set
67///     |                   |
68///     |     DECL FINAL    |
69///     |<------------------| -- With interest_id field set
70///     |                   |
71///     |  DECL SUBSCRIBER  |
72///     |<------------------| -- With interest_id field not set
73///     | UNDECL SUBSCRIBER |
74///     |<------------------| -- With interest_id field not set
75///     |                   |
76///     |        ...        |
77///     |                   |
78///     | INTEREST FINAL    |
79///     |------------------>| -- Mode: Final
80///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
81///     |                   |
82/// ```
83///
84/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
85///
86/// ```text
87///     A                   B
88///     |     INTEREST      |
89///     |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
90///     |                   |
91///     |  DECL SUBSCRIBER  |
92///     |<------------------| -- With interest_id field not set
93///     | UNDECL SUBSCRIBER |
94///     |<------------------| -- With interest_id field not set
95///     |                   |
96///     |        ...        |
97///     |                   |
98///     | INTEREST FINAL    |
99///     |------------------>| -- Mode: Final
100///     |                   |    This stops the transmission of subscriber declarations/undeclarations.
101///     |                   |
102/// ```
103///
104/// ```text
105/// Flags:
106/// - |: Mode           The mode of the interest*
107/// -/
108/// - Z: Extension      If Z==1 then at least one extension is present
109///
110/// 7 6 5 4 3 2 1 0
111/// +-+-+-+-+-+-+-+-+
112/// |Z|Mod|INTEREST |
113/// +-+-+-+---------+
114/// ~    id:z32     ~
115/// +---------------+
116/// |A|M|N|R|T|Q|S|K|  if Mod!=Final (*)
117/// +---------------+
118/// ~ key_scope:z16 ~  if Mod!=Final && R==1
119/// +---------------+
120/// ~  key_suffix   ~  if Mod!=Final && R==1 && N==1 -- <u8;z16>
121/// +---------------+
122/// ~  [int_exts]   ~  if Z==1
123/// +---------------+
124///
125/// Mode of declaration:
126/// - Mode 0b00: Final
127/// - Mode 0b01: Current
128/// - Mode 0b10: Future
129/// - Mode 0b11: CurrentFuture
130///
131/// (*) - if K==1 then the interest refers to key expressions
132///     - if S==1 then the interest refers to subscribers
133///     - if Q==1 then the interest refers to queryables
134///     - if T==1 then the interest refers to tokens
135///     - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
136///     - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
137///     - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
138///               If R==0 then M should be set to 0.
139///     - if A==1 then the replies SHOULD be aggregated
140/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Interest {
    /// Identifier of this interest (`id:z32` in the message layout).
    pub id: InterestId,
    /// Mode of the interest (the `Mod` bits of the header).
    pub mode: InterestMode,
    /// Option flags of the interest. The R, N and M bits that go on the wire
    /// are set by [`Interest::options`] from `wire_expr`.
    pub options: InterestOptions,
    /// Key expression the interest is restricted to. When `None`,
    /// [`Interest::options`] does not set the R flag, i.e. the interest is for
    /// all key expressions.
    pub wire_expr: Option<WireExpr<'static>>,
    /// QoS extension.
    pub ext_qos: ext::QoSType,
    /// Optional timestamp extension.
    pub ext_tstamp: Option<ext::TimestampType>,
    /// Node id extension.
    pub ext_nodeid: ext::NodeIdType,
}
151
/// Integer type (resolution) used for declare request ids: 32 bits.
pub type DeclareRequestId = u32;
/// Atomic integer type with the same width as [`DeclareRequestId`].
pub type AtomicDeclareRequestId = AtomicU32;
155
/// Mode of an [`Interest`]. The binary values quoted on the variants are the
/// `Mod` encodings listed in the INTEREST message layout; the enum itself does
/// not assign explicit discriminants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterestMode {
    /// Mode `0b00`: stops the transmission of declarations/undeclarations for
    /// a previously sent interest.
    Final,
    /// Mode `0b01`: request the current declarations only.
    Current,
    /// Mode `0b10`: request future declarations/undeclarations only.
    Future,
    /// Mode `0b11`: request the current declarations and then the future
    /// declarations/undeclarations.
    CurrentFuture,
}
163
164impl InterestMode {
165    #[cfg(feature = "test")]
166    pub fn rand() -> Self {
167        use rand::Rng;
168
169        let mut rng = rand::thread_rng();
170
171        match rng.gen_range(0..4) {
172            0 => InterestMode::Final,
173            1 => InterestMode::Current,
174            2 => InterestMode::Future,
175            3 => InterestMode::CurrentFuture,
176            _ => unreachable!(),
177        }
178    }
179}
180
/// Extension types used by the fields of [`Interest`].
///
/// NOTE(review): the second argument of `zextz64!` / `zextzbuf!` presumably
/// marks the extension as mandatory -- confirm against the macro definitions.
pub mod ext {
    use crate::{
        common::{ZExtZ64, ZExtZBuf},
        zextz64, zextzbuf,
    };

    /// QoS extension, declared with id `0x1`.
    pub type QoS = zextz64!(0x1, false);
    /// Network-level QoS type parameterised with the id of [`QoS`].
    pub type QoSType = crate::network::ext::QoSType<{ QoS::ID }>;

    /// Timestamp extension, declared with id `0x2`.
    pub type Timestamp = zextzbuf!(0x2, false);
    /// Network-level timestamp type parameterised with the id of [`Timestamp`].
    pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;

    /// Node id extension, declared with id `0x3`.
    pub type NodeId = zextz64!(0x3, true);
    /// Network-level node id type parameterised with the id of [`NodeId`].
    pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>;
}
196
197impl Interest {
198    pub fn options(&self) -> u8 {
199        let mut interest = self.options;
200        if let Some(we) = self.wire_expr.as_ref() {
201            interest += InterestOptions::RESTRICTED;
202            if we.has_suffix() {
203                interest += InterestOptions::NAMED;
204            }
205            if let Mapping::Sender = we.mapping {
206                interest += InterestOptions::MAPPING;
207            }
208        }
209        interest.options
210    }
211
212    #[cfg(feature = "test")]
213    pub fn rand() -> Self {
214        use rand::Rng;
215        let mut rng = rand::thread_rng();
216
217        let id = rng.gen::<InterestId>();
218        let mode = InterestMode::rand();
219        let options = if mode == InterestMode::Final {
220            InterestOptions::empty()
221        } else {
222            InterestOptions::rand()
223        };
224        let wire_expr = options.restricted().then_some(WireExpr::rand());
225        let ext_qos = ext::QoSType::rand();
226        let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
227        let ext_nodeid = ext::NodeIdType::rand();
228
229        Self {
230            id,
231            mode,
232            wire_expr,
233            options,
234            ext_qos,
235            ext_tstamp,
236            ext_nodeid,
237        }
238    }
239}
240
/// Option flags of an [`Interest`], stored as a single byte laid out as
/// `|A|M|N|R|T|Q|S|K|` (bit 7 down to bit 0).
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct InterestOptions {
    // Raw flag bits; the associated constants name each bit.
    options: u8,
}
246
247impl InterestOptions {
248    // Flags
249    pub const KEYEXPRS: InterestOptions = InterestOptions::options(1);
250    pub const SUBSCRIBERS: InterestOptions = InterestOptions::options(1 << 1);
251    pub const QUERYABLES: InterestOptions = InterestOptions::options(1 << 2);
252    pub const TOKENS: InterestOptions = InterestOptions::options(1 << 3);
253    const RESTRICTED: InterestOptions = InterestOptions::options(1 << 4);
254    const NAMED: InterestOptions = InterestOptions::options(1 << 5);
255    const MAPPING: InterestOptions = InterestOptions::options(1 << 6);
256    pub const AGGREGATE: InterestOptions = InterestOptions::options(1 << 7);
257    pub const ALL: InterestOptions = InterestOptions::options(
258        InterestOptions::KEYEXPRS.options
259            | InterestOptions::SUBSCRIBERS.options
260            | InterestOptions::QUERYABLES.options
261            | InterestOptions::TOKENS.options,
262    );
263
264    const fn options(options: u8) -> Self {
265        Self { options }
266    }
267
268    pub const fn empty() -> Self {
269        Self { options: 0 }
270    }
271
272    pub const fn keyexprs(&self) -> bool {
273        imsg::has_flag(self.options, Self::KEYEXPRS.options)
274    }
275
276    pub const fn subscribers(&self) -> bool {
277        imsg::has_flag(self.options, Self::SUBSCRIBERS.options)
278    }
279
280    pub const fn queryables(&self) -> bool {
281        imsg::has_flag(self.options, Self::QUERYABLES.options)
282    }
283
284    pub const fn tokens(&self) -> bool {
285        imsg::has_flag(self.options, Self::TOKENS.options)
286    }
287
288    pub const fn restricted(&self) -> bool {
289        imsg::has_flag(self.options, Self::RESTRICTED.options)
290    }
291
292    pub const fn named(&self) -> bool {
293        imsg::has_flag(self.options, Self::NAMED.options)
294    }
295
296    pub const fn mapping(&self) -> bool {
297        imsg::has_flag(self.options, Self::MAPPING.options)
298    }
299
300    pub const fn aggregate(&self) -> bool {
301        imsg::has_flag(self.options, Self::AGGREGATE.options)
302    }
303
304    #[cfg(feature = "test")]
305    pub fn rand() -> Self {
306        use rand::Rng;
307        let mut rng = rand::thread_rng();
308
309        let mut s = Self::empty();
310        if rng.gen_bool(0.5) {
311            s += InterestOptions::KEYEXPRS;
312        }
313        if rng.gen_bool(0.5) {
314            s += InterestOptions::SUBSCRIBERS;
315        }
316        if rng.gen_bool(0.5) {
317            s += InterestOptions::TOKENS;
318        }
319        if rng.gen_bool(0.5) {
320            s += InterestOptions::AGGREGATE;
321        }
322        s
323    }
324}
325
326impl PartialEq for InterestOptions {
327    fn eq(&self, other: &Self) -> bool {
328        self.keyexprs() == other.keyexprs()
329            && self.subscribers() == other.subscribers()
330            && self.queryables() == other.queryables()
331            && self.tokens() == other.tokens()
332            && self.aggregate() == other.aggregate()
333    }
334}
335
336impl Debug for InterestOptions {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        write!(f, "Interest {{ ")?;
339        if self.keyexprs() {
340            write!(f, "K:Y, ")?;
341        } else {
342            write!(f, "K:N, ")?;
343        }
344        if self.subscribers() {
345            write!(f, "S:Y, ")?;
346        } else {
347            write!(f, "S:N, ")?;
348        }
349        if self.queryables() {
350            write!(f, "Q:Y, ")?;
351        } else {
352            write!(f, "Q:N, ")?;
353        }
354        if self.tokens() {
355            write!(f, "T:Y, ")?;
356        } else {
357            write!(f, "T:N, ")?;
358        }
359        if self.aggregate() {
360            write!(f, "A:Y")?;
361        } else {
362            write!(f, "A:N")?;
363        }
364        write!(f, " }}")?;
365        Ok(())
366    }
367}
368
// The `PartialEq` impl compares plain `bool`s, so equality is reflexive and
// `Eq` can be implemented.
impl Eq for InterestOptions {}
370
371impl Add for InterestOptions {
372    type Output = Self;
373
374    #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
375    fn add(self, rhs: Self) -> Self::Output {
376        Self {
377            options: self.options | rhs.options,
378        }
379    }
380}
381
382impl AddAssign for InterestOptions {
383    #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
384    fn add_assign(&mut self, rhs: Self) {
385        self.options |= rhs.options;
386    }
387}
388
389impl Sub for InterestOptions {
390    type Output = Self;
391
392    fn sub(self, rhs: Self) -> Self::Output {
393        Self {
394            options: self.options & !rhs.options,
395        }
396    }
397}
398
399impl SubAssign for InterestOptions {
400    fn sub_assign(&mut self, rhs: Self) {
401        self.options &= !rhs.options;
402    }
403}
404
405impl From<u8> for InterestOptions {
406    fn from(options: u8) -> Self {
407        Self { options }
408    }
409}