datacake_eventual_consistency/rpc/
client.rs

1use std::collections::BTreeMap;
2use std::net::SocketAddr;
3
4use datacake_crdt::{HLCTimestamp, Key, OrSWotSet};
5use datacake_node::{Clock, NodeId};
6use datacake_rpc::{Channel, RpcClient, Status};
7use rkyv::AlignedVec;
8
9use crate::core::{Document, DocumentMetadata};
10use crate::rpc::services::consistency_impl::{
11    BatchPayload,
12    ConsistencyService,
13    Context,
14    MultiPutPayload,
15    MultiRemovePayload,
16    PutPayload,
17    RemovePayload,
18};
19use crate::rpc::services::replication_impl::{
20    FetchDocs,
21    GetState,
22    PollKeyspace,
23    ReplicationService,
24};
25use crate::{DocVec, Storage};
26
27/// A high level wrapper around the consistency GRPC service.
28pub struct ConsistencyClient<S>
29where
30    S: Storage,
31{
32    clock: Clock,
33    inner: RpcClient<ConsistencyService<S>>,
34}
35
36impl<S> ConsistencyClient<S>
37where
38    S: Storage,
39{
40    pub fn new(clock: Clock, channel: Channel) -> Self {
41        Self {
42            clock,
43            inner: RpcClient::new(channel),
44        }
45    }
46}
47
48impl<S> ConsistencyClient<S>
49where
50    S: Storage,
51{
52    /// Adds a document to the remote node's state.
53    pub async fn put(
54        &mut self,
55        keyspace: impl Into<String>,
56        document: Document,
57        node_id: NodeId,
58        node_addr: SocketAddr,
59    ) -> Result<(), Status> {
60        let timestamp = self.clock.get_time().await;
61        let ts = self
62            .inner
63            .send(&PutPayload {
64                keyspace: keyspace.into(),
65                document,
66                ctx: Some(Context { node_id, node_addr }),
67                timestamp,
68            })
69            .await?
70            .to_owned()
71            .map_err(Status::internal)?;
72        self.clock.register_ts(ts).await;
73        Ok(())
74    }
75
76    /// Adds a set of documents to the remote node's state.
77    pub async fn multi_put(
78        &mut self,
79        keyspace: impl Into<String>,
80        documents: impl Iterator<Item = Document>,
81        node_id: NodeId,
82        node_addr: SocketAddr,
83    ) -> Result<(), Status> {
84        let timestamp = self.clock.get_time().await;
85        let ts = self
86            .inner
87            .send(&MultiPutPayload {
88                keyspace: keyspace.into(),
89                documents: documents.collect(),
90                ctx: Some(Context { node_id, node_addr }),
91                timestamp,
92            })
93            .await?
94            .to_owned()
95            .map_err(Status::internal)?;
96        self.clock.register_ts(ts).await;
97        Ok(())
98    }
99
100    /// Removes a document from the remote node's state.
101    pub async fn del(
102        &mut self,
103        keyspace: impl Into<String>,
104        id: Key,
105        ts: HLCTimestamp,
106    ) -> Result<(), Status> {
107        let timestamp = self.clock.get_time().await;
108        let ts = self
109            .inner
110            .send(&RemovePayload {
111                keyspace: keyspace.into(),
112                document: DocumentMetadata::new(id, ts),
113                timestamp,
114            })
115            .await?
116            .to_owned()
117            .map_err(Status::internal)?;
118        self.clock.register_ts(ts).await;
119        Ok(())
120    }
121
122    /// Removes a set of documents from the remote node's state.
123    pub async fn multi_del(
124        &mut self,
125        keyspace: impl Into<String>,
126        documents: DocVec<DocumentMetadata>,
127    ) -> Result<(), Status> {
128        let timestamp = self.clock.get_time().await;
129        let ts = self
130            .inner
131            .send(&MultiRemovePayload {
132                keyspace: keyspace.into(),
133                documents,
134                timestamp,
135            })
136            .await?
137            .to_owned()
138            .map_err(Status::internal)?;
139        self.clock.register_ts(ts).await;
140        Ok(())
141    }
142
143    pub async fn apply_batch(&mut self, batch: &BatchPayload) -> Result<(), Status> {
144        let ts = self
145            .inner
146            .send(batch)
147            .await?
148            .to_owned()
149            .map_err(Status::internal)?;
150        self.clock.register_ts(ts).await;
151        Ok(())
152    }
153}
154
155/// A high level wrapper around the replication GRPC service.
156pub struct ReplicationClient<S>
157where
158    S: Storage,
159{
160    clock: Clock,
161    inner: RpcClient<ReplicationService<S>>,
162}
163
164impl<S> ReplicationClient<S>
165where
166    S: Storage,
167{
168    pub fn new(clock: Clock, channel: Channel) -> Self {
169        Self {
170            clock,
171            inner: RpcClient::new(channel),
172        }
173    }
174}
175
176impl<S> ReplicationClient<S>
177where
178    S: Storage,
179{
180    /// Fetches the newest version of the node's keyspace timestamps.
181    pub async fn poll_keyspace(
182        &mut self,
183    ) -> Result<BTreeMap<String, HLCTimestamp>, Status> {
184        let timestamp = self.clock.get_time().await;
185        let inner = self
186            .inner
187            .send(&PollKeyspace(timestamp))
188            .await?
189            .to_owned()
190            .map_err(Status::internal)?;
191
192        self.clock.register_ts(inner.timestamp).await;
193        Ok(inner.keyspace_timestamps)
194    }
195
196    /// Fetches the node's current state for a given keyspace and returns
197    /// the last time the keyspace was modified.
198    ///
199    /// The returned timestamp must only be used when compared against timestamps produced
200    /// by the remote node itself. This is mostly provided to reduce unnecessary IO if the state
201    /// has changed between when the keyspace was polled, and when the state was requested.
202    pub async fn get_state(
203        &mut self,
204        keyspace: impl Into<String>,
205    ) -> Result<(HLCTimestamp, OrSWotSet<{ crate::keyspace::NUM_SOURCES }>), Status>
206    {
207        let timestamp = self.clock.get_time().await;
208        let inner = self
209            .inner
210            .send(&GetState {
211                timestamp,
212                keyspace: keyspace.into(),
213            })
214            .await?
215            .to_owned()
216            .map_err(Status::internal)?;
217
218        self.clock.register_ts(inner.timestamp).await;
219
220        let mut aligned = AlignedVec::with_capacity(inner.set.len());
221        aligned.extend_from_slice(&inner.set);
222
223        let state = rkyv::from_bytes(&aligned).map_err(|_| Status::invalid())?;
224        Ok((inner.last_updated, state))
225    }
226
227    /// Fetches a set of documents with the provided IDs belonging to the given keyspace.
228    pub async fn fetch_docs(
229        &mut self,
230        keyspace: impl Into<String>,
231        doc_ids: Vec<Key>,
232    ) -> Result<Vec<Document>, Status> {
233        let timestamp = self.clock.get_time().await;
234        let inner = self
235            .inner
236            .send(&FetchDocs {
237                timestamp,
238                keyspace: keyspace.into(),
239                doc_ids,
240            })
241            .await?;
242
243        let payload = inner.to_owned().unwrap();
244
245        self.clock.register_ts(payload.timestamp).await;
246        Ok(payload.documents)
247    }
248}