1use crate::error::{LightningError, Result};
2use crate::registry::MinerRegistry;
3use crate::signing::Signer;
4use crate::types::{
5 handshake_request_message, handshake_response_message, read_frame, write_frame_and_finish,
6 HandshakeRequest, HandshakeResponse, MessageType, PeerAddr, QuicAxonInfo, QuicRequest,
7 QuicResponse, StreamChunk, StreamEnd, SynapsePacket, SynapseResponse,
8 DEFAULT_MAX_FRAME_PAYLOAD,
9};
10use crate::util::unix_timestamp_secs;
11use base64::{prelude::BASE64_STANDARD, Engine};
12use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, TransportConfig};
13use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
14use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
15use rustls::ClientConfig as RustlsClientConfig;
16use sp_core::blake2_256;
17use std::collections::HashMap;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::RwLock;
22use tokio::time::Instant;
23use tracing::{debug, error, info, instrument, warn};
24
25#[cfg(feature = "subtensor")]
26use crate::metagraph::{Metagraph, MetagraphMonitorConfig};
27#[cfg(feature = "subtensor")]
28use subxt::{OnlineClient, PolkadotConfig};
29
/// Tunable settings for a [`LightningClient`].
///
/// Values are checked by the private `validate` method, which runs both in
/// `LightningClientConfigBuilder::build` and in `LightningClient::with_config`.
#[derive(Clone)]
pub struct LightningClientConfig {
    /// Upper bound on a single connect / handshake attempt. Must be non-zero.
    pub connect_timeout: Duration,
    /// Passed to the QUIC transport as its maximum idle timeout. Must be non-zero
    /// and strictly greater than `keep_alive_interval`.
    pub idle_timeout: Duration,
    /// Passed to the QUIC transport as its keep-alive interval. Must be non-zero
    /// and strictly less than `idle_timeout`.
    pub keep_alive_interval: Duration,
    /// Base delay after a failed reconnect; doubled per recorded failure and
    /// capped at `reconnect_max_backoff`. Must be non-zero and not exceed the cap.
    pub reconnect_initial_backoff: Duration,
    /// Ceiling for the exponential reconnect backoff. Must be non-zero.
    pub reconnect_max_backoff: Duration,
    /// Number of failed attempts after which exponential backoff stops and the
    /// slow-probe schedule (if any) takes over. Must be at least 1.
    pub reconnect_max_retries: u32,
    /// Delay between reconnect probes once `reconnect_max_retries` has been
    /// reached. When set it must be non-zero; with `None` no further retry time
    /// is scheduled by the reconnect failure path.
    pub reconnect_slow_probe_interval: Option<Duration>,
    /// Maximum number of connections (one per peer address) the client will
    /// hold; new addresses beyond the remaining capacity are skipped. Must be
    /// at least 1.
    pub max_connections: usize,
    /// Per-frame payload limit handed to the frame reader. Must be at least
    /// 1_048_576 bytes and at most `u32::MAX`.
    pub max_frame_payload_bytes: usize,
    /// Aggregate byte limit enforced by `StreamingResponse::collect_all`.
    /// Must be at least `max_frame_payload_bytes`.
    pub max_stream_payload_bytes: usize,
    /// Optional per-chunk read timeout for streaming responses; `None` means
    /// chunk reads wait without a timeout. When set it must be non-zero.
    pub stream_chunk_timeout: Option<Duration>,
    /// When set, `LightningClient::initialize_connections` also starts the
    /// metagraph monitor with this configuration.
    #[cfg(feature = "subtensor")]
    pub metagraph: Option<MetagraphMonitorConfig>,
}
67
68impl Default for LightningClientConfig {
69 fn default() -> Self {
70 Self {
71 connect_timeout: Duration::from_secs(10),
72 idle_timeout: Duration::from_secs(150),
73 keep_alive_interval: Duration::from_secs(30),
74 reconnect_initial_backoff: Duration::from_secs(1),
75 reconnect_max_backoff: Duration::from_secs(60),
76 reconnect_max_retries: 5,
77 reconnect_slow_probe_interval: Some(Duration::from_secs(60)),
78 max_connections: 1024,
79 max_frame_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
80 max_stream_payload_bytes: DEFAULT_MAX_FRAME_PAYLOAD,
81 stream_chunk_timeout: None,
82 #[cfg(feature = "subtensor")]
83 metagraph: None,
84 }
85 }
86}
87
88impl LightningClientConfig {
89 pub fn builder() -> LightningClientConfigBuilder {
90 LightningClientConfigBuilder {
91 config: Self::default(),
92 }
93 }
94
95 fn validate(&self) -> Result<()> {
96 if self.connect_timeout.is_zero() {
97 return Err(LightningError::Config(
98 "connect_timeout must be non-zero".into(),
99 ));
100 }
101 if self.idle_timeout.is_zero() {
102 return Err(LightningError::Config(
103 "idle_timeout must be non-zero".into(),
104 ));
105 }
106 if self.keep_alive_interval.is_zero() {
107 return Err(LightningError::Config(
108 "keep_alive_interval must be non-zero".into(),
109 ));
110 }
111 if self.keep_alive_interval >= self.idle_timeout {
112 return Err(LightningError::Config(format!(
113 "keep_alive_interval ({:?}) must be less than idle_timeout ({:?})",
114 self.keep_alive_interval, self.idle_timeout
115 )));
116 }
117 if self.reconnect_initial_backoff.is_zero() {
118 return Err(LightningError::Config(
119 "reconnect_initial_backoff must be non-zero".into(),
120 ));
121 }
122 if self.reconnect_max_backoff.is_zero() {
123 return Err(LightningError::Config(
124 "reconnect_max_backoff must be non-zero".into(),
125 ));
126 }
127 if self.reconnect_initial_backoff > self.reconnect_max_backoff {
128 return Err(LightningError::Config(format!(
129 "reconnect_initial_backoff ({:?}) must be <= reconnect_max_backoff ({:?})",
130 self.reconnect_initial_backoff, self.reconnect_max_backoff
131 )));
132 }
133 if self.reconnect_max_retries == 0 {
134 return Err(LightningError::Config(
135 "reconnect_max_retries must be at least 1".into(),
136 ));
137 }
138 if self
139 .reconnect_slow_probe_interval
140 .is_some_and(|d| d.is_zero())
141 {
142 return Err(LightningError::Config(
143 "reconnect_slow_probe_interval must be non-zero when set".into(),
144 ));
145 }
146 if self.max_connections == 0 {
147 return Err(LightningError::Config(
148 "max_connections must be at least 1".into(),
149 ));
150 }
151 if self.max_frame_payload_bytes < 1_048_576 {
152 return Err(LightningError::Config(format!(
153 "max_frame_payload_bytes ({}) must be at least 1048576 (1 MB)",
154 self.max_frame_payload_bytes
155 )));
156 }
157 if self.max_frame_payload_bytes > u32::MAX as usize {
158 return Err(LightningError::Config(format!(
159 "max_frame_payload_bytes ({}) must not exceed {} (u32::MAX)",
160 self.max_frame_payload_bytes,
161 u32::MAX
162 )));
163 }
164 if self.stream_chunk_timeout.is_some_and(|d| d.is_zero()) {
165 return Err(LightningError::Config(
166 "stream_chunk_timeout must be non-zero".into(),
167 ));
168 }
169 if self.max_stream_payload_bytes < self.max_frame_payload_bytes {
170 return Err(LightningError::Config(format!(
171 "max_stream_payload_bytes ({}) must be >= max_frame_payload_bytes ({})",
172 self.max_stream_payload_bytes, self.max_frame_payload_bytes
173 )));
174 }
175 Ok(())
176 }
177}
178
179pub struct LightningClientConfigBuilder {
180 config: LightningClientConfig,
181}
182
183impl LightningClientConfigBuilder {
184 pub fn connect_timeout(mut self, val: Duration) -> Self {
185 self.config.connect_timeout = val;
186 self
187 }
188 pub fn idle_timeout(mut self, val: Duration) -> Self {
189 self.config.idle_timeout = val;
190 self
191 }
192 pub fn keep_alive_interval(mut self, val: Duration) -> Self {
193 self.config.keep_alive_interval = val;
194 self
195 }
196 pub fn reconnect_initial_backoff(mut self, val: Duration) -> Self {
197 self.config.reconnect_initial_backoff = val;
198 self
199 }
200 pub fn reconnect_max_backoff(mut self, val: Duration) -> Self {
201 self.config.reconnect_max_backoff = val;
202 self
203 }
204 pub fn reconnect_max_retries(mut self, val: u32) -> Self {
205 self.config.reconnect_max_retries = val;
206 self
207 }
208 pub fn reconnect_slow_probe_interval(mut self, val: Option<Duration>) -> Self {
209 self.config.reconnect_slow_probe_interval = val;
210 self
211 }
212 pub fn max_connections(mut self, val: usize) -> Self {
213 self.config.max_connections = val;
214 self
215 }
216 pub fn max_frame_payload_bytes(mut self, val: usize) -> Self {
217 self.config.max_frame_payload_bytes = val;
218 self
219 }
220 pub fn max_stream_payload_bytes(mut self, val: usize) -> Self {
221 self.config.max_stream_payload_bytes = val;
222 self
223 }
224 pub fn stream_chunk_timeout(mut self, val: Duration) -> Self {
225 self.config.stream_chunk_timeout = Some(val);
226 self
227 }
228 #[cfg(feature = "subtensor")]
229 pub fn metagraph(mut self, val: MetagraphMonitorConfig) -> Self {
230 self.config.metagraph = Some(val);
231 self
232 }
233 pub fn build(self) -> Result<LightningClientConfig> {
234 self.config.validate()?;
235 Ok(self.config)
236 }
237}
238
/// Mutable client state shared behind the `RwLock` held by `LightningClient`.
struct ClientState {
    /// Registry of miners, their connections and reconnect bookkeeping.
    registry: MinerRegistry,
    /// Sender used to signal the metagraph monitor task to stop; `None` when
    /// no monitor is running.
    #[cfg(feature = "subtensor")]
    metagraph_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Join handle of the spawned metagraph monitor task, if one is running.
    #[cfg(feature = "subtensor")]
    metagraph_handle: Option<tokio::task::JoinHandle<()>>,
}
246
/// Handle for reading a streamed response frame by frame.
pub struct StreamingResponse {
    /// QUIC receive stream the frames are read from.
    recv: quinn::RecvStream,
    /// Per-frame payload limit passed to `read_frame`.
    max_payload: usize,
    /// Aggregate byte limit enforced by `collect_all`.
    max_stream_payload: usize,
    /// Optional time limit for reading a single frame in `next_chunk`.
    chunk_timeout: Option<Duration>,
}
257
258impl StreamingResponse {
259 pub async fn next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
261 let frame_result = match self.chunk_timeout {
262 Some(timeout) => {
263 match tokio::time::timeout(timeout, read_frame(&mut self.recv, self.max_payload))
264 .await
265 {
266 Ok(r) => r,
267 Err(_) => {
268 self.recv.stop(0u32.into()).ok();
269 return Err(LightningError::Stream("chunk read timed out".to_string()));
270 }
271 }
272 }
273 None => read_frame(&mut self.recv, self.max_payload).await,
274 };
275 match frame_result {
276 Ok((MessageType::StreamChunk, payload)) => {
277 let chunk: StreamChunk = rmp_serde::from_slice(&payload).map_err(|e| {
278 LightningError::Serialization(format!("Failed to parse stream chunk: {}", e))
279 })?;
280 Ok(Some(chunk.data))
281 }
282 Ok((MessageType::StreamEnd, payload)) => {
283 let end: StreamEnd = rmp_serde::from_slice(&payload).map_err(|e| {
284 LightningError::Serialization(format!("Failed to parse stream end: {}", e))
285 })?;
286 if end.success {
287 Ok(None)
288 } else {
289 Err(LightningError::Stream(end.error.unwrap_or_else(|| {
290 "stream ended with failure status".to_string()
291 })))
292 }
293 }
294 Ok((MessageType::SynapseResponse, payload)) => {
295 let detail = rmp_serde::from_slice::<SynapseResponse>(&payload)
296 .ok()
297 .and_then(|r| r.error)
298 .unwrap_or_else(|| "no detail".to_string());
299 Err(LightningError::Stream(format!(
300 "server returned SynapseResponse error on streaming path: {}",
301 detail
302 )))
303 }
304 Ok((msg_type, _)) => Err(LightningError::Stream(format!(
305 "unexpected message type during streaming: {:?}",
306 msg_type
307 ))),
308 Err(e) => Err(e),
309 }
310 }
311
312 pub async fn collect_all(&mut self) -> Result<Vec<Vec<u8>>> {
314 let mut chunks = Vec::new();
315 let mut total_size: usize = 0;
316 while let Some(chunk) = self.next_chunk().await? {
317 total_size = total_size.checked_add(chunk.len()).ok_or_else(|| {
318 LightningError::Stream("streaming response size overflow".to_string())
319 })?;
320 if total_size > self.max_stream_payload {
321 return Err(LightningError::Stream(format!(
322 "streaming response exceeded {} byte aggregate limit",
323 self.max_stream_payload
324 )));
325 }
326 chunks.push(chunk);
327 }
328 Ok(chunks)
329 }
330}
331
/// QUIC client that connects to miners, authenticates and sends them requests.
pub struct LightningClient {
    /// Validated client settings.
    config: LightningClientConfig,
    /// Hotkey string passed to the connect / handshake helpers for this client.
    wallet_hotkey: String,
    /// Signer passed to the handshake helpers; must be set (see `set_signer`)
    /// before any connection can be established.
    signer: Option<Arc<dyn Signer>>,
    /// Shared registry (and, with the `subtensor` feature, monitor handles).
    state: Arc<RwLock<ClientState>>,
    /// QUIC endpoint; `None` until `create_endpoint` has run.
    endpoint: Option<Endpoint>,
}
344
345impl LightningClient {
346 pub fn new(wallet_hotkey: String) -> Self {
348 Self::with_config(wallet_hotkey, LightningClientConfig::default())
349 .expect("default config is always valid")
350 }
351
352 pub fn with_config(wallet_hotkey: String, config: LightningClientConfig) -> Result<Self> {
354 config.validate()?;
355 Ok(Self {
356 config,
357 wallet_hotkey,
358 signer: None,
359 state: Arc::new(RwLock::new(ClientState {
360 registry: MinerRegistry::new(),
361 #[cfg(feature = "subtensor")]
362 metagraph_shutdown: None,
363 #[cfg(feature = "subtensor")]
364 metagraph_handle: None,
365 })),
366 endpoint: None,
367 })
368 }
369
370 pub fn set_signer(&mut self, signer: Box<dyn Signer>) {
373 self.signer = Some(Arc::from(signer));
374 info!("Signer configured");
375 }
376
377 #[cfg(feature = "btwallet")]
379 pub fn set_wallet(
380 &mut self,
381 wallet_name: &str,
382 wallet_path: &str,
383 hotkey_name: &str,
384 ) -> Result<()> {
385 let signer =
386 crate::signing::BtWalletSigner::from_wallet(wallet_name, wallet_path, hotkey_name)?;
387 self.set_signer(Box::new(signer));
388 Ok(())
389 }
390
    /// Creates the QUIC endpoint, then connects to and authenticates `miners`.
    ///
    /// Miners are grouped by `addr_key()` and one task is spawned per distinct
    /// address. Addresses beyond the remaining connection capacity are skipped
    /// with a warning. For each address whose connection succeeded, the miners
    /// reported as authenticated are registered together with the connection;
    /// a connection with no authenticated miners is closed instead.
    /// Per-address failures and panicked tasks are only logged.
    ///
    /// With the `subtensor` feature and a configured `metagraph`, the metagraph
    /// monitor is started afterwards.
    ///
    /// # Errors
    /// Fails when the endpoint cannot be created, when no signer is configured,
    /// or (with `subtensor`) when starting the metagraph monitor fails.
    #[instrument(skip(self, miners), fields(miner_count = miners.len()))]
    pub async fn initialize_connections(&mut self, miners: Vec<QuicAxonInfo>) -> Result<()> {
        // Note: the endpoint is (re)created before the signer check below.
        self.create_endpoint().await?;

        let endpoint = self
            .endpoint
            .as_ref()
            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
            .clone();
        let wallet_hotkey = self.wallet_hotkey.clone();
        let signer = self
            .signer
            .as_ref()
            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
            .clone();
        let timeout = self.config.connect_timeout;

        // One connection per address, shared by every miner at that address.
        let mut addr_groups: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
        for miner in miners {
            addr_groups.entry(miner.addr_key()).or_default().push(miner);
        }

        // Read lock is scoped so it is released before any network work.
        let (active_count, remaining_capacity) = {
            let state = self.state.read().await;
            let active = state.registry.connection_count();
            (active, self.config.max_connections.saturating_sub(active))
        };

        // Which addresses get dropped is arbitrary: it follows HashMap iteration order.
        let addr_groups: Vec<(PeerAddr, Vec<QuicAxonInfo>)> =
            if addr_groups.len() > remaining_capacity {
                warn!(
                    "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
                    self.config.max_connections,
                    active_count,
                    addr_groups.len() - remaining_capacity,
                    addr_groups.len()
                );
                addr_groups.into_iter().take(remaining_capacity).collect()
            } else {
                addr_groups.into_iter().collect()
            };

        let max_fp = self.config.max_frame_payload_bytes;
        let mut set = tokio::task::JoinSet::new();
        for (addr_key, miners_at_addr) in addr_groups {
            let ep = endpoint.clone();
            let wh = wallet_hotkey.clone();
            let s = signer.clone();
            set.spawn(connect_and_authenticate_per_address(
                ep,
                wh,
                s,
                addr_key,
                miners_at_addr,
                timeout,
                max_fp,
            ));
        }

        // Gather every task result before taking the write lock.
        let mut results = Vec::new();
        while let Some(join_result) = set.join_next().await {
            match join_result {
                Ok((addr_key, conn_result, authenticated)) => {
                    results.push((addr_key, conn_result, authenticated));
                }
                Err(e) => {
                    error!("Connection task panicked: {}", e);
                }
            }
        }

        let mut state = self.state.write().await;
        for (addr_key, conn_result, authenticated) in results {
            match conn_result {
                Ok(connection) => {
                    if authenticated.is_empty() {
                        warn!(
                            "No hotkeys authenticated at {}, dropping connection",
                            addr_key
                        );
                        connection.close(0u32.into(), b"no_authenticated_hotkeys");
                    } else {
                        for miner in authenticated {
                            info!("Authenticated miner {} at {}", miner.hotkey, addr_key);
                            state.registry.register(miner);
                        }
                        state.registry.set_connection(addr_key, connection);
                    }
                }
                Err(e) => {
                    error!("Failed to connect to {}: {}", addr_key, e);
                }
            }
        }

        // NOTE(review): the write guard above is still alive here, and
        // start_metagraph_monitor acquires the same lock itself — confirm this
        // cannot block when a metagraph config is set.
        #[cfg(feature = "subtensor")]
        if let Some(metagraph_config) = self.config.metagraph.clone() {
            self.start_metagraph_monitor(metagraph_config).await?;
        }

        Ok(())
    }
497
498 #[instrument(skip(self))]
501 pub async fn create_endpoint(&mut self) -> Result<()> {
502 let mut tls_config = RustlsClientConfig::builder_with_provider(
503 rustls::crypto::ring::default_provider().into(),
504 )
505 .with_safe_default_protocol_versions()
506 .map_err(|e| LightningError::Config(format!("Failed to set TLS versions: {}", e)))?
507 .dangerous()
508 .with_custom_certificate_verifier(Arc::new(AcceptAnyCertVerifier))
509 .with_no_client_auth();
510
511 tls_config.alpn_protocols = vec![b"btlightning".to_vec()];
512
513 let mut transport_config = TransportConfig::default();
514
515 let idle_timeout = IdleTimeout::try_from(self.config.idle_timeout)
516 .map_err(|e| LightningError::Config(format!("Failed to set idle timeout: {}", e)))?;
517 transport_config.max_idle_timeout(Some(idle_timeout));
518 transport_config.keep_alive_interval(Some(self.config.keep_alive_interval));
519
520 let quic_crypto =
521 quinn::crypto::rustls::QuicClientConfig::try_from(tls_config).map_err(|e| {
522 LightningError::Config(format!("Failed to create QUIC crypto config: {}", e))
523 })?;
524 let mut client_config = ClientConfig::new(Arc::new(quic_crypto));
525 client_config.transport_config(Arc::new(transport_config));
526
527 let bind_addr: SocketAddr = "0.0.0.0:0"
528 .parse()
529 .map_err(|e| LightningError::Config(format!("Failed to parse bind address: {}", e)))?;
530 let mut endpoint = Endpoint::client(bind_addr).map_err(|e| {
531 LightningError::Connection(format!("Failed to create QUIC endpoint: {}", e))
532 })?;
533 endpoint.set_default_client_config(client_config);
534 self.endpoint = Some(endpoint);
535
536 info!("QUIC client endpoint created");
537 Ok(())
538 }
539
540 #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
544 pub async fn query_axon(
545 &self,
546 axon_info: QuicAxonInfo,
547 request: QuicRequest,
548 ) -> Result<QuicResponse> {
549 let addr_key = axon_info.addr_key();
550
551 let connection = {
552 let state = self.state.read().await;
553 state.registry.get_connection(&addr_key)
554 };
555
556 let max_fp = self.config.max_frame_payload_bytes;
557 match connection {
558 Some(conn) if conn.close_reason().is_none() => {
559 debug!(
560 addr = %addr_key,
561 stable_id = conn.stable_id(),
562 "query_axon: connection alive, sending synapse"
563 );
564 send_synapse_packet(&conn, request, max_fp).await
565 }
566 Some(conn) => {
567 let reason = conn.close_reason();
568 warn!(
569 addr = %addr_key,
570 stable_id = conn.stable_id(),
571 close_reason = ?reason,
572 "QUIC connection closed, triggering reconnect"
573 );
574 self.try_reconnect_and_query(&addr_key, &axon_info, request)
575 .await
576 }
577 None => {
578 debug!(addr = %addr_key, "query_axon: no connection in registry");
579 self.try_reconnect_and_query(&addr_key, &axon_info, request)
580 .await
581 }
582 }
583 }
584
585 #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port, timeout_ms = timeout.as_millis() as u64))]
587 pub async fn query_axon_with_timeout(
588 &self,
589 axon_info: QuicAxonInfo,
590 request: QuicRequest,
591 timeout: Duration,
592 ) -> Result<QuicResponse> {
593 tokio::time::timeout(timeout, self.query_axon(axon_info, request))
594 .await
595 .map_err(|_| LightningError::Transport("query timed out".into()))?
596 }
597
598 #[instrument(skip(self, axon_info, request), fields(miner_ip = %axon_info.ip, miner_port = axon_info.port))]
600 pub async fn query_axon_stream(
601 &self,
602 axon_info: QuicAxonInfo,
603 request: QuicRequest,
604 ) -> Result<StreamingResponse> {
605 let addr_key = axon_info.addr_key();
606
607 let connection = {
608 let state = self.state.read().await;
609 state.registry.get_connection(&addr_key)
610 };
611
612 let max_fp = self.config.max_frame_payload_bytes;
613 let max_sp = self.config.max_stream_payload_bytes;
614 match connection {
615 Some(conn) if conn.close_reason().is_none() => {
616 open_streaming_synapse(
617 &conn,
618 request,
619 max_fp,
620 max_sp,
621 self.config.stream_chunk_timeout,
622 )
623 .await
624 }
625 Some(conn) => {
626 let reason = conn.close_reason();
627 warn!(
628 addr = %addr_key,
629 close_reason = ?reason,
630 "QUIC connection closed, triggering reconnect (stream)"
631 );
632 self.try_reconnect_and_stream(&addr_key, &axon_info, request)
633 .await
634 }
635 None => {
636 self.try_reconnect_and_stream(&addr_key, &axon_info, request)
637 .await
638 }
639 }
640 }
641
642 async fn try_reconnect_and_query(
643 &self,
644 addr_key: &PeerAddr,
645 axon_info: &QuicAxonInfo,
646 request: QuicRequest,
647 ) -> Result<QuicResponse> {
648 let connection = self.try_reconnect(addr_key, axon_info).await?;
649 send_synapse_packet(&connection, request, self.config.max_frame_payload_bytes).await
650 }
651
652 async fn try_reconnect_and_stream(
653 &self,
654 addr_key: &PeerAddr,
655 axon_info: &QuicAxonInfo,
656 request: QuicRequest,
657 ) -> Result<StreamingResponse> {
658 let connection = self.try_reconnect(addr_key, axon_info).await?;
659 open_streaming_synapse(
660 &connection,
661 request,
662 self.config.max_frame_payload_bytes,
663 self.config.max_stream_payload_bytes,
664 self.config.stream_chunk_timeout,
665 )
666 .await
667 }
668
    /// Attempts to re-establish the connection to `addr_key`.
    ///
    /// Steps:
    /// 1. Ask the registry whether a reconnect may start; a rejection
    ///    (backoff, exhausted, already in progress) is returned as a
    ///    `LightningError::Connection`.
    /// 2. Connect and handshake for `axon_info`, bounded by `connect_timeout`.
    /// 3. On success, re-authenticate every other hotkey registered at the
    ///    same address (each bounded by `connect_timeout`), deregister those
    ///    that fail, store the new connection and clear the reconnect state.
    /// 4. On failure, record the attempt and schedule the next retry: either
    ///    exponential backoff, or the slow-probe interval once
    ///    `reconnect_max_retries` has been reached.
    ///
    /// # Errors
    /// Returns an error when the endpoint or signer is missing, when the
    /// registry rejects the attempt, or when connecting fails or times out.
    ///
    /// NOTE(review): if this future is dropped while connecting (for example by
    /// the timeout in `query_axon_with_timeout`), neither result branch runs and
    /// `in_progress` is not reset here — presumably only a registry refresh
    /// clears it; confirm against `try_start_reconnect`.
    async fn try_reconnect(
        &self,
        addr_key: &PeerAddr,
        axon_info: &QuicAxonInfo,
    ) -> Result<Connection> {
        let endpoint = self
            .endpoint
            .as_ref()
            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
            .clone();
        let signer = self
            .signer
            .as_ref()
            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
            .clone();

        // Scoped so the write lock is released before any network I/O.
        {
            let mut state = self.state.write().await;
            if let Err(rejection) = state.registry.try_start_reconnect(
                addr_key.clone(),
                self.config.reconnect_max_retries,
                self.config.reconnect_slow_probe_interval,
            ) {
                use crate::registry::ReconnectRejection;
                return match rejection {
                    ReconnectRejection::Backoff { next } => {
                        Err(LightningError::Connection(format!(
                            "Reconnection to {} in backoff, next retry in {:?}",
                            addr_key,
                            next.saturating_duration_since(Instant::now())
                        )))
                    }
                    ReconnectRejection::Exhausted { attempts } => {
                        Err(LightningError::Connection(format!(
                            "Reconnection attempts exhausted for {} ({}/{}), awaiting registry refresh",
                            addr_key, attempts, self.config.reconnect_max_retries
                        )))
                    }
                    ReconnectRejection::InProgress => {
                        Err(LightningError::Connection(format!(
                            "Reconnection to {} already in progress",
                            addr_key
                        )))
                    }
                };
            }
        }

        warn!("Connection to {} dead, attempting reconnection", addr_key);

        let reconnect_result = tokio::time::timeout(
            self.config.connect_timeout,
            connect_and_handshake(
                endpoint,
                axon_info.clone(),
                self.wallet_hotkey.clone(),
                signer.clone(),
                self.config.max_frame_payload_bytes,
            ),
        )
        .await;

        // Fold the timeout into the same error type as a connect failure.
        let reconnect_result = match reconnect_result {
            Ok(r) => r,
            Err(_) => Err(LightningError::Connection(format!(
                "Reconnection to {} timed out",
                addr_key
            ))),
        };

        match reconnect_result {
            Ok(connection) => {
                // Other hotkeys registered at this address; the requested one
                // was already handled by connect_and_handshake above.
                let co_located: Vec<String> = {
                    let state = self.state.read().await;
                    state
                        .registry
                        .hotkeys_at_addr(addr_key)
                        .into_iter()
                        .filter(|hk| *hk != axon_info.hotkey)
                        .collect()
                };
                let mut failed_hotkeys = Vec::new();
                // Re-authenticated one at a time, with no lock held.
                for hk in &co_located {
                    match tokio::time::timeout(
                        self.config.connect_timeout,
                        authenticate_handshake(
                            &connection,
                            hk,
                            &self.wallet_hotkey,
                            &signer,
                            self.config.max_frame_payload_bytes,
                        ),
                    )
                    .await
                    {
                        Ok(Ok(())) => {
                            info!(
                                "Re-authenticated co-located miner {} on reconnected {}",
                                hk, addr_key
                            );
                        }
                        Ok(Err(e)) => {
                            warn!(
                                "Re-authentication failed for co-located miner {} at {}: {}",
                                hk, addr_key, e
                            );
                            failed_hotkeys.push(hk.clone());
                        }
                        Err(_) => {
                            warn!(
                                "Re-authentication timed out for co-located miner {} at {}",
                                hk, addr_key
                            );
                            failed_hotkeys.push(hk.clone());
                        }
                    }
                }

                let mut state = self.state.write().await;
                for hk in &failed_hotkeys {
                    state.registry.deregister(hk);
                }
                state.registry.register(axon_info.clone());
                state
                    .registry
                    .set_connection(addr_key.clone(), connection.clone());
                state.registry.remove_reconnect_state(addr_key);
                info!("Reconnected to {}", addr_key);
                Ok(connection)
            }
            Err(e) => {
                let mut state = self.state.write().await;
                let rs = state.registry.reconnect_state_or_insert(addr_key.clone());
                rs.in_progress = false;
                // Exponent is the pre-increment attempt count, clamped to 20.
                let shift = rs.attempts.min(20);
                rs.attempts += 1;
                let in_slow_probe = rs.attempts >= self.config.reconnect_max_retries;
                if in_slow_probe {
                    // With no probe interval configured, next_retry_at is left untouched.
                    if let Some(probe_interval) = self.config.reconnect_slow_probe_interval {
                        rs.next_retry_at = Instant::now() + probe_interval;
                        warn!(
                            "Slow probe to {} failed, next probe in {:?}: {}",
                            addr_key, probe_interval, e
                        );
                    }
                } else {
                    // initial * 2^shift, capped at the max backoff (also used on overflow).
                    let backoff = self
                        .config
                        .reconnect_initial_backoff
                        .checked_mul(2u32.pow(shift))
                        .map(|d| d.min(self.config.reconnect_max_backoff))
                        .unwrap_or(self.config.reconnect_max_backoff);
                    rs.next_retry_at = Instant::now() + backoff;
                    error!(
                        "Reconnection to {} failed (attempt {}/{}), next retry in {:?}: {}",
                        addr_key, rs.attempts, self.config.reconnect_max_retries, backoff, e
                    );
                }
                Err(e)
            }
        }
    }
831
832 #[instrument(skip(self, miners), fields(miner_count = miners.len()))]
835 pub async fn update_miner_registry(&self, miners: Vec<QuicAxonInfo>) -> Result<()> {
836 let endpoint = self
837 .endpoint
838 .as_ref()
839 .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
840 .clone();
841 let signer = self
842 .signer
843 .as_ref()
844 .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
845 .clone();
846 update_miner_registry_inner(
847 &self.state,
848 &endpoint,
849 &self.wallet_hotkey,
850 &signer,
851 &self.config,
852 miners,
853 )
854 .await
855 }
856
857 #[instrument(skip(self))]
859 pub async fn get_connection_stats(&self) -> Result<HashMap<String, String>> {
860 let state = self.state.read().await;
861
862 let mut stats = HashMap::new();
863 stats.insert(
864 "total_connections".to_string(),
865 state.registry.connection_count().to_string(),
866 );
867 stats.insert(
868 "active_miners".to_string(),
869 state.registry.active_miner_count().to_string(),
870 );
871
872 for addr_key in state.registry.connection_addrs() {
873 let status = match state.registry.get_connection(addr_key) {
874 Some(conn) => {
875 if let Some(reason) = conn.close_reason() {
876 format!("closed({:?})", reason)
877 } else {
878 "active".to_string()
879 }
880 }
881 None => "missing".to_string(),
882 };
883 stats.insert(format!("connection_{}", addr_key), status);
884 }
885
886 Ok(stats)
887 }
888
    /// Starts (or restarts) the background task that keeps the miner registry
    /// in sync with the metagraph.
    ///
    /// Any running monitor is stopped first. The method then connects to the
    /// subtensor endpoint (30s limit), performs an initial metagraph sync (60s
    /// limit), applies the resulting miner list to the registry, and spawns a
    /// task that repeats the sync every `sync_interval`. When a periodic sync
    /// fails or times out, the task tries to reconnect the subtensor client and
    /// carries on with the next tick.
    ///
    /// # Errors
    /// - `LightningError::Config` when `sync_interval` is zero.
    /// - `LightningError::Connection` / `LightningError::Signing` when the
    ///   endpoint or signer is missing.
    /// - `LightningError::Handler` when the subtensor connection fails or times
    ///   out, or when the initial sync times out.
    /// - Errors from the initial sync itself or from the initial registry
    ///   update are propagated.
    #[cfg(feature = "subtensor")]
    pub async fn start_metagraph_monitor(
        &self,
        monitor_config: MetagraphMonitorConfig,
    ) -> Result<()> {
        if monitor_config.sync_interval.is_zero() {
            return Err(LightningError::Config(
                "sync_interval must be non-zero".into(),
            ));
        }

        // Only one monitor at a time: tear down any previous task first.
        self.stop_metagraph_monitor().await;

        let endpoint = self
            .endpoint
            .as_ref()
            .ok_or_else(|| LightningError::Connection("QUIC endpoint not initialized".into()))?
            .clone();
        let signer = self
            .signer
            .as_ref()
            .ok_or_else(|| LightningError::Signing("No signer configured".into()))?
            .clone();

        let subtensor = tokio::time::timeout(
            Duration::from_secs(30),
            OnlineClient::<PolkadotConfig>::from_url(&monitor_config.subtensor_endpoint),
        )
        .await
        .map_err(|_| LightningError::Handler("subtensor connection timed out after 30s".into()))?
        .map_err(|e| LightningError::Handler(format!("connecting to subtensor: {}", e)))?;

        let mut metagraph = Metagraph::new(monitor_config.netuid);
        // Outer `?` handles the timeout, inner `?` the sync result itself.
        tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor))
            .await
            .map_err(|_| {
                LightningError::Handler("initial metagraph sync timed out after 60s".into())
            })??;

        let miners = metagraph.quic_miners();
        info!(
            netuid = monitor_config.netuid,
            miners = miners.len(),
            "initial metagraph sync complete"
        );

        update_miner_registry_inner(
            &self.state,
            &endpoint,
            &self.wallet_hotkey,
            &signer,
            &self.config,
            miners,
        )
        .await?;

        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
        let state = self.state.clone();
        let wallet_hotkey = self.wallet_hotkey.clone();
        let config = self.config.clone();
        let sync_interval = monitor_config.sync_interval;
        let subtensor_url = monitor_config.subtensor_endpoint.clone();

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(sync_interval);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            // Awaited once up front, before the loop; the initial sync has already run above.
            interval.tick().await;
            let mut subtensor = subtensor;

            loop {
                // Wait for the next tick, or exit as soon as the shutdown
                // channel reports a change (the result of `changed()` is ignored).
                tokio::select! {
                    _ = interval.tick() => {}
                    _ = shutdown_rx.changed() => {
                        info!("metagraph monitor shutting down");
                        return;
                    }
                }

                let sync_result =
                    tokio::time::timeout(Duration::from_secs(60), metagraph.sync(&subtensor)).await;

                let needs_reconnect = match sync_result {
                    Ok(Ok(())) => {
                        let miners = metagraph.quic_miners();
                        info!(
                            netuid = metagraph.netuid,
                            miners = miners.len(),
                            block = metagraph.block,
                            "metagraph resync complete"
                        );
                        // A failed registry update is logged; the monitor keeps running.
                        if let Err(e) = update_miner_registry_inner(
                            &state,
                            &endpoint,
                            &wallet_hotkey,
                            &signer,
                            &config,
                            miners,
                        )
                        .await
                        {
                            error!("registry update after metagraph sync failed: {}", e);
                        }
                        false
                    }
                    Ok(Err(e)) => {
                        error!("metagraph sync failed, reconnecting to subtensor: {}", e);
                        true
                    }
                    Err(_) => {
                        error!("metagraph sync timed out after 60s, reconnecting to subtensor");
                        true
                    }
                };

                if needs_reconnect {
                    // On failure the old client is kept and the next tick retries.
                    match tokio::time::timeout(
                        Duration::from_secs(30),
                        OnlineClient::<PolkadotConfig>::from_url(&subtensor_url),
                    )
                    .await
                    {
                        Ok(Ok(new_client)) => {
                            subtensor = new_client;
                            info!("subtensor client reconnected");
                        }
                        Ok(Err(e)) => {
                            error!("subtensor reconnection failed: {}", e);
                        }
                        Err(_) => {
                            error!("subtensor reconnection timed out after 30s");
                        }
                    }
                }
            }
        });

        // Publish the shutdown sender and handle so stop_metagraph_monitor can find them.
        let mut st = self.state.write().await;
        st.metagraph_shutdown = Some(shutdown_tx);
        st.metagraph_handle = Some(handle);
        Ok(())
    }
