use super::Connector;
use crate::connector::paginator::once::Once;
use crate::document::Document;
use crate::helper::string::DisplayOnlyForDebugging;
use crate::{DataSet, DataStream, Metadata};
use async_stream::stream;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use smol::io::BufReader;
use smol::prelude::*;
use smol::Unblock;
use std::fmt;
use std::io::{stdin, stdout};
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
/// Connector that reads its input from the process standard input and writes
/// its output to the process standard output.
///
/// Unknown configuration fields are rejected on deserialization, and any
/// missing field falls back to its default value.
#[derive(Clone, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Cli {
    /// Document used to decode and encode the data. It is never serialized or
    /// deserialized; it stays `None` until one is assigned to the connector.
    #[serde(skip)]
    document: Option<Box<dyn Document>>,
    /// Metadata attached to the connector. The configuration key `meta` is
    /// accepted as an alias of `metadata`.
    #[serde(rename = "metadata", alias = "meta")]
    pub metadata: Metadata,
    /// End-of-input marker. The configuration key `end_of_input` is accepted
    /// as an alias of `eoi`. When the field is absent from the configuration,
    /// its value is produced by `default_eof`.
    #[serde(default = "default_eof", alias = "end_of_input")]
    pub eoi: String,
}
/// Default end-of-input marker used by serde when `eoi` is not configured:
/// the empty string.
fn default_eof() -> String {
    String::new()
}
/// Manual `Debug` implementation: the `document` and `metadata` fields are
/// rendered through `display_only_for_debugging`, while `eoi` is printed
/// with its regular `Debug` output.
impl fmt::Debug for Cli {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut output = f.debug_struct("Cli");
        output.field("document", &self.document.display_only_for_debugging());
        output.field("metadata", &self.metadata.display_only_for_debugging());
        output.field("eoi", &self.eoi);
        output.finish()
    }
}
#[async_trait]
impl Connector for Cli {
    /// Stores the document used to decode what is read and to encode what is
    /// written.
    ///
    /// Always returns `Ok(())`.
    fn set_document(&mut self, document: Box<dyn Document>) -> Result<()> {
        // The box is already owned by this function: move it instead of
        // cloning it.
        self.document = Some(document);
        Ok(())
    }
    /// Returns the document previously stored with `set_document`.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidInput` error when no document has been
    /// set.
    fn document(&self) -> Result<&dyn Document> {
        self.document.as_deref().ok_or_else(|| {
            Error::new(
                ErrorKind::InvalidInput,
                "The document has not been set in the connector",
            )
        })
    }
    /// Returns the fixed path `"stdout"`.
    fn path(&self) -> String {
        "stdout".to_string()
    }
    /// Returns a copy of the connector metadata, merged with the document
    /// metadata when a document has been set.
    fn metadata(&self) -> Metadata {
        match &self.document {
            Some(document) => self.metadata.clone().merge(&document.metadata()),
            None => self.metadata.clone(),
        }
    }
    /// Does nothing: this connector ignores the parameters it is given.
    fn set_parameters(&mut self, _parameters: Value) {}
    /// Always returns `false`.
    fn is_variable(&self) -> bool {
        false
    }
    /// Always returns `Ok(false)`, whatever the new parameters are.
    fn is_resource_will_change(&self, _new_parameters: Value) -> Result<bool> {
        Ok(false)
    }
    /// Reads the standard input and decodes it with the document.
    ///
    /// Lines are read until the end of the stream, or until a line that is
    /// equal to `eoi` once its trailing whitespace is removed. That marker
    /// line is not kept. When `eoi` is empty, the first blank line therefore
    /// ends the input.
    ///
    /// Returns `Ok(None)` when the document reports that the collected text
    /// holds no data, otherwise a stream yielding every item read by the
    /// document.
    ///
    /// # Errors
    ///
    /// Returns an error when no document has been set, when reading a line
    /// fails, or when the document fails to inspect or read the text.
    #[instrument(name = "cli::fetch")]
    async fn fetch(&mut self) -> std::io::Result<Option<DataStream>> {
        let document = self.document()?;
        let mut buffer = String::new();
        // `Unblock` moves the blocking stdin reads off the async executor.
        let mut lines = BufReader::new(Unblock::new(stdin())).lines();
        while let Some(line) = lines.next().await {
            let line = line?;
            if line.trim_end() == self.eoi {
                break;
            }
            // `lines()` strips the line terminator: put one back so the
            // document receives the text line by line.
            buffer.push_str(&line);
            buffer.push('\n');
        }
        trace!("Lines saved into the buffer");
        if !document.has_data(buffer.as_bytes())? {
            info!("No data found");
            return Ok(None);
        }
        let dataset = document.read(&buffer.into_bytes())?;
        info!("Fetch data with success");
        Ok(Some(Box::pin(stream! {
            for data in dataset {
                yield data;
            }
        })))
    }
    /// Encodes the dataset with the document and writes it to the standard
    /// output.
    ///
    /// The header, the body and the footer produced by the document are
    /// gathered in one buffer, written in a single call, then flushed.
    /// Always returns `Ok(None)` on success.
    ///
    /// # Errors
    ///
    /// Returns an error when no document has been set, when the document
    /// fails to encode the dataset, or when writing or flushing fails.
    #[instrument(name = "cli::send", skip(dataset))]
    async fn send(&mut self, dataset: &DataSet) -> std::io::Result<Option<DataStream>> {
        let mut buffer: Vec<u8> = Vec::new();
        let document = self.document()?;
        let mut stdout = Unblock::new(stdout());
        buffer.extend(document.header(dataset)?);
        buffer.extend(document.write(dataset)?);
        buffer.extend(document.footer(dataset)?);
        trace!("Write data into stdout");
        stdout.write_all(&buffer).await?;
        trace!("Flush data into stdout");
        stdout.flush().await?;
        info!("Send data with success");
        Ok(None)
    }
    /// Erasing is not supported by this connector.
    ///
    /// # Errors
    ///
    /// Always returns an `ErrorKind::Unsupported` error, so that callers can
    /// handle the situation instead of the process panicking.
    async fn erase(&mut self) -> Result<()> {
        Err(Error::new(
            ErrorKind::Unsupported,
            "IO connector can't erase data to the remote document. Use other connector type",
        ))
    }
    /// Builds the stream of connectors to iterate over, delegating to the
    /// `Once` paginator.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the paginator.
    async fn paginate(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>> {
        let paginator = Once {};
        paginator.paginate(self).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::json::Json;
    use macro_rules_attribute::apply;
    use smol_macros::test;

    /// Paginating the connector must yield exactly one connector, then end.
    #[apply(test!)]
    async fn paginate() {
        let mut connector = Cli::default();
        connector.set_document(Box::new(Json::default())).unwrap();
        let mut paging = connector.paginate().await.unwrap();

        let has_first = paging.next().await.transpose().unwrap().is_some();
        assert!(has_first, "Can't get the first reader.");

        let is_exhausted = paging.next().await.transpose().unwrap().is_none();
        assert!(is_exhausted, "Must return only on connector for IO.");
    }
}