1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use v_authorization::common::AuthorizationContext;
3use chrono::{DateTime, Utc};
4use io::Error;
5use heed::{Env, EnvOpenOptions, Database, RoTxn};
6use heed::types::Str;
7use std::cmp::PartialEq;
8use std::io::ErrorKind;
9use std::path::PathBuf;
10use std::time;
11use std::time::SystemTime;
12use std::{io, thread};
13use v_authorization::common::{Storage, Trace};
14use v_authorization::*;
15
/// Directory of the main LMDB environment that holds the ACL indexes.
const DB_PATH: &str = "./data/acl-indexes/";
/// Directory of the optional LMDB environment that is consulted as a cache
/// before the main one.
const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
18
19use crate::stat_manager::StatPub;
20
/// Level of detail reported to the statistics collector.
#[derive(Clone, Debug, PartialEq, Eq)]
enum StatMode {
    /// Every successful key lookup is collected and the call duration is flushed.
    Full,
    /// Only the call duration is flushed; individual lookups are not collected.
    Minimal,
    /// Nothing is reported.
    None,
}
27
28struct Stat {
29 point: StatPub,
30 mode: StatMode,
31}
32
33pub struct LmdbAzContext {
34 env: Env,
35 cache_env: Option<Env>,
36 authorize_counter: u64,
37 max_authorize_counter: u64,
38 stat: Option<Stat>,
39}
40
41fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
42 loop {
43 let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
44
45 if !path.exists() {
46 error!("LIB_AZ: Database does not exist at path: {}", path.display());
47 thread::sleep(time::Duration::from_secs(3));
48 error!("Retrying database connection...");
49 continue;
50 }
51
52 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
53 Ok(env) => {
54 info!("LIB_AZ: Opened environment at path: {}", DB_PATH);
55
56 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
57 point: p,
58 mode: stat_mode.clone(),
59 });
60
61 if let Some(_stat) = &stat_ctx {
62 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
63 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
64 }
65
66 return if use_cache.unwrap_or(false) {
67 let cache_env = match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
68 Ok(env) => {
69 info!("LIB_AZ: Opened cache environment at path: {}", CACHE_DB_PATH);
70 Some(env)
71 },
72 Err(e) => {
73 warn!("LIB_AZ: Error opening cache environment: {:?}. Proceeding without cache.", e);
74 None
75 },
76 };
77
78 LmdbAzContext {
79 env,
80 cache_env,
81 authorize_counter: 0,
82 max_authorize_counter: max_read_counter,
83 stat: stat_ctx,
84 }
85 } else {
86 LmdbAzContext {
87 env,
88 cache_env: None,
89 authorize_counter: 0,
90 max_authorize_counter: max_read_counter,
91 stat: stat_ctx,
92 }
93 };
94 },
95 Err(e) => {
96 error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
97 thread::sleep(time::Duration::from_secs(3));
98 },
99 }
100 }
101}
102
103impl LmdbAzContext {
104 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
105 let mode = if let Some(v) = stat_mode_str {
106 match v.to_lowercase().as_str() {
107 "full" => StatMode::Full,
108 "minimal" => StatMode::Minimal,
109 "off" => StatMode::None,
110 "none" => StatMode::None,
111 _ => StatMode::Full,
112 }
113 } else {
114 StatMode::Full
115 };
116
117 open(max_read_counter, stat_collector_url, mode, use_cache)
118 }
119
120 pub fn new(max_read_counter: u64) -> LmdbAzContext {
121 open(max_read_counter, None, StatMode::None, None)
122 }
123}
124
125impl Default for LmdbAzContext {
126 fn default() -> Self {
127 Self::new(u64::MAX)
128 }
129}
130
131impl AuthorizationContext for LmdbAzContext {
132 fn authorize(&mut self, uri: &str, user_uri: &str, request_access: u8, _is_check_for_reload: bool) -> Result<u8, std::io::Error> {
133 let mut t = Trace {
134 acl: &mut String::new(),
135 is_acl: false,
136 group: &mut String::new(),
137 is_group: false,
138 info: &mut String::new(),
139 is_info: false,
140 str_num: 0,
141 };
142
143 let start_time = SystemTime::now();
144
145 let r = self.authorize_and_trace(uri, user_uri, request_access, _is_check_for_reload, &mut t);
146
147 if let Some(stat) = &mut self.stat {
148 if stat.mode == StatMode::Full || stat.mode == StatMode::Minimal {
149 let elapsed = start_time.elapsed().unwrap_or_default();
150 stat.point.set_duration(elapsed);
151 if let Err(e) = stat.point.flush() {
152 warn!("fail flush stat, err={:?}", e);
153 }
154 }
155 }
156
157 r
158 }
159
160 fn authorize_and_trace(
161 &mut self,
162 uri: &str,
163 user_uri: &str,
164 request_access: u8,
165 _is_check_for_reload: bool,
166 trace: &mut Trace,
167 ) -> Result<u8, std::io::Error> {
168 self.authorize_counter += 1;
169 if self.authorize_counter >= self.max_authorize_counter {
171 self.authorize_counter = 0;
173
174 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
175 Ok(env1) => {
176 self.env = env1;
177 },
178 Err(e1) => {
179 return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening environment: {:?}", e1)));
180 },
181 }
182 }
183
184 match self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace) {
185 Ok(r) => {
186 return Ok(r);
187 },
188 Err(e) => {
189 info!("reopen");
190
191 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
192 Ok(env1) => {
193 self.env = env1;
194 },
195 Err(e1) => {
196 error!("Authorize: Err opening environment: {:?}", e1);
197 return Err(e);
198 },
199 }
200 },
201 }
202 self.authorize_use_db(uri, user_uri, request_access, _is_check_for_reload, trace)
204 }
205}
206
207pub struct AzLmdbStorage<'a> {
208 txn: &'a RoTxn<'a>,
209 db: Database<Str, Str>,
210 cache_txn: Option<&'a RoTxn<'a>>,
211 cache_db: Option<Database<Str, Str>>,
212 stat: &'a mut Option<Stat>,
213}
214
/// Builds the statistics message for a lookup of `key`.
///
/// The key is followed by `/` and a source tag:
/// * `C`  – cache enabled and the value came from the cache,
/// * `cB` – cache enabled but the value came from the main database,
/// * `B`  – cache not enabled (`from_cache` is ignored).
fn message(key: &str, use_cache: bool, from_cache: bool) -> String {
    let source_tag = if !use_cache {
        "B"
    } else if from_cache {
        "C"
    } else {
        "cB"
    };

    let mut out = String::with_capacity(key.len() + 1 + source_tag.len());
    out.push_str(key);
    out.push('/');
    out.push_str(source_tag);
    out
}
222
223impl<'a> Storage for AzLmdbStorage<'a> {
224 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
225 if let Some(cache_db) = self.cache_db {
226 if let Some(cache_txn) = self.cache_txn {
227 match cache_db.get(cache_txn, key) {
228 Ok(Some(val)) => {
229 if let Some(stat) = self.stat {
230 if stat.mode == StatMode::Full {
231 stat.point.collect(message(key, true, true));
232 }
233 }
234 debug!("@cache val={}", val);
235 return Ok(Some(val.to_string()));
236 },
237 Ok(None) => {
238 },
240 Err(_e) => {
241 },
243 }
244 }
245 }
246
247 match self.db.get(self.txn, key) {
248 Ok(Some(val)) => {
249 if let Some(stat) = self.stat {
250 if stat.mode == StatMode::Full {
251 stat.point.collect(message(key, self.cache_db.is_some(), false));
252 }
253 }
254 debug!("@db val={}", val);
255 Ok(Some(val.to_string()))
256 },
257 Ok(None) => Ok(None),
258 Err(e) => Err(Error::new(ErrorKind::Other, format!("Authorize: db.get {:?}, {}", e, key))),
259 }
260 }
261
262 fn fiber_yield(&self) {}
263
264 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
265 decode_rec_to_rights(src, result)
266 }
267
268 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
269 decode_rec_to_rightset(src, new_rights)
270 }
271
272 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
273 decode_filter(filter_value)
274 }
275}
276
277impl LmdbAzContext {
278 fn authorize_use_db(
279 &mut self,
280 uri: &str,
281 user_uri: &str,
282 request_access: u8,
283 _is_check_for_reload: bool,
284 trace: &mut Trace,
285 ) -> Result<u8, std::io::Error> {
286 let txn = match self.env.read_txn() {
287 Ok(txn1) => txn1,
288 Err(e) => {
289 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING TRANSACTION {:?}", e)));
290 },
291 };
292
293 let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
294 Ok(Some(db_res)) => db_res,
295 Ok(None) => {
296 return Err(Error::new(ErrorKind::Other, "Authorize: database not found"));
297 },
298 Err(e) => {
299 return Err(Error::new(ErrorKind::Other, format!("Authorize: Err opening database: {:?}", e)));
300 },
301 };
302
303 let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
304 let txn_cache = match env.read_txn() {
305 Ok(txn1) => txn1,
306 Err(e) => {
307 return Err(Error::new(ErrorKind::Other, format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
308 },
309 };
310
311 let db = match env.open_database(&txn_cache, None) {
312 Ok(Some(db_res)) => Some(db_res),
313 Ok(None) => {
314 warn!("Authorize: cache database not found");
315 None
316 },
317 Err(e) => {
318 warn!("Authorize: Err opening cache database: {:?}", e);
319 None
320 },
321 };
322
323 (Some(txn_cache), db, true)
324 } else {
325 (None, None, false)
326 };
327
328 let cache_txn_ptr = if cache_txn_ref {
329 cache_txn_owned.as_ref().map(|v| &**v)
330 } else {
331 None
332 };
333
334 let mut storage = AzLmdbStorage {
335 txn: &txn,
336 db,
337 cache_txn: cache_txn_ptr,
338 cache_db,
339 stat: &mut self.stat,
340 };
341
342 authorize(uri, user_uri, request_access, &mut storage, trace)
343 }
344}
345