1use std::fmt::{Debug, Display};
22
23use astarte_device_sdk::event::FromEventError;
24use edgehog_store::{conversions::SqlUuid, models::containers::deployment::DeploymentStatus};
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, instrument, warn};
27use uuid::Uuid;
28
29use crate::{
30 error::DockerError,
31 events::deployment::{DeploymentEvent, EventStatus},
32 properties::{deployment::AvailableDeployment, AvailableProp, Client},
33 requests::{
34 deployment::{CommandValue, DeploymentCommand, DeploymentUpdate},
35 ReqError,
36 },
37 resource::{
38 container::ContainerResource, deployment::Deployment,
39 device_mapping::DeviceMappingResource, image::ImageResource, network::NetworkResource,
40 volume::VolumeResource, Context, Create, Resource, ResourceError, State,
41 },
42 store::{StateStore, StoreError},
43 Docker,
44};
45
46use self::events::ContainerEvent;
47
48pub mod events;
49
/// Result alias whose error type is [`ServiceError`].
type Result<T> = std::result::Result<T, ServiceError>;
51
52#[non_exhaustive]
54#[derive(Debug, displaydoc::Display, thiserror::Error)]
55pub enum ServiceError {
56 FromEvent(#[from] FromEventError),
58 Docker(#[source] DockerError),
60 Astarte(#[from] astarte_device_sdk::Error),
62 Request(#[from] ReqError),
64 Store(#[from] StoreError),
66 Resource(#[from] ResourceError),
68}
69
70impl<T> From<T> for ServiceError
71where
72 T: Into<DockerError>,
73{
74 fn from(value: T) -> Self {
75 ServiceError::Docker(value.into())
76 }
77}
78
/// Service that handles the container resources and deployments.
///
/// It owns the Docker client, the device client `D`, the receiving end of the
/// [`ContainerEvent`] channel and the [`StateStore`].
#[derive(Debug)]
pub struct Service<D> {
    /// Docker client lent to every resource [`Context`].
    client: Docker,
    /// Device client used to send the properties and the deployment events.
    device: D,
    /// Queue of the events to handle, drained by [`Service::handle_events`].
    events: mpsc::UnboundedReceiver<ContainerEvent>,
    /// Store queried for the resources and deployments to operate on.
    store: StateStore,
}
91
92impl<D> Service<D> {
93 #[doc(hidden)]
95 #[must_use]
96 pub fn new(
97 client: Docker,
98 device: D,
99 events: mpsc::UnboundedReceiver<ContainerEvent>,
100 store: StateStore,
101 ) -> Self {
102 Self {
103 client,
104 device,
105 events,
106 store,
107 }
108 }
109
110 fn context(&mut self, id: impl Into<Uuid>) -> Context<'_, D> {
111 Context {
112 id: id.into(),
113 store: &mut self.store,
114 device: &mut self.device,
115 client: &mut self.client,
116 }
117 }
118
    /// Initializes the service from the content of the store.
    ///
    /// It first publishes the pending resources, then handles the stored deployments in this
    /// order: the deleted ones, the stopped ones and finally the started ones.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered by one of the steps.
    #[instrument(skip_all)]
    pub async fn init(&mut self) -> Result<()>
    where
        D: Client + Send + Sync + 'static,
    {
        self.publish_received().await?;

        // NOTE(review): deletes and stops run before the starts; presumably to release the
        // resources before they are needed again -- confirm before reordering.
        self.init_delete_deployments().await?;
        self.init_stop_deployments().await?;
        self.init_start_deployments().await?;

        info!("init completed");

        Ok(())
    }
137
138 #[instrument(skip_all)]
139 async fn publish_received(&mut self) -> Result<()>
140 where
141 D: Client + Send + Sync + 'static,
142 {
143 for id in self.store.load_images_to_publish().await? {
144 ImageResource::publish(self.context(id)).await?;
145 }
146
147 for id in self.store.load_volumes_to_publish().await? {
148 VolumeResource::publish(self.context(id)).await?;
149 }
150
151 for id in self.store.load_networks_to_publish().await? {
152 NetworkResource::publish(self.context(id)).await?;
153 }
154
155 for id in self.store.load_device_mappings_to_publish().await? {
156 DeviceMappingResource::publish(self.context(id)).await?;
157 }
158
159 for id in self.store.load_containers_to_publish().await? {
160 ContainerResource::publish(self.context(id)).await?;
161 }
162
163 for id in self
164 .store
165 .load_deployments_in(DeploymentStatus::Received)
166 .await?
167 {
168 Deployment::publish(self.context(id)).await?;
169 }
170
171 Ok(())
172 }
173
174 #[instrument(skip_all)]
175 async fn init_start_deployments(&mut self) -> Result<()>
176 where
177 D: Client + Send + Sync + 'static,
178 {
179 for id in self
180 .store
181 .load_deployments_in(DeploymentStatus::Started)
182 .await?
183 {
184 self.start(*id).await;
185 }
186
187 Ok(())
188 }
189
190 #[instrument(skip_all)]
191 async fn init_stop_deployments(&mut self) -> Result<()>
192 where
193 D: Client + Send + Sync + 'static,
194 {
195 for id in self
196 .store
197 .load_deployments_in(DeploymentStatus::Stopped)
198 .await?
199 {
200 self.stop(*id).await;
201 }
202
203 Ok(())
204 }
205
206 #[instrument(skip_all)]
207 async fn init_delete_deployments(&mut self) -> Result<()>
208 where
209 D: Client + Send + Sync + 'static,
210 {
211 for id in self
212 .store
213 .load_deployments_in(DeploymentStatus::Deleted)
214 .await?
215 {
216 self.delete(*id).await;
217 }
218
219 Ok(())
220 }
221
222 #[instrument(skip_all)]
224 pub async fn handle_events(&mut self)
225 where
226 D: Client + Send + Sync + 'static,
227 {
228 while let Some(event) = self.events.recv().await {
229 self.on_event(event).await;
230 }
231
232 info!("event receiver disconnected");
233 }
234
235 #[instrument(skip_all)]
236 async fn on_event(&mut self, event: ContainerEvent)
237 where
238 D: Client + Send + Sync + 'static,
239 {
240 match event {
241 ContainerEvent::Resource {
242 resource,
243 deployment,
244 } => {
245 self.resource_req(resource, deployment).await;
246 }
247 ContainerEvent::DeploymentCmd(DeploymentCommand {
248 id,
249 command: CommandValue::Start,
250 }) => {
251 self.start(id).await;
252 }
253 ContainerEvent::DeploymentCmd(DeploymentCommand {
254 id,
255 command: CommandValue::Stop,
256 }) => {
257 self.stop(id).await;
258 }
259 ContainerEvent::DeploymentCmd(DeploymentCommand {
260 id,
261 command: CommandValue::Delete,
262 }) => {
263 self.delete(id).await;
264 }
265 ContainerEvent::DeploymentUpdate(from_to) => {
266 self.update(from_to).await;
267 }
268 ContainerEvent::Refresh(id) => {
269 self.refresh(id).await;
270 }
271 }
272 }
273
274 #[instrument(skip_all, fields(%id))]
275 async fn resource_req(&mut self, id: Id, deployment_id: Uuid)
276 where
277 D: Client + Send + Sync + 'static,
278 {
279 let res = match id.resource_type() {
280 ResourceType::Image => ImageResource::publish(self.context(*id.uuid())).await,
281 ResourceType::Volume => VolumeResource::publish(self.context(*id.uuid())).await,
282 ResourceType::Network => NetworkResource::publish(self.context(*id.uuid())).await,
283 ResourceType::DeviceMapping => {
284 DeviceMappingResource::publish(self.context(*id.uuid())).await
285 }
286 ResourceType::Container => ContainerResource::publish(self.context(*id.uuid())).await,
287 ResourceType::Deployment => Deployment::publish(self.context(*id.uuid())).await,
288 };
289
290 if let Err(err) = res {
291 let error = format!("{:#}", eyre::Report::new(err));
292 error!(error, "failed to create resource");
293
294 DeploymentEvent::new(EventStatus::Error, error)
295 .send(&deployment_id, &mut self.device)
296 .await;
297 }
298 }
299
300 #[instrument(skip(self))]
302 pub async fn start(&mut self, id: Uuid)
303 where
304 D: Client + Send + Sync + 'static,
305 {
306 let deployment = match self.store.find_complete_deployment(id).await {
307 Ok(Some(deployment)) => deployment,
308 Ok(None) => {
309 error!("{id} not found");
310
311 DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
312 .send(&id, &mut self.device)
313 .await;
314
315 return;
316 }
317 Err(err) => {
318 let err = format!("{:#}", eyre::Report::new(err));
319
320 error!(error = err, "couldn't start deployment");
321
322 DeploymentEvent::new(EventStatus::Error, err)
323 .send(&id, &mut self.device)
324 .await;
325
326 return;
327 }
328 };
329
330 info!("starting deployment");
331
332 DeploymentEvent::new(EventStatus::Starting, "")
333 .send(&id, &mut self.device)
334 .await;
335
336 if let Err(err) = self.start_deployment(id, deployment).await {
337 let err = format!("{:#}", eyre::Report::new(err));
338
339 error!(error = err, "couldn't start deployment");
340
341 DeploymentEvent::new(EventStatus::Error, err)
342 .send(&id, &mut self.device)
343 .await;
344
345 return;
346 }
347
348 DeploymentEvent::new(EventStatus::Started, "")
349 .send(&id, &mut self.device)
350 .await;
351
352 info!("deployment started");
353 }
354
355 async fn start_deployment(&mut self, deployment_id: Uuid, deployment: Deployment) -> Result<()>
356 where
357 D: Client + Send + Sync + 'static,
358 {
359 for id in deployment.images {
360 ImageResource::up(self.context(id)).await?;
361 }
362
363 for id in deployment.volumes {
364 VolumeResource::up(self.context(id)).await?;
365 }
366
367 for id in deployment.networks {
368 NetworkResource::up(self.context(id)).await?;
369 }
370
371 for id in deployment.containers {
372 let mut container = ContainerResource::up(self.context(id)).await?;
373
374 container.start(self.context(id)).await?;
375 }
376
377 AvailableDeployment::new(&deployment_id)
378 .send(
379 &mut self.device,
380 crate::properties::deployment::DeploymentStatus::Started,
381 )
382 .await
383 .map_err(ResourceError::Property)?;
384
385 Ok(())
386 }
387
388 #[instrument(skip(self))]
390 pub async fn stop(&mut self, id: Uuid)
391 where
392 D: Client + Send + Sync + 'static,
393 {
394 let containers = match self.store.load_deployment_containers(id).await {
395 Ok(Some(containers)) => containers,
396 Ok(None) => {
397 error!("{id} not found");
398
399 DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
400 .send(&id, &mut self.device)
401 .await;
402
403 return;
404 }
405 Err(err) => {
406 let err = format!("{:#}", eyre::Report::new(err));
407
408 error!(error = err, "couldn't start deployment");
409
410 DeploymentEvent::new(EventStatus::Error, err)
411 .send(&id, &mut self.device)
412 .await;
413
414 return;
415 }
416 };
417
418 info!("stopping deployment");
419
420 DeploymentEvent::new(EventStatus::Stopping, "")
421 .send(&id, &mut self.device)
422 .await;
423
424 if let Err(err) = self.stop_deployment(id, containers).await {
425 let err = format!("{:#}", eyre::Report::new(err));
426
427 error!(error = err, "couldn't stop deployment");
428
429 DeploymentEvent::new(EventStatus::Error, err)
430 .send(&id, &mut self.device)
431 .await;
432
433 return;
434 }
435
436 DeploymentEvent::new(EventStatus::Stopped, "")
437 .send(&id, &mut self.device)
438 .await;
439
440 info!("deployment stopped");
441 }
442
443 #[instrument(skip(self, containers))]
444 async fn stop_deployment(&mut self, deployment: Uuid, containers: Vec<SqlUuid>) -> Result<()>
445 where
446 D: Client + Send + Sync + 'static,
447 {
448 for id in containers {
449 debug!(%id, "stopping container");
450
451 let mut ctx = self.context(id);
452 let (state, mut container) = ContainerResource::fetch(&mut ctx).await?;
453
454 match state {
455 State::Missing => {
456 warn!(%id, "container already missing, cannot stop");
457
458 continue;
459 }
460 State::Created => {}
461 }
462
463 container.stop(ctx).await?;
464 }
465
466 AvailableDeployment::new(&deployment)
467 .send(
468 &mut self.device,
469 crate::properties::deployment::DeploymentStatus::Stopped,
470 )
471 .await
472 .map_err(ResourceError::from)?;
473
474 Ok(())
475 }
476
477 #[instrument(skip(self))]
479 pub async fn delete(&mut self, id: Uuid)
480 where
481 D: Client + Send + Sync + 'static,
482 {
483 let deployment = match self.store.find_deployment_for_delete(id).await {
484 Ok(Some(deployment)) => deployment,
485 Ok(None) => {
486 error!("{id} not found");
487
488 DeploymentEvent::new(EventStatus::Error, format!("{id} not found"))
489 .send(&id, &mut self.device)
490 .await;
491
492 return;
493 }
494 Err(err) => {
495 let err = format!("{:#}", eyre::Report::new(err));
496
497 error!(error = err, "couldn't delete deployment");
498
499 DeploymentEvent::new(EventStatus::Error, err)
500 .send(&id, &mut self.device)
501 .await;
502
503 return;
504 }
505 };
506
507 info!("deleting deployment");
508
509 DeploymentEvent::new(EventStatus::Deleting, "")
510 .send(&id, &mut self.device)
511 .await;
512
513 if let Err(err) = self.delete_deployment(id, deployment).await {
514 let err = format!("{:#}", eyre::Report::new(err));
515
516 error!(error = err, "couldn't delete deployment");
517
518 DeploymentEvent::new(EventStatus::Error, err)
519 .send(&id, &mut self.device)
520 .await;
521
522 return;
523 }
524
525 info!("deployment deleted");
526 }
527
528 async fn delete_deployment(&mut self, deployment_id: Uuid, deployment: Deployment) -> Result<()>
529 where
530 D: Client + Send + Sync + 'static,
531 {
532 for id in deployment.containers {
533 ContainerResource::down(self.context(id)).await?;
534 }
535
536 for id in deployment.volumes {
537 VolumeResource::down(self.context(id)).await?;
538 }
539
540 for id in deployment.networks {
541 NetworkResource::down(self.context(id)).await?;
542 }
543
544 for id in deployment.images {
545 ImageResource::down(self.context(id)).await?;
546 }
547
548 for id in deployment.device_mapping {
549 self.store.delete_device_mapping(id).await?;
550 }
551
552 AvailableDeployment::new(&deployment_id)
553 .unset(&mut self.device)
554 .await
555 .map_err(ResourceError::from)?;
556
557 self.store.delete_deployment(deployment_id).await?;
558
559 Ok(())
560 }
561
562 #[instrument(skip(self))]
564 pub async fn update(&mut self, bundle: DeploymentUpdate)
565 where
566 D: Client + Send + Sync + 'static,
567 {
568 let from_deployment = match self
569 .store
570 .load_deployment_containers_update_from(bundle)
571 .await
572 {
573 Ok(Some(deployment)) => deployment,
574 Ok(None) => {
575 let msg = format!("{} not found", bundle.from);
576 error!("{msg}");
577
578 DeploymentEvent::new(EventStatus::Error, msg)
579 .send(&bundle.from, &mut self.device)
580 .await;
581
582 return;
583 }
584 Err(err) => {
585 let err = format!("{:#}", eyre::Report::new(err));
586
587 error!(error = err, "couldn't update deployment");
588
589 DeploymentEvent::new(EventStatus::Error, err)
590 .send(&bundle.from, &mut self.device)
591 .await;
592
593 return;
594 }
595 };
596
597 let to_deployment = match self.store.find_complete_deployment(bundle.to).await {
598 Ok(Some(deployment)) => deployment,
599 Ok(None) => {
600 let msg = format!("{} not found", bundle.to);
601 error!("{msg}");
602
603 DeploymentEvent::new(EventStatus::Error, msg)
604 .send(&bundle.to, &mut self.device)
605 .await;
606
607 return;
608 }
609 Err(err) => {
610 let err = format!("{:#}", eyre::Report::new(err));
611
612 error!(error = err, "couldn't update deployment");
613
614 DeploymentEvent::new(EventStatus::Error, err)
615 .send(&bundle.to, &mut self.device)
616 .await;
617
618 return;
619 }
620 };
621
622 info!("updating deployment");
623
624 DeploymentEvent::new(EventStatus::Updating, "")
625 .send(&bundle.from, &mut self.device)
626 .await;
627
628 if let Err(err) = self
630 .update_deployment(bundle, from_deployment, to_deployment)
631 .await
632 {
633 let err = format!("{:#}", eyre::Report::new(err));
634
635 error!(error = err, "couldn't update deployment");
636
637 DeploymentEvent::new(EventStatus::Error, err)
638 .send(&bundle.from, &mut self.device)
639 .await;
640
641 return;
642 }
643
644 info!("deployment updated");
645 }
646
647 async fn update_deployment(
648 &mut self,
649 bundle: DeploymentUpdate,
650 to_stop: Vec<SqlUuid>,
651 to_start: Deployment,
652 ) -> Result<()>
653 where
654 D: Client + Send + Sync + 'static,
655 {
656 self.stop_deployment(bundle.from, to_stop).await?;
657
658 self.start_deployment(bundle.to, to_start).await?;
659
660 Ok(())
661 }
662
663 #[instrument(skip(self))]
664 async fn refresh(&mut self, id: Id)
665 where
666 D: Client + Send + Sync + 'static,
667 {
668 let mut ctx = self.context(*id.uuid());
669 let res = match id.resource_type() {
670 ResourceType::Image => ImageResource::refresh(&mut ctx).await,
671 ResourceType::Volume => VolumeResource::refresh(&mut ctx).await,
672 ResourceType::Network => NetworkResource::refresh(&mut ctx).await,
673 ResourceType::Container => ContainerResource::refresh(&mut ctx).await,
674 ResourceType::Deployment | ResourceType::DeviceMapping => {
675 debug!("nothing to refresh");
676
677 return;
678 }
679 };
680
681 if let Err(err) = res {
682 error!(
683 error = format!("{:#}", eyre::Report::new(err)),
684 "couldn't refresh resource status"
685 );
686 }
687 }
688}
689
/// Identifier of a resource: its [`ResourceType`] together with its [`Uuid`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id {
    /// Kind of the resource.
    rt: ResourceType,
    /// Unique id of the resource.
    id: Uuid,
}
696
697impl Id {
698 pub fn new(rt: ResourceType, id: Uuid) -> Self {
700 Self { rt, id }
701 }
702
703 pub(crate) fn uuid(&self) -> &Uuid {
704 &self.id
705 }
706
707 fn resource_type(&self) -> ResourceType {
708 self.rt
709 }
710}
711
712impl Display for Id {
713 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
714 write!(f, "{} ({})", self.rt, self.id)
715 }
716}
717
/// Kind of resource handled by the service.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ResourceType {
    /// Image resource.
    Image,
    /// Volume resource.
    Volume,
    /// Network resource.
    Network,
    /// Device mapping resource.
    DeviceMapping,
    /// Container resource.
    Container,
    /// Deployment resource.
    Deployment,
}

/// Displays the name of the variant, written as in the code.
impl Display for ResourceType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Image => "Image",
            Self::Volume => "Volume",
            Self::Network => "Network",
            Self::DeviceMapping => "DeviceMapping",
            Self::Container => "Container",
            Self::Deployment => "Deployment",
        };

        // Written as is: width and padding flags of the formatter are not applied.
        f.write_str(name)
    }
}
747
748#[cfg(test)]
749mod tests {
750 use std::collections::HashMap;
751 use std::vec;
752
753 use astarte_device_sdk::aggregate::AstarteObject;
754 use astarte_device_sdk::store::SqliteStore;
755 use astarte_device_sdk::transport::mqtt::Mqtt;
756 use astarte_device_sdk::{AstarteData, FromEvent};
757 use astarte_device_sdk_mock::mockall::Sequence;
758 use astarte_device_sdk_mock::MockDeviceClient;
759 use bollard::query_parameters::CreateImageOptions;
760 use edgehog_store::db;
761 use mockall::predicate;
762 use pretty_assertions::assert_eq;
763 use tempfile::TempDir;
764
765 use crate::container::{Binding, Container, ContainerId, DeviceMapping, PortBindingMap};
766 use crate::image::Image;
767 use crate::network::{Network, NetworkId};
768 use crate::properties::container::ContainerStatus;
769 use crate::properties::deployment::DeploymentStatus;
770 use crate::requests::container::tests::create_container_request_event;
771 use crate::requests::container::RestartPolicy;
772 use crate::requests::deployment::tests::create_deployment_request_event;
773 use crate::requests::device_mapping::tests::create_device_mapping_request_event;
774 use crate::requests::image::tests::create_image_request_event;
775 use crate::requests::network::tests::create_network_request_event;
776 use crate::requests::volume::tests::create_volume_request_event;
777 use crate::requests::ContainerRequest;
778 use crate::volume::{Volume, VolumeId};
779 use crate::{docker, docker_mock};
780
781 use super::events::ServiceHandle;
782 use super::*;
783
784 async fn mock_service(
785 tempdir: &TempDir,
786 client: Docker,
787 device: MockDeviceClient<Mqtt<SqliteStore>>,
788 ) -> (
789 Service<MockDeviceClient<Mqtt<SqliteStore>>>,
790 ServiceHandle<MockDeviceClient<Mqtt<SqliteStore>>>,
791 ) {
792 let db_file = tempdir.path().join("state.db");
793 let db_file = db_file.to_str().unwrap();
794
795 let handle = db::Handle::open(db_file).await.unwrap();
796 let store = StateStore::new(handle);
797
798 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
799
800 let handle = ServiceHandle::new(device.clone(), store.clone(), tx);
801 let service = Service::new(client, device, rx, store);
802
803 (service, handle)
804 }
805
    /// A create image request is stored and the image is published as not pulled.
    #[tokio::test]
    async fn should_add_an_image() {
        let tmpdir = TempDir::new().unwrap();

        let id = Uuid::new_v4();
        let deployment_id = Uuid::new_v4();

        let client = Docker::connect().await.unwrap();
        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
        let mut seq = Sequence::new();

        // The device is cloned once, when the handle is created in `mock_service`.
        device
            .expect_clone()
            .once()
            .in_sequence(&mut seq)
            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);

        // Publishing the image sets its `pulled` property to `false`.
        let image_path = format!("/{id}/pulled");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
                predicate::eq(image_path),
                predicate::eq(AstarteData::Boolean(false)),
            )
            .returning(|_, _, _| Ok(()));

        let (mut service, mut handle) = mock_service(&tmpdir, client, device).await;

        let reference = "docker.io/nginx:stable-alpine-slim";
        let create_image_req = create_image_request_event(id, deployment_id, reference, "");

        let req = ContainerRequest::from_event(create_image_req).unwrap();

        handle.on_event(req).await.unwrap();

        // Take the event queued by the handle and process it on the service side.
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        let resource = service.store.find_image(id).await.unwrap().unwrap();

        let exp = Image::new(None, reference.to_string(), None);

        assert_eq!(resource.image, exp);
    }
