v_authorization_impl/
az_lmdb.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use heed::{Env, EnvOpenOptions, Database, RoTxn};
5use heed::types::Str;
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use std::time;
9use std::{io, thread};
10use v_authorization::common::{Storage, Trace};
11use v_authorization::*;
12
13const DB_PATH: &str = "./data/acl-indexes/";
14const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
15
16use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
17use crate::common::AuthorizationHelper;
18use crate::impl_authorization_context;
19
20// Global shared environments for multi-threaded access
21static GLOBAL_ENV: OnceLock<Arc<Env>> = OnceLock::new();
22static GLOBAL_CACHE_ENV: OnceLock<Arc<Env>> = OnceLock::new();
23
24pub struct LmdbAzContext {
25    env: Arc<Env>,
26    cache_env: Option<Arc<Env>>,
27    authorize_counter: u64,
28    max_authorize_counter: u64,
29    stat: Option<Stat>,
30}
31
32fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
33    // Get or initialize global environment (shared across all threads)
34    let env = GLOBAL_ENV.get_or_init(|| {
35        loop {
36            let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
37
38            if !path.exists() {
39                error!("LIB_AZ: Database does not exist at path: {}", path.display());
40                thread::sleep(time::Duration::from_secs(3));
41                error!("Retrying database connection...");
42                continue;
43            }
44
45            match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
46                Ok(env) => {
47                    info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
48                    return Arc::new(env);
49                },
50                Err(e) => {
51                    error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
52                    thread::sleep(time::Duration::from_secs(3));
53                },
54            }
55        }
56    }).clone();
57
58    let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
59        point: p,
60        mode: stat_mode.clone(),
61    });
62
63    if let Some(_stat) = &stat_ctx {
64        info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
65        info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
66    }
67
68    let cache_env = if use_cache.unwrap_or(false) {
69        // Get or try to initialize cache environment
70        let cache_result = GLOBAL_CACHE_ENV.get_or_init(|| {
71            match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
72                Ok(env) => {
73                    info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
74                    Arc::new(env)
75                },
76                Err(e) => {
77                    warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
78                    // Use a closed/empty environment as marker
79                    Arc::new(unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH).unwrap() })
80                },
81            }
82        });
83        Some(cache_result.clone())
84    } else {
85        None
86    };
87
88    LmdbAzContext {
89        env,
90        cache_env,
91        authorize_counter: 0,
92        max_authorize_counter: max_read_counter,
93        stat: stat_ctx,
94    }
95}
96
97impl LmdbAzContext {
98    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
99        let mode = parse_stat_mode(stat_mode_str);
100        open(max_read_counter, stat_collector_url, mode, use_cache)
101    }
102
103    pub fn new(max_read_counter: u64) -> LmdbAzContext {
104        open(max_read_counter, None, StatMode::None, None)
105    }
106}
107
108impl Default for LmdbAzContext {
109    fn default() -> Self {
110        Self::new(u64::MAX)
111    }
112}
113
114impl AuthorizationHelper for LmdbAzContext {
115    fn get_stat_mut(&mut self) -> &mut Option<Stat> {
116        &mut self.stat
117    }
118
119    fn get_authorize_counter(&self) -> u64 {
120        self.authorize_counter
121    }
122
123    fn get_max_authorize_counter(&self) -> u64 {
124        self.max_authorize_counter
125    }
126
127    fn set_authorize_counter(&mut self, value: u64) {
128        self.authorize_counter = value;
129    }
130
131    fn authorize_use_db(
132        &mut self,
133        uri: &str,
134        user_uri: &str,
135        request_access: u8,
136        _is_check_for_reload: bool,
137        trace: &mut Trace,
138    ) -> Result<u8, std::io::Error> {
139        let txn = match self.env.read_txn() {
140            Ok(txn1) => txn1,
141            Err(e) => {
142                return Err(Error::other(format!("Authorize:CREATING TRANSACTION {:?}", e)));
143            },
144        };
145
146        let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
147            Ok(Some(db_res)) => db_res,
148            Ok(None) => {
149                return Err(Error::other("Authorize: database not found"));
150            },
151            Err(e) => {
152                return Err(Error::other(format!("Authorize: Err opening database: {:?}", e)));
153            },
154        };
155
156        let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
157            let txn_cache = match env.read_txn() {
158                Ok(txn1) => txn1,
159                Err(e) => {
160                    return Err(Error::other(format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
161                },
162            };
163            
164            let db = match env.open_database(&txn_cache, None) {
165                Ok(Some(db_res)) => Some(db_res),
166                Ok(None) => {
167                    warn!("Authorize: cache database not found");
168                    None
169                },
170                Err(e) => {
171                    warn!("Authorize: Err opening cache database: {:?}", e);
172                    None
173                },
174            };
175            
176            (Some(txn_cache), db, true)
177        } else {
178            (None, None, false)
179        };
180
181        let cache_txn_ptr = if cache_txn_ref { 
182            cache_txn_owned.as_deref()
183        } else { 
184            None 
185        };
186        
187        let mut storage = AzLmdbStorage {
188            txn: &txn,
189            db,
190            cache_txn: cache_txn_ptr,
191            cache_db,
192            stat: &mut self.stat,
193        };
194
195        authorize(uri, user_uri, request_access, &mut storage, trace)
196    }
197}
198
199impl_authorization_context!(LmdbAzContext);
200
201pub struct AzLmdbStorage<'a> {
202    txn: &'a RoTxn<'a>,
203    db: Database<Str, Str>,
204    cache_txn: Option<&'a RoTxn<'a>>,
205    cache_db: Option<Database<Str, Str>>,
206    stat: &'a mut Option<Stat>,
207}
208
209impl<'a> Storage for AzLmdbStorage<'a> {
210    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
211        if let Some(cache_db) = self.cache_db {
212            if let Some(cache_txn) = self.cache_txn {
213                match cache_db.get(cache_txn, key) {
214                    Ok(Some(val)) => {
215                        if let Some(stat) = self.stat {
216                            if stat.mode == StatMode::Full {
217                                stat.point.collect(format_stat_message(key, true, true));
218                            }
219                        }
220                        debug!("@cache val={}", val);
221                        return Ok(Some(val.to_string()));
222                    },
223                    Ok(None) => {
224                        // Data not found in cache, continue reading from main database
225                    },
226                    Err(_e) => {
227                        // Error reading cache, continue reading from main database
228                    },
229                }
230            }
231        }
232
233        match self.db.get(self.txn, key) {
234            Ok(Some(val)) => {
235                if let Some(stat) = self.stat {
236                    if stat.mode == StatMode::Full {
237                        stat.point.collect(format_stat_message(key, self.cache_db.is_some(), false));
238                    }
239                }
240                debug!("@db val={}", val);
241                Ok(Some(val.to_string()))
242            },
243            Ok(None) => Ok(None),
244            Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
245        }
246    }
247
248    fn fiber_yield(&self) {}
249
250    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
251        decode_rec_to_rights(src, result)
252    }
253
254    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
255        decode_rec_to_rightset(src, new_rights)
256    }
257
258    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
259        decode_filter(filter_value)
260    }
261}
262