rustix_bl/
persistencer.rs1use config::StaticConfig;
2use lmdb;
3use datastore::Datastore;
4use rustix_event_shop::Event;
5use rustix_event_shop::BLEvents;
6use serde_json::Error as Error_JSON;
7use lmdb::Error as Error_LMDB;
8use lmdb::EnvironmentBuilder;
9use lmdb::Environment;
10use lmdb::Database;
11use lmdb::DatabaseFlags;
12use lmdb::RwTransaction;
13use lmdb::RoTransaction;
14use lmdb::RoCursor;
15use lmdb::WriteFlags;
16use std::str;
17use std::path::Path;
18use lmdb::Cursor;
19use lmdb::Transaction;
20use std::marker::Sized;
21use std::convert::AsRef;
22use bincode::{deserialize, serialize, Infinite};
23use serde_json;
24use std;
25use std::error::Error;
26use std::fmt;
27use errors;
28use std::io::Cursor as IOCursor;
29use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
30
31quick_error! {
32 #[derive(Debug)]
33 pub enum RustixError {
34 DB(err: Error_LMDB) {}
36 SerialJson(err: Error_JSON) {}
38 SerialUTF8(err: std::str::Utf8Error) {}
40 Init(err: errors::custom_errors::CustomRustixFrontendError) {}
42 Other(err: Box<std::error::Error>) {
44 cause(&**err)
45 description(err.description())
46 }
47 }
48}
49
50
51impl std::convert::From<errors::custom_errors::CustomRustixFrontendError> for RustixError {
52 fn from(e: errors::custom_errors::CustomRustixFrontendError) -> Self {
53 return RustixError::Init(e);
54 }
55}
56
57impl std::convert::From<Error_JSON> for RustixError {
58 fn from(e: Error_JSON) -> Self {
59 return RustixError::SerialJson(e);
60 }
61}
62
63impl std::convert::From<std::str::Utf8Error> for RustixError {
64 fn from(e: std::str::Utf8Error) -> Self {
65 return RustixError::SerialUTF8(e);
66 }
67}
68
69impl std::convert::From<Error_LMDB> for RustixError {
70 fn from(e: Error_LMDB) -> Self {
71 return RustixError::DB(e);
72 }
73}
74
75
76pub trait Persistencer {
77 fn test_store_apply(&mut self, event: &BLEvents, datastore: &mut Datastore) -> bool;
78
79 fn reload_from_filepath(&mut self, datastore: &mut Datastore) -> Result<u64, RustixError>;
81 }
83
84#[derive(Debug)]
85pub struct LmdbDb {
86 pub db: lmdb::Database,
87 pub db_env: lmdb::Environment,
88}
89
90#[derive(Debug)]
91pub struct FilePersister {
92 pub config: StaticConfig,
93 pub lmdb: Option<LmdbDb>,
94}
95
96impl FilePersister {
97 pub fn new(config: StaticConfig) -> Result<Self, lmdb::Error> {
98 let lmdb = if config.use_persistence {
99 let dir: &std::path::Path = std::path::Path::new(&config.persistence_file_path);
100 let db_flags: lmdb::DatabaseFlags = lmdb::DatabaseFlags::empty();
101 println!("trying to get env");
102 let db_environment = try!(lmdb::Environment::new().set_max_dbs(1).open(&dir));
103 println!("trying to get database");
104 let database = try!(db_environment.create_db(None, db_flags));
105 println!("gotten database");
106 Some(LmdbDb {
107 db: database,
108 db_env: db_environment,
109 })
110 } else {
111 None
112 };
113 println!("first part finished");
114
115 let mut fp = FilePersister {
116 config: config,
117 lmdb: lmdb,
118 };
119
120 return Ok(fp);
121 }
122}
123
124
125pub trait LMDBPersistencer {
126 fn store_event_in_db(&mut self, id: u64, event: &BLEvents) -> Result<(), RustixError>;
127}
128
/// Splits `x` into its four bytes, most significant byte first (big-endian).
fn transform_u32_to_array_of_u8(x: u32) -> [u8; 4] {
    // `as u8` keeps only the low eight bits, so no explicit mask is needed.
    [(x >> 24) as u8, (x >> 16) as u8, (x >> 8) as u8, x as u8]
}
136
137impl LMDBPersistencer for FilePersister {
138 fn store_event_in_db(&mut self, id: u64, event: &BLEvents) -> Result<(), RustixError> {
139 match self.lmdb {
140 Some(ref lmdb) => {
141 let mut rw_transaction: RwTransaction = try!(lmdb.db_env.begin_rw_txn());
142 let tx_flags: WriteFlags = WriteFlags::empty();
143 let key = id_to_key(id);let data = try!(serde_json::to_string(event));
145 let result = rw_transaction.put(lmdb.db, &key, &data, tx_flags);
146 try!(rw_transaction.commit());
147 }
148 None => (),
149 }
150 return Ok(());
151 }
152
153}
154
155impl Persistencer for FilePersister {
156 fn test_store_apply(&mut self, event: &BLEvents, datastore: &mut Datastore) -> bool {
157 let allowed = event.can_be_applied(datastore);
158 println!("Result with allowed = {} for event: {:?}", allowed, event);
159 if allowed {
160 let id: u64 = datastore.version + 1u64;
161 match self.store_event_in_db(id, event) {
162 Err(e) => {
163 println!("Failure storing for {:?}", event);
164 return false
165 },
166 Ok(t) => {
167 datastore.version += 1u64;
168 println!("Success storing for {:?} with new version #{}", event, datastore.version);
169 return event.apply(datastore, &self.config);
170 }
171 }
172 } else {
173 return false;
174 }
175 }
176
177 fn reload_from_filepath(&mut self, datastore: &mut Datastore) -> Result<u64, RustixError> {
178 let counter = datastore.version;
179
180 println!("Reloading events from lmdb with counter = {}", counter);
181
182 match self.lmdb {
183 Some(ref lmdb) => {
184 let count = lmdb.db_env.stat().unwrap().entries() as u64;
186 if count > counter {
187 let tx = try!(lmdb.db_env.begin_ro_txn());
188 {
189 let mut cursor: RoCursor = try!(tx.open_ro_cursor(lmdb.db));
190
191 let key = id_to_key(counter + 1u64);
192 let iter = if counter != 0u64 {
193 cursor.iter_from(key)
194 } else {
195 cursor.iter_start()
196 };
197 for keyvalue in iter {
198 let (key, value) = keyvalue;
199 let id = key_to_id(key);
200 let json = try!(str::from_utf8(value));
201 println!("{:?} [ {:?} ] ==> {:?}", id, key, json);
202 let event: BLEvents = try!(serde_json::from_str(json));
203 if event.can_be_applied(datastore) {
204 event.apply(datastore, &self.config);
205 datastore.version += 1u64;
206 } else {
207 println!("CARE: could not apply event {:?} to datastore state: {:?}", event, datastore);
208 }
209 }
210 }
211 }
212 }
213 None => (),
214 }
215
216 return Ok(datastore.version);
217 }
218}
219
220
/// Encodes `id` as an 8-byte big-endian key.
///
/// Most significant byte first, so byte-wise comparison of two keys orders
/// them the same way as the numeric ids.
pub fn id_to_key(id: u64) -> Vec<u8> {
    // Walk the byte positions from 7 down to 0 and truncate each to 8 bits.
    (0..8u32)
        .rev()
        .map(|byte_index| (id >> (8 * byte_index)) as u8)
        .collect()
}
226
/// Decodes the first eight bytes of `key` as a big-endian `u64`.
///
/// Bytes after the eighth are ignored.
///
/// # Panics
///
/// Panics if `key` holds fewer than eight bytes.
pub fn key_to_id(key: &[u8]) -> u64 {
    key[..8]
        .iter()
        .fold(0u64, |id, &byte| (id << 8) | u64::from(byte))
}