Skip to main content

orbit_metrics/
lib.rs

//! Metrics snapshot families over `orbit-rs` rings.
//!
//! Metrics are not cache entries. They are periodic, worker-local
//! snapshots: readers want the latest value per node and can discard
//! stale samples. This crate keeps that semantic layer out of
//! `orbit-rs` while staying independent of any application runtime.
7
8use std::collections::HashMap;
9use std::hash::Hash;
10use std::sync::Arc;
11
12use bytes::Bytes;
13pub use orbit_rs::OrbitTyped;
14use orbit_rs::{Fleet, NetId64};
15
16/// A periodic metrics snapshot carried by an Orbit ring.
17///
18/// Implement this on a compact snapshot type. Hot paths should update
19/// local atomics; a background task captures a snapshot and publishes it
20/// through [`OrbitMetricFamily`].
21pub trait OrbitMetricSnapshot: OrbitTyped + Sized {
22    /// Human-readable family name for diagnostics.
23    const FAMILY: &'static str;
24
25    /// Logical node this snapshot describes.
26    fn node_id(&self) -> u16;
27
28    /// Unix timestamp in seconds when the snapshot was captured.
29    fn captured_at_unix_secs(&self) -> u64;
30
31    /// Encode the snapshot into a ring payload.
32    fn encode(&self) -> Result<Vec<u8>, String>;
33
34    /// Decode the snapshot from a ring payload.
35    fn decode(bytes: &[u8]) -> Result<Self, String>;
36}
37
38/// Optional key projection for row-like metric families.
39///
40/// Worker-control metrics usually use `node_id`, but label-preserving
41/// families often need "latest row per logical key" where the key is not
42/// the producing process id.
43pub trait OrbitMetricKeyedSnapshot: OrbitMetricSnapshot {
44    type Key: Eq + Hash;
45
46    fn metric_key(&self) -> Self::Key;
47}
48
49/// One decoded metrics sample plus the Orbit id that carried it.
50#[derive(Clone, Debug)]
51pub struct OrbitMetricSample<T> {
52    pub id: NetId64,
53    pub snapshot: T,
54}
55
56impl<T: OrbitMetricSnapshot> OrbitMetricSample<T> {
57    pub fn node_id(&self) -> u16 {
58        self.snapshot.node_id()
59    }
60
61    pub fn captured_at_unix_secs(&self) -> u64 {
62        self.snapshot.captured_at_unix_secs()
63    }
64
65    pub fn age_secs(&self, now_unix_secs: u64) -> u64 {
66        now_unix_secs.saturating_sub(self.captured_at_unix_secs())
67    }
68
69    pub fn is_fresh(&self, now_unix_secs: u64, max_age_secs: u64) -> bool {
70        self.age_secs(now_unix_secs) <= max_age_secs
71    }
72}
73
74/// Ring-backed metrics family.
75#[derive(Clone)]
76pub struct OrbitMetricFamily<T: OrbitMetricSnapshot> {
77    fleet: Arc<Fleet>,
78    _t: std::marker::PhantomData<T>,
79}
80
81impl<T: OrbitMetricSnapshot> OrbitMetricFamily<T> {
82    pub fn new(fleet: Arc<Fleet>) -> Self {
83        Self {
84            fleet,
85            _t: std::marker::PhantomData,
86        }
87    }
88
89    pub fn publisher(&self) -> OrbitMetricPublisher<T> {
90        OrbitMetricPublisher {
91            family: self.clone(),
92        }
93    }
94
95    pub fn collector(&self) -> OrbitMetricCollector<T> {
96        OrbitMetricCollector {
97            family: self.clone(),
98        }
99    }
100}
101
102/// Write-side handle for one metrics family. Usually lives in the
103/// worker/background publisher task.
104#[derive(Clone)]
105pub struct OrbitMetricPublisher<T: OrbitMetricSnapshot> {
106    family: OrbitMetricFamily<T>,
107}
108
109impl<T: OrbitMetricSnapshot> OrbitMetricPublisher<T> {
110    pub fn new(fleet: Arc<Fleet>) -> Self {
111        Self {
112            family: OrbitMetricFamily::new(fleet),
113        }
114    }
115
116    /// Publish one captured snapshot. The ring frame version mirrors
117    /// the snapshot timestamp so low-level tools can inspect freshness
118    /// without decoding.
119    pub fn publish(&self, snapshot: &T) -> Result<NetId64, String> {
120        let payload = snapshot.encode()?;
121        #[cfg(unix)]
122        if payload.len() > orbit_rs::ring_shm::PAYLOAD_MAX {
123            return Err(format!(
124                "orbit metrics payload too large for {}: {} > {}",
125                T::FAMILY,
126                payload.len(),
127                orbit_rs::ring_shm::PAYLOAD_MAX
128            ));
129        }
130        Ok(self.family.fleet.publish::<T>(
131            0,
132            snapshot.captured_at_unix_secs(),
133            Bytes::from(payload),
134        ))
135    }
136}
137
138/// Read-side handle for one metrics family. Usually lives in the
139/// master/aggregator path.
140#[derive(Clone)]
141pub struct OrbitMetricCollector<T: OrbitMetricSnapshot> {
142    family: OrbitMetricFamily<T>,
143}
144
145impl<T: OrbitMetricSnapshot> OrbitMetricCollector<T> {
146    pub fn new(fleet: Arc<Fleet>) -> Self {
147        Self {
148            family: OrbitMetricFamily::new(fleet),
149        }
150    }
151
152    /// Walk the ring backwards and return the newest decodable sample
153    /// for each node. Malformed frames are ignored; a newer valid frame
154    /// for a node wins because the walk starts at the ring head.
155    pub fn latest_by_node(&self) -> HashMap<u16, OrbitMetricSample<T>> {
156        let head = self.family.fleet.head::<T>();
157        if head == 0 {
158            return HashMap::new();
159        }
160
161        let capacity = self.family.fleet.ring_capacity::<T>() as u64;
162        let walk_count = head.min(capacity);
163        let mut samples = HashMap::new();
164        let expected_nodes = self.family.fleet.fleet_size() as usize;
165
166        for i in 0..walk_count {
167            let counter = head - 1 - i;
168            let Some(frame) = self.family.fleet.read_at::<T>(counter) else {
169                if counter == 0 {
170                    break;
171                }
172                continue;
173            };
174            let Ok(snapshot) = T::decode(&frame.payload) else {
175                if counter == 0 {
176                    break;
177                }
178                continue;
179            };
180            samples
181                .entry(snapshot.node_id())
182                .or_insert(OrbitMetricSample {
183                    id: frame.id,
184                    snapshot,
185                });
186            if expected_nodes > 0 && samples.len() >= expected_nodes {
187                break;
188            }
189            if counter == 0 {
190                break;
191            }
192        }
193
194        samples
195    }
196
197    /// Walk the ring backwards and return the newest decodable sample
198    /// for each logical metric key.
199    pub fn latest_by_key<K>(&self) -> HashMap<K, OrbitMetricSample<T>>
200    where
201        T: OrbitMetricKeyedSnapshot<Key = K>,
202        K: Eq + Hash,
203    {
204        let head = self.family.fleet.head::<T>();
205        if head == 0 {
206            return HashMap::new();
207        }
208
209        let capacity = self.family.fleet.ring_capacity::<T>() as u64;
210        let walk_count = head.min(capacity);
211        let mut samples = HashMap::new();
212
213        for i in 0..walk_count {
214            let counter = head - 1 - i;
215            let Some(frame) = self.family.fleet.read_at::<T>(counter) else {
216                if counter == 0 {
217                    break;
218                }
219                continue;
220            };
221            let Ok(snapshot) = T::decode(&frame.payload) else {
222                if counter == 0 {
223                    break;
224                }
225                continue;
226            };
227            samples
228                .entry(snapshot.metric_key())
229                .or_insert(OrbitMetricSample {
230                    id: frame.id,
231                    snapshot,
232                });
233            if counter == 0 {
234                break;
235            }
236        }
237
238        samples
239    }
240
241    /// Return latest keyed samples whose captured timestamp is within
242    /// `max_age_secs` of `now_unix_secs`.
243    pub fn fresh_by_key<K>(
244        &self,
245        now_unix_secs: u64,
246        max_age_secs: u64,
247    ) -> HashMap<K, OrbitMetricSample<T>>
248    where
249        T: OrbitMetricKeyedSnapshot<Key = K>,
250        K: Eq + Hash,
251    {
252        self.latest_by_key()
253            .into_iter()
254            .filter(|(_, sample)| sample.is_fresh(now_unix_secs, max_age_secs))
255            .collect()
256    }
257
258    /// Return latest samples whose captured timestamp is within
259    /// `max_age_secs` of `now_unix_secs`.
260    pub fn fresh_by_node(
261        &self,
262        now_unix_secs: u64,
263        max_age_secs: u64,
264    ) -> HashMap<u16, OrbitMetricSample<T>> {
265        self.latest_by_node()
266            .into_iter()
267            .filter(|(_, sample)| sample.is_fresh(now_unix_secs, max_age_secs))
268            .collect()
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Join a two-member fleet named `ring` and return both handles for `T`.
    fn join_family<T: OrbitMetricSnapshot>(
        ring: &'static str,
    ) -> (OrbitMetricPublisher<T>, OrbitMetricCollector<T>) {
        let fleet = Arc::new(Fleet::join(ring, 2).unwrap());
        let family = OrbitMetricFamily::<T>::new(fleet);
        (family.publisher(), family.collector())
    }

    /// Fixed-width snapshot: node (2 bytes) + time (8 bytes) + value (8 bytes).
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestSnapshot {
        node: u16,
        captured_at: u64,
        value: u64,
    }

    impl OrbitTyped for TestSnapshot {
        const KIND: u8 = 211;
    }

    impl OrbitMetricSnapshot for TestSnapshot {
        const FAMILY: &'static str = "test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        fn encode(&self) -> Result<Vec<u8>, String> {
            let mut buf = Vec::with_capacity(18);
            buf.extend_from_slice(&self.node.to_le_bytes());
            buf.extend_from_slice(&self.captured_at.to_le_bytes());
            buf.extend_from_slice(&self.value.to_le_bytes());
            Ok(buf)
        }

        fn decode(bytes: &[u8]) -> Result<Self, String> {
            if bytes.len() != 18 {
                return Err(format!("bad len {}", bytes.len()));
            }
            // Length is exactly 18 here, so the splits yield 2 + 8 + 8 bytes.
            let (node, rest) = bytes.split_at(2);
            let (captured_at, value) = rest.split_at(8);
            Ok(Self {
                node: u16::from_le_bytes(node.try_into().expect("node bytes")),
                captured_at: u64::from_le_bytes(captured_at.try_into().expect("time bytes")),
                value: u64::from_le_bytes(value.try_into().expect("value bytes")),
            })
        }
    }

    #[test]
    fn latest_by_node_keeps_newest_sample_per_node() {
        let (publisher, collector) = join_family::<TestSnapshot>("metrics-test");

        // Node 1 publishes twice; the later value must win.
        for (node, captured_at, value) in [(1, 10, 100), (2, 11, 200), (1, 12, 101)] {
            publisher
                .publish(&TestSnapshot {
                    node,
                    captured_at,
                    value,
                })
                .unwrap();
        }

        let latest = collector.latest_by_node();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest[&1].snapshot.value, 101);
        assert_eq!(latest[&2].snapshot.value, 200);
    }

    #[test]
    fn fresh_by_node_drops_stale_samples() {
        let (publisher, collector) = join_family::<TestSnapshot>("metrics-fresh-test");

        for (node, captured_at, value) in [(1, 10, 100), (2, 20, 200)] {
            publisher
                .publish(&TestSnapshot {
                    node,
                    captured_at,
                    value,
                })
                .unwrap();
        }

        // At now = 25 with max age 10, node 1 (age 15) is stale, node 2 (age 5) is fresh.
        let fresh = collector.fresh_by_node(25, 10);
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[&2].snapshot.value, 200);
    }

    /// Snapshot with a length-prefixed string key after the fixed 18-byte header.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct KeyedSnapshot {
        node: u16,
        key: &'static str,
        captured_at: u64,
        value: u64,
    }

    impl OrbitTyped for KeyedSnapshot {
        const KIND: u8 = 212;
    }

    impl OrbitMetricSnapshot for KeyedSnapshot {
        const FAMILY: &'static str = "keyed-test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        fn encode(&self) -> Result<Vec<u8>, String> {
            let key = self.key.as_bytes();
            let mut buf = Vec::with_capacity(27);
            buf.extend_from_slice(&self.node.to_le_bytes());
            buf.extend_from_slice(&self.captured_at.to_le_bytes());
            buf.extend_from_slice(&self.value.to_le_bytes());
            buf.push(key.len() as u8);
            buf.extend_from_slice(key);
            Ok(buf)
        }

        fn decode(bytes: &[u8]) -> Result<Self, String> {
            if bytes.len() < 19 {
                return Err(format!("bad len {}", bytes.len()));
            }
            // Header: node (0..2), time (2..10), value (10..18), key length (18).
            let (header, key_bytes) = bytes.split_at(19);
            let node = u16::from_le_bytes(header[0..2].try_into().expect("node bytes"));
            let captured_at = u64::from_le_bytes(header[2..10].try_into().expect("time bytes"));
            let value = u64::from_le_bytes(header[10..18].try_into().expect("value bytes"));
            let key_len = usize::from(header[18]);
            if key_bytes.len() != key_len {
                return Err(format!("bad key len {}", bytes.len()));
            }
            let key = std::str::from_utf8(key_bytes).map_err(|e| e.to_string())?;
            // Map the decoded text back onto the known `'static` keys.
            let key = match key {
                "alpha" => "alpha",
                "beta" => "beta",
                _ => return Err(format!("unknown key {key}")),
            };
            Ok(Self {
                node,
                key,
                captured_at,
                value,
            })
        }
    }

    impl OrbitMetricKeyedSnapshot for KeyedSnapshot {
        type Key = String;

        fn metric_key(&self) -> Self::Key {
            self.key.to_owned()
        }
    }

    #[test]
    fn latest_by_key_keeps_newest_sample_per_key() {
        let (publisher, collector) = join_family::<KeyedSnapshot>("metrics-keyed-test");

        // Same node throughout; "alpha" is published twice and the later value must win.
        for (key, captured_at, value) in [("alpha", 10, 100), ("beta", 11, 200), ("alpha", 12, 101)]
        {
            publisher
                .publish(&KeyedSnapshot {
                    node: 0,
                    key,
                    captured_at,
                    value,
                })
                .unwrap();
        }

        let latest = collector.latest_by_key();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest["alpha"].snapshot.value, 101);
        assert_eq!(latest["beta"].snapshot.value, 200);
    }
}