use std::sync::Arc;
use std::net::SocketAddr;
use std::time::Duration;
use parking_lot::Mutex;
use serde::Deserialize;
use serde_json::Value;
use log::{info, warn, error};
use warp::Filter;
/// Error object carried in the `error` member of a JSON-RPC response.
#[derive(Debug, Deserialize)]
struct RpcError {
    /// Numeric error code reported by the server.
    code: i32,
    /// Human-readable description that accompanies `code`.
    message: String,
}
/// Generic JSON-RPC response envelope.
///
/// Both members are optional on the wire: a missing `result` or `error`
/// deserializes to `None` instead of failing.
#[derive(Debug, Deserialize)]
struct RpcResponse<T> {
    /// Payload of a successful call, typed per method.
    #[serde(default)]
    result: Option<T>,
    /// Error details when the call was rejected.
    #[serde(default)]
    error: Option<RpcError>,
}
/// `result` payload of the `Authenticate` call.
#[derive(Debug, Deserialize, Default)]
struct AuthResult {
    /// Token read from the `Token` key; sent back as the `Token` parameter of
    /// later requests.
    #[serde(rename = "Token")]
    token: Option<String>,
}
/// `result` payload of the `RouterInfo` call.
///
/// Each field maps to the dotted key named in its `rename` attribute. Every
/// field is optional, so a key missing from the response becomes `None`.
#[derive(Debug, Deserialize, Default)]
struct RouterInfoResult {
    /// Value of `i2p.router.status`, kept as the raw string.
    #[serde(rename = "i2p.router.status")]
    router_status: Option<String>,

    /// Value of `i2p.router.version`.
    #[serde(rename = "i2p.router.version")]
    router_version: Option<String>,

    /// Value of `i2p.router.uptime`; kept as a raw JSON value because the
    /// consumer accepts either a number or a numeric string.
    #[serde(rename = "i2p.router.uptime")]
    router_uptime: Option<Value>,

    /// Value of `i2p.router.net.bw.inbound.1s`.
    #[serde(rename = "i2p.router.net.bw.inbound.1s")]
    bw_inbound_1s: Option<f64>,

    /// Value of `i2p.router.net.bw.inbound.15s`.
    #[serde(rename = "i2p.router.net.bw.inbound.15s")]
    bw_inbound_15s: Option<f64>,

    /// Value of `i2p.router.net.bw.outbound.1s`.
    #[serde(rename = "i2p.router.net.bw.outbound.1s")]
    bw_outbound_1s: Option<f64>,

    /// Value of `i2p.router.net.bw.outbound.15s`.
    #[serde(rename = "i2p.router.net.bw.outbound.15s")]
    bw_outbound_15s: Option<f64>,

    /// Value of `i2p.router.net.status` (numeric code).
    #[serde(rename = "i2p.router.net.status")]
    net_status: Option<u64>,

    /// Value of `i2p.router.net.tunnels.participating`.
    #[serde(rename = "i2p.router.net.tunnels.participating")]
    tunnels_participating: Option<u64>,

    /// Value of `i2p.router.netdb.activepeers`.
    #[serde(rename = "i2p.router.netdb.activepeers")]
    netdb_activepeers: Option<u64>,

