// atomr_distributed_data/replicator_actor.rs

//! `ReplicatorActor` — Phase 8.E.
//!
//! Wraps a [`crate::Replicator`] in an actor-style task: all reads and
//! writes are serialized through an mpsc command channel rather than the
//! existing `RwLock<HashMap>` plumbing. Useful in cluster mode where
//! gossip-driven merges must interleave with consistency-level quorums
//! without lock contention.
//!
//! Pairs with [`crate::DurableStore`] (Phase 8.F) for write-through
//! persistence: every successful update / delete flushes through the
//! configured store before the response is acknowledged.
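//!
//! A minimal usage sketch (the crate path is assumed from the directory
//! name, and `GCounter` is the counter CRDT exercised in the tests below):
//!
//! ```ignore
//! use atomr_distributed_data::{GCounter, ReadConsistency, ReplicatorActor, WriteConsistency};
//!
//! // Inside an async context, e.g. a #[tokio::main] function:
//! let actor = ReplicatorActor::spawn();
//! let mut hits = GCounter::new();
//! hits.increment("node-1", 2);
//! actor.update("hits", hits, WriteConsistency::Local).await;
//! let current: Option<GCounter> = actor.get("hits", ReadConsistency::Local).await;
//! actor.shutdown().await;
//! ```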

use std::sync::Arc;

use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::durable::{DurableStore, NoopDurableStore};
use crate::replicator::{ReadConsistency, Replicator, WriteConsistency};
use crate::traits::CrdtMerge;

/// Tagged response payloads for [`ReplicatorActor`] commands.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReplicatorAck {
    Ok,
    KeyNotFound,
    Timeout,
}

type DynUpdate = Box<dyn FnOnce(&Arc<Replicator>) -> Result<(), ReplicatorError> + Send + 'static>;
type DynQuery = Box<dyn FnOnce(&Arc<Replicator>) + Send + 'static>;

enum Cmd {
    Update { key: String, op: DynUpdate, ack: oneshot::Sender<ReplicatorAck> },
    Query { op: DynQuery },
    Delete { key: String, ack: oneshot::Sender<ReplicatorAck> },
    Shutdown,
}

#[derive(Debug, thiserror::Error)]
pub enum ReplicatorError {
    #[error("type mismatch for key {0}")]
    TypeMismatch(String),
}

/// Actor-style replicator handle.
pub struct ReplicatorActor {
    cmd: mpsc::UnboundedSender<Cmd>,
    inner: Arc<Replicator>,
    join: Option<JoinHandle<()>>,
}

impl ReplicatorActor {
    /// Spawn a new replicator backed by the no-op durable store
    /// (no persistence).
    pub fn spawn() -> Self {
        Self::spawn_with(Arc::new(NoopDurableStore))
    }

    /// Spawn a new replicator with a pluggable durable backend.
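    ///
    /// A sketch using the file-backed store exercised in the tests below
    /// (`FileDurableStore::tmp()` returns a `Result`, so this assumes a
    /// fallible context):
    ///
    /// ```ignore
    /// let store = Arc::new(crate::durable::FileDurableStore::tmp()?);
    /// let actor = ReplicatorActor::spawn_with(store);
    /// ```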
    pub fn spawn_with(store: Arc<dyn DurableStore>) -> Self {
        let inner = Replicator::new();
        // One clone of the shared handle moves into the actor task; the
        // other stays on this handle for read-only access via `inner()`.
        let inner2 = inner.clone();
        let (tx, mut rx) = mpsc::unbounded_channel::<Cmd>();
        let join = tokio::spawn(async move {
            while let Some(cmd) = rx.recv().await {
                match cmd {
                    Cmd::Update { key, op, ack } => {
                        let res = op(&inner2);
                        if res.is_ok() {
                            // The command layer is type-erased, so only a key
                            // marker can be persisted here. Typed snapshots are
                            // opt-in per key via `register_persistor` for now;
                            // this actor stays a serialization façade.
                            let _ = store.persist_marker(&key);
                        }
                        let _ = ack.send(match res {
                            Ok(()) => ReplicatorAck::Ok,
                            // The only current error is a type mismatch; report
                            // it as `KeyNotFound` until the ack enum grows a
                            // dedicated variant.
                            Err(_) => ReplicatorAck::KeyNotFound,
                        });
                    }
                    Cmd::Query { op } => op(&inner2),
                    Cmd::Delete { key, ack } => {
                        inner2.delete(&key);
                        let _ = store.delete_marker(&key);
                        let _ = ack.send(ReplicatorAck::Ok);
                    }
                    Cmd::Shutdown => break,
                }
            }
        });
        Self { cmd: tx, inner, join: Some(join) }
    }

    /// Read-only access to the inner replicator (subscriptions, etc.).
    pub fn inner(&self) -> &Arc<Replicator> {
        &self.inner
    }

    /// Submit a typed update and wait for the ack. The write-consistency
    /// argument is accepted but not yet enforced; writes stay local until
    /// the quorum path lands (see [`Self::get`]).
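    ///
    /// A sketch with the `GCounter` CRDT exercised in the tests below
    /// (`actor` is a spawned handle):
    ///
    /// ```ignore
    /// let mut c = GCounter::new();
    /// c.increment("node-1", 1);
    /// let ack = actor.update("counter", c, WriteConsistency::Local).await;
    /// assert_eq!(ack, ReplicatorAck::Ok);
    /// ```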
    pub async fn update<T>(&self, key: impl Into<String>, value: T, _write: WriteConsistency) -> ReplicatorAck
    where
        T: CrdtMerge + Send + Sync + 'static,
    {
        let key = key.into();
        let key_for_op = key.clone();
        let (ack_tx, ack_rx) = oneshot::channel();
        let op: DynUpdate = Box::new(move |r: &Arc<Replicator>| {
            r.update(&key_for_op, value);
            Ok(())
        });
        // A closed channel means the actor task is gone; surface that as a
        // timeout, since the ack enum has no dedicated "shut down" variant.
        if self.cmd.send(Cmd::Update { key, op, ack: ack_tx }).is_err() {
            return ReplicatorAck::Timeout;
        }
        ack_rx.await.unwrap_or(ReplicatorAck::Timeout)
    }

    /// Read a key. Consistency is `Local` for now (Phase 8.D wires the
    /// quorum exchange once gossip lands).
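    ///
    /// A sketch (`actor` is a spawned handle; `None` means the key is
    /// absent):
    ///
    /// ```ignore
    /// let hits: Option<GCounter> = actor.get("hits", ReadConsistency::Local).await;
    /// ```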
    pub async fn get<T>(&self, key: impl Into<String>, _read: ReadConsistency) -> Option<T>
    where
        T: CrdtMerge + Clone + Send + Sync + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let key = key.into();
        let op: DynQuery = Box::new(move |r: &Arc<Replicator>| {
            let v: Option<T> = r.get(&key);
            let _ = tx.send(v);
        });
        if self.cmd.send(Cmd::Query { op }).is_err() {
            return None;
        }
        rx.await.ok().flatten()
    }

    /// Delete a key.
    pub async fn delete(&self, key: impl Into<String>) -> ReplicatorAck {
        let (ack_tx, ack_rx) = oneshot::channel();
        if self.cmd.send(Cmd::Delete { key: key.into(), ack: ack_tx }).is_err() {
            return ReplicatorAck::Timeout;
        }
        ack_rx.await.unwrap_or(ReplicatorAck::Timeout)
    }

    /// Stop the actor task and join it.
    pub async fn shutdown(mut self) {
        let _ = self.cmd.send(Cmd::Shutdown);
        if let Some(j) = self.join.take() {
            let _ = j.await;
        }
    }
}

impl Drop for ReplicatorActor {
    fn drop(&mut self) {
        // Best-effort teardown: ask the task to stop, then abort in case it
        // never polls again; `shutdown()` is the graceful path that joins it.
        let _ = self.cmd.send(Cmd::Shutdown);
        if let Some(j) = self.join.take() {
            j.abort();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::GCounter;
    use std::time::Duration;

    #[tokio::test]
    async fn actor_serializes_update_and_get() {
        let r = ReplicatorActor::spawn();
        let mut c = GCounter::new();
        c.increment("n1", 4);
        let ack = r.update("k", c, WriteConsistency::Local).await;
        assert_eq!(ack, ReplicatorAck::Ok);
        let got: GCounter = r.get("k", ReadConsistency::Local).await.expect("key found");
        assert_eq!(got.value(), 4);
        r.shutdown().await;
    }

    #[tokio::test]
    async fn actor_delete_marks_key_gone() {
        let r = ReplicatorActor::spawn();
        let mut c = GCounter::new();
        c.increment("n1", 1);
        r.update("k", c, WriteConsistency::Local).await;
        r.delete("k").await;
        let v: Option<GCounter> = r.get("k", ReadConsistency::Local).await;
        assert!(v.is_none());
        r.shutdown().await;
    }
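
    // Added sketch (not part of the original tests): a key that was never
    // written should read back as `None`, consistent with the delete test
    // above.
    #[tokio::test]
    async fn actor_get_missing_key_returns_none() {
        let r = ReplicatorActor::spawn();
        let v: Option<GCounter> = r.get("absent", ReadConsistency::Local).await;
        assert!(v.is_none());
        r.shutdown().await;
    }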

    #[tokio::test]
    async fn actor_persists_through_durable_store() {
        let store = Arc::new(crate::durable::FileDurableStore::tmp().unwrap());
        let r = ReplicatorActor::spawn_with(store.clone());
        let mut c = GCounter::new();
        c.increment("n1", 7);
        r.update("k", c, WriteConsistency::Local).await;
        // Give the spawned task one scheduling tick.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(store.contains("k"));
        r.delete("k").await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!store.contains("k"));
        r.shutdown().await;
    }
}