Skip to main content

btlightning/
client.rs

1use crate::error::{LightningError, Result};
2use crate::registry::MinerRegistry;
3use crate::signing::Signer;
4use crate::types::{
5    handshake_request_message, handshake_response_message, read_frame, write_frame_and_finish,
6    HandshakeRequest, HandshakeResponse, MessageType, PeerAddr, QuicAxonInfo, QuicRequest,
7    QuicResponse, StreamChunk, StreamEnd, SynapsePacket, SynapseResponse,
8    DEFAULT_MAX_FRAME_PAYLOAD,
9};
10use crate::util::unix_timestamp_secs;
11use base64::{prelude::BASE64_STANDARD, Engine};
12use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, TransportConfig};
13use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
14use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
15use rustls::ClientConfig as RustlsClientConfig;
16use sp_core::blake2_256;
17use std::collections::HashMap;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::RwLock;
22use tokio::time::Instant;
23use tracing::{debug, error, info, instrument, warn};
24
25#[cfg(feature = "subtensor")]
26use crate::metagraph::{Metagraph, MetagraphMonitorConfig};
27#[cfg(feature = "subtensor")]
28use subxt::{OnlineClient, PolkadotConfig};
29
/// Tunable settings for a [`LightningClient`].
///
/// Every timeout is a [`Duration`]. [`Default`] supplies the value noted on each
/// field, and the configuration is validated before a client or builder accepts it.
#[derive(Clone)]
pub struct LightningClientConfig {
    /// Time allowed for establishing a QUIC connection and completing the handshake.
    /// Default: 10s.
    pub connect_timeout: Duration,
    /// QUIC idle timeout after which the connection is closed. Default: 150s.
    pub idle_timeout: Duration,
    /// Spacing of QUIC keep-alive pings; must be shorter than `idle_timeout`.
    /// Default: 30s.
    pub keep_alive_interval: Duration,
    /// First backoff delay applied after a reconnection attempt fails. Default: 1s.
    pub reconnect_initial_backoff: Duration,
    /// Ceiling for the backoff delay between reconnection attempts. Default: 60s.
    pub reconnect_max_backoff: Duration,
    /// Number of consecutive reconnection attempts before giving up. Default: 5.
    pub reconnect_max_retries: u32,
    /// Interval between periodic slow probe attempts once the fast retries are
    /// exhausted, so scoring data keeps flowing without wasting resources. `None`
    /// disables slow probing (hard exhaustion, the original behavior). Default: 60s.
    pub reconnect_slow_probe_interval: Option<Duration>,
    /// Upper bound on concurrent QUIC connections. Default: 1024.
    pub max_connections: usize,
    /// Largest payload accepted in a single frame, in bytes. Default: 64 MiB.
    pub max_frame_payload_bytes: usize,
    /// Aggregate byte limit applied by `collect_all()` across every chunk of a
    /// streaming response. Defaults to `DEFAULT_MAX_FRAME_PAYLOAD` (64 MiB); raise it
    /// for multi-chunk streams larger than one frame. Must be >=
    /// `max_frame_payload_bytes`.
    pub max_stream_payload_bytes: usize,
    /// Read timeout applied to each `next_chunk()` call on a streaming response; the
    /// call aborts when no data arrives in time. Default: `None` (no timeout).
    pub stream_chunk_timeout: Option<Duration>,
    /// Metagraph monitor settings. When present, `initialize_connections` starts a
    /// background task that periodically re-syncs the subnet and updates connections.
    #[cfg(feature = "subtensor")]
    pub metagraph: Option<MetagraphMonitorConfig>,
}
67
68impl Default for LightningClientConfig {
69    fn default() -> Self {
70        Self {
71            connect_timeout: Duration::from_secs(10),
72            idle_timeout: Duration::from_secs(150),
73            keep_alive_interval: Duration::from_secs(30),
74            reconnect_initial_backoff: Duration::from_secs(1),
75            reconnect_max_backoff: Duration::from_secs(60),
76            reconnect_max_retries: 5,
77            reconnect_slow_probe_interval: Some(Duration::from_secs(60)),
78            max_connections: 1024,
79            max_frame_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
80            max_stream_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
81            stream_chunk_timeout: None,
82            #[cfg(feature = "subtensor")]
83            metagraph: None,
84        }
85    }
86}
87
88impl LightningClientConfig {
89    pub fn builder() -> LightningClientConfigBuilder {
90        LightningClientConfigBuilder {
91            config: Self::default(),
92        }
93    }
94
95    fn validate(&self) -> Result<()> {
96        if self.connect_timeout.is_zero() {
97            return Err(LightningError::Config(
98                "connect_timeout must be non-zero".into(),
99            ));
100        }
101        if self.idle_timeout.is_zero() {
102            return Err(LightningError::Config(
103                "idle_timeout must be non-zero".into(),
104            ));
105        }
106        if self.keep_alive_interval.is_zero() {
107            return Err(LightningError::Config(
108                "keep_alive_interval must be non-zero".into(),
109            ));
110        }
111        if self.keep_alive_interval >= self.idle_timeout {
112            return Err(LightningError::Config(format!(
113                "keep_alive_interval ({:?}) must be less than idle_timeout ({:?})",
114                self.keep_alive_interval, self.idle_timeout
115            )));
116        }
117        if self.reconnect_initial_backoff.is_zero() {
118            return Err(LightningError::Config(
119                "reconnect_initial_backoff must be non-zero".into(),
120            ));
121        }
122        if self.reconnect_max_backoff.is_zero() {
123            return Err(LightningError::Config(
124                "reconnect_max_backoff must be non-zero".into(),
125            ));
126        }
127        if self.reconnect_initial_backoff > self.reconnect_max_backoff {
128            return Err(LightningError::Config(format!(
129                "reconnect_initial_backoff ({:?}) must be <= reconnect_max_backoff ({:?})",
130                self.reconnect_initial_backoff, self.reconnect_max_backoff
131            )));
132        }
133        if self.reconnect_max_retries == 0 {
134            return Err(LightningError::Config(
135                "reconnect_max_retries must be at least 1".into(),
136            ));
137        }
138        if self
139            .reconnect_slow_probe_interval
140            .is_some_and(|d| d.is_zero())
141        {
142            return Err(LightningError::Config(
143                "reconnect_slow_probe_interval must be non-zero when set".into(),
144            ));
145        }
146        if self.max_connections == 0 {
147            return Err(LightningError::Config(
148                "max_connections must be at least 1".into(),
149            ));
150        }
151        if self.max_frame_payload_bytes < 1_048_576 {
152            return Err(LightningError::Config(format!(
153                "max_frame_payload_bytes ({}) must be at least 1048576 (1 MB)",
154                self.max_frame_payload_bytes
155            )));
156        }
157        if self.max_frame_payload_bytes > u32::MAX as usize {
158            return Err(LightningError::Config(format!(
159                "max_frame_payload_bytes ({}) must not exceed {} (u32::MAX)",
160                self.max_frame_payload_bytes,
161                u32::MAX
162            )));
163        }
164        if self.stream_chunk_timeout.is_some_and(|d| d.is_zero()) {
165            return Err(LightningError::Config(
166                "stream_chunk_timeout must be non-zero".into(),
167            ));
168        }
169        if self.max_stream_payload_bytes < self.max_frame_payload_bytes {
170            return Err(LightningError::Config(format!(
171                "max_stream_payload_bytes ({}) must be >= max_frame_payload_bytes ({})",
172                self.max_stream_payload_bytes, self.max_frame_payload_bytes
173            )));
174        }
175        Ok(())
176    }
177}
178
179pub struct LightningClientConfigBuilder {
180    config: LightningClientConfig,
181}
182
183impl LightningClientConfigBuilder {
184    pub fn connect_timeout(mut self, val: Duration) -> Self {
185        self.config.connect_timeout = val;
186        self
187    }
188    pub fn idle_timeout(mut self, val: Duration) -> Self {
189        self.config.idle_timeout = val;
190        self
191    }
192    pub fn keep_alive_interval(mut self, val: Duration) -> Self {
193        self.config.keep_alive_interval = val;
194        self
195    }
196    pub fn reconnect_initial_backoff(mut self, val: Duration) -> Self {
197        self.config.reconnect_initial_backoff = val;
198        self
199    }
200    pub fn reconnect_max_backoff(mut self, val: Duration) -> Self {
201        self.config.reconnect_max_backoff = val;
202        self
203    }
204    pub fn reconnect_max_retries(mut self, val: u32) -> Self {
205        self.config.reconnect_max_retries = val;
206        self
207    }
208    pub fn reconnect_slow_probe_interval(mut self, val: Option<Duration>) -> Self {
209        self.config.reconnect_slow_probe_interval = val;
210        self
211    }
212    pub fn max_connections(mut self, val: usize) -> Self {
213        self.config.max_connections = val;
214        self
215    }
216    pub fn max_frame_payload_bytes(mut self, val: usize) -> Self {
217        self.config.max_frame_payload_bytes = val;
218        self
219    }
220    pub fn max_stream_payload_bytes(mut self, val: usize) -> Self {
221        self.config.max_stream_payload_bytes = val;
222        self
223    }
224    pub fn stream_chunk_timeout(mut self, val: Duration) -> Self {
225        self.config.stream_chunk_timeout = Some(val);
226        self
227    }
228    #[cfg(feature = "subtensor")]
229    pub fn metagraph(mut self, val: MetagraphMonitorConfig) -> Self {
230        self.config.metagraph = Some(val);
231        self
232    }
233    pub fn build(self) -> Result<LightningClientConfig> {
234        self.config.validate()?;
235        Ok(self.config)
236    }
237}
238
239struct ClientState {
240    registry: MinerRegistry,
241    #[cfg(feature = "subtensor")]
242    metagraph_shutdown: Option<tokio::sync::watch::Sender<bool>>,
243    #[cfg(feature = "subtensor")]
244    metagraph_handle: Option<tokio::task::JoinHandle<()>>,
245}
246
247/// Handle for reading a chunked streaming response from a miner.
248///
249/// Returned by [`LightningClient::query_axon_stream`]. Call [`next_chunk`](Self::next_chunk)
250/// in a loop or use [`collect_all`](Self::collect_all) to buffer the full response.
251pub struct StreamingResponse {
252    recv: quinn::RecvStream,
253    max_payload: usize,
254    max_stream_payload: usize,
255    chunk_timeout: Option<Duration>,
256}
257
258impl StreamingResponse {
259    /// Reads the next chunk from the stream. Returns `None` on successful completion.
260    pub async fn next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
261        let frame_result = match self.chunk_timeout {
262            Some(timeout) => {
263                match tokio::time::timeout(timeout, read_frame(&mut self.recv, self.max_payload))
264                    .await
265                {
266                    Ok(r) => r,
267                    Err(_) => {
268                        self.recv.stop(0u32.into()).ok();
269                        return Err(LightningError::Stream("chunk read timed out".to_string()));
270                    }
271                }
272            }
273            None => read_frame(&mut self.recv, self.max_payload).await,
274        };
275        match frame_result {
276            Ok((MessageType::StreamChunk, payload)) => {
277                let chunk: StreamChunk = rmp_serde::from_slice(&payload).map_err(|e| {
278                    LightningError::Serialization(format!("Failed to parse stream chunk: {}", e))
279                })?;
280                Ok(Some(chunk.data))
281            }
282            Ok((MessageType::StreamEnd, payload)) => {
283                let end: StreamEnd = rmp_serde::from_slice(&payload).map_err(|e| {
284                    LightningError::Serialization(format!("Failed to parse stream end: {}", e))
285                })?;
286                if end.success {
287                    Ok(None)
288                } else {
289                    Err(LightningError::Stream(end.error.unwrap_or_else(|| {
290                        "stream ended with failure status".to_string()
291                    })))
292                }
293            }
294            Ok((MessageType::SynapseResponse, payload)) => {
295                let detail = rmp_serde::from_slice::<SynapseResponse>(&payload)
296                    .ok()
297                    .and_then(|r| r.error)
298                    .unwrap_or_else(|| "no detail".to_string());
299                Err(LightningError::Stream(format!(
300                    "server returned SynapseResponse error on streaming path: {}",
301                    detail
302                )))
303            }
304            Ok((msg_type, _)) => Err(LightningError::Stream(format!(
305                "unexpected message type during streaming: {:?}",
306                msg_type
307            ))),
308            Err(e) => Err(e),
309        }
310    }
311
312    /// Reads all remaining chunks into a `Vec`. Enforces `max_stream_payload_bytes`.
313    pub async fn collect_all(&mut self) -> Result<Vec<Vec<u8>>> {
314        let mut chunks = Vec::new();
315        let mut total_size: usize = 0;
316        while let Some(chunk) = self.next_chunk().await? {
317            total_size = total_size.checked_add(chunk.len()).ok_or_else(|| {
318                LightningError::Stream("streaming response size overflow".to_string())
319            })?;
320            if total_size > self.max_stream_payload {
321                return Err(LightningError::Stream(format!(
322                    "streaming response exceeded {} byte aggregate limit",
323                    self.max_stream_payload
324                )));
325            }
326            chunks.push(chunk);
327        }
328        Ok(chunks)
329    }
330}
331
332/// QUIC client for sending synapse requests to Bittensor miners.
333///
334/// Manages persistent, authenticated QUIC connections to one or more miners.
335/// Each connection is established with a mutual sr25519 handshake; subsequent
336/// synapse requests reuse the connection without re-authenticating.
337pub struct LightningClient {
338    config: LightningClientConfig,
339    wallet_hotkey: String,
340    signer: Option<Arc<dyn Signer>>,
341    state: Arc<RwLock<ClientState>>,
342    endpoint: Option<Endpoint>,
343}
344
345impl LightningClient {
346    /// Creates a client with default configuration. Panics only if defaults are invalid.
347    pub fn new(wallet_hotkey: String) -> Self {
348        Self::with_config(wallet_hotkey, LightningClientConfig::default())
349            .expect("default config is always valid")
350    }
351
352    /// Creates a client with the given configuration, validating constraints.
353    pub fn with_config(wallet_hotkey: String, config: LightningClientConfig) -> Result<Self> {
354        config.validate()?;
355        Ok(Self {
356            config,
357            wallet_hotkey,
358            signer: None,
359            state: Arc::new(RwLock::new(ClientState {
360                registry: MinerRegistry::new(),
361                #[cfg(feature = "subtensor")]
362                metagraph_shutdown: None,
363                #[cfg(feature = "subtensor")]
364                metagraph_handle: None,
365            })),
366            endpoint: None,
367        })
368    }
369
370    /// Sets the [`Signer`] used for handshake authentication. Must be called before
371    /// `initialize_connections`.
372    pub fn set_signer(&mut self, signer: Box<dyn Signer>) {
373        self.signer = Some(Arc::from(signer));
374        info!("Signer configured");
375    }
376
377    /// Loads the signer from a Bittensor wallet on disk. Requires the `btwallet` feature.
378    #[cfg(feature = "btwallet")]
379    pub fn set_wallet(
380        &mut self,
381        wallet_name: &str,
382        wallet_path: &str,
383        hotkey_name: &str,
384    ) -> Result<()> {
385        let signer =
386            crate::signing::BtWalletSigner::from_wallet(wallet_name, wallet_path, hotkey_name)?;
387        self.set_signer(Box::new(signer));
388        Ok(())
389    }
390
391    /// Opens QUIC connections and performs sr25519 handshakes with the given miners.
392    ///
393    /// Miners sharing an `ip:port` are multiplexed over a single QUIC connection.
394    /// If `metagraph` is configured, a background monitor is also started.
395    #[instrument(skip(self, miners), fields(miner_count = miners.len()))]
396    pub async fn initialize_connections(&mut self, miners: Vec<QuicAxonInfo>) -> Result<()> {
397        self.create_endpoint().await?;
398
399        let endpoint = self
400            .endpoint
401            .as_ref()
402            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
403            .clone();
404        let wallet_hotkey = self.wallet_hotkey.clone();
405        let signer = self
406            .signer
407            .as_ref()
408            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
409            .clone();
410        let timeout = self.config.connect_timeout;
411
412        let mut addr_groups: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
413        for miner in miners {
414            addr_groups.entry(miner.addr_key()).or_default().push(miner);
415        }
416
417        let (active_count, remaining_capacity) = {
418            let state = self.state.read().await;
419            let active = state.registry.connection_count();
420            (active, self.config.max_connections.saturating_sub(active))
421        };
422
423        let addr_groups: Vec<(PeerAddr, Vec<QuicAxonInfo>)> =
424            if addr_groups.len() > remaining_capacity {
425                warn!(
426                    "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
427                    self.config.max_connections,
428                    active_count,
429                    addr_groups.len() - remaining_capacity,
430                    addr_groups.len()
431                );
432                addr_groups.into_iter().take(remaining_capacity).collect()
433            } else {
434                addr_groups.into_iter().collect()
435            };
436
437        let max_fp = self.config.max_frame_payload_bytes;
438        let mut set = tokio::task::JoinSet::new();
439        for (addr_key, miners_at_addr) in addr_groups {
440            let ep = endpoint.clone();
441            let wh = wallet_hotkey.clone();
442            let s = signer.clone();
443            set.spawn(connect_and_authenticate_per_address(
444                ep,
445                wh,
446                s,
447                addr_key,
448                miners_at_addr,
449                timeout,
450                max_fp,
451            ));
452        }
453
454        let mut results = Vec::new();
455        while let Some(join_result) = set.join_next().await {
456            match join_result {
457                Ok((addr_key, conn_result, authenticated)) => {
458                    results.push((addr_key, conn_result, authenticated));
459                }
460                Err(e) => {
461                    error!("Connection task panicked: {}", e);
462                }
463            }
464        }
465
466        let mut state = self.state.write().await;
467        for (addr_key, conn_result, authenticated) in results {
468            match conn_result {
469                Ok(connection) => {
470                    if authenticated.is_empty() {
471                        warn!(
472                            "No hotkeys authenticated at {}, dropping connection",
473                            addr_key
474                        );
475                        connection.close(0u32.into(), b"no_authenticated_hotkeys");
476                    } else {
477                        for miner in authenticated {
478                            info!("Authenticated miner {} at {}", miner.hotkey, addr_key);
479                            state.registry.register(miner);
480                        }
481                        state.registry.set_connection(addr_key, connection);
482                    }
483                }
484                Err(e) => {
485                    error!("Failed to connect to {}: {}", addr_key, e);
486                }
487            }
488        }
489
490        #[cfg(feature = "subtensor")]
491        if let Some(metagraph_config) = self.config.metagraph.clone() {
492            self.start_metagraph_monitor(metagraph_config).await?;
493        }
494
495        Ok(())
496    }
497
498    /// Creates the QUIC client endpoint bound to `0.0.0.0:0`. Called automatically by
499    /// `initialize_connections`; only call directly if you need the endpoint before connecting.
500    #[instrument(skip(self))]
501    pub async fn create_endpoint(&mut self) -> Result<()> {
502        let mut tls_config = RustlsClientConfig::builder_with_provider(
503            rustls::crypto::ring::default_provider().into(),
504        )
505        .with_safe_default_protocol_versions()
506        .map_err(|e| LightningError::Config(format!("Failed to set TLS versions: {}", e)))?
507        .dangerous()
508        .with_custom_certificate_verifier(Arc::new(AcceptAnyCertVerifier))
509        .with_no_client_auth();
510
511        tls_config.alpn_protocols = vec![b"btlightning".to_vec()];
512
513        let mut transport_config = TransportConfig::default();
514
515        let idle_timeout = IdleTimeout::try_from(self.config.idle_timeout)
516            .map_err(|e| LightningError::Config(format!("Failed to set idle timeout: {}", e)))?;
517        transport_config.max_idle_timeout(Some(idle_timeout));
518        transport_config.keep_alive_interval(Some(self.config.keep_alive_interval));
519
520        let quic_crypto =
521            quinn::crypto::rustls::QuicClientConfig::try_from(tls_config).map_err(|e| {
522                LightningError::Config(format!("Failed to create QUIC crypto config: {}", e))
523            })?;
524        let mut client_config = ClientConfig::new(Arc::new(quic_crypto));
525        client_config.transport_config(Arc::new(transport_config));
526
527        let bind_addr: SocketAddr = "0.0.0.0:0"
528            .parse()
529            .map_err(|e| LightningError::Config(format!("Failed to parse bind address: {}", e)))?;
530        let mut endpoint = Endpoint::client(bind_addr).map_err(|e| {
531            LightningError::Connection(format!("Failed to create QUIC endpoint: {}", e))
532        })?;
533        endpoint.set_default_client_config(client_config);
534        self.endpoint = Some(endpoint);
535
536        info!("QUIC client endpoint created");
537        Ok(())
538    }
539
540    /// Sends a synapse request to a miner and waits for the full response.
541    ///
542    /// Transparently reconnects if the underlying QUIC connection has died.
543    #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
544    pub async fn query_axon(
545        &self,
546        axon_info: QuicAxonInfo,
547        request: QuicRequest,
548    ) -> Result<QuicResponse> {
549        let addr_key = axon_info.addr_key();
550
551        let connection = {
552            let state = self.state.read().await;
553            state.registry.get_connection(&addr_key)
554        };
555
556        let max_fp = self.config.max_frame_payload_bytes;
557        match connection {
558            Some(conn) if conn.close_reason().is_none() => {
559                debug!(
560                    addr = %addr_key,
561                    stable_id = conn.stable_id(),
562                    "query_axon: connection alive, sending synapse"
563                );
564                send_synapse_packet(&conn, request, max_fp).await
565            }
566            Some(conn) => {
567                let reason = conn.close_reason();
568                warn!(
569                    addr = %addr_key,
570                    stable_id = conn.stable_id(),
571                    close_reason = ?reason,
572                    "QUIC connection closed, triggering reconnect"
573                );
574                self.try_reconnect_and_query(&addr_key, &axon_info, request)
575                    .await
576            }
577            None => {
578                debug!(addr = %addr_key, "query_axon: no connection in registry");
579                self.try_reconnect_and_query(&addr_key, &axon_info, request)
580                    .await
581            }
582        }
583    }
584
585    /// Like [`query_axon`](Self::query_axon) but aborts after `timeout`.
586    #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port, timeout_ms = timeout.as_millis() as u64))]
587    pub async fn query_axon_with_timeout(
588        &self,
589        axon_info: QuicAxonInfo,
590        request: QuicRequest,
591        timeout: Duration,
592    ) -> Result<QuicResponse> {
593        tokio::time::timeout(timeout, self.query_axon(axon_info, request))
594            .await
595            .map_err(|_| LightningError::Transport("query timed out".into()))?
596    }
597
598    /// Sends a synapse request and returns a [`StreamingResponse`] for incremental chunk reading.
599    #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
600    pub async fn query_axon_stream(
601        &self,
602        axon_info: QuicAxonInfo,
603        request: QuicRequest,
604    ) -> Result<StreamingResponse> {
605        let addr_key = axon_info.addr_key();
606
607        let connection = {
608            let state = self.state.read().await;
609            state.registry.get_connection(&addr_key)
610        };
611
612        let max_fp = self.config.max_frame_payload_bytes;
613        let max_sp = self.config.max_stream_payload_bytes;
614        match connection {
615            Some(conn) if conn.close_reason().is_none() => {
616                open_streaming_synapse(
617                    &conn,
618                    request,
619                    max_fp,
620                    max_sp,
621                    self.config.stream_chunk_timeout,
622                )
623                .await
624            }
625            Some(conn) => {
626                let reason = conn.close_reason();
627                warn!(
628                    addr = %addr_key,
629                    close_reason = ?reason,
630                    "QUIC connection closed, triggering reconnect (stream)"
631                );
632                self.try_reconnect_and_stream(&addr_key, &axon_info, request)
633                    .await
634            }
635            None => {
636                self.try_reconnect_and_stream(&addr_key, &axon_info, request)
637                    .await
638            }
639        }
640    }
641
642    async fn try_reconnect_and_query(
643        &self,
644        addr_key: &PeerAddr,
645        axon_info: &QuicAxonInfo,
646        request: QuicRequest,
647    ) -> Result<QuicResponse> {
648        let connection = self.try_reconnect(addr_key, axon_info).await?;
649        send_synapse_packet(&connection, request, self.config.max_frame_payload_bytes).await
650    }
651
652    async fn try_reconnect_and_stream(
653        &self,
654        addr_key: &PeerAddr,
655        axon_info: &QuicAxonInfo,
656        request: QuicRequest,
657    ) -> Result<StreamingResponse> {
658        let connection = self.try_reconnect(addr_key, axon_info).await?;
659        open_streaming_synapse(
660            &connection,
661            request,
662            self.config.max_frame_payload_bytes,
663            self.config.max_stream_payload_bytes,
664            self.config.stream_chunk_timeout,
665        )
666        .await
667    }
668
669    async fn try_reconnect(
670        &self,
671        addr_key: &PeerAddr,
672        axon_info: &QuicAxonInfo,
673    ) -> Result<Connection> {
674        let endpoint = self
675            .endpoint
676            .as_ref()
677            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
678            .clone();
679        let signer = self
680            .signer
681            .as_ref()
682            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
683            .clone();
684
685        {
686            let mut state = self.state.write().await;
687            if let Err(rejection) = state.registry.try_start_reconnect(
688                addr_key.clone(),
689                self.config.reconnect_max_retries,
690                self.config.reconnect_slow_probe_interval,
691            ) {
692                use crate::registry::ReconnectRejection;
693                return match rejection {
694                    ReconnectRejection::Backoff { next } => {
695                        Err(LightningError::Connection(format!(
696                            "Reconnection to {} in backoff, next retry in {:?}",
697                            addr_key,
698                            next.saturating_duration_since(Instant::now())
699                        )))
700                    }
701                    ReconnectRejection::Exhausted { attempts } => {
702                        Err(LightningError::Connection(format!(
703                            "Reconnection attempts exhausted for {} ({}/{}), awaiting registry refresh",
704                            addr_key, attempts, self.config.reconnect_max_retries
705                        )))
706                    }
707                    ReconnectRejection::InProgress => {
708                        Err(LightningError::Connection(format!(
709                            "Reconnection to {} already in progress",
710                            addr_key
711                        )))
712                    }
713                };
714            }
715        }
716
717        warn!("Connection to {} dead, attempting reconnection", addr_key);
718
719        let reconnect_result = tokio::time::timeout(
720            self.config.connect_timeout,
721            connect_and_handshake(
722                endpoint,
723                axon_info.clone(),
724                self.wallet_hotkey.clone(),
725                signer.clone(),
726                self.config.max_frame_payload_bytes,
727            ),
728        )
729        .await;
730
731        let reconnect_result = match reconnect_result {
732            Ok(r) => r,
733            Err(_) => Err(LightningError::Connection(format!(
734                "Reconnection to {} timed out",
735                addr_key
736            ))),
737        };
738
739        match reconnect_result {
740            Ok(connection) => {
741                let co_located: Vec<String> = {
742                    let state = self.state.read().await;
743                    state
744                        .registry
745                        .hotkeys_at_addr(addr_key)
746                        .into_iter()
747                        .filter(|hk| *hk != axon_info.hotkey)
748                        .collect()
749                };
750                let mut failed_hotkeys = Vec::new();
751                for hk in &co_located {
752                    match tokio::time::timeout(
753                        self.config.connect_timeout,
754                        authenticate_handshake(
755                            &connection,
756                            hk,
757                            &self.wallet_hotkey,
758                            &signer,
759                            self.config.max_frame_payload_bytes,
760                        ),
761                    )
762                    .await
763                    {
764                        Ok(Ok(())) => {
765                            info!(
766                                "Re-authenticated co-located miner {} on reconnected {}",
767                                hk, addr_key
768                            );
769                        }
770                        Ok(Err(e)) => {
771                            warn!(
772                                "Re-authentication failed for co-located miner {} at {}: {}",
773                                hk, addr_key, e
774                            );
775                            failed_hotkeys.push(hk.clone());
776                        }
777                        Err(_) => {
778                            warn!(
779                                "Re-authentication timed out for co-located miner {} at {}",
780                                hk, addr_key
781                            );
782                            failed_hotkeys.push(hk.clone());
783                        }
784                    }
785                }
786
787                let mut state = self.state.write().await;
788                for hk in &failed_hotkeys {
789                    state.registry.deregister(hk);
790                }
791                state.registry.register(axon_info.clone());
792                state
793                    .registry
794                    .set_connection(addr_key.clone(), connection.clone());
795                state.registry.remove_reconnect_state(addr_key);
796                info!("Reconnected to {}", addr_key);
797                Ok(connection)
798            }
799            Err(e) => {
800                let mut state = self.state.write().await;
801                let rs = state.registry.reconnect_state_or_insert(addr_key.clone());
802                rs.in_progress = false;
803                let shift = rs.attempts.min(20);
804                rs.attempts += 1;
805                let in_slow_probe = rs.attempts >= self.config.reconnect_max_retries;
806                if in_slow_probe {
807                    if let Some(probe_interval) = self.config.reconnect_slow_probe_interval {
808                        rs.next_retry_at = Instant::now() + probe_interval;
809                        warn!(
810                            "Slow probe to {} failed, next probe in {:?}: {}",
811                            addr_key, probe_interval, e
812                        );
813                    }
814                } else {
815                    let backoff = self
816                        .config
817                        .reconnect_initial_backoff
818                        .checked_mul(2u32.pow(shift))
819                        .map(|d| d.min(self.config.reconnect_max_backoff))
820                        .unwrap_or(self.config.reconnect_max_backoff);
821                    rs.next_retry_at = Instant::now() + backoff;
822                    error!(
823                        "Reconnection to {} failed (attempt {}/{}), next retry in {:?}: {}",
824                        addr_key, rs.attempts, self.config.reconnect_max_retries, backoff, e
825                    );
826                }
827                Err(e)
828            }
829        }
830    }
831
832    /// Reconciles the active miner set: adds new miners, removes stale ones,
833    /// and opens/closes QUIC connections as needed.
834    #[instrument(skip(self, miners), fields(miner_count = miners.len()))]
835    pub async fn update_miner_registry(&self, miners: Vec<QuicAxonInfo>) -> Result<()> {
836        let endpoint = self
837            .endpoint
838            .as_ref()
839            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
840            .clone();
841        let signer = self
842            .signer
843            .as_ref()
844            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
845            .clone();
846        update_miner_registry_inner(
847            &self.state,
848            &endpoint,
849            &self.wallet_hotkey,
850            &signer,
851            &self.config,
852            miners,
853        )
854        .await
855    }
856
857    /// Returns a map of connection statistics (total connections, active miners, per-address status).
858    #[instrument(skip(self))]
859    pub async fn get_connection_stats(&self) -> Result<HashMap<String, String>> {
860        let state = self.state.read().await;
861
862        let mut stats = HashMap::new();
863        stats.insert(
864            "total_connections".to_string(),
865            state.registry.connection_count().to_string(),
866        );
867        stats.insert(
868            "active_miners".to_string(),
869            state.registry.active_miner_count().to_string(),
870        );
871
872        for addr_key in state.registry.connection_addrs() {
873            let status = match state.registry.get_connection(addr_key) {
874                Some(conn) => {
875                    if let Some(reason) = conn.close_reason() {
876                        format!("closed({:?})", reason)
877                    } else {
878                        "active".to_string()
879                    }
880                }
881                None => "missing".to_string(),
882            };
883            stats.insert(format!("connection_{}", addr_key), status);
884        }
885
886        Ok(stats)
887    }
888
889    /// Starts a background task that periodically syncs the metagraph and updates connections.
890    /// Requires the `subtensor` feature.
891    #[cfg(feature = "subtensor")]
892    pub async fn start_metagraph_monitor(
893        &self,
894        monitor_config: MetagraphMonitorConfig,
895    ) -> Result<()> {
896        if monitor_config.sync_interval.is_zero() {
897            return Err(LightningError::Config(
898                "sync_interval must be non-zero".into(),
899            ));
900        }
901
902        self.stop_metagraph_monitor().await;
903
904        let endpoint = self
905            .endpoint
906            .as_ref()
907            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
908            .clone();
909        let signer = self
910            .signer
911            .as_ref()
912            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
913            .clone();
914
915        let subtensor = tokio::time::timeout(
916            Duration::from_secs(30),
917            OnlineClient::<PolkadotConfig>::from_url(&monitor_config.subtensor_endpoint),
918        )
919        .await
920        .map_err(|_| LightningError::Handler("subtensor connection timed out after 30s".into()))?
921        .map_err(|e| LightningError::Handler(format!("connecting to subtensor: {}", e)))?;
922
923        let mut metagraph = Metagraph::new(monitor_config.netuid);
924        tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor))
925            .await
926            .map_err(|_| {
927                LightningError::Handler("initial metagraph sync timed out after 60s".into())
928            })??;
929
930        let miners = metagraph.quic_miners();
931        info!(
932            netuid = monitor_config.netuid,
933            miners = miners.len(),
934            "initial metagraph sync complete"
935        );
936
937        update_miner_registry_inner(
938            &self.state,
939            &endpoint,
940            &self.wallet_hotkey,
941            &signer,
942            &self.config,
943            miners,
944        )
945        .await?;
946
947        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
948        let state = self.state.clone();
949        let wallet_hotkey = self.wallet_hotkey.clone();
950        let config = self.config.clone();
951        let sync_interval = monitor_config.sync_interval;
952        let subtensor_url = monitor_config.subtensor_endpoint.clone();
953
954        let handle = tokio::spawn(async move {
955            let mut interval = tokio::time::interval(sync_interval);
956            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
957            interval.tick().await;
958            let mut subtensor = subtensor;
959
960            loop {
961                tokio::select! {
962                    _ = interval.tick() => {}
963                    _ = shutdown_rx.changed() => {
964                        info!("metagraph monitor shutting down");
965                        return;
966                    }
967                }
968
969                let sync_result =
970                    tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor)).await;
971
972                let needs_reconnect = match sync_result {
973                    Ok(Ok(())) => {
974                        let miners = metagraph.quic_miners();
975                        info!(
976                            netuid = metagraph.netuid,
977                            miners = miners.len(),
978                            block = metagraph.block,
979                            "metagraph resync complete"
980                        );
981                        if let Err(e) = update_miner_registry_inner(
982                            &state,
983                            &endpoint,
984                            &wallet_hotkey,
985                            &signer,
986                            &config,
987                            miners,
988                        )
989                        .await
990                        {
991                            error!("registry update after metagraph sync failed: {}", e);
992                        }
993                        false
994                    }
995                    Ok(Err(e)) => {
996                        error!("metagraph sync failed, reconnecting to subtensor: {}", e);
997                        true
998                    }
999                    Err(_) => {
1000                        error!("metagraph sync timed out after 60s, reconnecting to subtensor");
1001                        true
1002                    }
1003                };
1004
1005                if needs_reconnect {
1006                    match tokio::time::timeout(
1007                        Duration::from_secs(30),
1008                        OnlineClient::<PolkadotConfig>::from_url(&subtensor_url),
1009                    )
1010                    .await
1011                    {
1012                        Ok(Ok(new_client)) => {
1013                            subtensor = new_client;
1014                            info!("subtensor client reconnected");
1015                        }
1016                        Ok(Err(e)) => {
1017                            error!("subtensor reconnection failed: {}", e);
1018                        }
1019                        Err(_) => {
1020                            error!("subtensor reconnection timed out after 30s");
1021                        }
1022                    }
1023                }
1024            }
1025        });
1026
1027        let mut st = self.state.write().await;
1028        st.metagraph_shutdown = Some(shutdown_tx);
1029        st.metagraph_handle = Some(handle);
1030        Ok(())
1031    }
1032
1033    /// Stops the metagraph monitor background task if running.
1034    #[cfg(feature = "subtensor")]
1035    pub async fn stop_metagraph_monitor(&self) {
1036        let (shutdown_tx, handle) = {
1037            let mut st = self.state.write().await;
1038            (st.metagraph_shutdown.take(), st.metagraph_handle.take())
1039        };
1040        if let Some(tx) = shutdown_tx {
1041            let _ = tx.send(true);
1042        }
1043        if let Some(mut handle) = handle {
1044            if tokio::time::timeout(Duration::from_secs(5), &mut handle)
1045                .await
1046                .is_err()
1047            {
1048                warn!("metagraph monitor did not shut down within 5s, aborting");
1049                handle.abort();
1050                let _ = handle.await;
1051            }
1052        }
1053    }
1054
1055    /// Gracefully closes all QUIC connections and clears the miner registry.
1056    #[instrument(skip(self))]
1057    pub async fn close_all_connections(&self) -> Result<()> {
1058        #[cfg(feature = "subtensor")]
1059        self.stop_metagraph_monitor().await;
1060
1061        let mut state = self.state.write().await;
1062
1063        for (_, connection) in state.registry.drain_connections() {
1064            connection.close(0u32.into(), b"client_shutdown");
1065        }
1066
1067        state.registry.clear();
1068
1069        info!("All Lightning QUIC connections closed");
1070        Ok(())
1071    }
1072}
1073
1074async fn update_miner_registry_inner(
1075    state: &Arc<RwLock<ClientState>>,
1076    endpoint: &Endpoint,
1077    wallet_hotkey: &str,
1078    signer: &Arc<dyn Signer>,
1079    config: &LightningClientConfig,
1080    miners: Vec<QuicAxonInfo>,
1081) -> Result<()> {
1082    let new_by_hotkey: HashMap<String, QuicAxonInfo> = miners
1083        .iter()
1084        .map(|m| (m.hotkey.clone(), m.clone()))
1085        .collect();
1086
1087    let new_hotkeys_needing_auth: Vec<QuicAxonInfo>;
1088    let new_addrs_needing_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>>;
1089    {
1090        let mut st = state.write().await;
1091
1092        let active_hotkeys = st.registry.active_hotkeys();
1093        for hotkey in active_hotkeys {
1094            if !new_by_hotkey.contains_key(&hotkey) {
1095                if let Some(miner) = st.registry.deregister(&hotkey) {
1096                    let addr_key = miner.addr_key();
1097                    info!("Miner {} deregistered from {}", hotkey, addr_key);
1098                    if !st.registry.addr_has_hotkeys(&addr_key) {
1099                        if let Some(connection) = st.registry.remove_connection(&addr_key) {
1100                            connection.close(0u32.into(), b"miner_deregistered");
1101                        }
1102                        st.registry.remove_reconnect_state(&addr_key);
1103                    }
1104                }
1105            }
1106        }
1107
1108        let active_addrs = st.registry.active_addrs();
1109        for addr_key in &active_addrs {
1110            if st.registry.remove_reconnect_state(addr_key) {
1111                info!(
1112                    "Registry refresh reset reconnection backoff for {}",
1113                    addr_key
1114                );
1115            }
1116        }
1117
1118        let dead_addrs: Vec<PeerAddr> = active_addrs
1119            .iter()
1120            .filter(|addr| {
1121                st.registry
1122                    .get_connection(addr)
1123                    .is_some_and(|c| c.close_reason().is_some())
1124            })
1125            .cloned()
1126            .collect();
1127        for addr_key in &dead_addrs {
1128            if let Some(conn) = st.registry.remove_connection(addr_key) {
1129                let hotkeys = st.registry.hotkeys_at_addr(addr_key);
1130                info!(
1131                    addr = %addr_key,
1132                    close_reason = ?conn.close_reason(),
1133                    hotkeys = ?hotkeys,
1134                    "Pruning dead connection and deregistering miners"
1135                );
1136                for hk in &hotkeys {
1137                    st.registry.deregister(hk);
1138                }
1139            }
1140        }
1141
1142        for new_miner in new_by_hotkey.values() {
1143            if let Some(old_miner) = st.registry.active_miner(&new_miner.hotkey) {
1144                let old_addr = old_miner.addr_key();
1145                let new_addr = new_miner.addr_key();
1146                if old_addr != new_addr {
1147                    info!(
1148                        "Miner {} changed address from {} to {}",
1149                        new_miner.hotkey, old_addr, new_addr
1150                    );
1151                    st.registry.deregister(&new_miner.hotkey);
1152                    if !st.registry.addr_has_hotkeys(&old_addr) {
1153                        if let Some(conn) = st.registry.remove_connection(&old_addr) {
1154                            conn.close(0u32.into(), b"miner_addr_changed");
1155                        }
1156                        st.registry.remove_reconnect_state(&old_addr);
1157                    }
1158                }
1159            }
1160        }
1161
1162        let new_hotkeys: Vec<QuicAxonInfo> = new_by_hotkey
1163            .values()
1164            .filter(|m| !st.registry.contains_active_miner(&m.hotkey))
1165            .cloned()
1166            .collect();
1167
1168        let mut need_auth = Vec::new();
1169        let mut need_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
1170        for miner in new_hotkeys {
1171            let addr_key = miner.addr_key();
1172            if st.registry.contains_connection(&addr_key) {
1173                need_auth.push(miner);
1174            } else {
1175                need_connect.entry(addr_key).or_default().push(miner);
1176            }
1177        }
1178
1179        let active_count = st.registry.connection_count();
1180        let remaining_capacity = config.max_connections.saturating_sub(active_count);
1181        if need_connect.len() > remaining_capacity {
1182            warn!(
1183                "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
1184                config.max_connections,
1185                active_count,
1186                need_connect.len() - remaining_capacity,
1187                need_connect.len()
1188            );
1189        }
1190
1191        new_hotkeys_needing_auth = need_auth;
1192        new_addrs_needing_connect = need_connect.into_iter().take(remaining_capacity).collect();
1193    }
1194
1195    let timeout = config.connect_timeout;
1196    let max_fp = config.max_frame_payload_bytes;
1197
1198    if !new_hotkeys_needing_auth.is_empty() {
1199        let miners_with_conns: Vec<(QuicAxonInfo, Connection)> = {
1200            let st = state.read().await;
1201            new_hotkeys_needing_auth
1202                .into_iter()
1203                .filter_map(|miner| {
1204                    let addr_key = miner.addr_key();
1205                    st.registry
1206                        .get_connection(&addr_key)
1207                        .map(|conn| (miner, conn))
1208                })
1209                .collect()
1210        };
1211
1212        let mut authenticated = Vec::new();
1213        for (miner, conn) in &miners_with_conns {
1214            let addr_key = miner.addr_key();
1215            match tokio::time::timeout(
1216                timeout,
1217                authenticate_handshake(conn, &miner.hotkey, wallet_hotkey, signer, max_fp),
1218            )
1219            .await
1220            {
1221                Ok(Ok(())) => {
1222                    info!(
1223                        "Authenticated new miner {} on existing connection to {}",
1224                        miner.hotkey, addr_key
1225                    );
1226                    authenticated.push(miner.clone());
1227                }
1228                Ok(Err(e)) => {
1229                    warn!(
1230                        "Handshake failed for new hotkey {} at {}: {}",
1231                        miner.hotkey, addr_key, e
1232                    );
1233                }
1234                Err(_) => {
1235                    warn!(
1236                        "Handshake timed out for new hotkey {} at {}",
1237                        miner.hotkey, addr_key
1238                    );
1239                }
1240            }
1241        }
1242
1243        let mut st = state.write().await;
1244        for miner in authenticated {
1245            st.registry.register(miner);
1246        }
1247    }
1248
1249    if !new_addrs_needing_connect.is_empty() {
1250        let mut set = tokio::task::JoinSet::new();
1251        for (addr_key, miners_at_addr) in new_addrs_needing_connect {
1252            info!(
1253                "New address detected, establishing QUIC connection: {}",
1254                addr_key
1255            );
1256            let ep = endpoint.clone();
1257            let wh = wallet_hotkey.to_string();
1258            let s = signer.clone();
1259            set.spawn(connect_and_authenticate_per_address(
1260                ep,
1261                wh,
1262                s,
1263                addr_key,
1264                miners_at_addr,
1265                timeout,
1266                max_fp,
1267            ));
1268        }
1269
1270        let mut results = Vec::new();
1271        while let Some(join_result) = set.join_next().await {
1272            match join_result {
1273                Ok((addr_key, conn_result, authenticated)) => {
1274                    results.push((addr_key, conn_result, authenticated));
1275                }
1276                Err(e) => {
1277                    error!("Connection task panicked: {}", e);
1278                }
1279            }
1280        }
1281
1282        let mut st = state.write().await;
1283        for (addr_key, conn_result, authenticated) in results {
1284            match conn_result {
1285                Ok(connection) => {
1286                    if authenticated.is_empty() {
1287                        warn!(
1288                            "No hotkeys authenticated at {}, dropping connection",
1289                            addr_key
1290                        );
1291                        connection.close(0u32.into(), b"no_authenticated_hotkeys");
1292                    } else {
1293                        for miner in authenticated {
1294                            st.registry.register(miner);
1295                        }
1296                        st.registry.set_connection(addr_key, connection);
1297                    }
1298                }
1299                Err(e) => {
1300                    error!("Failed to connect to {}: {}", addr_key, e);
1301                }
1302            }
1303        }
1304    }
1305
1306    Ok(())
1307}
1308
1309fn get_peer_cert_fingerprint(connection: &Connection) -> Option<[u8; 32]> {
1310    let identity = connection.peer_identity()?;
1311    let certs = identity.downcast::<Vec<CertificateDer<'static>>>().ok()?;
1312    let first = certs.first()?;
1313    Some(blake2_256(first.as_ref()))
1314}
1315
1316async fn quic_connect(
1317    endpoint: &Endpoint,
1318    addr_key: &PeerAddr,
1319    server_name: &str,
1320) -> Result<Connection> {
1321    let addr: SocketAddr = addr_key
1322        .as_ref()
1323        .parse()
1324        .map_err(|e| LightningError::Connection(format!("Invalid address: {}", e)))?;
1325
1326    endpoint
1327        .connect(addr, server_name)
1328        .map_err(|e| LightningError::Connection(format!("Connection failed: {}", e)))?
1329        .await
1330        .map_err(|e| LightningError::Connection(format!("Connection handshake failed: {}", e)))
1331}
1332
1333async fn connect_and_authenticate_per_address(
1334    endpoint: Endpoint,
1335    wallet_hotkey: String,
1336    signer: Arc<dyn Signer>,
1337    addr_key: PeerAddr,
1338    miners_at_addr: Vec<QuicAxonInfo>,
1339    timeout: Duration,
1340    max_frame_payload: usize,
1341) -> (PeerAddr, Result<Connection>, Vec<QuicAxonInfo>) {
1342    let first = match miners_at_addr.first() {
1343        Some(m) => m,
1344        None => {
1345            return (
1346                addr_key,
1347                Err(LightningError::Connection("no miners for address".into())),
1348                vec![],
1349            );
1350        }
1351    };
1352
1353    let conn = match tokio::time::timeout(timeout, quic_connect(&endpoint, &addr_key, &first.ip))
1354        .await
1355    {
1356        Ok(Ok(c)) => c,
1357        Ok(Err(e)) => return (addr_key, Err(e), vec![]),
1358        Err(_) => {
1359            let err = LightningError::Connection(format!("Connection to {} timed out", addr_key));
1360            return (addr_key, Err(err), vec![]);
1361        }
1362    };
1363
1364    let mut authenticated = Vec::new();
1365    for miner in &miners_at_addr {
1366        match tokio::time::timeout(
1367            timeout,
1368            authenticate_handshake(
1369                &conn,
1370                &miner.hotkey,
1371                &wallet_hotkey,
1372                &signer,
1373                max_frame_payload,
1374            ),
1375        )
1376        .await
1377        {
1378            Ok(Ok(())) => authenticated.push(miner.clone()),
1379            Ok(Err(e)) => {
1380                warn!(
1381                    "Handshake failed for hotkey {} at {}: {}",
1382                    miner.hotkey, addr_key, e
1383                );
1384            }
1385            Err(_) => {
1386                warn!(
1387                    "Handshake timed out for hotkey {} at {}",
1388                    miner.hotkey, addr_key
1389                );
1390            }
1391        }
1392    }
1393
1394    (addr_key, Ok(conn), authenticated)
1395}
1396
1397async fn authenticate_handshake(
1398    connection: &Connection,
1399    expected_hotkey: &str,
1400    wallet_hotkey: &str,
1401    signer: &Arc<dyn Signer>,
1402    max_frame_payload: usize,
1403) -> Result<()> {
1404    let peer_cert_fp = get_peer_cert_fingerprint(connection).ok_or_else(|| {
1405        LightningError::Handshake("peer certificate not available for fingerprinting".to_string())
1406    })?;
1407    let peer_cert_fp_b64 = BASE64_STANDARD.encode(peer_cert_fp);
1408
1409    let nonce = generate_nonce();
1410    let timestamp = unix_timestamp_secs();
1411    let message = handshake_request_message(wallet_hotkey, timestamp, &nonce, &peer_cert_fp_b64);
1412    let msg_bytes = message.into_bytes();
1413    let signer_clone = signer.clone();
1414    let signature_bytes = tokio::task::spawn_blocking(move || signer_clone.sign(&msg_bytes))
1415        .await
1416        .map_err(|e| LightningError::Signing(format!("signer task failed: {}", e)))??;
1417
1418    let handshake_request = HandshakeRequest {
1419        validator_hotkey: wallet_hotkey.to_string(),
1420        timestamp,
1421        nonce: nonce.clone(),
1422        signature: BASE64_STANDARD.encode(&signature_bytes),
1423    };
1424
1425    let response = send_handshake(connection, handshake_request, max_frame_payload).await?;
1426    if !response.accepted {
1427        return Err(LightningError::Handshake(
1428            "Handshake rejected by miner".into(),
1429        ));
1430    }
1431
1432    if response.miner_hotkey != expected_hotkey {
1433        return Err(LightningError::Handshake(format!(
1434            "Miner hotkey mismatch: expected {}, got {}",
1435            expected_hotkey, response.miner_hotkey
1436        )));
1437    }
1438
1439    match response.cert_fingerprint {
1440        Some(ref resp_fp) if *resp_fp == peer_cert_fp_b64 => {}
1441        Some(_) => {
1442            return Err(LightningError::Handshake(
1443                "Cert fingerprint mismatch between TLS session and handshake response".to_string(),
1444            ));
1445        }
1446        None => {
1447            return Err(LightningError::Handshake(
1448                "Miner handshake response omitted required cert fingerprint".to_string(),
1449            ));
1450        }
1451    }
1452
1453    verify_miner_response_signature(&response, wallet_hotkey, &nonce, &peer_cert_fp_b64).await?;
1454
1455    info!("Handshake successful with miner {}", expected_hotkey);
1456    Ok(())
1457}
1458
1459async fn connect_and_handshake(
1460    endpoint: Endpoint,
1461    miner: QuicAxonInfo,
1462    wallet_hotkey: String,
1463    signer: Arc<dyn Signer>,
1464    max_frame_payload: usize,
1465) -> Result<Connection> {
1466    let addr_key = miner.addr_key();
1467    let connection = quic_connect(&endpoint, &addr_key, &miner.ip).await?;
1468    authenticate_handshake(
1469        &connection,
1470        &miner.hotkey,
1471        &wallet_hotkey,
1472        &signer,
1473        max_frame_payload,
1474    )
1475    .await?;
1476    Ok(connection)
1477}
1478
1479async fn verify_miner_response_signature(
1480    response: &HandshakeResponse,
1481    validator_hotkey: &str,
1482    nonce: &str,
1483    cert_fp_b64: &str,
1484) -> Result<()> {
1485    if response.signature.is_empty() {
1486        return Err(LightningError::Handshake(
1487            "Miner returned empty signature".to_string(),
1488        ));
1489    }
1490
1491    let expected_message = handshake_response_message(
1492        validator_hotkey,
1493        &response.miner_hotkey,
1494        response.timestamp,
1495        nonce,
1496        cert_fp_b64,
1497    );
1498
1499    let valid = crate::signing::verify_sr25519_signature(
1500        &response.miner_hotkey,
1501        &response.signature,
1502        &expected_message,
1503    )
1504    .await?;
1505
1506    if !valid {
1507        return Err(LightningError::Handshake(
1508            "Miner response signature verification failed".to_string(),
1509        ));
1510    }
1511
1512    Ok(())
1513}
1514
1515async fn send_handshake(
1516    connection: &Connection,
1517    request: HandshakeRequest,
1518    max_frame_payload: usize,
1519) -> Result<HandshakeResponse> {
1520    let (mut send, mut recv) = connection.open_bi().await.map_err(|e| {
1521        LightningError::Connection(format!("Failed to open bidirectional stream: {}", e))
1522    })?;
1523
1524    let request_bytes = rmp_serde::to_vec(&request).map_err(|e| {
1525        LightningError::Serialization(format!("Failed to serialize handshake: {}", e))
1526    })?;
1527
1528    write_frame_and_finish(&mut send, MessageType::HandshakeRequest, &request_bytes).await?;
1529
1530    let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
1531    if msg_type != MessageType::HandshakeResponse {
1532        return Err(LightningError::Handshake(format!(
1533            "Expected HandshakeResponse, got {:?}",
1534            msg_type
1535        )));
1536    }
1537
1538    let response: HandshakeResponse = rmp_serde::from_slice(&payload).map_err(|e| {
1539        LightningError::Serialization(format!("Failed to parse handshake response: {}", e))
1540    })?;
1541
1542    Ok(response)
1543}
1544
1545async fn send_synapse_frame(send: &mut quinn::SendStream, request: QuicRequest) -> Result<()> {
1546    let synapse_packet = SynapsePacket {
1547        synapse_type: request.synapse_type,
1548        data: request.data,
1549        timestamp: unix_timestamp_secs(),
1550    };
1551
1552    let packet_bytes = rmp_serde::to_vec(&synapse_packet).map_err(|e| {
1553        LightningError::Serialization(format!("Failed to serialize synapse packet: {}", e))
1554    })?;
1555
1556    write_frame_and_finish(send, MessageType::SynapsePacket, &packet_bytes).await
1557}
1558
1559async fn send_synapse_packet(
1560    connection: &Connection,
1561    request: QuicRequest,
1562    max_frame_payload: usize,
1563) -> Result<QuicResponse> {
1564    let stable_id = connection.stable_id();
1565    debug!(stable_id, "send_synapse_packet: opening bi stream");
1566    let (mut send, mut recv) = connection
1567        .open_bi()
1568        .await
1569        .map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
1570    debug!(stable_id, "send_synapse_packet: bi stream opened");
1571
1572    let start = Instant::now();
1573
1574    send_synapse_frame(&mut send, request).await?;
1575    debug!(
1576        stable_id,
1577        "send_synapse_packet: frame sent, awaiting response"
1578    );
1579
1580    let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
1581    debug!(stable_id, msg_type = ?msg_type, elapsed_ms = start.elapsed().as_millis() as u64, "send_synapse_packet: response received");
1582
1583    match msg_type {
1584        MessageType::SynapseResponse => {
1585            let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
1586            let synapse_response: SynapseResponse =
1587                rmp_serde::from_slice(&payload).map_err(|e| {
1588                    LightningError::Serialization(format!(
1589                        "Failed to parse synapse response: {}",
1590                        e
1591                    ))
1592                })?;
1593
1594            Ok(QuicResponse {
1595                success: synapse_response.success,
1596                data: synapse_response.data,
1597                latency_ms,
1598                error: synapse_response.error,
1599            })
1600        }
1601        MessageType::StreamChunk => Err(LightningError::Transport(
1602            "received StreamChunk on non-streaming query; use query_axon_stream for streaming synapses".to_string(),
1603        )),
1604        other => Err(LightningError::Transport(format!(
1605            "unexpected response type: {:?}",
1606            other
1607        ))),
1608    }
1609}
1610
1611async fn open_streaming_synapse(
1612    connection: &Connection,
1613    request: QuicRequest,
1614    max_frame_payload: usize,
1615    max_stream_payload: usize,
1616    chunk_timeout: Option<Duration>,
1617) -> Result<StreamingResponse> {
1618    let (mut send, recv) = connection
1619        .open_bi()
1620        .await
1621        .map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
1622
1623    send_synapse_frame(&mut send, request).await?;
1624
1625    Ok(StreamingResponse {
1626        recv,
1627        max_payload: max_frame_payload,
1628        max_stream_payload,
1629        chunk_timeout,
1630    })
1631}
1632
1633fn generate_nonce() -> String {
1634    use rand::Rng;
1635    let bytes: [u8; 16] = rand::thread_rng().gen();
1636    format!("{:032x}", u128::from_be_bytes(bytes))
1637}
1638
#[cfg(test)]
mod tests {
    use super::*;
    use sp_core::{crypto::Ss58Codec, sr25519, Pair};

