lmdb_queue/
topic.rs

1use lmdb::{Cursor, Database, DatabaseFlags, Error, RwTransaction, Transaction, WriteFlags};
2use lmdb_sys::{MDB_LAST, MDB_SET_RANGE};
3use super::env::Env;
4
5use super::reader::{Reader, Item};
6use super::writer::Writer;
7
/// Key under which the number of the chunk file the consumer is reading is stored.
///
/// The three consumer keys are all-zero byte strings of length 1, 2 and 3; the
/// per-chunk counters in the same database use 8-byte big-endian keys instead.
pub static KEY_CONSUMER_FILE: [u8; 1] = [0; 1];
/// Key under which the count of items consumed from the current chunk is stored.
pub static KEY_CONSUMER_OFFSET: [u8; 2] = [0; 2];
/// Key under which the reader's byte position in the current chunk is stored.
pub static KEY_CONSUMER_BYTES_READ: [u8; 3] = [0; 3];
11
12pub fn slice_to_u64(slice: &[u8]) -> Result<u64, Error> {
13    let arr: [u8; 8] = slice.try_into().map_err(|_| Error::Corrupted)?;
14    Ok(u64::from_be_bytes(arr))
15}
16
/// Encodes `v` as 8 big-endian bytes, the inverse of `slice_to_u64`.
///
/// Big-endian encoding makes byte-wise key order agree with numeric order.
pub fn u64_to_bytes(v: u64) -> [u8; 8] {
    u64::to_be_bytes(v)
}
20
21pub trait Topic {
22    fn get_env(&self) -> &Env;
23    fn get_db(&self) -> Database;
24
25    fn lag(&self) -> Result<u64, Error> {
26        let txn = self.get_env().transaction_ro()?;
27        let db = self.get_db();
28        let head = Self::get_value(db, &txn, &KEY_CONSUMER_FILE)?;
29        let (tail, _) = Self::get_tail(db, &txn)?;
30        let total = (head..tail + 1)
31            .map(|v| Self::get_value(db, &txn, &u64_to_bytes(v)).unwrap_or(0))
32            .reduce(|acc, v| acc + v)
33            .unwrap_or(0);
34
35        let head_offset = Self::get_value(db, &txn, &KEY_CONSUMER_OFFSET)?;
36        Ok(total - head_offset)
37    }
38
39    fn inc(&self, txn: &mut RwTransaction, key: &[u8], delta: u64) -> Result<(), Error> {
40        let mut cur = txn.open_rw_cursor(self.get_db())?;
41        let (_, old) = cur.get(Some(&key), None, MDB_SET_RANGE)?;
42        let old_val = slice_to_u64(old)?;
43        cur.put(&key, &u64_to_bytes(old_val + delta), WriteFlags::CURRENT)
44    }
45
46    fn replace(&self, txn: &mut RwTransaction, key: &[u8], value: u64) -> Result<(), Error> {
47        let mut cur = txn.open_rw_cursor(self.get_db())?;
48        cur.get(Some(&key), None, MDB_SET_RANGE)?;
49        cur.put(&key, &u64_to_bytes(value), WriteFlags::CURRENT)
50    }
51
52    fn get_tail<TXN>(db: Database, txn: &TXN) -> Result<(u64, u64), Error>
53    where TXN: Transaction
54    {
55        let cur = txn.open_ro_cursor(db)?;
56        if let (Some(key), value) = cur.get(None, None, MDB_LAST)? {
57            Ok((slice_to_u64(key)?, slice_to_u64(value)?))
58        } else {
59            Err(Error::NotFound)
60        }
61    }
62
63    fn get_value<TXN>(db: Database, txn: &TXN, key: &[u8]) -> Result<u64, Error>
64    where TXN: Transaction
65    {
66        let value = txn.get(db, &key)?;
67        slice_to_u64(value)
68    }
69}
70
/// Writing side of a queue topic.
pub struct Producer<'env> {
    /// Environment used to open the write transactions.
    env: &'env Env,
    /// Database holding the topic's counters.
    db: Database,
    /// Writer for the chunk files that hold the message payloads.
    writer: Writer,
    /// Threshold compared against `writer.file_size()`; once exceeded, the next
    /// batch starts a new chunk file.
    chunk_size: u64,
}
77
impl<'env> Topic for Producer<'env> {
    /// Returns the environment this producer was created with.
    fn get_env(&self) -> &Env {
        self.env
    }

