fred/commands/interfaces/
tracking.rs

1use crate::{
2  commands,
3  interfaces::ClientLike,
4  prelude::FredResult,
5  runtime::{spawn, BroadcastReceiver, JoinHandle},
6  types::{client::Invalidation, MultipleStrings},
7};
8use fred_macros::rm_send_if;
9use futures::Future;
10
11/// A high level interface that supports [client side caching](https://redis.io/docs/manual/client-side-caching/) via the [client tracking](https://redis.io/commands/client-tracking/) interface.
12#[cfg_attr(docsrs, doc(cfg(feature = "i-tracking")))]
13#[rm_send_if(feature = "glommio")]
14pub trait TrackingInterface: ClientLike + Sized {
15  /// Send the [CLIENT TRACKING](https://redis.io/commands/client-tracking/) command to all connected servers, subscribing to [invalidation messages](Self::on_invalidation) on the same connection.
16  ///
17  /// This interface requires the RESP3 protocol mode and supports all server deployment types (centralized,
18  /// clustered, and sentinel).
19  ///
20  /// See the basic [client tracking](crate::interfaces::ClientInterface::client_tracking) function for more
21  /// information on the underlying commands.
22  fn start_tracking<P>(
23    &self,
24    prefixes: P,
25    bcast: bool,
26    optin: bool,
27    optout: bool,
28    noloop: bool,
29  ) -> impl Future<Output = FredResult<()>> + Send
30  where
31    P: Into<MultipleStrings> + Send,
32  {
33    async move {
34      into!(prefixes);
35      commands::tracking::start_tracking(self, prefixes, bcast, optin, optout, noloop).await
36    }
37  }
38
39  /// Disable client tracking on all connections.
40  fn stop_tracking(&self) -> impl Future<Output = FredResult<()>> + Send {
41    async move { commands::tracking::stop_tracking(self).await }
42  }
43
44  /// Spawn a task that processes invalidation messages from the server.
45  ///
46  /// See [invalidation_rx](Self::invalidation_rx) for a more flexible variation of this function.
47  fn on_invalidation<F>(&self, func: F) -> JoinHandle<FredResult<()>>
48  where
49    F: Fn(Invalidation) -> FredResult<()> + Send + 'static,
50  {
51    let mut invalidation_rx = self.invalidation_rx();
52
53    spawn(async move {
54      let mut result = Ok(());
55
56      while let Ok(invalidation) = invalidation_rx.recv().await {
57        if let Err(err) = func(invalidation) {
58          result = Err(err);
59          break;
60        }
61      }
62      result
63    })
64  }
65
66  /// Subscribe to invalidation messages from the server(s).
67  fn invalidation_rx(&self) -> BroadcastReceiver<Invalidation> {
68    self.inner().notifications.invalidations.load().subscribe()
69  }
70}