kratactl/
metrics.rs

1use crate::format::metrics_value_pretty;
2use anyhow::Result;
3use krata::v1::common::ZoneState;
4use krata::{
5    events::EventStream,
6    v1::{
7        common::{Zone, ZoneMetricNode},
8        control::{
9            control_service_client::ControlServiceClient, watch_events_reply::Event,
10            ListZonesRequest, ReadZoneMetricsRequest,
11        },
12    },
13};
14use log::error;
15use std::time::Duration;
16use tokio::{
17    select,
18    sync::mpsc::{channel, Receiver, Sender},
19    task::JoinHandle,
20    time::{sleep, timeout},
21};
22use tonic::transport::Channel;
23
/// Metrics collected for a single zone during one collection pass.
pub struct MetricState {
    /// The zone the metrics were requested for.
    pub zone: Zone,
    /// Root of the zone's metric tree, or `None` when the read timed out,
    /// failed, or the reply carried no root node.
    pub root: Option<ZoneMetricNode>,
}
28
/// Result of one collection pass across all polled zones.
pub struct MultiMetricState {
    /// One entry per zone that was polled in the pass.
    pub zones: Vec<MetricState>,
}
32
/// Periodically reads metrics for the known zones and publishes them over a
/// channel.
pub struct MultiMetricCollector {
    /// Control service client used to list zones and read their metrics.
    client: ControlServiceClient<Channel>,
    /// Event stream subscribed to for zone change notifications.
    events: EventStream,
    /// Delay waited before each metrics collection pass.
    period: Duration,
}
38
/// Handle to a launched [`MultiMetricCollector`].
///
/// Dropping the handle aborts the background task.
pub struct MultiMetricCollectorHandle {
    /// Receiving end of the channel on which collection results are published.
    pub receiver: Receiver<MultiMetricState>,
    /// The spawned collector task; aborted when the handle is dropped.
    task: JoinHandle<()>,
}
43
44impl Drop for MultiMetricCollectorHandle {
45    fn drop(&mut self) {
46        self.task.abort();
47    }
48}
49
50impl MultiMetricCollector {
51    pub fn new(
52        client: ControlServiceClient<Channel>,
53        events: EventStream,
54        period: Duration,
55    ) -> Result<MultiMetricCollector> {
56        Ok(MultiMetricCollector {
57            client,
58            events,
59            period,
60        })
61    }
62
63    pub async fn launch(mut self) -> Result<MultiMetricCollectorHandle> {
64        let (sender, receiver) = channel::<MultiMetricState>(100);
65        let task = tokio::task::spawn(async move {
66            if let Err(error) = self.process(sender).await {
67                error!("failed to process multi metric collector: {}", error);
68            }
69        });
70        Ok(MultiMetricCollectorHandle { receiver, task })
71    }
72
73    pub async fn process(&mut self, sender: Sender<MultiMetricState>) -> Result<()> {
74        let mut events = self.events.subscribe();
75        let mut zones: Vec<Zone> = self
76            .client
77            .list_zones(ListZonesRequest {})
78            .await?
79            .into_inner()
80            .zones;
81        loop {
82            let collect = select! {
83                x = events.recv() => match x {
84                    Ok(event) => {
85                        let Event::ZoneChanged(changed) = event;
86                            let Some(zone) = changed.zone else {
87                                continue;
88                            };
89                            let Some(ref status) = zone.status else {
90                                continue;
91                            };
92                            zones.retain(|x| x.id != zone.id);
93                            if status.state() != ZoneState::Destroying {
94                                zones.push(zone);
95                            }
96                        false
97                    },
98
99                    Err(error) => {
100                        return Err(error.into());
101                    }
102                },
103
104                _ = sleep(self.period) => {
105                    true
106                }
107            };
108
109            if !collect {
110                continue;
111            }
112
113            let mut metrics = Vec::new();
114            for zone in &zones {
115                let Some(ref status) = zone.status else {
116                    continue;
117                };
118
119                if status.state() != ZoneState::Created {
120                    continue;
121                }
122
123                let root = timeout(
124                    Duration::from_secs(5),
125                    self.client.read_zone_metrics(ReadZoneMetricsRequest {
126                        zone_id: zone.id.clone(),
127                    }),
128                )
129                .await
130                .ok()
131                .and_then(|x| x.ok())
132                .map(|x| x.into_inner())
133                .and_then(|x| x.root);
134                metrics.push(MetricState {
135                    zone: zone.clone(),
136                    root,
137                });
138            }
139            sender.send(MultiMetricState { zones: metrics }).await?;
140        }
141    }
142}
143
144pub fn lookup<'a>(node: &'a ZoneMetricNode, path: &str) -> Option<&'a ZoneMetricNode> {
145    let Some((what, b)) = path.split_once('/') else {
146        return node.children.iter().find(|x| x.name == path);
147    };
148    let next = node.children.iter().find(|x| x.name == what)?;
149    return lookup(next, b);
150}
151
152pub fn lookup_metric_value(node: &ZoneMetricNode, path: &str) -> Option<String> {
153    lookup(node, path).and_then(|x| {
154        x.value
155            .as_ref()
156            .map(|v| metrics_value_pretty(v.clone(), x.format()))
157    })
158}