edgehog_device_runtime/controller/
mod.rs1#[cfg(feature = "containers")]
20use std::sync::Arc;
21
22use actor::Actor;
23use astarte_device_sdk::client::RecvError;
24use astarte_device_sdk::prelude::PropAccess;
25use astarte_device_sdk::FromEvent;
26use stable_eyre::eyre::Error;
27#[cfg(feature = "containers")]
28use tokio::sync::OnceCell;
29use tokio::{sync::mpsc, task::JoinSet};
30use tokio_util::sync::CancellationToken;
31use tracing::{error, info};
32
33use crate::commands::execute_command;
34use crate::error::DeviceManagerError;
35use crate::telemetry::event::TelemetryEvent;
36use crate::telemetry::Telemetry;
37use crate::{Client, DeviceManagerOptions};
38
39#[cfg(all(feature = "zbus", target_os = "linux"))]
40use crate::led_behavior::{LedBlink, LedEvent};
41#[cfg(all(feature = "zbus", target_os = "linux"))]
42use crate::ota::ota_handler::OtaHandler;
43
44use self::event::RuntimeEvent;
45
46pub mod actor;
47pub mod event;
48
49#[derive(Debug)]
50pub struct Runtime<T> {
51 client: T,
52 telemetry_tx: mpsc::Sender<TelemetryEvent>,
53 #[cfg(feature = "containers")]
54 containers_tx: mpsc::UnboundedSender<Box<edgehog_containers::requests::ContainerRequest>>,
55 #[cfg(feature = "forwarder")]
56 forwarder: crate::forwarder::Forwarder<T>,
57 #[cfg(all(feature = "zbus", target_os = "linux"))]
58 led_tx: mpsc::Sender<LedEvent>,
59 #[cfg(all(feature = "zbus", target_os = "linux"))]
60 ota_handler: OtaHandler,
61}
62
63impl<C> Runtime<C> {
64 pub async fn new(
65 tasks: &mut JoinSet<stable_eyre::Result<()>>,
66 opts: DeviceManagerOptions,
67 client: C,
68 cancel: CancellationToken,
69 ) -> Result<Self, DeviceManagerError>
70 where
71 C: Client + PropAccess + Send + Sync + 'static,
72 {
73 #[cfg(feature = "systemd")]
74 crate::systemd_wrapper::systemd_notify_status("Initializing");
75
76 info!("Initializing");
77
78 #[cfg(not(feature = "service"))]
80 let _ = cancel;
81
82 #[cfg(feature = "containers")]
83 let store = Self::store(&opts.store_directory).await?;
84 #[cfg(feature = "containers")]
85 let container_handle = Arc::new(OnceCell::new());
86
87 #[cfg(all(feature = "zbus", target_os = "linux"))]
88 let ota_handler = OtaHandler::start(tasks, client.clone(), &opts).await?;
89
90 #[cfg(all(feature = "zbus", target_os = "linux"))]
91 let led_tx = {
92 let (led_tx, led_rx) = mpsc::channel(8);
93 tasks.spawn(LedBlink.spawn(led_rx));
94 led_tx
95 };
96
97 let (telemetry_tx, telemetry_rx) = mpsc::channel(8);
98
99 let telemetry = Telemetry::from_config(
100 client.clone(),
101 &opts.telemetry_config.unwrap_or_default(),
102 opts.store_directory.clone(),
103 #[cfg(feature = "containers")]
104 Arc::clone(&container_handle),
105 )
106 .await;
107
108 tasks.spawn(telemetry.spawn(telemetry_rx));
109
110 #[cfg(feature = "containers")]
111 let containers_tx = Self::setup_containers(
112 client.clone(),
113 opts.containers,
114 &store,
115 &container_handle,
116 tasks,
117 )
118 .await?;
119
120 #[cfg(feature = "service")]
121 Self::setup_service(
122 opts.service.unwrap_or_default(),
123 #[cfg(feature = "containers")]
124 &container_handle,
125 tasks,
126 cancel,
127 );
128
129 #[cfg(feature = "forwarder")]
130 let forwarder = crate::forwarder::Forwarder::init(client.clone()).await?;
132
133 Ok(Self {
134 client,
135 telemetry_tx,
136 #[cfg(feature = "containers")]
137 containers_tx,
138 #[cfg(feature = "forwarder")]
139 forwarder,
140 #[cfg(all(feature = "zbus", target_os = "linux"))]
141 led_tx,
142 #[cfg(all(feature = "zbus", target_os = "linux"))]
143 ota_handler,
144 })
145 }
146
147 #[cfg(feature = "containers")]
148 async fn setup_containers(
149 client: C,
150 config: crate::containers::ContainersConfig,
151 store: &edgehog_store::db::Handle,
152 container_handle: &Arc<OnceCell<edgehog_containers::local::ContainerHandle>>,
153 tasks: &mut JoinSet<stable_eyre::Result<()>>,
154 ) -> Result<
155 mpsc::UnboundedSender<Box<edgehog_containers::requests::ContainerRequest>>,
156 DeviceManagerError,
157 >
158 where
159 C: Client + Clone + Send + Sync + 'static,
160 {
161 let (container_tx, container_rx) = mpsc::unbounded_channel();
162
163 let containers = crate::containers::ContainerService::new(
164 client,
165 config,
166 store,
167 container_handle,
168 tasks,
169 )
170 .await?;
171
172 tasks.spawn(containers.spawn_unbounded(container_rx));
173
174 Ok(container_tx)
175 }
176
177 #[cfg(feature = "service")]
178 fn setup_service(
179 config: edgehog_service::config::Config,
180 #[cfg(feature = "containers")] container_handle: &Arc<
181 OnceCell<edgehog_containers::local::ContainerHandle>,
182 >,
183 tasks: &mut JoinSet<stable_eyre::Result<()>>,
184 cancel: CancellationToken,
185 ) where
186 C: Client + Clone + Send + Sync + 'static,
187 {
188 if !config.enabled {
189 use tracing::debug;
190
191 debug!("local service not enabled");
192
193 return;
194 }
195
196 let options = match edgehog_service::service::ServiceOptions::try_from(config) {
197 Ok(opt) => opt,
198 Err(err) => {
199 error!(error = format!("{err:#}"), "invalid service options");
200
201 return;
202 }
203 };
204
205 let service = edgehog_service::service::EdgehogService::new(
206 options,
207 #[cfg(feature = "containers")]
208 Arc::clone(container_handle),
209 );
210
211 tasks.spawn(async {
212 info!("starting local service");
213
214 service.run(cancel).await?;
215
216 Ok(())
217 });
218 }
219
220 pub async fn run(&mut self) -> Result<(), DeviceManagerError>
221 where
222 C: Client + Send + Sync + 'static,
223 {
224 #[cfg(feature = "systemd")]
225 crate::systemd_wrapper::systemd_notify_status("Running");
226
227 info!("Running");
228
229 loop {
230 let event = match self.client.recv().await {
231 Ok(event) => RuntimeEvent::from_event(event)?,
232 Err(RecvError::Disconnected) => {
233 error!("the Runtime was disconnected");
234
235 return Ok(());
236 }
237 Err(err) => {
238 error!("error received: {}", Error::from(err));
239
240 continue;
241 }
242 };
243
244 self.handle_event(event).await;
245 }
246 }
247
248 async fn handle_event(&mut self, event: RuntimeEvent)
249 where
250 C: Client + Send + Sync + 'static,
251 {
252 match event {
253 RuntimeEvent::Command(cmd) => {
254 #[cfg(all(feature = "zbus", target_os = "linux"))]
255 if cmd.is_reboot() && self.ota_handler.in_progress() {
256 error!("cannot reboot during OTA update");
257
258 return;
259 }
260
261 if let Err(err) = execute_command(cmd).await {
262 error!(
263 "command failed to execute: {}",
264 stable_eyre::Report::new(err)
265 );
266 }
267 }
268 RuntimeEvent::Telemetry(event) => {
269 if self.telemetry_tx.send(event).await.is_err() {
270 error!("couldn't send the telemetry event");
271 }
272 }
273 #[cfg(all(feature = "zbus", target_os = "linux"))]
274 RuntimeEvent::Led(event) => {
275 if self.led_tx.send(event).await.is_err() {
276 error!("couldn't send the led event");
277 }
278 }
279 #[cfg(all(feature = "zbus", target_os = "linux"))]
280 RuntimeEvent::Ota(ota) => {
281 if let Err(err) = self.ota_handler.handle_event(ota).await {
282 error!(
283 "error while processing ota event {}",
284 stable_eyre::Report::new(err)
285 );
286 }
287 }
288 #[cfg(all(feature = "containers", target_os = "linux"))]
289 RuntimeEvent::Container(event) => {
290 if self.containers_tx.send(event).is_err() {
291 error!("couldn't handle the container event")
292 }
293 }
294 #[cfg(all(feature = "forwarder", target_os = "linux"))]
295 RuntimeEvent::Session(event) => {
296 self.forwarder.handle_sessions(event);
297 }
298 }
299 }
300
301 #[cfg(feature = "containers")]
302 async fn store(
303 store_dir: &std::path::Path,
304 ) -> Result<edgehog_store::db::Handle, edgehog_store::db::HandleError> {
305 use edgehog_store::db::Handle;
306
307 let db_file = store_dir.join("state.db");
308
309 let store = Handle::open(&db_file).await?;
310
311 Ok(store)
312 }
313}