Skip to main content

kevy_rt/
replica_inbox.rs

1//! Cross-thread inbox from an external replica runner into a
2//! [`Shard`]'s reactor thread. The replica runner lives on its own OS
3//! thread (it does blocking `TcpStream` reads from the upstream
4//! primary via `ReplicaClient`); applying mutations to the shard's
5//! `Store` must happen on the shard's reactor thread, so the runner
6//! drops events into this channel and the shard drains it once per
7//! tick.
8//!
9//! The kevy server (the embedder) creates one [`ReplicaInbox`] pair
10//! per shard before `Runtime::run`, hands the receivers to the
11//! runtime via `with_replica_inboxes`, and keeps the senders to wire
12//! into the runner threads. v1.18 spawns one runner per shard
13//! (matching the primary's per-shard listener layout), so the
14//! channels are 1:1.
15//!
16//! v1.18 cap: events are unbounded. Each [`ReplicaApply::Frame`]
17//! carries an owned [`Argv`] (snapshot path is `Vec<u8>` chunks); for
18//! a slow shard this can grow. Backpressure / capping is tracked as a
19//! follow-up — the v1.18 model assumes the shard's apply rate matches
20//! the upstream emit rate (single-machine cluster). The unbounded
21//! channel never blocks the runner thread, so a stuck shard never
22//! stalls the runner's TCP read (it just buffers).
23
24use std::sync::mpsc::{Receiver, SendError, Sender, channel};
25
26use crate::Argv;
27
28/// One event delivered from a replica runner to its target shard.
29/// Mirrors `kevy_replicate::replica::ReplicaEvent` except `Frame`
30/// carries an owned [`Argv`] (already decoded by the runner) instead
31/// of a `DecodedFrame { offset, argv }` — the offset is gap-checked
32/// by the runner on the way in, so the shard doesn't need it.
33#[derive(Debug)]
34pub enum ReplicaApply {
35    /// Upstream started shipping a full snapshot. The shard should
36    /// reset its accumulating snapshot buffer.
37    SnapshotBegin,
38    /// One chunk of snapshot bytes. The shard appends to its buffer.
39    SnapshotChunk(Vec<u8>),
40    /// Upstream finished the snapshot. The shard hands its buffered
41    /// bytes to `kevy_persist::load_snapshot_from` (replacing the
42    /// `Store` contents) and resumes at `ack_offset` for live frames.
43    SnapshotEnd { ack_offset: u64 },
44    /// One live mutation frame to be applied via `kevy::dispatch`
45    /// (inside a [`crate::ReplicatedApplyGuard`] scope so the apply
46    /// doesn't re-push into this shard's downstream
47    /// `ReplicationSource`).
48    Frame { offset: u64, argv: Argv },
49}
50
51/// Sender end of a per-shard replica inbox. `Send + Clone + Sync`
52/// (one std::sync::mpsc::Sender, no extra state) so the embedder can
53/// hand it freely to runner threads.
54#[derive(Clone)]
55pub struct ReplicaInboxSender {
56    inner: Sender<ReplicaApply>,
57}
58
59impl ReplicaInboxSender {
60    /// Send one event to the target shard. Fails only when the shard
61    /// has dropped its receiver (the runtime stopped or the shard
62    /// crashed) — the runner should treat that as "no more apply
63    /// possible" and exit.
64    pub fn send(&self, ev: ReplicaApply) -> Result<(), SendError<ReplicaApply>> {
65        self.inner.send(ev)
66    }
67}
68
69/// Receiver end. Lives inside the (private) `Shard`; drained once
70/// per reactor tick. Constructed by [`replica_inbox_pair`] and
71/// handed to the runtime via `Runtime::with_replica_inboxes`.
72pub struct ReplicaInboxReceiver {
73    pub(crate) inner: Receiver<ReplicaApply>,
74}
75
76/// Create a matched (sender, receiver) pair for one shard's replica
77/// inbox. The embedder calls this `nshards` times before
78/// `Runtime::run`.
79#[must_use]
80pub fn replica_inbox_pair() -> (ReplicaInboxSender, ReplicaInboxReceiver) {
81    let (tx, rx) = channel();
82    (
83        ReplicaInboxSender { inner: tx },
84        ReplicaInboxReceiver { inner: rx },
85    )
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    #[test]
93    fn pair_round_trips_one_event() {
94        let (tx, rx) = replica_inbox_pair();
95        tx.send(ReplicaApply::SnapshotBegin).unwrap();
96        match rx.inner.recv().unwrap() {
97            ReplicaApply::SnapshotBegin => {}
98            other => panic!("expected SnapshotBegin, got {other:?}"),
99        }
100    }
101
102    #[test]
103    fn drop_receiver_makes_send_fail() {
104        let (tx, rx) = replica_inbox_pair();
105        drop(rx);
106        let err = tx.send(ReplicaApply::SnapshotBegin).unwrap_err();
107        match err.0 {
108            ReplicaApply::SnapshotBegin => {}
109            other => panic!("expected payload roundtrip, got {other:?}"),
110        }
111    }
112
113    #[test]
114    fn sender_is_clone_send_sync() {
115        fn assert_traits<T: Clone + Send + Sync>() {}
116        assert_traits::<ReplicaInboxSender>();
117    }
118}