v_authorization_impl/
az_mdbx.rs

1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use libmdbx::{Database, NoWriteMap};
5use std::path::PathBuf;
6use std::sync::{Arc, OnceLock};
7use std::time;
8use std::{io, thread};
9use v_authorization::common::{Storage, Trace};
10use v_authorization::*;
11
/// Directory of the main ACL MDBX database (a relative path).
12const DB_PATH: &str = "./data/acl-mdbx-indexes/";
/// Directory of the optional ACL cache MDBX database; only opened by `open` when the
/// cache is requested.
13const CACHE_DB_PATH: &str = "./data/acl-cache-mdbx-indexes/";
14
15use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
16use crate::common::AuthorizationHelper;
17use crate::impl_authorization_context;
18
// Each handle is stored in a `OnceLock`, so it is initialized at most once per process;
// `open` hands out `Arc` clones of these handles to every `MdbxAzContext` it builds.
19// Global shared databases for multi-threaded access
20static GLOBAL_DB: OnceLock<Arc<Database<NoWriteMap>>> = OnceLock::new();
21static GLOBAL_CACHE_DB: OnceLock<Arc<Database<NoWriteMap>>> = OnceLock::new();
22
/// Authorization context backed by MDBX databases.
///
/// Built by `open` (via `new` / `new_with_config`); the database handles are `Arc` clones
/// of the process-wide handles kept in `GLOBAL_DB` and `GLOBAL_CACHE_DB`.
23pub struct MdbxAzContext {
    /// Main ACL database.
24    db: Arc<Database<NoWriteMap>>,
    /// Cache database; `None` when the cache was not requested.
25    cache_db: Option<Arc<Database<NoWriteMap>>>,
    /// Counter exposed through `AuthorizationHelper`; starts at 0.
    /// NOTE(review): presumably counts authorize calls — the code that increments it is
    /// not in this file, confirm against the helper/macro.
26    authorize_counter: u64,
    /// Upper bound for `authorize_counter`, taken from the `max_read_counter` constructor
    /// argument. NOTE(review): what happens when it is reached is decided elsewhere.
27    max_authorize_counter: u64,
    /// Statistics publisher and mode; `None` when no collector URL was given or the
    /// publisher could not be created.
28    stat: Option<Stat>,
29}
30
31fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> MdbxAzContext {
32    // Get or initialize global database (shared across all threads)
33    let db = GLOBAL_DB.get_or_init(|| {
34        loop {
35            let path: PathBuf = PathBuf::from(DB_PATH);
36
37            if !path.exists() {
38                error!("LIB_AZ: Database directory does not exist at path: {}", path.display());
39                thread::sleep(time::Duration::from_secs(3));
40                error!("Retrying database connection...");
41                continue;
42            }
43
44            match Database::open(&path) {
45                Ok(database) => {
46                    info!("LIB_AZ: Opened shared MDBX database at path: {}", DB_PATH);
47                    return Arc::new(database);
48                },
49                Err(e) => {
50                    error!("Authorize: Error opening MDBX database: {:?}. Retrying in 3 seconds...", e);
51                    thread::sleep(time::Duration::from_secs(3));
52                },
53            }
54        }
55    }).clone();
56
57    let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
58        point: p,
59        mode: stat_mode.clone(),
60    });
61
62    if let Some(_stat) = &stat_ctx {
63        info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
64        info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
65    }
66
67    let cache_db = if use_cache.unwrap_or(false) {
68        // Get or try to initialize cache database
69        let cache_result = GLOBAL_CACHE_DB.get_or_init(|| {
70            let path: PathBuf = PathBuf::from(CACHE_DB_PATH);
71            match Database::open(&path) {
72                Ok(database) => {
73                    info!("LIB_AZ: Opened shared MDBX cache database at path: {}", CACHE_DB_PATH);
74                    Arc::new(database)
75                },
76                Err(e) => {
77                    warn!("LIB_AZ: Error opening MDBX cache database: {:?}. Cache will not be used.", e);
78                    // Return empty Arc - use dummy database
79                    match Database::open(CACHE_DB_PATH) {
80                        Ok(db) => Arc::new(db),
81                        Err(_) => panic!("Failed to create fallback cache database"),
82                    }
83                },
84            }
85        });
86        Some(cache_result.clone())
87    } else {
88        None
89    };
90
91    MdbxAzContext {
92        db,
93        cache_db,
94        authorize_counter: 0,
95        max_authorize_counter: max_read_counter,
96        stat: stat_ctx,
97    }
98}
99
100impl MdbxAzContext {
101    pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> MdbxAzContext {
102        let mode = parse_stat_mode(stat_mode_str);
103        open(max_read_counter, stat_collector_url, mode, use_cache)
104    }
105
106    pub fn new(max_read_counter: u64) -> MdbxAzContext {
107        open(max_read_counter, None, StatMode::None, None)
108    }
109}
110
111impl Default for MdbxAzContext {
112    fn default() -> Self {
113        Self::new(u64::MAX)
114    }
115}
116
117impl AuthorizationHelper for MdbxAzContext {
118    fn get_stat_mut(&mut self) -> &mut Option<Stat> {
119        &mut self.stat
120    }
121
122    fn get_authorize_counter(&self) -> u64 {
123        self.authorize_counter
124    }
125
126    fn get_max_authorize_counter(&self) -> u64 {
127        self.max_authorize_counter
128    }
129
130    fn set_authorize_counter(&mut self, value: u64) {
131        self.authorize_counter = value;
132    }
133
134    fn authorize_use_db(
135        &mut self,
136        uri: &str,
137        user_uri: &str,
138        request_access: u8,
139        _is_check_for_reload: bool,
140        trace: &mut Trace,
141    ) -> Result<u8, std::io::Error> {
142        let mut storage = AzMdbxStorage {
143            db: &self.db,
144            cache_db: self.cache_db.as_deref(),
145            stat: &mut self.stat,
146        };
147
148        authorize(uri, user_uri, request_access, &mut storage, trace)
149    }
150}
151
// Expands the crate-level `impl_authorization_context!` macro for `MdbxAzContext`.
// NOTE(review): presumably this generates the public authorization-context trait impl on
// top of the `AuthorizationHelper` methods above — the macro body is not in this file, confirm there.
152impl_authorization_context!(MdbxAzContext);
153
/// Borrowed storage view handed to `authorize`; created per call in
/// `MdbxAzContext::authorize_use_db` from the context's own fields.
154pub struct AzMdbxStorage<'a> {
    /// Main ACL database; consulted when the cache is absent or yields no value.
