Skip to main content

edgehog_device_runtime_containers/stats/
mod.rs

1// This file is part of Edgehog.
2//
3// Copyright 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
//! Gathers statistics of the containers and volumes and sends them to Astarte.
20
21use std::sync::Arc;
22
23use astarte_device_sdk::aggregate::AstarteObject;
24use astarte_device_sdk::chrono::{DateTime, Utc};
25use astarte_device_sdk::Client;
26use bollard::secret::ContainerStatsResponse;
27use edgehog_store::models::containers::container::ContainerStatus;
28use edgehog_store::models::containers::volume::VolumeStatus;
29use tokio::sync::OnceCell;
30use tracing::{debug, error, instrument, trace};
31use uuid::Uuid;
32
33use crate::container::ContainerId;
34use crate::local::ContainerHandle;
35use crate::store::StateStore;
36use crate::volume::VolumeId;
37use crate::Docker;
38
39use self::blkio::ContainerBlkio;
40use self::cpu::ContainerCpu;
41use self::memory::{ContainerMemory, ContainerMemoryStats};
42use self::network::ContainerNetworkStats;
43use self::procs::ContainerProcesses;
44use self::volume::VolumeUsage;
45
46mod blkio;
47mod cpu;
48mod memory;
49mod network;
50mod procs;
51mod volume;
52
53/// Handles the events received from the container runtime
54#[derive(Debug)]
55pub struct StatsMonitor {
56    handle: Arc<OnceCell<ContainerHandle>>,
57}
58
59impl StatsMonitor {
60    /// Creates a new instance.
61    pub fn new(handle: Arc<OnceCell<ContainerHandle>>) -> Self {
62        Self { handle }
63    }
64
65    /// Creates an initialized instance.
66    pub fn with_handle(client: Docker, store: StateStore) -> Self {
67        Self {
68            handle: Arc::new(OnceCell::const_new_with(ContainerHandle::new(
69                client, store,
70            ))),
71        }
72    }
73
74    fn get_handle(&self) -> Option<&ContainerHandle> {
75        let handle = self.handle.get();
76
77        if handle.is_none() {
78            debug!("handle not yet initialized");
79        }
80
81        handle
82    }
83
84    /// Loads the container ids from the storage
85    async fn load_container_ids(&self) -> Option<Vec<ContainerId>> {
86        let handle = self.get_handle()?;
87
88        let containers: Vec<ContainerId> = handle
89            .store
90            .load_containers_in_state(vec![ContainerStatus::Stopped, ContainerStatus::Running])
91            .await
92            .inspect_err(|err| error!(error = format!("{err:#}"), "couldn't load containers"))
93            .ok()?
94            .into_iter()
95            .map(|(id, local_id)| ContainerId::new(local_id, *id))
96            .collect();
97
98        trace!(len = containers.len(), "loaded containers from store");
99
100        Some(containers)
101    }
102
103    /// Reads the stats of a container
104    #[instrument(skip(self))]
105    async fn read_stats(
106        &self,
107        container: &ContainerId,
108    ) -> Option<(ContainerStatsResponse, DateTime<Utc>)> {
109        let handle = self.handle.get()?;
110
111        let stats = match container.stats(&handle.client).await {
112            Ok(Some(stats)) => stats,
113            Ok(None) => {
114                debug!("missing stats for container");
115
116                return None;
117            }
118            Err(err) => {
119                error!(%container, error = %format!("{:#}", eyre::Report::new(err)), "couldn't get container stasts");
120
121                return None;
122            }
123        };
124
125        let timestamp = stats.read.unwrap_or_else(|| {
126            debug!("missing read timestamp, generating one");
127
128            Utc::now()
129        });
130
131        Some((stats, timestamp))
132    }
133
134    /// Sends the container network stats
135    #[instrument(skip(self, device))]
136    pub async fn network<D>(&mut self, device: &mut D)
137    where
138        D: Client + Send + Sync + 'static,
139    {
140        let Some(containers) = self.load_container_ids().await else {
141            return;
142        };
143
144        for container in containers {
145            let Some((stats, timestamp)) = self.read_stats(&container).await else {
146                continue;
147            };
148
149            if let Some(networks) = stats.networks {
150                let networks = ContainerNetworkStats::from_stats(networks);
151
152                for net in networks {
153                    net.send(&container.name, device, &timestamp).await;
154                }
155            } else {
156                debug!("missing network stats");
157            }
158        }
159    }
160
161    /// Sends the container memory stats
162    #[instrument(skip(self, device))]
163    pub async fn memory<D>(&mut self, device: &mut D)
164    where
165        D: Client + Send + Sync + 'static,
166    {
167        let Some(containers) = self.load_container_ids().await else {
168            return;
169        };
170
171        for container in containers {
172            let Some((stats, timestamp)) = self.read_stats(&container).await else {
173                debug!("missing stats for container");
174
175                continue;
176            };
177
178            if let Some(memory) = stats.memory_stats {
179                ContainerMemory::from(&memory)
180                    .send(&container.name, device, &timestamp)
181                    .await;
182            } else {
183                debug!("missing memory stats");
184            }
185        }
186    }
187
188    /// Sends the container memory stats for cgroup v2
189    #[instrument(skip(self, device))]
190    pub async fn memory_stats<D>(&mut self, device: &mut D)
191    where
192        D: Client + Send + Sync + 'static,
193    {
194        let Some(containers) = self.load_container_ids().await else {
195            return;
196        };
197
198        for container in containers {
199            let Some((stats, timestamp)) = self.read_stats(&container).await else {
200                debug!("missing stats for container");
201
202                continue;
203            };
204
205            if let Some(memory) = stats.memory_stats {
206                if let Some(memory_stats) = memory.stats {
207                    let memory = ContainerMemoryStats::from_stats(memory_stats);
208
209                    for mem in memory {
210                        mem.send(&container.name, device, &timestamp).await;
211                    }
212                } else {
213                    trace!("missing cgroups v2 memory stats");
214                }
215            } else {
216                debug!("missing memory stats");
217            }
218        }
219    }
220
221    /// Sends the container cpu stats
222    #[instrument(skip(self, device))]
223    pub async fn cpu<D>(&mut self, device: &mut D)
224    where
225        D: Client + Send + Sync + 'static,
226    {
227        let Some(containers) = self.load_container_ids().await else {
228            return;
229        };
230
231        for container in containers {
232            let Some((stats, timestamp)) = self.read_stats(&container).await else {
233                debug!("missing stats for container");
234
235                continue;
236            };
237
238            if let Some(cpu) = stats.cpu_stats {
239                ContainerCpu::from_stats(cpu, stats.precpu_stats.unwrap_or_default())
240                    .send(&container.name, device, &timestamp)
241                    .await;
242            } else {
243                debug!("missing cpu stats");
244            }
245        }
246    }
247
248    /// Sends the container blkio stats
249    #[instrument(skip(self, device))]
250    pub async fn blkio<D>(&mut self, device: &mut D)
251    where
252        D: Client + Send,
253    {
254        let Some(containers) = self.load_container_ids().await else {
255            return;
256        };
257
258        for container in containers {
259            let Some((stats, timestamp)) = self.read_stats(&container).await else {
260                debug!("missing stats for container");
261
262                continue;
263            };
264
265            if let Some(blkio) = stats.blkio_stats {
266                let blkio = ContainerBlkio::from_stats(blkio);
267                for value in blkio {
268                    value.send(&container.name, device, &timestamp).await;
269                }
270            } else {
271                debug!("missing blkio stats");
272            }
273        }
274    }
275
276    /// Sends the container pids stats
277    #[instrument(skip(self, device))]
278    pub async fn pids<D>(&mut self, device: &mut D)
279    where
280        D: Client + Send + Sync + 'static,
281    {
282        let Some(containers) = self.load_container_ids().await else {
283            return;
284        };
285
286        for container in containers {
287            let Some((stats, timestamp)) = self.read_stats(&container).await else {
288                debug!("missing stats for container");
289
290                continue;
291            };
292
293            if let Some(pids) = stats.pids_stats {
294                ContainerProcesses::from(pids)
295                    .send(&container.name, device, &timestamp)
296                    .await;
297            } else {
298                debug!("missing pids stats");
299            }
300        }
301    }
302
303    /// Sends the voulume usage stats
304    #[instrument(skip(self, device))]
305    pub async fn volumes<D>(&mut self, device: &mut D)
306    where
307        D: Client + Send + Sync + 'static,
308    {
309        let Some(handle) = self.get_handle() else {
310            return;
311        };
312
313        let volumes: Vec<VolumeId> = handle
314            .store
315            .load_volumes_in_state(VolumeStatus::Created)
316            .await
317            .inspect_err(|err| error!(error = format!("{err:#}"), "couldn't load volumes"))
318            .unwrap_or_default()
319            .into_iter()
320            .map(|id| VolumeId::new(*id))
321            .collect();
322
323        trace!(len = volumes.len(), "loaded volumes from store");
324
325        for volume in volumes {
326            match volume.inspect(&handle.client).await {
327                Ok(Some(info)) => {
328                    VolumeUsage::from(info)
329                        .send(&volume.name, device, &Utc::now())
330                        .await;
331                }
332                Ok(None) => {}
333                Err(err) => {
334                    error!(%volume, error = %format!("{:#}", eyre::Report::new(err)), "couldn't get container stasts");
335
336                    continue;
337                }
338            };
339        }
340    }
341}
342
343/// Send metrics to Astarte for an Interface.
344///
345/// It will handle any error raised by logging it. The interface need to be explicit timestamp.
346trait Metric: TryInto<AstarteObject> {
347    const INTERFACE: &'static str;
348    // Like "container network"
349    const METRIC_NAME: &'static str;
350
351    async fn send<D>(self, id: &Uuid, device: &mut D, timestamp: &DateTime<Utc>)
352    where
353        D: Client + Send,
354        Self::Error: std::error::Error + Send + Sync + 'static,
355    {
356        let data: AstarteObject = match self.try_into() {
357            Ok(data) => data,
358            Err(err) => {
359                error!(container=%id, error = format!("{:#}", eyre::Report::new(err)), "couldn't convert {} stats", Self::METRIC_NAME);
360
361                return;
362            }
363        };
364
365        let res = device
366            .send_object_with_timestamp(Self::INTERFACE, &format!("/{id}"), data, *timestamp)
367            .await;
368
369        if let Err(err) = res {
370            error!(container=%id, error = format!("{:#}", eyre::Report::new(err)), "couldn't send {} stats", Self::METRIC_NAME);
371        }
372    }
373}
374
/// Converts optional unsigned values into the signed integer types accepted by Astarte.
///
/// A missing value becomes the default (`0` or an empty vector), and a value that doesn't fit
/// in the signed type saturates to that type's maximum.
trait IntoAstarteExt {
    /// Signed representation sent to Astarte.
    type Out;

