// mosaik/groups/machine/mod.rs

//! Replicated State Machine (RSM) module for consensus groups.
//!
//! This module defines the [`StateMachine`] trait — the main integration
//! point for application-specific logic in a mosaik consensus group. Every
//! group runs exactly one state machine instance per node, and the Raft
//! protocol ensures that all instances process the same commands in the same
//! order, converging on identical state.
//!
//! The module also defines the [`StateSync`] trait, which handles catch-up
//! for followers that fall behind the leader's log. The built-in
//! [`LogReplaySync`](super::LogReplaySync) implementation works with any
//! state machine by replaying missed log entries.

use {
	crate::{
		Datum,
		groups::{Cursor, Term},
		primitives::UniqueId,
	},
	serde::{Serialize, de::DeserializeOwned},
};

mod noop;
mod sync;

#[doc(hidden)]
pub use noop::NoOp;
// Public API traits for user-provided state machine implementations.
pub use sync::*;

31/// The core trait for application-specific replicated state in a consensus
32/// group.
33///
34/// Every mosaik consensus group is backed by a single `StateMachine`
35/// implementation. The Raft protocol replicates [`Command`]s across all
36/// group members, and each member applies them **in the same order** to
37/// its local `StateMachine` instance, guaranteeing that all nodes converge
38/// on identical state.
39///
40/// # Lifecycle
41///
42/// 1. **Create** — you instantiate your state machine and pass it to the group
43///    builder via
44///    [`with_state_machine`](super::GroupBuilder::with_state_machine).
45/// 2. **Join** — calling [`join()`](super::GroupBuilder::join) on the builder
46///    starts the Raft protocol and returns a [`Group<M>`](super::Group) handle.
47/// 3. **Commands** — clients submit commands through
48///    [`Group::execute`](super::Group::execute). The leader replicates each
49///    command to a quorum of followers; once committed, every node calls
50///    [`apply`](StateMachine::apply) with that command.
51/// 4. **Queries** — clients call [`Group::query`](super::Group::query) with
52///    either `Weak` (local, possibly stale) or `Strong` (forwarded to leader)
53///    consistency. The group calls [`query`](StateMachine::query) on the local
54///    state machine and returns the result.
55/// 5. **Catch-up** — when a follower falls behind, the [`StateSync`]
56///    implementation returned by [`state_sync`](StateMachine::state_sync) fills
57///    the gap (replaying missed log entries or transferring a snapshot).
58///
59/// # Determinism
60///
61/// **`apply` must be deterministic.** Given the same sequence of commands,
62/// every node must produce the exact same state. Do not read wall-clock
63/// time, random values, or any external I/O inside `apply`. If you need
64/// non-deterministic input, inject it *into* a command before submitting
65/// it.
66///
67/// # Example
68///
69/// A simple counter replicated across a group:
70///
71/// ```rust
72/// use {
73/// 	mosaik::{
74/// 		UniqueId,
75/// 		groups::{
76/// 			ApplyContext,
77/// 			LeadershipPreference,
78/// 			LogReplaySync,
79/// 			StateMachine,
80/// 		},
81/// 	},
82/// 	serde::{Deserialize, Serialize},
83/// };
84///
85/// /// Commands that mutate the counter.
86/// #[derive(Clone, Serialize, Deserialize)]
87/// enum CounterCmd {
88/// 	Increment(u32),
89/// 	Decrement(u32),
90/// }
91///
92/// /// Queries that read the counter.
93/// #[derive(Clone, Serialize, Deserialize)]
94/// struct GetValue;
95///
96/// struct Counter {
97/// 	value: i64,
98/// }
99///
100/// impl StateMachine for Counter {
101/// 	type Command = CounterCmd;
102/// 	type Query = GetValue;
103/// 	type QueryResult = i64;
104/// 	type StateSync = LogReplaySync<Self>;
105///
106/// 	fn signature(&self) -> UniqueId {
107/// 		UniqueId::from("my_counter_v1")
108/// 	}
109///
110/// 	fn apply(&mut self, cmd: CounterCmd, _ctx: &dyn ApplyContext) {
111/// 		match cmd {
112/// 			CounterCmd::Increment(n) => {
113/// 				self.value += i64::from(n);
114/// 			}
115/// 			CounterCmd::Decrement(n) => {
116/// 				self.value -= i64::from(n);
117/// 			}
118/// 		}
119/// 	}
120///
121/// 	fn query(&self, _: GetValue) -> i64 {
122/// 		self.value
123/// 	}
124///
125/// 	fn state_sync(&self) -> LogReplaySync<Self> {
126/// 		LogReplaySync::default()
127/// 	}
128/// }
129/// ```
130///
131/// Then, to use it in a group:
132///
133/// ```rust,ignore
134/// let group = network
135///     .groups()
136///     .with_key("my-counter-group")
137///     .with_state_machine(Counter { value: 0 })
138///     .join();
139///
140/// // Wait for a leader to be elected.
141/// group.when().online().await;
142///
143/// // Submit a command (replicated to all members).
144/// group.execute(CounterCmd::Increment(1)).await?;
145///
146/// // Read the current value (strong consistency).
147/// let result = group.query(GetValue, Consistency::Strong).await?;
148/// assert_eq!(*result, 1);
149/// ```
150pub trait StateMachine: Sized + Send + Sync + 'static {
151	/// The type of commands that mutate the state machine.
152	///
153	/// Commands are the unit of replication: the leader appends each
154	/// command to the Raft log, replicates it to a quorum of followers,
155	/// and then every node applies it via [`apply`](Self::apply).
156	///
157	/// Commands must be serializable ([`Datum`]) because they are
158	/// transmitted over the network and persisted in the log. They must
159	/// also be [`Clone`] so the runtime can copy them when needed (e.g.
160	/// during state sync or forwarding).
161	type Command: Command;
162
163	/// The type of read-only queries against the state machine.
164	///
165	/// Queries do **not** go through the Raft log. They are evaluated
166	/// directly against the local state machine on whichever node
167	/// handles the request:
168	///
169	/// - **Weak consistency** — the query runs on the local node immediately,
170	///   which may return stale data.
171	/// - **Strong consistency** — the query is forwarded to the current leader,
172	///   guaranteeing a consistent read.
173	type Query: Query;
174
175	/// The result type returned by [`query`](Self::query).
176	///
177	/// Must be serializable so it can be sent over the network when a
178	/// follower forwards a strong-consistency query to the leader.
179	type QueryResult: QueryResult;
180
181	/// The [`StateSync`] implementation used to synchronize followers
182	/// that fall behind the leader's log.
183	///
184	/// Use [`LogReplaySync`](super::LogReplaySync) as a sensible
185	/// default — it works with any state machine by replaying missed
186	/// log entries from peers. For higher-throughput workloads, you can
187	/// provide a snapshot-based implementation instead.
188	type StateSync: StateSync<Machine = Self>;
189
190	/// Returns a unique fingerprint of this state machine type and its
191	/// configuration.
192	///
193	/// This value is **part of the [`GroupId`](super::GroupId)
194	/// derivation**. All members of the same group must return an
195	/// identical signature; any mismatch produces a different group id,
196	/// preventing nodes from bonding.
197	///
198	/// # What to include
199	///
200	/// - The state machine's **type name or version tag** (so incompatible
201	///   implementations never collide).
202	/// - Any **configuration parameters** that affect command semantics (e.g.
203	///   capacity limits, feature flags). If two nodes interpret the same command
204	///   differently because of a config difference, the signature must differ.
205	///
206	/// # Example
207	///
208	/// ```rust,ignore
209	/// fn signature(&self) -> UniqueId {
210	///     UniqueId::from("my_orderbook_v2")
211	///         .derive(self.max_depth.to_le_bytes())
212	/// }
213	/// ```
214	fn signature(&self) -> UniqueId;
215
216	/// Applies a committed command to the state machine.
217	///
218	/// This method is called **exactly once per committed log entry**,
219	/// in log order, on every node in the group. Because all nodes
220	/// apply the same commands in the same order, the state machine
221	/// must remain **deterministic**: the same input must always produce
222	/// the same state mutation.
223	///
224	/// The [`ApplyContext`] provides metadata about the current log
225	/// position and term, which can be useful for versioning,
226	/// compaction, or progress tracking.
227	///
228	/// # Panics
229	///
230	/// Panicking inside `apply` is treated as a fatal divergence — the
231	/// node will shut down rather than risk inconsistent state.
232	fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext);
233
234	/// Applies a batch of commands in one call.
235	///
236	/// The default implementation calls [`apply`](Self::apply) for
237	/// each command sequentially. Override this if your state machine
238	/// can amortize per-command overhead (e.g. by deferring index
239	/// rebuilds until the end of the batch).
240	///
241	/// The batch is received in log order and all commands belong to
242	/// the same term (available via [`ApplyContext::current_term`]).
243	#[inline]
244	fn apply_batch(
245		&mut self,
246		commands: impl IntoIterator<Item = Self::Command>,
247		ctx: &dyn ApplyContext,
248	) {
249		for command in commands {
250			self.apply(command, ctx);
251		}
252	}
253
254	/// Evaluates a read-only query against the current state.
255	///
256	/// This is called on the node that handles the query (the local
257	/// node for weak consistency, the leader for strong consistency).
258	/// It must **not** mutate the state machine.
259	fn query(&self, query: Self::Query) -> Self::QueryResult;
260
261	/// Returns a new [`StateSync`] instance for this state machine.
262	///
263	/// Called once when the group is initialized. The returned value
264	/// provides the session/provider factories that the Raft protocol
265	/// uses during follower catch-up.
266	///
267	/// For most use cases, returning
268	/// [`LogReplaySync::default()`](super::LogReplaySync::default) is
269	/// sufficient:
270	///
271	/// ```rust,ignore
272	/// fn state_sync(&self) -> LogReplaySync<Self> {
273	///     LogReplaySync::default()
274	/// }
275	/// ```
276	fn state_sync(&self) -> Self::StateSync;
277
278	/// Returns the leadership preference for this node.
279	///
280	/// This is a **per-node** setting that does not affect the group
281	/// id — different nodes in the same group can have different
282	/// preferences. The Raft election logic enforces the preference:
283	///
284	/// - [`LeadershipPreference::Normal`] — standard candidate behavior (the
285	///   default).
286	/// - [`LeadershipPreference::Reluctant`] — longer election timeouts, reducing
287	///   the chance of becoming leader but not preventing it entirely.
288	/// - [`LeadershipPreference::Observer`] — never self-nominates, abstains from
289	///   all votes, and does not count toward quorum. The node still replicates
290	///   the log and applies commands.
291	///
292	/// # Example
293	///
294	/// A read-replica that should never lead:
295	///
296	/// ```rust,ignore
297	/// fn leadership_preference(&self) -> LeadershipPreference {
298	///     LeadershipPreference::Observer
299	/// }
300	/// ```
301	#[inline]
302	fn leadership_preference(&self) -> LeadershipPreference {
303		LeadershipPreference::Normal
304	}
305}
306
/// Contextual information provided to the state machine when applying commands.
///
/// This trait does not offer any information that is non-deterministic or that
/// can diverge between different nodes in the group, so it is safe to use for
/// deterministic state machines.
pub trait ApplyContext {
	/// The index and term of the last committed log entry that has been applied
	/// by the state machine before applying the current batch of commands.
	///
	/// This can be used by the state machine to determine the current position of
	/// the log when it is applying new commands.
	///
	/// If the state machine implements its own
	/// [`apply_batch`](StateMachine::apply_batch) method, it needs to
	/// compute the index of each command in the batch by itself using the number
	/// of commands in the batch and the index of the last committed command
	/// returned by this method.
	fn committed(&self) -> Cursor;

