1use v_individual_model::onto::individual::Individual;
2use v_individual_model::onto::parser::parse_raw;
3use crate::common::{Storage, StorageId, StorageMode, StorageResult};
4use lmdb_rs_m::core::EnvCreateFlags;
5use lmdb_rs_m::{DbFlags, DbHandle, EnvBuilder, Environment, MdbError};
6use lmdb_rs_m::{FromMdbValue, ToMdbValue};
7use std::iter::Iterator;
8
/// LMDB-backed storage that keeps one [`LmdbInstance`] per [`StorageId`].
pub struct LMDBStorage {
    /// Instance selected by `StorageId::Individuals`.
    individuals_db: LmdbInstance,
    /// Instance selected by `StorageId::Tickets`.
    tickets_db: LmdbInstance,
    /// Instance selected by `StorageId::Az`.
    az_db: LmdbInstance,
}
14
/// A single LMDB environment together with its default database handle.
///
/// Both `db_env` and `db_handle` start out as `Err(MdbError::Panic)`, which
/// the methods of this type treat as "not opened yet".
pub struct LmdbInstance {
    /// Threshold that `get` compares `read_counter` against when deciding
    /// whether the environment should be reopened.
    max_read_counter: u64,
    /// Path handed to the environment builder when opening.
    path: String,
    /// Access mode; selects the environment flags used by `open`.
    mode: StorageMode,
    /// Default database handle, or the error that prevented obtaining it.
    db_handle: Result<DbHandle, MdbError>,
    /// Opened environment, or the error that prevented opening it.
    db_env: Result<Environment, MdbError>,
    /// Number of read attempts made by `get` since the last `open`.
    read_counter: u64,
}
23
24impl Default for LmdbInstance {
25 fn default() -> Self {
26 LmdbInstance {
27 max_read_counter: 1000,
28 path: String::default(),
29 mode: StorageMode::ReadOnly,
30 db_handle: Err(MdbError::Panic),
31 db_env: Err(MdbError::Panic),
32 read_counter: 0,
33 }
34 }
35}
36
/// Iterator over a list of keys that was collected up front
/// (built by `LmdbInstance::iter`).
struct LmdbIterator {
    /// All keys, in the order they were collected.
    keys: Vec<Vec<u8>>,
    /// Position of the next key to yield.
    index: usize,
}
41
42impl Iterator for LmdbIterator {
43 type Item = Vec<u8>;
44
45 fn next(&mut self) -> Option<Self::Item> {
46 if self.index >= self.keys.len() {
47 None
48 } else {
49 let key = self.keys[self.index].clone();
50 self.index += 1;
51 Some(key)
52 }
53 }
54}
55
56impl LmdbInstance {
57 pub fn new(path: &str, mode: StorageMode) -> Self {
58 LmdbInstance {
59 max_read_counter: 1000,
60 path: path.to_string(),
61 mode,
62 db_handle: Err(MdbError::Panic),
63 db_env: Err(MdbError::Panic),
64 read_counter: 0,
65 }
66 }
67
68 pub fn iter(&mut self) -> Box<dyn Iterator<Item = Vec<u8>>> {
69 if self.db_env.is_err() {
70 self.open();
71 }
72
73 match &self.db_env {
74 Ok(env) => match &self.db_handle {
75 Ok(handle) => match env.get_reader() {
76 Ok(txn) => {
77 let db = txn.bind(handle);
78 let cursor_result = db.new_cursor();
79 match cursor_result {
80 Ok(mut cursor) => {
81 let mut keys = Vec::new();
82 while let Ok(()) = cursor.to_next_item() {
83 if let Ok(key) = cursor.get_key::<Vec<u8>>() {
84 keys.push(key);
85 }
86 }
87 Box::new(LmdbIterator {
88 keys,
89 index: 0,
90 })
91 },
92 Err(_) => Box::new(std::iter::empty()),
93 }
94 },
95 Err(_) => Box::new(std::iter::empty()),
96 },
97 Err(_) => Box::new(std::iter::empty()),
98 },
99 Err(_) => Box::new(std::iter::empty()),
100 }
101 }
102
103 pub fn open(&mut self) {
104 let flags = if self.mode == StorageMode::ReadOnly {
105 EnvCreateFlags::from_bits_truncate(0x20000000 | 0x20000 | 0x40000 | 0x10000)
107 } else {
108 EnvCreateFlags::from_bits_truncate(0x20000000 | 0x40000 | 0x10000)
110 };
111
112 let env_builder = EnvBuilder::new().flags(flags);
113
114 let db_env = env_builder.open(&self.path, 0o644);
115
116 let db_handle = match &db_env {
117 Ok(env) => env.get_default_db(DbFlags::empty()),
118 Err(e) => {
119 error!("LMDB: fail opening read only environment, path=[{}], err={:?}", self.path, e);
120 Err(MdbError::Corrupted)
121 },
122 };
123
124 self.db_handle = db_handle;
125 self.db_env = db_env;
126 self.read_counter = 0;
127 }
128
129 fn get_individual(&mut self, uri: &str, iraw: &mut Individual) -> StorageResult<()> {
130 if let Some(val) = self.get::<&[u8]>(uri) {
131 iraw.set_raw(val);
132
133 return if parse_raw(iraw).is_ok() {
134 StorageResult::Ok(())
135 } else {
136 error!("LMDB: fail parse binobj, path=[{}], len={}, uri=[{}]", self.path, iraw.get_raw_len(), uri);
137 StorageResult::UnprocessableEntity
138 };
139 }
140
141 StorageResult::NotFound
142 }
143
144 fn get_v(&mut self, key: &str) -> Option<String> {
145 self.get::<String>(key)
146 }
147
148 fn get_raw(&mut self, key: &str) -> Option<Vec<u8>> {
149 self.get::<Vec<u8>>(key)
150 }
151
152 pub fn get<T: FromMdbValue>(&mut self, key: &str) -> Option<T> {
153 if self.db_env.is_err() {
154 self.open();
155 }
156
157 for _it in 0..2 {
158 let mut is_need_reopen = false;
159
160 self.read_counter += 1;
161 if self.read_counter > self.max_read_counter {
162 is_need_reopen = true;
163 }
164
165 match &self.db_env {
166 Ok(env) => match &self.db_handle {
167 Ok(handle) => match env.get_reader() {
168 Ok(txn) => {
169 let db = txn.bind(handle);
170
171 match db.get::<T>(&key) {
172 Ok(val) => {
173 return Some(val);
174 },
175 Err(e) => match e {
176 MdbError::NotFound => {
177 return None;
178 },
179 _ => {
180 error!("LMDB: db.get failed for key=[{}], path=[{}], err={:?}", key, self.path, e);
181 return None;
182 },
183 },
184 }
185 },
186 Err(e) => match e {
187 MdbError::Other(c, _) => {
188 if c == -30785 {
189 is_need_reopen = true;
190 } else {
191 error!("LMDB: failed to create transaction for key=[{}], path=[{}], err={}", key, self.path, e);
192 return None;
193 }
194 },
195 _ => {
196 error!("LMDB: failed to create transaction for key=[{}], path=[{}], err={}", key, self.path, e);
197 },
198 },
199 },
200 Err(e) => {
201 error!("LMDB: db handle error for key=[{}], path=[{}], err={}", key, self.path, e);
202 return None;
203 },
204 },
205 Err(e) => match e {
206 MdbError::Panic => {
207 is_need_reopen = true;
208 },
209 _ => {
210 error!("LMDB: db environment error for key=[{}], path=[{}], err={}", key, self.path, e);
211 return None;
212 },
213 },
214 }
215
216 if is_need_reopen {
217 warn!("db {} reopen for key=[{}]", self.path, key);
218 self.open();
219 }
220 }
221
222 None
223 }
224
225 pub fn count(&mut self) -> usize {
226 if self.db_env.is_err() {
227 self.open();
228 }
229
230 for _it in 0..2 {
231 let mut is_need_reopen = false;
232
233 match &self.db_env {
234 Ok(env) => match env.stat() {
235 Ok(stat) => {
236 return stat.ms_entries;
237 },
238 Err(e) => match e {
239 MdbError::Other(c, _) => {
240 if c == -30785 {
241 is_need_reopen = true;
242 } else {
243 error!("LMDB: fail read stat for path=[{}], err={}", self.path, e);
244 return 0;
245 }
246 },
247 _ => {
248 error!("LMDB: fail to create transaction for stat read, path=[{}], err={}", self.path, e);
249 },
250 },
251 },
252 Err(e) => match e {
253 MdbError::Panic => {
254 is_need_reopen = true;
255 },
256 _ => {
257 error!("LMDB: db environment error while reading stat, path=[{}], err={}", self.path, e);
258 return 0;
259 },
260 },
261 }
262
263 if is_need_reopen {
264 warn!("db {} reopen for stat read", self.path);
265 self.open();
266 }
267 }
268
269 0
270 }
271
272 pub fn remove(&mut self, key: &str) -> bool {
273 if self.db_env.is_err() {
274 self.open();
275 }
276 remove_from_lmdb(&self.db_env, &self.db_handle, key, &self.path)
277 }
278
279 pub fn put<T: ToMdbValue>(&mut self, key: &str, val: T) -> bool {
280 if self.db_env.is_err() {
281 self.open();
282 }
283 put_kv_lmdb(&self.db_env, &self.db_handle, key, val, &self.path)
284 }
285}
286
287impl LMDBStorage {
288 pub fn new(db_path: &str, mode: StorageMode, max_read_counter_reopen: Option<u64>) -> LMDBStorage {
289 LMDBStorage {
290 individuals_db: LmdbInstance {
291 max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
292 path: db_path.to_owned() + "/lmdb-individuals/",
293 mode: mode.clone(),
294 ..Default::default()
295 },
296 tickets_db: LmdbInstance {
297 max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
298 path: db_path.to_owned() + "/lmdb-tickets/",
299 mode: mode.clone(),
300 ..Default::default()
301 },
302 az_db: LmdbInstance {
303 max_read_counter: max_read_counter_reopen.unwrap_or(u32::MAX as u64),
304 path: db_path.to_owned() + "/acl-indexes/",
305 mode: mode.clone(),
306 ..Default::default()
307 },
308 }
309 }
310
311 fn get_db_instance(&mut self, storage: &StorageId) -> &mut LmdbInstance {
312 match storage {
313 StorageId::Individuals => &mut self.individuals_db,
314 StorageId::Tickets => &mut self.tickets_db,
315 StorageId::Az => &mut self.az_db,
316 }
317 }
318
319 pub fn open(&mut self, storage: StorageId) {
320 let db_instance = self.get_db_instance(&storage);
321 db_instance.open();
322
323 info!("LMDBStorage: db {} open {:?}", db_instance.path, storage);
324 }
325}
326
327impl Storage for LMDBStorage {
328 fn get_individual(&mut self, storage: StorageId, uri: &str, iraw: &mut Individual) -> StorageResult<()> {
329 let db_instance = self.get_db_instance(&storage);
330 db_instance.get_individual(uri, iraw)
331 }
332
333 fn get_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<String> {
334 let db_instance = self.get_db_instance(&storage);
335 match db_instance.get_v(key) {
336 Some(value) => crate::common::StorageResult::Ok(value),
337 None => crate::common::StorageResult::NotFound,
338 }
339 }
340
341 fn get_raw_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<Vec<u8>> {
342 let db_instance = self.get_db_instance(&storage);
343 match db_instance.get_raw(key) {
344 Some(value) => crate::common::StorageResult::Ok(value),
345 None => crate::common::StorageResult::NotFound,
346 }
347 }
348
349 fn put_value(&mut self, storage: StorageId, key: &str, val: &str) -> crate::common::StorageResult<()> {
350 let db_instance = self.get_db_instance(&storage);
351 if put_kv_lmdb(&db_instance.db_env, &db_instance.db_handle, key, val.as_bytes(), &db_instance.path) {
352 crate::common::StorageResult::Ok(())
353 } else {
354 crate::common::StorageResult::Error("Failed to put value".to_string())
355 }
356 }
357
358 fn put_raw_value(&mut self, storage: StorageId, key: &str, val: Vec<u8>) -> crate::common::StorageResult<()> {
359 let db_instance = self.get_db_instance(&storage);
360 if put_kv_lmdb(&db_instance.db_env, &db_instance.db_handle, key, val.as_slice(), &db_instance.path) {
361 crate::common::StorageResult::Ok(())
362 } else {
363 crate::common::StorageResult::Error("Failed to put raw value".to_string())
364 }
365 }
366
367 fn remove_value(&mut self, storage: StorageId, key: &str) -> crate::common::StorageResult<()> {
368 let db_instance = self.get_db_instance(&storage);
369 if remove_from_lmdb(&db_instance.db_env, &db_instance.db_handle, key, &db_instance.path) {
370 crate::common::StorageResult::Ok(())
371 } else {
372 crate::common::StorageResult::NotFound
373 }
374 }
375
376 fn count(&mut self, storage: StorageId) -> crate::common::StorageResult<usize> {
377 let db_instance = self.get_db_instance(&storage);
378 crate::common::StorageResult::Ok(db_instance.count())
379 }
380}
381
382fn remove_from_lmdb(db_env: &Result<Environment, MdbError>, db_handle: &Result<DbHandle, MdbError>, key: &str, path: &str) -> bool {
383 match db_env {
384 Ok(env) => match env.new_transaction() {
385 Ok(txn) => match db_handle {
386 Ok(handle) => {
387 let db = txn.bind(handle);
388 if let Err(e) = db.del(&key) {
389 error!("LMDB: failed to remove key=[{}] from path=[{}], err={}", key, path, e);
390 return false;
391 }
392
393 if let Err(e) = txn.commit() {
394 if let MdbError::Other(c, _) = e {
395 if c == -30792 && grow_db(db_env, path) {
396 return remove_from_lmdb(db_env, db_handle, key, path);
397 }
398 }
399 error!("LMDB: failed to commit removal for key=[{}], path=[{}], err={}", key, path, e);
400 return false;
401 }
402 true
403 },
404 Err(e) => {
405 error!("LMDB: db handle error while removing key=[{}], path=[{}], err={}", key, path, e);
406 false
407 },
408 },
409 Err(e) => {
410 error!("LMDB: failed to create transaction while removing key=[{}], path=[{}], err={}", key, path, e);
411 false
412 },
413 },
414 Err(e) => {
415 error!("LMDB: db environment error while removing key=[{}], path=[{}], err={}", key, path, e);
416 false
417 },
418 }
419}
420
421fn put_kv_lmdb<T: ToMdbValue>(db_env: &Result<Environment, MdbError>, db_handle: &Result<DbHandle, MdbError>, key: &str, val: T, path: &str) -> bool {
422 match db_env {
423 Ok(env) => match env.new_transaction() {
424 Ok(txn) => match db_handle {
425 Ok(handle) => {
426 let db = txn.bind(handle);
427 if let Err(e) = db.set(&key, &val) {
428 error!("LMDB: failed to put key=[{}] into path=[{}], err={}", key, path, e);
429 return false;
430 }
431
432 if let Err(e) = txn.commit() {
433 if let MdbError::Other(c, _) = e {
434 if c == -30792 && grow_db(db_env, path) {
435 return put_kv_lmdb(db_env, db_handle, key, val, path);
436 }
437 }
438 error!("LMDB: failed to commit put for key=[{}], path=[{}], err={}", key, path, e);
439 return false;
440 }
441 true
442 },
443 Err(e) => {
444 error!("LMDB: db handle error while putting key=[{}], path=[{}], err={}", key, path, e);
445 false
446 },
447 },
448 Err(e) => {
449 error!("LMDB: failed to create transaction while putting key=[{}], path=[{}], err={}", key, path, e);
450 false
451 },
452 },
453 Err(e) => {
454 error!("LMDB: db environment error while putting key=[{}], path=[{}], err={}", key, path, e);
455 false
456 },
457 }
458}
459
460fn grow_db(db_env: &Result<Environment, MdbError>, path: &str) -> bool {
461 match db_env {
462 Ok(env) => {
463 if let Ok(stat) = env.info() {
464 let new_size = stat.me_mapsize + 100 * 10_048_576;
465 if env.set_mapsize(new_size).is_ok() {
466 info!("success grow db, new size = {}", new_size);
467 return true;
468 }
469 }
470 },
471 Err(e) => {
472 error!("LMDB: db environment error while growing db, path=[{}], err={}", path, e);
473 },
474 }
475 false
476}