1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use libmdbx::{Database, NoWriteMap};
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::sync::LazyLock;
8use std::time;
9use std::{io, thread};
10use v_authorization::common::{Storage, Trace};
11use v_authorization::*;
12
/// Directory of the main ACL index MDBX database.
const DB_PATH: &str = "./data/acl-mdbx-indexes/";
/// Directory of the optional ACL cache MDBX database.
const CACHE_DB_PATH: &str = "./data/acl-cache-mdbx-indexes/";
15
16use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
17use crate::common::AuthorizationHelper;
18use crate::impl_authorization_context;
19
20static GLOBAL_DB: LazyLock<Mutex<Option<Arc<Database<NoWriteMap>>>>> = LazyLock::new(|| Mutex::new(None));
22static GLOBAL_CACHE_DB: LazyLock<Mutex<Option<Arc<Database<NoWriteMap>>>>> = LazyLock::new(|| Mutex::new(None));
23
24pub fn reset_global_envs() {
27 let mut db = GLOBAL_DB.lock().unwrap();
28 *db = None;
29
30 let mut cache_db = GLOBAL_CACHE_DB.lock().unwrap();
31 *cache_db = None;
32
33 info!("LIB_AZ_MDBX: Reset global databases");
34}
35
36pub fn sync_env() -> bool {
39 let db_opt = GLOBAL_DB.lock().unwrap();
40 if let Some(db) = db_opt.as_ref() {
41 match db.sync(true) {
42 Ok(_) => {
43 info!("LIB_AZ_MDBX: Successfully synced database");
44 true
45 },
46 Err(e) => {
47 error!("LIB_AZ_MDBX: Failed to sync database: {:?}", e);
48 false
49 }
50 }
51 } else {
52 true }
54}
55
56pub struct MdbxAzContext {
57 db: Arc<Database<NoWriteMap>>,
58 cache_db: Option<Arc<Database<NoWriteMap>>>,
59 authorize_counter: u64,
60 max_authorize_counter: u64,
61 stat: Option<Stat>,
62}
63
64fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> MdbxAzContext {
65 let db = {
67 let mut db_lock = GLOBAL_DB.lock().unwrap();
68
69 if let Some(existing_db) = db_lock.as_ref() {
70 existing_db.clone()
71 } else {
72 let new_db = loop {
74 let path: PathBuf = PathBuf::from(DB_PATH);
75
76 if !path.exists() {
77 error!("LIB_AZ: Database directory does not exist at path: {}", path.display());
78 thread::sleep(time::Duration::from_secs(3));
79 error!("Retrying database connection...");
80 continue;
81 }
82
83 match Database::open(&path) {
84 Ok(database) => {
85 info!("LIB_AZ: Opened shared MDBX database at path: {}", DB_PATH);
86 break Arc::new(database);
87 },
88 Err(e) => {
89 error!("Authorize: Error opening MDBX database: {:?}. Retrying in 3 seconds...", e);
90 thread::sleep(time::Duration::from_secs(3));
91 },
92 }
93 };
94
95 *db_lock = Some(new_db.clone());
96 new_db
97 }
98 };
99
100 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
101 point: p,
102 mode: stat_mode.clone(),
103 });
104
105 if let Some(_stat) = &stat_ctx {
106 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
107 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
108 }
109
110 let cache_db = if use_cache.unwrap_or(false) {
111 let mut cache_lock = GLOBAL_CACHE_DB.lock().unwrap();
112
113 if let Some(existing_cache) = cache_lock.as_ref() {
114 Some(existing_cache.clone())
115 } else {
116 let path: PathBuf = PathBuf::from(CACHE_DB_PATH);
118 match Database::open(&path) {
119 Ok(database) => {
120 info!("LIB_AZ: Opened shared MDBX cache database at path: {}", CACHE_DB_PATH);
121 let arc_db = Arc::new(database);
122 *cache_lock = Some(arc_db.clone());
123 Some(arc_db)
124 },
125 Err(e) => {
126 warn!("LIB_AZ: Error opening MDBX cache database: {:?}. Cache will not be used.", e);
127 None
128 },
129 }
130 }
131 } else {
132 None
133 };
134
135 MdbxAzContext {
136 db,
137 cache_db,
138 authorize_counter: 0,
139 max_authorize_counter: max_read_counter,
140 stat: stat_ctx,
141 }
142}
143
144impl MdbxAzContext {
145 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> MdbxAzContext {
146 let mode = parse_stat_mode(stat_mode_str);
147 open(max_read_counter, stat_collector_url, mode, use_cache)
148 }
149
150 pub fn new(max_read_counter: u64) -> MdbxAzContext {
151 open(max_read_counter, None, StatMode::None, None)
152 }
153}
154
155impl Default for MdbxAzContext {
156 fn default() -> Self {
157 Self::new(u64::MAX)
158 }
159}
160
161impl AuthorizationHelper for MdbxAzContext {
162 fn get_stat_mut(&mut self) -> &mut Option<Stat> {
163 &mut self.stat
164 }
165
166 fn get_authorize_counter(&self) -> u64 {
167 self.authorize_counter
168 }
169
170 fn get_max_authorize_counter(&self) -> u64 {
171 self.max_authorize_counter
172 }
173
174 fn set_authorize_counter(&mut self, value: u64) {
175 self.authorize_counter = value;
176 }
177
178 fn authorize_use_db(
179 &mut self,
180 uri: &str,
181 user_uri: &str,
182 request_access: u8,
183 _is_check_for_reload: bool,
184 trace: &mut Trace,
185 ) -> Result<u8, std::io::Error> {
186 let mut storage = AzMdbxStorage {
187 db: &self.db,
188 cache_db: self.cache_db.as_deref(),
189 stat: &mut self.stat,
190 };
191
192 authorize(uri, user_uri, request_access, &mut storage, trace)
193 }
194}
195
196impl_authorization_context!(MdbxAzContext);
197
198pub struct AzMdbxStorage<'a> {
199 db: &'a Database<NoWriteMap>,
200 cache_db: Option<&'a Database<NoWriteMap>>,
201 stat: &'a mut Option<Stat>,
202}
203
204impl<'a> AzMdbxStorage<'a> {
205 fn read_from_db(&mut self, db: &Database<NoWriteMap>, key: &str, from_cache: bool) -> io::Result<Option<String>> {
207 let txn = db.begin_ro_txn()
208 .map_err(|e| Error::other(format!("Authorize: failed to begin transaction {:?}", e)))?;
209
210 let table = txn.open_table(None)
211 .map_err(|e| Error::other(format!("Authorize: failed to open table {:?}", e)))?;
212
213 match txn.get::<Vec<u8>>(&table, key.as_bytes()) {
214 Ok(Some(val)) => {
215 let val_str = std::str::from_utf8(&val)
216 .map_err(|_| Error::other("Failed to decode UTF-8"))?;
217
218 if let Some(stat) = self.stat {
219 if stat.mode == StatMode::Full {
220 stat.point.collect(format_stat_message(key, self.cache_db.is_some(), from_cache));
221 }
222 }
223
224 if from_cache {
225 debug!("@cache val={}", val_str);
226 } else {
227 debug!("@db val={}", val_str);
228 }
229
230 Ok(Some(val_str.to_string()))
231 },
232 Ok(None) => Ok(None),
233 Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
234 }
235 }
236}
237
238impl<'a> Storage for AzMdbxStorage<'a> {
239 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
240 if let Some(cache_db) = self.cache_db {
242 if let Ok(Some(value)) = self.read_from_db(cache_db, key, true) {
243 return Ok(Some(value));
244 }
245 }
247
248 self.read_from_db(self.db, key, false)
250 }
251
252 fn fiber_yield(&self) {}
253
254 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
255 decode_rec_to_rights(src, result)
256 }
257
258 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
259 decode_rec_to_rightset(src, new_rights)
260 }
261
262 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
263 decode_filter(filter_value)
264 }
265}
266
267