v_storage/
lmdb_storage.rs

1use v_individual_model::onto::individual::Individual;
2use v_individual_model::onto::parser::parse_raw;
3use crate::common::{Storage, StorageId, StorageMode, StorageResult};
4use lmdb_rs_m::core::EnvCreateFlags;
5use lmdb_rs_m::{DbFlags, DbHandle, EnvBuilder, Environment, MdbError};
6use lmdb_rs_m::{FromMdbValue, ToMdbValue};
7use std::iter::Iterator;
8
9pub struct LMDBStorage {
10    individuals_db: LmdbInstance,
11    tickets_db: LmdbInstance,
12    az_db: LmdbInstance,
13}
14
15pub struct LmdbInstance {
16    max_read_counter: u64,
17    path: String,
18    mode: StorageMode,
19    db_handle: Result<DbHandle, MdbError>,
20    db_env: Result<Environment, MdbError>,
21    read_counter: u64,
22}
23
24impl Default for LmdbInstance {
25    fn default() -> Self {
26        LmdbInstance {
27            max_read_counter: 1000,
28            path: String::default(),
29            mode: StorageMode::ReadOnly,
30            db_handle: Err(MdbError::Panic),
31            db_env: Err(MdbError::Panic),
32            read_counter: 0,
33        }
34    }
35}
36
/// Iterator over a list of keys that was collected up front.
///
/// `keys` holds the collected keys and `index` is the position of the next
/// key to hand out.
struct LmdbIterator {
    keys: Vec<Vec<u8>>,
    index: usize,
}

impl Iterator for LmdbIterator {
    type Item = Vec<u8>;

    /// Returns the next key, or `None` once every key has been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        // The iterator owns the keys and never revisits a slot, so the key is
        // moved out (leaving an empty Vec behind) instead of being cloned.
        let slot = self.keys.get_mut(self.index)?;
        let key = std::mem::take(slot);
        self.index += 1;
        Some(key)
    }

    /// Exact number of keys that are still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.keys.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