    /// Performs the saturating conversion.
    fn into_astarte(self) -> Self::Out;
}

impl IntoAstarteExt for Option<u32> {
    type Out = i32;

    fn into_astarte(self) -> Self::Out {
        match self {
            Some(value) => i32::try_from(value).unwrap_or(i32::MAX),
            None => 0,
        }
    }
}

impl IntoAstarteExt for Option<u64> {
    type Out = i64;

    fn into_astarte(self) -> Self::Out {
        match self {
            Some(value) => i64::try_from(value).unwrap_or(i64::MAX),
            None => 0,
        }
    }
}

impl IntoAstarteExt for Option<Vec<u64>> {
    type Out = Vec<i64>;

    fn into_astarte(self) -> Self::Out {
        // Every element goes through the scalar conversion, so it saturates the same way.
        self.map(|values| {
            values
                .into_iter()
                .map(|value| Some(value).into_astarte())
                .collect::<Vec<i64>>()
        })
        .unwrap_or_default()
    }
}
407
#[cfg(test)]
mod tests {
    use edgehog_store::db;
    use tempfile::TempDir;

    use crate::store::StateStore;

    use super::*;

    /// Builds a monitor with an initialized handle backed by a temporary state database.
    ///
    /// NOTE(review): this connects to the container runtime, so it presumably needs a reachable
    /// Docker daemon.
    #[tokio::test]
    async fn create_new() {
        // Prefix named after this test so leftover temp dirs are attributable.
        let tmp = TempDir::with_prefix("stats_create_new").unwrap();
        let db_file = tmp.path().join("state.db");
        let db_file = db_file.to_str().unwrap();

        let handle = db::Handle::open(db_file).await.unwrap();
        let store = StateStore::new(handle);

        let client = crate::Docker::connect().await.unwrap();

        let _stats = StatsMonitor::with_handle(client, store);
    }

