Skip to main content

edgehog_device_runtime/telemetry/
mod.rs

1// This file is part of Edgehog.
2//
3// Copyright 2022 - 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19use std::str::FromStr;
20use std::{borrow::Cow, collections::HashMap, ops::Deref, path::PathBuf};
21
22use async_trait::async_trait;
23use event::{TelemetryConfig, TelemetryEvent};
24use serde::{Deserialize, Serialize};
25use tokio::time::Duration;
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error};
28
29use crate::Client;
30
31use crate::{
32    controller::actor::Actor,
33    repository::{file_state_repository::FileStateRepository, StateRepository},
34};
35
36use self::sender::Task;
37use self::stats::TelemetryInterface;
38
39pub mod event;
40mod sender;
41mod stats;
42pub mod status;
43
/// File name passed, together with the store directory, to the `FileStateRepository` that holds
/// the persisted telemetry configuration.
44const TELEMETRY_PATH: &str = "telemetry.json";
45
/// Period (60 seconds) used for a task when its configuration does not specify one.
46const DEFAULT_PERIOD: Duration = Duration::from_secs(60);
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TelemetryInterfaceConfig<'a> {
50    pub interface_name: Cow<'a, str>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub enabled: Option<bool>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub period: Option<u64>,
55}
56
57impl TelemetryInterfaceConfig<'_> {
58    fn period_duration(&self) -> Option<Duration> {
59        self.period.map(Duration::from_secs)
60    }
61}
62
63/// Configuration for the tasks.
64#[derive(Debug, Clone, Copy)]
65pub struct TaskConfig {
66    pub enabled: Overridable<bool>,
67    pub period: Overridable<Duration>,
68}
69
70impl TaskConfig {
71    /// Creates a tasks configuration from the one from the file.
72    fn from_config(config: &TelemetryInterfaceConfig) -> Self {
73        Self {
74            enabled: Overridable::new(config.enabled.unwrap_or_default()),
75            period: Overridable::new(config.period_duration().unwrap_or(DEFAULT_PERIOD)),
76        }
77    }
78
79    /// Creates a task config from the override, with default defaults
80    fn from_override(over: &TelemetryInterfaceConfig) -> Option<Self> {
81        if over.enabled.is_none() && over.period.is_none() {
82            return None;
83        }
84
85        let enabled = match over.enabled {
86            Some(enabled) => Overridable::with_override(false, enabled),
87            None => Overridable::new(false),
88        };
89        let period = match over.period_duration() {
90            Some(period) => Overridable::with_override(DEFAULT_PERIOD, period),
91            None => Overridable::new(DEFAULT_PERIOD),
92        };
93
94        Some(Self { enabled, period })
95    }
96}
97
98impl Default for TaskConfig {
99    fn default() -> Self {
100        Self {
101            enabled: Overridable::new(false),
102            period: Overridable::new(DEFAULT_PERIOD),
103        }
104    }
105}
106
/// A value with a default that can be replaced by an optional override.
///
/// Reading the value yields the override when one is set, the default otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Overridable<T> {
    default: T,
    value: Option<T>,
}

impl<T> Overridable<T> {
    /// Creates a value with only the default set.
    #[must_use]
    fn new(default: T) -> Self {
        Self::from_parts(default, None)
    }

    /// Creates a value with both the default and an override.
    #[must_use]
    fn with_override(default: T, value: T) -> Self {
        Self::from_parts(default, Some(value))
    }

    /// Assembles the struct from its two components.
    fn from_parts(default: T, value: Option<T>) -> Self {
        Self { default, value }
    }

    /// Returns the override if present, the default otherwise.
    #[must_use]
    fn get(&self) -> &T {
        match &self.value {
            Some(overridden) => overridden,
            None => &self.default,
        }
    }

    /// Returns the override, if any, ignoring the default.
    #[must_use]
    fn get_override(&self) -> Option<&T> {
        self.value.as_ref()
    }

    /// Sets the override, replacing a previous one.
    fn set(&mut self, value: T) {
        self.value = Some(value);
    }

    /// Removes the override, so that the default is used again.
    fn unset(&mut self) {
        self.value = None;
    }

    /// Returns `true` if the overridable has a custom value.
    #[must_use]
    fn is_overwritten(&self) -> bool {
        self.get_override().is_some()
    }
}

impl<T> Default for Overridable<T>
where
    T: Default,
{
    /// Uses `T::default()` as the default value, without an override.
    fn default() -> Self {
        Self::new(T::default())
    }
}

impl<T> Deref for Overridable<T> {
    type Target = T;

