Skip to main content

v_authorization_impl/
az_lmdb.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::path::PathBuf;
10use std::sync::{Arc, OnceLock};
11use std::time;
12use std::time::SystemTime;
13use std::{io, thread};
14use v_authorization::common::{Storage, Trace};
15use v_authorization::*;
16
/// Directory of the main ACL index LMDB environment.
const DB_PATH: &str = "./data/acl-indexes/";
/// Directory of the optional ACL cache LMDB environment.
const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
19
20use crate::stat_manager::StatPub;
21
// Global shared environments for multi-threaded access.
// Each handle is initialised at most once (on first use inside `open`) and is
// then cloned into every `LmdbAzContext` created afterwards.
static GLOBAL_ENV: OnceLock<Arc<Env>> = OnceLock::new();
static GLOBAL_CACHE_ENV: OnceLock<Arc<Env>> = OnceLock::new();
25
/// How much statistics the context reports to the stat collector.
#[derive(Debug, Eq, PartialEq, Clone)]
enum StatMode {
    /// Report the duration of every authorization and every key that was read.
    Full,
    /// Report only the duration of every authorization.
    Minimal,
    /// Report nothing.
    None,
}
32
/// Statistics publisher together with the reporting mode it is used in.
struct Stat {
    /// Publisher the collected data is sent through.
    point: StatPub,
    /// Selects which events are reported via `point`.
    mode: StatMode,
}
37
/// Authorization context that reads ACL data from LMDB environments.
pub struct LmdbAzContext {
    /// Shared handle to the main ACL index environment.
    env: Arc<Env>,
    /// Shared handle to the cache environment; `None` when the cache is not used.
    cache_env: Option<Arc<Env>>,
    /// Number of authorizations since the counter was last reset.
    authorize_counter: u64,
    /// Value at which `authorize_counter` is reset to zero.
    max_authorize_counter: u64,
    /// Statistics reporting; `None` when no collector is configured.
    stat: Option<Stat>,
}
45
46fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
47    // Get or initialize global environment (shared across all threads)
48    let env = GLOBAL_ENV.get_or_init(|| {
49        loop {
50            let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
51
52            if !path.exists() {
53                error!("LIB_AZ: Database does not exist at path: {}", path.display());
54                thread::sleep(time::Duration::from_secs(3));
55                error!("Retrying database connection...");
56                continue;
57            }
58
59            match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
60                Ok(env) => {
61                    info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
62                    return Arc::new(env);
63                },
64                Err(e) => {
65                    error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
66                    thread::sleep(time::Duration::from_secs(3));
67                },
68            }
69        }
70    }).clone();
71
72    let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
73        point: p,
74        mode: stat_mode.clone(),
75    });
76
77    if let Some(_stat) = &stat_ctx {
78        info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
79        info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
80    }
81
82    let cache_env = if use_cache.unwrap_or(false) {
83        // Get or try to initialize cache environment
84        let cache_result = GLOBAL_CACHE_ENV.get_or_init(|| {
85            match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
86                Ok(env) => {
87                    info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
88                    Arc::new(env)
89                },
90                Err(e) => {
91                    warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
92                    // Use a closed/empty environment as marker
93                    Arc::new(unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH).unwrap() })
94                },
95            }
96        });
97        Some(cache_result.clone())
98    } else {
99        None
100    };
101
102    LmdbAzContext {
103        env,
104        cache_env,
105        authorize_counter: 0,
106        max_authorize_counter: max_read_counter,
107        stat: stat_ctx,
108    }
109}
110
111impl LmdbAzContext {
112    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
113        let mode = if let Some(v) = stat_mode_str {
114            match v.to_lowercase().as_str() {
115                "full" => StatMode::Full,
116                "minimal" => StatMode::Minimal,
117                "off" => StatMode::None,
118                "none" => StatMode::None,
119                _ => StatMode::Full,
120            }
121        } else {
122            StatMode::Full
123        };
124
125        open(max_read_counter, stat_collector_url, mode, use_cache)
126    }
127
128    pub fn new(max_read_counter: u64) -> LmdbAzContext {
129        open(max_read_counter, None, StatMode::None, None)
130    }
131}
132
133impl Default for LmdbAzContext {
134    fn default() -> Self {
135        Self::new(u64::MAX)
136    }
137}
138
139impl AuthorizationContext for LmdbAzContext {
140    fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
141        let mut t = Trace {
142            acl: &mut String::new(),
143            is_acl: false,
144            group: &mut String::new(),
145            is_group: false,
146            info: &mut String::new(),
147            is_info: false,
148            str_num: 0,
149        };
150
151        let start_time = SystemTime::now();
152
153        let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
154
155        if let Some(stat) = &mut self.stat {
156            if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
157                let elapsed = start_time.elapsed().unwrap_or_default();
158                stat.point.set_duration(elapsed);
159                if let Err(e) = stat.point.flush() {
160                    warn!("fail flush stat, err={:?}", e);
161                }
162            }
163        }
164
165        r
166    }
167
168    fn authorize_and_trace(
169        &mut self,
170        uri: &str,
171        user_uri: &str,
172        request_access: u8,
173        _is_check_for_reload: bool,
174        trace: &mut Trace,
175    ) -> Result<u8, std::io::Error> {
176        self.authorize_counter += 1;
177        //info!("az counter={}", self.authorize_counter);
178        if self.authorize_counter >= self.max_authorize_counter {
179            //info!("az reset counter, counter > {}", self.max_authorize_counter);
180            self.authorize_counter = 0;
181            // Note: with shared Arc<Env>, we don't reopen the environment
182            // The environment is shared across all threads and persists
183        }
184
185        match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
186            Ok(r) => {
187                return Ok(r);
188            },
189            Err(_e) => {
190                // Retry authorization on db error
191                info!("retrying authorization after error");
192                // Note: with shared Arc<Env>, we can't reopen the environment here
193                // Just retry with the existing environment
194            },
195        }
196        // retry authorization if db err
197        self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
198    }
199}
200
/// `Storage` implementation that reads ACL records through LMDB read transactions,
/// consulting the cache database first when one is available.
pub struct AzLmdbStorage<'a> {
    /// Read transaction on the main environment.
    txn: &'a RoTxn<'a>,
    /// Main database handle.
    db: Database<Str, Str>,
    /// Read transaction on the cache environment, if the cache is in use.
    cache_txn: Option<&'a RoTxn<'a>>,
    /// Cache database handle, if the cache is in use and could be opened.
    cache_db: Option<Database<Str, Str>>,
    /// Statistics sink borrowed from the owning context.
    stat: &'a mut Option<Stat>,
}
208
/// Builds the statistics label for a key lookup: the key, a slash and a source tag.
///
/// The tag is `C` for a cache hit, `cB` for a read from the main database while the
/// cache is enabled, and `B` when the cache is not used at all.
fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
    let source_tag = if !use_cache {
        "B"
    } else if from_cache {
        "C"
    } else {
        "cB"
    };

    let mut label = String::with_capacity(key.len() + 1 + source_tag.len());
    label.push_str(key);
    label.push('/');
    label.push_str(source_tag);
    label
}
216
217impl<'a> Storage for AzLmdbStorage<'a> {
218    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
219        if let Some(cache_db) = self.cache_db {
220            if let Some(cache_txn) = self.cache_txn {
221                match cache_db.get(cache_txn, key) {
222                    Ok(Some(val)) => {
223                        if let Some(stat) = self.stat {
224                            if stat.mode == StatMode::Full {
225                                stat.point.collect(message(key, true, true));
226                            }
227                        }
228                        debug!("@cache val={}", val);
229                        return Ok(Some(val.to_string()));
230                    },
231                    Ok(None) => {
232                        // Data not found in cache, continue reading from main database
233                    },
234                    Err(_e) => {
235                        // Error reading cache, continue reading from main database
236                    },
237                }
238            }
239        }
240
241        match self.db.get(self.txn, key) {
242            Ok(Some(val)) => {
243                if let Some(stat) = self.stat {
244                    if stat.mode == StatMode::Full {
245                        stat.point.collect(message(key, self.cache_db.is_some(), false));
246                    }
247                }
248                debug!("@db val={}", val);
249                Ok(Some(val.to_string()))
250            },
251            Ok(None) => Ok(None),
252            Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
253        }
254    }
255
256    fn fiber_yield(&self) {}
257
258    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
259        decode_rec_to_rights(src, result)
260    }
261
262    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
263        decode_rec_to_rightset(src, new_rights)
264    }
265
266    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
267        decode_filter(filter_value)
268    }
269}
270
271impl LmdbAzContext {
272    fn authorize_use_db(
273        &mut self,
274        uri: &str,
275        user_uri: &str,
276        request_access: u8,
277        _is_check_for_reload: bool,
278        trace: &mut Trace,
279    ) -> Result<u8, std::io::Error> {
280        let txn = match self.env.read_txn() {
281            Ok(txn1) => txn1,
282            Err(e) => {
283                return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
284            },
285        };
286
287        let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
288            Ok(Some(db_res)) => db_res,
289            Ok(None) => {
290                return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
291            },
292            Err(e) => {
293                return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
294            },
295        };
296
297        let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
298            let txn_cache = match env.read_txn() {
299                Ok(txn1) => txn1,
300                Err(e) => {
301                    return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
302                },
303            };
304            
305            let db = match env.open_database(&txn_cache, None) {
306                Ok(Some(db_res)) => Some(db_res),
307                Ok(None) => {
308                    warn!("Authorize: cache database not found");
309                    None
310                },
311                Err(e) => {
312                    warn!("Authorize: Err opening cache database: {:?}", e);
313                    None
314                },
315            };
316            
317            (Some(txn_cache), db, true)
318        } else {
319            (None, None, false)
320        };
321
322        let cache_txn_ptr = if cache_txn_ref { 
323            cache_txn_owned.as_ref().map(|v| &**v)
324        } else { 
325            None 
326        };
327        
328        let mut storage = AzLmdbStorage {
329            txn: &txn,
330            db,
331            cache_txn: cache_txn_ptr,
332            cache_db,
333            stat: &mut self.stat,
334        };
335
336        authorize(uri, user_uri, request_access, &mut storage, trace)
337    }
338}
339