1use std::path::Path;
2use lmdb::{Environment, RoTransaction, RwTransaction, EnvironmentFlags, Error};
3use libc::{c_uint, size_t};
4
5use super::topic::{Comsumer, Producer};
6
7#[cfg(test)]
8use super::topic::Topic;
9
10pub struct Env {
11 pub lmdb_env: lmdb::Environment,
12 pub root: String,
13}
14
15impl Env {
16 pub fn new<P: AsRef<Path>>(root: P, max_topics: Option<c_uint>, map_size: Option<size_t>) -> Result<Env, Error> {
17 Environment::new()
18 .set_map_size(map_size.unwrap_or(256 * 1024 * 1024))
19 .set_max_dbs(max_topics.unwrap_or(256))
20 .set_flags(EnvironmentFlags::NO_SYNC | EnvironmentFlags::NO_TLS | EnvironmentFlags::NO_SUB_DIR)
21 .open(root.as_ref())
22 .map(|lmdb_env| Env { lmdb_env, root: root.as_ref().to_str().unwrap().to_string() })
23 }
24
25 pub fn producer(&self, name: &str, chunk_size: Option<u64>) -> Result<super::topic::Producer, anyhow::Error> {
26 Producer::new(&self, name, chunk_size)
27 }
28
29 pub fn comsumer(&self, name: &str, chunks_to_keep: Option<u64>) -> Result<super::topic::Comsumer, anyhow::Error> {
30 Comsumer::new(&self, name, chunks_to_keep)
31 }
32
33 pub fn transaction_ro(&self) -> Result<RoTransaction, Error> {
34 self.lmdb_env.begin_ro_txn()
35 }
36
37 pub fn transaction_rw(&self) -> Result<RwTransaction, Error> {
38 self.lmdb_env.begin_rw_txn()
39 }
40}
41
/// Pushes 1024 * 1024 single messages to the topic "test", then pops them one at a time and
/// checks every 1024 * 100 messages that the reported lag dropped by the number read so far.
#[test]
fn test_single() -> Result<(), anyhow::Error> {
    // Use an environment file owned by this test alone. The test harness runs tests on
    // parallel threads by default; sharing one file and topic with another test would open the
    // same LMDB environment twice in one process and let a second producer disturb the lag
    // arithmetic asserted below.
    let env = Env::new(std::env::temp_dir().join("foo_env_single"), None, None)?;

    let mut producer = env.producer("test", Some(16 * 1024 * 1024))?;
    for i in 0..1024 * 1024 {
        producer.push_back(i.to_string().as_bytes())?;
    }

    let mut comsumer = env.comsumer("test", None)?;
    let lag = comsumer.lag()?;
    println!("Current lag is: {}", lag);

    let mut message_count = 0;
    while let Some(item) = comsumer.pop_front()? {
        message_count += 1;
        if message_count % (1024 * 100) == 0 {
            println!("Got message: {}", String::from_utf8(item.data)?);
            let cur_lag = comsumer.lag()?;
            // The initial lag must equal what is still pending plus what was consumed.
            assert_eq!(lag, cur_lag + message_count as u64);
        }
    }
    println!("Read {} messages.", message_count);

    Ok(())
}
72
/// Pushes 1024 * 100 batches of ten messages to the topic "test", then pops them ten at a time
/// and checks at every multiple of 1024 * 100 messages that the reported lag dropped by the
/// number read so far.
#[test]
fn test_batch() -> Result<(), anyhow::Error> {
    // Use an environment file owned by this test alone. The test harness runs tests on
    // parallel threads by default; sharing one file and topic with another test would open the
    // same LMDB environment twice in one process and let a second producer disturb the lag
    // arithmetic asserted below.
    let env = Env::new(std::env::temp_dir().join("foo_env_batch"), None, None)?;

    let mut producer = env.producer("test", Some(16 * 1024 * 1024))?;
    for i in 0..1024 * 100 {
        let messages: Vec<String> = (0..10).map(|v| format!("{}_{}", i, v)).collect();
        let batch: Vec<&[u8]> = messages.iter().map(|m| m.as_bytes()).collect();

        producer.push_back_batch(&batch)?;
    }

    let mut comsumer = env.comsumer("test", None)?;
    let lag = comsumer.lag()?;
    println!("Current lag is: {}", lag);

    let mut message_count = 0;
    loop {
        let items = comsumer.pop_front_n(10)?;
        if items.is_empty() {
            break;
        }

        message_count += items.len();
        if message_count % (1024 * 100) == 0 {
            // Borrow the payload for printing; there is no need to clone it into a `String`.
            println!("Got message: {}", std::str::from_utf8(&items[0].data)?);
            let cur_lag = comsumer.lag()?;
            // The initial lag must equal what is still pending plus what was consumed.
            assert_eq!(lag, cur_lag + message_count as u64);
        }
    }
    println!("Read {} messages.", message_count);

    Ok(())
}