1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
//! Defines the [`Event`] enum and related types.
use std::sync::Arc;
use crate::invocation::CoordNumOf;
use crate::invocation::EffectOf;
use crate::invocation::EjectionOf;
use crate::invocation::FrozenStateOf;
use crate::invocation::Invocation;
use crate::invocation::LogEntryOf;
use crate::invocation::NodeOf;
use crate::invocation::RoundNumOf;
use crate::invocation::SnapshotFor;
use crate::invocation::StateOf;
use crate::node::NodeStatus;
use crate::RoundNum;
/// Emitted by `Node`'s [`poll_events`][crate::Node::poll_events] method.
///
/// The enum is `#[non_exhaustive]`: code outside of this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
pub enum Event<I: Invocation> {
/// First event that is fired immediately after construction.
Init {
/// Initial status of the node.
status: NodeStatus,
/// Round the node is in.
round: RoundNumOf<I>,
/// Initial state or `None` when no snapshot was given.
state: Option<Arc<FrozenStateOf<I>>>,
},
/// Node's status changed.
StatusChange {
/// Old status.
old_status: NodeStatus,
/// New status.
new_status: NodeStatus,
},
/// The node is transitioning to [partially active
/// participation][crate::node::Participation].
Activate(RoundNumOf<I>),
/// A snapshot was installed.
Install {
/// Round node is in after a snapshot was installed.
round: RoundNumOf<I>,
/// State the node is in after a snapshot was installed.
state: Option<Arc<FrozenStateOf<I>>>,
},
/// An entry has been committed to the log.
///
/// The event does not imply that the entry was applied to the shared state.
Commit {
/// The round for which `log_entry` was committed.
round: RoundNumOf<I>,
/// The log entry which was committed.
log_entry: Arc<LogEntryOf<I>>,
},
/// The next log entry was applied to the state.
Apply {
/// Round the log entry was applied in.
round: RoundNumOf<I>,
/// Log entry that was applied.
log_entry: Arc<LogEntryOf<I>>,
/// The effect produced by applying the log entry.
effect: EffectOf<I>,
/// Concurrency after this apply.
new_concurrency: std::num::NonZeroUsize,
},
/// A log entry was queued; preceding entries are still missing.
///
/// Note: This event is emitted even when the queued entry is within the
/// concurrency bound or if this node created the gap itself. The second
/// case can arise when the leader tries to concurrently append multiple
/// entries and abandons some of the earlier appends.
Gaps(Vec<Gap<RoundNumOf<I>>>),
/// This node received a (potentially indirect) directive for the given
/// round and from the given node. The node used the mandate obtained with
/// the given coordination number to issue the directive.
///
/// This event is not emitted when this node is disoriented or lagging.
Directive {
/// Kind of directive, either `Prepare`, `Accept` or `Commit`.
kind: DirectiveKind,
/// Leader that issued the directive.
leader: NodeOf<I>,
/// Round for which the directive was issued.
round_num: RoundNumOf<I>,
/// Coordination number with which the directive was issued.
coord_num: CoordNumOf<I>,
/// Time the directive was (locally) registered.
timestamp: instant::Instant,
},
/// The node ejected its state.
Eject {
/// The reason for the ejection.
reason: EjectionOf<I>,
/// The ejected state.
state: StateOf<I>,
},
}
/// Diagnostic formatting for [`Event`].
///
/// Every variant is rendered under its fully qualified name (for example
/// `Event::Apply { .. }`) and every field under the name it is declared with.
// NOTE(review): presumably written by hand because `#[derive(Debug)]` would add
// an `I: Debug` bound on the invocation type itself — confirm before replacing
// this with a derive.
impl<I: Invocation> std::fmt::Debug for Event<I> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Event::Init {
                status,
                round,
                state,
            } => f
                .debug_struct("Event::Init")
                .field("status", status)
                .field("round", round)
                .field("state", state)
                .finish(),
            Event::StatusChange {
                old_status,
                new_status,
            } => f
                .debug_struct("Event::StatusChange")
                .field("old_status", old_status)
                .field("new_status", new_status)
                .finish(),
            Event::Activate(round) => f.debug_tuple("Event::Activate").field(round).finish(),
            Event::Install { round, state } => f
                .debug_struct("Event::Install")
                .field("round", round)
                .field("state", state)
                .finish(),
            Event::Commit { round, log_entry } => f
                .debug_struct("Event::Commit")
                .field("round", round)
                .field("log_entry", log_entry)
                .finish(),
            Event::Apply {
                round,
                log_entry,
                effect,
                new_concurrency,
            } => f
                .debug_struct("Event::Apply")
                .field("round", round)
                .field("log_entry", log_entry)
                // Label the field with its declared name (`effect`), so the
                // output matches the variant's definition.
                .field("effect", effect)
                .field("new_concurrency", new_concurrency)
                .finish(),
            Event::Gaps(gaps) => f.debug_tuple("Event::Gaps").field(gaps).finish(),
            Event::Directive {
                kind,
                leader,
                round_num,
                coord_num,
                timestamp,
            } => f
                .debug_struct("Event::Directive")
                .field("kind", kind)
                .field("leader", leader)
                .field("round_num", round_num)
                .field("coord_num", coord_num)
                .field("timestamp", timestamp)
                .finish(),
            Event::Eject { reason, state } => f
                .debug_struct("Event::Eject")
                .field("reason", reason)
                .field("state", state)
                .finish(),
        }
    }
}
/// Gap in the local log.
///
/// Gaps are reported through [`Event::Gaps`].
#[derive(Clone, Debug)]
pub struct Gap<R: RoundNum> {
/// The point in time when the gap appeared.
pub since: instant::Instant,
/// The locations of the gap within the log, as a half-open range
/// (`rounds.start` is included, `rounds.end` is excluded).
pub rounds: std::ops::Range<R>,
}
/// Kind of directive, see [`Event::Directive`].
///
/// The value is carried in the `kind` field of [`Event::Directive`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DirectiveKind {
/// Prepare directive.
Prepare,
/// Accept directive.
Accept,
/// Commit directive.
Commit,
}
/// Emitted by `Shutdown`'s [`poll_shutdown`][crate::Shutdown::poll_shutdown]
/// method.
///
/// Either wraps a regular [`Event`] or signals that the shutdown has
/// completed.
pub enum ShutdownEvent<I: Invocation> {
/// Regular event emitted during the shutdown process.
Regular(Event<I>),
/// Final event of the shutdown process.
///
/// When this event is received the shutdown procedure has completed.
///
/// The variant is `#[non_exhaustive]`: code outside of this crate cannot
/// construct it and must use `..` when destructuring it.
#[non_exhaustive]
Final {
/// Snapshot that may be used to restart the node via
/// [`resuming_from`][crate::NodeBuilderWithNodeIdAndCommunicator::resuming_from].
snapshot: SnapshotFor<I>,
},
}
/// Lifts a regular [`Event`] into a [`ShutdownEvent`] by wrapping it in the
/// `Regular` variant.
impl<I: Invocation> From<Event<I>> for ShutdownEvent<I> {
    fn from(event: Event<I>) -> Self {
        Self::Regular(event)
    }
}
/// Diagnostic formatting for [`ShutdownEvent`].
///
/// Each variant is rendered under its fully qualified name:
/// `ShutdownEvent::Regular(..)` wraps the inner event's own `Debug` output,
/// `ShutdownEvent::Final { .. }` shows the snapshot.
impl<I: Invocation> std::fmt::Debug for ShutdownEvent<I> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ShutdownEvent::Regular(e) => f.debug_tuple("ShutdownEvent::Regular").field(e).finish(),
            // Use the variant's actual name, `Final`, so the output can be
            // matched against the enum definition.
            ShutdownEvent::Final { snapshot } => f
                .debug_struct("ShutdownEvent::Final")
                .field("snapshot", snapshot)
                .finish(),
        }
    }
}