guardian_db/traits.rs
1use crate::access_controller::{
2 manifest::ManifestParams, traits::AccessController, traits::Option as AccessControllerOption,
3};
4use crate::address::Address;
5use crate::data_store::Datastore;
6use crate::error::GuardianError;
7use crate::events::{self, EmitterInterface};
8use crate::ipfs_core_api::client::IpfsClient;
9use crate::ipfs_log::{entry::Entry, identity::Identity, log::Log};
10use crate::p2p::events::EventBus;
11use crate::stores::{
12 operation::operation::Operation,
13 replicator::{replication_info::ReplicationInfo, replicator::Replicator},
14};
15use cid::Cid;
16use futures::stream::Stream;
17use libp2p::core::PeerId;
18use opentelemetry::global::{BoxedSpan, BoxedTracer};
19use opentelemetry::trace::{Tracer, noop::NoopTracer};
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22use std::any::Any;
23use std::error::Error;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::mpsc;
29use tracing::Span;
30
31// Type aliases para reduzir complexidade de tipos
32type KeyExtractorFn =
33 Arc<dyn Fn(&serde_json::Value) -> Result<String, GuardianError> + Send + Sync>;
34type MarshalFn = Arc<dyn Fn(&serde_json::Value) -> Result<Vec<u8>, GuardianError> + Send + Sync>;
35type UnmarshalFn = Arc<dyn Fn(&[u8]) -> Result<serde_json::Value, GuardianError> + Send + Sync>;
36type ItemFactoryFn = Arc<dyn Fn() -> serde_json::Value + Send + Sync>;
37type CleanupCallback = Box<
38 dyn FnOnce() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> + Send + Sync,
39>;
40
41// Re-export from the canonical location to avoid duplication
42pub use crate::stores::replicator::traits::SortFn;
43
44// Type aliases para melhorar legibilidade de assinaturas complexas
45/// Alias para documentos dinâmicos thread-safe
46pub type Document = Box<dyn Any + Send + Sync>;
47
48/// Alias para resultado padrão com GuardianError
49pub type GuardianResult<T> = std::result::Result<T, GuardianError>;
50
51/// Alias para filtros de query assíncronos
52pub type AsyncDocumentFilter = Pin<
53 Box<
54 dyn Fn(
55 &Document,
56 )
57 -> Pin<Box<dyn Future<Output = Result<bool, Box<dyn Error + Send + Sync>>> + Send>>
58 + Send
59 + Sync,
60 >,
61>;
62
63/// Alias para callback de progresso
64pub type ProgressCallback = mpsc::Sender<Entry>;
65
66/// Wrapper para diferentes tipos de tracer, integrado com o sistema tracing
67///
68/// Este enum permite usar tanto tracers OpenTelemetry quanto o sistema
69/// tracing nativo do Rust de forma transparente.
70#[derive(Default)]
71pub enum TracerWrapper {
72 /// Tracer OpenTelemetry para observabilidade distribuída
73 OpenTelemetry(Arc<BoxedTracer>),
74 /// Tracer baseado no sistema tracing nativo do Rust
75 #[default]
76 Tracing,
77 /// Tracer noop para quando telemetria está desabilitada
78 Noop(NoopTracer),
79}
80
81impl Clone for TracerWrapper {
82 fn clone(&self) -> Self {
83 match self {
84 TracerWrapper::OpenTelemetry(tracer) => TracerWrapper::OpenTelemetry(tracer.clone()),
85 TracerWrapper::Tracing => TracerWrapper::Tracing,
86 TracerWrapper::Noop(_) => TracerWrapper::Noop(NoopTracer::new()),
87 }
88 }
89}
90
91impl TracerWrapper {
92 /// Cria um novo TracerWrapper usando o sistema tracing nativo
93 pub fn new_tracing() -> Self {
94 TracerWrapper::Tracing
95 }
96
97 /// Cria um novo TracerWrapper usando OpenTelemetry
98 pub fn new_opentelemetry(tracer: Arc<BoxedTracer>) -> Self {
99 TracerWrapper::OpenTelemetry(tracer)
100 }
101
102 /// Cria um TracerWrapper noop (sem operação)
103 pub fn new_noop() -> Self {
104 TracerWrapper::Noop(NoopTracer::new())
105 }
106
107 /// Inicia um novo span instrumentado
108 ///
109 /// Este método cria spans de forma consistente independentemente
110 /// do tipo de tracer sendo usado.
111 pub fn start_span(&self, name: &str) -> TracerSpan {
112 match self {
113 TracerWrapper::OpenTelemetry(tracer) => {
114 // Para OpenTelemetry, cria um span usando a trait Tracer
115 let span = tracer.start(name.to_string());
116 TracerSpan::OpenTelemetry(span)
117 }
118 TracerWrapper::Tracing => {
119 // Para tracing nativo, usa a macro tracing::span!
120 let span = tracing::info_span!("guardian_db", operation = name);
121 TracerSpan::Tracing(span)
122 }
123 TracerWrapper::Noop(_) => {
124 // Para noop, retorna um span vazio
125 TracerSpan::Noop
126 }
127 }
128 }
129
130 /// Verifica se o tracer está ativo (não é noop)
131 pub fn is_active(&self) -> bool {
132 !matches!(self, TracerWrapper::Noop(_))
133 }
134
135 /// Retorna o tipo do tracer como string para logs/debug
136 pub fn tracer_type(&self) -> &'static str {
137 match self {
138 TracerWrapper::OpenTelemetry(_) => "opentelemetry",
139 TracerWrapper::Tracing => "tracing",
140 TracerWrapper::Noop(_) => "noop",
141 }
142 }
143}
144
145/// Enum para representar diferentes tipos de spans instrumentados
146///
147/// Permite trabalhar com spans de diferentes sistemas de tracing
148/// de forma unificada.
149pub enum TracerSpan {
150 /// Span OpenTelemetry para observabilidade distribuída
151 OpenTelemetry(BoxedSpan),
152 /// Span do sistema tracing nativo do Rust
153 Tracing(tracing::Span),
154 /// Span noop para quando telemetria está desabilitada
155 Noop,
156}
157
158impl TracerSpan {
159 /// Adiciona um atributo/campo ao span
160 pub fn set_attribute<T: Into<opentelemetry::Value>>(&mut self, key: &str, value: T) {
161 match self {
162 TracerSpan::OpenTelemetry(span) => {
163 use opentelemetry::trace::Span as OtelSpan;
164 span.set_attribute(opentelemetry::KeyValue::new(key.to_string(), value));
165 }
166 TracerSpan::Tracing(span) => {
167 // Para tracing, registra como evento dentro do span
168 span.in_scope(|| {
169 tracing::info!(key = %format!("{:?}", value.into()), "span_attribute");
170 });
171 }
172 TracerSpan::Noop => {
173 // Noop - não faz nada
174 }
175 }
176 }
177
178 /// Registra um evento no span
179 pub fn add_event(&mut self, name: &str, attributes: Vec<(&str, &str)>) {
180 match self {
181 TracerSpan::OpenTelemetry(span) => {
182 use opentelemetry::trace::Span as OtelSpan;
183 let attrs: Vec<opentelemetry::KeyValue> = attributes
184 .into_iter()
185 .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string()))
186 .collect();
187 span.add_event(name.to_string(), attrs);
188 }
189 TracerSpan::Tracing(span) => {
190 // Para tracing, registra como evento estruturado
191 span.in_scope(|| {
192 let fields: std::collections::HashMap<&str, &str> =
193 attributes.into_iter().collect();
194 tracing::info!(event = name, ?fields, "span_event");
195 });
196 }
197 TracerSpan::Noop => {
198 // Noop - não faz nada
199 }
200 }
201 }
202
203 /// Marca o span como erro
204 pub fn set_error<E: std::fmt::Display>(&mut self, error: E) {
205 match self {
206 TracerSpan::OpenTelemetry(span) => {
207 use opentelemetry::trace::Span as OtelSpan;
208 span.set_status(opentelemetry::trace::Status::Error {
209 description: std::borrow::Cow::Owned(error.to_string()),
210 });
211 span.set_attribute(opentelemetry::KeyValue::new("error".to_string(), true));
212 span.set_attribute(opentelemetry::KeyValue::new(
213 "error.message".to_string(),
214 error.to_string(),
215 ));
216 }
217 TracerSpan::Tracing(span) => {
218 span.in_scope(|| {
219 tracing::error!(error = %error, "span_error");
220 });
221 }
222 TracerSpan::Noop => {
223 // Noop - não faz nada
224 }
225 }
226 }
227
228 /// Finaliza o span explicitamente
229 pub fn finish(mut self) {
230 match &mut self {
231 TracerSpan::OpenTelemetry(_span) => {
232 // OpenTelemetry spans são finalizados automaticamente no Drop
233 // Mas podemos marcar como concluído aqui se necessário
234 }
235 TracerSpan::Tracing(_span) => {
236 // Tracing spans são finalizados automaticamente quando saem de escopo
237 // Não é necessário fazer nada aqui
238 }
239 TracerSpan::Noop => {
240 // Noop - não faz nada
241 }
242 }
243 // O Drop será chamado automaticamente quando self sair de escopo
244 }
245}
246
247impl Drop for TracerSpan {
248 fn drop(&mut self) {
249 // Para OpenTelemetry, garantimos que o span seja finalizado
250 match self {
251 TracerSpan::OpenTelemetry(_span) => {
252 // OpenTelemetry spans são finalizados automaticamente quando Drop
253 // Não precisamos chamar end() explicitamente aqui
254 }
255 TracerSpan::Tracing(_) => {
256 // Tracing spans são finalizados automaticamente quando saem de escopo
257 }
258 TracerSpan::Noop => {
259 // Noop - não faz nada
260 }
261 }
262 }
263}
264
265#[derive(Serialize, Deserialize, Debug, Clone)]
266pub struct MessageExchangeHeads {
267 #[serde(rename = "address")]
268 pub address: String,
269
270 #[serde(rename = "heads")]
271 pub heads: Vec<Entry>,
272}
273
274pub trait MessageMarshaler: Send + Sync {
275 /// Define um tipo de erro associado para flexibilidade na implementação.
276 type Error: std::error::Error + Send + Sync + 'static;
277
278 /// Serializa uma mensagem para um vetor de bytes.
279 fn marshal(&self, msg: &MessageExchangeHeads) -> Result<Vec<u8>, Self::Error>;
280
281 /// Desserializa um vetor de bytes para uma mensagem.
282 fn unmarshal(&self, data: &[u8]) -> Result<MessageExchangeHeads, Self::Error>;
283}
284
285#[derive(Default)]
286pub struct CreateDBOptions {
287 pub event_bus: Option<EventBus>,
288 pub directory: Option<String>,
289 pub overwrite: Option<bool>,
290 pub local_only: Option<bool>,
291 pub create: Option<bool>,
292 pub store_type: Option<String>,
293 pub access_controller_address: Option<String>,
294 pub access_controller: Option<Box<dyn ManifestParams>>,
295 pub replicate: Option<bool>,
296 pub keystore: Option<Arc<dyn crate::ipfs_log::identity_provider::Keystore>>,
297 pub cache: Option<Arc<dyn Datastore>>,
298 pub identity: Option<Identity>,
299 pub sort_fn: Option<SortFn>,
300 pub timeout: Option<Duration>,
301 pub message_marshaler: Option<Arc<dyn MessageMarshaler<Error = GuardianError>>>,
302 pub span: Option<Span>,
303 pub close_func: Option<Box<dyn FnOnce() + Send>>,
304 pub store_specific_opts: Option<Box<dyn Any + Send + Sync>>,
305}
306
307impl Clone for CreateDBOptions {
308 fn clone(&self) -> Self {
309 Self {
310 event_bus: self.event_bus.clone(),
311 directory: self.directory.clone(),
312 overwrite: self.overwrite,
313 local_only: self.local_only,
314 create: self.create,
315 store_type: self.store_type.clone(),
316 access_controller_address: self.access_controller_address.clone(),
317 access_controller: None, // Cannot clone Box<dyn ManifestParams>
318 replicate: self.replicate,
319 keystore: self.keystore.clone(),
320 cache: self.cache.clone(),
321 identity: self.identity.clone(),
322 sort_fn: self.sort_fn,
323 timeout: self.timeout,
324 message_marshaler: self.message_marshaler.clone(),
325 span: self.span.clone(),
326 close_func: None, // Cannot clone Box<dyn FnOnce()>
327 store_specific_opts: None, // Cannot clone Box<dyn Any>
328 }
329 }
330}
331
332// Usando Arc<dyn Fn> em vez de Box<dyn Fn> para permitir clonagem
333pub type StoreConstructor = Arc<
334 dyn Fn(
335 Arc<IpfsClient>,
336 Arc<Identity>,
337 Box<dyn Address>,
338 NewStoreOptions,
339 ) -> Pin<
340 Box<
341 dyn Future<Output = Result<Box<dyn Store<Error = GuardianError>>, GuardianError>>
342 + Send,
343 >,
344 > + Send
345 + Sync,
346>;
347
348#[derive(Clone)]
349pub struct CreateDocumentDBOptions {
350 /// Extrai a chave de um documento genérico.
351 pub key_extractor: KeyExtractorFn,
352
353 /// Serializa um documento genérico para bytes.
354 pub marshal: MarshalFn,
355
356 /// Desserializa bytes para um documento genérico.
357 pub unmarshal: UnmarshalFn,
358
359 /// Cria uma nova instância vazia do tipo de item do documento.
360 pub item_factory: ItemFactoryFn,
361}
362
363#[derive(Default, Clone)]
364pub struct DetermineAddressOptions {
365 pub only_hash: Option<bool>,
366 pub replicate: Option<bool>,
367 pub access_controller: crate::access_controller::manifest::CreateAccessControllerOptions,
368}
369
370#[async_trait::async_trait]
371pub trait BaseGuardianDB: Send + Sync {
372 /// Define um tipo de erro associado para flexibilidade na implementação.
373 type Error: Error + Send + Sync + 'static;
374
375 /// Retorna a instância da API do IPFS.
376 fn ipfs(&self) -> Arc<IpfsClient>;
377
378 /// Retorna a identidade utilizada pelo GuardianDB.
379 fn identity(&self) -> Arc<Identity>;
380
381 /// Cria ou abre uma store com o endereço e opções fornecidos.
382 async fn open(
383 &self,
384 address: &str,
385 options: &mut CreateDBOptions,
386 ) -> Result<Arc<dyn Store<Error = GuardianError>>, Self::Error>;
387
388 /// Retorna uma instância da store se ela já estiver aberta.
389 fn get_store(&self, address: &str) -> Option<Arc<dyn Store<Error = GuardianError>>>;
390
391 /// Cria uma nova store com o nome, tipo e opções fornecidos.
392 async fn create(
393 &self,
394 name: &str,
395 store_type: &str,
396 options: &mut CreateDBOptions,
397 ) -> Result<Arc<dyn Store<Error = GuardianError>>, Self::Error>;
398
399 /// Determina o endereço de um banco de dados com base nos seus parâmetros.
400 async fn determine_address(
401 &self,
402 name: &str,
403 store_type: &str,
404 options: &DetermineAddressOptions,
405 ) -> Result<Box<dyn Address>, Self::Error>;
406
407 /// Registra um novo tipo de Store.
408 fn register_store_type(&mut self, store_type: &str, constructor: StoreConstructor);
409
410 /// Desregistra um tipo de Store.
411 fn unregister_store_type(&mut self, store_type: &str);
412
413 /// Registra um novo tipo de Access Controller.
414 fn register_access_controller_type(
415 &mut self,
416 constructor: AccessControllerConstructor,
417 ) -> Result<(), Self::Error>;
418
419 /// Desregistra um tipo de Access Controller.
420 fn unregister_access_controller_type(&mut self, controller_type: &str);
421
422 /// Obtém um construtor de Access Controller pelo seu tipo.
423 fn get_access_controller_type(
424 &self,
425 controller_type: &str,
426 ) -> Option<AccessControllerConstructor>;
427
428 /// Retorna o barramento de eventos.
429 fn event_bus(&self) -> EventBus;
430
431 /// Retorna o span para tracing.
432 fn span(&self) -> &tracing::Span;
433
434 /// Retorna o tracer para telemetria.
435 fn tracer(&self) -> Arc<TracerWrapper>;
436}
437
438/// Expõe um método para criar ou abrir uma `DocumentStore`.
439#[async_trait::async_trait]
440pub trait GuardianDBDocumentStoreProvider {
441 /// Define um tipo de erro associado para este trait.
442 type Error: Error + Send + Sync + 'static;
443
444 /// Cria ou abre uma DocumentStore.
445 async fn docs(
446 &self,
447 address: &str,
448 options: &mut CreateDBOptions,
449 ) -> Result<Box<dyn DocumentStore<Error = GuardianError>>, Self::Error>;
450}
451/// Combina as traits `BaseGuardianDB` e `GuardianDBDocumentStoreProvider`.
452pub trait GuardianDBDocumentStore: BaseGuardianDB + GuardianDBDocumentStoreProvider {}
453
454// Implementação "blanket" que aplica automaticamente a trait `GuardianDBDocumentStore`
455impl<T: BaseGuardianDB + GuardianDBDocumentStoreProvider> GuardianDBDocumentStore for T {}
456
457/// Expõe um método para criar ou abrir uma `KeyValueStore`.
458#[async_trait::async_trait]
459pub trait GuardianDBKVStoreProvider: Send + Sync {
460 /// Define um tipo de erro associado para este trait.
461 type Error: Error + Send + Sync + 'static;
462
463 /// Cria ou abre uma KeyValueStore.
464 async fn key_value(
465 &self,
466 address: &str,
467 options: &mut CreateDBOptions,
468 ) -> Result<Box<dyn KeyValueStore<Error = GuardianError>>, Self::Error>;
469}
470
471/// Combina as traits `BaseGuardianDB` e `GuardianDBKVStoreProvider`.
472pub trait GuardianDBKVStore: BaseGuardianDB + GuardianDBKVStoreProvider {}
473
474// Implementação "blanket" que aplica automaticamente a trait `GuardianDBKVStore`
475// a qualquer tipo que já satisfaça as condições.
476impl<T: BaseGuardianDB + GuardianDBKVStoreProvider> GuardianDBKVStore for T {}
477
478/// Expõe um método para criar ou abrir uma `EventLogStore`.
479#[async_trait::async_trait]
480pub trait GuardianDBLogStoreProvider {
481 /// Define um tipo de erro associado para este trait.
482 type Error: Error + Send + Sync + 'static;
483
484 /// Cria ou abre uma EventLogStore (um log de eventos append-only).
485 async fn log(
486 &self,
487 address: &str,
488 options: &mut CreateDBOptions,
489 ) -> Result<Box<dyn EventLogStore<Error = GuardianError>>, Self::Error>;
490}
491
492/// Combina as traits `BaseGuardianDB` e `GuardianDBLogStoreProvider`.
493pub trait GuardianDBLogStore: BaseGuardianDB + GuardianDBLogStoreProvider {}
494
495// Implementação "blanket" para `GuardianDBLogStore`.
496impl<T: BaseGuardianDB + GuardianDBLogStoreProvider> GuardianDBLogStore for T {}
497
498/// Combina todas as traits principais do GuardianDB.
499pub trait GuardianDB:
500 BaseGuardianDB
501 + GuardianDBKVStoreProvider
502 + GuardianDBLogStoreProvider
503 + GuardianDBDocumentStoreProvider
504{
505}
506
507// A implementação "blanket" permite que qualquer tipo que já satisfaça todas
508// as constraints seja automaticamente considerado `GuardianDB`.
509impl<
510 T: BaseGuardianDB
511 + GuardianDBKVStoreProvider
512 + GuardianDBLogStoreProvider
513 + GuardianDBDocumentStoreProvider,
514> GuardianDB for T
515{
516}
517
518#[derive(Default, Debug, Clone)]
519pub struct StreamOptions {
520 /// "Greater Than": Retorna entradas que são posteriores à CID fornecida.
521 pub gt: Option<Cid>,
522
523 /// "Greater Than or Equal": Retorna entradas que são a CID fornecida ou posteriores.
524 pub gte: Option<Cid>,
525
526 /// "Less Than": Retorna entradas que são anteriores à CID fornecida.
527 pub lt: Option<Cid>,
528
529 /// "Less Than or Equal": Retorna entradas que são a CID fornecida ou anteriores.
530 pub lte: Option<Cid>,
531
532 /// Limita o número de entradas a serem retornadas.
533 pub amount: Option<i32>,
534}
535
536pub trait StoreEvents {
537 fn subscribe(&mut self);
538}
539
540/// Define as operações comuns a todos os tipos de stores.
541#[async_trait::async_trait]
542pub trait Store: Send + Sync {
543 type Error: std::error::Error + Send + Sync + 'static;
544
545 #[deprecated(note = "use event_bus() instead")]
546 fn events(&self) -> &dyn EmitterInterface;
547
548 /// Fecha a store e libera seus recursos.
549 /// Modificado para aceitar &self em vez de &mut self para compatibilidade com Arc<T>
550 async fn close(&self) -> Result<(), Self::Error>;
551
552 /// Retorna o endereço da store.
553 fn address(&self) -> &dyn Address;
554
555 /// Retorna o índice da store, que mantém o estado atual dos dados.
556 /// Retorna Box para evitar problemas de lifetime com RwLock
557 fn index(&self) -> Box<dyn StoreIndex<Error = Self::Error> + Send + Sync>;
558
559 /// Retorna o tipo da store como uma string (ex: "eventlog", "kvstore").
560 fn store_type(&self) -> &str;
561
562 /// Retorna o status atual da replicação.
563 fn replication_status(&self) -> ReplicationInfo;
564
565 /// Retorna o replicador responsável pela sincronização de dados.
566 fn replicator(&self) -> Option<Arc<Replicator>>;
567
568 /// Retorna o cache da store.
569 fn cache(&self) -> Arc<dyn Datastore>;
570
571 /// Remove todo o conteúdo local da store.
572 async fn drop(&mut self) -> Result<(), Self::Error>;
573
574 /// Carrega as `amount` entradas mais recentes da rede.
575 async fn load(&mut self, amount: usize) -> Result<(), Self::Error>;
576
577 /// Sincroniza a store com uma lista de `heads` (entradas mais recentes) de outro par.
578 async fn sync(&mut self, heads: Vec<Entry>) -> Result<(), Self::Error>;
579
580 /// Carrega mais entradas a partir de um conjunto de CIDs conhecidos.
581 async fn load_more_from(&mut self, amount: u64, entries: Vec<Entry>);
582
583 /// Carrega o conteúdo da store a partir de um snapshot.
584 async fn load_from_snapshot(&mut self) -> Result<(), Self::Error>;
585
586 /// Retorna o log de operações (OpLog) subjacente.
587 /// Modificado para retornar Arc para evitar problemas de lifetime
588 fn op_log(&self) -> Arc<RwLock<Log>>;
589
590 /// Retorna a instância da API do IPFS.
591 fn ipfs(&self) -> Arc<IpfsClient>;
592
593 /// Retorna o nome do banco de dados.
594 fn db_name(&self) -> &str;
595
596 /// Retorna a identidade usada pela store.
597 fn identity(&self) -> &Identity;
598
599 /// Retorna o controlador de acesso da store.
600 fn access_controller(&self) -> &dyn AccessController;
601
602 /// Adiciona uma nova operação à store.
603 async fn add_operation(
604 &mut self,
605 op: Operation,
606 on_progress_callback: Option<ProgressCallback>,
607 ) -> Result<Entry, Self::Error>;
608
609 /// Retorna o span.
610 /// Modificado para retornar Arc para evitar problemas de lifetime
611 fn span(&self) -> Arc<Span>;
612
613 /// Retorna o tracer para telemetria.
614 fn tracer(&self) -> Arc<TracerWrapper>;
615
616 /// Retorna o barramento de eventos.
617 fn event_bus(&self) -> Arc<EventBus>;
618
619 /// Método auxiliar para downcast
620 fn as_any(&self) -> &dyn std::any::Any;
621}
622
623/// Uma store que se comporta como um log de eventos "append-only" distribuído.
624/// Herda todas as funcionalidades da trait `Store` e adiciona operações
625/// específicas para logs sequenciais imutáveis.
626///
627/// Ideal para casos de uso como auditoria, event sourcing, e sistemas
628/// que requerem histórico completo e ordenado de eventos.
629#[async_trait::async_trait]
630pub trait EventLogStore: Store {
631 /// Adiciona um novo dado ao log.
632 /// Os dados são anexados de forma sequencial e imutável.
633 ///
634 /// # Argumentos
635 /// * `data` - Os dados binários a serem adicionados ao log
636 ///
637 /// # Retorna
638 /// A operação ADD criada, contendo metadados do evento adicionado
639 async fn add(&mut self, data: Vec<u8>) -> Result<Operation, Self::Error>;
640
641 /// Obtém uma entrada específica do log pelo seu CID.
642 /// Permite acesso direto a qualquer entrada histórica.
643 ///
644 /// # Argumentos
645 /// * `cid` - O Content Identifier da entrada desejada
646 ///
647 /// # Retorna
648 /// A operação correspondente ao CID, ou erro se não encontrada
649 async fn get(&self, cid: Cid) -> Result<Operation, Self::Error>;
650
651 /// Retorna um stream de operações, com opções de filtro.
652 /// Em Rust, em vez de passar um canal, é idiomático retornar um `Stream`.
653 ///
654 /// # TODO
655 /// Esta funcionalidade requer implementação cuidadosa de Stream para evitar
656 /// problemas de lifetime. Por enquanto, use `list()` para casos síncronos.
657 ///
658 /// # Implementação futura
659 /// ```ignore
660 /// async fn stream(&self, options: Option<StreamOptions>)
661 /// -> Result<Pin<Box<dyn Stream<Item = Operation> + Send>>, Self::Error>;
662 /// ```
663 /// Retorna uma lista de operações que ocorreram na store, com opções de filtro.
664 /// Permite consultas históricas com critérios específicos de tempo/posição.
665 ///
666 /// # Argumentos
667 /// * `options` - Filtros opcionais para limitar/ordenar os resultados
668 ///
669 /// # Retorna
670 /// Lista ordenada de operações que atendem aos critérios
671 async fn list(&self, options: Option<StreamOptions>) -> Result<Vec<Operation>, Self::Error>;
672}
673
674/// Uma store que se comporta como um banco de dados chave-valor distribuído.
675/// Herda todas as funcionalidades da trait `Store` e adiciona operações
676/// específicas para pares chave-valor com semântica CRDT.
677///
678/// Todas as operações são replicadas automaticamente através da rede
679/// e mantêm consistência eventual entre os peers.
680#[async_trait::async_trait]
681pub trait KeyValueStore: Store {
682 /// Retorna todos os pares chave-valor da store em um mapa.
683 /// Esta operação lê o estado atual do índice local.
684 fn all(&self) -> std::collections::HashMap<String, Vec<u8>>;
685
686 /// Define um valor para uma chave específica.
687 /// Cria uma nova operação PUT no log distribuído que será replicada.
688 ///
689 /// # Argumentos
690 /// * `key` - A chave para associar ao valor (não pode estar vazia)
691 /// * `value` - Os dados binários a serem armazenados
692 ///
693 /// # Retorna
694 /// A operação PUT criada, ou erro se a operação falhar
695 async fn put(&mut self, key: &str, value: Vec<u8>) -> Result<Operation, Self::Error>;
696
697 /// Remove uma chave e seu valor associado.
698 /// Cria uma nova operação DEL no log distribuído que será replicada.
699 ///
700 /// # Argumentos
701 /// * `key` - A chave a ser removida
702 ///
703 /// # Retorna
704 /// A operação DEL criada, ou erro se a chave não existir ou operação falhar
705 async fn delete(&mut self, key: &str) -> Result<Operation, Self::Error>;
706
707 /// Obtém o valor associado a uma chave.
708 /// Consulta o índice local para o estado mais recente.
709 ///
710 /// # Argumentos
711 /// * `key` - A chave a ser procurada
712 ///
713 /// # Retorna
714 /// `Some(Vec<u8>)` se a chave existir, `None` se não existir, ou erro se houver falha no acesso
715 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Self::Error>;
716}
717
718/// Uma struct simples para passar opções ao método `get` de uma DocumentStore.
719#[derive(Default, Debug, Clone, Copy)]
720pub struct DocumentStoreGetOptions {
721 pub case_insensitive: bool,
722 pub partial_matches: bool,
723}
724
725#[derive(Default, Debug, Clone)]
726pub struct DocumentStoreQueryOptions {
727 pub limit: Option<usize>,
728 pub skip: Option<usize>,
729 pub sort: Option<String>,
730}
731
732/// Uma store que lida com documentos (objetos semi-estruturados).
733///
734/// Esta trait combina funcionalidades de store básica com operações específicas
735/// para documentos, incluindo consultas avançadas e operações em lote.
736#[async_trait::async_trait]
737pub trait DocumentStore: Store {
738 /// Armazena um único documento.
739 /// O documento deve implementar as traits Send + Sync para thread safety.
740 async fn put(&mut self, document: Document) -> Result<Operation, Self::Error>;
741
742 /// Deleta um documento pela sua chave.
743 /// Retorna a operação de deleção que foi aplicada ao log.
744 async fn delete(&mut self, key: &str) -> Result<Operation, Self::Error>;
745
746 /// Adiciona múltiplos documentos em operações separadas e retorna a última.
747 /// Cada documento é processado individualmente, criando uma entrada separada no log.
748 async fn put_batch(&mut self, values: Vec<Document>) -> Result<Operation, Self::Error>;
749
750 /// Adiciona múltiplos documentos em uma única operação e a retorna.
751 /// Todos os documentos são incluídos em uma única entrada do log.
752 async fn put_all(&mut self, values: Vec<Document>) -> Result<Operation, Self::Error>;
753
754 /// Recupera documentos por uma chave, com opções de busca.
755 /// Suporta busca case-insensitive e correspondências parciais baseadas nas opções.
756 async fn get(
757 &self,
758 key: &str,
759 opts: Option<DocumentStoreGetOptions>,
760 ) -> Result<Vec<Document>, Self::Error>;
761
762 /// Encontra documentos usando uma função de filtro (predicado).
763 async fn query(&self, filter: AsyncDocumentFilter) -> Result<Vec<Document>, Self::Error>;
764}
765
766/// Index contém o estado atual de uma store. Ele processa o log de
767/// operações (`OpLog`) para construir a visão mais recente dos dados,
768/// implementando a lógica do CRDT.
769pub trait StoreIndex: Send + Sync {
770 type Error: Error + Send + Sync + 'static;
771
772 /// Verifica se uma chave existe no índice.
773 /// Método seguro que não requer acesso aos dados em si.
774 fn contains_key(&self, key: &str) -> std::result::Result<bool, Self::Error>;
775
776 /// Retorna uma cópia dos dados para uma chave específica como bytes.
777 /// Método seguro que funciona com qualquer implementação de sincronização.
778 fn get_bytes(&self, key: &str) -> std::result::Result<Option<Vec<u8>>, Self::Error>;
779
780 /// Retorna todas as chaves disponíveis no índice.
781 /// Útil para iteração e operações de listagem.
782 fn keys(&self) -> std::result::Result<Vec<String>, Self::Error>;
783
784 /// Retorna o número de entradas no índice.
785 fn len(&self) -> std::result::Result<usize, Self::Error>;
786
787 /// Verifica se o índice está vazio.
788 fn is_empty(&self) -> std::result::Result<bool, Self::Error>;
789
790 /// Atualiza o índice aplicando novas entradas do log de operações.
791 /// Recebe `&mut self` pois este método modifica o estado do índice.
792 fn update_index(
793 &mut self,
794 log: &Log,
795 entries: &[Entry],
796 ) -> std::result::Result<(), Self::Error>;
797
798 /// Limpa todos os dados do índice.
799 /// Útil para reset ou reconstrução completa.
800 fn clear(&mut self) -> std::result::Result<(), Self::Error>;
801
802 // === MÉTODOS OPCIONAIS PARA OTIMIZAÇÃO ===
803
804 /// Retorna um range de entradas completas (se suportado pelo índice).
805 ///
806 /// Este método opcional permite que índices que mantêm Entry completas
807 /// exponham acesso direto otimizado para queries de range.
808 ///
809 /// # Argumentos
810 ///
811 /// * `start` - Índice inicial (inclusivo)
812 /// * `end` - Índice final (exclusivo)
813 ///
814 /// # Retorna
815 ///
816 /// `Some(Vec<Entry>)` se o índice suporta acesso direto a Entry
817 /// `None` se o índice não suporta ou range inválido
818 ///
819 /// # Performance
820 ///
821 /// - O(1) para validação de range
822 /// - O(end - start) para coleção dos resultados
823 /// - Evita deserialização de bytes para Entry
824 fn get_entries_range(&self, _start: usize, _end: usize) -> Option<Vec<Entry>> {
825 // ***Implementação padrão retorna None - índices que suportam podem override
826 None
827 }
828
829 /// Retorna as últimas N entradas (se suportado pelo índice).
830 ///
831 /// Otimização comum para EventLogStore onde frequentemente
832 /// queremos as entradas mais recentes.
833 ///
834 /// # Argumentos
835 ///
836 /// * `count` - Número de entradas a retornar
837 ///
838 /// # Retorna
839 ///
840 /// `Some(Vec<Entry>)` se o índice suporta acesso direto
841 /// `None` se não suportado
842 fn get_last_entries(&self, _count: usize) -> Option<Vec<Entry>> {
843 // ***Implementação padrão retorna None
844 None
845 }
846
847 /// Retorna uma Entry específica por CID (se suportado pelo índice).
848 ///
849 /// Permite busca O(1) ou O(log n) por CID ao invés de busca linear.
850 ///
851 /// # Argumentos
852 ///
853 /// * `cid` - Content Identifier da entrada desejada
854 ///
855 /// # Retorna
856 ///
857 /// `Some(Entry)` se encontrada e suportada
858 /// `None` se não encontrada ou não suportada
859 fn get_entry_by_cid(&self, _cid: &Cid) -> Option<Entry> {
860 // ***Implementação padrão retorna None
861 None
862 }
863
864 /// Verifica se o índice suporta queries otimizadas com Entry completas.
865 ///
866 /// Permite que o código cliente determine se pode usar os métodos
867 /// opcionais de otimização.
868 fn supports_entry_queries(&self) -> bool {
869 // ***Implementação padrão retorna false
870 false
871 }
872}
873
874/// Opções detalhadas para a criação de uma nova instância de Store.
875/// Esta struct é o ponto central de configuração para todas as funcionalidades
876/// avançadas de uma store, incluindo índices, cache, replicação e telemetria.
877pub struct NewStoreOptions {
878 // === CORE CONFIGURATION ===
879 /// Barramento de eventos para comunicação interna
880 pub event_bus: Option<EventBus>,
881
882 /// Construtor do índice personalizado para a store
883 pub index: Option<IndexConstructor>,
884
885 /// Controlador de acesso para permissões e autenticação
886 pub access_controller: Option<Arc<dyn AccessController>>,
887
888 /// Diretório base para armazenamento de dados
889 pub directory: String,
890
891 /// Função de ordenação personalizada para entradas do log
892 pub sort_fn: Option<SortFn>,
893
894 // === NETWORKING & P2P ===
895 /// Identificador único do peer na rede P2P
896 pub peer_id: PeerId,
897
898 /// Interface PubSub para comunicação distribuída
899 pub pubsub: Option<Arc<dyn PubSubInterface<Error = GuardianError>>>,
900
901 /// Canal direto para comunicação peer-to-peer
902 pub direct_channel: Option<Arc<dyn DirectChannel<Error = GuardianError>>>,
903
904 /// Marshaler para serialização de mensagens de rede
905 pub message_marshaler: Option<Arc<dyn MessageMarshaler<Error = GuardianError>>>,
906
907 // === PERFORMANCE & STORAGE ===
908 /// Sistema de cache para otimização de acesso a dados
909 pub cache: Option<Arc<dyn Datastore>>,
910
911 /// Callback para destruição do cache (pode falhar)
912 pub cache_destroy: Option<CleanupCallback>,
913
914 /// Número de workers para replicação concorrente
915 pub replication_concurrency: Option<u32>,
916
917 /// Contador de referências para garbage collection
918 pub reference_count: Option<i32>,
919
920 /// Limite máximo de entradas no histórico
921 pub max_history: Option<i32>,
922
923 // === BEHAVIOR FLAGS ===
924 /// Habilita/desabilita replicação automática
925 pub replicate: Option<bool>,
926
927 // === OBSERVABILITY ===
928 /// Sistema de logging estruturado
929 pub span: Option<Span>,
930
931 /// Tracer para telemetria distribuída (OpenTelemetry)
932 pub tracer: Option<Arc<TracerWrapper>>,
933
934 // === LIFECYCLE MANAGEMENT ===
935 /// Callback executado no fechamento da store
936 pub close_func: Option<Box<dyn FnOnce() + Send>>,
937
938 // === EXTENSIBILITY ===
939 /// Opções específicas do tipo de store (extensibilidade)
940 /// Permite que diferentes tipos de store tenham configurações customizadas
941 pub store_specific_opts: Option<Box<dyn Any + Send + Sync>>,
942}
943
944impl Default for NewStoreOptions {
945 fn default() -> Self {
946 let peer_id = PeerId::random();
947
948 Self {
949 event_bus: None,
950 index: None,
951 access_controller: None,
952 directory: String::new(),
953 sort_fn: None,
954 peer_id,
955 pubsub: None,
956 direct_channel: None,
957 message_marshaler: None,
958 cache: None,
959 cache_destroy: None,
960 replication_concurrency: None,
961 reference_count: None,
962 max_history: None,
963 replicate: None,
964 span: None,
965 tracer: None,
966 close_func: None,
967 store_specific_opts: None,
968 }
969 }
970}
971
972/// Opções para configurar um `DirectChannel`.
973#[derive(Default, Clone)]
974pub struct DirectChannelOptions {
975 pub span: Option<Span>,
976}
977
978/// Trait para a comunicação direta com outro par na rede.
979#[async_trait::async_trait]
980pub trait DirectChannel: Send + Sync + std::any::Any {
981 type Error: Error + Send + Sync + 'static;
982
983 /// Espera até que a conexão com o outro par seja estabelecida.
984 async fn connect(&mut self, peer: PeerId) -> Result<(), Self::Error>;
985
986 /// Envia dados para o outro par.
987 async fn send(&mut self, peer: PeerId, data: Vec<u8>) -> Result<(), Self::Error>;
988
989 /// Fecha a conexão.
990 async fn close(&mut self) -> Result<(), Self::Error>;
991
992 /// Fecha a conexão usando referência compartilhada (&self).
993 /// Este método permite fechar o canal quando usado dentro de Arc<>.
994 async fn close_shared(&self) -> Result<(), Self::Error>;
995
996 /// Método auxiliar para downcast
997 fn as_any(&self) -> &dyn std::any::Any;
998}
999
1000/// Define o conteúdo de uma mensagem recebida via pubsub ou canal direto.
1001/// Esta struct é necessária para a definição de `DirectChannelEmitter`.
1002#[derive(Debug, Clone)]
1003pub struct EventPubSubPayload {
1004 pub payload: Vec<u8>,
1005 pub peer: PeerId,
1006}
1007
1008/// Uma trait usada para emitir eventos recebidos de um `DirectChannel`.
1009#[async_trait::async_trait]
1010pub trait DirectChannelEmitter: Send + Sync {
1011 type Error: Error + Send + Sync + 'static;
1012
1013 /// Emite um payload recebido.
1014 async fn emit(&self, payload: EventPubSubPayload) -> Result<(), Self::Error>;
1015
1016 /// Fecha o emissor.
1017 async fn close(&self) -> Result<(), Self::Error>;
1018}
1019
1020/// Uma fábrica para criar instâncias de `DirectChannel`.
1021pub type DirectChannelFactory = Arc<
1022 dyn Fn(
1023 Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
1024 Option<DirectChannelOptions>,
1025 ) -> Pin<
1026 Box<
1027 dyn Future<
1028 Output = Result<
1029 Arc<dyn DirectChannel<Error = GuardianError>>,
1030 Box<dyn Error + Send + Sync>,
1031 >,
1032 > + Send,
1033 >,
1034 > + Send
1035 + Sync,
1036>;
1037
1038/// Define o protótipo de uma função (ou closure) que constrói e retorna
1039/// uma nova instância de um `StoreIndex`.
1040pub type IndexConstructor =
1041 Box<dyn Fn(&[u8]) -> Box<dyn StoreIndex<Error = GuardianError>> + Send + Sync>;
1042
1043/// Um protótipo para a função de callback que é acionada quando novas entradas
1044/// (`Entry`) são escritas na store. É um tipo de função assíncrona.
1045pub type OnWritePrototype = Box<
1046 dyn Fn(
1047 Cid,
1048 Entry,
1049 Vec<Cid>,
1050 )
1051 -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send>>
1052 + Send
1053 + Sync,
1054>;
1055
1056/// Representa uma nova mensagem recebida em um tópico pub/sub.
1057#[derive(Debug, Clone)]
1058pub struct EventPubSubMessage {
1059 pub content: Vec<u8>,
1060}
1061
1062/// Define o protótipo para um construtor de `AccessController`.
1063pub type AccessControllerConstructor = Arc<
1064 dyn Fn(
1065 Arc<dyn BaseGuardianDB<Error = GuardianError>>,
1066 &crate::access_controller::manifest::CreateAccessControllerOptions,
1067 Option<Vec<AccessControllerOption>>,
1068 )
1069 -> Pin<Box<dyn Future<Output = Result<Arc<dyn AccessController>, GuardianError>> + Send>>
1070 + Send
1071 + Sync,
1072>;
1073
1074/// Representa a inscrição em um tópico pub/sub específico.
1075#[async_trait::async_trait]
1076pub trait PubSubTopic: Send + Sync {
1077 type Error: Error + Send + Sync + 'static;
1078
1079 /// Publica uma nova mensagem no tópico.
1080 async fn publish(&self, message: Vec<u8>) -> Result<(), Self::Error>;
1081
1082 /// Lista os pares (peers) conectados a este tópico.
1083 async fn peers(&self) -> Result<Vec<PeerId>, Self::Error>;
1084
1085 /// Observa os pares que entram e saem do tópico.
1086 async fn watch_peers(
1087 &self,
1088 ) -> Result<Pin<Box<dyn Stream<Item = events::Event> + Send>>, Self::Error>;
1089
1090 /// Observa as novas mensagens publicadas no tópico.
1091 async fn watch_messages(
1092 &self,
1093 ) -> Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>, Self::Error>;
1094
1095 /// Retorna o nome do tópico.
1096 fn topic(&self) -> &str;
1097}
1098
1099/// Trait principal do sistema pub/sub.
1100#[async_trait::async_trait]
1101pub trait PubSubInterface: Send + Sync + std::any::Any {
1102 type Error: Error + Send + Sync + 'static;
1103
1104 /// Inscreve-se em um tópico.
1105 async fn topic_subscribe(
1106 &mut self,
1107 topic: &str,
1108 ) -> Result<Arc<dyn PubSubTopic<Error = GuardianError>>, Self::Error>;
1109
1110 /// Método auxiliar para downcast
1111 fn as_any(&self) -> &dyn std::any::Any;
1112}
1113
1114/// Opções para a criação de uma inscrição em um tópico Pub/Sub.
1115#[derive(Default, Clone)]
1116pub struct PubSubSubscriptionOptions {
1117 pub span: Option<Span>,
1118 pub tracer: Option<Arc<TracerWrapper>>,
1119}
1120
1121/// EventPubSub::Leave
1122/// Representa um evento disparado quando um par (peer) sai
1123/// de um tópico do canal Pub/Sub.
1124///
1125/// EventPubSub::Join
1126/// Representa um evento disparado quando um par (peer) entra
1127/// em um tópico do canal Pub/Sub.
1128#[derive(Debug, Clone, PartialEq, Eq)]
1129pub enum EventPubSub {
1130 Join { topic: String, peer: PeerId },
1131 Leave { topic: String, peer: PeerId },
1132}