1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::path::PathBuf;
10use std::sync::{Arc, OnceLock};
11use std::time;
12use std::time::SystemTime;
13use std::{io, thread};
14use v_authorization::common::{Storage, Trace};
15use v_authorization::*;
16
/// Directory of the main ACL index LMDB environment.
const DB_PATH: &str = "./data/acl-indexes/";
/// Directory of the optional ACL cache LMDB environment.
const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
19
20use crate::stat_manager::StatPub;
21
/// Main LMDB environment, initialised on first use and then shared through `Arc` clones.
static GLOBAL_ENV: OnceLock<Arc<Env>> = OnceLock::new();
/// Cache LMDB environment, initialised on first use when the cache is requested.
static GLOBAL_CACHE_ENV: OnceLock<Arc<Env>> = OnceLock::new();
25
/// How much statistics the authorization context reports.
#[derive(Debug, Eq, PartialEq, Clone)]
enum StatMode {
    /// Report the call duration and collect a message for every storage key that was found.
    Full,
    /// Report the call duration only.
    Minimal,
    /// Report nothing.
    None,
}
32
/// Statistics sink paired with the reporting mode that governs its use.
struct Stat {
    /// Publisher that receives collected messages and durations and is flushed after each call.
    point: StatPub,
    /// Selects which of the statistics above are actually recorded.
    mode: StatMode,
}
37
/// Authorization context backed by LMDB environments opened through `heed`.
pub struct LmdbAzContext {
    /// Main ACL index environment.
    env: Arc<Env>,
    /// Optional cache environment that is consulted before the main one.
    cache_env: Option<Arc<Env>>,
    /// Number of `authorize_and_trace` calls since the counter was last reset to zero.
    authorize_counter: u64,
    /// Value at which `authorize_counter` is reset to zero.
    max_authorize_counter: u64,
    /// Statistics sink; `None` when no collector is configured.
    stat: Option<Stat>,
}
45
46fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
47 let env = GLOBAL_ENV.get_or_init(|| {
49 loop {
50 let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
51
52 if !path.exists() {
53 error!("LIB_AZ: Database does not exist at path: {}", path.display());
54 thread::sleep(time::Duration::from_secs(3));
55 error!("Retrying database connection...");
56 continue;
57 }
58
59 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
60 Ok(env) => {
61 info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
62 return Arc::new(env);
63 },
64 Err(e) => {
65 error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
66 thread::sleep(time::Duration::from_secs(3));
67 },
68 }
69 }
70 }).clone();
71
72 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
73 point: p,
74 mode: stat_mode.clone(),
75 });
76
77 if let Some(_stat) = &stat_ctx {
78 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
79 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
80 }
81
82 let cache_env = if use_cache.unwrap_or(false) {
83 let cache_result = GLOBAL_CACHE_ENV.get_or_init(|| {
85 match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
86 Ok(env) => {
87 info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
88 Arc::new(env)
89 },
90 Err(e) => {
91 warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
92 Arc::new(unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH).unwrap() })
94 },
95 }
96 });
97 Some(cache_result.clone())
98 } else {
99 None
100 };
101
102 LmdbAzContext {
103 env,
104 cache_env,
105 authorize_counter: 0,
106 max_authorize_counter: max_read_counter,
107 stat: stat_ctx,
108 }
109}
110
111impl LmdbAzContext {
112 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
113 let mode = if let Some(v) = stat_mode_str {
114 match v.to_lowercase().as_str() {
115 "full" => StatMode::Full,
116 "minimal" => StatMode::Minimal,
117 "off" => StatMode::None,
118 "none" => StatMode::None,
119 _ => StatMode::None,
120 }
121 } else {
122 StatMode::None
123 };
124
125 open(max_read_counter, stat_collector_url, mode, use_cache)
126 }
127
128 pub fn new(max_read_counter: u64) -> LmdbAzContext {
129 open(max_read_counter, None, StatMode::None, None)
130 }
131}
132
133impl Default for LmdbAzContext {
134 fn default() -> Self {
135 Self::new(u64::MAX)
136 }
137}
138
139impl AuthorizationContext for LmdbAzContext {
140 fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
141 let mut t = Trace {
142 acl: &mut String::new(),
143 is_acl: false,
144 group: &mut String::new(),
145 is_group: false,
146 info: &mut String::new(),
147 is_info: false,
148 str_num: 0,
149 };
150
151 let start_time = SystemTime::now();
152
153 let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
154
155 if let Some(stat) = &mut self.stat {
156 if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
157 let elapsed = start_time.elapsed().unwrap_or_default();
158 stat.point.set_duration(elapsed);
159 if let Err(e) = stat.point.flush() {
160 warn!("fail flush stat, err={:?}", e);
161 }
162 }
163 }
164
165 r
166 }
167
168 fn authorize_and_trace(
169 &mut self,
170 uri: &str,
171 user_uri: &str,
172 request_access: u8,
173 _is_check_for_reload: bool,
174 trace: &mut Trace,
175 ) -> Result<u8, std::io::Error> {
176 self.authorize_counter += 1;
177 if self.authorize_counter >= self.max_authorize_counter {
179 self.authorize_counter = 0;
181 }
184
185 match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
186 Ok(r) => {
187 return Ok(r);
188 },
189 Err(_e) => {
190 info!("retrying authorization after error");
192 },
195 }
196 self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
198 }
199}
200
/// `Storage` implementation that reads records through LMDB read transactions.
pub struct AzLmdbStorage<'a> {
    /// Read transaction on the main environment.
    txn: &'a RoTxn<'a>,
    /// Database handle of the main environment.
    db: Database<Str, Str>,
    /// Read transaction on the cache environment, if any.
    cache_txn: Option<&'a RoTxn<'a>>,
    /// Database handle of the cache environment, if any.
    cache_db: Option<Database<Str, Str>>,
    /// Statistics sink that receives a message per found key in `Full` mode.
    stat: &'a mut Option<Stat>,
}
208
/// Builds the statistics message for a key lookup: the key, a slash and an
/// origin marker.
///
/// * `C`  - cache is in use and the value came from the cache.
/// * `cB` - cache is in use but the value came from the main database.
/// * `B`  - cache is not in use (`from_cache` is ignored).
fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
    let origin = if !use_cache {
        "B"
    } else if from_cache {
        "C"
    } else {
        "cB"
    };

    format!("{}/{}", key, origin)
}
216
217impl<'a> Storage for AzLmdbStorage<'a> {
218 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
219 if let Some(cache_db) = self.cache_db {
220 if let Some(cache_txn) = self.cache_txn {
221 match cache_db.get(cache_txn, key) {
222 Ok(Some(val)) => {
223 if let Some(stat) = self.stat {
224 if stat.mode == StatMode::Full {
225 stat.point.collect(message(key, true, true));
226 }
227 }
228 debug!("@cache val={}", val);
229 return Ok(Some(val.to_string()));
230 },
231 Ok(None) => {
232 },
234 Err(_e) => {
235 },
237 }
238 }
239 }
240
241 match self.db.get(self.txn, key) {
242 Ok(Some(val)) => {
243 if let Some(stat) = self.stat {
244 if stat.mode == StatMode::Full {
245 stat.point.collect(message(key, self.cache_db.is_some(), false));
246 }
247 }
248 debug!("@db val={}", val);
249 Ok(Some(val.to_string()))
250 },
251 Ok(None) => Ok(None),
252 Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
253 }
254 }
255
256 fn fiber_yield(&self) {}
257
258 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
259 decode_rec_to_rights(src, result)
260 }
261
262 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
263 decode_rec_to_rightset(src, new_rights)
264 }
265
266 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
267 decode_filter(filter_value)
268 }
269}
270
271impl LmdbAzContext {
272 fn authorize_use_db(
273 &mut self,
274 uri: &str,
275 user_uri: &str,
276 request_access: u8,
277 _is_check_for_reload: bool,
278 trace: &mut Trace,
279 ) -> Result<u8, std::io::Error> {
280 let txn = match self.env.read_txn() {
281 Ok(txn1) => txn1,
282 Err(e) => {
283 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
284 },
285 };
286
287 let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
288 Ok(Some(db_res)) => db_res,
289 Ok(None) => {
290 return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
291 },
292 Err(e) => {
293 return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
294 },
295 };
296
297 let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
298 let txn_cache = match env.read_txn() {
299 Ok(txn1) => txn1,
300 Err(e) => {
301 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
302 },
303 };
304
305 let db = match env.open_database(&txn_cache, None) {
306 Ok(Some(db_res)) => Some(db_res),
307 Ok(None) => {
308 warn!("Authorize: cache database not found");
309 None
310 },
311 Err(e) => {
312 warn!("Authorize: Err opening cache database: {:?}", e);
313 None
314 },
315 };
316
317 (Some(txn_cache), db, true)
318 } else {
319 (None, None, false)
320 };
321
322 let cache_txn_ptr = if cache_txn_ref {
323 cache_txn_owned.as_ref().map(|v| &**v)
324 } else {
325 None
326 };
327
328 let mut storage = AzLmdbStorage {
329 txn: &txn,
330 db,
331 cache_txn: cache_txn_ptr,
332 cache_db,
333 stat: &mut self.stat,
334 };
335
336 authorize(uri, user_uri, request_access, &mut storage, trace)
337 }
338}
339