engula_journal/file/
journal.rs1use std::{path::PathBuf, sync::Arc};
16
17use tokio::{fs, sync::Mutex};
18
19use super::stream::Stream;
20use crate::{async_trait, Error, Result};
21
22#[derive(Clone)]
23pub struct Journal {
24 root: Arc<Mutex<PathBuf>>,
25 segment_size: usize,
26}
27
28impl Journal {
29 pub async fn open(root: impl Into<PathBuf>, segment_size: usize) -> Result<Self> {
30 let root = root.into();
31 Ok(Self {
32 root: Arc::new(Mutex::new(root)),
33 segment_size,
34 })
35 }
36}
37
38#[async_trait]
39impl crate::Journal for Journal {
40 type Stream = Stream;
41
42 async fn stream(&self, name: &str) -> Result<Stream> {
43 let root = self.root.lock().await;
44 let path = root.join(name);
45 if !path.exists() {
46 return Err(Error::NotFound(format!("stream '{:?}'", path)));
47 }
48 Stream::open(path, self.segment_size).await
49 }
50
51 async fn create_stream(&self, name: &str) -> Result<Stream> {
52 let root = self.root.lock().await;
53 let path = root.join(name);
54 if path.exists() {
55 return Err(Error::AlreadyExists(format!("stream '{:?}'", path)));
56 }
57 fs::create_dir_all(&path).await?;
58 Stream::open(path, self.segment_size).await
59 }
60
61 async fn delete_stream(&self, name: &str) -> Result<()> {
62 let root = self.root.lock().await;
63 let path = root.join(name);
64 if !path.exists() {
65 return Err(Error::NotFound(format!("stream '{:?}'", path)));
66 }
67 fs::remove_dir_all(path).await?;
68 Ok(())
69 }
70}