event_index/
event_index.rs1use std::path::{Path, PathBuf};
2
3use trine_kv::{Bucket, Db, Error, Result, WriteBatch, WriteOptions};
4
5fn main() -> Result<()> {
6 let path = temp_path("trine-kv-event-index");
7 reset_dir(&path)?;
8
9 let log = EventLog::open(&path)?;
10 log.append(&Event::new("000001", "acct-a", "invoice-created"))?;
11 log.append(&Event::new("000002", "acct-b", "invoice-created"))?;
12 log.append(&Event::new("000003", "acct-a", "invoice-paid"))?;
13
14 assert_eq!(
15 log.events_for_account("acct-a")?
16 .iter()
17 .map(|event| event.body.as_str())
18 .collect::<Vec<_>>(),
19 ["invoice-created", "invoice-paid"]
20 );
21
22 log.flush()?;
23 drop(log);
24
25 let reopened = EventLog::open(&path)?;
26 assert_eq!(
27 reopened
28 .events_for_account("acct-b")?
29 .first()
30 .map(|event| event.id.as_str()),
31 Some("000002")
32 );
33
34 drop(reopened);
35 std::fs::remove_dir_all(path)?;
36 Ok(())
37}
38
39struct EventLog {
40 db: Db,
41 events: Bucket,
42 by_account: Bucket,
43}
44
45impl EventLog {
46 fn open(path: &Path) -> Result<Self> {
47 let db = Db::open_sync(path)?;
48 let events = db.bucket_sync("events")?;
49 let by_account = db.bucket_sync("events_by_account")?;
50 Ok(Self {
51 db,
52 events,
53 by_account,
54 })
55 }
56
57 fn append(&self, event: &Event) -> Result<()> {
58 let mut batch = WriteBatch::new();
59 batch.put_bucket("events", event_key(&event.id), event.encode()?)?;
60 batch.put_bucket(
61 "events_by_account",
62 account_event_key(&event.account_id, &event.id),
63 event.id.as_bytes(),
64 )?;
65 self.db.write_sync(batch, WriteOptions::default())?;
66 Ok(())
67 }
68
69 fn events_for_account(&self, account_id: &str) -> Result<Vec<Event>> {
70 self.by_account
71 .prefix_sync(account_event_prefix(account_id))?
72 .map(|item| {
73 let index = item?;
74 let event_id = std::str::from_utf8(&index.value)
75 .map_err(|_| invalid_event("index value is not UTF-8"))?;
76 let bytes = self.events.get_sync(&event_key(event_id))?.ok_or_else(|| {
77 Error::Corruption {
78 message: format!("event index points at missing event {event_id}"),
79 }
80 })?;
81 Event::decode(&bytes)
82 })
83 .collect()
84 }
85
86 fn flush(&self) -> Result<()> {
87 self.db.flush_sync()
88 }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92struct Event {
93 id: String,
94 account_id: String,
95 body: String,
96}
97
98impl Event {
99 fn new(id: &str, account_id: &str, body: &str) -> Self {
100 Self {
101 id: id.to_owned(),
102 account_id: account_id.to_owned(),
103 body: body.to_owned(),
104 }
105 }
106
107 fn encode(&self) -> Result<Vec<u8>> {
108 encode_fields(&[&self.id, &self.account_id, &self.body])
109 }
110
111 fn decode(bytes: &[u8]) -> Result<Self> {
112 let mut fields = FieldCursor::new(bytes);
113 let event = Self {
114 id: fields.read_string()?,
115 account_id: fields.read_string()?,
116 body: fields.read_string()?,
117 };
118 fields.finish()?;
119 Ok(event)
120 }
121}
122
123fn event_key(id: &str) -> Vec<u8> {
124 format!("event/{id}").into_bytes()
125}
126
127fn account_event_key(account_id: &str, event_id: &str) -> Vec<u8> {
128 format!("account/{account_id}/event/{event_id}").into_bytes()
129}
130
131fn account_event_prefix(account_id: &str) -> Vec<u8> {
132 format!("account/{account_id}/event/").into_bytes()
133}
134
135fn encode_fields(fields: &[&str]) -> Result<Vec<u8>> {
136 let mut bytes = Vec::new();
137 for field in fields {
138 let len = u32::try_from(field.len())
139 .map_err(|_| Error::invalid_options("event field exceeds u32::MAX"))?;
140 bytes.extend_from_slice(&len.to_le_bytes());
141 bytes.extend_from_slice(field.as_bytes());
142 }
143 Ok(bytes)
144}
145
146struct FieldCursor<'bytes> {
147 bytes: &'bytes [u8],
148 offset: usize,
149}
150
151impl<'bytes> FieldCursor<'bytes> {
152 const fn new(bytes: &'bytes [u8]) -> Self {
153 Self { bytes, offset: 0 }
154 }
155
156 fn read_string(&mut self) -> Result<String> {
157 let len_bytes = self
158 .bytes
159 .get(self.offset..self.offset.saturating_add(4))
160 .ok_or_else(|| invalid_event("short field length"))?;
161 self.offset += 4;
162
163 let len =
164 u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
165 let end = self
166 .offset
167 .checked_add(len)
168 .ok_or_else(|| invalid_event("field length overflows usize"))?;
169 let value = self
170 .bytes
171 .get(self.offset..end)
172 .ok_or_else(|| invalid_event("short field bytes"))?;
173 self.offset = end;
174
175 std::str::from_utf8(value)
176 .map(str::to_owned)
177 .map_err(|_| invalid_event("field is not UTF-8"))
178 }
179
180 fn finish(&self) -> Result<()> {
181 if self.offset == self.bytes.len() {
182 return Ok(());
183 }
184 Err(invalid_event("trailing bytes"))
185 }
186}
187
188fn invalid_event(message: &'static str) -> Error {
189 Error::InvalidFormat {
190 message: format!("invalid event record: {message}"),
191 }
192}
193
194fn temp_path(name: &str) -> PathBuf {
195 std::env::temp_dir().join(format!("{name}-{}", std::process::id()))
196}
197
198fn reset_dir(path: &Path) -> Result<()> {
199 match std::fs::remove_dir_all(path) {
200 Ok(()) => {}
201 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
202 Err(error) => return Err(Error::Io(error)),
203 }
204 Ok(())
205}