lmdb_queue/
topic.rs

1use std::error::Error;
2use heed3::byteorder::BE;
3use heed3::types::*;
4use heed3::{RwTxn, Database, PutFlags};
5
6use super::env::Env;
7
8use super::reader::{Reader, Item};
9use super::writer::Writer;
10
/// Consumer-database key holding the number of the chunk file the consumer is currently reading.
pub static KEY_CONSUMER_FILE: &str = "FILE";
/// Consumer-database key holding how many messages have been consumed from the current chunk file.
pub static KEY_CONSUMER_OFFSET: &str = "OFFSET";
/// Consumer-database key holding the reader's `get_bytes_read()` position within the current chunk file.
pub static KEY_CONSUMER_BYTES_READ: &str = "BYTES_READ";
14
15pub trait Topic {
16    fn get_env(&self) -> &Env;
17    fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>>;
18    fn get_consumer_db(&self) -> Database<Str, U64<BE>>;
19
20    fn lag(&self) -> Result<u64, Box<dyn Error>> {
21        let txn = self.get_env().write_txn()?;
22
23        let mut pit = self.get_producer_db().iter(&txn)?.move_between_keys();
24        let mut total: u64 = 0;
25        while let Some((_, v)) = pit.next().transpose()? {
26            total += v;
27        }
28
29        let head_offset = self.get_consumer_db().get(&txn, KEY_CONSUMER_OFFSET)?.unwrap_or(0);
30        Ok(total - head_offset)
31    }
32}
33
/// Writing side of a topic: appends messages through a [`Writer`] and records per-chunk
/// message counts in the producer database.
pub struct Producer<'env> {
    /// Environment the topic's databases are opened in.
    env: &'env Env,
    /// Maps a chunk file number to the message count recorded for that chunk.
    producer_db: Database<U64<BE>, U64<BE>>,
    /// Stores the consumer's `KEY_CONSUMER_*` entries; seeded by `Producer::new` on first use.
    consumer_db: Database<Str, U64<BE>>,
    /// Writer positioned on the topic's tail chunk file.
    writer: Writer,
    /// Threshold compared against `Writer::file_size`; once exceeded, the next batch starts a new chunk.
    chunk_size: u64,
}
41
42impl<'env> Topic for Producer<'env> {
43    fn get_env(&self) -> &Env {
44        self.env
45    }
46
47    fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>> {
48        self.producer_db
49    }
50
51    fn get_consumer_db(&self) -> Database<Str, U64<BE>> {
52        self.consumer_db
53    }
54}
55
56impl<'env> Producer<'env> {
57    pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, Box<dyn Error>> {
58        let mut txn = env.write_txn()?;
59        let producer_db: Database<U64<BE>, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "producer"))?;
60        let consumer_db: Database<Str, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "consumer"))?;
61
62        if let Ok(_) = consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_FILE, &0) {
63            producer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &0, &0)?;
64            consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_OFFSET, &0)?;
65            consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_BYTES_READ, &0)?;
66        }
67
68        let (tail_file, _) = producer_db.iter(&txn)?.last().transpose()?.unwrap();
69        let writer = Writer::new(&env.root, name, tail_file)?;
70
71        txn.commit()?;
72
73        Ok(Producer { env, producer_db, consumer_db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
74    }
75
76    pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), Box<dyn Error>>
77    where B: AsRef<[&'a [u8]]>
78    {
79        let mut txn = self.env.write_txn()?;
80        let (mut tail_file, mut offset) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
81        if tail_file > self.writer.get_file_num() {
82            self.writer.rotate(Some(tail_file))?;
83        }
84
85        if self.writer.file_size()? > self.chunk_size {
86            self.writer.rotate(None)?;
87            tail_file += 1;
88            offset = 0;
89            self.producer_db.put(&mut txn, &tail_file, &0)?;
90        }
91        self.writer.put_batch(messages)?;
92        self.producer_db.put(&mut txn, &tail_file, &(offset + messages.as_ref().len() as u64))?;
93        txn.commit()?;
94        Ok(())
95    }
96
97    pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), Box<dyn Error>> {
98        self.push_back_batch(&[message])
99    }
100}
101
/// Reading side of a topic: pulls messages through a [`Reader`] and tracks its position
/// in the consumer database.
pub struct Consumer<'env> {
    /// Environment the topic's databases are opened in.
    env: &'env Env,
    /// Maps a chunk file number to the message count recorded for that chunk.
    producer_db: Database<U64<BE>, U64<BE>>,
    /// Stores the `KEY_CONSUMER_*` entries describing the consumer's position.
    consumer_db: Database<Str, U64<BE>>,
    /// Reader positioned on the chunk file named by the consumer's `FILE` entry.
    reader: Reader,
    /// Number of chunks (counted from the consumer's head up to the producer's tail) to retain;
    /// `check_chunks_to_keep` rotates past any excess before reading.
    chunks_to_keep: u64,
}
109
110impl <'env> Topic for Consumer<'env> {
111    fn get_env(&self) -> &Env {
112        self.env
113    }
114
115    fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>> {
116        self.producer_db
117    }
118
119    fn get_consumer_db(&self) -> Database<Str, U64<BE>> {
120        self.consumer_db
121    }
122}
123
124impl <'env> Consumer<'env> {
125    pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, Box<dyn Error>> {
126        let mut txn = env.write_txn()?;
127        let producer_db: Database<U64<BE>, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "producer"))?;
128        let consumer_db: Database<Str, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "consumer"))?;
129
130        let file_num = consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
131        let bytes_read = consumer_db.get(&txn, &KEY_CONSUMER_BYTES_READ)?.unwrap();
132        txn.commit()?;
133
134        let mut reader = Reader::new(&env.root, name, file_num)?;
135        if bytes_read > 0 {
136            reader.set_bytes_read(bytes_read)?;
137        }
138
139        Ok(Consumer { env, producer_db, consumer_db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
140    }
141
142    pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, Box<dyn Error>> {
143        let mut txn = self.env.write_txn()?;
144        self.check_chunks_to_keep(&mut txn)?;
145
146        let mut items = vec![];
147        let mut delta = 0;
148        for _ in 0..n {
149            match self.reader.read() {
150                Ok(item) => {
151                    items.push(item);
152                    delta += 1;
153                },
154                Err(_) => {
155                    if self.rotate(&mut txn)? {
156                        items.push(self.reader.read()?);
157                        delta = 1;
158                    } else {
159                        break;
160                    }
161                }
162            }
163        }
164
165        self.inc_offset(&mut txn, delta)?;
166        txn.commit()?;
167        Ok(items)
168    }
169
170    pub fn pop_front(&mut self) -> Result<Option<Item>, Box<dyn Error>> {
171        let mut txn = self.env.write_txn()?;
172        self.check_chunks_to_keep(&mut txn)?;
173
174        match self.reader.read() {
175            Ok(item) => {
176                self.inc_offset(&mut txn, 1)?;
177                txn.commit()?;
178                return Ok(Some(item));
179            },
180            Err(_) => {
181                if self.rotate(&mut txn)? {
182                    let item = self.reader.read()?;
183                    self.inc_offset(&mut txn, 1)?;
184                    txn.commit()?;
185                    return Ok(Some(item));
186                } else {
187                    txn.commit()?;
188                    return Ok(None);
189                }
190            }
191        }
192    }
193
194    fn inc_offset(&mut self, txn: &mut RwTxn, delta: u64) -> Result<(), Box<dyn Error>> {
195        let offset = self.consumer_db.get(&txn, &KEY_CONSUMER_OFFSET)?.unwrap();
196        self.consumer_db.put(txn, &KEY_CONSUMER_OFFSET, &(offset + delta))?;
197
198        self.consumer_db.put(txn, &KEY_CONSUMER_BYTES_READ, &self.reader.get_bytes_read())?;
199        Ok(())
200    }
201
202    fn check_chunks_to_keep(&mut self, txn: &mut RwTxn) -> Result<(), Box<dyn Error>> {
203        let head = self.consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
204        if head != self.reader.get_file_num() {
205            self.reader.rotate(Some(head))?;
206        }
207
208        let bytes_read = self.consumer_db.get(&txn, &KEY_CONSUMER_BYTES_READ)?.unwrap();
209        if bytes_read != self.reader.get_bytes_read() {
210            self.reader.set_bytes_read(bytes_read)?;
211        }
212
213        let (tail, _) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
214        let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
215        for _ in 0..chunk_to_remove {
216            self.rotate(txn)?;
217        }
218
219        Ok(())
220    }
221
222    fn rotate(&mut self, txn: &mut RwTxn) -> Result<bool, Box<dyn Error>> {
223        let head = self.consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
224        let (tail, _) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
225        if tail > head {
226            self.reader.rotate(None)?;
227            self.producer_db.delete(txn, &head)?;
228            self.consumer_db.put(txn, &KEY_CONSUMER_FILE, &(head + 1))?;
229            self.consumer_db.put(txn, &KEY_CONSUMER_OFFSET, &0)?;
230            self.consumer_db.put(txn, &KEY_CONSUMER_BYTES_READ, &0)?;
231            Ok(true)
232        } else {
233            Ok(false)
234        }
235    }
236}