1use std::error::Error;
2use heed3::byteorder::BE;
3use heed3::types::*;
4use heed3::{RwTxn, Database, PutFlags};
5
6use super::env::Env;
7
8use super::reader::{Reader, Item};
9use super::writer::Writer;
10
/// Consumer-database key holding the number of the file the consumer is
/// currently reading; it is incremented each time the consumer rotates.
pub static KEY_CONSUMER_FILE: &str = "FILE";
/// Consumer-database key holding how many messages have been consumed from the
/// current file; it is reset to 0 each time the consumer rotates.
pub static KEY_CONSUMER_OFFSET: &str = "OFFSET";
/// Consumer-database key holding the reader's `get_bytes_read()` value, stored
/// so the read position can be restored with `set_bytes_read()`.
pub static KEY_CONSUMER_BYTES_READ: &str = "BYTES_READ";
14
15pub trait Topic {
16 fn get_env(&self) -> &Env;
17 fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>>;
18 fn get_consumer_db(&self) -> Database<Str, U64<BE>>;
19
20 fn lag(&self) -> Result<u64, Box<dyn Error>> {
21 let txn = self.get_env().write_txn()?;
22
23 let mut pit = self.get_producer_db().iter(&txn)?.move_between_keys();
24 let mut total: u64 = 0;
25 while let Some((_, v)) = pit.next().transpose()? {
26 total += v;
27 }
28
29 let head_offset = self.get_consumer_db().get(&txn, KEY_CONSUMER_OFFSET)?.unwrap_or(0);
30 Ok(total - head_offset)
31 }
32}
33
/// Writing side of a topic: appends messages through a `Writer` and records
/// per-file message counts in the producer database.
pub struct Producer<'env> {
    /// Environment used to open write transactions.
    env: &'env Env,
    /// Maps a file number to the message count recorded for that file.
    producer_db: Database<U64<BE>, U64<BE>>,
    /// Consumer state database; the producer seeds its initial entries in `new`.
    consumer_db: Database<Str, U64<BE>>,
    /// File writer positioned on the current tail file.
    writer: Writer,
    /// Once the writer's file size exceeds this value, the next batch goes to a new file.
    chunk_size: u64,
}
41
42impl<'env> Topic for Producer<'env> {
43 fn get_env(&self) -> &Env {
44 self.env
45 }
46
47 fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>> {
48 self.producer_db
49 }
50
51 fn get_consumer_db(&self) -> Database<Str, U64<BE>> {
52 self.consumer_db
53 }
54}
55
56impl<'env> Producer<'env> {
57 pub fn new(env: &'env Env, name: &str, chunk_size: Option<u64>) -> Result<Self, Box<dyn Error>> {
58 let mut txn = env.write_txn()?;
59 let producer_db: Database<U64<BE>, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "producer"))?;
60 let consumer_db: Database<Str, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "consumer"))?;
61
62 if let Ok(_) = consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_FILE, &0) {
63 producer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &0, &0)?;
64 consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_OFFSET, &0)?;
65 consumer_db.put_with_flags(&mut txn, PutFlags::NO_OVERWRITE, &KEY_CONSUMER_BYTES_READ, &0)?;
66 }
67
68 let (tail_file, _) = producer_db.iter(&txn)?.last().transpose()?.unwrap();
69 let writer = Writer::new(&env.root, name, tail_file)?;
70
71 txn.commit()?;
72
73 Ok(Producer { env, producer_db, consumer_db, writer, chunk_size: chunk_size.unwrap_or(64 * 1024 * 1024) })
74 }
75
76 pub fn push_back_batch<'a, B>(&mut self, messages: &'a B) -> Result<(), Box<dyn Error>>
77 where B: AsRef<[&'a [u8]]>
78 {
79 let mut txn = self.env.write_txn()?;
80 let (mut tail_file, mut offset) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
81 if tail_file > self.writer.get_file_num() {
82 self.writer.rotate(Some(tail_file))?;
83 }
84
85 if self.writer.file_size()? > self.chunk_size {
86 self.writer.rotate(None)?;
87 tail_file += 1;
88 offset = 0;
89 self.producer_db.put(&mut txn, &tail_file, &0)?;
90 }
91 self.writer.put_batch(messages)?;
92 self.producer_db.put(&mut txn, &tail_file, &(offset + messages.as_ref().len() as u64))?;
93 txn.commit()?;
94 Ok(())
95 }
96
97 pub fn push_back<'a>(&mut self, message: &'a [u8]) -> Result<(), Box<dyn Error>> {
98 self.push_back_batch(&[message])
99 }
100}
101
/// Reading side of a topic: pulls messages through a `Reader` and tracks its
/// position in the consumer database.
pub struct Consumer<'env> {
    /// Environment used to open write transactions.
    env: &'env Env,
    /// Maps a file number to the message count recorded for that file.
    producer_db: Database<U64<BE>, U64<BE>>,
    /// Holds the consumer position under the `KEY_CONSUMER_*` keys.
    consumer_db: Database<Str, U64<BE>>,
    /// File reader positioned on the consumer's current file.
    reader: Reader,
    /// Bound used by `check_chunks_to_keep` on the number of files between the
    /// consumer's file and the producer's tail file (inclusive).
    chunks_to_keep: u64,
}
109
110impl <'env> Topic for Consumer<'env> {
111 fn get_env(&self) -> &Env {
112 self.env
113 }
114
115 fn get_producer_db(&self) -> Database<U64<BE>, U64<BE>> {
116 self.producer_db
117 }
118
119 fn get_consumer_db(&self) -> Database<Str, U64<BE>> {
120 self.consumer_db
121 }
122}
123
124impl <'env> Consumer<'env> {
125 pub fn new(env: &'env Env, name: &str, chunks_to_keep: Option<u64>) -> Result<Self, Box<dyn Error>> {
126 let mut txn = env.write_txn()?;
127 let producer_db: Database<U64<BE>, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "producer"))?;
128 let consumer_db: Database<Str, U64<BE>> = env.db(&mut txn, &format!("{}_{}", name, "consumer"))?;
129
130 let file_num = consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
131 let bytes_read = consumer_db.get(&txn, &KEY_CONSUMER_BYTES_READ)?.unwrap();
132 txn.commit()?;
133
134 let mut reader = Reader::new(&env.root, name, file_num)?;
135 if bytes_read > 0 {
136 reader.set_bytes_read(bytes_read)?;
137 }
138
139 Ok(Consumer { env, producer_db, consumer_db, reader, chunks_to_keep: chunks_to_keep.unwrap_or(8) })
140 }
141
142 pub fn pop_front_n(&mut self, n: u64) -> Result<Vec<Item>, Box<dyn Error>> {
143 let mut txn = self.env.write_txn()?;
144 self.check_chunks_to_keep(&mut txn)?;
145
146 let mut items = vec![];
147 let mut delta = 0;
148 for _ in 0..n {
149 match self.reader.read() {
150 Ok(item) => {
151 items.push(item);
152 delta += 1;
153 },
154 Err(_) => {
155 if self.rotate(&mut txn)? {
156 items.push(self.reader.read()?);
157 delta = 1;
158 } else {
159 break;
160 }
161 }
162 }
163 }
164
165 self.inc_offset(&mut txn, delta)?;
166 txn.commit()?;
167 Ok(items)
168 }
169
170 pub fn pop_front(&mut self) -> Result<Option<Item>, Box<dyn Error>> {
171 let mut txn = self.env.write_txn()?;
172 self.check_chunks_to_keep(&mut txn)?;
173
174 match self.reader.read() {
175 Ok(item) => {
176 self.inc_offset(&mut txn, 1)?;
177 txn.commit()?;
178 return Ok(Some(item));
179 },
180 Err(_) => {
181 if self.rotate(&mut txn)? {
182 let item = self.reader.read()?;
183 self.inc_offset(&mut txn, 1)?;
184 txn.commit()?;
185 return Ok(Some(item));
186 } else {
187 txn.commit()?;
188 return Ok(None);
189 }
190 }
191 }
192 }
193
194 fn inc_offset(&mut self, txn: &mut RwTxn, delta: u64) -> Result<(), Box<dyn Error>> {
195 let offset = self.consumer_db.get(&txn, &KEY_CONSUMER_OFFSET)?.unwrap();
196 self.consumer_db.put(txn, &KEY_CONSUMER_OFFSET, &(offset + delta))?;
197
198 self.consumer_db.put(txn, &KEY_CONSUMER_BYTES_READ, &self.reader.get_bytes_read())?;
199 Ok(())
200 }
201
202 fn check_chunks_to_keep(&mut self, txn: &mut RwTxn) -> Result<(), Box<dyn Error>> {
203 let head = self.consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
204 if head != self.reader.get_file_num() {
205 self.reader.rotate(Some(head))?;
206 }
207
208 let bytes_read = self.consumer_db.get(&txn, &KEY_CONSUMER_BYTES_READ)?.unwrap();
209 if bytes_read != self.reader.get_bytes_read() {
210 self.reader.set_bytes_read(bytes_read)?;
211 }
212
213 let (tail, _) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
214 let chunk_to_remove: i64 = tail as i64 + 1 - head as i64 - self.chunks_to_keep as i64;
215 for _ in 0..chunk_to_remove {
216 self.rotate(txn)?;
217 }
218
219 Ok(())
220 }
221
222 fn rotate(&mut self, txn: &mut RwTxn) -> Result<bool, Box<dyn Error>> {
223 let head = self.consumer_db.get(&txn, &KEY_CONSUMER_FILE)?.unwrap();
224 let (tail, _) = self.producer_db.iter(&txn)?.last().transpose()?.unwrap();
225 if tail > head {
226 self.reader.rotate(None)?;
227 self.producer_db.delete(txn, &head)?;
228 self.consumer_db.put(txn, &KEY_CONSUMER_FILE, &(head + 1))?;
229 self.consumer_db.put(txn, &KEY_CONSUMER_OFFSET, &0)?;
230 self.consumer_db.put(txn, &KEY_CONSUMER_BYTES_READ, &0)?;
231 Ok(true)
232 } else {
233 Ok(false)
234 }
235 }
236}