1use std::{collections::HashSet, path::Path, sync::Arc};
2
3use chrono::{DateTime, Utc};
4use redb::{ReadableTable, TableDefinition, TypeName, Value};
5use serde::{Deserialize, Serialize};
6
7use crate::{
8 common::{Binary, Process, ProcessState, Stack},
9 error::{Error, InnerError, Result},
10};
11
12const DB_FILE: &str = "db.redb";
13const METADATA: TableDefinition<u8, Metadata> = TableDefinition::new("metadata");
14const BINARY: TableDefinition<&str, Binary> = TableDefinition::new("binary");
15const PROCESS: TableDefinition<&str, Process> = TableDefinition::new("process");
16const STACK: TableDefinition<&str, Stack> = TableDefinition::new("stack");
17
18#[derive(Debug, Default, Deserialize, Serialize)]
19struct Metadata {
20 binaries_updated_at: DateTime<Utc>,
21 config_updated_at: DateTime<Utc>,
22 default_stack: Option<String>,
23}
24
25impl Value for Metadata {
26 type SelfType<'a>
27 = Metadata
28 where
29 Self: 'a;
30
31 type AsBytes<'a>
32 = Vec<u8>
33 where
34 Self: 'a;
35
36 fn fixed_width() -> Option<usize> {
37 None
38 }
39
40 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
41 where
42 Self: 'a,
43 {
44 serde_json::from_slice(data).unwrap()
45 }
46
47 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
48 where
49 Self: 'b,
50 {
51 serde_json::to_vec(value).unwrap()
52 }
53
54 fn type_name() -> redb::TypeName {
55 TypeName::new("jocker-lib_metadata")
56 }
57}
58
59impl Value for Binary {
60 type SelfType<'a>
61 = Binary
62 where
63 Self: 'a;
64
65 type AsBytes<'a>
66 = Vec<u8>
67 where
68 Self: 'a;
69
70 fn fixed_width() -> Option<usize> {
71 None
72 }
73
74 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
75 where
76 Self: 'a,
77 {
78 serde_json::from_slice(data).unwrap()
79 }
80
81 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
82 where
83 Self: 'b,
84 {
85 serde_json::to_vec(value).unwrap()
86 }
87
88 fn type_name() -> TypeName {
89 TypeName::new("jocker-lib_binary-package")
90 }
91}
92
93impl Value for Process {
94 type SelfType<'a>
95 = Process
96 where
97 Self: 'a;
98
99 type AsBytes<'a>
100 = Vec<u8>
101 where
102 Self: 'a;
103
104 fn fixed_width() -> Option<usize> {
105 None
106 }
107
108 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
109 where
110 Self: 'a,
111 {
112 serde_json::from_slice(data).unwrap()
113 }
114
115 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
116 where
117 Self: 'b,
118 {
119 serde_json::to_vec(value).unwrap()
120 }
121
122 fn type_name() -> TypeName {
123 TypeName::new("jocker-lib_process")
124 }
125}
126
127impl Value for Stack {
128 type SelfType<'a>
129 = Stack
130 where
131 Self: 'a;
132
133 type AsBytes<'a>
134 = Vec<u8>
135 where
136 Self: 'a;
137
138 fn fixed_width() -> Option<usize> {
139 None
140 }
141
142 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
143 where
144 Self: 'a,
145 {
146 serde_json::from_slice(data).unwrap()
147 }
148
149 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
150 where
151 Self: 'b,
152 {
153 serde_json::to_vec(value).unwrap()
154 }
155
156 fn type_name() -> TypeName {
157 TypeName::new("jocker-lib_stack")
158 }
159}
160
161pub(crate) struct Database {
162 db: Arc<redb::Database>,
163}
164
165impl Database {
166 pub(crate) async fn new(database_directory_path: impl AsRef<Path>) -> Result<Self> {
167 let database_path = database_directory_path.as_ref().join(DB_FILE);
168 let mut db = redb::Database::create(database_path)?;
169 db.upgrade()?;
170 let txn = db.begin_write()?;
171 {
172 txn.open_table(METADATA)?;
173 txn.open_table(BINARY)?;
174 txn.open_table(PROCESS)?;
175 txn.open_table(STACK)?;
176 }
177 txn.commit()?;
178 Ok(Self { db: Arc::new(db) })
179 }
180
181 pub(crate) async fn get_binaries(&self) -> Result<Vec<Binary>> {
182 let txn = self.db.begin_read()?;
183 let table = txn.open_table(BINARY)?;
184
185 Ok(table
186 .iter()?
187 .map(|v| v.map(|vv| vv.1.value()))
188 .collect::<std::result::Result<_, _>>()?)
189 }
190
191 pub(crate) async fn get_binaries_updated_at(&self) -> Result<Option<DateTime<Utc>>> {
192 let txn = self.db.begin_read()?;
193 let table = txn.open_table(METADATA)?;
194
195 Ok(table.get(0)?.map(|v| v.value().binaries_updated_at))
196 }
197
198 pub(crate) async fn get_config_updated_at(&self) -> Result<Option<DateTime<Utc>>> {
199 let txn = self.db.begin_read()?;
200 let table = txn.open_table(METADATA)?;
201
202 Ok(table.get(0)?.map(|v| v.value().config_updated_at))
203 }
204
205 pub(crate) async fn get_default_stack(&self) -> Result<Option<String>> {
206 let txn = self.db.begin_read()?;
207 let table = txn.open_table(METADATA)?;
208
209 Ok(table.get(0)?.and_then(|v| v.value().default_stack))
210 }
211
212 pub(crate) async fn get_processes(&self) -> Result<Vec<Process>> {
213 let txn = self.db.begin_read()?;
214 let table = txn.open_table(PROCESS)?;
215
216 Ok(table
217 .iter()?
218 .map(|v| v.map(|vv| vv.1.value()))
219 .collect::<std::result::Result<_, _>>()?)
220 }
221
222 pub(crate) async fn get_stack(&self, stack: &str) -> Result<Stack> {
223 let txn = self.db.begin_read()?;
224 let table = txn.open_table(STACK)?;
225
226 table
227 .get(stack)?
228 .map(|v| v.value())
229 .ok_or_else(|| Error::new(InnerError::StackNotFound(stack.to_owned())))
230 }
231
232 pub(crate) async fn set_binaries(&self, binaries: &[Binary]) -> Result<()> {
233 let txn = self.db.begin_write()?;
234 {
235 txn.delete_table(BINARY)?;
236 let mut table = txn.open_table(BINARY)?;
237 for bin in binaries {
238 table.insert(bin.name.as_str(), bin)?;
239 }
240 }
241 txn.commit()?;
242 Ok(())
243 }
244
245 pub(crate) async fn set_binaries_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
246 let txn = self.db.begin_write()?;
247 {
248 let mut metadata = txn
249 .open_table(METADATA)?
250 .get(0)?
251 .map(|v| v.value())
252 .unwrap_or_default();
253 metadata.binaries_updated_at = date;
254 let mut table = txn.open_table(METADATA)?;
255 table.insert(0, metadata)?;
256 }
257 txn.commit()?;
258 Ok(())
259 }
260
261 pub(crate) async fn set_config_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
262 let txn = self.db.begin_write()?;
263 {
264 let mut metadata = txn
265 .open_table(METADATA)?
266 .get(0)?
267 .map(|v| v.value())
268 .unwrap_or_default();
269 metadata.config_updated_at = date;
270 let mut table = txn.open_table(METADATA)?;
271 table.insert(0, metadata)?;
272 }
273 txn.commit()?;
274 Ok(())
275 }
276
277 pub(crate) async fn set_default_stack(&self, stack: &Option<String>) -> Result<()> {
278 if let Some(stack) = stack {
279 self.get_stack(stack).await?;
280 }
281 let txn = self.db.begin_write()?;
282 {
283 let mut metadata = txn
284 .open_table(METADATA)?
285 .get(0)?
286 .map(|v| v.value())
287 .unwrap_or_default();
288 metadata.default_stack = stack.clone();
289 let mut table = txn.open_table(METADATA)?;
290 table.insert(0, metadata)?;
291 }
292 txn.commit()?;
293 Ok(())
294 }
295
296 pub(crate) async fn set_process_pid(
297 &self,
298 process_name: &str,
299 pid: Option<usize>,
300 ) -> Result<()> {
301 let txn = self.db.begin_write()?;
302 {
303 let mut process = txn
304 .open_table(PROCESS)?
305 .get(process_name)?
306 .map(|v| v.value())
307 .unwrap_or_default();
308 process.pid = pid;
309 let mut table = txn.open_table(PROCESS)?;
310 table.insert(process_name, process)?;
311 }
312 txn.commit()?;
313 Ok(())
314 }
315
316 pub(crate) async fn set_process_state(
317 &self,
318 process_name: &str,
319 state: ProcessState,
320 ) -> Result<()> {
321 let txn = self.db.begin_write()?;
322 {
323 let mut process = txn
324 .open_table(PROCESS)?
325 .get(process_name)?
326 .map(|v| v.value())
327 .unwrap_or_default();
328 process.state = state;
329 let mut table = txn.open_table(PROCESS)?;
330 table.insert(process_name, process)?;
331 }
332 txn.commit()?;
333 Ok(())
334 }
335
336 pub(crate) async fn set_processes(&self, processes: &[Process]) -> Result<()> {
337 let txn = self.db.begin_write()?;
338 {
339 txn.delete_table(PROCESS)?;
340 let mut table = txn.open_table(PROCESS)?;
341 for process in processes {
342 table.insert(process.name.as_str(), process)?;
343 }
344 }
345 txn.commit()?;
346 Ok(())
347 }
348
349 pub(crate) async fn set_stacks(&self, stacks: &[Stack]) -> Result<()> {
350 let processes: HashSet<String> = self
351 .get_processes()
352 .await?
353 .iter()
354 .map(|p| p.name.to_owned())
355 .collect();
356
357 let txn = self.db.begin_write()?;
359 {
360 txn.delete_table(STACK)?;
361 let mut table = txn.open_table(STACK)?;
362 for stack in stacks {
363 let stack_processes = stack.processes.iter();
364 let inherited_processes = stack.inherited_processes.iter();
365 let missing_processes: Vec<String> = stack_processes
366 .clone()
367 .chain(inherited_processes.clone())
368 .filter(|&stack_process| !processes.contains(stack_process))
369 .cloned()
370 .collect();
371 if !missing_processes.is_empty() {
372 return Err(Error::new(InnerError::ProcessNotFound(missing_processes)));
373 }
374 table.insert(stack.name.as_str(), stack)?;
375 }
376 }
377 txn.commit()?;
378 Ok(())
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use std::{collections::HashMap, thread::sleep, time::Duration};
385
386 use tempfile::{tempdir, TempDir};
387 use url::Url;
388
389 use super::*;
390
391 #[tokio::test]
392 async fn get_set_binaries() {
393 let (dir, db) = setup().await.unwrap();
394 let base_url = format!("file://{}", dir.path().to_str().unwrap());
395 let source_bins = [
396 Binary {
397 name: "foo".to_owned(),
398 id: Url::parse(&format!("{base_url}/foo")).unwrap(),
399 },
400 Binary {
401 name: "bar".to_owned(),
402 id: Url::parse(&format!("{base_url}/bar")).unwrap(),
403 },
404 Binary {
405 name: "baz".to_owned(),
406 id: Url::parse(&format!("{base_url}/baz")).unwrap(),
407 },
408 ];
409
410 let bins = db.get_binaries().await.unwrap();
411 assert_eq!(bins.len(), 0);
412
413 db.set_binaries(&source_bins[0..1]).await.unwrap();
414 let bins = db.get_binaries().await.unwrap();
415 assert_eq!(bins.len(), 1);
416 assert_eq!(bins[0].name, source_bins[0].name);
417 assert_eq!(bins[0].id, source_bins[0].id);
418
419 db.set_binaries(&source_bins[1..2]).await.unwrap();
420 let bins = db.get_binaries().await.unwrap();
421 assert_eq!(bins.len(), 1);
422 assert_eq!(bins[0].name, source_bins[1].name);
423 assert_eq!(bins[0].id, source_bins[1].id);
424
425 db.set_binaries(&source_bins).await.unwrap();
426 let bins = db.get_binaries().await.unwrap();
427 assert_eq!(bins.len(), 3);
428 assert_eq!(bins[0].name, source_bins[1].name);
430 assert_eq!(bins[0].id, source_bins[1].id);
431 assert_eq!(bins[1].name, source_bins[2].name);
432 assert_eq!(bins[1].id, source_bins[2].id);
433 assert_eq!(bins[2].name, source_bins[0].name);
434 assert_eq!(bins[2].id, source_bins[0].id);
435 }
436
437 #[tokio::test]
438 async fn get_set_binaries_updated_at() {
439 let (dir, db) = setup().await.unwrap();
440
441 let date = db.get_binaries_updated_at().await.unwrap();
442 assert!(date.is_none());
443 sleep(Duration::from_millis(100));
444
445 let now = Utc::now();
446 db.set_binaries_updated_at(now).await.unwrap();
447 let date = db.get_binaries_updated_at().await.unwrap();
448 assert_eq!(date, Some(now));
449
450 drop(dir);
451 }
452
453 #[tokio::test]
454 async fn get_set_config_updated_at() {
455 let (dir, db) = setup().await.unwrap();
456
457 let date = db.get_config_updated_at().await.unwrap();
458 assert!(date.is_none());
459
460 let now = Utc::now();
461 db.set_config_updated_at(now).await.unwrap();
462 let date = db.get_config_updated_at().await.unwrap();
463 assert_eq!(date, Some(now));
464
465 drop(dir);
466 }
467
468 #[tokio::test]
469 async fn get_set_default_stack() {
470 let (dir, db) = setup().await.unwrap();
471
472 let stack = db.get_default_stack().await.unwrap();
473 assert!(stack.is_none());
474
475 let default_stack = None;
476 db.set_default_stack(&default_stack).await.unwrap();
477 let stack = db.get_default_stack().await.unwrap();
478 assert_eq!(stack, default_stack);
479
480 let default_stack = Some("foo".to_owned());
481 let err = db.set_default_stack(&default_stack).await;
482 assert!(err.is_err());
483
484 let processes = test_processes();
485 db.set_processes(&processes).await.unwrap();
486 let stacks = test_stacks();
487 db.set_stacks(&stacks).await.unwrap();
488 let default_stack = Some("foo".to_owned());
489 db.set_default_stack(&default_stack).await.unwrap();
490 let stack = db.get_default_stack().await.unwrap();
491 assert_eq!(stack, default_stack);
492
493 let default_stack = None;
494 db.set_default_stack(&default_stack).await.unwrap();
495 let stack = db.get_default_stack().await.unwrap();
496 assert_eq!(stack, default_stack);
497
498 drop(dir);
499 }
500
501 #[tokio::test]
502 async fn get_set_process_properties() {
503 let (dir, db) = setup().await.unwrap();
504
505 let processes = db.get_processes().await.unwrap();
506 assert!(processes.is_empty());
507
508 let expected_processes = test_processes();
509 db.set_processes(&expected_processes).await.unwrap();
510 db.set_process_pid(&expected_processes[0].name, Some(42))
511 .await
512 .unwrap();
513 db.set_process_state(&expected_processes[0].name, ProcessState::Building)
514 .await
515 .unwrap();
516 let processes = db.get_processes().await.unwrap();
517 assert_eq!(processes.len(), 2);
518 assert_eq!(processes[0], expected_processes[1]);
519 assert_eq!(processes[1].name, expected_processes[0].name);
520 assert_eq!(processes[1].pid(), &Some(42));
521 assert_eq!(processes[1].state, ProcessState::Building);
522
523 drop(dir);
524 }
525
526 #[tokio::test]
527 async fn get_set_processes() {
528 let (dir, db) = setup().await.unwrap();
529
530 let processes = db.get_processes().await.unwrap();
531 assert!(processes.is_empty());
532
533 let expected_processes = test_processes();
534 db.set_processes(&expected_processes).await.unwrap();
535 let processes = db.get_processes().await.unwrap();
536 assert_eq!(processes.len(), 2);
537 assert_eq!(processes[0], expected_processes[1]);
538 assert_eq!(processes[1], expected_processes[0]);
539
540 db.set_processes(&expected_processes[1..=1]).await.unwrap();
541 let processes = db.get_processes().await.unwrap();
542 assert_eq!(processes.len(), 1);
543 assert_eq!(processes[0], expected_processes[1]);
544
545 drop(dir);
546 }
547
548 #[tokio::test]
549 async fn get_set_stacks() {
550 let (dir, db) = setup().await.unwrap();
551
552 let stack = db.get_stack("foo").await.unwrap_err();
553 assert!(matches!(stack.inner_error, InnerError::StackNotFound(_)));
554
555 let expected_processes = test_processes();
556 db.set_processes(&expected_processes).await.unwrap();
557 let expected_stacks = test_stacks();
558 db.set_stacks(&expected_stacks).await.unwrap();
559 let stack = db.get_stack("foo").await.unwrap();
560 assert_eq!(&stack.name, "foo");
561 assert_eq!(stack.processes, HashSet::from(["bar".to_owned()]));
562 assert_eq!(stack.inherited_processes, HashSet::new());
563 let stack = db.get_stack("baz").await.unwrap();
564 assert_eq!(&stack.name, "baz");
565 assert_eq!(stack.processes, HashSet::from(["foo".to_owned()]));
566 assert_eq!(stack.inherited_processes, HashSet::from(["bar".to_owned()]));
567
568 db.set_processes(&expected_processes[1..=1]).await.unwrap();
569 let processes = db.get_processes().await.unwrap();
570 assert_eq!(processes.len(), 1);
571 assert_eq!(processes[0], expected_processes[1]);
572
573 drop(dir);
574 }
575
576 async fn setup() -> Result<(TempDir, Database)> {
577 let dir = tempdir()?;
578 let db = Database::new(&dir).await?;
579 Ok((dir, db))
580 }
581
582 fn test_processes() -> Vec<Process> {
583 vec![
584 Process {
585 name: "foo".to_owned(),
586 binary: "foo".to_owned(),
587 state: ProcessState::Stopped,
588 pid: None,
589 args: Vec::new(),
590 cargo_args: Vec::new(),
591 env: HashMap::new(),
592 },
593 Process {
594 name: "bar".to_owned(),
595 binary: "bar".to_owned(),
596 state: ProcessState::Stopped,
597 pid: None,
598 args: Vec::new(),
599 cargo_args: Vec::new(),
600 env: HashMap::new(),
601 },
602 ]
603 }
604
605 fn test_stacks() -> Vec<Stack> {
606 vec![
607 Stack {
608 name: "foo".to_owned(),
609 processes: HashSet::from(["bar".to_owned()]),
610 inherited_processes: Default::default(),
611 },
612 Stack {
613 name: "baz".to_owned(),
614 processes: HashSet::from(["foo".to_owned()]),
615 inherited_processes: HashSet::from(["bar".to_owned()]),
616 },
617 ]
618 }
619}