1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
//! The `groupcache` module provides the public interface for consumers of the library.
//!
//! The entire functionality is exposed via the [`Groupcache`] struct, which requires a [`ValueLoader`] to be constructed.
use crate::errors::GroupcacheError;
use crate::groupcache_builder::GroupcacheBuilder;
use crate::GroupcacheInner;
use async_trait::async_trait;
use groupcache_pb::GroupcacheClient;
use groupcache_pb::GroupcacheServer;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use tonic::transport::Channel;

/// Entry point exposing most of the library API.
///
/// It is an [`Arc`] wrapper around [`GroupcacheInner`], which implements the API,
/// so that applications don't have to wrap groupcache inside an [`Arc`] themselves
/// when using it from concurrent code, which is the primary use case.
/// Cloning a [`Groupcache`] therefore only clones the [`Arc`] handle.
///
/// In order for groupcache peers to discover each other, the application author needs to hook in
/// some form of service discovery, for example:
/// - static IP addresses of hosts running groupcache
/// - [consul](https://www.consul.io/)
/// - [kubernetes API server](https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/)
/// - ...
///
/// Service discovery is integrated with groupcache via:
/// - [`Groupcache::set_peers`] - preferred for pull-based service discovery,
///   this is also how [`crate::ServiceDiscovery`] is integrated,
/// - [`Groupcache::add_peer`] and [`Groupcache::remove_peer`] - preferred for push-based service discovery.
///
/// There is an example showing how to use the kubernetes API server for service discovery with groupcache -
/// [see here](https://github.com/Petroniuss/groupcache/tree/main/examples/kubernetes-service-discovery).
#[derive(Clone)]
pub struct Groupcache<Value: ValueBounds>(pub(crate) Arc<GroupcacheInner<Value>>);

impl<Value: ValueBounds> Groupcache<Value> {
    /// In order to construct [`Groupcache`] application needs to provide:
    /// - [`GroupcachePeer`] - necessary for routing
    /// - [`ValueLoader`] implementation
    pub fn builder(
        me: GroupcachePeer,
        loader: impl ValueLoader<Value = Value> + 'static,
    ) -> GroupcacheBuilder<Value> {
        GroupcacheBuilder::new(me, Box::new(loader))
    }

    /// Provided a given `key`
    /// groupcache attempts to figure out which peer owns
    /// a given KV pair based on consistent hashing
    /// and make sure that only this peer handles loading values for this particular key
    /// unless it's `hot` value which may be cached locally.
    ///
    /// If KV is found in local `hot_cache`, cached value is returned.
    /// If KV is owned by this peer and value is cached in `main_cache` it is returned.
    /// If KV is owned by this peer it is loaded via [`ValueLoader`].
    /// If KV is owned by a different peer, gRPC request is made to this peer using address provided in [`Groupcache::add_peer`]
    /// and that peer is responsible for loading that value in a replicated set of processes.
    /// If a request to peer fails, peer tries to load value locally via [`ValueLoader`].
    /// If loading value via [`ValueLoader`] fails an error is returned.
    ///
    /// Groupcache coordinates cache fills such that only one load in one process of an entire replicated set of processes populates the cache,
    /// then multiplexes the loaded value to all callers.
    ///
    /// Caches can be customized via [`Options`].
    pub async fn get(&self, key: &str) -> Result<Value, GroupcacheError> {
        self.0.get(key).await
    }

    /// Original groupcache library only provided [`Groupcache::get`] but there are use-cases
    /// where KV pairs need to be updated but this is problematic to do in a distributed system based on consistent hashing.
    ///
    /// This library does a simple thing:
    /// - remove method removes KV pair from `main_cache` of the owner of the KV pair
    /// - and removes KV pair from `hot_cache` of this node.
    ///
    /// However removed KV pair may still be cached on other nodes in `hot_cache`.
    /// To deal with this application can either:
    /// - accept that there might be some stale values served from `hot_cache` for some time after call to `remove`.
    /// - tweak `hot_cache` in such a way that it is acceptable for the application.
    ///   For example disabling it entirely so that value is cached only on the owner node of KV pair.
    ///   Note that this will likely increase number of RPCs over the network since all requests will have to go to the owner.
    pub async fn remove(&self, key: &str) -> Result<(), GroupcacheError> {
        self.0.remove(key).await
    }

    /// service-discovery:
    ///
    /// Once in a while groupcache backend should refresh view of groupcache nodes
    /// to make sure that groupcache routes traffic evenly to all healthy nodes.
    ///
    /// This method can be used to notify groupcache about all peers in the cluster.
    /// Groupcache will figure out:
    /// - which are new and - will try to open a connection to these peers
    /// - which it already knew about - will keep connection open as is.
    /// - which it knew about but are now missing - will disconnect with such peers
    ///
    /// Instead of using this method directly in a loop,
    /// applications can implement [`crate::ServiceDiscovery`]
    /// and only provide implementation fetching state of the cluster.
    ///
    /// If it isn't possible to connect to some [`GroupcachePeer`]s
    /// this method will return an error with all the peers it failed to connect to
    /// and won't update routing table with these peers.
    /// It will however update its routing table accordingly with peers that it successfully connected with.
    ///
    /// Note that [`Groupcache::set_peers`] isn't broadcasted to other peers,
    /// and each groupcache peer needs to update its routing table via the same call.
    /// In other words this only updates local routing table, not routing table of all nodes in the cluster.
    pub async fn set_peers(&self, peers: HashSet<GroupcachePeer>) -> Result<(), GroupcacheError> {
        self.0.set_peers(peers).await
    }

