engula_journal/mem/
journal.rs1use std::{
16 collections::{hash_map, HashMap},
17 sync::Arc,
18};
19
20use tokio::sync::Mutex;
21
22use super::stream::Stream;
23use crate::{async_trait, Error, Result};
24
25#[derive(Clone)]
26pub struct Journal {
27 streams: Arc<Mutex<HashMap<String, Stream>>>,
28}
29
30impl Default for Journal {
31 fn default() -> Self {
32 Self {
33 streams: Arc::new(Mutex::new(HashMap::new())),
34 }
35 }
36}
37
38#[async_trait]
39impl crate::Journal for Journal {
40 type Stream = Stream;
41
42 async fn stream(&self, name: &str) -> Result<Self::Stream> {
43 let streams = self.streams.lock().await;
44 match streams.get(name) {
45 Some(stream) => Ok(stream.clone()),
46 None => Err(Error::NotFound(format!("stream '{}'", name))),
47 }
48 }
49
50 async fn create_stream(&self, name: &str) -> Result<Self::Stream> {
51 let stream = Stream::default();
52 let mut streams = self.streams.lock().await;
53 match streams.entry(name.to_owned()) {
54 hash_map::Entry::Vacant(ent) => {
55 ent.insert(stream.clone());
56 Ok(stream)
57 }
58 hash_map::Entry::Occupied(ent) => {
59 Err(Error::AlreadyExists(format!("stream '{}'", ent.key())))
60 }
61 }
62 }
63
64 async fn delete_stream(&self, name: &str) -> Result<()> {
65 let mut streams = self.streams.lock().await;
66 match streams.remove(name) {
67 Some(_) => Ok(()),
68 None => Err(Error::NotFound(format!("stream '{}'", name))),
69 }
70 }
71}