Skip to main content

edgehog_device_runtime/controller/
mod.rs

1// This file is part of Edgehog.
2//
3// Copyright 2024 - 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19#[cfg(feature = "containers")]
20use std::sync::Arc;
21
22use actor::Actor;
23use astarte_device_sdk::client::RecvError;
24use astarte_device_sdk::prelude::PropAccess;
25use astarte_device_sdk::FromEvent;
26use stable_eyre::eyre::Error;
27#[cfg(feature = "containers")]
28use tokio::sync::OnceCell;
29use tokio::{sync::mpsc, task::JoinSet};
30use tokio_util::sync::CancellationToken;
31use tracing::{error, info};
32
33use crate::commands::execute_command;
34use crate::error::DeviceManagerError;
35use crate::telemetry::event::TelemetryEvent;
36use crate::telemetry::Telemetry;
37use crate::{Client, DeviceManagerOptions};
38
39#[cfg(all(feature = "zbus", target_os = "linux"))]
40use crate::led_behavior::{LedBlink, LedEvent};
41#[cfg(all(feature = "zbus", target_os = "linux"))]
42use crate::ota::ota_handler::OtaHandler;
43
44use self::event::RuntimeEvent;
45
46pub mod actor;
47pub mod event;
48
49#[derive(Debug)]
50pub struct Runtime<T> {
51    client: T,
52    telemetry_tx: mpsc::Sender<TelemetryEvent>,
53    #[cfg(feature = "containers")]
54    containers_tx: mpsc::UnboundedSender<Box<edgehog_containers::requests::ContainerRequest>>,
55    #[cfg(feature = "forwarder")]
56    forwarder: crate::forwarder::Forwarder<T>,
57    #[cfg(all(feature = "zbus", target_os = "linux"))]
58    led_tx: mpsc::Sender<LedEvent>,
59    #[cfg(all(feature = "zbus", target_os = "linux"))]
60    ota_handler: OtaHandler,
61}
62
63impl<C> Runtime<C> {
64    pub async fn new(
65        tasks: &mut JoinSet<stable_eyre::Result<()>>,
66        opts: DeviceManagerOptions,
67        client: C,
68        cancel: CancellationToken,
69    ) -> Result<Self, DeviceManagerError>
70    where
71        C: Client + PropAccess + Send + Sync + 'static,
72    {
73        #[cfg(feature = "systemd")]
74        crate::systemd_wrapper::systemd_notify_status("Initializing");
75
76        info!("Initializing");
77
78        // needed for warning
79        #[cfg(not(feature = "service"))]
80        let _ = cancel;
81
82        #[cfg(feature = "containers")]
83        let store = Self::store(&opts.store_directory).await?;
84        #[cfg(feature = "containers")]
85        let container_handle = Arc::new(OnceCell::new());
86
87        #[cfg(all(feature = "zbus", target_os = "linux"))]
88        let ota_handler = OtaHandler::start(tasks, client.clone(), &opts).await?;
89
90        #[cfg(all(feature = "zbus", target_os = "linux"))]
91        let led_tx = {
92            let (led_tx, led_rx) = mpsc::channel(8);
93            tasks.spawn(LedBlink.spawn(led_rx));
94            led_tx
95        };
96
97        let (telemetry_tx, telemetry_rx) = mpsc::channel(8);
98
99        let telemetry = Telemetry::from_config(
100            client.clone(),
101            &opts.telemetry_config.unwrap_or_default(),
102            opts.store_directory.clone(),
103            #[cfg(feature = "containers")]
104            Arc::clone(&container_handle),
105        )
106        .await;
107
108        tasks.spawn(telemetry.spawn(telemetry_rx));
109
110        #[cfg(feature = "containers")]
111        let containers_tx = Self::setup_containers(
112            client.clone(),
113            opts.containers,
114            &store,
115            &container_handle,
116            tasks,
117        )
118        .await?;
119
120        #[cfg(feature = "service")]
121        Self::setup_service(
122            opts.service.unwrap_or_default(),
123            #[cfg(feature = "containers")]
124            &container_handle,
125            tasks,
126            cancel,
127        );
128
129        #[cfg(feature = "forwarder")]
130        // Initialize the forwarder instance
131        let forwarder = crate::forwarder::Forwarder::init(client.clone()).await?;
132
133        Ok(Self {
134            client,
135            telemetry_tx,
136            #[cfg(feature = "containers")]
137            containers_tx,
138            #[cfg(feature = "forwarder")]
139            forwarder,
140            #[cfg(all(feature = "zbus", target_os = "linux"))]
141            led_tx,
142            #[cfg(all(feature = "zbus", target_os = "linux"))]
143            ota_handler,
144        })
145    }
146
147    #[cfg(feature = "containers")]
148    async fn setup_containers(
149        client: C,
150        config: crate::containers::ContainersConfig,
151        store: &edgehog_store::db::Handle,
152        container_handle: &Arc<OnceCell<edgehog_containers::local::ContainerHandle>>,
153        tasks: &mut JoinSet<stable_eyre::Result<()>>,
154    ) -> Result<
155        mpsc::UnboundedSender<Box<edgehog_containers::requests::ContainerRequest>>,
156        DeviceManagerError,
157    >
158    where
159        C: Client + Clone + Send + Sync + 'static,
160    {
161        let (container_tx, container_rx) = mpsc::unbounded_channel();
162
163        let containers = crate::containers::ContainerService::new(
164            client,
165            config,
166            store,
167            container_handle,
168            tasks,
169        )
170        .await?;
171
172        tasks.spawn(containers.spawn_unbounded(container_rx));
173
174        Ok(container_tx)
175    }
176
177    #[cfg(feature = "service")]
178    fn setup_service(
179        config: edgehog_service::config::Config,
180        #[cfg(feature = "containers")] container_handle: &Arc<
181            OnceCell<edgehog_containers::local::ContainerHandle>,
182        >,
183        tasks: &mut JoinSet<stable_eyre::Result<()>>,
184        cancel: CancellationToken,
185    ) where
186        C: Client + Clone + Send + Sync + 'static,
187    {
188        if !config.enabled {
189            use tracing::debug;
190
191            debug!("local service not enabled");
192
193            return;
194        }
195
196        let options = match edgehog_service::service::ServiceOptions::try_from(config) {
197            Ok(opt) => opt,
198            Err(err) => {
199                error!(error = format!("{err:#}"), "invalid service options");
200
201                return;
202            }
203        };
204
205        let service = edgehog_service::service::EdgehogService::new(
206            options,
207            #[cfg(feature = "containers")]
208            Arc::clone(container_handle),
209        );
210
211        tasks.spawn(async {
212            info!("starting local service");
213
214            service.run(cancel).await?;
215
216            Ok(())
217        });
218    }
219
220    pub async fn run(&mut self) -> Result<(), DeviceManagerError>
221    where
222        C: Client + Send + Sync + 'static,
223    {
224        #[cfg(feature = "systemd")]
225        crate::systemd_wrapper::systemd_notify_status("Running");
226
227        info!("Running");
228
229        loop {
230            let event = match self.client.recv().await {
231                Ok(event) => RuntimeEvent::from_event(event)?,
232                Err(RecvError::Disconnected) => {
233                    error!("the Runtime was disconnected");
234
235                    return Ok(());
236                }
237                Err(err) => {
238                    error!("error received: {}", Error::from(err));
239
240                    continue;
241                }
242            };
243
244            self.handle_event(event).await;
245        }
246    }
247
248    async fn handle_event(&mut self, event: RuntimeEvent)
249    where
250        C: Client + Send + Sync + 'static,
251    {
252        match event {
253            RuntimeEvent::Command(cmd) => {
254                #[cfg(all(feature = "zbus", target_os = "linux"))]
255                if cmd.is_reboot() && self.ota_handler.in_progress() {
256                    error!("cannot reboot during OTA update");
257
258                    return;
259                }
260
261                if let Err(err) = execute_command(cmd).await {
262                    error!(
263                        "command failed to execute: {}",
264                        stable_eyre::Report::new(err)
265                    );
266                }
267            }
268            RuntimeEvent::Telemetry(event) => {
269                if self.telemetry_tx.send(event).await.is_err() {
270                    error!("couldn't send the telemetry event");
271                }
272            }
273            #[cfg(all(feature = "zbus", target_os = "linux"))]
274            RuntimeEvent::Led(event) => {
275                if self.led_tx.send(event).await.is_err() {
276                    error!("couldn't send the led event");
277                }
278            }
279            #[cfg(all(feature = "zbus", target_os = "linux"))]
280            RuntimeEvent::Ota(ota) => {
281                if let Err(err) = self.ota_handler.handle_event(ota).await {
282                    error!(
283                        "error while processing ota event {}",
284                        stable_eyre::Report::new(err)
285                    );
286                }
287            }
288            #[cfg(all(feature = "containers", target_os = "linux"))]
289            RuntimeEvent::Container(event) => {
290                if self.containers_tx.send(event).is_err() {
291                    error!("couldn't handle the container event")
292                }
293            }
294            #[cfg(all(feature = "forwarder", target_os = "linux"))]
295            RuntimeEvent::Session(event) => {
296                self.forwarder.handle_sessions(event);
297            }
298        }
299    }
300
301    #[cfg(feature = "containers")]
302    async fn store(
303        store_dir: &std::path::Path,
304    ) -> Result<edgehog_store::db::Handle, edgehog_store::db::HandleError> {
305        use edgehog_store::db::Handle;
306
307        let db_file = store_dir.join("state.db");
308
309        let store = Handle::open(&db_file).await?;
310
311        Ok(store)
312    }
313}