thalo_message_store/
message_store.rs1use std::path::Path;
2
3use sled::{Db, Mode};
4use thalo::stream_name::{Category, StreamName};
5
6use crate::error::Result;
7use crate::global_event_log::GlobalEventLog;
8use crate::id_generator::IdGenerator;
9use crate::outbox::Outbox;
10use crate::projection::{Projection, PROJECTION_POSITIONS_TREE};
11use crate::stream::Stream;
12
13#[derive(Clone)]
14pub struct MessageStore {
15 db: Db,
16 id_generator: IdGenerator,
17}
18
19impl MessageStore {
20 pub fn new(db: Db) -> Result<Self> {
21 let global_event_log = GlobalEventLog::new(db)?;
22 let last_id = global_event_log.last_position()?;
23 let id_generator = IdGenerator::new(last_id);
24
25 Ok(MessageStore {
26 db: global_event_log.db,
27 id_generator,
28 })
29 }
30
31 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
32 let db = sled::Config::new()
33 .flush_every_ms(None)
34 .mode(Mode::LowSpace)
35 .path(path)
36 .open()?;
37 MessageStore::new(db)
38 }
39
40 pub fn global_event_log(&self) -> Result<GlobalEventLog> {
41 GlobalEventLog::new(self.db.clone())
42 }
43
44 pub fn stream<'a>(&self, stream_name: StreamName<'a>) -> Result<Stream<'a>> {
45 Ok(Stream::new(
46 self.id_generator.clone(),
47 self.db.open_tree(stream_name.as_bytes())?,
48 self.global_event_log()?,
49 stream_name,
50 ))
51 }
52
53 pub fn projection(&self, name: impl Into<String>) -> Result<Projection> {
54 Projection::new(&self.db, name.into())
55 }
56
57 pub async fn flush_projections(&self) -> Result<usize> {
58 let tree = self.db.open_tree(PROJECTION_POSITIONS_TREE)?;
59 Ok(tree.flush_async().await?)
60 }
61
62 pub fn outbox(&self, category: Category<'_>) -> Result<Outbox> {
63 let tree_name = Category::from_parts(category, &["outbox"])?;
64 let tree = self.db.open_tree(tree_name.as_bytes())?;
65 let outbox = Outbox::new(tree);
66 Ok(outbox)
67 }
68}