1use std::{
15 collections::{HashMap, VecDeque},
16 convert::TryInto,
17 future::{IntoFuture, Ready},
18 time::Duration,
19};
20
21use zenoh::{
22 handlers::FifoChannelHandler,
23 internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
24 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
25 pubsub::Subscriber,
26 query::{Query, Queryable, ZenohParameters},
27 sample::{Locality, Sample},
28 Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
29};
30
31#[zenoh_macros::unstable]
33#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
34#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
35pub struct PublicationCacheBuilder<'a, 'b, 'c, const BACKGROUND: bool = false> {
36 session: &'a Session,
37 pub_key_expr: ZResult<KeyExpr<'b>>,
38 queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
39 queryable_origin: Option<Locality>,
40 complete: Option<bool>,
41 history: usize,
42 resources_limit: Option<usize>,
43}
44
45#[allow(deprecated)]
46#[zenoh_macros::unstable]
47impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
48 pub(crate) fn new(
49 session: &'a Session,
50 pub_key_expr: ZResult<KeyExpr<'b>>,
51 ) -> PublicationCacheBuilder<'a, 'b, 'c> {
52 PublicationCacheBuilder {
53 session,
54 pub_key_expr,
55 queryable_suffix: None,
56 queryable_origin: None,
57 complete: None,
58 history: 1,
59 resources_limit: None,
60 }
61 }
62
63 #[zenoh_macros::unstable]
65 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
66 pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
67 where
68 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
69 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<Error>,
70 {
71 self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
72 self
73 }
74
75 #[zenoh_macros::unstable]
77 #[inline]
78 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
79 pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
80 self.queryable_origin = Some(origin);
81 self
82 }
83
84 #[zenoh_macros::unstable]
86 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
87 pub fn queryable_complete(mut self, complete: bool) -> Self {
88 self.complete = Some(complete);
89 self
90 }
91
92 #[zenoh_macros::unstable]
94 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
95 pub fn history(mut self, history: usize) -> Self {
96 self.history = history;
97 self
98 }
99
100 #[zenoh_macros::unstable]
102 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
103 pub fn resources_limit(mut self, limit: usize) -> Self {
104 self.resources_limit = Some(limit);
105 self
106 }
107
108 #[zenoh_macros::unstable]
109 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
110 pub fn background(self) -> PublicationCacheBuilder<'a, 'b, 'c, true> {
111 PublicationCacheBuilder {
112 session: self.session,
113 pub_key_expr: self.pub_key_expr,
114 queryable_suffix: self.queryable_suffix,
115 queryable_origin: self.queryable_origin,
116 complete: self.complete,
117 history: self.history,
118 resources_limit: self.resources_limit,
119 }
120 }
121}
122
123#[zenoh_macros::unstable]
124#[allow(deprecated)]
125impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
126 type To = ZResult<PublicationCache>;
127}
128
129#[zenoh_macros::unstable]
130#[allow(deprecated)]
131impl Wait for PublicationCacheBuilder<'_, '_, '_> {
132 #[zenoh_macros::unstable]
133 fn wait(self) -> <Self as Resolvable>::To {
134 PublicationCache::new(self)
135 }
136}
137
138#[zenoh_macros::unstable]
139#[allow(deprecated)]
140impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
141 type Output = <Self as Resolvable>::To;
142 type IntoFuture = Ready<<Self as Resolvable>::To>;
143
144 #[zenoh_macros::unstable]
145 fn into_future(self) -> Self::IntoFuture {
146 std::future::ready(self.wait())
147 }
148}
149
150#[zenoh_macros::unstable]
151#[allow(deprecated)]
152impl Resolvable for PublicationCacheBuilder<'_, '_, '_, true> {
153 type To = ZResult<()>;
154}
155
156#[zenoh_macros::unstable]
157#[allow(deprecated)]
158impl Wait for PublicationCacheBuilder<'_, '_, '_, true> {
159 #[zenoh_macros::unstable]
160 fn wait(self) -> <Self as Resolvable>::To {
161 PublicationCache::new(self).map(|_| ())
162 }
163}
164
165#[zenoh_macros::unstable]
166#[allow(deprecated)]
167impl IntoFuture for PublicationCacheBuilder<'_, '_, '_, true> {
168 type Output = <Self as Resolvable>::To;
169 type IntoFuture = Ready<<Self as Resolvable>::To>;
170
171 #[zenoh_macros::unstable]
172 fn into_future(self) -> Self::IntoFuture {
173 std::future::ready(self.wait())
174 }
175}
176
177#[zenoh_macros::unstable]
179#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
180pub struct PublicationCache {
181 local_sub: Subscriber<FifoChannelHandler<Sample>>,
182 _queryable: Queryable<FifoChannelHandler<Query>>,
183 task: TerminatableTask,
184}
185
186#[zenoh_macros::unstable]
187#[allow(deprecated)]
188impl PublicationCache {
189 #[zenoh_macros::unstable]
190 fn new<const BACKGROUND: bool>(
191 conf: PublicationCacheBuilder<'_, '_, '_, BACKGROUND>,
192 ) -> ZResult<PublicationCache> {
193 let key_expr = conf.pub_key_expr?;
194 let (queryable_suffix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
196 match conf.queryable_suffix {
197 None => (None, key_expr.clone()),
198 Some(Ok(ke)) => {
199 let queryable_key_expr = &key_expr / &ke;
200 (Some(ke.into()), queryable_key_expr)
201 }
202 Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
203 };
204 tracing::debug!(
205 "Create PublicationCache on {} with history={} resource_limit={:?}",
206 &key_expr,
207 conf.history,
208 conf.resources_limit
209 );
210
211 if conf.session.hlc().is_none() {
212 bail!(
213 "Failed requirement for PublicationCache on {}: \
214 the 'timestamping' setting must be enabled in the Zenoh configuration",
215 key_expr,
216 )
217 }
218
219 let mut local_sub = conf
221 .session
222 .declare_subscriber(&key_expr)
223 .allowed_origin(Locality::SessionLocal)
224 .wait()?;
225 if BACKGROUND {
226 local_sub.set_background(true);
227 }
228
229 let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
231 if let Some(origin) = conf.queryable_origin {
232 queryable = queryable.allowed_origin(origin);
233 }
234 if let Some(complete) = conf.complete {
235 queryable = queryable.complete(complete);
236 }
237 let mut queryable = queryable.wait()?;
238 if BACKGROUND {
239 queryable.set_background(true);
240 }
241
242 let sub_recv = local_sub.handler().clone();
244 let quer_recv = queryable.handler().clone();
245 let pub_key_expr = key_expr.into_owned();
246 let resources_limit = conf.resources_limit;
247 let history = conf.history;
248
249 let token = TerminatableTask::create_cancellation_token();
251 let token2 = token.clone();
252 let task = TerminatableTask::spawn(
253 ZRuntime::Application,
254 async move {
255 let mut cache: HashMap<OwnedKeyExpr, VecDeque<Sample>> =
256 HashMap::with_capacity(resources_limit.unwrap_or(32));
257 let limit = resources_limit.unwrap_or(usize::MAX);
258 loop {
259 tokio::select! {
260 sample = sub_recv.recv_async() => {
262 if let Ok(sample) = sample {
263 let queryable_key_expr: KeyExpr<'_> = if let Some(suffix) = &queryable_suffix {
264 sample.key_expr() / suffix
265 } else {
266 sample.key_expr().clone()
267 };
268
269 if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
270 if queue.len() >= history {
271 queue.pop_front();
272 }
273 queue.push_back(sample);
274 } else if cache.len() >= limit {
275 tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
276 pub_key_expr);
277 } else {
278 let mut queue: VecDeque<Sample> = VecDeque::new();
279 queue.push_back(sample);
280 cache.insert(queryable_key_expr.into(), queue);
281 }
282 }
283 },
284
285 query = quer_recv.recv_async() => {
287 if let Ok(query) = query {
288 if !query.key_expr().as_str().contains('*') {
289 if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
290 for sample in queue {
291 if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
292 if !time_range.contains(timestamp.get_time().to_system_time()){
293 continue;
294 }
295 }
296 if let Err(e) = query.reply_sample(sample.clone()).await {
297 tracing::warn!("Error replying to query: {}", e);
298 }
299 }
300 }
301 } else {
302 for (key_expr, queue) in cache.iter() {
303 if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
304 for sample in queue {
305 if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
306 if !time_range.contains(timestamp.get_time().to_system_time()){
307 continue;
308 }
309 }
310 if let Err(e) = query.reply_sample(sample.clone()).await {
311 tracing::warn!("Error replying to query: {}", e);
312 }
313 }
314 }
315 }
316 }
317 }
318 },
319 _ = token2.cancelled() => return
320 }
321 }
322 },
323 token,
324 );
325
326 Ok(PublicationCache {
327 local_sub,
328 _queryable: queryable,
329 task,
330 })
331 }
332
333 #[zenoh_macros::unstable]
335 #[inline]
336 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
337 pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
338 ResolveFuture::new(async move {
339 let PublicationCache {
340 _queryable,
341 local_sub,
342 mut task,
343 } = self;
344 _queryable.undeclare().await?;
345 local_sub.undeclare().await?;
346 task.terminate(Duration::from_secs(10));
347 Ok(())
348 })
349 }
350
351 #[zenoh_macros::internal]
352 #[zenoh_macros::unstable]
353 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
354 pub fn set_background(&mut self, background: bool) {
355 self.local_sub.set_background(background);
356 self._queryable.set_background(background);
357 }
358
359 #[zenoh_macros::unstable]
360 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
361 pub fn key_expr(&self) -> &KeyExpr<'static> {
362 self.local_sub.key_expr()
363 }
364}