Skip to main content

edgehog_device_runtime_containers/service/
mod.rs

1// This file is part of Edgehog.
2//
3// Copyright 2024 - 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Service to receive and handle the Astarte events.
20
21use std::fmt::{Debug, Display};
22
23use astarte_device_sdk::event::FromEventError;
24use edgehog_store::{conversions::SqlUuid, models::containers::deployment::DeploymentStatus};
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, instrument, warn};
27use uuid::Uuid;
28
29use crate::{
30    error::DockerError,
31    events::deployment::{DeploymentEvent, EventStatus},
32    properties::{deployment::AvailableDeployment, AvailableProp, Client},
33    requests::{
34        deployment::{CommandValue, DeploymentCommand, DeploymentUpdate},
35        ReqError,
36    },
37    resource::{
38        container::ContainerResource, deployment::Deployment,
39        device_mapping::DeviceMappingResource, image::ImageResource, network::NetworkResource,
40        volume::VolumeResource, Context, Create, Resource, ResourceError, State,
41    },
42    store::{StateStore, StoreError},
43    Docker,
44};
45
46use self::events::ContainerEvent;
47
48pub mod events;
49
50type Result<T> = std::result::Result<T, ServiceError>;
51
52/// Error from the [`Service`].
53#[non_exhaustive]
54#[derive(Debug, displaydoc::Display, thiserror::Error)]
55pub enum ServiceError {
56    /// error converting event
57    FromEvent(#[from] FromEventError),
58    /// docker operation failed
59    Docker(#[source] DockerError),
60    /// couldn't send data to Astarte
61    Astarte(#[from] astarte_device_sdk::Error),
62    /// couldn't process request
63    Request(#[from] ReqError),
64    /// store operation failed
65    Store(#[from] StoreError),
66    /// couldn't complete operation on a resource
67    Resource(#[from] ResourceError),
68}
69
70impl<T> From<T> for ServiceError
71where
72    T: Into<DockerError>,
73{
74    fn from(value: T) -> Self {
75        ServiceError::Docker(value.into())
76    }
77}
78
79/// Manages the state of the Nodes.
80///
81/// It handles the events received from Astarte, storing and updating the new container resources
82/// and commands that are received by the Runtime.
83#[derive(Debug)]
84pub struct Service<D> {
85    client: Docker,
86    device: D,
87    /// Queue of events received from Astarte.
88    events: mpsc::UnboundedReceiver<ContainerEvent>,
89    store: StateStore,
90}
91
92impl<D> Service<D> {
93    /// Create a new service
94    #[doc(hidden)]
95    #[must_use]
96    pub fn new(
97        client: Docker,
98        device: D,
99        events: mpsc::UnboundedReceiver<ContainerEvent>,
100        store: StateStore,
101    ) -> Self {
102        Self {
103            client,
104            device,
105            events,
106            store,
107        }
108    }
109
110    fn context(&mut self, id: impl Into<Uuid>) -> Context<'_, D> {
111        Context {
112            id: id.into(),
113            store: &mut self.store,
114            device: &mut self.device,
115            client: &mut self.client,
116        }
117    }
118
119    /// Initialize the service, it will load all the already stored properties
120    #[instrument(skip_all)]
121    pub async fn init(&mut self) -> Result<()>
122    where
123        D: Client + Send + Sync + 'static,
124    {
125        self.publish_received().await?;
126
127        // Delete and stop must be before the start to ensure the ports and other resources are
128        // freed before starting the containers.
129        self.init_delete_deployments().await?;
130        self.init_stop_deployments().await?;
131        self.init_start_deployments().await?;
132
133        info!("init completed");
134
135        Ok(())
136    }
137
138    #[instrument(skip_all)]
139    async fn publish_received(&mut self) -> Result<()>
140    where
141        D: Client + Send + Sync + 'static,
142    {
143        for id in self.store.load_images_to_publish().await? {
144            ImageResource::publish(self.context(id)).await?;
145        }
146
147        for id in self.store.load_volumes_to_publish().await? {
148            VolumeResource::publish(self.context(id)).await?;
149        }
150
151        for id in self.store.load_networks_to_publish().await? {
152            NetworkResource::publish(self.context(id)).await?;
153        }
154
155        for id in self.store.load_device_mappings_to_publish().await? {
156            DeviceMappingResource::publish(self.context(id)).await?;
157        }
158
159        for id in self.store.load_containers_to_publish().await? {
160            ContainerResource::publish(self.context(id)).await?;
161        }
162
163        for id in self
164            .store
165            .load_deployments_in(DeploymentStatus::Received)
166            .await?
167        {
168            Deployment::publish(self.context(id)).await?;
169        }
170
171        Ok(())
172    }
173
174    #[instrument(skip_all)]
175    async fn init_start_deployments(&mut self) -> Result<()>
176    where
177        D: Client + Send + Sync + 'static,
178    {
179        for id in self
180            .store
181            .load_deployments_in(DeploymentStatus::Started)
182            .await?
183        {
184            self.start(*id).await;
185        }
186
187        Ok(())
188    }
189
190    #[instrument(skip_all)]
191    async fn init_stop_deployments(&mut self) -> Result<()>
192    where
193        D: Client + Send + Sync + 'static,
194    {
195        for id in self
196            .store
197            .load_deployments_in(DeploymentStatus::Stopped)
198            .await?
199        {
200            self.stop(*id).await;
201        }
202
203        Ok(())
204    }
205
206    #[instrument(skip_all)]
207    async fn init_delete_deployments(&mut self) -> Result<()>
208    where
209        D: Client + Send + Sync + 'static,
210    {
211        for id in self
212            .store
213            .load_deployments_in(DeploymentStatus::Deleted)
214            .await?
215        {
216            self.delete(*id).await;
217        }
218
219        Ok(())
220    }
221
222    /// Blocking call that will handle the events from Astarte and the containers.
223    #[instrument(skip_all)]
224    pub async fn handle_events(&mut self)
225    where
226        D: Client + Send + Sync + 'static,
227    {
228        while let Some(event) = self.events.recv().await {
229            self.on_event(event).await;
230        }
231
232        info!("event receiver disconnected");
233    }
234
235    #[instrument(skip_all)]
236    async fn on_event(&mut self, event: ContainerEvent)
237    where
238        D: Client + Send + Sync + 'static,
239    {
240        match event {
241            ContainerEvent::Resource {
242                resource,
243                deployment,
244            } => {
245                self.resource_req(resource, deployment).await;
246            }
247            ContainerEvent::DeploymentCmd(DeploymentCommand {
248                id,
249                command: CommandValue::Start,
250            }) => {
251                self.start(id).await;
252            }
253            ContainerEvent::DeploymentCmd(DeploymentCommand {
254                id,
255                command: CommandValue::Stop,
256            }) => {
257                self.stop(id).await;
258            }
259            ContainerEvent::DeploymentCmd(DeploymentCommand {
260                id,
261                command: CommandValue::Delete,
262            }) => {
263                self.delete(id).await;
264            }
265            ContainerEvent::DeploymentUpdate(from_to) => {
266                self.update(from_to).await;
267            }
268            ContainerEvent::Refresh(id) => {
269                self.refresh(id).await;
270            }
271        }
272    }
273
274    #[instrument(skip_all, fields(%id))]
275    async fn resource_req(&mut self, id: Id, deployment_id: Uuid)
276    where
277        D: Client + Send + Sync + 'static,
278    {
279        let res = match id.resource_type() {
280            ResourceType::Image => ImageResource::publish(self.context(*id.uuid())).await,
281            ResourceType::Volume => VolumeResource::publish(self.context(*id.uuid())).await,
282            ResourceType::Network => NetworkResource::publish(self.context(*id.uuid())).await,
283            ResourceType::DeviceMapping => {
284                DeviceMappingResource::publish(self.context(*id.uuid())).await
285            }
286            ResourceType::Container => ContainerResource::publish(self.context(*id.uuid())).await,
287            ResourceType::Deployment => Deployment::publish(self.context(*id.uuid())).await,
288        };
289
290        if let Err(err) = res {
291            let error = format!("{:#}", eyre::Report::new(err));
292            error!(error, "failed to create resource");
293
294            DeploymentEvent::new(EventStatus::Error, error)
295                .send(&deployment_id, &mut self.device)
296                .await;
297        }
298    }
299
300    /// Will start a Deployment
301    #[instrument(skip(self))]
302    pub async fn start(&mut self, id: Uuid)
303    where
304        D: Client + Send + Sync + 'static,
305    {
306        let deployment = match self.store.find_complete_deployment(id).await {
307            Ok(Some(deployment)) => deployment,
308            Ok(None) => {
309                error!("{id} not found");
310
311                DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
312                    .send(&id, &mut self.device)
313                    .await;
314
315                return;
316            }
317            Err(err) => {
318                let err = format!("{:#}", eyre::Report::new(err));
319
320                error!(error = err, "couldn't start deployment");
321
322                DeploymentEvent::new(EventStatus::Error, err)
323                    .send(&id, &mut self.device)
324                    .await;
325
326                return;
327            }
328        };
329
330        info!("starting deployment");
331
332        DeploymentEvent::new(EventStatus::Starting, "")
333            .send(&id, &mut self.device)
334            .await;
335
336        if let Err(err) = self.start_deployment(id, deployment).await {
337            let err = format!("{:#}", eyre::Report::new(err));
338
339            error!(error = err, "couldn't start deployment");
340
341            DeploymentEvent::new(EventStatus::Error, err)
342                .send(&id, &mut self.device)
343                .await;
344
345            return;
346        }
347
348        DeploymentEvent::new(EventStatus::Started, "")
349            .send(&id, &mut self.device)
350            .await;
351
352        info!("deployment started");
353    }
354
355    async fn start_deployment(&mut self, deployment_id: Uuid, deployment: Deployment) -> Result<()>
356    where
357        D: Client + Send + Sync + 'static,
358    {
359        for id in deployment.images {
360            ImageResource::up(self.context(id)).await?;
361        }
362
363        for id in deployment.volumes {
364            VolumeResource::up(self.context(id)).await?;
365        }
366
367        for id in deployment.networks {
368            NetworkResource::up(self.context(id)).await?;
369        }
370
371        for id in deployment.containers {
372            let mut container = ContainerResource::up(self.context(id)).await?;
373
374            container.start(self.context(id)).await?;
375        }
376
377        AvailableDeployment::new(&deployment_id)
378            .send(
379                &mut self.device,
380                crate::properties::deployment::DeploymentStatus::Started,
381            )
382            .await
383            .map_err(ResourceError::Property)?;
384
385        Ok(())
386    }
387
388    /// Will stop an application
389    #[instrument(skip(self))]
390    pub async fn stop(&mut self, id: Uuid)
391    where
392        D: Client + Send + Sync + 'static,
393    {
394        let containers = match self.store.load_deployment_containers(id).await {
395            Ok(Some(containers)) => containers,
396            Ok(None) => {
397                error!("{id} not found");
398
399                DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
400                    .send(&id, &mut self.device)
401                    .await;
402
403                return;
404            }
405            Err(err) => {
406                let err = format!("{:#}", eyre::Report::new(err));
407
408                error!(error = err, "couldn't start deployment");
409
410                DeploymentEvent::new(EventStatus::Error, err)
411                    .send(&id, &mut self.device)
412                    .await;
413
414                return;
415            }
416        };
417
418        info!("stopping deployment");
419
420        DeploymentEvent::new(EventStatus::Stopping, "")
421            .send(&id, &mut self.device)
422            .await;
423
424        if let Err(err) = self.stop_deployment(id, containers).await {
425            let err = format!("{:#}", eyre::Report::new(err));
426
427            error!(error = err, "couldn't stop deployment");
428
429            DeploymentEvent::new(EventStatus::Error, err)
430                .send(&id, &mut self.device)
431                .await;
432
433            return;
434        }
435
436        DeploymentEvent::new(EventStatus::Stopped, "")
437            .send(&id, &mut self.device)
438            .await;
439
440        info!("deployment stopped");
441    }
442
443    #[instrument(skip(self, containers))]
444    async fn stop_deployment(&mut self, deployment: Uuid, containers: Vec<SqlUuid>) -> Result<()>
445    where
446        D: Client + Send + Sync + 'static,
447    {
448        for id in containers {
449            debug!(%id, "stopping container");
450
451            let mut ctx = self.context(id);
452            let (state, mut container) = ContainerResource::fetch(&mut ctx).await?;
453
454            match state {
455                State::Missing => {
456                    warn!(%id, "container already missing, cannot stop");
457
458                    continue;
459                }
460                State::Created => {}
461            }
462
463            container.stop(ctx).await?;
464        }
465
466        AvailableDeployment::new(&deployment)
467            .send(
468                &mut self.device,
469                crate::properties::deployment::DeploymentStatus::Stopped,
470            )
471            .await
472            .map_err(ResourceError::from)?;
473
474        Ok(())
475    }
476
477    /// Will delete an application
478    #[instrument(skip(self))]
479    pub async fn delete(&mut self, id: Uuid)
480    where
481        D: Client + Send + Sync + 'static,
482    {
483        let deployment = match self.store.find_deployment_for_delete(id).await {
484            Ok(Some(deployment)) => deployment,
485            Ok(None) => {
486                error!("{id} not found");
487
488                DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
489                    .send(&id, &mut self.device)
490                    .await;
491
492                return;
493            }
494            Err(err) => {
495                let err = format!("{:#}", eyre::Report::new(err));
496
497                error!(error = err, "couldn't delete deployment");
498
499                DeploymentEvent::new(EventStatus::Error, err)
500                    .send(&id, &mut self.device)
501                    .await;
502
503                return;
504            }
505        };
506
507        info!("deleting deployment");
508
509        DeploymentEvent::new(EventStatus::Deleting, "")
510            .send(&id, &mut self.device)
511            .await;
512
513        if let Err(err) = self.delete_deployment(id, deployment).await {
514            let err = format!("{:#}", eyre::Report::new(err));
515
516            error!(error = err, "couldn't delete deployment");
517
518            DeploymentEvent::new(EventStatus::Error, err)
519                .send(&id, &mut self.device)
520                .await;
521
522            return;
523        }
524
525        info!("deployment deleted");
526    }
527
528    async fn delete_deployment(&mut self, deployment_id: Uuid, deployment: Deployment) -> Result<()>
529    where
530        D: Client + Send + Sync + 'static,
531    {
532        for id in deployment.containers {
533            ContainerResource::down(self.context(id)).await?;
534        }
535
536        for id in deployment.volumes {
537            VolumeResource::down(self.context(id)).await?;
538        }
539
540        for id in deployment.networks {
541            NetworkResource::down(self.context(id)).await?;
542        }
543
544        for id in deployment.images {
545            ImageResource::down(self.context(id)).await?;
546        }
547
548        for id in deployment.device_mapping {
549            self.store.delete_device_mapping(id).await?;
550        }
551
552        AvailableDeployment::new(&deployment_id)
553            .unset(&mut self.device)
554            .await
555            .map_err(ResourceError::from)?;
556
557        self.store.delete_deployment(deployment_id).await?;
558
559        Ok(())
560    }
561
562    /// Will update an application between deployments
563    #[instrument(skip(self))]
564    pub async fn update(&mut self, bundle: DeploymentUpdate)
565    where
566        D: Client + Send + Sync + 'static,
567    {
568        let from_deployment = match self
569            .store
570            .load_deployment_containers_update_from(bundle)
571            .await
572        {
573            Ok(Some(deployment)) => deployment,
574            Ok(None) => {
575                let msg = format!("{} not found", bundle.from);
576                error!("{msg}");
577
578                DeploymentEvent::new(EventStatus::Error, msg)
579                    .send(&bundle.from, &mut self.device)
580                    .await;
581
582                return;
583            }
584            Err(err) => {
585                let err = format!("{:#}", eyre::Report::new(err));
586
587                error!(error = err, "couldn't update deployment");
588
589                DeploymentEvent::new(EventStatus::Error, err)
590                    .send(&bundle.from, &mut self.device)
591                    .await;
592
593                return;
594            }
595        };
596
597        let to_deployment = match self.store.find_complete_deployment(bundle.to).await {
598            Ok(Some(deployment)) => deployment,
599            Ok(None) => {
600                let msg = format!("{} not found", bundle.to);
601                error!("{msg}");
602
603                DeploymentEvent::new(EventStatus::Error, msg)
604                    .send(&bundle.to, &mut self.device)
605                    .await;
606
607                return;
608            }
609            Err(err) => {
610                let err = format!("{:#}", eyre::Report::new(err));
611
612                error!(error = err, "couldn't update deployment");
613
614                DeploymentEvent::new(EventStatus::Error, err)
615                    .send(&bundle.to, &mut self.device)
616                    .await;
617
618                return;
619            }
620        };
621
622        info!("updating deployment");
623
624        DeploymentEvent::new(EventStatus::Updating, "")
625            .send(&bundle.from, &mut self.device)
626            .await;
627
628        // TODO: consider if it's necessary re-start the `from` containers or a retry logic
629        if let Err(err) = self
630            .update_deployment(bundle, from_deployment, to_deployment)
631            .await
632        {
633            let err = format!("{:#}", eyre::Report::new(err));
634
635            error!(error = err, "couldn't update deployment");
636
637            DeploymentEvent::new(EventStatus::Error, err)
638                .send(&bundle.from, &mut self.device)
639                .await;
640
641            return;
642        }
643
644        info!("deployment updated");
645    }
646
647    async fn update_deployment(
648        &mut self,
649        bundle: DeploymentUpdate,
650        to_stop: Vec<SqlUuid>,
651        to_start: Deployment,
652    ) -> Result<()>
653    where
654        D: Client + Send + Sync + 'static,
655    {
656        self.stop_deployment(bundle.from, to_stop).await?;
657
658        self.start_deployment(bundle.to, to_start).await?;
659
660        Ok(())
661    }
662
663    #[instrument(skip(self))]
664    async fn refresh(&mut self, id: Id)
665    where
666        D: Client + Send + Sync + 'static,
667    {
668        let mut ctx = self.context(*id.uuid());
669        let res = match id.resource_type() {
670            ResourceType::Image => ImageResource::refresh(&mut ctx).await,
671            ResourceType::Volume => VolumeResource::refresh(&mut ctx).await,
672            ResourceType::Network => NetworkResource::refresh(&mut ctx).await,
673            ResourceType::Container => ContainerResource::refresh(&mut ctx).await,
674            ResourceType::Deployment | ResourceType::DeviceMapping => {
675                debug!("nothing to refresh");
676
677                return;
678            }
679        };
680
681        if let Err(err) = res {
682            error!(
683                error = format!("{:#}", eyre::Report::new(err)),
684                "couldn't refresh resource status"
685            );
686        }
687    }
688}
689
690/// Id of the nodes in the Service graph
691#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
692pub struct Id {
693    rt: ResourceType,
694    id: Uuid,
695}
696
697impl Id {
698    /// Create a new ID
699    pub fn new(rt: ResourceType, id: Uuid) -> Self {
700        Self { rt, id }
701    }
702
703    pub(crate) fn uuid(&self) -> &Uuid {
704        &self.id
705    }
706
707    fn resource_type(&self) -> ResourceType {
708        self.rt
709    }
710}
711
712impl Display for Id {
713    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
714        write!(f, "{} ({})", self.rt, self.id)
715    }
716}
717
718/// Type of the container resource [`Id`]
719#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
720pub enum ResourceType {
721    /// Image resource.
722    Image,
723    /// Volume resource.
724    Volume,
725    /// Network resource.
726    Network,
727    /// Device mapping resource.
728    DeviceMapping,
729    /// Container resource.
730    Container,
731    /// Deployment resource.
732    Deployment,
733}
734
735impl Display for ResourceType {
736    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
737        match self {
738            ResourceType::Image => write!(f, "Image"),
739            ResourceType::Volume => write!(f, "Volume"),
740            ResourceType::Network => write!(f, "Network"),
741            ResourceType::DeviceMapping => write!(f, "DeviceMapping"),
742            ResourceType::Container => write!(f, "Container"),
743            ResourceType::Deployment => write!(f, "Deployment"),
744        }
745    }
746}
747
748#[cfg(test)]
749mod tests {
750    use std::collections::HashMap;
751    use std::vec;
752
753    use astarte_device_sdk::aggregate::AstarteObject;
754    use astarte_device_sdk::store::SqliteStore;
755    use astarte_device_sdk::transport::mqtt::Mqtt;
756    use astarte_device_sdk::{AstarteData, FromEvent};
757    use astarte_device_sdk_mock::mockall::Sequence;
758    use astarte_device_sdk_mock::MockDeviceClient;
759    use bollard::query_parameters::CreateImageOptions;
760    use edgehog_store::db;
761    use mockall::predicate;
762    use pretty_assertions::assert_eq;
763    use tempfile::TempDir;
764
765    use crate::container::{Binding, Container, ContainerId, DeviceMapping, PortBindingMap};
766    use crate::image::Image;
767    use crate::network::{Network, NetworkId};
768    use crate::properties::container::ContainerStatus;
769    use crate::properties::deployment::DeploymentStatus;
770    use crate::requests::container::tests::create_container_request_event;
771    use crate::requests::container::RestartPolicy;
772    use crate::requests::deployment::tests::create_deployment_request_event;
773    use crate::requests::device_mapping::tests::create_device_mapping_request_event;
774    use crate::requests::image::tests::create_image_request_event;
775    use crate::requests::network::tests::create_network_request_event;
776    use crate::requests::volume::tests::create_volume_request_event;
777    use crate::requests::ContainerRequest;
778    use crate::volume::{Volume, VolumeId};
779    use crate::{docker, docker_mock};
780
781    use super::events::ServiceHandle;
782    use super::*;
783
784    async fn mock_service(
785        tempdir: &TempDir,
786        client: Docker,
787        device: MockDeviceClient<Mqtt<SqliteStore>>,
788    ) -> (
789        Service<MockDeviceClient<Mqtt<SqliteStore>>>,
790        ServiceHandle<MockDeviceClient<Mqtt<SqliteStore>>>,
791    ) {
792        let db_file = tempdir.path().join("state.db");
793        let db_file = db_file.to_str().unwrap();
794
795        let handle = db::Handle::open(db_file).await.unwrap();
796        let store = StateStore::new(handle);
797
798        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
799
800        let handle = ServiceHandle::new(device.clone(), store.clone(), tx);
801        let service = Service::new(client, device, rx, store);
802
803        (service, handle)
804    }
805
806    #[tokio::test]
807    async fn should_add_an_image() {
808        let tmpdir = TempDir::new().unwrap();
809
810        let id = Uuid::new_v4();
811        let deployment_id = Uuid::new_v4();
812
813        let client = Docker::connect().await.unwrap();
814        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
815        let mut seq = Sequence::new();
816
817        device
818            .expect_clone()
819            .once()
820            .in_sequence(&mut seq)
821            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
822
823        let image_path = format!("/{id}/pulled");
824        device
825            .expect_set_property()
826            .once()
827            .in_sequence(&mut seq)
828            .with(
829                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
830                predicate::eq(image_path),
831                predicate::eq(AstarteData::Boolean(false)),
832            )
833            .returning(|_, _, _| Ok(()));
834
835        let (mut service, mut handle) = mock_service(&tmpdir, client, device).await;
836
837        let reference = "docker.io/nginx:stable-alpine-slim";
838        let create_image_req = create_image_request_event(id, deployment_id, reference, "");
839
840        let req = ContainerRequest::from_event(create_image_req).unwrap();
841
842        handle.on_event(req).await.unwrap();
843
844        let event = service.events.recv().await.unwrap();
845        service.on_event(event).await;
846
847        let resource = service.store.find_image(id).await.unwrap().unwrap();
848
849        let exp = Image::new(None, reference.to_string(), None);
850
851        assert_eq!(resource.image, exp);
852    }
853
854    #[tokio::test]
855    async fn should_add_a_volume() {
856        let tempdir = TempDir::new().unwrap();
857
858        let id = Uuid::new_v4();
859        let deployment_id = Uuid::new_v4();
860
861        let client = Docker::connect().await.unwrap();
862        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
863        let mut seq = Sequence::new();
864
865        device
866            .expect_clone()
867            .once()
868            .in_sequence(&mut seq)
869            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
870
871        let endpoint = format!("/{id}/created");
872        device
873            .expect_set_property()
874            .once()
875            .in_sequence(&mut seq)
876            .with(
877                predicate::eq("io.edgehog.devicemanager.apps.AvailableVolumes"),
878                predicate::eq(endpoint),
879                predicate::eq(AstarteData::Boolean(false)),
880            )
881            .returning(|_, _, _| Ok(()));
882
883        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
884
885        let create_volume_req =
886            create_volume_request_event(id, deployment_id, "local", &["foo=bar", "some="]);
887
888        let req = ContainerRequest::from_event(create_volume_req).unwrap();
889
890        handle.on_event(req).await.unwrap();
891        let event = service.events.recv().await.unwrap();
892        service.on_event(event).await;
893
894        let resource = service.store.find_volume(id).await.unwrap().unwrap();
895
896        let exp = Volume {
897            id: VolumeId::new(id),
898            driver: "local".to_string(),
899            driver_opts: HashMap::from([
900                ("foo".to_string(), "bar".to_string()),
901                ("some".to_string(), "".to_string()),
902            ]),
903        };
904
905        assert_eq!(resource.volume, exp);
906    }
907
908    #[tokio::test]
909    async fn should_add_a_network() {
910        let tempdir = TempDir::new().unwrap();
911
912        let id = Uuid::new_v4();
913        let deployment_id = Uuid::new_v4();
914
915        let client = Docker::connect().await.unwrap();
916        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
917        let mut seq = Sequence::new();
918
919        device
920            .expect_clone()
921            .once()
922            .in_sequence(&mut seq)
923            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
924
925        let endpoint = format!("/{id}/created");
926        device
927            .expect_set_property()
928            .once()
929            .in_sequence(&mut seq)
930            .with(
931                predicate::eq("io.edgehog.devicemanager.apps.AvailableNetworks"),
932                predicate::eq(endpoint),
933                predicate::eq(AstarteData::Boolean(false)),
934            )
935            .returning(|_, _, _| Ok(()));
936
937        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
938
939        let create_network_req = create_network_request_event(id, deployment_id, "bridged", &[]);
940
941        let req = ContainerRequest::from_event(create_network_req).unwrap();
942
943        handle.on_event(req).await.unwrap();
944
945        let event = service.events.recv().await.unwrap();
946        service.on_event(event).await;
947
948        let resource = service.store.find_network(id).await.unwrap().unwrap();
949
950        let exp = Network {
951            id: NetworkId::new(None, id),
952            driver: "bridged".to_string(),
953            internal: false,
954            enable_ipv6: false,
955            driver_opts: HashMap::new(),
956        };
957
958        assert_eq!(resource.network, exp);
959    }
960
961    #[tokio::test]
962    async fn should_add_a_container() {
963        let tempdir = TempDir::new().unwrap();
964
965        let id = Uuid::new_v4();
966        let image_id = Uuid::new_v4();
967        let network_id = Uuid::new_v4();
968        let device_mapping_id = Uuid::new_v4();
969        let deployment_id = Uuid::new_v4();
970
971        let client = Docker::connect().await.unwrap();
972        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
973        let mut seq = Sequence::new();
974
975        device
976            .expect_clone()
977            .once()
978            .in_sequence(&mut seq)
979            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
980
981        let image_path = format!("/{image_id}/pulled");
982        device
983            .expect_set_property()
984            .once()
985            .in_sequence(&mut seq)
986            .with(
987                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
988                predicate::eq(image_path),
989                predicate::eq(AstarteData::Boolean(false)),
990            )
991            .returning(|_, _, _| Ok(()));
992
993        let endpoint = format!("/{network_id}/created");
994        device
995            .expect_set_property()
996            .once()
997            .in_sequence(&mut seq)
998            .with(
999                predicate::eq("io.edgehog.devicemanager.apps.AvailableNetworks"),
1000                predicate::eq(endpoint),
1001                predicate::eq(AstarteData::Boolean(false)),
1002            )
1003            .returning(|_, _, _| Ok(()));
1004
1005        device
1006            .expect_set_property()
1007            .once()
1008            .in_sequence(&mut seq)
1009            .with(
1010                predicate::eq("io.edgehog.devicemanager.apps.AvailableDeviceMappings"),
1011                predicate::eq(format!("/{device_mapping_id}/present")),
1012                predicate::eq(AstarteData::Boolean(true)),
1013            )
1014            .returning(|_, _, _| Ok(()));
1015
1016        let endpoint = format!("/{id}/status");
1017        device
1018            .expect_set_property()
1019            .once()
1020            .in_sequence(&mut seq)
1021            .with(
1022                predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1023                predicate::eq(endpoint),
1024                predicate::eq(AstarteData::from("Received")),
1025            )
1026            .returning(|_, _, _| Ok(()));
1027
1028        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
1029
1030        // Image
1031        let reference = "docker.io/nginx:stable-alpine-slim";
1032        let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");
1033
1034        let req = ContainerRequest::from_event(create_image_req).unwrap();
1035        handle.on_event(req).await.unwrap();
1036        let event = service.events.recv().await.unwrap();
1037        service.on_event(event).await;
1038
1039        // Network
1040        let create_network_req =
1041            create_network_request_event(network_id, deployment_id, "bridged", &[]);
1042        let req = ContainerRequest::from_event(create_network_req).unwrap();
1043        handle.on_event(req).await.unwrap();
1044        let event = service.events.recv().await.unwrap();
1045        service.on_event(event).await;
1046
1047        // Device mapping
1048        let create_device_mapping_req = create_device_mapping_request_event(
1049            device_mapping_id,
1050            deployment_id,
1051            "/dev/tty12",
1052            "/dev/tty12",
1053        );
1054        let req = ContainerRequest::from_event(create_device_mapping_req).unwrap();
1055        handle.on_event(req).await.unwrap();
1056        let event = service.events.recv().await.unwrap();
1057        service.on_event(event).await;
1058
1059        // Container
1060        let create_container_req = create_container_request_event(
1061            id,
1062            deployment_id,
1063            image_id,
1064            "image",
1065            &[network_id],
1066            &[device_mapping_id.to_string()],
1067        );
1068
1069        let req = ContainerRequest::from_event(create_container_req).unwrap();
1070
1071        handle.on_event(req).await.unwrap();
1072
1073        let event = service.events.recv().await.unwrap();
1074        service.on_event(event).await;
1075
1076        let resource = service.store.find_container(id).await.unwrap().unwrap();
1077
1078        let exp = Container {
1079            id: ContainerId::new(None, id),
1080            image: "docker.io/nginx:stable-alpine-slim".to_string(),
1081            network_mode: "bridge".to_string(),
1082            networks: vec![network_id.to_string()],
1083            hostname: Some("hostname".to_string()),
1084            restart_policy: RestartPolicy::No,
1085            env: vec!["env".to_string()],
1086            binds: vec!["binds".to_string()],
1087            port_bindings: PortBindingMap(HashMap::from_iter([(
1088                "80/tcp".to_string(),
1089                vec![Binding {
1090                    host_ip: None,
1091                    host_port: Some(80),
1092                }],
1093            )])),
1094            extra_hosts: vec!["host.docker.internal:host-gateway".to_string()],
1095            cap_add: vec!["CAP_CHOWN".to_string()],
1096            cap_drop: vec!["CAP_KILL".to_string()],
1097            device_mappings: vec![DeviceMapping {
1098                path_on_host: "/dev/tty12".to_string(),
1099                path_in_container: "/dev/tty12".to_string(),
1100                cgroup_permissions: Some("msv".to_string()),
1101            }],
1102            privileged: false,
1103            cpu_period: Some(1000),
1104            cpu_quota: Some(100),
1105            cpu_realtime_period: Some(1000),
1106            cpu_realtime_runtime: Some(100),
1107            memory: Some(4096),
1108            memory_reservation: Some(1024),
1109            memory_swap: Some(8192),
1110            memory_swappiness: Some(50),
1111            volume_driver: Some("local".to_string()),
1112            read_only_rootfs: true,
1113            storage_opt: HashMap::from_iter([("size".to_string(), "1024k".to_string())]),
1114        };
1115
1116        assert_eq!(resource, exp);
1117    }
1118
1119    #[tokio::test]
1120    async fn should_start_deployment() {
1121        let tempdir = TempDir::new().unwrap();
1122
1123        let image_id = Uuid::new_v4();
1124        let container_id = Uuid::new_v4();
1125        let deployment_id = Uuid::new_v4();
1126
1127        let reference = "docker.io/nginx:stable-alpine-slim";
1128
1129        let client = docker_mock!(docker::Client::connect_with_local_defaults().unwrap(), {
1130            use self::docker::tests::not_found_response;
1131            use futures::StreamExt;
1132
1133            let mut mock = docker::Client::new();
1134            let mut seq = mockall::Sequence::new();
1135
1136            mock.expect_inspect_image()
1137                .withf(move |name| name == reference)
1138                .once()
1139                .in_sequence(&mut seq)
1140                .returning(|_| Err(not_found_response()));
1141
1142            mock.expect_create_image()
1143                .with(
1144                    predicate::eq(Some(CreateImageOptions {
1145                        from_image: Some(reference.to_string()),
1146                        ..Default::default()
1147                    })),
1148                    predicate::always(),
1149                    predicate::eq(None),
1150                )
1151                .once()
1152                .in_sequence(&mut seq)
1153                .returning(|_, _, _| futures::stream::empty().boxed());
1154
1155            mock.expect_inspect_image()
1156                .withf(move |name| name ==reference)
1157                .once()
1158                .in_sequence(&mut seq)
1159                .returning(|_| {
1160                    Ok(bollard::models::ImageInspect {
1161                    id: Some(
1162                        "sha256:d2c94e258dcb3c5ac2798d32e1249e42ef01cba4841c2234249495f87264ac5a".to_string(),
1163                    ),
1164                    ..Default::default()
1165                })
1166                });
1167
1168            let container_name = container_id.to_string();
1169            mock.expect_inspect_container()
1170                .withf(move |name, _option| name == container_name)
1171                .once()
1172                .in_sequence(&mut seq)
1173                .returning(|_, _| Err(docker::tests::not_found_response()));
1174
1175            let name_exp = container_id.to_string();
1176            mock.expect_create_container()
1177                .withf(move |option, config| {
1178                    option
1179                        .as_ref()
1180                        .and_then(|opt| opt.name.as_ref())
1181                        .is_some_and(|name| *name == name_exp)
1182                        && config
1183                            .image
1184                            .as_ref()
1185                            .is_some_and(|image| image == reference)
1186                })
1187                .once()
1188                .in_sequence(&mut seq)
1189                .returning(move |_, _| {
1190                    Ok(bollard::models::ContainerCreateResponse {
1191                        id: "container_id".to_string(),
1192                        warnings: Vec::new(),
1193                    })
1194                });
1195
1196            mock.expect_start_container()
1197                .withf(move |id, _| id == "container_id")
1198                .once()
1199                .in_sequence(&mut seq)
1200                .returning(move |_, _| Ok(()));
1201
1202            mock
1203        });
1204        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
1205        let mut seq = Sequence::new();
1206
1207        device
1208            .expect_clone()
1209            .once()
1210            .in_sequence(&mut seq)
1211            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
1212
1213        let image_path = format!("/{image_id}/pulled");
1214        device
1215            .expect_set_property()
1216            .once()
1217            .in_sequence(&mut seq)
1218            .with(
1219                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1220                predicate::eq(image_path),
1221                predicate::eq(AstarteData::Boolean(false)),
1222            )
1223            .returning(|_, _, _| Ok(()));
1224
1225        let endpoint = format!("/{container_id}/status");
1226        device
1227            .expect_set_property()
1228            .once()
1229            .in_sequence(&mut seq)
1230            .with(
1231                predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1232                predicate::eq(endpoint),
1233                predicate::eq(AstarteData::from("Received")),
1234            )
1235            .returning(|_, _, _| Ok(()));
1236
1237        let endpoint = format!("/{deployment_id}/status");
1238        device
1239            .expect_set_property()
1240            .once()
1241            .in_sequence(&mut seq)
1242            .with(
1243                predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1244                predicate::eq(endpoint),
1245                predicate::eq(AstarteData::String("Stopped".to_string())),
1246            )
1247            .returning(|_, _, _| Ok(()));
1248
1249        device
1250            .expect_send_object_with_timestamp()
1251            .once()
1252            .in_sequence(&mut seq)
1253            .with(
1254                predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1255                predicate::eq(format!("/{deployment_id}")),
1256                predicate::eq(AstarteObject::from_iter([
1257                    ("status".to_string(), AstarteData::from("Starting")),
1258                    ("message".to_string(), AstarteData::from("")),
1259                ])),
1260                predicate::always(),
1261            )
1262            .returning(|_, _, _, _| Ok(()));
1263
1264        let image_path = format!("/{image_id}/pulled");
1265        device
1266            .expect_set_property()
1267            .once()
1268            .in_sequence(&mut seq)
1269            .withf(move |interface, path, value| {
1270                interface == "io.edgehog.devicemanager.apps.AvailableImages"
1271                    && path == (image_path)
1272                    && *value == AstarteData::Boolean(true)
1273            })
1274            .returning(|_, _, _| Ok(()));
1275
1276        let endpoint = format!("/{container_id}/status");
1277        device
1278            .expect_set_property()
1279            .once()
1280            .in_sequence(&mut seq)
1281            .withf(move |interface, path, value| {
1282                interface == "io.edgehog.devicemanager.apps.AvailableContainers"
1283                    && path == endpoint
1284                    && *value == ContainerStatus::Created.to_string()
1285            })
1286            .returning(|_, _, _| Ok(()));
1287
1288        let endpoint = format!("/{container_id}/status");
1289        device
1290            .expect_set_property()
1291            .once()
1292            .in_sequence(&mut seq)
1293            .withf(move |interface, path, value| {
1294                interface == "io.edgehog.devicemanager.apps.AvailableContainers"
1295                    && path == endpoint
1296                    && *value == ContainerStatus::Running.to_string()
1297            })
1298            .returning(|_, _, _| Ok(()));
1299
1300        let endpoint = format!("/{deployment_id}/status");
1301        device
1302            .expect_set_property()
1303            .once()
1304            .in_sequence(&mut seq)
1305            .withf(move |interface, path, value| {
1306                interface == "io.edgehog.devicemanager.apps.AvailableDeployments"
1307                    && path == endpoint
1308                    && *value == DeploymentStatus::Started.to_string()
1309            })
1310            .returning(|_, _, _| Ok(()));
1311
1312        device
1313            .expect_send_object_with_timestamp()
1314            .once()
1315            .in_sequence(&mut seq)
1316            .with(
1317                predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1318                predicate::eq(format!("/{deployment_id}")),
1319                predicate::eq(AstarteObject::from_iter([
1320                    ("status".to_string(), AstarteData::from("Started")),
1321                    ("message".to_string(), AstarteData::from("")),
1322                ])),
1323                predicate::always(),
1324            )
1325            .returning(|_, _, _, _| Ok(()));
1326
1327        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
1328
1329        let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");
1330
1331        let image_req = ContainerRequest::from_event(create_image_req).unwrap();
1332
1333        let create_container_req = create_container_request_event(
1334            container_id,
1335            deployment_id,
1336            image_id,
1337            reference,
1338            &Vec::<Uuid>::new(),
1339            &Vec::<Uuid>::new(),
1340        );
1341
1342        let container_req = ContainerRequest::from_event(create_container_req).unwrap();
1343
1344        let create_deployment_req = create_deployment_request_event(
1345            &deployment_id.to_string(),
1346            &[&container_id.to_string()],
1347        );
1348
1349        let deployment_req = ContainerRequest::from_event(create_deployment_req).unwrap();
1350
1351        let start = ContainerRequest::DeploymentCommand(DeploymentCommand {
1352            id: deployment_id,
1353            command: CommandValue::Start,
1354        });
1355
1356        handle.on_event(image_req).await.unwrap();
1357        handle.on_event(container_req).await.unwrap();
1358        handle.on_event(deployment_req).await.unwrap();
1359        handle.on_event(start).await.unwrap();
1360
1361        let image_event = service.events.recv().await.unwrap();
1362        service.on_event(image_event).await;
1363        let container_event = service.events.recv().await.unwrap();
1364        service.on_event(container_event).await;
1365        let deployment_event = service.events.recv().await.unwrap();
1366        service.on_event(deployment_event).await;
1367        let start_event = service.events.recv().await.unwrap();
1368        service.on_event(start_event).await;
1369    }
1370
1371    #[tokio::test]
1372    async fn should_delete_deployment_no_start() {
1373        let tempdir = TempDir::new().unwrap();
1374
1375        let image_id = Uuid::new_v4();
1376        let container_id = Uuid::new_v4();
1377        let deployment_id = Uuid::new_v4();
1378
1379        let reference = "docker.io/nginx:stable-alpine-slim";
1380
1381        let client = docker_mock!(docker::Client::connect_with_local_defaults().unwrap(), {
1382            use self::docker::tests::not_found_response;
1383
1384            let mut mock = docker::Client::new();
1385            let mut seq = mockall::Sequence::new();
1386
1387            let container_name = container_id.to_string();
1388            mock.expect_inspect_container()
1389                .withf(move |name, _option| name == container_name)
1390                .once()
1391                .in_sequence(&mut seq)
1392                .returning(|_, _| Err(docker::tests::not_found_response()));
1393
1394            mock.expect_inspect_image()
1395                .withf(move |name| name == reference)
1396                .once()
1397                .in_sequence(&mut seq)
1398                .returning(|_| Err(not_found_response()));
1399
1400            mock
1401        });
1402        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
1403        let mut seq = Sequence::new();
1404
1405        device
1406            .expect_clone()
1407            .once()
1408            .in_sequence(&mut seq)
1409            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
1410
1411        let image_path = format!("/{image_id}/pulled");
1412        device
1413            .expect_set_property()
1414            .once()
1415            .in_sequence(&mut seq)
1416            .with(
1417                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1418                predicate::eq(image_path),
1419                predicate::eq(AstarteData::Boolean(false)),
1420            )
1421            .returning(|_, _, _| Ok(()));
1422
1423        let endpoint = format!("/{container_id}/status");
1424        device
1425            .expect_set_property()
1426            .once()
1427            .in_sequence(&mut seq)
1428            .with(
1429                predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1430                predicate::eq(endpoint),
1431                predicate::eq(AstarteData::from("Received")),
1432            )
1433            .returning(|_, _, _| Ok(()));
1434
1435        let endpoint = format!("/{deployment_id}/status");
1436        device
1437            .expect_set_property()
1438            .once()
1439            .in_sequence(&mut seq)
1440            .with(
1441                predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1442                predicate::eq(endpoint),
1443                predicate::eq(AstarteData::String("Stopped".to_string())),
1444            )
1445            .returning(|_, _, _| Ok(()));
1446
1447        device
1448            .expect_send_object_with_timestamp()
1449            .once()
1450            .in_sequence(&mut seq)
1451            .with(
1452                predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1453                predicate::eq(format!("/{deployment_id}")),
1454                predicate::eq(AstarteObject::from_iter([
1455                    ("status".to_string(), AstarteData::from("Deleting")),
1456                    ("message".to_string(), AstarteData::from("")),
1457                ])),
1458                predicate::always(),
1459            )
1460            .returning(|_, _, _, _| Ok(()));
1461
1462        let endpoint = format!("/{container_id}/status");
1463        device
1464            .expect_unset_property()
1465            .once()
1466            .in_sequence(&mut seq)
1467            .with(
1468                predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1469                predicate::eq(endpoint),
1470            )
1471            .returning(|_, _| Ok(()));
1472
1473        let image_path = format!("/{image_id}/pulled");
1474        device
1475            .expect_unset_property()
1476            .once()
1477            .in_sequence(&mut seq)
1478            .with(
1479                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1480                predicate::eq(image_path),
1481            )
1482            .returning(|_, _| Ok(()));
1483
1484        let endpoint = format!("/{deployment_id}/status");
1485        device
1486            .expect_unset_property()
1487            .once()
1488            .in_sequence(&mut seq)
1489            .with(
1490                predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1491                predicate::eq(endpoint),
1492            )
1493            .returning(|_, _| Ok(()));
1494
1495        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
1496
1497        let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");
1498
1499        let image_req = ContainerRequest::from_event(create_image_req).unwrap();
1500
1501        let create_container_req = create_container_request_event(
1502            container_id,
1503            deployment_id,
1504            image_id,
1505            reference,
1506            &Vec::<Uuid>::new(),
1507            &Vec::<Uuid>::new(),
1508        );
1509
1510        let container_req = ContainerRequest::from_event(create_container_req).unwrap();
1511
1512        let create_deployment_req = create_deployment_request_event(
1513            &deployment_id.to_string(),
1514            &[&container_id.to_string()],
1515        );
1516
1517        let deployment_req = ContainerRequest::from_event(create_deployment_req).unwrap();
1518
1519        let delete = ContainerRequest::DeploymentCommand(DeploymentCommand {
1520            id: deployment_id,
1521            command: CommandValue::Delete,
1522        });
1523
1524        handle.on_event(image_req).await.unwrap();
1525        handle.on_event(container_req).await.unwrap();
1526        handle.on_event(deployment_req).await.unwrap();
1527        handle.on_event(delete).await.unwrap();
1528
1529        let image_event = service.events.recv().await.unwrap();
1530        service.on_event(image_event).await;
1531        let container_event = service.events.recv().await.unwrap();
1532        service.on_event(container_event).await;
1533        let deployment_event = service.events.recv().await.unwrap();
1534        service.on_event(deployment_event).await;
1535        let start_event = service.events.recv().await.unwrap();
1536        service.on_event(start_event).await;
1537    }
1538}