1use v_authorization::record_formats::{decode_filter, decode_rec_to_rights, decode_rec_to_rightset};
2use chrono::{DateTime, Utc};
3use io::Error;
4use heed::{Env, EnvOpenOptions, Database, RoTxn};
5use heed::types::Str;
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use std::time;
9use std::{io, thread};
10use v_authorization::common::{Storage, Trace};
11use v_authorization::*;
12
/// Directory of the main ACL index environment; `open` also expects a `data.mdb` file inside it.
const DB_PATH: &str = "./data/acl-indexes/";
/// Directory of the optional ACL cache environment, opened only when the cache is requested.
const CACHE_DB_PATH: &str = "./data/acl-cache-indexes/";
15
16use crate::stat_manager::{StatPub, StatMode, Stat, parse_stat_mode, format_stat_message};
17use crate::common::AuthorizationHelper;
18use crate::impl_authorization_context;
19
/// Process-wide handle to the main environment, initialised at most once by `open`
/// and shared (via `Arc`) by every `LmdbAzContext`.
static GLOBAL_ENV: OnceLock<Arc<Env>> = OnceLock::new();
/// Process-wide handle to the cache environment, initialised by `open` the first
/// time a context is created with the cache enabled.
static GLOBAL_CACHE_ENV: OnceLock<Arc<Env>> = OnceLock::new();
23
/// Authorization context backed by a shared LMDB environment (through `heed`),
/// with an optional second environment used as a read-through cache.
pub struct LmdbAzContext {
    /// Shared handle to the main environment opened from `DB_PATH`.
    env: Arc<Env>,
    /// Shared handle to the cache environment; `None` when the cache is not in use.
    cache_env: Option<Arc<Env>>,
    /// Counter exposed through `get_authorize_counter` / `set_authorize_counter`; starts at 0.
    // NOTE(review): presumably the number of authorizations served so far — it is
    // only read and written through the helper trait here; confirm against the macro.
    authorize_counter: u64,
    /// Upper bound paired with `authorize_counter`, taken from the `max_read_counter` argument.
    max_authorize_counter: u64,
    /// Statistics publisher and mode; `None` when no collector URL was given or
    /// the publisher could not be created.
    stat: Option<Stat>,
}
31
32fn open(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode: StatMode, use_cache: Option<bool>) -> LmdbAzContext {
33 let env = GLOBAL_ENV.get_or_init(|| {
35 loop {
36 let path: PathBuf = PathBuf::from(format!("{}{}", DB_PATH, "data.mdb"));
37
38 if !path.exists() {
39 error!("LIB_AZ: Database does not exist at path: {}", path.display());
40 thread::sleep(time::Duration::from_secs(3));
41 error!("Retrying database connection...");
42 continue;
43 }
44
45 match unsafe { EnvOpenOptions::new().max_dbs(1).open(DB_PATH) } {
46 Ok(env) => {
47 info!("LIB_AZ: Opened shared environment at path: {}", DB_PATH);
48 return Arc::new(env);
49 },
50 Err(e) => {
51 error!("Authorize: Error opening environment: {:?}. Retrying in 3 seconds...", e);
52 thread::sleep(time::Duration::from_secs(3));
53 },
54 }
55 }
56 }).clone();
57
58 let stat_ctx = stat_collector_url.clone().and_then(|s| StatPub::new(&s).ok()).map(|p| Stat {
59 point: p,
60 mode: stat_mode.clone(),
61 });
62
63 if let Some(_stat) = &stat_ctx {
64 info!("LIB_AZ: Stat collector URL: {:?}", stat_collector_url);
65 info!("LIB_AZ: Stat mode: {:?}", &stat_mode);
66 }
67
68 let cache_env = if use_cache.unwrap_or(false) {
69 let cache_result = GLOBAL_CACHE_ENV.get_or_init(|| {
71 match unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH) } {
72 Ok(env) => {
73 info!("LIB_AZ: Opened shared cache environment at path: {}", CACHE_DB_PATH);
74 Arc::new(env)
75 },
76 Err(e) => {
77 warn!("LIB_AZ: Error opening cache environment: {:?}. Cache will not be used.", e);
78 Arc::new(unsafe { EnvOpenOptions::new().max_dbs(1).open(CACHE_DB_PATH).unwrap() })
80 },
81 }
82 });
83 Some(cache_result.clone())
84 } else {
85 None
86 };
87
88 LmdbAzContext {
89 env,
90 cache_env,
91 authorize_counter: 0,
92 max_authorize_counter: max_read_counter,
93 stat: stat_ctx,
94 }
95}
96
97impl LmdbAzContext {
98 pub fn new_with_config(max_read_counter: u64, stat_collector_url: Option<String>, stat_mode_str: Option<String>, use_cache: Option<bool>) -> LmdbAzContext {
99 let mode = parse_stat_mode(stat_mode_str);
100 open(max_read_counter, stat_collector_url, mode, use_cache)
101 }
102
103 pub fn new(max_read_counter: u64) -> LmdbAzContext {
104 open(max_read_counter, None, StatMode::None, None)
105 }
106}
107
108impl Default for LmdbAzContext {
109 fn default() -> Self {
110 Self::new(u64::MAX)
111 }
112}
113
114impl AuthorizationHelper for LmdbAzContext {
115 fn get_stat_mut(&mut self) -> &mut Option<Stat> {
116 &mut self.stat
117 }
118
119 fn get_authorize_counter(&self) -> u64 {
120 self.authorize_counter
121 }
122
123 fn get_max_authorize_counter(&self) -> u64 {
124 self.max_authorize_counter
125 }
126
127 fn set_authorize_counter(&mut self, value: u64) {
128 self.authorize_counter = value;
129 }
130
131 fn authorize_use_db(
132 &mut self,
133 uri: &str,
134 user_uri: &str,
135 request_access: u8,
136 _is_check_for_reload: bool,
137 trace: &mut Trace,
138 ) -> Result<u8, std::io::Error> {
139 let txn = match self.env.read_txn() {
140 Ok(txn1) => txn1,
141 Err(e) => {
142 return Err(Error::other(format!("Authorize:CREATING TRANSACTION {:?}", e)));
143 },
144 };
145
146 let db: Database<Str, Str> = match self.env.open_database(&txn, None) {
147 Ok(Some(db_res)) => db_res,
148 Ok(None) => {
149 return Err(Error::other("Authorize: database not found"));
150 },
151 Err(e) => {
152 return Err(Error::other(format!("Authorize: Err opening database: {:?}", e)));
153 },
154 };
155
156 let (cache_txn_owned, cache_db, cache_txn_ref) = if let Some(env) = &self.cache_env {
157 let txn_cache = match env.read_txn() {
158 Ok(txn1) => txn1,
159 Err(e) => {
160 return Err(Error::other(format!("Authorize:CREATING CACHE TRANSACTION {:?}", e)));
161 },
162 };
163
164 let db = match env.open_database(&txn_cache, None) {
165 Ok(Some(db_res)) => Some(db_res),
166 Ok(None) => {
167 warn!("Authorize: cache database not found");
168 None
169 },
170 Err(e) => {
171 warn!("Authorize: Err opening cache database: {:?}", e);
172 None
173 },
174 };
175
176 (Some(txn_cache), db, true)
177 } else {
178 (None, None, false)
179 };
180
181 let cache_txn_ptr = if cache_txn_ref {
182 cache_txn_owned.as_deref()
183 } else {
184 None
185 };
186
187 let mut storage = AzLmdbStorage {
188 txn: &txn,
189 db,
190 cache_txn: cache_txn_ptr,
191 cache_db,
192 stat: &mut self.stat,
193 };
194
195 authorize(uri, user_uri, request_access, &mut storage, trace)
196 }
197}
198
// NOTE(review): macro imported from the crate root; presumably it generates the
// public authorization-context implementation for `LmdbAzContext` on top of its
// `AuthorizationHelper` impl — confirm against the macro definition.
impl_authorization_context!(LmdbAzContext);
200
/// Per-request `Storage` implementation that reads from the main database and,
/// when available, from the cache database first.
pub struct AzLmdbStorage<'a> {
    /// Read transaction on the main environment.
    txn: &'a RoTxn<'a>,
    /// Main database handle (string keys, string values).
    db: Database<Str, Str>,
    /// Read transaction on the cache environment, if a cache is in use.
    cache_txn: Option<&'a RoTxn<'a>>,
    /// Cache database handle, if it could be opened.
    cache_db: Option<Database<Str, Str>>,
    /// Borrowed statistics publisher of the owning context.
    stat: &'a mut Option<Stat>,
}
208
209impl<'a> Storage for AzLmdbStorage<'a> {
210 fn get(&mut self, key: &str) -> io::Result<Option<String>> {
211 if let Some(cache_db) = self.cache_db {
212 if let Some(cache_txn) = self.cache_txn {
213 match cache_db.get(cache_txn, key) {
214 Ok(Some(val)) => {
215 if let Some(stat) = self.stat {
216 if stat.mode == StatMode::Full {
217 stat.point.collect(format_stat_message(key, true, true));
218 }
219 }
220 debug!("@cache val={}", val);
221 return Ok(Some(val.to_string()));
222 },
223 Ok(None) => {
224 },
226 Err(_e) => {
227 },
229 }
230 }
231 }
232
233 match self.db.get(self.txn, key) {
234 Ok(Some(val)) => {
235 if let Some(stat) = self.stat {
236 if stat.mode == StatMode::Full {
237 stat.point.collect(format_stat_message(key, self.cache_db.is_some(), false));
238 }
239 }
240 debug!("@db val={}", val);
241 Ok(Some(val.to_string()))
242 },
243 Ok(None) => Ok(None),
244 Err(e) => Err(Error::other(format!("Authorize: db.get {:?}, {}", e, key))),
245 }
246 }
247
248 fn fiber_yield(&self) {}
249
250 fn decode_rec_to_rights(&self, src: &str, result: &mut Vec<ACLRecord>) -> (bool, Option<DateTime<Utc>>) {
251 decode_rec_to_rights(src, result)
252 }
253
254 fn decode_rec_to_rightset(&self, src: &str, new_rights: &mut ACLRecordSet) -> (bool, Option<DateTime<Utc>>) {
255 decode_rec_to_rightset(src, new_rights)
256 }
257
258 fn decode_filter(&self, filter_value: String) -> (Option<ACLRecord>, Option<DateTime<Utc>>) {
259 decode_filter(filter_value)
260 }
261}
262