use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use rusqlite::{params, Connection, OpenFlags};
use serde_json::{json, Value};

use crate::http::{parse_query, HttpRequest, HttpResponse};
use crate::state::{read_state, SharedState};
10
// Default query window when the `window` parameter is absent: one day.
const DEFAULT_WINDOW_SECONDS: i64 = 24 * 60 * 60;
// Upper bound enforced on the `window` parameter: thirty days.
const MAX_WINDOW_SECONDS: i64 = 30 * 24 * 60 * 60;
// Default number of rows returned by top-N endpoints when `limit` is absent.
const DEFAULT_LIMIT: usize = 10;
// Hard cap applied to the `limit` parameter.
const MAX_LIMIT: usize = 100;
15
/// GET handler: JSON summary of announce, packet and system statistics.
///
/// Window parameters come from the query string via `StatsQuery::from_params`
/// (`window`, `bucket`); an invalid value produces a 400 response. Announce
/// counts are restricted to `[start_ms, end_ms)`, while the packet totals are
/// a snapshot of lifetime counters (see the `"scope"` field in the response).
pub fn handle_summary(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    with_db(state, |db_path, conn| {
        // Total announces observed inside the window.
        let announce_total: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        // Distinct hashes are compared via hex() so BLOB values group reliably.
        let unique_destinations: i64 = conn
            .query_row(
                "SELECT COUNT(DISTINCT hex(destination_hash))
                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        let unique_identities: i64 = conn
            .query_row(
                "SELECT COUNT(DISTINCT hex(identity_hash))
                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        let unique_names: i64 = conn
            .query_row(
                "SELECT COUNT(DISTINCT hex(name_hash))
                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        // NULL interface ids are excluded so the count reflects known interfaces only.
        let unique_interfaces: i64 = conn
            .query_row(
                "SELECT COUNT(DISTINCT interface_id)
                 FROM seen_announces
                 WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2 AND interface_id IS NOT NULL",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        // MIN/MAX over an empty window yield SQL NULL, hence Option<i64>.
        let first_seen_ms: Option<i64> = conn
            .query_row(
                "SELECT MIN(seen_at_ms) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        let last_seen_ms: Option<i64> = conn
            .query_row(
                "SELECT MAX(seen_at_ms) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;
        // Shadowing the helper function with its result is intentional here.
        let latest_process_sample = latest_process_sample(conn)?;
        let provider_dropped_events: i64 = conn
            .query_row(
                "SELECT COALESCE(SUM(dropped_events), 0)
                 FROM provider_drop_samples WHERE ts_ms >= ?1 AND ts_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;

        // Packet totals are lifetime counters (not window-scoped): sum per
        // direction across all counter rows.
        // NOTE(review): packet_counters uses "in"/"out" for direction while
        // packet_samples (see handle_packet_series) uses "rx"/"tx" — confirm
        // both spellings are correct for their respective tables.
        let mut rx_packets = 0i64;
        let mut tx_packets = 0i64;
        let mut rx_bytes = 0i64;
        let mut tx_bytes = 0i64;
        let mut latest_packet_update_ms = None;
        let mut stmt = conn
            .prepare(
                "SELECT direction,
                        COALESCE(SUM(packets), 0),
                        COALESCE(SUM(bytes), 0),
                        MAX(updated_at_ms)
                 FROM packet_counters
                 GROUP BY direction",
            )
            .map_err(db_error)?;
        let mut rows = stmt.query([]).map_err(db_error)?;
        while let Some(row) = rows.next().map_err(db_error)? {
            let direction: String = row.get(0).map_err(db_error)?;
            let packets: i64 = row.get(1).map_err(db_error)?;
            let bytes: i64 = row.get(2).map_err(db_error)?;
            let updated_at_ms: Option<i64> = row.get(3).map_err(db_error)?;
            if direction == "in" {
                rx_packets = packets;
                rx_bytes = bytes;
            } else if direction == "out" {
                tx_packets = packets;
                tx_bytes = bytes;
            }
            // Option::max treats None as smallest, so this keeps the latest
            // non-NULL timestamp seen across both directions.
            latest_packet_update_ms = latest_packet_update_ms.max(updated_at_ms);
        }
        // How many counter rows were touched inside the query window.
        let active_counters_in_window: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM packet_counters WHERE updated_at_ms >= ?1 AND updated_at_ms < ?2",
                params![query.start_ms, query.end_ms],
                |row| row.get(0),
            )
            .map_err(db_error)?;

        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "generated_at_ms": query.end_ms,
            "window": query.window_json(),
            "announces": {
                "total": announce_total,
                "unique_destinations": unique_destinations,
                "unique_identities": unique_identities,
                "unique_names": unique_names,
                "unique_interfaces": unique_interfaces,
                "first_seen_ms": first_seen_ms,
                "last_seen_ms": last_seen_ms,
            },
            "packets": {
                "scope": "lifetime_counters_snapshot",
                "rx_packets": rx_packets,
                "tx_packets": tx_packets,
                "rx_bytes": rx_bytes,
                "tx_bytes": tx_bytes,
                "active_counters_in_window": active_counters_in_window,
                "latest_updated_at_ms": latest_packet_update_ms,
            },
            "system": {
                "provider_dropped_events": provider_dropped_events,
                "latest_process_sample": latest_process_sample,
            }
        })))
    })
}
154
/// GET handler: per-bucket announce time series over the window, plus a
/// simple burst heuristic (buckets whose count exceeds twice the average).
pub fn handle_announces(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    with_db(state, |db_path, conn| {
        // Pre-fill every bucket with zeros so the series is gapless even when
        // the database has no rows for some buckets.
        let mut buckets = zero_count_buckets(&query);
        // Integer division by the bucket width floors each timestamp to its
        // bucket start, matching the keys produced by zero_count_buckets.
        let mut stmt = conn
            .prepare(
                "SELECT (seen_at_ms / ?1) * ?1 AS bucket_start_ms,
                        COUNT(*) AS announce_count,
                        COUNT(DISTINCT hex(destination_hash)) AS unique_destinations,
                        COUNT(DISTINCT hex(identity_hash)) AS unique_identities
                 FROM seen_announces
                 WHERE seen_at_ms >= ?2 AND seen_at_ms < ?3
                 GROUP BY bucket_start_ms
                 ORDER BY bucket_start_ms",
            )
            .map_err(db_error)?;
        let mut rows = stmt
            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
            .map_err(db_error)?;
        while let Some(row) = rows.next().map_err(db_error)? {
            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
            // Rows outside the pre-filled key set are silently dropped.
            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
                bucket["announce_count"] = Value::from(row.get::<_, i64>(1).map_err(db_error)?);
                bucket["unique_destinations"] =
                    Value::from(row.get::<_, i64>(2).map_err(db_error)?);
                bucket["unique_identities"] = Value::from(row.get::<_, i64>(3).map_err(db_error)?);
            }
        }

        // BTreeMap iteration keeps the series ordered by bucket start.
        let series: Vec<Value> = buckets.into_values().collect();
        let average = if series.is_empty() {
            0.0
        } else {
            series
                .iter()
                .map(|bucket| bucket["announce_count"].as_i64().unwrap_or(0) as f64)
                .sum::<f64>()
                / series.len() as f64
        };
        // Burst = strictly more than 2x the per-bucket average (only
        // meaningful when the average is positive).
        let burst_buckets: Vec<Value> = series
            .iter()
            .filter(|bucket| {
                let count = bucket["announce_count"].as_i64().unwrap_or(0) as f64;
                average > 0.0 && count > average * 2.0
            })
            .cloned()
            .collect();

        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "window": query.window_json(),
            "bucket_seconds": query.bucket_ms / 1000,
            "series": series,
            "anomalies": {
                "average_announce_count_per_bucket": average,
                "burst_buckets": burst_buckets,
            }
        })))
    })
}
219
220pub fn handle_interfaces(req: &HttpRequest, state: &SharedState) -> HttpResponse {
221 let params = parse_query(&req.query);
222 let query = match StatsQuery::from_params(¶ms) {
223 Ok(query) => query,
224 Err(err) => return HttpResponse::bad_request(&err),
225 };
226 let limit = parse_limit(¶ms);
227 with_db(state, |db_path, conn| {
228 let mut stmt = conn
229 .prepare(
230 "SELECT interface_id,
231 COUNT(*) AS announce_count,
232 COUNT(DISTINCT hex(destination_hash)) AS unique_destinations,
233 COUNT(DISTINCT hex(identity_hash)) AS unique_identities,
234 MIN(hops) AS min_hops,
235 MAX(hops) AS max_hops,
236 MAX(seen_at_ms) AS last_seen_ms
237 FROM seen_announces
238 WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2
239 GROUP BY interface_id
240 ORDER BY announce_count DESC, last_seen_ms DESC
241 LIMIT ?3",
242 )
243 .map_err(db_error)?;
244 let entries = collect_rows(
245 stmt.query(params![query.start_ms, query.end_ms, limit as i64])
246 .map_err(db_error)?,
247 |row| {
248 Ok(json!({
249 "interface_id": row.get::<_, Option<i64>>(0).map_err(db_error)?,
250 "announce_count": row.get::<_, i64>(1).map_err(db_error)?,
251 "unique_destinations": row.get::<_, i64>(2).map_err(db_error)?,
252 "unique_identities": row.get::<_, i64>(3).map_err(db_error)?,
253 "min_hops": row.get::<_, i64>(4).map_err(db_error)?,
254 "max_hops": row.get::<_, i64>(5).map_err(db_error)?,
255 "last_seen_ms": row.get::<_, i64>(6).map_err(db_error)?,
256 }))
257 },
258 )?;
259 Ok(HttpResponse::ok(json!({
260 "db_path": db_path.display().to_string(),
261 "window": query.window_json(),
262 "limit": limit,
263 "interfaces": entries,
264 })))
265 })
266}
267
/// GET handler: top destinations by announce count inside the window, joined
/// with lifetime stats from `seen_destinations`.
pub fn handle_destinations(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    let limit = parse_limit(&params);
    with_db(state, |db_path, conn| {
        // MAX() collapses identity/name hashes to one value per destination
        // group. NOTE(review): this assumes those hashes are stable per
        // destination; if they can change, this silently picks the byte-wise
        // largest — confirm against the writer.
        let mut stmt = conn
            .prepare(
                "SELECT hex(a.destination_hash) AS destination_hash,
                        hex(MAX(a.identity_hash)) AS identity_hash,
                        hex(MAX(a.name_hash)) AS name_hash,
                        COUNT(*) AS announce_count,
                        MIN(a.seen_at_ms) AS first_seen_ms,
                        MAX(a.seen_at_ms) AS last_seen_ms,
                        MIN(a.hops) AS min_hops,
                        MAX(a.hops) AS max_hops,
                        d.announce_count AS lifetime_announce_count,
                        d.last_interface_id AS last_interface_id
                 FROM seen_announces a
                 LEFT JOIN seen_destinations d ON d.destination_hash = a.destination_hash
                 WHERE a.seen_at_ms >= ?1 AND a.seen_at_ms < ?2
                 GROUP BY a.destination_hash
                 ORDER BY announce_count DESC, last_seen_ms DESC
                 LIMIT ?3",
            )
            .map_err(db_error)?;
        let entries = collect_rows(
            stmt.query(params![query.start_ms, query.end_ms, limit as i64])
                .map_err(db_error)?,
            |row| {
                Ok(json!({
                    // hex() returns uppercase; normalize to lowercase for the API.
                    "destination_hash": row.get::<_, String>(0).map_err(db_error)?.to_lowercase(),
                    "identity_hash": row.get::<_, String>(1).map_err(db_error)?.to_lowercase(),
                    "name_hash": row.get::<_, String>(2).map_err(db_error)?.to_lowercase(),
                    "announce_count": row.get::<_, i64>(3).map_err(db_error)?,
                    "first_seen_ms": row.get::<_, i64>(4).map_err(db_error)?,
                    "last_seen_ms": row.get::<_, i64>(5).map_err(db_error)?,
                    "min_hops": row.get::<_, i64>(6).map_err(db_error)?,
                    "max_hops": row.get::<_, i64>(7).map_err(db_error)?,
                    // NULL when the LEFT JOIN found no lifetime row.
                    "lifetime_announce_count": row.get::<_, Option<i64>>(8).map_err(db_error)?,
                    "last_interface_id": row.get::<_, Option<i64>>(9).map_err(db_error)?,
                }))
            },
        )?;
        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "window": query.window_json(),
            "limit": limit,
            "destinations": entries,
        })))
    })
}
322
323pub fn handle_packets(req: &HttpRequest, state: &SharedState) -> HttpResponse {
324 let params = parse_query(&req.query);
325 let query = match StatsQuery::from_params(¶ms) {
326 Ok(query) => query,
327 Err(err) => return HttpResponse::bad_request(&err),
328 };
329 let limit = parse_limit(¶ms);
330 with_db(state, |db_path, conn| {
331 let mut stmt = conn
332 .prepare(
333 "SELECT interface_key, interface_id, direction, packet_type, packets, bytes, updated_at_ms
334 FROM packet_counters
335 WHERE updated_at_ms >= ?1 AND updated_at_ms < ?2
336 ORDER BY packets DESC, bytes DESC
337 LIMIT ?3",
338 )
339 .map_err(db_error)?;
340 let counters = collect_rows(
341 stmt.query(params![query.start_ms, query.end_ms, limit as i64])
342 .map_err(db_error)?,
343 |row| {
344 Ok(json!({
345 "interface_key": row.get::<_, String>(0).map_err(db_error)?,
346 "interface_id": row.get::<_, Option<i64>>(1).map_err(db_error)?,
347 "direction": row.get::<_, String>(2).map_err(db_error)?,
348 "packet_type": row.get::<_, String>(3).map_err(db_error)?,
349 "packets": row.get::<_, i64>(4).map_err(db_error)?,
350 "bytes": row.get::<_, i64>(5).map_err(db_error)?,
351 "updated_at_ms": row.get::<_, i64>(6).map_err(db_error)?,
352 }))
353 },
354 )?;
355 Ok(HttpResponse::ok(json!({
356 "db_path": db_path.display().to_string(),
357 "window": query.window_json(),
358 "limit": limit,
359 "scope": "lifetime_counters_filtered_by_recent_activity",
360 "counters": counters,
361 })))
362 })
363}
364
/// GET handler: per-bucket rx/tx packet and byte series from `packet_samples`,
/// plus a busy-bucket heuristic (total packets above twice the average).
///
/// When the `packet_samples` table does not exist (older databases), an
/// all-zero series is returned instead of an error.
pub fn handle_packet_series(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    with_db(state, |db_path, conn| {
        if !has_table(conn, "packet_samples")? {
            // Graceful fallback: empty-but-complete series, zero anomalies.
            let mut series: Vec<Value> = zero_packet_buckets(&query).into_values().collect();
            finalize_packet_series(&mut series);
            return Ok(HttpResponse::ok(json!({
                "db_path": db_path.display().to_string(),
                "window": query.window_json(),
                "bucket_seconds": query.bucket_ms / 1000,
                "series": series,
                "anomalies": {
                    "average_packets_per_bucket": 0.0,
                    "busy_buckets": [],
                }
            })));
        }
        // Pre-filled zero buckets keep the series gapless.
        let mut buckets = zero_packet_buckets(&query);
        // One row per (bucket, direction); directions are merged below.
        let mut stmt = conn
            .prepare(
                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
                        direction,
                        COALESCE(SUM(packets), 0),
                        COALESCE(SUM(bytes), 0),
                        COUNT(DISTINCT interface_key || ':' || packet_type)
                 FROM packet_samples
                 WHERE ts_ms >= ?2 AND ts_ms < ?3
                 GROUP BY bucket_start_ms, direction
                 ORDER BY bucket_start_ms",
            )
            .map_err(db_error)?;
        let mut rows = stmt
            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
            .map_err(db_error)?;
        while let Some(row) = rows.next().map_err(db_error)? {
            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
            let direction: String = row.get(1).map_err(db_error)?;
            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
                let packets = row.get::<_, i64>(2).map_err(db_error)?;
                let bytes = row.get::<_, i64>(3).map_err(db_error)?;
                let active_keys = row.get::<_, i64>(4).map_err(db_error)?;
                // NOTE(review): "rx"/"tx" here vs "in"/"out" in
                // packet_counters — confirm the two tables really use
                // different direction vocabularies.
                if direction == "rx" {
                    bucket["rx_packets"] = Value::from(packets);
                    bucket["rx_bytes"] = Value::from(bytes);
                } else if direction == "tx" {
                    bucket["tx_packets"] = Value::from(packets);
                    bucket["tx_bytes"] = Value::from(bytes);
                }
                // Key counts are summed across directions; a key active in
                // both directions is counted twice.
                let current_active = bucket["active_keys"].as_i64().unwrap_or(0);
                bucket["active_keys"] = Value::from(current_active + active_keys);
            }
        }

        let mut series: Vec<Value> = buckets.into_values().collect();
        // Adds derived total_packets / total_bytes fields to each bucket.
        finalize_packet_series(&mut series);
        let average = if series.is_empty() {
            0.0
        } else {
            series
                .iter()
                .map(|bucket| bucket["total_packets"].as_i64().unwrap_or(0) as f64)
                .sum::<f64>()
                / series.len() as f64
        };
        let busy_buckets: Vec<Value> = series
            .iter()
            .filter(|bucket| {
                let total = bucket["total_packets"].as_i64().unwrap_or(0) as f64;
                average > 0.0 && total > average * 2.0
            })
            .cloned()
            .collect();

        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "window": query.window_json(),
            "bucket_seconds": query.bucket_ms / 1000,
            "series": series,
            "anomalies": {
                "average_packets_per_bucket": average,
                "busy_buckets": busy_buckets,
            }
        })))
    })
}
454
/// GET handler: per-bucket link event series (requested/established/closed)
/// plus a per-interface breakdown, from `link_event_samples`.
///
/// Missing table (older databases) yields an all-zero series rather than an
/// error. Buckets with any `closed` events are surfaced as anomalies.
pub fn handle_links(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    let limit = parse_limit(&params);
    with_db(state, |db_path, conn| {
        if !has_table(conn, "link_event_samples")? {
            return Ok(HttpResponse::ok(json!({
                "db_path": db_path.display().to_string(),
                "window": query.window_json(),
                "bucket_seconds": query.bucket_ms / 1000,
                "series": zero_link_buckets(&query).into_values().collect::<Vec<_>>(),
                "interfaces": [],
                "anomalies": {
                    "close_buckets": [],
                }
            })));
        }
        let mut buckets = zero_link_buckets(&query);
        // One row per (bucket, event_type); merged into the bucket objects below.
        let mut stmt = conn
            .prepare(
                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
                        event_type,
                        COUNT(*),
                        COUNT(DISTINCT hex(link_id))
                 FROM link_event_samples
                 WHERE ts_ms >= ?2 AND ts_ms < ?3
                 GROUP BY bucket_start_ms, event_type
                 ORDER BY bucket_start_ms",
            )
            .map_err(db_error)?;
        let mut rows = stmt
            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
            .map_err(db_error)?;
        while let Some(row) = rows.next().map_err(db_error)? {
            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
            let event_type: String = row.get(1).map_err(db_error)?;
            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
                let count = row.get::<_, i64>(2).map_err(db_error)?;
                let unique_links = row.get::<_, i64>(3).map_err(db_error)?;
                // Unknown event types are counted into unique_links but get
                // no dedicated field.
                match event_type.as_str() {
                    "requested" => bucket["requested"] = Value::from(count),
                    "established" => bucket["established"] = Value::from(count),
                    "closed" => bucket["closed"] = Value::from(count),
                    _ => {}
                }
                // Taking the max across event types avoids double-counting a
                // link that appears under several event types; it can
                // under-count when distinct links appear under different
                // event types.
                let current_unique = bucket["unique_links"].as_i64().unwrap_or(0);
                bucket["unique_links"] = Value::from(current_unique.max(unique_links));
            }
        }

        let series: Vec<Value> = buckets.into_values().collect();
        let close_buckets: Vec<Value> = series
            .iter()
            .filter(|bucket| bucket["closed"].as_i64().unwrap_or(0) > 0)
            .cloned()
            .collect();
        // Per-interface totals over the whole window, ranked by activity.
        let mut iface_stmt = conn
            .prepare(
                "SELECT interface_id,
                        SUM(CASE WHEN event_type = 'requested' THEN 1 ELSE 0 END) AS requested_count,
                        SUM(CASE WHEN event_type = 'established' THEN 1 ELSE 0 END) AS established_count,
                        SUM(CASE WHEN event_type = 'closed' THEN 1 ELSE 0 END) AS closed_count,
                        COUNT(DISTINCT hex(link_id)) AS unique_links,
                        MAX(ts_ms) AS last_seen_ms
                 FROM link_event_samples
                 WHERE ts_ms >= ?1 AND ts_ms < ?2
                 GROUP BY interface_id
                 ORDER BY established_count DESC, requested_count DESC, closed_count DESC, last_seen_ms DESC
                 LIMIT ?3",
            )
            .map_err(db_error)?;
        let interfaces = collect_rows(
            iface_stmt
                .query(params![query.start_ms, query.end_ms, limit as i64])
                .map_err(db_error)?,
            |row| {
                Ok(json!({
                    "interface_id": row.get::<_, Option<i64>>(0).map_err(db_error)?,
                    "requested_count": row.get::<_, i64>(1).map_err(db_error)?,
                    "established_count": row.get::<_, i64>(2).map_err(db_error)?,
                    "closed_count": row.get::<_, i64>(3).map_err(db_error)?,
                    "unique_links": row.get::<_, i64>(4).map_err(db_error)?,
                    "last_seen_ms": row.get::<_, i64>(5).map_err(db_error)?,
                }))
            },
        )?;

        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "window": query.window_json(),
            "bucket_seconds": query.bucket_ms / 1000,
            "series": series,
            "interfaces": interfaces,
            "anomalies": {
                "close_buckets": close_buckets,
            }
        })))
    })
}
557
/// GET handler: per-bucket process resource usage (RSS, threads, fds) and
/// provider drop counts. Buckets with dropped events are reported as
/// anomalies; the most recent raw process sample is included verbatim.
pub fn handle_system(req: &HttpRequest, state: &SharedState) -> HttpResponse {
    let params = parse_query(&req.query);
    let query = match StatsQuery::from_params(&params) {
        Ok(query) => query,
        Err(err) => return HttpResponse::bad_request(&err),
    };
    with_db(state, |db_path, conn| {
        // Zero/null pre-filled buckets keep the series gapless.
        let mut buckets = zero_system_buckets(&query);
        let mut process_stmt = conn
            .prepare(
                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
                        AVG(rss_bytes), MAX(rss_bytes),
                        AVG(threads), MAX(threads),
                        AVG(fds), MAX(fds)
                 FROM process_samples
                 WHERE ts_ms >= ?2 AND ts_ms < ?3
                 GROUP BY bucket_start_ms
                 ORDER BY bucket_start_ms",
            )
            .map_err(db_error)?;
        let mut process_rows = process_stmt
            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
            .map_err(db_error)?;
        while let Some(row) = process_rows.next().map_err(db_error)? {
            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
                // AVG() yields a float; NaN/inf are mapped to JSON null by
                // json_number_from_f64.
                bucket["avg_rss_bytes"] =
                    json_number_from_f64(row.get::<_, f64>(1).map_err(db_error)?);
                bucket["max_rss_bytes"] = Value::from(row.get::<_, i64>(2).map_err(db_error)?);
                bucket["avg_threads"] =
                    json_number_from_f64(row.get::<_, f64>(3).map_err(db_error)?);
                bucket["max_threads"] = Value::from(row.get::<_, i64>(4).map_err(db_error)?);
                bucket["avg_fds"] = json_number_from_f64(row.get::<_, f64>(5).map_err(db_error)?);
                bucket["max_fds"] = Value::from(row.get::<_, i64>(6).map_err(db_error)?);
            }
        }
        // Provider drops are aggregated separately and merged into the same buckets.
        let mut drop_stmt = conn
            .prepare(
                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
                        COALESCE(SUM(dropped_events), 0)
                 FROM provider_drop_samples
                 WHERE ts_ms >= ?2 AND ts_ms < ?3
                 GROUP BY bucket_start_ms
                 ORDER BY bucket_start_ms",
            )
            .map_err(db_error)?;
        let mut drop_rows = drop_stmt
            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
            .map_err(db_error)?;
        while let Some(row) = drop_rows.next().map_err(db_error)? {
            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
                bucket["provider_dropped_events"] =
                    Value::from(row.get::<_, i64>(1).map_err(db_error)?);
            }
        }
        let latest_process_sample = latest_process_sample(conn)?;
        let nonzero_drop_buckets: Vec<Value> = buckets
            .values()
            .filter(|bucket| bucket["provider_dropped_events"].as_i64().unwrap_or(0) > 0)
            .cloned()
            .collect();
        Ok(HttpResponse::ok(json!({
            "db_path": db_path.display().to_string(),
            "window": query.window_json(),
            "bucket_seconds": query.bucket_ms / 1000,
            "latest_process_sample": latest_process_sample,
            "series": buckets.into_values().collect::<Vec<_>>(),
            "anomalies": {
                "provider_drop_buckets": nonzero_drop_buckets,
            }
        })))
    })
}
632
633fn with_db<F>(state: &SharedState, f: F) -> HttpResponse
634where
635 F: FnOnce(PathBuf, &Connection) -> Result<HttpResponse, String>,
636{
637 let db_path = match stats_db_path(state) {
638 Ok(path) => path,
639 Err(err) => return HttpResponse::internal_error(&err),
640 };
641 let conn = match open_readonly(&db_path) {
642 Ok(conn) => conn,
643 Err(err) => return HttpResponse::internal_error(&err),
644 };
645 match f(db_path, &conn) {
646 Ok(response) => response,
647 Err(err) => HttpResponse::internal_error(&err),
648 }
649}
650
651fn stats_db_path(state: &SharedState) -> Result<PathBuf, String> {
652 let state = read_state(state);
653 let config = state
654 .server_config
655 .as_ref()
656 .ok_or_else(|| "Server config is unavailable".to_string())?;
657 if config.stats_db_path.trim().is_empty() {
658 return Err("Stats DB path is not configured".into());
659 }
660 Ok(PathBuf::from(&config.stats_db_path))
661}
662
663fn open_readonly(path: &PathBuf) -> Result<Connection, String> {
664 if !path.exists() {
665 return Err(format!("Stats database does not exist: {}", path.display()));
666 }
667 let conn = Connection::open_with_flags(
668 path,
669 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
670 )
671 .map_err(db_error)?;
672 conn.busy_timeout(std::time::Duration::from_secs(2))
673 .map_err(db_error)?;
674 Ok(conn)
675}
676
677fn latest_process_sample(conn: &Connection) -> Result<Value, String> {
678 let mut stmt = conn
679 .prepare(
680 "SELECT ts_ms, pid, rss_bytes, cpu_user_ms, cpu_system_ms, threads, fds
681 FROM process_samples ORDER BY ts_ms DESC LIMIT 1",
682 )
683 .map_err(db_error)?;
684 let mut rows = stmt.query([]).map_err(db_error)?;
685 let Some(row) = rows.next().map_err(db_error)? else {
686 return Ok(Value::Null);
687 };
688 Ok(json!({
689 "ts_ms": row.get::<_, i64>(0).map_err(db_error)?,
690 "pid": row.get::<_, i64>(1).map_err(db_error)?,
691 "rss_bytes": row.get::<_, i64>(2).map_err(db_error)?,
692 "cpu_user_ms": row.get::<_, i64>(3).map_err(db_error)?,
693 "cpu_system_ms": row.get::<_, i64>(4).map_err(db_error)?,
694 "threads": row.get::<_, i64>(5).map_err(db_error)?,
695 "fds": row.get::<_, i64>(6).map_err(db_error)?,
696 }))
697}
698
699fn collect_rows<F>(mut rows: rusqlite::Rows<'_>, mut map: F) -> Result<Vec<Value>, String>
700where
701 F: FnMut(&rusqlite::Row<'_>) -> Result<Value, String>,
702{
703 let mut values = Vec::new();
704 while let Some(row) = rows.next().map_err(db_error)? {
705 values.push(map(row)?);
706 }
707 Ok(values)
708}
709
710fn has_table(conn: &Connection, table_name: &str) -> Result<bool, String> {
711 conn.query_row(
712 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
713 params![table_name],
714 |row| row.get::<_, i64>(0),
715 )
716 .map(|value| value != 0)
717 .map_err(db_error)
718}
719
720fn parse_limit(params: &std::collections::HashMap<String, String>) -> usize {
721 params
722 .get("limit")
723 .and_then(|value| value.parse::<usize>().ok())
724 .map(|value| value.clamp(1, MAX_LIMIT))
725 .unwrap_or(DEFAULT_LIMIT)
726}
727
728fn db_error(err: rusqlite::Error) -> String {
729 format!("stats query failed: {}", err)
730}
731
/// Current wall-clock time in milliseconds since the Unix epoch.
/// Falls back to 0 if the system clock is before the epoch.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as i64
}
738
739fn json_number_from_f64(value: f64) -> Value {
740 serde_json::Number::from_f64(value)
741 .map(Value::Number)
742 .unwrap_or(Value::Null)
743}
744
745fn zero_count_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
746 let mut buckets = BTreeMap::new();
747 let mut bucket_start = query.aligned_start_ms();
748 while bucket_start < query.end_ms {
749 buckets.insert(
750 bucket_start,
751 json!({
752 "bucket_start_ms": bucket_start,
753 "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
754 "announce_count": 0,
755 "unique_destinations": 0,
756 "unique_identities": 0,
757 }),
758 );
759 bucket_start += query.bucket_ms;
760 }
761 buckets
762}
763
764fn zero_system_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
765 let mut buckets = BTreeMap::new();
766 let mut bucket_start = query.aligned_start_ms();
767 while bucket_start < query.end_ms {
768 buckets.insert(
769 bucket_start,
770 json!({
771 "bucket_start_ms": bucket_start,
772 "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
773 "avg_rss_bytes": Value::Null,
774 "max_rss_bytes": Value::Null,
775 "avg_threads": Value::Null,
776 "max_threads": Value::Null,
777 "avg_fds": Value::Null,
778 "max_fds": Value::Null,
779 "provider_dropped_events": 0,
780 }),
781 );
782 bucket_start += query.bucket_ms;
783 }
784 buckets
785}
786
787fn zero_packet_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
788 let mut buckets = BTreeMap::new();
789 let mut bucket_start = query.aligned_start_ms();
790 while bucket_start < query.end_ms {
791 buckets.insert(
792 bucket_start,
793 json!({
794 "bucket_start_ms": bucket_start,
795 "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
796 "rx_packets": 0,
797 "tx_packets": 0,
798 "rx_bytes": 0,
799 "tx_bytes": 0,
800 "active_keys": 0,
801 }),
802 );
803 bucket_start += query.bucket_ms;
804 }
805 buckets
806}
807
808fn zero_link_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
809 let mut buckets = BTreeMap::new();
810 let mut bucket_start = query.aligned_start_ms();
811 while bucket_start < query.end_ms {
812 buckets.insert(
813 bucket_start,
814 json!({
815 "bucket_start_ms": bucket_start,
816 "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
817 "requested": 0,
818 "established": 0,
819 "closed": 0,
820 "unique_links": 0,
821 }),
822 );
823 bucket_start += query.bucket_ms;
824 }
825 buckets
826}
827
828fn finalize_packet_series(series: &mut [Value]) {
829 for bucket in series {
830 let total_packets =
831 bucket["rx_packets"].as_i64().unwrap_or(0) + bucket["tx_packets"].as_i64().unwrap_or(0);
832 let total_bytes =
833 bucket["rx_bytes"].as_i64().unwrap_or(0) + bucket["tx_bytes"].as_i64().unwrap_or(0);
834 bucket["total_packets"] = Value::from(total_packets);
835 bucket["total_bytes"] = Value::from(total_bytes);
836 }
837}
838
/// Parsed time-window parameters shared by all stats endpoints.
struct StatsQuery {
    // Inclusive lower bound of the window, in ms since the Unix epoch.
    start_ms: i64,
    // Exclusive upper bound of the window (the request time).
    end_ms: i64,
    // Width of one aggregation bucket, in milliseconds.
    bucket_ms: i64,
    // Requested window length in seconds, after clamping.
    window_seconds: i64,
}
845
846impl StatsQuery {
847 fn from_params(params: &std::collections::HashMap<String, String>) -> Result<Self, String> {
848 let end_ms = now_ms();
849 let window_seconds = params
850 .get("window")
851 .map(|value| parse_duration_seconds(value))
852 .transpose()?
853 .unwrap_or(DEFAULT_WINDOW_SECONDS)
854 .clamp(60, MAX_WINDOW_SECONDS);
855 let bucket_seconds = params
856 .get("bucket")
857 .map(|value| parse_duration_seconds(value))
858 .transpose()?
859 .unwrap_or_else(|| default_bucket_seconds(window_seconds))
860 .clamp(60, window_seconds.max(60));
861 Ok(Self {
862 start_ms: end_ms - window_seconds * 1000,
863 end_ms,
864 bucket_ms: bucket_seconds * 1000,
865 window_seconds,
866 })
867 }
868
869 fn aligned_start_ms(&self) -> i64 {
870 (self.start_ms / self.bucket_ms) * self.bucket_ms
871 }
872
873 fn window_json(&self) -> Value {
874 json!({
875 "start_ms": self.start_ms,
876 "end_ms": self.end_ms,
877 "seconds": self.window_seconds,
878 })
879 }
880}
881
/// Picks a sensible default bucket width for a given window length:
/// 1 minute up to an hour, 1 hour up to a day, 6 hours up to a week,
/// and 1 day beyond that.
fn default_bucket_seconds(window_seconds: i64) -> i64 {
    match window_seconds {
        s if s <= 3600 => 60,
        s if s <= 24 * 3600 => 3600,
        s if s <= 7 * 24 * 3600 => 6 * 3600,
        _ => 24 * 3600,
    }
}
893
/// Parses a duration string into seconds.
///
/// Accepts a bare positive integer (seconds) or an integer with a unit
/// suffix: `s`, `m`, `h`, `d`, `w`. Surrounding whitespace is trimmed.
///
/// # Errors
/// Returns a message for empty input, non-positive values, unknown
/// suffixes, non-numeric digits, or values whose product with the unit
/// multiplier would overflow `i64` (previously this multiplication was
/// unchecked and could panic in debug builds or wrap in release builds).
fn parse_duration_seconds(raw: &str) -> Result<i64, String> {
    let raw = raw.trim();
    if raw.is_empty() {
        return Err("duration cannot be empty".into());
    }
    // Plain integer: interpret as seconds.
    if let Ok(seconds) = raw.parse::<i64>() {
        if seconds <= 0 {
            return Err("duration must be greater than 0".into());
        }
        return Ok(seconds);
    }
    let Some(last) = raw.chars().last() else {
        return Err("duration cannot be empty".into());
    };
    // The suffix characters are all single-byte ASCII, so the byte slice
    // below always lands on a char boundary.
    let (digits, multiplier) = match last {
        's' => (&raw[..raw.len() - 1], 1),
        'm' => (&raw[..raw.len() - 1], 60),
        'h' => (&raw[..raw.len() - 1], 60 * 60),
        'd' => (&raw[..raw.len() - 1], 24 * 60 * 60),
        'w' => (&raw[..raw.len() - 1], 7 * 24 * 60 * 60),
        _ => return Err(format!("invalid duration '{}'", raw)),
    };
    let value = digits
        .parse::<i64>()
        .map_err(|_| format!("invalid duration '{}'", raw))?;
    if value <= 0 {
        return Err("duration must be greater than 0".into());
    }
    // Guard against i64 overflow on adversarial query input.
    value
        .checked_mul(multiplier)
        .ok_or_else(|| format!("duration '{}' is too large", raw))
}