1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// I2PControl client implementation
use std::time::{Duration, Instant};
use log::{info, warn};
use serde_json::Value;
use tokio::sync::Mutex;
use super::rpc::{rpc_call, RpcCallError};
use super::types::{AuthResult, RouterInfoResult};
/// Keys requested in the first `RouterInfo` call issued by `fetch_router_info`:
/// router status, version and uptime, bandwidth averages, and the IPv4/IPv6
/// network status, error and testing fields.
///
/// Only the key names matter here; `build_router_info_params` pairs each of
/// them with an empty-string value when building the request.
const ROUTER_INFO_KEYS_BATCH_1: &[&str] = &[
"i2p.router.status", // Router status as string "1" or "0"
"i2p.router.version", // Request router version string
"i2p.router.uptime", // Request uptime in milliseconds
"i2p.router.net.bw.inbound.1s", // Request inbound bandwidth (1s avg, Bps)
"i2p.router.net.bw.inbound.15s", // Request inbound bandwidth (15s avg, Bps)
"i2p.router.net.bw.outbound.1s", // Request outbound bandwidth (1s avg, Bps)
"i2p.router.net.bw.outbound.15s", // Request outbound bandwidth (15s avg, Bps)
"i2p.router.net.bw.transit.15s", // Request transit bandwidth (15s avg, Bps)
"i2p.router.net.status", // Request IPv4 network status code (0 OK, 1 Firewalled, 2 Unknown, 3 Proxy, 4 Mesh, 5 Stan)
"i2p.router.net.status.v6", // Request IPv6 network status code (optional, same mapping)
"i2p.router.net.error", // Request IPv4 network error code
"i2p.router.net.error.v6", // Request IPv6 network error code
"i2p.router.net.testing", // Request IPv4 network testing flag
"i2p.router.net.testing.v6", // Request IPv6 network testing flag
];
/// Keys requested in the second `RouterInfo` call issued by `fetch_router_info`:
/// tunnel counts, success rates and queue sizes, NetDB peer/floodfill/LeaseSet
/// counts, and cumulative byte counters.
///
/// Only the key names matter here; `build_router_info_params` pairs each of
/// them with an empty-string value when building the request.
const ROUTER_INFO_KEYS_BATCH_2: &[&str] = &[
"i2p.router.net.tunnels.participating", // Request participating tunnel count (0 or 1 likely)
"i2p.router.net.tunnels.inbound", // Request inbound tunnel count
"i2p.router.net.tunnels.outbound", // Request outbound tunnel count
"i2p.router.net.tunnels.successrate", // Request tunnel success rate (percent integer)
"i2p.router.net.tunnels.totalsuccessrate", // Request aggregate tunnel success rate (percent integer)
"i2p.router.net.tunnels.queue", // Request tunnel build queue size
"i2p.router.net.tunnels.tbmqueue", // Request transit build message queue size
"i2p.router.netdb.activepeers", // Request active peer count (floodfills)
"i2p.router.netdb.knownpeers", // Request known peer count (total RouterInfos)
"i2p.router.netdb.floodfills", // Request floodfill routers known to NetDB
"i2p.router.netdb.leasesets", // Request LeaseSets known to NetDB
"i2p.router.net.total.received.bytes", // Request total received bytes
"i2p.router.net.total.sent.bytes", // Request total sent bytes
"i2p.router.net.transit.sent.bytes", // Request total transit-sent bytes
];
/// Builds the JSON parameter object for a `RouterInfo` request.
///
/// Every entry of `keys` becomes a member whose value is the empty string,
/// and the authentication `token` is stored under `"Token"`.
fn build_router_info_params(keys: &[&str], token: &str) -> Value {
    // Empty strings rather than nulls: some i2pd builds reject nulls with parse errors.
    let mut request: serde_json::Map<String, Value> = keys
        .iter()
        .map(|name| ((*name).to_string(), Value::String(String::new())))
        .collect();
    // Inserted last so the token always wins over a same-named requested key.
    request.insert("Token".to_string(), Value::String(token.to_string()));
    Value::Object(request)
}
/// Client state for talking to an I2PControl JSON-RPC endpoint.
///
/// Bundles the HTTP client, endpoint URL and password together with the
/// cached authentication token. The token sits behind an async `Mutex` so
/// that `&self` methods can read and replace it.
pub struct I2pControlClient {
/// HTTP client for making API requests.
pub api_client: reqwest::Client,
/// Full URL for the I2PControl JSON-RPC endpoint.
pub api_url: String,
/// Password for the I2PControl API.
pub password: String,
/// Current authentication token (`None` if not authenticated).
pub token: Mutex<Option<String>>,
/// Singleflight-style mutex to ensure only one in-flight authentication
/// happens at a time across concurrent scrapes.
auth_lock: Mutex<()>,
/// Hard cap for header-derived scrape timeout.
pub max_scrape_timeout: Duration,
}
impl I2pControlClient {
    /// Creates a new `I2pControlClient` with no cached authentication token.
    ///
    /// The first call to [`fetch_router_info`](Self::fetch_router_info) (or an
    /// explicit [`authenticate`](Self::authenticate)) obtains the token.
    pub fn new(
        api_client: reqwest::Client,
        api_url: String,
        password: String,
        max_scrape_timeout: Duration,
    ) -> Self {
        I2pControlClient {
            api_client,
            api_url,
            password,
            token: Mutex::new(None),
            auth_lock: Mutex::new(()),
            max_scrape_timeout,
        }
    }

