Skip to main content

Node

Struct Node 

Source
pub struct Node<T>
where T: TypeConfig,
{ /* private fields */ }
Expand description

Raft consensus node

Represents a single node participating in a Raft cluster. Coordinates protocol execution, storage, and networking.

Created via NodeBuilder.

§Running the Node

let node = builder.start()?;
node.run().await?;  // Blocks until shutdown

Implementations§

Source§

impl<T> Node<T>
where T: TypeConfig,

Source

pub async fn run(&self) -> Result<(), Error>

Starts and runs the Raft node’s main execution loop.

§Workflow

Strategy-based bootstrap depending on node type:

  • Learner: Skip cluster ready check, join cluster after warmup
  • Voter: Wait for cluster ready, then warmup connections

Both paths converge to the Raft event processing loop.

§Errors

Returns Err if any bootstrap step or Raft execution fails.

§Example
let node = Node::new(...);
tokio::spawn(async move {
    node.run().await.expect("Node execution failed");
});
Source

pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>

Returns a receiver for leader change notifications.

Subscribe to be notified when:

  • First leader is elected (initial election)
  • Leader changes (re-election)
  • No leader exists (during election)
§Performance

Event-driven notification, <1ms latency

§Example
let mut leader_rx = node.leader_change_notifier();
while leader_rx.changed().await.is_ok() {
    if let Some(info) = leader_rx.borrow().as_ref() {
        println!("Leader: {} (term {})", info.leader_id, info.term);
    }
}
Source

pub fn membership_change_notifier(&self) -> Receiver<MembershipSnapshot>

Subscribe to committed membership change notifications.

Returns a watch::Receiver that fires whenever a ConfChange entry commits. The first borrow() returns the current membership state without waiting for a change.

Source

pub fn node_id(&self) -> u32

Returns this node’s unique identifier.

Useful for logging, metrics, and integrations that need to identify which Raft node is handling operations.

Trait Implementations§

Source§

impl<T> ClusterManagementService for Node<T>
where T: TypeConfig,

Source§

