edgehog_device_runtime_containers/store/
volume.rs1use std::collections::HashMap;
20
21use diesel::{delete, insert_or_ignore_into, ExpressionMethods, OptionalExtension, RunQueryDsl};
22use diesel::{update, QueryDsl};
23use edgehog_store::conversions::SqlUuid;
24use edgehog_store::db::HandleError;
25use edgehog_store::models::containers::container::ContainerMissingVolume;
26use edgehog_store::models::containers::volume::{Volume, VolumeDriverOpts, VolumeStatus};
27use edgehog_store::models::QueryModel;
28use edgehog_store::schema::containers::{container_volumes, volume_driver_opts, volumes};
29use itertools::Itertools;
30use tracing::instrument;
31use uuid::Uuid;
32
33use crate::docker::volume::Volume as ContainerVolume;
34use crate::requests::volume::CreateVolume;
35use crate::resource::volume::VolumeResource;
36
37use super::{split_key_value, Result, StateStore, StoreError};
38
39impl StateStore {
40 #[instrument(skip_all, fields(%volume.id))]
42 pub(crate) async fn create_volume(&self, volume: CreateVolume) -> Result<()> {
43 let opts = Vec::<VolumeDriverOpts>::try_from(&volume)?;
44 let volume = Volume::from(volume);
45
46 self.handle
47 .for_write(move |writer| {
48 insert_or_ignore_into(volumes::table)
49 .values(&volume)
50 .execute(writer)?;
51
52 insert_or_ignore_into(volume_driver_opts::table)
53 .values(opts)
54 .execute(writer)?;
55
56 insert_or_ignore_into(container_volumes::table)
57 .values(ContainerMissingVolume::find_by_volume(&volume.id))
58 .execute(writer)?;
59
60 delete(ContainerMissingVolume::find_by_volume(&volume.id)).execute(writer)?;
61
62 Ok(())
63 })
64 .await?;
65
66 Ok(())
67 }
68
69 #[instrument(skip(self))]
71 pub(crate) async fn update_volume_status(&self, id: Uuid, status: VolumeStatus) -> Result<()> {
72 self.handle
73 .for_write(move |writer| {
74 let updated = update(Volume::find_id(&SqlUuid::new(id)))
75 .set(volumes::status.eq(status))
76 .execute(writer)?;
77
78 HandleError::check_modified(updated, 1)?;
79
80 Ok(())
81 })
82 .await?;
83
84 Ok(())
85 }
86
87 #[instrument(skip(self))]
89 pub(crate) async fn delete_volume(&self, id: Uuid) -> Result<()> {
90 self.handle
91 .for_write(move |writer| {
92 let updated = delete(Volume::find_id(&SqlUuid::new(id))).execute(writer)?;
93
94 HandleError::check_modified(updated, 1)?;
95
96 Ok(())
97 })
98 .await?;
99
100 Ok(())
101 }
102
103 #[instrument(skip(self))]
104 pub(crate) async fn load_volumes_to_publish(&mut self) -> Result<Vec<SqlUuid>> {
105 self.load_volumes_in_state(VolumeStatus::Received).await
106 }
107
108 #[instrument(skip(self))]
109 pub(crate) async fn load_volumes_in_state(&self, state: VolumeStatus) -> Result<Vec<SqlUuid>> {
110 let volumes = self
111 .handle
112 .for_read(move |reader| {
113 let volumes = volumes::table
114 .select(volumes::id)
115 .filter(volumes::status.eq(state))
116 .load::<SqlUuid>(reader)?;
117
118 Ok(volumes)
119 })
120 .await?;
121
122 Ok(volumes)
123 }
124
125 #[instrument(skip(self))]
127 pub(crate) async fn find_volume(&self, id: Uuid) -> Result<Option<VolumeResource>> {
128 let volume = self
129 .handle
130 .for_read(move |reader| {
131 let Some(volume): Option<Volume> = Volume::find_id(&SqlUuid::new(id))
132 .first(reader)
133 .optional()?
134 else {
135 return Ok(None);
136 };
137
138 let driver_opts: HashMap<String, String> = volume_driver_opts::table
139 .filter(volume_driver_opts::volume_id.eq(&SqlUuid::new(id)))
140 .load::<VolumeDriverOpts>(reader)?
141 .into_iter()
142 .map(|opt| (opt.name, opt.value))
143 .collect();
144
145 Ok(Some(VolumeResource::new(ContainerVolume::new(
146 *volume.id,
147 volume.driver,
148 driver_opts,
149 ))))
150 })
151 .await?;
152
153 Ok(volume)
154 }
155
156 #[instrument(skip(self))]
158 pub(crate) async fn check_volume_exists(&self, id: Uuid) -> Result<bool> {
159 self.handle
160 .for_read(move |reader| {
161 Volume::exists(&SqlUuid::new(id))
162 .get_result::<bool>(reader)
163 .map_err(HandleError::Query)
164 })
165 .await
166 .map_err(StoreError::Handle)
167 }
168}
169
170impl From<CreateVolume> for Volume {
171 fn from(
172 CreateVolume {
173 id,
174 deployment_id: _,
175 driver,
176 options: _,
177 }: CreateVolume,
178 ) -> Self {
179 Self {
180 id: SqlUuid::new(id),
181 status: VolumeStatus::default(),
182 driver,
183 }
184 }
185}
186
187impl TryFrom<&CreateVolume> for Vec<VolumeDriverOpts> {
188 type Error = StoreError;
189
190 fn try_from(value: &CreateVolume) -> std::result::Result<Self, Self::Error> {
191 let volume_id = SqlUuid::new(value.id);
192
193 value
194 .options
195 .iter()
196 .map(|s| {
197 split_key_value(s)
198 .map(|(name, value)| VolumeDriverOpts {
199 volume_id,
200 name: name.to_string(),
201 value: value.unwrap_or_default().to_string(),
204 })
205 .ok_or(StoreError::ParseKeyValue {
206 ctx: "volume driver options",
207 value: s.to_string(),
208 })
209 })
210 .try_collect()
211 }
212}
213
#[cfg(test)]
mod tests {
    use crate::requests::ReqUuid;

