1use std::str::FromStr;
20use std::{borrow::Cow, collections::HashMap, ops::Deref, path::PathBuf};
21
22use async_trait::async_trait;
23use event::{TelemetryConfig, TelemetryEvent};
24use serde::{Deserialize, Serialize};
25use tokio::time::Duration;
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error};
28
29use crate::Client;
30
31use crate::{
32 controller::actor::Actor,
33 repository::{file_state_repository::FileStateRepository, StateRepository},
34};
35
36use self::sender::Task;
37use self::stats::TelemetryInterface;
38
39pub mod event;
40mod sender;
41mod stats;
42pub mod status;
43
44const TELEMETRY_PATH: &str = "telemetry.json";
45
46const DEFAULT_PERIOD: Duration = Duration::from_secs(60);
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TelemetryInterfaceConfig<'a> {
50 pub interface_name: Cow<'a, str>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub enabled: Option<bool>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub period: Option<u64>,
55}
56
57impl TelemetryInterfaceConfig<'_> {
58 fn period_duration(&self) -> Option<Duration> {
59 self.period.map(Duration::from_secs)
60 }
61}
62
63#[derive(Debug, Clone, Copy)]
65pub struct TaskConfig {
66 pub enabled: Overridable<bool>,
67 pub period: Overridable<Duration>,
68}
69
70impl TaskConfig {
71 fn from_config(config: &TelemetryInterfaceConfig) -> Self {
73 Self {
74 enabled: Overridable::new(config.enabled.unwrap_or_default()),
75 period: Overridable::new(config.period_duration().unwrap_or(DEFAULT_PERIOD)),
76 }
77 }
78
79 fn from_override(over: &TelemetryInterfaceConfig) -> Option<Self> {
81 if over.enabled.is_none() && over.period.is_none() {
82 return None;
83 }
84
85 let enabled = match over.enabled {
86 Some(enabled) => Overridable::with_override(false, enabled),
87 None => Overridable::new(false),
88 };
89 let period = match over.period_duration() {
90 Some(period) => Overridable::with_override(DEFAULT_PERIOD, period),
91 None => Overridable::new(DEFAULT_PERIOD),
92 };
93
94 Some(Self { enabled, period })
95 }
96}
97
98impl Default for TaskConfig {
99 fn default() -> Self {
100 Self {
101 enabled: Overridable::new(false),
102 period: Overridable::new(DEFAULT_PERIOD),
103 }
104 }
105}
106
/// A value made of a fallback default and an optional override.
///
/// Reading it (through [`Overridable::get`] or `Deref`) yields the override when
/// one is set, and the default otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Overridable<T> {
    default: T,
    value: Option<T>,
}

impl<T> Overridable<T> {
    /// Creates a value holding only `default`, without any override.
    #[must_use]
    fn new(default: T) -> Self {
        Self {
            value: None,
            default,
        }
    }

    /// Creates a value whose `default` is already overridden by `value`.
    #[must_use]
    fn with_override(default: T, value: T) -> Self {
        Self {
            value: Some(value),
            default,
        }
    }

    /// Returns the override if present, otherwise the default.
    #[must_use]
    fn get(&self) -> &T {
        match &self.value {
            Some(overridden) => overridden,
            None => &self.default,
        }
    }

    /// Returns only the override, if any.
    #[must_use]
    fn get_override(&self) -> Option<&T> {
        self.value.as_ref()
    }

    /// Sets the override, replacing a previous one.
    fn set(&mut self, value: T) {
        self.value = Some(value);
    }

    /// Drops the override so that the default is used again.
    fn unset(&mut self) {
        self.value = None;
    }

    /// Returns `true` when an override is currently set.
    #[must_use]
    fn is_overwritten(&self) -> bool {
        self.value.is_some()
    }
}

impl<T: Default> Default for Overridable<T> {
    /// Uses `T::default()` as the default, with no override.
    fn default() -> Self {
        Self {
            value: None,
            default: T::default(),
        }
    }
}

impl<T> Deref for Overridable<T> {
    type Target = T;