    /// Returns the database handle of this producer's topic.
    fn get_db(&self) -> Database {
        self.db
    }
}
87
88impl<'env> Producer<'env> {
89    pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, anyhow::Error> {
90        let mut txn = env.transaction_rw()?;
91        let db = unsafe { txn.create_db(Some(name), DatabaseFlags::empty())? };
92
93        let zero = &u64_to_bytes(0);
94        if let Ok(_) = txn.put(db, &KEY_CONSUMER_FILE, zero, WriteFlags::NO_OVERWRITE) {
95            txn.put(db, &KEY_CONSUMER_OFFSET, zero, WriteFlags::NO_OVERWRITE)?;
96            txn.put(db, &KEY_CONSUMER_BYTES_READ, zero, WriteFlags::NO_OVERWRITE)?;
97            txn.put(db, zero, zero, WriteFlags::NO_OVERWRITE)?;
98        }
99
100        let (tail_file, _) = Self::get_tail(db, &txn)?;
101        let writer = Writer::new(&env.root, name, tail_file)?;
102
103        txn.commit()?;
104
105        Ok(Producer { env, db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
106    }
107
108    pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), anyhow::Error>
109    where B: AsRef<[&'a [u8]]>
110    {
111        let mut txn = self.env.transaction_rw()?;
112        let (mut tail_file, _) = Self::get_tail(self.db, &txn)?;
113        if self.writer.file_size()? > self.chunk_size {
114            self.writer.rotate()?;
115            tail_file += 1;
116            txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(0), WriteFlags::empty())?;
117        }
118        self.writer.put_batch(messages)?;
119        self.inc(&mut txn, &u64_to_bytes(tail_file), messages.as_ref().len() as u64)?;
120        txn.commit()?;
121        Ok(())
122    }
123
124    pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), anyhow::Error> {
125        self.push_back_batch(&[message])
126    }
127}
128
/// Reading side of a queue topic.
pub struct Consumer<'env> {
    /// Environment used to open the transactions.
    env: &'env Env,
    /// Database holding the topic's counters.
    db: Database,
    /// Reader positioned in the chunk file currently being consumed.
    reader: Reader,
    /// Maximum number of chunks kept; older chunks are rotated away before
    /// each pop.
    chunks_to_keep: u64,
}
135
136impl <'env> Topic for Consumer<'env> {
137    fn get_env(&self) -> &Env {
138        self.env
139    }
140
141    fn get_db(&self) -> Database {
142        self.db
143    }    
144}
145
146impl <'env> Consumer<'env> {
147    pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, anyhow::Error> {
148        let txn = env.transaction_ro()?;
149        let db = unsafe { txn.open_db(Some(name))? };
150        // let offset = Self::get_value(db, &txn, &KEY_CONSUMER_OFFSET)?;
151        let bytes_read = Self::get_value(db, &txn, &KEY_CONSUMER_BYTES_READ)?;
152        let file_num = Self::get_value(db, &txn, &KEY_CONSUMER_FILE)?;
153        txn.commit()?;
154
155        let mut reader = Reader::new(&env.root, name, file_num)?;
156        if bytes_read > 0 {
157            reader.set_bytes_read(bytes_read)?;
158        }
159
160        Ok(Consumer { env, db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
161    }
162
163    pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, anyhow::Error> {
164        let mut txn: RwTransaction<'_> = self.env.transaction_rw()?;
165        self.check_chunks_to_keep(&mut txn)?;
166
167        let mut items = vec![];
168        let mut delta = 0;
169        for _ in 0..n {
170            match self.reader.read() {
171                Ok(item) => {
172                    items.push(item);
173                    delta += 1;
174                },
175                Err(_) => {
176                    if self.rotate(&mut txn)? {
177                        items.push(self.reader.read()?);
178                        delta = 1;
179                    } else {
180                        break;
181                    }
182                }
183            }
184        }
185
186        self.inc(&mut txn, &KEY_CONSUMER_OFFSET, delta)?;
187        self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
188        txn.commit()?;
189        Ok(items)
190    }
191
192    pub fn pop_front(&mut self) -> Result<Option<Item>, anyhow::Error> {
193        let mut txn = self.env.transaction_rw()?;
194        self.check_chunks_to_keep(&mut txn)?;
195
196        match self.reader.read() {
197            Ok(item) => {
198                self.inc(&mut txn, &KEY_CONSUMER_OFFSET, 1)?;
199                self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
200                txn.commit()?;
201                return Ok(Some(item));
202            },
203            Err(_) => {
204                if self.rotate(&mut txn)? {
205                    let item = self.reader.read()?;
206                    self.inc(&mut txn, &KEY_CONSUMER_OFFSET, 1)?;
207                    self.replace(&mut txn, &KEY_CONSUMER_BYTES_READ, self.reader.get_bytes_read())?;
208                    txn.commit()?;
209                    return Ok(Some(item));
210                } else {
211                    txn.commit()?;
212                    return Ok(None);
213                }
214            }
215        }
216    }
217
218    fn check_chunks_to_keep(&mut self, txn: &mut RwTransaction) -> Result<(), anyhow::Error> {
219        let head = Self::get_value(self.db, txn, &KEY_CONSUMER_FILE)?;
220        let (tail, _) = Self::get_tail(self.db, txn)?;
221        let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
222        for _ in 0..chunk_to_remove {
223            self.rotate(txn)?;
224        }
225
226        Ok(())
227    }
228
229    fn rotate(&mut self, txn: &mut RwTransaction) -> Result<bool, anyhow::Error> {
230        let head = Self::get_value(self.db, txn, &KEY_CONSUMER_FILE)?;
231        let (tail, _) = Self::get_tail(self.db, txn)?;
232        if tail > head {
233            self.reader.rotate()?;
234            txn.del(self.db, &u64_to_bytes(head), None)?;
235            self.replace(txn, &KEY_CONSUMER_FILE, head + 1)?;
236            self.replace(txn, &KEY_CONSUMER_OFFSET, 0)?;
237            self.replace(txn, &KEY_CONSUMER_BYTES_READ, 0)?;
238            Ok(true)
239        } else {
240            Ok(false)
241        }
242    }
243}