1032
1033 #[cfg(feature = "subtensor")]
1035 pub async fn stop_metagraph_monitor(&self) {
1036 let (shutdown_tx, handle) = {
1037 let mut st = self.state.write().await;
1038 (st.metagraph_shutdown.take(), st.metagraph_handle.take())
1039 };
1040 if let Some(tx) = shutdown_tx {
1041 let _ = tx.send(true);
1042 }
1043 if let Some(mut handle) = handle {
1044 if tokio::time::timeout(Duration::from_secs(5), &mut handle)
1045 .await
1046 .is_err()
1047 {
1048 warn!("metagraph monitor did not shut down within 5s, aborting");
1049 handle.abort();
1050 let _ = handle.await;
1051 }
1052 }
1053 }
1054
1055 #[instrument(skip(self))]
1057 pub async fn close_all_connections(&self) -> Result<()> {
1058 #[cfg(feature = "subtensor")]
1059 self.stop_metagraph_monitor().await;
1060
1061 let mut state = self.state.write().await;
1062
1063 for (_, connection) in state.registry.drain_connections() {
1064 connection.close(0u32.into(), b"client_shutdown");
1065 }
1066
1067 state.registry.clear();
1068
1069 info!("All Lightning QUIC connections closed");
1070 Ok(())
1071 }
1072}
1073
/// Reconciles the client's miner registry and QUIC connections with a fresh
/// list of miners.
///
/// The work happens in three phases:
///
/// 1. Under the state write lock: deregister miners that are no longer listed,
///    clear reconnect state for every active address, prune connections that
///    report a close reason, handle hotkeys whose address changed, and sort the
///    remaining unknown hotkeys into "authenticate on an existing connection"
///    versus "needs a new connection" (the latter capped by
///    `config.max_connections`).
/// 2. Without the write lock: run handshakes for new hotkeys on existing
///    connections, one after another, then register the ones that succeeded.
/// 3. Without the write lock: connect to each new address concurrently, then
///    register the authenticated miners and store the connections.
///
/// Handshake and connection failures are logged and skipped; this function has
/// no early-return error path and always ends with `Ok(())`.
///
/// # Parameters
/// - `state`: shared client state holding the registry.
/// - `endpoint`: QUIC endpoint used to open new connections.
/// - `wallet_hotkey`: this client's hotkey, sent in handshake requests.
/// - `signer`: signer used for handshake requests.
/// - `config`: supplies `connect_timeout`, `max_frame_payload_bytes` and
///   `max_connections`.
/// - `miners`: the new desired set of miners.
async fn update_miner_registry_inner(
    state: &Arc<RwLock<ClientState>>,
    endpoint: &Endpoint,
    wallet_hotkey: &str,
    signer: &Arc<dyn Signer>,
    config: &LightningClientConfig,
    miners: Vec<QuicAxonInfo>,
) -> Result<()> {
    // Index the incoming miners by hotkey; a hotkey listed more than once keeps
    // only the last entry.
    let new_by_hotkey: HashMap<String, QuicAxonInfo> = miners
        .iter()
        .map(|m| (m.hotkey.clone(), m.clone()))
        .collect();

    // Both are assigned exactly once inside the locked scope below, so the
    // write guard is released before any network I/O starts.
    let new_hotkeys_needing_auth: Vec<QuicAxonInfo>;
    let new_addrs_needing_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>>;
    {
        let mut st = state.write().await;

        // Drop miners that are active locally but absent from the new list, and
        // close the connection once no hotkey uses its address any more.
        let active_hotkeys = st.registry.active_hotkeys();
        for hotkey in active_hotkeys {
            if !new_by_hotkey.contains_key(&hotkey) {
                if let Some(miner) = st.registry.deregister(&hotkey) {
                    let addr_key = miner.addr_key();
                    info!("Miner {} deregistered from {}", hotkey, addr_key);
                    if !st.registry.addr_has_hotkeys(&addr_key) {
                        if let Some(connection) = st.registry.remove_connection(&addr_key) {
                            connection.close(0u32.into(), b"miner_deregistered");
                        }
                        st.registry.remove_reconnect_state(&addr_key);
                    }
                }
            }
        }

        // A registry refresh clears any stored reconnect state for the
        // remaining active addresses.
        let active_addrs = st.registry.active_addrs();
        for addr_key in &active_addrs {
            if st.registry.remove_reconnect_state(addr_key) {
                info!(
                    "Registry refresh reset reconnection backoff for {}",
                    addr_key
                );
            }
        }

        // Connections that already report a close reason are dead. Remove them
        // and deregister their hotkeys so that hotkeys still present in the new
        // list are treated as new below and get a fresh connection.
        let dead_addrs: Vec<PeerAddr> = active_addrs
            .iter()
            .filter(|addr| {
                st.registry
                    .get_connection(addr)
                    .is_some_and(|c| c.close_reason().is_some())
            })
            .cloned()
            .collect();
        for addr_key in &dead_addrs {
            if let Some(conn) = st.registry.remove_connection(addr_key) {
                let hotkeys = st.registry.hotkeys_at_addr(addr_key);
                info!(
                    addr = %addr_key,
                    close_reason = ?conn.close_reason(),
                    hotkeys = ?hotkeys,
                    "Pruning dead connection and deregistering miners"
                );
                for hk in &hotkeys {
                    st.registry.deregister(hk);
                }
            }
        }

        // A known hotkey that moved to a different address is deregistered here
        // and picked up again as a new hotkey; its old connection is closed if
        // nothing else uses that address.
        for new_miner in new_by_hotkey.values() {
            if let Some(old_miner) = st.registry.active_miner(&new_miner.hotkey) {
                let old_addr = old_miner.addr_key();
                let new_addr = new_miner.addr_key();
                if old_addr != new_addr {
                    info!(
                        "Miner {} changed address from {} to {}",
                        new_miner.hotkey, old_addr, new_addr
                    );
                    st.registry.deregister(&new_miner.hotkey);
                    if !st.registry.addr_has_hotkeys(&old_addr) {
                        if let Some(conn) = st.registry.remove_connection(&old_addr) {
                            conn.close(0u32.into(), b"miner_addr_changed");
                        }
                        st.registry.remove_reconnect_state(&old_addr);
                    }
                }
            }
        }

        // Everything in the new list that is not an active miner at this point
        // needs to be authenticated.
        let new_hotkeys: Vec<QuicAxonInfo> = new_by_hotkey
            .values()
            .filter(|m| !st.registry.contains_active_miner(&m.hotkey))
            .cloned()
            .collect();

        // Hotkeys whose address already has a connection only need a handshake;
        // the rest are grouped by address so each address is dialed once.
        let mut need_auth = Vec::new();
        let mut need_connect: HashMap<PeerAddr, Vec<QuicAxonInfo>> = HashMap::new();
        for miner in new_hotkeys {
            let addr_key = miner.addr_key();
            if st.registry.contains_connection(&addr_key) {
                need_auth.push(miner);
            } else {
                need_connect.entry(addr_key).or_default().push(miner);
            }
        }

        // Enforce the connection cap. Addresses beyond the remaining capacity
        // are skipped for this refresh.
        let active_count = st.registry.connection_count();
        let remaining_capacity = config.max_connections.saturating_sub(active_count);
        if need_connect.len() > remaining_capacity {
            warn!(
                "Connection limit ({}) reached with {} active, skipping {} of {} new addresses",
                config.max_connections,
                active_count,
                need_connect.len() - remaining_capacity,
                need_connect.len()
            );
        }

        new_hotkeys_needing_auth = need_auth;
        // `take` over a `HashMap` iterator: which addresses survive the cap is
        // unspecified.
        new_addrs_needing_connect = need_connect.into_iter().take(remaining_capacity).collect();
    }

    let timeout = config.connect_timeout;
    let max_fp = config.max_frame_payload_bytes;

    // Phase 2: authenticate new hotkeys over connections that already exist.
    if !new_hotkeys_needing_auth.is_empty() {
        // Grab the connection handles under a short read lock. A miner whose
        // connection vanished since the write lock was released is dropped here
        // without a log line.
        let miners_with_conns: Vec<(QuicAxonInfo, Connection)> = {
            let st = state.read().await;
            new_hotkeys_needing_auth
                .into_iter()
                .filter_map(|miner| {
                    let addr_key = miner.addr_key();
                    st.registry
                        .get_connection(&addr_key)
                        .map(|conn| (miner, conn))
                })
                .collect()
        };

        // Handshakes run one at a time, each bounded by `connect_timeout`.
        let mut authenticated = Vec::new();
        for (miner, conn) in &miners_with_conns {
            let addr_key = miner.addr_key();
            match tokio::time::timeout(
                timeout,
                authenticate_handshake(conn, &miner.hotkey, wallet_hotkey, signer, max_fp),
            )
            .await
            {
                Ok(Ok(())) => {
                    info!(
                        "Authenticated new miner {} on existing connection to {}",
                        miner.hotkey, addr_key
                    );
                    authenticated.push(miner.clone());
                }
                Ok(Err(e)) => {
                    warn!(
                        "Handshake failed for new hotkey {} at {}: {}",
                        miner.hotkey, addr_key, e
                    );
                }
                Err(_) => {
                    warn!(
                        "Handshake timed out for new hotkey {} at {}",
                        miner.hotkey, addr_key
                    );
                }
            }
        }

        let mut st = state.write().await;
        for miner in authenticated {
            st.registry.register(miner);
        }
    }

    // Phase 3: dial every new address concurrently, one task per address.
    if !new_addrs_needing_connect.is_empty() {
        let mut set = tokio::task::JoinSet::new();
        for (addr_key, miners_at_addr) in new_addrs_needing_connect {
            info!(
                "New address detected, establishing QUIC connection: {}",
                addr_key
            );
            let ep = endpoint.clone();
            let wh = wallet_hotkey.to_string();
            let s = signer.clone();
            set.spawn(connect_and_authenticate_per_address(
                ep,
                wh,
                s,
                addr_key,
                miners_at_addr,
                timeout,
                max_fp,
            ));
        }

        // Collect all task results before taking the write lock so the lock is
        // not held while connections are still in flight.
        let mut results = Vec::new();
        while let Some(join_result) = set.join_next().await {
            match join_result {
                Ok((addr_key, conn_result, authenticated)) => {
                    results.push((addr_key, conn_result, authenticated));
                }
                Err(e) => {
                    error!("Connection task panicked: {}", e);
                }
            }
        }

        let mut st = state.write().await;
        for (addr_key, conn_result, authenticated) in results {
            match conn_result {
                Ok(connection) => {
                    // A connection on which no hotkey authenticated is closed
                    // rather than stored.
                    if authenticated.is_empty() {
                        warn!(
                            "No hotkeys authenticated at {}, dropping connection",
                            addr_key
                        );
                        connection.close(0u32.into(), b"no_authenticated_hotkeys");
                    } else {
                        for miner in authenticated {
                            st.registry.register(miner);
                        }
                        st.registry.set_connection(addr_key, connection);
                    }
                }
                Err(e) => {
                    error!("Failed to connect to {}: {}", addr_key, e);
                }
            }
        }
    }

    Ok(())
}
1308
1309fn get_peer_cert_fingerprint(connection: &Connection) -> Option<[u8; 32]> {
1310 let identity = connection.peer_identity()?;
1311 let certs = identity.downcast::<Vec<CertificateDer<'static>>>().ok()?;
1312 let first = certs.first()?;
1313 Some(blake2_256(first.as_ref()))
1314}
1315
1316async fn quic_connect(
1317 endpoint: &Endpoint,
1318 addr_key: &PeerAddr,
1319 server_name: &str,
1320) -> Result<Connection> {
1321 let addr: SocketAddr = addr_key
1322 .as_ref()
1323 .parse()
1324 .map_err(|e| LightningError::Connection(format!("Invalid address: {}", e)))?;
1325
1326 endpoint
1327 .connect(addr, server_name)
1328 .map_err(|e| LightningError::Connection(format!("Connection failed: {}", e)))?
1329 .await
1330 .map_err(|e| LightningError::Connection(format!("Connection handshake failed: {}", e)))
1331}
1332
1333async fn connect_and_authenticate_per_address(
1334 endpoint: Endpoint,
1335 wallet_hotkey: String,
1336 signer: Arc<dyn Signer>,
1337 addr_key: PeerAddr,
1338 miners_at_addr: Vec<QuicAxonInfo>,
1339 timeout: Duration,
1340 max_frame_payload: usize,
1341) -> (PeerAddr, Result<Connection>, Vec<QuicAxonInfo>) {
1342 let first = match miners_at_addr.first() {
1343 Some(m) => m,
1344 None => {
1345 return (
1346 addr_key,
1347 Err(LightningError::Connection("no miners for address".into())),
1348 vec![],
1349 );
1350 }
1351 };
1352
1353 let conn = match tokio::time::timeout(timeout, quic_connect(&endpoint, &addr_key, &first.ip))
1354 .await
1355 {
1356 Ok(Ok(c)) => c,
1357 Ok(Err(e)) => return (addr_key, Err(e), vec![]),
1358 Err(_) => {
1359 let err = LightningError::Connection(format!("Connection to {} timed out", addr_key));
1360 return (addr_key, Err(err), vec![]);
1361 }
1362 };
1363
1364 let mut authenticated = Vec::new();
1365 for miner in &miners_at_addr {
1366 match tokio::time::timeout(
1367 timeout,
1368 authenticate_handshake(
1369 &conn,
1370 &miner.hotkey,
1371 &wallet_hotkey,
1372 &signer,
1373 max_frame_payload,
1374 ),
1375 )
1376 .await
1377 {
1378 Ok(Ok(())) => authenticated.push(miner.clone()),
1379 Ok(Err(e)) => {
1380 warn!(
1381 "Handshake failed for hotkey {} at {}: {}",
1382 miner.hotkey, addr_key, e
1383 );
1384 }
1385 Err(_) => {
1386 warn!(
1387 "Handshake timed out for hotkey {} at {}",
1388 miner.hotkey, addr_key
1389 );
1390 }
1391 }
1392 }
1393
1394 (addr_key, Ok(conn), authenticated)
1395}
1396
1397async fn authenticate_handshake(
1398 connection: &Connection,
1399 expected_hotkey: &str,
1400 wallet_hotkey: &str,
1401 signer: &Arc<dyn Signer>,
1402 max_frame_payload: usize,
1403) -> Result<()> {
1404 let peer_cert_fp = get_peer_cert_fingerprint(connection).ok_or_else(|| {
1405 LightningError::Handshake("peer certificate not available for fingerprinting".to_string())
1406 })?;
1407 let peer_cert_fp_b64 = BASE64_STANDARD.encode(peer_cert_fp);
1408
1409 let nonce = generate_nonce();
1410 let timestamp = unix_timestamp_secs();
1411 let message = handshake_request_message(wallet_hotkey, timestamp, &nonce, &peer_cert_fp_b64);
1412 let msg_bytes = message.into_bytes();
1413 let signer_clone = signer.clone();
1414 let signature_bytes = tokio::task::spawn_blocking(move || signer_clone.sign(&msg_bytes))
1415 .await
1416 .map_err(|e| LightningError::Signing(format!("signer task failed: {}", e)))??;
1417
1418 let handshake_request = HandshakeRequest {
1419 validator_hotkey: wallet_hotkey.to_string(),
1420 timestamp,
1421 nonce: nonce.clone(),
1422 signature: BASE64_STANDARD.encode(&signature_bytes),
1423 };
1424
1425 let response = send_handshake(connection, handshake_request, max_frame_payload).await?;
1426 if !response.accepted {
1427 return Err(LightningError::Handshake(
1428 "Handshake rejected by miner".into(),
1429 ));
1430 }
1431
1432 if response.miner_hotkey != expected_hotkey {
1433 return Err(LightningError::Handshake(format!(
1434 "Miner hotkey mismatch: expected {}, got {}",
1435 expected_hotkey, response.miner_hotkey
1436 )));
1437 }
1438
1439 match response.cert_fingerprint {
1440 Some(ref resp_fp) if *resp_fp == peer_cert_fp_b64 => {}
1441 Some(_) => {
1442 return Err(LightningError::Handshake(
1443 "Cert fingerprint mismatch between TLS session and handshake response".to_string(),
1444 ));
1445 }
1446 None => {
1447 return Err(LightningError::Handshake(
1448 "Miner handshake response omitted required cert fingerprint".to_string(),
1449 ));
1450 }
1451 }
1452
1453 verify_miner_response_signature(&response, wallet_hotkey, &nonce, &peer_cert_fp_b64).await?;
1454
1455 info!("Handshake successful with miner {}", expected_hotkey);
1456 Ok(())
1457}
1458
1459async fn connect_and_handshake(
1460 endpoint: Endpoint,
1461 miner: QuicAxonInfo,
1462 wallet_hotkey: String,
1463 signer: Arc<dyn Signer>,
1464 max_frame_payload: usize,
1465) -> Result<Connection> {
1466 let addr_key = miner.addr_key();
1467 let connection = quic_connect(&endpoint, &addr_key, &miner.ip).await?;
1468 authenticate_handshake(
1469 &connection,
1470 &miner.hotkey,
1471 &wallet_hotkey,
1472 &signer,
1473 max_frame_payload,
1474 )
1475 .await?;
1476 Ok(connection)
1477}
1478
1479async fn verify_miner_response_signature(
1480 response: &HandshakeResponse,
1481 validator_hotkey: &str,
1482 nonce: &str,
1483 cert_fp_b64: &str,
1484) -> Result<()> {
1485 if response.signature.is_empty() {
1486 return Err(LightningError::Handshake(
1487 "Miner returned empty signature".to_string(),
1488 ));
1489 }
1490
1491 let expected_message = handshake_response_message(
1492 validator_hotkey,
1493 &response.miner_hotkey,
1494 response.timestamp,
1495 nonce,
1496 cert_fp_b64,
1497 );
1498
1499 let valid = crate::signing::verify_sr25519_signature(
1500 &response.miner_hotkey,
1501 &response.signature,
1502 &expected_message,
1503 )
1504 .await?;
1505
1506 if !valid {
1507 return Err(LightningError::Handshake(
1508 "Miner response signature verification failed".to_string(),
1509 ));
1510 }
1511
1512 Ok(())
1513}
1514
1515async fn send_handshake(
1516 connection: &Connection,
1517 request: HandshakeRequest,
1518 max_frame_payload: usize,
1519) -> Result<HandshakeResponse> {
1520 let (mut send, mut recv) = connection.open_bi().await.map_err(|e| {
1521 LightningError::Connection(format!("Failed to open bidirectional stream: {}", e))
1522 })?;
1523
1524 let request_bytes = rmp_serde::to_vec(&request).map_err(|e| {
1525 LightningError::Serialization(format!("Failed to serialize handshake: {}", e))
1526 })?;
1527
1528 write_frame_and_finish(&mut send, MessageType::HandshakeRequest, &request_bytes).await?;
1529
1530 let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
1531 if msg_type != MessageType::HandshakeResponse {
1532 return Err(LightningError::Handshake(format!(
1533 "Expected HandshakeResponse, got {:?}",
1534 msg_type
1535 )));
1536 }
1537
1538 let response: HandshakeResponse = rmp_serde::from_slice(&payload).map_err(|e| {
1539 LightningError::Serialization(format!("Failed to parse handshake response: {}", e))
1540 })?;
1541
1542 Ok(response)
1543}
1544
1545async fn send_synapse_frame(send: &mut quinn::SendStream, request: QuicRequest) -> Result<()> {
1546 let synapse_packet = SynapsePacket {
1547 synapse_type: request.synapse_type,
1548 data: request.data,
1549 timestamp: unix_timestamp_secs(),
1550 };
1551
1552 let packet_bytes = rmp_serde::to_vec(&synapse_packet).map_err(|e| {
1553 LightningError::Serialization(format!("Failed to serialize synapse packet: {}", e))
1554 })?;
1555
1556 write_frame_and_finish(send, MessageType::SynapsePacket, &packet_bytes).await
1557}
1558
1559async fn send_synapse_packet(
1560 connection: &Connection,
1561 request: QuicRequest,
1562 max_frame_payload: usize,
1563) -> Result<QuicResponse> {
1564 let stable_id = connection.stable_id();
1565 debug!(stable_id, "send_synapse_packet: opening bi stream");
1566 let (mut send, mut recv) = connection
1567 .open_bi()
1568 .await
1569 .map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
1570 debug!(stable_id, "send_synapse_packet: bi stream opened");
1571
1572 let start = Instant::now();
1573
1574 send_synapse_frame(&mut send, request).await?;
1575 debug!(
1576 stable_id,
1577 "send_synapse_packet: frame sent, awaiting response"
1578 );
1579
1580 let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
1581 debug!(stable_id, msg_type = ?msg_type, elapsed_ms = start.elapsed().as_millis() as u64, "send_synapse_packet: response received");
1582
1583 match msg_type {
1584 MessageType::SynapseResponse => {
1585 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
1586 let synapse_response: SynapseResponse =
1587 rmp_serde::from_slice(&payload).map_err(|e| {
1588 LightningError::Serialization(format!(
1589 "Failed to parse synapse response: {}",
1590 e
1591 ))
1592 })?;
1593
1594 Ok(QuicResponse {
1595 success: synapse_response.success,
1596 data: synapse_response.data,
1597 latency_ms,
1598 error: synapse_response.error,
1599 })
1600 }
1601 MessageType::StreamChunk => Err(LightningError::Transport(
1602 "received StreamChunk on non-streaming query; use query_axon_stream for streaming synapses".to_string(),
1603 )),
1604 other => Err(LightningError::Transport(format!(
1605 "unexpected response type: {:?}",
1606 other
1607 ))),
1608 }
1609}
1610
1611async fn open_streaming_synapse(
1612 connection: &Connection,
1613 request: QuicRequest,
1614 max_frame_payload: usize,
1615 max_stream_payload: usize,
1616 chunk_timeout: Option<Duration>,
1617) -> Result<StreamingResponse> {
1618 let (mut send, recv) = connection
1619 .open_bi()
1620 .await
1621 .map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
1622
1623 send_synapse_frame(&mut send, request).await?;
1624
1625 Ok(StreamingResponse {
1626 recv,
1627 max_payload: max_frame_payload,
1628 max_stream_payload,
1629 chunk_timeout,
1630 })
1631}
1632
1633fn generate_nonce() -> String {
1634 use rand::Rng;
1635 let bytes: [u8; 16] = rand::thread_rng().gen();
1636 format!("{:032x}", u128::from_be_bytes(bytes))
1637}
1638
#[cfg(test)]
mod tests {
    use super::*;
    use sp_core::{crypto::Ss58Codec, sr25519, Pair};

