lmdb_queue/
topic.rs

1use lmdb::{Cursor, Database, DatabaseFlags, Error, RwTransaction, Transaction, WriteFlags};
2use lmdb_sys::MDB_LAST;
3use super::env::Env;
4
5use super::reader::{Reader, Item};
6use super::writer::Writer;
7
/// Key of the entry that stores the number of the chunk file the consumer is
/// currently reading (written by `Comsumer::rotate`).
pub static KEY_COMSUMER_FILE: [u8; 1] = [0; 1];
/// Key of the entry that stores how many items the consumer has already read
/// from its current chunk file (written by `Comsumer::bump_offset`).
pub static KEY_COMSUMER_OFFSET: [u8; 2] = [0; 2];
10
11pub fn slice_to_u64(slice: &[u8]) -> Result<u64, Error> {
12    let arr: [u8; 8] = slice.try_into().map_err(|_| Error::Corrupted)?;
13    Ok(u64::from_be_bytes(arr))
14}
15
/// Encodes `v` as eight big-endian bytes, the inverse of [`slice_to_u64`].
pub fn u64_to_bytes(v: u64) -> [u8; 8] {
    u64::to_be_bytes(v)
}
19
20pub trait Topic {
21    fn get_env(&self) -> &Env;
22    fn get_db(&self) -> Database;
23
24    fn lag(&self) -> Result<u64, Error> {
25        let txn = self.get_env().transaction_ro()?;
26        let db = self.get_db();
27        let head = Self::get_value(db, &txn, &KEY_COMSUMER_FILE)?;
28        let (tail, _) = Self::get_tail(db, &txn)?;
29        let total = (head..tail + 1)
30            .map(|v| Self::get_value(db, &txn, &u64_to_bytes(v)).unwrap_or(0))
31            .reduce(|acc, v| acc + v)
32            .unwrap_or(0);
33
34        let head_offset = Self::get_value(db, &txn, &KEY_COMSUMER_OFFSET)?;
35        Ok(total - head_offset)
36    }
37
38    fn get_tail<TXN>(db: Database, txn: &TXN) -> Result<(u64, u64), Error>
39    where TXN: Transaction
40    {
41        let cur = txn.open_ro_cursor(db)?;
42        if let (Some(key), value) = cur.get(None, None, MDB_LAST)? {
43            Ok((slice_to_u64(key)?, slice_to_u64(value)?))
44        } else {
45            Err(Error::NotFound)
46        }
47    }
48
49    fn get_value<TXN>(db: Database, txn: &TXN, key: &[u8]) -> Result<u64, Error>
50    where TXN: Transaction
51    {
52        let value = txn.get(db, &key)?;
53        slice_to_u64(value)
54    }
55}
56
57pub struct Producer<'env> {
58    env: &'env Env,
59    db: Database,
60    writer: Writer,
61    chunk_size: u64,
62}
63
64impl<'env> Topic for Producer<'env> {
65    fn get_env(&self) -> &Env {
66        self.env
67    }
68
69    fn get_db(&self) -> Database {
70        self.db
71    }
72}
73
74impl<'env> Producer<'env> {
75    pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, anyhow::Error> {
76        let mut txn = env.transaction_rw()?;
77        let db = unsafe { txn.create_db(Some(name), DatabaseFlags::empty())? };
78
79        let zero = &u64_to_bytes(0);
80        if let Ok(_) = txn.put(db, &KEY_COMSUMER_FILE, zero, WriteFlags::NO_OVERWRITE) {
81            txn.put(db, &KEY_COMSUMER_OFFSET, zero, WriteFlags::NO_OVERWRITE)?;
82            txn.put(db, zero, zero, WriteFlags::NO_OVERWRITE)?;
83        }
84
85        let (tail_file, _) = Self::get_tail(db, &txn)?;
86        let writer = Writer::new(&env.root, name, tail_file)?;
87
88        txn.commit()?;
89
90        Ok(Producer { env, db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
91    }
92
93    pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), anyhow::Error>
94    where B: AsRef<[&'a [u8]]>
95    {
96        let mut txn = self.env.transaction_rw()?;
97        let (mut tail_file, mut count) = Self::get_tail(self.db, &txn)?;
98        if self.writer.file_size()? > self.chunk_size {
99            self.writer.rotate()?;
100            tail_file += 1;
101            count = 0;
102            txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(0), WriteFlags::empty())?;
103        }
104        self.writer.put_batch(messages)?;
105        txn.put(self.db, &u64_to_bytes(tail_file), &u64_to_bytes(count + messages.as_ref().len() as u64), WriteFlags::empty())?;
106        txn.commit()?;
107        Ok(())
108    }
109
110    pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), anyhow::Error> {
111        self.push_back_batch(&[message])
112    }
113}
114
115pub struct Comsumer<'env> {
116    env: &'env Env,
117    db: Database,
118    reader: Reader,
119    chunks_to_keep: u64,
120}
121
122impl <'env> Topic for Comsumer<'env> {
123    fn get_env(&self) -> &Env {
124        self.env
125    }
126
127    fn get_db(&self) -> Database {
128        self.db
129    }    
130}
131
132impl <'env> Comsumer<'env> {
133    pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, anyhow::Error> {
134        let db = env.lmdb_env.open_db(Some(name))?;
135        let txn = env.transaction_ro()?;
136        let mut reader = Reader::new(&env.root, name, Self::get_value(db, &txn, &KEY_COMSUMER_FILE)?)?;
137
138        let offset = Self::get_value(db, &txn, &KEY_COMSUMER_OFFSET)?;
139        for _ in 0..offset {
140            reader.read()?;
141        }
142
143        Ok(Comsumer { env, db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
144    }
145
146    pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, anyhow::Error> {
147        let mut txn: RwTransaction<'_> = self.env.transaction_rw()?;
148        self.check_chunks_to_keep(&mut txn)?;
149
150        let mut items = vec![];
151        let mut delta = 0;
152        for _ in 0..n {
153            match self.reader.read() {
154                Ok(item) => {
155                    items.push(item);
156                    delta += 1;
157                },
158                Err(_) => {
159                    if self.rotate(&mut txn)? {
160                        items.push(self.reader.read()?);
161                        delta = 1;
162                    } else {
163                        txn.commit()?;
164                        return Ok(items);
165                    }
166                }
167            }
168        }
169
170        self.bump_offset(&mut txn, delta)?;
171        txn.commit()?;
172        Ok(items)
173    }
174
175    pub fn pop_front(&mut self) -> Result<Option<Item>, anyhow::Error> {
176        let mut txn = self.env.transaction_rw()?;
177        self.check_chunks_to_keep(&mut txn)?;
178
179        match self.reader.read() {
180            Ok(item) => {
181                self.bump_offset(&mut txn, 1)?;
182                txn.commit()?;
183                return Ok(Some(item));
184            },
185            Err(_) => {
186                if self.rotate(&mut txn)? {
187                    let item = self.reader.read()?;
188                    self.bump_offset(&mut txn, 1)?;
189                    txn.commit()?;
190                    return Ok(Some(item));
191                } else {
192                    txn.commit()?;
193                    return Ok(None);
194                }
195            }
196        }
197    }
198
199    fn check_chunks_to_keep(&mut self, txn: &mut RwTransaction) -> Result<(), anyhow::Error> {
200        let head = Self::get_value(self.db, txn, &KEY_COMSUMER_FILE)?;
201        let (tail, _) = Self::get_tail(self.db, txn)?;
202        let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
203        for _ in 0..chunk_to_remove {
204            self.rotate(txn)?;
205        }
206
207        Ok(())
208    }
209
210    fn rotate(&mut self, txn: &mut RwTransaction) -> Result<bool, anyhow::Error> {
211        let head = Self::get_value(self.db, txn, &KEY_COMSUMER_FILE)?;
212        let (tail, _) = Self::get_tail(self.db, txn)?;
213        if tail > head {
214            self.reader.rotate()?;
215            txn.del(self.db, &u64_to_bytes(head), None)?;
216            txn.put(self.db, &KEY_COMSUMER_FILE, &u64_to_bytes(head + 1), WriteFlags::empty())?;
217            txn.put(self.db, &KEY_COMSUMER_OFFSET, &u64_to_bytes(0), WriteFlags::empty())?;
218            Ok(true)
219        } else {
220            Ok(false)
221        }
222    }
223
224    fn bump_offset(&self, txn: &mut RwTransaction, delta: u64) -> Result<(), Error> {
225        let old_offset = Self::get_value(self.db, txn, &KEY_COMSUMER_OFFSET)?;
226        txn.put(self.db, &KEY_COMSUMER_OFFSET, &u64_to_bytes(old_offset + delta), WriteFlags::empty())?;
227        Ok(())
228    }
229}