streamkit_core/
stats.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Node statistics tracking and reporting.
6//!
7//! This module provides types and utilities for collecting runtime statistics
8//! from nodes during pipeline execution. Statistics are throttled to prevent
9//! overload (typically every 10 seconds or 1000 packets).
10
11use serde::{Deserialize, Serialize};
12use std::time::SystemTime;
13use ts_rs::TS;
14
15/// Runtime statistics for a node, tracking packet processing metrics.
16#[derive(Debug, Clone, Serialize, Deserialize, TS)]
17#[ts(export)]
18pub struct NodeStats {
19    /// Total packets received on all input pins
20    pub received: u64,
21    /// Total packets successfully sent on all output pins
22    pub sent: u64,
23    /// Total packets discarded (e.g., due to backpressure, invalid data)
24    pub discarded: u64,
25    /// Total processing errors that didn't crash the node
26    pub errored: u64,
27    /// Duration in seconds since the node started processing (for rate calculation)
28    pub duration_secs: f64,
29}
30
31impl Default for NodeStats {
32    fn default() -> Self {
33        Self { received: 0, sent: 0, discarded: 0, errored: 0, duration_secs: 0.0 }
34    }
35}
36
37/// A statistics update message sent by a node to report its current metrics.
38/// These updates are throttled to prevent overload (typically every 10s or 1000 packets).
39#[derive(Debug, Clone)]
40pub struct NodeStatsUpdate {
41    /// The unique identifier of the node reporting the stats
42    pub node_id: String,
43    /// The current statistics snapshot
44    pub stats: NodeStats,
45    /// When this snapshot was taken
46    pub timestamp: SystemTime,
47}
48
49/// Helper for tracking and throttling node statistics updates.
50/// Automatically sends updates every 10 seconds or 1000 packets.
51pub struct NodeStatsTracker {
52    stats: NodeStats,
53    start_time: std::time::Instant,
54    last_send: std::time::Instant,
55    node_id: String,
56    stats_tx: Option<tokio::sync::mpsc::Sender<NodeStatsUpdate>>,
57}
58
59impl NodeStatsTracker {
60    /// Throttling configuration
61    const SEND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
62    const SEND_PACKET_THRESHOLD: u64 = 1000;
63
64    /// Create a new stats tracker for a node
65    pub fn new(
66        node_id: String,
67        stats_tx: Option<tokio::sync::mpsc::Sender<NodeStatsUpdate>>,
68    ) -> Self {
69        let now = std::time::Instant::now();
70        Self { stats: NodeStats::default(), start_time: now, last_send: now, node_id, stats_tx }
71    }
72
73    /// Record a received packet
74    #[inline]
75    pub const fn received(&mut self) {
76        self.stats.received += 1;
77    }
78
79    /// Record multiple received packets (for batched stats reporting)
80    #[inline]
81    pub const fn received_n(&mut self, count: u64) {
82        self.stats.received += count;
83    }
84
85    /// Record a sent packet
86    #[inline]
87    pub const fn sent(&mut self) {
88        self.stats.sent += 1;
89    }
90
91    /// Record a discarded packet
92    #[inline]
93    pub const fn discarded(&mut self) {
94        self.stats.discarded += 1;
95    }
96
97    /// Record an error
98    #[inline]
99    pub const fn errored(&mut self) {
100        self.stats.errored += 1;
101    }
102
103    /// Automatically send stats if threshold is met (every 10s or 1000 packets).
104    /// Call this after processing a batch of packets.
105    pub fn maybe_send(&mut self) {
106        let should_send = self.last_send.elapsed() >= Self::SEND_INTERVAL
107            || self.stats.received.is_multiple_of(Self::SEND_PACKET_THRESHOLD);
108
109        if should_send {
110            self.force_send();
111        }
112    }
113
114    /// Force send stats immediately (useful for final updates)
115    pub fn force_send(&mut self) {
116        if let Some(ref stats_tx) = self.stats_tx {
117            // Update duration before sending
118            self.stats.duration_secs = self.start_time.elapsed().as_secs_f64();
119
120            let _ = stats_tx.try_send(NodeStatsUpdate {
121                node_id: self.node_id.clone(),
122                stats: self.stats.clone(),
123                timestamp: SystemTime::now(),
124            });
125            self.last_send = std::time::Instant::now();
126        }
127    }
128}