1use super::paginator::local::wildcard::Wildcard;
45use super::Connector;
46use crate::document::Document;
47use crate::helper::checksum::{hasher, str_to_algorithm_name_with_checksum};
48use crate::helper::mustache::Mustache;
49use crate::helper::string::DisplayOnlyForDebugging;
50use crate::{DataResult, DataSet, DataStream, Metadata};
51use async_fs::OpenOptions;
52use async_lock::Mutex;
53use async_stream::stream;
54use async_trait::async_trait;
55use futures::Stream;
56use glob::glob;
57use json_value_merge::Merge;
58use serde::{Deserialize, Serialize};
59use serde_json::{Map, Value};
60use smol::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
61use std::collections::HashMap;
62use std::io;
63use std::pin::Pin;
64use std::sync::{Arc, OnceLock};
65use std::{
66 fmt,
67 io::{Error, ErrorKind, Result, SeekFrom},
68};
69
70type SharedCache = Arc<Mutex<HashMap<String, Vec<DataResult>>>>;
71static CACHES: OnceLock<SharedCache> = OnceLock::new();
72
73#[derive(Deserialize, Serialize, Clone, Default)]
74#[serde(default, deny_unknown_fields)]
75pub struct Local {
76 #[serde(skip)]
77 document: Option<Box<dyn Document>>,
78 #[serde(rename = "metadata")]
79 #[serde(alias = "meta")]
80 pub metadata: Metadata,
81 pub path: String,
82 #[serde(alias = "params")]
83 pub parameters: Value,
84 pub is_cached: bool,
85 #[serde(alias = "checksum")]
86 pub algo_with_checksum: Option<String>,
87}
88
89impl fmt::Debug for Local {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("Local")
92 .field("metadata", &self.metadata.display_only_for_debugging())
93 .field("path", &self.path)
94 .field("parameters", &self.parameters.display_only_for_debugging())
95 .field("is_cached", &self.is_cached)
96 .field("algo_with_checksum", &self.algo_with_checksum)
97 .finish()
98 }
99}
100
101impl Local {
102 pub fn new(path: String) -> Self {
103 Local {
104 path,
105 ..Default::default()
106 }
107 }
108 fn caches() -> &'static Arc<Mutex<HashMap<String, Vec<DataResult>>>> {
109 CACHES.get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
110 }
111 fn cache_key(&self) -> String {
112 self.path()
113 }
114 pub async fn cache(&self) -> io::Result<Option<Vec<DataResult>>> {
115 let key = self.cache_key();
116 let caches = Self::caches();
117 let guard = caches.lock().await;
118
119 if let Some(dataset) = guard.get(&key) {
120 info!(cache_key = key, "Cache hit");
121 Ok(Some(dataset.clone()))
122 } else {
123 Ok(None)
124 }
125 }
126 pub async fn set_cache(&self, dataset: &[DataResult]) {
127 let key = self.cache_key();
128 let caches = Self::caches();
129 caches.lock().await.insert(key.clone(), dataset.to_vec());
130 info!(cache_key = key, "Cache stored");
131 }
132}
133
134#[async_trait]
135impl Connector for Local {
136 fn set_document(&mut self, document: Box<dyn Document>) -> Result<()> {
138 self.document = Some(document);
139
140 Ok(())
141 }
142 fn document(&self) -> Result<&dyn Document> {
144 self.document.as_deref().ok_or_else(|| {
145 Error::new(
146 ErrorKind::InvalidInput,
147 "The document has not been set in the connector",
148 )
149 })
150 }
151 fn path(&self) -> String {
167 if !self.is_variable() {
168 return self.path.clone();
169 }
170
171 let mut params = self.parameters.clone();
172 params.merge(&serde_json::json!({
173 "metadata": self.metadata()
174 }));
175
176 let mut path = self.path.clone();
177 path.replace_mustache(params);
178 path
179 }
180 #[instrument(name = "local::len")]
203 async fn len(&self) -> Result<usize> {
204 if self.path.contains('*') {
205 return Err(Error::other("len() method not available for wildcard path"));
206 }
207
208 let len = match async_fs::metadata(self.path()).await {
209 Ok(metadata) => {
210 let len = metadata.len() as usize;
211 info!(len = len, "Find the length");
212 len
213 }
214 Err(_) => {
215 let len = 0;
216 info!(len = len, "Can't find the length");
217 len
218 }
219 };
220
221 Ok(len)
222 }
223 fn set_parameters(&mut self, parameters: Value) {
225 self.parameters = parameters;
226 }
227 fn is_variable(&self) -> bool {
242 self.path.has_mustache()
243 }
244 fn is_resource_will_change(&self, new_parameters: Value) -> Result<bool> {
262 if !self.is_variable() {
263 trace!("Stay link to the same resource");
264 return Ok(false);
265 }
266
267 let mut metadata_kv = Map::default();
268 metadata_kv.insert("metadata".to_string(), self.metadata().into());
269 let metadata = Value::Object(metadata_kv);
270
271 let mut new_parameters = new_parameters;
272 new_parameters.merge(&metadata);
273 let mut old_parameters = self.parameters.clone();
274 old_parameters.merge(&metadata);
275
276 let mut previous_path = self.path.clone();
277 previous_path.replace_mustache(old_parameters);
278
279 let mut new_path = self.path.clone();
280 new_path.replace_mustache(new_parameters);
281
282 if previous_path == new_path {
283 trace!(path = previous_path, "Stay link to the same resource");
284 return Ok(false);
285 }
286
287 info!(
288 previous_path = previous_path,
289 new_path = new_path,
290 "Will use another resource, regarding the new parameters"
291 );
292 Ok(true)
293 }
294 fn metadata(&self) -> Metadata {
296 match &self.document {
297 Some(document) => self.metadata.clone().merge(&document.metadata()),
298 None => self.metadata.clone(),
299 }
300 }
301 #[instrument(name = "local::fetch")]
333 async fn fetch(&mut self) -> std::io::Result<Option<DataStream>> {
334 let document = self.document()?;
335 let mut buff = Vec::default();
336 let path = self.path();
337 let algo_with_checksum_opt = self.algo_with_checksum.clone();
338
339 if path.has_mustache() {
340 return Err(Error::new(
341 ErrorKind::InvalidInput,
342 format!("This path '{}' is not fully resolved", path),
343 ));
344 }
345
346 if self.is_cached {
347 if let Some(dataset) = self.cache().await? {
348 return Ok(Some(Box::pin(stream! {
349 for data in dataset {
350 yield data;
351 }
352 })));
353 }
354 }
355
356 OpenOptions::new()
357 .read(true)
358 .write(false)
359 .create(false)
360 .append(false)
361 .truncate(false)
362 .open(&path)
363 .await?
364 .read_to_end(&mut buff)
365 .await?;
366
367 info!(path = path, "Fetch data with success");
368
369 if !document.has_data(&buff)? {
370 return Ok(None);
371 }
372
373 if let Some(algorithm_name_with_checksum) = &algo_with_checksum_opt {
374 if let (algorithm_name, Some(checksum)) =
375 str_to_algorithm_name_with_checksum(algorithm_name_with_checksum)?
376 {
377 let mut hasher = hasher(algorithm_name)?;
378 hasher.update(&buff);
379
380 let digest = base16ct::lower::encode_string(&hasher.finalize());
381
382 if !digest.eq(checksum) {
383 return Err(io::Error::new(
384 io::ErrorKind::PermissionDenied,
385 format!(
386 "Checksum verification failed. {}({}) != configuration({})",
387 path, digest, checksum
388 ),
389 ));
390 }
391 };
392 }
393
394 let dataset = document.read(&buff)?;
395
396 if self.is_cached {
397 self.set_cache(&dataset).await;
398 }
399
400 Ok(Some(Box::pin(stream! {
401 for data in dataset {
402 yield data;
403 }
404 })))
405 }
406 #[instrument(skip(dataset), name = "local::send")]
452 async fn send(&mut self, dataset: &DataSet) -> std::io::Result<Option<DataStream>> {
453 let document = self.document()?;
454 let terminator = document.terminator()?;
455 let footer = document.footer(dataset)?;
456 let header = document.header(dataset)?;
457 let body = document.write(dataset)?;
458 let path = self.path();
459
460 if path.has_mustache() {
461 return Err(Error::new(
462 ErrorKind::InvalidInput,
463 format!("This path '{}' is not fully resolved", path),
464 ));
465 }
466
467 let position = match document.can_append() {
468 true => Some(-(footer.len() as isize)),
469 false => None,
470 };
471
472 let mut file = OpenOptions::new()
473 .create(true)
474 .read(true)
475 .write(true)
476 .truncate(false)
477 .open(path.as_str())
478 .await?;
479
480 trace!(path = path, "Lock the resource");
481
482 let file_len = file.metadata().await?.len();
483
484 match position {
485 Some(pos) => match file_len as isize + pos {
486 start if start > 0 => file.seek(SeekFrom::Start(start as u64)).await,
487 _ => file.seek(SeekFrom::Start(0)).await,
488 },
489 None => file.seek(SeekFrom::Start(0)).await,
490 }?;
491
492 if 0 == file_len {
493 file.write_all(&header).await?;
494 }
495 if 0 < file_len && file_len > (header.len() as u64 + footer.len() as u64) {
496 file.write_all(&terminator).await?;
497 }
498 file.write_all(&body).await?;
499 file.write_all(&footer).await?;
500 file.flush().await?;
501 trace!(path = path, "Write data into the resource");
502
503 let checksum = match &self.algo_with_checksum {
504 Some(algorithm_name_with_checksum) => {
505 let (algorithm_name, _) =
506 str_to_algorithm_name_with_checksum(algorithm_name_with_checksum)?;
507 let mut hasher = hasher(algorithm_name)?;
508 let mut buff = Vec::default();
509 file.seek(SeekFrom::Start(0)).await?;
510 file.read_to_end(&mut buff).await?;
511 hasher.update(&buff);
512 base16ct::lower::encode_string(&hasher.finalize())
513 }
514 None => "algorithm undefined".to_string(),
515 };
516
517 info!(path = path, checksum = checksum, "Send data with success");
518
519 Ok(None)
520 }
521 #[instrument(name = "local::erase")]
556 async fn erase(&mut self) -> Result<()> {
557 let path = self.path();
558
559 if path.has_mustache() {
560 return Err(Error::new(
561 ErrorKind::InvalidInput,
562 format!("This path '{}' is not fully resolved", path),
563 ));
564 }
565
566 let paths = glob(path.as_str()).map_err(|e| Error::new(ErrorKind::NotFound, e))?;
567 for path_result in paths {
568 match path_result {
569 Ok(path) => {
570 OpenOptions::new()
571 .read(false)
572 .create(true)
573 .append(false)
574 .write(true)
575 .truncate(true)
576 .open(path.display().to_string())
577 .await?
578 }
579 Err(e) => return Err(Error::new(ErrorKind::NotFound, e)),
580 };
581 }
582
583 info!(path = path, "Erase data with success");
584 Ok(())
585 }
586 async fn paginate(
588 &self,
589 ) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>> {
590 Wildcard::new(self)?.paginate(self).await
591 }
592}
593
#[cfg(test)]
mod tests {
    use macro_rules_attribute::apply;
    use smol::stream::StreamExt;
    use smol_macros::test;

