1use lmdb::{Cursor, Database, DatabaseFlags, Error, RwTransaction, Transaction, WriteFlags};
2use lmdb_sys::{MDB_LAST, MDB_SET_RANGE};
3use super::env::Env;
4
5use super::reader::{Reader, Item};
6use super::writer::Writer;
7
/// Key of the counter holding the number of the chunk the consumer is currently reading.
///
/// The three consumer keys are all-zero byte strings of different lengths (1, 2 and 3
/// bytes), whereas per-chunk counters are stored under 8-byte big-endian chunk numbers.
pub static KEY_CONSUMER_FILE: [u8; 1] = [0; 1];
/// Key of the counter holding how many messages of the current chunk were already consumed.
pub static KEY_CONSUMER_OFFSET: [u8; 2] = [0; 2];
/// Key of the counter holding the reader position (`Reader::get_bytes_read`) in the current chunk.
pub static KEY_CONSUMER_BYTES_READ: [u8; 3] = [0; 3];
11
12pub fn slice_to_u64(slice: &[u8]) -> Result<u64, Error> {
13 let arr: [u8; 8] = slice.try_into().map_err(|_| Error::Corrupted)?;
14 Ok(u64::from_be_bytes(arr))
15}
16
/// Encodes `v` as 8 big-endian bytes, the inverse of [`slice_to_u64`].
pub fn u64_to_bytes(v: u64) -> [u8; 8] {
    u64::to_be_bytes(v)
}
20
21pub trait Topic {
22 fn get_env(&self) -> &Env;
23 fn get_db(&self) -> Database;
24
25 fn lag(&self) -> Result<u64, Error> {
26 let txn = self.get_env().transaction_ro()?;
27 let db = self.get_db();
28 let head = Self::get_value(db, &txn, &KEY_CONSUMER_FILE)?;
29 let (tail, _) = Self::get_tail(db, &txn)?;
30 let total = (head..tail + 1)
31 .map(|v| Self::get_value(db, &txn, &u64_to_bytes(v)).unwrap_or(0))
32 .reduce(|acc, v| acc + v)
33 .unwrap_or(0);
34
35 let head_offset = Self::get_value(db, &txn, &KEY_CONSUMER_OFFSET)?;
36 Ok(total - head_offset)
37 }
38
39 fn inc(&self, txn: &mut RwTransaction, key: &[u8], delta: u64) -> Result<(), Error> {
40 let mut cur = txn.open_rw_cursor(self.get_db())?;
41 let (_, old) = cur.get(Some(&key), None, MDB_SET_RANGE)?;
42 let old_val = slice_to_u64(old)?;
43 cur.put(&key, &u64_to_bytes(old_val + delta), WriteFlags::CURRENT)
44 }
45
46 fn replace(&self, txn: &mut RwTransaction, key: &[u8], value: u64) -> Result<(), Error> {
47 let mut cur = txn.open_rw_cursor(self.get_db())?;
48 cur.get(Some(&key), None, MDB_SET_RANGE)?;
49 cur.put(&key, &u64_to_bytes(value), WriteFlags::CURRENT)
50 }
51
52 fn get_tail<TXN>(db: Database, txn: &TXN) -> Result<(u64, u64), Error>
53 where TXN: Transaction
54 {
55 let cur = txn.open_ro_cursor(db)?;
56 if let (Some(key), value) = cur.get(None, None, MDB_LAST)? {
57 Ok((slice_to_u64(key)?, slice_to_u64(value)?))
58 } else {
59 Err(Error::NotFound)
60 }
61 }
62
63 fn get_value<TXN>(db: Database, txn: &TXN, key: &[u8]) -> Result<u64, Error>
64 where TXN: Transaction
65 {
66 let value = txn.get(db, &key)?;
67 slice_to_u64(value)
68 }
69}
70
/// Writing end of a topic: appends messages through a `Writer` and keeps the per-chunk
/// message counters in the topic database up to date.
pub struct Producer<'env> {
    /// Environment used to open transactions.
    env: &'env Env,
    /// Handle of the database holding this topic's counters.
    db: Database,
    /// Writer for the chunk currently being appended to.
    writer: Writer,
    /// Size threshold, compared against `Writer::file_size`, above which the next push
    /// starts a new chunk.
    chunk_size: u64,
}
77
/// Gives `Producer` access to the shared counter helpers of `Topic`.
impl<'env> Topic for Producer<'env> {
    /// Returns the environment this producer was created with.
    fn get_env(&self) -> &Env {
        self.env
    }