    // Fixed seeds keep the miner and validator identities deterministic.
    const MINER_SEED: [u8; 32] = [1u8; 32];
    const VALIDATOR_SEED: [u8; 32] = [2u8; 32];

    /// SS58 address of the validator key derived from `VALIDATOR_SEED`.
    fn validator_hotkey() -> String {
        let validator = sr25519::Pair::from_seed(&VALIDATOR_SEED);
        validator.public().to_ss58check()
    }

    /// Builds an accepted handshake response correctly signed by the key
    /// derived from `miner_seed`.
    fn make_signed_response(
        miner_seed: [u8; 32],
        validator_hotkey: &str,
        nonce: &str,
        cert_fp_b64: &str,
    ) -> HandshakeResponse {
        let miner = sr25519::Pair::from_seed(&miner_seed);
        let miner_hotkey = miner.public().to_ss58check();
        let timestamp = unix_timestamp_secs();
        let signed_text = handshake_response_message(
            validator_hotkey,
            &miner_hotkey,
            timestamp,
            nonce,
            cert_fp_b64,
        );
        let signature = BASE64_STANDARD.encode(miner.sign(signed_text.as_bytes()).0);
        HandshakeResponse {
            miner_hotkey,
            timestamp,
            signature,
            accepted: true,
            connection_id: "test".to_string(),
            cert_fingerprint: Some(cert_fp_b64.to_string()),
        }
    }