    use super::*;
    use crate::document::json::Json;
    use crate::DataResult;

    #[test]
    fn is_variable() {
        let mut connector = Local::default();
        assert_eq!(false, connector.is_variable());
        connector.path = "/dir/filename_{{ field }}.ext".to_string();
        assert_eq!(true, connector.is_variable());
    }
    #[test]
    fn is_resource_will_change() {
        let mut connector = Local::default();
        let params = serde_json::from_str(r#"{"field":"test"}"#).unwrap();
        assert_eq!(
            false,
            connector.is_resource_will_change(Value::Null).unwrap()
        );
        connector.path = "/dir/static.ext".to_string();
        assert_eq!(
            false,
            connector.is_resource_will_change(Value::Null).unwrap()
        );
        connector.path = "/dir/dynamic_{{ field }}.ext".to_string();
        assert_eq!(true, connector.is_resource_will_change(params).unwrap());
    }
    #[test]
    fn path() {
        let mut connector = Local::default();
        connector.path = "/dir/filename_{{ field }}.ext".to_string();
        let params: Value = serde_json::from_str(r#"{"field":"value"}"#).unwrap();
        connector.set_parameters(params);
        assert_eq!("/dir/filename_value.ext", connector.path());
    }
    #[apply(test!)]
    async fn len() {
        let mut connector = Local::default();
        connector.path = "./data/one_line.json".to_string();
        assert!(
            0 < connector.len().await.unwrap(),
            "The length of the document is not greater than 0."
        );
        connector.path = "./not_found_file".to_string();
        assert_eq!(0, connector.len().await.unwrap());
    }
    #[apply(test!)]
    async fn is_empty() {
        let mut connector = Local::default();
        connector.path = "./data/one_line.json".to_string();
        assert_eq!(false, connector.is_empty().await.unwrap());
        connector.path = "./null_file".to_string();
        assert_eq!(true, connector.is_empty().await.unwrap());
    }
    #[apply(test!)]
    async fn fetch() {
        let document = Json::default();
        let mut connector = Local::default();
        connector.path = "./data/one_line.json".to_string();
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }
    #[apply(test!)]
    async fn send() {
        let document = Json::default();

        let expected_result1 =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
        let dataset = vec![expected_result1.clone()];
        let mut connector = Local::default();
        connector.path = "./data/out/test_local_send".to_string();
        connector.erase().await.unwrap();
        connector.set_document(Box::new(document)).unwrap();
        connector.send(&dataset).await.unwrap();

        let mut connector_read = connector.clone();
        let mut datastream = connector_read.fetch().await.unwrap().unwrap();
        assert_eq!(expected_result1.clone(), datastream.next().await.unwrap());

        let expected_result2 =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value2"}"#).unwrap());
        let dataset = vec![expected_result2.clone()];
        connector.send(&dataset).await.unwrap();

        let mut connector_read = connector.clone();
        let mut datastream = connector_read.fetch().await.unwrap().unwrap();
        assert_eq!(expected_result1, datastream.next().await.unwrap());
        assert_eq!(expected_result2, datastream.next().await.unwrap());
    }
    #[apply(test!)]
    async fn erase() {
        let document = Json::default();

        let mut connector = Local::default();
        connector.path = "./data/out/test_local_erase".to_string();
        let expected_result =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
        let dataset = vec![expected_result];
        connector.set_document(Box::new(document)).unwrap();
        connector.send(&dataset).await.unwrap();
        connector.erase().await.unwrap();
        let datastream = connector.fetch().await.unwrap();
        assert!(datastream.is_none(), "No datastream with empty body.");
    }
    #[apply(test!)]
    async fn erase_with_wildcard() {
        let document = Json::default();

        let mut connector = Local::default();
        connector.path = "./data/out/test_local_erase_with_wildcard".to_string();
        let expected_result =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
        let dataset = vec![expected_result];
        connector.set_document(Box::new(document)).unwrap();
        connector.send(&dataset).await.unwrap();

        // Erase through a glob pattern matching the file written above, so the
        // wildcard expansion in `erase` is actually exercised.
        let mut eraser = connector.clone();
        eraser.path = "./data/out/test_local_erase_with_wild*".to_string();
        eraser.erase().await.unwrap();

        let datastream = connector.fetch().await.unwrap();
        assert!(datastream.is_none(), "No datastream with empty body.");
    }
}