v_authorization_impl/
az_lmdb.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use heed::{Env, EnvOpenOptions, Database, RoTxn};
5use heed::types::Str;
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8use std::sync::LazyLock;
9use std::time;
10use std::{io, thread};
11use v_authorization::common::{Storage, Trace};
12use v_authorization::*;
13
14const DB_PATH: &str = "./data/acl-indexes/";
15const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
16
17use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
18use crate::common::AuthorizationHelper;
19use crate::impl_authorization_context;
20
21// Global shared environments for multi-threaded access
22static GLOBAL_ENV: LazyLock<Mutex<Option<Arc<Env>>>> = LazyLock::new(|| Mutex::new(None));
23static GLOBAL_CACHE_ENV: LazyLock<Mutex<Option<Arc<Env>>>> = LazyLock::new(|| Mutex::new(None));
24
25// Reset global environments (useful for tests)
26// This drops the Arc references and allows the database to be fully closed
27pub fn reset_global_envs() {
28    let mut env = GLOBAL_ENV.lock().unwrap();
29    *env = None;
30    
31    let mut cache_env = GLOBAL_CACHE_ENV.lock().unwrap();
32    *cache_env = None;
33    
34    info!("LIB_AZ: Reset global environments");
35}
36
37// Helper function to force sync of environment
38// This ensures data is written to disk before reopening
39pub fn sync_env() -> bool {
40    let env_opt = GLOBAL_ENV.lock().unwrap();
41    if let Some(env) = env_opt.as_ref() {
42        match env.force_sync() {
43            Ok(_) => {
44                info!("LIB_AZ: Successfully synced environment");
45                true
46            },
47            Err(e) => {
48                error!("LIB_AZ: Failed to sync environment: {:?}", e);
49                false
50            }
51        }
52    } else {
53        true // No environment to sync
54    }
55}
56
57pub struct LmdbAzContext {
58    env: Arc<Env>,
59    cache_env: Option<Arc<Env>>,
60    authorize_counter: u64,
61    max_authorize_counter: u64,
62    stat: Option<Stat>,
63}
64
65fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
66    // Get or initialize global environment (shared across all threads)
67    let env = {
68        let mut env_lock = GLOBAL_ENV.lock().unwrap();
69        
70        if let Some(existing_env) = env_lock.as_ref() {
71            existing_env.clone()
72        } else {
73            // Create new environment
74            let new_env = loop {
75                let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
76
77                if !path.exists() {
78                    error!("LIB_AZ: Database does not exist at path: {}", path.display());
79                    thread::sleep(time::Duration::from_secs(3));
80                    error!("Retrying database connection...");
81                    continue;
82                }
83
84                match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
85                    Ok(env) => {
86                        info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
87                        break Arc::new(env);
88                    },
89                    Err(e) => {
90                        error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
91                        thread::sleep(time::Duration::from_secs(3));
92                    },
93                }
94            };
95            
96            *env_lock = Some(new_env.clone());
97            new_env
98        }
99    };
100
101    let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
102        point: p,
103        mode: stat_mode.clone(),
104    });
105
106    if let Some(_stat) = &stat_ctx {
107        info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
108        info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
109    }
110
111    let cache_env = if use_cache.unwrap_or(false) {
112        let mut cache_lock = GLOBAL_CACHE_ENV.lock().unwrap();
113        
114        if let Some(existing_cache) = cache_lock.as_ref() {
115            Some(existing_cache.clone())
116        } else {
117            // Try to initialize cache environment
118            match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
119                Ok(env) => {
120                    info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
121                    let arc_env = Arc::new(env);
122                    *cache_lock = Some(arc_env.clone());
123                    Some(arc_env)
124                },
125                Err(e) => {
126                    warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
127                    None
128                },
129            }
130        }
131    } else {
132        None
133    };
134
135    LmdbAzContext {
136        env,
137        cache_env,
138        authorize_counter: 0,
139        max_authorize_counter: max_read_counter,
140        stat: stat_ctx,
141    }
142}
143
144impl LmdbAzContext {
145    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
146        let mode = parse_stat_mode(stat_mode_str);
147        open(max_read_counter, stat_collector_url, mode, use_cache)
148    }
149
150    pub fn new(max_read_counter: u64) -> LmdbAzContext {
151        open(max_read_counter, None, StatMode::None, None)
152    }
153}
154
155impl Default for LmdbAzContext {
156    fn default() -> Self {
157        Self::new(u64::MAX)
158    }
159}
160
161impl AuthorizationHelper for LmdbAzContext {
162    fn get_stat_mut(&mut self) -> &mut Option<Stat> {
163        &mut self.stat
164    }
165
166    fn get_authorize_counter(&self) -> u64 {
167        self.authorize_counter
168    }
169
170    fn get_max_authorize_counter(&self) -> u64 {
171        self.max_authorize_counter
172    }
173
174    fn set_authorize_counter(&mut self, value: u64) {
175        self.authorize_counter = value;
176    }
177
178    fn authorize_use_db(
179        &mut self,
180        uri: &str,
181        user_uri: &str,
182        request_access: u8,
183        _is_check_for_reload: bool,
184        trace: &mut Trace,
185    ) -> Result<u8, std::io::Error> {
186        let txn = match self.env.read_txn() {
187            Ok(txn1) => txn1,
188            Err(e) => {
189                return Err(Error::other(format!("Authorize:CREATING TRANSACTION {:?}", e)));
190            },
191        };
192
193        let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
194            Ok(Some(db_res)) => db_res,
195            Ok(None) => {
196                return Err(Error::other("Authorize: database not found"));
197            },
198            Err(e) => {
199                return Err(Error::other(format!("Authorize: Err opening database: {:?}", e)));
200            },
201        };
202
203        let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
204            let txn_cache = match env.read_txn() {
205                Ok(txn1) => txn1,
206                Err(e) => {
207                    return Err(Error::other(format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
208                },
209            };
210            
211            let db = match env.open_database(&txn_cache, None) {
212                Ok(Some(db_res)) => Some(db_res),
213                Ok(None) => {
214                    warn!("Authorize: cache database not found");
215                    None
216                },
217                Err(e) => {
218                    warn!("Authorize: Err opening cache database: {:?}", e);
219                    None
220                },
221            };
222            
223            (Some(txn_cache), db, true)
224        } else {
225            (None, None, false)
226        };
227
228        let cache_txn_ptr = if cache_txn_ref { 
229            cache_txn_owned.as_deref()
230        } else { 
231            None 
232        };
233        
234        let mut storage = AzLmdbStorage {
235            txn: &txn,
236            db,
237            cache_txn: cache_txn_ptr,
238            cache_db,
239            stat: &mut self.stat,
240        };
241
242        authorize(uri, user_uri, request_access, &mut storage, trace)
243    }
244}
245
246impl_authorization_context!(LmdbAzContext);
247
248pub struct AzLmdbStorage<'a> {
249    txn: &'a RoTxn<'a>,
250    db: Database<Str, Str>,
251    cache_txn: Option<&'a RoTxn<'a>>,
252    cache_db: Option<Database<Str, Str>>,
253    stat: &'a mut Option<Stat>,
254}
255
256impl<'a> Storage for AzLmdbStorage<'a> {
257    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
258        if let Some(cache_db) = self.cache_db {
259            if let Some(cache_txn) = self.cache_txn {
260                match cache_db.get(cache_txn, key) {
261                    Ok(Some(val)) => {
262                        if let Some(stat) = self.stat {
263                            if stat.mode == StatMode::Full {
264                                stat.point.collect(format_stat_message(key, true, true));
265                            }
266                        }
267                        debug!("@cache val={}", val);
268                        return Ok(Some(val.to_string()));
269                    },
270                    Ok(None) => {
271                        // Data not found in cache, continue reading from main database
272                    },
273                    Err(_e) => {
274                        // Error reading cache, continue reading from main database
275                    },
276                }
277            }
278        }
279
280        match self.db.get(self.txn, key) {
281            Ok(Some(val)) => {
282                if let Some(stat) = self.stat {
283                    if stat.mode == StatMode::Full {
284                        stat.point.collect(format_stat_message(key, self.cache_db.is_some(), false));
285                    }
286                }
287                debug!("@db val={}", val);
288                Ok(Some(val.to_string()))
289            },
290            Ok(None) => Ok(None),
291            Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
292        }
293    }
294
295    fn fiber_yield(&self) {}
296
297    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
298        decode_rec_to_rights(src, result)
299    }
300
301    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
302        decode_rec_to_rightset(src, new_rights)
303    }
304
305    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
306        decode_filter(filter_value)
307    }
308}
309