edgehog_device_runtime_containers/stats/
mod.rs1use std::sync::Arc;
22
23use astarte_device_sdk::aggregate::AstarteObject;
24use astarte_device_sdk::chrono::{DateTime, Utc};
25use astarte_device_sdk::Client;
26use bollard::secret::ContainerStatsResponse;
27use edgehog_store::models::containers::container::ContainerStatus;
28use edgehog_store::models::containers::volume::VolumeStatus;
29use tokio::sync::OnceCell;
30use tracing::{debug, error, instrument, trace};
31use uuid::Uuid;
32
33use crate::container::ContainerId;
34use crate::local::ContainerHandle;
35use crate::store::StateStore;
36use crate::volume::VolumeId;
37use crate::Docker;
38
39use self::blkio::ContainerBlkio;
40use self::cpu::ContainerCpu;
41use self::memory::{ContainerMemory, ContainerMemoryStats};
42use self::network::ContainerNetworkStats;
43use self::procs::ContainerProcesses;
44use self::volume::VolumeUsage;
45
46mod blkio;
47mod cpu;
48mod memory;
49mod network;
50mod procs;
51mod volume;
52
53#[derive(Debug)]
55pub struct StatsMonitor {
56 handle: Arc<OnceCell<ContainerHandle>>,
57}
58
59impl StatsMonitor {
60 pub fn new(handle: Arc<OnceCell<ContainerHandle>>) -> Self {
62 Self { handle }
63 }
64
65 pub fn with_handle(client: Docker, store: StateStore) -> Self {
67 Self {
68 handle: Arc::new(OnceCell::const_new_with(ContainerHandle::new(
69 client, store,
70 ))),
71 }
72 }
73
74 fn get_handle(&self) -> Option<&ContainerHandle> {
75 let handle = self.handle.get();
76
77 if handle.is_none() {
78 debug!("handle not yet initialized");
79 }
80
81 handle
82 }
83
84 async fn load_container_ids(&self) -> Option<Vec<ContainerId>> {
86 let handle = self.get_handle()?;
87
88 let containers: Vec<ContainerId> = handle
89 .store
90 .load_containers_in_state(vec![ContainerStatus::Stopped, ContainerStatus::Running])
91 .await
92 .inspect_err(|err| error!(error = format!("{err:#}"), "couldn't load containers"))
93 .ok()?
94 .into_iter()
95 .map(|(id, local_id)| ContainerId::new(local_id, *id))
96 .collect();
97
98 trace!(len = containers.len(), "loaded containers from store");
99
100 Some(containers)
101 }
102
103 #[instrument(skip(self))]
105 async fn read_stats(
106 &self,
107 container: &ContainerId,
108 ) -> Option<(ContainerStatsResponse, DateTime<Utc>)> {
109 let handle = self.handle.get()?;
110
111 let stats = match container.stats(&handle.client).await {
112 Ok(Some(stats)) => stats,
113 Ok(None) => {
114 debug!("missing stats for container");
115
116 return None;
117 }
118 Err(err) => {
119 error!(%container, error = %format!("{:#}", eyre::Report::new(err)), "couldn't get container stasts");
120
121 return None;
122 }
123 };
124
125 let timestamp = stats.read.unwrap_or_else(|| {
126 debug!("missing read timestamp, generating one");
127
128 Utc::now()
129 });
130
131 Some((stats, timestamp))
132 }
133
134 #[instrument(skip(self, device))]
136 pub async fn network<D>(&mut self, device: &mut D)
137 where
138 D: Client + Send + Sync + 'static,
139 {
140 let Some(containers) = self.load_container_ids().await else {
141 return;
142 };
143
144 for container in containers {
145 let Some((stats, timestamp)) = self.read_stats(&container).await else {
146 continue;
147 };
148
149 if let Some(networks) = stats.networks {
150 let networks = ContainerNetworkStats::from_stats(networks);
151
152 for net in networks {
153 net.send(&container.name, device, ×tamp).await;
154 }
155 } else {
156 debug!("missing network stats");
157 }
158 }
159 }
160
161 #[instrument(skip(self, device))]
163 pub async fn memory<D>(&mut self, device: &mut D)
164 where
165 D: Client + Send + Sync + 'static,
166 {
167 let Some(containers) = self.load_container_ids().await else {
168 return;
169 };
170
171 for container in containers {
172 let Some((stats, timestamp)) = self.read_stats(&container).await else {
173 debug!("missing stats for container");
174
175 continue;
176 };
177
178 if let Some(memory) = stats.memory_stats {
179 ContainerMemory::from(&memory)
180 .send(&container.name, device, ×tamp)
181 .await;
182 } else {
183 debug!("missing memory stats");
184 }
185 }
186 }
187
188 #[instrument(skip(self, device))]
190 pub async fn memory_stats<D>(&mut self, device: &mut D)
191 where
192 D: Client + Send + Sync + 'static,
193 {
194 let Some(containers) = self.load_container_ids().await else {
195 return;
196 };
197
198 for container in containers {
199 let Some((stats, timestamp)) = self.read_stats(&container).await else {
200 debug!("missing stats for container");
201
202 continue;
203 };
204
205 if let Some(memory) = stats.memory_stats {
206 if let Some(memory_stats) = memory.stats {
207 let memory = ContainerMemoryStats::from_stats(memory_stats);
208
209 for mem in memory {
210 mem.send(&container.name, device, ×tamp).await;
211 }
212 } else {
213 trace!("missing cgroups v2 memory stats");
214 }
215 } else {
216 debug!("missing memory stats");
217 }
218 }
219 }
220
221 #[instrument(skip(self, device))]
223 pub async fn cpu<D>(&mut self, device: &mut D)
224 where
225 D: Client + Send + Sync + 'static,
226 {
227 let Some(containers) = self.load_container_ids().await else {
228 return;
229 };
230
231 for container in containers {
232 let Some((stats, timestamp)) = self.read_stats(&container).await else {
233 debug!("missing stats for container");
234
235 continue;
236 };
237
238 if let Some(cpu) = stats.cpu_stats {
239 ContainerCpu::from_stats(cpu, stats.precpu_stats.unwrap_or_default())
240 .send(&container.name, device, ×tamp)
241 .await;
242 } else {
243 debug!("missing cpu stats");
244 }
245 }
246 }
247
248 #[instrument(skip(self, device))]
250 pub async fn blkio<D>(&mut self, device: &mut D)
251 where
252 D: Client + Send,
253 {
254 let Some(containers) = self.load_container_ids().await else {
255 return;
256 };
257
258 for container in containers {
259 let Some((stats, timestamp)) = self.read_stats(&container).await else {
260 debug!("missing stats for container");
261
262 continue;
263 };
264
265 if let Some(blkio) = stats.blkio_stats {
266 let blkio = ContainerBlkio::from_stats(blkio);
267 for value in blkio {
268 value.send(&container.name, device, ×tamp).await;
269 }
270 } else {
271 debug!("missing blkio stats");
272 }
273 }
274 }
275
276 #[instrument(skip(self, device))]
278 pub async fn pids<D>(&mut self, device: &mut D)
279 where
280 D: Client + Send + Sync + 'static,
281 {
282 let Some(containers) = self.load_container_ids().await else {
283 return;
284 };
285
286 for container in containers {
287 let Some((stats, timestamp)) = self.read_stats(&container).await else {
288 debug!("missing stats for container");
289
290 continue;
291 };
292
293 if let Some(pids) = stats.pids_stats {
294 ContainerProcesses::from(pids)
295 .send(&container.name, device, ×tamp)
296 .await;
297 } else {
298 debug!("missing pids stats");
299 }
300 }
301 }
302
303 #[instrument(skip(self, device))]
305 pub async fn volumes<D>(&mut self, device: &mut D)
306 where
307 D: Client + Send + Sync + 'static,
308 {
309 let Some(handle) = self.get_handle() else {
310 return;
311 };
312
313 let volumes: Vec<VolumeId> = handle
314 .store
315 .load_volumes_in_state(VolumeStatus::Created)
316 .await
317 .inspect_err(|err| error!(error = format!("{err:#}"), "couldn't load volumes"))
318 .unwrap_or_default()
319 .into_iter()
320 .map(|id| VolumeId::new(*id))
321 .collect();
322
323 trace!(len = volumes.len(), "loaded volumes from store");
324
325 for volume in volumes {
326 match volume.inspect(&handle.client).await {
327 Ok(Some(info)) => {
328 VolumeUsage::from(info)
329 .send(&volume.name, device, &Utc::now())
330 .await;
331 }
332 Ok(None) => {}
333 Err(err) => {
334 error!(%volume, error = %format!("{:#}", eyre::Report::new(err)), "couldn't get container stasts");
335
336 continue;
337 }
338 };
339 }
340 }
341}
342
343trait Metric: TryInto<AstarteObject> {
347 const INTERFACE: &'static str;
348 const METRIC_NAME: &'static str;
350
351 async fn send<D>(self, id: &Uuid, device: &mut D, timestamp: &DateTime<Utc>)
352 where
353 D: Client + Send,
354 Self::Error: std::error::Error + Send + Sync + 'static,
355 {
356 let data: AstarteObject = match self.try_into() {
357 Ok(data) => data,
358 Err(err) => {
359 error!(container=%id, error = format!("{:#}", eyre::Report::new(err)), "couldn't convert {} stats", Self::METRIC_NAME);
360
361 return;
362 }
363 };
364
365 let res = device
366 .send_object_with_timestamp(Self::INTERFACE, &format!("/{id}"), data, *timestamp)
367 .await;
368
369 if let Err(err) = res {
370 error!(container=%id, error = format!("{:#}", eyre::Report::new(err)), "couldn't send {} stats", Self::METRIC_NAME);
371 }
372 }
373}
374
/// Conversion of a statistic value into the type that is sent to Astarte.
trait IntoAstarteExt {
    /// Type produced by the conversion.
    type Out;

