guardian_db/
base_guardian.rs

1use crate::address::{Address, GuardianDBAddress};
2use crate::cache::level_down::LevelDownCache;
3use crate::db_manifest;
4use crate::error::{GuardianError, Result};
5use crate::ipfs_core_api::{backends::IrohBackend, client::IpfsClient, config::ClientConfig};
6use crate::ipfs_log::identity::{Identity, Signatures};
7pub use crate::ipfs_log::identity_provider::Keystore;
8use crate::keystore::SledKeystore;
9use crate::p2p::events::Emitter;
10pub use crate::p2p::events::EventBus;
11pub use crate::p2p::events::EventBus as EventBusImpl;
12use crate::traits::{
13    AccessControllerConstructor, BaseGuardianDB, CreateDBOptions, DetermineAddressOptions,
14    DirectChannel, DirectChannelFactory, DirectChannelOptions, EventPubSubPayload,
15    MessageExchangeHeads, MessageMarshaler, PubSubInterface, Store, StoreConstructor,
16    TracerWrapper,
17};
18use hex;
19use libp2p::PeerId;
20use opentelemetry::global::BoxedTracer;
21use parking_lot::RwLock;
22use rand::RngCore;
23use secp256k1;
24use std::collections::HashMap;
25use std::path::{Path, PathBuf};
26use std::pin::Pin;
27use std::sync::Arc;
28use tokio::task::JoinHandle;
29use tokio_util::sync::CancellationToken;
30use tracing::Span;
31
32// Type aliases para simplificar tipos complexos
33type CloseKeystoreFn = Arc<RwLock<Option<Box<dyn Fn() -> Result<()> + Send + Sync>>>>;
34// Type alias para Store com GuardianError
35type GuardianStore = dyn Store<Error = GuardianError> + Send + Sync;
36
37// Usamos `Option<T>` para campos que podem não ser fornecidos.
38#[derive(Default)]
39pub struct NewGuardianDBOptions {
40    pub id: Option<String>,
41    pub peer_id: Option<PeerId>,
42    pub directory: Option<PathBuf>,
43    pub keystore: Option<Box<dyn Keystore + Send + Sync>>,
44    pub cache: Option<Arc<LevelDownCache>>,
45    pub identity: Option<Identity>,
46    pub close_keystore: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
47    pub tracer: Option<Arc<BoxedTracer>>,
48    pub direct_channel_factory: Option<DirectChannelFactory>,
49    pub pubsub: Option<Box<dyn PubSubInterface<Error = GuardianError>>>,
50    pub message_marshaler: Option<Box<dyn MessageMarshaler<Error = GuardianError>>>,
51    pub event_bus: Option<Arc<EventBusImpl>>,
52}
53
54pub struct GuardianDB {
55    ipfs: IpfsClient,
56    identity: Arc<RwLock<Identity>>,
57    id: Arc<RwLock<PeerId>>,
58    keystore: Arc<RwLock<Option<Box<dyn Keystore + Send + Sync>>>>,
59    close_keystore: CloseKeystoreFn,
60    tracer: Arc<BoxedTracer>,
61    span: Span,
62    stores: Arc<RwLock<HashMap<String, Arc<GuardianStore>>>>,
63    #[allow(dead_code)]
64    direct_channel: Arc<dyn DirectChannel<Error = GuardianError> + Send + Sync>,
65    access_controller_types: Arc<RwLock<HashMap<String, AccessControllerConstructor>>>,
66    store_types: Arc<RwLock<HashMap<String, StoreConstructor>>>,
67    directory: PathBuf,
68    cache: Arc<RwLock<Arc<LevelDownCache>>>,
69    #[allow(dead_code)]
70    pubsub: Option<Box<dyn PubSubInterface<Error = GuardianError>>>,
71    event_bus: Arc<EventBusImpl>,
72    #[allow(dead_code)]
73    message_marshaler: Arc<dyn MessageMarshaler<Error = GuardianError> + Send + Sync>,
74    _monitor_handle: JoinHandle<()>, // Handle para a task em background, para que possa ser cancelada no Drop.
75    cancellation_token: CancellationToken,
76    emitters: Arc<Emitters>,
77}
78
79#[derive(Clone)]
80pub struct EventExchangeHeads {
81    pub peer: PeerId,
82    pub message: MessageExchangeHeads,
83}
84
85// GuardianDB-level events
86#[derive(Clone)]
87pub struct EventGuardianDBReady {
88    pub address: String,
89    pub db_type: String,
90}
91
92#[derive(Clone)]
93pub struct EventPeerConnected {
94    pub peer_id: String,
95    pub address: String,
96}
97
98#[derive(Clone)]
99pub struct EventPeerDisconnected {
100    pub peer_id: String,
101    pub address: String,
102}
103
104#[derive(Clone)]
105pub struct EventDatabaseCreated {
106    pub address: String,
107    pub name: String,
108    pub db_type: String,
109}
110
111#[derive(Clone)]
112pub struct EventDatabaseDropped {
113    pub address: String,
114    pub name: String,
115}
116
117// Store-specific events
118#[derive(Clone)]
119pub struct EventStoreUpdated {
120    pub store_address: String,
121    pub store_type: String,
122    pub entries_added: usize,
123    pub total_entries: usize,
124    pub timestamp: chrono::DateTime<chrono::Utc>,
125}
126
127#[derive(Clone)]
128pub struct EventSyncCompleted {
129    pub store_address: String,
130    pub peer_id: String,
131    pub heads_synced: usize,
132    pub duration_ms: u64,
133    pub success: bool,
134    pub timestamp: chrono::DateTime<chrono::Utc>,
135}
136
137#[derive(Clone)]
138pub struct EventNewEntries {
139    pub store_address: String,
140    pub entries: Vec<crate::ipfs_log::entry::Entry>,
141    pub total_entries: usize,
142    pub timestamp: chrono::DateTime<chrono::Utc>,
143}
144
145#[derive(Clone)]
146pub struct EventSyncError {
147    pub store_address: String,
148    pub peer_id: String,
149    pub error_message: String,
150    pub heads_count: usize,
151    pub error_type: SyncErrorType,
152    pub timestamp: chrono::DateTime<chrono::Utc>,
153}
154
155#[derive(Clone, Debug)]
156pub enum SyncErrorType {
157    PermissionDenied,
158    NetworkError,
159    ValidationError,
160    StoreError,
161    UnknownError,
162}
163
164#[derive(Clone)]
165pub struct EventPermissionDenied {
166    pub store_address: String,
167    pub identity_id: String,
168    pub identity_pubkey: String,
169    pub required_permission: String,
170    pub timestamp: chrono::DateTime<chrono::Utc>,
171}
172
173pub struct Emitters {
174    pub ready: Emitter<EventGuardianDBReady>,
175    pub peer_connected: Emitter<EventPeerConnected>,
176    pub peer_disconnected: Emitter<EventPeerDisconnected>,
177    pub new_heads: Emitter<EventExchangeHeads>,
178    pub database_created: Emitter<EventDatabaseCreated>,
179    pub database_dropped: Emitter<EventDatabaseDropped>,
180    // Store-specific events
181    pub store_updated: Emitter<EventStoreUpdated>,
182    pub sync_completed: Emitter<EventSyncCompleted>,
183    pub new_entries: Emitter<EventNewEntries>,
184    // Error events
185    pub sync_error: Emitter<EventSyncError>,
186    pub permission_denied: Emitter<EventPermissionDenied>,
187}
188
189impl Emitters {
190    /// Generate emitters from an EventBus instance
191    pub async fn generate_emitters(event_bus: &EventBusImpl) -> Result<Self> {
192        Ok(Self {
193            ready: event_bus.emitter().await?,
194            peer_connected: event_bus.emitter().await?,
195            peer_disconnected: event_bus.emitter().await?,
196            new_heads: event_bus.emitter().await?,
197            database_created: event_bus.emitter().await?,
198            database_dropped: event_bus.emitter().await?,
199            // Store-specific events
200            store_updated: event_bus.emitter().await?,
201            sync_completed: event_bus.emitter().await?,
202            new_entries: event_bus.emitter().await?,
203            // Error events
204            sync_error: event_bus.emitter().await?,
205            permission_denied: event_bus.emitter().await?,
206        })
207    }
208}
209
210impl EventExchangeHeads {
211    /// Cria uma nova instância de EventExchangeHeads.
212    pub fn new(p: PeerId, msg: MessageExchangeHeads) -> Self {
213        Self {
214            peer: p,
215            message: msg,
216        }
217    }
218}
219
220impl EventStoreUpdated {
221    pub fn new(
222        store_address: String,
223        store_type: String,
224        entries_added: usize,
225        total_entries: usize,
226    ) -> Self {
227        Self {
228            store_address,
229            store_type,
230            entries_added,
231            total_entries,
232            timestamp: chrono::Utc::now(),
233        }
234    }
235}
236
237impl EventSyncCompleted {
238    pub fn new(
239        store_address: String,
240        peer_id: String,
241        heads_synced: usize,
242        duration_ms: u64,
243        success: bool,
244    ) -> Self {
245        Self {
246            store_address,
247            peer_id,
248            heads_synced,
249            duration_ms,
250            success,
251            timestamp: chrono::Utc::now(),
252        }
253    }
254}
255
256impl EventNewEntries {
257    pub fn new(
258        store_address: String,
259        entries: Vec<crate::ipfs_log::entry::Entry>,
260        total_entries: usize,
261    ) -> Self {
262        Self {
263            store_address,
264            entries,
265            total_entries,
266            timestamp: chrono::Utc::now(),
267        }
268    }
269}
270
271impl EventSyncError {
272    pub fn new(
273        store_address: String,
274        peer_id: String,
275        error_message: String,
276        heads_count: usize,
277        error_type: SyncErrorType,
278    ) -> Self {
279        Self {
280            store_address,
281            peer_id,
282            error_message,
283            heads_count,
284            error_type,
285            timestamp: chrono::Utc::now(),
286        }
287    }
288}
289
290impl EventPermissionDenied {
291    pub fn new(
292        store_address: String,
293        identity_id: String,
294        identity_pubkey: String,
295        required_permission: String,
296    ) -> Self {
297        Self {
298            store_address,
299            identity_id,
300            identity_pubkey,
301            required_permission,
302            timestamp: chrono::Utc::now(),
303        }
304    }
305}
306
307impl GuardianDB {
308    /// Construtor de alto nível que configura o Keystore e a Identidade.
309    pub async fn new(
310        ipfs_config: Option<ClientConfig>,
311        options: Option<NewGuardianDBOptions>,
312    ) -> Result<Self> {
313        let mut options = options.unwrap_or_default();
314
315        // Usar configuração padrão do IPFS se não fornecida
316        let config = ipfs_config.unwrap_or_default();
317
318        // Extrair peer_id ou gerar aleatório
319        let peer_id = options.peer_id.unwrap_or_else(PeerId::random);
320
321        // Criar backend Iroh
322        let iroh_backend = Arc::new(IrohBackend::new(&config).await?);
323        let ipfs_client = IpfsClient::new_with_backend(iroh_backend).await?;
324
325        // Se o diretório não for fornecido, usa um padrão baseado no peer_id
326        let default_dir = PathBuf::from("./GuardianDB").join(peer_id.to_string());
327        let directory = options.directory.as_ref().unwrap_or(&default_dir);
328
329        // Configura o Keystore se nenhum for fornecido.
330        // Usa o banco de dados `sled` como substituto do `leveldb`.
331        if options.keystore.is_none() {
332            // Em `sled`, None para o path significa in-memory
333            let sled_path = if directory.to_string_lossy() == "./GuardianDB/in-memory" {
334                None
335            } else {
336                Some(directory.join(peer_id.to_string()).join("keystore"))
337            };
338
339            // Cria o keystore usando nossa implementação SledKeystore
340            let keystore = SledKeystore::new(sled_path)
341                .map_err(|e| GuardianError::Other(format!("Falha ao criar o keystore: {}", e)))?;
342
343            // Cria a closure de fechamento
344            let keystore_clone = keystore.clone();
345            options.close_keystore = Some(Box::new(move || {
346                tokio::task::block_in_place(|| {
347                    tokio::runtime::Handle::current()
348                        .block_on(async { keystore_clone.close().await })
349                })
350            }));
351
352            // Define o keystore nas opções
353            options.keystore = Some(Box::new(keystore));
354        }
355
356        // Configura a identidade se nenhuma for fornecida.
357        let identity = if let Some(identity) = options.identity {
358            identity
359        } else {
360            let id = options
361                .id
362                .as_deref()
363                .unwrap_or(&peer_id.to_string())
364                .to_string();
365            let _keystore = options.keystore.as_ref().ok_or_else(|| {
366                GuardianError::Other("Keystore é necessário para criar uma identidade".to_string())
367            })?;
368
369            // Criar uma identidade usando o keystore configurado
370            use crate::ipfs_log::identity::Identity;
371
372            // Gerar uma chave secreta para a identidade
373            let mut rng = rand::rng();
374            let mut secret_bytes = [0u8; 32];
375            rng.fill_bytes(&mut secret_bytes);
376
377            // Gerar chave pública a partir da privada (simplificado)
378            let pub_key_hex = hex::encode(secret_bytes);
379
380            // Criar assinaturas criptográficas para a identidade
381            let signatures = Signatures::new(
382                &format!("id_sig_{}", id),
383                &format!("pk_sig_{}", pub_key_hex),
384            );
385
386            Identity::new(&id, &pub_key_hex, signatures)
387        };
388
389        options.identity = Some(identity.clone());
390
391        // Chama o construtor principal com as opções totalmente configuradas.
392        Self::new_guardian_db(ipfs_client, identity, Some(options)).await
393    }
394
395    /// Construtor principal para uma instância de GuardianDB.
396    pub async fn new_guardian_db(
397        ipfs: IpfsClient,
398        identity: Identity,
399        options: Option<NewGuardianDBOptions>,
400    ) -> Result<Self> {
401        // Usa as opções fornecidas ou cria um valor padrão.
402        let options = options.unwrap_or_default();
403
404        // 1. Configuração de valores padrão para as opções
405        let tracer = options.tracer.unwrap_or_else(|| {
406            // Usar um tracer básico para telemetria
407            Arc::new(BoxedTracer::new(Box::new(
408                opentelemetry::trace::noop::NoopTracer::new(),
409            )))
410        });
411
412        // Cria span para esta instância do GuardianDB
413        let span = tracing::info_span!("guardian_db", peer_id = %identity.id());
414        // Initialize EventBus with proper configuration
415        let event_bus = Arc::new(EventBusImpl::new());
416
417        // Criar DirectChannelFactory
418        let direct_channel_factory = options.direct_channel_factory.unwrap_or_else(|| {
419            let factory_span = tracing::info_span!("direct_channel_factory");
420            let _enter = factory_span.enter();
421            // Usar span de tracing para compatibilidade
422            let temp_span = tracing::Span::none();
423            crate::p2p::pubsub::direct_channel::init_direct_channel_factory(
424                temp_span,
425                PeerId::random(),
426            )
427        });
428        let cancellation_token = CancellationToken::new();
429
430        // Criar emitters usando o EventBus
431        let emitters = Emitters::generate_emitters(&event_bus).await.map_err(|e| {
432            GuardianError::Other(format!("Falha ao gerar emitters do EventBus: {}", e))
433        })?;
434
435        // 2. Inicialização de componentes
436        // Criar canal direto usando nossa factory
437        let direct_channel = make_direct_channel(
438            &event_bus,
439            direct_channel_factory,
440            &DirectChannelOptions::default(),
441        )
442        .await?;
443
444        let message_marshaler_arc: Arc<dyn MessageMarshaler<Error = GuardianError> + Send + Sync> =
445            match options.message_marshaler {
446                Some(boxed_marshaler) => {
447                    // Usar unsafe apenas quando necessário, mas de forma controlada
448                    unsafe {
449                        Arc::from_raw(Box::into_raw(boxed_marshaler)
450                            as *const (dyn MessageMarshaler<Error = GuardianError> + Send + Sync))
451                    }
452                }
453                None => {
454                    // Criar diretamente como Arc para evitar conversão
455                    Arc::new(crate::message_marshaler::GuardianJSONMarshaler::new())
456                }
457            };
458        let cache = options.cache.unwrap_or_else(|| {
459            // Cria um cache com configuração adequada
460            Arc::new(crate::cache::level_down::LevelDownCache::new(None))
461        });
462        let directory = options
463            .directory
464            .unwrap_or_else(|| PathBuf::from("./GuardianDB/in-memory")); // Padrão para dados em memória
465
466        // 3. Instanciação da struct GuardianDB
467        let instance = GuardianDB {
468            ipfs,
469            identity: Arc::new(RwLock::new(identity.clone())),
470            id: Arc::new(RwLock::new(
471                PeerId::from_bytes(identity.pub_key.as_bytes())
472                    .unwrap_or_else(|_| PeerId::random()),
473            )), // Converte pub_key para PeerId
474            pubsub: options.pubsub,
475            cache: Arc::new(RwLock::new(cache)),
476            directory,
477            event_bus: event_bus.clone(),
478            stores: Arc::new(RwLock::new(HashMap::new())),
479            direct_channel,
480            close_keystore: Arc::new(RwLock::new(options.close_keystore)),
481            keystore: Arc::new(RwLock::new(options.keystore)),
482            store_types: Arc::new(RwLock::new(HashMap::new())),
483            access_controller_types: Arc::new(RwLock::new(HashMap::new())),
484            tracer,
485            message_marshaler: message_marshaler_arc,
486            cancellation_token: cancellation_token.clone(),
487            emitters: Arc::new(emitters),
488            // Inicia o monitor do canal direto usando a função helper
489            _monitor_handle: Self::start_monitor_task(
490                event_bus.clone(),
491                cancellation_token.clone(),
492                span.clone(),
493            ),
494            span,
495        };
496
497        // 4. Configuração pós-inicialização
498        // Registra os construtores padrão de stores
499        instance.register_default_store_types();
500
501        // Configura o emitter "newHeads" no event_bus
502        tracing::debug!("Configurando emitters do EventBus");
503
504        // Inicia o monitor do canal direto de forma independente
505        tracing::debug!("Iniciando monitor do canal direto");
506
507        // Emite evento de inicialização do GuardianDB
508        let ready_event = EventGuardianDBReady {
509            address: format!("/GuardianDB/{}", instance.peer_id()),
510            db_type: "GuardianDB".to_string(),
511        };
512
513        if let Err(e) = instance.emitters.ready.emit(ready_event) {
514            tracing::warn!("Falha ao emitir evento GuardianDB ready: {}", e);
515        } else {
516            tracing::debug!("Evento GuardianDB ready emitido com sucesso");
517        }
518
519        Ok(instance)
520    }
521
522    /// Retorna o tracer para telemetria e monitoramento.
523    pub fn tracer(&self) -> Arc<BoxedTracer> {
524        self.tracer.clone()
525    }
526
527    /// Retorna uma referência ao span de tracing para instrumentação
528    pub fn span(&self) -> &Span {
529        &self.span
530    }
531
532    /// Retorna o cliente da API do IPFS (Iroh nativo).
533    pub fn ipfs(&self) -> &IpfsClient {
534        &self.ipfs
535    }
536
537    /// Retorna a identidade da instância do GuardianDB.
538    /// A identidade é clonada para que o chamador possa usá-la sem manter o lock de leitura ativo.
539    pub fn identity(&self) -> Identity {
540        self.identity.read().clone()
541    }
542
543    /// Retorna o PeerId da instância do GuardianDB.
544    /// `PeerId` implementa o trait `Copy`, então o valor é copiado, o que é muito eficiente.
545    pub fn peer_id(&self) -> PeerId {
546        *self.id.read()
547    }
548
549    /// Retorna um clone do `Arc` para o Keystore, permitindo o acesso compartilhado.
550    /// O keystore é configurado durante a inicialização e pode ser usado para operações criptográficas.
551    pub fn keystore(&self) -> Arc<RwLock<Option<Box<dyn Keystore + Send + Sync>>>> {
552        self.keystore.clone()
553    }
554
555    /// Retorna a função de fechamento para o Keystore, se existir.
556    ///
557    /// Esta função retorna uma closure que pode ser chamada para fechar o keystore.
558    /// A closure captura uma referência clonada para o campo close_keystore interno.
559    ///
560    /// # Retorna
561    ///
562    /// - `Some(closure)` se uma função de fechamento foi configurada durante a inicialização
563    /// - `None` se nenhuma função de fechamento foi definida
564    ///
565    /// # Alternativa
566    ///
567    /// Para uma interface mais simples, use `close_key_store()` diretamente.
568    pub fn close_keystore(&self) -> Option<Box<dyn Fn() -> Result<()> + Send + Sync>> {
569        // Adquire lock de leitura para verificar se existe uma função de fechamento
570        let guard = self.close_keystore.read();
571        // Verifica se existe uma função de fechamento
572        if guard.is_some() {
573            // Se existe, clona o Arc interno para capturar na closure
574            let close_keystore_clone = self.close_keystore.clone();
575            // Retorna uma nova closure que executa a função de fechamento
576            // Dentro da closure retornada, verifica novamente se a função ainda existe
577            Some(Box::new(move || {
578                let guard = close_keystore_clone.read();
579                if let Some(close_fn) = guard.as_ref() {
580                    close_fn() // Executa a função se ainda existir
581                } else {
582                    Ok(()) // Função foi removida entre a verificação e a execução
583                }
584            }))
585        } else {
586            None // Entre a primeira e segunda verificação, outro thread pode ter removido a função
587        }
588    }
589
590    /// Adiciona ou atualiza uma store no mapa de stores gerenciadas.
591    /// Esta operação adquire um lock de escrita.
592    pub fn set_store(&self, address: String, store: Arc<GuardianStore>) {
593        self.stores.write().insert(address, store);
594    }
595
596    /// Remove uma store do mapa de stores gerenciadas.
597    /// Esta operação adquire um lock de escrita.
598    pub fn delete_store(&self, address: &str) {
599        self.stores.write().remove(address);
600    }
601
602    /// Busca uma store no mapa pelo seu endereço.
603    /// Retorna `Some(store)` se encontrada, ou `None` caso contrário.
604    pub fn get_store(&self, address: &str) -> Option<Arc<GuardianStore>> {
605        self.stores.read().get(address).cloned()
606    }
607
608    /// Itera sobre todas as stores gerenciadas e chama o método `close()` de cada uma.
609    /// Clona a lista de stores para evitar manter o lock durante a chamada a `close()`,
610    /// prevenindo possíveis deadlocks.
611    pub async fn close_all_stores(&self) {
612        let stores_to_close: Vec<Arc<GuardianStore>> =
613            self.stores.read().values().cloned().collect();
614
615        tracing::debug!(
616            store_count = stores_to_close.len(),
617            "Iniciando fechamento de stores"
618        );
619
620        for (index, store) in stores_to_close.iter().enumerate() {
621            tracing::debug!(
622                store_index = index + 1,
623                total_stores = stores_to_close.len(),
624                store_type = store.store_type(),
625                address = %store.address(),
626                "Fechando store"
627            );
628
629            match store.close().await {
630                Ok(()) => {
631                    tracing::debug!(
632                        store_type = store.store_type(),
633                        address = %store.address(),
634                        "Store fechada com sucesso"
635                    );
636                }
637                Err(e) => {
638                    tracing::error!(
639                        store_type = store.store_type(),
640                        address = %store.address(),
641                        error = %e,
642                        "Erro ao fechar store"
643                    );
644                    // Continua fechando outras stores mesmo se uma falhar
645                }
646            }
647        }
648
649        // Limpa o mapa de stores após fechar todas
650        self.stores.write().clear();
651        tracing::debug!(
652            stores_count = stores_to_close.len(),
653            "Todas as stores foram processadas e removidas do mapa"
654        );
655    }
656
657    /// Fecha o cache LevelDown, garantindo que todos os dados sejam persistidos
658    /// e liberando os recursos associados.
659    pub fn close_cache(&self) {
660        tracing::debug!("Iniciando fechamento do cache");
661
662        // Obtém lock de escrita no cache para realizar o fechamento
663        let cache_guard = self.cache.write();
664
665        // Fecha o cache usando o método direto da instância
666        match cache_guard.close_internal() {
667            Ok(()) => {
668                tracing::debug!("Cache fechado com sucesso");
669            }
670            Err(e) => {
671                tracing::error!(error = %e, "Erro ao fechar cache");
672            }
673        }
674
675        // O lock é automaticamente liberado quando cache_guard sai de escopo
676    }
677
678    /// Fecha o canal de comunicação direta e registra um erro se a operação falhar.
679    pub async fn close_direct_connections(&self) {
680        tracing::debug!("Iniciando fechamento do canal direto");
681
682        match self.direct_channel.close_shared().await {
683            Ok(()) => {
684                tracing::debug!("Canal direto fechado com sucesso");
685            }
686            Err(e) => {
687                tracing::error!(
688                    error = %e,
689                    "Erro ao fechar canal direto"
690                );
691            }
692        }
693    }
694
695    /// Executa a função de fechamento do keystore, se ela tiver sido definida.
696    /// Adquire um lock de escrita para garantir que a função não seja modificada enquanto é lida e executada.
697    pub fn close_key_store(&self) {
698        let guard = self.close_keystore.write();
699        if let Some(close_fn) = guard.as_ref()
700            && let Err(e) = close_fn()
701        {
702            tracing::error!(error = %e, "não foi possível fechar o keystore");
703        }
704    }
705
706    /// Busca um construtor de AccessController pelo seu tipo (nome).
707    /// Retorna `Some(constructor)` se encontrado, ou `None` caso contrário.
708    pub fn get_access_controller_type(
709        &self,
710        controller_type: &str,
711    ) -> Option<AccessControllerConstructor> {
712        tracing::debug!(
713            controller_type = controller_type,
714            "Buscando construtor de AccessController"
715        );
716
717        let access_controllers = self.access_controller_types.read();
718
719        match access_controllers.get(controller_type) {
720            Some(constructor) => {
721                tracing::debug!(
722                    controller_type = controller_type,
723                    "Construtor de AccessController encontrado"
724                );
725                Some(constructor.clone())
726            }
727            None => {
728                tracing::debug!(
729                    controller_type = controller_type,
730                    available_types = ?access_controllers.keys().collect::<Vec<_>>(),
731                    "Construtor de AccessController não encontrado"
732                );
733                None
734            }
735        }
736    }
737
738    /// Retorna uma lista com os nomes de todos os tipos de AccessController registrados.
739    /// Função auxiliar para debug e listagem de tipos disponíveis.
740    pub fn access_controller_types_names(&self) -> Vec<String> {
741        self.access_controller_types
742            .read()
743            .keys()
744            .cloned()
745            .collect()
746    }
747
748    /// Remove um construtor de AccessController do mapa pelo seu tipo.
749    /// Esta operação adquire um lock de escrita.
750    pub fn unregister_access_controller_type(&self, controller_type: &str) {
751        self.access_controller_types.write().remove(controller_type);
752    }
753
754    /// Registra um novo tipo de AccessController.
755    /// A função construtora é executada uma vez para determinar o nome do tipo.
756    ///
757    /// Executa o construtor para determinar o tipo dinâmico
758    /// Registra um novo tipo de AccessController com tipo explícito.
759    pub fn register_access_controller_type_with_name(
760        &self,
761        controller_type: &str,
762        constructor: AccessControllerConstructor,
763    ) -> Result<()> {
764        tracing::debug!(
765            controller_type = %controller_type,
766            "Registrando novo tipo de AccessController"
767        );
768
769        // Validações do tipo
770        if controller_type.is_empty() {
771            return Err(GuardianError::InvalidArgument(
772                "O tipo do controller não pode ser uma string vazia".to_string(),
773            ));
774        }
775
776        if controller_type.len() > 100 {
777            return Err(GuardianError::InvalidArgument(
778                "O tipo do controller é muito longo (máximo 100 caracteres)".to_string(),
779            ));
780        }
781
782        // Valida tipos conhecidos
783        let valid_types = ["simple", "guardian", "ipfs"];
784        if !valid_types.contains(&controller_type) {
785            tracing::warn!(
786                controller_type = %controller_type,
787                valid_types = ?valid_types,
788                "Tipo de AccessController não reconhecido - registrando mesmo assim"
789            );
790        }
791
792        // Verifica se o tipo já está registrado
793        {
794            let existing_types = self.access_controller_types.read();
795            if existing_types.contains_key(controller_type) {
796                tracing::warn!(
797                    controller_type = %controller_type,
798                    "AccessController já registrado - sobrescrevendo"
799                );
800            } else {
801                tracing::debug!(
802                    controller_type = %controller_type,
803                    "Novo tipo de AccessController sendo registrado"
804                );
805            }
806        }
807
808        // Registra o construtor no mapa
809        self.access_controller_types
810            .write()
811            .insert(controller_type.to_string(), constructor);
812
813        tracing::debug!(
814            controller_type = %controller_type,
815            "AccessController registrado com sucesso"
816        );
817
818        Ok(())
819    }
820
821    /// Método legado mantido por compatibilidade - usa tipo padrão "simple"
822    pub async fn register_access_controller_type(
823        &self,
824        constructor: AccessControllerConstructor,
825    ) -> Result<()> {
826        tracing::debug!("Usando registro legado com tipo padrão 'simple'");
827        self.register_access_controller_type_with_name("simple", constructor)
828    }
829
830    pub fn register_store_type(&self, store_type: String, constructor: StoreConstructor) {
831        self.store_types.write().insert(store_type, constructor);
832    }
833
834    /// Remove um construtor de Store do mapa pelo seu tipo.
835    pub fn unregister_store_type(&self, store_type: &str) {
836        self.store_types.write().remove(store_type);
837    }
838
839    /// Retorna uma lista com os nomes de todos os tipos de Store registrados.
840    pub fn store_types_names(&self) -> Vec<String> {
841        self.store_types.read().keys().cloned().collect()
842    }
843
844    /// Busca um construtor de Store pelo seu tipo (nome).
845    /// Retorna `Some(constructor)` se encontrado, ou `None` caso contrário.
846    pub fn get_store_constructor(&self, store_type: &str) -> Option<StoreConstructor> {
847        tracing::debug!(store_type = store_type, "Buscando construtor de Store");
848
849        let store_constructors = self.store_types.read();
850
851        match store_constructors.get(store_type) {
852            Some(constructor) => {
853                tracing::debug!(store_type = store_type, "Construtor de Store encontrado");
854                Some(constructor.clone())
855            }
856            None => {
857                tracing::debug!(
858                    store_type = store_type,
859                    available_types = ?store_constructors.keys().collect::<Vec<_>>(),
860                    "Construtor de Store não encontrado"
861                );
862                None
863            }
864        }
865    }
866
867    /// Encerra a instância do GuardianDB, fechando todas as stores, conexões e tarefas em background.
868    pub async fn close(&self) -> Result<()> {
869        let _entered = self.span.enter();
870        tracing::debug!("Iniciando fechamento do GuardianDB");
871
872        // Close all stores first (async operation) - com tratamento de erro
873        tracing::debug!("Fechando todas as stores");
874        self.close_all_stores().await;
875
876        // Close direct connections (async operation) - com tratamento de erro
877        tracing::debug!("Fechando conexões diretas");
878        self.close_direct_connections().await;
879
880        // Close cache (synchronous operation)
881        tracing::debug!("Fechando cache");
882        self.close_cache();
883
884        // Close keystore (synchronous operation) - com tratamento de erro
885        tracing::debug!("Fechando keystore");
886        self.close_key_store();
887
888        // Fechar emitters usando o EventBus
889        // Note: Nossos emitters não precisam de close explícito pois usam Tokio broadcast channels
890        // que são automaticamente limpos quando o EventBus é dropado
891        tracing::debug!("Emitters serão fechados automaticamente com o EventBus");
892
893        // Sinaliza para todas as tarefas em background (como `monitor_direct_channel`) para encerrarem.
894        tracing::debug!("Cancelando tarefas em background");
895        self.cancellation_token.cancel();
896
897        // Abortar explicitamente a task do monitor para evitar quedas na finalização
898        tracing::debug!("Abortando task do monitor do canal direto");
899        self._monitor_handle.abort();
900
901        // Pequeno atraso para permitir que o abort propague
902        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
903
904        tracing::debug!("GuardianDB fechado com sucesso");
905        Ok(())
906    }
907
908    /// Cria um novo banco de dados (store), determina seu endereço, salva localmente e o abre.
909    pub async fn create(
910        &self,
911        name: &str,
912        store_type: &str,
913        options: Option<CreateDBOptions>,
914    ) -> Result<Arc<GuardianStore>> {
915        let _entered = self.span.enter();
916        tracing::debug!("Create()");
917        let options = options.unwrap_or_default();
918
919        // O diretório pode ser passado como uma opção, caso contrário, usa o padrão da instância.
920        let directory = options
921            .directory
922            .clone()
923            .unwrap_or_else(|| self.directory.to_string_lossy().to_string());
924        let mut options = options;
925        options.directory = Some(directory.clone());
926
927        tracing::debug!(
928            name = name,
929            store_type = store_type,
930            directory = %directory,
931            "Criando banco de dados"
932        );
933
934        // Cria o endereço do banco de dados.
935        let determine_opts = crate::traits::DetermineAddressOptions {
936            only_hash: None,
937            replicate: None,
938            access_controller:
939                crate::access_controller::manifest::CreateAccessControllerOptions::new_empty(),
940        };
941        let db_address = self
942            .determine_address(name, store_type, Some(determine_opts))
943            .await?;
944
945        // Carrega o cache salvo localmente.
946        let directory_path = PathBuf::from(&directory);
947        self.load_cache(directory_path.as_path(), &db_address)
948            .await?;
949
950        // Verifica se o banco de dados já existe localmente.
951        let have_db = self.have_local_data(&db_address).await;
952
953        if have_db && !options.overwrite.unwrap_or(false) {
954            return Err(GuardianError::DatabaseAlreadyExists(db_address.to_string()));
955        }
956
957        // Salva o manifesto do banco de dados localmente.
958        self.add_manifest_to_cache(&directory_path, &db_address)
959            .await
960            .map_err(|e| {
961                GuardianError::Other(format!(
962                    "não foi possível adicionar o manifesto ao cache: {}",
963                    e
964                ))
965            })?;
966
967        tracing::debug!(
968            address = %db_address,
969            "Banco de dados criado"
970        );
971
972        // Abre o banco de dados.
973        self.open(&db_address.to_string(), options).await
974    }
975
976    /// Abre um banco de dados a partir de um endereço GuardianDB.
977    pub async fn open(
978        &self,
979        db_address: &str,
980        options: CreateDBOptions,
981    ) -> Result<Arc<GuardianStore>> {
982        let _entered = self.span.enter();
983        tracing::debug!(address = db_address, "abrindo store GuardianDB");
984        let mut options = options;
985
986        let directory = options
987            .directory
988            .clone()
989            .unwrap_or_else(|| self.directory.to_string_lossy().to_string());
990
991        // Valida o endereço. Se for inválido, tenta criar um novo banco de dados se a opção `create` for verdadeira.
992        if crate::address::is_valid(db_address).is_err() {
993            tracing::warn!(address = db_address, "open: Endereço GuardianDB inválido");
994            if !options.create.unwrap_or(false) {
995                return Err(GuardianError::InvalidArgument("'options.create' definido como 'false'. Se você quer criar um banco de dados, defina como 'true'".to_string()));
996            }
997            let store_type = options.store_type.as_deref().unwrap_or("");
998            if store_type.is_empty() {
999                let available_types = self.store_types_names();
1000                let types_list = if available_types.is_empty() {
1001                    "Nenhum tipo de store registrado".to_string()
1002                } else {
1003                    format!("Tipos disponíveis: {}", available_types.join(", "))
1004                };
1005                return Err(GuardianError::InvalidArgument(format!(
1006                    "Tipo de banco de dados não fornecido! Forneça um tipo com 'options.store_type'. {}",
1007                    types_list
1008                )));
1009            }
1010
1011            options.overwrite = Some(true);
1012            // Para evitar o borrow check, criamos novas options
1013            let new_options = CreateDBOptions {
1014                overwrite: Some(true),
1015                create: Some(true),
1016                store_type: Some(store_type.to_string()),
1017                ..Default::default()
1018            };
1019
1020            // Use Box::pin to break recursion
1021            return Box::pin(self.create(db_address, store_type, Some(new_options))).await;
1022        }
1023
1024        let parsed_address = crate::address::parse(db_address)
1025            .map_err(|e| GuardianError::Other(format!("Erro ao fazer parse do endereço: {}", e)))?;
1026
1027        let directory_path = PathBuf::from(&directory);
1028        self.load_cache(directory_path.as_path(), &parsed_address)
1029            .await?;
1030
1031        if options.local_only.unwrap_or(false) && !self.have_local_data(&parsed_address).await {
1032            return Err(GuardianError::NotFound(format!(
1033                "O banco de dados não existe localmente: {}",
1034                db_address
1035            )));
1036        }
1037
1038        // Lê o manifesto do IPFS para determinar o tipo do banco de dados
1039        let manifest_type = if self.have_local_data(&parsed_address).await {
1040            // Se temos dados locais, primeiro tenta ler do cache local
1041            tracing::debug!("Dados encontrados localmente, tentando ler do cache antes do IPFS");
1042
1043            // Leitura do cache local
1044            let _cache_key = format!("{}/_manifest", parsed_address);
1045
1046            // Tenta primeiro o cache, depois fallback para IPFS
1047            let cache_result = {
1048                let cache = self.cache.read();
1049                let directory_str = directory_path.to_string_lossy();
1050
1051                // Tenta carregar os dados do cache usando métodos internos
1052                match cache.load_internal(&directory_str, &parsed_address as &dyn Address) {
1053                    Ok(wrapped_cache) => {
1054                        // Cache carregado com sucesso, agora verifica se o manifesto existe
1055                        let manifest_key = format!("{}/_manifest", parsed_address);
1056
1057                        tracing::debug!(
1058                            key = %manifest_key,
1059                            cache_loaded = true,
1060                            "Verificando manifesto no cache"
1061                        );
1062
1063                        // Prepara contexto e chave para o cache
1064                        let mut ctx: Box<dyn std::any::Any> = Box::new(());
1065                        let key = crate::data_store::Key::new(&manifest_key);
1066
1067                        // Tenta obter o manifesto do cache
1068                        match wrapped_cache.get(ctx.as_mut(), &key) {
1069                            Ok(manifest_data) => {
1070                                tracing::debug!(
1071                                    key = %manifest_key,
1072                                    data_size = manifest_data.len(),
1073                                    "Manifesto encontrado no cache"
1074                                );
1075
1076                                // Valida se os dados são um tipo de store válido
1077                                let manifest_type =
1078                                    String::from_utf8_lossy(&manifest_data).to_string();
1079
1080                                // Verifica se o tipo está registrado
1081                                if self.get_store_constructor(&manifest_type).is_some() {
1082                                    tracing::debug!(
1083                                        manifest_type = %manifest_type,
1084                                        "Manifesto válido encontrado no cache"
1085                                    );
1086                                    Some(manifest_data)
1087                                } else {
1088                                    tracing::warn!(
1089                                        manifest_type = %manifest_type,
1090                                        available_types = ?self.store_types_names(),
1091                                        "Tipo de manifesto no cache não está registrado"
1092                                    );
1093                                    None
1094                                }
1095                            }
1096                            Err(e) => {
1097                                tracing::debug!(
1098                                    key = %manifest_key,
1099                                    error = %e,
1100                                    "Manifesto não encontrado no cache"
1101                                );
1102                                None
1103                            }
1104                        }
1105                    }
1106                    Err(e) => {
1107                        tracing::debug!(
1108                            error = %e,
1109                            "Falha ao carregar cache, usando IPFS"
1110                        );
1111                        None
1112                    }
1113                }
1114            };
1115
1116            match cache_result {
1117                Some(cached_data) => {
1118                    tracing::debug!("Manifesto encontrado no cache local");
1119                    // Parse do tipo do manifesto a partir dos dados do cache
1120                    String::from_utf8_lossy(&cached_data).to_string()
1121                }
1122                None => {
1123                    tracing::debug!("Cache miss, lendo manifesto do IPFS");
1124                    let manifest =
1125                        db_manifest::read_db_manifest(self.ipfs(), &parsed_address.get_root())
1126                            .await
1127                            .map_err(|e| {
1128                                GuardianError::Other(format!(
1129                                    "Não foi possível ler o manifesto do IPFS: {}",
1130                                    e
1131                                ))
1132                            })?;
1133                    manifest.get_type
1134                }
1135            }
1136        } else {
1137            // Se não temos dados locais, lê diretamente do IPFS
1138            tracing::debug!("Dados não encontrados localmente, lendo manifesto do IPFS");
1139            let manifest = db_manifest::read_db_manifest(self.ipfs(), &parsed_address.get_root())
1140                .await
1141                .map_err(|e| {
1142                    GuardianError::Other(format!("Não foi possível ler o manifesto do IPFS: {}", e))
1143                })?;
1144            manifest.get_type
1145        };
1146
1147        tracing::debug!(manifest_type = %manifest_type, "Tipo do banco de dados detectado");
1148        tracing::debug!("Criando instância da store");
1149
1150        self.create_store(&manifest_type, &parsed_address, options)
1151            .await
1152    }
1153
1154    /// Determina o endereço de um banco de dados criando seu manifesto e salvando no IPFS.
1155    pub async fn determine_address(
1156        &self,
1157        name: &str,
1158        store_type: &str,
1159        options: Option<DetermineAddressOptions>,
1160    ) -> Result<GuardianDBAddress> {
1161        let _options = options.unwrap_or_default();
1162
1163        // Valida se o tipo de store está registrado
1164        if self.get_store_constructor(store_type).is_none() {
1165            let available_types = self.store_types_names();
1166            return Err(GuardianError::InvalidArgument(format!(
1167                "Tipo de banco de dados inválido: {}. Tipos disponíveis: {:?}",
1168                store_type, available_types
1169            )));
1170        }
1171
1172        if crate::address::is_valid(name).is_ok() {
1173            return Err(GuardianError::InvalidArgument(
1174                "O nome do banco de dados fornecido já é um endereço válido".to_string(),
1175            ));
1176        }
1177
1178        // Cria opções para o access controller com configurações adequadas
1179        let _ac_params =
1180            crate::access_controller::manifest::CreateAccessControllerOptions::new_empty();
1181
1182        // Criação do Access Controller
1183        // Gera um endereço baseado no hash do manifesto e identidade do usuário
1184        let identity_hash = hex::encode(self.identity().pub_key.as_bytes());
1185        let ac_address_string = format!("/ipfs/{}/access_controller/{}", name, &identity_hash[..8]);
1186
1187        tracing::debug!(
1188            address = %ac_address_string,
1189            identity = %&identity_hash[..16],
1190            "Access Controller criado"
1191        );
1192
1193        // Cria o manifesto do banco de dados no IPFS
1194        let manifest_hash =
1195            db_manifest::create_db_manifest(self.ipfs(), name, store_type, &ac_address_string)
1196                .await
1197                .map_err(|e| {
1198                    GuardianError::Other(format!(
1199                        "não foi possível salvar o manifesto no ipfs: {}",
1200                        e
1201                    ))
1202                })?;
1203
1204        // Constrói e retorna o endereço final do GuardianDB
1205        let addr_string = format!("/GuardianDB/{}/{}", manifest_hash, name);
1206        crate::address::parse(&addr_string)
1207            .map_err(|e| GuardianError::Other(format!("Erro ao fazer parse do endereço: {}", e)))
1208    }
1209
1210    /// Carrega o cache para um determinado endereço de banco de dados.
1211    pub async fn load_cache(&self, directory: &Path, db_address: &GuardianDBAddress) -> Result<()> {
1212        // Carrega o cache usando o LevelDownCache
1213        let cache = self.cache.read();
1214        let directory_str = directory.to_string_lossy();
1215
1216        tracing::debug!(
1217            address = %db_address,
1218            directory = %directory_str,
1219            "Carregando cache para endereço"
1220        );
1221
1222        // Carrega o cache específico para este endereço
1223        let _loaded_cache = cache
1224            .load_internal(&directory_str, db_address)
1225            .map_err(|e| GuardianError::Other(format!("Falha ao carregar cache: {}", e)))?;
1226
1227        tracing::debug!(address = %db_address, "Cache carregado com sucesso");
1228        Ok(())
1229    }
1230
1231    /// Verifica se o manifesto de um banco de dados existe no cache local.
1232    pub async fn have_local_data(&self, db_address: &GuardianDBAddress) -> bool {
1233        let _cache_key = format!("{}/_manifest", db_address);
1234
1235        // Verificar se os dados existem no cache
1236        let cache = self.cache.read();
1237        let directory_str = "./GuardianDB"; // Diretório padrão
1238
1239        // Tenta carregar o cache e verificar se o manifesto existe
1240        match cache.load_internal(directory_str, db_address) {
1241            Ok(wrapped_cache) => {
1242                // Verifica se a chave do manifesto existe no cache
1243                let manifest_key = format!("{}/_manifest", db_address);
1244
1245                // Prepara contexto e chave para verificar existência
1246                let mut ctx: Box<dyn std::any::Any> = Box::new(());
1247                let key = crate::data_store::Key::new(&manifest_key);
1248
1249                // Tenta obter o manifesto do cache para verificar se existe
1250                match wrapped_cache.get(ctx.as_mut(), &key) {
1251                    Ok(manifest_data) => {
1252                        // Manifesto encontrado, verifica se os dados são válidos
1253                        if !manifest_data.is_empty() {
1254                            tracing::debug!(
1255                                address = %db_address,
1256                                manifest_size = manifest_data.len(),
1257                                "Dados locais encontrados no cache"
1258                            );
1259                            true
1260                        } else {
1261                            tracing::debug!(
1262                                address = %db_address,
1263                                "Manifesto vazio encontrado no cache"
1264                            );
1265                            false
1266                        }
1267                    }
1268                    Err(e) => {
1269                        tracing::debug!(
1270                            address = %db_address,
1271                            error = %e,
1272                            "Manifesto não encontrado no cache"
1273                        );
1274                        false
1275                    }
1276                }
1277            }
1278            Err(e) => {
1279                tracing::debug!(
1280                    address = %db_address,
1281                    error = %e,
1282                    "Falha ao carregar cache para verificação de dados locais"
1283                );
1284                false
1285            }
1286        }
1287    }
1288
1289    /// Adiciona o hash do manifesto de um banco de dados ao cache local.
1290    pub async fn add_manifest_to_cache(
1291        &self,
1292        directory: &Path,
1293        db_address: &GuardianDBAddress,
1294    ) -> Result<()> {
1295        let cache_key = format!("{}/_manifest", db_address);
1296        let root_hash_bytes = db_address.get_root().to_string().into_bytes();
1297
1298        // Armazenar o manifesto no cache
1299        let wrapped_cache = {
1300            let cache = self.cache.read();
1301            let directory_str = directory.to_string_lossy();
1302
1303            // Carrega ou cria o datastore para este endereço
1304            cache
1305                .load_internal(&directory_str, db_address)
1306                .map_err(|e| GuardianError::Other(format!("Falha ao carregar cache: {}", e)))?
1307        };
1308
1309        // Armazena o hash do manifesto no cache de forma concreta
1310        let key = crate::data_store::Key::new(&cache_key);
1311
1312        // Armazena o tipo de manifesto (não apenas o hash) para facilitar a verificação
1313        // Busca o tipo do manifesto se ele estiver disponível
1314        let manifest_data = if let Ok(manifest) =
1315            db_manifest::read_db_manifest(self.ipfs(), &db_address.get_root()).await
1316        {
1317            // Se conseguimos ler o manifesto do IPFS, armazenamos o tipo
1318            manifest.get_type.into_bytes()
1319        } else {
1320            // Fallback: armazena apenas o hash da raiz como indicador de existência
1321            root_hash_bytes
1322        };
1323
1324        // Cria context depois do await para evitar problemas de Send
1325        let mut ctx: Box<dyn std::any::Any + Send + Sync> = Box::new(());
1326
1327        match wrapped_cache.put(ctx.as_mut(), &key, &manifest_data) {
1328            Ok(()) => {
1329                tracing::debug!(
1330                    cache_key = %cache_key,
1331                    data_size = manifest_data.len(),
1332                    address = %db_address,
1333                    "Manifesto armazenado no cache com sucesso"
1334                );
1335            }
1336            Err(e) => {
1337                tracing::warn!(
1338                    cache_key = %cache_key,
1339                    error = %e,
1340                    address = %db_address,
1341                    "Falha ao armazenar manifesto no cache"
1342                );
1343                // Não retorna erro pois é uma otimização, não operação crítica
1344            }
1345        }
1346
1347        tracing::debug!(
1348            address = %db_address,
1349            directory = %directory.to_string_lossy(),
1350            cache_key = %cache_key,
1351            "Manifesto adicionado ao cache"
1352        );
1353
1354        Ok(())
1355    }
1356
1357    /// Lida com a lógica complexa de instanciar uma nova Store, incluindo a resolução
1358    /// do Access Controller, carregamento de cache e configuração de todas as opções.
1359    pub async fn create_store(
1360        &self,
1361        store_type: &str,
1362        address: &GuardianDBAddress,
1363        options: CreateDBOptions,
1364    ) -> Result<Arc<GuardianStore>> {
1365        tracing::debug!(
1366            store_type = store_type,
1367            address = %address,
1368            "Criando store"
1369        );
1370
1371        // 1. Busca o construtor registrado para o tipo de store
1372        let constructor = self.get_store_constructor(store_type).ok_or_else(|| {
1373            let available_types = self.store_types_names();
1374            GuardianError::InvalidArgument(format!(
1375                "Tipo de store '{}' não registrado. Tipos disponíveis: {:?}",
1376                store_type, available_types
1377            ))
1378        })?;
1379
1380        // 2. Converte CreateDBOptions para NewStoreOptions
1381        let new_store_options = self.convert_create_to_store_options(options).await?;
1382
1383        // 3. Prepara argumentos para o construtor
1384        let ipfs_client = Arc::new(self.ipfs().clone());
1385        let identity = Arc::new(self.identity());
1386        let store_address = Box::new(address.clone()) as Box<dyn Address>;
1387
1388        tracing::debug!(
1389            store_type = store_type,
1390            address = %address,
1391            "Executando construtor da store"
1392        );
1393
1394        // 4. Executa o construtor
1395        let store_result =
1396            constructor(ipfs_client, identity, store_address, new_store_options).await;
1397
1398        let store = match store_result {
1399            Ok(store) => store,
1400            Err(e) => {
1401                tracing::error!(
1402                    store_type = store_type,
1403                    address = %address,
1404                    error = %e,
1405                    "Falha ao criar store"
1406                );
1407                return Err(e);
1408            }
1409        };
1410
1411        // 5. Converte para Arc<GuardianStore>
1412        let boxed_store = store as Box<dyn Store<Error = GuardianError> + Send + Sync>;
1413        let arc_store: Arc<GuardianStore> = Arc::from(boxed_store);
1414
1415        // 6. Registra a store no mapa gerenciado
1416        self.set_store(address.to_string(), arc_store.clone());
1417
1418        tracing::debug!(
1419            store_type = store_type,
1420            address = %address,
1421            store_type_confirmed = arc_store.store_type(),
1422            "Store criada e registrada com sucesso"
1423        );
1424
1425        Ok(arc_store)
1426    }
1427
1428    /// Converte CreateDBOptions para NewStoreOptions necessário pelos construtores
1429    async fn convert_create_to_store_options(
1430        &self,
1431        options: CreateDBOptions,
1432    ) -> Result<crate::traits::NewStoreOptions> {
1433        use crate::traits::NewStoreOptions;
1434
1435        tracing::debug!("Convertendo opções para criação de store");
1436
1437        // Converte access_controller de ManifestParams para AccessController
1438        let access_controller = if let Some(manifest_params) = options.access_controller {
1439            tracing::debug!("Convertendo ManifestParams para AccessController");
1440
1441            // Extrai informações do ManifestParams
1442            let controller_type = manifest_params.get_type();
1443
1444            tracing::debug!(
1445                controller_type = %controller_type,
1446                "Criando access controller a partir do manifesto"
1447            );
1448
1449            // Extrai as permissões do ManifestParams
1450            let permissions = manifest_params.get_all_access();
1451
1452            // Cria AccessController baseado no tipo
1453            match controller_type {
1454                "simple" | "" => {
1455                    tracing::debug!("Criando SimpleAccessController");
1456
1457                    let simple_controller =
1458                        crate::access_controller::simple::SimpleAccessController::new(
1459                            if permissions.is_empty() {
1460                                None
1461                            } else {
1462                                Some(permissions)
1463                            },
1464                        );
1465                    Some(Arc::new(simple_controller)
1466                        as Arc<
1467                            dyn crate::access_controller::traits::AccessController,
1468                        >)
1469                }
1470                "guardian" => {
1471                    tracing::debug!("Criando GuardianAccessController");
1472
1473                    // Para GuardianAccessController, usa configuração básica
1474                    let simple_controller =
1475                        crate::access_controller::simple::SimpleAccessController::new(
1476                            if permissions.is_empty() {
1477                                None
1478                            } else {
1479                                Some(permissions)
1480                            },
1481                        );
1482                    Some(Arc::new(simple_controller)
1483                        as Arc<
1484                            dyn crate::access_controller::traits::AccessController,
1485                        >)
1486                }
1487                "ipfs" => {
1488                    tracing::debug!(
1489                        "IPFS AccessController não implementado, usando SimpleAccessController"
1490                    );
1491
1492                    let simple_controller =
1493                        crate::access_controller::simple::SimpleAccessController::new(
1494                            if permissions.is_empty() {
1495                                None
1496                            } else {
1497                                Some(permissions)
1498                            },
1499                        );
1500                    Some(Arc::new(simple_controller)
1501                        as Arc<
1502                            dyn crate::access_controller::traits::AccessController,
1503                        >)
1504                }
1505                _ => {
1506                    tracing::warn!(
1507                        controller_type = %controller_type,
1508                        "Tipo de access controller não reconhecido, usando SimpleAccessController"
1509                    );
1510
1511                    let simple_controller =
1512                        crate::access_controller::simple::SimpleAccessController::new(
1513                            if permissions.is_empty() {
1514                                None
1515                            } else {
1516                                Some(permissions)
1517                            },
1518                        );
1519                    Some(Arc::new(simple_controller)
1520                        as Arc<
1521                            dyn crate::access_controller::traits::AccessController,
1522                        >)
1523                }
1524            }
1525        } else {
1526            tracing::debug!("Nenhum access controller especificado, usando padrão");
1527            None
1528        };
1529
1530        // Converte as opções básicas mantendo compatibilidade
1531        let store_options = NewStoreOptions {
1532            event_bus: None,   // Será configurado pela BaseStore
1533            index: None,       // Será configurado pelo construtor específico
1534            access_controller, // AccessController convertido do ManifestParams
1535            cache: None,       // Usa cache padrão
1536            cache_destroy: None,
1537            replication_concurrency: None,
1538            reference_count: None,
1539            replicate: Some(true), // Por padrão, habilita replicação
1540            max_history: None,
1541            directory: options
1542                .directory
1543                .unwrap_or_else(|| "./GuardianDB".to_string()),
1544            sort_fn: None,
1545            span: None,                        // Será configurado pela BaseStore
1546            tracer: None,                      // Será configurado pela BaseStore
1547            pubsub: None,                      // Será configurado pela BaseStore
1548            message_marshaler: None,           // Será configurado pela BaseStore
1549            peer_id: libp2p::PeerId::random(), // Temporário, será sobrescrito
1550            direct_channel: None,              // Será configurado pela BaseStore
1551            close_func: None,
1552            store_specific_opts: None,
1553        };
1554
1555        tracing::debug!("Opções convertidas com sucesso");
1556        Ok(store_options)
1557    }
1558
1559    /// Registra os construtores padrão de access controllers disponíveis
1560    pub async fn register_default_access_controller_types(&self) -> Result<()> {
1561        tracing::debug!("Registrando construtores padrão de access controllers");
1562
1563        // Registra SimpleAccessController
1564        let simple_constructor =
1565            Arc::new(
1566                |_base_guardian: Arc<
1567                    dyn crate::traits::BaseGuardianDB<Error = crate::error::GuardianError>,
1568                >,
1569                 options: &crate::access_controller::manifest::CreateAccessControllerOptions,
1570                 _access_controller_options: Option<
1571                    Vec<crate::access_controller::traits::Option>,
1572                >| {
1573                    let options = options.clone(); // Clone to move into the async block
1574                    Box::pin(async move {
1575                use crate::access_controller::simple::SimpleAccessController;
1576                let access_controller = SimpleAccessController::from_options(options)
1577                    .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;
1578                Ok(Arc::new(access_controller) as Arc<dyn crate::access_controller::traits::AccessController>)
1579            }) as Pin<Box<dyn std::future::Future<Output = crate::error::Result<Arc<dyn crate::access_controller::traits::AccessController>>> + Send>>
1580                },
1581            );
1582
1583        // Efetua o registro usando o novo método com tipo explícito
1584        self.register_access_controller_type_with_name("simple", simple_constructor)?;
1585
1586        tracing::debug!(
1587            types = ?self.access_controller_types_names(),
1588            "Construtores padrão de access controllers registrados"
1589        );
1590
1591        Ok(())
1592    }
1593
1594    /// Registra os construtores padrão de stores disponíveis
1595    pub fn register_default_store_types(&self) {
1596        tracing::debug!("Registrando construtores padrão de stores");
1597
1598        // Registra EventLogStore
1599        let eventlog_constructor = Arc::new(
1600            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
1601             identity: Arc<crate::ipfs_log::identity::Identity>,
1602             address: Box<dyn crate::address::Address>,
1603             options: crate::traits::NewStoreOptions| {
1604                Box::pin(async move {
1605                    use crate::stores::event_log_store::log::GuardianDBEventLogStore;
1606                    // Converte Box<dyn Address> para Arc<dyn Address + Send + Sync>
1607                    let arc_address: Arc<dyn crate::address::Address + Send + Sync> =
1608                        Arc::from(address as Box<dyn crate::address::Address + Send + Sync>);
1609
1610                    let store = GuardianDBEventLogStore::new(ipfs, identity, arc_address, options)
1611                        .await
1612                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;
1613
1614                    Ok(Box::new(store)
1615                        as Box<
1616                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
1617                        >)
1618                })
1619                    as Pin<
1620                        Box<
1621                            dyn std::future::Future<
1622                                    Output = crate::error::Result<
1623                                        Box<
1624                                            dyn crate::traits::Store<
1625                                                    Error = crate::error::GuardianError,
1626                                                >,
1627                                        >,
1628                                    >,
1629                                > + Send,
1630                        >,
1631                    >
1632            },
1633        );
1634        // Registra KeyValueStore
1635        let keyvalue_constructor = Arc::new(
1636            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
1637             identity: Arc<crate::ipfs_log::identity::Identity>,
1638             address: Box<dyn crate::address::Address>,
1639             options: crate::traits::NewStoreOptions| {
1640                Box::pin(async move {
1641                    use crate::stores::kv_store::keyvalue::GuardianDBKeyValue;
1642                    // Converte Box<dyn Address> para Arc<dyn Address + Send + Sync>
1643                    let arc_address: Arc<dyn crate::address::Address + Send + Sync> =
1644                        Arc::from(address as Box<dyn crate::address::Address + Send + Sync>);
1645
1646                    let store = GuardianDBKeyValue::new(ipfs, identity, arc_address, Some(options))
1647                        .await
1648                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;
1649
1650                    Ok(Box::new(store)
1651                        as Box<
1652                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
1653                        >)
1654                })
1655                    as Pin<
1656                        Box<
1657                            dyn std::future::Future<
1658                                    Output = crate::error::Result<
1659                                        Box<
1660                                            dyn crate::traits::Store<
1661                                                    Error = crate::error::GuardianError,
1662                                                >,
1663                                        >,
1664                                    >,
1665                                > + Send,
1666                        >,
1667                    >
1668            },
1669        );
1670
1671        // Registra DocumentStore
1672        let document_constructor = Arc::new(
1673            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
1674             identity: Arc<crate::ipfs_log::identity::Identity>,
1675             address: Box<dyn crate::address::Address>,
1676             options: crate::traits::NewStoreOptions| {
1677                Box::pin(async move {
1678                    use crate::stores::document_store::document::GuardianDBDocumentStore;
1679
1680                    // Converte Box<dyn Address> para Arc<dyn Address>
1681                    let arc_address: Arc<dyn crate::address::Address> =
1682                        Arc::from(address as Box<dyn crate::address::Address>);
1683
1684                    let store = GuardianDBDocumentStore::new(ipfs, identity, arc_address, options)
1685                        .await
1686                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;
1687
1688                    Ok(Box::new(store)
1689                        as Box<
1690                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
1691                        >)
1692                })
1693                    as Pin<
1694                        Box<
1695                            dyn std::future::Future<
1696                                    Output = crate::error::Result<
1697                                        Box<
1698                                            dyn crate::traits::Store<
1699                                                    Error = crate::error::GuardianError,
1700                                                >,
1701                                        >,
1702                                    >,
1703                                > + Send,
1704                        >,
1705                    >
1706            },
1707        );
1708
1709        // Efetua os registros
1710        self.register_store_type("eventlog".to_string(), eventlog_constructor);
1711        self.register_store_type("keyvalue".to_string(), keyvalue_constructor);
1712        self.register_store_type("document".to_string(), document_constructor);
1713
1714        tracing::debug!(
1715            types = ?self.store_types_names(),
1716            "Construtores padrão registrados"
1717        );
1718    }
1719
1720    /// Retorna o barramento de eventos da instância do GuardianDB.
1721    pub fn event_bus(&self) -> Arc<EventBusImpl> {
1722        self.event_bus.clone()
1723    }
1724
1725    /// Inicia uma tarefa em background para escutar eventos do pubsub e processá-los.
1726    pub async fn monitor_direct_channel(
1727        &self,
1728        event_bus: Arc<EventBusImpl>,
1729    ) -> Result<JoinHandle<()>> {
1730        let mut receiver = event_bus
1731            .subscribe::<EventPubSubPayload>()
1732            .await
1733            .map_err(|e| {
1734                GuardianError::Other(format!(
1735                    "não foi possível se inscrever nos eventos do pubsub: {}",
1736                    e
1737                ))
1738            })?;
1739
1740        // Clona os Arcs e outros dados necessários para a tarefa assíncrona
1741        let token = self.cancellation_token.clone();
1742        let message_marshaler = self.message_marshaler.clone();
1743        let emitters = self.emitters.clone();
1744        let stores = self.stores.clone();
1745
1746        let handle = tokio::spawn(async move {
1747            tracing::debug!("Monitor do canal direto iniciado");
1748
1749            loop {
1750                tokio::select! {
1751                    // Escuta o sinal de cancelamento
1752                    _ = token.cancelled() => {
1753                        tracing::debug!("monitor_direct_channel encerrando");
1754                        return;
1755                    }
1756                    // Escuta por novos eventos
1757                    maybe_event = receiver.recv() => {
1758                        match maybe_event {
1759                            Ok(event) => {
1760                                tracing::trace!(
1761                                    peer = %event.peer,
1762                                    payload_size = event.payload.len(),
1763                                    "Evento recebido no canal direto"
1764                                );
1765
1766                                // ETAPA 1: Deserialização da mensagem usando message_marshaler
1767                                let msg = match message_marshaler.unmarshal(&event.payload) {
1768                                    Ok(msg) => msg,
1769                                    Err(e) => {
1770                                        tracing::warn!(
1771                                            peer = %event.peer,
1772                                            error = %e,
1773                                            payload_size = event.payload.len(),
1774                                            "Falha ao deserializar mensagem do canal direto"
1775                                        );
1776                                        continue;
1777                                    }
1778                                };
1779
1780                                tracing::debug!(
1781                                    peer = %event.peer,
1782                                    store_address = %msg.address,
1783                                    heads_count = msg.heads.len(),
1784                                    "Mensagem deserializada com sucesso"
1785                                );
1786
1787                                // ETAPA 2: Busca da store correspondente pelo endereço
1788                                let store = {
1789                                    let stores_guard = stores.read();
1790                                    stores_guard.get(&msg.address).cloned()
1791                                };
1792
1793                                let _store = match store {
1794                                    Some(store) => store,
1795                                    None => {
1796                                        tracing::debug!(
1797                                            store_address = %msg.address,
1798                                            peer = %event.peer,
1799                                            "Store não encontrada para endereço, ignorando mensagem"
1800                                        );
1801                                        continue;
1802                                    }
1803                                };
1804
1805                                // ETAPA 3: Processamento da troca de heads
1806                                // Realiza validação básica dos heads recebidos
1807                                let valid_heads: Vec<_> = msg.heads.iter()
1808                                    .filter(|head| !head.id.is_empty() && !head.payload.is_empty())
1809                                    .cloned()
1810                                    .collect();
1811
1812                                if valid_heads.is_empty() {
1813                                    tracing::warn!(
1814                                        store_address = %msg.address,
1815                                        peer = %event.peer,
1816                                        total_heads = msg.heads.len(),
1817                                        "Todos os heads recebidos são inválidos"
1818                                    );
1819                                    continue;
1820                                }
1821
1822                                tracing::debug!(
1823                                    store_address = %msg.address,
1824                                    peer = %event.peer,
1825                                    valid_heads = valid_heads.len(),
1826                                    total_heads = msg.heads.len(),
1827                                    "Processando heads válidos"
1828                                );
1829
1830                                // ETAPA 4: Sincronização efetiva com a store
1831                                // Sincronização usando o método sync da store
1832                                tracing::debug!(
1833                                    store_address = %msg.address,
1834                                    peer = %event.peer,
1835                                    valid_heads = valid_heads.len(),
1836                                    "Iniciando sincronização com a store"
1837                                );
1838
1839                                // Realiza a sincronização usando o método sync da trait Store
1840                                // Nota: Usamos interior mutability para compatibilidade com Arc<>
1841                                let sync_result = Self::sync_store_with_heads(&_store, valid_heads.clone()).await;
1842
1843                                match sync_result {
1844                                    Ok(()) => {
1845                                        tracing::debug!(
1846                                            store_address = %msg.address,
1847                                            peer = %event.peer,
1848                                            processed_heads = valid_heads.len(),
1849                                            "Sincronização de heads completada com sucesso"
1850                                        );
1851                                    }
1852                                    Err(e) => {
1853                                        tracing::error!(
1854                                            store_address = %msg.address,
1855                                            peer = %event.peer,
1856                                            error = %e,
1857                                            attempted_heads = valid_heads.len(),
1858                                            "Erro durante sincronização de heads"
1859                                        );
1860                                        // Não fazemos continue aqui para permitir emissão de evento mesmo com erro
1861                                    }
1862                                }
1863
1864                                // ETAPA 5: Emissão de evento para notificar componentes interessados
1865                                let exchange_event = EventExchangeHeads::new(event.peer, msg);
1866                                if let Err(e) = emitters.new_heads.emit(exchange_event) {
1867                                    tracing::error!(
1868                                        error = %e,
1869                                        peer = %event.peer,
1870                                        "Erro ao emitir evento new_heads"
1871                                    );
1872                                } else {
1873                                    tracing::trace!(peer = %event.peer, "Evento new_heads emitido com sucesso");
1874                                }
1875                            }
1876                            Err(_) => {
1877                                // O canal foi fechado, encerra a tarefa.
1878                                tracing::debug!("Canal de eventos fechado, encerrando monitor");
1879                                break;
1880                            }
1881                        }
1882                    }
1883                }
1884            }
1885
1886            tracing::debug!("Monitor do canal direto finalizado");
1887        });
1888
1889        Ok(handle)
1890    }
1891
1892    /// Método helper para sincronizar uma store com heads recebidos
1893    /// Resolve o problema de mutabilidade quando trabalhando com Arc<GuardianStore>
1894    async fn sync_store_with_heads(
1895        store: &Arc<GuardianStore>,
1896        heads: Vec<crate::ipfs_log::entry::Entry>,
1897    ) -> Result<()> {
1898        // Estratégia: Usar interior mutability através de downcasting para BaseStore
1899        // Primeiro, tenta fazer downcast para BaseStore diretamente
1900        if let Some(base_store) = store
1901            .as_any()
1902            .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
1903        {
1904            // BaseStore funciona com interior mutability
1905            return base_store.sync(heads).await.map_err(|e| {
1906                GuardianError::Store(format!("Erro na sincronização BaseStore: {}", e))
1907            });
1908        }
1909        // Fallback: Para stores que não expõem BaseStore diretamente
1910        // EventLogStore - tenta acessar BaseStore interno
1911        if let Some(event_log_store) = store
1912            .as_any()
1913            .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>(
1914        ) {
1915            // Acessa o BaseStore interno que tem sync(&self)
1916            let base_store = event_log_store.basestore();
1917            return base_store.sync(heads).await.map_err(|e| {
1918                GuardianError::Store(format!("Erro na sincronização EventLogStore: {}", e))
1919            });
1920        }
1921
1922        // KeyValueStore - tenta acessar BaseStore interno
1923        if let Some(kv_store) = store
1924            .as_any()
1925            .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
1926        {
1927            // Acessa o BaseStore interno que tem sync(&self)
1928            let base_store = kv_store.basestore();
1929            return base_store.sync(heads).await.map_err(|e| {
1930                GuardianError::Store(format!("Erro na sincronização KeyValueStore: {}", e))
1931            });
1932        }
1933
1934        // DocumentStore - tenta acessar BaseStore interno
1935        if let Some(doc_store) = store
1936            .as_any()
1937            .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>(
1938        ) {
1939            // Acessa o BaseStore interno que tem sync(&self)
1940            let base_store = doc_store.basestore();
1941            return base_store.sync(heads).await.map_err(|e| {
1942                GuardianError::Store(format!("Erro na sincronização DocumentStore: {}", e))
1943            });
1944        }
1945
1946        // Se nenhum downcast funcionou, retorna erro
1947        Err(GuardianError::Other(
1948            "Tipo de store não suportado para sincronização ou downcast falhou".to_string(),
1949        ))
1950    }
1951
1952    /// Método helper para obter o número total de entradas em uma store
1953    /// Usado para gerar eventos informativos sobre o estado da store
1954    async fn get_store_total_entries(&self, store: &Arc<GuardianStore>) -> Result<usize> {
1955        // Tenta acessar o BaseStore interno para obter informações do oplog
1956        // Primeiro, tenta fazer downcast para BaseStore diretamente
1957        if let Some(base_store) = store
1958            .as_any()
1959            .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
1960        {
1961            // Acessa o oplog para obter o número de entradas
1962            let op_log = base_store.op_log();
1963            let log = op_log.read();
1964            return Ok(log.len());
1965        }
1966
1967        // Fallback: Para stores que não expõem BaseStore diretamente
1968        // EventLogStore - tenta acessar BaseStore interno
1969        if let Some(event_log_store) = store
1970            .as_any()
1971            .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>(
1972        ) {
1973            let base_store = event_log_store.basestore();
1974            let op_log = base_store.op_log();
1975            let log = op_log.read();
1976            return Ok(log.len());
1977        }
1978
1979        // KeyValueStore - tenta acessar BaseStore interno
1980        if let Some(kv_store) = store
1981            .as_any()
1982            .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
1983        {
1984            let base_store = kv_store.basestore();
1985            let op_log = base_store.op_log();
1986            let log = op_log.read();
1987            return Ok(log.len());
1988        }
1989
1990        // DocumentStore - tenta acessar BaseStore interno
1991        if let Some(doc_store) = store
1992            .as_any()
1993            .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>(
1994        ) {
1995            let base_store = doc_store.basestore();
1996            let op_log = base_store.op_log();
1997            let log = op_log.read();
1998            return Ok(log.len());
1999        }
2000
2001        // Se nenhum downcast funcionou, retorna erro
2002        Err(GuardianError::Other(
2003            "Tipo de store não suportado para obter total de entradas ou downcast falhou"
2004                .to_string(),
2005        ))
2006    }
2007
2008    /// Função helper estática para criar e iniciar o monitor do canal direto
2009    /// durante a inicialização, evitando problemas de referência circular
2010    fn start_monitor_task(
2011        event_bus: Arc<EventBusImpl>,
2012        cancellation_token: CancellationToken,
2013        span: Span,
2014    ) -> JoinHandle<()> {
2015        tokio::spawn(async move {
2016            let _enter = span.enter();
2017            // Tenta se inscrever nos eventos do pubsub
2018            let mut receiver = match event_bus.subscribe::<EventPubSubPayload>().await {
2019                Ok(rx) => rx,
2020                Err(e) => {
2021                    tracing::error!("Falha ao se inscrever nos eventos do pubsub: {}", e);
2022                    return;
2023                }
2024            };
2025
2026            tracing::debug!("Monitor do canal direto iniciado");
2027
2028            loop {
2029                tokio::select! {
2030                    // Escuta o sinal de cancelamento
2031                    _ = cancellation_token.cancelled() => {
2032                        tracing::debug!("Monitor do canal direto encerrando");
2033                        return;
2034                    }
2035                    // Escuta por novos eventos
2036                    maybe_event = receiver.recv() => {
2037                        match maybe_event {
2038                            Ok(event) => {
2039                                tracing::trace!(
2040                                    peer = %event.peer,
2041                                    "Evento recebido no monitor do canal direto"
2042                                );
2043
2044                                // Processa diferentes tipos de eventos do canal direto:
2045                                // 1. Eventos de troca de heads (sincronização de dados)
2046                                // 2. Eventos de peer connection/disconnection
2047                                // 3. Eventos de mensagens do protocolo
2048
2049                                tracing::debug!(
2050                                    event_type = "pubsub_payload",
2051                                    from_peer = %event.peer,
2052                                    payload_size = event.payload.len(),
2053                                    "Processando evento de canal direto"
2054                                );
2055
2056                                // Note: O processamento completo é feito pelo monitor principal via monitor_direct_channel()
2057                                // que tem acesso ao message_marshaler, stores e emitters
2058                            }
2059                            Err(_) => {
2060                                tracing::debug!("Canal de eventos fechado, encerrando monitor");
2061                                break;
2062                            }
2063                        }
2064                    }
2065                }
2066            }
2067        })
2068    }
2069
2070    /// Verifica as permissões de acesso dos heads usando o Access Controller da store
2071    ///
2072    /// Realiza verificação completa de permissões para cada head:
2073    /// 1. Extração da identidade do head
2074    /// 2. Verificação de permissões de escrita via Access Controller
2075    /// 3. Validação de assinatura da identidade se necessário
2076    /// 4. Filtragem de heads não autorizados
2077    ///
2078    /// # Argumentos
2079    ///
2080    /// * `heads` - Lista de heads a serem verificados
2081    /// * `store` - Store que contém o Access Controller para verificação
2082    ///
2083    /// # Retorna
2084    ///
2085    /// * `Ok(Vec<Entry>)` - Lista filtrada contendo apenas heads autorizados
2086    /// * `Err(GuardianError)` - Se houve erro crítico na verificação
2087    ///
2088    /// # Política de Segurança
2089    ///
2090    /// - Heads sem identidade são **rejeitados** por motivos de segurança
2091    /// - Identidades inválidas ou não autorizadas são **rejeitadas**
2092    /// - Falhas de verificação são logadas mas não interrompem o processamento
2093    /// - Apenas heads explicitamente autorizados são aceitos
2094    async fn verify_heads_permissions(
2095        &self,
2096        heads: &[crate::ipfs_log::entry::Entry],
2097        store: &Arc<GuardianStore>,
2098    ) -> Result<Vec<crate::ipfs_log::entry::Entry>> {
2099        tracing::debug!(
2100            heads_count = heads.len(),
2101            "Iniciando verificação de permissões para heads"
2102        );
2103
2104        let mut authorized_heads = Vec::new();
2105        let mut denied_count = 0;
2106        let mut no_identity_count = 0;
2107
2108        // Obtém o Access Controller da store para verificação
2109        let access_controller = {
2110            // Tenta acessar o BaseStore interno das stores conhecidas
2111            if let Some(event_log_store) = store.as_any().downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>() {
2112                event_log_store.basestore().access_controller()
2113            } else if let Some(kv_store) = store.as_any().downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>() {
2114                kv_store.basestore().access_controller()
2115            } else if let Some(doc_store) = store.as_any().downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>() {
2116                doc_store.basestore().access_controller()
2117            } else if let Some(base_store) = store.as_any().downcast_ref::<crate::stores::base_store::base_store::BaseStore>() {
2118                base_store.access_controller()
2119            } else {
2120                tracing::warn!("Tipo de store não suportado para verificação de permissões");
2121                return Err(GuardianError::Store(
2122                    "Store type not supported for permission verification".to_string()
2123                ));
2124            }
2125        };
2126
2127        tracing::debug!(
2128            access_controller_type = access_controller.get_type(),
2129            "Access Controller obtido"
2130        );
2131
2132        // Verificação individual de cada head
2133        for (i, head) in heads.iter().enumerate() {
2134            // VERIFICAÇÃO 1: Presença de identidade
2135            let identity = match &head.identity {
2136                Some(identity) => identity,
2137                None => {
2138                    tracing::debug!(
2139                        head_index = i + 1,
2140                        total_heads = heads.len(),
2141                        head_hash = %head.hash,
2142                        "Head rejeitado: sem identidade"
2143                    );
2144                    no_identity_count += 1;
2145                    continue;
2146                }
2147            };
2148
2149            // VERIFICAÇÃO 2: Validação básica da identidade
2150            if identity.id().is_empty() || identity.pub_key().is_empty() {
2151                tracing::debug!(
2152                    head_index = i + 1,
2153                    total_heads = heads.len(),
2154                    head_hash = %head.hash,
2155                    "Head rejeitado: identidade inválida"
2156                );
2157                denied_count += 1;
2158                continue;
2159            }
2160
2161            // VERIFICAÇÃO 3: Permissões de escrita via Access Controller
2162            let identity_key = identity.pub_key();
2163            let has_write_permission = match access_controller.get_authorized_by_role("write").await
2164            {
2165                Ok(authorized_keys) => {
2166                    // Verifica se a chave está explicitamente autorizada
2167                    authorized_keys.contains(&identity_key.to_string())
2168                        || authorized_keys.contains(&identity.id().to_string())
2169                        || authorized_keys.contains(&"*".to_string()) // Permissão universal
2170                }
2171                Err(e) => {
2172                    tracing::warn!(
2173                        head_index = i + 1,
2174                        total_heads = heads.len(),
2175                        error = %e,
2176                        head_hash = %head.hash,
2177                        "Head erro ao verificar permissões"
2178                    );
2179
2180                    // Emitir evento de erro de permissão
2181                    let permission_denied_event = EventPermissionDenied::new(
2182                        store.address().to_string(),
2183                        identity.id().to_string(),
2184                        identity_key.to_string(),
2185                        "write".to_string(),
2186                    );
2187
2188                    if let Err(emit_err) = self
2189                        .emitters
2190                        .permission_denied
2191                        .emit(permission_denied_event)
2192                    {
2193                        tracing::warn!(error = %emit_err, "Erro ao emitir evento PermissionDenied");
2194                    }
2195
2196                    false // Em caso de erro, nega acesso por segurança
2197                }
2198            };
2199
2200            if !has_write_permission {
2201                tracing::debug!(
2202                    head_index = i + 1,
2203                    total_heads = heads.len(),
2204                    head_hash = %head.hash,
2205                    identity_id = identity.id(),
2206                    "Head rejeitado: sem permissão de escrita"
2207                );
2208
2209                // Emitir evento de permissão negada
2210                let permission_denied_event = EventPermissionDenied::new(
2211                    store.address().to_string(),
2212                    identity.id().to_string(),
2213                    identity_key.to_string(),
2214                    "write".to_string(),
2215                );
2216
2217                if let Err(e) = self
2218                    .emitters
2219                    .permission_denied
2220                    .emit(permission_denied_event)
2221                {
2222                    tracing::warn!(error = %e, "Erro ao emitir evento PermissionDenied");
2223                }
2224
2225                denied_count += 1;
2226                continue;
2227            }
2228
2229            // VERIFICAÇÃO 4: Verifica também permissões administrativas como fallback
2230            let has_admin_permission = match access_controller.get_authorized_by_role("admin").await
2231            {
2232                Ok(admin_keys) => {
2233                    admin_keys.contains(&identity_key.to_string())
2234                        || admin_keys.contains(&identity.id().to_string())
2235                        || admin_keys.contains(&"*".to_string())
2236                }
2237                Err(_) => false, // Não crítico se admin falhar
2238            };
2239
2240            // VERIFICAÇÃO 5: Aceita head se tem permissão de escrita ou admin
2241            if has_write_permission || has_admin_permission {
2242                let permission_type = if has_admin_permission {
2243                    "admin"
2244                } else {
2245                    "write"
2246                };
2247                tracing::debug!(
2248                    head_index = i + 1,
2249                    total_heads = heads.len(),
2250                    permission_type = permission_type,
2251                    head_hash = %head.hash,
2252                    identity_id = identity.id(),
2253                    "Head autorizado"
2254                );
2255
2256                authorized_heads.push(head.clone());
2257            } else {
2258                tracing::debug!(
2259                    head_index = i + 1,
2260                    total_heads = heads.len(),
2261                    head_hash = %head.hash,
2262                    identity_id = identity.id(),
2263                    "Head rejeitado: sem permissões adequadas"
2264                );
2265
2266                // Emitir evento de permissão negada final
2267                let permission_denied_event = EventPermissionDenied::new(
2268                    store.address().to_string(),
2269                    identity.id().to_string(),
2270                    identity_key.to_string(),
2271                    "write/admin".to_string(),
2272                );
2273
2274                if let Err(e) = self
2275                    .emitters
2276                    .permission_denied
2277                    .emit(permission_denied_event)
2278                {
2279                    tracing::warn!(error = %e, "Erro ao emitir evento PermissionDenied");
2280                }
2281
2282                denied_count += 1;
2283            }
2284        }
2285
2286        // Log detalhado dos resultados da verificação
2287        let authorized_count = authorized_heads.len();
2288        let total_heads = heads.len();
2289
2290        tracing::debug!(
2291            total_heads = total_heads,
2292            authorized_heads = authorized_count,
2293            denied_heads = denied_count,
2294            no_identity_heads = no_identity_count,
2295            access_controller_type = access_controller.get_type(),
2296            "Verificação de permissões concluída"
2297        );
2298
2299        if authorized_count == 0 && total_heads > 0 {
2300            tracing::warn!(
2301                total_heads = total_heads,
2302                "ATENÇÃO: Todos os heads foram rejeitados por falta de permissões"
2303            );
2304
2305            // Lista as chaves autorizadas para debug
2306            if let Ok(write_keys) = access_controller.get_authorized_by_role("write").await {
2307                tracing::debug!(write_keys = ?write_keys, "Chaves autorizadas para escrita");
2308            }
2309            if let Ok(admin_keys) = access_controller.get_authorized_by_role("admin").await {
2310                tracing::debug!(admin_keys = ?admin_keys, "Chaves autorizadas para admin");
2311            }
2312        } else if authorized_count < total_heads {
2313            tracing::info!(
2314                authorized_heads = authorized_count,
2315                total_heads = total_heads,
2316                rejected_heads = total_heads - authorized_count,
2317                "Verificação parcial de permissões concluída"
2318            );
2319        } else if authorized_count == total_heads && total_heads > 0 {
2320            tracing::debug!(
2321                authorized_heads = total_heads,
2322                "Verificação completa: todos os heads foram autorizados"
2323            );
2324        }
2325
2326        Ok(authorized_heads)
2327    }
2328
2329    /// Verifica criptograficamente a validade de uma identidade
2330    ///
2331    /// Realiza verificação completa da identidade usando:
2332    /// 1. Validação da chave pública
2333    /// 2. Verificação de assinatura usando secp256k1
2334    /// 3. Validação das assinaturas de identidade e chave pública
2335    /// 4. Verificação da integridade dos dados assinados
2336    ///
2337    /// # Argumentos
2338    ///
2339    /// * `identity` - A identidade a ser verificada
2340    ///
2341    /// # Retorna
2342    ///
2343    /// * `Ok(())` se a identidade é válida
2344    /// * `Err(GuardianError)` se a verificação falhou
2345    async fn verify_identity_cryptographically(&self, identity: &Identity) -> Result<()> {
2346        // ETAPA 1: Validação básica dos campos obrigatórios
2347        if identity.id().is_empty() {
2348            return Err(GuardianError::Store(
2349                "Identity ID cannot be empty".to_string(),
2350            ));
2351        }
2352
2353        if identity.pub_key().is_empty() {
2354            return Err(GuardianError::Store(
2355                "Identity public key cannot be empty".to_string(),
2356            ));
2357        }
2358
2359        // ETAPA 2: Validação da chave pública usando secp256k1
2360        let pub_key_hex = identity.pub_key();
2361        let pub_key_bytes = match hex::decode(pub_key_hex) {
2362            Ok(bytes) => bytes,
2363            Err(e) => {
2364                return Err(GuardianError::Store(format!(
2365                    "Failed to decode public key from hex: {}",
2366                    e
2367                )));
2368            }
2369        };
2370
2371        let secp = secp256k1::Secp256k1::new();
2372        let public_key = match secp256k1::PublicKey::from_slice(&pub_key_bytes) {
2373            Ok(pk) => pk,
2374            Err(e) => {
2375                return Err(GuardianError::Store(format!(
2376                    "Invalid secp256k1 public key: {}",
2377                    e
2378                )));
2379            }
2380        };
2381
2382        // ETAPA 3: Verificação das assinaturas da identidade
2383        let signatures = identity.signatures();
2384
2385        // Verifica assinatura do ID
2386        if !signatures.id().is_empty() {
2387            match self.verify_signature_with_secp256k1(
2388                identity.id(),
2389                signatures.id(),
2390                &public_key,
2391                &secp,
2392            ) {
2393                Ok(true) => {
2394                    tracing::debug!("Identity ID signature verified successfully");
2395                }
2396                Ok(false) => {
2397                    return Err(GuardianError::Store(
2398                        "Identity ID signature verification failed".to_string(),
2399                    ));
2400                }
2401                Err(e) => {
2402                    return Err(GuardianError::Store(format!(
2403                        "Error verifying ID signature: {}",
2404                        e
2405                    )));
2406                }
2407            }
2408        }
2409
2410        // Verifica assinatura da chave pública
2411        if !signatures.pub_key().is_empty() {
2412            // Reconstrói os dados que foram assinados para a chave pública
2413            let pub_key_data = format!("{}{}", identity.pub_key(), signatures.id());
2414
2415            match self.verify_signature_with_secp256k1(
2416                &pub_key_data,
2417                signatures.pub_key(),
2418                &public_key,
2419                &secp,
2420            ) {
2421                Ok(true) => {
2422                    tracing::debug!("Identity public key signature verified successfully");
2423                }
2424                Ok(false) => {
2425                    return Err(GuardianError::Store(
2426                        "Identity public key signature verification failed".to_string(),
2427                    ));
2428                }
2429                Err(e) => {
2430                    return Err(GuardianError::Store(format!(
2431                        "Error verifying public key signature: {}",
2432                        e
2433                    )));
2434                }
2435            }
2436        }
2437
2438        // ETAPA 4: Verificação adicional usando libp2p se disponível
2439        if let Some(libp2p_key) = identity.public_key() {
2440            // Verifica se a chave libp2p é consistente com a chave secp256k1
2441            let peer_id = libp2p_identity::PeerId::from_public_key(&libp2p_key);
2442            tracing::debug!(peer_id = %peer_id, "Identity verified with libp2p PeerID");
2443        }
2444
2445        tracing::debug!(
2446            identity_id = identity.id(),
2447            public_key_len = identity.pub_key().len(),
2448            "Identity cryptographic verification completed successfully"
2449        );
2450
2451        Ok(())
2452    }
2453
2454    /// Verifica uma assinatura usando secp256k1
2455    ///
2456    /// # Argumentos
2457    ///
2458    /// * `message` - A mensagem original que foi assinada
2459    /// * `signature_str` - A assinatura em formato string
2460    /// * `public_key` - A chave pública secp256k1
2461    /// * `secp` - Instância do secp256k1
2462    ///
2463    /// # Retorna
2464    ///
2465    /// * `Ok(true)` se a assinatura é válida
2466    /// * `Ok(false)` se a assinatura é inválida
2467    /// * `Err(GuardianError)` se houve erro no processo de verificação
2468    fn verify_signature_with_secp256k1(
2469        &self,
2470        message: &str,
2471        signature_str: &str,
2472        public_key: &secp256k1::PublicKey,
2473        secp: &secp256k1::Secp256k1<secp256k1::All>,
2474    ) -> Result<bool> {
2475        use secp256k1::Message;
2476        use sha2::{Digest, Sha256};
2477        use std::str::FromStr;
2478
2479        // Cria hash SHA256 da mensagem
2480        let mut hasher = Sha256::new();
2481        hasher.update(message.as_bytes());
2482        let message_hash = hasher.finalize();
2483        let message_hash_array: [u8; 32] = message_hash.into();
2484
2485        // Cria mensagem secp256k1 a partir do hash
2486        let secp_message = Message::from_digest(message_hash_array);
2487
2488        // Parse da assinatura
2489        let signature = match secp256k1::ecdsa::Signature::from_str(signature_str) {
2490            Ok(sig) => sig,
2491            Err(e) => {
2492                tracing::debug!(
2493                    signature = signature_str,
2494                    error = %e,
2495                    "Failed to parse signature"
2496                );
2497                return Ok(false); // Assinatura inválida, não erro fatal
2498            }
2499        };
2500
2501        // Verifica a assinatura
2502        match secp.verify_ecdsa(secp_message, &signature, public_key) {
2503            Ok(()) => {
2504                tracing::debug!("Signature verification successful");
2505                Ok(true)
2506            }
2507            Err(e) => {
2508                tracing::debug!(error = %e, "Signature verification failed");
2509                Ok(false) // Assinatura inválida, não erro fatal
2510            }
2511        }
2512    }
2513
2514    /// Processa um evento de troca de "heads", sincronizando as novas entradas com a store local.
2515    ///
2516    /// Realiza a sincronização completa dos heads recebidos, incluindo:
2517    /// 1. Validação de integridade dos heads
2518    /// 2. Verificação de permissões de acesso
2519    /// 3. Detecção de duplicatas existentes
2520    /// 4. Sincronização efetiva com a store
2521    /// 5. Emissão de eventos de progresso
2522    ///
2523    /// # Argumentos
2524    ///
2525    /// * `event` - Evento contendo os heads a serem sincronizados e metadados
2526    /// * `store` - Referência à store que receberá os heads
2527    ///
2528    /// # Processamento
2529    ///
2530    /// 1. **Validação Básica**: Verifica se os heads possuem dados válidos (hash, payload)
2531    /// 2. **Controle de Acesso**: Usa o access controller da store para validar permissões
2532    /// 3. **Detecção de Duplicatas**: Consulta o oplog para evitar reprocessamento
2533    /// 4. **Sincronização**: Delega para o método `sync()` da store que implementa a lógica completa
2534    /// 5. **Eventos**: Emite eventos de progresso para componentes interessados
2535    ///
2536    /// # Performance
2537    ///
2538    /// - **O(n)** onde n = número de heads recebidos
2539    /// - **Paralelização**: Validação sequencial, mas sync em batch para eficiência
2540    /// - **Cache-aware**: Aproveita índices existentes para detecção de duplicatas
2541    ///
2542    /// # Erros
2543    ///
2544    /// - Retorna erro se a sincronização da store falhar
2545    /// - Heads individuais inválidos são ignorados (logged) mas não causam falha geral
2546    pub async fn handle_event_exchange_heads(
2547        &self,
2548        event: &MessageExchangeHeads,
2549        store: Arc<GuardianStore>,
2550    ) -> Result<()> {
2551        let heads = &event.heads;
2552        let store_address = &event.address;
2553
2554        tracing::debug!(
2555            peer_id = %self.peer_id(),
2556            count = heads.len(),
2557            store_address = store_address,
2558            "Processando evento de exchange heads"
2559        );
2560
2561        if heads.is_empty() {
2562            tracing::debug!("Nenhum head recebido para sincronização");
2563            return Ok(());
2564        }
2565
2566        // ETAPA 1: Validação básica e filtragem de heads inválidos
2567        let mut valid_heads = Vec::new();
2568        let mut skipped_count = 0;
2569
2570        for (i, head) in heads.iter().enumerate() {
2571            // Validação de integridade básica
2572            if head.hash.is_empty() || head.payload.is_empty() {
2573                tracing::debug!(
2574                    head_index = i + 1,
2575                    total_heads = heads.len(),
2576                    "Head ignorado: dados inválidos (hash ou payload vazio)"
2577                );
2578                skipped_count += 1;
2579                continue;
2580            }
2581
2582            // Validação de estrutura
2583            if head.id.is_empty() {
2584                tracing::debug!(
2585                    head_index = i + 1,
2586                    total_heads = heads.len(),
2587                    "Head ignorado: ID vazio"
2588                );
2589                skipped_count += 1;
2590                continue;
2591            }
2592
2593            // Validação de identidade (se disponível)
2594            if let Some(identity) = &head.identity {
2595                if identity.id().is_empty() || identity.pub_key().is_empty() {
2596                    tracing::warn!(
2597                        head_index = i + 1,
2598                        total_heads = heads.len(),
2599                        head_hash = %head.hash,
2600                        "Head com identidade inválida"
2601                    );
2602                } else {
2603                    tracing::debug!(
2604                        head_index = i + 1,
2605                        total_heads = heads.len(),
2606                        head_hash = %head.hash,
2607                        identity_id = identity.id(),
2608                        "Head com identidade válida"
2609                    );
2610
2611                    // Verificação criptográfica da identidade
2612                    match self.verify_identity_cryptographically(identity).await {
2613                        Ok(()) => {
2614                            tracing::debug!(
2615                                head_index = i + 1,
2616                                total_heads = heads.len(),
2617                                head_hash = %head.hash,
2618                                "Head identidade verificada criptograficamente"
2619                            );
2620                        }
2621                        Err(e) => {
2622                            tracing::warn!(
2623                                head_index = i + 1,
2624                                total_heads = heads.len(),
2625                                head_hash = %head.hash,
2626                                error = %e,
2627                                "Head falha na verificação criptográfica da identidade"
2628                            );
2629                            // Continua processamento mesmo com falha na verificação para compatibilidade
2630                            // Note: Em ambiente de produção, você pode escolher rejeitar heads com identidades inválidas
2631                        }
2632                    }
2633                }
2634            }
2635
2636            valid_heads.push(head.clone());
2637
2638            tracing::debug!(
2639                head_index = i + 1,
2640                total_heads = heads.len(),
2641                head_hash = %head.hash,
2642                clock_id = head.clock.id(),
2643                clock_time = head.clock.time(),
2644                "Head validado"
2645            );
2646        }
2647
2648        if valid_heads.is_empty() {
2649            tracing::warn!(
2650                total_heads = heads.len(),
2651                "Todos os heads recebidos são inválidos"
2652            );
2653            return Ok(());
2654        }
2655
2656        if skipped_count > 0 {
2657            tracing::debug!(
2658                valid_heads = valid_heads.len(),
2659                total_heads = heads.len(),
2660                skipped_count = skipped_count,
2661                "Validação concluída com heads ignorados"
2662            );
2663        }
2664
2665        // ETAPA 2: Verificação de permissões de acesso via Access Controller
2666        tracing::debug!(
2667            valid_heads_count = valid_heads.len(),
2668            "Verificando permissões de acesso para heads"
2669        );
2670
2671        // Verificação completa de permissões usando o Access Controller da store
2672        let permitted_heads = self.verify_heads_permissions(&valid_heads, &store).await?;
2673
2674        // ETAPA 3: Detecção de duplicatas consultando oplog existente
2675        tracing::debug!(
2676            permitted_heads_count = permitted_heads.len(),
2677            "Verificando duplicatas no oplog para heads"
2678        );
2679
2680        let mut new_heads = Vec::new();
2681        let mut duplicate_count = 0;
2682
2683        // Para cada head, verifica se já existe no oplog da store
2684        for (i, head) in permitted_heads.iter().enumerate() {
2685            // Verifica se o head já existe no oplog da store
2686            let head_hash = head.hash();
2687            let already_exists = {
2688                // Tenta acessar o oplog através dos tipos de store conhecidos
2689                if let Some(event_log_store) = store.as_any()
2690                    .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>()
2691                {
2692                    event_log_store.basestore().op_log().read().has(head_hash)
2693                } else if let Some(kv_store) = store.as_any()
2694                    .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
2695                {
2696                    kv_store.basestore().op_log().read().has(head_hash)
2697                } else if let Some(doc_store) = store.as_any()
2698                    .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>()
2699                {
2700                    doc_store.basestore().op_log().read().has(head_hash)
2701                } else if let Some(base_store) = store.as_any()
2702                    .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
2703                {
2704                    base_store.op_log().read().has(head_hash)
2705                } else {
2706                    tracing::warn!(
2707                        head_index = i + 1,
2708                        total_heads = permitted_heads.len(),
2709                        head_hash = %head_hash,
2710                        "Tipo de store não suportado para verificação de duplicatas, assumindo novo"
2711                    );
2712                    false // Se não conseguimos verificar, assumimos que é novo
2713                }
2714            };
2715
2716            if already_exists {
2717                tracing::debug!(
2718                    head_index = i + 1,
2719                    total_heads = permitted_heads.len(),
2720                    head_hash = %head_hash,
2721                    "Head já existe no oplog (duplicata)"
2722                );
2723                duplicate_count += 1;
2724            } else {
2725                tracing::debug!(
2726                    head_index = i + 1,
2727                    total_heads = permitted_heads.len(),
2728                    head_hash = %head_hash,
2729                    "Head é novo, adicionando para sincronização"
2730                );
2731                new_heads.push(head.clone());
2732            }
2733        }
2734
2735        if new_heads.is_empty() {
2736            tracing::debug!(
2737                duplicate_count = duplicate_count,
2738                "Todos os heads são duplicatas, sincronização desnecessária"
2739            );
2740            return Ok(());
2741        }
2742
2743        if duplicate_count > 0 {
2744            tracing::debug!(
2745                new_heads = new_heads.len(),
2746                total_heads = heads.len(),
2747                duplicate_count = duplicate_count,
2748                "Duplicatas detectadas"
2749            );
2750        }
2751
2752        // ETAPA 4: Sincronização efetiva com a store
2753        tracing::debug!(
2754            valid_heads = new_heads.len(),
2755            store_address = store_address,
2756            "Iniciando sincronização com a store"
2757        );
2758
2759        // Armazena o count antes de mover o vector e mede o tempo de sincronização
2760        let new_heads_count = new_heads.len();
2761        let sync_start_time = std::time::Instant::now();
2762
2763        // Cria uma cópia das entradas para uso nos eventos
2764        let entries_for_events = new_heads.clone();
2765
2766        // Hlper method que resolve problemas de mutabilidade
2767        let sync_result = Self::sync_store_with_heads(&store, new_heads).await;
2768
2769        // Calcula duração da sincronização
2770        let sync_duration = sync_start_time.elapsed();
2771        let duration_ms = sync_duration.as_millis() as u64;
2772
2773        match sync_result {
2774            Ok(()) => {
2775                tracing::debug!(
2776                    processed_count = new_heads_count,
2777                    store_address = store_address,
2778                    duration_ms = duration_ms,
2779                    "Sincronização de heads concluída com sucesso"
2780                );
2781
2782                // ETAPA 5: Emissão de eventos de sucesso
2783                // Emite evento de sincronização para componentes interessados
2784                let exchange_event = EventExchangeHeads::new(self.peer_id(), event.clone());
2785
2786                if let Err(e) = self.emitters.new_heads.emit(exchange_event) {
2787                    tracing::warn!(error = %e, "Falha ao emitir evento new_heads");
2788                } else {
2789                    tracing::trace!(
2790                        processed_heads = new_heads_count,
2791                        "Evento new_heads emitido com sucesso"
2792                    );
2793                }
2794
2795                // ETAPA 6: Emissão de eventos específicos da store
2796                // Obtém informações da store para os eventos
2797                let store_type = store.store_type();
2798                let total_entries = self.get_store_total_entries(&store).await.unwrap_or(0);
2799
2800                // EventStoreUpdated: Notifica mudanças na store
2801                let store_updated_event = EventStoreUpdated::new(
2802                    store_address.clone(),
2803                    store_type.to_string(),
2804                    new_heads_count,
2805                    total_entries,
2806                );
2807
2808                if let Err(e) = self.emitters.store_updated.emit(store_updated_event) {
2809                    tracing::warn!(error = %e, "Falha ao emitir evento store_updated");
2810                } else {
2811                    tracing::debug!(
2812                        store_address = store_address,
2813                        entries_added = new_heads_count,
2814                        "Evento store_updated emitido com sucesso"
2815                    );
2816                }
2817
2818                // EventSyncCompleted: Notifica conclusão da sincronização
2819                let sync_completed_event = EventSyncCompleted::new(
2820                    store_address.clone(),
2821                    self.peer_id().to_string(),
2822                    new_heads_count,
2823                    duration_ms,
2824                    true, // success = true
2825                );
2826
2827                if let Err(e) = self.emitters.sync_completed.emit(sync_completed_event) {
2828                    tracing::warn!(error = %e, "Falha ao emitir evento sync_completed");
2829                } else {
2830                    tracing::debug!(
2831                        store_address = store_address,
2832                        duration_ms = duration_ms,
2833                        "Evento sync_completed emitido com sucesso"
2834                    );
2835                }
2836
2837                // EventNewEntries: Notifica novas entradas adicionadas
2838                if !entries_for_events.is_empty() {
2839                    let new_entries_event = EventNewEntries::new(
2840                        store_address.clone(),
2841                        entries_for_events,
2842                        total_entries,
2843                    );
2844
2845                    if let Err(e) = self.emitters.new_entries.emit(new_entries_event) {
2846                        tracing::warn!(error = %e, "Falha ao emitir evento new_entries");
2847                    } else {
2848                        tracing::debug!(
2849                            store_address = store_address,
2850                            new_entries_count = new_heads_count,
2851                            "Evento new_entries emitido com sucesso"
2852                        );
2853                    }
2854                }
2855            }
2856            Err(e) => {
2857                tracing::error!(
2858                    error = %e,
2859                    store_address = store_address,
2860                    heads_count = new_heads_count,
2861                    duration_ms = duration_ms,
2862                    "Falha na sincronização de heads"
2863                );
2864
2865                // Emite eventos de erro para componentes interessados
2866                // EventSyncError: Erro geral de sincronização
2867                let error_type = match &e {
2868                    GuardianError::Store(_) => SyncErrorType::StoreError,
2869                    GuardianError::Network(_) => SyncErrorType::NetworkError,
2870                    GuardianError::InvalidArgument(_) => SyncErrorType::ValidationError,
2871                    _ => SyncErrorType::UnknownError,
2872                };
2873
2874                let sync_error_event = EventSyncError::new(
2875                    store_address.clone(),
2876                    self.peer_id().to_string(),
2877                    e.to_string(),
2878                    new_heads_count,
2879                    error_type.clone(),
2880                );
2881
2882                if let Err(emit_err) = self.emitters.sync_error.emit(sync_error_event) {
2883                    tracing::warn!(
2884                        error = %emit_err,
2885                        original_error = %e,
2886                        "Falha ao emitir evento sync_error"
2887                    );
2888                } else {
2889                    tracing::debug!(
2890                        store_address = store_address,
2891                        error_type = ?error_type,
2892                        "Evento sync_error emitido com sucesso"
2893                    );
2894                }
2895
2896                // EventSyncCompleted com success = false
2897                let sync_completed_event = EventSyncCompleted::new(
2898                    store_address.clone(),
2899                    self.peer_id().to_string(),
2900                    0, // heads_synced = 0 devido ao erro
2901                    duration_ms,
2902                    false, // success = false
2903                );
2904
2905                if let Err(emit_err) = self.emitters.sync_completed.emit(sync_completed_event) {
2906                    tracing::warn!(error = %emit_err, "Falha ao emitir evento sync_completed (erro)");
2907                }
2908
2909                return Err(e);
2910            }
2911        }
2912
2913        tracing::debug!(
2914            total_heads_received = heads.len(),
2915            heads_processed = new_heads_count,
2916            heads_skipped = skipped_count,
2917            store_address = store_address,
2918            "Processamento de exchange heads completado com sucesso"
2919        );
2920
2921        Ok(())
2922    }
2923}
2924
2925/// Função auxiliar para criar um canal de comunicação direta.
2926pub async fn make_direct_channel(
2927    event_bus: &EventBusImpl,
2928    factory: DirectChannelFactory,
2929    options: &DirectChannelOptions,
2930) -> Result<Arc<dyn DirectChannel<Error = GuardianError> + Send + Sync>> {
2931    let emitter = crate::p2p::events::PayloadEmitter::new(event_bus)
2932        .await
2933        .map_err(|e| {
2934            GuardianError::Other(format!(
2935                "não foi possível inicializar o emitter do pubsub: {}",
2936                e
2937            ))
2938        })?;
2939
2940    // Usa a factory fornecida para criar o canal direto
2941    let channel = factory(Arc::new(emitter), Some((*options).clone()))
2942        .await
2943        .map_err(|e| GuardianError::Other(format!("Falha ao criar canal direto: {}", e)))?;
2944
2945    tracing::debug!("Canal direto criado com sucesso usando factory fornecida");
2946    Ok(channel)
2947}
2948
2949/// Implementação do Drop trait para garantir cleanup seguro do GuardianDB
2950impl Drop for GuardianDB {
2951    fn drop(&mut self) {
2952        // Abort da task do monitor para evitar acesso a memória já liberada
2953        self._monitor_handle.abort();
2954
2955        // Cancela o token para sinalizar a todas as tasks que devem parar
2956        self.cancellation_token.cancel();
2957
2958        // Não podemos usar async no Drop, então apenas fazemos abort e cancel
2959        // O resto do cleanup será feito pelos destructors automáticos dos Arcs
2960    }
2961}
2962
2963/// Implementação da trait BaseGuardianDB para GuardianDB
2964#[async_trait::async_trait]
2965impl BaseGuardianDB for GuardianDB {
2966    type Error = GuardianError;
2967
2968    fn ipfs(&self) -> Arc<IpfsClient> {
2969        Arc::new(self.ipfs.clone())
2970    }
2971
2972    fn identity(&self) -> Arc<Identity> {
2973        // Cria um clone do Arc<Identity> a partir do RwLock
2974        let identity_guard = self.identity.read();
2975        Arc::new(identity_guard.clone())
2976    }
2977
2978    async fn open(
2979        &self,
2980        address: &str,
2981        options: &mut CreateDBOptions,
2982    ) -> std::result::Result<Arc<dyn Store<Error = GuardianError>>, Self::Error> {
2983        // Cria uma cópia das opções para usar com o método interno
2984        let options_copy = CreateDBOptions {
2985            event_bus: options.event_bus.clone(),
2986            directory: options.directory.clone(),
2987            overwrite: options.overwrite,
2988            local_only: options.local_only,
2989            create: options.create,
2990            store_type: options.store_type.clone(),
2991            access_controller_address: options.access_controller_address.clone(),
2992            access_controller: None, // Será resolvido internamente se necessário
2993            replicate: options.replicate,
2994            keystore: options.keystore.clone(),
2995            cache: options.cache.clone(),
2996            identity: options.identity.clone(),
2997            sort_fn: options.sort_fn,
2998            timeout: options.timeout,
2999            message_marshaler: options.message_marshaler.clone(),
3000            span: options.span.clone(),
3001            close_func: None,
3002            store_specific_opts: None,
3003        };
3004
3005        // Chama o método open interno do GuardianDB
3006        let arc_store = GuardianDB::open(self, address, options_copy).await?;
3007
3008        // Converte Arc<GuardianStore> para Arc<dyn Store>
3009        let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3010            arc_store as Arc<dyn Store<Error = GuardianError>>;
3011
3012        Ok(store_dyn)
3013    }
3014
3015    fn get_store(&self, address: &str) -> Option<Arc<dyn Store<Error = GuardianError>>> {
3016        // Usa o método get_store interno do GuardianDB
3017        if let Some(arc_store) = GuardianDB::get_store(self, address) {
3018            // Converte Arc<GuardianStore> para Arc<dyn Store>
3019            let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3020                arc_store as Arc<dyn Store<Error = GuardianError>>;
3021            Some(store_dyn)
3022        } else {
3023            None
3024        }
3025    }
3026
3027    async fn create(
3028        &self,
3029        name: &str,
3030        store_type: &str,
3031        options: &mut CreateDBOptions,
3032    ) -> std::result::Result<Arc<dyn Store<Error = GuardianError>>, Self::Error> {
3033        // Cria uma cópia das opções para usar com o método interno
3034        let options_copy = CreateDBOptions {
3035            event_bus: options.event_bus.clone(),
3036            directory: options.directory.clone(),
3037            overwrite: options.overwrite,
3038            local_only: options.local_only,
3039            create: options.create,
3040            store_type: Some(store_type.to_string()),
3041            access_controller_address: options.access_controller_address.clone(),
3042            access_controller: None,
3043            replicate: options.replicate,
3044            keystore: options.keystore.clone(),
3045            cache: options.cache.clone(),
3046            identity: options.identity.clone(),
3047            sort_fn: options.sort_fn,
3048            timeout: options.timeout,
3049            message_marshaler: options.message_marshaler.clone(),
3050            span: options.span.clone(),
3051            close_func: None,
3052            store_specific_opts: None,
3053        };
3054
3055        // Chama o método create interno do GuardianDB
3056        let arc_store = GuardianDB::create(self, name, store_type, Some(options_copy)).await?;
3057
3058        // Converte Arc<GuardianStore> para Arc<dyn Store>
3059        let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3060            arc_store as Arc<dyn Store<Error = GuardianError>>;
3061
3062        Ok(store_dyn)
3063    }
3064
3065    async fn determine_address(
3066        &self,
3067        name: &str,
3068        store_type: &str,
3069        options: &DetermineAddressOptions,
3070    ) -> std::result::Result<Box<dyn Address>, Self::Error> {
3071        // Usa o método determine_address interno do GuardianDB
3072        let guardian_address =
3073            GuardianDB::determine_address(self, name, store_type, Some(options.clone())).await?;
3074
3075        // Converte GuardianDBAddress para Box<dyn Address>
3076        let boxed_address: Box<dyn Address> = Box::new(guardian_address);
3077
3078        Ok(boxed_address)
3079    }
3080
3081    fn register_store_type(&mut self, store_type: &str, constructor: StoreConstructor) {
3082        // Usa o método register_store_type existente (evita recursão chamando método interno)
3083        let mut types = self.store_types.write();
3084        types.insert(store_type.to_string(), constructor);
3085        tracing::debug!("Registered store type: {}", store_type);
3086    }
3087
3088    fn unregister_store_type(&mut self, store_type: &str) {
3089        // Usa o método unregister_store_type existente (evita recursão chamando método interno)
3090        let mut types = self.store_types.write();
3091        types.remove(store_type);
3092        tracing::debug!("Unregistered store type: {}", store_type);
3093    }
3094
3095    fn register_access_controller_type(
3096        &mut self,
3097        constructor: AccessControllerConstructor,
3098    ) -> std::result::Result<(), Self::Error> {
3099        // Registra com tipo padrão "default" para evitar recursão
3100        let mut types = self.access_controller_types.write();
3101        types.insert("default".to_string(), constructor);
3102        tracing::debug!("Registered access controller type: default");
3103        Ok(())
3104    }
3105
3106    fn unregister_access_controller_type(&mut self, controller_type: &str) {
3107        // Usa método interno para evitar recursão
3108        let mut types = self.access_controller_types.write();
3109        types.remove(controller_type);
3110        tracing::debug!("Unregistered access controller type: {}", controller_type);
3111    }
3112
3113    fn get_access_controller_type(
3114        &self,
3115        controller_type: &str,
3116    ) -> Option<AccessControllerConstructor> {
3117        // Usa acesso direto para evitar recursão
3118        let types = self.access_controller_types.read();
3119        types.get(controller_type).cloned()
3120    }
3121
3122    fn event_bus(&self) -> EventBus {
3123        (*self.event_bus).clone()
3124    }
3125
3126    fn span(&self) -> &tracing::Span {
3127        &self.span
3128    }
3129
3130    fn tracer(&self) -> Arc<TracerWrapper> {
3131        Arc::new(TracerWrapper::OpenTelemetry(self.tracer.clone()))
3132    }
3133}