btlightning 0.2.8

QUIC transport layer for Bittensor
Documentation
#![cfg(feature = "subtensor-tests")]

use btlightning::{
    LightningError, LightningServer, LightningServerConfig, Result, ValidatorPermitResolver,
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};

/// WebSocket RPC URL that the tests in this file hand to
/// `SubtensorPermitResolver::new` when they need a reachable chain endpoint.
const TESTNET_ENDPOINT: &str = "wss://test.finney.opentensor.ai:443";
/// Upper bound (300 seconds) on how long the server integration test polls for
/// a non-zero permitted-validator count before panicking.
const PERMIT_POPULATION_TIMEOUT: Duration = Duration::from_secs(300);

/// Test-only permit resolver that reads validator permits from chain storage
/// through `subxt`.
struct SubtensorPermitResolver {
    // URL passed to `OnlineClient::from_url` on every resolution.
    rpc_url: String,
    // Subnet id; used as the first storage key for both the
    // `ValidatorPermit` and `Keys` lookups.
    netuid: u16,
}

impl SubtensorPermitResolver {
    /// Builds a resolver that will query `rpc_url` for the subnet `netuid`.
    fn new(rpc_url: String, netuid: u16) -> Self {
        Self { rpc_url, netuid }
    }

    /// Collects the hotkeys of every UID on the subnet whose validator-permit
    /// flag is set.
    ///
    /// A fresh client connection is opened on each call and all reads go
    /// through the storage view of the current block. The
    /// `SubtensorModule::ValidatorPermit` entry for the subnet is decoded as a
    /// `Vec<bool>`; for each index holding `true`, the matching
    /// `SubtensorModule::Keys` entry is decoded as an `AccountId32` and its
    /// string form is added to the result. UIDs without a `Keys` entry are
    /// skipped.
    ///
    /// Returns an empty set when the subnet has no `ValidatorPermit` entry.
    ///
    /// # Errors
    ///
    /// Any connection, storage-fetch, or decode failure is reported as
    /// `LightningError::Handler` carrying the underlying error text.
    async fn resolve_async(&self) -> Result<HashSet<String>> {
        let client = OnlineClient::<SubstrateConfig>::from_url(&self.rpc_url)
            .await
            .map_err(|err| LightningError::Handler(format!("subtensor connection: {}", err)))?;

        let block = client
            .at_current_block()
            .await
            .map_err(|err| LightningError::Handler(err.to_string()))?;
        let storage = block.storage();

        // Both lookups are keyed by the subnet id; widen it once up front.
        let netuid = u128::from(self.netuid);

        let permit_address: subxt::storage::DynamicAddress =
            subxt::dynamic::storage("SubtensorModule", "ValidatorPermit");
        let Some(permit_entry) = storage
            .try_fetch(permit_address, vec![Value::u128(netuid)])
            .await
            .map_err(|err| LightningError::Handler(err.to_string()))?
        else {
            // No permit vector stored for this subnet: nothing is permitted.
            return Ok(HashSet::new());
        };
        let permit_flags: Vec<bool> = permit_entry
            .decode_as()
            .map_err(|err| LightningError::Handler(format!("decode ValidatorPermit: {}", err)))?;

        // The position of a flag in the permit vector is used as the UID.
        let permitted_uids = permit_flags
            .iter()
            .enumerate()
            .filter_map(|(uid, granted)| granted.then_some(uid));

        let mut hotkeys = HashSet::new();
        for uid in permitted_uids {
            let key_address: subxt::storage::DynamicAddress =
                subxt::dynamic::storage("SubtensorModule", "Keys");

            let key_entry = storage
                .try_fetch(
                    key_address,
                    vec![Value::u128(netuid), Value::u128(uid as u128)],
                )
                .await
                .map_err(|err| LightningError::Handler(err.to_string()))?;

            if let Some(entry) = key_entry {
                let account: subxt::utils::AccountId32 = entry
                    .decode_as()
                    .map_err(|err| LightningError::Handler(format!("decode Keys: {}", err)))?;
                hotkeys.insert(account.to_string());
            }
        }

        Ok(hotkeys)
    }
}

impl ValidatorPermitResolver for SubtensorPermitResolver {
    /// Synchronous adapter over `resolve_async`: drives the async lookup to
    /// completion on the Tokio runtime associated with the calling thread.
    ///
    /// NOTE(review): `Handle::current()` needs an ambient Tokio runtime, and
    /// `block_on` is expected to be invoked from a thread where blocking is
    /// permitted — confirm how the server calls this resolver.
    fn resolve_permitted_validators(&self) -> Result<HashSet<String>> {
        tokio::runtime::Handle::current().block_on(self.resolve_async())
    }
}

/// Resolves permits for netuid 1 on the testnet endpoint and checks that the
/// result is non-empty and that every entry looks like an SS58 address
/// (leading `'5'`, at least 47 characters).
#[tokio::test]
#[ignore]
async fn subtensor_resolver_fetches_permits() {
    let permitted = SubtensorPermitResolver::new(String::from(TESTNET_ENDPOINT), 1)
        .resolve_async()
        .await
        .unwrap();

    assert!(
        !permitted.is_empty(),
        "netuid 1 should have validators with permits"
    );

    permitted.iter().for_each(|address| {
        assert!(
            address.starts_with('5'),
            "SS58 addresses should start with '5', got: {}",
            address
        );
        assert!(address.len() >= 47, "SS58 address too short: {}", address);
    });
}

#[tokio::test]
#[ignore]
async fn subtensor_resolver_integrates_with_server() {
    let resolver = SubtensorPermitResolver::new(TESTNET_ENDPOINT.to_string(), 1);

    let mut config = LightningServerConfig::default();
    config.require_validator_permit = true;
    config.validator_permit_refresh_secs = 3600;
    let mut server = LightningServer::with_config(
        "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY".into(),
        "127.0.0.1".into(),
        0,
        config,
    )
    .unwrap();
    server.set_miner_keypair([1u8; 32]);
    server.set_validator_permit_resolver(Box::new(resolver));
    server.start().await.unwrap();

    let server = Arc::new(server);
    let s = server.clone();
    let handle = tokio::spawn(async move { s.serve_forever().await });

    let srv = server.clone();
    tokio::time::timeout(PERMIT_POPULATION_TIMEOUT, async {
        loop {
            if srv.get_permitted_validator_count().await > 0 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    })
    .await
    .unwrap_or_else(|_| {
        panic!(
            "subtensor resolver should populate permits within {:?}",
            PERMIT_POPULATION_TIMEOUT
        )
    });

    assert!(server.get_permitted_validator_count().await > 0);

    let _ = server.stop().await;
    let _ = handle.await;
}

/// Queries netuid 65535 and accepts either outcome: an error, or a successful
/// lookup that yields no permitted validators.
#[tokio::test]
#[ignore]
async fn subtensor_resolver_handles_invalid_netuid() {
    let outcome = SubtensorPermitResolver::new(String::from(TESTNET_ENDPOINT), 65535)
        .resolve_async()
        .await;

    // An `Err` is tolerated; only a successful lookup is constrained.
    if let Ok(permitted) = outcome {
        assert!(permitted.is_empty(), "invalid netuid should return empty set");
    }
}

/// Points the resolver at `wss://localhost:1` and expects the lookup to fail
/// with an error rather than succeed.
#[tokio::test]
#[ignore]
async fn subtensor_resolver_handles_connection_failure() {
    let outcome = SubtensorPermitResolver::new(String::from("wss://localhost:1"), 1)
        .resolve_async()
        .await;

    assert!(outcome.is_err(), "unreachable endpoint should return Err");
}