    /// Dereferences to the effective value, as returned by [`Overridable::get`].
    fn deref(&self) -> &T {
        Overridable::get(self)
    }
}
171
172#[derive(Debug)]
173pub struct Telemetry<C> {
174 client: C,
175 configs: HashMap<TelemetryInterface, TaskConfig>,
176 tasks: TelemetryTasks,
177 file_state: FileStateRepository<Vec<TelemetryInterfaceConfig<'static>>>,
178 #[cfg(feature = "containers")]
179 containers: std::sync::Arc<tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>>,
180}
181
182impl<C> Telemetry<C> {
183 pub async fn from_config(
184 client: C,
185 configs: &[TelemetryInterfaceConfig<'_>],
186 store_directory: PathBuf,
187 #[cfg(feature = "containers")] containers: std::sync::Arc<
188 tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>,
189 >,
190 ) -> Self {
191 let configs = configs
192 .iter()
193 .filter_map(|cfg| {
194 let interface = match TelemetryInterface::from_str(&cfg.interface_name) {
195 Ok(interface) => interface,
196 Err(err) => {
197 error!("{err}");
198
199 return None;
200 }
201 };
202
203 Some((interface, TaskConfig::from_config(cfg)))
204 })
205 .collect();
206
207 let mut telemetry = Telemetry {
208 client,
209 configs,
210 tasks: TelemetryTasks::new(),
211 file_state: FileStateRepository::new(&store_directory, TELEMETRY_PATH),
212 #[cfg(feature = "containers")]
213 containers,
214 };
215
216 telemetry.read_filestate().await;
217
218 telemetry
219 }
220
221 async fn read_filestate(&mut self) {
222 if !self.file_state.exists().await {
223 return;
224 }
225
226 let saved_configs = match self.file_state.read().await {
227 Ok(cfgs) => cfgs,
228 Err(err) => {
229 error!(
231 "couldn't read the saved telemetry configs: {}",
232 stable_eyre::Report::new(err)
233 );
234
235 return;
236 }
237 };
238
239 for saved_cfg in saved_configs {
240 let interface = match TelemetryInterface::from_str(&saved_cfg.interface_name) {
241 Ok(interface) => interface,
242 Err(err) => {
243 error!("{err}");
244
245 continue;
246 }
247 };
248
249 let entry = self.configs.entry(interface).and_modify(|cfg| {
250 if let Some(enabled) = saved_cfg.enabled {
251 cfg.enabled.set(enabled);
252 }
253 if let Some(period) = saved_cfg.period_duration() {
254 cfg.period.set(period);
255 }
256 });
257
258 if let Some(cfg) = TaskConfig::from_override(&saved_cfg) {
259 entry.or_insert(cfg);
260 }
261 }
262 }
263
264 async fn initial_telemetry(&mut self)
265 where
266 C: Client + Send + Sync + 'static,
267 {
268 self::status::initial_telemetry(&mut self.client).await;
269 self::stats::initial_telemetry(&mut self.client).await;
270 }
271
272 pub fn run_telemetry(&mut self)
273 where
274 C: Client + Send + Sync + 'static,
275 {
276 for (interface, config) in &self.configs {
277 self.tasks.spawn_task(
278 &self.client,
279 *interface,
280 *config,
281 #[cfg(feature = "containers")]
282 &self.containers,
283 );
284 }
285 }
286
287 async fn save_telemetry_config(&self) {
288 let telemetry_config = self
289 .configs
290 .iter()
291 .filter_map(|(interface, cfg)| {
292 if cfg.enabled.is_overwritten() || cfg.period.is_overwritten() {
293 Some(TelemetryInterfaceConfig {
294 interface_name: Cow::Borrowed(interface.as_interface()),
295 enabled: cfg.enabled.get_override().copied(),
296 period: cfg.period.get_override().map(Duration::as_secs),
297 })
298 } else {
299 None
300 }
301 })
302 .collect();
303
304 if let Err(err) = self.file_state.write(&telemetry_config).await {
305 error!(
306 "failed to write telemetry: {}",
307 stable_eyre::Report::new(err)
308 );
309 }
310 }
311}
312
313#[async_trait]
314impl<C> Actor for Telemetry<C>
315where
316 C: Client + Send + Sync + 'static,
317{
318 type Msg = TelemetryEvent;
319
320 fn task() -> &'static str {
321 "telemetry"
322 }
323
324 async fn init(&mut self) -> stable_eyre::Result<()> {
325 self.initial_telemetry().await;
326
327 self.run_telemetry();
328
329 Ok(())
330 }
331
332 async fn handle(&mut self, msg: Self::Msg) -> stable_eyre::Result<()> {
333 let interface = match TelemetryInterface::from_str(&msg.interface) {
334 Ok(itf) => itf,
335 Err(err) => {
336 error!(
337 error = format!("{:#}", stable_eyre::Report::new(err)),
338 "couldn't parse telemetry interface"
339 );
340
341 return Ok(());
342 }
343 };
344
345 let config = self.configs.entry(interface).or_default();
346
347 match msg.config {
348 TelemetryConfig::Enable(Some(enabled)) => {
349 config.enabled.set(enabled);
350 }
351 TelemetryConfig::Enable(None) => {
352 config.enabled.unset();
353 }
354 TelemetryConfig::Period(Some(period)) => {
355 config.period.set(period.0);
356 }
357 TelemetryConfig::Period(None) => {
358 config.period.unset();
359 }
360 };
361
362 self.tasks.spawn_task(
364 &self.client,
365 interface,
366 *config,
367 #[cfg(feature = "containers")]
368 &self.containers,
369 );
370
371 self.save_telemetry_config().await;
372
373 Ok(())
374 }
375}
376
377#[derive(Debug)]
379struct TelemetryTasks {
380 cancellation: CancellationToken,
381 tasks: HashMap<TelemetryInterface, CancellationToken>,
382}
383
384impl TelemetryTasks {
385 fn new() -> Self {
386 Self {
387 cancellation: CancellationToken::new(),
388 tasks: HashMap::new(),
389 }
390 }
391
392 fn spawn_task<C>(
393 &mut self,
394 client: &C,
395 t_itf: TelemetryInterface,
396 task_config: TaskConfig,
397 #[cfg(feature = "containers")] containers: &std::sync::Arc<
398 tokio::sync::OnceCell<edgehog_containers::local::ContainerHandle>,
399 >,
400 ) where
401 C: Client + Sync + Send + 'static,
402 {
403 if !task_config.enabled.get() {
404 debug!("task {} disabled", t_itf);
405
406 if let Some(cancel) = self.tasks.remove(&t_itf) {
407 cancel.cancel();
408 }
409
410 return;
411 }
412
413 let period = task_config.period.get();
414 if period.is_zero() {
415 debug!("period is 0 for task {}", t_itf);
416
417 if let Some(cancel) = self.tasks.remove(&t_itf) {
418 cancel.cancel();
419 }
420
421 return;
422 };
423
424 if let Some(cancel) = self.tasks.remove(&t_itf) {
425 debug!("stopping previous task");
426
427 cancel.cancel();
428 }
429
430 let cancel = self.cancellation.child_token();
431
432 Task::spawn(
433 client.clone(),
434 cancel.clone(),
435 t_itf,
436 *period,
437 #[cfg(feature = "containers")]
438 containers,
439 );
440
441 self.tasks.insert(t_itf, cancel);
442 }
443}
444
#[cfg(test)]
pub(crate) mod tests {
    // `super::*` also brings the parent's `TELEMETRY_PATH` into scope. The tests must
    // use that constant so they read and write the same file as the code under test.
    use super::*;

