Skip to main content

rns_ctl/
stats_api.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use rusqlite::{params, Connection, OpenFlags};
6use serde_json::{json, Value};
7
8use crate::http::{parse_query, HttpRequest, HttpResponse};
9use crate::state::{read_state, SharedState};
10
/// Default stats window length in seconds (one day).
/// NOTE(review): not referenced in the visible part of this file — presumably consumed by
/// `StatsQuery`; confirm.
const DEFAULT_WINDOW_SECONDS: i64 = 86_400;
/// Longest stats window in seconds (thirty days).
/// NOTE(review): not referenced in the visible part of this file — presumably consumed by
/// `StatsQuery`; confirm.
const MAX_WINDOW_SECONDS: i64 = 30 * 86_400;
/// Row limit used by `parse_limit` when `limit` is missing or not a valid number.
const DEFAULT_LIMIT: usize = 10;
/// Largest row limit `parse_limit` will hand out.
const MAX_LIMIT: usize = 100;
15
16pub fn handle_summary(req: &HttpRequest, state: &SharedState) -> HttpResponse {
17    let params = parse_query(&req.query);
18    let query = match StatsQuery::from_params(&params) {
19        Ok(query) => query,
20        Err(err) => return HttpResponse::bad_request(&err),
21    };
22    with_db(state, |db_path, conn| {
23        let announce_total: i64 = conn
24            .query_row(
25                "SELECT COUNT(*) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
26                params![query.start_ms, query.end_ms],
27                |row| row.get(0),
28            )
29            .map_err(db_error)?;
30        let unique_destinations: i64 = conn
31            .query_row(
32                "SELECT COUNT(DISTINCT hex(destination_hash))
33                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
34                params![query.start_ms, query.end_ms],
35                |row| row.get(0),
36            )
37            .map_err(db_error)?;
38        let unique_identities: i64 = conn
39            .query_row(
40                "SELECT COUNT(DISTINCT hex(identity_hash))
41                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
42                params![query.start_ms, query.end_ms],
43                |row| row.get(0),
44            )
45            .map_err(db_error)?;
46        let unique_names: i64 = conn
47            .query_row(
48                "SELECT COUNT(DISTINCT hex(name_hash))
49                 FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
50                params![query.start_ms, query.end_ms],
51                |row| row.get(0),
52            )
53            .map_err(db_error)?;
54        let unique_interfaces: i64 = conn
55            .query_row(
56                "SELECT COUNT(DISTINCT interface_id)
57                 FROM seen_announces
58                 WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2 AND interface_id IS NOT NULL",
59                params![query.start_ms, query.end_ms],
60                |row| row.get(0),
61            )
62            .map_err(db_error)?;
63        let first_seen_ms: Option<i64> = conn
64            .query_row(
65                "SELECT MIN(seen_at_ms) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
66                params![query.start_ms, query.end_ms],
67                |row| row.get(0),
68            )
69            .map_err(db_error)?;
70        let last_seen_ms: Option<i64> = conn
71            .query_row(
72                "SELECT MAX(seen_at_ms) FROM seen_announces WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2",
73                params![query.start_ms, query.end_ms],
74                |row| row.get(0),
75            )
76            .map_err(db_error)?;
77        let latest_process_sample = latest_process_sample(conn)?;
78        let provider_dropped_events: i64 = conn
79            .query_row(
80                "SELECT COALESCE(SUM(dropped_events), 0)
81                 FROM provider_drop_samples WHERE ts_ms >= ?1 AND ts_ms < ?2",
82                params![query.start_ms, query.end_ms],
83                |row| row.get(0),
84            )
85            .map_err(db_error)?;
86
87        let mut rx_packets = 0i64;
88        let mut tx_packets = 0i64;
89        let mut rx_bytes = 0i64;
90        let mut tx_bytes = 0i64;
91        let mut latest_packet_update_ms = None;
92        let mut stmt = conn
93            .prepare(
94                "SELECT direction,
95                        COALESCE(SUM(packets), 0),
96                        COALESCE(SUM(bytes), 0),
97                        MAX(updated_at_ms)
98                 FROM packet_counters
99                 GROUP BY direction",
100            )
101            .map_err(db_error)?;
102        let mut rows = stmt.query([]).map_err(db_error)?;
103        while let Some(row) = rows.next().map_err(db_error)? {
104            let direction: String = row.get(0).map_err(db_error)?;
105            let packets: i64 = row.get(1).map_err(db_error)?;
106            let bytes: i64 = row.get(2).map_err(db_error)?;
107            let updated_at_ms: Option<i64> = row.get(3).map_err(db_error)?;
108            if direction == "in" {
109                rx_packets = packets;
110                rx_bytes = bytes;
111            } else if direction == "out" {
112                tx_packets = packets;
113                tx_bytes = bytes;
114            }
115            latest_packet_update_ms = latest_packet_update_ms.max(updated_at_ms);
116        }
117        let active_counters_in_window: i64 = conn
118            .query_row(
119                "SELECT COUNT(*) FROM packet_counters WHERE updated_at_ms >= ?1 AND updated_at_ms < ?2",
120                params![query.start_ms, query.end_ms],
121                |row| row.get(0),
122            )
123            .map_err(db_error)?;
124
125        Ok(HttpResponse::ok(json!({
126            "db_path": db_path.display().to_string(),
127            "generated_at_ms": query.end_ms,
128            "window": query.window_json(),
129            "announces": {
130                "total": announce_total,
131                "unique_destinations": unique_destinations,
132                "unique_identities": unique_identities,
133                "unique_names": unique_names,
134                "unique_interfaces": unique_interfaces,
135                "first_seen_ms": first_seen_ms,
136                "last_seen_ms": last_seen_ms,
137            },
138            "packets": {
139                "scope": "lifetime_counters_snapshot",
140                "rx_packets": rx_packets,
141                "tx_packets": tx_packets,
142                "rx_bytes": rx_bytes,
143                "tx_bytes": tx_bytes,
144                "active_counters_in_window": active_counters_in_window,
145                "latest_updated_at_ms": latest_packet_update_ms,
146            },
147            "system": {
148                "provider_dropped_events": provider_dropped_events,
149                "latest_process_sample": latest_process_sample,
150            }
151        })))
152    })
153}
154
155pub fn handle_announces(req: &HttpRequest, state: &SharedState) -> HttpResponse {
156    let params = parse_query(&req.query);
157    let query = match StatsQuery::from_params(&params) {
158        Ok(query) => query,
159        Err(err) => return HttpResponse::bad_request(&err),
160    };
161    with_db(state, |db_path, conn| {
162        let mut buckets = zero_count_buckets(&query);
163        let mut stmt = conn
164            .prepare(
165                "SELECT (seen_at_ms / ?1) * ?1 AS bucket_start_ms,
166                        COUNT(*) AS announce_count,
167                        COUNT(DISTINCT hex(destination_hash)) AS unique_destinations,
168                        COUNT(DISTINCT hex(identity_hash)) AS unique_identities
169                 FROM seen_announces
170                 WHERE seen_at_ms >= ?2 AND seen_at_ms < ?3
171                 GROUP BY bucket_start_ms
172                 ORDER BY bucket_start_ms",
173            )
174            .map_err(db_error)?;
175        let mut rows = stmt
176            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
177            .map_err(db_error)?;
178        while let Some(row) = rows.next().map_err(db_error)? {
179            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
180            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
181                bucket["announce_count"] = Value::from(row.get::<_, i64>(1).map_err(db_error)?);
182                bucket["unique_destinations"] =
183                    Value::from(row.get::<_, i64>(2).map_err(db_error)?);
184                bucket["unique_identities"] = Value::from(row.get::<_, i64>(3).map_err(db_error)?);
185            }
186        }
187
188        let series: Vec<Value> = buckets.into_values().collect();
189        let average = if series.is_empty() {
190            0.0
191        } else {
192            series
193                .iter()
194                .map(|bucket| bucket["announce_count"].as_i64().unwrap_or(0) as f64)
195                .sum::<f64>()
196                / series.len() as f64
197        };
198        let burst_buckets: Vec<Value> = series
199            .iter()
200            .filter(|bucket| {
201                let count = bucket["announce_count"].as_i64().unwrap_or(0) as f64;
202                average > 0.0 && count > average * 2.0
203            })
204            .cloned()
205            .collect();
206
207        Ok(HttpResponse::ok(json!({
208            "db_path": db_path.display().to_string(),
209            "window": query.window_json(),
210            "bucket_seconds": query.bucket_ms / 1000,
211            "series": series,
212            "anomalies": {
213                "average_announce_count_per_bucket": average,
214                "burst_buckets": burst_buckets,
215            }
216        })))
217    })
218}
219
220pub fn handle_interfaces(req: &HttpRequest, state: &SharedState) -> HttpResponse {
221    let params = parse_query(&req.query);
222    let query = match StatsQuery::from_params(&params) {
223        Ok(query) => query,
224        Err(err) => return HttpResponse::bad_request(&err),
225    };
226    let limit = parse_limit(&params);
227    with_db(state, |db_path, conn| {
228        let mut stmt = conn
229            .prepare(
230                "SELECT interface_id,
231                        COUNT(*) AS announce_count,
232                        COUNT(DISTINCT hex(destination_hash)) AS unique_destinations,
233                        COUNT(DISTINCT hex(identity_hash)) AS unique_identities,
234                        MIN(hops) AS min_hops,
235                        MAX(hops) AS max_hops,
236                        MAX(seen_at_ms) AS last_seen_ms
237                 FROM seen_announces
238                 WHERE seen_at_ms >= ?1 AND seen_at_ms < ?2
239                 GROUP BY interface_id
240                 ORDER BY announce_count DESC, last_seen_ms DESC
241                 LIMIT ?3",
242            )
243            .map_err(db_error)?;
244        let entries = collect_rows(
245            stmt.query(params![query.start_ms, query.end_ms, limit as i64])
246                .map_err(db_error)?,
247            |row| {
248                Ok(json!({
249                    "interface_id": row.get::<_, Option<i64>>(0).map_err(db_error)?,
250                    "announce_count": row.get::<_, i64>(1).map_err(db_error)?,
251                    "unique_destinations": row.get::<_, i64>(2).map_err(db_error)?,
252                    "unique_identities": row.get::<_, i64>(3).map_err(db_error)?,
253                    "min_hops": row.get::<_, i64>(4).map_err(db_error)?,
254                    "max_hops": row.get::<_, i64>(5).map_err(db_error)?,
255                    "last_seen_ms": row.get::<_, i64>(6).map_err(db_error)?,
256                }))
257            },
258        )?;
259        Ok(HttpResponse::ok(json!({
260            "db_path": db_path.display().to_string(),
261            "window": query.window_json(),
262            "limit": limit,
263            "interfaces": entries,
264        })))
265    })
266}
267
268pub fn handle_destinations(req: &HttpRequest, state: &SharedState) -> HttpResponse {
269    let params = parse_query(&req.query);
270    let query = match StatsQuery::from_params(&params) {
271        Ok(query) => query,
272        Err(err) => return HttpResponse::bad_request(&err),
273    };
274    let limit = parse_limit(&params);
275    with_db(state, |db_path, conn| {
276        let mut stmt = conn
277            .prepare(
278                "SELECT hex(a.destination_hash) AS destination_hash,
279                        hex(MAX(a.identity_hash)) AS identity_hash,
280                        hex(MAX(a.name_hash)) AS name_hash,
281                        COUNT(*) AS announce_count,
282                        MIN(a.seen_at_ms) AS first_seen_ms,
283                        MAX(a.seen_at_ms) AS last_seen_ms,
284                        MIN(a.hops) AS min_hops,
285                        MAX(a.hops) AS max_hops,
286                        d.announce_count AS lifetime_announce_count,
287                        d.last_interface_id AS last_interface_id
288                 FROM seen_announces a
289                 LEFT JOIN seen_destinations d ON d.destination_hash = a.destination_hash
290                 WHERE a.seen_at_ms >= ?1 AND a.seen_at_ms < ?2
291                 GROUP BY a.destination_hash
292                 ORDER BY announce_count DESC, last_seen_ms DESC
293                 LIMIT ?3",
294            )
295            .map_err(db_error)?;
296        let entries = collect_rows(
297            stmt.query(params![query.start_ms, query.end_ms, limit as i64])
298                .map_err(db_error)?,
299            |row| {
300                Ok(json!({
301                    "destination_hash": row.get::<_, String>(0).map_err(db_error)?.to_lowercase(),
302                    "identity_hash": row.get::<_, String>(1).map_err(db_error)?.to_lowercase(),
303                    "name_hash": row.get::<_, String>(2).map_err(db_error)?.to_lowercase(),
304                    "announce_count": row.get::<_, i64>(3).map_err(db_error)?,
305                    "first_seen_ms": row.get::<_, i64>(4).map_err(db_error)?,
306                    "last_seen_ms": row.get::<_, i64>(5).map_err(db_error)?,
307                    "min_hops": row.get::<_, i64>(6).map_err(db_error)?,
308                    "max_hops": row.get::<_, i64>(7).map_err(db_error)?,
309                    "lifetime_announce_count": row.get::<_, Option<i64>>(8).map_err(db_error)?,
310                    "last_interface_id": row.get::<_, Option<i64>>(9).map_err(db_error)?,
311                }))
312            },
313        )?;
314        Ok(HttpResponse::ok(json!({
315            "db_path": db_path.display().to_string(),
316            "window": query.window_json(),
317            "limit": limit,
318            "destinations": entries,
319        })))
320    })
321}
322
323pub fn handle_packets(req: &HttpRequest, state: &SharedState) -> HttpResponse {
324    let params = parse_query(&req.query);
325    let query = match StatsQuery::from_params(&params) {
326        Ok(query) => query,
327        Err(err) => return HttpResponse::bad_request(&err),
328    };
329    let limit = parse_limit(&params);
330    with_db(state, |db_path, conn| {
331        let mut stmt = conn
332            .prepare(
333                "SELECT interface_key, interface_id, direction, packet_type, packets, bytes, updated_at_ms
334                 FROM packet_counters
335                 WHERE updated_at_ms >= ?1 AND updated_at_ms < ?2
336                 ORDER BY packets DESC, bytes DESC
337                 LIMIT ?3",
338            )
339            .map_err(db_error)?;
340        let counters = collect_rows(
341            stmt.query(params![query.start_ms, query.end_ms, limit as i64])
342                .map_err(db_error)?,
343            |row| {
344                Ok(json!({
345                    "interface_key": row.get::<_, String>(0).map_err(db_error)?,
346                    "interface_id": row.get::<_, Option<i64>>(1).map_err(db_error)?,
347                    "direction": row.get::<_, String>(2).map_err(db_error)?,
348                    "packet_type": row.get::<_, String>(3).map_err(db_error)?,
349                    "packets": row.get::<_, i64>(4).map_err(db_error)?,
350                    "bytes": row.get::<_, i64>(5).map_err(db_error)?,
351                    "updated_at_ms": row.get::<_, i64>(6).map_err(db_error)?,
352                }))
353            },
354        )?;
355        Ok(HttpResponse::ok(json!({
356            "db_path": db_path.display().to_string(),
357            "window": query.window_json(),
358            "limit": limit,
359            "scope": "lifetime_counters_filtered_by_recent_activity",
360            "counters": counters,
361        })))
362    })
363}
364
365pub fn handle_packet_series(req: &HttpRequest, state: &SharedState) -> HttpResponse {
366    let params = parse_query(&req.query);
367    let query = match StatsQuery::from_params(&params) {
368        Ok(query) => query,
369        Err(err) => return HttpResponse::bad_request(&err),
370    };
371    with_db(state, |db_path, conn| {
372        if !has_table(conn, "packet_samples")? {
373            let mut series: Vec<Value> = zero_packet_buckets(&query).into_values().collect();
374            finalize_packet_series(&mut series);
375            return Ok(HttpResponse::ok(json!({
376                "db_path": db_path.display().to_string(),
377                "window": query.window_json(),
378                "bucket_seconds": query.bucket_ms / 1000,
379                "series": series,
380                "anomalies": {
381                    "average_packets_per_bucket": 0.0,
382                    "busy_buckets": [],
383                }
384            })));
385        }
386        let mut buckets = zero_packet_buckets(&query);
387        let mut stmt = conn
388            .prepare(
389                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
390                        direction,
391                        COALESCE(SUM(packets), 0),
392                        COALESCE(SUM(bytes), 0),
393                        COUNT(DISTINCT interface_key || ':' || packet_type)
394                 FROM packet_samples
395                 WHERE ts_ms >= ?2 AND ts_ms < ?3
396                 GROUP BY bucket_start_ms, direction
397                 ORDER BY bucket_start_ms",
398            )
399            .map_err(db_error)?;
400        let mut rows = stmt
401            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
402            .map_err(db_error)?;
403        while let Some(row) = rows.next().map_err(db_error)? {
404            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
405            let direction: String = row.get(1).map_err(db_error)?;
406            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
407                let packets = row.get::<_, i64>(2).map_err(db_error)?;
408                let bytes = row.get::<_, i64>(3).map_err(db_error)?;
409                let active_keys = row.get::<_, i64>(4).map_err(db_error)?;
410                if direction == "rx" {
411                    bucket["rx_packets"] = Value::from(packets);
412                    bucket["rx_bytes"] = Value::from(bytes);
413                } else if direction == "tx" {
414                    bucket["tx_packets"] = Value::from(packets);
415                    bucket["tx_bytes"] = Value::from(bytes);
416                }
417                let current_active = bucket["active_keys"].as_i64().unwrap_or(0);
418                bucket["active_keys"] = Value::from(current_active + active_keys);
419            }
420        }
421
422        let mut series: Vec<Value> = buckets.into_values().collect();
423        finalize_packet_series(&mut series);
424        let average = if series.is_empty() {
425            0.0
426        } else {
427            series
428                .iter()
429                .map(|bucket| bucket["total_packets"].as_i64().unwrap_or(0) as f64)
430                .sum::<f64>()
431                / series.len() as f64
432        };
433        let busy_buckets: Vec<Value> = series
434            .iter()
435            .filter(|bucket| {
436                let total = bucket["total_packets"].as_i64().unwrap_or(0) as f64;
437                average > 0.0 && total > average * 2.0
438            })
439            .cloned()
440            .collect();
441
442        Ok(HttpResponse::ok(json!({
443            "db_path": db_path.display().to_string(),
444            "window": query.window_json(),
445            "bucket_seconds": query.bucket_ms / 1000,
446            "series": series,
447            "anomalies": {
448                "average_packets_per_bucket": average,
449                "busy_buckets": busy_buckets,
450            }
451        })))
452    })
453}
454
455pub fn handle_links(req: &HttpRequest, state: &SharedState) -> HttpResponse {
456    let params = parse_query(&req.query);
457    let query = match StatsQuery::from_params(&params) {
458        Ok(query) => query,
459        Err(err) => return HttpResponse::bad_request(&err),
460    };
461    let limit = parse_limit(&params);
462    with_db(state, |db_path, conn| {
463        if !has_table(conn, "link_event_samples")? {
464            return Ok(HttpResponse::ok(json!({
465                "db_path": db_path.display().to_string(),
466                "window": query.window_json(),
467                "bucket_seconds": query.bucket_ms / 1000,
468                "series": zero_link_buckets(&query).into_values().collect::<Vec<_>>(),
469                "interfaces": [],
470                "anomalies": {
471                    "close_buckets": [],
472                }
473            })));
474        }
475        let mut buckets = zero_link_buckets(&query);
476        let mut stmt = conn
477            .prepare(
478                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
479                        event_type,
480                        COUNT(*),
481                        COUNT(DISTINCT hex(link_id))
482                 FROM link_event_samples
483                 WHERE ts_ms >= ?2 AND ts_ms < ?3
484                 GROUP BY bucket_start_ms, event_type
485                 ORDER BY bucket_start_ms",
486            )
487            .map_err(db_error)?;
488        let mut rows = stmt
489            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
490            .map_err(db_error)?;
491        while let Some(row) = rows.next().map_err(db_error)? {
492            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
493            let event_type: String = row.get(1).map_err(db_error)?;
494            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
495                let count = row.get::<_, i64>(2).map_err(db_error)?;
496                let unique_links = row.get::<_, i64>(3).map_err(db_error)?;
497                match event_type.as_str() {
498                    "requested" => bucket["requested"] = Value::from(count),
499                    "established" => bucket["established"] = Value::from(count),
500                    "closed" => bucket["closed"] = Value::from(count),
501                    _ => {}
502                }
503                let current_unique = bucket["unique_links"].as_i64().unwrap_or(0);
504                bucket["unique_links"] = Value::from(current_unique.max(unique_links));
505            }
506        }
507
508        let series: Vec<Value> = buckets.into_values().collect();
509        let close_buckets: Vec<Value> = series
510            .iter()
511            .filter(|bucket| bucket["closed"].as_i64().unwrap_or(0) > 0)
512            .cloned()
513            .collect();
514        let mut iface_stmt = conn
515            .prepare(
516                "SELECT interface_id,
517                        SUM(CASE WHEN event_type = 'requested' THEN 1 ELSE 0 END) AS requested_count,
518                        SUM(CASE WHEN event_type = 'established' THEN 1 ELSE 0 END) AS established_count,
519                        SUM(CASE WHEN event_type = 'closed' THEN 1 ELSE 0 END) AS closed_count,
520                        COUNT(DISTINCT hex(link_id)) AS unique_links,
521                        MAX(ts_ms) AS last_seen_ms
522                 FROM link_event_samples
523                 WHERE ts_ms >= ?1 AND ts_ms < ?2
524                 GROUP BY interface_id
525                 ORDER BY established_count DESC, requested_count DESC, closed_count DESC, last_seen_ms DESC
526                 LIMIT ?3",
527            )
528            .map_err(db_error)?;
529        let interfaces = collect_rows(
530            iface_stmt
531                .query(params![query.start_ms, query.end_ms, limit as i64])
532                .map_err(db_error)?,
533            |row| {
534                Ok(json!({
535                    "interface_id": row.get::<_, Option<i64>>(0).map_err(db_error)?,
536                    "requested_count": row.get::<_, i64>(1).map_err(db_error)?,
537                    "established_count": row.get::<_, i64>(2).map_err(db_error)?,
538                    "closed_count": row.get::<_, i64>(3).map_err(db_error)?,
539                    "unique_links": row.get::<_, i64>(4).map_err(db_error)?,
540                    "last_seen_ms": row.get::<_, i64>(5).map_err(db_error)?,
541                }))
542            },
543        )?;
544
545        Ok(HttpResponse::ok(json!({
546            "db_path": db_path.display().to_string(),
547            "window": query.window_json(),
548            "bucket_seconds": query.bucket_ms / 1000,
549            "series": series,
550            "interfaces": interfaces,
551            "anomalies": {
552                "close_buckets": close_buckets,
553            }
554        })))
555    })
556}
557
558pub fn handle_system(req: &HttpRequest, state: &SharedState) -> HttpResponse {
559    let params = parse_query(&req.query);
560    let query = match StatsQuery::from_params(&params) {
561        Ok(query) => query,
562        Err(err) => return HttpResponse::bad_request(&err),
563    };
564    with_db(state, |db_path, conn| {
565        let mut buckets = zero_system_buckets(&query);
566        let mut process_stmt = conn
567            .prepare(
568                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
569                        AVG(rss_bytes), MAX(rss_bytes),
570                        AVG(threads), MAX(threads),
571                        AVG(fds), MAX(fds)
572                 FROM process_samples
573                 WHERE ts_ms >= ?2 AND ts_ms < ?3
574                 GROUP BY bucket_start_ms
575                 ORDER BY bucket_start_ms",
576            )
577            .map_err(db_error)?;
578        let mut process_rows = process_stmt
579            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
580            .map_err(db_error)?;
581        while let Some(row) = process_rows.next().map_err(db_error)? {
582            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
583            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
584                bucket["avg_rss_bytes"] =
585                    json_number_from_f64(row.get::<_, f64>(1).map_err(db_error)?);
586                bucket["max_rss_bytes"] = Value::from(row.get::<_, i64>(2).map_err(db_error)?);
587                bucket["avg_threads"] =
588                    json_number_from_f64(row.get::<_, f64>(3).map_err(db_error)?);
589                bucket["max_threads"] = Value::from(row.get::<_, i64>(4).map_err(db_error)?);
590                bucket["avg_fds"] = json_number_from_f64(row.get::<_, f64>(5).map_err(db_error)?);
591                bucket["max_fds"] = Value::from(row.get::<_, i64>(6).map_err(db_error)?);
592            }
593        }
594        let mut drop_stmt = conn
595            .prepare(
596                "SELECT (ts_ms / ?1) * ?1 AS bucket_start_ms,
597                        COALESCE(SUM(dropped_events), 0)
598                 FROM provider_drop_samples
599                 WHERE ts_ms >= ?2 AND ts_ms < ?3
600                 GROUP BY bucket_start_ms
601                 ORDER BY bucket_start_ms",
602            )
603            .map_err(db_error)?;
604        let mut drop_rows = drop_stmt
605            .query(params![query.bucket_ms, query.start_ms, query.end_ms])
606            .map_err(db_error)?;
607        while let Some(row) = drop_rows.next().map_err(db_error)? {
608            let bucket_start_ms: i64 = row.get(0).map_err(db_error)?;
609            if let Some(bucket) = buckets.get_mut(&bucket_start_ms) {
610                bucket["provider_dropped_events"] =
611                    Value::from(row.get::<_, i64>(1).map_err(db_error)?);
612            }
613        }
614        let latest_process_sample = latest_process_sample(conn)?;
615        let nonzero_drop_buckets: Vec<Value> = buckets
616            .values()
617            .filter(|bucket| bucket["provider_dropped_events"].as_i64().unwrap_or(0) > 0)
618            .cloned()
619            .collect();
620        Ok(HttpResponse::ok(json!({
621            "db_path": db_path.display().to_string(),
622            "window": query.window_json(),
623            "bucket_seconds": query.bucket_ms / 1000,
624            "latest_process_sample": latest_process_sample,
625            "series": buckets.into_values().collect::<Vec<_>>(),
626            "anomalies": {
627                "provider_drop_buckets": nonzero_drop_buckets,
628            }
629        })))
630    })
631}
632
633fn with_db<F>(state: &SharedState, f: F) -> HttpResponse
634where
635    F: FnOnce(PathBuf, &Connection) -> Result<HttpResponse, String>,
636{
637    let db_path = match stats_db_path(state) {
638        Ok(path) => path,
639        Err(err) => return HttpResponse::internal_error(&err),
640    };
641    let conn = match open_readonly(&db_path) {
642        Ok(conn) => conn,
643        Err(err) => return HttpResponse::internal_error(&err),
644    };
645    match f(db_path, &conn) {
646        Ok(response) => response,
647        Err(err) => HttpResponse::internal_error(&err),
648    }
649}
650
651fn stats_db_path(state: &SharedState) -> Result<PathBuf, String> {
652    let state = read_state(state);
653    let config = state
654        .server_config
655        .as_ref()
656        .ok_or_else(|| "Server config is unavailable".to_string())?;
657    if config.stats_db_path.trim().is_empty() {
658        return Err("Stats DB path is not configured".into());
659    }
660    Ok(PathBuf::from(&config.stats_db_path))
661}
662
663fn open_readonly(path: &PathBuf) -> Result<Connection, String> {
664    if !path.exists() {
665        return Err(format!("Stats database does not exist: {}", path.display()));
666    }
667    let conn = Connection::open_with_flags(
668        path,
669        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
670    )
671    .map_err(db_error)?;
672    conn.busy_timeout(std::time::Duration::from_secs(2))
673        .map_err(db_error)?;
674    Ok(conn)
675}
676
677fn latest_process_sample(conn: &Connection) -> Result<Value, String> {
678    let mut stmt = conn
679        .prepare(
680            "SELECT ts_ms, pid, rss_bytes, cpu_user_ms, cpu_system_ms, threads, fds
681             FROM process_samples ORDER BY ts_ms DESC LIMIT 1",
682        )
683        .map_err(db_error)?;
684    let mut rows = stmt.query([]).map_err(db_error)?;
685    let Some(row) = rows.next().map_err(db_error)? else {
686        return Ok(Value::Null);
687    };
688    Ok(json!({
689        "ts_ms": row.get::<_, i64>(0).map_err(db_error)?,
690        "pid": row.get::<_, i64>(1).map_err(db_error)?,
691        "rss_bytes": row.get::<_, i64>(2).map_err(db_error)?,
692        "cpu_user_ms": row.get::<_, i64>(3).map_err(db_error)?,
693        "cpu_system_ms": row.get::<_, i64>(4).map_err(db_error)?,
694        "threads": row.get::<_, i64>(5).map_err(db_error)?,
695        "fds": row.get::<_, i64>(6).map_err(db_error)?,
696    }))
697}
698
699fn collect_rows<F>(mut rows: rusqlite::Rows<'_>, mut map: F) -> Result<Vec<Value>, String>
700where
701    F: FnMut(&rusqlite::Row<'_>) -> Result<Value, String>,
702{
703    let mut values = Vec::new();
704    while let Some(row) = rows.next().map_err(db_error)? {
705        values.push(map(row)?);
706    }
707    Ok(values)
708}
709
710fn has_table(conn: &Connection, table_name: &str) -> Result<bool, String> {
711    conn.query_row(
712        "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
713        params![table_name],
714        |row| row.get::<_, i64>(0),
715    )
716    .map(|value| value != 0)
717    .map_err(db_error)
718}
719
720fn parse_limit(params: &std::collections::HashMap<String, String>) -> usize {
721    params
722        .get("limit")
723        .and_then(|value| value.parse::<usize>().ok())
724        .map(|value| value.clamp(1, MAX_LIMIT))
725        .unwrap_or(DEFAULT_LIMIT)
726}
727
728fn db_error(err: rusqlite::Error) -> String {
729    format!("stats query failed: {}", err)
730}
731
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the elapsed time is treated as
/// zero, so the function returns `0` instead of failing.
fn now_ms() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as i64
}
738
739fn json_number_from_f64(value: f64) -> Value {
740    serde_json::Number::from_f64(value)
741        .map(Value::Number)
742        .unwrap_or(Value::Null)
743}
744
745fn zero_count_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
746    let mut buckets = BTreeMap::new();
747    let mut bucket_start = query.aligned_start_ms();
748    while bucket_start < query.end_ms {
749        buckets.insert(
750            bucket_start,
751            json!({
752                "bucket_start_ms": bucket_start,
753                "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
754                "announce_count": 0,
755                "unique_destinations": 0,
756                "unique_identities": 0,
757            }),
758        );
759        bucket_start += query.bucket_ms;
760    }
761    buckets
762}
763
764fn zero_system_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
765    let mut buckets = BTreeMap::new();
766    let mut bucket_start = query.aligned_start_ms();
767    while bucket_start < query.end_ms {
768        buckets.insert(
769            bucket_start,
770            json!({
771                "bucket_start_ms": bucket_start,
772                "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
773                "avg_rss_bytes": Value::Null,
774                "max_rss_bytes": Value::Null,
775                "avg_threads": Value::Null,
776                "max_threads": Value::Null,
777                "avg_fds": Value::Null,
778                "max_fds": Value::Null,
779                "provider_dropped_events": 0,
780            }),
781        );
782        bucket_start += query.bucket_ms;
783    }
784    buckets
785}
786
787fn zero_packet_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
788    let mut buckets = BTreeMap::new();
789    let mut bucket_start = query.aligned_start_ms();
790    while bucket_start < query.end_ms {
791        buckets.insert(
792            bucket_start,
793            json!({
794                "bucket_start_ms": bucket_start,
795                "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
796                "rx_packets": 0,
797                "tx_packets": 0,
798                "rx_bytes": 0,
799                "tx_bytes": 0,
800                "active_keys": 0,
801            }),
802        );
803        bucket_start += query.bucket_ms;
804    }
805    buckets
806}
807
808fn zero_link_buckets(query: &StatsQuery) -> BTreeMap<i64, Value> {
809    let mut buckets = BTreeMap::new();
810    let mut bucket_start = query.aligned_start_ms();
811    while bucket_start < query.end_ms {
812        buckets.insert(
813            bucket_start,
814            json!({
815                "bucket_start_ms": bucket_start,
816                "bucket_end_ms": (bucket_start + query.bucket_ms).min(query.end_ms),
817                "requested": 0,
818                "established": 0,
819                "closed": 0,
820                "unique_links": 0,
821            }),
822        );
823        bucket_start += query.bucket_ms;
824    }
825    buckets
826}
827
828fn finalize_packet_series(series: &mut [Value]) {
829    for bucket in series {
830        let total_packets =
831            bucket["rx_packets"].as_i64().unwrap_or(0) + bucket["tx_packets"].as_i64().unwrap_or(0);
832        let total_bytes =
833            bucket["rx_bytes"].as_i64().unwrap_or(0) + bucket["tx_bytes"].as_i64().unwrap_or(0);
834        bucket["total_packets"] = Value::from(total_packets);
835        bucket["total_bytes"] = Value::from(total_bytes);
836    }
837}
838
/// Time window and bucket size shared by the stats endpoints.
///
/// Built by `StatsQuery::from_params` from the `window` and `bucket` query parameters.
struct StatsQuery {
    /// Start of the window in milliseconds since the Unix epoch
    /// (`end_ms - window_seconds * 1000`); used as an inclusive lower bound in queries.
    start_ms: i64,
    /// End of the window in milliseconds since the Unix epoch (the time the query was
    /// built); used as an exclusive upper bound in queries.
    end_ms: i64,
    /// Width of one series bucket, in milliseconds.
    bucket_ms: i64,
    /// Length of the window in seconds, after clamping.
    window_seconds: i64,
}
845
846impl StatsQuery {
847    fn from_params(params: &std::collections::HashMap<String, String>) -> Result<Self, String> {
848        let end_ms = now_ms();
849        let window_seconds = params
850            .get("window")
851            .map(|value| parse_duration_seconds(value))
852            .transpose()?
853            .unwrap_or(DEFAULT_WINDOW_SECONDS)
854            .clamp(60, MAX_WINDOW_SECONDS);
855        let bucket_seconds = params
856            .get("bucket")
857            .map(|value| parse_duration_seconds(value))
858            .transpose()?
859            .unwrap_or_else(|| default_bucket_seconds(window_seconds))
860            .clamp(60, window_seconds.max(60));
861        Ok(Self {
862            start_ms: end_ms - window_seconds * 1000,
863            end_ms,
864            bucket_ms: bucket_seconds * 1000,
865            window_seconds,
866        })
867    }
868
869    fn aligned_start_ms(&self) -> i64 {
870        (self.start_ms / self.bucket_ms) * self.bucket_ms
871    }
872
873    fn window_json(&self) -> Value {
874        json!({
875            "start_ms": self.start_ms,
876            "end_ms": self.end_ms,
877            "seconds": self.window_seconds,
878        })
879    }
880}
881
/// Picks the bucket width (in seconds) used when the caller does not supply one.
///
/// | window            | bucket   |
/// |-------------------|----------|
/// | up to 1 hour      | 1 minute |
/// | up to 1 day       | 1 hour   |
/// | up to 7 days      | 6 hours  |
/// | anything longer   | 1 day    |
fn default_bucket_seconds(window_seconds: i64) -> i64 {
    const HOUR: i64 = 3600;
    const DAY: i64 = 24 * HOUR;
    const WEEK: i64 = 7 * DAY;

    match window_seconds {
        w if w <= HOUR => 60,
        w if w <= DAY => HOUR,
        w if w <= WEEK => 6 * HOUR,
        _ => DAY,
    }
}
893
/// Parses a human-friendly duration into whole seconds.
///
/// Accepted forms (surrounding whitespace is ignored):
/// * a bare positive integer, taken as seconds (`"90"`);
/// * a positive integer followed by one unit suffix: `s`, `m`, `h`, `d` or `w`
///   (`"5m"`, `"2w"`).
///
/// # Errors
///
/// Returns a message when the input is empty, is not one of the forms above, is zero
/// or negative, or when the value multiplied by its unit does not fit in an `i64`.
fn parse_duration_seconds(raw: &str) -> Result<i64, String> {
    let raw = raw.trim();
    if raw.is_empty() {
        return Err("duration cannot be empty".into());
    }

    // A bare integer is interpreted as a number of seconds.
    if let Ok(seconds) = raw.parse::<i64>() {
        if seconds <= 0 {
            return Err("duration must be greater than 0".into());
        }
        return Ok(seconds);
    }

    let Some(unit) = raw.chars().last() else {
        return Err("duration cannot be empty".into());
    };
    let multiplier: i64 = match unit {
        's' => 1,
        'm' => 60,
        'h' => 60 * 60,
        'd' => 24 * 60 * 60,
        'w' => 7 * 24 * 60 * 60,
        _ => return Err(format!("invalid duration '{}'", raw)),
    };

    // `unit` matched one of the ASCII suffixes above, so this cut lands on a char boundary.
    let digits = &raw[..raw.len() - unit.len_utf8()];
    let value = digits
        .parse::<i64>()
        .map_err(|_| format!("invalid duration '{}'", raw))?;
    if value <= 0 {
        return Err("duration must be greater than 0".into());
    }

    // The input is caller-supplied: a huge count with a large unit must not overflow
    // (a panic in debug builds, a silently wrapped value in release builds).
    value
        .checked_mul(multiplier)
        .ok_or_else(|| format!("duration '{}' is too large", raw))
}