guardian_db/access_controller/
guardian.rs1use crate::access_controller::manifest::CreateAccessControllerOptions;
2use crate::access_controller::{manifest::ManifestParams, utils};
3use crate::address::Address;
4use crate::error::{GuardianError, Result};
5use crate::ipfs_log::{access_controller, identity_provider::IdentityProvider};
6use crate::p2p::events::{Emitter, EventBus};
7use crate::traits::{CreateDBOptions, GuardianDBKVStoreProvider, KeyValueStore};
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{Span, debug, instrument, warn};
12
13type KVStoreType =
15 RwLock<Option<Arc<tokio::sync::Mutex<Box<dyn KeyValueStore<Error = GuardianError>>>>>>;
16
/// Minimal address wrapper around an already-rendered address string.
///
/// Used to hand out a `Box<dyn Address>` built from the textual form of the
/// backing store's address.
#[derive(Clone, Debug)]
struct StringAddress(String);
20
21impl std::fmt::Display for StringAddress {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "{}", self.0)
24 }
25}
26
27impl Address for StringAddress {
28 fn get_root(&self) -> cid::Cid {
29 cid::Cid::default() }
31
32 fn get_path(&self) -> &str {
33 &self.0
34 }
35
36 fn equals(&self, other: &dyn Address) -> bool {
37 format!("{}", self) == format!("{}", other)
38 }
39}
40
41#[derive(Debug, Clone)]
42pub struct EventUpdated {
43 pub controller_type: String,
44 pub address: String,
45 pub action: String,
46 pub timestamp: chrono::DateTime<chrono::Utc>,
47}
48
49impl EventUpdated {
50 pub fn new(controller_type: String, address: String, action: String) -> Self {
51 Self {
52 controller_type,
53 address,
54 action,
55 timestamp: chrono::Utc::now(),
56 }
57 }
58}
59
60pub struct GuardianDBAccessController {
61 event_bus: EventBus,
63
64 event_emitter: Arc<tokio::sync::Mutex<Option<Emitter<EventUpdated>>>>,
66
67 guardian_db: Arc<dyn GuardianDBKVStoreProvider<Error = GuardianError>>,
69
70 kv_store: KVStoreType,
73
74 options: Box<dyn ManifestParams>,
76
77 span: Span,
79}
80
81impl GuardianDBAccessController {
82 pub fn span(&self) -> &Span {
84 &self.span
85 }
86
87 pub fn get_type(&self) -> &'static str {
88 "GuardianDB"
89 }
90
91 pub async fn address(&self) -> Option<Box<dyn Address>> {
92 let store_guard = self.kv_store.read().await;
93 if let Some(store_arc) = store_guard.as_ref() {
95 let store = store_arc.lock().await;
96 let addr = store.address();
97 let addr_string = format!("{}", addr);
98 Some(Box::new(StringAddress(addr_string)) as Box<dyn Address>)
99 } else {
100 None
101 }
102 }
103
104 pub async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
105 let authorizations = self.get_authorizations().await?;
106
107 Ok(authorizations.get(role).cloned().unwrap_or_default())
109 }
110
111 async fn get_authorizations(&self) -> Result<HashMap<String, Vec<String>>> {
112 let mut authorizations_set: HashMap<String, HashSet<String>> = HashMap::new();
113
114 let store_guard = self.kv_store.read().await;
115 let store = match store_guard.as_ref() {
116 Some(s) => s,
117 None => return Ok(HashMap::new()),
119 };
120
121 let store_lock = store.lock().await;
123 let all_data = store_lock.all();
124
125 for (role, key_bytes) in all_data {
126 let authorized_keys: Vec<String> = serde_json::from_slice(&key_bytes)?;
127
128 let entry = authorizations_set.entry(role).or_default();
129 for key in authorized_keys {
130 entry.insert(key);
131 }
132 }
133
134 if let Some(write_keys) = authorizations_set.get("write").cloned() {
136 let admin_keys = authorizations_set.entry("admin".to_string()).or_default();
137 for key in write_keys.iter() {
138 admin_keys.insert(key.clone());
139 }
140 }
141
142 let authorizations_list = authorizations_set
144 .into_iter()
145 .map(|(permission, keys)| (permission, keys.into_iter().collect()))
146 .collect();
147
148 Ok(authorizations_list)
149 }
150
151 #[instrument(skip(self, entry, identity_provider, _additional_context))]
152 pub async fn can_append(
153 &self,
154 entry: &dyn access_controller::LogEntry,
155 identity_provider: &dyn IdentityProvider,
156 _additional_context: &dyn access_controller::CanAppendAdditionalContext,
157 ) -> Result<()> {
158 let write_access = self.get_authorized_by_role("write").await?;
159 let admin_access = self.get_authorized_by_role("admin").await?;
160
161 let access: HashSet<String> = write_access
162 .into_iter()
163 .chain(admin_access.into_iter())
164 .collect();
165
166 let entry_id = entry.get_identity().id();
167
168 if access.contains(entry_id) || access.contains("*") {
170 identity_provider
171 .verify_identity(entry.get_identity())
172 .await?;
173 return Ok(());
174 }
175
176 Err(GuardianError::Store("Não autorizado".to_string()))
177 }
178
179 #[allow(dead_code)]
180 #[instrument(skip(self), fields(capability = %capability, key_id = %key_id))]
181 pub async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
182 {
184 let store_guard = self.kv_store.read().await;
185 let store_arc = store_guard
186 .as_ref()
187 .ok_or_else(|| GuardianError::Store("kv_store não inicializado".to_string()))?;
188
189 let mut capabilities: HashSet<String> = self
191 .get_authorized_by_role(capability)
192 .await?
193 .into_iter()
194 .collect();
195
196 capabilities.insert(key_id.to_string());
197
198 let capabilities_vec: Vec<String> = capabilities.into_iter().collect();
199
200 let capabilities_json = serde_json::to_vec(&capabilities_vec)?;
201
202 let mut store = store_arc.lock().await;
204 store
205 .put(capability, capabilities_json)
206 .await
207 .map_err(|e| GuardianError::Store(format!("Erro ao salvar no store: {}", e)))?;
208 } self.on_update("grant", capability, key_id).await;
212
213 Ok(())
214 }
215
216 #[allow(dead_code)]
217 #[instrument(skip(self), fields(capability = %capability, key_id = %key_id))]
218 pub async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
219 {
221 let store_guard = self.kv_store.read().await;
222 let store_arc = store_guard
223 .as_ref()
224 .ok_or_else(|| GuardianError::Store("kv_store não inicializado".to_string()))?;
225
226 let mut capabilities: Vec<String> = self.get_authorized_by_role(capability).await?;
227
228 capabilities.retain(|id| id != key_id);
230
231 let mut store = store_arc.lock().await;
232 if !capabilities.is_empty() {
233 let capabilities_json = serde_json::to_vec(&capabilities)?;
234
235 store
237 .put(capability, capabilities_json)
238 .await
239 .map_err(|e| {
240 GuardianError::Store(format!("Erro ao persistir permissões: {}", e))
241 })?;
242 } else {
243 store.delete(capability).await.map_err(|e| {
245 GuardianError::Store(format!("Erro ao remover permissões: {}", e))
246 })?;
247 }
248 } self.on_update("revoke", capability, key_id).await;
252
253 Ok(())
254 }
255
256 #[instrument(skip(self), fields(address = %address))]
257 pub async fn load(&self, address: &str) -> Result<()> {
258 let mut store_guard = self.kv_store.write().await;
259 if let Some(_store) = store_guard.take() {
261 }
263
264 let write_access = self.options.get_access("admin");
265 let write_access = match write_access {
266 Some(access) if !access.is_empty() => access,
267 _ => {
268 vec!["*".to_string()] }
271 };
272
273 let db_address = utils::ensure_address(address);
274
275 let mut store_options = CreateDBOptions::default();
276 let ipfs_ac_params = CreateAccessControllerOptions::new_simple("ipfs".to_string(), {
278 let mut access = HashMap::new();
279 access.insert("write".to_string(), write_access);
280 access
281 });
282 store_options.access_controller = Some(Box::new(ipfs_ac_params));
283
284 let store = self
286 .guardian_db
287 .key_value(&db_address, &mut store_options)
288 .await
289 .map_err(|e| GuardianError::Store(format!("Erro ao abrir key-value store: {}", e)))?;
290
291 *store_guard = Some(Arc::new(tokio::sync::Mutex::new(store)));
293
294 Ok(())
295 }
296
297 #[instrument(skip(self))]
298 pub async fn save(&self) -> Result<Box<dyn ManifestParams>> {
299 let store_guard = self.kv_store.read().await;
300 let store_arc = store_guard
301 .as_ref()
302 .ok_or_else(|| GuardianError::Store("kv_store não inicializado".to_string()))?;
303
304 let store = store_arc.lock().await;
305 let addr = store.address();
306 let addr_string = format!("{}", addr);
307
308 debug!(target: "access_controller", address = %addr_string, "Save executado para o store");
309
310 let cid = cid::Cid::default();
313
314 let params = CreateAccessControllerOptions::new(cid, false, "GuardianDB".to_string());
316 Ok(Box::new(params))
317 }
318
319 #[instrument(skip(self))]
320 pub async fn close(&self) -> Result<()> {
321 let mut store_guard = self.kv_store.write().await;
322 if let Some(store_arc) = store_guard.take() {
323 let store = store_arc.lock().await;
325 match store.close().await {
326 Ok(_) => debug!(target: "access_controller", "Store fechado com sucesso"),
327 Err(e) => warn!(target: "access_controller", error = %e, "Erro ao fechar o store"),
328 }
329 }
330 Ok(())
331 }
332
333 async fn on_update(&self, action: &str, capability: &str, key_id: &str) {
334 let mut emitter_guard = self.event_emitter.lock().await;
335
336 if emitter_guard.is_none() {
338 match self.event_bus.emitter::<EventUpdated>().await {
339 Ok(emitter) => {
340 *emitter_guard = Some(emitter);
341 }
342 Err(e) => {
343 warn!(target: "GuardianDB::ac", error = %e, "Falha ao inicializar event emitter");
344 return;
345 }
346 }
347 }
348
349 if let Some(emitter) = emitter_guard.as_ref() {
351 let address = self
352 .address()
353 .await
354 .map(|addr| format!("{}", addr))
355 .unwrap_or_else(|| "unknown".to_string());
356
357 let event = EventUpdated::new(
358 "guardian".to_string(),
359 address,
360 format!("{}:{}:{}", action, capability, key_id),
361 );
362
363 if let Err(e) = emitter.emit(event) {
364 warn!(target: "GuardianDB::ac", error = %e, "Falha ao emitir evento de atualização");
365 } else {
366 debug!(target: "GuardianDB::ac", action = %action, capability = %capability, key_id = %key_id, "Evento emitido com sucesso");
367 }
368 }
369 }
370
371 #[instrument(skip(guardian_db, params))]
372 pub async fn new(
373 guardian_db: Arc<dyn GuardianDBKVStoreProvider<Error = GuardianError>>,
374 params: Box<dyn crate::access_controller::manifest::ManifestParams>,
375 ) -> std::result::Result<Self, GuardianError> {
376 let kv_provider = guardian_db;
377 let addr_str = if !params.address().to_string().is_empty() {
378 params.address().to_string()
379 } else {
380 "default-access-controller".to_string()
381 };
382
383 let mut opts = CreateDBOptions::default();
384 let kv_store = kv_provider
385 .key_value(&addr_str, &mut opts)
386 .await
387 .map_err(|e| {
388 GuardianError::Store(format!("Erro ao inicializar key-value store: {}", e))
389 })?;
390
391 debug!(target: "access_controller", address = %addr_str, "Key-value store inicializada");
392
393 let event_bus = EventBus::new();
395 let _write_access = params.get_access("write");
396 let controller = Self {
397 event_bus,
398 event_emitter: Arc::new(tokio::sync::Mutex::new(None)), guardian_db: kv_provider,
400 kv_store: RwLock::new(Some(Arc::new(tokio::sync::Mutex::new(kv_store)))), options: params,
402 span: tracing::info_span!("guardian_access_controller", address = %addr_str),
404 };
405
406 let write_access = controller.options.get_access("write");
408 if let Some(access_keys) = write_access {
409 for key in access_keys {
410 controller.grant("write", &key).await?;
411 }
412 }
413
414 Ok(controller)
415 }
416}
417
418#[async_trait::async_trait]
420impl crate::access_controller::traits::AccessController for GuardianDBAccessController {
421 fn get_type(&self) -> &str {
422 "guardian"
423 }
424
425 async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
426 self.get_authorized_by_role(role).await
427 }
428
429 async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
430 self.grant(capability, key_id).await
431 }
432
433 async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
434 self.revoke(capability, key_id).await
435 }
436
437 async fn load(&self, address: &str) -> Result<()> {
438 self.load(address).await
439 }
440
441 async fn save(&self) -> Result<Box<dyn crate::access_controller::manifest::ManifestParams>> {
442 self.save().await
443 }
444
445 async fn close(&self) -> Result<()> {
446 self.close().await
447 }
448
449 async fn can_append(
450 &self,
451 entry: &dyn crate::ipfs_log::access_controller::LogEntry,
452 identity_provider: &dyn crate::ipfs_log::identity_provider::IdentityProvider,
453 additional_context: &dyn crate::ipfs_log::access_controller::CanAppendAdditionalContext,
454 ) -> Result<()> {
455 self.can_append(entry, identity_provider, additional_context)
456 .await
457 }
458}