1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::mem::ManuallyDrop;
10use std::path::PathBuf;
11use std::time;
12use std::time::SystemTime;
13use std::{io, thread};
14use v_authorization::common::{Storage, Trace};
15use v_authorization::*;
16
17const DB_PATH: &str = "./data/acl-indexes/";
18const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
19
20use crate::stat_manager::StatPub;
21
22#[derive(Debug, Eq, PartialEq, Clone)]
23enum StatMode {
24 Full,
25 Minimal,
26 None,
27}
28
29struct Stat {
30 point: StatPub,
31 mode: StatMode,
32}
33
34pub struct LmdbAzContext {
35 env: ManuallyDrop<Env>,
36 cache_env: Option<Env>,
37 authorize_counter: u64,
38 max_authorize_counter: u64,
39 stat: Option<Stat>,
40}
41
42fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
43 loop {
44 let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
45
46 if !path.exists() {
47 error!("LIB_AZ: Database does not exist at path: {}", path.display());
48 thread::sleep(time::Duration::from_secs(3));
49 error!("Retrying database connection...");
50 continue;
51 }
52
53 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
54 Ok(env) => {
55 info!("LIB_AZ: Opened environment at path: {}", DB_PATH);
56
57 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
58 point: p,
59 mode: stat_mode.clone(),
60 });
61
62 if let Some(_stat) = &stat_ctx {
63 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
64 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
65 }
66
67 return if use_cache.unwrap_or(false) {
68 let cache_env = match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
69 Ok(env) => {
70 info!("LIB_AZ: Opened cache environment at path: {}", CACHE_DB_PATH);
71 Some(env)
72 },
73 Err(e) => {
74 warn!("LIB_AZ: Error opening cache environment: {:?}. Proceeding without cache.", e);
75 None
76 },
77 };
78
79 LmdbAzContext {
80 env: ManuallyDrop::new(env),
81 cache_env,
82 authorize_counter: 0,
83 max_authorize_counter: max_read_counter,
84 stat: stat_ctx,
85 }
86 } else {
87 LmdbAzContext {
88 env: ManuallyDrop::new(env),
89 cache_env: None,
90 authorize_counter: 0,
91 max_authorize_counter: max_read_counter,
92 stat: stat_ctx,
93 }
94 };
95 },
96 Err(e) => {
97 error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
98 thread::sleep(time::Duration::from_secs(3));
99 },
100 }
101 }
102}
103
104impl LmdbAzContext {
105 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
106 let mode = if let Some(v) = stat_mode_str {
107 match v.to_lowercase().as_str() {
108 "full" => StatMode::Full,
109 "minimal" => StatMode::Minimal,
110 "off" => StatMode::None,
111 "none" => StatMode::None,
112 _ => StatMode::Full,
113 }
114 } else {
115 StatMode::Full
116 };
117
118 open(max_read_counter, stat_collector_url, mode, use_cache)
119 }
120
121 pub fn new(max_read_counter: u64) -> LmdbAzContext {
122 open(max_read_counter, None, StatMode::None, None)
123 }
124}
125
126impl Default for LmdbAzContext {
127 fn default() -> Self {
128 Self::new(u64::MAX)
129 }
130}
131
132impl Drop for LmdbAzContext {
133 fn drop(&mut self) {
134 unsafe {
136 ManuallyDrop::drop(&mut self.env);
137 }
138 }
139}
140
141impl AuthorizationContext for LmdbAzContext {
142 fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
143 let mut t = Trace {
144 acl: &mut String::new(),
145 is_acl: false,
146 group: &mut String::new(),
147 is_group: false,
148 info: &mut String::new(),
149 is_info: false,
150 str_num: 0,
151 };
152
153 let start_time = SystemTime::now();
154
155 let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
156
157 if let Some(stat) = &mut self.stat {
158 if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
159 let elapsed = start_time.elapsed().unwrap_or_default();
160 stat.point.set_duration(elapsed);
161 if let Err(e) = stat.point.flush() {
162 warn!("fail flush stat, err={:?}", e);
163 }
164 }
165 }
166
167 r
168 }
169
170 fn authorize_and_trace(
171 &mut self,
172 uri: &str,
173 user_uri: &str,
174 request_access: u8,
175 _is_check_for_reload: bool,
176 trace: &mut Trace,
177 ) -> Result<u8, std::io::Error> {
178 self.authorize_counter += 1;
179 if self.authorize_counter >= self.max_authorize_counter {
181 self.authorize_counter = 0;
183
184 unsafe {
186 ManuallyDrop::drop(&mut self.env);
187 }
188
189 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
191 Ok(env1) => {
192 self.env = ManuallyDrop::new(env1);
193 info!("LIB_AZ: Reopened environment to free memory");
194 },
195 Err(e1) => {
196 error!("Authorize: Error reopening environment: {:?}", e1);
197 return Err(Error::new(ErrorKind::Other, format!("Authorize: Err reopening environment: {:?}", e1)));
198 },
199 }
200 }
201
202 match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
203 Ok(r) => {
204 return Ok(r);
205 },
206 Err(e) => {
207 info!("retrying authorization after error, reopening environment");
209
210 unsafe {
212 ManuallyDrop::drop(&mut self.env);
213 }
214
215 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
217 Ok(env1) => {
218 self.env = ManuallyDrop::new(env1);
219 },
220 Err(e1) => {
221 error!("Authorize: Error reopening environment after error: {:?}", e1);
222 return Err(e);
223 },
224 }
225 },
226 }
227 self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
229 }
230}
231
232pub struct AzLmdbStorage<'a> {
233 txn: &'a RoTxn<'a>,
234 db: Database<Str, Str>,
235 cache_txn: Option<&'a RoTxn<'a>>,
236 cache_db: Option<Database<Str, Str>>,
237 stat: &'a mut Option<Stat>,
238}
239
240fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
241 match (use_cache, from_cache) {
242 (true, true) => format!("{}/C", key),
243 (true, false) => format!("{}/cB", key),
244 (false, _) => format!("{}/B", key),
245 }
246}
247
248impl<'a> Storage for AzLmdbStorage<'a> {
249 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
250 if let Some(cache_db) = self.cache_db {
251 if let Some(cache_txn) = self.cache_txn {
252 match cache_db.get(cache_txn, key) {
253 Ok(Some(val)) => {
254 if let Some(stat) = self.stat {
255 if stat.mode == StatMode::Full {
256 stat.point.collect(message(key, true, true));
257 }
258 }
259 debug!("@cache val={}", val);
260 return Ok(Some(val.to_string()));
261 },
262 Ok(None) => {
263 },
265 Err(_e) => {
266 },
268 }
269 }
270 }
271
272 match self.db.get(self.txn, key) {
273 Ok(Some(val)) => {
274 if let Some(stat) = self.stat {
275 if stat.mode == StatMode::Full {
276 stat.point.collect(message(key, self.cache_db.is_some(), false));
277 }
278 }
279 debug!("@db val={}", val);
280 Ok(Some(val.to_string()))
281 },
282 Ok(None) => Ok(None),
283 Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
284 }
285 }
286
287 fn fiber_yield(&self) {}
288
289 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
290 decode_rec_to_rights(src, result)
291 }
292
293 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
294 decode_rec_to_rightset(src, new_rights)
295 }
296
297 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
298 decode_filter(filter_value)
299 }
300}
301
302impl LmdbAzContext {
303 fn authorize_use_db(
304 &mut self,
305 uri: &str,
306 user_uri: &str,
307 request_access: u8,
308 _is_check_for_reload: bool,
309 trace: &mut Trace,
310 ) -> Result<u8, std::io::Error> {
311 let txn = match self.env.read_txn() {
312 Ok(txn1) => txn1,
313 Err(e) => {
314 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
315 },
316 };
317
318 let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
319 Ok(Some(db_res)) => db_res,
320 Ok(None) => {
321 return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
322 },
323 Err(e) => {
324 return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
325 },
326 };
327
328 let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
329 let txn_cache = match env.read_txn() {
330 Ok(txn1) => txn1,
331 Err(e) => {
332 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
333 },
334 };
335
336 let db = match env.open_database(&txn_cache, None) {
337 Ok(Some(db_res)) => Some(db_res),
338 Ok(None) => {
339 warn!("Authorize: cache database not found");
340 None
341 },
342 Err(e) => {
343 warn!("Authorize: Err opening cache database: {:?}", e);
344 None
345 },
346 };
347
348 (Some(txn_cache), db, true)
349 } else {
350 (None, None, false)
351 };
352
353 let cache_txn_ptr = if cache_txn_ref {
354 cache_txn_owned.as_ref().map(|v| &**v)
355 } else {
356 None
357 };
358
359 let mut storage = AzLmdbStorage {
360 txn: &txn,
361 db,
362 cache_txn: cache_txn_ptr,
363 cache_db,
364 stat: &mut self.stat,
365 };
366
367 authorize(uri, user_uri, request_access, &mut storage, trace)
368 }
369}
370