lmdb_queue/
env.rs

1use std::path::Path;
2use lmdb::{Environment, RoTransaction, RwTransaction, EnvironmentFlags, Error};
3use libc::{c_uint, size_t};
4
5use super::topic::{Comsumer, Producer};
6
7#[cfg(test)]
8use super::topic::Topic;
9
10pub struct Env {
11    pub lmdb_env: lmdb::Environment,
12    pub root: String,
13}
14
15impl Env {
16    pub fn new<P: AsRef<Path>>(root: P, max_topics: Option<c_uint>, map_size: Option<size_t>) -> Result<Env, Error> {
17        Environment::new()
18            .set_map_size(map_size.unwrap_or(256 * 1024 * 1024))
19            .set_max_dbs(max_topics.unwrap_or(256))
20            .set_flags(EnvironmentFlags::NO_SYNC | EnvironmentFlags::NO_TLS | EnvironmentFlags::NO_SUB_DIR)
21            .open(root.as_ref())
22            .map(|lmdb_env| Env { lmdb_env, root: root.as_ref().to_str().unwrap().to_string() })
23    }
24
25    pub fn producer(&self, name: &str, chunk_size: Option<u64>) -> Result<super::topic::Producer, anyhow::Error> {
26        Producer::new(&self, name, chunk_size)
27    }
28
29    pub fn comsumer(&self, name: &str, chunks_to_keep: Option<u64>) -> Result<super::topic::Comsumer, anyhow::Error> {
30        Comsumer::new(&self, name, chunks_to_keep)
31    }
32
33    pub fn transaction_ro(&self) -> Result<RoTransaction, Error> {
34        self.lmdb_env.begin_ro_txn()
35    }
36
37    pub fn transaction_rw(&self) -> Result<RwTransaction, Error> {
38        self.lmdb_env.begin_rw_txn()
39    }
40}
41
/// Pushes 1024 * 1024 single messages, then pops them one at a time and
/// checks every 1024 * 100 messages that the reported lag has dropped by
/// exactly the number of messages read so far.
#[test]
fn test_single() -> Result<(), anyhow::Error> {
    // Use an environment file owned by this test alone. The test harness runs
    // tests on parallel threads of one process; sharing a path with another
    // test would open the same LMDB environment twice and let the other
    // test's writes disturb the lag assertion below.
    let env_path = std::env::temp_dir().join("lmdb_queue_test_single_env");
    let env = Env::new(&env_path, None, None)?;

    let mut producer = env.producer("test", Some(16 * 1024 * 1024))?;
    for i in 0..1024 * 1024 {
        producer.push_back(&format!("{}", i).as_bytes())?;
    }

    let mut comsumer = env.comsumer("test", None)?;
    let lag = comsumer.lag()?;
    println!("Current lag is: {}", lag);

    let mut message_count: u64 = 0;
    // `pop_front` yields `None` once the queue is drained, ending the loop.
    while let Some(item) = comsumer.pop_front()? {
        message_count += 1;
        if message_count % (1024 * 100) == 0 {
            println!("Got message: {}", String::from_utf8(item.data)?);
            let cur_lag = comsumer.lag()?;
            assert_eq!(lag, cur_lag + message_count);
        }
    }
    println!("Read {} messages.", message_count);

    Ok(())
}
72
/// Pushes 1024 * 100 batches of ten messages each, then pops them ten at a
/// time and checks every 1024 * 100 messages that the reported lag has
/// dropped by exactly the number of messages read so far.
#[test]
fn test_batch() -> Result<(), anyhow::Error> {
    // Use an environment file owned by this test alone. The test harness runs
    // tests on parallel threads of one process; sharing a path with another
    // test would open the same LMDB environment twice and let the other
    // test's writes disturb the lag assertion below.
    let env_path = std::env::temp_dir().join("lmdb_queue_test_batch_env");
    let env = Env::new(&env_path, None, None)?;

    let mut producer = env.producer("test", Some(16 * 1024 * 1024))?;
    for i in 0..1024 * 100 {
        let vec: Vec<String> = (0..10).map(|v| format!("{}_{}", i, v)).collect();
        let batch: Vec<&[u8]> = vec.iter().map(|v| v.as_bytes()).collect();

        producer.push_back_batch(&batch)?;
    }

    let mut comsumer = env.comsumer("test", None)?;
    let lag = comsumer.lag()?;
    println!("Current lag is: {}", lag);

    let mut message_count = 0;
    loop {
        let items = comsumer.pop_front_n(10)?;
        // An empty batch means the queue has been drained.
        if items.is_empty() {
            break;
        }

        message_count += items.len();
        if message_count % (1024 * 100) == 0 {
            // Borrow the payload for printing instead of cloning it.
            println!("Got message: {}", std::str::from_utf8(&items[0].data)?);
            let cur_lag = comsumer.lag()?;
            assert_eq!(lag, cur_lag + message_count as u64);
        }
    }
    println!("Read {} messages.", message_count);

    Ok(())
}