rustix_bl/
persistencer.rs

1use config::StaticConfig;
2use lmdb;
3use datastore::Datastore;
4use rustix_event_shop::Event;
5use rustix_event_shop::BLEvents;
6use serde_json::Error as Error_JSON;
7use lmdb::Error as Error_LMDB;
8use lmdb::EnvironmentBuilder;
9use lmdb::Environment;
10use lmdb::Database;
11use lmdb::DatabaseFlags;
12use lmdb::RwTransaction;
13use lmdb::RoTransaction;
14use lmdb::RoCursor;
15use lmdb::WriteFlags;
16use std::str;
17use std::path::Path;
18use lmdb::Cursor;
19use lmdb::Transaction;
20use std::marker::Sized;
21use std::convert::AsRef;
22use bincode::{deserialize, serialize, Infinite};
23use serde_json;
24use std;
25use std::error::Error;
26use std::fmt;
27use errors;
28use std::io::Cursor as IOCursor;
29use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
30
// Error type of the persistence layer, generated with the `quick_error!` macro.
// Each variant wraps one underlying error source; the `From` impls that follow
// this macro let `try!` convert those sources into `RustixError` automatically.
quick_error! {
    #[derive(Debug)]
    pub enum RustixError {
        /// Error reported by LMDB (environment, transaction or cursor operations).
        DB(err: Error_LMDB) {}
        /// JSON (de)serialization error from `serde_json`.
        SerialJson(err: Error_JSON) {}
        /// A stored value was not valid UTF-8.
        SerialUTF8(err: std::str::Utf8Error) {}
        /// Project-specific error (`CustomRustixFrontendError`).
        Init(err: errors::custom_errors::CustomRustixFrontendError) {}
        /// Any other boxed error; its cause and description are forwarded from the inner error.
        Other(err: Box<std::error::Error>) {
            cause(&**err)
            description(err.description())
        }
    }
}
49
50
51impl std::convert::From<errors::custom_errors::CustomRustixFrontendError> for RustixError {
52    fn from(e: errors::custom_errors::CustomRustixFrontendError) -> Self {
53        return RustixError::Init(e);
54    }
55}
56
57impl std::convert::From<Error_JSON> for RustixError {
58    fn from(e: Error_JSON) -> Self {
59        return RustixError::SerialJson(e);
60    }
61}
62
63impl std::convert::From<std::str::Utf8Error> for RustixError {
64    fn from(e: std::str::Utf8Error) -> Self {
65        return RustixError::SerialUTF8(e);
66    }
67}
68
69impl std::convert::From<Error_LMDB> for RustixError {
70    fn from(e: Error_LMDB) -> Self {
71        return RustixError::DB(e);
72    }
73}
74
75
/// Storage backend that persists business-logic events and replays them
/// into a `Datastore`.
pub trait Persistencer {
    /// Checks whether `event` may be applied to `datastore`, persists it and
    /// applies it. Returns `false` when the event is rejected or could not be
    /// stored; otherwise the implementation in this file returns the result of
    /// `event.apply`.
    fn test_store_apply(&mut self, event: &BLEvents, datastore: &mut Datastore) -> bool;

