Skip to main content

edgehog_device_runtime_containers/store/
volume.rs

1// This file is part of Edgehog.
2//
3// Copyright 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19use std::collections::HashMap;
20
21use diesel::{delete, insert_or_ignore_into, ExpressionMethods, OptionalExtension, RunQueryDsl};
22use diesel::{update, QueryDsl};
23use edgehog_store::conversions::SqlUuid;
24use edgehog_store::db::HandleError;
25use edgehog_store::models::containers::container::ContainerMissingVolume;
26use edgehog_store::models::containers::volume::{Volume, VolumeDriverOpts, VolumeStatus};
27use edgehog_store::models::QueryModel;
28use edgehog_store::schema::containers::{container_volumes, volume_driver_opts, volumes};
29use itertools::Itertools;
30use tracing::instrument;
31use uuid::Uuid;
32
33use crate::docker::volume::Volume as ContainerVolume;
34use crate::requests::volume::CreateVolume;
35use crate::resource::volume::VolumeResource;
36
37use super::{split_key_value, Result, StateStore, StoreError};
38
39impl StateStore {
40    /// Stores the volume received from the CreateRequest
41    #[instrument(skip_all, fields(%volume.id))]
42    pub(crate) async fn create_volume(&self, volume: CreateVolume) -> Result<()> {
43        let opts = Vec::<VolumeDriverOpts>::try_from(&volume)?;
44        let volume = Volume::from(volume);
45
46        self.handle
47            .for_write(move |writer| {
48                insert_or_ignore_into(volumes::table)
49                    .values(&volume)
50                    .execute(writer)?;
51
52                insert_or_ignore_into(volume_driver_opts::table)
53                    .values(opts)
54                    .execute(writer)?;
55
56                insert_or_ignore_into(container_volumes::table)
57                    .values(ContainerMissingVolume::find_by_volume(&volume.id))
58                    .execute(writer)?;
59
60                delete(ContainerMissingVolume::find_by_volume(&volume.id)).execute(writer)?;
61
62                Ok(())
63            })
64            .await?;
65
66        Ok(())
67    }
68
69    /// Updates the status of a volume
70    #[instrument(skip(self))]
71    pub(crate) async fn update_volume_status(&self, id: Uuid, status: VolumeStatus) -> Result<()> {
72        self.handle
73            .for_write(move |writer| {
74                let updated = update(Volume::find_id(&SqlUuid::new(id)))
75                    .set(volumes::status.eq(status))
76                    .execute(writer)?;
77
78                HandleError::check_modified(updated, 1)?;
79
80                Ok(())
81            })
82            .await?;
83
84        Ok(())
85    }
86
87    /// Delete the [`Volume`] with the given [`id`](Uuid).
88    #[instrument(skip(self))]
89    pub(crate) async fn delete_volume(&self, id: Uuid) -> Result<()> {
90        self.handle
91            .for_write(move |writer| {
92                let updated = delete(Volume::find_id(&SqlUuid::new(id))).execute(writer)?;
93
94                HandleError::check_modified(updated, 1)?;
95
96                Ok(())
97            })
98            .await?;
99
100        Ok(())
101    }
102
103    #[instrument(skip(self))]
104    pub(crate) async fn load_volumes_to_publish(&mut self) -> Result<Vec<SqlUuid>> {
105        self.load_volumes_in_state(VolumeStatus::Received).await
106    }
107
108    #[instrument(skip(self))]
109    pub(crate) async fn load_volumes_in_state(&self, state: VolumeStatus) -> Result<Vec<SqlUuid>> {
110        let volumes = self
111            .handle
112            .for_read(move |reader| {
113                let volumes = volumes::table
114                    .select(volumes::id)
115                    .filter(volumes::status.eq(state))
116                    .load::<SqlUuid>(reader)?;
117
118                Ok(volumes)
119            })
120            .await?;
121
122        Ok(volumes)
123    }
124
125    /// Fetches an volume by id
126    #[instrument(skip(self))]
127    pub(crate) async fn find_volume(&self, id: Uuid) -> Result<Option<VolumeResource>> {
128        let volume = self
129            .handle
130            .for_read(move |reader| {
131                let Some(volume): Option<Volume> = Volume::find_id(&SqlUuid::new(id))
132                    .first(reader)
133                    .optional()?
134                else {
135                    return Ok(None);
136                };
137
138                let driver_opts: HashMap<String, String> = volume_driver_opts::table
139                    .filter(volume_driver_opts::volume_id.eq(&SqlUuid::new(id)))
140                    .load::<VolumeDriverOpts>(reader)?
141                    .into_iter()
142                    .map(|opt| (opt.name, opt.value))
143                    .collect();
144
145                Ok(Some(VolumeResource::new(ContainerVolume::new(
146                    *volume.id,
147                    volume.driver,
148                    driver_opts,
149                ))))
150            })
151            .await?;
152
153        Ok(volume)
154    }
155
156    /// Fetches an volume by id
157    #[instrument(skip(self))]
158    pub(crate) async fn check_volume_exists(&self, id: Uuid) -> Result<bool> {
159        self.handle
160            .for_read(move |reader| {
161                Volume::exists(&SqlUuid::new(id))
162                    .get_result::<bool>(reader)
163                    .map_err(HandleError::Query)
164            })
165            .await
166            .map_err(StoreError::Handle)
167    }
168}
169
170impl From<CreateVolume> for Volume {
171    fn from(
172        CreateVolume {
173            id,
174            deployment_id: _,
175            driver,
176            options: _,
177        }: CreateVolume,
178    ) -> Self {
179        Self {
180            id: SqlUuid::new(id),
181            status: VolumeStatus::default(),
182            driver,
183        }
184    }
185}
186
187impl TryFrom<&CreateVolume> for Vec<VolumeDriverOpts> {
188    type Error = StoreError;
189
190    fn try_from(value: &CreateVolume) -> std::result::Result<Self, Self::Error> {
191        let volume_id = SqlUuid::new(value.id);
192
193        value
194            .options
195            .iter()
196            .map(|s| {
197                split_key_value(s)
198                    .map(|(name, value)| VolumeDriverOpts {
199                        volume_id,
200                        name: name.to_string(),
201                        // NOTE: Default to empty string, this is a sane approach even if the
202                        //       behaviour is not directly specified in the Docker docs.
203                        value: value.unwrap_or_default().to_string(),
204                    })
205                    .ok_or(StoreError::ParseKeyValue {
206                        ctx: "volume driver options",
207                        value: s.to_string(),
208                    })
209            })
210            .try_collect()
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use crate::requests::ReqUuid;
217
218    use super::*;
219
220    use edgehog_store::db;
221    use pretty_assertions::assert_eq;
222    use tempfile::TempDir;
223
224    async fn find_volume(store: &StateStore, id: Uuid) -> Option<Volume> {
225        store
226            .handle
227            .for_read(move |reader| {
228                Volume::find_id(&SqlUuid::new(id))
229                    .first(reader)
230                    .optional()
231                    .map_err(HandleError::Query)
232            })
233            .await
234            .unwrap()
235    }
236
237    impl StateStore {
238        pub(crate) async fn volume_opts(&self, volume_id: Uuid) -> Result<Vec<VolumeDriverOpts>> {
239            let volume = self
240                .handle
241                .for_read(move |reader| {
242                    let volume: Vec<VolumeDriverOpts> = volume_driver_opts::table
243                        .filter(volume_driver_opts::volume_id.eq(SqlUuid::new(volume_id)))
244                        .load(reader)?;
245
246                    Ok(volume)
247                })
248                .await?;
249
250            Ok(volume)
251        }
252    }
253
254    #[tokio::test]
255    async fn should_store() {
256        let tmp = TempDir::with_prefix("store_volume").unwrap();
257        let db_file = tmp.path().join("state.db");
258        let db_file = db_file.to_str().unwrap();
259
260        let handle = db::Handle::open(db_file).await.unwrap();
261        let store = StateStore::new(handle);
262
263        let volume_id = Uuid::new_v4();
264        let deployment_id = Uuid::new_v4();
265        let volume = CreateVolume {
266            id: ReqUuid(volume_id),
267            deployment_id: ReqUuid(deployment_id),
268            driver: "local".to_string(),
269            options: ["device=tmpfs", "o=size=100m,uid=1000", "type=tmpfs"]
270                .map(str::to_string)
271                .to_vec(),
272        };
273        store.create_volume(volume).await.unwrap();
274
275        let res = find_volume(&store, volume_id).await.unwrap();
276
277        let exp = Volume {
278            id: SqlUuid::new(volume_id),
279            status: VolumeStatus::Received,
280            driver: "local".to_string(),
281        };
282
283        assert_eq!(res, exp);
284
285        let volume_opts = store.volume_opts(volume_id).await.unwrap();
286
287        assert_eq!(
288            volume_opts,
289            vec![
290                VolumeDriverOpts {
291                    volume_id: SqlUuid::new(volume_id),
292                    name: "device".to_string(),
293                    value: "tmpfs".to_string()
294                },
295                VolumeDriverOpts {
296                    volume_id: SqlUuid::new(volume_id),
297                    name: "o".to_string(),
298                    value: "size=100m,uid=1000".to_string()
299                },
300                VolumeDriverOpts {
301                    volume_id: SqlUuid::new(volume_id),
302                    name: "type".to_string(),
303                    value: "tmpfs".to_string()
304                }
305            ]
306        );
307    }
308
309    #[tokio::test]
310    async fn should_store_empty_option() {
311        let tmp = TempDir::with_prefix("store_volume").unwrap();
312        let db_file = tmp.path().join("state.db");
313        let db_file = db_file.to_str().unwrap();
314
315        let handle = db::Handle::open(db_file).await.unwrap();
316        let store = StateStore::new(handle);
317
318        let volume_id = Uuid::new_v4();
319        let deployment_id = Uuid::new_v4();
320        let volume = CreateVolume {
321            id: ReqUuid(volume_id),
322            deployment_id: ReqUuid(deployment_id),
323            driver: "local".to_string(),
324            options: [
325                "device=tmpfs",
326                "o=size=100m,uid=1000",
327                "type=tmpfs",
328                "empty=",
329            ]
330            .map(str::to_string)
331            .to_vec(),
332        };
333        store.create_volume(volume).await.unwrap();
334
335        let res = find_volume(&store, volume_id).await.unwrap();
336
337        let exp = Volume {
338            id: SqlUuid::new(volume_id),
339            status: VolumeStatus::Received,
340            driver: "local".to_string(),
341        };
342
343        assert_eq!(res, exp);
344
345        let volume_opts = store.volume_opts(volume_id).await.unwrap();
346
347        assert_eq!(
348            volume_opts,
349            vec![
350                VolumeDriverOpts {
351                    volume_id: SqlUuid::new(volume_id),
352                    name: "device".to_string(),
353                    value: "tmpfs".to_string()
354                },
355                VolumeDriverOpts {
356                    volume_id: SqlUuid::new(volume_id),
357                    name: "empty".to_string(),
358                    value: String::new()
359                },
360                VolumeDriverOpts {
361                    volume_id: SqlUuid::new(volume_id),
362                    name: "o".to_string(),
363                    value: "size=100m,uid=1000".to_string()
364                },
365                VolumeDriverOpts {
366                    volume_id: SqlUuid::new(volume_id),
367                    name: "type".to_string(),
368                    value: "tmpfs".to_string()
369                },
370            ]
371        );
372    }
373
374    #[tokio::test]
375    async fn should_update() {
376        let tmp = TempDir::with_prefix("update_volume").unwrap();
377        let db_file = tmp.path().join("state.db");
378        let db_file = db_file.to_str().unwrap();
379
380        let handle = db::Handle::open(db_file).await.unwrap();
381        let store = StateStore::new(handle);
382
383        let volume_id = Uuid::new_v4();
384        let deployment_id = Uuid::new_v4();
385        let volume = CreateVolume {
386            id: ReqUuid(volume_id),
387            deployment_id: ReqUuid(deployment_id),
388            driver: "local".to_string(),
389            options: ["device=tmpfs", "o=size=100m,uid=1000", "type=tmpfs"]
390                .map(str::to_string)
391                .to_vec(),
392        };
393        store.create_volume(volume).await.unwrap();
394
395        store
396            .update_volume_status(volume_id, VolumeStatus::Published)
397            .await
398            .unwrap();
399
400        let res = find_volume(&store, volume_id).await.unwrap();
401
402        let exp = Volume {
403            id: SqlUuid::new(volume_id),
404            status: VolumeStatus::Published,
405            driver: "local".to_string(),
406        };
407
408        assert_eq!(res, exp);
409    }
410
411    #[tokio::test]
412    async fn check_volume_exists() {
413        let tmp = TempDir::with_prefix("check_volume_exists").unwrap();
414        let db_file = tmp.path().join("state.db");
415        let db_file = db_file.to_str().unwrap();
416
417        let handle = db::Handle::open(db_file).await.unwrap();
418        let store = StateStore::new(handle);
419
420        let volume_id = Uuid::new_v4();
421        let deployment_id = Uuid::new_v4();
422        let volume = CreateVolume {
423            id: ReqUuid(volume_id),
424            deployment_id: ReqUuid(deployment_id),
425            driver: "local".to_string(),
426            options: ["device=tmpfs", "o=size=100m,uid=1000", "type=tmpfs"]
427                .map(str::to_string)
428                .to_vec(),
429        };
430        store.create_volume(volume).await.unwrap();
431
432        assert!(store.check_volume_exists(volume_id).await.unwrap());
433    }
434}