// mosaik/groups/machine/mod.rs

//! Replicated State Machine (RSM) module for consensus groups.
//!
//! This module defines the [`StateMachine`] trait — the main integration
//! point for application-specific logic in a mosaik consensus group. Every
//! group runs exactly one state machine instance per node, and the Raft
//! protocol ensures that all instances process the same commands in the same
//! order, converging on identical state.
//!
//! The module also defines the [`StateSync`] trait, which handles catch-up
//! for followers that fall behind the leader's log. The built-in
//! [`LogReplaySync`](super::LogReplaySync) implementation works with any
//! state machine by replaying missed log entries.
use {
    crate::{
        Datum,
        groups::{Cursor, Term},
        primitives::UniqueId,
    },
    serde::{Serialize, de::DeserializeOwned},
};

mod noop;
mod sync;

// `NoOp` is an internal placeholder state machine; hidden from rustdoc
// but re-exported so other crate internals can reach it.
#[doc(hidden)]
pub use noop::NoOp;
// Public API traits for user-provided state machine implementations.
pub use sync::*;
30
/// The core trait for application-specific replicated state in a consensus
/// group.
///
/// Every mosaik consensus group is backed by a single `StateMachine`
/// implementation. The Raft protocol replicates [`Command`]s across all
/// group members, and each member applies them **in the same order** to
/// its local `StateMachine` instance, guaranteeing that all nodes converge
/// on identical state.
///
/// # Lifecycle
///
/// 1. **Create** — you instantiate your state machine and pass it to the group
///    builder via
///    [`with_state_machine`](super::GroupBuilder::with_state_machine).
/// 2. **Join** — calling [`join()`](super::GroupBuilder::join) on the builder
///    starts the Raft protocol and returns a [`Group<M>`](super::Group) handle.
/// 3. **Commands** — clients submit commands through
///    [`Group::execute`](super::Group::execute). The leader replicates each
///    command to a quorum of followers; once committed, every node calls
///    [`apply`](StateMachine::apply) with that command.
/// 4. **Queries** — clients call [`Group::query`](super::Group::query) with
///    either `Weak` (local, possibly stale) or `Strong` (forwarded to leader)
///    consistency. The group calls [`query`](StateMachine::query) on the local
///    state machine and returns the result.
/// 5. **Catch-up** — when a follower falls behind, the [`StateSync`]
///    implementation returned by [`state_sync`](StateMachine::state_sync) fills
///    the gap (replaying missed log entries or transferring a snapshot).
///
/// # Determinism
///
/// **`apply` must be deterministic.** Given the same sequence of commands,
/// every node must produce the exact same state. Do not read wall-clock
/// time, random values, or any external I/O inside `apply`. If you need
/// non-deterministic input, inject it *into* a command before submitting
/// it.
///
/// # Example
///
/// A simple counter replicated across a group:
///
/// ```rust
/// use {
///     mosaik::{
///         UniqueId,
///         groups::{
///             ApplyContext,
///             LogReplaySync,
///             StateMachine,
///         },
///     },
///     serde::{Deserialize, Serialize},
/// };
///
/// /// Commands that mutate the counter.
/// #[derive(Clone, Serialize, Deserialize)]
/// enum CounterCmd {
///     Increment(u32),
///     Decrement(u32),
/// }
///
/// /// Queries that read the counter.
/// #[derive(Clone, Serialize, Deserialize)]
/// struct GetValue;
///
/// struct Counter {
///     value: i64,
/// }
///
/// impl StateMachine for Counter {
///     type Command = CounterCmd;
///     type Query = GetValue;
///     type QueryResult = i64;
///     type StateSync = LogReplaySync<Self>;
///
///     fn signature(&self) -> UniqueId {
///         UniqueId::from("my_counter_v1")
///     }
///
///     fn apply(&mut self, cmd: CounterCmd, _ctx: &dyn ApplyContext) {
///         match cmd {
///             CounterCmd::Increment(n) => {
///                 self.value += i64::from(n);
///             }
///             CounterCmd::Decrement(n) => {
///                 self.value -= i64::from(n);
///             }
///         }
///     }
///
///     fn query(&self, _: GetValue) -> i64 {
///         self.value
///     }
///
///     fn state_sync(&self) -> LogReplaySync<Self> {
///         LogReplaySync::default()
///     }
/// }
/// ```
///
/// Then, to use it in a group:
///
/// ```rust,ignore
/// let group = network
///     .groups()
///     .with_key("my-counter-group")
///     .with_state_machine(Counter { value: 0 })
///     .join();
///
/// // Wait for a leader to be elected.
/// group.when().online().await;
///
/// // Submit a command (replicated to all members).
/// group.execute(CounterCmd::Increment(1)).await?;
///
/// // Read the current value (strong consistency).
/// let result = group.query(GetValue, Consistency::Strong).await?;
/// assert_eq!(*result, 1);
/// ```
pub trait StateMachine: Sized + Send + Sync + 'static {
    /// The type of commands that mutate the state machine.
    ///
    /// Commands are the unit of replication: the leader appends each
    /// command to the Raft log, replicates it to a quorum of followers,
    /// and then every node applies it via [`apply`](Self::apply).
    ///
    /// Commands must be serializable ([`Datum`]) because they are
    /// transmitted over the network and persisted in the log. They must
    /// also be [`Clone`] so the runtime can copy them when needed (e.g.
    /// during state sync or forwarding).
    type Command: Command;

    /// The type of read-only queries against the state machine.
    ///
    /// Queries do **not** go through the Raft log. They are evaluated
    /// directly against the local state machine on whichever node
    /// handles the request:
    ///
    /// - **Weak consistency** — the query runs on the local node immediately,
    ///   which may return stale data.
    /// - **Strong consistency** — the query is forwarded to the current leader,
    ///   guaranteeing a consistent read.
    type Query: Query;

    /// The result type returned by [`query`](Self::query).
    ///
    /// Must be serializable so it can be sent over the network when a
    /// follower forwards a strong-consistency query to the leader.
    type QueryResult: QueryResult;

    /// The [`StateSync`] implementation used to synchronize followers
    /// that fall behind the leader's log.
    ///
    /// Use [`LogReplaySync`](super::LogReplaySync) as a sensible
    /// default — it works with any state machine by replaying missed
    /// log entries from peers. For higher-throughput workloads, you can
    /// provide a snapshot-based implementation instead.
    type StateSync: StateSync<Machine = Self>;

    /// Returns a unique fingerprint of this state machine type and its
    /// configuration.
    ///
    /// This value is **part of the [`GroupId`](super::GroupId)
    /// derivation**. All members of the same group must return an
    /// identical signature; any mismatch produces a different group id,
    /// preventing nodes from bonding.
    ///
    /// # What to include
    ///
    /// - The state machine's **type name or version tag** (so incompatible
    ///   implementations never collide).
    /// - Any **configuration parameters** that affect command semantics (e.g.
    ///   capacity limits, feature flags). If two nodes interpret the same
    ///   command differently because of a config difference, the signature
    ///   must differ.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// fn signature(&self) -> UniqueId {
    ///     UniqueId::from("my_orderbook_v2")
    ///         .derive(self.max_depth.to_le_bytes())
    /// }
    /// ```
    fn signature(&self) -> UniqueId;

    /// Applies a committed command to the state machine.
    ///
    /// This method is called **exactly once per committed log entry**,
    /// in log order, on every node in the group. Because all nodes
    /// apply the same commands in the same order, the state machine
    /// must remain **deterministic**: the same input must always produce
    /// the same state mutation.
    ///
    /// The [`ApplyContext`] provides metadata about the current log
    /// position and term, which can be useful for versioning,
    /// compaction, or progress tracking.
    ///
    /// # Panics
    ///
    /// Panicking inside `apply` is treated as a fatal divergence — the
    /// node will shut down rather than risk inconsistent state.
    fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext);

    /// Applies a batch of commands in one call.
    ///
    /// The default implementation calls [`apply`](Self::apply) for
    /// each command sequentially. Override this if your state machine
    /// can amortize per-command overhead (e.g. by deferring index
    /// rebuilds until the end of the batch).
    ///
    /// The batch is received in log order and all commands belong to
    /// the same term (available via [`ApplyContext::current_term`]).
    #[inline]
    fn apply_batch(
        &mut self,
        commands: impl IntoIterator<Item = Self::Command>,
        ctx: &dyn ApplyContext,
    ) {
        for command in commands {
            self.apply(command, ctx);
        }
    }

    /// Evaluates a read-only query against the current state.
    ///
    /// This is called on the node that handles the query (the local
    /// node for weak consistency, the leader for strong consistency).
    /// It must **not** mutate the state machine.
    fn query(&self, query: Self::Query) -> Self::QueryResult;

    /// Returns a new [`StateSync`] instance for this state machine.
    ///
    /// Called once when the group is initialized. The returned value
    /// provides the session/provider factories that the Raft protocol
    /// uses during follower catch-up.
    ///
    /// For most use cases, returning
    /// [`LogReplaySync::default()`](super::LogReplaySync::default) is
    /// sufficient:
    ///
    /// ```rust,ignore
    /// fn state_sync(&self) -> LogReplaySync<Self> {
    ///     LogReplaySync::default()
    /// }
    /// ```
    fn state_sync(&self) -> Self::StateSync;

    /// Returns the leadership preference for this node.
    ///
    /// This is a **per-node** setting that does not affect the group
    /// id — different nodes in the same group can have different
    /// preferences. The Raft election logic enforces the preference:
    ///
    /// - [`LeadershipPreference::Normal`] — standard candidate behavior (the
    ///   default).
    /// - [`LeadershipPreference::Reluctant`] — longer election timeouts,
    ///   reducing the chance of becoming leader but not preventing it
    ///   entirely.
    /// - [`LeadershipPreference::Observer`] — never self-nominates, abstains
    ///   from all votes, and does not count toward quorum. The node still
    ///   replicates the log and applies commands.
    ///
    /// # Example
    ///
    /// A read-replica that should never lead:
    ///
    /// ```rust,ignore
    /// fn leadership_preference(&self) -> LeadershipPreference {
    ///     LeadershipPreference::Observer
    /// }
    /// ```
    #[inline]
    fn leadership_preference(&self) -> LeadershipPreference {
        LeadershipPreference::Normal
    }
}
306
/// Contextual information provided to the state machine when applying commands.
///
/// This trait does not offer any information that is non-deterministic or that
/// can diverge between different nodes in the group, so it is safe to use for
/// deterministic state machines.
pub trait ApplyContext {
    /// The index and term of the last committed log entry that has been applied
    /// by the state machine before applying the current batch of commands.
    ///
    /// This can be used by the state machine to determine the current position
    /// of the log when it is applying new commands.
    ///
    /// If the state machine implements its own `apply_batch` method, it needs
    /// to compute the index of each command in the batch by itself using the
    /// number of commands in the batch and the index of the last committed
    /// command returned by this method.
    fn committed(&self) -> Cursor;