    /// Dereferences to the effective value, the same returned by `get`.
    fn deref(&self) -> &Self::Target {
        self.get()
    }
}
171
172#[derive(Debug)]
173pub struct Telemetry<C> {
174    client: C,
175    configs: HashMap<TelemetryInterface, TaskConfig>,
176    tasks: TelemetryTasks,
177    file_state: FileStateRepository<Vec<TelemetryInterfaceConfig<'static>>>,
178    #[cfg(feature = "containers")]
179    containers: std::sync::Arc<tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>>,
180}
181
182impl<C> Telemetry<C> {
183    pub async fn from_config(
184        client: C,
185        configs: &[TelemetryInterfaceConfig<'_>],
186        store_directory: PathBuf,
187        #[cfg(feature = "containers")] containers: std::sync::Arc<
188            tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>,
189        >,
190    ) -> Self {
191        let configs = configs
192            .iter()
193            .filter_map(|cfg| {
194                let interface = match TelemetryInterface::from_str(&cfg.interface_name) {
195                    Ok(interface) => interface,
196                    Err(err) => {
197                        error!("{err}");
198
199                        return None;
200                    }
201                };
202
203                Some((interface, TaskConfig::from_config(cfg)))
204            })
205            .collect();
206
207        let mut telemetry = Telemetry {
208            client,
209            configs,
210            tasks: TelemetryTasks::new(),
211            file_state: FileStateRepository::new(&store_directory, TELEMETRY_PATH),
212            #[cfg(feature = "containers")]
213            containers,
214        };
215
216        telemetry.read_filestate().await;
217
218        telemetry
219    }
220
221    async fn read_filestate(&mut self) {
222        if !self.file_state.exists().await {
223            return;
224        }
225
226        let saved_configs = match self.file_state.read().await {
227            Ok(cfgs) => cfgs,
228            Err(err) => {
229                // Don't error here since the file is corrupted, but it will be overwritten
230                error!(
231                    "couldn't read the saved telemetry configs: {}",
232                    stable_eyre::Report::new(err)
233                );
234
235                return;
236            }
237        };
238
239        for saved_cfg in saved_configs {
240            let interface = match TelemetryInterface::from_str(&saved_cfg.interface_name) {
241                Ok(interface) => interface,
242                Err(err) => {
243                    error!("{err}");
244
245                    continue;
246                }
247            };
248
249            let entry = self.configs.entry(interface).and_modify(|cfg| {
250                if let Some(enabled) = saved_cfg.enabled {
251                    cfg.enabled.set(enabled);
252                }
253                if let Some(period) = saved_cfg.period_duration() {
254                    cfg.period.set(period);
255                }
256            });
257
258            if let Some(cfg) = TaskConfig::from_override(&saved_cfg) {
259                entry.or_insert(cfg);
260            }
261        }
262    }
263
264    async fn initial_telemetry(&mut self)
265    where
266        C: Client + Send + Sync + 'static,
267    {
268        self::status::initial_telemetry(&mut self.client).await;
269        self::stats::initial_telemetry(&mut self.client).await;
270    }
271
272    pub fn run_telemetry(&mut self)
273    where
274        C: Client + Send + Sync + 'static,
275    {
276        for (interface, config) in &self.configs {
277            self.tasks.spawn_task(
278                &self.client,
279                *interface,
280                *config,
281                #[cfg(feature = "containers")]
282                &self.containers,
283            );
284        }
285    }
286
287    async fn save_telemetry_config(&self) {
288        let telemetry_config = self
289            .configs
290            .iter()
291            .filter_map(|(interface, cfg)| {
292                if cfg.enabled.is_overwritten() || cfg.period.is_overwritten() {
293                    Some(TelemetryInterfaceConfig {
294                        interface_name: Cow::Borrowed(interface.as_interface()),
295                        enabled: cfg.enabled.get_override().copied(),
296                        period: cfg.period.get_override().map(Duration::as_secs),
297                    })
298                } else {
299                    None
300                }
301            })
302            .collect();
303
304        if let Err(err) = self.file_state.write(&telemetry_config).await {
305            error!(
306                "failed to write telemetry: {}",
307                stable_eyre::Report::new(err)
308            );
309        }
310    }
311}
312
313#[async_trait]
314impl<C> Actor for Telemetry<C>
315where
316    C: Client + Send + Sync + 'static,
317{
318    type Msg = TelemetryEvent;
319
320    fn task() -> &'static str {
321        "telemetry"
322    }
323
324    async fn init(&mut self) -> stable_eyre::Result<()> {
325        self.initial_telemetry().await;
326
327        self.run_telemetry();
328
329        Ok(())
330    }
331
332    async fn handle(&mut self, msg: Self::Msg) -> stable_eyre::Result<()> {
333        let interface = match TelemetryInterface::from_str(&msg.interface) {
334            Ok(itf) => itf,
335            Err(err) => {
336                error!(
337                    error = format!("{:#}", stable_eyre::Report::new(err)),
338                    "couldn't parse telemetry interface"
339                );
340
341                return Ok(());
342            }
343        };
344
345        let config = self.configs.entry(interface).or_default();
346
347        match msg.config {
348            TelemetryConfig::Enable(Some(enabled)) => {
349                config.enabled.set(enabled);
350            }
351            TelemetryConfig::Enable(None) => {
352                config.enabled.unset();
353            }
354            TelemetryConfig::Period(Some(period)) => {
355                config.period.set(period.0);
356            }
357            TelemetryConfig::Period(None) => {
358                config.period.unset();
359            }
360        };
361
362        // This function will check if we actually need to start the task
363        self.tasks.spawn_task(
364            &self.client,
365            interface,
366            *config,
367            #[cfg(feature = "containers")]
368            &self.containers,
369        );
370
371        self.save_telemetry_config().await;
372
373        Ok(())
374    }
375}
376
377/// Handle spawning and cancellation for the telemetry tasks
378#[derive(Debug)]
379struct TelemetryTasks {
380    cancellation: CancellationToken,
381    tasks: HashMap<TelemetryInterface, CancellationToken>,
382}
383
384impl TelemetryTasks {
385    fn new() -> Self {
386        Self {
387            cancellation: CancellationToken::new(),
388            tasks: HashMap::new(),
389        }
390    }
391
392    fn spawn_task<C>(
393        &mut self,
394        client: &C,
395        t_itf: TelemetryInterface,
396        task_config: TaskConfig,
397        #[cfg(feature = "containers")] containers: &std::sync::Arc<
398            tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>,
399        >,
400    ) where
401        C: Client + Sync + Send + 'static,
402    {
403        if !task_config.enabled.get() {
404            debug!("task {} disabled", t_itf);
405
406            if let Some(cancel) = self.tasks.remove(&t_itf) {
407                cancel.cancel();
408            }
409
410            return;
411        }
412
413        let period = task_config.period.get();
414        if period.is_zero() {
415            debug!("period is 0 for task {}", t_itf);
416
417            if let Some(cancel) = self.tasks.remove(&t_itf) {
418                cancel.cancel();
419            }
420
421            return;
422        };
423
424        if let Some(cancel) = self.tasks.remove(&t_itf) {
425            debug!("stopping previous task");
426
427            cancel.cancel();
428        }
429
430        let cancel = self.cancellation.child_token();
431
432        Task::spawn(
433            client.clone(),
434            cancel.clone(),
435            t_itf,
436            *period,
437            #[cfg(feature = "containers")]
438            containers,
439        );
440
441        self.tasks.insert(t_itf, cancel);
442    }
443}
444
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use astarte_device_sdk::store::SqliteStore;
    use astarte_device_sdk::transport::mqtt::Mqtt;
    use astarte_device_sdk_mock::MockDeviceClient;
    use event::TelemetryPeriod;
    use mockall::{predicate, Sequence};
    use tempdir::TempDir;

    use super::status::runtime_info::tests::mock_runtime_info_telemetry;

    const TELEMETRY_PATH: &str = "telemetry.json";

    /// Interface configured by the tests that go through `Telemetry::from_config`.
    const SYSTEM_STATUS: &str = "io.edgehog.devicemanager.SystemStatus";

    /// Mocked client type shared by all the tests.
    type MockClient = MockDeviceClient<Mqtt<SqliteStore>>;

    /// Creates a temporary directory that will be deleted when the returned TempDir is dropped.
    fn temp_dir() -> (TempDir, PathBuf) {
        let dir = TempDir::new("edgehog-telemetry").unwrap();
        let path = dir.path().to_path_buf();

        (dir, path)
    }

    /// Builds a telemetry without configs, storing its state in a new temporary directory.
    fn mock_telemetry(client: MockClient) -> (Telemetry<MockClient>, TempDir) {
        let (dir, path) = temp_dir();

        let telemetry = Telemetry {
            client,
            configs: HashMap::new(),
            tasks: TelemetryTasks::new(),
            file_state: FileStateRepository::new(&path, TELEMETRY_PATH),
            #[cfg(feature = "containers")]
            containers: std::sync::Arc::default(),
        };

        (telemetry, dir)
    }

    /// Configuration with the system status enabled and a period of 10 seconds.
    fn system_status_configs() -> Vec<TelemetryInterfaceConfig<'static>> {
        vec![TelemetryInterfaceConfig {
            interface_name: SYSTEM_STATUS.into(),
            enabled: Some(true),
            period: Some(10),
        }]
    }

    /// Creates the telemetry from `system_status_configs`, storing the state in `store`.
    async fn system_status_telemetry(client: MockClient, store: PathBuf) -> Telemetry<MockClient> {
        let configs = system_status_configs();

        Telemetry::from_config(
            client,
            &configs,
            store,
            #[cfg(feature = "containers")]
            std::sync::Arc::default(),
        )
        .await
    }

    /// Event for the system status interface carrying the given configuration change.
    fn system_status_event(config: TelemetryConfig) -> TelemetryEvent {
        TelemetryEvent {
            interface: SYSTEM_STATUS.to_string(),
            config,
        }
    }

    #[tokio::test]
    async fn telemetry_default_test() {
        let (_dir, t_dir) = temp_dir();

        let tel = system_status_telemetry(MockClient::new(), t_dir).await;

        let system_status_config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(*system_status_config.enabled.get());
        assert_eq!(*system_status_config.period.get(), Duration::from_secs(10));
    }

    #[tokio::test]
    async fn telemetry_set_test() {
        let (_dir, t_dir) = temp_dir();

        let mut tel = system_status_telemetry(MockClient::new(), t_dir.clone()).await;

        tel.handle(system_status_event(TelemetryConfig::Enable(Some(false))))
            .await
            .unwrap();
        tel.handle(system_status_event(TelemetryConfig::Period(Some(
            TelemetryPeriod(Duration::from_secs(30)),
        ))))
        .await
        .unwrap();

        let config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(config.enabled.is_overwritten());
        assert!(!*config.enabled.get());
        assert!(config.period.is_overwritten());
        assert_eq!(*config.period.get(), Duration::from_secs(30));

        // The overrides must have been persisted too.
        let telemetry_repo = FileStateRepository::new(&t_dir, TELEMETRY_PATH);
        let saved_config: Vec<TelemetryInterfaceConfig> = telemetry_repo.read().await.unwrap();

        assert_eq!(saved_config.len(), 1);

        let system_status_config = &saved_config[0];
        assert_eq!(system_status_config.enabled, Some(false));
        assert_eq!(system_status_config.period, Some(30));
    }

    #[tokio::test]
    async fn telemetry_unset_test() {
        let (_dir, t_dir) = temp_dir();
        let mut client = MockClient::new();
        let mut seq = Sequence::new();

        // The client is expected to be cloned once for each of the two events.
        client
            .expect_clone()
            .times(2)
            .in_sequence(&mut seq)
            .returning(MockClient::new);

        let mut tel = system_status_telemetry(client, t_dir.clone()).await;

        tel.handle(system_status_event(TelemetryConfig::Enable(None)))
            .await
            .unwrap();
        tel.handle(system_status_event(TelemetryConfig::Period(None)))
            .await
            .unwrap();

        let config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(!config.enabled.is_overwritten());
        assert!(*config.enabled.get());
        assert!(!config.period.is_overwritten());
        assert_eq!(*config.period.get(), Duration::from_secs(10));

        // Without overrides nothing is left in the saved configuration.
        let telemetry_repo = FileStateRepository::new(&t_dir, TELEMETRY_PATH);
        let saved_config: Vec<TelemetryInterfaceConfig> = telemetry_repo.read().await.unwrap();

        assert!(saved_config.is_empty());
    }

    /// Client with the expectations for the initial telemetry.
    ///
    /// The expectations are registered in a fixed order, part of them bound to a sequence.
    fn initial_telemetry_client() -> MockClient {
        let mut client = MockClient::new();
        let mut seq = Sequence::new();

        client
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.OSInfo"),
                predicate::eq("/osName"),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        client
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.OSInfo"),
                predicate::eq("/osVersion"),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        client
            .expect_set_property()
            .times(..)
            .with(
                predicate::eq("io.edgehog.devicemanager.HardwareInfo"),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        mock_runtime_info_telemetry(&mut client, &mut seq);

        client
            .expect_send_object_with_timestamp()
            .times(..)
            .with(
                predicate::eq("io.edgehog.devicemanager.StorageUsage"),
                predicate::always(),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _, _| Ok(()));

        client
            .expect_set_property()
            .times(..)
            .with(
                predicate::eq("io.edgehog.devicemanager.NetworkInterfaceProperties"),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        client
            .expect_set_property()
            .with(
                predicate::eq("io.edgehog.devicemanager.SystemInfo"),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        client
            .expect_set_property()
            .with(
                predicate::eq("io.edgehog.devicemanager.BaseImage"),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _| Ok(()));

        client
            .expect_send_object_with_timestamp()
            .with(
                predicate::eq("io.edgehog.devicemanager.SystemStatus"),
                predicate::eq("/systemStatus"),
                predicate::always(),
                predicate::always(),
            )
            .returning(|_, _, _, _| Ok(()));

        client
    }

    #[tokio::test]
    async fn send_initial_telemetry_success() {
        let (mut telemetry, _dir) = mock_telemetry(initial_telemetry_client());

        telemetry.initial_telemetry().await;
    }
}
720}