v_authorization_impl/
az_lmdb.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::path::PathBuf;
10use std::time;
11use std::time::SystemTime;
12use std::{io, thread};
13use v_authorization::common::{Storage, Trace};
14use v_authorization::*;
15
16const DB_PATH: &str = "./data/acl-indexes/";
17const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
18
19use crate::stat_manager::StatPub;
20
21#[derive(Debug, Eq, PartialEq, Clone)]
22enum StatMode {
23    Full,
24    Minimal,
25    None,
26}
27
28struct Stat {
29    point: StatPub,
30    mode: StatMode,
31}
32
33pub struct LmdbAzContext {
34    env: Env,
35    cache_env: Option<Env>,
36    authorize_counter: u64,
37    max_authorize_counter: u64,
38    stat: Option<Stat>,
39}
40
41fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
42    loop {
43        let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
44
45        if !path.exists() {
46            error!("LIB_AZ: Database does not exist at path: {}", path.display());
47            thread::sleep(time::Duration::from_secs(3));
48            error!("Retrying database connection...");
49            continue;
50        }
51
52        match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
53            Ok(env) => {
54                info!("LIB_AZ: Opened environment at path: {}", DB_PATH);
55
56                let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
57                    point: p,
58                    mode: stat_mode.clone(),
59                });
60
61                if let Some(_stat) = &stat_ctx {
62                    info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
63                    info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
64                }
65
66                return if use_cache.unwrap_or(false) {
67                    let cache_env = match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
68                        Ok(env) => {
69                            info!("LIB_AZ: Opened cache environment at path: {}", CACHE_DB_PATH);
70                            Some(env)
71                        },
72                        Err(e) => {
73                            warn!("LIB_AZ: Error opening cache environment: {:?}. Proceeding without cache.", e);
74                            None
75                        },
76                    };
77
78                    LmdbAzContext {
79                        env,
80                        cache_env,
81                        authorize_counter: 0,
82                        max_authorize_counter: max_read_counter,
83                        stat: stat_ctx,
84                    }
85                } else {
86                    LmdbAzContext {
87                        env,
88                        cache_env: None,
89                        authorize_counter: 0,
90                        max_authorize_counter: max_read_counter,
91                        stat: stat_ctx,
92                    }
93                };
94            },
95            Err(e) => {
96                error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
97                thread::sleep(time::Duration::from_secs(3));
98            },
99        }
100    }
101}
102
103impl LmdbAzContext {
104    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
105        let mode = if let Some(v) = stat_mode_str {
106            match v.to_lowercase().as_str() {
107                "full" => StatMode::Full,
108                "minimal" => StatMode::Minimal,
109                "off" => StatMode::None,
110                "none" => StatMode::None,
111                _ => StatMode::Full,
112            }
113        } else {
114            StatMode::Full
115        };
116
117        open(max_read_counter, stat_collector_url, mode, use_cache)
118    }
119
120    pub fn new(max_read_counter: u64) -> LmdbAzContext {
121        open(max_read_counter, None, StatMode::None, None)
122    }
123}
124
125impl Default for LmdbAzContext {
126    fn default() -> Self {
127        Self::new(u64::MAX)
128    }
129}
130
131impl AuthorizationContext for LmdbAzContext {
132    fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
133        let mut t = Trace {
134            acl: &mut String::new(),
135            is_acl: false,
136            group: &mut String::new(),
137            is_group: false,
138            info: &mut String::new(),
139            is_info: false,
140            str_num: 0,
141        };
142
143        let start_time = SystemTime::now();
144
145        let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
146
147        if let Some(stat) = &mut self.stat {
148            if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
149                let elapsed = start_time.elapsed().unwrap_or_default();
150                stat.point.set_duration(elapsed);
151                if let Err(e) = stat.point.flush() {
152                    warn!("fail flush stat, err={:?}", e);
153                }
154            }
155        }
156
157        r
158    }
159
160    fn authorize_and_trace(
161        &mut self,
162        uri: &str,
163        user_uri: &str,
164        request_access: u8,
165        _is_check_for_reload: bool,
166        trace: &mut Trace,
167    ) -> Result<u8, std::io::Error> {
168        self.authorize_counter += 1;
169        //info!("az counter={}", self.authorize_counter);
170        if self.authorize_counter >= self.max_authorize_counter {
171            //info!("az reopen, counter > {}", self.max_authorize_counter);
172            self.authorize_counter = 0;
173
174            match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
175                Ok(env1) => {
176                    self.env = env1;
177                },
178                Err(e1) => {
179                    return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening environment: {:?}", e1)));
180                },
181            }
182        }
183
184        match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
185            Ok(r) => {
186                return Ok(r);
187            },
188            Err(e) => {
189                info!("reopen");
190
191                match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
192                    Ok(env1) => {
193                        self.env = env1;
194                    },
195                    Err(e1) => {
196                        error!("Authorize: Err opening environment: {:?}", e1);
197                        return Err(e);
198                    },
199                }
200            },
201        }
202        // retry authorization if db err
203        self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
204    }
205}
206
207pub struct AzLmdbStorage<'a> {
208    txn: &'a RoTxn<'a>,
209    db: Database<Str, Str>,
210    cache_txn: Option<&'a RoTxn<'a>>,
211    cache_db: Option<Database<Str, Str>>,
212    stat: &'a mut Option<Stat>,
213}
214
215fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
216    match (use_cache, from_cache) {
217        (true, true) => format!("{}/C", key),
218        (true, false) => format!("{}/cB", key),
219        (false, _) => format!("{}/B", key),
220    }
221}
222
223impl<'a> Storage for AzLmdbStorage<'a> {
224    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
225        if let Some(cache_db) = self.cache_db {
226            if let Some(cache_txn) = self.cache_txn {
227                match cache_db.get(cache_txn, key) {
228                    Ok(Some(val)) => {
229                        if let Some(stat) = self.stat {
230                            if stat.mode == StatMode::Full {
231                                stat.point.collect(message(key, true, true));
232                            }
233                        }
234                        debug!("@cache val={}", val);
235                        return Ok(Some(val.to_string()));
236                    },
237                    Ok(None) => {
238                        // Data not found in cache, continue reading from main database
239                    },
240                    Err(_e) => {
241                        // Error reading cache, continue reading from main database
242                    },
243                }
244            }
245        }
246
247        match self.db.get(self.txn, key) {
248            Ok(Some(val)) => {
249                if let Some(stat) = self.stat {
250                    if stat.mode == StatMode::Full {
251                        stat.point.collect(message(key, self.cache_db.is_some(), false));
252                    }
253                }
254                debug!("@db val={}", val);
255                Ok(Some(val.to_string()))
256            },
257            Ok(None) => Ok(None),
258            Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
259        }
260    }
261
262    fn fiber_yield(&self) {}
263
264    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
265        decode_rec_to_rights(src, result)
266    }
267
268    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
269        decode_rec_to_rightset(src, new_rights)
270    }
271
272    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
273        decode_filter(filter_value)
274    }
275}
276
277impl LmdbAzContext {
278    fn authorize_use_db(
279        &mut self,
280        uri: &str,
281        user_uri: &str,
282        request_access: u8,
283        _is_check_for_reload: bool,
284        trace: &mut Trace,
285    ) -> Result<u8, std::io::Error> {
286        let txn = match self.env.read_txn() {
287            Ok(txn1) => txn1,
288            Err(e) => {
289                return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
290            },
291        };
292
293        let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
294            Ok(Some(db_res)) => db_res,
295            Ok(None) => {
296                return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
297            },
298            Err(e) => {
299                return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
300            },
301        };
302
303        let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
304            let txn_cache = match env.read_txn() {
305                Ok(txn1) => txn1,
306                Err(e) => {
307                    return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
308                },
309            };
310            
311            let db = match env.open_database(&txn_cache, None) {
312                Ok(Some(db_res)) => Some(db_res),
313                Ok(None) => {
314                    warn!("Authorize: cache database not found");
315                    None
316                },
317                Err(e) => {
318                    warn!("Authorize: Err opening cache database: {:?}", e);
319                    None
320                },
321            };
322            
323            (Some(txn_cache), db, true)
324        } else {
325            (None, None, false)
326        };
327
328        let cache_txn_ptr = if cache_txn_ref { 
329            cache_txn_owned.as_ref().map(|v| &**v)
330        } else { 
331            None 
332        };
333        
334        let mut storage = AzLmdbStorage {
335            txn: &txn,
336            db,
337            cache_txn: cache_txn_ptr,
338            cache_db,
339            stat: &mut self.stat,
340        };
341
342        authorize(uri, user_uri, request_access, &mut storage, trace)
343    }
344}
345