853
    /// A create volume request is stored and the volume is published as not created.
    #[tokio::test]
    async fn should_add_a_volume() {
        let tempdir = TempDir::new().unwrap();

        let id = Uuid::new_v4();
        let deployment_id = Uuid::new_v4();

        let client = Docker::connect().await.unwrap();
        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
        let mut seq = Sequence::new();

        // The device is cloned once, when the handle is created in `mock_service`.
        device
            .expect_clone()
            .once()
            .in_sequence(&mut seq)
            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);

        // Publishing the volume sets its `created` property to `false`.
        let endpoint = format!("/{id}/created");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableVolumes"),
                predicate::eq(endpoint),
                predicate::eq(AstarteData::Boolean(false)),
            )
            .returning(|_, _, _| Ok(()));

        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;

        // The second option has an empty value: it is expected to be stored as an empty string.
        let create_volume_req =
            create_volume_request_event(id, deployment_id, "local", &["foo=bar", "some="]);

        let req = ContainerRequest::from_event(create_volume_req).unwrap();

        handle.on_event(req).await.unwrap();
        // Take the event queued by the handle and process it on the service side.
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        let resource = service.store.find_volume(id).await.unwrap().unwrap();

        let exp = Volume {
            id: VolumeId::new(id),
            driver: "local".to_string(),
            driver_opts: HashMap::from([
                ("foo".to_string(), "bar".to_string()),
                ("some".to_string(), "".to_string()),
            ]),
        };

        assert_eq!(resource.volume, exp);
    }