    /// Checks the defaulting and saturating behavior of [`IntoAstarteExt`].
    #[test]
    fn check_into_astarte_ext() {
        let u32_val: Option<u32> = Some(42);
        assert_eq!(u32_val.into_astarte(), 42i32);

        let u32_none: Option<u32> = None;
        assert_eq!(u32_none.into_astarte(), 0i32);

        let u32_max: Option<u32> = Some(u32::MAX);
        assert_eq!(u32_max.into_astarte(), i32::MAX);

        let u64_val: Option<u64> = Some(12345);
        assert_eq!(u64_val.into_astarte(), 12345i64);

        let u64_none: Option<u64> = None;
        assert_eq!(u64_none.into_astarte(), 0i64);

        let u64_max: Option<u64> = Some(u64::MAX);
        assert_eq!(u64_max.into_astarte(), i64::MAX);

        let vec_val: Option<Vec<u64>> = Some(vec![10, 20, 30]);
        assert_eq!(vec_val.into_astarte(), vec![10i64, 20i64, 30i64]);

        let vec_none: Option<Vec<u64>> = None;
        assert_eq!(vec_none.into_astarte(), Vec::<i64>::new());

        let vec_empty: Option<Vec<u64>> = Some(vec![]);
        assert_eq!(vec_empty.into_astarte(), Vec::<i64>::new());

        let vec_mixed: Option<Vec<u64>> = Some(vec![100, u64::MAX, 200]);
        assert_eq!(vec_mixed.into_astarte(), vec![100i64, i64::MAX, 200i64]);
    }
}