    /// Value of `i2p.router.netdb.knownpeers`.
    #[serde(rename = "i2p.router.netdb.knownpeers")]
    netdb_knownpeers: Option<u64>,
}
/// State shared by every metrics request.
struct AppState {
    /// HTTP client used for all JSON-RPC calls.
    api_client: reqwest::Client,
    /// Full URL that JSON-RPC requests are POSTed to.
    api_url: String,
    /// Password sent with the `Authenticate` call.
    password: String,
    /// Most recently obtained token; `None` until authentication succeeds or
    /// after the token has been invalidated.
    token: Mutex<Option<String>>,
}
impl AppState {
fn new(
api_client: reqwest::Client,
api_url: String,
password: String,
) -> Self {
AppState {
api_client,
api_url,
password,
token: Mutex::new(None),
}
}
async fn authenticate(&self) -> Result<String, Box<dyn std::error::Error>> {
let req_body = serde_json::json!({
"id": "1", "method": "Authenticate",
"params": { "API": 1, "Password": self.password },
"jsonrpc": "2.0"
});
let response = self.api_client.post(&self.api_url).json(&req_body).send().await?;
if !response.status().is_success() {
return Err(format!(
"Authentication HTTP request failed with status {}",
response.status()
)
.into());
}
let rpc: RpcResponse<AuthResult> = response.json().await?;
if let Some(err) = rpc.error {
return Err(format!(
"Authentication error {}: {}",
err.code, err.message
)
.into());
}
if let Some(result) = rpc.result {
if let Some(token) = result.token {
{
let mut guard = self.token.lock(); *guard = Some(token.clone()); } info!("Obtained authentication token from I2PControl");
return Ok(token); }
}
Err("Authentication failed: no token received".into())
}
async fn fetch_metrics(&self) -> Result<String, Box<dyn std::error::Error>> {
let mut did_retry = false;
loop { let current_token = {
let guard = self.token.lock(); guard.clone() };
let token = match current_token {
Some(tok) => tok,
None => {
info!("No token found, authenticating...");
self.authenticate().await?
}
};
let mut params = serde_json::Map::new();
for key in &[
"i2p.router.status", "i2p.router.version", "i2p.router.uptime", "i2p.router.net.bw.inbound.1s", "i2p.router.net.bw.inbound.15s", "i2p.router.net.bw.outbound.1s", "i2p.router.net.bw.outbound.15s", "i2p.router.net.status", "i2p.router.net.tunnels.participating", "i2p.router.netdb.activepeers", "i2p.router.netdb.knownpeers", ] {
params.insert((*key).to_string(), Value::Null);
}
params.insert("Token".to_string(), Value::String(token.clone()));
let req_body = serde_json::json!({
"id": "1", "method": "RouterInfo", "params": params, "jsonrpc": "2.0"
});
let response = self.api_client.post(&self.api_url).json(&req_body).send().await?;
if !response.status().is_success() {
return Err(format!(
"Metrics HTTP request failed with status {}",
response.status()
)
.into());
}
let rpc: RpcResponse<RouterInfoResult> = response.json().await?;
if let Some(err) = rpc.error {
let code = err.code;
if (code == -32002 || code == -32003 || code == -32004) && !did_retry {
warn!("Token error (code {}), re-authenticating...", code);
{
let mut guard = self.token.lock();
*guard = None;
}
let _ = self.authenticate().await?;
did_retry = true; continue;
}
return Err(format!("RouterInfo error {}: {}", code, err.message).into());
}
let data = rpc.result.ok_or("No RouterInfo result")?;
let mut output = String::with_capacity(1024);
if let Some(status) = &data.router_status {
output += "# HELP i2p_router_status Router status string\n";
output += "# TYPE i2p_router_status gauge\n";
let status_value = status.parse::<f64>().unwrap_or(1.0);
output += &format!("i2p_router_status {}\n", status_value);
}
if let Some(version) = &data.router_version {
output += "# HELP i2p_router_version_info Router version information\n";
output += "# TYPE i2p_router_version_info gauge\n";
output += &format!("i2p_router_version_info{{version=\"{}\"}} 1\n", version);
}
if let Some(val) = data.router_uptime {
let seconds = match val {
Value::Number(num) => num.as_f64().unwrap_or(0.0) / 1000.0, Value::String(s) => s.parse::<f64>().unwrap_or(0.0) / 1000.0, _ => 0.0, };
output += "# HELP i2p_router_uptime_seconds Router uptime in seconds\n";
output += "# TYPE i2p_router_uptime_seconds gauge\n";
output += &format!("i2p_router_uptime_seconds {:.3}\n", seconds); }
if data.bw_inbound_1s.is_some() || data.bw_inbound_15s.is_some() {
output += "# HELP i2p_router_bandwidth_inbound_bytes_per_second Inbound bandwidth in bytes/sec\n";
output += "# TYPE i2p_router_bandwidth_inbound_bytes_per_second gauge\n";
if let Some(bw) = data.bw_inbound_1s { output += &format!("i2p_router_bandwidth_inbound_bytes_per_second{{interval=\"1s\"}} {}\n", bw);
}
if let Some(bw) = data.bw_inbound_15s { output += &format!("i2p_router_bandwidth_inbound_bytes_per_second{{interval=\"15s\"}} {}\n", bw);
}
}
if data.bw_outbound_1s.is_some() || data.bw_outbound_15s.is_some() {
output += "# HELP i2p_router_bandwidth_outbound_bytes_per_second Outbound bandwidth in bytes/sec\n";
output += "# TYPE i2p_router_bandwidth_outbound_bytes_per_second gauge\n";
if let Some(bw) = data.bw_outbound_1s { output += &format!("i2p_router_bandwidth_outbound_bytes_per_second{{interval=\"1s\"}} {}\n", bw);
}
if let Some(bw) = data.bw_outbound_15s { output += &format!("i2p_router_bandwidth_outbound_bytes_per_second{{interval=\"15s\"}} {}\n", bw);
}
}
if let Some(status) = data.net_status {
output += "# HELP i2p_router_network_status_code Network status code (numeric)\n";
output += "# TYPE i2p_router_network_status_code gauge\n";
output += &format!("i2p_router_network_status_code {}\n", status);
}
if let Some(count) = data.tunnels_participating {
output += "# HELP i2p_router_tunnels_participating Number of active participating transit tunnels\n";
output += "# TYPE i2p_router_tunnels_participating gauge\n";
output += &format!("i2p_router_tunnels_participating {}\n", count);
}
if let Some(count) = data.netdb_activepeers {
output += "# HELP i2p_router_netdb_activepeers Number of active known peers in NetDB\n";
output += "# TYPE i2p_router_netdb_activepeers gauge\n";
output += &format!("i2p_router_netdb_activepeers {}\n", count);
}
if let Some(count) = data.netdb_knownpeers {
output += "# HELP i2p_router_netdb_knownpeers Total number of known peers (RouterInfos) in NetDB\n";
output += "# TYPE i2p_router_netdb_knownpeers gauge\n";
output += &format!("i2p_router_netdb_knownpeers {}\n", count);
}
output += "# HELP i2pd_exporter_version_info Version of the i2pd-exporter\n";
output += "# TYPE i2pd_exporter_version_info gauge\n";
output += &format!(
"i2pd_exporter_version_info{{version=\"{}\"}} 1\n",
env!("CARGO_PKG_VERSION")
);
return Ok(output);
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let i2p_addr = std::env::var("I2PCONTROL_ADDRESS")
.unwrap_or_else(|_| "https://127.0.0.1:7650".to_string());
let i2p_password = std::env::var("I2PCONTROL_PASSWORD")
.unwrap_or_else(|_| "itoopie".to_string());
let listen_addr = std::env::var("METRICS_LISTEN_ADDR")
.unwrap_or_else(|_| "0.0.0.0:9600".to_string());
let http_timeout = std::env::var("HTTP_TIMEOUT_SECONDS")
.unwrap_or_else(|_| "60".to_string())
.parse::<u64>()
.unwrap_or(60);
let listen_addr: SocketAddr = listen_addr.parse().expect("Invalid listen address");
info!("Starting I2PControl exporter on {} (target: {})", listen_addr, i2p_addr);
let api_client = reqwest::Client::builder()
.danger_accept_invalid_certs(true) .timeout(Duration::from_secs(http_timeout))
.build()?;
let state = Arc::new(AppState::new(
api_client,
format!("{}/jsonrpc", i2p_addr.trim_end_matches('/')),
i2p_password,
));
if !state.password.is_empty() {
if let Err(e) = state.authenticate().await {
error!("Initial authentication failed: {}", e);
}
}
async fn metrics_handler(st: Arc<AppState>) -> Result<impl warp::Reply, warp::Rejection> {
match st.fetch_metrics().await {
Ok(metrics) => {
let reply = warp::reply::with_status(metrics, warp::http::StatusCode::OK);
let reply = warp::reply::with_header(
reply,
"Content-Type",
"text/plain; version=0.0.4"
);
Ok(reply)
}
Err(err) => {
error!("Failed to fetch metrics: {}", err);
let error_body = "Error retrieving metrics".to_string();
let reply = warp::reply::with_status(error_body, warp::http::StatusCode::INTERNAL_SERVER_ERROR);
let reply = warp::reply::with_header(reply, "Content-Type", "text/plain; version=0.0.4");
Ok(reply)
}
}
}
let route_metrics = warp::path("metrics")
.and(warp::any().map(move || state.clone()))
.and_then(metrics_handler);
let route_404 = warp::any().map(|| {
warp::reply::with_status("Not Found", warp::http::StatusCode::NOT_FOUND)
});
let routes = route_metrics.or(route_404);
info!("Listening on http://{}", listen_addr);
warp::serve(routes).run(listen_addr).await;
Ok(())
}