jocker_lib/
database.rs

1use std::{collections::HashSet, path::Path, sync::Arc};
2
3use chrono::{DateTime, Utc};
4use redb::{ReadableTable, TableDefinition, TypeName, Value};
5use serde::{Deserialize, Serialize};
6
7use crate::{
8    common::{Binary, Process, ProcessState, Stack},
9    error::{Error, InnerError, Result},
10};
11
/// File name of the redb database, joined onto the directory passed to `Database::new`.
const DB_FILE: &str = "db.redb";
/// Metadata table. This file only ever reads and writes the record stored under key `0`.
const METADATA: TableDefinition<u8, Metadata> = TableDefinition::new("metadata");
/// Known binaries, keyed by `Binary::name`.
const BINARY: TableDefinition<&str, Binary> = TableDefinition::new("binary");
/// Known processes, keyed by `Process::name`.
const PROCESS: TableDefinition<&str, Process> = TableDefinition::new("process");
/// Known stacks, keyed by `Stack::name`.
const STACK: TableDefinition<&str, Stack> = TableDefinition::new("stack");
17
/// Single bookkeeping record stored in the `METADATA` table under key `0`.
///
/// The setters in this file start from `Metadata::default()` when no record
/// exists yet, so once any one field has been written the other fields hold
/// their `Default` values rather than being absent.
#[derive(Debug, Default, Deserialize, Serialize)]
struct Metadata {
    /// Timestamp written by `Database::set_binaries_updated_at`.
    binaries_updated_at: DateTime<Utc>,
    /// Timestamp written by `Database::set_config_updated_at`.
    config_updated_at: DateTime<Utc>,
    /// Name of the default stack, or `None` when no default is selected.
    default_stack: Option<String>,
}
24
25impl Value for Metadata {
26    type SelfType<'a>
27        = Metadata
28    where
29        Self: 'a;
30
31    type AsBytes<'a>
32        = Vec<u8>
33    where
34        Self: 'a;
35
36    fn fixed_width() -> Option<usize> {
37        None
38    }
39
40    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
41    where
42        Self: 'a,
43    {
44        serde_json::from_slice(data).unwrap()
45    }
46
47    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
48    where
49        Self: 'b,
50    {
51        serde_json::to_vec(value).unwrap()
52    }
53
54    fn type_name() -> redb::TypeName {
55        TypeName::new("jocker-lib_metadata")
56    }
57}
58
59impl Value for Binary {
60    type SelfType<'a>
61        = Binary
62    where
63        Self: 'a;
64
65    type AsBytes<'a>
66        = Vec<u8>
67    where
68        Self: 'a;
69
70    fn fixed_width() -> Option<usize> {
71        None
72    }
73
74    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
75    where
76        Self: 'a,
77    {
78        serde_json::from_slice(data).unwrap()
79    }
80
81    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
82    where
83        Self: 'b,
84    {
85        serde_json::to_vec(value).unwrap()
86    }
87
88    fn type_name() -> TypeName {
89        TypeName::new("jocker-lib_binary-package")
90    }
91}
92
93impl Value for Process {
94    type SelfType<'a>
95        = Process
96    where
97        Self: 'a;
98
99    type AsBytes<'a>
100        = Vec<u8>
101    where
102        Self: 'a;
103
104    fn fixed_width() -> Option<usize> {
105        None
106    }
107
108    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
109    where
110        Self: 'a,
111    {
112        serde_json::from_slice(data).unwrap()
113    }
114
115    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
116    where
117        Self: 'b,
118    {
119        serde_json::to_vec(value).unwrap()
120    }
121
122    fn type_name() -> TypeName {
123        TypeName::new("jocker-lib_process")
124    }
125}
126
127impl Value for Stack {
128    type SelfType<'a>
129        = Stack
130    where
131        Self: 'a;
132
133    type AsBytes<'a>
134        = Vec<u8>
135    where
136        Self: 'a;
137
138    fn fixed_width() -> Option<usize> {
139        None
140    }
141
142    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
143    where
144        Self: 'a,
145    {
146        serde_json::from_slice(data).unwrap()
147    }
148
149    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
150    where
151        Self: 'b,
152    {
153        serde_json::to_vec(value).unwrap()
154    }
155
156    fn type_name() -> TypeName {
157        TypeName::new("jocker-lib_stack")
158    }
159}
160
/// Crate-internal handle to the redb database that persists binaries,
/// processes, stacks and the metadata record.
pub(crate) struct Database {
    /// The underlying redb database, held behind an `Arc`.
    db: Arc<redb::Database>,
}
164
165impl Database {
166    pub(crate) async fn new(database_directory_path: impl AsRef<Path>) -> Result<Self> {
167        let database_path = database_directory_path.as_ref().join(DB_FILE);
168        let mut db = redb::Database::create(database_path)?;
169        db.upgrade()?;
170        let txn = db.begin_write()?;
171        {
172            txn.open_table(METADATA)?;
173            txn.open_table(BINARY)?;
174            txn.open_table(PROCESS)?;
175            txn.open_table(STACK)?;
176        }
177        txn.commit()?;
178        Ok(Self { db: Arc::new(db) })
179    }
180
181    pub(crate) async fn get_binaries(&self) -> Result<Vec<Binary>> {
182        let txn = self.db.begin_read()?;
183        let table = txn.open_table(BINARY)?;
184
185        Ok(table
186            .iter()?
187            .map(|v| v.map(|vv| vv.1.value()))
188            .collect::<std::result::Result<_, _>>()?)
189    }
190
191    pub(crate) async fn get_binaries_updated_at(&self) -> Result<Option<DateTime<Utc>>> {
192        let txn = self.db.begin_read()?;
193        let table = txn.open_table(METADATA)?;
194
195        Ok(table.get(0)?.map(|v| v.value().binaries_updated_at))
196    }
197
198    pub(crate) async fn get_config_updated_at(&self) -> Result<Option<DateTime<Utc>>> {
199        let txn = self.db.begin_read()?;
200        let table = txn.open_table(METADATA)?;
201
202        Ok(table.get(0)?.map(|v| v.value().config_updated_at))
203    }
204
205    pub(crate) async fn get_default_stack(&self) -> Result<Option<String>> {
206        let txn = self.db.begin_read()?;
207        let table = txn.open_table(METADATA)?;
208
209        Ok(table.get(0)?.and_then(|v| v.value().default_stack))
210    }
211
212    pub(crate) async fn get_processes(&self) -> Result<Vec<Process>> {
213        let txn = self.db.begin_read()?;
214        let table = txn.open_table(PROCESS)?;
215
216        Ok(table
217            .iter()?
218            .map(|v| v.map(|vv| vv.1.value()))
219            .collect::<std::result::Result<_, _>>()?)
220    }
221
222    pub(crate) async fn get_stack(&self, stack: &str) -> Result<Stack> {
223        let txn = self.db.begin_read()?;
224        let table = txn.open_table(STACK)?;
225
226        table
227            .get(stack)?
228            .map(|v| v.value())
229            .ok_or_else(|| Error::new(InnerError::StackNotFound(stack.to_owned())))
230    }
231
232    pub(crate) async fn set_binaries(&self, binaries: &[Binary]) -> Result<()> {
233        let txn = self.db.begin_write()?;
234        {
235            txn.delete_table(BINARY)?;
236            let mut table = txn.open_table(BINARY)?;
237            for bin in binaries {
238                table.insert(bin.name.as_str(), bin)?;
239            }
240        }
241        txn.commit()?;
242        Ok(())
243    }
244
245    pub(crate) async fn set_binaries_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
246        let txn = self.db.begin_write()?;
247        {
248            let mut metadata = txn
249                .open_table(METADATA)?
250                .get(0)?
251                .map(|v| v.value())
252                .unwrap_or_default();
253            metadata.binaries_updated_at = date;
254            let mut table = txn.open_table(METADATA)?;
255            table.insert(0, metadata)?;
256        }
257        txn.commit()?;
258        Ok(())
259    }
260
261    pub(crate) async fn set_config_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
262        let txn = self.db.begin_write()?;
263        {
264            let mut metadata = txn
265                .open_table(METADATA)?
266                .get(0)?
267                .map(|v| v.value())
268                .unwrap_or_default();
269            metadata.config_updated_at = date;
270            let mut table = txn.open_table(METADATA)?;
271            table.insert(0, metadata)?;
272        }
273        txn.commit()?;
274        Ok(())
275    }
276
277    pub(crate) async fn set_default_stack(&self, stack: &Option<String>) -> Result<()> {
278        if let Some(stack) = stack {
279            self.get_stack(stack).await?;
280        }
281        let txn = self.db.begin_write()?;
282        {
283            let mut metadata = txn
284                .open_table(METADATA)?
285                .get(0)?
286                .map(|v| v.value())
287                .unwrap_or_default();
288            metadata.default_stack = stack.clone();
289            let mut table = txn.open_table(METADATA)?;
290            table.insert(0, metadata)?;
291        }
292        txn.commit()?;
293        Ok(())
294    }
295
296    pub(crate) async fn set_process_pid(
297        &self,
298        process_name: &str,
299        pid: Option<usize>,
300    ) -> Result<()> {
301        let txn = self.db.begin_write()?;
302        {
303            let mut process = txn
304                .open_table(PROCESS)?
305                .get(process_name)?
306                .map(|v| v.value())
307                .unwrap_or_default();
308            process.pid = pid;
309            let mut table = txn.open_table(PROCESS)?;
310            table.insert(process_name, process)?;
311        }
312        txn.commit()?;
313        Ok(())
314    }
315
316    pub(crate) async fn set_process_state(
317        &self,
318        process_name: &str,
319        state: ProcessState,
320    ) -> Result<()> {
321        let txn = self.db.begin_write()?;
322        {
323            let mut process = txn
324                .open_table(PROCESS)?
325                .get(process_name)?
326                .map(|v| v.value())
327                .unwrap_or_default();
328            process.state = state;
329            let mut table = txn.open_table(PROCESS)?;
330            table.insert(process_name, process)?;
331        }
332        txn.commit()?;
333        Ok(())
334    }
335
336    pub(crate) async fn set_processes(&self, processes: &[Process]) -> Result<()> {
337        let txn = self.db.begin_write()?;
338        {
339            txn.delete_table(PROCESS)?;
340            let mut table = txn.open_table(PROCESS)?;
341            for process in processes {
342                table.insert(process.name.as_str(), process)?;
343            }
344        }
345        txn.commit()?;
346        Ok(())
347    }
348
349    pub(crate) async fn set_stacks(&self, stacks: &[Stack]) -> Result<()> {
350        let processes: HashSet<String> = self
351            .get_processes()
352            .await?
353            .iter()
354            .map(|p| p.name.to_owned())
355            .collect();
356
357        // Lock after getting processes to avoid deadlock
358        let txn = self.db.begin_write()?;
359        {
360            txn.delete_table(STACK)?;
361            let mut table = txn.open_table(STACK)?;
362            for stack in stacks {
363                let stack_processes = stack.processes.iter();
364                let inherited_processes = stack.inherited_processes.iter();
365                let missing_processes: Vec<String> = stack_processes
366                    .clone()
367                    .chain(inherited_processes.clone())
368                    .filter(|&stack_process| !processes.contains(stack_process))
369                    .cloned()
370                    .collect();
371                if !missing_processes.is_empty() {
372                    return Err(Error::new(InnerError::ProcessNotFound(missing_processes)));
373                }
374                table.insert(stack.name.as_str(), stack)?;
375            }
376        }
377        txn.commit()?;
378        Ok(())
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use std::{collections::HashMap, thread::sleep, time::Duration};
385
386    use tempfile::{tempdir, TempDir};
387    use url::Url;
388
389    use super::*;
390
391    #[tokio::test]
392    async fn get_set_binaries() {
393        let (dir, db) = setup().await.unwrap();
394        let base_url = format!("file://{}", dir.path().to_str().unwrap());
395        let source_bins = [
396            Binary {
397                name: "foo".to_owned(),
398                id: Url::parse(&format!("{base_url}/foo")).unwrap(),
399            },
400            Binary {
401                name: "bar".to_owned(),
402                id: Url::parse(&format!("{base_url}/bar")).unwrap(),
403            },
404            Binary {
405                name: "baz".to_owned(),
406                id: Url::parse(&format!("{base_url}/baz")).unwrap(),
407            },
408        ];
409
410        let bins = db.get_binaries().await.unwrap();
411        assert_eq!(bins.len(), 0);
412
413        db.set_binaries(&source_bins[0..1]).await.unwrap();
414        let bins = db.get_binaries().await.unwrap();
415        assert_eq!(bins.len(), 1);
416        assert_eq!(bins[0].name, source_bins[0].name);
417        assert_eq!(bins[0].id, source_bins[0].id);
418
419        db.set_binaries(&source_bins[1..2]).await.unwrap();
420        let bins = db.get_binaries().await.unwrap();
421        assert_eq!(bins.len(), 1);
422        assert_eq!(bins[0].name, source_bins[1].name);
423        assert_eq!(bins[0].id, source_bins[1].id);
424
425        db.set_binaries(&source_bins).await.unwrap();
426        let bins = db.get_binaries().await.unwrap();
427        assert_eq!(bins.len(), 3);
428        // Test order
429        assert_eq!(bins[0].name, source_bins[1].name);
430        assert_eq!(bins[0].id, source_bins[1].id);
431        assert_eq!(bins[1].name, source_bins[2].name);
432        assert_eq!(bins[1].id, source_bins[2].id);
433        assert_eq!(bins[2].name, source_bins[0].name);
434        assert_eq!(bins[2].id, source_bins[0].id);
435    }
436
437    #[tokio::test]
438    async fn get_set_binaries_updated_at() {
439        let (dir, db) = setup().await.unwrap();
440
441        let date = db.get_binaries_updated_at().await.unwrap();
442        assert!(date.is_none());
443        sleep(Duration::from_millis(100));
444
445        let now = Utc::now();
446        db.set_binaries_updated_at(now).await.unwrap();
447        let date = db.get_binaries_updated_at().await.unwrap();
448        assert_eq!(date, Some(now));
449
450        drop(dir);
451    }
452
453    #[tokio::test]
454    async fn get_set_config_updated_at() {
455        let (dir, db) = setup().await.unwrap();
456
457        let date = db.get_config_updated_at().await.unwrap();
458        assert!(date.is_none());
459
460        let now = Utc::now();
461        db.set_config_updated_at(now).await.unwrap();
462        let date = db.get_config_updated_at().await.unwrap();
463        assert_eq!(date, Some(now));
464
465        drop(dir);
466    }
467
468    #[tokio::test]
469    async fn get_set_default_stack() {
470        let (dir, db) = setup().await.unwrap();
471
472        let stack = db.get_default_stack().await.unwrap();
473        assert!(stack.is_none());
474
475        let default_stack = None;
476        db.set_default_stack(&default_stack).await.unwrap();
477        let stack = db.get_default_stack().await.unwrap();
478        assert_eq!(stack, default_stack);
479
480        let default_stack = Some("foo".to_owned());
481        let err = db.set_default_stack(&default_stack).await;
482        assert!(err.is_err());
483
484        let processes = test_processes();
485        db.set_processes(&processes).await.unwrap();
486        let stacks = test_stacks();
487        db.set_stacks(&stacks).await.unwrap();
488        let default_stack = Some("foo".to_owned());
489        db.set_default_stack(&default_stack).await.unwrap();
490        let stack = db.get_default_stack().await.unwrap();
491        assert_eq!(stack, default_stack);
492
493        let default_stack = None;
494        db.set_default_stack(&default_stack).await.unwrap();
495        let stack = db.get_default_stack().await.unwrap();
496        assert_eq!(stack, default_stack);
497
498        drop(dir);
499    }
500
501    #[tokio::test]
502    async fn get_set_process_properties() {
503        let (dir, db) = setup().await.unwrap();
504
505        let processes = db.get_processes().await.unwrap();
506        assert!(processes.is_empty());
507
508        let expected_processes = test_processes();
509        db.set_processes(&expected_processes).await.unwrap();
510        db.set_process_pid(&expected_processes[0].name, Some(42))
511            .await
512            .unwrap();
513        db.set_process_state(&expected_processes[0].name, ProcessState::Building)
514            .await
515            .unwrap();
516        let processes = db.get_processes().await.unwrap();
517        assert_eq!(processes.len(), 2);
518        assert_eq!(processes[0], expected_processes[1]);
519        assert_eq!(processes[1].name, expected_processes[0].name);
520        assert_eq!(processes[1].pid(), &Some(42));
521        assert_eq!(processes[1].state, ProcessState::Building);
522
523        drop(dir);
524    }
525
526    #[tokio::test]
527    async fn get_set_processes() {
528        let (dir, db) = setup().await.unwrap();
529
530        let processes = db.get_processes().await.unwrap();
531        assert!(processes.is_empty());
532
533        let expected_processes = test_processes();
534        db.set_processes(&expected_processes).await.unwrap();
535        let processes = db.get_processes().await.unwrap();
536        assert_eq!(processes.len(), 2);
537        assert_eq!(processes[0], expected_processes[1]);
538        assert_eq!(processes[1], expected_processes[0]);
539
540        db.set_processes(&expected_processes[1..=1]).await.unwrap();
541        let processes = db.get_processes().await.unwrap();
542        assert_eq!(processes.len(), 1);
543        assert_eq!(processes[0], expected_processes[1]);
544
545        drop(dir);
546    }
547
548    #[tokio::test]
549    async fn get_set_stacks() {
550        let (dir, db) = setup().await.unwrap();
551
552        let stack = db.get_stack("foo").await.unwrap_err();
553        assert!(matches!(stack.inner_error, InnerError::StackNotFound(_)));
554
555        let expected_processes = test_processes();
556        db.set_processes(&expected_processes).await.unwrap();
557        let expected_stacks = test_stacks();
558        db.set_stacks(&expected_stacks).await.unwrap();
559        let stack = db.get_stack("foo").await.unwrap();
560        assert_eq!(&stack.name, "foo");
561        assert_eq!(stack.processes, HashSet::from(["bar".to_owned()]));
562        assert_eq!(stack.inherited_processes, HashSet::new());
563        let stack = db.get_stack("baz").await.unwrap();
564        assert_eq!(&stack.name, "baz");
565        assert_eq!(stack.processes, HashSet::from(["foo".to_owned()]));
566        assert_eq!(stack.inherited_processes, HashSet::from(["bar".to_owned()]));
567
568        db.set_processes(&expected_processes[1..=1]).await.unwrap();
569        let processes = db.get_processes().await.unwrap();
570        assert_eq!(processes.len(), 1);
571        assert_eq!(processes[0], expected_processes[1]);
572
573        drop(dir);
574    }
575
576    async fn setup() -> Result<(TempDir, Database)> {
577        let dir = tempdir()?;
578        let db = Database::new(&dir).await?;
579        Ok((dir, db))
580    }
581
582    fn test_processes() -> Vec<Process> {
583        vec![
584            Process {
585                name: "foo".to_owned(),
586                binary: "foo".to_owned(),
587                state: ProcessState::Stopped,
588                pid: None,
589                args: Vec::new(),
590                cargo_args: Vec::new(),
591                env: HashMap::new(),
592            },
593            Process {
594                name: "bar".to_owned(),
595                binary: "bar".to_owned(),
596                state: ProcessState::Stopped,
597                pid: None,
598                args: Vec::new(),
599                cargo_args: Vec::new(),
600                env: HashMap::new(),
601            },
602        ]
603    }
604
605    fn test_stacks() -> Vec<Stack> {
606        vec![
607            Stack {
608                name: "foo".to_owned(),
609                processes: HashSet::from(["bar".to_owned()]),
610                inherited_processes: Default::default(),
611            },
612            Stack {
613                name: "baz".to_owned(),
614                processes: HashSet::from(["foo".to_owned()]),
615                inherited_processes: HashSet::from(["bar".to_owned()]),
616            },
617        ]
618    }
619}