    use super::*;

    use edgehog_store::db;
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    /// Driver options shared by most of the tests.
    const TMPFS_OPTIONS: [&str; 3] = ["device=tmpfs", "o=size=100m,uid=1000", "type=tmpfs"];

    /// Opens a store on a new `state.db` file inside a fresh temporary directory.
    ///
    /// The [`TempDir`] is returned with the store because the directory is removed when the
    /// guard is dropped: the caller has to keep it alive for as long as the store is used.
    async fn open_store(prefix: &str) -> (TempDir, StateStore) {
        let tmp = TempDir::with_prefix(prefix).unwrap();
        let db_file = tmp.path().join("state.db");
        let db_file = db_file.to_str().unwrap();

        let handle = db::Handle::open(db_file).await.unwrap();

        (tmp, StateStore::new(handle))
    }

    /// Builds a create request for a `local` volume with a random deployment id.
    fn create_request(volume_id: Uuid, options: &[&str]) -> CreateVolume {
        CreateVolume {
            id: ReqUuid(volume_id),
            deployment_id: ReqUuid(Uuid::new_v4()),
            driver: "local".to_string(),
            options: options.iter().copied().map(str::to_string).collect(),
        }
    }

    /// Builds the driver option row expected for the given volume.
    fn driver_opt(volume_id: Uuid, name: &str, value: &str) -> VolumeDriverOpts {
        VolumeDriverOpts {
            volume_id: SqlUuid::new(volume_id),
            name: name.to_string(),
            value: value.to_string(),
        }
    }

