datacake_eventual_consistency/rpc/
client.rs1use std::collections::BTreeMap;
2use std::net::SocketAddr;
3
4use datacake_crdt::{HLCTimestamp, Key, OrSWotSet};
5use datacake_node::{Clock, NodeId};
6use datacake_rpc::{Channel, RpcClient, Status};
7use rkyv::AlignedVec;
8
9use crate::core::{Document, DocumentMetadata};
10use crate::rpc::services::consistency_impl::{
11 BatchPayload,
12 ConsistencyService,
13 Context,
14 MultiPutPayload,
15 MultiRemovePayload,
16 PutPayload,
17 RemovePayload,
18};
19use crate::rpc::services::replication_impl::{
20 FetchDocs,
21 GetState,
22 PollKeyspace,
23 ReplicationService,
24};
25use crate::{DocVec, Storage};
26
27pub struct ConsistencyClient<S>
29where
30 S: Storage,
31{
32 clock: Clock,
33 inner: RpcClient<ConsistencyService<S>>,
34}
35
36impl<S> ConsistencyClient<S>
37where
38 S: Storage,
39{
40 pub fn new(clock: Clock, channel: Channel) -> Self {
41 Self {
42 clock,
43 inner: RpcClient::new(channel),
44 }
45 }
46}
47
48impl<S> ConsistencyClient<S>
49where
50 S: Storage,
51{
52 pub async fn put(
54 &mut self,
55 keyspace: impl Into<String>,
56 document: Document,
57 node_id: NodeId,
58 node_addr: SocketAddr,
59 ) -> Result<(), Status> {
60 let timestamp = self.clock.get_time().await;
61 let ts = self
62 .inner
63 .send(&PutPayload {
64 keyspace: keyspace.into(),
65 document,
66 ctx: Some(Context { node_id, node_addr }),
67 timestamp,
68 })
69 .await?
70 .to_owned()
71 .map_err(Status::internal)?;
72 self.clock.register_ts(ts).await;
73 Ok(())
74 }
75
76 pub async fn multi_put(
78 &mut self,
79 keyspace: impl Into<String>,
80 documents: impl Iterator<Item = Document>,
81 node_id: NodeId,
82 node_addr: SocketAddr,
83 ) -> Result<(), Status> {
84 let timestamp = self.clock.get_time().await;
85 let ts = self
86 .inner
87 .send(&MultiPutPayload {
88 keyspace: keyspace.into(),
89 documents: documents.collect(),
90 ctx: Some(Context { node_id, node_addr }),
91 timestamp,
92 })
93 .await?
94 .to_owned()
95 .map_err(Status::internal)?;
96 self.clock.register_ts(ts).await;
97 Ok(())
98 }
99
100 pub async fn del(
102 &mut self,
103 keyspace: impl Into<String>,
104 id: Key,
105 ts: HLCTimestamp,
106 ) -> Result<(), Status> {
107 let timestamp = self.clock.get_time().await;
108 let ts = self
109 .inner
110 .send(&RemovePayload {
111 keyspace: keyspace.into(),
112 document: DocumentMetadata::new(id, ts),
113 timestamp,
114 })
115 .await?
116 .to_owned()
117 .map_err(Status::internal)?;
118 self.clock.register_ts(ts).await;
119 Ok(())
120 }
121
122 pub async fn multi_del(
124 &mut self,
125 keyspace: impl Into<String>,
126 documents: DocVec<DocumentMetadata>,
127 ) -> Result<(), Status> {
128 let timestamp = self.clock.get_time().await;
129 let ts = self
130 .inner
131 .send(&MultiRemovePayload {
132 keyspace: keyspace.into(),
133 documents,
134 timestamp,
135 })
136 .await?
137 .to_owned()
138 .map_err(Status::internal)?;
139 self.clock.register_ts(ts).await;
140 Ok(())
141 }
142
143 pub async fn apply_batch(&mut self, batch: &BatchPayload) -> Result<(), Status> {
144 let ts = self
145 .inner
146 .send(batch)
147 .await?
148 .to_owned()
149 .map_err(Status::internal)?;
150 self.clock.register_ts(ts).await;
151 Ok(())
152 }
153}
154
155pub struct ReplicationClient<S>
157where
158 S: Storage,
159{
160 clock: Clock,
161 inner: RpcClient<ReplicationService<S>>,
162}
163
164impl<S> ReplicationClient<S>
165where
166 S: Storage,
167{
168 pub fn new(clock: Clock, channel: Channel) -> Self {
169 Self {
170 clock,
171 inner: RpcClient::new(channel),
172 }
173 }
174}
175
176impl<S> ReplicationClient<S>
177where
178 S: Storage,
179{
180 pub async fn poll_keyspace(
182 &mut self,
183 ) -> Result<BTreeMap<String, HLCTimestamp>, Status> {
184 let timestamp = self.clock.get_time().await;
185 let inner = self
186 .inner
187 .send(&PollKeyspace(timestamp))
188 .await?
189 .to_owned()
190 .map_err(Status::internal)?;
191
192 self.clock.register_ts(inner.timestamp).await;
193 Ok(inner.keyspace_timestamps)
194 }
195
196 pub async fn get_state(
203 &mut self,
204 keyspace: impl Into<String>,
205 ) -> Result<(HLCTimestamp, OrSWotSet<{ crate::keyspace::NUM_SOURCES }>), Status>
206 {
207 let timestamp = self.clock.get_time().await;
208 let inner = self
209 .inner
210 .send(&GetState {
211 timestamp,
212 keyspace: keyspace.into(),
213 })
214 .await?
215 .to_owned()
216 .map_err(Status::internal)?;
217
218 self.clock.register_ts(inner.timestamp).await;
219
220 let mut aligned = AlignedVec::with_capacity(inner.set.len());
221 aligned.extend_from_slice(&inner.set);
222
223 let state = rkyv::from_bytes(&aligned).map_err(|_| Status::invalid())?;
224 Ok((inner.last_updated, state))
225 }
226
227 pub async fn fetch_docs(
229 &mut self,
230 keyspace: impl Into<String>,
231 doc_ids: Vec<Key>,
232 ) -> Result<Vec<Document>, Status> {
233 let timestamp = self.clock.get_time().await;
234 let inner = self
235 .inner
236 .send(&FetchDocs {
237 timestamp,
238 keyspace: keyspace.into(),
239 doc_ids,
240 })
241 .await?;
242
243 let payload = inner.to_owned().unwrap();
244
245 self.clock.register_ts(payload.timestamp).await;
246 Ok(payload.documents)
247 }
248}