fn update_cluster_conf<'life0, 'async_trait>( &'life0 self, request: Request<ClusterConfChangeRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ClusterConfUpdateResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Handles cluster membership changes (joint consensus)

§Raft Protocol Logic
  • Implements cluster configuration changes (Section 6)
  • Validates new configuration against current cluster state
  • Ensures safety during membership transitions
Source§

fn get_cluster_metadata<'life0, 'async_trait>( &'life0 self, request: Request<MetadataRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ClusterMembership>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Returns current cluster membership and state metadata

§Usage
  • Administrative API for cluster inspection
  • Provides snapshot of current configuration
Source§

fn join_cluster<'life0, 'async_trait>( &'life0 self, request: Request<JoinRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<JoinResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Request to join the cluster as a new learner node
Source§

fn discover_leader<'life0, 'async_trait>( &'life0 self, request: Request<LeaderDiscoveryRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<LeaderDiscoveryResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

New RPC for leader discovery
Source§

impl<T> Debug for Node<T>
where T: TypeConfig,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl<T> RaftClientService for Node<T>
where T: TypeConfig,

Source§

fn handle_client_write<'life0, 'async_trait>( &'life0 self, request: Request<ClientWriteRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Processes client write requests requiring consensus

§Raft Protocol Logic
  • Entry point for client proposals (Section 7)
  • Validates requests before appending to leader’s log
  • Ensures linearizable writes through log replication
Source§

fn handle_client_read<'life0, 'async_trait>( &'life0 self, request: Request<ClientReadRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ClientResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Handles client read requests with linearizability guarantees

§Raft Protocol Logic
  • Implements lease-based leader reads (Section 6.4)
  • Verifies leadership before serving reads
  • Ensures read-after-write consistency
Source§

fn handle_client_scan<'life0, 'async_trait>( &'life0 self, request: Request<ScanRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ScanResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Scan all keys under a prefix.

Routes through the Raft command channel so the leader serves the scan (linearizable by default). Returns all matching entries plus the applied index at scan time — clients use the revision to filter watch events during reconnection.

Source§

fn watch<'life0, 'async_trait>( &'life0 self, request: Request<WatchRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchStream>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Watch for changes to a specific key

Returns a stream of events (PUT/DELETE) for the specified key. The stream will continue until the client disconnects or the server shuts down.

§Arguments
  • request - Contains the key to watch
§Returns

A stream of WatchResponse messages containing PUT/DELETE events

§Errors

Returns Status::UNAVAILABLE if Watch is disabled in configuration

Source§

fn watch_membership<'life0, 'async_trait>( &'life0 self, _request: Request<WatchMembershipRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftClientService>::WatchMembershipStream>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Stream committed membership snapshots to the client.

Sends the current snapshot immediately on connect (via mark_changed), then one snapshot per committed ConfChange. Closes with UNAVAILABLE on server shutdown so clients know to reconnect.

Source§

type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, Status>> + Send>>

Server streaming response type for the Watch method.
Source§

type WatchMembershipStream = Pin<Box<dyn Stream<Item = Result<MembershipSnapshot, Status>> + Send>>

Server streaming response type for the WatchMembership method.
Source§

impl<T> RaftElectionService for Node<T>
where T: TypeConfig,

Source§

fn request_vote<'life0, 'async_trait>( &'life0 self, request: Request<VoteRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<VoteResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Handles RequestVote RPC calls from candidate nodes during leader elections

§Raft Protocol Logic
  • Part of leader election mechanism (Section 5.2)
  • Validates candidate’s term and log completeness
  • Grants vote if candidate’s log is at least as up-to-date as local log
Source§

impl<T> RaftReplicationService for Node<T>
where T: TypeConfig,

Source§

fn append_entries<'life0, 'async_trait>( &'life0 self, request: Request<AppendEntriesRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<AppendEntriesResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Processes AppendEntries RPC calls from cluster leader

§Raft Protocol Logic
  • Heartbeat mechanism (Section 5.2)
  • Log replication entry point (Section 5.3)
  • Term comparison logic:
    • If incoming term > current term: revert to follower state
    • Reset election timeout on valid leader communication
Source§

fn stream_append_entries<'life0, 'async_trait>( &'life0 self, request: Request<Streaming<AppendEntriesRequest>>, ) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as RaftReplicationService>::StreamAppendEntriesStream>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Processes a persistent bidirectional AppendEntries stream from the cluster leader.

Decouples request ingestion from response emission:

  • recv task: reads batches from the stream, dispatches each as a RaftEvent::AppendEntries (non-blocking between batches)
  • forwarder task: drains ordered response handles sequentially; ordering is guaranteed by the Raft single-threaded event loop
Source§

type StreamAppendEntriesStream = Pin<Box<dyn Stream<Item = Result<AppendEntriesResponse, Status>> + Send>>

Server streaming response type for the StreamAppendEntries method.
Source§

impl<T> SnapshotService for Node<T>
where T: TypeConfig,

Source§

type StreamSnapshotStream = Streaming<SnapshotChunk>

Server streaming response type for the StreamSnapshot method.
Source§

fn stream_snapshot<'life0, 'async_trait>( &'life0 self, request: Request<Streaming<SnapshotAck>>, ) -> Pin<Box<dyn Future<Output = Result<Response<<Node<T> as SnapshotService>::StreamSnapshotStream>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Learner-driven snapshot streaming
Source§

fn install_snapshot<'life0, 'async_trait>( &'life0 self, request: Request<Streaming<SnapshotChunk>>, ) -> Pin<Box<dyn Future<Output = Result<Response<SnapshotResponse>, Status>> + Send + 'async_trait>>
where 'life0: 'async_trait, Node<T>: 'async_trait,

Leader-driven snapshot streaming

Auto Trait Implementations§

§

impl<T> !Freeze for Node<T>

§

impl<T> !RefUnwindSafe for Node<T>

§

impl<T> Send for Node<T>

§

impl<T> Sync for Node<T>

§

impl<T> Unpin for Node<T>

§

impl<T> UnsafeUnpin for Node<T>

§

impl<T> !UnwindSafe for Node<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more