btlightning 0.2.8

QUIC transport layer for Bittensor
Documentation
use super::response::rejected_handshake;
use super::ServerContext;
use crate::error::{LightningError, Result};
use crate::signing::{verify_sr25519_signature, Signer};
use crate::types::{
    handshake_request_message, handshake_response_message, HandshakeRequest, HandshakeResponse,
};
use crate::util::unix_timestamp_secs;
use base64::{prelude::BASE64_STANDARD, Engine};
use indexmap::IndexMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::{error, info, warn};

const HANDSHAKE_RATE_WINDOW_SECS: u64 = 60;
const MAX_FUTURE_CLOCK_SKEW_SECS: u64 = 60;

pub(super) async fn process_handshake(
    request: HandshakeRequest,
    connection: Arc<quinn::Connection>,
    ctx: &ServerContext,
) -> HandshakeResponse {
    let cert_fp = match *ctx.cert_fingerprint.read().await {
        Some(fp) => fp,
        None => {
            error!("Certificate fingerprint unavailable during handshake");
            return rejected_handshake(&ctx.miner_hotkey, unix_timestamp_secs());
        }
    };

    let is_valid = verify_validator_signature(
        &request,
        ctx.used_nonces.clone(),
        &cert_fp,
        ctx.config.max_signature_age_secs,
        ctx.config.max_nonce_entries,
    )
    .await;

    if !is_valid {
        error!("Handshake failed: invalid signature");
        return rejected_handshake(&ctx.miner_hotkey, unix_timestamp_secs());
    }

    if ctx.config.require_validator_permit {
        if ctx.permit_resolver.is_none() {
            error!(
                "Validator permit required but no resolver configured, rejecting {}",
                request.validator_hotkey
            );
            return rejected_handshake(&ctx.miner_hotkey, unix_timestamp_secs());
        }
        let permitted = ctx.permitted_validators.read().await;
        if !permitted.contains(&request.validator_hotkey) {
            warn!(
                "Handshake rejected: hotkey {} does not hold a validator permit",
                request.validator_hotkey
            );
            return rejected_handshake(&ctx.miner_hotkey, unix_timestamp_secs());
        }
    }

    let now = unix_timestamp_secs();
    let signature = match sign_handshake_response(
        &request,
        &ctx.miner_hotkey,
        ctx.miner_signer.clone(),
        now,
        &cert_fp,
    )
    .await
    {
        Ok(sig) => sig,
        Err(e) => {
            error!("Handshake signing failed: {}", e);
            return rejected_handshake(&ctx.miner_hotkey, now);
        }
    };

    let connection_id = format!(
        "conn_{}_{}",
        request.validator_hotkey,
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis()
    );

    let remote_addr = connection.remote_address();
    let mut connections_guard = ctx.connections.write().await;
    let is_reconnect = connections_guard.contains_key(&request.validator_hotkey);
    if !is_reconnect && connections_guard.len() >= ctx.config.max_connections {
        error!(
            "Connection limit reached ({}/{}), rejecting validator {}",
            connections_guard.len(),
            ctx.config.max_connections,
            request.validator_hotkey
        );
        return rejected_handshake(&ctx.miner_hotkey, now);
    }
    let mut addr_index = ctx.addr_to_hotkey.write().await;
    let mut validator_conn = super::ValidatorConnection::new(
        request.validator_hotkey.clone(),
        connection_id.clone(),
        connection.clone(),
    );
    validator_conn.verify();
    if let Some(prev_conn) =
        connections_guard.insert(request.validator_hotkey.clone(), validator_conn)
    {
        if !Arc::ptr_eq(&prev_conn.connection, &connection) {
            warn!(
                validator = %request.validator_hotkey,
                prev_addr = %prev_conn.connection.remote_address(),
                new_addr = %remote_addr,
                "closing previous connection for validator (replaced by new handshake)"
            );
            prev_conn.connection.close(0u32.into(), b"replaced");
            let prev_addr = prev_conn.connection.remote_address();
            if prev_addr != remote_addr {
                addr_index.remove(&prev_addr);
            }
        }
    }
    addr_index.insert(remote_addr, request.validator_hotkey.clone());
    drop(addr_index);
    drop(connections_guard);

    info!(
        "Handshake successful, established connection: {}",
        connection_id
    );

    HandshakeResponse {
        miner_hotkey: ctx.miner_hotkey.clone(),
        timestamp: now,
        signature,
        accepted: true,
        connection_id,
        cert_fingerprint: Some(BASE64_STANDARD.encode(cert_fp)),
    }
}

