use crate::config::BittensorConfig;
use crate::connect::{CircuitBreaker, HealthChecker, RetryConfig, RetryNode};
use crate::connect::{ConnectionManager, ConnectionPool, ConnectionPoolBuilder};
use crate::error::BittensorError;
use crate::utils::{set_weights_payload, NormalizedWeight};
use crate::AccountId;
use anyhow::Result;
use std::path::PathBuf;
use crate::api::api;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};
use subxt_signer::sr25519::Keypair;
fn home_hotkey_location(wallet_name: &str, hotkey_name: &str) -> Option<PathBuf> {
home::home_dir().map(|home| {
home.join(".bittensor")
.join("wallets")
.join(wallet_name)
.join("hotkeys")
.join(hotkey_name)
})
}
fn load_key_seed(path: &PathBuf) -> Result<String, Box<dyn std::error::Error>> {
let content = std::fs::read_to_string(path)?;
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(secret_phrase) = json_value.get("secretPhrase").and_then(|v| v.as_str()) {
return Ok(secret_phrase.to_string());
}
if let Some(secret_seed) = json_value.get("secretSeed").and_then(|v| v.as_str()) {
return Ok(secret_seed.to_string());
}
return Err("JSON wallet file missing secretPhrase or secretSeed".into());
}
Ok(content.trim().to_string())
}
fn signer_from_seed(seed: &str) -> Result<Keypair, Box<dyn std::error::Error + Send + Sync>> {
use subxt_signer::SecretUri;
let uri: SecretUri = seed.parse()?;
let keypair = Keypair::from_uri(&uri)?;
Ok(keypair)
}
use crate::{Metagraph, SelectiveMetagraph};
pub struct Service {
config: BittensorConfig,
connection_pool: Arc<ConnectionPool>,
connection_manager: Arc<ConnectionManager>,
signer: Keypair,
retry_node: RetryNode,
circuit_breaker: Arc<tokio::sync::Mutex<CircuitBreaker>>,
health_monitor_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Service {
pub async fn new(config: BittensorConfig) -> Result<Self, BittensorError> {
info!(
"Initializing enhanced Bittensor service for network: {}",
config.network
);
let pool = Arc::new(
ConnectionPoolBuilder::new(config.get_chain_endpoints())
.max_connections(config.connection_pool_size.unwrap_or(3))
.retry_config(RetryConfig::network())
.build(),
);
let retry_node = RetryNode::new().with_timeout(Duration::from_secs(120));
retry_node
.execute_with_config(
|| async {
pool.initialize()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Pool initialization failed: {}", e),
})
},
RetryConfig::network(),
)
.await?;
info!(
"Connection pool initialized with {} healthy connections",
pool.healthy_connection_count().await
);
let connection_manager = Arc::new(ConnectionManager::new(config.clone()));
let health_checker = Arc::new(
HealthChecker::new()
.with_interval(
config
.health_check_interval
.unwrap_or(Duration::from_secs(60)),
)
.with_timeout(Duration::from_secs(5))
.with_failure_threshold(3),
);
let monitor_handle = health_checker.start_monitoring(Arc::clone(&pool));
let hotkey_path = home_hotkey_location(&config.wallet_name, &config.hotkey_name)
.ok_or_else(|| BittensorError::WalletError {
message: "Failed to find home directory".to_string(),
})?;
if config.read_only {
info!(
"Loading hotkey from path: {:?} in READ-ONLY mode (wallet: {}, hotkey: {})",
hotkey_path, config.wallet_name, config.hotkey_name
);
info!("Read-only mode: wallet is used for metagraph queries only, not for signing transactions");
} else {
info!(
"Loading hotkey from path: {:?} (wallet: {}, hotkey: {})",
hotkey_path, config.wallet_name, config.hotkey_name
);
}
let seed = load_key_seed(&hotkey_path).map_err(|e| BittensorError::WalletError {
message: format!("Failed to load hotkey from {hotkey_path:?}: {e}"),
})?;
let signer = signer_from_seed(&seed).map_err(|e| BittensorError::WalletError {
message: format!("Failed to create signer from seed: {e}"),
})?;
let circuit_breaker = Arc::new(tokio::sync::Mutex::new(CircuitBreaker::new(
config.circuit_breaker_threshold.unwrap_or(5),
config
.circuit_breaker_recovery
.unwrap_or(Duration::from_secs(60)),
)));
let service = Self {
config,
connection_pool: pool,
connection_manager,
signer,
retry_node,
circuit_breaker,
health_monitor_handle: Some(monitor_handle),
};
info!("Enhanced Bittensor service initialized with connection pooling");
Ok(service)
}
pub async fn serve_axon(
&self,
netuid: u16,
axon_addr: SocketAddr,
) -> Result<(), BittensorError> {
info!(
"Serving axon for netuid {} at {} with retry logic",
netuid, axon_addr
);
let operation = || {
let (ip, ip_type): (u128, u8) = match axon_addr.ip() {
std::net::IpAddr::V4(ipv4) => (u32::from(ipv4) as u128, 4),
std::net::IpAddr::V6(ipv6) => (u128::from(ipv6), 6),
};
let port = axon_addr.port();
let protocol = 0;
let payload = api::tx().subtensor_module().serve_axon(
netuid, 0, ip, port, ip_type, protocol, 0, 0, );
let signer = &self.signer;
async move {
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
client
.tx()
.sign_and_submit_then_watch_default(&payload, signer)
.await
.map_err(|e| {
let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase();
if err_lower.contains("timeout") {
BittensorError::TxTimeoutError {
message: format!("serve_axon transaction timeout: {err_msg}"),
timeout: Duration::from_secs(60),
}
} else if err_lower.contains("fee") || err_lower.contains("balance") {
BittensorError::InsufficientTxFees {
required: 0,
available: 0,
}
} else if err_lower.contains("nonce") {
BittensorError::InvalidNonce {
expected: 0,
actual: 0,
}
} else {
BittensorError::TxSubmissionError {
message: format!("Failed to submit serve_axon: {err_msg}"),
}
}
})?;
Ok(())
}
};
self.retry_node.execute(operation).await?;
info!("Axon served successfully");
Ok(())
}
pub async fn set_weights(
&self,
netuid: u16,
weights: Vec<(u16, u16)>,
) -> Result<(), BittensorError> {
info!(
"Setting weights for netuid {} with {} weights using retry logic",
netuid,
weights.len()
);
if weights.is_empty() {
return Err(BittensorError::InvalidWeights {
reason: "Weight vector cannot be empty".to_string(),
});
}
let mut seen_uids = std::collections::HashSet::new();
for (uid, _) in &weights {
if !seen_uids.insert(*uid) {
return Err(BittensorError::InvalidWeights {
reason: format!("Duplicate UID found: {uid}"),
});
}
}
let operation = || {
let normalized_weights: Vec<NormalizedWeight> = weights
.iter()
.map(|(uid, weight)| NormalizedWeight {
uid: *uid,
weight: *weight,
})
.collect();
let payload = set_weights_payload(netuid, normalized_weights, 0);
let signer = &self.signer;
async move {
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
client
.tx()
.sign_and_submit_then_watch_default(&payload, signer)
.await
.map_err(|e| {
let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase();
if err_lower.contains("timeout") {
BittensorError::TxTimeoutError {
message: format!("set_weights transaction timeout: {err_msg}"),
timeout: Duration::from_secs(120),
}
} else if err_lower.contains("weight") || err_lower.contains("invalid") {
BittensorError::WeightSettingFailed {
netuid,
reason: format!("Weight validation failed: {err_msg}"),
}
} else if err_lower.contains("fee") || err_lower.contains("balance") {
BittensorError::InsufficientTxFees {
required: 0,
available: 0,
}
} else if err_lower.contains("nonce") {
BittensorError::InvalidNonce {
expected: 0,
actual: 0,
}
} else {
BittensorError::TxSubmissionError {
message: format!("Failed to submit set_weights: {err_msg}"),
}
}
})?;
Ok(())
}
};
self.retry_node.execute(operation).await?;
info!("Weights set successfully for netuid {}", netuid);
Ok(())
}
pub async fn get_neuron(
&self,
netuid: u16,
uid: u16,
) -> Result<Option<crate::NeuronInfo>, BittensorError> {
debug!("Getting neuron info for UID: {} on netuid: {}", uid, netuid);
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
let runtime_api =
client
.runtime_api()
.at_latest()
.await
.map_err(|e| BittensorError::RpcError {
message: format!("Failed to get runtime API: {e}"),
})?;
let neuron_info = runtime_api
.call(
api::runtime_apis::neuron_info_runtime_api::NeuronInfoRuntimeApi
.get_neuron(netuid, uid),
)
.await
.map_err(|e| BittensorError::RpcError {
message: format!("Failed to call get_neuron: {e}"),
})?;
Ok(neuron_info)
}
pub async fn get_metagraph(&self, netuid: u16) -> Result<Metagraph, BittensorError> {
info!(
"Fetching metagraph for netuid: {} with circuit breaker protection",
netuid
);
let operation = || {
async move {
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
let runtime_api = client.runtime_api().at_latest().await.map_err(|e| {
let err_msg = e.to_string();
let err_lower = err_msg.to_lowercase();
if err_lower.contains("timeout") {
BittensorError::RpcTimeoutError {
message: format!("Runtime API timeout: {err_msg}"),
timeout: Duration::from_secs(30),
}
} else if err_lower.contains("connection") {
BittensorError::RpcConnectionError {
message: format!("Runtime API connection failed: {err_msg}"),
}
} else {
BittensorError::RpcMethodError {
method: "runtime_api".to_string(),
message: err_msg,
}
}
})?;
let metagraph = runtime_api
.call(
api::runtime_apis::subnet_info_runtime_api::SubnetInfoRuntimeApi
.get_metagraph(netuid),
)
.await
.map_err(|e| {
let err_msg = e.to_string();
if err_msg.to_lowercase().contains("timeout") {
BittensorError::RpcTimeoutError {
message: format!("get_metagraph call timeout: {err_msg}"),
timeout: Duration::from_secs(30),
}
} else {
BittensorError::RpcMethodError {
method: "get_metagraph".to_string(),
message: err_msg,
}
}
})?
.ok_or(BittensorError::SubnetNotFound { netuid })?;
Ok(metagraph)
}
};
let mut circuit_breaker = {
let cb = self.circuit_breaker.lock().await;
cb.clone()
};
let result = circuit_breaker.execute(operation).await;
{
let mut original_cb = self.circuit_breaker.lock().await;
*original_cb = circuit_breaker;
}
match &result {
Ok(_) => info!("Metagraph fetched successfully for netuid: {}", netuid),
Err(e) => warn!("Failed to fetch metagraph for netuid {}: {}", netuid, e),
}
result
}
pub async fn get_selective_metagraph(
&self,
netuid: u16,
fields: Vec<u16>,
) -> Result<SelectiveMetagraph, BittensorError> {
info!(
"Fetching selective metagraph for netuid: {} with {} fields",
netuid,
fields.len()
);
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
let runtime_api =
client
.runtime_api()
.at_latest()
.await
.map_err(|e| BittensorError::RpcError {
message: format!("Failed to get runtime API: {e}"),
})?;
let selective_metagraph = runtime_api
.call(
api::runtime_apis::subnet_info_runtime_api::SubnetInfoRuntimeApi
.get_selective_metagraph(netuid, fields),
)
.await
.map_err(|e| BittensorError::RpcError {
message: format!("Failed to call get_selective_metagraph: {e}"),
})?
.ok_or_else(|| BittensorError::RpcError {
message: format!("Selective metagraph not found for subnet {netuid}"),
})?;
Ok(selective_metagraph)
}
pub async fn get_block_number(&self) -> Result<u64, BittensorError> {
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
let latest_block =
client
.blocks()
.at_latest()
.await
.map_err(|e| BittensorError::RpcError {
message: format!("Failed to get latest block: {e}"),
})?;
Ok(latest_block.number().into())
}
pub fn get_account_id(&self) -> AccountId {
subxt::config::polkadot::AccountId32::from(self.signer.public_key().0)
}
pub async fn get_current_block(&self) -> Result<u64, BittensorError> {
self.get_block_number().await
}
pub async fn submit_extrinsic<T>(&self, payload: T) -> Result<(), BittensorError>
where
T: subxt::tx::Payload,
{
let client = self
.connection_pool
.get_healthy_client()
.await
.map_err(|e| BittensorError::NetworkError {
message: format!("Failed to get healthy client: {}", e),
})?;
let tx_result = client
.tx()
.sign_and_submit_default(&payload, &self.signer)
.await
.map_err(|e| BittensorError::TxSubmissionError {
message: format!("Failed to submit extrinsic: {e}"),
})?;
info!("Transaction submitted with hash: {:?}", tx_result);
Ok(())
}
pub fn get_network(&self) -> &str {
&self.config.network
}
pub fn get_netuid(&self) -> u16 {
self.config.netuid
}
pub fn sign_data(&self, data: &[u8]) -> Result<String, BittensorError> {
let signature = self.signer.sign(data);
Ok(hex::encode(signature.0))
}
}
impl Service {
pub async fn get_retry_stats(&self) -> RetryStats {
RetryStats {
circuit_breaker_state: {
let cb = self.circuit_breaker.lock().await;
format!("{cb:?}")
},
}
}
pub async fn reset_circuit_breaker(&self) {
let mut cb = self.circuit_breaker.lock().await;
*cb = CircuitBreaker::new(5, Duration::from_secs(60));
info!("Circuit breaker reset");
}
pub async fn connection_metrics(&self) -> ConnectionPoolMetrics {
ConnectionPoolMetrics {
total_connections: self.connection_pool.total_connections().await,
healthy_connections: self.connection_pool.healthy_connection_count().await,
connection_state: self.connection_manager.get_state().await.status_message(),
metrics: self.connection_manager.metrics(),
}
}
pub async fn force_reconnect(&self) -> Result<(), BittensorError> {
warn!("Forcing reconnection of all connections");
self.connection_pool.refresh_connections().await?;
self.connection_manager.force_reconnect().await?;
Ok(())
}
pub async fn shutdown(mut self) {
info!("Shutting down enhanced Bittensor service");
if let Some(handle) = self.health_monitor_handle.take() {
handle.abort();
}
}
}
#[derive(Debug, Clone)]
pub struct RetryStats {
pub circuit_breaker_state: String,
}
#[derive(Debug, Clone)]
pub struct ConnectionPoolMetrics {
pub total_connections: usize,
pub healthy_connections: usize,
pub connection_state: String,
pub metrics: crate::connect::ConnectionMetricsSnapshot,
}
impl Drop for Service {
fn drop(&mut self) {
if let Some(handle) = self.health_monitor_handle.take() {
handle.abort();
}
}
}