atomr_distributed_data/
replicator_actor.rs1use std::sync::Arc;
14
15use tokio::sync::{mpsc, oneshot};
16use tokio::task::JoinHandle;
17
18use crate::durable::{DurableStore, NoopDurableStore};
19use crate::replicator::{ReadConsistency, Replicator, WriteConsistency};
20use crate::traits::CrdtMerge;
21
/// Outcome reported to callers of [`ReplicatorActor::update`] and
/// [`ReplicatorActor::delete`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplicatorAck {
    /// The actor task ran the command and acknowledged it.
    Ok,
    /// Sent by the actor task when an update operation returns an error.
    KeyNotFound,
    /// No acknowledgement arrived: either the command could not be queued
    /// or the acknowledgement sender was dropped before replying.
    Timeout,
}
30
31type DynUpdate = Box<dyn FnOnce(&Arc<Replicator>) -> Result<(), ReplicatorError> + Send + 'static>;
32type DynQuery = Box<dyn FnOnce(&Arc<Replicator>) + Send + 'static>;
33
34enum Cmd {
35 Update { key: String, op: DynUpdate, ack: oneshot::Sender<ReplicatorAck> },
36 Query { op: DynQuery },
37 Delete { key: String, ack: oneshot::Sender<ReplicatorAck> },
38 Shutdown,
39}
40
41#[derive(Debug, thiserror::Error)]
42pub enum ReplicatorError {
43 #[error("type mismatch for key {0}")]
44 TypeMismatch(String),
45}
46
47pub struct ReplicatorActor {
49 cmd: mpsc::UnboundedSender<Cmd>,
50 inner: Arc<Replicator>,
51 join: Option<JoinHandle<()>>,
52}
53
54impl ReplicatorActor {
55 pub fn spawn() -> Self {
57 Self::spawn_with(Arc::new(NoopDurableStore))
58 }
59
60 pub fn spawn_with(store: Arc<dyn DurableStore>) -> Self {
62 let inner = Replicator::new();
63 let inner2 = inner.clone();
64 let (tx, mut rx) = mpsc::unbounded_channel::<Cmd>();
65 let join = tokio::spawn(async move {
66 while let Some(cmd) = rx.recv().await {
67 match cmd {
68 Cmd::Update { key, op, ack } => {
69 let res = op(&inner2);
70 if res.is_ok() {
71 let _ = store.persist_marker(&key);
77 }
78 let _ = ack.send(match res {
79 Ok(()) => ReplicatorAck::Ok,
80 Err(_) => ReplicatorAck::KeyNotFound,
81 });
82 }
83 Cmd::Query { op } => op(&inner2),
84 Cmd::Delete { key, ack } => {
85 inner2.delete(&key);
86 let _ = store.delete_marker(&key);
87 let _ = ack.send(ReplicatorAck::Ok);
88 }
89 Cmd::Shutdown => break,
90 }
91 }
92 });
93 Self { cmd: tx, inner, join: Some(join) }
94 }
95
96 pub fn inner(&self) -> &Arc<Replicator> {
98 &self.inner
99 }
100
101 pub async fn update<T>(&self, key: impl Into<String>, value: T, _write: WriteConsistency) -> ReplicatorAck
103 where
104 T: CrdtMerge + Send + Sync + 'static,
105 {
106 let key = key.into();
107 let key_for_op = key.clone();
108 let (ack_tx, ack_rx) = oneshot::channel();
109 let op: DynUpdate = Box::new(move |r: &Arc<Replicator>| {
110 r.update(&key_for_op, value);
111 Ok(())
112 });
113 if self.cmd.send(Cmd::Update { key, op, ack: ack_tx }).is_err() {
114 return ReplicatorAck::Timeout;
115 }
116 ack_rx.await.unwrap_or(ReplicatorAck::Timeout)
117 }
118
119 pub async fn get<T>(&self, key: impl Into<String>, _read: ReadConsistency) -> Option<T>
122 where
123 T: CrdtMerge + Clone + Send + Sync + 'static,
124 {
125 let (tx, rx) = oneshot::channel();
126 let key = key.into();
127 let op: DynQuery = Box::new(move |r: &Arc<Replicator>| {
128 let v: Option<T> = r.get::<T>(&key);
129 let _ = tx.send(v);
130 });
131 if self.cmd.send(Cmd::Query { op }).is_err() {
132 return None;
133 }
134 rx.await.ok().flatten()
135 }
136
137 pub async fn delete(&self, key: impl Into<String>) -> ReplicatorAck {
139 let (ack_tx, ack_rx) = oneshot::channel();
140 if self.cmd.send(Cmd::Delete { key: key.into(), ack: ack_tx }).is_err() {
141 return ReplicatorAck::Timeout;
142 }
143 ack_rx.await.unwrap_or(ReplicatorAck::Timeout)
144 }
145
146 pub async fn shutdown(mut self) {
148 let _ = self.cmd.send(Cmd::Shutdown);
149 if let Some(j) = self.join.take() {
150 let _ = j.await;
151 }
152 }
153}
154
155impl Drop for ReplicatorActor {
156 fn drop(&mut self) {
157 let _ = self.cmd.send(Cmd::Shutdown);
158 if let Some(j) = self.join.take() {
159 j.abort();
160 }
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GCounter;
    use std::time::Duration;

    /// An acknowledged update is visible to a later `get` on the same actor.
    #[tokio::test]
    async fn actor_serializes_update_and_get() {
        let actor = ReplicatorActor::spawn();

        let mut counter = GCounter::new();
        counter.increment("n1", 4);

        let ack = actor.update("k", counter, WriteConsistency::Local).await;
        assert_eq!(ack, ReplicatorAck::Ok);

        let fetched: GCounter = actor
            .get::<GCounter>("k", ReadConsistency::Local)
            .await
            .expect("key found");
        assert_eq!(fetched.value(), 4);

        actor.shutdown().await;
    }

    /// After `delete`, reading the same key yields `None`.
    #[tokio::test]
    async fn actor_delete_marks_key_gone() {
        let actor = ReplicatorActor::spawn();

        let mut counter = GCounter::new();
        counter.increment("n1", 1);

        actor.update("k", counter, WriteConsistency::Local).await;
        actor.delete("k").await;

        let fetched: Option<GCounter> = actor.get("k", ReadConsistency::Local).await;
        assert!(fetched.is_none());

        actor.shutdown().await;
    }

    /// Updates and deletes are reflected in the durable store's `contains`.
    #[tokio::test]
    async fn actor_persists_through_durable_store() {
        let store = Arc::new(crate::durable::FileDurableStore::tmp().unwrap());
        // `store.clone()` (not `Arc::clone(&store)`) so the argument can
        // coerce from the concrete store type to `Arc<dyn DurableStore>`.
        let actor = ReplicatorActor::spawn_with(store.clone());

        let mut counter = GCounter::new();
        counter.increment("n1", 7);

        actor.update("k", counter, WriteConsistency::Local).await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(store.contains("k"));

        actor.delete("k").await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!store.contains("k"));

        actor.shutdown().await;
    }
}