1use crate::address::{Address, GuardianDBAddress};
2use crate::cache::level_down::LevelDownCache;
3use crate::db_manifest;
4use crate::error::{GuardianError, Result};
5use crate::ipfs_core_api::{backends::IrohBackend, client::IpfsClient, config::ClientConfig};
6use crate::ipfs_log::identity::{Identity, Signatures};
7pub use crate::ipfs_log::identity_provider::Keystore;
8use crate::keystore::SledKeystore;
9use crate::p2p::events::Emitter;
10pub use crate::p2p::events::EventBus;
11pub use crate::p2p::events::EventBus as EventBusImpl;
12use crate::traits::{
13 AccessControllerConstructor, BaseGuardianDB, CreateDBOptions, DetermineAddressOptions,
14 DirectChannel, DirectChannelFactory, DirectChannelOptions, EventPubSubPayload,
15 MessageExchangeHeads, MessageMarshaler, PubSubInterface, Store, StoreConstructor,
16 TracerWrapper,
17};
18use hex;
19use libp2p::PeerId;
20use opentelemetry::global::BoxedTracer;
21use parking_lot::RwLock;
22use rand::RngCore;
23use secp256k1;
24use std::collections::HashMap;
25use std::path::{Path, PathBuf};
26use std::pin::Pin;
27use std::sync::Arc;
28use tokio::task::JoinHandle;
29use tokio_util::sync::CancellationToken;
30use tracing::Span;
31
/// Shared, swappable shutdown hook used to close the keystore.
type CloseKeystoreFn = Arc<RwLock<Option<Box<dyn Fn() -> Result<()> + Send + Sync>>>>;
/// Trait object for any store instance managed by this GuardianDB.
type GuardianStore = dyn Store<Error = GuardianError> + Send + Sync;
36
/// Optional configuration for building a [`GuardianDB`] instance.
///
/// Every field defaults to `None`; unset fields are filled in with
/// defaults inside `GuardianDB::new` / `GuardianDB::new_guardian_db`.
#[derive(Default)]
pub struct NewGuardianDBOptions {
    /// Identity id used when a fresh identity must be derived
    /// (falls back to the peer id string).
    pub id: Option<String>,
    /// Fixed peer id; a random one is generated when absent.
    pub peer_id: Option<PeerId>,
    /// Base directory for persistent data (defaults to `./GuardianDB/<peer_id>`).
    pub directory: Option<PathBuf>,
    /// Pre-built keystore; a `SledKeystore` is created when absent.
    pub keystore: Option<Box<dyn Keystore + Send + Sync>>,
    /// Shared cache; an in-memory `LevelDownCache` is created when absent.
    pub cache: Option<Arc<LevelDownCache>>,
    /// Pre-existing identity; one is derived when absent.
    pub identity: Option<Identity>,
    /// Hook invoked to close the keystore during shutdown.
    pub close_keystore: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
    /// OpenTelemetry tracer; a no-op tracer is used when absent.
    pub tracer: Option<Arc<BoxedTracer>>,
    /// Factory for peer-to-peer direct channels.
    pub direct_channel_factory: Option<DirectChannelFactory>,
    /// PubSub transport implementation.
    pub pubsub: Option<Box<dyn PubSubInterface<Error = GuardianError>>>,
    /// Serializer for exchanged messages; `GuardianJSONMarshaler` by default.
    pub message_marshaler: Option<Box<dyn MessageMarshaler<Error = GuardianError>>>,
    /// Event bus to reuse.
    /// NOTE(review): `new_guardian_db` currently always creates its own bus
    /// and never reads this field — confirm whether it should be honored.
    pub event_bus: Option<Arc<EventBusImpl>>,
}
53
/// Core GuardianDB handle: owns the IPFS client, identity, registries of
/// store/access-controller constructors, open stores, cache and the event
/// plumbing (bus, emitters, direct channel, background monitor task).
pub struct GuardianDB {
    /// Underlying IPFS client used for manifests and replication.
    ipfs: IpfsClient,
    /// Current identity (shared, replaceable under lock).
    identity: Arc<RwLock<Identity>>,
    /// Peer id derived from the identity's public key (or random fallback).
    id: Arc<RwLock<PeerId>>,
    /// Optional keystore backing the identity.
    keystore: Arc<RwLock<Option<Box<dyn Keystore + Send + Sync>>>>,
    /// Shutdown hook for the keystore, if one was registered.
    close_keystore: CloseKeystoreFn,
    /// OpenTelemetry tracer (no-op unless configured).
    tracer: Arc<BoxedTracer>,
    /// Root tracing span for this instance.
    span: Span,
    /// Open stores keyed by their address string.
    stores: Arc<RwLock<HashMap<String, Arc<GuardianStore>>>>,
    #[allow(dead_code)]
    direct_channel: Arc<dyn DirectChannel<Error = GuardianError> + Send + Sync>,
    /// Registered access-controller constructors keyed by type name.
    access_controller_types: Arc<RwLock<HashMap<String, AccessControllerConstructor>>>,
    /// Registered store constructors keyed by type name.
    store_types: Arc<RwLock<HashMap<String, StoreConstructor>>>,
    /// Base directory for persistent data.
    directory: PathBuf,
    /// Manifest/heads cache (outer lock allows swapping the cache itself).
    cache: Arc<RwLock<Arc<LevelDownCache>>>,
    #[allow(dead_code)]
    pubsub: Option<Box<dyn PubSubInterface<Error = GuardianError>>>,
    /// Event bus shared with emitters and the monitor task.
    event_bus: Arc<EventBusImpl>,
    #[allow(dead_code)]
    message_marshaler: Arc<dyn MessageMarshaler<Error = GuardianError> + Send + Sync>,
    /// Handle of the background monitor task; aborted in `close()`.
    _monitor_handle: JoinHandle<()>,
    /// Token cancelling background work on shutdown.
    cancellation_token: CancellationToken,
    /// Typed emitters bound to `event_bus`.
    emitters: Arc<Emitters>,
}
78
/// Event: a peer sent us its current log heads.
#[derive(Clone)]
pub struct EventExchangeHeads {
    /// Peer the heads were received from.
    pub peer: PeerId,
    /// The heads-exchange payload itself.
    pub message: MessageExchangeHeads,
}
84
/// Event: a GuardianDB instance finished initializing.
#[derive(Clone)]
pub struct EventGuardianDBReady {
    /// Address of the ready instance (`/GuardianDB/<peer_id>`).
    pub address: String,
    /// Database type label (always "GuardianDB" at emission site).
    pub db_type: String,
}
91
/// Event: a remote peer connected.
#[derive(Clone)]
pub struct EventPeerConnected {
    /// Id of the peer that connected.
    pub peer_id: String,
    /// Address associated with the connection.
    pub address: String,
}
97
/// Event: a remote peer disconnected.
#[derive(Clone)]
pub struct EventPeerDisconnected {
    /// Id of the peer that disconnected.
    pub peer_id: String,
    /// Address associated with the connection.
    pub address: String,
}
103
/// Event: a new database was created.
#[derive(Clone)]
pub struct EventDatabaseCreated {
    /// Full GuardianDB address of the database.
    pub address: String,
    /// Human-readable database name.
    pub name: String,
    /// Registered store type of the database.
    pub db_type: String,
}
110
/// Event: a database was dropped/removed.
#[derive(Clone)]
pub struct EventDatabaseDropped {
    /// Full GuardianDB address of the dropped database.
    pub address: String,
    /// Human-readable database name.
    pub name: String,
}
116
/// Event: a store received new entries (local write or replication).
#[derive(Clone)]
pub struct EventStoreUpdated {
    /// Address of the updated store.
    pub store_address: String,
    /// Type name of the store.
    pub store_type: String,
    /// Number of entries added by this update.
    pub entries_added: usize,
    /// Total entry count after the update.
    pub total_entries: usize,
    /// UTC time the event was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
126
/// Event: a head-sync round with a peer finished.
#[derive(Clone)]
pub struct EventSyncCompleted {
    /// Address of the synchronized store.
    pub store_address: String,
    /// Peer the sync ran against.
    pub peer_id: String,
    /// Number of heads exchanged during the sync.
    pub heads_synced: usize,
    /// Wall-clock duration of the sync in milliseconds.
    pub duration_ms: u64,
    /// Whether the sync completed successfully.
    pub success: bool,
    /// UTC time the event was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
136
/// Event: new log entries were appended to a store.
#[derive(Clone)]
pub struct EventNewEntries {
    /// Address of the store that received entries.
    pub store_address: String,
    /// The newly added entries.
    pub entries: Vec<crate::ipfs_log::entry::Entry>,
    /// Total entry count after the addition.
    pub total_entries: usize,
    /// UTC time the event was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
144
/// Event: a sync attempt with a peer failed.
#[derive(Clone)]
pub struct EventSyncError {
    /// Address of the store being synchronized.
    pub store_address: String,
    /// Peer the failed sync ran against.
    pub peer_id: String,
    /// Human-readable description of the failure.
    pub error_message: String,
    /// Number of heads involved in the failed exchange.
    pub heads_count: usize,
    /// Coarse classification of the failure.
    pub error_type: SyncErrorType,
    /// UTC time the event was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
154
/// Coarse classification of sync failures, carried by [`EventSyncError`].
#[derive(Clone, Debug)]
pub enum SyncErrorType {
    /// Remote identity lacked write permission.
    PermissionDenied,
    /// Transport-level failure.
    NetworkError,
    /// Received data failed validation.
    ValidationError,
    /// Local store rejected or failed to apply the data.
    StoreError,
    /// Anything that does not fit the categories above.
    UnknownError,
}
163
/// Event: an identity was denied a permission on a store.
#[derive(Clone)]
pub struct EventPermissionDenied {
    /// Address of the store where access was denied.
    pub store_address: String,
    /// Id of the denied identity.
    pub identity_id: String,
    /// Public key of the denied identity.
    pub identity_pubkey: String,
    /// Name of the permission that was required.
    pub required_permission: String,
    /// UTC time the event was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
172
/// One typed emitter per GuardianDB event, all bound to the same event bus
/// (see [`Emitters::generate_emitters`]).
pub struct Emitters {
    /// Instance finished initializing.
    pub ready: Emitter<EventGuardianDBReady>,
    /// A peer connected.
    pub peer_connected: Emitter<EventPeerConnected>,
    /// A peer disconnected.
    pub peer_disconnected: Emitter<EventPeerDisconnected>,
    /// New heads received from a peer.
    pub new_heads: Emitter<EventExchangeHeads>,
    /// A database was created.
    pub database_created: Emitter<EventDatabaseCreated>,
    /// A database was dropped.
    pub database_dropped: Emitter<EventDatabaseDropped>,
    /// A store received new entries.
    pub store_updated: Emitter<EventStoreUpdated>,
    /// A sync round finished.
    pub sync_completed: Emitter<EventSyncCompleted>,
    /// New entries were appended.
    pub new_entries: Emitter<EventNewEntries>,
    /// A sync attempt failed.
    pub sync_error: Emitter<EventSyncError>,
    /// A permission check failed.
    pub permission_denied: Emitter<EventPermissionDenied>,
}
188
impl Emitters {
    /// Creates one emitter per GuardianDB event type on the given bus.
    ///
    /// # Errors
    /// Propagates the first failure from `event_bus.emitter()`.
    pub async fn generate_emitters(event_bus: &EventBusImpl) -> Result<Self> {
        Ok(Self {
            ready: event_bus.emitter().await?,
            peer_connected: event_bus.emitter().await?,
            peer_disconnected: event_bus.emitter().await?,
            new_heads: event_bus.emitter().await?,
            database_created: event_bus.emitter().await?,
            database_dropped: event_bus.emitter().await?,
            store_updated: event_bus.emitter().await?,
            sync_completed: event_bus.emitter().await?,
            new_entries: event_bus.emitter().await?,
            sync_error: event_bus.emitter().await?,
            permission_denied: event_bus.emitter().await?,
        })
    }
}
209
210impl EventExchangeHeads {
211 pub fn new(p: PeerId, msg: MessageExchangeHeads) -> Self {
213 Self {
214 peer: p,
215 message: msg,
216 }
217 }
218}
219
220impl EventStoreUpdated {
221 pub fn new(
222 store_address: String,
223 store_type: String,
224 entries_added: usize,
225 total_entries: usize,
226 ) -> Self {
227 Self {
228 store_address,
229 store_type,
230 entries_added,
231 total_entries,
232 timestamp: chrono::Utc::now(),
233 }
234 }
235}
236
237impl EventSyncCompleted {
238 pub fn new(
239 store_address: String,
240 peer_id: String,
241 heads_synced: usize,
242 duration_ms: u64,
243 success: bool,
244 ) -> Self {
245 Self {
246 store_address,
247 peer_id,
248 heads_synced,
249 duration_ms,
250 success,
251 timestamp: chrono::Utc::now(),
252 }
253 }
254}
255
256impl EventNewEntries {
257 pub fn new(
258 store_address: String,
259 entries: Vec<crate::ipfs_log::entry::Entry>,
260 total_entries: usize,
261 ) -> Self {
262 Self {
263 store_address,
264 entries,
265 total_entries,
266 timestamp: chrono::Utc::now(),
267 }
268 }
269}
270
271impl EventSyncError {
272 pub fn new(
273 store_address: String,
274 peer_id: String,
275 error_message: String,
276 heads_count: usize,
277 error_type: SyncErrorType,
278 ) -> Self {
279 Self {
280 store_address,
281 peer_id,
282 error_message,
283 heads_count,
284 error_type,
285 timestamp: chrono::Utc::now(),
286 }
287 }
288}
289
290impl EventPermissionDenied {
291 pub fn new(
292 store_address: String,
293 identity_id: String,
294 identity_pubkey: String,
295 required_permission: String,
296 ) -> Self {
297 Self {
298 store_address,
299 identity_id,
300 identity_pubkey,
301 required_permission,
302 timestamp: chrono::Utc::now(),
303 }
304 }
305}
306
307impl GuardianDB {
    /// Bootstraps a full GuardianDB instance: IPFS backend, keystore and
    /// identity, then delegates assembly to [`Self::new_guardian_db`].
    ///
    /// # Errors
    /// Fails if the Iroh backend, IPFS client or keystore cannot be created,
    /// or if `new_guardian_db` fails.
    pub async fn new(
        ipfs_config: Option<ClientConfig>,
        options: Option<NewGuardianDBOptions>,
    ) -> Result<Self> {
        let mut options = options.unwrap_or_default();

        let config = ipfs_config.unwrap_or_default();

        // Without an explicit peer id, generate a fresh random one.
        let peer_id = options.peer_id.unwrap_or_else(PeerId::random);

        let iroh_backend = Arc::new(IrohBackend::new(&config).await?);
        let ipfs_client = IpfsClient::new_with_backend(iroh_backend).await?;

        let default_dir = PathBuf::from("./GuardianDB").join(peer_id.to_string());
        let directory = options.directory.as_ref().unwrap_or(&default_dir);

        if options.keystore.is_none() {
            // The special "in-memory" directory selects a non-persistent keystore.
            let sled_path = if directory.to_string_lossy() == "./GuardianDB/in-memory" {
                None
            } else {
                Some(directory.join(peer_id.to_string()).join("keystore"))
            };

            let keystore = SledKeystore::new(sled_path)
                .map_err(|e| GuardianError::Other(format!("Falha ao criar o keystore: {}", e)))?;

            // Synchronous close hook bridging into the async keystore API.
            // NOTE(review): `block_in_place` panics on a current-thread tokio
            // runtime — confirm callers always run on a multi-thread runtime.
            let keystore_clone = keystore.clone();
            options.close_keystore = Some(Box::new(move || {
                tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current()
                        .block_on(async { keystore_clone.close().await })
                })
            }));

            options.keystore = Some(Box::new(keystore));
        }

