1use lmdb::{Cursor, Database, DatabaseFlags, Error, RwTransaction, Transaction, WriteFlags};
2use lmdb_sys::MDB_LAST;
3use super::env::Env;
4
5use super::reader::{Reader, Item};
6use super::writer::Writer;
7
/// Key under which the consumer's current chunk index is stored (a single zero byte).
///
/// The spelling ("COMSUMER") is part of the public API and is kept as-is.
pub static KEY_COMSUMER_FILE: [u8; 1] = [0; 1];
/// Key under which the consumer's offset is stored (two zero bytes).
pub static KEY_COMSUMER_OFFSET: [u8; 2] = [0; 2];
10
11pub fn slice_to_u64(slice: &[u8]) -> Result<u64, Error> {
12 let arr: [u8; 8] = slice.try_into().map_err(|_| Error::Corrupted)?;
13 Ok(u64::from_be_bytes(arr))
14}
15
/// Encodes `v` as 8 big-endian bytes (the inverse of `slice_to_u64`).
pub fn u64_to_bytes(v: u64) -> [u8; 8] {
    u64::to_be_bytes(v)
}
19
20pub trait Topic {
21 fn get_env(&self) -> &Env;
22 fn get_db(&self) -> Database;
23
24 fn lag(&self) -> Result<u64, Error> {
25 let txn = self.get_env().transaction_ro()?;
26 let db = self.get_db();
27 let head = Self::get_value(db, &txn, &KEY_COMSUMER_FILE)?;
28 let (tail, _) = Self::get_tail(db, &txn)?;
29 let total = (head..tail + 1)
30 .map(|v| Self::get_value(db, &txn, &u64_to_bytes(v)).unwrap_or(0))
31 .reduce(|acc, v| acc + v)
32 .unwrap_or(0);
33
34 let head_offset = Self::get_value(db, &txn, &KEY_COMSUMER_OFFSET)?;
35 Ok(total - head_offset)
36 }
37
38 fn get_tail<TXN>(db: Database, txn: &TXN) -> Result<(u64, u64), Error>
39 where TXN: Transaction
40 {
41 let cur = txn.open_ro_cursor(db)?;
42 if let (Some(key), value) = cur.get(None, None, MDB_LAST)? {
43 Ok((slice_to_u64(key)?, slice_to_u64(value)?))
44 } else {
45 Err(Error::NotFound)
46 }
47 }
48
49 fn get_value<TXN>(db: Database, txn: &TXN, key: &[u8]) -> Result<u64, Error>
50 where TXN: Transaction
51 {
52 let value = txn.get(db, &key)?;
53 slice_to_u64(value)
54 }
55}
56
57pub struct Producer<'env> {
58 env: &'env Env,
59 db: Database,
60 writer: Writer,
61 chunk_size: u64,
62}
63
64impl<'env> Topic for Producer<'env> {
65 fn get_env(&self) -> &Env {
66 self.env
67 }
68
69 fn get_db(&self) -> Database {
70 self.db
71 }
72}
73
74impl<'env> Producer<'env> {
75 pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, anyhow::Error> {
76 let mut txn = env.transaction_rw()?;
77 let db = unsafe { txn.create_db(Some(name), DatabaseFlags::empty())? };
78
79 let zero = &u64_to_bytes(0);
80 if let Ok(_) = txn.put(db, &KEY_COMSUMER_FILE, zero, WriteFlags::NO_OVERWRITE) {
81 txn.put(db, &KEY_COMSUMER_OFFSET, zero, WriteFlags::NO_OVERWRITE)?;
82 txn.put(db, zero, zero, WriteFlags::NO_OVERWRITE)?;
83 }
84
85 let (tail_file, _) = Self::get_tail(db, &txn)?;
86 let writer = Writer::new(&env.root, name, tail_file)?;
87
88 txn.commit()?;
89
90 Ok(Producer { env, db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
91 }
92
93 pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), anyhow::Error>
94 where B: AsRef<[&'a [u8]]>
95 {
96 let mut txn = self.env.transaction_rw()?;
97 let (mut tail_file, mut count) = Self::get_tail(self.db, &txn)?;
98 if self.writer.file_size()? > self.chunk_size {
99 self.writer.rotate()?;
100 tail_file += 1;
101 count = 0;
102 txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(0), WriteFlags::empty())?;
103 }
104 self.writer.put_batch(messages)?;
105 txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(count + messages.as_ref().len() as u64), WriteFlags::empty())?;
106 txn.commit()?;
107 Ok(())
108 }
109
110 pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), anyhow::Error> {
111 self.push_back_batch(&[message])
112 }
113}
114
115pub struct Comsumer<'env> {
116 env: &'env Env,
117 db: Database,
118 reader: Reader,
119 chunks_to_keep: u64,
120}
121
122impl <'env> Topic for Comsumer<'env> {
123 fn get_env(&self) -> &Env {
124 self.env
125 }
126
127 fn get_db(&self) -> Database {
128 self.db
129 }
130}
131
132impl <'env> Comsumer<'env> {
133 pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, anyhow::Error> {
134 let db = env.lmdb_env.open_db(Some(name))?;
135 let txn = env.transaction_ro()?;
136 let mut reader = Reader::new(&env.root, name, Self::get_value(db, &txn, &KEY_COMSUMER_FILE)?)?;
137
138 let offset = Self::get_value(db, &txn, &KEY_COMSUMER_OFFSET)?;
139 for _ in 0..offset {
140 reader.read()?;
141 }
142
143 Ok(Comsumer { env, db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
144 }
145
146 pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, anyhow::Error> {
147 let mut txn: RwTransaction<'_> = self.env.transaction_rw()?;
148 self.check_chunks_to_keep(&mut txn)?;
149
150 let mut items = vec![];
151 let mut delta = 0;
152 for _ in 0..n {
153 match self.reader.read() {
154 Ok(item) => {
155 items.push(item);
156 delta += 1;
157 },
158 Err(_) => {
159 if self.rotate(&mut txn)? {
160 items.push(self.reader.read()?);
161 delta = 1;
162 } else {
163 txn.commit()?;
164 return Ok(items);
165 }
166 }
167 }
168 }
169
170 self.bump_offset(&mut txn, delta)?;
171 txn.commit()?;
172 Ok(items)
173 }
174
175 pub fn pop_front(&mut self) -> Result<Option<Item>, anyhow::Error> {
176 let mut txn = self.env.transaction_rw()?;
177 self.check_chunks_to_keep(&mut txn)?;
178
179 match self.reader.read() {
180 Ok(item) => {
181 self.bump_offset(&mut txn, 1)?;
182 txn.commit()?;
183 return Ok(Some(item));
184 },
185 Err(_) => {
186 if self.rotate(&mut txn)? {
187 let item = self.reader.read()?;
188 self.bump_offset(&mut txn, 1)?;
189 txn.commit()?;
190 return Ok(Some(item));
191 } else {
192 txn.commit()?;
193 return Ok(None);
194 }
195 }
196 }
197 }
198
199 fn check_chunks_to_keep(&mut self, txn: &mut RwTransaction) -> Result<(), anyhow::Error> {
200 let head = Self::get_value(self.db, txn, &KEY_COMSUMER_FILE)?;
201 let (tail, _) = Self::get_tail(self.db, txn)?;
202 let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
203 for _ in 0..chunk_to_remove {
204 self.rotate(txn)?;
205 }
206
207 Ok(())
208 }
209
210 fn rotate(&mut self, txn: &mut RwTransaction) -> Result<bool, anyhow::Error> {
211 let head = Self::get_value(self.db, txn, &KEY_COMSUMER_FILE)?;
212 let (tail, _) = Self::get_tail(self.db, txn)?;
213 if tail > head {
214 self.reader.rotate()?;
215 txn.del(self.db, &u64_to_bytes(head), None)?;
216 txn.put(self.db, &KEY_COMSUMER_FILE, &u64_to_bytes(head + 1), WriteFlags::empty())?;
217 txn.put(self.db, &KEY_COMSUMER_OFFSET, &u64_to_bytes(0), WriteFlags::empty())?;
218 Ok(true)
219 } else {
220 Ok(false)
221 }
222 }
223
224 fn bump_offset(&self, txn: &mut RwTransaction, delta: u64) -> Result<(), Error> {
225 let old_offset = Self::get_value(self.db, txn, &KEY_COMSUMER_OFFSET)?;
226 txn.put(self.db, &KEY_COMSUMER_OFFSET, &u64_to_bytes(old_offset + delta), WriteFlags::empty())?;
227 Ok(())
228 }
229}