edgehog_device_runtime_containers/service/
events.rs1use edgehog_store::models::containers::deployment::DeploymentStatus;
25use tokio::sync::mpsc;
26use tracing::{error, instrument};
27use uuid::Uuid;
28
29use crate::{
30 events::deployment::{DeploymentEvent, EventStatus},
31 properties::Client,
32 requests::{
33 deployment::{DeploymentCommand, DeploymentUpdate},
34 ContainerRequest,
35 },
36 store::StateStore,
37};
38
39#[derive(Debug, thiserror::Error, displaydoc::Display)]
41pub enum EventError {
42 Disconnected,
44}
45
46use super::{CommandValue, Id, ResourceType};
47
48#[derive(Debug)]
50pub struct ServiceHandle<D> {
51 events: mpsc::UnboundedSender<ContainerEvent>,
53 device: D,
54 store: StateStore,
55}
56
57impl<D> ServiceHandle<D> {
58 pub fn new(
60 device: D,
61 store: StateStore,
62 events: mpsc::UnboundedSender<ContainerEvent>,
63 ) -> Self {
64 Self {
65 events,
66 device,
67 store,
68 }
69 }
70
71 #[instrument(skip_all)]
73 pub async fn on_event(&mut self, request: ContainerRequest) -> Result<(), EventError>
74 where
75 D: Client + Sync + 'static,
76 {
77 let event = ContainerEvent::from(&request);
78
79 let deployment_id = request.deployment_id();
80
81 self.persist_request(deployment_id, request).await;
82
83 self.events.send(event).map_err(|_err| {
84 error!("the container service disconnected");
85
86 EventError::Disconnected
87 })?;
88
89 Ok(())
90 }
91
92 #[instrument(skip_all, fields(deployment_id))]
93 async fn persist_request(&mut self, deployment_id: Uuid, request: ContainerRequest)
94 where
95 D: Client + Sync + 'static,
96 {
97 let res = match request {
98 ContainerRequest::Image(create_image) => self.store.create_image(create_image).await,
99 ContainerRequest::Volume(create_volume) => {
100 self.store.create_volume(create_volume).await
101 }
102 ContainerRequest::Network(create_network) => {
103 self.store.create_network(create_network).await
104 }
105 ContainerRequest::DeviceMapping(create_device_mapping) => {
106 self.store
107 .create_device_mapping(create_device_mapping)
108 .await
109 }
110 ContainerRequest::Container(create_container) => {
111 self.store.create_container(create_container).await
112 }
113 ContainerRequest::Deployment(create_deployment) => {
114 self.store.create_deployment(create_deployment).await
115 }
116 ContainerRequest::DeploymentCommand(DeploymentCommand { id, command }) => {
117 self.store
118 .update_deployment_status(id, command.into())
119 .await
120 }
121 ContainerRequest::DeploymentUpdate(DeploymentUpdate { from, to }) => {
122 self.store.deployment_update(from, to).await
123 }
124 };
125
126 if let Err(err) = res {
127 let error = format!("{:#}", eyre::Report::new(err));
128
129 error!(%error, "couldn't store request");
130
131 DeploymentEvent::new(EventStatus::Error, error)
132 .send(&deployment_id, &mut self.device)
133 .await;
134 }
135 }
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq)]
140pub enum ContainerEvent {
141 Resource {
143 resource: Id,
145 deployment: Uuid,
147 },
148 DeploymentCmd(DeploymentCommand),
150 DeploymentUpdate(DeploymentUpdate),
152 Refresh(Id),
156}
157
158impl From<&ContainerRequest> for ContainerEvent {
159 fn from(value: &ContainerRequest) -> Self {
160 match value {
161 ContainerRequest::Image(create_image) => {
162 let resource = Id::new(ResourceType::Image, create_image.id.0);
163
164 ContainerEvent::Resource {
165 resource,
166 deployment: create_image.deployment_id.0,
167 }
168 }
169 ContainerRequest::Volume(create_volume) => {
170 let resource = Id::new(ResourceType::Volume, create_volume.id.0);
171
172 ContainerEvent::Resource {
173 resource,
174 deployment: create_volume.deployment_id.0,
175 }
176 }
177 ContainerRequest::Network(create_network) => {
178 let resource = Id::new(ResourceType::Network, create_network.id.0);
179
180 ContainerEvent::Resource {
181 resource,
182 deployment: create_network.deployment_id.0,
183 }
184 }
185 ContainerRequest::DeviceMapping(create_device_mapping) => {
186 let resource = Id::new(ResourceType::DeviceMapping, create_device_mapping.id.0);
187
188 ContainerEvent::Resource {
189 resource,
190 deployment: create_device_mapping.deployment_id.0,
191 }
192 }
193 ContainerRequest::Container(create_container) => {
194 let resource = Id::new(ResourceType::Container, create_container.id.0);
195
196 ContainerEvent::Resource {
197 resource,
198 deployment: create_container.deployment_id.0,
199 }
200 }
201 ContainerRequest::Deployment(create_deployment) => {
202 let resource = Id::new(ResourceType::Deployment, create_deployment.id.0);
203
204 ContainerEvent::Resource {
205 resource,
206 deployment: create_deployment.id.0,
207 }
208 }
209 ContainerRequest::DeploymentCommand(cmd) => Self::DeploymentCmd(*cmd),
210 ContainerRequest::DeploymentUpdate(update) => Self::DeploymentUpdate(*update),
211 }
212 }
213}
214
215impl From<CommandValue> for DeploymentStatus {
216 fn from(value: CommandValue) -> Self {
217 match value {
218 CommandValue::Start => Self::Started,
219 CommandValue::Stop => Self::Stopped,
220 CommandValue::Delete => Self::Deleted,
221 }
222 }
223}