Skip to main content

edgehog_device_runtime_containers/service/
events.rs

1// This file is part of Edgehog.
2//
3// Copyright 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Handles incoming events from Astarte.
20//!
21//! When an event is received it will be handled by storing it to the [`StateStore`] and then the
22//! [Service](super::Service) will be notified.
23
24use edgehog_store::models::containers::deployment::DeploymentStatus;
25use tokio::sync::mpsc;
26use tracing::{error, instrument};
27use uuid::Uuid;
28
29use crate::{
30    events::deployment::{DeploymentEvent, EventStatus},
31    properties::Client,
32    requests::{
33        deployment::{DeploymentCommand, DeploymentUpdate},
34        ContainerRequest,
35    },
36    store::StateStore,
37};
38
/// Error returned by the [`ServiceHandle`]
///
/// The [`Display`](std::fmt::Display) implementation is derived through `displaydoc`, which uses
/// the doc comment of each variant as the error message: editing a variant's doc comment changes
/// the text shown to the user.
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum EventError {
    /// couldn't handle the event since the service exited.
    Disconnected,
}
45
46use super::{CommandValue, Id, ResourceType};
47
/// Handle to the [container service](super::Service).
///
/// It stores the incoming requests and forwards the corresponding [`ContainerEvent`] to the
/// service through the shared channel.
#[derive(Debug)]
pub struct ServiceHandle<D> {
    /// Queue of events received from Astarte.
    events: mpsc::UnboundedSender<ContainerEvent>,
    /// Device client, used to send a [`DeploymentEvent`] when a request cannot be stored.
    device: D,
    /// Store where the incoming requests are persisted before the service is notified.
    store: StateStore,
}
56
57impl<D> ServiceHandle<D> {
58    /// Create the handle from the [channel](mpsc::UnboundedSender) shared with the [`Service`](super::Service).
59    pub fn new(
60        device: D,
61        store: StateStore,
62        events: mpsc::UnboundedSender<ContainerEvent>,
63    ) -> Self {
64        Self {
65            events,
66            device,
67            store,
68        }
69    }
70
71    /// Handles an event from the image.
72    #[instrument(skip_all)]
73    pub async fn on_event(&mut self, request: ContainerRequest) -> Result<(), EventError>
74    where
75        D: Client + Sync + 'static,
76    {
77        let event = ContainerEvent::from(&request);
78
79        let deployment_id = request.deployment_id();
80
81        self.persist_request(deployment_id, request).await;
82
83        self.events.send(event).map_err(|_err| {
84            error!("the container service disconnected");
85
86            EventError::Disconnected
87        })?;
88
89        Ok(())
90    }
91
92    #[instrument(skip_all, fields(deployment_id))]
93    async fn persist_request(&mut self, deployment_id: Uuid, request: ContainerRequest)
94    where
95        D: Client + Sync + 'static,
96    {
97        let res = match request {
98            ContainerRequest::Image(create_image) => self.store.create_image(create_image).await,
99            ContainerRequest::Volume(create_volume) => {
100                self.store.create_volume(create_volume).await
101            }
102            ContainerRequest::Network(create_network) => {
103                self.store.create_network(create_network).await
104            }
105            ContainerRequest::DeviceMapping(create_device_mapping) => {
106                self.store
107                    .create_device_mapping(create_device_mapping)
108                    .await
109            }
110            ContainerRequest::Container(create_container) => {
111                self.store.create_container(create_container).await
112            }
113            ContainerRequest::Deployment(create_deployment) => {
114                self.store.create_deployment(create_deployment).await
115            }
116            ContainerRequest::DeploymentCommand(DeploymentCommand { id, command }) => {
117                self.store
118                    .update_deployment_status(id, command.into())
119                    .await
120            }
121            ContainerRequest::DeploymentUpdate(DeploymentUpdate { from, to }) => {
122                self.store.deployment_update(from, to).await
123            }
124        };
125
126        if let Err(err) = res {
127            let error = format!("{:#}", eyre::Report::new(err));
128
129            error!(%error, "couldn't store request");
130
131            DeploymentEvent::new(EventStatus::Error, error)
132                .send(&deployment_id, &mut self.device)
133                .await;
134        }
135    }
136}
137
/// Event sent by the [`ServiceHandle`] to the [`Service`](crate::service::Service)
///
/// The [`Resource`](Self::Resource), [`DeploymentCmd`](Self::DeploymentCmd) and
/// [`DeploymentUpdate`](Self::DeploymentUpdate) variants are built from a [`ContainerRequest`]
/// through the `From<&ContainerRequest>` conversion. The [`Refresh`](Self::Refresh) variant is
/// not produced by that conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContainerEvent {
    /// Resource creation request event.
    Resource {
        /// Unique ID of the resource
        resource: Id,
        /// Deployment ID of the request
        deployment: Uuid,
    },
    /// Deployment command event.
    DeploymentCmd(DeploymentCommand),
    /// Deployment update event.
    DeploymentUpdate(DeploymentUpdate),
    /// Container runtime event happened.
    ///
    /// We need to handle deletion or a container stopped outside of Edgehog.
    Refresh(Id),
}
157
158impl From<&ContainerRequest> for ContainerEvent {
159    fn from(value: &ContainerRequest) -> Self {
160        match value {
161            ContainerRequest::Image(create_image) => {
162                let resource = Id::new(ResourceType::Image, create_image.id.0);
163
164                ContainerEvent::Resource {
165                    resource,
166                    deployment: create_image.deployment_id.0,
167                }
168            }
169            ContainerRequest::Volume(create_volume) => {
170                let resource = Id::new(ResourceType::Volume, create_volume.id.0);
171
172                ContainerEvent::Resource {
173                    resource,
174                    deployment: create_volume.deployment_id.0,
175                }
176            }
177            ContainerRequest::Network(create_network) => {
178                let resource = Id::new(ResourceType::Network, create_network.id.0);
179
180                ContainerEvent::Resource {
181                    resource,
182                    deployment: create_network.deployment_id.0,
183                }
184            }
185            ContainerRequest::DeviceMapping(create_device_mapping) => {
186                let resource = Id::new(ResourceType::DeviceMapping, create_device_mapping.id.0);
187
188                ContainerEvent::Resource {
189                    resource,
190                    deployment: create_device_mapping.deployment_id.0,
191                }
192            }
193            ContainerRequest::Container(create_container) => {
194                let resource = Id::new(ResourceType::Container, create_container.id.0);
195
196                ContainerEvent::Resource {
197                    resource,
198                    deployment: create_container.deployment_id.0,
199                }
200            }
201            ContainerRequest::Deployment(create_deployment) => {
202                let resource = Id::new(ResourceType::Deployment, create_deployment.id.0);
203
204                ContainerEvent::Resource {
205                    resource,
206                    deployment: create_deployment.id.0,
207                }
208            }
209            ContainerRequest::DeploymentCommand(cmd) => Self::DeploymentCmd(*cmd),
210            ContainerRequest::DeploymentUpdate(update) => Self::DeploymentUpdate(*update),
211        }
212    }
213}
214
215impl From<CommandValue> for DeploymentStatus {
216    fn from(value: CommandValue) -> Self {
217        match value {
218            CommandValue::Start => Self::Started,
219            CommandValue::Stop => Self::Stopped,
220            CommandValue::Delete => Self::Deleted,
221        }
222    }
223}