    /// A valid response for nonce `"n"` and fingerprint `"fp"`, for tests that
    /// then corrupt a single field.
    fn baseline_response() -> HandshakeResponse {
        make_signed_response(MINER_SEED, &validator_hotkey(), "n", "fp")
    }

    /// Runs verification, expecting failure, and returns the error's text.
    async fn rejection_text(resp: &HandshakeResponse, nonce: &str, fp: &str) -> String {
        verify_miner_response_signature(resp, &validator_hotkey(), nonce, fp)
            .await
            .unwrap_err()
            .to_string()
    }

    /// True when `LightningClient::with_config` refuses the given config.
    fn config_is_rejected(cfg: LightningClientConfig) -> bool {
        LightningClient::with_config("hk".into(), cfg).is_err()
    }

    #[tokio::test]
    async fn verify_valid_miner_signature() {
        let nonce = "test-nonce";
        let fp = "dGVzdC1mcA==";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), nonce, fp);
        let outcome = verify_miner_response_signature(&resp, &validator_hotkey(), nonce, fp).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn verify_rejects_empty_signature() {
        let mut resp = baseline_response();
        resp.signature = String::new();
        let text = rejection_text(&resp, "n", "fp").await;
        assert!(text.contains("empty signature"));
    }

    #[tokio::test]
    async fn verify_rejects_invalid_base64() {
        let mut resp = baseline_response();
        resp.signature = "not-valid-base64!!!".to_string();
        let text = rejection_text(&resp, "n", "fp").await;
        assert!(text.contains("Failed to decode signature"));
    }

    #[tokio::test]
    async fn verify_rejects_wrong_signature_length() {
        let mut resp = baseline_response();
        resp.signature = BASE64_STANDARD.encode([0u8; 32]);
        let text = rejection_text(&resp, "n", "fp").await;
        assert!(text.contains("Invalid signature length"));
    }

    #[tokio::test]
    async fn verify_rejects_bad_ss58_address() {
        let mut resp = baseline_response();
        resp.miner_hotkey = "not_a_valid_ss58".to_string();
        let text = rejection_text(&resp, "n", "fp").await;
        assert!(text.contains("Invalid SS58 address"));
    }

    #[tokio::test]
    async fn verify_rejects_wrong_signer() {
        let nonce = "n";
        let fp = "fp";
        let mut resp = make_signed_response(MINER_SEED, &validator_hotkey(), nonce, fp);
        // Claim a different hotkey than the one that actually signed.
        let impostor = sr25519::Pair::from_seed(&[99u8; 32]);
        resp.miner_hotkey = impostor.public().to_ss58check();
        let text = rejection_text(&resp, nonce, fp).await;
        assert!(text.contains("signature verification failed"));
    }

    #[tokio::test]
    async fn verify_rejects_tampered_nonce() {
        let fp = "fp";
        let resp = make_signed_response(MINER_SEED, &validator_hotkey(), "original-nonce", fp);
        let text = rejection_text(&resp, "tampered-nonce", fp).await;
        assert!(text.contains("signature verification failed"));
    }

    #[test]
    fn with_config_rejects_frame_payload_below_minimum() {
        let cfg = LightningClientConfig {
            max_frame_payload_bytes: 512,
            ..LightningClientConfig::default()
        };
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_rejects_frame_payload_above_u32_max() {
        // On targets where `usize` cannot hold u32::MAX + 1 there is nothing to
        // test.
        let Ok(val) = usize::try_from(u32::MAX as u128 + 1) else {
            return;
        };
        let cfg = LightningClientConfig {
            max_frame_payload_bytes: val,
            max_stream_payload_bytes: val,
            ..LightningClientConfig::default()
        };
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_rejects_stream_below_frame() {
        let base = LightningClientConfig::default();
        let cfg = LightningClientConfig {
            max_stream_payload_bytes: base.max_frame_payload_bytes - 1,
            ..base
        };
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_rejects_zero_stream_chunk_timeout() {
        let cfg = LightningClientConfig {
            stream_chunk_timeout: Some(Duration::ZERO),
            ..LightningClientConfig::default()
        };
        assert!(config_is_rejected(cfg));
    }

    #[test]
    fn with_config_default_succeeds() {
        assert!(!config_is_rejected(LightningClientConfig::default()));
    }
}
1805
1806#[derive(Debug)]
1811struct AcceptAnyCertVerifier;
1812
1813impl ServerCertVerifier for AcceptAnyCertVerifier {
1814 fn verify_server_cert(
1815 &self,
1816 _end_entity: &CertificateDer<'_>,
1817 _intermediates: &[CertificateDer<'_>],
1818 _server_name: &ServerName<'_>,
1819 _ocsp_response: &[u8],
1820 _now: UnixTime,
1821 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
1822 Ok(ServerCertVerified::assertion())
1823 }
1824
1825 fn verify_tls12_signature(
1826 &self,
1827 _message: &[u8],
1828 _cert: &CertificateDer<'_>,
1829 _dss: &rustls::DigitallySignedStruct,
1830 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
1831 Err(rustls::Error::PeerIncompatible(
1832 rustls::PeerIncompatible::Tls12NotOffered,
1833 ))
1834 }
1835
1836 fn verify_tls13_signature(
1837 &self,
1838 _message: &[u8],
1839 _cert: &CertificateDer<'_>,
1840 _dss: &rustls::DigitallySignedStruct,
1841 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
1842 Ok(HandshakeSignatureValid::assertion())
1843 }
1844
1845 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1846 rustls::crypto::ring::default_provider()
1847 .signature_verification_algorithms
1848 .supported_schemes()
1849 }
1850}