    /// Consumes the value and returns its converted form.
    fn into_astarte(self) -> Self::Out;
}
380
381impl IntoAstarteExt for Option<u32> {
382 type Out = i32;
383
384 fn into_astarte(self) -> Self::Out {
385 self.unwrap_or_default().try_into().unwrap_or(i32::MAX)
386 }
387}
388
389impl IntoAstarteExt for Option<u64> {
390 type Out = i64;
391
392 fn into_astarte(self) -> Self::Out {
393 self.unwrap_or_default().try_into().unwrap_or(i64::MAX)
394 }
395}
396
397impl IntoAstarteExt for Option<Vec<u64>> {
398 type Out = Vec<i64>;
399
400 fn into_astarte(self) -> Self::Out {
401 self.unwrap_or_default()
402 .into_iter()
403 .map(|value| value.try_into().unwrap_or(i64::MAX))
404 .collect()
405 }
406}
407
#[cfg(test)]
mod tests {
    use edgehog_store::db;
    use tempfile::TempDir;

    use crate::store::StateStore;

    use super::*;

    /// Builds a monitor from a store opened in a temporary directory and a Docker client.
    #[tokio::test]
    async fn create_new() {
        let tmp = TempDir::with_prefix("fetch_by_local_id").unwrap();
        let db_path = tmp.path().join("state.db");

        let store = StateStore::new(db::Handle::open(db_path.to_str().unwrap()).await.unwrap());
        let client = crate::Docker::connect().await.unwrap();

        let _stats = StatsMonitor::with_handle(client, store);
    }

    /// Checks the default, in-range and saturating cases of every `IntoAstarteExt` impl.
    #[test]
    fn check_into_astarte_ext() {
        // Option<u32> -> i32
        assert_eq!(Some(42u32).into_astarte(), 42i32);
        assert_eq!(None::<u32>.into_astarte(), 0i32);
        assert_eq!(Some(u32::MAX).into_astarte(), i32::MAX);

        // Option<u64> -> i64
        assert_eq!(Some(12345u64).into_astarte(), 12345i64);
        assert_eq!(None::<u64>.into_astarte(), 0i64);
        assert_eq!(Some(u64::MAX).into_astarte(), i64::MAX);

        // Option<Vec<u64>> -> Vec<i64>
        let empty = Vec::<i64>::new();

        assert_eq!(
            Some(vec![10u64, 20, 30]).into_astarte(),
            vec![10i64, 20i64, 30i64]
        );
        assert_eq!(None::<Vec<u64>>.into_astarte(), empty);
        assert_eq!(Some(Vec::<u64>::new()).into_astarte(), empty);
        assert_eq!(
            Some(vec![100u64, u64::MAX, 200]).into_astarte(),
            vec![100i64, i64::MAX, 200i64]
        );
    }
}