guardian_db/stores/document_store/
document.rs

1use crate::address::Address;
2use crate::data_store::Datastore;
3use crate::error::{GuardianError, Result};
4use crate::ipfs_core_api::client::IpfsClient;
5use crate::ipfs_log::identity::Identity;
6use crate::p2p::events::EventBus;
7use crate::stores::base_store::base_store::BaseStore;
8use crate::stores::document_store::index::DocumentIndex;
9use crate::stores::operation::{operation, operation::Operation};
10use crate::traits::{
11    CreateDocumentDBOptions, DocumentStoreGetOptions, NewStoreOptions, Store, TracerWrapper,
12};
13use serde_json::{Map, Value};
14use std::sync::Arc;
15use tracing::{Span, instrument, warn};
16
17/// Representa um documento genérico.
18pub type Document = Value;
19
20/// Estrutura principal da DocumentStore.
21pub struct GuardianDBDocumentStore {
22    // Incorpora a lógica da BaseStore. Em Rust, a composição é preferível à herança.
23    base_store: Arc<BaseStore>,
24    // Opções específicas para a manipulação de documentos.
25    doc_opts: CreateDocumentDBOptions,
26    // Índice específico para documentos - usado pelos métodos específicos da DocumentStore
27    doc_index: Arc<DocumentIndex>,
28    // Cache dos valores para resolver problemas de lifetime
29    cached_address: Arc<dyn Address + Send + Sync>,
30    span: Span,
31    cached_replicator: Option<Arc<crate::stores::replicator::replicator::Replicator>>,
32}
33
34// Implementação da trait Store, delegando para base_store
35#[async_trait::async_trait]
36impl Store for GuardianDBDocumentStore {
37    type Error = GuardianError;
38
39    #[allow(deprecated)]
40    fn events(&self) -> &dyn crate::events::EmitterInterface {
41        self.base_store.events()
42    }
43
44    async fn close(&self) -> std::result::Result<(), Self::Error> {
45        self.base_store.close().await
46    }
47
48    fn address(&self) -> &dyn Address {
49        // Usa o valor em cache para evitar problemas de lifetime
50        self.cached_address.as_ref()
51    }
52
53    fn index(&self) -> Box<dyn crate::traits::StoreIndex<Error = GuardianError> + Send + Sync> {
54        // Usa o DocumentIndex local que foi criado especificamente para esta DocumentStore
55        // Este índice mantém compatibilidade com a trait Store
56        let default_opts = Arc::new(default_store_opts_for_map("_id"));
57        Box::new(DocumentIndex::new(default_opts))
58    }
59
60    fn store_type(&self) -> &str {
61        "docstore"
62    }
63
64    fn replication_status(&self) -> crate::stores::replicator::replication_info::ReplicationInfo {
65        self.base_store.replication_status()
66    }
67
68    fn replicator(&self) -> Option<Arc<crate::stores::replicator::replicator::Replicator>> {
69        // Usa o valor em cache para evitar problemas de lifetime
70        self.cached_replicator.clone()
71    }
72
73    fn cache(&self) -> Arc<dyn Datastore> {
74        self.base_store.cache()
75    }
76
77    async fn drop(&mut self) -> std::result::Result<(), Self::Error> {
78        // O método drop não é assíncrono, então não podemos usar await
79        // ***TODO: Implementar funcionalidade apropriada quando BaseStore for corrigido
80        Ok(())
81    }
82
83    async fn load(&mut self, amount: usize) -> std::result::Result<(), Self::Error> {
84        // BaseStore espera Option<isize>, então convertemos
85        self.base_store.load(Some(amount as isize)).await
86    }
87
88    async fn sync(
89        &mut self,
90        heads: Vec<crate::ipfs_log::entry::Entry>,
91    ) -> std::result::Result<(), Self::Error> {
92        self.base_store.sync(heads).await
93    }
94
95    async fn load_more_from(&mut self, _amount: u64, entries: Vec<crate::ipfs_log::entry::Entry>) {
96        // ***BaseStore.load_more_from não é async e tem assinatura diferente
97        let _ = self.base_store.load_more_from(entries);
98    }
99
100    async fn load_from_snapshot(&mut self) -> std::result::Result<(), Self::Error> {
101        // ***BaseStore refatorada agora permite implementação adequada
102        // Por enquanto, delegamos para BaseStore ou implementamos lógica básica
103        Ok(())
104    }
105
106    fn op_log(&self) -> Arc<parking_lot::RwLock<crate::ipfs_log::log::Log>> {
107        self.base_store.op_log()
108    }
109
110    fn ipfs(&self) -> Arc<IpfsClient> {
111        self.base_store.ipfs()
112    }
113
114    fn db_name(&self) -> &str {
115        self.base_store.db_name()
116    }
117
118    fn identity(&self) -> &Identity {
119        self.base_store.identity()
120    }
121
122    fn access_controller(&self) -> &dyn crate::access_controller::traits::AccessController {
123        self.base_store.access_controller()
124    }
125
126    async fn add_operation(
127        &mut self,
128        op: Operation,
129        on_progress_callback: Option<tokio::sync::mpsc::Sender<crate::ipfs_log::entry::Entry>>,
130    ) -> std::result::Result<crate::ipfs_log::entry::Entry, Self::Error> {
131        self.base_store
132            .add_operation(op, on_progress_callback)
133            .await
134    }
135
136    fn span(&self) -> Arc<tracing::Span> {
137        Arc::new(self.span.clone())
138    }
139
140    fn tracer(&self) -> Arc<TracerWrapper> {
141        self.base_store.tracer()
142    }
143
144    fn event_bus(&self) -> Arc<EventBus> {
145        Arc::new(EventBus::new())
146    }
147
148    fn as_any(&self) -> &dyn std::any::Any {
149        self
150    }
151}
152
153impl GuardianDBDocumentStore {
154    /// Acessa o BaseStore interno para operações de sync
155    pub fn basestore(&self) -> &BaseStore {
156        &self.base_store
157    }
158
159    /// Retorna uma referência ao span de tracing para instrumentação
160    pub fn span(&self) -> &Span {
161        &self.span
162    }
163
164    #[instrument(level = "debug", skip(ipfs, identity, options))]
165    pub async fn new(
166        ipfs: Arc<IpfsClient>,
167        identity: Arc<Identity>,
168        addr: Arc<dyn Address>,
169        mut options: NewStoreOptions,
170    ) -> Result<Self> {
171        // 1. Se opções específicas da store não forem fornecidas, usa o padrão para
172        //    documentos com uma chave "_id".
173        if options.store_specific_opts.is_none() {
174            let default_opts = default_store_opts_for_map("_id");
175            options.store_specific_opts = Some(Box::new(default_opts));
176        }
177
178        // 2. Faz o "downcast" das opções específicas para o tipo esperado.
179        //    O `take()` remove o valor da Option, permitindo-nos tomar posse do Box.
180        let specific_opts_box = options.store_specific_opts.take().ok_or_else(|| {
181            GuardianError::InvalidArgument("StoreSpecificOpts is required".to_string())
182        })?;
183        let doc_opts_box = specific_opts_box
184            .downcast::<CreateDocumentDBOptions>()
185            .map_err(|_| {
186                GuardianError::InvalidArgument(
187                    "Tipo inválido fornecido para opts.StoreSpecificOpts".to_string(),
188                )
189            })?;
190
191        // Converte Box para Arc para compatibilidade com DocumentIndex
192        let doc_opts = Arc::new(*doc_opts_box);
193
194        // 3. Clona as opções (que estão dentro de um Arc) para a closure da fábrica de índice.
195        let doc_opts_for_index = doc_opts.clone();
196
197        // 4. Define a fábrica que a BaseStore usará para criar o índice.
198        options.index = Some(Box::new(move |_data: &[u8]| {
199            // A closure retorna o índice concreto, encapsulado em um Box<dyn ...> para ser um trait object.
200            Box::new(DocumentIndex::new(doc_opts_for_index.clone()))
201        }));
202
203        // 5. Inicializa a BaseStore
204        //    Esta chamada assíncrona lida com toda a configuração do oplog, etc.
205        let base_store = BaseStore::new(ipfs, identity, addr, Some(options))
206            .await
207            .map_err(|e| {
208                GuardianError::Store(format!(
209                    "Não foi possível inicializar a document store: {}",
210                    e
211                ))
212            })?;
213
214        // 6. Constrói e retorna a instância final da GuardianDBDocumentStore.
215        let doc_index = Arc::new(DocumentIndex::new(doc_opts.clone()));
216
217        // Cache dos valores para resolver problemas de lifetime
218        let cached_address = base_store.address();
219        let cached_replicator = base_store.replicator();
220
221        // Create a tracing span for the document store
222        let span = tracing::info_span!("document_store", address = %cached_address.to_string());
223
224        let store = GuardianDBDocumentStore {
225            base_store,
226            doc_opts: (*doc_opts).clone(),
227            doc_index,
228            cached_address,
229            span,
230            cached_replicator,
231        };
232
233        Ok(store)
234    }
235
236    #[instrument(level = "debug", skip(self, opts))]
237    pub async fn get(
238        &self,
239        key: &str,
240        opts: Option<DocumentStoreGetOptions>,
241    ) -> Result<Vec<Document>> {
242        let _entered = self.span.enter();
243        let opts = opts.unwrap_or_default();
244
245        // Prepara a chave de busca de acordo com as opções.
246        let has_multiple_terms = key.contains(' ');
247        let mut key_for_search = key.to_string();
248
249        if has_multiple_terms {
250            key_for_search = key_for_search.replace('.', " ");
251        }
252        if opts.case_insensitive {
253            key_for_search = key_for_search.to_lowercase();
254        }
255
256        // Usa diretamente o DocumentIndex armazenado na struct
257        let doc_index = &self.doc_index;
258
259        let mut documents: Vec<Document> = Vec::new();
260
261        for index_key in doc_index.keys() {
262            let mut index_key_for_search = index_key.clone();
263
264            // Normaliza a chave do índice para a busca, se necessário.
265            if opts.case_insensitive {
266                index_key_for_search = index_key_for_search.to_lowercase();
267                if has_multiple_terms {
268                    index_key_for_search = index_key_for_search.replace('.', " ");
269                }
270            }
271
272            // Verifica a correspondência da chave.
273            let matches = if opts.partial_matches {
274                index_key_for_search.contains(&key_for_search)
275            } else {
276                index_key_for_search == key_for_search
277            };
278
279            if !matches {
280                continue;
281            }
282
283            // Obtém o valor usando o DocumentIndex diretamente
284            if let Some(value_bytes) = doc_index.get_bytes(&index_key) {
285                let doc: Document = serde_json::from_slice(&value_bytes).map_err(|e| {
286                    GuardianError::Serialization(format!(
287                        "Impossível desserializar o valor para a chave {}: {}",
288                        index_key, e
289                    ))
290                })?;
291                documents.push(doc);
292            } else {
293                // Pode ser um erro ou apenas um log, dependendo da consistência esperada do índice.
294                eprintln!(
295                    "Aviso: chave '{}' encontrada no conjunto de chaves do índice, mas sem valor correspondente.",
296                    index_key
297                );
298            }
299        }
300
301        Ok(documents)
302    }
303
304    #[instrument(level = "debug", skip(self, document))]
305    pub async fn put(&mut self, document: Document) -> Result<Operation> {
306        let _entered = self.span.enter();
307        // Extrai a chave e serializa o documento usando as funções fornecidas nas opções.
308        let key = (self.doc_opts.key_extractor)(&document)?;
309        let data = (self.doc_opts.marshal)(&document)?;
310
311        // Cria a operação PUT.
312        let op = Operation::new(Some(key), "PUT".to_string(), Some(data));
313
314        // Adiciona a operação ao log da store (oplog).
315        let entry = self.base_store.add_operation(op, None).await?;
316
317        // Analisa o 'entry' retornado para criar um objeto Operation.
318        let parsed_op = operation::parse_operation(entry)?;
319
320        Ok(parsed_op)
321    }
322
323    #[instrument(level = "debug", skip(self))]
324    pub async fn delete(&mut self, document_id: &str) -> Result<Operation> {
325        let _entered = self.span.enter();
326        // Usa diretamente o DocumentIndex armazenado na struct
327        let doc_index = &self.doc_index;
328
329        // Verifica se a entrada existe antes de deletar.
330        if doc_index.get_bytes(document_id).is_none() {
331            return Err(GuardianError::NotFound(format!(
332                "Nenhuma entrada com a chave '{}' na base de dados",
333                document_id
334            )));
335        }
336
337        // Cria a operação DEL. O payload é None.
338        let op = Operation::new(Some(document_id.to_string()), "DEL".to_string(), None);
339
340        // Adiciona a operação DEL ao log.
341        let entry = self.base_store.add_operation(op, None).await?;
342
343        // Analisa o 'entry' retornado.
344        let parsed_op = operation::parse_operation(entry)?;
345
346        Ok(parsed_op)
347    }
348
349    #[instrument(level = "debug", skip(self, documents))]
350    pub async fn put_batch(&mut self, documents: Vec<Document>) -> Result<Vec<Operation>> {
351        if documents.is_empty() {
352            return Err(GuardianError::InvalidArgument(
353                "Nada para adicionar à store".to_string(),
354            ));
355        }
356
357        let mut operations = Vec::new();
358
359        // Itera sobre cada documento, chamando a função `put` individualmente.
360        // Isso cria uma operação para cada documento no log.
361        for doc in documents {
362            let op = self.put(doc).await?;
363            operations.push(op);
364        }
365
366        Ok(operations)
367    }
368
369    #[instrument(level = "debug", skip(self, documents))]
370    pub async fn put_all(&mut self, documents: Vec<Document>) -> Result<Operation> {
371        if documents.is_empty() {
372            return Err(GuardianError::InvalidArgument(
373                "Nada para adicionar à store".to_string(),
374            ));
375        }
376
377        // Agrega todos os documentos em um único vetor para uma operação em lote.
378        let mut to_add: Vec<(String, Vec<u8>)> = Vec::new();
379
380        for doc in documents {
381            let key = (self.doc_opts.key_extractor)(&doc).map_err(|_| {
382                GuardianError::InvalidArgument(
383                    "Um dos documentos fornecidos não possui chave de índice".to_string(),
384                )
385            })?;
386
387            let data = (self.doc_opts.marshal)(&doc).map_err(|_| {
388                GuardianError::Serialization(
389                    "Não foi possível serializar um dos documentos fornecidos".to_string(),
390                )
391            })?;
392
393            to_add.push((key, data));
394        }
395
396        // Cria uma única operação "PUTALL" com todos os documentos.
397        let op = Operation::new_with_documents(None, "PUTALL".to_string(), to_add);
398
399        let entry = self.base_store.add_operation(op, None).await?;
400
401        let parsed_op = operation::parse_operation(entry)?;
402        Ok(parsed_op)
403    }
404
405    #[instrument(level = "debug", skip(self, filter))]
406    pub fn query<F>(&self, mut filter: F) -> Result<Vec<Document>>
407    where
408        // Aceita qualquer closure que possa ser chamado múltiplas vezes,
409        // recebe uma referência a um Documento e retorna um Result<bool>.
410        F: FnMut(&Document) -> Result<bool>,
411    {
412        // Usa diretamente o DocumentIndex armazenado na struct
413        let doc_index = &self.doc_index;
414
415        let mut results: Vec<Document> = Vec::new();
416
417        for index_key in doc_index.keys() {
418            if let Some(doc_bytes) = doc_index.get_bytes(&index_key) {
419                let doc: Document = serde_json::from_slice(&doc_bytes).map_err(|e| {
420                    GuardianError::Serialization(format!(
421                        "Não foi possível desserializar o documento: {}",
422                        e
423                    ))
424                })?;
425
426                // Chama a closure do filtro. O `?` propaga o erro se o filtro falhar.
427                if filter(&doc)? {
428                    results.push(doc);
429                }
430            }
431        }
432
433        Ok(results)
434    }
435
436    pub fn store_type(&self) -> &'static str {
437        "docstore"
438    }
439}
440
441/// Retorna uma closure que extrai um campo de um `serde_json::Value::Object`.
442///
443/// A closure retornada captura o `key_field` para uso posterior.
444pub fn map_key_extractor(key_field: String) -> impl Fn(&Document) -> Result<String> {
445    move |doc: &Document| {
446        // Assegura que o documento é um objeto JSON (mapa)
447        let obj = doc.as_object().ok_or_else(|| {
448            GuardianError::InvalidArgument(
449                "A entrada precisa ser um objeto JSON (map[string]interface{{}})".to_string(),
450            )
451        })?;
452
453        // Procura pelo campo chave no objeto
454        let value = obj.get(&key_field).ok_or_else(|| {
455            GuardianError::NotFound(format!(
456                "Faltando valor para o campo `{}` na entrada",
457                key_field
458            ))
459        })?;
460
461        // Assegura que o valor encontrado é uma string
462        let key = value.as_str().ok_or_else(|| {
463            GuardianError::InvalidArgument(format!(
464                "O valor para o campo `{}` não é uma string",
465                key_field
466            ))
467        })?;
468
469        Ok(key.to_string())
470    }
471}
472
473/// Cria um conjunto de opções padrão para uma store que lida com documentos
474/// baseados em mapas (JSON Objects), usando um campo específico como chave.
475pub fn default_store_opts_for_map(key_field: &str) -> CreateDocumentDBOptions {
476    CreateDocumentDBOptions {
477        marshal: Arc::new(|doc: &Document| serde_json::to_vec(doc).map_err(GuardianError::from)),
478        unmarshal: Arc::new(|bytes: &[u8]| {
479            serde_json::from_slice(bytes).map_err(GuardianError::from)
480        }),
481        // Usa a função de ordem superior para criar a closure extratora de chave
482        key_extractor: Arc::new(map_key_extractor(key_field.to_string())),
483
484        item_factory: Arc::new(|| Value::Object(Map::new())),
485    }
486}