use std::path::PathBuf;
use std::sync::Arc;
use futures::future::{select, Either};
use futures::FutureExt;
use log::{debug, error, warn};
use serde::Deserialize;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::mpsc;
use crate::comms::{Link, Terminated};
use crate::config::ConfigPath;
use crate::ingress;
use crate::payload::Update;
use crate::roto_runtime::types::{LogEntry, OutputStreamMessageRecord};
use crate::targets::Component;
use crate::targets::TargetCommand;
use crate::targets::WaitPoint;
#[derive(Debug, Deserialize)]
pub struct File {
#[serde(flatten)]
config: Config,
sources: Link,
}
#[derive(Debug, Deserialize)]
pub struct Config {
format: Format,
filename: ConfigPath,
}
#[derive(Debug, Deserialize)]
#[serde()]
pub enum Format {
#[serde(rename = "csv")]
Csv,
#[serde(rename = "json")]
Json,
#[serde(rename = "json-min")]
JsonMin,
}
impl File {
pub async fn run(
self,
component: Component,
cmd: mpsc::Receiver<TargetCommand>,
waitpoint: WaitPoint,
) -> Result<(), Terminated> {
FileRunner::new(self.config, component)
.run(self.sources, cmd, waitpoint)
.await
}
}
pub struct FileRunner {
component: Component,
config: Config,
ingresses: Arc<ingress::Register>,
target_file: Option<BufWriter<tokio::fs::File>>,
}
impl FileRunner {
pub fn new(config: Config, mut component: Component) -> Self {
let ingresses = component.ingresses().clone();
Self {
config,
component,
ingresses,
target_file: None
}
}
pub async fn run(
mut self,
mut sources: Link,
mut cmd_rx: mpsc::Receiver<TargetCommand>,
waitpoint: WaitPoint,
) -> Result<(), Terminated> {
let f = tokio::fs::File::create(self.config.filename.clone())
.await
.inspect_err(|e| error!("{}", e))
.map_err(|_| Terminated)
?;
self.target_file = Some(BufWriter::new(f));
sources.connect(false).await.unwrap();
let sources2 = sources.clone();
waitpoint.running().await;
loop {
match select(
cmd_rx.recv().boxed(),
sources.query().boxed(),
).await {
Either::Left((gate_cmd, _)) => {
match gate_cmd {
Some(cmd) => match cmd {
TargetCommand::Reconfigure { .. } => {
warn!("Reconfiguration for FileOut component not yet implemented");
}
TargetCommand::ReportLinks { report } => {
report.set_source(&sources2);
}
TargetCommand::Terminate => {
break
}
}
None => break
}
}
Either::Right((update, _)) => {
let update = match update {
Ok(upd) => upd,
Err(e) => {
debug!("Gate error in file-out target: {}", e);
break
}
};
match update {
Update::OutputStream(msgs) => {
for m in msgs {
let m = m.into_record();
if let Some(dst) = self.target_file.as_mut() {
if let OutputStreamMessageRecord::Entry(ref e) = m {
if let Some(ref custom_str) = e.custom {
dst.write_all(custom_str.as_ref()).await.unwrap();
dst.write_all(b"\n").await.unwrap();
continue;
}
}
match self.config.format {
Format::Csv => {
let mut wrt = csv::WriterBuilder::new().has_headers(false).from_writer(vec![]);
wrt.serialize(m).unwrap();
dst.write_all(&wrt.into_inner().unwrap()).await.unwrap();
}
Format::Json => {
if let Ok(bytes) = serde_json::to_vec(&m) {
dst.write_all(&bytes).await.unwrap();
dst.write_all(b"\n").await.unwrap();
}
}
Format::JsonMin => {
if let OutputStreamMessageRecord::Entry(e) = m {
if let Ok(bytes) = serde_json::to_vec(&e.into_minimal()) {
dst.write_all(&bytes).await.unwrap();
dst.write_all(b"\n").await.unwrap();
}
} else {
if let Ok(bytes) = serde_json::to_vec(&m) {
dst.write_all(&bytes).await.unwrap();
dst.write_all(b"\n").await.unwrap();
}
}
}
}
}
}
}
Update::Single(..) |
Update::Bulk(..) |
Update::Withdraw(..) |
Update::WithdrawBulk(..) |
Update::QueryResult(..) |
Update::UpstreamStatusChange(..) |
Update::Rtr(..) => { }
}
}
}
}
if let Some(dst) = self.target_file.as_mut() {
let _ = dst.flush().await;
}
Ok(())
}
}