1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use crate::{
commands,
interfaces::ClientLike,
prelude::FredResult,
runtime::{spawn, BroadcastReceiver, JoinHandle},
types::{client::Invalidation, MultipleStrings},
};
use fred_macros::rm_send_if;
use futures::Future;
/// High level helpers for [client side caching](https://redis.io/docs/manual/client-side-caching/), built on top of
/// the [client tracking](https://redis.io/commands/client-tracking/) interface.
#[cfg_attr(docsrs, doc(cfg(feature = "i-tracking")))]
#[rm_send_if(feature = "glommio")]
pub trait TrackingInterface: ClientLike + Sized {
    /// Enable tracking by sending [CLIENT TRACKING](https://redis.io/commands/client-tracking/) to every connected
    /// server. [Invalidation messages](Self::on_invalidation) are delivered on the same connection.
    ///
    /// The RESP3 protocol mode is required. All server deployment types are supported (centralized, clustered, and
    /// sentinel).
    ///
    /// The `prefixes`, `bcast`, `optin`, `optout`, and `noloop` arguments are forwarded unchanged to the underlying
    /// command. See the basic [client tracking](crate::interfaces::ClientInterface::client_tracking) function for
    /// more information on what they mean.
    fn start_tracking<P>(
        &self,
        prefixes: P,
        bcast: bool,
        optin: bool,
        optout: bool,
        noloop: bool,
    ) -> impl Future<Output = FredResult<()>> + Send
    where
        P: Into<MultipleStrings> + Send,
    {
        async move {
            into!(prefixes);
            let request = commands::tracking::start_tracking(self, prefixes, bcast, optin, optout, noloop);
            request.await
        }
    }

    /// Turn off client tracking on all connections.
    fn stop_tracking(&self) -> impl Future<Output = FredResult<()>> + Send {
        async move {
            commands::tracking::stop_tracking(self).await
        }
    }

    /// Spawn a background task that calls `func` with each invalidation message received from the server.
    ///
    /// The task finishes with the first error returned by `func`, or with `Ok(())` once the underlying receiver
    /// stops yielding messages.
    ///
    /// See [invalidation_rx](Self::invalidation_rx) for a more flexible variation of this function.
    fn on_invalidation<F>(&self, func: F) -> JoinHandle<FredResult<()>>
    where
        F: Fn(Invalidation) -> FredResult<()> + Send + 'static,
    {
        let mut receiver = self.invalidation_rx();

        spawn(async move {
            loop {
                match receiver.recv().await {
                    Ok(message) => {
                        // Stop at the first callback failure and surface it through the join handle.
                        if let Err(error) = func(message) {
                            break Err(error);
                        }
                    },
                    // NOTE(review): any receive error ends the task successfully. If the receiver reports lag as
                    // an error (as tokio's broadcast receiver does) the task stops there too - confirm intended.
                    Err(_) => break Ok(()),
                }
            }
        })
    }

    /// Create a new receiver for the invalidation messages sent by the server(s).
    fn invalidation_rx(&self) -> BroadcastReceiver<Invalidation> {
        let inner = self.inner();
        inner.notifications.invalidations.load().subscribe()
    }
}