    /// Returns the handle of this producer's topic database.
    fn get_db(&self) -> Database {
        self.db
    }
}
87
88impl<'env> Producer<'env> {
89 pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, anyhow::Error> {
90 let mut txn = env.transaction_rw()?;
91 let db = unsafe { txn.create_db(Some(name), DatabaseFlags::empty())? };
92
93 let zero = &u64_to_bytes(0);
94 if let Ok(_) = txn.put(db, &KEY_CONSUMER_FILE, zero, WriteFlags::NO_OVERWRITE) {
95 txn.put(db, &KEY_CONSUMER_OFFSET, zero, WriteFlags::NO_OVERWRITE)?;
96 txn.put(db, &KEY_CONSUMER_BYTES_READ, zero, WriteFlags::NO_OVERWRITE)?;
97 txn.put(db, zero, zero, WriteFlags::NO_OVERWRITE)?;
98 }
99
100 let (tail_file, _) = Self::get_tail(db, &txn)?;
101 let writer = Writer::new(&env.root, name, tail_file)?;
102
103 txn.commit()?;
104
105 Ok(Producer { env, db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
106 }
107
108 pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), anyhow::Error>
109 where B: AsRef<[&'a [u8]]>
110 {
111 let mut txn = self.env.transaction_rw()?;
112 let (mut tail_file, _) = Self::get_tail(self.db, &txn)?;
113 if self.writer.file_size()? > self.chunk_size {
114 self.writer.rotate()?;
115 tail_file += 1;
116 txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(0), WriteFlags::empty())?;
117 }
118 self.writer.put_batch(messages)?;
119 self.inc(&mut txn, &u64_to_bytes(tail_file), messages.as_ref().len() as u64)?;
120 txn.commit()?;
121 Ok(())
122 }
123
124 pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), anyhow::Error> {
125 self.push_back_batch(&[message])
126 }
127}
128
/// Reading end of a topic: pops messages through a `Reader` and records the consumer
/// position in the topic database.
pub struct Consumer<'env> {
    /// Environment used to open transactions.
    env: &'env Env,
    /// Handle of the database holding this topic's counters.
    db: Database,
    /// Reader positioned on the chunk currently being consumed.
    reader: Reader,
    /// Number of chunks allowed between the consumer's chunk and the newest chunk
    /// before the oldest ones are skipped (see `check_chunks_to_keep`).
    chunks_to_keep: u64,
}
135
/// Gives `Consumer` access to the shared counter helpers of `Topic`.
impl <'env> Topic for Consumer<'env> {
    /// Returns the environment this consumer was created with.
    fn get_env(&self) -> &Env {
        self.env
    }

    /// Returns the handle of this consumer's topic database.
    fn get_db(&self) -> Database {
        self.db
    }
}
145
146impl <'env> Consumer<'env> {
147 pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, anyhow::Error> {
148 let txn = env.transaction_ro()?;
149 let db = unsafe { txn.open_db(Some(name))? };
150 let bytes_read = Self::get_value(db, &txn, &KEY_CONSUMER_BYTES_READ)?;
152 let file_num = Self::get_value(db, &txn, &KEY_CONSUMER_FILE)?;
153 txn.commit()?;
154
155 let mut reader = Reader::new(&env.root, name, file_num)?;
156 if bytes_read > 0 {
157 reader.set_bytes_read(bytes_read)?;
158 }
159
160 Ok(Consumer { env, db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
161 }
162
163 pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, anyhow::Error> {
164 let mut txn: RwTransaction<'_> = self.env.transaction_rw()?;
165 self.check_chunks_to_keep(&mut txn)?;
166
167 let mut items = vec![];
168 let mut delta = 0;
169 for _ in 0..n {
170 match self.reader.read() {
171 Ok(item) => {
172 items.push(item);
173 delta += 1;
174 },
175 Err(_) => {
176 if self.rotate(&mut txn)? {
177 items.push(self.reader.read()?);
178 delta = 1;
179 } else {
180 break;
181 }
182 }
183 }
184 }
185
186 self.inc(&mut txn, &KEY_CONSUMER_OFFSET, delta)?;
187 self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
188 txn.commit()?;
189 Ok(items)
190 }
191
192 pub fn pop_front(&mut self) -> Result<Option<Item>, anyhow::Error> {
193 let mut txn = self.env.transaction_rw()?;
194 self.check_chunks_to_keep(&mut txn)?;
195
196 match self.reader.read() {
197 Ok(item) => {
198 self.inc(&mut txn, &KEY_CONSUMER_OFFSET, 1)?;
199 self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
200 txn.commit()?;
201 return Ok(Some(item));
202 },
203 Err(_) => {
204 if self.rotate(&mut txn)? {
205 let item = self.reader.read()?;
206 self.inc(&mut txn, &KEY_CONSUMER_OFFSET, 1)?;
207 self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
208 txn.commit()?;
209 return Ok(Some(item));
210 } else {
211 txn.commit()?;
212 return Ok(None);
213 }
214 }
215 }
216 }
217
218 fn check_chunks_to_keep(&mut self, txn: &mut RwTransaction) -> Result<(), anyhow::Error> {
219 let head = Self::get_value(self.db, txn, &KEY_CONSUMER_FILE)?;
220 let (tail, _) = Self::get_tail(self.db, txn)?;
221 let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
222 for _ in 0..chunk_to_remove {
223 self.rotate(txn)?;
224 }
225
226 Ok(())
227 }
228
229 fn rotate(&mut self, txn: &mut RwTransaction) -> Result<bool, anyhow::Error> {
230 let head = Self::get_value(self.db, txn, &KEY_CONSUMER_FILE)?;
231 let (tail, _) = Self::get_tail(self.db, txn)?;
232 if tail > head {
233 self.reader.rotate()?;
234 txn.del(self.db, &u64_to_bytes(head), None)?;
235 self.replace(txn, &KEY_CONSUMER_FILE, head + 1)?;
236 self.replace(txn, &KEY_CONSUMER_OFFSET, 0)?;
237 self.replace(txn, &KEY_CONSUMER_BYTES_READ, 0)?;
238 Ok(true)
239 } else {
240 Ok(false)
241 }
242 }
243}