thalo_message_store/
message_store.rs

1use std::path::Path;
2
3use sled::{Db, Mode};
4use thalo::stream_name::{Category, StreamName};
5
6use crate::error::Result;
7use crate::global_event_log::GlobalEventLog;
8use crate::id_generator::IdGenerator;
9use crate::outbox::Outbox;
10use crate::projection::{Projection, PROJECTION_POSITIONS_TREE};
11use crate::stream::Stream;
12
13#[derive(Clone)]
14pub struct MessageStore {
15    db: Db,
16    id_generator: IdGenerator,
17}
18
19impl MessageStore {
20    pub fn new(db: Db) -> Result<Self> {
21        let global_event_log = GlobalEventLog::new(db)?;
22        let last_id = global_event_log.last_position()?;
23        let id_generator = IdGenerator::new(last_id);
24
25        Ok(MessageStore {
26            db: global_event_log.db,
27            id_generator,
28        })
29    }
30
31    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
32        let db = sled::Config::new()
33            .flush_every_ms(None)
34            .mode(Mode::LowSpace)
35            .path(path)
36            .open()?;
37        MessageStore::new(db)
38    }
39
40    pub fn global_event_log(&self) -> Result<GlobalEventLog> {
41        GlobalEventLog::new(self.db.clone())
42    }
43
44    pub fn stream<'a>(&self, stream_name: StreamName<'a>) -> Result<Stream<'a>> {
45        Ok(Stream::new(
46            self.id_generator.clone(),
47            self.db.open_tree(stream_name.as_bytes())?,
48            self.global_event_log()?,
49            stream_name,
50        ))
51    }
52
53    pub fn projection(&self, name: impl Into<String>) -> Result<Projection> {
54        Projection::new(&self.db, name.into())
55    }
56
57    pub async fn flush_projections(&self) -> Result<usize> {
58        let tree = self.db.open_tree(PROJECTION_POSITIONS_TREE)?;
59        Ok(tree.flush_async().await?)
60    }
61
62    pub fn outbox(&self, category: Category<'_>) -> Result<Outbox> {
63        let tree_name = Category::from_parts(category, &["outbox"])?;
64        let tree = self.db.open_tree(tree_name.as_bytes())?;
65        let outbox = Outbox::new(tree);
66        Ok(outbox)
67    }
68}