1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use libmdbx::{Database, NoWriteMap};
5use std::path::PathBuf;
6use std::sync::{Arc, OnceLock};
7use std::time;
8use std::{io, thread};
9use v_authorization::common::{Storage, Trace};
10use v_authorization::*;
11
/// Directory of the main MDBX database opened by `open`.
const DB_PATH: &str = "./data/acl-mdbx-indexes/";
/// Directory of the optional MDBX cache database, opened only when caching is requested.
const CACHE_DB_PATH: &str = "./data/acl-cache-mdbx-indexes/";
14
15use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
16use crate::common::AuthorizationHelper;
17use crate::impl_authorization_context;
18
/// Process-wide handle to the main database; initialized at most once and
/// shared (via `Arc`) by every `MdbxAzContext`.
static GLOBAL_DB: OnceLock<Arc<Database<NoWriteMap>>> = OnceLock::new();
/// Process-wide handle to the cache database; only initialized when a context
/// is created with caching enabled.
static GLOBAL_CACHE_DB: OnceLock<Arc<Database<NoWriteMap>>> = OnceLock::new();
22
/// Authorization context backed by MDBX databases.
///
/// The database handles are shared process-wide; the counters and the
/// statistics state are per context.
pub struct MdbxAzContext {
    /// Shared handle to the main database.
    db: Arc<Database<NoWriteMap>>,
    /// Shared handle to the cache database; `None` when caching was not requested.
    cache_db: Option<Arc<Database<NoWriteMap>>>,
    /// Counter exposed through `get_authorize_counter` / `set_authorize_counter`; starts at 0.
    authorize_counter: u64,
    /// Upper bound supplied by the constructor (`max_read_counter`).
    /// NOTE(review): how the limit is enforced is decided outside this file — confirm.
    max_authorize_counter: u64,
    /// Statistics publisher and mode; `None` when no collector URL was given
    /// or the publisher could not be created.
    stat: Option<Stat>,
}
30
31fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> MdbxAzContext {
32 let db = GLOBAL_DB.get_or_init(|| {
34 loop {
35 let path: PathBuf = PathBuf::from(DB_PATH);
36
37 if !path.exists() {
38 error!("LIB_AZ: Database directory does not exist at path: {}", path.display());
39 thread::sleep(time::Duration::from_secs(3));
40 error!("Retrying database connection...");
41 continue;
42 }
43
44 match Database::open(&path) {
45 Ok(database) => {
46 info!("LIB_AZ: Opened shared MDBX database at path: {}", DB_PATH);
47 return Arc::new(database);
48 },
49 Err(e) => {
50 error!("Authorize: Error opening MDBX database: {:?}. Retrying in 3 seconds...", e);
51 thread::sleep(time::Duration::from_secs(3));
52 },
53 }
54 }
55 }).clone();
56
57 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
58 point: p,
59 mode: stat_mode.clone(),
60 });
61
62 if let Some(_stat) = &stat_ctx {
63 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
64 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
65 }
66
67 let cache_db = if use_cache.unwrap_or(false) {
68 let cache_result = GLOBAL_CACHE_DB.get_or_init(|| {
70 let path: PathBuf = PathBuf::from(CACHE_DB_PATH);
71 match Database::open(&path) {
72 Ok(database) => {
73 info!("LIB_AZ: Opened shared MDBX cache database at path: {}", CACHE_DB_PATH);
74 Arc::new(database)
75 },
76 Err(e) => {
77 warn!("LIB_AZ: Error opening MDBX cache database: {:?}. Cache will not be used.", e);
78 match Database::open(CACHE_DB_PATH) {
80 Ok(db) => Arc::new(db),
81 Err(_) => panic!("Failed to create fallback cache database"),
82 }
83 },
84 }
85 });
86 Some(cache_result.clone())
87 } else {
88 None
89 };
90
91 MdbxAzContext {
92 db,
93 cache_db,
94 authorize_counter: 0,
95 max_authorize_counter: max_read_counter,
96 stat: stat_ctx,
97 }
98}
99
100impl MdbxAzContext {
101 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> MdbxAzContext {
102 let mode = parse_stat_mode(stat_mode_str);
103 open(max_read_counter, stat_collector_url, mode, use_cache)
104 }
105
106 pub fn new(max_read_counter: u64) -> MdbxAzContext {
107 open(max_read_counter, None, StatMode::None, None)
108 }
109}
110
111impl Default for MdbxAzContext {
112 fn default() -> Self {
113 Self::new(u64::MAX)
114 }
115}
116
117impl AuthorizationHelper for MdbxAzContext {
118 fn get_stat_mut(&mut self) -> &mut Option<Stat> {
119 &mut self.stat
120 }
121
122 fn get_authorize_counter(&self) -> u64 {
123 self.authorize_counter
124 }
125
126 fn get_max_authorize_counter(&self) -> u64 {
127 self.max_authorize_counter
128 }
129
130 fn set_authorize_counter(&mut self, value: u64) {
131 self.authorize_counter = value;
132 }
133
134 fn authorize_use_db(
135 &mut self,
136 uri: &str,
137 user_uri: &str,
138 request_access: u8,
139 _is_check_for_reload: bool,
140 trace: &mut Trace,
141 ) -> Result<u8, std::io::Error> {
142 let mut storage = AzMdbxStorage {
143 db: &self.db,
144 cache_db: self.cache_db.as_deref(),
145 stat: &mut self.stat,
146 };
147
148 authorize(uri, user_uri, request_access, &mut storage, trace)
149 }
150}
151
// Expands the crate-local `impl_authorization_context!` macro for `MdbxAzContext`.
// NOTE(review): presumably this generates the public authorization trait impl on top of
// `AuthorizationHelper` — confirm against the macro definition.
impl_authorization_context!(MdbxAzContext);
153
/// Borrowed, per-call view over the databases and statistics state of an
/// `MdbxAzContext`; implements `Storage` for `authorize`.
pub struct AzMdbxStorage<'a> {
    /// Main database.
    db: &'a Database<NoWriteMap>,
    /// Cache database, consulted before `db` when present.
    cache_db: Option<&'a Database<NoWriteMap>>,
    /// Statistics state; a message is collected on successful reads when the mode is `StatMode::Full`.
    stat: &'a mut Option<Stat>,
}
159
160impl<'a> AzMdbxStorage<'a> {
161 fn read_from_db(&mut self, db: &Database<NoWriteMap>, key: &str, from_cache: bool) -> io::Result<Option<String>> {
163 let txn = db.begin_ro_txn()
164 .map_err(|e| Error::other(format!("Authorize: failed to begin transaction {:?}", e)))?;
165
166 let table = txn.open_table(None)
167 .map_err(|e| Error::other(format!("Authorize: failed to open table {:?}", e)))?;
168
169 match txn.get::<Vec<u8>>(&table, key.as_bytes()) {
170 Ok(Some(val)) => {
171 let val_str = std::str::from_utf8(&val)
172 .map_err(|_| Error::other("Failed to decode UTF-8"))?;
173
174 if let Some(stat) = self.stat {
175 if stat.mode == StatMode::Full {
176 stat.point.collect(format_stat_message(key, self.cache_db.is_some(), from_cache));
177 }
178 }
179
180 if from_cache {
181 debug!("@cache val={}", val_str);
182 } else {
183 debug!("@db val={}", val_str);
184 }
185
186 Ok(Some(val_str.to_string()))
187 },
188 Ok(None) => Ok(None),
189 Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
190 }
191 }
192}
193
194impl<'a> Storage for AzMdbxStorage<'a> {
195 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
196 if let Some(cache_db) = self.cache_db {
198 if let Ok(Some(value)) = self.read_from_db(cache_db, key, true) {
199 return Ok(Some(value));
200 }
201 }
203
204 self.read_from_db(self.db, key, false)
206 }
207
208 fn fiber_yield(&self) {}
209
210 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
211 decode_rec_to_rights(src, result)
212 }
213
214 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
215 decode_rec_to_rightset(src, new_rights)
216 }
217
218 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
219 decode_filter(filter_value)
220 }
221}
222