use crate::error::{LightningError, Result};
use crate::registry::MinerRegistry;
use crate::signing::Signer;
use crate::types::{
handshake_request_message, handshake_response_message, read_frame, write_frame_and_finish,
HandshakeRequest, HandshakeResponse, MessageType, PeerAddr, QuicAxonInfo, QuicRequest,
QuicResponse, StreamChunk, StreamEnd, SynapsePacket, SynapseResponse,
DEFAULT_MAX_FRAME_PAYLOAD,
};
use crate::util::unix_timestamp_secs;
use base64::{prelude::BASE64_STANDARD, Engine};
use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, TransportConfig};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::ClientConfig as RustlsClientConfig;
use sp_core::blake2_256;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument, warn};
#[cfg(feature = "subtensor")]
use crate::metagraph::{Metagraph, MetagraphMonitorConfig};
#[cfg(feature = "subtensor")]
use subxt::{OnlineClient, PolkadotConfig};
/// Tunable parameters for [`LightningClient`]: connection timing, reconnect
/// backoff policy, connection budget, and payload size limits.
///
/// Construct via [`LightningClientConfig::builder`] or [`Default`]; values
/// are validated before use.
#[derive(Clone)]
pub struct LightningClientConfig {
    /// Maximum time allowed for a QUIC connect + handshake attempt.
    pub connect_timeout: Duration,
    /// QUIC max idle timeout; must exceed `keep_alive_interval`.
    pub idle_timeout: Duration,
    /// Interval between QUIC keep-alive packets.
    pub keep_alive_interval: Duration,
    /// First reconnect backoff delay; doubled per failed attempt.
    pub reconnect_initial_backoff: Duration,
    /// Upper bound on the exponential reconnect backoff.
    pub reconnect_max_backoff: Duration,
    /// Attempts before a dead address is moved to slow probing (must be >= 1).
    pub reconnect_max_retries: u32,
    /// Interval for slow probes after retries are exhausted; `None` disables.
    pub reconnect_slow_probe_interval: Option<Duration>,
    /// Maximum number of concurrent QUIC connections (one per peer address).
    pub max_connections: usize,
    /// Per-frame payload cap; at least 1 MiB and at most `u32::MAX`.
    pub max_frame_payload_bytes: usize,
    /// Aggregate cap for a collected streaming response; >= frame cap.
    pub max_stream_payload_bytes: usize,
    /// Optional per-chunk read timeout for streaming responses.
    pub stream_chunk_timeout: Option<Duration>,
    /// Optional metagraph monitor settings (subtensor feature only).
    #[cfg(feature = "subtensor")]
    pub metagraph: Option<MetagraphMonitorConfig>,
}
impl Default for LightningClientConfig {
    /// Conservative defaults: 10 s connect timeout; 150 s idle timeout with
    /// 30 s keep-alives; reconnect backoff 1 s doubling up to 60 s with 5
    /// attempts, then 60 s slow probes; up to 1024 connections; frame and
    /// stream payload caps both `DEFAULT_MAX_FRAME_PAYLOAD`.
    fn default() -> Self {
        Self {
            connect_timeout: Duration::from_secs(10),
            idle_timeout: Duration::from_secs(150),
            keep_alive_interval: Duration::from_secs(30),
            reconnect_initial_backoff: Duration::from_secs(1),
            reconnect_max_backoff: Duration::from_secs(60),
            reconnect_max_retries: 5,
            reconnect_slow_probe_interval: Some(Duration::from_secs(60)),
            max_connections: 1024,
            max_frame_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
            max_stream_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
            stream_chunk_timeout: None,
            #[cfg(feature = "subtensor")]
            metagraph: None,
        }
    }
}
impl LightningClientConfig {
    /// Returns a builder seeded with the default configuration.
    pub fn builder() -> LightningClientConfigBuilder {
        LightningClientConfigBuilder {
            config: Self::default(),
        }
    }

    /// Checks the invariants between all timing and sizing knobs.
    ///
    /// # Errors
    ///
    /// Returns [`LightningError::Config`] describing the first violated
    /// constraint: zero durations, `keep_alive_interval >= idle_timeout`,
    /// inverted backoff bounds, zero retry/connection budgets, or payload
    /// caps outside `[1 MiB, u32::MAX]` / stream cap below frame cap.
    fn validate(&self) -> Result<()> {
        if self.connect_timeout.is_zero() {
            return Err(LightningError::Config(
                "connect_timeout must be non-zero".into(),
            ));
        }
        if self.idle_timeout.is_zero() {
            return Err(LightningError::Config(
                "idle_timeout must be non-zero".into(),
            ));
        }
        if self.keep_alive_interval.is_zero() {
            return Err(LightningError::Config(
                "keep_alive_interval must be non-zero".into(),
            ));
        }
        // Keep-alives must fire well before the idle timer expires, or the
        // connection would be torn down despite being healthy.
        if self.keep_alive_interval >= self.idle_timeout {
            return Err(LightningError::Config(format!(
                "keep_alive_interval ({:?}) must be less than idle_timeout ({:?})",
                self.keep_alive_interval, self.idle_timeout
            )));
        }
        if self.reconnect_initial_backoff.is_zero() {
            return Err(LightningError::Config(
                "reconnect_initial_backoff must be non-zero".into(),
            ));
        }
        if self.reconnect_max_backoff.is_zero() {
            return Err(LightningError::Config(
                "reconnect_max_backoff must be non-zero".into(),
            ));
        }
        if self.reconnect_initial_backoff > self.reconnect_max_backoff {
            return Err(LightningError::Config(format!(
                "reconnect_initial_backoff ({:?}) must be <= reconnect_max_backoff ({:?})",
                self.reconnect_initial_backoff, self.reconnect_max_backoff
            )));
        }
        if self.reconnect_max_retries == 0 {
            return Err(LightningError::Config(
                "reconnect_max_retries must be at least 1".into(),
            ));
        }
        if self
            .reconnect_slow_probe_interval
            .is_some_and(|d| d.is_zero())
        {
            return Err(LightningError::Config(
                "reconnect_slow_probe_interval must be non-zero when set".into(),
            ));
        }
        if self.max_connections == 0 {
            return Err(LightningError::Config(
                "max_connections must be at least 1".into(),
            ));
        }
        if self.max_frame_payload_bytes < 1_048_576 {
            return Err(LightningError::Config(format!(
                "max_frame_payload_bytes ({}) must be at least 1048576 (1 MB)",
                self.max_frame_payload_bytes
            )));
        }
        // Frame lengths are encoded as u32 on the wire, so the cap must fit.
        if self.max_frame_payload_bytes > u32::MAX as usize {
            return Err(LightningError::Config(format!(
                "max_frame_payload_bytes ({}) must not exceed {} (u32::MAX)",
                self.max_frame_payload_bytes,
                u32::MAX
            )));
        }
        // Consistency fix: like reconnect_slow_probe_interval above, this
        // check only applies when the option is set, so say so.
        if self.stream_chunk_timeout.is_some_and(|d| d.is_zero()) {
            return Err(LightningError::Config(
                "stream_chunk_timeout must be non-zero when set".into(),
            ));
        }
        if self.max_stream_payload_bytes < self.max_frame_payload_bytes {
            return Err(LightningError::Config(format!(
                "max_stream_payload_bytes ({}) must be >= max_frame_payload_bytes ({})",
                self.max_stream_payload_bytes, self.max_frame_payload_bytes
            )));
        }
        Ok(())
    }
}
/// Fluent builder for [`LightningClientConfig`]; finalize with `build()`,
/// which runs validation.
pub struct LightningClientConfigBuilder {
    // Working copy, seeded from `LightningClientConfig::default()`.
    config: LightningClientConfig,
}
impl LightningClientConfigBuilder {
    /// Sets the maximum duration of a QUIC connect + handshake attempt.
    pub fn connect_timeout(mut self, value: Duration) -> Self {
        self.config.connect_timeout = value;
        self
    }

    /// Sets the QUIC idle timeout.
    pub fn idle_timeout(mut self, value: Duration) -> Self {
        self.config.idle_timeout = value;
        self
    }

    /// Sets the QUIC keep-alive interval.
    pub fn keep_alive_interval(mut self, value: Duration) -> Self {
        self.config.keep_alive_interval = value;
        self
    }

    /// Sets the first reconnect backoff delay.
    pub fn reconnect_initial_backoff(mut self, value: Duration) -> Self {
        self.config.reconnect_initial_backoff = value;
        self
    }

    /// Sets the upper bound on reconnect backoff.
    pub fn reconnect_max_backoff(mut self, value: Duration) -> Self {
        self.config.reconnect_max_backoff = value;
        self
    }