pub(super) async fn verify_validator_signature(
    request: &HandshakeRequest,
    used_nonces: Arc<RwLock<IndexMap<String, u64>>>,
    cert_fingerprint: &[u8; 32],
    max_signature_age: u64,
    max_nonce_entries: usize,
) -> bool {
    let current_time = unix_timestamp_secs();

    if current_time > request.timestamp && (current_time - request.timestamp) >= max_signature_age {
        error!(
            "Signature timestamp too old: {} (current: {})",
            request.timestamp, current_time
        );
        return false;
    }

    if request.timestamp > current_time + MAX_FUTURE_CLOCK_SKEW_SECS {
        error!(
            "Signature timestamp too far in future: {} (current: {})",
            request.timestamp, current_time
        );
        return false;
    }

    if !is_valid_nonce(&request.nonce) {
        error!("Invalid nonce format");
        return false;
    }

    let fp_b64 = BASE64_STANDARD.encode(cert_fingerprint);
    let expected_message = handshake_request_message(
        &request.validator_hotkey,
        request.timestamp,
        &request.nonce,
        &fp_b64,
    );

    let sig_valid = match verify_sr25519_signature(
        &request.validator_hotkey,
        &request.signature,
        &expected_message,
    )
    .await
    {
        Ok(v) => v,
        Err(e) => {
            error!("Signature verification failed: {}", e);
            return false;
        }
    };

    if !sig_valid {
        return false;
    }

    {
        let mut nonces = used_nonces.write().await;
        if nonces.contains_key(&request.nonce) {
            error!("Nonce already used: {}", request.nonce);
            return false;
        }
        nonces.insert(request.nonce.clone(), current_time);
        if nonces.len() > max_nonce_entries {
            super::evict_stale_nonces(
                &mut nonces,
                current_time,
                max_signature_age,
                Some(max_nonce_entries),
            );
        }
    }

    true
}

pub(super) async fn sign_handshake_response(
    request: &HandshakeRequest,
    miner_hotkey: &str,
    miner_signer: Option<Arc<dyn Signer>>,
    timestamp: u64,
    cert_fingerprint: &[u8; 32],
) -> Result<String> {
    let signer = miner_signer
        .ok_or_else(|| LightningError::Signing("no miner signer configured".to_string()))?;
    let fp_b64 = BASE64_STANDARD.encode(cert_fingerprint);
    let message = handshake_response_message(
        &request.validator_hotkey,
        miner_hotkey,
        timestamp,
        &request.nonce,
        &fp_b64,
    );
    let msg_bytes = message.into_bytes();
    let sig = tokio::task::spawn_blocking(move || signer.sign(&msg_bytes))
        .await
        .map_err(|e| LightningError::Signing(format!("signer task failed: {}", e)))??;
    Ok(BASE64_STANDARD.encode(sig))
}

pub(super) async fn check_handshake_rate(ctx: &ServerContext, ip: IpAddr) -> bool {
    let now = unix_timestamp_secs();
    let cutoff = now.saturating_sub(HANDSHAKE_RATE_WINDOW_SECS);
    let mut rates = ctx.handshake_rate.write().await;
    if !rates.contains_key(&ip) && rates.len() >= ctx.config.max_tracked_rate_ips {
        let oldest_ip = rates
            .iter()
            .min_by_key(|(_, attempts)| attempts.iter().copied().max().unwrap_or(0))
            .map(|(ip, _)| *ip);
        if let Some(evict_ip) = oldest_ip {
            rates.remove(&evict_ip);
        }
    }
    let attempts = rates.entry(ip).or_default();
    attempts.retain(|ts| *ts >= cutoff);
    if attempts.len() >= ctx.config.max_handshake_attempts_per_minute as usize {
        return false;
    }
    attempts.push(now);
    true
}

fn is_valid_nonce(nonce: &str) -> bool {
    nonce.len() == 32 && nonce.bytes().all(|b| b.is_ascii_hexdigit())
}