    use astarte_device_sdk::store::SqliteStore;
    use astarte_device_sdk::transport::mqtt::Mqtt;
    use astarte_device_sdk_mock::MockDeviceClient;
    use event::TelemetryPeriod;
    use mockall::{predicate, Sequence};
    use tempdir::TempDir;

    use super::status::runtime_info::tests::mock_runtime_info_telemetry;

    /// Creates a temporary store directory; keep the `TempDir` alive for the test's duration.
    fn temp_dir() -> (TempDir, PathBuf) {
        let dir = TempDir::new("edgehog-telemetry").unwrap();
        let path = dir.path().to_owned();

        (dir, path)
    }

    /// Builds a `Telemetry` with no configs, backed by a fresh temporary directory.
    fn mock_telemetry(
        client: MockDeviceClient<Mqtt<SqliteStore>>,
    ) -> (Telemetry<MockDeviceClient<Mqtt<SqliteStore>>>, TempDir) {
        let (dir, path) = temp_dir();

        (
            Telemetry {
                client,
                configs: HashMap::new(),
                tasks: TelemetryTasks::new(),
                file_state: FileStateRepository::new(&path, TELEMETRY_PATH),
                #[cfg(feature = "containers")]
                containers: std::sync::Arc::default(),
            },
            dir,
        )
    }

