1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use heed::{Env, EnvOpenOptions, Database, RoTxn};
5use heed::types::Str;
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8use std::sync::LazyLock;
9use std::time;
10use std::{io, thread};
11use v_authorization::common::{Storage, Trace};
12use v_authorization::*;
13
/// Directory of the main ACL index environment opened by this module.
const DB_PATH: &str = "./data/acl-indexes/";

/// Directory of the optional ACL cache environment opened by this module.
const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
16
17use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
18use crate::common::AuthorizationHelper;
19use crate::impl_authorization_context;
20
21static GLOBAL_ENV: LazyLock<Mutex<Option<Arc<Env>>>> = LazyLock::new(|| Mutex::new(None));
23static GLOBAL_CACHE_ENV: LazyLock<Mutex<Option<Arc<Env>>>> = LazyLock::new(|| Mutex::new(None));
24
25pub fn reset_global_envs() {
28 let mut env = GLOBAL_ENV.lock().unwrap();
29 *env = None;
30
31 let mut cache_env = GLOBAL_CACHE_ENV.lock().unwrap();
32 *cache_env = None;
33
34 info!("LIB_AZ: Reset global environments");
35}
36
37pub fn sync_env() -> bool {
40 let env_opt = GLOBAL_ENV.lock().unwrap();
41 if let Some(env) = env_opt.as_ref() {
42 match env.force_sync() {
43 Ok(_) => {
44 info!("LIB_AZ: Successfully synced environment");
45 true
46 },
47 Err(e) => {
48 error!("LIB_AZ: Failed to sync environment: {:?}", e);
49 false
50 }
51 }
52 } else {
53 true }
55}
56
57pub struct LmdbAzContext {
58 env: Arc<Env>,
59 cache_env: Option<Arc<Env>>,
60 authorize_counter: u64,
61 max_authorize_counter: u64,
62 stat: Option<Stat>,
63}
64
65fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
66 let env = {
68 let mut env_lock = GLOBAL_ENV.lock().unwrap();
69
70 if let Some(existing_env) = env_lock.as_ref() {
71 existing_env.clone()
72 } else {
73 let new_env = loop {
75 let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
76
77 if !path.exists() {
78 error!("LIB_AZ: Database does not exist at path: {}", path.display());
79 thread::sleep(time::Duration::from_secs(3));
80 error!("Retrying database connection...");
81 continue;
82 }
83
84 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
85 Ok(env) => {
86 info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
87 break Arc::new(env);
88 },
89 Err(e) => {
90 error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
91 thread::sleep(time::Duration::from_secs(3));
92 },
93 }
94 };
95
96 *env_lock = Some(new_env.clone());
97 new_env
98 }
99 };
100
101 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
102 point: p,
103 mode: stat_mode.clone(),
104 });
105
106 if let Some(_stat) = &stat_ctx {
107 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
108 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
109 }
110
111 let cache_env = if use_cache.unwrap_or(false) {
112 let mut cache_lock = GLOBAL_CACHE_ENV.lock().unwrap();
113
114 if let Some(existing_cache) = cache_lock.as_ref() {
115 Some(existing_cache.clone())
116 } else {
117 match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
119 Ok(env) => {
120 info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
121 let arc_env = Arc::new(env);
122 *cache_lock = Some(arc_env.clone());
123 Some(arc_env)
124 },
125 Err(e) => {
126 warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
127 None
128 },
129 }
130 }
131 } else {
132 None
133 };
134
135 LmdbAzContext {
136 env,
137 cache_env,
138 authorize_counter: 0,
139 max_authorize_counter: max_read_counter,
140 stat: stat_ctx,
141 }
142}
143
144impl LmdbAzContext {
145 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
146 let mode = parse_stat_mode(stat_mode_str);
147 open(max_read_counter, stat_collector_url, mode, use_cache)
148 }
149
150 pub fn new(max_read_counter: u64) -> LmdbAzContext {
151 open(max_read_counter, None, StatMode::None, None)
152 }
153}
154
155impl Default for LmdbAzContext {
156 fn default() -> Self {
157 Self::new(u64::MAX)
158 }
159}
160
161impl AuthorizationHelper for LmdbAzContext {
162 fn get_stat_mut(&mut self) -> &mut Option<Stat> {
163 &mut self.stat
164 }
165
166 fn get_authorize_counter(&self) -> u64 {
167 self.authorize_counter
168 }
169
170 fn get_max_authorize_counter(&self) -> u64 {
171 self.max_authorize_counter
172 }
173
174 fn set_authorize_counter(&mut self, value: u64) {
175 self.authorize_counter = value;
176 }
177
178 fn authorize_use_db(
179 &mut self,
180 uri: &str,
181 user_uri: &str,
182 request_access: u8,
183 _is_check_for_reload: bool,
184 trace: &mut Trace,
185 ) -> Result<u8, std::io::Error> {
186 let txn = match self.env.read_txn() {
187 Ok(txn1) => txn1,
188 Err(e) => {
189 return Err(Error::other(format!("Authorize:CREATING TRANSACTION {:?}", e)));
190 },
191 };
192
193 let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
194 Ok(Some(db_res)) => db_res,
195 Ok(None) => {
196 return Err(Error::other("Authorize: database not found"));
197 },
198 Err(e) => {
199 return Err(Error::other(format!("Authorize: Err opening database: {:?}", e)));
200 },
201 };
202
203 let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
204 let txn_cache = match env.read_txn() {
205 Ok(txn1) => txn1,
206 Err(e) => {
207 return Err(Error::other(format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
208 },
209 };
210
211 let db = match env.open_database(&txn_cache, None) {
212 Ok(Some(db_res)) => Some(db_res),
213 Ok(None) => {
214 warn!("Authorize: cache database not found");
215 None
216 },
217 Err(e) => {
218 warn!("Authorize: Err opening cache database: {:?}", e);
219 None
220 },
221 };
222
223 (Some(txn_cache), db, true)
224 } else {
225 (None, None, false)
226 };
227
228 let cache_txn_ptr = if cache_txn_ref {
229 cache_txn_owned.as_deref()
230 } else {
231 None
232 };
233
234 let mut storage = AzLmdbStorage {
235 txn: &txn,
236 db,
237 cache_txn: cache_txn_ptr,
238 cache_db,
239 stat: &mut self.stat,
240 };
241
242 authorize(uri, user_uri, request_access, &mut storage, trace)
243 }
244}
245
246impl_authorization_context!(LmdbAzContext);
247
248pub struct AzLmdbStorage<'a> {
249 txn: &'a RoTxn<'a>,
250 db: Database<Str, Str>,
251 cache_txn: Option<&'a RoTxn<'a>>,
252 cache_db: Option<Database<Str, Str>>,
253 stat: &'a mut Option<Stat>,
254}
255
256impl<'a> Storage for AzLmdbStorage<'a> {
257 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
258 if let Some(cache_db) = self.cache_db {
259 if let Some(cache_txn) = self.cache_txn {
260 match cache_db.get(cache_txn, key) {
261 Ok(Some(val)) => {
262 if let Some(stat) = self.stat {
263 if stat.mode == StatMode::Full {
264 stat.point.collect(format_stat_message(key, true, true));
265 }
266 }
267 debug!("@cache val={}", val);
268 return Ok(Some(val.to_string()));
269 },
270 Ok(None) => {
271 },
273 Err(_e) => {
274 },
276 }
277 }
278 }
279
280 match self.db.get(self.txn, key) {
281 Ok(Some(val)) => {
282 if let Some(stat) = self.stat {
283 if stat.mode == StatMode::Full {
284 stat.point.collect(format_stat_message(key, self.cache_db.is_some(), false));
285 }
286 }
287 debug!("@db val={}", val);
288 Ok(Some(val.to_string()))
289 },
290 Ok(None) => Ok(None),
291 Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
292 }
293 }
294
295 fn fiber_yield(&self) {}
296
297 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
298 decode_rec_to_rights(src, result)
299 }
300
301 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
302 decode_rec_to_rightset(src, new_rights)
303 }
304
305 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
306 decode_filter(filter_value)
307 }
308}
309