use std::sync::Arc;
use x402_gateway::state::AppState as GatewayState;
use x402_identity::InstanceIdentity;
use x402_soul::error::SoulError;
use x402_soul::observer::{NodeObserver, NodeSnapshot};
pub struct NodeObserverImpl {
gateway: GatewayState,
identity: Option<InstanceIdentity>,
generation: u32,
started_at: chrono::DateTime<chrono::Utc>,
db_path: String,
peers_cache: std::sync::Mutex<(Vec<x402_soul::observer::PeerInfo>, std::time::Instant)>,
}
impl NodeObserverImpl {
pub fn new(
gateway: GatewayState,
identity: Option<InstanceIdentity>,
generation: u32,
started_at: chrono::DateTime<chrono::Utc>,
db_path: String,
) -> Arc<Self> {
Arc::new(Self {
gateway,
identity,
generation,
started_at,
db_path,
peers_cache: std::sync::Mutex::new((Vec::new(), std::time::Instant::now())),
})
}
}
impl NodeObserverImpl {
pub async fn refresh_peers(&self) {
use x402_soul::observer::PeerInfo;
let parent_url = std::env::var("PARENT_URL").ok();
let self_instance_id = self.identity.as_ref().map(|id| id.instance_id.as_str());
let mut peers = Vec::new();
if let Ok(conn) = rusqlite::Connection::open(&self.db_path) {
if let Ok(children) = crate::db::query_children_active(&conn) {
for child in children {
if child.status != "running" {
continue;
}
if let Some(url) = &child.url {
let mut peer = PeerInfo {
instance_id: child.instance_id.clone(),
url: url.clone(),
address: Some(child.address.clone()),
version: None,
endpoints: Vec::new(),
};
if let Ok(info) = Self::fetch_peer_info(url).await {
peer.version = info.version;
peer.endpoints = info.endpoints;
}
peers.push(peer);
}
}
}
}
if let Some(ref parent) = parent_url {
if let Ok(info) = Self::fetch_peer_info(parent).await {
let parent_instance_id = info
.instance_id
.clone()
.unwrap_or_else(|| "parent".to_string());
let skip_parent = self_instance_id == Some(parent_instance_id.as_str());
if !skip_parent {
peers.push(PeerInfo {
instance_id: parent_instance_id,
url: parent.clone(),
address: info.address.clone(),
version: info.version.clone(),
endpoints: info.endpoints.clone(),
});
}
}
let siblings_url = format!("{}/instance/siblings", parent.trim_end_matches('/'));
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.redirect(reqwest::redirect::Policy::limited(5))
.build();
if let Ok(client) = client {
if let Ok(resp) = client.get(&siblings_url).send().await {
if let Ok(json) = resp.json::<serde_json::Value>().await {
if let Some(siblings) = json.get("siblings").and_then(|v| v.as_array()) {
for sib in siblings {
let inst_id = sib
.get("instance_id")
.and_then(|v| v.as_str())
.unwrap_or_default();
if self_instance_id == Some(inst_id) {
continue;
}
if peers.iter().any(|p| p.instance_id == inst_id) {
continue;
}
let url = match sib.get("url").and_then(|v| v.as_str()) {
Some(u) => u.to_string(),
None => continue,
};
let mut peer = PeerInfo {
instance_id: inst_id.to_string(),
url: url.clone(),
address: sib
.get("address")
.and_then(|v| v.as_str())
.map(String::from),
version: None,
endpoints: Vec::new(),
};
if let Ok(info) = Self::fetch_peer_info(&url).await {
peer.version = info.version;
peer.endpoints = info.endpoints;
}
peers.push(peer);
}
}
}
}
}
}
if let Ok(mut cache) = self.peers_cache.lock() {
*cache = (peers, std::time::Instant::now());
}
}
async fn fetch_peer_info(
peer_url: &str,
) -> Result<PeerInfoResult, Box<dyn std::error::Error + Send + Sync>> {
use x402_soul::observer::PeerEndpoint;
let url = format!("{}/instance/info", peer_url.trim_end_matches('/'));
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.redirect(reqwest::redirect::Policy::limited(5))
.build()?;
let resp = client.get(&url).send().await?;
let json: serde_json::Value = resp.json().await?;
let instance_id = json
.get("identity")
.and_then(|v| v.get("instance_id"))
.and_then(|v| v.as_str())
.map(String::from);
let address = json
.get("identity")
.and_then(|v| v.get("address"))
.and_then(|v| v.as_str())
.map(String::from);
let version = json
.get("version")
.and_then(|v| v.as_str())
.map(String::from);
let mut endpoints = Vec::new();
if let Some(eps) = json.get("endpoints").and_then(|v| v.as_array()) {
for ep in eps {
endpoints.push(PeerEndpoint {
slug: ep
.get("slug")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string(),
price: ep
.get("price")
.and_then(|v| v.as_str())
.unwrap_or("0")
.to_string(),
description: ep
.get("description")
.and_then(|v| v.as_str())
.map(String::from),
});
}
}
Ok(PeerInfoResult {
instance_id,
address,
version,
endpoints,
})
}
}
struct PeerInfoResult {
instance_id: Option<String>,
address: Option<String>,
version: Option<String>,
endpoints: Vec<x402_soul::observer::PeerEndpoint>,
}
impl NodeObserver for NodeObserverImpl {
fn observe(&self) -> Result<NodeSnapshot, SoulError> {
use x402_soul::observer::EndpointSummary;
let uptime_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(0) as u64;
let endpoints = self
.gateway
.db
.list_endpoints(500, 0)
.map_err(|e| SoulError::Observer(format!("failed to list endpoints: {e}")))?;
let endpoint_count = endpoints.len() as u32;
let stats = self
.gateway
.db
.list_endpoint_stats(500, 0)
.map_err(|e| SoulError::Observer(format!("failed to list stats: {e}")))?;
let stats_by_slug: std::collections::HashMap<&str, _> =
stats.iter().map(|s| (s.slug.as_str(), s)).collect();
let mut total_revenue: u128 = 0;
let mut total_payments: u64 = 0;
let mut endpoint_summaries = Vec::new();
for ep in &endpoints {
let stat = stats_by_slug.get(ep.slug.as_str());
let req_count = stat.map(|s| s.request_count).unwrap_or(0);
let pay_count = stat.map(|s| s.payment_count).unwrap_or(0);
let rev = stat
.map(|s| s.revenue_total.clone())
.unwrap_or_else(|| "0".to_string());
total_revenue += rev.parse::<u128>().unwrap_or(0);
total_payments += pay_count as u64;
endpoint_summaries.push(EndpointSummary {
slug: ep.slug.clone(),
price: ep.price_usd.clone(),
description: ep.description.clone(),
request_count: req_count,
payment_count: pay_count,
revenue: rev,
});
}
let children_count = {
match rusqlite::Connection::open(&self.db_path) {
Ok(conn) => crate::db::query_children_active(&conn)
.map(|c| c.len() as u32)
.unwrap_or(0),
Err(_) => 0,
}
};
let peers = self
.peers_cache
.lock()
.map(|cache| cache.0.clone())
.unwrap_or_default();
Ok(NodeSnapshot {
uptime_secs,
endpoint_count,
total_revenue: total_revenue.to_string(),
total_payments,
children_count,
wallet_address: self
.identity
.as_ref()
.map(|id| format!("{:#x}", id.address)),
instance_id: self.identity.as_ref().map(|id| id.instance_id.clone()),
generation: self.generation,
endpoints: endpoint_summaries,
peers,
})
}
}