    const MINER_SEED: [u8; 32] = [1u8; 32];
    const VALIDATOR_SEED: [u8; 32] = [2u8; 32];

    /// SS58 address of the key pair derived from `VALIDATOR_SEED`.
    fn validator_hotkey() -> String {
        let pair = sr25519::Pair::from_seed(&VALIDATOR_SEED);
        pair.public().to_ss58check()
    }

    /// Builds an accepted `HandshakeResponse` signed by the key pair derived
    /// from `miner_seed` over the handshake response message for the given
    /// validator hotkey, nonce and certificate fingerprint.
    fn make_signed_response(
        miner_seed: [u8; 32],
        validator_hotkey: &str,
        nonce: &str,
        cert_fp_b64: &str,
    ) -> HandshakeResponse {
        let signer = sr25519::Pair::from_seed(&miner_seed);
        let miner_hotkey = signer.public().to_ss58check();
        let timestamp = unix_timestamp_secs();
        let signed_text = handshake_response_message(
            validator_hotkey,
            &miner_hotkey,
            timestamp,
            nonce,
            cert_fp_b64,
        );
        let raw_signature = signer.sign(signed_text.as_bytes());
        HandshakeResponse {
            miner_hotkey,
            timestamp,
            signature: BASE64_STANDARD.encode(raw_signature.0),
            accepted: true,
            connection_id: "test".to_string(),
            cert_fingerprint: Some(cert_fp_b64.to_string()),
        }
    }