55
56impl LmdbInstance {
57    pub fn new(path: &str, mode: StorageMode) -> Self {
58        LmdbInstance {
59            max_read_counter: 1000,
60            path: path.to_string(),
61            mode,
62            db_handle: Err(MdbError::Panic),
63            db_env: Err(MdbError::Panic),
64            read_counter: 0,
65        }
66    }
67
68    pub fn iter(&mut self) -> Box<dyn Iterator<Item = Vec<u8>>> {
69        if self.db_env.is_err() {
70            self.open();
71        }
72
73        match &self.db_env {
74            Ok(env) => match &self.db_handle {
75                Ok(handle) => match env.get_reader() {
76                    Ok(txn) => {
77                        let db = txn.bind(handle);
78                        let cursor_result = db.new_cursor();
79                        match cursor_result {
80                            Ok(mut cursor) => {
81                                let mut keys = Vec::new();
82                                while let Ok(()) = cursor.to_next_item() {
83                                    if let Ok(key) = cursor.get_key::<Vec<u8>>() {
84                                        keys.push(key);
85                                    }
86                                }
87                                Box::new(LmdbIterator {
88                                    keys,
89                                    index: 0,
90                                })
91                            },
92                            Err(_) => Box::new(std::iter::empty()),
93                        }
94                    },
95                    Err(_) => Box::new(std::iter::empty()),
96                },
97                Err(_) => Box::new(std::iter::empty()),
98            },
99            Err(_) => Box::new(std::iter::empty()),
100        }
101    }
102
103    pub fn open(&mut self) {
104        let flags = if self.mode == StorageMode::ReadOnly {
105            // MDB_NOLOCK (0x20000000) | MDB_RDONLY (0x20000) | MDB_NOMETASYNC (0x40000) | MDB_NOSYNC (0x10000)
106            EnvCreateFlags::from_bits_truncate(0x20000000 | 0x20000 | 0x40000 | 0x10000)
107        } else {
108            // MDB_NOLOCK (0x20000000) | MDB_NOMETASYNC (0x40000) | MDB_NOSYNC (0x10000)
109            EnvCreateFlags::from_bits_truncate(0x20000000 | 0x40000 | 0x10000)
110        };
111        
112        let env_builder = EnvBuilder::new().flags(flags);
113
114        let db_env = env_builder.open(&self.path, 0o644);
115
116        let db_handle = match &db_env {
117            Ok(env) => env.get_default_db(DbFlags::empty()),
118            Err(e) => {
119                error!("LMDB: fail opening read only environment, path=[{}], err={:?}", self.path, e);
120                Err(MdbError::Corrupted)
121            },
122        };
123
124        self.db_handle = db_handle;
125        self.db_env = db_env;
126        self.read_counter = 0;
127    }
128
129    fn get_individual(&mut self, uri: &str, iraw: &mut Individual) -> StorageResult<()> {
130        if let Some(val) = self.get::<&[u8]>(uri) {
131            iraw.set_raw(val);
132
133            return if parse_raw(iraw).is_ok() {
134                StorageResult::Ok(())
135            } else {
136                error!("LMDB: fail parse binobj, path=[{}], len={}, uri=[{}]", self.path, iraw.get_raw_len(), uri);
137                StorageResult::UnprocessableEntity
138            };
139        }
140
141        StorageResult::NotFound
142    }
143
144    fn get_v(&mut self, key: &str) -> Option<String> {
145        self.get::<String>(key)
146    }
147
148    fn get_raw(&mut self, key: &str) -> Option<Vec<u8>> {
149        self.get::<Vec<u8>>(key)
150    }
151
152    pub fn get<T: FromMdbValue>(&mut self, key: &str) -> Option<T> {
153        if self.db_env.is_err() {
154            self.open();
155        }
156
157        for _it in 0..2 {
158            let mut is_need_reopen = false;
159
160            self.read_counter += 1;
161            if self.read_counter > self.max_read_counter {
162                is_need_reopen = true;
163            }
164
165            match &self.db_env {
166                Ok(env) => match &self.db_handle {
167                    Ok(handle) => match env.get_reader() {
168                        Ok(txn) => {
169                            let db = txn.bind(handle);
170
171                            match db.get::<T>(&key) {
172                                Ok(val) => {
173                                    return Some(val);
174                                },
175                                Err(e) => match e {
176                                    MdbError::NotFound => {
177                                        return None;
178                                    },
179                                    _ => {
180                                        error!("LMDB: db.get failed for key=[{}], path=[{}], err={:?}", key, self.path, e);
181                                        return None;
182                                    },
183                                },
184                            }
185                        },
186                        Err(e) => match e {
187                            MdbError::Other(c, _) => {
188                                if c == -30785 {
189                                    is_need_reopen = true;
190                                } else {
191                                    error!("LMDB: failed to create transaction for key=[{}], path=[{}], err={}", key, self.path, e);
192                                    return None;
193                                }
194                            },
195                            _ => {
196                                error!("LMDB: failed to create transaction for key=[{}], path=[{}], err={}", key, self.path, e);
197                            },
198                        },
199                    },
200                    Err(e) => {
201                        error!("LMDB: db handle error for key=[{}], path=[{}], err={}", key, self.path, e);
202                        return None;
203                    },
204                },
205                Err(e) => match e {
206                    MdbError::Panic => {
207                        is_need_reopen = true;
208                    },
209                    _ => {
210                        error!("LMDB: db environment error for key=[{}], path=[{}], err={}", key, self.path, e);
211                        return None;
212                    },
213                },
214            }
215
216            if is_need_reopen {
217                warn!("db {} reopen for key=[{}]", self.path, key);
218                self.open();
219            }
220        }
221
222        None
223    }
224
225    pub fn count(&mut self) -> usize {
226        if self.db_env.is_err() {
227            self.open();
228        }
229
230        for _it in 0..2 {
231            let mut is_need_reopen = false;
232
233            match &self.db_env {
234                Ok(env) => match env.stat() {
235                    Ok(stat) => {
236                        return stat.ms_entries;
237                    },
238                    Err(e) => match e {
239                        MdbError::Other(c, _) => {
240                            if c == -30785 {
241                                is_need_reopen = true;
242                            } else {
243                                error!("LMDB: fail read stat for path=[{}], err={}", self.path, e);
244                                return 0;
245                            }
246                        },
247                        _ => {
248                            error!("LMDB: fail to create transaction for stat read, path=[{}], err={}", self.path, e);
249                        },
250                    },
251                },
252                Err(e) => match e {
253                    MdbError::Panic => {
254                        is_need_reopen = true;
255                    },
256                    _ => {
257                        error!("LMDB: db environment error while reading stat, path=[{}], err={}", self.path, e);
258                        return 0;
259                    },
260                },
261            }
262
263            if is_need_reopen {
264                warn!("db {} reopen for stat read", self.path);
265                self.open();
266            }
267        }
268
269        0
270    }
271
272    pub fn remove(&mut self, key: &str) -> bool {
273        if self.db_env.is_err() {
274            self.open();
275        }
276        remove_from_lmdb(&self.db_env, &self.db_handle, key, &self.path)
277    }
278
279    pub fn put<T: ToMdbValue>(&mut self, key: &str, val: T) -> bool {
280        if self.db_env.is_err() {
281            self.open();
282        }
283        put_kv_lmdb(&self.db_env, &self.db_handle, key, val, &self.path)
284    }
285}
286
287impl LMDBStorage {
288    pub fn new(db_path: &str, mode: StorageMode, max_read_counter_reopen: Option<u64>) -> LMDBStorage {
289        LMDBStorage {
290            individuals_db: LmdbInstance {
291                max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
292                path: db_path.to_owned() + "/lmdb-individuals/",
293                mode: mode.clone(),
294                ..Default::default()
295            },
296            tickets_db: LmdbInstance {
297                max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
298                path: db_path.to_owned() + "/lmdb-tickets/",
299                mode: mode.clone(),
300                ..Default::default()
301            },
302            az_db: LmdbInstance {
303                max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
304                path: db_path.to_owned() + "/acl-indexes/",
305                mode: mode.clone(),
306                ..Default::default()
307            },
308        }
309    }
310
311    fn get_db_instance(&mut self, storage: &StorageId) -> &mut LmdbInstance {
312        match storage {
313            StorageId::Individuals => &mut self.individuals_db,
314            StorageId::Tickets => &mut self.tickets_db,
315            StorageId::Az => &mut self.az_db,
316        }
317    }
318
319    pub fn open(&mut self, storage: StorageId) {
320        let db_instance = self.get_db_instance(&storage);
321        db_instance.open();
322
323        info!("LMDBStorage: db {} open {:?}", db_instance.path, storage);
324    }
325}
326
327impl Storage for LMDBStorage {
328    fn get_individual(&mut self, storage: StorageId, uri: &str, iraw: &mut Individual) -> StorageResult<()> {
329        let db_instance = self.get_db_instance(&storage);
330        db_instance.get_individual(uri, iraw)
331    }
332
333    fn get_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<String> {
334        let db_instance = self.get_db_instance(&storage);
335        match db_instance.get_v(key) {
336            Some(value) => crate::common::StorageResult::Ok(value),
337            None => crate::common::StorageResult::NotFound,
338        }
339    }
340
341    fn get_raw_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<Vec<u8>> {
342        let db_instance = self.get_db_instance(&storage);
343        match db_instance.get_raw(key) {
344            Some(value) => crate::common::StorageResult::Ok(value),
345            None => crate::common::StorageResult::NotFound,
346        }
347    }
348
349    fn put_value(&mut self, storage: StorageId, key: &str, val: &str) -> crate::common::StorageResult<()> {
350        let db_instance = self.get_db_instance(&storage);
351        if put_kv_lmdb(&db_instance.db_env, &db_instance.db_handle, key, val.as_bytes(), &db_instance.path) {
352            crate::common::StorageResult::Ok(())
353        } else {
354            crate::common::StorageResult::Error("Failed to put value".to_string())
355        }
356    }
357
358    fn put_raw_value(&mut self, storage: StorageId, key: &str, val: Vec<u8>) -> crate::common::StorageResult<()> {
359        let db_instance = self.get_db_instance(&storage);
360        if put_kv_lmdb(&db_instance.db_env, &db_instance.db_handle, key, val.as_slice(), &db_instance.path) {
361            crate::common::StorageResult::Ok(())
362        } else {
363            crate::common::StorageResult::Error("Failed to put raw value".to_string())
364        }
365    }
366
367    fn remove_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<()> {
368        let db_instance = self.get_db_instance(&storage);
369        if remove_from_lmdb(&db_instance.db_env, &db_instance.db_handle, key, &db_instance.path) {
370            crate::common::StorageResult::Ok(())
371        } else {
372            crate::common::StorageResult::NotFound
373        }
374    }
375
376    fn count(&mut self, storage: StorageId) -> crate::common::StorageResult<usize> {
377        let db_instance = self.get_db_instance(&storage);
378        crate::common::StorageResult::Ok(db_instance.count())
379    }
380}
381
382fn remove_from_lmdb(db_env: &Result<Environment, MdbError>, db_handle: &Result<DbHandle, MdbError>, key: &str, path: &str) -> bool {
383    match db_env {
384        Ok(env) => match env.new_transaction() {
385            Ok(txn) => match db_handle {
386                Ok(handle) => {
387                    let db = txn.bind(handle);
388                    if let Err(e) = db.del(&key) {
389                        error!("LMDB: failed to remove key=[{}] from path=[{}], err={}", key, path, e);
390                        return false;
391                    }
392
393                    if let Err(e) = txn.commit() {
394                        if let MdbError::Other(c, _) = e {
395                            if c == -30792 && grow_db(db_env, path) {
396                                return remove_from_lmdb(db_env, db_handle, key, path);
397                            }
398                        }
399                        error!("LMDB: failed to commit removal for key=[{}], path=[{}], err={}", key, path, e);
400                        return false;
401                    }
402                    true
403                },
404                Err(e) => {
405                    error!("LMDB: db handle error while removing key=[{}], path=[{}], err={}", key, path, e);
406                    false
407                },
408            },
409            Err(e) => {
410                error!("LMDB: failed to create transaction while removing key=[{}], path=[{}], err={}", key, path, e);
411                false
412            },
413        },
414        Err(e) => {
415            error!("LMDB: db environment error while removing key=[{}], path=[{}], err={}", key, path, e);
416            false
417        },
418    }
419}
420
421fn put_kv_lmdb<T: ToMdbValue>(db_env: &Result<Environment, MdbError>, db_handle: &Result<DbHandle, MdbError>, key: &str, val: T, path: &str) -> bool {
422    match db_env {
423        Ok(env) => match env.new_transaction() {
424            Ok(txn) => match db_handle {
425                Ok(handle) => {
426                    let db = txn.bind(handle);
427                    if let Err(e) = db.set(&key, &val) {
428                        error!("LMDB: failed to put key=[{}] into path=[{}], err={}", key, path, e);
429                        return false;
430                    }
431
432                    if let Err(e) = txn.commit() {
433                        if let MdbError::Other(c, _) = e {
434                            if c == -30792 && grow_db(db_env, path) {
435                                return put_kv_lmdb(db_env, db_handle, key, val, path);
436                            }
437                        }
438                        error!("LMDB: failed to commit put for key=[{}], path=[{}], err={}", key, path, e);
439                        return false;
440                    }
441                    true
442                },
443                Err(e) => {
444                    error!("LMDB: db handle error while putting key=[{}], path=[{}], err={}", key, path, e);
445                    false
446                },
447            },
448            Err(e) => {
449                error!("LMDB: failed to create transaction while putting key=[{}], path=[{}], err={}", key, path, e);
450                false
451            },
452        },
453        Err(e) => {
454            error!("LMDB: db environment error while putting key=[{}], path=[{}], err={}", key, path, e);
455            false
456        },
457    }
458}
459
460fn grow_db(db_env: &Result<Environment, MdbError>, path: &str) -> bool {
461    match db_env {
462        Ok(env) => {
463            if let Ok(stat) = env.info() {
464                let new_size = stat.me_mapsize + 100 * 10_048_576;
465                if env.set_mapsize(new_size).is_ok() {
466                    info!("success grow db, new size = {}", new_size);
467                    return true;
468                }
469            }
470        },
471        Err(e) => {
472            error!("LMDB: db environment error while growing db, path=[{}], err={}", path, e);
473        },
474    }
475    false
476}