    /// Sets how many reconnect attempts are made before slow probing.
    pub fn reconnect_max_retries(mut self, value: u32) -> Self {
        self.config.reconnect_max_retries = value;
        self
    }

    /// Sets (or clears, with `None`) the slow-probe interval.
    pub fn reconnect_slow_probe_interval(mut self, value: Option<Duration>) -> Self {
        self.config.reconnect_slow_probe_interval = value;
        self
    }

    /// Sets the maximum number of concurrent QUIC connections.
    pub fn max_connections(mut self, value: usize) -> Self {
        self.config.max_connections = value;
        self
    }

    /// Sets the per-frame payload cap in bytes.
    pub fn max_frame_payload_bytes(mut self, value: usize) -> Self {
        self.config.max_frame_payload_bytes = value;
        self
    }

    /// Sets the aggregate streaming payload cap in bytes.
    pub fn max_stream_payload_bytes(mut self, value: usize) -> Self {
        self.config.max_stream_payload_bytes = value;
        self
    }

    /// Enables a per-chunk read timeout for streaming responses.
    pub fn stream_chunk_timeout(mut self, value: Duration) -> Self {
        self.config.stream_chunk_timeout = Some(value);
        self
    }

    /// Enables the metagraph monitor with the given settings.
    #[cfg(feature = "subtensor")]
    pub fn metagraph(mut self, value: MetagraphMonitorConfig) -> Self {
        self.config.metagraph = Some(value);
        self
    }