    /// Response signed with `MINER_SEED` over the placeholder nonce `"n"` and
    /// fingerprint `"fp"`.
    fn placeholder_response() -> HandshakeResponse {
        make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp")
    }

    /// Asserts that verification of `resp` fails and that the error text
    /// contains `expected`.
    async fn assert_rejected_with(
        resp: &HandshakeResponse,
        nonce: &str,
        fp: &str,
        expected: &str,
    ) {
        let err = verify_miner_response_signature(resp, &validator_hotkey(), nonce, fp)
            .await
            .unwrap_err();
        assert!(err.to_string().contains(expected));
    }

    /// Whether `LightningClient::with_config` refuses `cfg`.
    fn config_is_rejected(cfg: LightningClientConfig) -> bool {
        LightningClient::with_config("hk".into(), cfg).is_err()
    }

    #[tokio::test]
    async fn verify_valid_miner_signature() {
        let nonce = "test-nonce";
        let fp = "dGVzdC1mcA==";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), nonce, fp);
        let outcome = verify_miner_response_signature(&resp, &validator_hotkey(), nonce, fp).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn verify_rejects_empty_signature() {
        let mut resp = placeholder_response();
        resp.signature = String::new();
        assert_rejected_with(&resp, "n", "fp", "empty signature").await;
    }

    #[tokio::test]
    async fn verify_rejects_invalid_base64() {
        let mut resp = placeholder_response();
        resp.signature = "not-valid-base64!!!".to_string();
        assert_rejected_with(&resp, "n", "fp", "Failed to decode signature").await;
    }

    #[tokio::test]
    async fn verify_rejects_wrong_signature_length() {
        let mut resp = placeholder_response();
        // Valid base64, but the decoded payload is only 32 bytes long.
        resp.signature = BASE64_STANDARD.encode([0u8; 32]);
        assert_rejected_with(&resp, "n", "fp", "Invalid signature length").await;
    }

    #[tokio::test]
    async fn verify_rejects_bad_ss58_address() {
        let mut resp = placeholder_response();
        resp.miner_hotkey = "not_a_valid_ss58".to_string();
        assert_rejected_with(&resp, "n", "fp", "Invalid SS58 address").await;
    }

    #[tokio::test]
    async fn verify_rejects_wrong_signer() {
        let mut resp = placeholder_response();
        // Claim a hotkey that did not produce the signature.
        let impostor = sr25519::Pair::from_seed(&[99u8; 32]);
        resp.miner_hotkey = impostor.public().to_ss58check();
        assert_rejected_with(&resp, "n", "fp", "signature verification failed").await;
    }

    #[tokio::test]
    async fn verify_rejects_tampered_nonce() {
        let fp = "fp";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), "original-nonce", fp);
        assert_rejected_with(&resp, "tampered-nonce", fp, "signature verification failed").await;
    }

    #[test]
    fn with_config_rejects_frame_payload_below_minimum() {
        let mut cfg = LightningClientConfig::default();
        cfg.max_frame_payload_bytes = 512;
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_rejects_frame_payload_above_u32_max() {
        let too_big: u128 = u32::MAX as u128 + 1;
        // When the value does not fit in `usize` on this target there is
        // nothing to check, so the test passes without asserting.
        if let Ok(val) = usize::try_from(too_big) {
            let mut cfg = LightningClientConfig::default();
            cfg.max_frame_payload_bytes = val;
            cfg.max_stream_payload_bytes = val;
            assert!(config_is_rejected(cfg));
        }
    }

    #[test]
    fn with_config_rejects_stream_below_frame() {
        let mut cfg = LightningClientConfig::default();
        cfg.max_stream_payload_bytes = cfg.max_frame_payload_bytes - 1;
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_rejects_zero_stream_chunk_timeout() {
        let mut cfg = LightningClientConfig::default();
        cfg.stream_chunk_timeout = Some(Duration::ZERO);
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_default_succeeds() {
        assert!(!config_is_rejected(LightningClientConfig::default()));
    }
}
1805
1806// Deliberately disables TLS PKI certificate validation. TLS still provides transport
1807// encryption but not identity authentication. Authenticity is instead enforced at the
1808// application layer: the handshake exchanges certificate fingerprints and verifies
1809// sr25519 signatures over them (see connect_and_authenticate_per_address / authenticate_handshake).
1810#[derive(Debug)]
1811struct AcceptAnyCertVerifier;
1812
1813impl ServerCertVerifier for AcceptAnyCertVerifier {
1814    fn verify_server_cert(
1815        &self,
1816        _end_entity: &CertificateDer<'_>,
1817        _intermediates: &[CertificateDer<'_>],
1818        _server_name: &ServerName<'_>,
1819        _ocsp_response: &[u8],
1820        _now: UnixTime,
1821    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
1822        Ok(ServerCertVerified::assertion())
1823    }
1824
1825    fn verify_tls12_signature(
1826        &self,
1827        _message: &[u8],
1828        _cert: &CertificateDer<'_>,
1829        _dss: &rustls::DigitallySignedStruct,
1830    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
1831        Err(rustls::Error::PeerIncompatible(
1832            rustls::PeerIncompatible::Tls12NotOffered,
1833        ))
1834    }
1835
1836    fn verify_tls13_signature(
1837        &self,
1838        _message: &[u8],
1839        _cert: &CertificateDer<'_>,
1840        _dss: &rustls::DigitallySignedStruct,
1841    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
1842        Ok(HandshakeSignatureValid::assertion())
1843    }
1844
1845    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1846        rustls::crypto::ring::default_provider()
1847            .signature_verification_algorithms
1848            .supported_schemes()
1849    }
1850}