        let identity = if let Some(identity) = options.identity {
            identity
        } else {
            let id = options
                .id
                .as_deref()
                .unwrap_or(&peer_id.to_string())
                .to_string();
            // The keystore is required here even though the derived identity
            // below does not read it yet.
            let _keystore = options.keystore.as_ref().ok_or_else(|| {
                GuardianError::Other("Keystore é necessário para criar uma identidade".to_string())
            })?;

            use crate::ipfs_log::identity::Identity;

            let mut rng = rand::rng();
            let mut secret_bytes = [0u8; 32];
            rng.fill_bytes(&mut secret_bytes);

            // NOTE(review): this hex-encodes the freshly generated *secret*
            // bytes and uses them as the "public key" — looks like a
            // placeholder identity, not real keypair derivation. Confirm.
            let pub_key_hex = hex::encode(secret_bytes);

            // Placeholder signatures built from the id / key strings.
            let signatures = Signatures::new(
                &format!("id_sig_{}", id),
                &format!("pk_sig_{}", pub_key_hex),
            );

            Identity::new(&id, &pub_key_hex, signatures)
        };

        // Store the (possibly derived) identity back so new_guardian_db sees it.
        options.identity = Some(identity.clone());

        Self::new_guardian_db(ipfs_client, identity, Some(options)).await
    }
394
395 pub async fn new_guardian_db(
397 ipfs: IpfsClient,
398 identity: Identity,
399 options: Option<NewGuardianDBOptions>,
400 ) -> Result<Self> {
401 let options = options.unwrap_or_default();
403
404 let tracer = options.tracer.unwrap_or_else(|| {
406 Arc::new(BoxedTracer::new(Box::new(
408 opentelemetry::trace::noop::NoopTracer::new(),
409 )))
410 });
411
412 let span = tracing::info_span!("guardian_db", peer_id = %identity.id());
414 let event_bus = Arc::new(EventBusImpl::new());
416
417 let direct_channel_factory = options.direct_channel_factory.unwrap_or_else(|| {
419 let factory_span = tracing::info_span!("direct_channel_factory");
420 let _enter = factory_span.enter();
421 let temp_span = tracing::Span::none();
423 crate::p2p::pubsub::direct_channel::init_direct_channel_factory(
424 temp_span,
425 PeerId::random(),
426 )
427 });
428 let cancellation_token = CancellationToken::new();
429
430 let emitters = Emitters::generate_emitters(&event_bus).await.map_err(|e| {
432 GuardianError::Other(format!("Falha ao gerar emitters do EventBus: {}", e))
433 })?;
434
435 let direct_channel = make_direct_channel(
438 &event_bus,
439 direct_channel_factory,
440 &DirectChannelOptions::default(),
441 )
442 .await?;
443
444 let message_marshaler_arc: Arc<dyn MessageMarshaler<Error = GuardianError> + Send + Sync> =
445 match options.message_marshaler {
446 Some(boxed_marshaler) => {
447 unsafe {
449 Arc::from_raw(Box::into_raw(boxed_marshaler)
450 as *const (dyn MessageMarshaler<Error = GuardianError> + Send + Sync))
451 }
452 }
453 None => {
454 Arc::new(crate::message_marshaler::GuardianJSONMarshaler::new())
456 }
457 };
458 let cache = options.cache.unwrap_or_else(|| {
459 Arc::new(crate::cache::level_down::LevelDownCache::new(None))
461 });
462 let directory = options
463 .directory
464 .unwrap_or_else(|| PathBuf::from("./GuardianDB/in-memory")); let instance = GuardianDB {
468 ipfs,
469 identity: Arc::new(RwLock::new(identity.clone())),
470 id: Arc::new(RwLock::new(
471 PeerId::from_bytes(identity.pub_key.as_bytes())
472 .unwrap_or_else(|_| PeerId::random()),
473 )), pubsub: options.pubsub,
475 cache: Arc::new(RwLock::new(cache)),
476 directory,
477 event_bus: event_bus.clone(),
478 stores: Arc::new(RwLock::new(HashMap::new())),
479 direct_channel,
480 close_keystore: Arc::new(RwLock::new(options.close_keystore)),
481 keystore: Arc::new(RwLock::new(options.keystore)),
482 store_types: Arc::new(RwLock::new(HashMap::new())),
483 access_controller_types: Arc::new(RwLock::new(HashMap::new())),
484 tracer,
485 message_marshaler: message_marshaler_arc,
486 cancellation_token: cancellation_token.clone(),
487 emitters: Arc::new(emitters),
488 _monitor_handle: Self::start_monitor_task(
490 event_bus.clone(),
491 cancellation_token.clone(),
492 span.clone(),
493 ),
494 span,
495 };
496
497 instance.register_default_store_types();
500
501 tracing::debug!("Configurando emitters do EventBus");
503
504 tracing::debug!("Iniciando monitor do canal direto");
506
507 let ready_event = EventGuardianDBReady {
509 address: format!("/GuardianDB/{}", instance.peer_id()),
510 db_type: "GuardianDB".to_string(),
511 };
512
513 if let Err(e) = instance.emitters.ready.emit(ready_event) {
514 tracing::warn!("Falha ao emitir evento GuardianDB ready: {}", e);
515 } else {
516 tracing::debug!("Evento GuardianDB ready emitido com sucesso");
517 }
518
519 Ok(instance)
520 }
521
522 pub fn tracer(&self) -> Arc<BoxedTracer> {
524 self.tracer.clone()
525 }
526
    /// Returns the root tracing span of this instance.
    pub fn span(&self) -> &Span {
        &self.span
    }
531
    /// Returns the underlying IPFS client.
    pub fn ipfs(&self) -> &IpfsClient {
        &self.ipfs
    }
536
    /// Returns a clone of the current identity (snapshot under the lock).
    pub fn identity(&self) -> Identity {
        self.identity.read().clone()
    }
542
    /// Returns the current peer id (copied out of the lock).
    pub fn peer_id(&self) -> PeerId {
        *self.id.read()
    }
548
    /// Returns a shared handle to the (optional) keystore slot.
    pub fn keystore(&self) -> Arc<RwLock<Option<Box<dyn Keystore + Send + Sync>>>> {
        self.keystore.clone()
    }
554
555 pub fn close_keystore(&self) -> Option<Box<dyn Fn() -> Result<()> + Send + Sync>> {
569 let guard = self.close_keystore.read();
571 if guard.is_some() {
573 let close_keystore_clone = self.close_keystore.clone();
575 Some(Box::new(move || {
578 let guard = close_keystore_clone.read();
579 if let Some(close_fn) = guard.as_ref() {
580 close_fn() } else {
582 Ok(()) }
584 }))
585 } else {
586 None }
588 }
589
    /// Registers (or replaces) an open store under its address.
    pub fn set_store(&self, address: String, store: Arc<GuardianStore>) {
        self.stores.write().insert(address, store);
    }
595
    /// Removes the store registered under `address`, if any.
    pub fn delete_store(&self, address: &str) {
        self.stores.write().remove(address);
    }
601
    /// Looks up an open store by address, returning a shared handle.
    pub fn get_store(&self, address: &str) -> Option<Arc<GuardianStore>> {
        self.stores.read().get(address).cloned()
    }
607
608 pub async fn close_all_stores(&self) {
612 let stores_to_close: Vec<Arc<GuardianStore>> =
613 self.stores.read().values().cloned().collect();
614
615 tracing::debug!(
616 store_count = stores_to_close.len(),
617 "Iniciando fechamento de stores"
618 );
619
620 for (index, store) in stores_to_close.iter().enumerate() {
621 tracing::debug!(
622 store_index = index + 1,
623 total_stores = stores_to_close.len(),
624 store_type = store.store_type(),
625 address = %store.address(),
626 "Fechando store"
627 );
628
629 match store.close().await {
630 Ok(()) => {
631 tracing::debug!(
632 store_type = store.store_type(),
633 address = %store.address(),
634 "Store fechada com sucesso"
635 );
636 }
637 Err(e) => {
638 tracing::error!(
639 store_type = store.store_type(),
640 address = %store.address(),
641 error = %e,
642 "Erro ao fechar store"
643 );
644 }
646 }
647 }
648
649 self.stores.write().clear();
651 tracing::debug!(
652 stores_count = stores_to_close.len(),
653 "Todas as stores foram processadas e removidas do mapa"
654 );
655 }
656
657 pub fn close_cache(&self) {
660 tracing::debug!("Iniciando fechamento do cache");
661
662 let cache_guard = self.cache.write();
664
665 match cache_guard.close_internal() {
667 Ok(()) => {
668 tracing::debug!("Cache fechado com sucesso");
669 }
670 Err(e) => {
671 tracing::error!(error = %e, "Erro ao fechar cache");
672 }
673 }
674
675 }
677
678 pub async fn close_direct_connections(&self) {
680 tracing::debug!("Iniciando fechamento do canal direto");
681
682 match self.direct_channel.close_shared().await {
683 Ok(()) => {
684 tracing::debug!("Canal direto fechado com sucesso");
685 }
686 Err(e) => {
687 tracing::error!(
688 error = %e,
689 "Erro ao fechar canal direto"
690 );
691 }
692 }
693 }
694
695 pub fn close_key_store(&self) {
698 let guard = self.close_keystore.write();
699 if let Some(close_fn) = guard.as_ref()
700 && let Err(e) = close_fn()
701 {
702 tracing::error!(error = %e, "não foi possível fechar o keystore");
703 }
704 }
705
706 pub fn get_access_controller_type(
709 &self,
710 controller_type: &str,
711 ) -> Option<AccessControllerConstructor> {
712 tracing::debug!(
713 controller_type = controller_type,
714 "Buscando construtor de AccessController"
715 );
716
717 let access_controllers = self.access_controller_types.read();
718
719 match access_controllers.get(controller_type) {
720 Some(constructor) => {
721 tracing::debug!(
722 controller_type = controller_type,
723 "Construtor de AccessController encontrado"
724 );
725 Some(constructor.clone())
726 }
727 None => {
728 tracing::debug!(
729 controller_type = controller_type,
730 available_types = ?access_controllers.keys().collect::<Vec<_>>(),
731 "Construtor de AccessController não encontrado"
732 );
733 None
734 }
735 }
736 }
737
738 pub fn access_controller_types_names(&self) -> Vec<String> {
741 self.access_controller_types
742 .read()
743 .keys()
744 .cloned()
745 .collect()
746 }
747
748 pub fn unregister_access_controller_type(&self, controller_type: &str) {
751 self.access_controller_types.write().remove(controller_type);
752 }
753
754 pub fn register_access_controller_type_with_name(
760 &self,
761 controller_type: &str,
762 constructor: AccessControllerConstructor,
763 ) -> Result<()> {
764 tracing::debug!(
765 controller_type = %controller_type,
766 "Registrando novo tipo de AccessController"
767 );
768
769 if controller_type.is_empty() {
771 return Err(GuardianError::InvalidArgument(
772 "O tipo do controller não pode ser uma string vazia".to_string(),
773 ));
774 }
775
776 if controller_type.len() > 100 {
777 return Err(GuardianError::InvalidArgument(
778 "O tipo do controller é muito longo (máximo 100 caracteres)".to_string(),
779 ));
780 }
781
782 let valid_types = ["simple", "guardian", "ipfs"];
784 if !valid_types.contains(&controller_type) {
785 tracing::warn!(
786 controller_type = %controller_type,
787 valid_types = ?valid_types,
788 "Tipo de AccessController não reconhecido - registrando mesmo assim"
789 );
790 }
791
792 {
794 let existing_types = self.access_controller_types.read();
795 if existing_types.contains_key(controller_type) {
796 tracing::warn!(
797 controller_type = %controller_type,
798 "AccessController já registrado - sobrescrevendo"
799 );
800 } else {
801 tracing::debug!(
802 controller_type = %controller_type,
803 "Novo tipo de AccessController sendo registrado"
804 );
805 }
806 }
807
808 self.access_controller_types
810 .write()
811 .insert(controller_type.to_string(), constructor);
812
813 tracing::debug!(
814 controller_type = %controller_type,
815 "AccessController registrado com sucesso"
816 );
817
818 Ok(())
819 }
820
    /// Legacy registration entry point: registers `constructor` under the
    /// default type name `"simple"`.
    ///
    /// # Errors
    /// Propagates validation errors from
    /// [`Self::register_access_controller_type_with_name`].
    pub async fn register_access_controller_type(
        &self,
        constructor: AccessControllerConstructor,
    ) -> Result<()> {
        tracing::debug!("Usando registro legado com tipo padrão 'simple'");
        self.register_access_controller_type_with_name("simple", constructor)
    }
829
    /// Registers (or replaces) a store constructor under `store_type`.
    pub fn register_store_type(&self, store_type: String, constructor: StoreConstructor) {
        self.store_types.write().insert(store_type, constructor);
    }
833
    /// Removes the constructor registered for `store_type`, if any.
    pub fn unregister_store_type(&self, store_type: &str) {
        self.store_types.write().remove(store_type);
    }
838
839 pub fn store_types_names(&self) -> Vec<String> {
841 self.store_types.read().keys().cloned().collect()
842 }
843
844 pub fn get_store_constructor(&self, store_type: &str) -> Option<StoreConstructor> {
847 tracing::debug!(store_type = store_type, "Buscando construtor de Store");
848
849 let store_constructors = self.store_types.read();
850
851 match store_constructors.get(store_type) {
852 Some(constructor) => {
853 tracing::debug!(store_type = store_type, "Construtor de Store encontrado");
854 Some(constructor.clone())
855 }
856 None => {
857 tracing::debug!(
858 store_type = store_type,
859 available_types = ?store_constructors.keys().collect::<Vec<_>>(),
860 "Construtor de Store não encontrado"
861 );
862 None
863 }
864 }
865 }
866
    /// Shuts down the instance: stores, direct channel, cache, keystore,
    /// then background tasks, in that order.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; per-component failures are logged
    /// by the individual `close_*` helpers instead of being propagated.
    pub async fn close(&self) -> Result<()> {
        // NOTE(review): this span guard is held across the `.await` points
        // below, which can attach unrelated async work to the span and makes
        // the future !Send — consider `tracing::Instrument` instead. Confirm.
        let _entered = self.span.enter();
        tracing::debug!("Iniciando fechamento do GuardianDB");

        tracing::debug!("Fechando todas as stores");
        self.close_all_stores().await;

        tracing::debug!("Fechando conexões diretas");
        self.close_direct_connections().await;

        tracing::debug!("Fechando cache");
        self.close_cache();

        tracing::debug!("Fechando keystore");
        self.close_key_store();

        tracing::debug!("Emitters serão fechados automaticamente com o EventBus");

        tracing::debug!("Cancelando tarefas em background");
        self.cancellation_token.cancel();

        tracing::debug!("Abortando task do monitor do canal direto");
        self._monitor_handle.abort();

        // Brief yield so aborted tasks can observe the cancellation.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        tracing::debug!("GuardianDB fechado com sucesso");
        Ok(())
    }
907
    /// Creates a new database: determines its address, seeds the local cache
    /// with its manifest, then opens it via [`Self::open`].
    ///
    /// # Errors
    /// Fails if the store type is unknown, the address cannot be determined,
    /// the database already exists (unless `options.overwrite` is set), or
    /// the manifest cannot be cached.
    pub async fn create(
        &self,
        name: &str,
        store_type: &str,
        options: Option<CreateDBOptions>,
    ) -> Result<Arc<GuardianStore>> {
        // NOTE(review): span guard held across `.await` points — see `close`.
        let _entered = self.span.enter();
        tracing::debug!("Create()");
        let options = options.unwrap_or_default();

        // Default the working directory to this instance's base directory.
        let directory = options
            .directory
            .clone()
            .unwrap_or_else(|| self.directory.to_string_lossy().to_string());
        let mut options = options;
        options.directory = Some(directory.clone());

        tracing::debug!(
            name = name,
            store_type = store_type,
            directory = %directory,
            "Criando banco de dados"
        );

        // Derive the deterministic GuardianDB address for this name/type.
        let determine_opts = crate::traits::DetermineAddressOptions {
            only_hash: None,
            replicate: None,
            access_controller:
                crate::access_controller::manifest::CreateAccessControllerOptions::new_empty(),
        };
        let db_address = self
            .determine_address(name, store_type, Some(determine_opts))
            .await?;

        let directory_path = PathBuf::from(&directory);
        self.load_cache(directory_path.as_path(), &db_address)
            .await?;

        // Refuse to clobber an existing database unless overwrite was requested.
        let have_db = self.have_local_data(&db_address).await;

        if have_db && !options.overwrite.unwrap_or(false) {
            return Err(GuardianError::DatabaseAlreadyExists(db_address.to_string()));
        }

        self.add_manifest_to_cache(&directory_path, &db_address)
            .await
            .map_err(|e| {
                GuardianError::Other(format!(
                    "não foi possível adicionar o manifesto ao cache: {}",
                    e
                ))
            })?;

        tracing::debug!(
            address = %db_address,
            "Banco de dados criado"
        );

        self.open(&db_address.to_string(), options).await
    }
975
    /// Opens a store at `db_address`, resolving its type from the local
    /// manifest cache when possible and from IPFS otherwise.
    ///
    /// When `db_address` is not a valid GuardianDB address and
    /// `options.create` is set, it is treated as a database *name* and the
    /// call is forwarded to [`Self::create`].
    ///
    /// # Errors
    /// Fails on invalid address without `create`, missing `store_type` when
    /// creating, `local_only` without local data, unreadable manifest, or
    /// store-construction failure.
    pub async fn open(
        &self,
        db_address: &str,
        options: CreateDBOptions,
    ) -> Result<Arc<GuardianStore>> {
        // NOTE(review): span guard held across `.await` points — see `close`.
        let _entered = self.span.enter();
        tracing::debug!(address = db_address, "abrindo store GuardianDB");
        let mut options = options;

        let directory = options
            .directory
            .clone()
            .unwrap_or_else(|| self.directory.to_string_lossy().to_string());

        // Not a valid address: optionally fall back to create-by-name.
        if crate::address::is_valid(db_address).is_err() {
            tracing::warn!(address = db_address, "open: Endereço GuardianDB inválido");
            if !options.create.unwrap_or(false) {
                return Err(GuardianError::InvalidArgument("'options.create' definido como 'false'. Se você quer criar um banco de dados, defina como 'true'".to_string()));
            }
            let store_type = options.store_type.as_deref().unwrap_or("");
            if store_type.is_empty() {
                let available_types = self.store_types_names();
                let types_list = if available_types.is_empty() {
                    "Nenhum tipo de store registrado".to_string()
                } else {
                    format!("Tipos disponíveis: {}", available_types.join(", "))
                };
                return Err(GuardianError::InvalidArgument(format!(
                    "Tipo de banco de dados não fornecido! Forneça um tipo com 'options.store_type'. {}",
                    types_list
                )));
            }

            options.overwrite = Some(true);
            let new_options = CreateDBOptions {
                overwrite: Some(true),
                create: Some(true),
                store_type: Some(store_type.to_string()),
                ..Default::default()
            };

            // Box::pin breaks the async recursion cycle (open -> create -> open).
            return Box::pin(self.create(db_address, store_type, Some(new_options))).await;
        }

        let parsed_address = crate::address::parse(db_address)
            .map_err(|e| GuardianError::Other(format!("Erro ao fazer parse do endereço: {}", e)))?;

        let directory_path = PathBuf::from(&directory);
        self.load_cache(directory_path.as_path(), &parsed_address)
            .await?;

        if options.local_only.unwrap_or(false) && !self.have_local_data(&parsed_address).await {
            return Err(GuardianError::NotFound(format!(
                "O banco de dados não existe localmente: {}",
                db_address
            )));
        }

        // Resolve the store type: prefer the cached manifest, fall back to IPFS.
        let manifest_type = if self.have_local_data(&parsed_address).await {
            tracing::debug!("Dados encontrados localmente, tentando ler do cache antes do IPFS");

            let _cache_key = format!("{}/_manifest", parsed_address);

            // Scoped so the cache read-lock is released before any `.await`.
            let cache_result = {
                let cache = self.cache.read();
                let directory_str = directory_path.to_string_lossy();

                match cache.load_internal(&directory_str, &parsed_address as &dyn Address) {
                    Ok(wrapped_cache) => {
                        let manifest_key = format!("{}/_manifest", parsed_address);

                        tracing::debug!(
                            key = %manifest_key,
                            cache_loaded = true,
                            "Verificando manifesto no cache"
                        );

                        let mut ctx: Box<dyn std::any::Any> = Box::new(());
                        let key = crate::data_store::Key::new(&manifest_key);

                        match wrapped_cache.get(ctx.as_mut(), &key) {
                            Ok(manifest_data) => {
                                tracing::debug!(
                                    key = %manifest_key,
                                    data_size = manifest_data.len(),
                                    "Manifesto encontrado no cache"
                                );

                                let manifest_type =
                                    String::from_utf8_lossy(&manifest_data).to_string();

                                // Only trust cached types that are still registered.
                                if self.get_store_constructor(&manifest_type).is_some() {
                                    tracing::debug!(
                                        manifest_type = %manifest_type,
                                        "Manifesto válido encontrado no cache"
                                    );
                                    Some(manifest_data)
                                } else {
                                    tracing::warn!(
                                        manifest_type = %manifest_type,
                                        available_types = ?self.store_types_names(),
                                        "Tipo de manifesto no cache não está registrado"
                                    );
                                    None
                                }
                            }
                            Err(e) => {
                                tracing::debug!(
                                    key = %manifest_key,
                                    error = %e,
                                    "Manifesto não encontrado no cache"
                                );
                                None
                            }
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            error = %e,
                            "Falha ao carregar cache, usando IPFS"
                        );
                        None
                    }
                }
            };

            match cache_result {
                Some(cached_data) => {
                    tracing::debug!("Manifesto encontrado no cache local");
                    String::from_utf8_lossy(&cached_data).to_string()
                }
                None => {
                    tracing::debug!("Cache miss, lendo manifesto do IPFS");
                    let manifest =
                        db_manifest::read_db_manifest(self.ipfs(), &parsed_address.get_root())
                            .await
                            .map_err(|e| {
                                GuardianError::Other(format!(
                                    "Não foi possível ler o manifesto do IPFS: {}",
                                    e
                                ))
                            })?;
                    manifest.get_type
                }
            }
        } else {
            tracing::debug!("Dados não encontrados localmente, lendo manifesto do IPFS");
            let manifest = db_manifest::read_db_manifest(self.ipfs(), &parsed_address.get_root())
                .await
                .map_err(|e| {
                    GuardianError::Other(format!("Não foi possível ler o manifesto do IPFS: {}", e))
                })?;
            manifest.get_type
        };

        tracing::debug!(manifest_type = %manifest_type, "Tipo do banco de dados detectado");
        tracing::debug!("Criando instância da store");

        self.create_store(&manifest_type, &parsed_address, options)
            .await
    }
1153
    /// Computes the deterministic GuardianDB address for a database name and
    /// store type by writing its manifest to IPFS.
    ///
    /// # Errors
    /// Fails when `store_type` is not registered, `name` is already a valid
    /// address, the manifest cannot be saved, or the resulting address does
    /// not parse.
    pub async fn determine_address(
        &self,
        name: &str,
        store_type: &str,
        options: Option<DetermineAddressOptions>,
    ) -> Result<GuardianDBAddress> {
        // Options are currently accepted but unused.
        let _options = options.unwrap_or_default();

        if self.get_store_constructor(store_type).is_none() {
            let available_types = self.store_types_names();
            return Err(GuardianError::InvalidArgument(format!(
                "Tipo de banco de dados inválido: {}. Tipos disponíveis: {:?}",
                store_type, available_types
            )));
        }

        // A name that already parses as an address indicates caller confusion.
        if crate::address::is_valid(name).is_ok() {
            return Err(GuardianError::InvalidArgument(
                "O nome do banco de dados fornecido já é um endereço válido".to_string(),
            ));
        }

        let _ac_params =
            crate::access_controller::manifest::CreateAccessControllerOptions::new_empty();

        // NOTE(review): this access-controller address is a synthetic string
        // built from the name and an identity-key prefix, not a real AC
        // manifest CID — confirm this placeholder is intended. The `[..8]` /
        // `[..16]` slices assume the hex-encoded key has at least 16 chars.
        let identity_hash = hex::encode(self.identity().pub_key.as_bytes());
        let ac_address_string = format!("/ipfs/{}/access_controller/{}", name, &identity_hash[..8]);

        tracing::debug!(
            address = %ac_address_string,
            identity = %&identity_hash[..16],
            "Access Controller criado"
        );

        let manifest_hash =
            db_manifest::create_db_manifest(self.ipfs(), name, store_type, &ac_address_string)
                .await
                .map_err(|e| {
                    GuardianError::Other(format!(
                        "não foi possível salvar o manifesto no ipfs: {}",
                        e
                    ))
                })?;

        // Final address shape: /GuardianDB/<manifest-hash>/<name>.
        let addr_string = format!("/GuardianDB/{}/{}", manifest_hash, name);
        crate::address::parse(&addr_string)
            .map_err(|e| GuardianError::Other(format!("Erro ao fazer parse do endereço: {}", e)))
    }
1209
1210 pub async fn load_cache(&self, directory: &Path, db_address: &GuardianDBAddress) -> Result<()> {
1212 let cache = self.cache.read();
1214 let directory_str = directory.to_string_lossy();
1215
1216 tracing::debug!(
1217 address = %db_address,
1218 directory = %directory_str,
1219 "Carregando cache para endereço"
1220 );
1221
1222 let _loaded_cache = cache
1224 .load_internal(&directory_str, db_address)
1225 .map_err(|e| GuardianError::Other(format!("Falha ao carregar cache: {}", e)))?;
1226
1227 tracing::debug!(address = %db_address, "Cache carregado com sucesso");
1228 Ok(())
1229 }
1230
    /// Returns `true` when a non-empty manifest for `db_address` exists in
    /// the local cache; all failure paths return `false`.
    pub async fn have_local_data(&self, db_address: &GuardianDBAddress) -> bool {
        let _cache_key = format!("{}/_manifest", db_address);

        let cache = self.cache.read();
        // NOTE(review): this hardcodes "./GuardianDB" while `create`/`open`
        // load the cache from the caller-supplied directory (which may be
        // "./GuardianDB/in-memory" or a custom path) — the two lookups can
        // disagree. Confirm whether `self.directory` should be used here.
        let directory_str = "./GuardianDB";
        match cache.load_internal(directory_str, db_address) {
            Ok(wrapped_cache) => {
                let manifest_key = format!("{}/_manifest", db_address);

                let mut ctx: Box<dyn std::any::Any> = Box::new(());
                let key = crate::data_store::Key::new(&manifest_key);

                match wrapped_cache.get(ctx.as_mut(), &key) {
                    Ok(manifest_data) => {
                        // Present but empty counts as "no local data".
                        if !manifest_data.is_empty() {
                            tracing::debug!(
                                address = %db_address,
                                manifest_size = manifest_data.len(),
                                "Dados locais encontrados no cache"
                            );
                            true
                        } else {
                            tracing::debug!(
                                address = %db_address,
                                "Manifesto vazio encontrado no cache"
                            );
                            false
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            address = %db_address,
                            error = %e,
                            "Manifesto não encontrado no cache"
                        );
                        false
                    }
                }
            }
            Err(e) => {
                tracing::debug!(
                    address = %db_address,
                    error = %e,
                    "Falha ao carregar cache para verificação de dados locais"
                );
                false
            }
        }
    }
1288
    /// Stores the database manifest under `<address>/_manifest` in the local
    /// cache so later `have_local_data` / `open` calls can skip IPFS.
    ///
    /// The stored value is the manifest's type string when it can be read
    /// from IPFS, otherwise the address' root hash bytes as a fallback.
    ///
    /// # Errors
    /// Fails only when the cache itself cannot be loaded; a failed `put` is
    /// logged and swallowed.
    pub async fn add_manifest_to_cache(
        &self,
        directory: &Path,
        db_address: &GuardianDBAddress,
    ) -> Result<()> {
        let cache_key = format!("{}/_manifest", db_address);
        let root_hash_bytes = db_address.get_root().to_string().into_bytes();

        // Scoped so the cache read-lock is released before the `.await` below.
        let wrapped_cache = {
            let cache = self.cache.read();
            let directory_str = directory.to_string_lossy();

            cache
                .load_internal(&directory_str, db_address)
                .map_err(|e| GuardianError::Other(format!("Falha ao carregar cache: {}", e)))?
        };

        let key = crate::data_store::Key::new(&cache_key);

        // Prefer the real manifest type; fall back to the root hash bytes.
        let manifest_data = if let Ok(manifest) =
            db_manifest::read_db_manifest(self.ipfs(), &db_address.get_root()).await
        {
            manifest.get_type.into_bytes()
        } else {
            root_hash_bytes
        };

        let mut ctx: Box<dyn std::any::Any + Send + Sync> = Box::new(());

        // Best-effort write: a cache failure must not abort database creation.
        match wrapped_cache.put(ctx.as_mut(), &key, &manifest_data) {
            Ok(()) => {
                tracing::debug!(
                    cache_key = %cache_key,
                    data_size = manifest_data.len(),
                    address = %db_address,
                    "Manifesto armazenado no cache com sucesso"
                );
            }
            Err(e) => {
                tracing::warn!(
                    cache_key = %cache_key,
                    error = %e,
                    address = %db_address,
                    "Falha ao armazenar manifesto no cache"
                );
            }
        }

        tracing::debug!(
            address = %db_address,
            directory = %directory.to_string_lossy(),
            cache_key = %cache_key,
            "Manifesto adicionado ao cache"
        );

        Ok(())
    }
1356
1357 pub async fn create_store(
1360 &self,
1361 store_type: &str,
1362 address: &GuardianDBAddress,
1363 options: CreateDBOptions,
1364 ) -> Result<Arc<GuardianStore>> {
1365 tracing::debug!(
1366 store_type = store_type,
1367 address = %address,
1368 "Criando store"
1369 );
1370
1371 let constructor = self.get_store_constructor(store_type).ok_or_else(|| {
1373 let available_types = self.store_types_names();
1374 GuardianError::InvalidArgument(format!(
1375 "Tipo de store '{}' não registrado. Tipos disponíveis: {:?}",
1376 store_type, available_types
1377 ))
1378 })?;
1379
1380 let new_store_options = self.convert_create_to_store_options(options).await?;
1382
1383 let ipfs_client = Arc::new(self.ipfs().clone());
1385 let identity = Arc::new(self.identity());
1386 let store_address = Box::new(address.clone()) as Box<dyn Address>;
1387
1388 tracing::debug!(
1389 store_type = store_type,
1390 address = %address,
1391 "Executando construtor da store"
1392 );
1393
1394 let store_result =
1396 constructor(ipfs_client, identity, store_address, new_store_options).await;
1397
1398 let store = match store_result {
1399 Ok(store) => store,
1400 Err(e) => {
1401 tracing::error!(
1402 store_type = store_type,
1403 address = %address,
1404 error = %e,
1405 "Falha ao criar store"
1406 );
1407 return Err(e);
1408 }
1409 };
1410
1411 let boxed_store = store as Box<dyn Store<Error = GuardianError> + Send + Sync>;
1413 let arc_store: Arc<GuardianStore> = Arc::from(boxed_store);
1414
1415 self.set_store(address.to_string(), arc_store.clone());
1417
1418 tracing::debug!(
1419 store_type = store_type,
1420 address = %address,
1421 store_type_confirmed = arc_store.store_type(),
1422 "Store criada e registrada com sucesso"
1423 );
1424
1425 Ok(arc_store)
1426 }
1427
1428 async fn convert_create_to_store_options(
1430 &self,
1431 options: CreateDBOptions,
1432 ) -> Result<crate::traits::NewStoreOptions> {
1433 use crate::traits::NewStoreOptions;
1434
1435 tracing::debug!("Convertendo opções para criação de store");
1436
1437 let access_controller = if let Some(manifest_params) = options.access_controller {
1439 tracing::debug!("Convertendo ManifestParams para AccessController");
1440
1441 let controller_type = manifest_params.get_type();
1443
1444 tracing::debug!(
1445 controller_type = %controller_type,
1446 "Criando access controller a partir do manifesto"
1447 );
1448
1449 let permissions = manifest_params.get_all_access();
1451
1452 match controller_type {
1454 "simple" | "" => {
1455 tracing::debug!("Criando SimpleAccessController");
1456
1457 let simple_controller =
1458 crate::access_controller::simple::SimpleAccessController::new(
1459 if permissions.is_empty() {
1460 None
1461 } else {
1462 Some(permissions)
1463 },
1464 );
1465 Some(Arc::new(simple_controller)
1466 as Arc<
1467 dyn crate::access_controller::traits::AccessController,
1468 >)
1469 }
1470 "guardian" => {
1471 tracing::debug!("Criando GuardianAccessController");
1472
1473 let simple_controller =
1475 crate::access_controller::simple::SimpleAccessController::new(
1476 if permissions.is_empty() {
1477 None
1478 } else {
1479 Some(permissions)
1480 },
1481 );
1482 Some(Arc::new(simple_controller)
1483 as Arc<
1484 dyn crate::access_controller::traits::AccessController,
1485 >)
1486 }
1487 "ipfs" => {
1488 tracing::debug!(
1489 "IPFS AccessController não implementado, usando SimpleAccessController"
1490 );
1491
1492 let simple_controller =
1493 crate::access_controller::simple::SimpleAccessController::new(
1494 if permissions.is_empty() {
1495 None
1496 } else {
1497 Some(permissions)
1498 },
1499 );
1500 Some(Arc::new(simple_controller)
1501 as Arc<
1502 dyn crate::access_controller::traits::AccessController,
1503 >)
1504 }
1505 _ => {
1506 tracing::warn!(
1507 controller_type = %controller_type,
1508 "Tipo de access controller não reconhecido, usando SimpleAccessController"
1509 );
1510
1511 let simple_controller =
1512 crate::access_controller::simple::SimpleAccessController::new(
1513 if permissions.is_empty() {
1514 None
1515 } else {
1516 Some(permissions)
1517 },
1518 );
1519 Some(Arc::new(simple_controller)
1520 as Arc<
1521 dyn crate::access_controller::traits::AccessController,
1522 >)
1523 }
1524 }
1525 } else {
1526 tracing::debug!("Nenhum access controller especificado, usando padrão");
1527 None
1528 };
1529
1530 let store_options = NewStoreOptions {
1532 event_bus: None, index: None, access_controller, cache: None, cache_destroy: None,
1537 replication_concurrency: None,
1538 reference_count: None,
1539 replicate: Some(true), max_history: None,
1541 directory: options
1542 .directory
1543 .unwrap_or_else(|| "./GuardianDB".to_string()),
1544 sort_fn: None,
1545 span: None, tracer: None, pubsub: None, message_marshaler: None, peer_id: libp2p::PeerId::random(), direct_channel: None, close_func: None,
1552 store_specific_opts: None,
1553 };
1554
1555 tracing::debug!("Opções convertidas com sucesso");
1556 Ok(store_options)
1557 }
1558
    /// Registers the built-in access-controller constructors.
    ///
    /// Currently only the "simple" controller is registered: its constructor
    /// clones the manifest options and asynchronously builds a
    /// `SimpleAccessController` from them.
    ///
    /// # Errors
    /// Propagates failures from `register_access_controller_type_with_name`.
    pub async fn register_default_access_controller_types(&self) -> Result<()> {
        tracing::debug!("Registrando construtores padrão de access controllers");

        // Constructor closure matching `AccessControllerConstructor`: the
        // GuardianDB handle and the extra option list are unused by the
        // simple controller.
        let simple_constructor =
            Arc::new(
                |_base_guardian: Arc<
                    dyn crate::traits::BaseGuardianDB<Error = crate::error::GuardianError>,
                >,
                 options: &crate::access_controller::manifest::CreateAccessControllerOptions,
                 _access_controller_options: Option<
                    Vec<crate::access_controller::traits::Option>,
                >| {
                    // Clone so the returned future is 'static, independent of
                    // the borrowed `options` reference.
                    let options = options.clone();
                    Box::pin(async move {
                        use crate::access_controller::simple::SimpleAccessController;
                        let access_controller = SimpleAccessController::from_options(options)
                            .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;
                        Ok(Arc::new(access_controller) as Arc<dyn crate::access_controller::traits::AccessController>)
                    }) as Pin<Box<dyn std::future::Future<Output = crate::error::Result<Arc<dyn crate::access_controller::traits::AccessController>>> + Send>>
                },
            );

        self.register_access_controller_type_with_name("simple", simple_constructor)?;

        tracing::debug!(
            types = ?self.access_controller_types_names(),
            "Construtores padrão de access controllers registrados"
        );

        Ok(())
    }
1593
    /// Registers the built-in store constructors: "eventlog", "keyvalue" and
    /// "document".
    ///
    /// Each constructor is an async factory that adapts the generic
    /// `(ipfs, identity, address, options)` signature to the concrete store's
    /// `new` function and erases the result to `Box<dyn Store>`. The explicit
    /// `as Pin<Box<dyn Future…>>` casts pin the closures to the exact future
    /// type expected by `StoreConstructor`.
    pub fn register_default_store_types(&self) {
        tracing::debug!("Registrando construtores padrão de stores");

        let eventlog_constructor = Arc::new(
            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
             identity: Arc<crate::ipfs_log::identity::Identity>,
             address: Box<dyn crate::address::Address>,
             options: crate::traits::NewStoreOptions| {
                Box::pin(async move {
                    use crate::stores::event_log_store::log::GuardianDBEventLogStore;
                    // Re-box the address with Send + Sync before converting to Arc,
                    // as required by the event-log store's constructor.
                    let arc_address: Arc<dyn crate::address::Address + Send + Sync> =
                        Arc::from(address as Box<dyn crate::address::Address + Send + Sync>);

                    let store = GuardianDBEventLogStore::new(ipfs, identity, arc_address, options)
                        .await
                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;

                    Ok(Box::new(store)
                        as Box<
                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
                        >)
                })
                    as Pin<
                        Box<
                            dyn std::future::Future<
                                    Output = crate::error::Result<
                                        Box<
                                            dyn crate::traits::Store<
                                                    Error = crate::error::GuardianError,
                                                >,
                                        >,
                                    >,
                                > + Send,
                        >,
                    >
            },
        );
        let keyvalue_constructor = Arc::new(
            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
             identity: Arc<crate::ipfs_log::identity::Identity>,
             address: Box<dyn crate::address::Address>,
             options: crate::traits::NewStoreOptions| {
                Box::pin(async move {
                    use crate::stores::kv_store::keyvalue::GuardianDBKeyValue;
                    let arc_address: Arc<dyn crate::address::Address + Send + Sync> =
                        Arc::from(address as Box<dyn crate::address::Address + Send + Sync>);

                    // Note: unlike the other stores, GuardianDBKeyValue::new
                    // takes the options wrapped in `Some`.
                    let store = GuardianDBKeyValue::new(ipfs, identity, arc_address, Some(options))
                        .await
                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;

                    Ok(Box::new(store)
                        as Box<
                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
                        >)
                })
                    as Pin<
                        Box<
                            dyn std::future::Future<
                                    Output = crate::error::Result<
                                        Box<
                                            dyn crate::traits::Store<
                                                    Error = crate::error::GuardianError,
                                                >,
                                        >,
                                    >,
                                > + Send,
                        >,
                    >
            },
        );

        let document_constructor = Arc::new(
            |ipfs: Arc<crate::ipfs_core_api::client::IpfsClient>,
             identity: Arc<crate::ipfs_log::identity::Identity>,
             address: Box<dyn crate::address::Address>,
             options: crate::traits::NewStoreOptions| {
                Box::pin(async move {
                    use crate::stores::document_store::document::GuardianDBDocumentStore;

                    // The document store's constructor accepts a plain
                    // `Arc<dyn Address>` (no Send + Sync bound).
                    let arc_address: Arc<dyn crate::address::Address> =
                        Arc::from(address as Box<dyn crate::address::Address>);

                    let store = GuardianDBDocumentStore::new(ipfs, identity, arc_address, options)
                        .await
                        .map_err(|e| crate::error::GuardianError::Store(e.to_string()))?;

                    Ok(Box::new(store)
                        as Box<
                            dyn crate::traits::Store<Error = crate::error::GuardianError>,
                        >)
                })
                    as Pin<
                        Box<
                            dyn std::future::Future<
                                    Output = crate::error::Result<
                                        Box<
                                            dyn crate::traits::Store<
                                                    Error = crate::error::GuardianError,
                                                >,
                                        >,
                                    >,
                                > + Send,
                        >,
                    >
            },
        );

        self.register_store_type("eventlog".to_string(), eventlog_constructor);
        self.register_store_type("keyvalue".to_string(), keyvalue_constructor);
        self.register_store_type("document".to_string(), document_constructor);

        tracing::debug!(
            types = ?self.store_types_names(),
            "Construtores padrão registrados"
        );
    }
1719
1720 pub fn event_bus(&self) -> Arc<EventBusImpl> {
1722 self.event_bus.clone()
1723 }
1724
    /// Subscribes to `EventPubSubPayload` on `event_bus` and spawns a task
    /// that unmarshals incoming direct-channel messages, syncs the targeted
    /// store with the received heads, and re-emits them as `new_heads`.
    ///
    /// The task runs until the node's cancellation token fires or the event
    /// channel closes; the returned `JoinHandle` can be awaited for shutdown.
    ///
    /// # Errors
    /// Fails only when the initial event-bus subscription cannot be created.
    pub async fn monitor_direct_channel(
        &self,
        event_bus: Arc<EventBusImpl>,
    ) -> Result<JoinHandle<()>> {
        let mut receiver = event_bus
            .subscribe::<EventPubSubPayload>()
            .await
            .map_err(|e| {
                GuardianError::Other(format!(
                    "não foi possível se inscrever nos eventos do pubsub: {}",
                    e
                ))
            })?;

        // Clone the shared state the spawned task needs; the task owns these
        // handles independently of `self`.
        let token = self.cancellation_token.clone();
        let message_marshaler = self.message_marshaler.clone();
        let emitters = self.emitters.clone();
        let stores = self.stores.clone();

        let handle = tokio::spawn(async move {
            tracing::debug!("Monitor do canal direto iniciado");

            loop {
                tokio::select! {
                    // Cooperative shutdown takes priority over new events.
                    _ = token.cancelled() => {
                        tracing::debug!("monitor_direct_channel encerrando");
                        return;
                    }
                    maybe_event = receiver.recv() => {
                        match maybe_event {
                            Ok(event) => {
                                tracing::trace!(
                                    peer = %event.peer,
                                    payload_size = event.payload.len(),
                                    "Evento recebido no canal direto"
                                );

                                // Malformed payloads are logged and skipped —
                                // a bad peer must not kill the monitor.
                                let msg = match message_marshaler.unmarshal(&event.payload) {
                                    Ok(msg) => msg,
                                    Err(e) => {
                                        tracing::warn!(
                                            peer = %event.peer,
                                            error = %e,
                                            payload_size = event.payload.len(),
                                            "Falha ao deserializar mensagem do canal direto"
                                        );
                                        continue;
                                    }
                                };

                                tracing::debug!(
                                    peer = %event.peer,
                                    store_address = %msg.address,
                                    heads_count = msg.heads.len(),
                                    "Mensagem deserializada com sucesso"
                                );

                                // Look up the target store; the read guard is
                                // dropped before any await point below.
                                let store = {
                                    let stores_guard = stores.read();
                                    stores_guard.get(&msg.address).cloned()
                                };

                                let _store = match store {
                                    Some(store) => store,
                                    None => {
                                        tracing::debug!(
                                            store_address = %msg.address,
                                            peer = %event.peer,
                                            "Store não encontrada para endereço, ignorando mensagem"
                                        );
                                        continue;
                                    }
                                };

                                // Basic sanity filter: drop heads with empty
                                // id or payload before syncing.
                                let valid_heads: Vec<_> = msg.heads.iter()
                                    .filter(|head| !head.id.is_empty() && !head.payload.is_empty())
                                    .cloned()
                                    .collect();

                                if valid_heads.is_empty() {
                                    tracing::warn!(
                                        store_address = %msg.address,
                                        peer = %event.peer,
                                        total_heads = msg.heads.len(),
                                        "Todos os heads recebidos são inválidos"
                                    );
                                    continue;
                                }

                                tracing::debug!(
                                    store_address = %msg.address,
                                    peer = %event.peer,
                                    valid_heads = valid_heads.len(),
                                    total_heads = msg.heads.len(),
                                    "Processando heads válidos"
                                );

                                tracing::debug!(
                                    store_address = %msg.address,
                                    peer = %event.peer,
                                    valid_heads = valid_heads.len(),
                                    "Iniciando sincronização com a store"
                                );

                                // Sync failures are logged but do not stop
                                // the monitor loop.
                                let sync_result = Self::sync_store_with_heads(&_store, valid_heads.clone()).await;

                                match sync_result {
                                    Ok(()) => {
                                        tracing::debug!(
                                            store_address = %msg.address,
                                            peer = %event.peer,
                                            processed_heads = valid_heads.len(),
                                            "Sincronização de heads completada com sucesso"
                                        );
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            store_address = %msg.address,
                                            peer = %event.peer,
                                            error = %e,
                                            attempted_heads = valid_heads.len(),
                                            "Erro durante sincronização de heads"
                                        );
                                    }
                                }

                                // Notify listeners that new heads arrived,
                                // regardless of the sync outcome above.
                                let exchange_event = EventExchangeHeads::new(event.peer, msg);
                                if let Err(e) = emitters.new_heads.emit(exchange_event) {
                                    tracing::error!(
                                        error = %e,
                                        peer = %event.peer,
                                        "Erro ao emitir evento new_heads"
                                    );
                                } else {
                                    tracing::trace!(peer = %event.peer, "Evento new_heads emitido com sucesso");
                                }
                            }
                            Err(_) => {
                                tracing::debug!("Canal de eventos fechado, encerrando monitor");
                                break;
                            }
                        }
                    }
                }
            }

            tracing::debug!("Monitor do canal direto finalizado");
        });