    /// The index and term of the last log entry in the log.
    fn log_position(&self) -> Cursor;

    /// The term of the commands being applied.
    fn current_term(&self) -> Term;

    /// The unique identifier of the group this state machine belongs to.
    fn group_id(&self) -> &crate::groups::GroupId;

    /// The identifier of the network this group belongs to.
    fn network_id(&self) -> &crate::NetworkId;
}
337
/// Marker trait for any message type a state machine exchanges over the
/// network.
///
/// Blanket-implemented for every type that is `Clone + Send + Sync + 'static`
/// and serde-(de)serializable — never implement it manually.
pub trait StateMachineMessage:
    Clone + Send + Sync + Serialize + DeserializeOwned + 'static
{
}

// Blanket impl: any type meeting the bounds is automatically a
// `StateMachineMessage`.
impl<T> StateMachineMessage for T where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static
{
}
347
/// Marker trait for [`StateMachine::Command`] types.
/// Blanket-implemented for any `Datum + Sync + Clone` type.
pub trait Command: Datum + Sync + Clone {}
impl<T> Command for T where T: Datum + Sync + Clone {}

/// Marker trait for [`StateMachine::Query`] types.
/// Blanket-implemented for any `Datum + Sync + Clone` type.
pub trait Query: Datum + Sync + Clone {}
impl<T> Query for T where T: Datum + Sync + Clone {}

