Skip to main content

chewdata/connector/
local.rs

1//! Local file connector
2//!
3//! This connector reads from and writes to **local filesystem files**.
4//!
5//! It supports:
6//! - Reading **one or multiple files** using glob wildcards (`*`)
7//! - Dynamic paths using **Mustache templates**
8//! - Optional **checksum verification**
9//! - Optional **in-memory caching**
10//!
11//! ---
12//!
13//! ## Configuration
14//!
15//! | Key | Alias | Description | Default | Possible Values |
16//! |-----|-------|-------------|---------|-----------------|
17//! | `type` | – | Required to select this connector | `local` | `local` |
18//! | `metadata` | `meta` | Override or enrich resource metadata | `null` | [`crate::Metadata`] |
19//! | `path` | – | File path or glob pattern. Supports `*` and Mustache variables | `null` | `String` |
20//! | `parameters` | `params` | Variables injected into the path template | `null` | JSON object |
21//! | `algo_with_checksum` | `checksum` | Checksum validation in the form `algorithm:checksum` | `null` | `sha224`, `sha256`, `sha384`, `sha512`, `sha3_*` |
22//!
23//! ---
24//!
25//! ## Example
26//!
27//! ```json
28//! [
29//!   {
30//!     "type": "reader",
31//!     "connector": {
32//!       "type": "local",
33//!       "path": "./{{ folder }}/*.json",
34//!       "metadata": {
35//!         "content-type": "application/json; charset=utf-8"
36//!       },
37//!       "parameters": {
38//!         "folder": "my_folder"
39//!       }
40//!     }
41//!   }
42//! ]
43//! ```
44use super::paginator::local::wildcard::Wildcard;
45use super::Connector;
46use crate::document::Document;
47use crate::helper::checksum::{hasher, str_to_algorithm_name_with_checksum};
48use crate::helper::mustache::Mustache;
49use crate::helper::string::DisplayOnlyForDebugging;
50use crate::{DataResult, DataSet, DataStream, Metadata};
51use async_fs::OpenOptions;
52use async_lock::Mutex;
53use async_stream::stream;
54use async_trait::async_trait;
55use futures::Stream;
56use glob::glob;
57use json_value_merge::Merge;
58use serde::{Deserialize, Serialize};
59use serde_json::{Map, Value};
60use smol::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
61use std::collections::HashMap;
62use std::io;
63use std::pin::Pin;
64use std::sync::{Arc, OnceLock};
65use std::{
66    fmt,
67    io::{Error, ErrorKind, Result, SeekFrom},
68};
69
70type SharedCache = Arc<Mutex<HashMap<String, Vec<DataResult>>>>;
71static CACHES: OnceLock<SharedCache> = OnceLock::new();
72
73#[derive(Deserialize, Serialize, Clone, Default)]
74#[serde(default, deny_unknown_fields)]
75pub struct Local {
76    #[serde(skip)]
77    document: Option<Box<dyn Document>>,
78    #[serde(rename = "metadata")]
79    #[serde(alias = "meta")]
80    pub metadata: Metadata,
81    pub path: String,
82    #[serde(alias = "params")]
83    pub parameters: Value,
84    pub is_cached: bool,
85    #[serde(alias = "checksum")]
86    pub algo_with_checksum: Option<String>,
87}
88
89impl fmt::Debug for Local {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.debug_struct("Local")
92            .field("metadata", &self.metadata.display_only_for_debugging())
93            .field("path", &self.path)
94            .field("parameters", &self.parameters.display_only_for_debugging())
95            .field("is_cached", &self.is_cached)
96            .field("algo_with_checksum", &self.algo_with_checksum)
97            .finish()
98    }
99}
100
101impl Local {
102    pub fn new(path: String) -> Self {
103        Local {
104            path,
105            ..Default::default()
106        }
107    }
108    fn caches() -> &'static Arc<Mutex<HashMap<String, Vec<DataResult>>>> {
109        CACHES.get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
110    }
111    fn cache_key(&self) -> String {
112        self.path()
113    }
114    pub async fn cache(&self) -> io::Result<Option<Vec<DataResult>>> {
115        let key = self.cache_key();
116        let caches = Self::caches();
117        let guard = caches.lock().await;
118
119        if let Some(dataset) = guard.get(&key) {
120            info!(cache_key = key, "Cache hit");
121            Ok(Some(dataset.clone()))
122        } else {
123            Ok(None)
124        }
125    }
126    pub async fn set_cache(&self, dataset: &[DataResult]) {
127        let key = self.cache_key();
128        let caches = Self::caches();
129        caches.lock().await.insert(key.clone(), dataset.to_vec());
130        info!(cache_key = key, "Cache stored");
131    }
132}
133
134#[async_trait]
135impl Connector for Local {
136    /// See [`Connector::set_document`] for more details.
137    fn set_document(&mut self, document: Box<dyn Document>) -> Result<()> {
138        self.document = Some(document);
139
140        Ok(())
141    }
142    /// See [`Connector::document`] for more details.
143    fn document(&self) -> Result<&dyn Document> {
144        self.document.as_deref().ok_or_else(|| {
145            Error::new(
146                ErrorKind::InvalidInput,
147                "The document has not been set in the connector",
148            )
149        })
150    }
151    /// See [`Connector::path`] for more details.
152    ///
153    /// # Examples
154    ///
155    /// ```
156    /// use chewdata::connector::local::Local;
157    /// use chewdata::connector::Connector;
158    /// use serde_json::Value;
159    ///
160    /// let mut connector = Local::default();
161    /// connector.path = "/dir/filename_{{ field }}.ext".to_string();
162    /// let params: Value = serde_json::from_str(r#"{"field":"value"}"#).unwrap();
163    /// connector.set_parameters(params);
164    /// assert_eq!("/dir/filename_value.ext", connector.path());
165    /// ```
166    fn path(&self) -> String {
167        if !self.is_variable() {
168            return self.path.clone();
169        }
170
171        let mut params = self.parameters.clone();
172        params.merge(&serde_json::json!({
173            "metadata": self.metadata()
174        }));
175
176        let mut path = self.path.clone();
177        path.replace_mustache(params);
178        path
179    }
180    /// See [`Connector::len`] for more details.
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// use chewdata::connector::local::Local;
186    /// use chewdata::connector::Connector;
187    /// use std::io;
188    ///
189    /// use macro_rules_attribute::apply;
190    /// use smol_macros::main;
191    ///
192    /// #[apply(main!)]
193    /// async fn main() -> io::Result<()> {
194    ///     let mut connector = Local::default();
195    ///     connector.path = "./Cargo.toml".to_string();
196    ///     assert!(0 < connector.len().await?, "The length of the document is not greather than 0");
197    ///     connector.path = "./not_found_file".to_string();
198    ///     assert_eq!(0, connector.len().await?);
199    ///     Ok(())
200    /// }
201    /// ```
202    #[instrument(name = "local::len")]
203    async fn len(&self) -> Result<usize> {
204        if self.path.contains('*') {
205            return Err(Error::other("len() method not available for wildcard path"));
206        }
207
208        let len = match async_fs::metadata(self.path()).await {
209            Ok(metadata) => {
210                let len = metadata.len() as usize;
211                info!(len = len, "Find the length");
212                len
213            }
214            Err(_) => {
215                let len = 0;
216                info!(len = len, "Can't find the length");
217                len
218            }
219        };
220
221        Ok(len)
222    }
223    /// See [`Connector::set_parameters`] for more details.
224    fn set_parameters(&mut self, parameters: Value) {
225        self.parameters = parameters;
226    }
227    /// See [`Connector::is_variable`] for more details.
228    ///
229    /// # Examples
230    ///
231    /// ```
232    /// use chewdata::connector::local::Local;
233    /// use chewdata::connector::Connector;
234    /// use serde_json::Value;
235    ///
236    /// let mut connector = Local::default();
237    /// assert_eq!(false, connector.is_variable());
238    /// connector.path = "/dir/filename_{{ field }}.ext".to_string();
239    /// assert_eq!(true, connector.is_variable());
240    /// ```
241    fn is_variable(&self) -> bool {
242        self.path.has_mustache()
243    }
244    /// See [`Connector::is_resource_will_change`] for more details.
245    ///
246    /// # Examples
247    ///
248    /// ```
249    /// use chewdata::connector::local::Local;
250    /// use chewdata::connector::Connector;
251    /// use serde_json::Value;
252    ///
253    /// let mut connector = Local::default();
254    /// let params = serde_json::from_str(r#"{"field":"test"}"#).unwrap();
255    /// assert_eq!(false, connector.is_resource_will_change(Value::Null).unwrap());
256    /// connector.path = "/dir/static.ext".to_string();
257    /// assert_eq!(false, connector.is_resource_will_change(Value::Null).unwrap());
258    /// connector.path = "/dir/dynamic_{{ field }}.ext".to_string();
259    /// assert_eq!(true, connector.is_resource_will_change(params).unwrap());
260    /// ```
261    fn is_resource_will_change(&self, new_parameters: Value) -> Result<bool> {
262        if !self.is_variable() {
263            trace!("Stay link to the same resource");
264            return Ok(false);
265        }
266
267        let mut metadata_kv = Map::default();
268        metadata_kv.insert("metadata".to_string(), self.metadata().into());
269        let metadata = Value::Object(metadata_kv);
270
271        let mut new_parameters = new_parameters;
272        new_parameters.merge(&metadata);
273        let mut old_parameters = self.parameters.clone();
274        old_parameters.merge(&metadata);
275
276        let mut previous_path = self.path.clone();
277        previous_path.replace_mustache(old_parameters);
278
279        let mut new_path = self.path.clone();
280        new_path.replace_mustache(new_parameters);
281
282        if previous_path == new_path {
283            trace!(path = previous_path, "Stay link to the same resource");
284            return Ok(false);
285        }
286
287        info!(
288            previous_path = previous_path,
289            new_path = new_path,
290            "Will use another resource, regarding the new parameters"
291        );
292        Ok(true)
293    }
294    /// See [`Connector::metadata`] for more details.
295    fn metadata(&self) -> Metadata {
296        match &self.document {
297            Some(document) => self.metadata.clone().merge(&document.metadata()),
298            None => self.metadata.clone(),
299        }
300    }
301    /// See [`Connector::fetch`] for more details.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use chewdata::connector::local::Local;
307    /// use chewdata::connector::Connector;
308    /// use chewdata::document::json::Json;
309    /// use smol::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
310    /// use smol::prelude::*;
311    /// use smol::stream::StreamExt;
312    /// use std::io;
313    ///
314    /// use macro_rules_attribute::apply;
315    /// use smol_macros::main;
316    ///
317    /// #[apply(main!)]
318    /// async fn main() -> io::Result<()> {
319    ///     let document = Box::new(Json::default());
320    ///     let mut connector = Local::default();
321    ///     connector.set_document(document);
322    ///     connector.path = "./data/one_line.json".to_string();
323    ///     let datastream = connector.fetch().await.unwrap().unwrap();
324    ///     assert!(
325    ///         0 < datastream.count().await,
326    ///         "The inner connector should have a size upper than zero"
327    ///     );
328    ///
329    ///     Ok(())
330    /// }
331    /// ```
332    #[instrument(name = "local::fetch")]
333    async fn fetch(&mut self) -> std::io::Result<Option<DataStream>> {
334        let document = self.document()?;
335        let mut buff = Vec::default();
336        let path = self.path();
337        let algo_with_checksum_opt = self.algo_with_checksum.clone();
338
339        if path.has_mustache() {
340            return Err(Error::new(
341                ErrorKind::InvalidInput,
342                format!("This path '{}' is not fully resolved", path),
343            ));
344        }
345
346        if self.is_cached {
347            if let Some(dataset) = self.cache().await? {
348                return Ok(Some(Box::pin(stream! {
349                    for data in dataset {
350                        yield data;
351                    }
352                })));
353            }
354        }
355
356        OpenOptions::new()
357            .read(true)
358            .write(false)
359            .create(false)
360            .append(false)
361            .truncate(false)
362            .open(&path)
363            .await?
364            .read_to_end(&mut buff)
365            .await?;
366
367        info!(path = path, "Fetch data with success");
368
369        if !document.has_data(&buff)? {
370            return Ok(None);
371        }
372
373        if let Some(algorithm_name_with_checksum) = &algo_with_checksum_opt {
374            if let (algorithm_name, Some(checksum)) =
375                str_to_algorithm_name_with_checksum(algorithm_name_with_checksum)?
376            {
377                let mut hasher = hasher(algorithm_name)?;
378                hasher.update(&buff);
379
380                let digest = base16ct::lower::encode_string(&hasher.finalize());
381
382                if !digest.eq(checksum) {
383                    return Err(io::Error::new(
384                        io::ErrorKind::PermissionDenied,
385                        format!(
386                            "Checksum verification failed. {}({}) != configuration({})",
387                            path, digest, checksum
388                        ),
389                    ));
390                }
391            };
392        }
393
394        let dataset = document.read(&buff)?;
395
396        if self.is_cached {
397            self.set_cache(&dataset).await;
398        }
399
400        Ok(Some(Box::pin(stream! {
401            for data in dataset {
402                yield data;
403            }
404        })))
405    }
406    /// See [`Connector::send`] for more details.
407    ///
408    /// # Examples
409    ///
410    /// ```
411    /// use chewdata::connector::local::Local;
412    /// use chewdata::connector::Connector;
413    /// use chewdata::document::json::Json;
414    /// use chewdata::DataResult;
415    /// use smol::prelude::*;
416    /// use std::io;
417    ///
418    /// use macro_rules_attribute::apply;
419    /// use smol_macros::main;
420    ///
421    /// #[apply(main!)]
422    /// async fn main() -> io::Result<()> {
423    ///     let document = Box::new(Json::default());
424    ///
425    ///     let expected_result1 =
426    ///         DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
427    ///     let dataset = vec![expected_result1.clone()];
428    ///     let mut connector = Local::default();
429    ///     connector.path = "./data/out/test_local_send".to_string();
430    ///     connector.set_document(document)?;
431    ///     connector.erase().await.unwrap();
432    ///     connector.send(&dataset).await.unwrap();
433    ///
434    ///     let mut connector_read = connector.clone();
435    ///     let mut datastream = connector_read.fetch().await.unwrap().unwrap();
436    ///     assert_eq!(expected_result1.clone(), datastream.next().await.unwrap());
437    ///
438    ///     let expected_result2 =
439    ///         DataResult::Ok(serde_json::from_str(r#"{"column1":"value2"}"#).unwrap());
440    ///     let dataset = vec![expected_result2.clone()];
441    ///     connector.send(&dataset).await.unwrap();
442    ///
443    ///     let mut connector_read = connector.clone();
444    ///     let mut datastream = connector_read.fetch().await.unwrap().unwrap();
445    ///     assert_eq!(expected_result1, datastream.next().await.unwrap());
446    ///     assert_eq!(expected_result2, datastream.next().await.unwrap());
447    ///
448    ///     Ok(())
449    /// }
450    /// ```
451    #[instrument(skip(dataset), name = "local::send")]
452    async fn send(&mut self, dataset: &DataSet) -> std::io::Result<Option<DataStream>> {
453        let document = self.document()?;
454        let terminator = document.terminator()?;
455        let footer = document.footer(dataset)?;
456        let header = document.header(dataset)?;
457        let body = document.write(dataset)?;
458        let path = self.path();
459
460        if path.has_mustache() {
461            return Err(Error::new(
462                ErrorKind::InvalidInput,
463                format!("This path '{}' is not fully resolved", path),
464            ));
465        }
466
467        let position = match document.can_append() {
468            true => Some(-(footer.len() as isize)),
469            false => None,
470        };
471
472        let mut file = OpenOptions::new()
473            .create(true)
474            .read(true)
475            .write(true)
476            .truncate(false)
477            .open(path.as_str())
478            .await?;
479
480        trace!(path = path, "Lock the resource");
481
482        let file_len = file.metadata().await?.len();
483
484        match position {
485            Some(pos) => match file_len as isize + pos {
486                start if start > 0 => file.seek(SeekFrom::Start(start as u64)).await,
487                _ => file.seek(SeekFrom::Start(0)).await,
488            },
489            None => file.seek(SeekFrom::Start(0)).await,
490        }?;
491
492        if 0 == file_len {
493            file.write_all(&header).await?;
494        }
495        if 0 < file_len && file_len > (header.len() as u64 + footer.len() as u64) {
496            file.write_all(&terminator).await?;
497        }
498        file.write_all(&body).await?;
499        file.write_all(&footer).await?;
500        file.flush().await?;
501        trace!(path = path, "Write data into the resource");
502
503        let checksum = match &self.algo_with_checksum {
504            Some(algorithm_name_with_checksum) => {
505                let (algorithm_name, _) =
506                    str_to_algorithm_name_with_checksum(algorithm_name_with_checksum)?;
507                let mut hasher = hasher(algorithm_name)?;
508                let mut buff = Vec::default();
509                file.seek(SeekFrom::Start(0)).await?;
510                file.read_to_end(&mut buff).await?;
511                hasher.update(&buff);
512                base16ct::lower::encode_string(&hasher.finalize())
513            }
514            None => "algorithm undefined".to_string(),
515        };
516
517        info!(path = path, checksum = checksum, "Send data with success");
518
519        Ok(None)
520    }
521    /// See [`Connector::erase`] for more details.
522    ///
523    /// # Examples
524    ///
525    /// ```
526    /// use chewdata::connector::local::Local;
527    /// use chewdata::connector::Connector;
528    /// use chewdata::document::json::Json;
529    /// use chewdata::DataResult;
530    /// use smol::prelude::*;
531    /// use std::io;
532    ///
533    /// use macro_rules_attribute::apply;
534    /// use smol_macros::main;
535    ///
536    /// #[apply(main!)]
537    /// async fn main() -> io::Result<()> {
538    ///     let document = Box::new(Json::default());
539    ///
540    ///     let mut connector = Local::default();
541    ///     connector.path = "./data/out/test_local_erase".to_string();
542    ///     let expected_result =
543    ///         DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
544    ///     let dataset = vec![expected_result];
545    ///     connector.set_document(document);
546    ///
547    ///     connector.send(&dataset).await.unwrap();
548    ///     connector.erase().await.unwrap();
549    ///     let datastream = connector.fetch().await.unwrap();
550    ///     assert!(datastream.is_none(), "No datastream with empty body");
551    ///
552    ///     Ok(())
553    /// }
554    /// ```
555    #[instrument(name = "local::erase")]
556    async fn erase(&mut self) -> Result<()> {
557        let path = self.path();
558
559        if path.has_mustache() {
560            return Err(Error::new(
561                ErrorKind::InvalidInput,
562                format!("This path '{}' is not fully resolved", path),
563            ));
564        }
565
566        let paths = glob(path.as_str()).map_err(|e| Error::new(ErrorKind::NotFound, e))?;
567        for path_result in paths {
568            match path_result {
569                Ok(path) => {
570                    OpenOptions::new()
571                        .read(false)
572                        .create(true)
573                        .append(false)
574                        .write(true)
575                        .truncate(true)
576                        .open(path.display().to_string())
577                        .await?
578                }
579                Err(e) => return Err(Error::new(ErrorKind::NotFound, e)),
580            };
581        }
582
583        info!(path = path, "Erase data with success");
584        Ok(())
585    }
586    /// See [`Connector::paginate`] for more details.
587    async fn paginate(
588        &self,
589    ) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>> {
590        Wildcard::new(self)?.paginate(self).await
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use macro_rules_attribute::apply;
597    use smol::stream::StreamExt;
598    use smol_macros::test;
599
600    use super::*;
601    use crate::document::json::Json;
602    use crate::DataResult;
603
604    #[test]
605    fn is_variable() {
606        let mut connector = Local::default();
607        assert_eq!(false, connector.is_variable());
608        connector.path = "/dir/filename_{{ field }}.ext".to_string();
609        assert_eq!(true, connector.is_variable());
610    }
611    #[test]
612    fn is_resource_will_change() {
613        let mut connector = Local::default();
614        let params = serde_json::from_str(r#"{"field":"test"}"#).unwrap();
615        assert_eq!(
616            false,
617            connector.is_resource_will_change(Value::Null).unwrap()
618        );
619        connector.path = "/dir/static.ext".to_string();
620        assert_eq!(
621            false,
622            connector.is_resource_will_change(Value::Null).unwrap()
623        );
624        connector.path = "/dir/dynamic_{{ field }}.ext".to_string();
625        assert_eq!(true, connector.is_resource_will_change(params).unwrap());
626    }
627    #[test]
628    fn path() {
629        let mut connector = Local::default();
630        connector.path = "/dir/filename_{{ field }}.ext".to_string();
631        let params: Value = serde_json::from_str(r#"{"field":"value"}"#).unwrap();
632        connector.set_parameters(params);
633        assert_eq!("/dir/filename_value.ext", connector.path());
634    }
635    #[apply(test!)]
636    async fn len() {
637        let mut connector = Local::default();
638        connector.path = "./data/one_line.json".to_string();
639        assert!(
640            0 < connector.len().await.unwrap(),
641            "The length of the document is not greather than 0."
642        );
643        connector.path = "./not_found_file".to_string();
644        assert_eq!(0, connector.len().await.unwrap());
645    }
646    #[apply(test!)]
647    async fn is_empty() {
648        let mut connector = Local::default();
649        connector.path = "./data/one_line.json".to_string();
650        assert_eq!(false, connector.is_empty().await.unwrap());
651        connector.path = "./null_file".to_string();
652        assert_eq!(true, connector.is_empty().await.unwrap());
653    }
654    #[apply(test!)]
655    async fn fetch() {
656        let document = Json::default();
657        let mut connector = Local::default();
658        connector.path = "./data/one_line.json".to_string();
659        connector.set_document(Box::new(document)).unwrap();
660        let datastream = connector.fetch().await.unwrap().unwrap();
661        assert!(
662            0 < datastream.count().await,
663            "The inner connector should have a size upper than zero."
664        );
665    }
666    #[apply(test!)]
667    async fn send() {
668        let document = Json::default();
669
670        let expected_result1 =
671            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
672        let dataset = vec![expected_result1.clone()];
673        let mut connector = Local::default();
674        connector.path = "./data/out/test_local_send".to_string();
675        connector.erase().await.unwrap();
676        connector.set_document(Box::new(document)).unwrap();
677        connector.send(&dataset).await.unwrap();
678
679        let mut connector_read = connector.clone();
680        let mut datastream = connector_read.fetch().await.unwrap().unwrap();
681        assert_eq!(expected_result1.clone(), datastream.next().await.unwrap());
682
683        let expected_result2 =
684            DataResult::Ok(serde_json::from_str(r#"{"column1":"value2"}"#).unwrap());
685        let dataset = vec![expected_result2.clone()];
686        connector.send(&dataset).await.unwrap();
687
688        let mut connector_read = connector.clone();
689        let mut datastream = connector_read.fetch().await.unwrap().unwrap();
690        assert_eq!(expected_result1, datastream.next().await.unwrap());
691        assert_eq!(expected_result2, datastream.next().await.unwrap());
692    }
693    #[apply(test!)]
694    async fn erase() {
695        let document = Json::default();
696
697        let mut connector = Local::default();
698        connector.path = "./data/out/test_local_erase".to_string();
699        let expected_result =
700            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
701        let dataset = vec![expected_result];
702        connector.set_document(Box::new(document)).unwrap();
703        connector.send(&dataset).await.unwrap();
704        connector.erase().await.unwrap();
705        let datastream = connector.fetch().await.unwrap();
706        assert!(datastream.is_none(), "No datastream with empty body.");
707    }
708    #[apply(test!)]
709    async fn erase_with_wildcard() {
710        let document = Json::default();
711
712        let mut connector = Local::default();
713        connector.path = "./data/out/test_local_erase_with_wildcard".to_string();
714        let expected_result =
715            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
716        let dataset = vec![expected_result];
717        connector.set_document(Box::new(document)).unwrap();
718        connector.send(&dataset).await.unwrap();
719        connector.erase().await.unwrap();
720        let datastream = connector.fetch().await.unwrap();
721        assert!(datastream.is_none(), "No datastream with empty body.");
722    }
723}