use {
super::{Config, LogReplaySync, LogReplaySyncMessage},
crate::{
groups::{
Index,
StateMachine,
StateSyncProvider,
SyncContext,
SyncProviderContext,
},
primitives::{Encoded, Pretty, Short},
},
core::marker::PhantomData,
};
pub struct LogReplayProvider<M: StateMachine> {
config: Config,
#[doc(hidden)]
_marker: PhantomData<M>,
}
impl<M: StateMachine> LogReplayProvider<M> {
pub(super) fn new(
config: &Config,
_: &dyn SyncContext<LogReplaySync<M>>,
) -> Self {
Self {
config: config.clone(),
_marker: PhantomData,
}
}
}
impl<M: StateMachine> StateSyncProvider for LogReplayProvider<M> {
type Owner = LogReplaySync<M>;
fn receive(
&mut self,
message: LogReplaySyncMessage<M::Command>,
sender: crate::PeerId,
cx: &mut dyn SyncProviderContext<Self::Owner>,
) -> Result<(), LogReplaySyncMessage<M::Command>> {
match message {
LogReplaySyncMessage::AvailabilityRequest => {
let available = cx.log().available();
let committed = cx.committed().index();
let available = *available.start()..=committed.min(*available.end());
tracing::trace!(
peer = %Short(sender),
range = %Pretty(&available),
group = %Short(cx.group_id()),
network = %Short(cx.network_id()),
"state availability confirmed for"
);
cx.send_to(
sender,
LogReplaySyncMessage::AvailabilityResponse(available),
)
.expect("infallible serialization");
}
LogReplaySyncMessage::FetchEntriesRequest(range) => {
if range.is_empty() {
return Ok(());
}
let len = range.end().distance(range.start()) + 1;
let len = len.min(self.config.batch_size);
let range = *range.start()..=(*range.start() + len - 1u32);
let entries = cx.log().get_range(&range);
if entries.is_empty() {
cx.send_to(sender, LogReplaySyncMessage::FetchEntriesResponse {
range: Index::zero()..=Index::zero(),
entries: Vec::new(),
})
.expect("infallible serialization");
return Ok(());
}
let actual_start = entries.first().unwrap().1;
let actual_end = entries.last().unwrap().1;
let actual_range = actual_start..=actual_end;
tracing::trace!(
peer = %Short(sender),
range = %Pretty(&actual_range),
group = %Short(cx.group_id()),
network = %Short(cx.network_id()),
"providing state to"
);
if let Err(e) =
cx.send_to(sender, LogReplaySyncMessage::FetchEntriesResponse {
range: actual_range.clone(),
entries: entries
.into_iter()
.map(|(term, _, command)| (Encoded(command), term))
.collect(),
}) {
tracing::warn!(
err = ?e,
peer = %Short(sender),
range = %Pretty(&actual_range),
group = %Short(cx.group_id()),
network = %Short(cx.network_id()),
"failed to encode log entries",
);
}
}
message => return Err(message),
}
Ok(())
}
}