/// Marker trait for [`StateMachine::QueryResult`] types.
/// Blanket-implemented for any `Datum + Sync + Clone` type.
pub trait QueryResult: Datum + Sync + Clone {}
impl<T> QueryResult for T where T: Datum + Sync + Clone {}
356
/// Describes a node's preference for assuming leadership within a group.
///
/// This preference is per-node and does not affect the group identity.
/// Different nodes in the same group can have different leadership preferences.
/// The preference is enforced by the Raft election logic at the follower level:
/// observers never transition to candidate state, and reluctant nodes use
/// longer election timeouts to reduce their likelihood of winning elections.
///
/// `Normal` and `Reluctant` nodes participate fully in voting and log
/// replication. `Observer` nodes replicate the log but abstain from both
/// election votes and commit-advancement votes, so they never inflate the
/// quorum.
///
/// # Safety Considerations
///
/// - **Observer nodes do not count toward quorum.** They replicate the log and
///   advance their committed state, but abstain from election and commit
///   votes. This prevents slow or offline observers from stalling writes.
/// - **Liveness.** If all nodes in a group are observers, no leader can ever
///   be elected and the group will be unable to make progress. Ensure that at
///   least one node in every group has `Normal` or `Reluctant` preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LeadershipPreference {
    /// Default Raft behavior. The node participates in elections as a
    /// candidate with standard election timeouts.
    #[default]
    Normal,

    /// The node uses longer election timeouts, reducing its likelihood of
    /// becoming leader. It can still be elected if no other candidate is
    /// available (e.g., during network partitions or when all preferred
    /// leaders are down). The factor controls the timeout multiplier
    /// (e.g., a factor of 3 triples the election timeout).
    Reluctant {
        /// The multiplier applied to the election timeout and bootstrap
        /// delay. Must be greater than 1 for the deprioritization to have
        /// any effect.
        factor: u32,
    },

    /// The node never self-nominates as a candidate and abstains from all
    /// votes (both elections and commit advancement), so it never counts
    /// toward quorum. It still replicates the log and advances its
    /// committed state from the leader's heartbeats.
    Observer,
}
403
404impl LeadershipPreference {
405 /// Creates a reluctant preference with the default factor of 3.
406 pub const fn reluctant() -> Self {
407 Self::Reluctant { factor: 3 }
408 }
409}