907
    /// A create network request is stored and the network is published as not created.
    #[tokio::test]
    async fn should_add_a_network() {
        let tempdir = TempDir::new().unwrap();

        let id = Uuid::new_v4();
        let deployment_id = Uuid::new_v4();

        let client = Docker::connect().await.unwrap();
        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
        let mut seq = Sequence::new();

        // The device is cloned once, when the handle is created in `mock_service`.
        device
            .expect_clone()
            .once()
            .in_sequence(&mut seq)
            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);

        // Publishing the network sets its `created` property to `false`.
        let endpoint = format!("/{id}/created");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableNetworks"),
                predicate::eq(endpoint),
                predicate::eq(AstarteData::Boolean(false)),
            )
            .returning(|_, _, _| Ok(()));

        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;

        let create_network_req = create_network_request_event(id, deployment_id, "bridged", &[]);

        let req = ContainerRequest::from_event(create_network_req).unwrap();

        handle.on_event(req).await.unwrap();

        // Take the event queued by the handle and process it on the service side.
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        let resource = service.store.find_network(id).await.unwrap().unwrap();

        let exp = Network {
            id: NetworkId::new(None, id),
            driver: "bridged".to_string(),
            internal: false,
            enable_ipv6: false,
            driver_opts: HashMap::new(),
        };

        assert_eq!(resource.network, exp);
    }
