1use std::{
15 collections::{HashMap, VecDeque},
16 convert::TryInto,
17 future::{IntoFuture, Ready},
18 time::Duration,
19};
20
21use zenoh::{
22 handlers::FifoChannelHandler,
23 internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
24 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
25 pubsub::Subscriber,
26 query::{Query, Queryable, ZenohParameters},
27 sample::{Locality, Sample},
28 Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
29};
30
31#[zenoh_macros::unstable]
33#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
34#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
35pub struct PublicationCacheBuilder<'a, 'b, 'c, const BACKGROUND: bool = false> {
36 session: &'a Session,
37 pub_key_expr: ZResult<KeyExpr<'b>>,
38 queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
39 queryable_origin: Option<Locality>,
40 complete: Option<bool>,
41 history: usize,
42 resources_limit: Option<usize>,
43}
44
45#[allow(deprecated)]
46#[zenoh_macros::unstable]
47impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
48 pub(crate) fn new(
49 session: &'a Session,
50 pub_key_expr: ZResult<KeyExpr<'b>>,
51 ) -> PublicationCacheBuilder<'a, 'b, 'c> {
52 PublicationCacheBuilder {
53 session,
54 pub_key_expr,
55 queryable_suffix: None,
56 queryable_origin: None,
57 complete: None,
58 history: 1,
59 resources_limit: None,
60 }
61 }
62
63 #[zenoh_macros::unstable]
65 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
66 pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
67 where
68 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
69 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<Error>,
70 {
71 self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
72 self
73 }
74
75 #[zenoh_macros::unstable]
78 #[inline]
79 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
80 pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
81 self.queryable_origin = Some(origin);
82 self
83 }
84
85 #[zenoh_macros::unstable]
87 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
88 pub fn queryable_complete(mut self, complete: bool) -> Self {
89 self.complete = Some(complete);
90 self
91 }
92
93 #[zenoh_macros::unstable]
95 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96 pub fn history(mut self, history: usize) -> Self {
97 self.history = history;
98 self
99 }
100
101 #[zenoh_macros::unstable]
103 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
104 pub fn resources_limit(mut self, limit: usize) -> Self {
105 self.resources_limit = Some(limit);
106 self
107 }
108
109 #[zenoh_macros::unstable]
110 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
111 pub fn background(self) -> PublicationCacheBuilder<'a, 'b, 'c, true> {
112 PublicationCacheBuilder {
113 session: self.session,
114 pub_key_expr: self.pub_key_expr,
115 queryable_suffix: self.queryable_suffix,
116 queryable_origin: self.queryable_origin,
117 complete: self.complete,
118 history: self.history,
119 resources_limit: self.resources_limit,
120 }
121 }
122}
123
124#[zenoh_macros::unstable]
125#[allow(deprecated)]
126impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
127 type To = ZResult<PublicationCache>;
128}
129
130#[zenoh_macros::unstable]
131#[allow(deprecated)]
132impl Wait for PublicationCacheBuilder<'_, '_, '_> {
133 #[zenoh_macros::unstable]
134 fn wait(self) -> <Self as Resolvable>::To {
135 PublicationCache::new(self)
136 }
137}
138
139#[zenoh_macros::unstable]
140#[allow(deprecated)]
141impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
142 type Output = <Self as Resolvable>::To;
143 type IntoFuture = Ready<<Self as Resolvable>::To>;
144
145 #[zenoh_macros::unstable]
146 fn into_future(self) -> Self::IntoFuture {
147 std::future::ready(self.wait())
148 }
149}
150
151#[zenoh_macros::unstable]
152#[allow(deprecated)]
153impl Resolvable for PublicationCacheBuilder<'_, '_, '_, true> {
154 type To = ZResult<()>;
155}
156
157#[zenoh_macros::unstable]
158#[allow(deprecated)]
159impl Wait for PublicationCacheBuilder<'_, '_, '_, true> {
160 #[zenoh_macros::unstable]
161 fn wait(self) -> <Self as Resolvable>::To {
162 PublicationCache::new(self).map(|_| ())
163 }
164}
165
166#[zenoh_macros::unstable]
167#[allow(deprecated)]
168impl IntoFuture for PublicationCacheBuilder<'_, '_, '_, true> {
169 type Output = <Self as Resolvable>::To;
170 type IntoFuture = Ready<<Self as Resolvable>::To>;
171
172 #[zenoh_macros::unstable]
173 fn into_future(self) -> Self::IntoFuture {
174 std::future::ready(self.wait())
175 }
176}
177
178#[zenoh_macros::unstable]
180#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
181pub struct PublicationCache {
182 local_sub: Subscriber<FifoChannelHandler<Sample>>,
183 _queryable: Queryable<FifoChannelHandler<Query>>,
184 task: TerminatableTask,
185}
186
187#[zenoh_macros::unstable]
188#[allow(deprecated)]
189impl PublicationCache {
190 #[zenoh_macros::unstable]
191 fn new<const BACKGROUND: bool>(
192 conf: PublicationCacheBuilder<'_, '_, '_, BACKGROUND>,
193 ) -> ZResult<PublicationCache> {
194 let key_expr = conf.pub_key_expr?;
195 let (queryable_suffix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
197 match conf.queryable_suffix {
198 None => (None, key_expr.clone()),
199 Some(Ok(ke)) => {
200 let queryable_key_expr = &key_expr / &ke;
201 (Some(ke.into()), queryable_key_expr)
202 }
203 Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
204 };
205 tracing::debug!(
206 "Create PublicationCache on {} with history={} resource_limit={:?}",
207 &key_expr,
208 conf.history,
209 conf.resources_limit
210 );
211
212 if conf.session.hlc().is_none() {
213 bail!(
214 "Failed requirement for PublicationCache on {}: \
215 the 'timestamping' setting must be enabled in the Zenoh configuration",
216 key_expr,
217 )
218 }
219
220 let mut local_sub = conf
222 .session
223 .declare_subscriber(&key_expr)
224 .allowed_origin(Locality::SessionLocal)
225 .wait()?;
226 if BACKGROUND {
227 local_sub.set_background(true);
228 }
229
230 let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
232 if let Some(origin) = conf.queryable_origin {
233 queryable = queryable.allowed_origin(origin);
234 }
235 if let Some(complete) = conf.complete {
236 queryable = queryable.complete(complete);
237 }
238 let mut queryable = queryable.wait()?;
239 if BACKGROUND {
240 queryable.set_background(true);
241 }
242
243 let sub_recv = local_sub.handler().clone();
245 let quer_recv = queryable.handler().clone();
246 let pub_key_expr = key_expr.into_owned();
247 let resources_limit = conf.resources_limit;
248 let history = conf.history;
249
250 let token = TerminatableTask::create_cancellation_token();
252 let token2 = token.clone();
253 let task = TerminatableTask::spawn(
254 ZRuntime::Application,
255 async move {
256 let mut cache: HashMap<OwnedKeyExpr, VecDeque<Sample>> =
257 HashMap::with_capacity(resources_limit.unwrap_or(32));
258 let limit = resources_limit.unwrap_or(usize::MAX);
259 loop {
260 tokio::select! {
261 sample = sub_recv.recv_async() => {
263 if let Ok(sample) = sample {
264 let queryable_key_expr: KeyExpr<'_> = if let Some(suffix) = &queryable_suffix {
265 sample.key_expr() / suffix
266 } else {
267 sample.key_expr().clone()
268 };
269
270 if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
271 if queue.len() >= history {
272 queue.pop_front();
273 }
274 queue.push_back(sample);
275 } else if cache.len() >= limit {
276 tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
277 pub_key_expr);
278 } else {
279 let mut queue: VecDeque<Sample> = VecDeque::new();
280 queue.push_back(sample);
281 cache.insert(queryable_key_expr.into(), queue);
282 }
283 }
284 },
285
286 query = quer_recv.recv_async() => {
288 if let Ok(query) = query {
289 if !query.key_expr().as_str().contains('*') {
290 if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
291 for sample in queue {
292 if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
293 if !time_range.contains(timestamp.get_time().to_system_time()){
294 continue;
295 }
296 }
297 if let Err(e) = query.reply_sample(sample.clone()).await {
298 tracing::warn!("Error replying to query: {}", e);
299 }
300 }
301 }
302 } else {
303 for (key_expr, queue) in cache.iter() {
304 if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
305 for sample in queue {
306 if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
307 if !time_range.contains(timestamp.get_time().to_system_time()){
308 continue;
309 }
310 }
311 if let Err(e) = query.reply_sample(sample.clone()).await {
312 tracing::warn!("Error replying to query: {}", e);
313 }
314 }
315 }
316 }
317 }
318 }
319 },
320 _ = token2.cancelled() => return
321 }
322 }
323 },
324 token,
325 );
326
327 Ok(PublicationCache {
328 local_sub,
329 _queryable: queryable,
330 task,
331 })
332 }
333
334 #[zenoh_macros::unstable]
336 #[inline]
337 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
338 pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
339 ResolveFuture::new(async move {
340 let PublicationCache {
341 _queryable,
342 local_sub,
343 mut task,
344 } = self;
345 _queryable.undeclare().await?;
346 local_sub.undeclare().await?;
347 task.terminate(Duration::from_secs(10));
348 Ok(())
349 })
350 }
351
352 #[zenoh_macros::internal]
353 #[zenoh_macros::unstable]
354 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
355 pub fn set_background(&mut self, background: bool) {
356 self.local_sub.set_background(background);
357 self._queryable.set_background(background);
358 }
359
360 #[zenoh_macros::unstable]
361 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
362 pub fn key_expr(&self) -> &KeyExpr<'static> {
363 self.local_sub.key_expr()
364 }
365}