155    db: &'a Database<NoWriteMap>,
    /// Optional cache database, consulted before `db`.
156    cache_db: Option<&'a Database<NoWriteMap>>,
    /// Statistics slot of the owning context; used to report lookups in `StatMode::Full`.
157    stat: &'a mut Option<Stat>,
158}
159
160impl<'a> AzMdbxStorage<'a> {
161    // Helper function to read from database with less nesting
162    fn read_from_db(&mut self, db: &Database<NoWriteMap>, key: &str, from_cache: bool) -> io::Result<Option<String>> {
163        let txn = db.begin_ro_txn()
164            .map_err(|e| Error::other(format!("Authorize: failed to begin transaction {:?}", e)))?;
165        
166        let table = txn.open_table(None)
167            .map_err(|e| Error::other(format!("Authorize: failed to open table {:?}", e)))?;
168        
169        match txn.get::<Vec<u8>>(&table, key.as_bytes()) {
170            Ok(Some(val)) => {
171                let val_str = std::str::from_utf8(&val)
172                    .map_err(|_| Error::other("Failed to decode UTF-8"))?;
173                
174                if let Some(stat) = self.stat {
175                    if stat.mode == StatMode::Full {
176                        stat.point.collect(format_stat_message(key, self.cache_db.is_some(), from_cache));
177                    }
178                }
179                
180                if from_cache {
181                    debug!("@cache val={}", val_str);
182                } else {
183                    debug!("@db val={}", val_str);
184                }
185                
186                Ok(Some(val_str.to_string()))
187            },
188            Ok(None) => Ok(None),
189            Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
190        }
191    }
192}
193
194impl<'a> Storage for AzMdbxStorage<'a> {
195    fn get(&mut self, key: &str) -> io::Result<Option<String>> {
196        // Try to read from cache first
197        if let Some(cache_db) = self.cache_db {
198            if let Ok(Some(value)) = self.read_from_db(cache_db, key, true) {
199                return Ok(Some(value));
200            }
201            // If cache read failed or returned None, continue to main database
202        }
203
204        // Read from main database
205        self.read_from_db(self.db, key, false)
206    }
207
208    fn fiber_yield(&self) {}
209
210    fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
211        decode_rec_to_rights(src, result)
212    }
213
214    fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
215        decode_rec_to_rightset(src, new_rights)
216    }
217
218    fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
219        decode_filter(filter_value)
220    }
221}
222