zenoh_protocol/zenoh/query.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use alloc::{string::String, vec::Vec};
use crate::common::ZExtUnknown;
/// The kind of consolidation to apply to a query.
#[repr(u8)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Copy)]
pub enum ConsolidationMode {
/// Apply automatic consolidation based on queryable's preferences
#[default]
Auto,
/// No consolidation applied: multiple samples may be received for the same key-timestamp.
None,
/// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp
/// has already been sent with the same key.
///
/// This optimizes latency while potentially reducing bandwidth.
///
/// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already
/// been observed with the same key.
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
// Remove the duplicates of any samples based on the their timestamp.
// Unique,
}
impl ConsolidationMode {
pub const DEFAULT: Self = Self::Auto;
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::prelude::SliceRandom;
let mut rng = rand::thread_rng();
*[Self::None, Self::Monotonic, Self::Latest, Self::Auto]
.choose(&mut rng)
.unwrap()
}
}
/// # Query message
///
/// ```text
/// Flags:
/// - C: Consolidation if C==1 then consolidation is present
/// - P: Parameters If P==1 then the parameters are present
/// - Z: Extension If Z==1 then at least one extension is present
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|P|C| QUERY |
/// +-+-+-+---------+
/// % consolidation % if C==1
/// +---------------+
/// ~ ps: <u8;z16> ~ if P==1
/// +---------------+
/// ~ [qry_exts] ~ if Z==1
/// +---------------+
/// ```
pub mod flag {
pub const C: u8 = 1 << 5; // 0x20 Consolidation if C==1 then consolidation is present
pub const P: u8 = 1 << 6; // 0x40 Parameters if P==1 then the parameters are present
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Query {
pub consolidation: ConsolidationMode,
pub parameters: String,
pub ext_sinfo: Option<ext::SourceInfoType>,
pub ext_body: Option<ext::QueryBodyType>,
pub ext_attachment: Option<ext::AttachmentType>,
pub ext_unknown: Vec<ZExtUnknown>,
}
pub mod ext {
use crate::{common::ZExtZBuf, zextzbuf};
/// # SourceInfo extension
/// Used to carry additional information about the source of data
pub type SourceInfo = zextzbuf!(0x1, false);
pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>;
/// # QueryBody extension
/// Used to carry a body attached to the query
/// Shared Memory extension is automatically defined by ValueType extension if
/// #[cfg(feature = "shared-memory")] is defined.
pub type QueryBodyType = crate::zenoh::ext::ValueType<{ ZExtZBuf::<0x03>::id(false) }, 0x04>;
/// # User attachment
pub type Attachment = zextzbuf!(0x5, false);
pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>;
}
impl Query {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::{
distributions::{Alphanumeric, DistString},
Rng,
};
use crate::common::iext;
let mut rng = rand::thread_rng();
const MIN: usize = 2;
const MAX: usize = 16;
let consolidation = ConsolidationMode::rand();
let parameters: String = if rng.gen_bool(0.5) {
let len = rng.gen_range(MIN..MAX);
Alphanumeric.sample_string(&mut rng, len)
} else {
String::new()
};
let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand());
let ext_body = rng.gen_bool(0.5).then_some(ext::QueryBodyType::rand());
let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand());
let mut ext_unknown = Vec::new();
for _ in 0..rng.gen_range(0..4) {
ext_unknown.push(ZExtUnknown::rand2(
iext::mid(ext::Attachment::ID) + 1,
false,
));
}
Self {
consolidation,
parameters,
ext_sinfo,
ext_body,
ext_attachment,
ext_unknown,
}
}
}