1use crate::format::metrics_value_pretty;
2use anyhow::Result;
3use krata::v1::common::ZoneState;
4use krata::{
5 events::EventStream,
6 v1::{
7 common::{Zone, ZoneMetricNode},
8 control::{
9 control_service_client::ControlServiceClient, watch_events_reply::Event,
10 ListZonesRequest, ReadZoneMetricsRequest,
11 },
12 },
13};
14use log::error;
15use std::time::Duration;
16use tokio::{
17 select,
18 sync::mpsc::{channel, Receiver, Sender},
19 task::JoinHandle,
20 time::{sleep, timeout},
21};
22use tonic::transport::Channel;
23
/// Metrics snapshot for a single zone.
pub struct MetricState {
    /// The zone the metrics were requested for.
    pub zone: Zone,
    /// Root of the zone's metric tree; `None` when the metrics request timed
    /// out, returned an error, or the reply carried no root node.
    pub root: Option<ZoneMetricNode>,
}
28
/// One collection round: a metrics snapshot for every zone that was polled.
pub struct MultiMetricState {
    /// Per-zone results, in the order the zones were polled.
    pub zones: Vec<MetricState>,
}
32
/// Periodically polls zone metrics through the control service while tracking
/// zone changes reported on the event stream.
pub struct MultiMetricCollector {
    /// Control service client used to list zones and read their metrics.
    client: ControlServiceClient<Channel>,
    /// Event stream that is subscribed to for zone change notifications.
    events: EventStream,
    /// Delay between collection rounds.
    period: Duration,
}
38
/// Handle to a launched collector; the background task is aborted when the
/// handle is dropped.
pub struct MultiMetricCollectorHandle {
    /// Receives one `MultiMetricState` per completed collection round.
    pub receiver: Receiver<MultiMetricState>,
    /// The spawned task that runs the collection loop.
    task: JoinHandle<()>,
}
43
44impl Drop for MultiMetricCollectorHandle {
45 fn drop(&mut self) {
46 self.task.abort();
47 }
48}
49
50impl MultiMetricCollector {
51 pub fn new(
52 client: ControlServiceClient<Channel>,
53 events: EventStream,
54 period: Duration,
55 ) -> Result<MultiMetricCollector> {
56 Ok(MultiMetricCollector {
57 client,
58 events,
59 period,
60 })
61 }
62
63 pub async fn launch(mut self) -> Result<MultiMetricCollectorHandle> {
64 let (sender, receiver) = channel::<MultiMetricState>(100);
65 let task = tokio::task::spawn(async move {
66 if let Err(error) = self.process(sender).await {
67 error!("failed to process multi metric collector: {}", error);
68 }
69 });
70 Ok(MultiMetricCollectorHandle { receiver, task })
71 }
72
73 pub async fn process(&mut self, sender: Sender<MultiMetricState>) -> Result<()> {
74 let mut events = self.events.subscribe();
75 let mut zones: Vec<Zone> = self
76 .client
77 .list_zones(ListZonesRequest {})
78 .await?
79 .into_inner()
80 .zones;
81 loop {
82 let collect = select! {
83 x = events.recv() => match x {
84 Ok(event) => {
85 let Event::ZoneChanged(changed) = event;
86 let Some(zone) = changed.zone else {
87 continue;
88 };
89 let Some(ref status) = zone.status else {
90 continue;
91 };
92 zones.retain(|x| x.id != zone.id);
93 if status.state() != ZoneState::Destroying {
94 zones.push(zone);
95 }
96 false
97 },
98
99 Err(error) => {
100 return Err(error.into());
101 }
102 },
103
104 _ = sleep(self.period) => {
105 true
106 }
107 };
108
109 if !collect {
110 continue;
111 }
112
113 let mut metrics = Vec::new();
114 for zone in &zones {
115 let Some(ref status) = zone.status else {
116 continue;
117 };
118
119 if status.state() != ZoneState::Created {
120 continue;
121 }
122
123 let root = timeout(
124 Duration::from_secs(5),
125 self.client.read_zone_metrics(ReadZoneMetricsRequest {
126 zone_id: zone.id.clone(),
127 }),
128 )
129 .await
130 .ok()
131 .and_then(|x| x.ok())
132 .map(|x| x.into_inner())
133 .and_then(|x| x.root);
134 metrics.push(MetricState {
135 zone: zone.clone(),
136 root,
137 });
138 }
139 sender.send(MultiMetricState { zones: metrics }).await?;
140 }
141 }
142}
143
144pub fn lookup<'a>(node: &'a ZoneMetricNode, path: &str) -> Option<&'a ZoneMetricNode> {
145 let Some((what, b)) = path.split_once('/') else {
146 return node.children.iter().find(|x| x.name == path);
147 };
148 let next = node.children.iter().find(|x| x.name == what)?;
149 return lookup(next, b);
150}
151
152pub fn lookup_metric_value(node: &ZoneMetricNode, path: &str) -> Option<String> {
153 lookup(node, path).and_then(|x| {
154 x.value
155 .as_ref()
156 .map(|v| metrics_value_pretty(v.clone(), x.format()))
157 })
158}