use crate::connector::local::Local;
use crate::connector::Connector;
use crate::ConnectorStream;
use async_stream::stream;
use glob::glob;
use serde::{Deserialize, Serialize};
use std::io::Result;
use std::io::{Error, ErrorKind};
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Wildcard {
pub paths: Vec<String>,
}
impl Wildcard {
pub fn new(connector: &Local) -> Result<Self> {
if connector.path().is_empty() {
return Err(Error::new(
ErrorKind::InvalidInput,
"The field 'path' for a local connector can't be an empty string".to_string(),
));
}
let paths: Vec<String> = match glob(connector.path().as_str()) {
Ok(paths) => Ok(paths
.filter(|p| p.is_ok())
.map(|p| p.unwrap().display().to_string())
.collect()),
Err(e) => Err(Error::new(ErrorKind::InvalidInput, e)),
}?;
if paths.is_empty() {
return Err(Error::new(
ErrorKind::NotFound,
format!(
"No files found with this path pattern '{}'",
connector.path()
),
));
}
Ok(Wildcard { paths })
}
#[instrument(name = "wildcard::paginate")]
pub async fn paginate(&self, connector: &Local) -> Result<ConnectorStream> {
let connector = connector.clone();
let mut paths = self.paths.clone().into_iter();
Ok(Box::pin(stream! {
for path in &mut paths {
let mut new_connector = connector.clone();
new_connector.path = path.clone();
trace!(connector = format!("{:?}", new_connector).as_str(), "Yield a new connector");
yield Ok(Box::new(new_connector) as Box<dyn Connector>);
}
trace!("Stop yielding new connector");
}))
}
}
#[cfg(test)]
mod tests {
use smol::stream::StreamExt;
use super::*;
use crate::{connector::local::Local, document::json::Json};
use macro_rules_attribute::apply;
use smol_macros::test;
#[apply(test!)]
async fn paginate() {
let document = Json::default();
let mut connector = Local::default();
connector.path = "./data/one_line.*".to_string();
connector.set_document(Box::new(document)).unwrap();
let paginator = Wildcard::new(&connector).unwrap();
let mut paging = paginator.paginate(&connector).await.unwrap();
let connector = paging.next().await.transpose().unwrap().unwrap();
let file_len1 = connector.len().await.unwrap();
assert!(
0 < file_len1,
"The size of the file must be upper than zero."
);
let connector = paging.next().await.transpose().unwrap().unwrap();
let file_len2 = connector.len().await.unwrap();
assert!(
0 < file_len2,
"The size of the file must be upper than zero."
);
assert!(
file_len1 != file_len2,
"The file size of this two files are not different."
);
}
}