pub struct Node<T>
where
    T: TypeConfig,
{ /* private fields */ }
Expand description
Raft consensus node
Represents a single node participating in a Raft cluster. Coordinates protocol execution, storage, and networking.
Created via NodeBuilder.
§Running the Node
let node = builder.start()?;
node.run().await?; // Blocks until shutdown
Implementations§
Source§impl<T> Node<T>where
T: TypeConfig,
impl<T> Node<T>where
T: TypeConfig,
Source§pub async fn run(&self) -> Result<(), Error>
pub async fn run(&self) -> Result<(), Error>
Starts and runs the Raft node’s main execution loop.
§Workflow
Strategy-based bootstrap depending on node type:
- Learner: Skip cluster ready check, join cluster after warmup
- Voter: Wait for cluster ready, then warmup connections
Both paths converge to the Raft event processing loop.
§Errors
Returns Err if any bootstrap step or Raft execution fails.
§Example
let node = builder.start()?; // Node is created via NodeBuilder
tokio::spawn(async move {
node.run().await.expect("Node execution failed");
});
Source§pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
Returns a receiver for leader change notifications.
Subscribe to be notified when:
- First leader is elected (initial election)
- Leader changes (re-election)
- No leader exists (during election)
§Performance
Event-driven notification, <1ms latency
§Example
let mut leader_rx = node.leader_change_notifier();
while leader_rx.changed().await.is_ok() {
if let Some(info) = leader_rx.borrow().as_ref() {
println!("Leader: {} (term {})", info.leader_id, info.term);
}
}
Source§pub fn membership_change_notifier(&self) -> Receiver<MembershipSnapshot>
pub fn membership_change_notifier(&self) -> Receiver<MembershipSnapshot>
Subscribe to committed membership change notifications.
Returns a watch::Receiver that fires whenever a ConfChange entry
commits. The first borrow() returns the current membership state
without waiting for a change.
Trait Implementations§
Source§impl<T> ClusterManagementService for Node<T>where
T: TypeConfig,
impl<T> ClusterManagementService for Node<T>where
T: TypeConfig,
Source§fn update_cluster_conf<'life0, 'async_trait>(
&'life0 self,
request: Request<ClusterConfChangeRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClusterConfUpdateResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn update_cluster_conf<'life0, 'async_trait>(
&'life0 self,
request: Request<ClusterConfChangeRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClusterConfUpdateResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Handles cluster membership changes (joint consensus)
§Raft Protocol Logic
- Implements cluster configuration changes (Section 6)
- Validates new configuration against current cluster state
- Ensures safety during membership transitions
Source§fn get_cluster_metadata<'life0, 'async_trait>(
&'life0 self,
request: Request<MetadataRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClusterMembership>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn get_cluster_metadata<'life0, 'async_trait>(
&'life0 self,
request: Request<MetadataRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClusterMembership>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Returns current cluster membership and state metadata
§Usage
- Administrative API for cluster inspection
- Provides snapshot of current configuration
Source§fn join_cluster<'life0, 'async_trait>(
&'life0 self,
request: Request<JoinRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<JoinResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn join_cluster<'life0, 'async_trait>(
&'life0 self,
request: Request<JoinRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<JoinResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Source§fn discover_leader<'life0, 'async_trait>(
&'life0 self,
request: Request<LeaderDiscoveryRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<LeaderDiscoveryResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn discover_leader<'life0, 'async_trait>(
&'life0 self,
request: Request<LeaderDiscoveryRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<LeaderDiscoveryResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Source§impl<T> Debug for Node<T>where
T: TypeConfig,
impl<T> Debug for Node<T>where
T: TypeConfig,
Source§impl<T> RaftClientService for Node<T>where
T: TypeConfig,
impl<T> RaftClientService for Node<T>where
T: TypeConfig,
Source§fn handle_client_write<'life0, 'async_trait>(
&'life0 self,
request: Request<ClientWriteRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn handle_client_write<'life0, 'async_trait>(
&'life0 self,
request: Request<ClientWriteRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Processes client write requests requiring consensus
§Raft Protocol Logic
- Entry point for client proposals (Section 7)
- Validates requests before appending to leader’s log
- Ensures linearizable writes through log replication
Source§fn handle_client_read<'life0, 'async_trait>(
&'life0 self,
request: Request<ClientReadRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn handle_client_read<'life0, 'async_trait>(
&'life0 self,
request: Request<ClientReadRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Handles client read requests with linearizability guarantees
§Raft Protocol Logic
- Implements lease-based leader reads (Section 6.4)
- Verifies leadership before serving reads
- Ensures read-after-write consistency
Source§fn handle_client_scan<'life0, 'async_trait>(
&'life0 self,
request: Request<ScanRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ScanResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn handle_client_scan<'life0, 'async_trait>(
&'life0 self,
request: Request<ScanRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<ScanResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Scan all keys under a prefix.
Routes through the Raft command channel so the leader serves the scan (linearizable by default). Returns all matching entries plus the applied index at scan time — clients use the revision to filter watch events during reconnection.
Source§fn watch<'life0, 'async_trait>(
&'life0 self,
request: Request<WatchRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn watch<'life0, 'async_trait>(
&'life0 self,
request: Request<WatchRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Watch for changes to a specific key
Returns a stream of events (PUT/DELETE) for the specified key. The stream will continue until the client disconnects or the server shuts down.
§Arguments
request - Contains the key to watch
§Returns
A stream of WatchResponse messages containing PUT/DELETE events
§Errors
Returns a Status with code UNAVAILABLE (Code::Unavailable) if Watch is disabled in configuration
Source§fn watch_membership<'life0, 'async_trait>(
&'life0 self,
_request: Request<WatchMembershipRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchMembershipStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn watch_membership<'life0, 'async_trait>(
&'life0 self,
_request: Request<WatchMembershipRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchMembershipStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Stream committed membership snapshots to the client.
Sends the current snapshot immediately on connect (via mark_changed), then
one snapshot per committed ConfChange. Closes with UNAVAILABLE on server shutdown
so clients know to reconnect.
Source§type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, Status>> + Send>>
type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, Status>> + Send>>
Source§type WatchMembershipStream = Pin<Box<dyn Stream<Item = Result<MembershipSnapshot, Status>> + Send>>
type WatchMembershipStream = Pin<Box<dyn Stream<Item = Result<MembershipSnapshot, Status>> + Send>>
Source§impl<T> RaftElectionService for Node<T>where
T: TypeConfig,
impl<T> RaftElectionService for Node<T>where
T: TypeConfig,
Source§fn request_vote<'life0, 'async_trait>(
&'life0 self,
request: Request<VoteRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<VoteResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn request_vote<'life0, 'async_trait>(
&'life0 self,
request: Request<VoteRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<VoteResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Handles RequestVote RPC calls from candidate nodes during leader elections
§Raft Protocol Logic
- Part of leader election mechanism (Section 5.2)
- Validates candidate’s term and log completeness
- Grants vote if candidate’s log is at least as up-to-date as local log
Source§impl<T> RaftReplicationService for Node<T>where
T: TypeConfig,
impl<T> RaftReplicationService for Node<T>where
T: TypeConfig,
Source§fn append_entries<'life0, 'async_trait>(
&'life0 self,
request: Request<AppendEntriesRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<AppendEntriesResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn append_entries<'life0, 'async_trait>(
&'life0 self,
request: Request<AppendEntriesRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<AppendEntriesResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Processes AppendEntries RPC calls from cluster leader
§Raft Protocol Logic
- Heartbeat mechanism (Section 5.2)
- Log replication entry point (Section 5.3)
- Term comparison logic:
- If incoming term > current term: revert to follower state
- Reset election timeout on valid leader communication
Source§fn stream_append_entries<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<AppendEntriesRequest>>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftReplicationService>::StreamAppendEntriesStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn stream_append_entries<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<AppendEntriesRequest>>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftReplicationService>::StreamAppendEntriesStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Processes a persistent bidirectional AppendEntries stream from the cluster leader.
Decouples request ingestion from response emission:
- recv task: reads batches from the stream, dispatches each as a
RaftEvent::AppendEntries (non-blocking between batches)
- forwarder task: drains ordered response handles sequentially; ordering is guaranteed by the Raft single-threaded event loop
Source§type StreamAppendEntriesStream = Pin<Box<dyn Stream<Item = Result<AppendEntriesResponse, Status>> + Send>>
type StreamAppendEntriesStream = Pin<Box<dyn Stream<Item = Result<AppendEntriesResponse, Status>> + Send>>
Source§impl<T> SnapshotService for Node<T>where
T: TypeConfig,
impl<T> SnapshotService for Node<T>where
T: TypeConfig,
Source§type StreamSnapshotStream = Streaming<SnapshotChunk>
type StreamSnapshotStream = Streaming<SnapshotChunk>
Source§fn stream_snapshot<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<SnapshotAck>>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as SnapshotService>::StreamSnapshotStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn stream_snapshot<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<SnapshotAck>>,
) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as SnapshotService>::StreamSnapshotStream>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Source§fn install_snapshot<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<SnapshotChunk>>,
) -> Pin<Box<dyn Future<Output = Result<Response<SnapshotResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
fn install_snapshot<'life0, 'async_trait>(
&'life0 self,
request: Request<Streaming<SnapshotChunk>>,
) -> Pin<Box<dyn Future<Output = Result<Response<SnapshotResponse>, Status>> + Send + 'async_trait>>where
'life0: 'async_trait,
Node<T>: 'async_trait,
Auto Trait Implementations§
impl<T> !Freeze for Node<T>
impl<T> !RefUnwindSafe for Node<T>
impl<T> Send for Node<T>
impl<T> Sync for Node<T>
impl<T> Unpin for Node<T>
impl<T> UnsafeUnpin for Node<T>
impl<T> !UnwindSafe for Node<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request