guardian_db/access_controller/
ipfs.rs

1use crate::access_controller::{
2    manifest::CreateAccessControllerOptions, manifest::Manifest, manifest::ManifestParams,
3};
4use crate::address::Address;
5use crate::error::{GuardianError, Result};
6use crate::ipfs_core_api::client::IpfsClient;
7use crate::ipfs_log::{access_controller::LogEntry, identity_provider::IdentityProvider};
8use async_trait::async_trait;
9use cid::Cid;
10use serde::{Deserialize, Serialize};
11use std::convert::TryFrom;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use tracing::{Span, debug, instrument, warn};
15
16#[derive(Debug, Serialize, Deserialize)]
17struct CborWriteAccess {
18    #[serde(rename = "write")]
19    write: String,
20}
/// In-memory state of the access controller.
///
/// Derives `Debug` for diagnostics and `Default` (an empty permission list).
#[derive(Debug, Default)]
struct ControllerState {
    /// Identity ids allowed to write. The literal entry `"*"` is treated as a
    /// wildcard by the append checks.
    write_access: Vec<String>,
}
24
25/// Estrutura principal do controlador de acesso IPFS.
26pub struct IpfsAccessController {
27    ipfs: Arc<IpfsClient>,
28    state: RwLock<ControllerState>,
29    span: Span,
30}
31
32impl IpfsAccessController {
33    pub fn get_type(&self) -> &'static str {
34        "ipfs"
35    }
36
37    /// Este controlador não tem um endereço próprio, então retorna None.
38    pub fn address(&self) -> Option<Box<dyn Address>> {
39        None
40    }
41
42    #[instrument(skip(self, entry, identity_provider, _additional_context))]
43    pub async fn can_append(
44        &self,
45        entry: &dyn LogEntry,
46        identity_provider: &dyn IdentityProvider,
47        _additional_context: &dyn crate::ipfs_log::access_controller::CanAppendAdditionalContext,
48    ) -> Result<()> {
49        let state = self.state.read().await;
50        let key = entry.get_identity().id();
51
52        for allowed_key in state.write_access.iter() {
53            if allowed_key == key || allowed_key == "*" {
54                // Se a chave for autorizada, verifica a identidade
55                return identity_provider
56                    .verify_identity(entry.get_identity())
57                    .await;
58            }
59        }
60
61        Err(GuardianError::Store(
62            "Chave não tem permissão de escrita".to_string(),
63        ))
64    }
65
66    pub async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
67        let state = self.state.read().await;
68        // 'admin' e 'write' são a mesma coisa para este controlador.
69        if role == "admin" || role == "write" {
70            Ok(state.write_access.clone())
71        } else {
72            Ok(vec![])
73        }
74    }
75
76    #[instrument(skip(self))]
77    pub async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
78        if capability != "write" {
79            return Err(GuardianError::Store(format!(
80                "IpfsAccessController only supports 'write' capability, got '{}'",
81                capability
82            )));
83        }
84
85        let mut state = self.state.write().await;
86        if !state.write_access.contains(&key_id.to_string()) {
87            state.write_access.push(key_id.to_string());
88            debug!(target: "ipfs_access_controller",
89                capability = %capability,
90                key_id = %key_id,
91                total_keys = state.write_access.len(),
92                "Permission granted successfully"
93            );
94        } else {
95            debug!(target: "ipfs_access_controller",
96                capability = %capability,
97                key_id = %key_id,
98                "Permission already exists"
99            );
100        }
101        Ok(())
102    }
103
104    #[instrument(skip(self))]
105    pub async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
106        if capability != "write" {
107            return Err(GuardianError::Store(format!(
108                "IpfsAccessController only supports 'write' capability, got '{}'",
109                capability
110            )));
111        }
112
113        let mut state = self.state.write().await;
114        let initial_len = state.write_access.len();
115        state.write_access.retain(|k| k != key_id);
116
117        if state.write_access.len() < initial_len {
118            debug!(target: "ipfs_access_controller",
119                capability = %capability,
120                key_id = %key_id,
121                remaining_keys = state.write_access.len(),
122                "Permission revoked successfully"
123            );
124        } else {
125            debug!(target: "ipfs_access_controller",
126                capability = %capability,
127                key_id = %key_id,
128                "Permission not found for revocation"
129            );
130        }
131        Ok(())
132    }
133
134    #[instrument(skip(self), fields(address = %address))]
135    pub async fn load(&self, address: &str) -> Result<()> {
136        let state = self.state.read().await;
137        debug!(target: "ipfs_access_controller", address = %address, "Lendo permissões do controlador de acesso IPFS");
138        drop(state); // Liberamos o lock de leitura antes das operações de escrita
139
140        let cid = Cid::try_from(address)?;
141        let ipfs = self.ipfs.clone();
142        let cid_string = cid.to_string();
143
144        // Spawn a blocking task to handle the non-Send IPFS operations
145        let manifest_data = tokio::task::spawn_blocking(move || {
146            // Use tokio runtime handle to run async code in blocking context
147            let rt = tokio::runtime::Handle::current();
148            rt.block_on(async move {
149                // 1. Lê o manifesto CBOR principal
150                let mut manifest_reader = ipfs.cat(&cid_string).await?;
151                let mut manifest_data = Vec::new();
152                tokio::io::AsyncReadExt::read_to_end(&mut manifest_reader, &mut manifest_data)
153                    .await
154                    .map_err(|e| crate::error::GuardianError::Io(e.to_string()))?;
155
156                Ok::<Vec<u8>, crate::error::GuardianError>(manifest_data)
157            })
158        })
159        .await
160        .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
161
162        let manifest: Manifest = serde_cbor::from_slice(&manifest_data)?;
163
164        // 2. Lê o conteúdo das permissões usando o endereço do manifesto
165        let access_data_cid = manifest.params.address();
166        let ipfs_clone = self.ipfs.clone();
167        let access_data_cid_string = access_data_cid.to_string();
168
169        // Spawn another blocking task for the second IPFS operation
170        let access_data_bytes = tokio::task::spawn_blocking(move || {
171            let rt = tokio::runtime::Handle::current();
172            rt.block_on(async move {
173                let mut access_reader = ipfs_clone.cat(&access_data_cid_string).await?;
174                let mut access_data_bytes = Vec::new();
175                tokio::io::AsyncReadExt::read_to_end(&mut access_reader, &mut access_data_bytes)
176                    .await
177                    .map_err(|e| crate::error::GuardianError::Io(e.to_string()))?;
178
179                Ok::<Vec<u8>, crate::error::GuardianError>(access_data_bytes)
180            })
181        })
182        .await
183        .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
184
185        let write_access_data: CborWriteAccess = serde_cbor::from_slice(&access_data_bytes)?;
186
187        // 3. O conteúdo é uma string JSON, que precisa ser deserializada também
188        let write_access: Vec<String> = serde_json::from_str(&write_access_data.write)?;
189
190        // 4. Atualiza o estado interno com as novas permissões
191        let mut state = self.state.write().await;
192        state.write_access = write_access;
193
194        Ok(())
195    }
196
197    #[instrument(skip(self))]
198    pub async fn save(&self) -> Result<CreateAccessControllerOptions> {
199        let state = self.state.read().await;
200        let write_access_json = serde_json::to_string(&state.write_access)?;
201        let cbor_data = CborWriteAccess {
202            write: write_access_json,
203        };
204        // Serializa a estrutura CBOR em bytes
205        let cbor_bytes = serde_cbor::to_vec(&cbor_data)?;
206
207        let ipfs = self.ipfs.clone();
208        // Spawn a blocking task to handle the non-Send IPFS operations
209        let response = tokio::task::spawn_blocking(move || {
210            // Use tokio runtime handle to run async code in blocking context
211            let rt = tokio::runtime::Handle::current();
212            rt.block_on(async move {
213                // Salva os bytes no IPFS usando helper method
214                ipfs.add_bytes(cbor_bytes).await
215            })
216        })
217        .await
218        .map_err(|e| GuardianError::Store(format!("Task join error: {}", e)))??;
219        let cid = Cid::try_from(response.hash.as_str())?;
220        debug!(target: "ipfs_access_controller", cid = %cid, "Controlador de acesso IPFS salvo");
221        // Cria e retorna os parâmetros do novo manifesto
222        Ok(CreateAccessControllerOptions::new(
223            cid,
224            false,
225            "ipfs".to_string(),
226        ))
227    }
228
229    #[instrument(skip(self))]
230    pub async fn close(&self) -> Result<()> {
231        // Para IpfsAccessController, close é uma operação no-op já que é baseado em IPFS
232        // O estado é mantido no IPFS e não há recursos locais para fechar
233        debug!(target: "ipfs_access_controller", "Closing IPFS access controller");
234
235        let state = self.state.read().await;
236        debug!(target: "ipfs_access_controller",
237            write_access_count = state.write_access.len(),
238            "IPFS access controller closed successfully"
239        );
240
241        Ok(())
242    }
243
244    #[instrument(skip(ipfs_client, params), fields(identity_id = %identity_id))]
245    pub fn new(
246        ipfs_client: Arc<IpfsClient>,
247        identity_id: String,
248        mut params: CreateAccessControllerOptions,
249    ) -> Result<Self> {
250        if params.get_access("write").is_none() {
251            params.set_access("write".to_string(), vec![identity_id]);
252        }
253
254        let initial_state = ControllerState {
255            write_access: params.get_access("write").unwrap_or_default(),
256        };
257
258        Ok(Self {
259            ipfs: ipfs_client,
260            state: RwLock::new(initial_state),
261            span: tracing::info_span!("ipfs_access_controller", controller_type = "ipfs"),
262        })
263    }
264
265    /// Retorna uma referência ao span para contexto de tracing
266    pub fn span(&self) -> &Span {
267        &self.span
268    }
269}
270
271#[async_trait]
272impl crate::access_controller::traits::AccessController for IpfsAccessController {
273    fn get_type(&self) -> &str {
274        "ipfs"
275    }
276
277    async fn get_authorized_by_role(&self, role: &str) -> Result<Vec<String>> {
278        let state = self.state.read().await;
279
280        match role {
281            "write" => Ok(state.write_access.clone()),
282            "read" => Ok(state.write_access.clone()), // Por padrão, quem pode escrever pode ler
283            "admin" => Ok(state.write_access.clone()), // Por padrão, usa mesmas permissões
284            _ => Ok(Vec::new()),
285        }
286    }
287
288    async fn grant(&self, capability: &str, key_id: &str) -> Result<()> {
289        if capability != "write" {
290            return Err(GuardianError::Store(format!(
291                "IpfsAccessController only supports 'write' capability, got '{}'",
292                capability
293            )));
294        }
295
296        let mut state = self.state.write().await;
297        if !state.write_access.contains(&key_id.to_string()) {
298            state.write_access.push(key_id.to_string());
299        }
300        Ok(())
301    }
302
303    async fn revoke(&self, capability: &str, key_id: &str) -> Result<()> {
304        if capability != "write" {
305            return Err(GuardianError::Store(format!(
306                "IpfsAccessController only supports 'write' capability, got '{}'",
307                capability
308            )));
309        }
310
311        let mut state = self.state.write().await;
312        state.write_access.retain(|k| k != key_id);
313        Ok(())
314    }
315
316    async fn load(&self, address: &str) -> Result<()> {
317        self.load(address).await
318    }
319
320    async fn save(&self) -> Result<Box<dyn crate::access_controller::manifest::ManifestParams>> {
321        let options = self.save().await?;
322        Ok(Box::new(options))
323    }
324
325    async fn close(&self) -> Result<()> {
326        IpfsAccessController::close(self).await
327    }
328
329    async fn can_append(
330        &self,
331        entry: &dyn crate::ipfs_log::access_controller::LogEntry,
332        _identity_provider: &dyn crate::ipfs_log::identity_provider::IdentityProvider,
333        _additional_context: &dyn crate::ipfs_log::access_controller::CanAppendAdditionalContext,
334    ) -> Result<()> {
335        let state = self.state.read().await;
336        let entry_identity = entry.get_identity();
337        let entry_id = entry_identity.id();
338
339        // Verifica se a identidade tem permissão de escrita
340        if state.write_access.contains(&"*".to_string())
341            || state.write_access.contains(&entry_id.to_string())
342        {
343            Ok(())
344        } else {
345            Err(GuardianError::Store(format!(
346                "Access denied: identity {} not authorized for write operations",
347                entry_id
348            )))
349        }
350    }
351}