    #[tokio::test]
    async fn telemetry_default_test() {
        let interface = "io.edgehog.devicemanager.SystemStatus";
        let configs = vec![TelemetryInterfaceConfig {
            interface_name: std::borrow::Cow::Borrowed(interface),
            enabled: Some(true),
            period: Some(10),
        }];

        let (_dir, t_dir) = temp_dir();

        let client = MockDeviceClient::<Mqtt<SqliteStore>>::new();

        let tel = Telemetry::from_config(
            client,
            &configs,
            t_dir,
            #[cfg(feature = "containers")]
            std::sync::Arc::default(),
        )
        .await;

        let system_status_config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(system_status_config.enabled.get());
        assert_eq!(*system_status_config.period.get(), Duration::from_secs(10));
    }

    #[tokio::test]
    async fn telemetry_set_test() {
        let interface = "io.edgehog.devicemanager.SystemStatus";
        let configs = vec![TelemetryInterfaceConfig {
            interface_name: interface.into(),
            enabled: Some(true),
            period: Some(10),
        }];

        let (_dir, t_dir) = temp_dir();

        let client = MockDeviceClient::<Mqtt<SqliteStore>>::new();

        let mut tel = Telemetry::from_config(
            client,
            &configs,
            t_dir.clone(),
            #[cfg(feature = "containers")]
            std::sync::Arc::default(),
        )
        .await;

        let events = [
            TelemetryEvent {
                interface: interface.to_string(),
                config: TelemetryConfig::Enable(Some(false)),
            },
            TelemetryEvent {
                interface: interface.to_string(),
                config: TelemetryConfig::Period(Some(TelemetryPeriod(Duration::from_secs(30)))),
            },
        ];

        for e in events {
            tel.handle(e).await.unwrap();
        }

        let config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(config.enabled.is_overwritten());
        assert!(!config.enabled.get());
        assert!(config.period.is_overwritten());
        assert_eq!(*config.period.get(), Duration::from_secs(30));

        let telemetry_repo = FileStateRepository::new(&t_dir, TELEMETRY_PATH);
        let saved_config: Vec<TelemetryInterfaceConfig> = telemetry_repo.read().await.unwrap();

        assert_eq!(saved_config.len(), 1);

        let system_status_config = saved_config.first().unwrap();
        assert_eq!(system_status_config.enabled, Some(false));
        assert_eq!(system_status_config.period, Some(30));
    }

    #[tokio::test]
    async fn telemetry_unset_test() {
        let interface = "io.edgehog.devicemanager.SystemStatus";
        let configs = vec![TelemetryInterfaceConfig {
            interface_name: interface.into(),
            enabled: Some(true),
            period: Some(10),
        }];

        let (_dir, t_dir) = temp_dir();
        let mut client = MockDeviceClient::<Mqtt<SqliteStore>>::new();
        let mut seq = Sequence::new();

        // Each handled event respawns the (still enabled) task, cloning the client.
        client
            .expect_clone()
            .times(2)
            .in_sequence(&mut seq)
            .returning(MockDeviceClient::new);

        let mut tel = Telemetry::from_config(
            client,
            &configs,
            t_dir.clone(),
            #[cfg(feature = "containers")]
            std::sync::Arc::default(),
        )
        .await;

        let events = [
            TelemetryEvent {
                interface: interface.to_string(),
                config: TelemetryConfig::Enable(None),
            },
            TelemetryEvent {
                interface: interface.to_string(),
                config: TelemetryConfig::Period(None),
            },
        ];

        for e in events {
            tel.handle(e).await.unwrap();
        }

        let config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(!config.enabled.is_overwritten());
        assert!(config.enabled.get());
        assert!(!config.period.is_overwritten());
        assert_eq!(*config.period.get(), Duration::from_secs(10));

        let telemetry_repo = FileStateRepository::new(&t_dir, TELEMETRY_PATH);
        let saved_config: Vec<TelemetryInterfaceConfig> = telemetry_repo.read().await.unwrap();

        assert!(saved_config.is_empty());
    }