    /// Builds the `TimedOut` error reported when the time budget runs out
    /// before `stage` could start.
    fn deadline_exceeded(stage: &str) -> Box<dyn std::error::Error + Send + Sync> {
        std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            format!("deadline exceeded before {}", stage),
        )
        .into()
    }

    /// Returns the time left until `deadline`, or a `TimedOut` error naming
    /// `stage` when nothing is left.
    fn time_left(
        deadline: Instant,
        stage: &str,
    ) -> Result<Duration, Box<dyn std::error::Error + Send + Sync>> {
        // `saturating_duration_since` already yields zero once the deadline has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            Err(Self::deadline_exceeded(stage))
        } else {
            Ok(remaining)
        }
    }

    /// Authenticates against the I2PControl JSON-RPC API with the configured
    /// password, caches the obtained token in `self.token` and returns it.
    ///
    /// Only one authentication runs at a time. A caller that had to wait for
    /// another task's authentication reuses the token that task stored instead
    /// of issuing a second request.
    ///
    /// `timeout` is the total budget for this call: time spent waiting for an
    /// in-flight authentication is subtracted before the RPC is sent. The wait
    /// itself is not interrupted when the budget runs out.
    ///
    /// # Errors
    ///
    /// Returns the underlying RPC error, a `TimedOut` I/O error when the
    /// budget was used up while waiting, or a message error when the response
    /// carried no token.
    pub async fn authenticate(
        &self,
        timeout: Duration,
    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        let wait_started = Instant::now();
        // Ensure only one concurrent authentication attempt is in flight.
        let _flight = self.auth_lock.lock().await;

        // Another task may have refreshed the token while we waited.
        let cached = self.token.lock().await.clone();
        if let Some(existing) = cached {
            return Ok(existing);
        }

        // Charge the lock wait against the caller's budget so the RPC cannot
        // push the caller past its deadline.
        let timeout = timeout.saturating_sub(wait_started.elapsed());
        if timeout.is_zero() {
            return Err(Self::deadline_exceeded("sending authentication request"));
        }

        let params = serde_json::json!({ "API": 1, "Password": self.password });
        let result: AuthResult = rpc_call(
            &self.api_client,
            &self.api_url,
            "Authenticate",
            params,
            timeout,
        )
        .await?;

        let token = result
            .token
            .ok_or("Authentication failed: no token received")?;
        *self.token.lock().await = Some(token.clone());
        info!("Obtained authentication token from I2PControl");
        Ok(token)
    }

    /// Fetches router information from the I2PControl API.
    ///
    /// The keys are requested in two batches and merged into one
    /// `RouterInfoResult`. A token is acquired first if none is cached. When a
    /// batch fails with a token error (RPC codes -32004 through -32002), the
    /// client re-authenticates once and retries from the failed batch, keeping
    /// the batches that already succeeded.
    ///
    /// `overall_timeout` bounds the whole operation; every request receives
    /// only the time remaining until that deadline.
    ///
    /// # Errors
    ///
    /// Returns a `TimedOut` I/O error when the deadline passes before a step
    /// can start, the RPC error of a failed batch (including a second token
    /// error after the retry), or any error from `authenticate`.
    pub async fn fetch_router_info(
        &self,
        overall_timeout: Duration,
    ) -> Result<RouterInfoResult, Box<dyn std::error::Error + Send + Sync>> {
        let batches = [ROUTER_INFO_KEYS_BATCH_1, ROUTER_INFO_KEYS_BATCH_2];
        let deadline = Instant::now() + overall_timeout;
        let mut did_retry = false; // Prevents an endless re-authentication loop
        let mut combined = RouterInfoResult::default();
        let mut next_batch_idx = 0usize; // Batches before this index are already merged

        'outer: loop {
            // Snapshot the cached token; the guard is released before any request.
            let cached = self.token.lock().await.clone();
            let token = match cached {
                Some(tok) => tok,
                None => {
                    info!("No token found, authenticating...");
                    let rem = Self::time_left(deadline, "authentication")?;
                    self.authenticate(rem).await?
                }
            };

            for (batch_idx, keys) in batches.iter().enumerate().skip(next_batch_idx) {
                let rem = Self::time_left(
                    deadline,
                    &format!("RouterInfo batch {}", batch_idx + 1),
                )?;
                let params = build_router_info_params(keys, &token);
                let err = match rpc_call::<RouterInfoResult>(
                    &self.api_client,
                    &self.api_url,
                    "RouterInfo",
                    params,
                    rem,
                )
                .await
                {
                    Ok(data) => {
                        combined.merge_from(data);
                        next_batch_idx = batch_idx + 1;
                        continue;
                    }
                    Err(err) => err,
                };

                // -32002: no token presented, -32003: unknown token, -32004: expired token.
                let is_token_err = matches!(
                    err,
                    RpcCallError::Rpc {
                        code: -32004..=-32002,
                        ..
                    }
                );
                if !is_token_err || did_retry {
                    return Err(Box::new(err));
                }

                warn!(
                    "Token error during RouterInfo batch {}, re-authenticating...: {}",
                    batch_idx + 1,
                    err
                );
                {
                    // Drop the cached token only if it is still the one that was
                    // rejected; a concurrent task may already have stored a fresh
                    // token, which must not be thrown away.
                    let mut guard = self.token.lock().await;
                    if guard.as_deref() == Some(token.as_str()) {
                        *guard = None;
                    }
                }
                let rem = Self::time_left(deadline, "re-authentication")?;
                self.authenticate(rem).await?;
                did_retry = true;
                // Preserve already merged batches and retry from the failed batch.
                continue 'outer;
            }

            return Ok(combined);
        }
    }
}