kevy_rt/replica_inbox.rs
//! Cross-thread inbox from an external replica runner into a
//! `Shard`'s reactor thread. The replica runner lives on its own OS
//! thread (it does blocking `TcpStream` reads from the upstream
//! primary via `ReplicaClient`); applying mutations to the shard's
//! `Store` must happen on the shard's reactor thread, so the runner
//! drops events into this channel and the shard drains it once per
//! tick.
8//!
//! The kevy server (the embedder) creates one inbox pair per shard
//! with [`replica_inbox_pair`] before `Runtime::run`, hands the
//! receivers to the runtime via `with_replica_inboxes`, and keeps the
//! senders to wire into the runner threads. v1.18 spawns one runner
//! per shard (matching the primary's per-shard listener layout), so
//! the channels are 1:1.
15//!
//! v1.18 limitation: the channel is unbounded. Each
//! [`ReplicaApply::Frame`] carries an owned [`Argv`] (the snapshot path
//! ships `Vec<u8>` chunks), so the queue can grow without limit when a
//! shard applies events more slowly than the upstream emits them.
//! Backpressure / capping is tracked as a follow-up — the v1.18 model
//! assumes the shard's apply rate matches the upstream emit rate
//! (single-machine cluster). Because the unbounded channel never blocks
//! the runner thread, a stuck shard never stalls the runner's TCP read;
//! pending events simply accumulate in memory.
23
24use std::sync::mpsc::{Receiver, SendError, Sender, channel};
25
26use crate::Argv;
27
/// One event delivered from a replica runner to its target shard.
///
/// Mirrors `kevy_replicate::replica::ReplicaEvent`, except that `Frame`
/// carries its fields inline (`offset` plus an owned [`Argv`], already
/// decoded by the runner) instead of wrapping a
/// `DecodedFrame { offset, argv }`.
///
/// NOTE(review): this comment previously said the shard does not need the
/// offset because the runner gap-checks it on the way in, but `Frame` still
/// carries `offset` — confirm whether the shard consumes it.
#[derive(Debug)]
pub enum ReplicaApply {
    /// Upstream started shipping a full snapshot. The shard should
    /// reset its accumulating snapshot buffer.
    SnapshotBegin,
    /// One chunk of snapshot bytes, owned by the event. The shard
    /// appends it to its buffer.
    SnapshotChunk(Vec<u8>),
    /// Upstream finished the snapshot. The shard hands its buffered
    /// bytes to `kevy_persist::load_snapshot_from` (replacing the
    /// `Store` contents) and resumes at `ack_offset` for live frames.
    SnapshotEnd { ack_offset: u64 },
    /// One live mutation frame to be applied via `kevy::dispatch`
    /// (inside a [`crate::ReplicatedApplyGuard`] scope so the apply
    /// doesn't re-push into this shard's downstream
    /// `ReplicationSource`).
    Frame { offset: u64, argv: Argv },
}
50
51/// Sender end of a per-shard replica inbox. `Send + Clone + Sync`
52/// (one std::sync::mpsc::Sender, no extra state) so the embedder can
53/// hand it freely to runner threads.
54#[derive(Clone)]
55pub struct ReplicaInboxSender {
56 inner: Sender<ReplicaApply>,
57}
58
59impl ReplicaInboxSender {
60 /// Send one event to the target shard. Fails only when the shard
61 /// has dropped its receiver (the runtime stopped or the shard
62 /// crashed) — the runner should treat that as "no more apply
63 /// possible" and exit.
64 pub fn send(&self, ev: ReplicaApply) -> Result<(), SendError<ReplicaApply>> {
65 self.inner.send(ev)
66 }
67}
68
69/// Receiver end. Lives inside the (private) `Shard`; drained once
70/// per reactor tick. Constructed by [`replica_inbox_pair`] and
71/// handed to the runtime via `Runtime::with_replica_inboxes`.
72pub struct ReplicaInboxReceiver {
73 pub(crate) inner: Receiver<ReplicaApply>,
74}
75
76/// Create a matched (sender, receiver) pair for one shard's replica
77/// inbox. The embedder calls this `nshards` times before
78/// `Runtime::run`.
79#[must_use]
80pub fn replica_inbox_pair() -> (ReplicaInboxSender, ReplicaInboxReceiver) {
81 let (tx, rx) = channel();
82 (
83 ReplicaInboxSender { inner: tx },
84 ReplicaInboxReceiver { inner: rx },
85 )
86}
87
#[cfg(test)]
mod tests {
    use super::*;

    /// An event pushed through the sender comes out of the paired receiver.
    #[test]
    fn pair_round_trips_one_event() {
        let (sender, receiver) = replica_inbox_pair();
        sender.send(ReplicaApply::SnapshotBegin).unwrap();

        let other = receiver.inner.recv().unwrap();
        assert!(
            matches!(other, ReplicaApply::SnapshotBegin),
            "expected SnapshotBegin, got {other:?}"
        );
    }

    /// Once the receiver is gone, `send` fails and hands the event back.
    #[test]
    fn drop_receiver_makes_send_fail() {
        let (sender, receiver) = replica_inbox_pair();
        drop(receiver);

        let other = sender.send(ReplicaApply::SnapshotBegin).unwrap_err().0;
        assert!(
            matches!(other, ReplicaApply::SnapshotBegin),
            "expected payload roundtrip, got {other:?}"
        );
    }

    /// Compile-time check that the sender can be cloned and shared
    /// across threads.
    #[test]
    fn sender_is_clone_send_sync() {
        fn require<T: Clone + Send + Sync>() {}
        require::<ReplicaInboxSender>();
    }
}
118}