1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// SPDX-License-Identifier: BUSL-1.1
//! Per-peer QUIC connection pool: registration, dialling, eviction, warm-up.
use std::net::SocketAddr;
use tracing::debug;
use crate::error::{ClusterError, Result};
use crate::transport::config::SNI_HOSTNAME;
use crate::wire_version::handshake_io::{local_version_range, perform_version_handshake_client};
use super::transport::NexarTransport;
impl NexarTransport {
/// Register a peer's address for outbound connections.
pub fn register_peer(&self, node_id: u64, addr: SocketAddr) {
let mut addrs = self.peer_addrs.write().unwrap_or_else(|p| p.into_inner());
addrs.insert(node_id, addr);
debug!(node_id, %addr, "peer registered");
}
/// Pre-warm the QUIC connection cache for a peer by performing the full
/// dial + handshake and inserting the connection into the peer cache. On
/// success, the next `send_rpc(target, ...)` skips the dial entirely and
/// reuses the cached `quinn::Connection`.
///
/// Caller MUST have called [`register_peer`] first — this function
/// resolves the peer address from the `peer_addrs` map. Used by the
/// startup `warm_peers` phase so the first replicated request after boot
/// doesn't pay a cold-connect penalty.
///
/// [`register_peer`]: Self::register_peer
pub async fn warm_peer(&self, node_id: u64) -> Result<()> {
self.get_or_connect(node_id).await.map(|_| ())
}
/// Get the stable ID of the cached connection to a peer.
///
/// Returns `None` if no connection is cached or the connection is closed.
/// Used during migrations to detect if the peer connection changed
/// (indicating possible node replacement).
pub fn peer_connection_stable_id(&self, target: u64) -> Option<usize> {
let peers = self.peers.read().unwrap_or_else(|p| p.into_inner());
peers.get(&target).and_then(|conn| {
if conn.close_reason().is_none() {
Some(conn.stable_id())
} else {
None
}
})
}
/// Get an existing connection to a peer, or establish a new one.
pub(super) async fn get_or_connect(&self, target: u64) -> Result<quinn::Connection> {
// Check cache — fast path.
{
let peers = self.peers.read().unwrap_or_else(|p| p.into_inner());
if let Some(conn) = peers.get(&target)
&& conn.close_reason().is_none()
{
return Ok(conn.clone());
}
}
// Resolve address.
let addr = {
let addrs = self.peer_addrs.read().unwrap_or_else(|p| p.into_inner());
addrs
.get(&target)
.copied()
.ok_or(ClusterError::NodeUnreachable { node_id: target })?
};
// Connect — bounded by rpc_timeout so a hung QUIC handshake
// (peer not yet serving) doesn't block for the full 30s idle timeout.
let connecting = self
.listener
.endpoint()
.connect_with(self.client_config.clone(), addr, SNI_HOSTNAME)
.map_err(|e| ClusterError::Transport {
detail: format!("connect to node {target} at {addr}: {e}"),
})?;
let conn = tokio::time::timeout(self.rpc_timeout, connecting)
.await
.map_err(|_| ClusterError::Transport {
detail: format!(
"handshake timeout ({}ms) with node {target} at {addr}",
self.rpc_timeout.as_millis()
),
})?
.map_err(|e| ClusterError::Transport {
detail: format!("handshake with node {target} at {addr}: {e}"),
})?;
debug!(target, %addr, "connected to peer");
// Open a dedicated bidi stream for the wire-version handshake.
// This must complete before any RPC stream is opened on this connection.
let agreed = {
let (mut hs_send, mut hs_recv) =
conn.open_bi().await.map_err(|e| ClusterError::Transport {
detail: format!("open handshake stream to node {target} at {addr}: {e}"),
})?;
let version = tokio::time::timeout(
self.rpc_timeout,
perform_version_handshake_client(&mut hs_send, &mut hs_recv),
)
.await
.map_err(|_| ClusterError::Transport {
detail: format!(
"handshake timeout ({}ms) with node {target} at {addr}",
self.rpc_timeout.as_millis()
),
})??;
// Finish the handshake send stream — the server reads it until FIN.
let _ = hs_send.finish();
version
};
let local = local_version_range();
debug!(
target,
%addr,
agreed_version = %agreed,
local_min = %local.min,
local_max = %local.max,
"wire version negotiated"
);
// Cache the agreed version keyed on the QUIC connection's stable id.
self.store_agreed_version(conn.stable_id(), agreed);
// Cache (harmless race: last writer wins, both connections valid).
let mut peers = self.peers.write().unwrap_or_else(|p| p.into_inner());
peers.insert(target, conn.clone());
Ok(conn)
}
/// Remove a cached connection (forces reconnect on next use).
pub(super) fn evict_peer(&self, target: u64) {
let stable_id = {
let peers = self.peers.read().unwrap_or_else(|p| p.into_inner());
peers.get(&target).map(|c| c.stable_id())
};
let mut peers = self.peers.write().unwrap_or_else(|p| p.into_inner());
peers.remove(&target);
drop(peers);
if let Some(id) = stable_id {
self.evict_agreed_version(id);
}
}
}