Skip to main content

rill_core/queues/
telemetry.rs

1//! Telemetry queue — feedback data from the audio world.
2
3use super::command::Command;
4use crate::traits::{NodeId, ParameterId, PortId};
5use std::fmt;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8/// Constants for clock telemetry
9///
10/// Format of `CLOCK_TICK` (Telemetry::Event with kind="clock_tick"):
11/// - `data[0]` — `sample_pos` (absolute sample position, f32)
12/// - `data[1]` — `sample_rate` (sample rate, Hz)
13/// - `data[2]` — `tempo` (BPM, 0.0 if unknown)
14/// - `data[3]` — `beat_position` (fractional beat position, 0.0 if no tempo)
15/// - `data[4]` — `is_new_beat` (1.0 if this is the start of a new beat, otherwise 0.0)
16/// - `data[5]` — `is_new_bar` (1.0 if this is the start of a new bar, otherwise 0.0)
17/// Event kind for clock tick telemetry.
18pub const CLOCK_TICK: &str = "clock_tick";
19/// Event kind for clock tempo telemetry.
20pub const CLOCK_TEMPO: &str = "clock_tempo";
21
22/// Lightweight wrapper around a telemetry sender.
23///
24/// Stored in nodes that wish to emit telemetry from `generate()` /
25/// `process()` / `consume()`. Non-blocking `try_send` is safe for the
26/// audio thread.
27#[derive(Clone)]
28pub struct TelemetryTx {
29    /// Optional inner crossbeam sender (None = telemetry disabled).
30    inner: Option<crossbeam_channel::Sender<Telemetry>>,
31}
32
33impl TelemetryTx {
34    /// Create a disabled (no-op) telemetry sender.
35    pub const fn empty() -> Self {
36        Self { inner: None }
37    }
38
39    /// Create a new telemetry sender wrapping a crossbeam channel sender.
40    pub fn new(tx: crossbeam_channel::Sender<Telemetry>) -> Self {
41        Self { inner: Some(tx) }
42    }
43
44    /// Try to send a telemetry event (non-blocking, safe for RT threads).
45    pub fn try_send(&self, event: Telemetry) {
46        if let Some(ref tx) = self.inner {
47            let _ = tx.try_send(event);
48        }
49    }
50
51    /// Return a reference to the inner crossbeam sender, if present.
52    pub fn sender(&self) -> Option<&crossbeam_channel::Sender<Telemetry>> {
53        self.inner.as_ref()
54    }
55}
56
57/// Telemetry type identifier.
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59pub enum TelemetryKind {
60    /// Parameter value telemetry.
61    Parameter,
62    /// Signal data (audio, sensor readings).
63    Signal,
64    /// Peak value telemetry.
65    Peak,
66    /// Event telemetry.
67    Event,
68    /// Micro-control violation telemetry.
69    Violation,
70}
71
72impl fmt::Display for TelemetryKind {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        match self {
75            TelemetryKind::Parameter => write!(f, "parameter"),
76            TelemetryKind::Signal => write!(f, "signal"),
77            TelemetryKind::Peak => write!(f, "peak"),
78            TelemetryKind::Event => write!(f, "event"),
79            TelemetryKind::Violation => write!(f, "violation"),
80        }
81    }
82}
83
84/// Telemetry data emitted by signal graph nodes.
85#[derive(Debug, Clone)]
86pub enum Telemetry {
87    /// A parameter value change.
88    ParameterValue {
89        /// Target port.
90        port: PortId,
91        /// Target parameter.
92        parameter: ParameterId,
93        /// Current parameter value.
94        value: f32,
95        /// Unix timestamp (microseconds).
96        timestamp: u64,
97    },
98
99    /// Audio or sensor signal data.
100    SignalData {
101        /// Source node ID.
102        node_id: NodeId,
103        /// Channel index.
104        channel: usize,
105        /// Signal sample data.
106        data: Vec<f32>,
107        /// Unix timestamp (microseconds).
108        timestamp: u64,
109        /// Sample rate of the signal.
110        sample_rate: f32,
111    },
112
113    /// Peak value reading.
114    Peak {
115        /// Target port.
116        port: PortId,
117        /// Peak value.
118        value: f32,
119        /// Unix timestamp (microseconds).
120        timestamp: u64,
121        /// Optional hold time in milliseconds.
122        hold_time_ms: Option<u32>,
123    },
124
125    /// Named event with float payload.
126    Event {
127        /// Source component name.
128        source: String,
129        /// Event kind string.
130        kind: String,
131        /// Event data payload.
132        data: Vec<f32>,
133        /// Unix timestamp (microseconds).
134        timestamp: u64,
135        /// Optional human-readable description.
136        description: Option<String>,
137    },
138
139    /// Micro-control timing violation.
140    Violation {
141        /// Component that exceeded its time budget.
142        component: String,
143        /// Expected execution time (nanoseconds).
144        expected_ns: u64,
145        /// Actual execution time (nanoseconds).
146        actual_ns: u64,
147        /// Optional associated value.
148        value: Option<f32>,
149        /// Unix timestamp (microseconds).
150        timestamp: u64,
151    },
152}
153
154// Implement the Command trait for Telemetry
155impl Command for Telemetry {}
156
157impl Telemetry {
158    /// Return the current Unix time in microseconds.
159    pub fn now() -> u64 {
160        SystemTime::now()
161            .duration_since(UNIX_EPOCH)
162            .unwrap_or_default()
163            .as_micros() as u64
164    }
165
166    /// Create a parameter value telemetry event.
167    pub fn parameter(port: PortId, parameter: ParameterId, value: f32) -> Self {
168        Telemetry::ParameterValue {
169            port,
170            parameter,
171            value,
172            timestamp: Self::now(),
173        }
174    }
175
176    /// Create a parameter value telemetry event with an explicit timestamp (for testing).
177    pub fn parameter_with_time(
178        port: PortId,
179        parameter: ParameterId,
180        value: f32,
181        timestamp: u64,
182    ) -> Self {
183        Telemetry::ParameterValue {
184            port,
185            parameter,
186            value,
187            timestamp,
188        }
189    }
190
191    /// Create a signal data telemetry event.
192    pub fn signal(node_id: NodeId, channel: usize, data: Vec<f32>) -> Self {
193        Telemetry::SignalData {
194            node_id,
195            channel,
196            data,
197            timestamp: Self::now(),
198            sample_rate: 44100.0,
199        }
200    }
201
202    /// Create a signal data telemetry event with an explicit sample rate.
203    pub fn signal_with_sample_rate(
204        node_id: NodeId,
205        channel: usize,
206        data: Vec<f32>,
207        sample_rate: f32,
208    ) -> Self {
209        Telemetry::SignalData {
210            node_id,
211            channel,
212            data,
213            timestamp: Self::now(),
214            sample_rate,
215        }
216    }
217
218    /// Create a peak value telemetry event.
219    pub fn peak(port: PortId, value: f32) -> Self {
220        Telemetry::Peak {
221            port,
222            value,
223            timestamp: Self::now(),
224            hold_time_ms: None,
225        }
226    }
227
228    /// Create a peak value telemetry event with a hold time.
229    pub fn peak_with_hold(port: PortId, value: f32, hold_time_ms: u32) -> Self {
230        Telemetry::Peak {
231            port,
232            value,
233            timestamp: Self::now(),
234            hold_time_ms: Some(hold_time_ms),
235        }
236    }
237
238    /// Create an event telemetry event.
239    pub fn event(source: impl Into<String>, kind: impl Into<String>, data: Vec<f32>) -> Self {
240        Telemetry::Event {
241            source: source.into(),
242            kind: kind.into(),
243            data,
244            timestamp: Self::now(),
245            description: None,
246        }
247    }
248
249    /// Create an event telemetry event with a description.
250    pub fn event_with_description(
251        source: impl Into<String>,
252        kind: impl Into<String>,
253        data: Vec<f32>,
254        description: impl Into<String>,
255    ) -> Self {
256        Telemetry::Event {
257            source: source.into(),
258            kind: kind.into(),
259            data,
260            timestamp: Self::now(),
261            description: Some(description.into()),
262        }
263    }
264
265    /// Create a micro-control violation telemetry event.
266    pub fn violation(
267        component: impl Into<String>,
268        expected_ns: u64,
269        actual_ns: u64,
270        value: Option<f32>,
271    ) -> Self {
272        Telemetry::Violation {
273            component: component.into(),
274            expected_ns,
275            actual_ns,
276            value,
277            timestamp: Self::now(),
278        }
279    }
280
281    /// Return the telemetry type category.
282    pub fn kind(&self) -> TelemetryKind {
283        match self {
284            Telemetry::ParameterValue { .. } => TelemetryKind::Parameter,
285            Telemetry::SignalData { .. } => TelemetryKind::Signal,
286            Telemetry::Peak { .. } => TelemetryKind::Peak,
287            Telemetry::Event { .. } => TelemetryKind::Event,
288            Telemetry::Violation { .. } => TelemetryKind::Violation,
289        }
290    }
291
292    /// Return the timestamp (microseconds since Unix epoch).
293    pub fn timestamp(&self) -> u64 {
294        match self {
295            Telemetry::ParameterValue { timestamp, .. } => *timestamp,
296            Telemetry::SignalData { timestamp, .. } => *timestamp,
297            Telemetry::Peak { timestamp, .. } => *timestamp,
298            Telemetry::Event { timestamp, .. } => *timestamp,
299            Telemetry::Violation { timestamp, .. } => *timestamp,
300        }
301    }
302}
303
304impl fmt::Display for Telemetry {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        match self {
307            Telemetry::ParameterValue {
308                port,
309                parameter,
310                value,
311                timestamp,
312            } => {
313                write!(
314                    f,
315                    "[{}] 📊 {}::{} = {:.3}",
316                    timestamp, port, parameter, value
317                )
318            }
319            Telemetry::SignalData {
320                node_id,
321                channel,
322                data,
323                timestamp,
324                sample_rate,
325            } => {
326                let duration_ms = data.len() as f32 / sample_rate * 1000.0;
327                write!(
328                    f,
329                    "[{}] 🎵 {}:{} ({} samples, {:.1}ms)",
330                    timestamp,
331                    node_id,
332                    channel,
333                    data.len(),
334                    duration_ms
335                )
336            }
337            Telemetry::Peak {
338                port,
339                value,
340                timestamp,
341                hold_time_ms,
342            } => {
343                if let Some(hold) = hold_time_ms {
344                    write!(
345                        f,
346                        "[{}] 📈 {} = {:.3} (hold {}ms)",
347                        timestamp, port, value, hold
348                    )
349                } else {
350                    write!(f, "[{}] 📈 {} = {:.3}", timestamp, port, value)
351                }
352            }
353            Telemetry::Event {
354                source,
355                kind,
356                data,
357                timestamp,
358                description,
359            } => {
360                if let Some(desc) = description {
361                    write!(
362                        f,
363                        "[{}] 📢 {}:{} ({}) {:?}",
364                        timestamp, source, kind, desc, data
365                    )
366                } else {
367                    write!(f, "[{}] 📢 {}:{} {:?}", timestamp, source, kind, data)
368                }
369            }
370            Telemetry::Violation {
371                component,
372                expected_ns,
373                actual_ns,
374                value,
375                timestamp,
376            } => {
377                if let Some(v) = value {
378                    write!(
379                        f,
380                        "[{}] ⚠️ {} violation: {}ns > {}ns, value={:.3}",
381                        timestamp, component, actual_ns, expected_ns, v
382                    )
383                } else {
384                    write!(
385                        f,
386                        "[{}] ⚠️ {} violation: {}ns > {}ns",
387                        timestamp, component, actual_ns, expected_ns
388                    )
389                }
390            }
391        }
392    }
393}
394
395/// Alias for a [`CommandQueue`] specialised for telemetry data.
396///
397/// [`CommandQueue`]: super::command::CommandQueue
398pub type TelemetryQueue = super::command::CommandQueue<Telemetry>;
399
400/// Convenience extension methods for [`TelemetryQueue`].
401pub trait TelemetryQueueExt {
402    /// Send a parameter value telemetry event.
403    fn send_parameter(
404        &self,
405        port: PortId,
406        parameter: ParameterId,
407        value: f32,
408    ) -> Result<(), super::error::QueueError>;
409    /// Send a signal data telemetry event.
410    fn send_signal(
411        &self,
412        node_id: NodeId,
413        channel: usize,
414        data: Vec<f32>,
415    ) -> Result<(), super::error::QueueError>;
416    /// Send a signal data telemetry event with explicit sample rate.
417    fn send_signal_with_sample_rate(
418        &self,
419        node_id: NodeId,
420        channel: usize,
421        data: Vec<f32>,
422        sample_rate: f32,
423    ) -> Result<(), super::error::QueueError>;
424    /// Send a peak value telemetry event.
425    fn send_peak(&self, port: PortId, value: f32) -> Result<(), super::error::QueueError>;
426    /// Send a peak value telemetry event with hold time.
427    fn send_peak_with_hold(
428        &self,
429        port: PortId,
430        value: f32,
431        hold_time_ms: u32,
432    ) -> Result<(), super::error::QueueError>;
433    /// Send an event telemetry event.
434    fn send_event(
435        &self,
436        source: &str,
437        kind: &str,
438        data: Vec<f32>,
439    ) -> Result<(), super::error::QueueError>;
440    /// Send an event telemetry event with description.
441    fn send_event_with_description(
442        &self,
443        source: &str,
444        kind: &str,
445        data: Vec<f32>,
446        description: &str,
447    ) -> Result<(), super::error::QueueError>;
448    /// Send a micro-control violation telemetry event.
449    fn send_violation(
450        &self,
451        component: &str,
452        expected_ns: u64,
453        actual_ns: u64,
454        value: Option<f32>,
455    ) -> Result<(), super::error::QueueError>;
456}
457
458impl TelemetryQueueExt for super::command::CommandQueue<Telemetry> {
459    fn send_parameter(
460        &self,
461        port: PortId,
462        parameter: ParameterId,
463        value: f32,
464    ) -> Result<(), super::error::QueueError> {
465        self.send(Telemetry::parameter(port, parameter, value))
466    }
467
468    fn send_signal(
469        &self,
470        node_id: NodeId,
471        channel: usize,
472        data: Vec<f32>,
473    ) -> Result<(), super::error::QueueError> {
474        self.send(Telemetry::signal(node_id, channel, data))
475    }
476
477    fn send_signal_with_sample_rate(
478        &self,
479        node_id: NodeId,
480        channel: usize,
481        data: Vec<f32>,
482        sample_rate: f32,
483    ) -> Result<(), super::error::QueueError> {
484        self.send(Telemetry::signal_with_sample_rate(
485            node_id,
486            channel,
487            data,
488            sample_rate,
489        ))
490    }
491
492    fn send_peak(&self, port: PortId, value: f32) -> Result<(), super::error::QueueError> {
493        self.send(Telemetry::peak(port, value))
494    }
495
496    fn send_peak_with_hold(
497        &self,
498        port: PortId,
499        value: f32,
500        hold_time_ms: u32,
501    ) -> Result<(), super::error::QueueError> {
502        self.send(Telemetry::peak_with_hold(port, value, hold_time_ms))
503    }
504
505    fn send_event(
506        &self,
507        source: &str,
508        kind: &str,
509        data: Vec<f32>,
510    ) -> Result<(), super::error::QueueError> {
511        self.send(Telemetry::event(source, kind, data))
512    }
513
514    fn send_event_with_description(
515        &self,
516        source: &str,
517        kind: &str,
518        data: Vec<f32>,
519        description: &str,
520    ) -> Result<(), super::error::QueueError> {
521        self.send(Telemetry::event_with_description(
522            source,
523            kind,
524            data,
525            description,
526        ))
527    }
528
529    fn send_violation(
530        &self,
531        component: &str,
532        expected_ns: u64,
533        actual_ns: u64,
534        value: Option<f32>,
535    ) -> Result<(), super::error::QueueError> {
536        self.send(Telemetry::violation(
537            component,
538            expected_ns,
539            actual_ns,
540            value,
541        ))
542    }
543}