Skip to main content

Transport

Trait Transport 

Source
pub trait Transport<T>:
    Send
    + Sync
    + 'static
where T: TypeConfig,
{ // Required methods fn send_cluster_update<'life0, 'life1, 'async_trait>( &'life0 self, req: ClusterConfChangeRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<ClusterUpdateResult>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn send_append_requests<'life0, 'life1, 'async_trait>( &'life0 self, requests: Vec<(u32, AppendEntriesRequest)>, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, response_compress_enabled: bool, ) -> Pin<Box<dyn Future<Output = Result<AppendResult>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn send_vote_requests<'life0, 'life1, 'async_trait>( &'life0 self, req: VoteRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<VoteResult>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn join_cluster<'life0, 'async_trait>( &'life0 self, leader_id: u32, request: JoinRequest, retry: BackoffPolicy, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<JoinResponse>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn discover_leader<'life0, 'async_trait>( &'life0 self, request: LeaderDiscoveryRequest, rpc_enable_compression: bool, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<Vec<LeaderDiscoveryResponse>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn send_append_request<'life0, 'life1, 'async_trait>( &'life0 self, peer_id: u32, request: AppendEntriesRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, response_compress_enabled: bool, ) -> Pin<Box<dyn Future<Output = Result<AppendEntriesResponse>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn send_snapshot<'life0, 'async_trait>( &'life0 self, peer_id: u32, metadata: SnapshotMetadata, state_machine_handler: 
Arc<SMHOF<T>>, membership: Arc<MOF<T>>, config: SnapshotConfig, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn request_snapshot_from_leader<'life0, 'life1, 'async_trait>( &'life0 self, leader_id: u32, ack_tx: Receiver<SnapshotAck>, retry: &'life1 InstallSnapshotBackoffPolicy, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<Box<Streaming<SnapshotChunk>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn open_replication_stream<'life0, 'async_trait>( &'life0 self, peer_id: u32, membership: Arc<MOF<T>>, compress: bool, ) -> Pin<Box<dyn Future<Output = Result<ReplicationStream>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; }

Required Methods§

Source

fn send_cluster_update<'life0, 'life1, 'async_trait>( &'life0 self, req: ClusterConfChangeRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<ClusterUpdateResult>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Propagates cluster configuration changes to voting members using Raft’s joint consensus.

§Protocol
  • Implements membership change protocol from Raft §6
  • Leader-exclusive operation
  • Automatically filters self-references and duplicates
§Parameters
  • req: Configuration change details with transition state
  • retry: Network retry policy with exponential backoff
  • membership: Cluster membership for channel resolution
§Errors
  • NetworkError::EmptyPeerList if no peers provided
  • NetworkError::TaskFailed for background execution failures
  • ConsensusError::NotLeader if executed by non-leader
§Implementation
  • Uses compressed gRPC streams for efficiency
  • Maintains response order matching input peers
  • Concurrent request processing with ordered aggregation
Source

fn send_append_requests<'life0, 'life1, 'async_trait>( &'life0 self, requests: Vec<(u32, AppendEntriesRequest)>, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, response_compress_enabled: bool, ) -> Pin<Box<dyn Future<Output = Result<AppendResult>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Replicates log entries to followers and learners.

§Protocol
  • Implements log replication from Raft §5.3
  • Leader-exclusive operation
  • Handles log consistency checks automatically
§Parameters
  • requests: Vector of (peer_id, AppendEntriesRequest)
  • retry: Network retry configuration
  • membership: Cluster membership for channel resolution
  • response_compress_enabled: Enable compression for replication responses
§Returns
  • On success: Ok(AppendResult) containing aggregated responses
  • On failure: Err(NetworkError) for unrecoverable errors
§Error Conditions: Top-level Err is returned ONLY when:
  • Input requests is empty (NetworkError::EmptyPeerList)
  • Critical failures prevent spawning async tasks (not shown in current impl)
§Errors
  • NetworkError::EmptyPeerList for empty input
  • NetworkError::TaskFailed for partial execution failures
§Guarantees
  • At-least-once delivery semantics
  • Automatic deduplication of peer entries
  • Non-blocking error handling
Source

fn send_vote_requests<'life0, 'life1, 'async_trait>( &'life0 self, req: VoteRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<VoteResult>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Initiates leader election by requesting votes from cluster peers.

§Protocol
  • Implements leader election from Raft §5.2
  • Candidate-exclusive operation
  • Validates log completeness requirements
§Parameters
  • req: Election metadata with candidate’s term and log state
  • retry: Election-specific retry strategy
  • membership: Cluster membership for channel resolution
§Errors
  • NetworkError::EmptyPeerList for empty peer list
  • NetworkError::TaskFailed for RPC execution failures
§Safety
  • Automatic term validation in responses
  • Strict candidate state enforcement
  • Non-blocking partial failure handling