	/// The index and term of the last log entry in the log.
	///
	/// NOTE(review): presumably this can run ahead of
	/// [`committed`](Self::committed) when uncommitted entries exist —
	/// confirm against the runtime implementation.
	fn log_position(&self) -> Cursor;

	/// The term of the commands currently being applied.
	fn current_term(&self) -> Term;

	/// The unique identifier of the group this state machine belongs to.
	fn group_id(&self) -> &crate::groups::GroupId;

	/// The identifier of the network this group belongs to.
	fn network_id(&self) -> &crate::NetworkId;
}

/// Marker trait bundling the bounds required of values a state machine
/// exchanges: cloneable, thread-safe ([`Send`] + [`Sync`]), serde
/// (de)serializable, and free of borrowed data (`'static`).
///
/// Blanket-implemented for every type that satisfies the bounds; never
/// implement it manually.
pub trait StateMachineMessage:
	Clone + Send + Sync + Serialize + DeserializeOwned + 'static
{
}

// Blanket implementation: any qualifying type is a `StateMachineMessage`.
impl<T> StateMachineMessage for T where
	T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static
{
}

/// Marker trait for [`StateMachine::Command`] types: serializable
/// ([`Datum`]), [`Sync`], and [`Clone`]. Blanket-implemented; never
/// implement it manually.
pub trait Command: Datum + Sync + Clone {}
impl<T> Command for T where T: Datum + Sync + Clone {}

