guardian_db/access_controller/
ipfs.rs1use crate::access_controller::{
2 manifest::CreateAccessControllerOptions, manifest::Manifest, manifest::ManifestParams,
3};
4use crate::address::Address;
5use crate::error::{GuardianError, Result};
6use crate::ipfs_core_api::client::IpfsClient;
7use crate::ipfs_log::{access_controller::LogEntry, identity_provider::IdentityProvider};
8use async_trait::async_trait;
9use cid::Cid;
10use serde::{Deserialize, Serialize};
11use std::convert::TryFrom;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use tracing::{Span, debug, instrument, warn};
15
16#[derive(Debug, Serialize, Deserialize)]
17struct CborWriteAccess {
18 #[serde(rename = "write")]
19 write: String,
20}
/// Mutable part of the controller, kept behind the controller's lock.
struct ControllerState {
    /// Key ids currently allowed to write; the entry `"*"` acts as a wildcard
    /// in the `can_append` checks.
    write_access: Vec<String>,
}
24
25pub struct IpfsAccessController {
27 ipfs: Arc<IpfsClient>,
28 state: RwLock<ControllerState>,
29 span: Span,
30}
31
32impl IpfsAccessController {
33 pub fn get_type(&self) -> &'static str {
34 "ipfs"
35 }
36
37 pub fn address(&self) -> Option<Box<dyn Address>> {
39 None
40 }
41
42 #[instrument(skip(self, entry, identity_provider, _additional_context))]
43 pub async fn can_append(
44 &self,
45 entry: &dyn LogEntry,
46 identity_provider: &dyn IdentityProvider,
47 _additional_context: &dyn crate::ipfs_log::access_controller::CanAppendAdditionalContext,
48 ) -> Result<()> {
49 let state = self.state.read().await;
50 let key = entry.get_identity().id();
51
52 for allowed_key in state.write_access.iter() {
53 if allowed_key == key || allowed_key == "*" {
54 return identity_provider
56 .verify_identity(entry.get_identity())
57 .await;
58 }
59 }
60
61 Err(GuardianError::Store(
62 "Chave não tem permissão de escrita".to_string(),
63 ))
64 }
65
66 pub async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
67 let state = self.state.read().await;
68 if role == "admin" || role == "write" {
70 Ok(state.write_access.clone())
71 } else {
72 Ok(vec![])
73 }
74 }
75
76 #[instrument(skip(self))]
77 pub async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
78 if capability != "write" {
79 return Err(GuardianError::Store(format!(
80 "IpfsAccessController only supports 'write' capability, got '{}'",
81 capability
82 )));
83 }
84
85 let mut state = self.state.write().await;
86 if !state.write_access.contains(&key_id.to_string()) {
87 state.write_access.push(key_id.to_string());
88 debug!(target: "ipfs_access_controller",
89 capability = %capability,
90 key_id = %key_id,
91 total_keys = state.write_access.len(),
92 "Permission granted successfully"
93 );
94 } else {
95 debug!(target: "ipfs_access_controller",
96 capability = %capability,
97 key_id = %key_id,
98 "Permission already exists"
99 );
100 }
101 Ok(())
102 }
103
104 #[instrument(skip(self))]
105 pub async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
106 if capability != "write" {
107 return Err(GuardianError::Store(format!(
108 "IpfsAccessController only supports 'write' capability, got '{}'",
109 capability
110 )));
111 }
112
113 let mut state = self.state.write().await;
114 let initial_len = state.write_access.len();
115 state.write_access.retain(|k| k != key_id);
116
117 if state.write_access.len() < initial_len {
118 debug!(target: "ipfs_access_controller",
119 capability = %capability,
120 key_id = %key_id,
121 remaining_keys = state.write_access.len(),
122 "Permission revoked successfully"
123 );
124 } else {
125 debug!(target: "ipfs_access_controller",
126 capability = %capability,
127 key_id = %key_id,
128 "Permission not found for revocation"
129 );
130 }
131 Ok(())
132 }
133
134 #[instrument(skip(self), fields(address = %address))]
135 pub async fn load(&self, address: &str) -> Result<()> {
136 let state = self.state.read().await;
137 debug!(target: "ipfs_access_controller", address = %address, "Lendo permissões do controlador de acesso IPFS");
138 drop(state); let cid = Cid::try_from(address)?;
141 let ipfs = self.ipfs.clone();
142 let cid_string = cid.to_string();
143
144 let manifest_data = tokio::task::spawn_blocking(move || {
146 let rt = tokio::runtime::Handle::current();
148 rt.block_on(async move {
149 let mut manifest_reader = ipfs.cat(&cid_string).await?;
151 let mut manifest_data = Vec::new();
152 tokio::io::AsyncReadExt::read_to_end(&mut manifest_reader, &mut manifest_data)
153 .await
154 .map_err(|e| crate::error::GuardianError::Io(e.to_string()))?;
155
156 Ok::<Vec<u8>, crate::error::GuardianError>(manifest_data)
157 })
158 })
159 .await
160 .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
161
162 let manifest: Manifest = serde_cbor::from_slice(&manifest_data)?;
163
164 let access_data_cid = manifest.params.address();
166 let ipfs_clone = self.ipfs.clone();
167 let access_data_cid_string = access_data_cid.to_string();
168
169 let access_data_bytes = tokio::task::spawn_blocking(move || {
171 let rt = tokio::runtime::Handle::current();
172 rt.block_on(async move {
173 let mut access_reader = ipfs_clone.cat(&access_data_cid_string).await?;
174 let mut access_data_bytes = Vec::new();
175 tokio::io::AsyncReadExt::read_to_end(&mut access_reader, &mut access_data_bytes)
176 .await
177 .map_err(|e| crate::error::GuardianError::Io(e.to_string()))?;
178
179 Ok::<Vec<u8>, crate::error::GuardianError>(access_data_bytes)
180 })
181 })
182 .await
183 .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
184
185 let write_access_data: CborWriteAccess = serde_cbor::from_slice(&access_data_bytes)?;
186
187 let write_access: Vec<String> = serde_json::from_str(&write_access_data.write)?;
189
190 let mut state = self.state.write().await;
192 state.write_access = write_access;
193
194 Ok(())
195 }
196
197 #[instrument(skip(self))]
198 pub async fn save(&self) -> Result<CreateAccessControllerOptions> {
199 let state = self.state.read().await;
200 let write_access_json = serde_json::to_string(&state.write_access)?;
201 let cbor_data = CborWriteAccess {
202 write: write_access_json,
203 };
204 let cbor_bytes = serde_cbor::to_vec(&cbor_data)?;
206
207 let ipfs = self.ipfs.clone();
208 let response = tokio::task::spawn_blocking(move || {
210 let rt = tokio::runtime::Handle::current();
212 rt.block_on(async move {
213 ipfs.add_bytes(cbor_bytes).await
215 })
216 })
217 .await
218 .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
219 let cid = Cid::try_from(response.hash.as_str())?;
220 debug!(target: "ipfs_access_controller", cid = %cid, "Controlador de acesso IPFS salvo");
221 Ok(CreateAccessControllerOptions::new(
223 cid,
224 false,
225 "ipfs".to_string(),
226 ))
227 }
228
229 #[instrument(skip(self))]
230 pub async fn close(&self) -> Result<()> {
231 debug!(target: "ipfs_access_controller", "Closing IPFS access controller");
234
235 let state = self.state.read().await;
236 debug!(target: "ipfs_access_controller",
237 write_access_count = state.write_access.len(),
238 "IPFS access controller closed successfully"
239 );
240
241 Ok(())
242 }
243
244 #[instrument(skip(ipfs_client, params), fields(identity_id = %identity_id))]
245 pub fn new(
246 ipfs_client: Arc<IpfsClient>,
247 identity_id: String,
248 mut params: CreateAccessControllerOptions,
249 ) -> Result<Self> {
250 if params.get_access("write").is_none() {
251 params.set_access("write".to_string(), vec![identity_id]);
252 }
253
254 let initial_state = ControllerState {
255 write_access: params.get_access("write").unwrap_or_default(),
256 };
257
258 Ok(Self {
259 ipfs: ipfs_client,
260 state: RwLock::new(initial_state),
261 span: tracing::info_span!("ipfs_access_controller", controller_type = "ipfs"),
262 })
263 }
264
265 pub fn span(&self) -> &Span {
267 &self.span
268 }
269}
270
271#[async_trait]
272impl crate::access_controller::traits::AccessController for IpfsAccessController {
273 fn get_type(&self) -> &str {
274 "ipfs"
275 }
276
277 async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
278 let state = self.state.read().await;
279
280 match role {
281 "write" => Ok(state.write_access.clone()),
282 "read" => Ok(state.write_access.clone()), "admin" => Ok(state.write_access.clone()), _ => Ok(Vec::new()),
285 }
286 }
287
288 async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
289 if capability != "write" {
290 return Err(GuardianError::Store(format!(
291 "IpfsAccessController only supports 'write' capability, got '{}'",
292 capability
293 )));
294 }
295
296 let mut state = self.state.write().await;
297 if !state.write_access.contains(&key_id.to_string()) {
298 state.write_access.push(key_id.to_string());
299 }
300 Ok(())
301 }
302
303 async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
304 if capability != "write" {
305 return Err(GuardianError::Store(format!(
306 "IpfsAccessController only supports 'write' capability, got '{}'",
307 capability
308 )));
309 }
310
311 let mut state = self.state.write().await;
312 state.write_access.retain(|k| k != key_id);
313 Ok(())
314 }
315
316 async fn load(&self, address: &str) -> Result<()> {
317 self.load(address).await
318 }
319
320 async fn save(&self) -> Result<Box<dyn crate::access_controller::manifest::ManifestParams>> {
321 let options = self.save().await?;
322 Ok(Box::new(options))
323 }
324
325 async fn close(&self) -> Result<()> {
326 IpfsAccessController::close(self).await
327 }
328
329 async fn can_append(
330 &self,
331 entry: &dyn crate::ipfs_log::access_controller::LogEntry,
332 _identity_provider: &dyn crate::ipfs_log::identity_provider::IdentityProvider,
333 _additional_context: &dyn crate::ipfs_log::access_controller::CanAppendAdditionalContext,
334 ) -> Result<()> {
335 let state = self.state.read().await;
336 let entry_identity = entry.get_identity();
337 let entry_id = entry_identity.id();
338
339 if state.write_access.contains(&"*".to_string())
341 || state.write_access.contains(&entry_id.to_string())
342 {
343 Ok(())
344 } else {
345 Err(GuardianError::Store(format!(
346 "Access denied: identity {} not authorized for write operations",
347 entry_id
348 )))
349 }
350 }
351}