use std::io::Cursor;
use crate::Commands;
use crate::message::DispatchMeta;
use crate::replica_inbox::ReplicaApply;
use crate::replication_gate::ReplicatedApplyGuard;
use crate::shard::Shard;
impl<C: Commands> Shard<C> {
    /// Pulls pending events off the replica inbox and applies them in arrival order.
    ///
    /// Returns immediately when this shard has no replica inbox. At most
    /// `MAX_PER_TICK` events are taken from the inbox per call.
    pub(crate) fn drain_replica_inbox(&mut self) {
        const MAX_PER_TICK: usize = 1024;

        let inbox = match self.replica_inbox.as_ref() {
            Some(inbox) => inbox,
            None => return,
        };

        // Buffer the batch before applying it: the inbox is borrowed from
        // `self`, while applying an event needs `&mut self`.
        let mut batch = Vec::with_capacity(64);
        batch.extend(inbox.inner.try_iter().take(MAX_PER_TICK));

        for event in batch {
            self.apply_replica_event(event);
        }
    }

    /// Applies a single event taken from the replica inbox.
    ///
    /// * `SnapshotBegin` empties the snapshot staging buffer.
    /// * `SnapshotChunk` appends its bytes to the staging buffer.
    /// * `SnapshotEnd` moves the staged bytes out of the buffer and loads them
    ///   into the store; a load error is printed to stderr and otherwise
    ///   ignored. `ack_offset` is not used here.
    /// * `Frame` passes its argv to `apply_replica_frame`; `offset` is not
    ///   used here.
    fn apply_replica_event(&mut self, ev: ReplicaApply) {
        match ev {
            ReplicaApply::SnapshotBegin => {
                self.replica_snapshot_buf.clear();
            }
            ReplicaApply::SnapshotChunk(chunk) => {
                self.replica_snapshot_buf.extend_from_slice(&chunk);
            }
            ReplicaApply::SnapshotEnd { .. } => {
                // Taking the bytes leaves a default (empty) buffer behind.
                let staged = std::mem::take(&mut self.replica_snapshot_buf);
                let outcome = kevy_persist::load_snapshot_from(
                    &mut self.store,
                    Cursor::new(staged.as_slice()),
                );
                if let Err(e) = outcome {
                    eprintln!(
                        "kevy: shard {} replica snapshot load failed: {e}",
                        self.id,
                    );
                }
            }
            ReplicaApply::Frame { argv, .. } => self.apply_replica_frame(&argv),
        }
    }

    /// Executes one replicated command against this shard's store.
    ///
    /// The value returned by `ReplicatedApplyGuard::enter()` is kept alive for
    /// the whole call. The command is resolved, dispatched into the store, and
    /// then `post_write_housekeeping` is called unconditionally with metadata
    /// built from the resolved command.
    fn apply_replica_frame(&mut self, argv: &crate::Argv) {
        let _guard = ReplicatedApplyGuard::enter();

        let resolved = self.commands.resolve(argv);
        // Only a `Route::Single` index that fits in a `u8` yields a key index.
        let key_idx = if let crate::Route::Single(idx) = resolved.route {
            u8::try_from(idx).ok()
        } else {
            None
        };
        let meta = DispatchMeta {
            is_write: resolved.is_write,
            wake_idx: resolved.wake_idx,
            key_idx,
        };

        // The reply is never read: the scratch buffer is cleared right after
        // dispatch.
        self.reply_scratch.clear();
        self.commands
            .dispatch_into(&mut self.store, argv, &mut self.reply_scratch);
        self.reply_scratch.clear();

        self.post_write_housekeeping(argv, meta);
    }
}