    /// Builds the volume row expected for a `local` volume in the given status.
    fn local_volume(volume_id: Uuid, status: VolumeStatus) -> Volume {
        Volume {
            id: SqlUuid::new(volume_id),
            status,
            driver: "local".to_string(),
        }
    }

    /// Reads the raw volume row with the given id, if any.
    async fn find_volume(store: &StateStore, id: Uuid) -> Option<Volume> {
        store
            .handle
            .for_read(move |reader| {
                Volume::find_id(&SqlUuid::new(id))
                    .first(reader)
                    .optional()
                    .map_err(HandleError::Query)
            })
            .await
            .unwrap()
    }

    impl StateStore {
        /// Test helper that loads the driver option rows stored for a volume.
        pub(crate) async fn volume_opts(&self, volume_id: Uuid) -> Result<Vec<VolumeDriverOpts>> {
            let volume = self
                .handle
                .for_read(move |reader| {
                    let volume: Vec<VolumeDriverOpts> = volume_driver_opts::table
                        .filter(volume_driver_opts::volume_id.eq(SqlUuid::new(volume_id)))
                        .load(reader)?;

                    Ok(volume)
                })
                .await?;

            Ok(volume)
        }
    }

    #[tokio::test]
    async fn should_store() {
        let (_tmp, store) = open_store("store_volume").await;

        let volume_id = Uuid::new_v4();
        store
            .create_volume(create_request(volume_id, &TMPFS_OPTIONS))
            .await
            .unwrap();

        let res = find_volume(&store, volume_id).await.unwrap();
        let exp = local_volume(volume_id, VolumeStatus::Received);

        assert_eq!(res, exp);

        let volume_opts = store.volume_opts(volume_id).await.unwrap();

        assert_eq!(
            volume_opts,
            vec![
                driver_opt(volume_id, "device", "tmpfs"),
                driver_opt(volume_id, "o", "size=100m,uid=1000"),
                driver_opt(volume_id, "type", "tmpfs"),
            ]
        );
    }

    #[tokio::test]
    async fn should_store_empty_option() {
        let (_tmp, store) = open_store("store_volume").await;

        let volume_id = Uuid::new_v4();
        let options = [
            "device=tmpfs",
            "o=size=100m,uid=1000",
            "type=tmpfs",
            "empty=",
        ];
        store
            .create_volume(create_request(volume_id, &options))
            .await
            .unwrap();

        let res = find_volume(&store, volume_id).await.unwrap();
        let exp = local_volume(volume_id, VolumeStatus::Received);

        assert_eq!(res, exp);

        let volume_opts = store.volume_opts(volume_id).await.unwrap();

        assert_eq!(
            volume_opts,
            vec![
                driver_opt(volume_id, "device", "tmpfs"),
                driver_opt(volume_id, "empty", ""),
                driver_opt(volume_id, "o", "size=100m,uid=1000"),
                driver_opt(volume_id, "type", "tmpfs"),
            ]
        );
    }

    #[tokio::test]
    async fn should_update() {
        let (_tmp, store) = open_store("update_volume").await;

        let volume_id = Uuid::new_v4();
        store
            .create_volume(create_request(volume_id, &TMPFS_OPTIONS))
            .await
            .unwrap();

        store
            .update_volume_status(volume_id, VolumeStatus::Published)
            .await
            .unwrap();

        let res = find_volume(&store, volume_id).await.unwrap();
        let exp = local_volume(volume_id, VolumeStatus::Published);

        assert_eq!(res, exp);
    }

    #[tokio::test]
    async fn check_volume_exists() {
        let (_tmp, store) = open_store("check_volume_exists").await;

        let volume_id = Uuid::new_v4();

        // Nothing has been stored yet, so the volume must not be reported as existing.
        assert!(!store.check_volume_exists(volume_id).await.unwrap());

        store
            .create_volume(create_request(volume_id, &TMPFS_OPTIONS))
            .await
            .unwrap();

        assert!(store.check_volume_exists(volume_id).await.unwrap());
    }
}