        Ok(handle)
    }
1891
1892 async fn sync_store_with_heads(
1895 store: &Arc<GuardianStore>,
1896 heads: Vec<crate::ipfs_log::entry::Entry>,
1897 ) -> Result<()> {
1898 if let Some(base_store) = store
1901 .as_any()
1902 .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
1903 {
1904 return base_store.sync(heads).await.map_err(|e| {
1906 GuardianError::Store(format!("Erro na sincronização BaseStore: {}", e))
1907 });
1908 }
1909 if let Some(event_log_store) = store
1912 .as_any()
1913 .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>(
1914 ) {
1915 let base_store = event_log_store.basestore();
1917 return base_store.sync(heads).await.map_err(|e| {
1918 GuardianError::Store(format!("Erro na sincronização EventLogStore: {}", e))
1919 });
1920 }
1921
1922 if let Some(kv_store) = store
1924 .as_any()
1925 .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
1926 {
1927 let base_store = kv_store.basestore();
1929 return base_store.sync(heads).await.map_err(|e| {
1930 GuardianError::Store(format!("Erro na sincronização KeyValueStore: {}", e))
1931 });
1932 }
1933
1934 if let Some(doc_store) = store
1936 .as_any()
1937 .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>(
1938 ) {
1939 let base_store = doc_store.basestore();
1941 return base_store.sync(heads).await.map_err(|e| {
1942 GuardianError::Store(format!("Erro na sincronização DocumentStore: {}", e))
1943 });
1944 }
1945
1946 Err(GuardianError::Other(
1948 "Tipo de store não suportado para sincronização ou downcast falhou".to_string(),
1949 ))
1950 }
1951
1952 async fn get_store_total_entries(&self, store: &Arc<GuardianStore>) -> Result<usize> {
1955 if let Some(base_store) = store
1958 .as_any()
1959 .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
1960 {
1961 let op_log = base_store.op_log();
1963 let log = op_log.read();
1964 return Ok(log.len());
1965 }
1966
1967 if let Some(event_log_store) = store
1970 .as_any()
1971 .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>(
1972 ) {
1973 let base_store = event_log_store.basestore();
1974 let op_log = base_store.op_log();
1975 let log = op_log.read();
1976 return Ok(log.len());
1977 }
1978
1979 if let Some(kv_store) = store
1981 .as_any()
1982 .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
1983 {
1984 let base_store = kv_store.basestore();
1985 let op_log = base_store.op_log();
1986 let log = op_log.read();
1987 return Ok(log.len());
1988 }
1989
1990 if let Some(doc_store) = store
1992 .as_any()
1993 .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>(
1994 ) {
1995 let base_store = doc_store.basestore();
1996 let op_log = base_store.op_log();
1997 let log = op_log.read();
1998 return Ok(log.len());
1999 }
2000
2001 Err(GuardianError::Other(
2003 "Tipo de store não suportado para obter total de entradas ou downcast falhou"
2004 .to_string(),
2005 ))
2006 }
2007
2008 fn start_monitor_task(
2011 event_bus: Arc<EventBusImpl>,
2012 cancellation_token: CancellationToken,
2013 span: Span,
2014 ) -> JoinHandle<()> {
2015 tokio::spawn(async move {
2016 let _enter = span.enter();
2017 let mut receiver = match event_bus.subscribe::<EventPubSubPayload>().await {
2019 Ok(rx) => rx,
2020 Err(e) => {
2021 tracing::error!("Falha ao se inscrever nos eventos do pubsub: {}", e);
2022 return;
2023 }
2024 };
2025
2026 tracing::debug!("Monitor do canal direto iniciado");
2027
2028 loop {
2029 tokio::select! {
2030 _ = cancellation_token.cancelled() => {
2032 tracing::debug!("Monitor do canal direto encerrando");
2033 return;
2034 }
2035 maybe_event = receiver.recv() => {
2037 match maybe_event {
2038 Ok(event) => {
2039 tracing::trace!(
2040 peer = %event.peer,
2041 "Evento recebido no monitor do canal direto"
2042 );
2043
2044 tracing::debug!(
2050 event_type = "pubsub_payload",
2051 from_peer = %event.peer,
2052 payload_size = event.payload.len(),
2053 "Processando evento de canal direto"
2054 );
2055
2056 }
2059 Err(_) => {
2060 tracing::debug!("Canal de eventos fechado, encerrando monitor");
2061 break;
2062 }
2063 }
2064 }
2065 }
2066 }
2067 })
2068 }
2069
2070 async fn verify_heads_permissions(
2095 &self,
2096 heads: &[crate::ipfs_log::entry::Entry],
2097 store: &Arc<GuardianStore>,
2098 ) -> Result<Vec<crate::ipfs_log::entry::Entry>> {
2099 tracing::debug!(
2100 heads_count = heads.len(),
2101 "Iniciando verificação de permissões para heads"
2102 );
2103
2104 let mut authorized_heads = Vec::new();
2105 let mut denied_count = 0;
2106 let mut no_identity_count = 0;
2107
2108 let access_controller = {
2110 if let Some(event_log_store) = store.as_any().downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>() {
2112 event_log_store.basestore().access_controller()
2113 } else if let Some(kv_store) = store.as_any().downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>() {
2114 kv_store.basestore().access_controller()
2115 } else if let Some(doc_store) = store.as_any().downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>() {
2116 doc_store.basestore().access_controller()
2117 } else if let Some(base_store) = store.as_any().downcast_ref::<crate::stores::base_store::base_store::BaseStore>() {
2118 base_store.access_controller()
2119 } else {
2120 tracing::warn!("Tipo de store não suportado para verificação de permissões");
2121 return Err(GuardianError::Store(
2122 "Store type not supported for permission verification".to_string()
2123 ));
2124 }
2125 };
2126
2127 tracing::debug!(
2128 access_controller_type = access_controller.get_type(),
2129 "Access Controller obtido"
2130 );
2131
2132 for (i, head) in heads.iter().enumerate() {
2134 let identity = match &head.identity {
2136 Some(identity) => identity,
2137 None => {
2138 tracing::debug!(
2139 head_index = i + 1,
2140 total_heads = heads.len(),
2141 head_hash = %head.hash,
2142 "Head rejeitado: sem identidade"
2143 );
2144 no_identity_count += 1;
2145 continue;
2146 }
2147 };
2148
2149 if identity.id().is_empty() || identity.pub_key().is_empty() {
2151 tracing::debug!(
2152 head_index = i + 1,
2153 total_heads = heads.len(),
2154 head_hash = %head.hash,
2155 "Head rejeitado: identidade inválida"
2156 );
2157 denied_count += 1;
2158 continue;
2159 }
2160
2161 let identity_key = identity.pub_key();
2163 let has_write_permission = match access_controller.get_authorized_by_role("write").await
2164 {
2165 Ok(authorized_keys) => {
2166 authorized_keys.contains(&identity_key.to_string())
2168 || authorized_keys.contains(&identity.id().to_string())
2169 || authorized_keys.contains(&"*".to_string()) }
2171 Err(e) => {
2172 tracing::warn!(
2173 head_index = i + 1,
2174 total_heads = heads.len(),
2175 error = %e,
2176 head_hash = %head.hash,
2177 "Head erro ao verificar permissões"
2178 );
2179
2180 let permission_denied_event = EventPermissionDenied::new(
2182 store.address().to_string(),
2183 identity.id().to_string(),
2184 identity_key.to_string(),
2185 "write".to_string(),
2186 );
2187
2188 if let Err(emit_err) = self
2189 .emitters
2190 .permission_denied
2191 .emit(permission_denied_event)
2192 {
2193 tracing::warn!(error = %emit_err, "Erro ao emitir evento PermissionDenied");
2194 }
2195
2196 false }
2198 };
2199
2200 if !has_write_permission {
2201 tracing::debug!(
2202 head_index = i + 1,
2203 total_heads = heads.len(),
2204 head_hash = %head.hash,
2205 identity_id = identity.id(),
2206 "Head rejeitado: sem permissão de escrita"
2207 );
2208
2209 let permission_denied_event = EventPermissionDenied::new(
2211 store.address().to_string(),
2212 identity.id().to_string(),
2213 identity_key.to_string(),
2214 "write".to_string(),
2215 );
2216
2217 if let Err(e) = self
2218 .emitters
2219 .permission_denied
2220 .emit(permission_denied_event)
2221 {
2222 tracing::warn!(error = %e, "Erro ao emitir evento PermissionDenied");
2223 }
2224
2225 denied_count += 1;
2226 continue;
2227 }
2228
2229 let has_admin_permission = match access_controller.get_authorized_by_role("admin").await
2231 {
2232 Ok(admin_keys) => {
2233 admin_keys.contains(&identity_key.to_string())
2234 || admin_keys.contains(&identity.id().to_string())
2235 || admin_keys.contains(&"*".to_string())
2236 }
2237 Err(_) => false, };
2239
2240 if has_write_permission || has_admin_permission {
2242 let permission_type = if has_admin_permission {
2243 "admin"
2244 } else {
2245 "write"
2246 };
2247 tracing::debug!(
2248 head_index = i + 1,
2249 total_heads = heads.len(),
2250 permission_type = permission_type,
2251 head_hash = %head.hash,
2252 identity_id = identity.id(),
2253 "Head autorizado"
2254 );
2255
2256 authorized_heads.push(head.clone());
2257 } else {
2258 tracing::debug!(
2259 head_index = i + 1,
2260 total_heads = heads.len(),
2261 head_hash = %head.hash,
2262 identity_id = identity.id(),
2263 "Head rejeitado: sem permissões adequadas"
2264 );
2265
2266 let permission_denied_event = EventPermissionDenied::new(
2268 store.address().to_string(),
2269 identity.id().to_string(),
2270 identity_key.to_string(),
2271 "write/admin".to_string(),
2272 );
2273
2274 if let Err(e) = self
2275 .emitters
2276 .permission_denied
2277 .emit(permission_denied_event)
2278 {
2279 tracing::warn!(error = %e, "Erro ao emitir evento PermissionDenied");
2280 }
2281
2282 denied_count += 1;
2283 }
2284 }
2285
2286 let authorized_count = authorized_heads.len();
2288 let total_heads = heads.len();
2289
2290 tracing::debug!(
2291 total_heads = total_heads,
2292 authorized_heads = authorized_count,
2293 denied_heads = denied_count,
2294 no_identity_heads = no_identity_count,
2295 access_controller_type = access_controller.get_type(),
2296 "Verificação de permissões concluída"
2297 );
2298
2299 if authorized_count == 0 && total_heads > 0 {
2300 tracing::warn!(
2301 total_heads = total_heads,
2302 "ATENÇÃO: Todos os heads foram rejeitados por falta de permissões"
2303 );
2304
2305 if let Ok(write_keys) = access_controller.get_authorized_by_role("write").await {
2307 tracing::debug!(write_keys = ?write_keys, "Chaves autorizadas para escrita");
2308 }
2309 if let Ok(admin_keys) = access_controller.get_authorized_by_role("admin").await {
2310 tracing::debug!(admin_keys = ?admin_keys, "Chaves autorizadas para admin");
2311 }
2312 } else if authorized_count < total_heads {
2313 tracing::info!(
2314 authorized_heads = authorized_count,
2315 total_heads = total_heads,
2316 rejected_heads = total_heads - authorized_count,
2317 "Verificação parcial de permissões concluída"
2318 );
2319 } else if authorized_count == total_heads && total_heads > 0 {
2320 tracing::debug!(
2321 authorized_heads = total_heads,
2322 "Verificação completa: todos os heads foram autorizados"
2323 );
2324 }
2325
2326 Ok(authorized_heads)
2327 }
2328
2329 async fn verify_identity_cryptographically(&self, identity: &Identity) -> Result<()> {
2346 if identity.id().is_empty() {
2348 return Err(GuardianError::Store(
2349 "Identity ID cannot be empty".to_string(),
2350 ));
2351 }
2352
2353 if identity.pub_key().is_empty() {
2354 return Err(GuardianError::Store(
2355 "Identity public key cannot be empty".to_string(),
2356 ));
2357 }
2358
2359 let pub_key_hex = identity.pub_key();
2361 let pub_key_bytes = match hex::decode(pub_key_hex) {
2362 Ok(bytes) => bytes,
2363 Err(e) => {
2364 return Err(GuardianError::Store(format!(
2365 "Failed to decode public key from hex: {}",
2366 e
2367 )));
2368 }
2369 };
2370
2371 let secp = secp256k1::Secp256k1::new();
2372 let public_key = match secp256k1::PublicKey::from_slice(&pub_key_bytes) {
2373 Ok(pk) => pk,
2374 Err(e) => {
2375 return Err(GuardianError::Store(format!(
2376 "Invalid secp256k1 public key: {}",
2377 e
2378 )));
2379 }
2380 };
2381
2382 let signatures = identity.signatures();
2384
2385 if !signatures.id().is_empty() {
2387 match self.verify_signature_with_secp256k1(
2388 identity.id(),
2389 signatures.id(),
2390 &public_key,
2391 &secp,
2392 ) {
2393 Ok(true) => {
2394 tracing::debug!("Identity ID signature verified successfully");
2395 }
2396 Ok(false) => {
2397 return Err(GuardianError::Store(
2398 "Identity ID signature verification failed".to_string(),
2399 ));
2400 }
2401 Err(e) => {
2402 return Err(GuardianError::Store(format!(
2403 "Error verifying ID signature: {}",
2404 e
2405 )));
2406 }
2407 }
2408 }
2409
2410 if !signatures.pub_key().is_empty() {
2412 let pub_key_data = format!("{}{}", identity.pub_key(), signatures.id());
2414
2415 match self.verify_signature_with_secp256k1(
2416 &pub_key_data,
2417 signatures.pub_key(),
2418 &public_key,
2419 &secp,
2420 ) {
2421 Ok(true) => {
2422 tracing::debug!("Identity public key signature verified successfully");
2423 }
2424 Ok(false) => {
2425 return Err(GuardianError::Store(
2426 "Identity public key signature verification failed".to_string(),
2427 ));
2428 }
2429 Err(e) => {
2430 return Err(GuardianError::Store(format!(
2431 "Error verifying public key signature: {}",
2432 e
2433 )));
2434 }
2435 }
2436 }
2437
2438 if let Some(libp2p_key) = identity.public_key() {
2440 let peer_id = libp2p_identity::PeerId::from_public_key(&libp2p_key);
2442 tracing::debug!(peer_id = %peer_id, "Identity verified with libp2p PeerID");
2443 }
2444
2445 tracing::debug!(
2446 identity_id = identity.id(),
2447 public_key_len = identity.pub_key().len(),
2448 "Identity cryptographic verification completed successfully"
2449 );
2450
2451 Ok(())
2452 }
2453
2454 fn verify_signature_with_secp256k1(
2469 &self,
2470 message: &str,
2471 signature_str: &str,
2472 public_key: &secp256k1::PublicKey,
2473 secp: &secp256k1::Secp256k1<secp256k1::All>,
2474 ) -> Result<bool> {
2475 use secp256k1::Message;
2476 use sha2::{Digest, Sha256};
2477 use std::str::FromStr;
2478
2479 let mut hasher = Sha256::new();
2481 hasher.update(message.as_bytes());
2482 let message_hash = hasher.finalize();
2483 let message_hash_array: [u8; 32] = message_hash.into();
2484
2485 let secp_message = Message::from_digest(message_hash_array);
2487
2488 let signature = match secp256k1::ecdsa::Signature::from_str(signature_str) {
2490 Ok(sig) => sig,
2491 Err(e) => {
2492 tracing::debug!(
2493 signature = signature_str,
2494 error = %e,
2495 "Failed to parse signature"
2496 );
2497 return Ok(false); }
2499 };
2500
2501 match secp.verify_ecdsa(secp_message, &signature, public_key) {
2503 Ok(()) => {
2504 tracing::debug!("Signature verification successful");
2505 Ok(true)
2506 }
2507 Err(e) => {
2508 tracing::debug!(error = %e, "Signature verification failed");
2509 Ok(false) }
2511 }
2512 }
2513
2514 pub async fn handle_event_exchange_heads(
2547 &self,
2548 event: &MessageExchangeHeads,
2549 store: Arc<GuardianStore>,
2550 ) -> Result<()> {
2551 let heads = &event.heads;
2552 let store_address = &event.address;
2553
2554 tracing::debug!(
2555 peer_id = %self.peer_id(),
2556 count = heads.len(),
2557 store_address = store_address,
2558 "Processando evento de exchange heads"
2559 );
2560
2561 if heads.is_empty() {
2562 tracing::debug!("Nenhum head recebido para sincronização");
2563 return Ok(());
2564 }
2565
2566 let mut valid_heads = Vec::new();
2568 let mut skipped_count = 0;
2569
2570 for (i, head) in heads.iter().enumerate() {
2571 if head.hash.is_empty() || head.payload.is_empty() {
2573 tracing::debug!(
2574 head_index = i + 1,
2575 total_heads = heads.len(),
2576 "Head ignorado: dados inválidos (hash ou payload vazio)"
2577 );
2578 skipped_count += 1;
2579 continue;
2580 }
2581
2582 if head.id.is_empty() {
2584 tracing::debug!(
2585 head_index = i + 1,
2586 total_heads = heads.len(),
2587 "Head ignorado: ID vazio"
2588 );
2589 skipped_count += 1;
2590 continue;
2591 }
2592
2593 if let Some(identity) = &head.identity {
2595 if identity.id().is_empty() || identity.pub_key().is_empty() {
2596 tracing::warn!(
2597 head_index = i + 1,
2598 total_heads = heads.len(),
2599 head_hash = %head.hash,
2600 "Head com identidade inválida"
2601 );
2602 } else {
2603 tracing::debug!(
2604 head_index = i + 1,
2605 total_heads = heads.len(),
2606 head_hash = %head.hash,
2607 identity_id = identity.id(),
2608 "Head com identidade válida"
2609 );
2610
2611 match self.verify_identity_cryptographically(identity).await {
2613 Ok(()) => {
2614 tracing::debug!(
2615 head_index = i + 1,
2616 total_heads = heads.len(),
2617 head_hash = %head.hash,
2618 "Head identidade verificada criptograficamente"
2619 );
2620 }
2621 Err(e) => {
2622 tracing::warn!(
2623 head_index = i + 1,
2624 total_heads = heads.len(),
2625 head_hash = %head.hash,
2626 error = %e,
2627 "Head falha na verificação criptográfica da identidade"
2628 );
2629 }
2632 }
2633 }
2634 }
2635
2636 valid_heads.push(head.clone());
2637
2638 tracing::debug!(
2639 head_index = i + 1,
2640 total_heads = heads.len(),
2641 head_hash = %head.hash,
2642 clock_id = head.clock.id(),
2643 clock_time = head.clock.time(),
2644 "Head validado"
2645 );
2646 }
2647
2648 if valid_heads.is_empty() {
2649 tracing::warn!(
2650 total_heads = heads.len(),
2651 "Todos os heads recebidos são inválidos"
2652 );
2653 return Ok(());
2654 }
2655
2656 if skipped_count > 0 {
2657 tracing::debug!(
2658 valid_heads = valid_heads.len(),
2659 total_heads = heads.len(),
2660 skipped_count = skipped_count,
2661 "Validação concluída com heads ignorados"
2662 );
2663 }
2664
2665 tracing::debug!(
2667 valid_heads_count = valid_heads.len(),
2668 "Verificando permissões de acesso para heads"
2669 );
2670
2671 let permitted_heads = self.verify_heads_permissions(&valid_heads, &store).await?;
2673
2674 tracing::debug!(
2676 permitted_heads_count = permitted_heads.len(),
2677 "Verificando duplicatas no oplog para heads"
2678 );
2679
2680 let mut new_heads = Vec::new();
2681 let mut duplicate_count = 0;
2682
2683 for (i, head) in permitted_heads.iter().enumerate() {
2685 let head_hash = head.hash();
2687 let already_exists = {
2688 if let Some(event_log_store) = store.as_any()
2690 .downcast_ref::<crate::stores::event_log_store::log::GuardianDBEventLogStore>()
2691 {
2692 event_log_store.basestore().op_log().read().has(head_hash)
2693 } else if let Some(kv_store) = store.as_any()
2694 .downcast_ref::<crate::stores::kv_store::keyvalue::GuardianDBKeyValue>()
2695 {
2696 kv_store.basestore().op_log().read().has(head_hash)
2697 } else if let Some(doc_store) = store.as_any()
2698 .downcast_ref::<crate::stores::document_store::document::GuardianDBDocumentStore>()
2699 {
2700 doc_store.basestore().op_log().read().has(head_hash)
2701 } else if let Some(base_store) = store.as_any()
2702 .downcast_ref::<crate::stores::base_store::base_store::BaseStore>()
2703 {
2704 base_store.op_log().read().has(head_hash)
2705 } else {
2706 tracing::warn!(
2707 head_index = i + 1,
2708 total_heads = permitted_heads.len(),
2709 head_hash = %head_hash,
2710 "Tipo de store não suportado para verificação de duplicatas, assumindo novo"
2711 );
2712 false }
2714 };
2715
2716 if already_exists {
2717 tracing::debug!(
2718 head_index = i + 1,
2719 total_heads = permitted_heads.len(),
2720 head_hash = %head_hash,
2721 "Head já existe no oplog (duplicata)"
2722 );
2723 duplicate_count += 1;
2724 } else {
2725 tracing::debug!(
2726 head_index = i + 1,
2727 total_heads = permitted_heads.len(),
2728 head_hash = %head_hash,
2729 "Head é novo, adicionando para sincronização"
2730 );
2731 new_heads.push(head.clone());
2732 }
2733 }
2734
2735 if new_heads.is_empty() {
2736 tracing::debug!(
2737 duplicate_count = duplicate_count,
2738 "Todos os heads são duplicatas, sincronização desnecessária"
2739 );
2740 return Ok(());
2741 }
2742
2743 if duplicate_count > 0 {
2744 tracing::debug!(
2745 new_heads = new_heads.len(),
2746 total_heads = heads.len(),
2747 duplicate_count = duplicate_count,
2748 "Duplicatas detectadas"
2749 );
2750 }
2751
2752 tracing::debug!(
2754 valid_heads = new_heads.len(),
2755 store_address = store_address,
2756 "Iniciando sincronização com a store"
2757 );
2758
2759 let new_heads_count = new_heads.len();
2761 let sync_start_time = std::time::Instant::now();
2762
2763 let entries_for_events = new_heads.clone();
2765
2766 let sync_result = Self::sync_store_with_heads(&store, new_heads).await;
2768
2769 let sync_duration = sync_start_time.elapsed();
2771 let duration_ms = sync_duration.as_millis() as u64;
2772
2773 match sync_result {
2774 Ok(()) => {
2775 tracing::debug!(
2776 processed_count = new_heads_count,
2777 store_address = store_address,
2778 duration_ms = duration_ms,
2779 "Sincronização de heads concluída com sucesso"
2780 );
2781
2782 let exchange_event = EventExchangeHeads::new(self.peer_id(), event.clone());
2785
2786 if let Err(e) = self.emitters.new_heads.emit(exchange_event) {
2787 tracing::warn!(error = %e, "Falha ao emitir evento new_heads");
2788 } else {
2789 tracing::trace!(
2790 processed_heads = new_heads_count,
2791 "Evento new_heads emitido com sucesso"
2792 );
2793 }
2794
2795 let store_type = store.store_type();
2798 let total_entries = self.get_store_total_entries(&store).await.unwrap_or(0);
2799
2800 let store_updated_event = EventStoreUpdated::new(
2802 store_address.clone(),
2803 store_type.to_string(),
2804 new_heads_count,
2805 total_entries,
2806 );
2807
2808 if let Err(e) = self.emitters.store_updated.emit(store_updated_event) {
2809 tracing::warn!(error = %e, "Falha ao emitir evento store_updated");
2810 } else {
2811 tracing::debug!(
2812 store_address = store_address,
2813 entries_added = new_heads_count,
2814 "Evento store_updated emitido com sucesso"
2815 );
2816 }
2817
2818 let sync_completed_event = EventSyncCompleted::new(
2820 store_address.clone(),
2821 self.peer_id().to_string(),
2822 new_heads_count,
2823 duration_ms,
2824 true, );
2826
2827 if let Err(e) = self.emitters.sync_completed.emit(sync_completed_event) {
2828 tracing::warn!(error = %e, "Falha ao emitir evento sync_completed");
2829 } else {
2830 tracing::debug!(
2831 store_address = store_address,
2832 duration_ms = duration_ms,
2833 "Evento sync_completed emitido com sucesso"
2834 );
2835 }
2836
2837 if !entries_for_events.is_empty() {
2839 let new_entries_event = EventNewEntries::new(
2840 store_address.clone(),
2841 entries_for_events,
2842 total_entries,
2843 );
2844
2845 if let Err(e) = self.emitters.new_entries.emit(new_entries_event) {
2846 tracing::warn!(error = %e, "Falha ao emitir evento new_entries");
2847 } else {
2848 tracing::debug!(
2849 store_address = store_address,
2850 new_entries_count = new_heads_count,
2851 "Evento new_entries emitido com sucesso"
2852 );
2853 }
2854 }
2855 }
2856 Err(e) => {
2857 tracing::error!(
2858 error = %e,
2859 store_address = store_address,
2860 heads_count = new_heads_count,
2861 duration_ms = duration_ms,
2862 "Falha na sincronização de heads"
2863 );
2864
2865 let error_type = match &e {
2868 GuardianError::Store(_) => SyncErrorType::StoreError,
2869 GuardianError::Network(_) => SyncErrorType::NetworkError,
2870 GuardianError::InvalidArgument(_) => SyncErrorType::ValidationError,
2871 _ => SyncErrorType::UnknownError,
2872 };
2873
2874 let sync_error_event = EventSyncError::new(
2875 store_address.clone(),
2876 self.peer_id().to_string(),
2877 e.to_string(),
2878 new_heads_count,
2879 error_type.clone(),
2880 );
2881
2882 if let Err(emit_err) = self.emitters.sync_error.emit(sync_error_event) {
2883 tracing::warn!(
2884 error = %emit_err,
2885 original_error = %e,
2886 "Falha ao emitir evento sync_error"
2887 );
2888 } else {
2889 tracing::debug!(
2890 store_address = store_address,
2891 error_type = ?error_type,
2892 "Evento sync_error emitido com sucesso"
2893 );
2894 }
2895
2896 let sync_completed_event = EventSyncCompleted::new(
2898 store_address.clone(),
2899 self.peer_id().to_string(),
2900 0, duration_ms,
2902 false, );
2904
2905 if let Err(emit_err) = self.emitters.sync_completed.emit(sync_completed_event) {
2906 tracing::warn!(error = %emit_err, "Falha ao emitir evento sync_completed (erro)");
2907 }
2908
2909 return Err(e);
2910 }
2911 }
2912
2913 tracing::debug!(
2914 total_heads_received = heads.len(),
2915 heads_processed = new_heads_count,
2916 heads_skipped = skipped_count,
2917 store_address = store_address,
2918 "Processamento de exchange heads completado com sucesso"
2919 );
2920
2921 Ok(())
2922 }
2923}
2924
2925pub async fn make_direct_channel(
2927 event_bus: &EventBusImpl,
2928 factory: DirectChannelFactory,
2929 options: &DirectChannelOptions,
2930) -> Result<Arc<dyn DirectChannel<Error = GuardianError> + Send + Sync>> {
2931 let emitter = crate::p2p::events::PayloadEmitter::new(event_bus)
2932 .await
2933 .map_err(|e| {
2934 GuardianError::Other(format!(
2935 "não foi possível inicializar o emitter do pubsub: {}",
2936 e
2937 ))
2938 })?;
2939
2940 let channel = factory(Arc::new(emitter), Some((*options).clone()))
2942 .await
2943 .map_err(|e| GuardianError::Other(format!("Falha ao criar canal direto: {}", e)))?;
2944
2945 tracing::debug!("Canal direto criado com sucesso usando factory fornecida");
2946 Ok(channel)
2947}
2948
impl Drop for GuardianDB {
    /// Tears down background machinery when the database handle is dropped.
    fn drop(&mut self) {
        // Abort the background monitor task so it cannot outlive this instance.
        self._monitor_handle.abort();

        // Signal cancellation to every task observing this token (or a child
        // of it), letting cooperative shutdown paths run.
        self.cancellation_token.cancel();

    }
}
2962
2963#[async_trait::async_trait]
2965impl BaseGuardianDB for GuardianDB {
2966 type Error = GuardianError;
2967
2968 fn ipfs(&self) -> Arc<IpfsClient> {
2969 Arc::new(self.ipfs.clone())
2970 }
2971
2972 fn identity(&self) -> Arc<Identity> {
2973 let identity_guard = self.identity.read();
2975 Arc::new(identity_guard.clone())
2976 }
2977
2978 async fn open(
2979 &self,
2980 address: &str,
2981 options: &mut CreateDBOptions,
2982 ) -> std::result::Result<Arc<dyn Store<Error = GuardianError>>, Self::Error> {
2983 let options_copy = CreateDBOptions {
2985 event_bus: options.event_bus.clone(),
2986 directory: options.directory.clone(),
2987 overwrite: options.overwrite,
2988 local_only: options.local_only,
2989 create: options.create,
2990 store_type: options.store_type.clone(),
2991 access_controller_address: options.access_controller_address.clone(),
2992 access_controller: None, replicate: options.replicate,
2994 keystore: options.keystore.clone(),
2995 cache: options.cache.clone(),
2996 identity: options.identity.clone(),
2997 sort_fn: options.sort_fn,
2998 timeout: options.timeout,
2999 message_marshaler: options.message_marshaler.clone(),
3000 span: options.span.clone(),
3001 close_func: None,
3002 store_specific_opts: None,
3003 };
3004
3005 let arc_store = GuardianDB::open(self, address, options_copy).await?;
3007
3008 let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3010 arc_store as Arc<dyn Store<Error = GuardianError>>;
3011
3012 Ok(store_dyn)
3013 }
3014
3015 fn get_store(&self, address: &str) -> Option<Arc<dyn Store<Error = GuardianError>>> {
3016 if let Some(arc_store) = GuardianDB::get_store(self, address) {
3018 let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3020 arc_store as Arc<dyn Store<Error = GuardianError>>;
3021 Some(store_dyn)
3022 } else {
3023 None
3024 }
3025 }
3026
3027 async fn create(
3028 &self,
3029 name: &str,
3030 store_type: &str,
3031 options: &mut CreateDBOptions,
3032 ) -> std::result::Result<Arc<dyn Store<Error = GuardianError>>, Self::Error> {
3033 let options_copy = CreateDBOptions {
3035 event_bus: options.event_bus.clone(),
3036 directory: options.directory.clone(),
3037 overwrite: options.overwrite,
3038 local_only: options.local_only,
3039 create: options.create,
3040 store_type: Some(store_type.to_string()),
3041 access_controller_address: options.access_controller_address.clone(),
3042 access_controller: None,
3043 replicate: options.replicate,
3044 keystore: options.keystore.clone(),
3045 cache: options.cache.clone(),
3046 identity: options.identity.clone(),
3047 sort_fn: options.sort_fn,
3048 timeout: options.timeout,
3049 message_marshaler: options.message_marshaler.clone(),
3050 span: options.span.clone(),
3051 close_func: None,
3052 store_specific_opts: None,
3053 };
3054
3055 let arc_store = GuardianDB::create(self, name, store_type, Some(options_copy)).await?;
3057
3058 let store_dyn: Arc<dyn Store<Error = GuardianError>> =
3060 arc_store as Arc<dyn Store<Error = GuardianError>>;
3061
3062 Ok(store_dyn)
3063 }
3064
3065 async fn determine_address(
3066 &self,
3067 name: &str,
3068 store_type: &str,
3069 options: &DetermineAddressOptions,
3070 ) -> std::result::Result<Box<dyn Address>, Self::Error> {
3071 let guardian_address =
3073 GuardianDB::determine_address(self, name, store_type, Some(options.clone())).await?;
3074
3075 let boxed_address: Box<dyn Address> = Box::new(guardian_address);
3077
3078 Ok(boxed_address)
3079 }
3080
3081 fn register_store_type(&mut self, store_type: &str, constructor: StoreConstructor) {
3082 let mut types = self.store_types.write();
3084 types.insert(store_type.to_string(), constructor);
3085 tracing::debug!("Registered store type: {}", store_type);
3086 }
3087
3088 fn unregister_store_type(&mut self, store_type: &str) {
3089 let mut types = self.store_types.write();
3091 types.remove(store_type);
3092 tracing::debug!("Unregistered store type: {}", store_type);
3093 }
3094
3095 fn register_access_controller_type(
3096 &mut self,
3097 constructor: AccessControllerConstructor,
3098 ) -> std::result::Result<(), Self::Error> {
3099 let mut types = self.access_controller_types.write();
3101 types.insert("default".to_string(), constructor);
3102 tracing::debug!("Registered access controller type: default");
3103 Ok(())
3104 }
3105
3106 fn unregister_access_controller_type(&mut self, controller_type: &str) {
3107 let mut types = self.access_controller_types.write();
3109 types.remove(controller_type);
3110 tracing::debug!("Unregistered access controller type: {}", controller_type);
3111 }
3112
3113 fn get_access_controller_type(
3114 &self,
3115 controller_type: &str,
3116 ) -> Option<AccessControllerConstructor> {
3117 let types = self.access_controller_types.read();
3119 types.get(controller_type).cloned()
3120 }
3121
3122 fn event_bus(&self) -> EventBus {
3123 (*self.event_bus).clone()
3124 }
3125
3126 fn span(&self) -> &tracing::Span {
3127 &self.span
3128 }
3129
3130 fn tracer(&self) -> Arc<TracerWrapper> {
3131 Arc::new(TracerWrapper::OpenTelemetry(self.tracer.clone()))
3132 }
3133}