guardian_db/stores/event_log_store/log.rs
1use crate::data_store::Datastore;
2use crate::error::{GuardianError, Result};
3use crate::ipfs_core_api::client::IpfsClient;
4use crate::ipfs_log::{entry::Entry, identity::Identity};
5use crate::p2p::events::EventBus;
6use crate::stores::base_store::base_store::BaseStore;
7use crate::stores::operation::{operation, operation::Operation};
8use crate::traits::{self, EventLogStore, Store, StreamOptions};
9use crate::{address::Address, stores::event_log_store::index::new_event_index};
10use cid::Cid;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{Span, instrument};
14
15/// Implementação do trait `EventLogStore` para `GuardianDBEventLogStore`.
16#[async_trait::async_trait]
17impl EventLogStore for GuardianDBEventLogStore {
18 /// Adiciona um novo dado ao log.
19 async fn add(&mut self, data: Vec<u8>) -> std::result::Result<Operation, Self::Error> {
20 // Chama o método interno da struct GuardianDBEventLogStore
21 GuardianDBEventLogStore::add(self, data).await
22 }
23
24 /// Obtém uma entrada específica do log pelo seu CID.
25 async fn get(&self, cid: Cid) -> std::result::Result<Operation, Self::Error> {
26 // Chama o método interno da struct GuardianDBEventLogStore
27 GuardianDBEventLogStore::get(self, cid).await
28 }
29
30 /// Retorna uma lista de operações que ocorreram na store, com opções de filtro.
31 async fn list(
32 &self,
33 options: Option<StreamOptions>,
34 ) -> std::result::Result<Vec<Operation>, Self::Error> {
35 // Chama o método interno da struct GuardianDBEventLogStore
36 GuardianDBEventLogStore::list(self, options).await
37 }
38}
39
40pub struct GuardianDBEventLogStore {
41 basestore: Arc<BaseStore>,
42 span: Span,
43}
44
45// Implementação da trait Store (que é herdada por EventLogStore)
46#[async_trait::async_trait]
47impl Store for GuardianDBEventLogStore {
48 type Error = GuardianError;
49
50 #[allow(deprecated)]
51 fn events(&self) -> &dyn crate::events::EmitterInterface {
52 self.basestore.events()
53 }
54
55 async fn close(&self) -> std::result::Result<(), Self::Error> {
56 self.basestore.close().await
57 }
58
59 fn address(&self) -> &dyn crate::address::Address {
60 Store::address(self.basestore.as_ref())
61 }
62
63 fn index(&self) -> Box<dyn crate::traits::StoreIndex<Error = GuardianError> + Send + Sync> {
64 self.basestore.index()
65 }
66
67 fn store_type(&self) -> &str {
68 "eventlog"
69 }
70
71 fn replication_status(&self) -> crate::stores::replicator::replication_info::ReplicationInfo {
72 self.basestore.replication_status()
73 }
74
75 fn replicator(&self) -> Option<Arc<crate::stores::replicator::replicator::Replicator>> {
76 self.basestore.replicator()
77 }
78
79 fn cache(&self) -> Arc<dyn Datastore> {
80 self.basestore.cache()
81 }
82
83 async fn drop(&mut self) -> std::result::Result<(), Self::Error> {
84 // ***BaseStore não tem método async drop público, então implementamos uma limpeza básica
85 // A limpeza real é feita automaticamente quando o BaseStore é dropped
86 Ok(())
87 }
88
89 async fn load(&mut self, amount: usize) -> std::result::Result<(), Self::Error> {
90 self.basestore.load(Some(amount as isize)).await
91 }
92
93 async fn sync(
94 &mut self,
95 heads: Vec<crate::ipfs_log::entry::Entry>,
96 ) -> std::result::Result<(), Self::Error> {
97 self.basestore.sync(heads).await
98 }
99
100 async fn load_more_from(&mut self, _amount: u64, entries: Vec<crate::ipfs_log::entry::Entry>) {
101 let _ = self.basestore.load_more_from(entries);
102 }
103
104 async fn load_from_snapshot(&mut self) -> std::result::Result<(), Self::Error> {
105 self.basestore.load_from_snapshot().await
106 }
107
108 fn op_log(&self) -> Arc<parking_lot::RwLock<crate::ipfs_log::log::Log>> {
109 self.basestore.op_log()
110 }
111
112 fn ipfs(&self) -> Arc<crate::ipfs_core_api::client::IpfsClient> {
113 self.basestore.ipfs()
114 }
115
116 fn db_name(&self) -> &str {
117 self.basestore.db_name()
118 }
119
120 fn identity(&self) -> &Identity {
121 self.basestore.identity()
122 }
123
124 fn access_controller(&self) -> &dyn crate::access_controller::traits::AccessController {
125 self.basestore.access_controller()
126 }
127
128 async fn add_operation(
129 &mut self,
130 op: Operation,
131 on_progress_callback: Option<tokio::sync::mpsc::Sender<crate::ipfs_log::entry::Entry>>,
132 ) -> std::result::Result<crate::ipfs_log::entry::Entry, Self::Error> {
133 self.basestore.add_operation(op, on_progress_callback).await
134 }
135
136 fn span(&self) -> Arc<tracing::Span> {
137 Arc::new(self.span.clone())
138 }
139
140 fn tracer(&self) -> Arc<crate::traits::TracerWrapper> {
141 self.basestore.tracer()
142 }
143
144 fn event_bus(&self) -> Arc<EventBus> {
145 self.basestore.event_bus()
146 }
147
148 fn as_any(&self) -> &dyn std::any::Any {
149 self
150 }
151}
152
153impl GuardianDBEventLogStore {
154 /// Getter para acessar o BaseStore interno
155 pub fn basestore(&self) -> &BaseStore {
156 &self.basestore
157 }
158
159 /// Retorna uma referência ao span de tracing para instrumentação
160 pub fn span(&self) -> &Span {
161 &self.span
162 }
163
164 /// Instancia uma nova EventLogStore, adaptada para usar cliente nativo Iroh.
165 ///
166 /// # Argumentos
167 ///
168 /// * `ipfs_client` - Cliente IPFS compartilhado via Arc para operações de rede
169 /// * `identity` - Identidade do nó para assinatura de entradas
170 /// * `addr` - Endereço da store para identificação única
171 /// * `options` - Opções de configuração da store (índice, cache, etc.)
172 ///
173 /// # Retorna
174 ///
175 /// Uma nova instância de `GuardianDBEventLogStore` configurada e pronta para uso
176 ///
177 /// # Erros
178 ///
179 /// Retorna `GuardianError::Store` se:
180 /// - A inicialização do BaseStore falhar
181 /// - As opções de configuração forem inválidas
182 #[instrument(level = "debug", skip(ipfs_client, identity, addr, options))]
183 pub async fn new(
184 ipfs_client: Arc<IpfsClient>,
185 identity: Arc<Identity>,
186 addr: Arc<dyn Address + Send + Sync>,
187 mut options: traits::NewStoreOptions,
188 ) -> Result<Self> {
189 // Validação básica dos parâmetros - verifica se os componentes essenciais existem
190 if addr.to_string().is_empty() {
191 return Err(GuardianError::Store(
192 "Invalid address provided, cannot create EventLogStore".to_string(),
193 ));
194 }
195
196 options.index = Some(Box::new(new_event_index));
197 // Inicializa o BaseStore com as opções fornecidas
198 let basestore = BaseStore::new(ipfs_client, identity, addr.clone(), Some(options))
199 .await
200 .map_err(|e| {
201 GuardianError::Store(format!(
202 "Failed to initialize base store for EventLogStore: {}",
203 e
204 ))
205 })?;
206
207 // Cria span para esta instância da EventLogStore
208 let span = tracing::info_span!("event_log_store", address = %addr.to_string());
209
210 Ok(GuardianDBEventLogStore { basestore, span })
211 }
212
213 /// Coleta todas as operações de uma stream em um vetor.
214 #[instrument(level = "debug", skip(self, options))]
215 pub async fn list(&self, options: Option<StreamOptions>) -> Result<Vec<Operation>> {
216 let _entered = self.span.enter();
217 let (tx, mut rx) = mpsc::channel(1);
218
219 // Para simplificar, vamos executar diretamente em vez de spawn
220 self.stream(tx, options).await?;
221
222 let mut operations = Vec::new();
223 while let Some(op) = rx.recv().await {
224 operations.push(op);
225 }
226
227 Ok(operations)
228 }
229
230 /// Cria e adiciona uma nova operação "ADD" ao log.
231 ///
232 /// # Argumentos
233 ///
234 /// * `value` - Dados em bytes para adicionar ao log
235 ///
236 /// # Retorna
237 ///
238 /// A operação criada com seu CID único
239 ///
240 /// # Erros
241 ///
242 /// - Se os dados estiverem vazios (opcional, dependendo da política)
243 /// - Se a adição ao BaseStore falhar
244 /// - Se a conversão Entry -> Operation falhar
245 #[instrument(level = "debug", skip(self, value))]
246 pub async fn add(&mut self, value: Vec<u8>) -> Result<Operation> {
247 // Validação opcional: verificar se há dados
248 if value.is_empty() {
249 return Err(GuardianError::Store(
250 "Cannot add empty data to EventLogStore".to_string(),
251 ));
252 }
253
254 let op = Operation::new(None, "ADD".to_string(), Some(value));
255
256 // `add_operation` retorna um `Entry`.
257 let entry = self
258 .add_operation(op, None)
259 .await
260 .map_err(|e| GuardianError::Store(format!("Failed to add operation to log: {}", e)))?;
261
262 // `parse_operation` converte o `Entry` de volta para uma `Operation`.
263 let op_result = operation::parse_operation(entry).map_err(|e| {
264 GuardianError::Store(format!("Failed to parse newly created entry: {}", e))
265 })?;
266
267 Ok(op_result)
268 }
269
270 /// Recupera uma única operação do log pelo seu CID.
271 ///
272 /// # Argumentos
273 ///
274 /// * `cid` - Content Identifier da entrada desejada
275 ///
276 /// # Retorna
277 ///
278 /// A operação correspondente ao CID fornecido
279 ///
280 /// # Erros
281 ///
282 /// - Se o CID não for encontrado no log
283 /// - Se a stream não retornar resultados
284 #[instrument(level = "debug", skip(self))]
285 pub async fn get(&self, cid: Cid) -> Result<Operation> {
286 let _entered = self.span.enter();
287 let (tx, mut rx) = mpsc::channel(1);
288
289 let stream_options = StreamOptions {
290 gte: Some(cid),
291 amount: Some(1),
292 ..Default::default()
293 };
294
295 // Para simplificar, vamos executar diretamente
296 self.stream(tx, Some(stream_options)).await?;
297
298 // Aguarda o primeiro valor
299 if let Some(value) = rx.recv().await {
300 Ok(value)
301 } else {
302 Err(GuardianError::Store(format!(
303 "No operation found for CID: {}",
304 cid
305 )))
306 }
307 }
308
309 /// Busca entradas, as converte em operações e as envia através de um canal.
310 #[instrument(level = "debug", skip(self, result_chan, options))]
311 pub async fn stream(
312 &self,
313 result_chan: mpsc::Sender<Operation>,
314 options: Option<StreamOptions>,
315 ) -> Result<()> {
316 // A função `query` retorna as entradas (entries) do log.
317 let messages = self
318 .query(options)
319 .map_err(|e| GuardianError::Store(format!("unable to fetch query results: {}", e)))?;
320
321 for message in messages {
322 // Converte cada entrada em uma Operação.
323 let op = operation::parse_operation(message)
324 .map_err(|e| GuardianError::Store(format!("unable to parse operation: {}", e)))?;
325
326 // Envia a operação pelo canal. Se o receptor for fechado, o envio falhará
327 // e o loop será interrompido, o que é o comportamento esperado.
328 if result_chan.send(op).await.is_err() {
329 // O receptor foi fechado, então podemos parar de enviar.
330 break;
331 }
332 }
333
334 // Em Rust, o canal é fechado automaticamente quando `result_chan` (o Sender)
335 // sai de escopo, então uma chamada explícita como `close(resultChan)` não é necessária.
336 Ok(())
337 }
338
339 /// Executa a lógica de busca no índice do log com base nas opções de filtro.
340 ///
341 /// # Performance
342 ///
343 /// - Usa o índice quando disponível para queries otimizadas
344 /// - Fallback para acesso direto ao oplog quando necessário
345 /// - Suporta filtros por CID (gt, gte, lt, lte) e limitação de quantidade
346 #[instrument(level = "debug", skip(self, options))]
347 fn query(&self, options: Option<StreamOptions>) -> Result<Vec<Entry>> {
348 let options = options.unwrap_or_default();
349
350 // Tenta usar o índice primeiro para melhor performance
351 let events = match self.basestore.with_index(|index| {
352 // Implementa busca otimizada no índice baseada nas StreamOptions
353 self.optimized_index_query(index, &options)
354 }) {
355 Some(Some(indexed_results)) => indexed_results,
356 _ => {
357 // Fallback: acessa o oplog diretamente quando índice não está disponível
358 // ou não suporta a query específica
359 self.basestore.with_oplog(|log| {
360 log.values()
361 .iter()
362 .map(|arc_entry| (**arc_entry).clone())
363 .collect::<Vec<_>>()
364 })
365 }
366 };
367
368 // Calcula a quantidade de itens a serem retornados.
369 let amount = match options.amount {
370 Some(a) if a > -1 => a as usize,
371 _ => events.len(), // Se amount for -1 ou None, pega todos.
372 };
373
374 if options.gt.is_some() || options.gte.is_some() {
375 // Caso "maior que" (Greater Than)
376 let cid = options.gt.or(options.gte).unwrap();
377 let inclusive = options.gte.is_some();
378 return Ok(self.read(&events, Some(cid), amount, inclusive));
379 }
380
381 let cid = options.lt.or(options.lte);
382
383 // Caso "menor que" (Lower Than) ou N últimos.
384 // Inverte os eventos para buscar dos mais recentes para os mais antigos.
385 let mut events = events;
386 events.reverse();
387
388 // A busca é inclusiva se LTE for definido ou se nenhum limite (LT/LTE) for definido.
389 let inclusive = options.lte.is_some() || cid.is_none();
390 let mut result = self.read(&events, cid, amount, inclusive);
391
392 // Desfaz a inversão do resultado para manter a ordem cronológica original.
393 result.reverse();
394
395 Ok(result)
396 }
397
398 /// Função auxiliar para ler uma fatia de entradas a partir de um hash.
399 ///
400 /// # Argumentos
401 ///
402 /// * `ops` - Slice de entradas para filtrar
403 /// * `hash` - CID opcional para usar como ponto de início
404 /// * `amount` - Quantidade máxima de entradas a retornar
405 /// * `inclusive` - Se deve incluir a entrada com o hash fornecido
406 ///
407 /// # Retorna
408 ///
409 /// Vetor de entradas filtradas baseado nos critérios
410 ///
411 /// # Performance
412 ///
413 /// - O(n) para encontrar o índice inicial por hash
414 /// - O(amount) para coletar os resultados
415 /// - Otimizada para uso com iteradores Rust
416 fn read(&self, ops: &[Entry], hash: Option<Cid>, amount: usize, inclusive: bool) -> Vec<Entry> {
417 if amount == 0 {
418 return Vec::new();
419 }
420
421 // Encontra o índice inicial.
422 let mut start_index = 0;
423 if let Some(h) = hash {
424 if let Some(idx) = ops.iter().position(|e| e.hash() == h.to_string()) {
425 start_index = idx;
426 } else {
427 // Se o hash não for encontrado, não há o que retornar.
428 return Vec::new();
429 }
430 }
431
432 // Se não for inclusivo, começa a partir do próximo elemento.
433 if !inclusive {
434 start_index += 1;
435 }
436
437 // Limita a quantidade de elementos e coleta o resultado.
438 ops.iter().skip(start_index).take(amount).cloned().collect()
439 }
440
441 /// Busca otimizada no índice baseada nas StreamOptions.
442 ///
443 /// Utiliza os novos métodos opcionais do trait StoreIndex
444 ///
445 /// # Argumentos
446 ///
447 /// * `index` - Referência ao índice da store
448 /// * `options` - Opções de filtro da stream
449 ///
450 /// # Retorna
451 ///
452 /// `Some(Vec<Entry>)` se conseguir processar a query otimizada
453 /// `None` se deve usar fallback (oplog direto)
454 ///
455 /// # Casos Otimizados (Implementados)
456 ///
457 /// 1. **Amount-only queries**: Últimas N entradas usando `get_last_entries()`
458 /// 2. **Range queries**: Faixas específicas usando `get_entries_range()`
459 /// 3. **CID queries**: Busca por CID usando `get_entry_by_cid()`
460 ///
461 /// # Casos de Fallback
462 ///
463 /// 1. **Índice não suporta Entry**: `supports_entry_queries()` retorna false
464 /// 2. **Queries complexas**: Combinações não otimizadas
465 /// 3. **Índice vazio**: Nenhuma entrada disponível
466 ///
467 /// # Performance
468 ///
469 /// - **get_last_entries()**: O(k) onde k = número de entradas solicitadas
470 /// - **get_entry_by_cid()**: O(n) atual, O(1) futuro com índice por CID
471 /// - **get_entries_range()**: O(k) onde k = tamanho do range
472 fn optimized_index_query(
473 &self,
474 index: &dyn crate::traits::StoreIndex<Error = GuardianError>,
475 options: &StreamOptions,
476 ) -> Option<Vec<Entry>> {
477 // Verifica se o índice suporta queries otimizadas com Entry completas
478 if !index.supports_entry_queries() {
479 return None; // Fallback para oplog
480 }
481
482 // Validação rápida: verifica se o índice tem dados
483 let total_entries = match index.len() {
484 Ok(len) if len > 0 => len,
485 _ => return None, // Índice vazio - usa fallback
486 };
487
488 // Query simples por quantidade (caso mais comum)
489 let is_simple_amount_query = options.gt.is_none()
490 && options.gte.is_none()
491 && options.lt.is_none()
492 && options.lte.is_none();
493
494 if is_simple_amount_query {
495 let amount = match options.amount {
496 Some(a) if a > 0 => (a as usize).min(total_entries),
497 Some(-1) | None => total_entries, // -1 ou None significa "todas"
498 _ => return None, // Valor inválido
499 };
500
501 // Usa o método otimizado do índice
502 return index.get_last_entries(amount);
503 }
504
505 // Query por CID específico (get operation)
506 if let Some(cid) = options.gte
507 && options.amount == Some(1)
508 && options.gt.is_none()
509 && options.lt.is_none()
510 && options.lte.is_none()
511 {
512 // Query pontual por CID - usa busca otimizada
513 if let Some(entry) = index.get_entry_by_cid(&cid) {
514 return Some(vec![entry]);
515 } else {
516 return Some(Vec::new()); // CID não encontrado
517 }
518 }
519
520 // Otimizações futuras: Ranges específicos (futuro)
521 // Por enquanto, queries com múltiplos CIDs usam fallback
522 // que já implementa a lógica correta
523 //
524 // Futuras otimizações:
525 // - Range por posição quando CIDs são consecutivos
526 // - Cache de queries frequentes
527 // - Índice temporal para filtros por timestamp
528
529 None // Usa fallback para queries complexas
530 }
531}