guardian_db/stores/event_log_store/
log.rs

1use crate::data_store::Datastore;
2use crate::error::{GuardianError, Result};
3use crate::ipfs_core_api::client::IpfsClient;
4use crate::ipfs_log::{entry::Entry, identity::Identity};
5use crate::p2p::events::EventBus;
6use crate::stores::base_store::base_store::BaseStore;
7use crate::stores::operation::{operation, operation::Operation};
8use crate::traits::{self, EventLogStore, Store, StreamOptions};
9use crate::{address::Address, stores::event_log_store::index::new_event_index};
10use cid::Cid;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{Span, instrument};
14
15/// Implementação do trait `EventLogStore` para `GuardianDBEventLogStore`.
16#[async_trait::async_trait]
17impl EventLogStore for GuardianDBEventLogStore {
18    /// Adiciona um novo dado ao log.
19    async fn add(&mut self, data: Vec<u8>) -> std::result::Result<Operation, Self::Error> {
20        // Chama o método interno da struct GuardianDBEventLogStore
21        GuardianDBEventLogStore::add(self, data).await
22    }
23
24    /// Obtém uma entrada específica do log pelo seu CID.
25    async fn get(&self, cid: Cid) -> std::result::Result<Operation, Self::Error> {
26        // Chama o método interno da struct GuardianDBEventLogStore
27        GuardianDBEventLogStore::get(self, cid).await
28    }
29
30    /// Retorna uma lista de operações que ocorreram na store, com opções de filtro.
31    async fn list(
32        &self,
33        options: Option<StreamOptions>,
34    ) -> std::result::Result<Vec<Operation>, Self::Error> {
35        // Chama o método interno da struct GuardianDBEventLogStore
36        GuardianDBEventLogStore::list(self, options).await
37    }
38}
39
40pub struct GuardianDBEventLogStore {
41    basestore: Arc<BaseStore>,
42    span: Span,
43}
44
45// Implementação da trait Store (que é herdada por EventLogStore)
46#[async_trait::async_trait]
47impl Store for GuardianDBEventLogStore {
48    type Error = GuardianError;
49
50    #[allow(deprecated)]
51    fn events(&self) -> &dyn crate::events::EmitterInterface {
52        self.basestore.events()
53    }
54
55    async fn close(&self) -> std::result::Result<(), Self::Error> {
56        self.basestore.close().await
57    }
58
59    fn address(&self) -> &dyn crate::address::Address {
60        Store::address(self.basestore.as_ref())
61    }
62
63    fn index(&self) -> Box<dyn crate::traits::StoreIndex<Error = GuardianError> + Send + Sync> {
64        self.basestore.index()
65    }
66
67    fn store_type(&self) -> &str {
68        "eventlog"
69    }
70
71    fn replication_status(&self) -> crate::stores::replicator::replication_info::ReplicationInfo {
72        self.basestore.replication_status()
73    }
74
75    fn replicator(&self) -> Option<Arc<crate::stores::replicator::replicator::Replicator>> {
76        self.basestore.replicator()
77    }
78
79    fn cache(&self) -> Arc<dyn Datastore> {
80        self.basestore.cache()
81    }
82
83    async fn drop(&mut self) -> std::result::Result<(), Self::Error> {
84        // ***BaseStore não tem método async drop público, então implementamos uma limpeza básica
85        // A limpeza real é feita automaticamente quando o BaseStore é dropped
86        Ok(())
87    }
88
89    async fn load(&mut self, amount: usize) -> std::result::Result<(), Self::Error> {
90        self.basestore.load(Some(amount as isize)).await
91    }
92
93    async fn sync(
94        &mut self,
95        heads: Vec<crate::ipfs_log::entry::Entry>,
96    ) -> std::result::Result<(), Self::Error> {
97        self.basestore.sync(heads).await
98    }
99
100    async fn load_more_from(&mut self, _amount: u64, entries: Vec<crate::ipfs_log::entry::Entry>) {
101        let _ = self.basestore.load_more_from(entries);
102    }
103
104    async fn load_from_snapshot(&mut self) -> std::result::Result<(), Self::Error> {
105        self.basestore.load_from_snapshot().await
106    }
107
108    fn op_log(&self) -> Arc<parking_lot::RwLock<crate::ipfs_log::log::Log>> {
109        self.basestore.op_log()
110    }
111
112    fn ipfs(&self) -> Arc<crate::ipfs_core_api::client::IpfsClient> {
113        self.basestore.ipfs()
114    }
115
116    fn db_name(&self) -> &str {
117        self.basestore.db_name()
118    }
119
120    fn identity(&self) -> &Identity {
121        self.basestore.identity()
122    }
123
124    fn access_controller(&self) -> &dyn crate::access_controller::traits::AccessController {
125        self.basestore.access_controller()
126    }
127
128    async fn add_operation(
129        &mut self,
130        op: Operation,
131        on_progress_callback: Option<tokio::sync::mpsc::Sender<crate::ipfs_log::entry::Entry>>,
132    ) -> std::result::Result<crate::ipfs_log::entry::Entry, Self::Error> {
133        self.basestore.add_operation(op, on_progress_callback).await
134    }
135
136    fn span(&self) -> Arc<tracing::Span> {
137        Arc::new(self.span.clone())
138    }
139
140    fn tracer(&self) -> Arc<crate::traits::TracerWrapper> {
141        self.basestore.tracer()
142    }
143
144    fn event_bus(&self) -> Arc<EventBus> {
145        self.basestore.event_bus()
146    }
147
148    fn as_any(&self) -> &dyn std::any::Any {
149        self
150    }
151}
152
153impl GuardianDBEventLogStore {
154    /// Getter para acessar o BaseStore interno
155    pub fn basestore(&self) -> &BaseStore {
156        &self.basestore
157    }
158
159    /// Retorna uma referência ao span de tracing para instrumentação
160    pub fn span(&self) -> &Span {
161        &self.span
162    }
163
164    /// Instancia uma nova EventLogStore, adaptada para usar cliente nativo Iroh.
165    ///
166    /// # Argumentos
167    ///
168    /// * `ipfs_client` - Cliente IPFS compartilhado via Arc para operações de rede
169    /// * `identity` - Identidade do nó para assinatura de entradas
170    /// * `addr` - Endereço da store para identificação única
171    /// * `options` - Opções de configuração da store (índice, cache, etc.)
172    ///
173    /// # Retorna
174    ///
175    /// Uma nova instância de `GuardianDBEventLogStore` configurada e pronta para uso
176    ///
177    /// # Erros
178    ///
179    /// Retorna `GuardianError::Store` se:
180    /// - A inicialização do BaseStore falhar
181    /// - As opções de configuração forem inválidas
182    #[instrument(level = "debug", skip(ipfs_client, identity, addr, options))]
183    pub async fn new(
184        ipfs_client: Arc<IpfsClient>,
185        identity: Arc<Identity>,
186        addr: Arc<dyn Address + Send + Sync>,
187        mut options: traits::NewStoreOptions,
188    ) -> Result<Self> {
189        // Validação básica dos parâmetros - verifica se os componentes essenciais existem
190        if addr.to_string().is_empty() {
191            return Err(GuardianError::Store(
192                "Invalid address provided, cannot create EventLogStore".to_string(),
193            ));
194        }
195
196        options.index = Some(Box::new(new_event_index));
197        // Inicializa o BaseStore com as opções fornecidas
198        let basestore = BaseStore::new(ipfs_client, identity, addr.clone(), Some(options))
199            .await
200            .map_err(|e| {
201                GuardianError::Store(format!(
202                    "Failed to initialize base store for EventLogStore: {}",
203                    e
204                ))
205            })?;
206
207        // Cria span para esta instância da EventLogStore
208        let span = tracing::info_span!("event_log_store", address = %addr.to_string());
209
210        Ok(GuardianDBEventLogStore { basestore, span })
211    }
212
213    /// Coleta todas as operações de uma stream em um vetor.
214    #[instrument(level = "debug", skip(self, options))]
215    pub async fn list(&self, options: Option<StreamOptions>) -> Result<Vec<Operation>> {
216        let _entered = self.span.enter();
217        let (tx, mut rx) = mpsc::channel(1);
218
219        // Para simplificar, vamos executar diretamente em vez de spawn
220        self.stream(tx, options).await?;
221
222        let mut operations = Vec::new();
223        while let Some(op) = rx.recv().await {
224            operations.push(op);
225        }
226
227        Ok(operations)
228    }
229
230    /// Cria e adiciona uma nova operação "ADD" ao log.
231    ///
232    /// # Argumentos
233    ///
234    /// * `value` - Dados em bytes para adicionar ao log
235    ///
236    /// # Retorna
237    ///
238    /// A operação criada com seu CID único
239    ///
240    /// # Erros
241    ///
242    /// - Se os dados estiverem vazios (opcional, dependendo da política)
243    /// - Se a adição ao BaseStore falhar
244    /// - Se a conversão Entry -> Operation falhar
245    #[instrument(level = "debug", skip(self, value))]
246    pub async fn add(&mut self, value: Vec<u8>) -> Result<Operation> {
247        // Validação opcional: verificar se há dados
248        if value.is_empty() {
249            return Err(GuardianError::Store(
250                "Cannot add empty data to EventLogStore".to_string(),
251            ));
252        }
253
254        let op = Operation::new(None, "ADD".to_string(), Some(value));
255
256        // `add_operation` retorna um `Entry`.
257        let entry = self
258            .add_operation(op, None)
259            .await
260            .map_err(|e| GuardianError::Store(format!("Failed to add operation to log: {}", e)))?;
261
262        // `parse_operation` converte o `Entry` de volta para uma `Operation`.
263        let op_result = operation::parse_operation(entry).map_err(|e| {
264            GuardianError::Store(format!("Failed to parse newly created entry: {}", e))
265        })?;
266
267        Ok(op_result)
268    }
269
270    /// Recupera uma única operação do log pelo seu CID.
271    ///
272    /// # Argumentos
273    ///
274    /// * `cid` - Content Identifier da entrada desejada
275    ///
276    /// # Retorna
277    ///
278    /// A operação correspondente ao CID fornecido
279    ///
280    /// # Erros
281    ///
282    /// - Se o CID não for encontrado no log
283    /// - Se a stream não retornar resultados
284    #[instrument(level = "debug", skip(self))]
285    pub async fn get(&self, cid: Cid) -> Result<Operation> {
286        let _entered = self.span.enter();
287        let (tx, mut rx) = mpsc::channel(1);
288
289        let stream_options = StreamOptions {
290            gte: Some(cid),
291            amount: Some(1),
292            ..Default::default()
293        };
294
295        // Para simplificar, vamos executar diretamente
296        self.stream(tx, Some(stream_options)).await?;
297
298        // Aguarda o primeiro valor
299        if let Some(value) = rx.recv().await {
300            Ok(value)
301        } else {
302            Err(GuardianError::Store(format!(
303                "No operation found for CID: {}",
304                cid
305            )))
306        }
307    }
308
309    /// Busca entradas, as converte em operações e as envia através de um canal.
310    #[instrument(level = "debug", skip(self, result_chan, options))]
311    pub async fn stream(
312        &self,
313        result_chan: mpsc::Sender<Operation>,
314        options: Option<StreamOptions>,
315    ) -> Result<()> {
316        // A função `query` retorna as entradas (entries) do log.
317        let messages = self
318            .query(options)
319            .map_err(|e| GuardianError::Store(format!("unable to fetch query results: {}", e)))?;
320
321        for message in messages {
322            // Converte cada entrada em uma Operação.
323            let op = operation::parse_operation(message)
324                .map_err(|e| GuardianError::Store(format!("unable to parse operation: {}", e)))?;
325
326            // Envia a operação pelo canal. Se o receptor for fechado, o envio falhará
327            // e o loop será interrompido, o que é o comportamento esperado.
328            if result_chan.send(op).await.is_err() {
329                // O receptor foi fechado, então podemos parar de enviar.
330                break;
331            }
332        }
333
334        // Em Rust, o canal é fechado automaticamente quando `result_chan` (o Sender)
335        // sai de escopo, então uma chamada explícita como `close(resultChan)` não é necessária.
336        Ok(())
337    }
338
339    /// Executa a lógica de busca no índice do log com base nas opções de filtro.
340    ///
341    /// # Performance
342    ///
343    /// - Usa o índice quando disponível para queries otimizadas
344    /// - Fallback para acesso direto ao oplog quando necessário
345    /// - Suporta filtros por CID (gt, gte, lt, lte) e limitação de quantidade
346    #[instrument(level = "debug", skip(self, options))]
347    fn query(&self, options: Option<StreamOptions>) -> Result<Vec<Entry>> {
348        let options = options.unwrap_or_default();
349
350        // Tenta usar o índice primeiro para melhor performance
351        let events = match self.basestore.with_index(|index| {
352            // Implementa busca otimizada no índice baseada nas StreamOptions
353            self.optimized_index_query(index, &options)
354        }) {
355            Some(Some(indexed_results)) => indexed_results,
356            _ => {
357                // Fallback: acessa o oplog diretamente quando índice não está disponível
358                // ou não suporta a query específica
359                self.basestore.with_oplog(|log| {
360                    log.values()
361                        .iter()
362                        .map(|arc_entry| (**arc_entry).clone())
363                        .collect::<Vec<_>>()
364                })
365            }
366        };
367
368        // Calcula a quantidade de itens a serem retornados.
369        let amount = match options.amount {
370            Some(a) if a > -1 => a as usize,
371            _ => events.len(), // Se amount for -1 ou None, pega todos.
372        };
373
374        if options.gt.is_some() || options.gte.is_some() {
375            // Caso "maior que" (Greater Than)
376            let cid = options.gt.or(options.gte).unwrap();
377            let inclusive = options.gte.is_some();
378            return Ok(self.read(&events, Some(cid), amount, inclusive));
379        }
380
381        let cid = options.lt.or(options.lte);
382
383        // Caso "menor que" (Lower Than) ou N últimos.
384        // Inverte os eventos para buscar dos mais recentes para os mais antigos.
385        let mut events = events;
386        events.reverse();
387
388        // A busca é inclusiva se LTE for definido ou se nenhum limite (LT/LTE) for definido.
389        let inclusive = options.lte.is_some() || cid.is_none();
390        let mut result = self.read(&events, cid, amount, inclusive);
391
392        // Desfaz a inversão do resultado para manter a ordem cronológica original.
393        result.reverse();
394
395        Ok(result)
396    }
397
398    /// Função auxiliar para ler uma fatia de entradas a partir de um hash.
399    ///
400    /// # Argumentos
401    ///
402    /// * `ops` - Slice de entradas para filtrar
403    /// * `hash` - CID opcional para usar como ponto de início
404    /// * `amount` - Quantidade máxima de entradas a retornar
405    /// * `inclusive` - Se deve incluir a entrada com o hash fornecido
406    ///
407    /// # Retorna
408    ///
409    /// Vetor de entradas filtradas baseado nos critérios
410    ///
411    /// # Performance
412    ///
413    /// - O(n) para encontrar o índice inicial por hash
414    /// - O(amount) para coletar os resultados
415    /// - Otimizada para uso com iteradores Rust
416    fn read(&self, ops: &[Entry], hash: Option<Cid>, amount: usize, inclusive: bool) -> Vec<Entry> {
417        if amount == 0 {
418            return Vec::new();
419        }
420
421        // Encontra o índice inicial.
422        let mut start_index = 0;
423        if let Some(h) = hash {
424            if let Some(idx) = ops.iter().position(|e| e.hash() == h.to_string()) {
425                start_index = idx;
426            } else {
427                // Se o hash não for encontrado, não há o que retornar.
428                return Vec::new();
429            }
430        }
431
432        // Se não for inclusivo, começa a partir do próximo elemento.
433        if !inclusive {
434            start_index += 1;
435        }
436
437        // Limita a quantidade de elementos e coleta o resultado.
438        ops.iter().skip(start_index).take(amount).cloned().collect()
439    }
440
441    /// Busca otimizada no índice baseada nas StreamOptions.
442    ///
443    /// Utiliza os novos métodos opcionais do trait StoreIndex
444    ///
445    /// # Argumentos
446    ///
447    /// * `index` - Referência ao índice da store
448    /// * `options` - Opções de filtro da stream
449    ///
450    /// # Retorna
451    ///
452    /// `Some(Vec<Entry>)` se conseguir processar a query otimizada
453    /// `None` se deve usar fallback (oplog direto)
454    ///
455    /// # Casos Otimizados (Implementados)
456    ///
457    /// 1. **Amount-only queries**: Últimas N entradas usando `get_last_entries()`
458    /// 2. **Range queries**: Faixas específicas usando `get_entries_range()`
459    /// 3. **CID queries**: Busca por CID usando `get_entry_by_cid()`
460    ///
461    /// # Casos de Fallback
462    ///
463    /// 1. **Índice não suporta Entry**: `supports_entry_queries()` retorna false
464    /// 2. **Queries complexas**: Combinações não otimizadas
465    /// 3. **Índice vazio**: Nenhuma entrada disponível
466    ///
467    /// # Performance
468    ///
469    /// - **get_last_entries()**: O(k) onde k = número de entradas solicitadas
470    /// - **get_entry_by_cid()**: O(n) atual, O(1) futuro com índice por CID
471    /// - **get_entries_range()**: O(k) onde k = tamanho do range
472    fn optimized_index_query(
473        &self,
474        index: &dyn crate::traits::StoreIndex<Error = GuardianError>,
475        options: &StreamOptions,
476    ) -> Option<Vec<Entry>> {
477        // Verifica se o índice suporta queries otimizadas com Entry completas
478        if !index.supports_entry_queries() {
479            return None; // Fallback para oplog
480        }
481
482        // Validação rápida: verifica se o índice tem dados
483        let total_entries = match index.len() {
484            Ok(len) if len > 0 => len,
485            _ => return None, // Índice vazio - usa fallback
486        };
487
488        // Query simples por quantidade (caso mais comum)
489        let is_simple_amount_query = options.gt.is_none()
490            && options.gte.is_none()
491            && options.lt.is_none()
492            && options.lte.is_none();
493
494        if is_simple_amount_query {
495            let amount = match options.amount {
496                Some(a) if a > 0 => (a as usize).min(total_entries),
497                Some(-1) | None => total_entries, // -1 ou None significa "todas"
498                _ => return None,                 // Valor inválido
499            };
500
501            // Usa o método otimizado do índice
502            return index.get_last_entries(amount);
503        }
504
505        // Query por CID específico (get operation)
506        if let Some(cid) = options.gte
507            && options.amount == Some(1)
508            && options.gt.is_none()
509            && options.lt.is_none()
510            && options.lte.is_none()
511        {
512            // Query pontual por CID - usa busca otimizada
513            if let Some(entry) = index.get_entry_by_cid(&cid) {
514                return Some(vec![entry]);
515            } else {
516                return Some(Vec::new()); // CID não encontrado
517            }
518        }
519
520        // Otimizações futuras: Ranges específicos (futuro)
521        // Por enquanto, queries com múltiplos CIDs usam fallback
522        // que já implementa a lógica correta
523        //
524        // Futuras otimizações:
525        // - Range por posição quando CIDs são consecutivos
526        // - Cache de queries frequentes
527        // - Índice temporal para filtros por timestamp
528
529        None // Usa fallback para queries complexas
530    }
531}