Source

fn join_cluster<'life0, 'async_trait>( &'life0 self, leader_id: u32, request: JoinRequest, retry: BackoffPolicy, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<JoinResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Initiates the cluster join process for a learner node.

§Protocol
  • Implements cluster join protocol from Raft §6
  • Learner-exclusive operation
  • Requires leader connection
§Parameters
  • leader_id: Node ID of the cluster leader that receives the join request
  • request: Join request with node metadata
  • retry: Join-specific retry configuration
  • membership: Cluster membership for channel resolution
§Errors
  • NetworkError::JoinFailed: On unrecoverable join failure
  • NetworkError::NotLeader: If contacted node isn’t leader
§Guarantees
  • At-least-once delivery
  • Automatic leader discovery
  • Idempotent operation
Source

fn discover_leader<'life0, 'async_trait>( &'life0 self, request: LeaderDiscoveryRequest, rpc_enable_compression: bool, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<Vec<LeaderDiscoveryResponse>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Discovers the current cluster leader.

§Protocol
  • Broadcast-based leader discovery
  • Handles redirection to current leader
§Parameters
  • request: Discovery request with node metadata
  • rpc_enable_compression: Whether to enable compression for the discovery RPC
  • membership: Cluster membership for channel resolution
§Errors
  • NetworkError::DiscoveryTimeout: When no response received
Source

fn send_append_request<'life0, 'life1, 'async_trait>( &'life0 self, peer_id: u32, request: AppendEntriesRequest, retry: &'life1 RetryPolicies, membership: Arc<MOF<T>>, response_compress_enabled: bool, ) -> Pin<Box<dyn Future<Output = Result<AppendEntriesResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Send a single AppendEntries request to one peer (used by ReplicationWorker).

Non-blocking from the caller’s perspective: the caller fires this and the per-follower worker task awaits the response independently. Reuses the existing FIFO peer_appender_task infrastructure internally.

§Parameters
  • peer_id: Target follower node ID
  • request: AppendEntries RPC request
  • retry: Retry / timeout configuration
  • membership: Cluster membership for channel resolution
  • response_compress_enabled: Enable gRPC response compression
§Returns

Ok(AppendEntriesResponse) on success, Err on network / timeout failure

Source

fn send_snapshot<'life0, 'async_trait>( &'life0 self, peer_id: u32, metadata: SnapshotMetadata, state_machine_handler: Arc<SMHOF<T>>, membership: Arc<MOF<T>>, config: SnapshotConfig, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Pushes a snapshot to a lagging peer (called by per-follower ReplicationWorker).

Used when a peer’s next_index falls below the leader’s purge boundary and AppendEntries would carry a stale prev_log_term = 0, causing a perpetual conflict loop. The worker calls this and awaits completion before emitting RoleEvent::SnapshotPushCompleted.

§Parameters
  • peer_id: Target follower node ID
  • metadata: Snapshot metadata (term, index, size)
  • state_machine_handler: Used to load the snapshot data stream
  • membership: Cluster membership for bulk-channel resolution
  • config: Snapshot transfer configuration (chunk size, timeout, etc.)
§Returns

Ok(()) on successful transfer, Err on network or serialization failure.

Source

fn request_snapshot_from_leader<'life0, 'life1, 'async_trait>( &'life0 self, leader_id: u32, ack_tx: Receiver<SnapshotAck>, retry: &'life1 InstallSnapshotBackoffPolicy, membership: Arc<MOF<T>>, ) -> Pin<Box<dyn Future<Output = Result<Box<Streaming<SnapshotChunk>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Requests and streams a snapshot from the current leader.

§Parameters
  • leader_id: Current leader node ID
  • ack_tx: Receiver of SnapshotAck acknowledgement messages
  • retry: Retry configuration (currently unused in implementation)
  • membership: Cluster membership for channel resolution
§Returns

A stream of snapshot chunks from the leader.

§Errors

Returns NetworkError if:

  • Connection to leader fails
  • gRPC call fails
Source

fn open_replication_stream<'life0, 'async_trait>( &'life0 self, peer_id: u32, membership: Arc<MOF<T>>, compress: bool, ) -> Pin<Box<dyn Future<Output = Result<ReplicationStream>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Opens a persistent bidirectional AppendEntries stream to the given peer.

Called once per follower when the leader becomes active (or on reconnect). The returned [ReplicationStream] contains:

  • sender: push batches into the open h2 stream (non-blocking, capacity 128)
  • receiver: stream of ACKs from the follower

When the stream breaks (network error, peer restart), the receiver yields an Err(tonic::Status) and the caller should emit RoleEvent::PeerStreamError { peer_id } so the Raft loop can reset next_index and schedule reconnection.

§Errors

Returns NetworkError if the initial stream handshake fails.

Dyn Compatibility§

This trait is dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§