    /// service-discovery:
    ///
    /// whenever application notices that there is new groupcache peer it should notify groupcache
    /// so that routing table/consistent hashing ring can be updated.
    ///
    /// If it isn't possible to connect to [`GroupcachePeer`]
    /// this method will return an error and won't update the routing table.
    ///
    /// Upon success some portion of requests will be forwarded to this peer.
    ///
    /// Note that [`Groupcache::add_peer`] isn't broadcasted to other peers,
    /// and each groupcache peer needs to update its routing table via the same call.
    /// In other words this only updates local routing table, not routing table of all nodes in the cluster.
    pub async fn add_peer(&self, peer: GroupcachePeer) -> Result<(), GroupcacheError> {
        self.0.add_peer(peer).await
    }

    /// service-discovery:
    ///
    /// whenever application notices that a groupcache peer is no longer able to serve requests because:
    /// - it is down
    /// - the server is not healthy
    /// - it has been moved to a different address via container orchestrator
    /// - it isn't reachable.
    /// so that routing table/consistent hashing ring can be updated.
    ///
    /// Requests will no longer be forwarded to this peer.
    ///
    /// Note that [`Groupcache::remove_peer`] isn't broadcasted to other peers,
    /// and each groupcache peer needs to update its routing table via the same call.
    /// In other words this only updates local routing table, not routing table of all nodes in the cluster.
    pub async fn remove_peer(&self, peer: GroupcachePeer) -> Result<(), GroupcacheError> {
        self.0.remove_peer(peer).await
    }

    /// Retrieves underlying groupcache gRPC server implementation.
    ///
    /// Library doesn't start gRPC server automatically, it's instead responsibility of an application to do so.
    /// It is done this way to allow for customisations (tracing, metrics etc), see examples.
    pub fn grpc_service(&self) -> GroupcacheServer<GroupcacheInner<Value>> {
        GroupcacheServer::from_arc(self.0.clone())
    }

    /// Returns address of this peer.
    pub fn addr(&self) -> SocketAddr {
        self.0.addr()
    }
}

/// [ValueLoader] loads a value for a particular key - which can be potentially expensive.
/// Groupcache is responsible for calling load on whichever node is responsible for a particular key and caching that value.
/// [ValueLoader::Value]s will be cached by groupcache according to passed options.
///
/// [ValueLoader::Value] cached by groupcache must satisfy [ValueBounds].
///
/// If you want to load resources of different types,
/// your implementation of load may distinguish the desired type by a prefix of `key` and return an enum as [ValueLoader::Value].
/// This is a deviation from the original groupcache library, which implemented separate groups
/// so that consumers of the library could have multiple implementations of [ValueLoader].
#[async_trait]
pub trait ValueLoader: Send + Sync {
    /// Value is a type returned by load, see [ValueBounds].
    type Value: ValueBounds;

    /// Loads the value for `key`.
    ///
    /// # Errors
    ///
    /// Implementations return a boxed error when the value cannot be loaded.
    async fn load(
        &self,
        key: &str,
    ) -> Result<Self::Value, Box<dyn std::error::Error + Send + Sync + 'static>>;
}

/// [ValueLoader::Value] cached by groupcache must satisfy [ValueBounds]:
/// - serializable/deserializable: because values are sent over the network,
/// - cloneable: because a value is loaded once and then multiplexed to all callers via clone,
/// - Send + Sync + 'static: because values are shared across potentially many threads.
///
/// Typical data structs should automatically conform to this trait.
/// ```
///     use serde::{Deserialize, Serialize};
///     #[derive(Clone, Deserialize, Serialize)]
///     struct DatabaseEntity {
///         id: String,
///         value: String,
///     }
/// ```
///
/// For small data structures a plain struct should suffice, but if the cached [ValueLoader::Value]
/// is large it might be worth wrapping it inside an [Arc] so that cached values
/// are stored in memory only once and all clones reference the same piece of data.
///
/// ```
///     use std::sync::Arc;
///     use serde::{Deserialize, Serialize};
///
///     #[derive(Clone, Deserialize, Serialize)]
///     struct Wrapped (Arc<Entity>);
///
///     #[derive(Clone, Deserialize, Serialize)]
///     struct Entity {
///         id: String,
///         value: String,
///     }
/// ```
pub trait ValueBounds: Serialize + for<'a> Deserialize<'a> + Clone + Send + Sync + 'static {}

/// Blanket implementation of [`ValueBounds`]: every type that satisfies its supertrait bounds
/// implements it automatically, so it never has to be implemented manually.
impl<T> ValueBounds for T
where
    T: Serialize + for<'a> Deserialize<'a> + Clone + Send + Sync + 'static,
{
}

/// Groupcache uses tonic to connect to its peers:
/// this is the [`GroupcacheClient`] from `groupcache_pb` over a tonic [`Channel`].
pub(crate) type GroupcachePeerClient = GroupcacheClient<Channel>;

/// Thin wrapper around the socket address of a groupcache peer; the address is used to connect to that peer.
/// The peer should expose [`Groupcache::grpc_service`] under this address.
///
/// Construct it with [`GroupcachePeer::from_socket`] or via [`From<SocketAddr>`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct GroupcachePeer {
    pub(crate) socket: SocketAddr,
}

impl GroupcachePeer {
    /// Creates a [`GroupcachePeer`] for the peer reachable under the socket address `value`.
    ///
    /// Equivalent to the [`From<SocketAddr>`] conversion.
    pub fn from_socket(value: SocketAddr) -> Self {
        Self { socket: value }
    }
}

impl From<SocketAddr> for GroupcachePeer {
    /// Wraps the socket address `value` in a [`GroupcachePeer`].
    fn from(value: SocketAddr) -> Self {
        Self::from_socket(value)
    }
}