960
    /// A container is stored after its image, network and device mapping have been requested.
    ///
    /// The device expectations are in sequence, in the same order as the requests sent below.
    #[tokio::test]
    async fn should_add_a_container() {
        let tempdir = TempDir::new().unwrap();

        let id = Uuid::new_v4();
        let image_id = Uuid::new_v4();
        let network_id = Uuid::new_v4();
        let device_mapping_id = Uuid::new_v4();
        let deployment_id = Uuid::new_v4();

        let client = Docker::connect().await.unwrap();
        let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
        let mut seq = Sequence::new();

        // The device is cloned once, when the handle is created in `mock_service`.
        device
            .expect_clone()
            .once()
            .in_sequence(&mut seq)
            .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);

        // Image published as not pulled.
        let image_path = format!("/{image_id}/pulled");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
                predicate::eq(image_path),
                predicate::eq(AstarteData::Boolean(false)),
            )
            .returning(|_, _, _| Ok(()));

        // Network published as not created.
        let endpoint = format!("/{network_id}/created");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableNetworks"),
                predicate::eq(endpoint),
                predicate::eq(AstarteData::Boolean(false)),
            )
            .returning(|_, _, _| Ok(()));

        // Device mapping published as present.
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableDeviceMappings"),
                predicate::eq(format!("/{device_mapping_id}/present")),
                predicate::eq(AstarteData::Boolean(true)),
            )
            .returning(|_, _, _| Ok(()));

        // Container published with the `Received` status.
        let endpoint = format!("/{id}/status");
        device
            .expect_set_property()
            .once()
            .in_sequence(&mut seq)
            .with(
                predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
                predicate::eq(endpoint),
                predicate::eq(AstarteData::from("Received")),
            )
            .returning(|_, _, _| Ok(()));

        let (mut service, mut handle) = mock_service(&tempdir, client, device).await;

        let reference = "docker.io/nginx:stable-alpine-slim";
        // Image
        let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");

        let req = ContainerRequest::from_event(create_image_req).unwrap();
        handle.on_event(req).await.unwrap();
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        // Network
        let create_network_req =
            create_network_request_event(network_id, deployment_id, "bridged", &[]);
        let req = ContainerRequest::from_event(create_network_req).unwrap();
        handle.on_event(req).await.unwrap();
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        // Device mapping
        let create_device_mapping_req = create_device_mapping_request_event(
            device_mapping_id,
            deployment_id,
            "/dev/tty12",
            "/dev/tty12",
        );
        let req = ContainerRequest::from_event(create_device_mapping_req).unwrap();
        handle.on_event(req).await.unwrap();
        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        // Container, referencing the resources requested above
        let create_container_req = create_container_request_event(
            id,
            deployment_id,
            image_id,
            "image",
            &[network_id],
            &[device_mapping_id.to_string()],
        );

        let req = ContainerRequest::from_event(create_container_req).unwrap();

        handle.on_event(req).await.unwrap();

        let event = service.events.recv().await.unwrap();
        service.on_event(event).await;

        let resource = service.store.find_container(id).await.unwrap().unwrap();

        // The image, network and device mapping ids are expected to be resolved into the
        // image reference, the network name and the host/container paths.
        let exp = Container {
            id: ContainerId::new(None, id),
            image: "docker.io/nginx:stable-alpine-slim".to_string(),
            network_mode: "bridge".to_string(),
            networks: vec![network_id.to_string()],
            hostname: Some("hostname".to_string()),
            restart_policy: RestartPolicy::No,
            env: vec!["env".to_string()],
            binds: vec!["binds".to_string()],
            port_bindings: PortBindingMap(HashMap::from_iter([(
                "80/tcp".to_string(),
                vec![Binding {
                    host_ip: None,
                    host_port: Some(80),
                }],
            )])),
            extra_hosts: vec!["host.docker.internal:host-gateway".to_string()],
            cap_add: vec!["CAP_CHOWN".to_string()],
            cap_drop: vec!["CAP_KILL".to_string()],
            device_mappings: vec![DeviceMapping {
                path_on_host: "/dev/tty12".to_string(),
                path_in_container: "/dev/tty12".to_string(),
                cgroup_permissions: Some("msv".to_string()),
            }],
            privileged: false,
            cpu_period: Some(1000),
            cpu_quota: Some(100),
            cpu_realtime_period: Some(1000),
            cpu_realtime_runtime: Some(100),
            memory: Some(4096),
            memory_reservation: Some(1024),
            memory_swap: Some(8192),
            memory_swappiness: Some(50),
            volume_driver: Some("local".to_string()),
            read_only_rootfs: true,
            storage_opt: HashMap::from_iter([("size".to_string(), "1024k".to_string())]),
        };

        assert_eq!(resource, exp);
    }