    /// Replays persisted events into `datastore`.
    ///
    /// Intended to return the number of events loaded.
    /// NOTE(review): the `FilePersister` implementation in this file returns
    /// `datastore.version` after the replay rather than a count of the events
    /// it just read — confirm which one callers expect.
    fn reload_from_filepath(&mut self, datastore: &mut Datastore) -> Result<u64, RustixError>;
    //fn initialize(&mut self, datastore: &mut Datastore) -> Result<u32, RustixError>;
}
83
/// An opened LMDB environment together with the database handle created in it.
#[derive(Debug)]
pub struct LmdbDb {
    /// Handle of the database holding the serialized events.
    pub db: lmdb::Database,
    /// Environment the database handle belongs to; transactions are started on it.
    pub db_env: lmdb::Environment,
}
89
/// Event persister backed by an optional LMDB store.
#[derive(Debug)]
pub struct FilePersister {
    /// Static configuration; also handed to events when they are applied.
    pub config: StaticConfig,
    /// LMDB handles, or `None` when `config.use_persistence` was false at
    /// construction time (in which case nothing is written to or read from disk).
    pub lmdb: Option<LmdbDb>,
}
95
96impl FilePersister {
97    pub fn new(config: StaticConfig) -> Result<Self, lmdb::Error> {
98        let lmdb = if config.use_persistence {
99            let dir: &std::path::Path = std::path::Path::new(&config.persistence_file_path);
100            let db_flags: lmdb::DatabaseFlags = lmdb::DatabaseFlags::empty();
101            println!("trying to get env");
102            let db_environment = try!(lmdb::Environment::new().set_max_dbs(1).open(&dir));
103            println!("trying to get database");
104            let database = try!(db_environment.create_db(None, db_flags));
105            println!("gotten database");
106            Some(LmdbDb {
107                db: database,
108                db_env: db_environment,
109            })
110        } else {
111            None
112        };
113        println!("first part finished");
114
115        let mut fp = FilePersister {
116            config: config,
117            lmdb: lmdb,
118        };
119
120        return Ok(fp);
121    }
122}
123
124
/// Low-level write access to the LMDB event store.
pub trait LMDBPersistencer {
    /// Persists `event` under the numeric `id`.
    fn store_event_in_db(&mut self, id: u64, event: &BLEvents) -> Result<(), RustixError>;
}
128
/// Splits `x` into its four bytes, most significant byte first (big-endian).
fn transform_u32_to_array_of_u8(x: u32) -> [u8; 4] {
    let mut bytes = [0u8; 4];
    for (idx, slot) in bytes.iter_mut().enumerate() {
        // idx 0 holds the highest byte, idx 3 the lowest.
        let shift = 8 * (3 - idx as u32);
        // The cast keeps only the low eight bits of the shifted value.
        *slot = (x >> shift) as u8;
    }
    bytes
}
136
137impl LMDBPersistencer for FilePersister {
138    fn store_event_in_db(&mut self, id: u64, event: &BLEvents) -> Result<(), RustixError> {
139        match self.lmdb {
140            Some(ref lmdb) => {
141                let mut rw_transaction: RwTransaction = try!(lmdb.db_env.begin_rw_txn());
142                let tx_flags: WriteFlags = WriteFlags::empty();
143                let key = id_to_key(id);//   transform_u32_to_array_of_u8(id);
144                let data = try!(serde_json::to_string(event));
145                let result = rw_transaction.put(lmdb.db, &key, &data, tx_flags);
146                try!(rw_transaction.commit());
147            }
148            None => (),
149        }
150        return Ok(());
151    }
152
153}
154
155impl Persistencer for FilePersister {
156    fn test_store_apply(&mut self, event: &BLEvents, datastore: &mut Datastore) -> bool {
157        let allowed = event.can_be_applied(datastore);
158        println!("Result with allowed = {} for event: {:?}", allowed, event);
159        if allowed {
160            let id: u64 = datastore.version + 1u64;
161            match self.store_event_in_db(id, event) {
162                Err(e) => {
163                    println!("Failure storing for {:?}", event);
164                    return false
165                },
166                Ok(t) => {
167                    datastore.version += 1u64;
168                    println!("Success storing for {:?} with new version #{}", event, datastore.version);
169                    return event.apply(datastore, &self.config);
170                }
171            }
172        } else {
173            return false;
174        }
175    }
176
177    fn reload_from_filepath(&mut self, datastore: &mut Datastore) -> Result<u64, RustixError> {
178        let counter = datastore.version;
179
180        println!("Reloading events from lmdb with counter = {}", counter);
181
182        match self.lmdb {
183            Some(ref lmdb) => {
184                //build and use iterator if database is non-empty
185                let count = lmdb.db_env.stat().unwrap().entries() as u64;
186                if count > counter {
187                    let tx = try!(lmdb.db_env.begin_ro_txn());
188                    {
189                        let mut cursor: RoCursor = try!(tx.open_ro_cursor(lmdb.db));
190
191                        let key = id_to_key(counter + 1u64);
192                        let iter = if counter != 0u64 {
193                            cursor.iter_from(key)
194                        } else {
195                            cursor.iter_start()
196                        };
197                        for keyvalue in iter {
198                            let (key, value) = keyvalue;
199                            let id = key_to_id(key);
200                            let json = try!(str::from_utf8(value));
201                            println!("{:?} [ {:?} ] ==> {:?}", id, key, json);
202                            let event: BLEvents = try!(serde_json::from_str(json));
203                            if event.can_be_applied(datastore) {
204                                event.apply(datastore, &self.config);
205                                datastore.version += 1u64;
206                            } else {
207                                println!("CARE: could not apply event {:?} to datastore state: {:?}", event, datastore);
208                            }
209                        }
210                    }
211                }
212            }
213            None => (),
214        }
215
216        return Ok(datastore.version);
217    }
218}
219
220
221pub fn id_to_key(id: u64) -> Vec<u8> {
222    let mut wtr = vec![];
223    wtr.write_u64::<BigEndian>(id).unwrap();
224    return wtr;
225}
226
227pub fn key_to_id(key: &[u8]) -> u64 {
228    let mut rdr = IOCursor::new(key);
229    return rdr.read_u64::<BigEndian>().unwrap();
230}