1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//! Replica-side apply path (T1.29(c) + (b)) — the [`Shard`] half of
//! the cross-thread bridge from the replica runner to the local
//! `Store`. The runner thread runs blocking
//! `kevy_replicate::replica::ReplicaClient::next_event` reads and
//! drops each event into the per-shard [`crate::ReplicaInboxSender`];
//! once per reactor tick, [`Shard::drain_replica_inbox`] empties the
//! channel and applies each event on the reactor's own thread (so
//! the `Store` is touched only by its owner, never cross-thread).
//!
//! Snapshot path: accumulates [`ReplicaApply::SnapshotChunk`] bytes
//! in `Shard.replica_snapshot_buf` until [`ReplicaApply::SnapshotEnd`]
//! arrives, then hands the buffer to `kevy_persist::load_snapshot_from`
//! → the local `Store` is replaced.
//!
//! Live-frame path: each [`ReplicaApply::Frame`] runs through
//! `Commands::dispatch_into` inside a [`crate::ReplicatedApplyGuard`]
//! scope (so the apply doesn't re-push into this shard's downstream
//! source) followed by the usual `post_write_housekeeping` (AOF /
//! WATCH bump / keyspace notify / BLOCK wake all still fire — local
//! readers must see consistent state).
use std::io::Cursor;
use crate::Commands;
use crate::message::DispatchMeta;
use crate::replica_inbox::ReplicaApply;
use crate::replication_gate::ReplicatedApplyGuard;
use crate::shard::Shard;
impl<C: Commands> Shard<C> {
/// Drain every pending replica-runner event for this shard,
/// applying each on the reactor thread. Called from the per-tick
/// housekeeping branch alongside [`Self::tick_persist`]. No-op
/// (one `Option::is_none` check) when this shard isn't running
/// as a replica.
pub(crate) fn drain_replica_inbox(&mut self) {
let Some(inbox) = self.replica_inbox.as_ref() else {
return;
};
// Take ownership of all currently-queued events without
// blocking. `try_iter` yields until the channel is empty;
// we cap the per-tick budget to keep the reactor responsive
// when a flood of frames lands at once.
const MAX_PER_TICK: usize = 1024;
let mut events = Vec::with_capacity(64);
for ev in inbox.inner.try_iter().take(MAX_PER_TICK) {
events.push(ev);
}
for ev in events {
self.apply_replica_event(ev);
}
}
/// Apply one [`ReplicaApply`] event. Split out so the iter
/// borrow on `self.replica_inbox.inner` doesn't conflict with the
/// `&mut self` apply methods need.
fn apply_replica_event(&mut self, ev: ReplicaApply) {
match ev {
ReplicaApply::SnapshotBegin => {
self.replica_snapshot_buf.clear();
}
ReplicaApply::SnapshotChunk(bytes) => {
self.replica_snapshot_buf.extend_from_slice(&bytes);
}
ReplicaApply::SnapshotEnd { ack_offset: _ } => {
let buf = std::mem::take(&mut self.replica_snapshot_buf);
if let Err(e) = kevy_persist::load_snapshot_from(
&mut self.store,
Cursor::new(buf.as_slice()),
) {
eprintln!(
"kevy: shard {} replica snapshot load failed: {e}",
self.id,
);
}
}
ReplicaApply::Frame { offset: _, argv } => {
self.apply_replica_frame(&argv);
}
}
}
/// Dispatch one replicated mutation frame against the local
/// `Store`. The [`ReplicatedApplyGuard`] suppresses the source
/// push inside `post_write_housekeeping`; everything else (AOF,
/// WATCH bump, keyspace notify, BLOCK wake) fires normally.
fn apply_replica_frame(&mut self, argv: &crate::Argv) {
let _guard = ReplicatedApplyGuard::enter();
let resolved = self.commands.resolve(argv);
let meta = DispatchMeta {
is_write: resolved.is_write,
wake_idx: resolved.wake_idx,
key_idx: match resolved.route {
crate::Route::Single(idx) => u8::try_from(idx).ok(),
_ => None,
},
};
self.reply_scratch.clear();
self.commands
.dispatch_into(&mut self.store, argv, &mut self.reply_scratch);
self.reply_scratch.clear();
self.post_write_housekeeping(argv, meta);
}
}