    /// Saved overrides must be applied on top of the static configuration at startup.
    #[tokio::test]
    async fn telemetry_restore_saved_config_test() {
        let interface = "io.edgehog.devicemanager.SystemStatus";
        let configs = vec![TelemetryInterfaceConfig {
            interface_name: interface.into(),
            enabled: Some(true),
            period: Some(10),
        }];

        let (_dir, t_dir) = temp_dir();

        // Persist an override that only disables the task.
        let saved = vec![TelemetryInterfaceConfig {
            interface_name: interface.into(),
            enabled: Some(false),
            period: None,
        }];
        let telemetry_repo: FileStateRepository<Vec<TelemetryInterfaceConfig<'static>>> =
            FileStateRepository::new(&t_dir, TELEMETRY_PATH);
        telemetry_repo.write(&saved).await.unwrap();

        let client = MockDeviceClient::<Mqtt<SqliteStore>>::new();

        let tel = Telemetry::from_config(
            client,
            &configs,
            t_dir.clone(),
            #[cfg(feature = "containers")]
            std::sync::Arc::default(),
        )
        .await;

        let config = tel.configs.get(&TelemetryInterface::SystemStatus).unwrap();

        assert!(config.enabled.is_overwritten());
        assert!(!config.enabled.get());
        assert!(!config.period.is_overwritten());
        assert_eq!(*config.period.get(), Duration::from_secs(10));
    }

    #[tokio::test]
    async fn send_initial_telemetry_success() {
        let client = {
            let mut client = MockDeviceClient::<Mqtt<SqliteStore>>::new();
            let mut seq = Sequence::new();

            client
                .expect_set_property()
                .once()
                .in_sequence(&mut seq)
                .with(
                    predicate::eq("io.edgehog.devicemanager.OSInfo"),
                    predicate::eq("/osName"),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            client
                .expect_set_property()
                .once()
                .in_sequence(&mut seq)
                .with(
                    predicate::eq("io.edgehog.devicemanager.OSInfo"),
                    predicate::eq("/osVersion"),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            client
                .expect_set_property()
                .times(..)
                .with(
                    predicate::eq("io.edgehog.devicemanager.HardwareInfo"),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            mock_runtime_info_telemetry(&mut client, &mut seq);

            client
                .expect_send_object_with_timestamp()
                .times(..)
                .with(
                    predicate::eq("io.edgehog.devicemanager.StorageUsage"),
                    predicate::always(),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _, _| Ok(()));

            client
                .expect_set_property()
                .times(..)
                .with(
                    predicate::eq("io.edgehog.devicemanager.NetworkInterfaceProperties"),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            client
                .expect_set_property()
                .with(
                    predicate::eq("io.edgehog.devicemanager.SystemInfo"),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            client
                .expect_set_property()
                .with(
                    predicate::eq("io.edgehog.devicemanager.BaseImage"),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _| Ok(()));

            client
                .expect_send_object_with_timestamp()
                .with(
                    predicate::eq("io.edgehog.devicemanager.SystemStatus"),
                    predicate::eq("/systemStatus"),
                    predicate::always(),
                    predicate::always(),
                )
                .returning(|_, _, _, _| Ok(()));

            client
        };

        let (mut telemetry, _dir) = mock_telemetry(client);

        telemetry.initial_telemetry().await;
    }
}