Skip to main content

event_index/
event_index.rs

1use std::path::{Path, PathBuf};
2
3use trine_kv::{Bucket, Db, Error, Result, WriteBatch, WriteOptions};
4
5fn main() -> Result<()> {
6    let path = temp_path("trine-kv-event-index");
7    reset_dir(&path)?;
8
9    let log = EventLog::open(&path)?;
10    log.append(&Event::new("000001", "acct-a", "invoice-created"))?;
11    log.append(&Event::new("000002", "acct-b", "invoice-created"))?;
12    log.append(&Event::new("000003", "acct-a", "invoice-paid"))?;
13
14    assert_eq!(
15        log.events_for_account("acct-a")?
16            .iter()
17            .map(|event| event.body.as_str())
18            .collect::<Vec<_>>(),
19        ["invoice-created", "invoice-paid"]
20    );
21
22    log.flush()?;
23    drop(log);
24
25    let reopened = EventLog::open(&path)?;
26    assert_eq!(
27        reopened
28            .events_for_account("acct-b")?
29            .first()
30            .map(|event| event.id.as_str()),
31        Some("000002")
32    );
33
34    drop(reopened);
35    std::fs::remove_dir_all(path)?;
36    Ok(())
37}
38
39struct EventLog {
40    db: Db,
41    events: Bucket,
42    by_account: Bucket,
43}
44
45impl EventLog {
46    fn open(path: &Path) -> Result<Self> {
47        let db = Db::open_sync(path)?;
48        let events = db.bucket_sync("events")?;
49        let by_account = db.bucket_sync("events_by_account")?;
50        Ok(Self {
51            db,
52            events,
53            by_account,
54        })
55    }
56
57    fn append(&self, event: &Event) -> Result<()> {
58        let mut batch = WriteBatch::new();
59        batch.put_bucket("events", event_key(&event.id), event.encode()?)?;
60        batch.put_bucket(
61            "events_by_account",
62            account_event_key(&event.account_id, &event.id),
63            event.id.as_bytes(),
64        )?;
65        self.db.write_sync(batch, WriteOptions::default())?;
66        Ok(())
67    }
68
69    fn events_for_account(&self, account_id: &str) -> Result<Vec<Event>> {
70        self.by_account
71            .prefix_sync(account_event_prefix(account_id))?
72            .map(|item| {
73                let index = item?;
74                let event_id = std::str::from_utf8(&index.value)
75                    .map_err(|_| invalid_event("index value is not UTF-8"))?;
76                let bytes = self.events.get_sync(&event_key(event_id))?.ok_or_else(|| {
77                    Error::Corruption {
78                        message: format!("event index points at missing event {event_id}"),
79                    }
80                })?;
81                Event::decode(&bytes)
82            })
83            .collect()
84    }
85
86    fn flush(&self) -> Result<()> {
87        self.db.flush_sync()
88    }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92struct Event {
93    id: String,
94    account_id: String,
95    body: String,
96}
97
98impl Event {
99    fn new(id: &str, account_id: &str, body: &str) -> Self {
100        Self {
101            id: id.to_owned(),
102            account_id: account_id.to_owned(),
103            body: body.to_owned(),
104        }
105    }
106
107    fn encode(&self) -> Result<Vec<u8>> {
108        encode_fields(&[&self.id, &self.account_id, &self.body])
109    }
110
111    fn decode(bytes: &[u8]) -> Result<Self> {
112        let mut fields = FieldCursor::new(bytes);
113        let event = Self {
114            id: fields.read_string()?,
115            account_id: fields.read_string()?,
116            body: fields.read_string()?,
117        };
118        fields.finish()?;
119        Ok(event)
120    }
121}
122
123fn event_key(id: &str) -> Vec<u8> {
124    format!("event/{id}").into_bytes()
125}
126
127fn account_event_key(account_id: &str, event_id: &str) -> Vec<u8> {
128    format!("account/{account_id}/event/{event_id}").into_bytes()
129}
130
131fn account_event_prefix(account_id: &str) -> Vec<u8> {
132    format!("account/{account_id}/event/").into_bytes()
133}
134
135fn encode_fields(fields: &[&str]) -> Result<Vec<u8>> {
136    let mut bytes = Vec::new();
137    for field in fields {
138        let len = u32::try_from(field.len())
139            .map_err(|_| Error::invalid_options("event field exceeds u32::MAX"))?;
140        bytes.extend_from_slice(&len.to_le_bytes());
141        bytes.extend_from_slice(field.as_bytes());
142    }
143    Ok(bytes)
144}
145
146struct FieldCursor<'bytes> {
147    bytes: &'bytes [u8],
148    offset: usize,
149}
150
151impl<'bytes> FieldCursor<'bytes> {
152    const fn new(bytes: &'bytes [u8]) -> Self {
153        Self { bytes, offset: 0 }
154    }
155
156    fn read_string(&mut self) -> Result<String> {
157        let len_bytes = self
158            .bytes
159            .get(self.offset..self.offset.saturating_add(4))
160            .ok_or_else(|| invalid_event("short field length"))?;
161        self.offset += 4;
162
163        let len =
164            u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
165        let end = self
166            .offset
167            .checked_add(len)
168            .ok_or_else(|| invalid_event("field length overflows usize"))?;
169        let value = self
170            .bytes
171            .get(self.offset..end)
172            .ok_or_else(|| invalid_event("short field bytes"))?;
173        self.offset = end;
174
175        std::str::from_utf8(value)
176            .map(str::to_owned)
177            .map_err(|_| invalid_event("field is not UTF-8"))
178    }
179
180    fn finish(&self) -> Result<()> {
181        if self.offset == self.bytes.len() {
182            return Ok(());
183        }
184        Err(invalid_event("trailing bytes"))
185    }
186}
187
188fn invalid_event(message: &'static str) -> Error {
189    Error::InvalidFormat {
190        message: format!("invalid event record: {message}"),
191    }
192}
193
194fn temp_path(name: &str) -> PathBuf {
195    std::env::temp_dir().join(format!("{name}-{}", std::process::id()))
196}
197
198fn reset_dir(path: &Path) -> Result<()> {
199    match std::fs::remove_dir_all(path) {
200        Ok(()) => {}
201        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
202        Err(error) => return Err(Error::Io(error)),
203    }
204    Ok(())
205}