    /// Validates and returns the finished configuration.
    ///
    /// # Errors
    /// Propagates any `LightningError::Config` from validation.
    pub fn build(self) -> Result<LightningClientConfig> {
        self.config.validate()?;
        Ok(self.config)
    }
}
/// Mutable client state shared behind an `RwLock`.
struct ClientState {
    // Tracks authenticated miners, per-address connections, and reconnect state.
    registry: MinerRegistry,
    // Watch channel used to signal the metagraph monitor task to stop.
    #[cfg(feature = "subtensor")]
    metagraph_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    // Join handle for the metagraph monitor task, awaited during shutdown.
    #[cfg(feature = "subtensor")]
    metagraph_handle: Option<tokio::task::JoinHandle<()>>,
}
/// Receive side of a streaming synapse exchange; yields chunks until the
/// server sends a `StreamEnd` frame.
pub struct StreamingResponse {
    // QUIC receive stream carrying StreamChunk / StreamEnd frames.
    recv: quinn::RecvStream,
    // Per-frame payload cap passed to `read_frame`.
    max_payload: usize,
    // Aggregate size cap enforced by `collect_all`.
    max_stream_payload: usize,
    // Optional per-chunk read timeout; `None` waits indefinitely.
    chunk_timeout: Option<Duration>,
}
impl StreamingResponse {
    /// Reads the next frame from the stream.
    ///
    /// Returns `Ok(Some(data))` for a `StreamChunk`, `Ok(None)` when a
    /// successful `StreamEnd` arrives, and an error for a failed end,
    /// a `SynapseResponse` error frame, an unexpected frame type, a read
    /// failure, or (when configured) a chunk timeout.
    pub async fn next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
        let frame_result = match self.chunk_timeout {
            Some(timeout) => {
                match tokio::time::timeout(timeout, read_frame(&mut self.recv, self.max_payload))
                    .await
                {
                    Ok(r) => r,
                    Err(_) => {
                        // Tell the peer we are abandoning the stream before bailing.
                        self.recv.stop(0u32.into()).ok();
                        return Err(LightningError::Stream("chunk read timed out".to_string()));
                    }
                }
            }
            None => read_frame(&mut self.recv, self.max_payload).await,
        };
        match frame_result {
            Ok((MessageType::StreamChunk, payload)) => {
                let chunk: StreamChunk = rmp_serde::from_slice(&payload).map_err(|e| {
                    LightningError::Serialization(format!("Failed to parse stream chunk: {}", e))
                })?;
                Ok(Some(chunk.data))
            }
            Ok((MessageType::StreamEnd, payload)) => {
                let end: StreamEnd = rmp_serde::from_slice(&payload).map_err(|e| {
                    LightningError::Serialization(format!("Failed to parse stream end: {}", e))
                })?;
                if end.success {
                    // Clean end of stream.
                    Ok(None)
                } else {
                    Err(LightningError::Stream(end.error.unwrap_or_else(|| {
                        "stream ended with failure status".to_string()
                    })))
                }
            }
            Ok((MessageType::SynapseResponse, payload)) => {
                // Server fell back to a unary error response on the streaming path;
                // surface its error detail if it can be decoded.
                let detail = rmp_serde::from_slice::<SynapseResponse>(&payload)
                    .ok()
                    .and_then(|r| r.error)
                    .unwrap_or_else(|| "no detail".to_string());
                Err(LightningError::Stream(format!(
                    "server returned SynapseResponse error on streaming path: {}",
                    detail
                )))
            }
            Ok((msg_type, _)) => Err(LightningError::Stream(format!(
                "unexpected message type during streaming: {:?}",
                msg_type
            ))),
            Err(e) => Err(e),
        }
    }

    /// Drains the stream into a vector of chunks, enforcing the aggregate
    /// `max_stream_payload` byte limit (with overflow-checked accounting).
    ///
    /// # Errors
    /// Propagates `next_chunk` errors and fails if the total size exceeds
    /// the configured aggregate limit.
    pub async fn collect_all(&mut self) -> Result<Vec<Vec<u8>>> {
        let mut chunks = Vec::new();
        let mut total_size: usize = 0;
        while let Some(chunk) = self.next_chunk().await? {
            total_size = total_size.checked_add(chunk.len()).ok_or_else(|| {
                LightningError::Stream("streaming response size overflow".to_string())
            })?;
            if total_size > self.max_stream_payload {
                return Err(LightningError::Stream(format!(
                    "streaming response exceeded {} byte aggregate limit",
                    self.max_stream_payload
                )));
            }
            chunks.push(chunk);
        }
        Ok(chunks)
    }
}
/// QUIC client that maintains authenticated connections to miners and
/// exchanges synapse requests/responses (unary and streaming).
pub struct LightningClient {
    // Validated configuration (timeouts, limits, backoff policy).
    config: LightningClientConfig,
    // Hotkey identifying this wallet in handshakes.
    wallet_hotkey: String,
    // Signer used to authenticate handshakes; must be set before connecting.
    signer: Option<Arc<dyn Signer>>,
    // Shared registry + background-task state.
    state: Arc<RwLock<ClientState>>,
    // Lazily created QUIC endpoint (see `create_endpoint`).
    endpoint: Option<Endpoint>,
}
impl LightningClient {
/// Creates a client with the default configuration.
///
/// The `expect` cannot fire unless the default configuration stops
/// passing its own validation, which would be a bug.
pub fn new(wallet_hotkey: String) -> Self {
    let config = LightningClientConfig::default();
    Self::with_config(wallet_hotkey, config).expect("default config is always valid")
}
/// Creates a client with an explicit configuration.
///
/// # Errors
/// Returns `LightningError::Config` if `config` fails validation.
pub fn with_config(wallet_hotkey: String, config: LightningClientConfig) -> Result<Self> {
    config.validate()?;
    Ok(Self {
        config,
        wallet_hotkey,
        // No signer yet; `set_signer` / `set_wallet` must be called before
        // any connection is attempted.
        signer: None,
        state: Arc::new(RwLock::new(ClientState {
            registry: MinerRegistry::new(),
            #[cfg(feature = "subtensor")]
            metagraph_shutdown: None,
            #[cfg(feature = "subtensor")]
            metagraph_handle: None,
        })),
        // Endpoint is created lazily by `create_endpoint`.
        endpoint: None,
    })
}
/// Installs the signer used to authenticate handshakes with miners.
pub fn set_signer(&mut self, signer: Box<dyn Signer>) {
    let shared: Arc<dyn Signer> = Arc::from(signer);
    self.signer = Some(shared);
    info!("Signer configured");
}
#[cfg(feature = "btwallet")]
pub fn set_wallet(
&mut self,
wallet_name: &str,
wallet_path: &str,
hotkey_name: &str,
) -> Result<()> {
let signer =
crate::signing::BtWalletSigner::from_wallet(wallet_name, wallet_path, hotkey_name)?;
self.set_signer(Box::new(signer));
Ok(())
}
/// Creates the QUIC endpoint, connects to every distinct miner address in
/// parallel, authenticates each hotkey, and records successes in the
/// registry. Respects the `max_connections` budget; with the `subtensor`
/// feature, also starts the metagraph monitor if configured.
///
/// # Errors
/// Fails if the endpoint cannot be created or no signer is configured;
/// individual connection/handshake failures are logged, not returned.
#[instrument(skip(self, miners), fields(miner_count = miners.len()))]
pub async fn initialize_connections(&mut self, miners: Vec<QuicAxonInfo>) -> Result<()> {
    self.create_endpoint().await?;
    let endpoint = self
        .endpoint
        .as_ref()
        .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
        .clone();
    let wallet_hotkey = self.wallet_hotkey.clone();
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
        .clone();
    let timeout = self.config.connect_timeout;
    // Group miners by socket address: one QUIC connection per address,
    // multiple hotkeys may authenticate over it.
    let mut addr_groups: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
    for miner in miners {
        addr_groups.entry(miner.addr_key()).or_default().push(miner);
    }
    let (active_count, remaining_capacity) = {
        let state = self.state.read().await;
        let active = state.registry.connection_count();
        (active, self.config.max_connections.saturating_sub(active))
    };
    // Enforce the connection budget; NOTE(review): HashMap iteration order
    // means the skipped addresses are arbitrary.
    let addr_groups: Vec<(PeerAddr, Vec<QuicAxonInfo>)> =
        if addr_groups.len() > remaining_capacity {
            warn!(
                "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
                self.config.max_connections,
                active_count,
                addr_groups.len() - remaining_capacity,
                addr_groups.len()
            );
            addr_groups.into_iter().take(remaining_capacity).collect()
        } else {
            addr_groups.into_iter().collect()
        };
    let max_fp = self.config.max_frame_payload_bytes;
    // Connect + authenticate each address concurrently.
    let mut set = tokio::task::JoinSet::new();
    for (addr_key, miners_at_addr) in addr_groups {
        let ep = endpoint.clone();
        let wh = wallet_hotkey.clone();
        let s = signer.clone();
        set.spawn(connect_and_authenticate_per_address(
            ep,
            wh,
            s,
            addr_key,
            miners_at_addr,
            timeout,
            max_fp,
        ));
    }
    // Collect all results before taking the write lock, so the lock is not
    // held across awaits.
    let mut results = Vec::new();
    while let Some(join_result) = set.join_next().await {
        match join_result {
            Ok((addr_key, conn_result, authenticated)) => {
                results.push((addr_key, conn_result, authenticated));
            }
            Err(e) => {
                error!("Connection task panicked: {}", e);
            }
        }
    }
    let mut state = self.state.write().await;
    for (addr_key, conn_result, authenticated) in results {
        match conn_result {
            Ok(connection) => {
                if authenticated.is_empty() {
                    // Connected but no hotkey passed the handshake; keeping
                    // the connection would waste a budget slot.
                    warn!(
                        "No hotkeys authenticated at {}, dropping connection",
                        addr_key
                    );
                    connection.close(0u32.into(), b"no_authenticated_hotkeys");
                } else {
                    for miner in authenticated {
                        info!("Authenticated miner {} at {}", miner.hotkey, addr_key);
                        state.registry.register(miner);
                    }
                    state.registry.set_connection(addr_key, connection);
                }
            }
            Err(e) => {
                error!("Failed to connect to {}: {}", addr_key, e);
            }
        }
    }
    #[cfg(feature = "subtensor")]
    if let Some(metagraph_config) = self.config.metagraph.clone() {
        self.start_metagraph_monitor(metagraph_config).await?;
    }
    Ok(())
}
/// Builds the client-side QUIC endpoint: rustls TLS with a permissive
/// certificate verifier (peers are authenticated at the application layer
/// via the handshake protocol, not via PKI), the `btlightning` ALPN, the
/// configured idle timeout and keep-alive interval, bound to an ephemeral
/// local port.
///
/// # Errors
/// Returns `Config`/`Connection` errors if TLS, transport, or socket
/// setup fails.
#[instrument(skip(self))]
pub async fn create_endpoint(&mut self) -> Result<()> {
    let mut tls_config = RustlsClientConfig::builder_with_provider(
        rustls::crypto::ring::default_provider().into(),
    )
    .with_safe_default_protocol_versions()
    .map_err(|e| LightningError::Config(format!("Failed to set TLS versions: {}", e)))?
    .dangerous()
    // Accept any server certificate: identity is proven by the signed
    // handshake exchange, and the cert is pinned via its fingerprint there.
    .with_custom_certificate_verifier(Arc::new(AcceptAnyCertVerifier))
    .with_no_client_auth();
    tls_config.alpn_protocols = vec![b"btlightning".to_vec()];
    let mut transport_config = TransportConfig::default();
    let idle_timeout = IdleTimeout::try_from(self.config.idle_timeout)
        .map_err(|e| LightningError::Config(format!("Failed to set idle timeout: {}", e)))?;
    transport_config.max_idle_timeout(Some(idle_timeout));
    transport_config.keep_alive_interval(Some(self.config.keep_alive_interval));
    let quic_crypto =
        quinn::crypto::rustls::QuicClientConfig::try_from(tls_config).map_err(|e| {
            LightningError::Config(format!("Failed to create QUIC crypto config: {}", e))
        })?;
    let mut client_config = ClientConfig::new(Arc::new(quic_crypto));
    client_config.transport_config(Arc::new(transport_config));
    // NOTE(review): binds IPv4-only; IPv6 miner addresses would be
    // unreachable from this endpoint — confirm whether that is intended.
    let bind_addr: SocketAddr = "0.0.0.0:0"
        .parse()
        .map_err(|e| LightningError::Config(format!("Failed to parse bind address: {}", e)))?;
    let mut endpoint = Endpoint::client(bind_addr).map_err(|e| {
        LightningError::Connection(format!("Failed to create QUIC endpoint: {}", e))
    })?;
    endpoint.set_default_client_config(client_config);
    self.endpoint = Some(endpoint);
    info!("QUIC client endpoint created");
    Ok(())
}
/// Sends a unary synapse request to the given miner.
///
/// Uses the registry's live connection for the miner's address when one
/// exists; otherwise (no connection, or the connection reports a close
/// reason) attempts a reconnect first and sends over the new connection.
///
/// # Errors
/// Propagates reconnect failures (including backoff rejections) and
/// send/receive failures.
#[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
pub async fn query_axon(
    &self,
    axon_info: QuicAxonInfo,
    request: QuicRequest,
) -> Result<QuicResponse> {
    let addr_key = axon_info.addr_key();
    // Clone the connection handle out so the read lock is not held while
    // awaiting network I/O.
    let connection = {
        let state = self.state.read().await;
        state.registry.get_connection(&addr_key)
    };
    let max_fp = self.config.max_frame_payload_bytes;
    match connection {
        // Fast path: connection exists and has not been closed.
        Some(conn) if conn.close_reason().is_none() => {
            debug!(
                addr = %addr_key,
                stable_id = conn.stable_id(),
                "query_axon: connection alive, sending synapse"
            );
            send_synapse_packet(&conn, request, max_fp).await
        }
        // Connection present but closed: reconnect, then send.
        Some(conn) => {
            let reason = conn.close_reason();
            warn!(
                addr = %addr_key,
                stable_id = conn.stable_id(),
                close_reason = ?reason,
                "QUIC connection closed, triggering reconnect"
            );
            self.try_reconnect_and_query(&addr_key, &axon_info, request)
                .await
        }
        // No connection registered for this address: reconnect, then send.
        None => {
            debug!(addr = %addr_key, "query_axon: no connection in registry");
            self.try_reconnect_and_query(&addr_key, &axon_info, request)
                .await
        }
    }
}
/// Like [`Self::query_axon`], but bounds the whole operation (including
/// any reconnect attempt) by `timeout`.
///
/// # Errors
/// Returns `LightningError::Transport` on timeout, otherwise whatever
/// `query_axon` returns.
#[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port, timeout_ms = timeout.as_millis() as u64))]
pub async fn query_axon_with_timeout(
    &self,
    axon_info: QuicAxonInfo,
    request: QuicRequest,
    timeout: Duration,
) -> Result<QuicResponse> {
    let query = self.query_axon(axon_info, request);
    match tokio::time::timeout(timeout, query).await {
        Ok(result) => result,
        Err(_) => Err(LightningError::Transport("query timed out".into())),
    }
}
/// Opens a streaming synapse exchange with the given miner and returns a
/// [`StreamingResponse`] for reading chunks.
///
/// Mirrors [`Self::query_axon`]: reuses the registry's live connection or
/// reconnects when it is missing/closed.
///
/// # Errors
/// Propagates reconnect and stream-open failures.
#[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
pub async fn query_axon_stream(
    &self,
    axon_info: QuicAxonInfo,
    request: QuicRequest,
) -> Result<StreamingResponse> {
    let addr_key = axon_info.addr_key();
    // Clone the handle out so the lock is not held across awaits.
    let connection = {
        let state = self.state.read().await;
        state.registry.get_connection(&addr_key)
    };
    let max_fp = self.config.max_frame_payload_bytes;
    let max_sp = self.config.max_stream_payload_bytes;
    match connection {
        // Fast path: live connection.
        Some(conn) if conn.close_reason().is_none() => {
            open_streaming_synapse(
                &conn,
                request,
                max_fp,
                max_sp,
                self.config.stream_chunk_timeout,
            )
            .await
        }
        // Closed connection: reconnect then open the stream.
        Some(conn) => {
            let reason = conn.close_reason();
            warn!(
                addr = %addr_key,
                close_reason = ?reason,
                "QUIC connection closed, triggering reconnect (stream)"
            );
            self.try_reconnect_and_stream(&addr_key, &axon_info, request)
                .await
        }
        // No connection yet: reconnect then open the stream.
        None => {
            self.try_reconnect_and_stream(&addr_key, &axon_info, request)
                .await
        }
    }
}
/// Re-establishes the connection for `addr_key`, then performs the unary
/// synapse exchange over the fresh connection.
async fn try_reconnect_and_query(
    &self,
    addr_key: &PeerAddr,
    axon_info: &QuicAxonInfo,
    request: QuicRequest,
) -> Result<QuicResponse> {
    let max_payload = self.config.max_frame_payload_bytes;
    let conn = self.try_reconnect(addr_key, axon_info).await?;
    send_synapse_packet(&conn, request, max_payload).await
}
/// Re-establishes the connection for `addr_key`, then opens a streaming
/// synapse exchange over the fresh connection.
async fn try_reconnect_and_stream(
    &self,
    addr_key: &PeerAddr,
    axon_info: &QuicAxonInfo,
    request: QuicRequest,
) -> Result<StreamingResponse> {
    let conn = self.try_reconnect(addr_key, axon_info).await?;
    let cfg = &self.config;
    open_streaming_synapse(
        &conn,
        request,
        cfg.max_frame_payload_bytes,
        cfg.max_stream_payload_bytes,
        cfg.stream_chunk_timeout,
    )
    .await
}
/// Attempts to re-establish the QUIC connection for `addr_key`, honoring
/// the registry's reconnect state machine (backoff, retry budget, slow
/// probing, single-flight).
///
/// On success, re-authenticates co-located hotkeys sharing the address,
/// deregisters any that fail, registers `axon_info`, records the new
/// connection, and clears the reconnect state. On failure, advances the
/// backoff/slow-probe schedule.
///
/// # Errors
/// Returns `Connection` errors for backoff rejection, exhausted retries,
/// a concurrent reconnect in progress, timeout, or handshake failure;
/// `Signing` if no signer is configured.
async fn try_reconnect(
    &self,
    addr_key: &PeerAddr,
    axon_info: &QuicAxonInfo,
) -> Result<Connection> {
    let endpoint = self
        .endpoint
        .as_ref()
        .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
        .clone();
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
        .clone();
    // Gate the attempt through the registry so only one caller reconnects
    // to an address at a time and backoff is respected. Scope the write
    // lock so it is released before any network I/O.
    {
        let mut state = self.state.write().await;
        if let Err(rejection) = state.registry.try_start_reconnect(
            addr_key.clone(),
            self.config.reconnect_max_retries,
            self.config.reconnect_slow_probe_interval,
        ) {
            use crate::registry::ReconnectRejection;
            return match rejection {
                ReconnectRejection::Backoff { next } => {
                    Err(LightningError::Connection(format!(
                        "Reconnection to {} in backoff, next retry in {:?}",
                        addr_key,
                        next.saturating_duration_since(Instant::now())
                    )))
                }
                ReconnectRejection::Exhausted { attempts } => {
                    Err(LightningError::Connection(format!(
                        "Reconnection attempts exhausted for {} ({}/{}), awaiting registry refresh",
                        addr_key, attempts, self.config.reconnect_max_retries
                    )))
                }
                ReconnectRejection::InProgress => {
                    Err(LightningError::Connection(format!(
                        "Reconnection to {} already in progress",
                        addr_key
                    )))
                }
            };
        }
    }
    warn!("Connection to {} dead, attempting reconnection", addr_key);
    // Dial + handshake for the primary hotkey, bounded by connect_timeout.
    let reconnect_result = tokio::time::timeout(
        self.config.connect_timeout,
        connect_and_handshake(
            endpoint,
            axon_info.clone(),
            self.wallet_hotkey.clone(),
            signer.clone(),
            self.config.max_frame_payload_bytes,
        ),
    )
    .await;
    let reconnect_result = match reconnect_result {
        Ok(r) => r,
        Err(_) => Err(LightningError::Connection(format!(
            "Reconnection to {} timed out",
            addr_key
        ))),
    };
    match reconnect_result {
        Ok(connection) => {
            // Other hotkeys previously authenticated at this address must
            // re-handshake on the new connection.
            let co_located: Vec<String> = {
                let state = self.state.read().await;
                state
                    .registry
                    .hotkeys_at_addr(addr_key)
                    .into_iter()
                    .filter(|hk| *hk != axon_info.hotkey)
                    .collect()
            };
            let mut failed_hotkeys = Vec::new();
            for hk in &co_located {
                match tokio::time::timeout(
                    self.config.connect_timeout,
                    authenticate_handshake(
                        &connection,
                        hk,
                        &self.wallet_hotkey,
                        &signer,
                        self.config.max_frame_payload_bytes,
                    ),
                )
                .await
                {
                    Ok(Ok(())) => {
                        info!(
                            "Re-authenticated co-located miner {} on reconnected {}",
                            hk, addr_key
                        );
                    }
                    Ok(Err(e)) => {
                        warn!(
                            "Re-authentication failed for co-located miner {} at {}: {}",
                            hk, addr_key, e
                        );
                        failed_hotkeys.push(hk.clone());
                    }
                    Err(_) => {
                        warn!(
                            "Re-authentication timed out for co-located miner {} at {}",
                            hk, addr_key
                        );
                        failed_hotkeys.push(hk.clone());
                    }
                }
            }
            let mut state = self.state.write().await;
            // Drop hotkeys that failed to re-authenticate.
            for hk in &failed_hotkeys {
                state.registry.deregister(hk);
            }
            state.registry.register(axon_info.clone());
            state
                .registry
                .set_connection(addr_key.clone(), connection.clone());
            // Successful reconnect clears backoff state entirely.
            state.registry.remove_reconnect_state(addr_key);
            info!("Reconnected to {}", addr_key);
            Ok(connection)
        }
        Err(e) => {
            // Record the failure: bump the attempt counter and schedule the
            // next retry (exponential backoff, then slow probing once the
            // retry budget is exhausted).
            let mut state = self.state.write().await;
            let rs = state.registry.reconnect_state_or_insert(addr_key.clone());
            rs.in_progress = false;
            // Shift uses the pre-increment attempt count; capped at 20 so
            // 2^shift cannot overflow u32.
            let shift = rs.attempts.min(20);
            rs.attempts += 1;
            let in_slow_probe = rs.attempts >= self.config.reconnect_max_retries;
            if in_slow_probe {
                if let Some(probe_interval) = self.config.reconnect_slow_probe_interval {
                    rs.next_retry_at = Instant::now() + probe_interval;
                    warn!(
                        "Slow probe to {} failed, next probe in {:?}: {}",
                        addr_key, probe_interval, e
                    );
                }
            } else {
                // checked_mul guards Duration overflow; saturate at max backoff.
                let backoff = self
                    .config
                    .reconnect_initial_backoff
                    .checked_mul(2u32.pow(shift))
                    .map(|d| d.min(self.config.reconnect_max_backoff))
                    .unwrap_or(self.config.reconnect_max_backoff);
                rs.next_retry_at = Instant::now() + backoff;
                error!(
                    "Reconnection to {} failed (attempt {}/{}), next retry in {:?}: {}",
                    addr_key, rs.attempts, self.config.reconnect_max_retries, backoff, e
                );
            }
            Err(e)
        }
    }
}
/// Reconciles the registry with a fresh miner list: deregisters departed
/// miners, prunes dead connections, and connects/authenticates new ones.
/// Thin wrapper over `update_miner_registry_inner`.
///
/// # Errors
/// Fails if the endpoint is not initialized or no signer is configured;
/// otherwise propagates the inner update's result.
#[instrument(skip(self, miners), fields(miner_count = miners.len()))]
pub async fn update_miner_registry(&self, miners: Vec<QuicAxonInfo>) -> Result<()> {
    let endpoint = self
        .endpoint
        .as_ref()
        .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
        .clone();
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
        .clone();
    update_miner_registry_inner(
        &self.state,
        &endpoint,
        &self.wallet_hotkey,
        &signer,
        &self.config,
        miners,
    )
    .await
}
/// Snapshots connection health as a string map: total connection and
/// active miner counts, plus one `connection_{addr}` entry per tracked
/// address whose value is `active`, `closed(..)`, or `missing`.
///
/// # Errors
/// Currently infallible; `Result` is kept for interface stability.
#[instrument(skip(self))]
pub async fn get_connection_stats(&self) -> Result<HashMap<String, String>> {
    let state = self.state.read().await;
    let registry = &state.registry;
    let mut stats = HashMap::new();
    stats.insert(
        "total_connections".to_string(),
        registry.connection_count().to_string(),
    );
    stats.insert(
        "active_miners".to_string(),
        registry.active_miner_count().to_string(),
    );
    for addr_key in registry.connection_addrs() {
        let status = match registry.get_connection(addr_key) {
            None => "missing".to_string(),
            Some(conn) => match conn.close_reason() {
                Some(reason) => format!("closed({:?})", reason),
                None => "active".to_string(),
            },
        };
        stats.insert(format!("connection_{}", addr_key), status);
    }
    Ok(stats)
}
#[cfg(feature = "subtensor")]
pub async fn start_metagraph_monitor(
&self,
monitor_config: MetagraphMonitorConfig,
) -> Result<()> {
if monitor_config.sync_interval.is_zero() {
return Err(LightningError::Config(
"sync_interval must be non-zero".into(),
));
}
self.stop_metagraph_monitor().await;
let endpoint = self
.endpoint
.as_ref()
.ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
.clone();
let signer = self
.signer
.as_ref()
.ok_or_else(|| LightningError::Signing("No signer configured".into()))?
.clone();
let subtensor = tokio::time::timeout(
Duration::from_secs(30),
OnlineClient::<PolkadotConfig>::from_url(&monitor_config.subtensor_endpoint),
)
.await
.map_err(|_| LightningError::Handler("subtensor connection timed out after 30s".into()))?
.map_err(|e| LightningError::Handler(format!("connecting to subtensor: {}", e)))?;
let mut metagraph = Metagraph::new(monitor_config.netuid);
tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor))
.await
.map_err(|_| {
LightningError::Handler("initial metagraph sync timed out after 60s".into())
})??;
let miners = metagraph.quic_miners();
info!(
netuid = monitor_config.netuid,
miners = miners.len(),
"initial metagraph sync complete"
);
update_miner_registry_inner(
&self.state,
&endpoint,
&self.wallet_hotkey,
&signer,
&self.config,
miners,
)
.await?;
let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
let state = self.state.clone();
let wallet_hotkey = self.wallet_hotkey.clone();
let config = self.config.clone();
let sync_interval = monitor_config.sync_interval;
let subtensor_url = monitor_config.subtensor_endpoint.clone();
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(sync_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval.tick().await;
let mut subtensor = subtensor;
loop {
tokio::select! {
_ = interval.tick() => {}
_ = shutdown_rx.changed() => {
info!("metagraph monitor shutting down");
return;
}
}
let sync_result =
tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor)).await;
let needs_reconnect = match sync_result {
Ok(Ok(())) => {
let miners = metagraph.quic_miners();
info!(
netuid = metagraph.netuid,
miners = miners.len(),
block = metagraph.block,
"metagraph resync complete"
);
if let Err(e) = update_miner_registry_inner(
&state,
&endpoint,
&wallet_hotkey,
&signer,
&config,
miners,
)
.await
{
error!("registry update after metagraph sync failed: {}", e);
}
false
}
Ok(Err(e)) => {
error!("metagraph sync failed, reconnecting to subtensor: {}", e);
true
}
Err(_) => {
error!("metagraph sync timed out after 60s, reconnecting to subtensor");
true
}
};
if needs_reconnect {
match tokio::time::timeout(
Duration::from_secs(30),
OnlineClient::<PolkadotConfig>::from_url(&subtensor_url),
)
.await
{
Ok(Ok(new_client)) => {
subtensor = new_client;
info!("subtensor client reconnected");
}
Ok(Err(e)) => {
error!("subtensor reconnection failed: {}", e);
}
Err(_) => {
error!("subtensor reconnection timed out after 30s");
}
}
}
}
});
let mut st = self.state.write().await;
st.metagraph_shutdown = Some(shutdown_tx);
st.metagraph_handle = Some(handle);
Ok(())
}
#[cfg(feature = "subtensor")]
pub async fn stop_metagraph_monitor(&self) {
let (shutdown_tx, handle) = {
let mut st = self.state.write().await;
(st.metagraph_shutdown.take(), st.metagraph_handle.take())
};
if let Some(tx) = shutdown_tx {
let _ = tx.send(true);
}
if let Some(mut handle) = handle {
if tokio::time::timeout(Duration::from_secs(5), &mut handle)
.await
.is_err()
{
warn!("metagraph monitor did not shut down within 5s, aborting");
handle.abort();
let _ = handle.await;
}
}
}
/// Stops the metagraph monitor (when the `subtensor` feature is enabled),
/// closes every tracked QUIC connection, and clears the registry.
#[instrument(skip(self))]
pub async fn close_all_connections(&self) -> Result<()> {
    // Stop the background task first so it cannot repopulate the registry
    // while we drain it.
    #[cfg(feature = "subtensor")]
    self.stop_metagraph_monitor().await;
    let mut state = self.state.write().await;
    let drained = state.registry.drain_connections();
    for (_, connection) in drained {
        connection.close(0u32.into(), b"client_shutdown");
    }
    state.registry.clear();
    info!("All Lightning QUIC connections closed");
    Ok(())
}
}
/// Reconciles the registry with a fresh miner list.
///
/// Under one write lock: deregisters miners absent from the new list
/// (closing connections whose address no longer hosts any hotkey), resets
/// reconnect backoff for every active address, prunes connections that
/// report a close reason (deregistering their hotkeys), handles miners
/// whose address changed, and partitions genuinely-new hotkeys into
/// "authenticate on an existing connection" vs "needs a new connection"
/// (the latter capped by the connection budget). Network work (handshakes
/// and new connections) then happens without holding the lock, and
/// successes are registered afterwards.
///
/// # Errors
/// Currently always returns `Ok(())`; individual connection/handshake
/// failures are logged.
async fn update_miner_registry_inner(
    state: &Arc<RwLock<ClientState>>,
    endpoint: &Endpoint,
    wallet_hotkey: &str,
    signer: &Arc<dyn Signer>,
    config: &LightningClientConfig,
    miners: Vec<QuicAxonInfo>,
) -> Result<()> {
    // Index the incoming list by hotkey for O(1) membership checks.
    let new_by_hotkey: HashMap<String, QuicAxonInfo> = miners
        .iter()
        .map(|m| (m.hotkey.clone(), m.clone()))
        .collect();
    let new_hotkeys_needing_auth: Vec<QuicAxonInfo>;
    let new_addrs_needing_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>>;
    {
        let mut st = state.write().await;
        // 1) Deregister miners that disappeared from the new list.
        let active_hotkeys = st.registry.active_hotkeys();
        for hotkey in active_hotkeys {
            if !new_by_hotkey.contains_key(&hotkey) {
                if let Some(miner) = st.registry.deregister(&hotkey) {
                    let addr_key = miner.addr_key();
                    info!("Miner {} deregistered from {}", hotkey, addr_key);
                    // Drop the connection only when no other hotkey uses it.
                    if !st.registry.addr_has_hotkeys(&addr_key) {
                        if let Some(connection) = st.registry.remove_connection(&addr_key) {
                            connection.close(0u32.into(), b"miner_deregistered");
                        }
                        st.registry.remove_reconnect_state(&addr_key);
                    }
                }
            }
        }
        // 2) A registry refresh clears any accumulated reconnect backoff.
        let active_addrs = st.registry.active_addrs();
        for addr_key in &active_addrs {
            if st.registry.remove_reconnect_state(addr_key) {
                info!(
                    "Registry refresh reset reconnection backoff for {}",
                    addr_key
                );
            }
        }
        // 3) Prune connections that report a close reason, deregistering
        //    their hotkeys so they are treated as new below.
        let dead_addrs: Vec<PeerAddr> = active_addrs
            .iter()
            .filter(|addr| {
                st.registry
                    .get_connection(addr)
                    .is_some_and(|c| c.close_reason().is_some())
            })
            .cloned()
            .collect();
        for addr_key in &dead_addrs {
            if let Some(conn) = st.registry.remove_connection(addr_key) {
                let hotkeys = st.registry.hotkeys_at_addr(addr_key);
                info!(
                    addr = %addr_key,
                    close_reason = ?conn.close_reason(),
                    hotkeys = ?hotkeys,
                    "Pruning dead connection and deregistering miners"
                );
                for hk in &hotkeys {
                    st.registry.deregister(hk);
                }
            }
        }
        // 4) Miners whose address changed: deregister so they re-handshake
        //    at the new address; close the old connection if now unused.
        for new_miner in new_by_hotkey.values() {
            if let Some(old_miner) = st.registry.active_miner(&new_miner.hotkey) {
                let old_addr = old_miner.addr_key();
                let new_addr = new_miner.addr_key();
                if old_addr != new_addr {
                    info!(
                        "Miner {} changed address from {} to {}",
                        new_miner.hotkey, old_addr, new_addr
                    );
                    st.registry.deregister(&new_miner.hotkey);
                    if !st.registry.addr_has_hotkeys(&old_addr) {
                        if let Some(conn) = st.registry.remove_connection(&old_addr) {
                            conn.close(0u32.into(), b"miner_addr_changed");
                        }
                        st.registry.remove_reconnect_state(&old_addr);
                    }
                }
            }
        }
        // 5) Partition hotkeys that are not yet active: existing connection
        //    at their address -> handshake only; otherwise -> new connection.
        let new_hotkeys: Vec<QuicAxonInfo> = new_by_hotkey
            .values()
            .filter(|m| !st.registry.contains_active_miner(&m.hotkey))
            .cloned()
            .collect();
        let mut need_auth = Vec::new();
        let mut need_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
        for miner in new_hotkeys {
            let addr_key = miner.addr_key();
            if st.registry.contains_connection(&addr_key) {
                need_auth.push(miner);
            } else {
                need_connect.entry(addr_key).or_default().push(miner);
            }
        }
        // 6) Enforce the connection budget on brand-new addresses.
        let active_count = st.registry.connection_count();
        let remaining_capacity = config.max_connections.saturating_sub(active_count);
        if need_connect.len() > remaining_capacity {
            warn!(
                "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
                config.max_connections,
                active_count,
                need_connect.len() - remaining_capacity,
                need_connect.len()
            );
        }
        new_hotkeys_needing_auth = need_auth;
        new_addrs_needing_connect = need_connect.into_iter().take(remaining_capacity).collect();
    }
    // Lock released: perform network I/O below without blocking readers.
    let timeout = config.connect_timeout;
    let max_fp = config.max_frame_payload_bytes;
    if !new_hotkeys_needing_auth.is_empty() {
        // Authenticate new hotkeys over existing connections.
        let miners_with_conns: Vec<(QuicAxonInfo, Connection)> = {
            let st = state.read().await;
            new_hotkeys_needing_auth
                .into_iter()
                .filter_map(|miner| {
                    let addr_key = miner.addr_key();
                    st.registry
                        .get_connection(&addr_key)
                        .map(|conn| (miner, conn))
                })
                .collect()
        };
        let mut authenticated = Vec::new();
        for (miner, conn) in &miners_with_conns {
            let addr_key = miner.addr_key();
            match tokio::time::timeout(
                timeout,
                authenticate_handshake(conn, &miner.hotkey, wallet_hotkey, signer, max_fp),
            )
            .await
            {
                Ok(Ok(())) => {
                    info!(
                        "Authenticated new miner {} on existing connection to {}",
                        miner.hotkey, addr_key
                    );
                    authenticated.push(miner.clone());
                }
                Ok(Err(e)) => {
                    warn!(
                        "Handshake failed for new hotkey {} at {}: {}",
                        miner.hotkey, addr_key, e
                    );
                }
                Err(_) => {
                    warn!(
                        "Handshake timed out for new hotkey {} at {}",
                        miner.hotkey, addr_key
                    );
                }
            }
        }
        let mut st = state.write().await;
        for miner in authenticated {
            st.registry.register(miner);
        }
    }
    if !new_addrs_needing_connect.is_empty() {
        // Connect + authenticate new addresses concurrently.
        let mut set = tokio::task::JoinSet::new();
        for (addr_key, miners_at_addr) in new_addrs_needing_connect {
            info!(
                "New address detected, establishing QUIC connection: {}",
                addr_key
            );
            let ep = endpoint.clone();
            let wh = wallet_hotkey.to_string();
            let s = signer.clone();
            set.spawn(connect_and_authenticate_per_address(
                ep,
                wh,
                s,
                addr_key,
                miners_at_addr,
                timeout,
                max_fp,
            ));
        }
        // Gather all results before taking the write lock.
        let mut results = Vec::new();
        while let Some(join_result) = set.join_next().await {
            match join_result {
                Ok((addr_key, conn_result, authenticated)) => {
                    results.push((addr_key, conn_result, authenticated));
                }
                Err(e) => {
                    error!("Connection task panicked: {}", e);
                }
            }
        }
        let mut st = state.write().await;
        for (addr_key, conn_result, authenticated) in results {
            match conn_result {
                Ok(connection) => {
                    if authenticated.is_empty() {
                        warn!(
                            "No hotkeys authenticated at {}, dropping connection",
                            addr_key
                        );
                        connection.close(0u32.into(), b"no_authenticated_hotkeys");
                    } else {
                        for miner in authenticated {
                            st.registry.register(miner);
                        }
                        st.registry.set_connection(addr_key, connection);
                    }
                }
                Err(e) => {
                    error!("Failed to connect to {}: {}", addr_key, e);
                }
            }
        }
    }
    Ok(())
}
/// Computes the blake2-256 fingerprint of the peer's leaf TLS certificate.
///
/// Returns `None` when the connection exposes no peer identity, the
/// identity is not a certificate chain, or the chain is empty.
fn get_peer_cert_fingerprint(connection: &Connection) -> Option<[u8; 32]> {
    let chain = connection
        .peer_identity()?
        .downcast::<Vec<CertificateDer<'static>>>()
        .ok()?;
    chain.first().map(|cert| blake2_256(cert.as_ref()))
}
/// Parses `addr_key` into a socket address, then dials and completes the
/// QUIC handshake against `server_name`.
///
/// # Errors
/// Wraps parse, dial, and handshake failures in
/// `LightningError::Connection`.
async fn quic_connect(
    endpoint: &Endpoint,
    addr_key: &PeerAddr,
    server_name: &str,
) -> Result<Connection> {
    let addr = addr_key
        .as_ref()
        .parse::<SocketAddr>()
        .map_err(|e| LightningError::Connection(format!("Invalid address: {}", e)))?;
    let connecting = endpoint
        .connect(addr, server_name)
        .map_err(|e| LightningError::Connection(format!("Connection failed: {}", e)))?;
    connecting
        .await
        .map_err(|e| LightningError::Connection(format!("Connection handshake failed: {}", e)))
}
/// Establish one QUIC connection to `addr_key` and attempt the application
/// handshake for every miner hotkey served at that address.
///
/// Returns `(addr_key, connection_result, authenticated_miners)`:
/// - if the connect itself fails or exceeds `timeout`, the connection result
///   carries the error and the authenticated list is empty;
/// - otherwise each miner is handshaken sequentially over the shared
///   connection, and only those that succeed within `timeout` are returned.
async fn connect_and_authenticate_per_address(
    endpoint: Endpoint,
    wallet_hotkey: String,
    signer: Arc<dyn Signer>,
    addr_key: PeerAddr,
    miners_at_addr: Vec<QuicAxonInfo>,
    timeout: Duration,
    max_frame_payload: usize,
) -> (PeerAddr, Result<Connection>, Vec<QuicAxonInfo>) {
    // The first miner's `ip` doubles as the TLS server name below; with no
    // miners there is nothing to dial, so bail out early.
    let first = match miners_at_addr.first() {
        Some(m) => m,
        None => {
            return (
                addr_key,
                Err(LightningError::Connection("no miners for address".into())),
                vec![],
            );
        }
    };
    // Bound the transport-level connect with the caller-supplied timeout.
    let conn = match tokio::time::timeout(timeout, quic_connect(&endpoint, &addr_key, &first.ip))
        .await
    {
        Ok(Ok(c)) => c,
        Ok(Err(e)) => return (addr_key, Err(e), vec![]),
        Err(_) => {
            let err = LightningError::Connection(format!("Connection to {} timed out", addr_key));
            return (addr_key, Err(err), vec![]);
        }
    };
    // Handshake each hotkey over the same connection. A failure or timeout
    // for one hotkey is logged but does not abort the remaining hotkeys.
    let mut authenticated = Vec::new();
    for miner in &miners_at_addr {
        match tokio::time::timeout(
            timeout,
            authenticate_handshake(
                &conn,
                &miner.hotkey,
                &wallet_hotkey,
                &signer,
                max_frame_payload,
            ),
        )
        .await
        {
            Ok(Ok(())) => authenticated.push(miner.clone()),
            Ok(Err(e)) => {
                warn!(
                    "Handshake failed for hotkey {} at {}: {}",
                    miner.hotkey, addr_key, e
                );
            }
            Err(_) => {
                warn!(
                    "Handshake timed out for hotkey {} at {}",
                    miner.hotkey, addr_key
                );
            }
        }
    }
    (addr_key, Ok(conn), authenticated)
}
/// Run the signed application-level handshake with a miner over an already
/// established QUIC connection.
///
/// Steps (every failure maps to a `LightningError`):
/// 1. Fingerprint the peer's TLS leaf certificate (blake2-256, base64).
/// 2. Sign the canonical request message containing our hotkey, a timestamp,
///    a fresh nonce, and that fingerprint (signing runs on a blocking thread
///    so the async runtime is not stalled).
/// 3. Send the request and require an accepted response whose miner hotkey
///    equals `expected_hotkey`.
/// 4. Require the response to echo the exact cert fingerprint observed on
///    the TLS session — this binds the handshake to this connection, which
///    is what makes the permissive TLS cert verifier sound.
/// 5. Verify the miner's sr25519 signature over the canonical response
///    message (which includes our nonce, preventing replay).
async fn authenticate_handshake(
    connection: &Connection,
    expected_hotkey: &str,
    wallet_hotkey: &str,
    signer: &Arc<dyn Signer>,
    max_frame_payload: usize,
) -> Result<()> {
    let peer_cert_fp = get_peer_cert_fingerprint(connection).ok_or_else(|| {
        LightningError::Handshake("peer certificate not available for fingerprinting".to_string())
    })?;
    let peer_cert_fp_b64 = BASE64_STANDARD.encode(peer_cert_fp);
    let nonce = generate_nonce();
    let timestamp = unix_timestamp_secs();
    let message = handshake_request_message(wallet_hotkey, timestamp, &nonce, &peer_cert_fp_b64);
    let msg_bytes = message.into_bytes();
    // Move signing off the async runtime; the signer may do blocking crypto.
    let signer_clone = signer.clone();
    let signature_bytes = tokio::task::spawn_blocking(move || signer_clone.sign(&msg_bytes))
        .await
        .map_err(|e| LightningError::Signing(format!("signer task failed: {}", e)))??;
    let handshake_request = HandshakeRequest {
        validator_hotkey: wallet_hotkey.to_string(),
        timestamp,
        nonce: nonce.clone(),
        signature: BASE64_STANDARD.encode(&signature_bytes),
    };
    let response = send_handshake(connection, handshake_request, max_frame_payload).await?;
    if !response.accepted {
        return Err(LightningError::Handshake(
            "Handshake rejected by miner".into(),
        ));
    }
    if response.miner_hotkey != expected_hotkey {
        return Err(LightningError::Handshake(format!(
            "Miner hotkey mismatch: expected {}, got {}",
            expected_hotkey, response.miner_hotkey
        )));
    }
    // The fingerprint is mandatory and must match the TLS session exactly;
    // a missing or differing value indicates misconfiguration or a MITM.
    match response.cert_fingerprint {
        Some(ref resp_fp) if *resp_fp == peer_cert_fp_b64 => {}
        Some(_) => {
            return Err(LightningError::Handshake(
                "Cert fingerprint mismatch between TLS session and handshake response".to_string(),
            ));
        }
        None => {
            return Err(LightningError::Handshake(
                "Miner handshake response omitted required cert fingerprint".to_string(),
            ));
        }
    }
    verify_miner_response_signature(&response, wallet_hotkey, &nonce, &peer_cert_fp_b64).await?;
    info!("Handshake successful with miner {}", expected_hotkey);
    Ok(())
}
/// Connect to a single miner and run the application-level handshake.
///
/// Convenience composition of `quic_connect` + `authenticate_handshake`;
/// any failure from either step is propagated unchanged.
async fn connect_and_handshake(
    endpoint: Endpoint,
    miner: QuicAxonInfo,
    wallet_hotkey: String,
    signer: Arc<dyn Signer>,
    max_frame_payload: usize,
) -> Result<Connection> {
    let conn = quic_connect(&endpoint, &miner.addr_key(), &miner.ip).await?;
    authenticate_handshake(&conn, &miner.hotkey, &wallet_hotkey, &signer, max_frame_payload)
        .await?;
    Ok(conn)
}
/// Check that the miner's handshake response was signed by the hotkey it
/// claims.
///
/// The canonical message is rebuilt from the validator hotkey, the miner
/// hotkey and timestamp taken from `response`, the validator-chosen `nonce`,
/// and the base64 cert fingerprint; the sr25519 signature is then verified
/// against it. An empty signature is rejected up front with a clearer error.
async fn verify_miner_response_signature(
    response: &HandshakeResponse,
    validator_hotkey: &str,
    nonce: &str,
    cert_fp_b64: &str,
) -> Result<()> {
    if response.signature.is_empty() {
        return Err(LightningError::Handshake(
            "Miner returned empty signature".to_string(),
        ));
    }
    let expected_message = handshake_response_message(
        validator_hotkey,
        &response.miner_hotkey,
        response.timestamp,
        nonce,
        cert_fp_b64,
    );
    let verified = crate::signing::verify_sr25519_signature(
        &response.miner_hotkey,
        &response.signature,
        &expected_message,
    )
    .await?;
    if verified {
        Ok(())
    } else {
        Err(LightningError::Handshake(
            "Miner response signature verification failed".to_string(),
        ))
    }
}
/// Send `request` over a fresh bidirectional stream and read back exactly one
/// `HandshakeResponse` frame (MessagePack-encoded in both directions).
async fn send_handshake(
    connection: &Connection,
    request: HandshakeRequest,
    max_frame_payload: usize,
) -> Result<HandshakeResponse> {
    let (mut send, mut recv) = connection.open_bi().await.map_err(|e| {
        LightningError::Connection(format!("Failed to open bidirectional stream: {}", e))
    })?;
    let encoded = rmp_serde::to_vec(&request).map_err(|e| {
        LightningError::Serialization(format!("Failed to serialize handshake: {}", e))
    })?;
    write_frame_and_finish(&mut send, MessageType::HandshakeRequest, &encoded).await?;
    let (frame_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
    match frame_type {
        MessageType::HandshakeResponse => rmp_serde::from_slice(&payload).map_err(|e| {
            LightningError::Serialization(format!("Failed to parse handshake response: {}", e))
        }),
        other => Err(LightningError::Handshake(format!(
            "Expected HandshakeResponse, got {:?}",
            other
        ))),
    }
}
/// Wrap `request` in a timestamped `SynapsePacket`, MessagePack-encode it,
/// and write it to `send` as a single finished frame.
async fn send_synapse_frame(send: &mut quinn::SendStream, request: QuicRequest) -> Result<()> {
    let packet = SynapsePacket {
        synapse_type: request.synapse_type,
        data: request.data,
        timestamp: unix_timestamp_secs(),
    };
    let encoded = rmp_serde::to_vec(&packet).map_err(|e| {
        LightningError::Serialization(format!("Failed to serialize synapse packet: {}", e))
    })?;
    write_frame_and_finish(send, MessageType::SynapsePacket, &encoded).await
}
/// Perform one request/response synapse exchange over a fresh bidirectional
/// stream and report the observed round-trip latency in `QuicResponse`.
///
/// A `StreamChunk` reply means the caller used the wrong API for a streaming
/// synapse; any other unexpected frame type is a transport error.
async fn send_synapse_packet(
    connection: &Connection,
    request: QuicRequest,
    max_frame_payload: usize,
) -> Result<QuicResponse> {
    let stable_id = connection.stable_id();
    debug!(stable_id, "send_synapse_packet: opening bi stream");
    let (mut send, mut recv) = connection
        .open_bi()
        .await
        .map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
    debug!(stable_id, "send_synapse_packet: bi stream opened");
    // Latency is measured from just before the request frame is written.
    let start = Instant::now();
    send_synapse_frame(&mut send, request).await?;
    debug!(
        stable_id,
        "send_synapse_packet: frame sent, awaiting response"
    );
    let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
    debug!(stable_id, msg_type = ?msg_type, elapsed_ms = start.elapsed().as_millis() as u64, "send_synapse_packet: response received");
    // Guard clauses for the two failure shapes, then the happy path.
    if msg_type == MessageType::StreamChunk {
        return Err(LightningError::Transport(
            "received StreamChunk on non-streaming query; use query_axon_stream for streaming synapses".to_string(),
        ));
    }
    if msg_type != MessageType::SynapseResponse {
        return Err(LightningError::Transport(format!(
            "unexpected response type: {:?}",
            msg_type
        )));
    }
    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
    let parsed: SynapseResponse = rmp_serde::from_slice(&payload).map_err(|e| {
        LightningError::Serialization(format!("Failed to parse synapse response: {}", e))
    })?;
    Ok(QuicResponse {
        success: parsed.success,
        data: parsed.data,
        latency_ms,
        error: parsed.error,
    })
}
/// Begin a streaming synapse: send the request frame on a new bidirectional
/// stream and hand back the receive half wrapped in a `StreamingResponse`
/// configured with the caller's payload limits and optional chunk timeout.
async fn open_streaming_synapse(
    connection: &Connection,
    request: QuicRequest,
    max_frame_payload: usize,
    max_stream_payload: usize,
    chunk_timeout: Option<Duration>,
) -> Result<StreamingResponse> {
    let opened = connection.open_bi().await;
    let (mut send, recv) =
        opened.map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
    send_synapse_frame(&mut send, request).await?;
    Ok(StreamingResponse {
        recv,
        max_payload: max_frame_payload,
        max_stream_payload,
        chunk_timeout,
    })
}
/// Produce a fresh 32-character lowercase-hex nonce from 16 random bytes.
fn generate_nonce() -> String {
    use rand::Rng;
    let bytes: [u8; 16] = rand::thread_rng().gen();
    // Per-byte "{:02x}" concatenation equals "{:032x}" of the big-endian u128.
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use sp_core::{crypto::Ss58Codec, sr25519, Pair};

    // Deterministic sr25519 seeds so test keypairs are reproducible.
    const MINER_SEED: [u8; 32] = [1u8; 32];
    const VALIDATOR_SEED: [u8; 32] = [2u8; 32];

    // Build a well-formed, correctly signed HandshakeResponse for the miner
    // keypair derived from `miner_seed`. Individual tests then corrupt one
    // field to exercise a specific rejection path.
    fn make_signed_response(
        miner_seed: [u8; 32],
        validator_hotkey: &str,
        nonce: &str,
        cert_fp_b64: &str,
    ) -> HandshakeResponse {
        let pair = sr25519::Pair::from_seed(&miner_seed);
        let miner_hotkey = pair.public().to_ss58check();
        let timestamp = unix_timestamp_secs();
        let message = handshake_response_message(
            validator_hotkey,
            &miner_hotkey,
            timestamp,
            nonce,
            cert_fp_b64,
        );
        let signature = pair.sign(message.as_bytes());
        HandshakeResponse {
            miner_hotkey,
            timestamp,
            signature: BASE64_STANDARD.encode(signature.0),
            accepted: true,
            connection_id: "test".to_string(),
            cert_fingerprint: Some(cert_fp_b64.to_string()),
        }
    }

    // SS58 address of the validator test keypair.
    fn validator_hotkey() -> String {
        sr25519::Pair::from_seed(&VALIDATOR_SEED)
            .public()
            .to_ss58check()
    }

    // Happy path: an untampered, correctly signed response verifies.
    #[tokio::test]
    async fn verify_valid_miner_signature() {
        let nonce = "test-nonce";
        let fp = "dGVzdC1mcA==";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), nonce, fp);
        assert!(
            verify_miner_response_signature(&resp, &validator_hotkey(), nonce, fp)
                .await
                .is_ok()
        );
    }

    // An empty signature is rejected before any decoding is attempted.
    #[tokio::test]
    async fn verify_rejects_empty_signature() {
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp");
        resp.signature = String::new();
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), "n", "fp")
            .await
            .unwrap_err();
        assert!(err.to_string().contains("empty signature"));
    }

    // Non-base64 signature text fails at the decode step.
    #[tokio::test]
    async fn verify_rejects_invalid_base64() {
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp");
        resp.signature = "not-valid-base64!!!".to_string();
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), "n", "fp")
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Failed to decode signature"));
    }

    // Valid base64 but wrong byte length (32 instead of an sr25519
    // signature's 64) is rejected with a length error.
    #[tokio::test]
    async fn verify_rejects_wrong_signature_length() {
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp");
        resp.signature = BASE64_STANDARD.encode([0u8; 32]);
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), "n", "fp")
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Invalid signature length"));
    }

    // A miner hotkey that is not valid SS58 cannot be turned into a public
    // key, so verification fails before the crypto check.
    #[tokio::test]
    async fn verify_rejects_bad_ss58_address() {
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp");
        resp.miner_hotkey = "not_a_valid_ss58".to_string();
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), "n", "fp")
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Invalid SS58 address"));
    }

    // A signature made by one key but attributed to a different hotkey must
    // fail the cryptographic check.
    #[tokio::test]
    async fn verify_rejects_wrong_signer() {
        let nonce = "n";
        let fp = "fp";
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), nonce, fp);
        let wrong_pair = sr25519::Pair::from_seed(&[99u8; 32]);
        resp.miner_hotkey = wrong_pair.public().to_ss58check();
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), nonce, fp)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("signature verification failed"));
    }

    // The nonce is part of the signed message, so verifying with a different
    // nonce than the one signed over must fail (replay protection).
    #[tokio::test]
    async fn verify_rejects_tampered_nonce() {
        let fp = "fp";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), "original-nonce", fp);
        let err = verify_miner_response_signature(&resp, &validator_hotkey(), "tampered-nonce", fp)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("signature verification failed"));
    }

    // Config validation: frame payloads below the protocol minimum are
    // rejected at construction time.
    #[test]
    fn with_config_rejects_frame_payload_below_minimum() {
        let cfg = LightningClientConfig {
            max_frame_payload_bytes: 512,
            ..LightningClientConfig::default()
        };
        assert!(LightningClient::with_config("hk".into(), cfg).is_err());
    }

    // Config validation: payloads above u32::MAX are rejected. On 32-bit
    // targets the value is not representable as usize, so the test is a
    // no-op there (early return).
    #[test]
    fn with_config_rejects_frame_payload_above_u32_max() {
        let too_big: u128 = u32::MAX as u128 + 1;
        let val = match usize::try_from(too_big) {
            Ok(v) => v,
            Err(_) => return,
        };
        let cfg = LightningClientConfig {
            max_frame_payload_bytes: val,
            max_stream_payload_bytes: val,
            ..LightningClientConfig::default()
        };
        assert!(LightningClient::with_config("hk".into(), cfg).is_err());
    }

    // Config validation: the stream payload limit may not be smaller than
    // the per-frame limit.
    #[test]
    fn with_config_rejects_stream_below_frame() {
        let base = LightningClientConfig::default();
        let cfg = LightningClientConfig {
            max_stream_payload_bytes: base.max_frame_payload_bytes - 1,
            ..base
        };
        assert!(LightningClient::with_config("hk".into(), cfg).is_err());
    }

    // Config validation: a zero-duration stream-chunk timeout is invalid.
    #[test]
    fn with_config_rejects_zero_stream_chunk_timeout() {
        let cfg = LightningClientConfig {
            stream_chunk_timeout: Some(Duration::ZERO),
            ..LightningClientConfig::default()
        };
        assert!(LightningClient::with_config("hk".into(), cfg).is_err());
    }

    // Sanity check: the default configuration passes validation.
    #[test]
    fn with_config_default_succeeds() {
        assert!(
            LightningClient::with_config("hk".into(), LightningClientConfig::default()).is_ok()
        );
    }
}
/// rustls server-certificate verifier that accepts any certificate.
///
/// SECURITY NOTE: certificate validity is intentionally NOT enforced at the
/// TLS layer. Miners present self-signed certificates; authenticity is
/// instead established by the application-level handshake, which pins the
/// blake2-256 fingerprint of the TLS leaf certificate inside a signed
/// message (see `authenticate_handshake` / `get_peer_cert_fingerprint`).
#[derive(Debug)]
struct AcceptAnyCertVerifier;
impl ServerCertVerifier for AcceptAnyCertVerifier {
    // Accept every presented certificate chain without validation; trust is
    // deferred to the handshake fingerprint binding described above.
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
        Ok(ServerCertVerified::assertion())
    }
    // QUIC mandates TLS 1.3, so any TLS 1.2 handshake signature is a
    // protocol violation and is rejected outright.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        Err(rustls::Error::PeerIncompatible(
            rustls::PeerIncompatible::Tls12NotOffered,
        ))
    }
    // TLS 1.3 handshake signatures are asserted valid without checking —
    // consistent with accepting any cert; the application handshake is the
    // actual authentication step.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        Ok(HandshakeSignatureValid::assertion())
    }
    // Advertise whatever the ring crypto provider supports.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        rustls::crypto::ring::default_provider()
            .signature_verification_algorithms
            .supported_schemes()
    }
}