use {
crate::{
groups::{
Command,
Cursor,
Index,
StateMachine,
SyncContext,
Term,
machine::*,
},
primitives::{UniqueId, datum::Encoded},
},
core::{
any::type_name,
marker::PhantomData,
num::NonZero,
ops::RangeInclusive,
time::Duration,
},
provider::LogReplayProvider,
serde::{Deserialize, Serialize},
session::LogReplaySession,
};
mod provider;
mod session;
pub struct LogReplaySync<M: StateMachine> {
config: Config,
#[doc(hidden)]
_marker: PhantomData<M>,
}
impl<M: StateMachine> Default for LogReplaySync<M> {
fn default() -> Self {
Self {
config: Config::default(),
_marker: PhantomData,
}
}
}
impl<M: StateMachine> core::fmt::Debug for LogReplaySync<M> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("LogReplaySync")
.field("config", &self.config)
.finish()
}
}
impl<M: StateMachine> StateSync for LogReplaySync<M> {
type Machine = M;
type Message = LogReplaySyncMessage<M::Command>;
type Provider = LogReplayProvider<M>;
type Session = LogReplaySession<M>;
fn signature(&self) -> UniqueId {
UniqueId::from("mosaik_log_replay_sync")
.derive(type_name::<M>())
.derive(self.config.batch_size.to_le_bytes())
.derive(self.config.fetch_timeout.as_secs().to_le_bytes())
}
fn create_provider(&self, cx: &dyn SyncContext<Self>) -> Self::Provider {
LogReplayProvider::new(&self.config, cx)
}
fn create_session(
&self,
cx: &mut dyn SyncSessionContext<Self>,
position: Cursor,
_leader_committed: Index,
entries: Vec<(M::Command, Term)>,
) -> Self::Session {
LogReplaySession::new(&self.config, cx, position, entries)
}
}
impl<M: StateMachine> LogReplaySync<M> {
#[must_use]
pub const fn with_batch_size(mut self, batch_size: NonZero<u64>) -> Self {
self.config.batch_size = batch_size.get();
self
}
#[must_use]
pub const fn with_fetch_timeout(mut self, fetch_timeout: Duration) -> Self {
self.config.fetch_timeout = fetch_timeout;
self
}
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(bound = "")]
pub enum LogReplaySyncMessage<C: Command> {
AvailabilityRequest,
AvailabilityResponse(RangeInclusive<Index>),
FetchEntriesRequest(RangeInclusive<Index>),
FetchEntriesResponse {
range: RangeInclusive<Index>,
entries: Vec<(Encoded<C>, Term)>,
},
}
#[derive(Debug, Clone)]
struct Config {
pub batch_size: u64,
pub fetch_timeout: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
batch_size: 2000,
fetch_timeout: Duration::from_secs(25),
}
}
}