1use std::collections::HashMap;
9use std::hash::Hash;
10use std::sync::Arc;
11
12use bytes::Bytes;
13pub use orbit_rs::OrbitTyped;
14use orbit_rs::{Fleet, NetId64};
15
/// A typed metric snapshot that can be published to, and collected from, a fleet.
///
/// Implementors provide their own byte encoding. The publisher and collector in
/// this module only move the encoded payload around and index decoded snapshots.
pub trait OrbitMetricSnapshot: OrbitTyped + Sized {
    /// Name of the metric family; used in the publisher's "payload too large" error text.
    const FAMILY: &'static str;

    /// Id of the node this snapshot belongs to; `latest_by_node` uses it as the map key.
    fn node_id(&self) -> u16;

    /// Capture time in Unix seconds.
    ///
    /// Used to compute sample age/freshness and passed to the fleet when publishing.
    fn captured_at_unix_secs(&self) -> u64;

    /// Serializes the snapshot into the payload bytes handed to the fleet.
    ///
    /// # Errors
    ///
    /// Returns a description of the failure; the publisher propagates it unchanged.
    fn encode(&self) -> Result<Vec<u8>, String>;

    /// Rebuilds a snapshot from payload bytes read back from the fleet.
    ///
    /// # Errors
    ///
    /// Returns a description of the failure; the collector skips frames whose
    /// payload does not decode.
    fn decode(bytes: &[u8]) -> Result<Self, String>;
}
37
/// A snapshot that can additionally be indexed by a caller-defined key.
///
/// `latest_by_key` / `fresh_by_key` on the collector group samples by this key
/// instead of by node id.
pub trait OrbitMetricKeyedSnapshot: OrbitMetricSnapshot {
    /// Key type used for grouping; must be usable as a `HashMap` key.
    type Key: Eq + Hash;