1118
1119 #[tokio::test]
1120 async fn should_start_deployment() {
1121 let tempdir = TempDir::new().unwrap();
1122
1123 let image_id = Uuid::new_v4();
1124 let container_id = Uuid::new_v4();
1125 let deployment_id = Uuid::new_v4();
1126
1127 let reference = "docker.io/nginx:stable-alpine-slim";
1128
1129 let client = docker_mock!(docker::Client::connect_with_local_defaults().unwrap(), {
1130 use self::docker::tests::not_found_response;
1131 use futures::StreamExt;
1132
1133 let mut mock = docker::Client::new();
1134 let mut seq = mockall::Sequence::new();
1135
1136 mock.expect_inspect_image()
1137 .withf(move |name| name == reference)
1138 .once()
1139 .in_sequence(&mut seq)
1140 .returning(|_| Err(not_found_response()));
1141
1142 mock.expect_create_image()
1143 .with(
1144 predicate::eq(Some(CreateImageOptions {
1145 from_image: Some(reference.to_string()),
1146 ..Default::default()
1147 })),
1148 predicate::always(),
1149 predicate::eq(None),
1150 )
1151 .once()
1152 .in_sequence(&mut seq)
1153 .returning(|_, _, _| futures::stream::empty().boxed());
1154
1155 mock.expect_inspect_image()
1156 .withf(move |name| name ==reference)
1157 .once()
1158 .in_sequence(&mut seq)
1159 .returning(|_| {
1160 Ok(bollard::models::ImageInspect {
1161 id: Some(
1162 "sha256:d2c94e258dcb3c5ac2798d32e1249e42ef01cba4841c2234249495f87264ac5a".to_string(),
1163 ),
1164 ..Default::default()
1165 })
1166 });
1167
1168 let container_name = container_id.to_string();
1169 mock.expect_inspect_container()
1170 .withf(move |name, _option| name == container_name)
1171 .once()
1172 .in_sequence(&mut seq)
1173 .returning(|_, _| Err(docker::tests::not_found_response()));
1174
1175 let name_exp = container_id.to_string();
1176 mock.expect_create_container()
1177 .withf(move |option, config| {
1178 option
1179 .as_ref()
1180 .and_then(|opt| opt.name.as_ref())
1181 .is_some_and(|name| *name == name_exp)
1182 && config
1183 .image
1184 .as_ref()
1185 .is_some_and(|image| image == reference)
1186 })
1187 .once()
1188 .in_sequence(&mut seq)
1189 .returning(move |_, _| {
1190 Ok(bollard::models::ContainerCreateResponse {
1191 id: "container_id".to_string(),
1192 warnings: Vec::new(),
1193 })
1194 });
1195
1196 mock.expect_start_container()
1197 .withf(move |id, _| id == "container_id")
1198 .once()
1199 .in_sequence(&mut seq)
1200 .returning(move |_, _| Ok(()));
1201
1202 mock
1203 });
1204 let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
1205 let mut seq = Sequence::new();
1206
1207 device
1208 .expect_clone()
1209 .once()
1210 .in_sequence(&mut seq)
1211 .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
1212
1213 let image_path = format!("/{image_id}/pulled");
1214 device
1215 .expect_set_property()
1216 .once()
1217 .in_sequence(&mut seq)
1218 .with(
1219 predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1220 predicate::eq(image_path),
1221 predicate::eq(AstarteData::Boolean(false)),
1222 )
1223 .returning(|_, _, _| Ok(()));
1224
1225 let endpoint = format!("/{container_id}/status");
1226 device
1227 .expect_set_property()
1228 .once()
1229 .in_sequence(&mut seq)
1230 .with(
1231 predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1232 predicate::eq(endpoint),
1233 predicate::eq(AstarteData::from("Received")),
1234 )
1235 .returning(|_, _, _| Ok(()));
1236
1237 let endpoint = format!("/{deployment_id}/status");
1238 device
1239 .expect_set_property()
1240 .once()
1241 .in_sequence(&mut seq)
1242 .with(
1243 predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1244 predicate::eq(endpoint),
1245 predicate::eq(AstarteData::String("Stopped".to_string())),
1246 )
1247 .returning(|_, _, _| Ok(()));
1248
1249 device
1250 .expect_send_object_with_timestamp()
1251 .once()
1252 .in_sequence(&mut seq)
1253 .with(
1254 predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1255 predicate::eq(format!("/{deployment_id}")),
1256 predicate::eq(AstarteObject::from_iter([
1257 ("status".to_string(), AstarteData::from("Starting")),
1258 ("message".to_string(), AstarteData::from("")),
1259 ])),
1260 predicate::always(),
1261 )
1262 .returning(|_, _, _, _| Ok(()));
1263
1264 let image_path = format!("/{image_id}/pulled");
1265 device
1266 .expect_set_property()
1267 .once()
1268 .in_sequence(&mut seq)
1269 .withf(move |interface, path, value| {
1270 interface == "io.edgehog.devicemanager.apps.AvailableImages"
1271 && path == (image_path)
1272 && *value == AstarteData::Boolean(true)
1273 })
1274 .returning(|_, _, _| Ok(()));
1275
1276 let endpoint = format!("/{container_id}/status");
1277 device
1278 .expect_set_property()
1279 .once()
1280 .in_sequence(&mut seq)
1281 .withf(move |interface, path, value| {
1282 interface == "io.edgehog.devicemanager.apps.AvailableContainers"
1283 && path == endpoint
1284 && *value == ContainerStatus::Created.to_string()
1285 })
1286 .returning(|_, _, _| Ok(()));
1287
1288 let endpoint = format!("/{container_id}/status");
1289 device
1290 .expect_set_property()
1291 .once()
1292 .in_sequence(&mut seq)
1293 .withf(move |interface, path, value| {
1294 interface == "io.edgehog.devicemanager.apps.AvailableContainers"
1295 && path == endpoint
1296 && *value == ContainerStatus::Running.to_string()
1297 })
1298 .returning(|_, _, _| Ok(()));
1299
1300 let endpoint = format!("/{deployment_id}/status");
1301 device
1302 .expect_set_property()
1303 .once()
1304 .in_sequence(&mut seq)
1305 .withf(move |interface, path, value| {
1306 interface == "io.edgehog.devicemanager.apps.AvailableDeployments"
1307 && path == endpoint
1308 && *value == DeploymentStatus::Started.to_string()
1309 })
1310 .returning(|_, _, _| Ok(()));
1311
1312 device
1313 .expect_send_object_with_timestamp()
1314 .once()
1315 .in_sequence(&mut seq)
1316 .with(
1317 predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1318 predicate::eq(format!("/{deployment_id}")),
1319 predicate::eq(AstarteObject::from_iter([
1320 ("status".to_string(), AstarteData::from("Started")),
1321 ("message".to_string(), AstarteData::from("")),
1322 ])),
1323 predicate::always(),
1324 )
1325 .returning(|_, _, _, _| Ok(()));
1326
1327 let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
1328
1329 let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");
1330
1331 let image_req = ContainerRequest::from_event(create_image_req).unwrap();
1332
1333 let create_container_req = create_container_request_event(
1334 container_id,
1335 deployment_id,
1336 image_id,
1337 reference,
1338 &Vec::<Uuid>::new(),
1339 &Vec::<Uuid>::new(),
1340 );
1341
1342 let container_req = ContainerRequest::from_event(create_container_req).unwrap();
1343
1344 let create_deployment_req = create_deployment_request_event(
1345 &deployment_id.to_string(),
1346 &[&container_id.to_string()],
1347 );
1348
1349 let deployment_req = ContainerRequest::from_event(create_deployment_req).unwrap();
1350
1351 let start = ContainerRequest::DeploymentCommand(DeploymentCommand {
1352 id: deployment_id,
1353 command: CommandValue::Start,
1354 });
1355
1356 handle.on_event(image_req).await.unwrap();
1357 handle.on_event(container_req).await.unwrap();
1358 handle.on_event(deployment_req).await.unwrap();
1359 handle.on_event(start).await.unwrap();
1360
1361 let image_event = service.events.recv().await.unwrap();
1362 service.on_event(image_event).await;
1363 let container_event = service.events.recv().await.unwrap();
1364 service.on_event(container_event).await;
1365 let deployment_event = service.events.recv().await.unwrap();
1366 service.on_event(deployment_event).await;
1367 let start_event = service.events.recv().await.unwrap();
1368 service.on_event(start_event).await;
1369 }
1370
1371 #[tokio::test]
1372 async fn should_delete_deployment_no_start() {
1373 let tempdir = TempDir::new().unwrap();
1374
1375 let image_id = Uuid::new_v4();
1376 let container_id = Uuid::new_v4();
1377 let deployment_id = Uuid::new_v4();
1378
1379 let reference = "docker.io/nginx:stable-alpine-slim";
1380
1381 let client = docker_mock!(docker::Client::connect_with_local_defaults().unwrap(), {
1382 use self::docker::tests::not_found_response;
1383
1384 let mut mock = docker::Client::new();
1385 let mut seq = mockall::Sequence::new();
1386
1387 let container_name = container_id.to_string();
1388 mock.expect_inspect_container()
1389 .withf(move |name, _option| name == container_name)
1390 .once()
1391 .in_sequence(&mut seq)
1392 .returning(|_, _| Err(docker::tests::not_found_response()));
1393
1394 mock.expect_inspect_image()
1395 .withf(move |name| name == reference)
1396 .once()
1397 .in_sequence(&mut seq)
1398 .returning(|_| Err(not_found_response()));
1399
1400 mock
1401 });
1402 let mut device = MockDeviceClient::<Mqtt<SqliteStore>>::new();
1403 let mut seq = Sequence::new();
1404
1405 device
1406 .expect_clone()
1407 .once()
1408 .in_sequence(&mut seq)
1409 .returning(MockDeviceClient::<Mqtt<SqliteStore>>::new);
1410
1411 let image_path = format!("/{image_id}/pulled");
1412 device
1413 .expect_set_property()
1414 .once()
1415 .in_sequence(&mut seq)
1416 .with(
1417 predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1418 predicate::eq(image_path),
1419 predicate::eq(AstarteData::Boolean(false)),
1420 )
1421 .returning(|_, _, _| Ok(()));
1422
1423 let endpoint = format!("/{container_id}/status");
1424 device
1425 .expect_set_property()
1426 .once()
1427 .in_sequence(&mut seq)
1428 .with(
1429 predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1430 predicate::eq(endpoint),
1431 predicate::eq(AstarteData::from("Received")),
1432 )
1433 .returning(|_, _, _| Ok(()));
1434
1435 let endpoint = format!("/{deployment_id}/status");
1436 device
1437 .expect_set_property()
1438 .once()
1439 .in_sequence(&mut seq)
1440 .with(
1441 predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1442 predicate::eq(endpoint),
1443 predicate::eq(AstarteData::String("Stopped".to_string())),
1444 )
1445 .returning(|_, _, _| Ok(()));
1446
1447 device
1448 .expect_send_object_with_timestamp()
1449 .once()
1450 .in_sequence(&mut seq)
1451 .with(
1452 predicate::eq("io.edgehog.devicemanager.apps.DeploymentEvent"),
1453 predicate::eq(format!("/{deployment_id}")),
1454 predicate::eq(AstarteObject::from_iter([
1455 ("status".to_string(), AstarteData::from("Deleting")),
1456 ("message".to_string(), AstarteData::from("")),
1457 ])),
1458 predicate::always(),
1459 )
1460 .returning(|_, _, _, _| Ok(()));
1461
1462 let endpoint = format!("/{container_id}/status");
1463 device
1464 .expect_unset_property()
1465 .once()
1466 .in_sequence(&mut seq)
1467 .with(
1468 predicate::eq("io.edgehog.devicemanager.apps.AvailableContainers"),
1469 predicate::eq(endpoint),
1470 )
1471 .returning(|_, _| Ok(()));
1472
1473 let image_path = format!("/{image_id}/pulled");
1474 device
1475 .expect_unset_property()
1476 .once()
1477 .in_sequence(&mut seq)
1478 .with(
1479 predicate::eq("io.edgehog.devicemanager.apps.AvailableImages"),
1480 predicate::eq(image_path),
1481 )
1482 .returning(|_, _| Ok(()));
1483
1484 let endpoint = format!("/{deployment_id}/status");
1485 device
1486 .expect_unset_property()
1487 .once()
1488 .in_sequence(&mut seq)
1489 .with(
1490 predicate::eq("io.edgehog.devicemanager.apps.AvailableDeployments"),
1491 predicate::eq(endpoint),
1492 )
1493 .returning(|_, _| Ok(()));
1494
1495 let (mut service, mut handle) = mock_service(&tempdir, client, device).await;
1496
1497 let create_image_req = create_image_request_event(image_id, deployment_id, reference, "");
1498
1499 let image_req = ContainerRequest::from_event(create_image_req).unwrap();
1500
1501 let create_container_req = create_container_request_event(
1502 container_id,
1503 deployment_id,
1504 image_id,
1505 reference,
1506 &Vec::<Uuid>::new(),
1507 &Vec::<Uuid>::new(),
1508 );
1509
1510 let container_req = ContainerRequest::from_event(create_container_req).unwrap();
1511
1512 let create_deployment_req = create_deployment_request_event(
1513 &deployment_id.to_string(),
1514 &[&container_id.to_string()],
1515 );
1516
1517 let deployment_req = ContainerRequest::from_event(create_deployment_req).unwrap();
1518
1519 let delete = ContainerRequest::DeploymentCommand(DeploymentCommand {
1520 id: deployment_id,
1521 command: CommandValue::Delete,
1522 });
1523
1524 handle.on_event(image_req).await.unwrap();
1525 handle.on_event(container_req).await.unwrap();
1526 handle.on_event(deployment_req).await.unwrap();
1527 handle.on_event(delete).await.unwrap();
1528
1529 let image_event = service.events.recv().await.unwrap();
1530 service.on_event(image_event).await;
1531 let container_event = service.events.recv().await.unwrap();
1532 service.on_event(container_event).await;
1533 let deployment_event = service.events.recv().await.unwrap();
1534 service.on_event(deployment_event).await;
1535 let start_event = service.events.recv().await.unwrap();
1536 service.on_event(start_event).await;
1537 }
1538}