/// Marker trait for [`StateMachine::Query`] types: serializable
/// ([`Datum`]), [`Sync`], and [`Clone`]. Blanket-implemented; never
/// implement it manually.
pub trait Query: Datum + Sync + Clone {}
impl<T> Query for T where T: Datum + Sync + Clone {}

/// Marker trait for [`StateMachine::QueryResult`] types: serializable
/// ([`Datum`]), [`Sync`], and [`Clone`]. Blanket-implemented; never
/// implement it manually.
pub trait QueryResult: Datum + Sync + Clone {}
impl<T> QueryResult for T where T: Datum + Sync + Clone {}

/// Describes a node's preference for assuming leadership within a group.
///
/// This preference is per-node and does not affect the group identity.
/// Different nodes in the same group can have different leadership preferences.
/// The preference is enforced by the Raft election logic at the follower level:
/// observers never transition to candidate state, and reluctant nodes use
/// longer election timeouts to reduce their likelihood of winning elections.
///
/// `Normal` and `Reluctant` nodes participate fully in voting and log
/// replication. `Observer` nodes replicate the log but abstain from both
/// election votes and commit-advancement votes, so they never inflate the
/// quorum.
///
/// # Safety Considerations
///
/// - **Observer nodes do not count toward quorum.** They replicate the log and
///   advance their committed state, but abstain from election and commit votes.
///   This prevents slow or offline observers from stalling writes.
/// - **Liveness.** If all nodes in a group are observers, no leader can ever be
///   elected and the group will be unable to make progress. Ensure that at
///   least one node in every group has `Normal` or `Reluctant` preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LeadershipPreference {
	/// Default Raft behavior. The node participates in elections as a
	/// candidate with standard election timeouts.
	#[default]
	Normal,

	/// The node uses longer election timeouts, reducing its likelihood of
	/// becoming leader. It can still be elected if no other candidate is
	/// available (e.g., during network partitions or when all preferred
	/// leaders are down). The factor controls the timeout multiplier
	/// (e.g., a factor of 3 triples the election timeout).
	///
	/// See [`LeadershipPreference::reluctant`] for a constructor with a
	/// sensible default factor.
	Reluctant {
		/// The multiplier applied to the election timeout and bootstrap
		/// delay. Must be greater than 1 for the deprioritization to have
		/// any effect.
		factor: u32,
	},

	/// The node never self-nominates as a candidate and abstains from all
	/// votes (both elections and commit advancement), so it never counts
	/// toward quorum. It still replicates the log and advances its
	/// committed state from the leader's heartbeats.
	Observer,
}

404impl LeadershipPreference {
405	/// Creates a reluctant preference with the default factor of 3.
406	pub const fn reluctant() -> Self {
407		Self::Reluctant { factor: 3 }
408	}
409}