guardian_db/stores/document_store/
document.rs1use crate::address::Address;
2use crate::data_store::Datastore;
3use crate::error::{GuardianError, Result};
4use crate::ipfs_core_api::client::IpfsClient;
5use crate::ipfs_log::identity::Identity;
6use crate::p2p::events::EventBus;
7use crate::stores::base_store::base_store::BaseStore;
8use crate::stores::document_store::index::DocumentIndex;
9use crate::stores::operation::{operation, operation::Operation};
10use crate::traits::{
11 CreateDocumentDBOptions, DocumentStoreGetOptions, NewStoreOptions, Store, TracerWrapper,
12};
13use serde_json::{Map, Value};
14use std::sync::Arc;
15use tracing::{Span, instrument, warn};
16
17pub type Document = Value;
19
20pub struct GuardianDBDocumentStore {
22 base_store: Arc<BaseStore>,
24 doc_opts: CreateDocumentDBOptions,
26 doc_index: Arc<DocumentIndex>,
28 cached_address: Arc<dyn Address + Send + Sync>,
30 span: Span,
31 cached_replicator: Option<Arc<crate::stores::replicator::replicator::Replicator>>,
32}
33
34#[async_trait::async_trait]
36impl Store for GuardianDBDocumentStore {
37 type Error = GuardianError;
38
39 #[allow(deprecated)]
40 fn events(&self) -> &dyn crate::events::EmitterInterface {
41 self.base_store.events()
42 }
43
44 async fn close(&self) -> std::result::Result<(), Self::Error> {
45 self.base_store.close().await
46 }
47
48 fn address(&self) -> &dyn Address {
49 self.cached_address.as_ref()
51 }
52
53 fn index(&self) -> Box<dyn crate::traits::StoreIndex<Error = GuardianError> + Send + Sync> {
54 let default_opts = Arc::new(default_store_opts_for_map("_id"));
57 Box::new(DocumentIndex::new(default_opts))
58 }
59
60 fn store_type(&self) -> &str {
61 "docstore"
62 }
63
64 fn replication_status(&self) -> crate::stores::replicator::replication_info::ReplicationInfo {
65 self.base_store.replication_status()
66 }
67
68 fn replicator(&self) -> Option<Arc<crate::stores::replicator::replicator::Replicator>> {
69 self.cached_replicator.clone()
71 }
72
73 fn cache(&self) -> Arc<dyn Datastore> {
74 self.base_store.cache()
75 }
76
77 async fn drop(&mut self) -> std::result::Result<(), Self::Error> {
78 Ok(())
81 }
82
83 async fn load(&mut self, amount: usize) -> std::result::Result<(), Self::Error> {
84 self.base_store.load(Some(amount as isize)).await
86 }
87
88 async fn sync(
89 &mut self,
90 heads: Vec<crate::ipfs_log::entry::Entry>,
91 ) -> std::result::Result<(), Self::Error> {
92 self.base_store.sync(heads).await
93 }
94
95 async fn load_more_from(&mut self, _amount: u64, entries: Vec<crate::ipfs_log::entry::Entry>) {
96 let _ = self.base_store.load_more_from(entries);
98 }
99
100 async fn load_from_snapshot(&mut self) -> std::result::Result<(), Self::Error> {
101 Ok(())
104 }
105
106 fn op_log(&self) -> Arc<parking_lot::RwLock<crate::ipfs_log::log::Log>> {
107 self.base_store.op_log()
108 }
109
110 fn ipfs(&self) -> Arc<IpfsClient> {
111 self.base_store.ipfs()
112 }
113
114 fn db_name(&self) -> &str {
115 self.base_store.db_name()
116 }
117
118 fn identity(&self) -> &Identity {
119 self.base_store.identity()
120 }
121
122 fn access_controller(&self) -> &dyn crate::access_controller::traits::AccessController {
123 self.base_store.access_controller()
124 }
125
126 async fn add_operation(
127 &mut self,
128 op: Operation,
129 on_progress_callback: Option<tokio::sync::mpsc::Sender<crate::ipfs_log::entry::Entry>>,
130 ) -> std::result::Result<crate::ipfs_log::entry::Entry, Self::Error> {
131 self.base_store
132 .add_operation(op, on_progress_callback)
133 .await
134 }
135
136 fn span(&self) -> Arc<tracing::Span> {
137 Arc::new(self.span.clone())
138 }
139
140 fn tracer(&self) -> Arc<TracerWrapper> {
141 self.base_store.tracer()
142 }
143
144 fn event_bus(&self) -> Arc<EventBus> {
145 Arc::new(EventBus::new())
146 }
147
148 fn as_any(&self) -> &dyn std::any::Any {
149 self
150 }
151}
152
153impl GuardianDBDocumentStore {
154 pub fn basestore(&self) -> &BaseStore {
156 &self.base_store
157 }
158
159 pub fn span(&self) -> &Span {
161 &self.span
162 }
163
164 #[instrument(level = "debug", skip(ipfs, identity, options))]
165 pub async fn new(
166 ipfs: Arc<IpfsClient>,
167 identity: Arc<Identity>,
168 addr: Arc<dyn Address>,
169 mut options: NewStoreOptions,
170 ) -> Result<Self> {
171 if options.store_specific_opts.is_none() {
174 let default_opts = default_store_opts_for_map("_id");
175 options.store_specific_opts = Some(Box::new(default_opts));
176 }
177
178 let specific_opts_box = options.store_specific_opts.take().ok_or_else(|| {
181 GuardianError::InvalidArgument("StoreSpecificOpts is required".to_string())
182 })?;
183 let doc_opts_box = specific_opts_box
184 .downcast::<CreateDocumentDBOptions>()
185 .map_err(|_| {
186 GuardianError::InvalidArgument(
187 "Tipo inválido fornecido para opts.StoreSpecificOpts".to_string(),
188 )
189 })?;
190
191 let doc_opts = Arc::new(*doc_opts_box);
193
194 let doc_opts_for_index = doc_opts.clone();
196
197 options.index = Some(Box::new(move |_data: &[u8]| {
199 Box::new(DocumentIndex::new(doc_opts_for_index.clone()))
201 }));
202
203 let base_store = BaseStore::new(ipfs, identity, addr, Some(options))
206 .await
207 .map_err(|e| {
208 GuardianError::Store(format!(
209 "Não foi possível inicializar a document store: {}",
210 e
211 ))
212 })?;
213
214 let doc_index = Arc::new(DocumentIndex::new(doc_opts.clone()));
216
217 let cached_address = base_store.address();
219 let cached_replicator = base_store.replicator();
220
221 let span = tracing::info_span!("document_store", address = %cached_address.to_string());
223
224 let store = GuardianDBDocumentStore {
225 base_store,
226 doc_opts: (*doc_opts).clone(),
227 doc_index,
228 cached_address,
229 span,
230 cached_replicator,
231 };
232
233 Ok(store)
234 }
235
236 #[instrument(level = "debug", skip(self, opts))]
237 pub async fn get(
238 &self,
239 key: &str,
240 opts: Option<DocumentStoreGetOptions>,
241 ) -> Result<Vec<Document>> {
242 let _entered = self.span.enter();
243 let opts = opts.unwrap_or_default();
244
245 let has_multiple_terms = key.contains(' ');
247 let mut key_for_search = key.to_string();
248
249 if has_multiple_terms {
250 key_for_search = key_for_search.replace('.', " ");
251 }
252 if opts.case_insensitive {
253 key_for_search = key_for_search.to_lowercase();
254 }
255
256 let doc_index = &self.doc_index;
258
259 let mut documents: Vec<Document> = Vec::new();
260
261 for index_key in doc_index.keys() {
262 let mut index_key_for_search = index_key.clone();
263
264 if opts.case_insensitive {
266 index_key_for_search = index_key_for_search.to_lowercase();
267 if has_multiple_terms {
268 index_key_for_search = index_key_for_search.replace('.', " ");
269 }
270 }
271
272 let matches = if opts.partial_matches {
274 index_key_for_search.contains(&key_for_search)
275 } else {
276 index_key_for_search == key_for_search
277 };
278
279 if !matches {
280 continue;
281 }
282
283 if let Some(value_bytes) = doc_index.get_bytes(&index_key) {
285 let doc: Document = serde_json::from_slice(&value_bytes).map_err(|e| {
286 GuardianError::Serialization(format!(
287 "Impossível desserializar o valor para a chave {}: {}",
288 index_key, e
289 ))
290 })?;
291 documents.push(doc);
292 } else {
293 eprintln!(
295 "Aviso: chave '{}' encontrada no conjunto de chaves do índice, mas sem valor correspondente.",
296 index_key
297 );
298 }
299 }
300
301 Ok(documents)
302 }
303
304 #[instrument(level = "debug", skip(self, document))]
305 pub async fn put(&mut self, document: Document) -> Result<Operation> {
306 let _entered = self.span.enter();
307 let key = (self.doc_opts.key_extractor)(&document)?;
309 let data = (self.doc_opts.marshal)(&document)?;
310
311 let op = Operation::new(Some(key), "PUT".to_string(), Some(data));
313
314 let entry = self.base_store.add_operation(op, None).await?;
316
317 let parsed_op = operation::parse_operation(entry)?;
319
320 Ok(parsed_op)
321 }
322
323 #[instrument(level = "debug", skip(self))]
324 pub async fn delete(&mut self, document_id: &str) -> Result<Operation> {
325 let _entered = self.span.enter();
326 let doc_index = &self.doc_index;
328
329 if doc_index.get_bytes(document_id).is_none() {
331 return Err(GuardianError::NotFound(format!(
332 "Nenhuma entrada com a chave '{}' na base de dados",
333 document_id
334 )));
335 }
336
337 let op = Operation::new(Some(document_id.to_string()), "DEL".to_string(), None);
339
340 let entry = self.base_store.add_operation(op, None).await?;
342
343 let parsed_op = operation::parse_operation(entry)?;
345
346 Ok(parsed_op)
347 }
348
349 #[instrument(level = "debug", skip(self, documents))]
350 pub async fn put_batch(&mut self, documents: Vec<Document>) -> Result<Vec<Operation>> {
351 if documents.is_empty() {
352 return Err(GuardianError::InvalidArgument(
353 "Nada para adicionar à store".to_string(),
354 ));
355 }
356
357 let mut operations = Vec::new();
358
359 for doc in documents {
362 let op = self.put(doc).await?;
363 operations.push(op);
364 }
365
366 Ok(operations)
367 }
368
369 #[instrument(level = "debug", skip(self, documents))]
370 pub async fn put_all(&mut self, documents: Vec<Document>) -> Result<Operation> {
371 if documents.is_empty() {
372 return Err(GuardianError::InvalidArgument(
373 "Nada para adicionar à store".to_string(),
374 ));
375 }
376
377 let mut to_add: Vec<(String, Vec<u8>)> = Vec::new();
379
380 for doc in documents {
381 let key = (self.doc_opts.key_extractor)(&doc).map_err(|_| {
382 GuardianError::InvalidArgument(
383 "Um dos documentos fornecidos não possui chave de índice".to_string(),
384 )
385 })?;
386
387 let data = (self.doc_opts.marshal)(&doc).map_err(|_| {
388 GuardianError::Serialization(
389 "Não foi possível serializar um dos documentos fornecidos".to_string(),
390 )
391 })?;
392
393 to_add.push((key, data));
394 }
395
396 let op = Operation::new_with_documents(None, "PUTALL".to_string(), to_add);
398
399 let entry = self.base_store.add_operation(op, None).await?;
400
401 let parsed_op = operation::parse_operation(entry)?;
402 Ok(parsed_op)
403 }
404
405 #[instrument(level = "debug", skip(self, filter))]
406 pub fn query<F>(&self, mut filter: F) -> Result<Vec<Document>>
407 where
408 F: FnMut(&Document) -> Result<bool>,
411 {
412 let doc_index = &self.doc_index;
414
415 let mut results: Vec<Document> = Vec::new();
416
417 for index_key in doc_index.keys() {
418 if let Some(doc_bytes) = doc_index.get_bytes(&index_key) {
419 let doc: Document = serde_json::from_slice(&doc_bytes).map_err(|e| {
420 GuardianError::Serialization(format!(
421 "Não foi possível desserializar o documento: {}",
422 e
423 ))
424 })?;
425
426 if filter(&doc)? {
428 results.push(doc);
429 }
430 }
431 }
432
433 Ok(results)
434 }
435
436 pub fn store_type(&self) -> &'static str {
437 "docstore"
438 }
439}
440
441pub fn map_key_extractor(key_field: String) -> impl Fn(&Document) -> Result<String> {
445 move |doc: &Document| {
446 let obj = doc.as_object().ok_or_else(|| {
448 GuardianError::InvalidArgument(
449 "A entrada precisa ser um objeto JSON (map[string]interface{{}})".to_string(),
450 )
451 })?;
452
453 let value = obj.get(&key_field).ok_or_else(|| {
455 GuardianError::NotFound(format!(
456 "Faltando valor para o campo `{}` na entrada",
457 key_field
458 ))
459 })?;
460
461 let key = value.as_str().ok_or_else(|| {
463 GuardianError::InvalidArgument(format!(
464 "O valor para o campo `{}` não é uma string",
465 key_field
466 ))
467 })?;
468
469 Ok(key.to_string())
470 }
471}
472
473pub fn default_store_opts_for_map(key_field: &str) -> CreateDocumentDBOptions {
476 CreateDocumentDBOptions {
477 marshal: Arc::new(|doc: &Document| serde_json::to_vec(doc).map_err(GuardianError::from)),
478 unmarshal: Arc::new(|bytes: &[u8]| {
479 serde_json::from_slice(bytes).map_err(GuardianError::from)
480 }),
481 key_extractor: Arc::new(map_key_extractor(key_field.to_string())),
483
484 item_factory: Arc::new(|| Value::Object(Map::new())),
485 }
486}