v_authorization_impl/
az_mdbx.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use libmdbx::{Database, NoWriteMap};
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::sync::LazyLock;
8use std::time;
9use std::{io, thread};
10use v_authorization::common::{Storage, Trace};
11use v_authorization::*;
12
13const DB_PATH: &str = "./data/acl-mdbx-indexes/";
14const CACHE_DB_PATH: &str = "./data/acl-cache-mdbx-indexes/";
15
16use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
17use crate::common::AuthorizationHelper;
18use crate::impl_authorization_context;
19
20// Global shared databases for multi-threaded access
21static GLOBAL_DB: LazyLock<Mutex<Option<Arc<Database<NoWriteMap>>>>> = LazyLock::new(|| Mutex::new(None));
22static GLOBAL_CACHE_DB: LazyLock<Mutex<Option<Arc<Database<NoWriteMap>>>>> = LazyLock::new(|| Mutex::new(None));
23
24// Reset global databases (useful for tests)
25// This drops the Arc references and allows the database to be fully closed
26pub fn reset_global_envs() {
27    let mut db = GLOBAL_DB.lock().unwrap();
28    *db = None;
29    
30    let mut cache_db = GLOBAL_CACHE_DB.lock().unwrap();
31    *cache_db = None;
32    
33    info!("LIB_AZ_MDBX: Reset global databases");
34}
35
36// Helper function to force sync of database
37// This ensures data is written to disk before reopening
38pub fn sync_env() -> bool {
39    let db_opt = GLOBAL_DB.lock().unwrap();
40    if let Some(db) = db_opt.as_ref() {
41        match db.sync(true) {
42            Ok(_) => {
43                info!("LIB_AZ_MDBX: Successfully synced database");
44                true
45            },
46            Err(e) => {
47                error!("LIB_AZ_MDBX: Failed to sync database: {:?}", e);
48                false
49            }
50        }
51    } else {
52        true // No database to sync
53    }
54}
55
56pub struct MdbxAzContext {
57    db: Arc<Database<NoWriteMap>>,
58    cache_db: Option<Arc<Database<NoWriteMap>>>,
59    authorize_counter: u64,
60    max_authorize_counter: u64,
61    stat: Option<Stat>,
62}
63
64fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> MdbxAzContext {
65    // Get or initialize global database (shared across all threads)
66    let db = {
67        let mut db_lock = GLOBAL_DB.lock().unwrap();
68        
69        if let Some(existing_db) = db_lock.as_ref() {
70            existing_db.clone()
71        } else {
72            // Create new database
73            let new_db = loop {
74                let path: PathBuf = PathBuf::from(DB_PATH);
75
76                if !path.exists() {
77                    error!("LIB_AZ: Database directory does not exist at path: {}", path.display());
78                    thread::sleep(time::Duration::from_secs(3));
79                    error!("Retrying database connection...");
80                    continue;
81                }
82
83                match Database::open(&path) {
84                    Ok(database) => {
85                        info!("LIB_AZ: Opened shared MDBX database at path: {}", DB_PATH);
86                        break Arc::new(database);
87                    },
88                    Err(e) => {
89                        error!("Authorize: Error opening MDBX database: {:?}. Retrying in 3 seconds...", e);
90                        thread::sleep(time::Duration::from_secs(3));
91                    },
92                }
93            };
94            
95            *db_lock = Some(new_db.clone());
96            new_db
97        }
98    };
99
100    let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
101        point: p,
102        mode: stat_mode.clone(),
103    });
104
105    if let Some(_stat) = &stat_ctx {
106        info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
107        info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
108    }
109
110    let cache_db = if use_cache.unwrap_or(false) {
111        let mut cache_lock = GLOBAL_CACHE_DB.lock().unwrap();
112        
113        if let Some(existing_cache) = cache_lock.as_ref() {
114            Some(existing_cache.clone())
115        } else {
116            // Try to initialize cache database
117            let path: PathBuf = PathBuf::from(CACHE_DB_PATH);
118            match Database::open(&path) {
119                Ok(database) => {
120                    info!("LIB_AZ: Opened shared MDBX cache database at path: {}", CACHE_DB_PATH);
121                    let arc_db = Arc::new(database);
122                    *cache_lock = Some(arc_db.clone());
123                    Some(arc_db)
124                },
125                Err(e) => {
126                    warn!("LIB_AZ: Error opening MDBX cache database: {:?}. Cache will not be used.", e);
127                    None
128                },
129            }
130        }
131    } else {
132        None
133    };
134
135    MdbxAzContext {
136        db,
137        cache_db,
138        authorize_counter: 0,
139        max_authorize_counter: max_read_counter,
140        stat: stat_ctx,
141    }
142}
143
144impl MdbxAzContext {
145    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> MdbxAzContext {
146        let mode = parse_stat_mode(stat_mode_str);
147        open(max_read_counter, stat_collector_url, mode, use_cache)
148    }
149
150    pub fn new(max_read_counter: u64) -> MdbxAzContext {
151        open(max_read_counter, None, StatMode::None, None)
152    }
153}
154
155impl Default for MdbxAzContext {
156    fn default() -> Self {
157        Self::new(u64::MAX)
158    }
159}
160
161impl AuthorizationHelper for MdbxAzContext {
162    fn get_stat_mut(&mut self) -> &mut Option<Stat> {
163        &mut self.stat
164    }
165
166    fn get_authorize_counter(&self) -> u64 {
167        self.authorize_counter
168    }
169
170    fn get_max_authorize_counter(&self) -> u64 {
171        self.max_authorize_counter
172    }
173
174    fn set_authorize_counter(&mut self, value: u64) {
175        self.authorize_counter = value;
176    }
177
178    fn authorize_use_db(
179        &mut self,
180        uri: &str,
181        user_uri: &str,
182        request_access: u8,
183        _is_check_for_reload: bool,
184        trace: &mut Trace,
185    ) -> Result<u8, std::io::Error> {
186        let mut storage = AzMdbxStorage {
187            db: &self.db,
188            cache_db: self.cache_db.as_deref(),
189            stat: &mut self.stat,
190        };
191
192        authorize(uri, user_uri, request_access, &mut storage, trace)
193    }
194}
195
196impl_authorization_context!(MdbxAzContext);
197
198pub struct AzMdbxStorage<'a> {
199    db: &'a Database<NoWriteMap>,
200    cache_db: Option<&'a Database<NoWriteMap>>,
201    stat: &'a mut Option<Stat>,
202}
203
204impl<'a> AzMdbxStorage<'a> {
205    // Helper function to read from database with less nesting
206    fn read_from_db(&mut self, db: &Database<NoWriteMap>, key: &str, from_cache: bool) -> io::Result<Option<String>> {
207        let txn = db.begin_ro_txn()
208            .map_err(|e| Error::other(format!("Authorize: failed to begin transaction {:?}", e)))?;
209        
210        let table = txn.open_table(None)
211            .map_err(|e| Error::other(format!("Authorize: failed to open table {:?}", e)))?;
212        
213        match txn.get::<Vec<u8>>(&table, key.as_bytes()) {
214            Ok(Some(val)) => {
215                let val_str = std::str::from_utf8(&val)
216                    .map_err(|_| Error::other("Failed to decode UTF-8"))?;
217                
218                if let Some(stat) = self.stat {
219                    if stat.mode == StatMode::Full {
220                        stat.point.collect(format_stat_message(key, self.cache_db.is_some(), from_cache));
221                    }
222                }
223                
224                if from_cache {
225                    debug!("@cache val={}", val_str);
226                } else {
227                    debug!("@db val={}", val_str);
228                }
229                
230                Ok(Some(val_str.to_string()))
231            },
232            Ok(None) => Ok(None),
233            Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
234        }
235    }
236}
237
238impl<'a> Storage for AzMdbxStorage<'a> {
239    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
240        // Try to read from cache first
241        if let Some(cache_db) = self.cache_db {
242            if let Ok(Some(value)) = self.read_from_db(cache_db, key, true) {
243                return Ok(Some(value));
244            }
245            // If cache read failed or returned None, continue to main database
246        }
247
248        // Read from main database
249        self.read_from_db(self.db, key, false)
250    }
251
252    fn fiber_yield(&self) {}
253
254    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
255        decode_rec_to_rights(src, result)
256    }
257
258    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
259        decode_rec_to_rightset(src, new_rights)
260    }
261
262    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
263        decode_filter(filter_value)
264    }
265}
266
267