    /// Returns the key this snapshot is grouped under.
    fn metric_key(&self) -> Self::Key;
}
48
/// A decoded snapshot together with the id of the frame it was read from.
#[derive(Clone, Debug)]
pub struct OrbitMetricSample<T> {
    /// Id of the frame that carried the snapshot (the collector copies it from the frame).
    pub id: NetId64,
    /// The decoded snapshot.
    pub snapshot: T,
}
55
56impl<T: OrbitMetricSnapshot> OrbitMetricSample<T> {
57 pub fn node_id(&self) -> u16 {
58 self.snapshot.node_id()
59 }
60
61 pub fn captured_at_unix_secs(&self) -> u64 {
62 self.snapshot.captured_at_unix_secs()
63 }
64
65 pub fn age_secs(&self, now_unix_secs: u64) -> u64 {
66 now_unix_secs.saturating_sub(self.captured_at_unix_secs())
67 }
68
69 pub fn is_fresh(&self, now_unix_secs: u64, max_age_secs: u64) -> bool {
70 self.age_secs(now_unix_secs) <= max_age_secs
71 }
72}
73
74#[derive(Clone)]
76pub struct OrbitMetricFamily<T: OrbitMetricSnapshot> {
77 fleet: Arc<Fleet>,
78 _t: std::marker::PhantomData<T>,
79}
80
81impl<T: OrbitMetricSnapshot> OrbitMetricFamily<T> {
82 pub fn new(fleet: Arc<Fleet>) -> Self {
83 Self {
84 fleet,
85 _t: std::marker::PhantomData,
86 }
87 }
88
89 pub fn publisher(&self) -> OrbitMetricPublisher<T> {
90 OrbitMetricPublisher {
91 family: self.clone(),
92 }
93 }
94
95 pub fn collector(&self) -> OrbitMetricCollector<T> {
96 OrbitMetricCollector {
97 family: self.clone(),
98 }
99 }
100}
101
/// Write-side handle: encodes snapshots of type `T` and publishes them to the fleet.
#[derive(Clone)]
pub struct OrbitMetricPublisher<T: OrbitMetricSnapshot> {
    /// Family handle that owns the shared fleet reference.
    family: OrbitMetricFamily<T>,
}
108
109impl<T: OrbitMetricSnapshot> OrbitMetricPublisher<T> {
110 pub fn new(fleet: Arc<Fleet>) -> Self {
111 Self {
112 family: OrbitMetricFamily::new(fleet),
113 }
114 }
115
116 pub fn publish(&self, snapshot: &T) -> Result<NetId64, String> {
120 let payload = snapshot.encode()?;
121 #[cfg(unix)]
122 if payload.len() > orbit_rs::ring_shm::PAYLOAD_MAX {
123 return Err(format!(
124 "orbit metrics payload too large for {}: {} > {}",
125 T::FAMILY,
126 payload.len(),
127 orbit_rs::ring_shm::PAYLOAD_MAX
128 ));
129 }
130 Ok(self.family.fleet.publish::<T>(
131 0,
132 snapshot.captured_at_unix_secs(),
133 Bytes::from(payload),
134 ))
135 }
136}
137
/// Read-side handle: walks the fleet's frames for `T` and decodes them into samples.
#[derive(Clone)]
pub struct OrbitMetricCollector<T: OrbitMetricSnapshot> {
    /// Family handle that owns the shared fleet reference.
    family: OrbitMetricFamily<T>,
}
144
145impl<T: OrbitMetricSnapshot> OrbitMetricCollector<T> {
146 pub fn new(fleet: Arc<Fleet>) -> Self {
147 Self {
148 family: OrbitMetricFamily::new(fleet),
149 }
150 }
151
152 pub fn latest_by_node(&self) -> HashMap<u16, OrbitMetricSample<T>> {
156 let head = self.family.fleet.head::<T>();
157 if head == 0 {
158 return HashMap::new();
159 }
160
161 let capacity = self.family.fleet.ring_capacity::<T>() as u64;
162 let walk_count = head.min(capacity);
163 let mut samples = HashMap::new();
164 let expected_nodes = self.family.fleet.fleet_size() as usize;
165
166 for i in 0..walk_count {
167 let counter = head - 1 - i;
168 let Some(frame) = self.family.fleet.read_at::<T>(counter) else {
169 if counter == 0 {
170 break;
171 }
172 continue;
173 };
174 let Ok(snapshot) = T::decode(&frame.payload) else {
175 if counter == 0 {
176 break;
177 }
178 continue;
179 };
180 samples
181 .entry(snapshot.node_id())
182 .or_insert(OrbitMetricSample {
183 id: frame.id,
184 snapshot,
185 });
186 if expected_nodes > 0 && samples.len() >= expected_nodes {
187 break;
188 }
189 if counter == 0 {
190 break;
191 }
192 }
193
194 samples
195 }
196
197 pub fn latest_by_key<K>(&self) -> HashMap<K, OrbitMetricSample<T>>
200 where
201 T: OrbitMetricKeyedSnapshot<Key = K>,
202 K: Eq + Hash,
203 {
204 let head = self.family.fleet.head::<T>();
205 if head == 0 {
206 return HashMap::new();
207 }
208
209 let capacity = self.family.fleet.ring_capacity::<T>() as u64;
210 let walk_count = head.min(capacity);
211 let mut samples = HashMap::new();
212
213 for i in 0..walk_count {
214 let counter = head - 1 - i;
215 let Some(frame) = self.family.fleet.read_at::<T>(counter) else {
216 if counter == 0 {
217 break;
218 }
219 continue;
220 };
221 let Ok(snapshot) = T::decode(&frame.payload) else {
222 if counter == 0 {
223 break;
224 }
225 continue;
226 };
227 samples
228 .entry(snapshot.metric_key())
229 .or_insert(OrbitMetricSample {
230 id: frame.id,
231 snapshot,
232 });
233 if counter == 0 {
234 break;
235 }
236 }
237
238 samples
239 }
240
241 pub fn fresh_by_key<K>(
244 &self,
245 now_unix_secs: u64,
246 max_age_secs: u64,
247 ) -> HashMap<K, OrbitMetricSample<T>>
248 where
249 T: OrbitMetricKeyedSnapshot<Key = K>,
250 K: Eq + Hash,
251 {
252 self.latest_by_key()
253 .into_iter()
254 .filter(|(_, sample)| sample.is_fresh(now_unix_secs, max_age_secs))
255 .collect()
256 }
257
258 pub fn fresh_by_node(
261 &self,
262 now_unix_secs: u64,
263 max_age_secs: u64,
264 ) -> HashMap<u16, OrbitMetricSample<T>> {
265 self.latest_by_node()
266 .into_iter()
267 .filter(|(_, sample)| sample.is_fresh(now_unix_secs, max_age_secs))
268 .collect()
269 }
270}
271
// Unit tests for the publisher/collector round trip. Each test joins its own
// named fleet and then publishes and collects through one shared family handle.
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal fixed-size snapshot used by the per-node tests.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestSnapshot {
        node: u16,
        captured_at: u64,
        value: u64,
    }

    impl OrbitTyped for TestSnapshot {
        const KIND: u8 = 211;
    }

    impl OrbitMetricSnapshot for TestSnapshot {
        const FAMILY: &'static str = "test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        /// Layout: node (2 bytes LE) + captured_at (8 bytes LE) + value (8 bytes LE) = 18 bytes.
        fn encode(&self) -> Result<Vec<u8>, String> {
            let mut out = Vec::with_capacity(18);
            out.extend_from_slice(&self.node.to_le_bytes());
            out.extend_from_slice(&self.captured_at.to_le_bytes());
            out.extend_from_slice(&self.value.to_le_bytes());
            Ok(out)
        }

        /// Inverse of `encode`; rejects any payload that is not exactly 18 bytes.
        fn decode(bytes: &[u8]) -> Result<Self, String> {
            if bytes.len() != 18 {
                return Err(format!("bad len {}", bytes.len()));
            }
            // The length check above makes the fixed-size slice conversions infallible.
            let node = u16::from_le_bytes(bytes[0..2].try_into().expect("node bytes"));
            let captured_at = u64::from_le_bytes(bytes[2..10].try_into().expect("time bytes"));
            let value = u64::from_le_bytes(bytes[10..18].try_into().expect("value bytes"));
            Ok(Self {
                node,
                captured_at,
                value,
            })
        }
    }

    /// Two samples for node 1 and one for node 2: the later node-1 sample must win.
    #[test]
    fn latest_by_node_keeps_newest_sample_per_node() {
        // NOTE(review): the second argument to `Fleet::join` is presumably the fleet size — confirm.
        let fleet = Arc::new(Fleet::join("metrics-test", 2).unwrap());
        let family = OrbitMetricFamily::<TestSnapshot>::new(fleet);
        let publisher = family.publisher();
        let collector = family.collector();

        publisher
            .publish(&TestSnapshot {
                node: 1,
                captured_at: 10,
                value: 100,
            })
            .unwrap();
        publisher
            .publish(&TestSnapshot {
                node: 2,
                captured_at: 11,
                value: 200,
            })
            .unwrap();
        publisher
            .publish(&TestSnapshot {
                node: 1,
                captured_at: 12,
                value: 101,
            })
            .unwrap();

        let latest = collector.latest_by_node();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest[&1].snapshot.value, 101);
        assert_eq!(latest[&2].snapshot.value, 200);
    }

    /// At now = 25 with max age 10, the sample captured at 10 is stale and the one at 20 is fresh.
    #[test]
    fn fresh_by_node_drops_stale_samples() {
        let fleet = Arc::new(Fleet::join("metrics-fresh-test", 2).unwrap());
        let family = OrbitMetricFamily::<TestSnapshot>::new(fleet);
        let publisher = family.publisher();
        let collector = family.collector();

        publisher
            .publish(&TestSnapshot {
                node: 1,
                captured_at: 10,
                value: 100,
            })
            .unwrap();
        publisher
            .publish(&TestSnapshot {
                node: 2,
                captured_at: 20,
                value: 200,
            })
            .unwrap();

        let fresh = collector.fresh_by_node(25, 10);
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[&2].snapshot.value, 200);
    }

    /// Snapshot with a string key, used by the per-key test.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct KeyedSnapshot {
        node: u16,
        key: &'static str,
        captured_at: u64,
        value: u64,
    }

    impl OrbitTyped for KeyedSnapshot {
        const KIND: u8 = 212;
    }

    impl OrbitMetricSnapshot for KeyedSnapshot {
        const FAMILY: &'static str = "keyed-test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        /// Layout: the 18-byte fixed header, one key-length byte, then the key bytes.
        fn encode(&self) -> Result<Vec<u8>, String> {
            // The capacity is only an allocation hint; the real length is 19 + key length.
            let mut out = Vec::with_capacity(27);
            out.extend_from_slice(&self.node.to_le_bytes());
            out.extend_from_slice(&self.captured_at.to_le_bytes());
            out.extend_from_slice(&self.value.to_le_bytes());
            let key = self.key.as_bytes();
            // NOTE(review): `as u8` truncates keys longer than 255 bytes; fine for the
            // short fixture keys used here.
            out.push(key.len() as u8);
            out.extend_from_slice(key);
            Ok(out)
        }

        /// Inverse of `encode`; only the fixture keys "alpha" and "beta" are accepted.
        fn decode(bytes: &[u8]) -> Result<Self, String> {
            if bytes.len() < 19 {
                return Err(format!("bad len {}", bytes.len()));
            }
            let node = u16::from_le_bytes(bytes[0..2].try_into().expect("node bytes"));
            let captured_at = u64::from_le_bytes(bytes[2..10].try_into().expect("time bytes"));
            let value = u64::from_le_bytes(bytes[10..18].try_into().expect("value bytes"));
            let key_len = usize::from(bytes[18]);
            if bytes.len() != 19 + key_len {
                return Err(format!("bad key len {}", bytes.len()));
            }
            let key = std::str::from_utf8(&bytes[19..]).map_err(|e| e.to_string())?;
            // Map the decoded text back onto a `'static` literal, since the field is `&'static str`.
            let key = match key {
                "alpha" => "alpha",
                "beta" => "beta",
                _ => return Err(format!("unknown key {key}")),
            };
            Ok(Self {
                node,
                key,
                captured_at,
                value,
            })
        }
    }

    impl OrbitMetricKeyedSnapshot for KeyedSnapshot {
        type Key = String;

        fn metric_key(&self) -> Self::Key {
            self.key.to_owned()
        }
    }

    /// All samples come from node 0, so grouping must happen on the key, not the node.
    #[test]
    fn latest_by_key_keeps_newest_sample_per_key() {
        let fleet = Arc::new(Fleet::join("metrics-keyed-test", 2).unwrap());
        let family = OrbitMetricFamily::<KeyedSnapshot>::new(fleet);
        let publisher = family.publisher();
        let collector = family.collector();

        publisher
            .publish(&KeyedSnapshot {
                node: 0,
                key: "alpha",
                captured_at: 10,
                value: 100,
            })
            .unwrap();
        publisher
            .publish(&KeyedSnapshot {
                node: 0,
                key: "beta",
                captured_at: 11,
                value: 200,
            })
            .unwrap();
        publisher
            .publish(&KeyedSnapshot {
                node: 0,
                key: "alpha",
                captured_at: 12,
                value: 101,
            })
            .unwrap();

        let latest = collector.latest_by_key();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest["alpha"].snapshot.value, 101);
        assert_eq!(latest["beta"].snapshot.value, 200);
    }
}