v_authorization_impl/
az_lmdb.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::mem::ManuallyDrop;
10use std::path::PathBuf;
11use std::time;
12use std::time::SystemTime;
13use std::{io, thread};
14use v_authorization::common::{Storage, Trace};
15use v_authorization::*;
16
17const DB_PATH: &str = "./data/acl-indexes/";
18const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
19
20use crate::stat_manager::StatPub;
21
22#[derive(Debug, Eq, PartialEq, Clone)]
23enum StatMode {
24    Full,
25    Minimal,
26    None,
27}
28
29struct Stat {
30    point: StatPub,
31    mode: StatMode,
32}
33
34pub struct LmdbAzContext {
35    env: ManuallyDrop<Env>,
36    cache_env: Option<Env>,
37    authorize_counter: u64,
38    max_authorize_counter: u64,
39    stat: Option<Stat>,
40}
41
42fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
43    loop {
44        let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
45
46        if !path.exists() {
47            error!("LIB_AZ: Database does not exist at path: {}", path.display());
48            thread::sleep(time::Duration::from_secs(3));
49            error!("Retrying database connection...");
50            continue;
51        }
52
53        match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
54            Ok(env) => {
55                info!("LIB_AZ: Opened environment at path: {}", DB_PATH);
56
57                let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
58                    point: p,
59                    mode: stat_mode.clone(),
60                });
61
62                if let Some(_stat) = &stat_ctx {
63                    info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
64                    info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
65                }
66
67                return if use_cache.unwrap_or(false) {
68                    let cache_env = match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
69                        Ok(env) => {
70                            info!("LIB_AZ: Opened cache environment at path: {}", CACHE_DB_PATH);
71                            Some(env)
72                        },
73                        Err(e) => {
74                            warn!("LIB_AZ: Error opening cache environment: {:?}. Proceeding without cache.", e);
75                            None
76                        },
77                    };
78
79                    LmdbAzContext {
80                        env: ManuallyDrop::new(env),
81                        cache_env,
82                        authorize_counter: 0,
83                        max_authorize_counter: max_read_counter,
84                        stat: stat_ctx,
85                    }
86                } else {
87                    LmdbAzContext {
88                        env: ManuallyDrop::new(env),
89                        cache_env: None,
90                        authorize_counter: 0,
91                        max_authorize_counter: max_read_counter,
92                        stat: stat_ctx,
93                    }
94                };
95            },
96            Err(e) => {
97                error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
98                thread::sleep(time::Duration::from_secs(3));
99            },
100        }
101    }
102}
103
104impl LmdbAzContext {
105    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
106        let mode = if let Some(v) = stat_mode_str {
107            match v.to_lowercase().as_str() {
108                "full" => StatMode::Full,
109                "minimal" => StatMode::Minimal,
110                "off" => StatMode::None,
111                "none" => StatMode::None,
112                _ => StatMode::Full,
113            }
114        } else {
115            StatMode::Full
116        };
117
118        open(max_read_counter, stat_collector_url, mode, use_cache)
119    }
120
121    pub fn new(max_read_counter: u64) -> LmdbAzContext {
122        open(max_read_counter, None, StatMode::None, None)
123    }
124}
125
126impl Default for LmdbAzContext {
127    fn default() -> Self {
128        Self::new(u64::MAX)
129    }
130}
131
132impl Drop for LmdbAzContext {
133    fn drop(&mut self) {
134        // Explicitly drop the environment when LmdbAzContext is destroyed
135        unsafe {
136            ManuallyDrop::drop(&mut self.env);
137        }
138    }
139}
140
141impl AuthorizationContext for LmdbAzContext {
142    fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
143        let mut t = Trace {
144            acl: &mut String::new(),
145            is_acl: false,
146            group: &mut String::new(),
147            is_group: false,
148            info: &mut String::new(),
149            is_info: false,
150            str_num: 0,
151        };
152
153        let start_time = SystemTime::now();
154
155        let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
156
157        if let Some(stat) = &mut self.stat {
158            if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
159                let elapsed = start_time.elapsed().unwrap_or_default();
160                stat.point.set_duration(elapsed);
161                if let Err(e) = stat.point.flush() {
162                    warn!("fail flush stat, err={:?}", e);
163                }
164            }
165        }
166
167        r
168    }
169
170    fn authorize_and_trace(
171        &mut self,
172        uri: &str,
173        user_uri: &str,
174        request_access: u8,
175        _is_check_for_reload: bool,
176        trace: &mut Trace,
177    ) -> Result<u8, std::io::Error> {
178        self.authorize_counter += 1;
179        //info!("az counter={}", self.authorize_counter);
180        if self.authorize_counter >= self.max_authorize_counter {
181            //info!("az reopen, counter > {}", self.max_authorize_counter);
182            self.authorize_counter = 0;
183            
184            // Explicitly close old environment to free cached pages from memory
185            unsafe {
186                ManuallyDrop::drop(&mut self.env);
187            }
188            
189            // Open new environment
190            match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
191                Ok(env1) => {
192                    self.env = ManuallyDrop::new(env1);
193                    info!("LIB_AZ: Reopened environment to free memory");
194                },
195                Err(e1) => {
196                    error!("Authorize: Error reopening environment: {:?}", e1);
197                    return Err(Error::new(ErrorKind::Other, format!("Authorize: Err reopening environment: {:?}", e1)));
198                },
199            }
200        }
201
202        match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
203            Ok(r) => {
204                return Ok(r);
205            },
206            Err(e) => {
207                // Retry authorization on db error by reopening environment
208                info!("retrying authorization after error, reopening environment");
209                
210                // Explicitly close old environment
211                unsafe {
212                    ManuallyDrop::drop(&mut self.env);
213                }
214                
215                // Open new environment
216                match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
217                    Ok(env1) => {
218                        self.env = ManuallyDrop::new(env1);
219                    },
220                    Err(e1) => {
221                        error!("Authorize: Error reopening environment after error: {:?}", e1);
222                        return Err(e);
223                    },
224                }
225            },
226        }
227        // retry authorization if db err
228        self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
229    }
230}
231
232pub struct AzLmdbStorage<'a> {
233    txn: &'a RoTxn<'a>,
234    db: Database<Str, Str>,
235    cache_txn: Option<&'a RoTxn<'a>>,
236    cache_db: Option<Database<Str, Str>>,
237    stat: &'a mut Option<Stat>,
238}
239
240fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
241    match (use_cache, from_cache) {
242        (true, true) => format!("{}/C", key),
243        (true, false) => format!("{}/cB", key),
244        (false, _) => format!("{}/B", key),
245    }
246}
247
248impl<'a> Storage for AzLmdbStorage<'a> {
249    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
250        if let Some(cache_db) = self.cache_db {
251            if let Some(cache_txn) = self.cache_txn {
252                match cache_db.get(cache_txn, key) {
253                    Ok(Some(val)) => {
254                        if let Some(stat) = self.stat {
255                            if stat.mode == StatMode::Full {
256                                stat.point.collect(message(key, true, true));
257                            }
258                        }
259                        debug!("@cache val={}", val);
260                        return Ok(Some(val.to_string()));
261                    },
262                    Ok(None) => {
263                        // Data not found in cache, continue reading from main database
264                    },
265                    Err(_e) => {
266                        // Error reading cache, continue reading from main database
267                    },
268                }
269            }
270        }
271
272        match self.db.get(self.txn, key) {
273            Ok(Some(val)) => {
274                if let Some(stat) = self.stat {
275                    if stat.mode == StatMode::Full {
276                        stat.point.collect(message(key, self.cache_db.is_some(), false));
277                    }
278                }
279                debug!("@db val={}", val);
280                Ok(Some(val.to_string()))
281            },
282            Ok(None) => Ok(None),
283            Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
284        }
285    }
286
287    fn fiber_yield(&self) {}
288
289    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
290        decode_rec_to_rights(src, result)
291    }
292
293    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
294        decode_rec_to_rightset(src, new_rights)
295    }
296
297    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
298        decode_filter(filter_value)
299    }
300}
301
302impl LmdbAzContext {
303    fn authorize_use_db(
304        &mut self,
305        uri: &str,
306        user_uri: &str,
307        request_access: u8,
308        _is_check_for_reload: bool,
309        trace: &mut Trace,
310    ) -> Result<u8, std::io::Error> {
311        let txn = match self.env.read_txn() {
312            Ok(txn1) => txn1,
313            Err(e) => {
314                return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
315            },
316        };
317
318        let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
319            Ok(Some(db_res)) => db_res,
320            Ok(None) => {
321                return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
322            },
323            Err(e) => {
324                return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
325            },
326        };
327
328        let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
329            let txn_cache = match env.read_txn() {
330                Ok(txn1) => txn1,
331                Err(e) => {
332                    return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
333                },
334            };
335            
336            let db = match env.open_database(&txn_cache, None) {
337                Ok(Some(db_res)) => Some(db_res),
338                Ok(None) => {
339                    warn!("Authorize: cache database not found");
340                    None
341                },
342                Err(e) => {
343                    warn!("Authorize: Err opening cache database: {:?}", e);
344                    None
345                },
346            };
347            
348            (Some(txn_cache), db, true)
349        } else {
350            (None, None, false)
351        };
352
353        let cache_txn_ptr = if cache_txn_ref { 
354            cache_txn_owned.as_ref().map(|v| &**v)
355        } else { 
356            None 
357        };
358        
359        let mut storage = AzLmdbStorage {
360            txn: &txn,
361            db,
362            cache_txn: cache_txn_ptr,
363            cache_db,
364            stat: &mut self.stat,
365        };
366
367        authorize(uri, user_uri, request_access, &mut storage, trace)
368    }
369}
370