tomq 0.1.2

jq, but from TOML
use serde_json::Value;
use std::io::{ErrorKind, Read, Write};
use std::panic::resume_unwind;
use std::path::PathBuf;
#[cfg(feature = "threaded")]
use std::{panic, thread::JoinHandle};
use toml_edit::Document;

use crate::bin_finder::find_bin;
use crate::dociter::TomqIter;
use crate::error::Result;
use crate::error::TomqError;
use crate::opts::RootKey;
use crate::output::{output_to_bat, CopyFromReader, Output, OutputWriter};
use crate::tomly::{print_toml_from_json_to_stdout, print_toml_from_json_values};
use crate::{output_to_stdout, TomlOpts};

/// Creates Json Documents from multiple Toml documents and pipe them to the installed **jq** if found,
/// or pipe it to the standard output if **jq** cannot be found.
///
/// This also re-transcode the Json documents to Toml documents if requested.
///
/// If **threaded** feature is enabled, reading **jq** output and feeding to stdout (or *re-trasconding*
/// to Toml and then writing to stdout) will take in place a dedicated Thread, otherwise, it's done
/// right after all the data is fed to **jq**.
pub(crate) fn toml_to_jq(
    docs: impl Iterator<Item = Result<Document>>,
    opts: TomlOpts,
) -> Result<i32> {
    let json_values = create_json_from_toml(docs);

    json_to_jq(json_values, opts)
}

/// Creates Json Documents from multiple Toml documents and pipe them to the stdout.
///
/// This also re-transcode the Json documents to Toml documents if requested.
pub(crate) fn toml_to_json_to_stdout(
    docs: impl Iterator<Item = Result<Document>>,
    opts: TomlOpts,
) -> Result<i32> {
    let json_values = create_json_from_toml(docs);
    print_from_iterator(
        json_values,
        opts.pretty_print,
        &opts.root_key,
        opts.fallback_key,
        &opts.multi_doc_separator,
        opts.skip_invalid,
        opts.null_to_empty_doc,
        opts.bat,
        opts.retranscode,
    )
}

fn create_json_from_toml(docs: impl Iterator<Item = Result<Document>>) -> impl TomqIter<Value> {
    docs.map(|doc| {
        doc.and_then(|doc| toml_edit::de::from_document::<Value>(doc).map_err(TomqError::TomlDe))
    })
}

fn json_to_jq(json_values: impl Iterator<Item = Result<Value>>, opts: TomlOpts) -> Result<i32> {
    let (jq_found, jq) = match find_jq() {
        Ok(jq) => (true, jq),
        Err(_) => (false, PathBuf::from("jq")),
    };

    let mut jq = match std::process::Command::new(jq)
        .stdin(std::process::Stdio::piped())
        .stdout(if opts.retranscode || opts.bat {
            std::process::Stdio::piped()
        } else {
            std::process::Stdio::inherit()
        })
        .args(opts.jq_filter)
        .args(opts.jq_args)
        .spawn()
    {
        Ok(jq) => jq,
        Err(e) if e.kind() == ErrorKind::NotFound && !jq_found => {
            return Err(TomqError::JqNotFound);
        }
        Err(e) => return Err(e.into()),
    };

    let pretty_print = opts.pretty_print;
    let root_key = opts.root_key;
    let fallback_key = opts.fallback_key;
    let multi_doc_separator = opts.multi_doc_separator;
    let bat = opts.bat;
    let retranscode = opts.retranscode;
    let skip_invalid = opts.skip_invalid;
    let null_to_empty_doc = opts.null_to_empty_doc;

    #[cfg(feature = "threaded")]
    let mut tr_join = match jq.stdout.take() {
        None => None,
        Some(stdout) => {
            let pretty_print = pretty_print;
            let root_key = root_key;
            let fallback_key = fallback_key;
            let multi_doc_separator = multi_doc_separator;
            let bat = bat;
            let retranscode = retranscode;
            let skip_invalid = skip_invalid;
            let null_to_empty_doc = null_to_empty_doc;
            let thread: JoinHandle<Result<()>> = std::thread::spawn(move || {
                print_output(
                    stdout,
                    pretty_print,
                    &root_key,
                    fallback_key,
                    &multi_doc_separator,
                    skip_invalid,
                    null_to_empty_doc,
                    bat,
                    retranscode,
                )?;
                Ok(())
            });
            Some(thread)
        }
    };

    // stdin must be dropped, otherwise jq will never exit (since it is waiting for data).
    {
        let mut stdin = jq.stdin.take().unwrap();
        for json_value in json_values {
            let json_value = json_value?;
            match serde_json::ser::to_writer(&mut stdin, &json_value).map_err(|e| {
                TomqError::SerdeWithContext(e, "while writing to jq stdin".to_string())
            }) {
                Err(tqe) =>
                {
                    #[cfg(feature = "threaded")]
                    if let Some(tr_join) = tr_join.take() {
                        let tqe = format!("{tqe}");
                        let _ = tr_join
                            .join()
                            .map_err(|e| match e.downcast_ref::<String>() {
                                None => resume_unwind(Box::new((tqe.clone(), e))),
                                Some(d) => {
                                    TomqError::ChainedFailure(format!("{tqe}"), format!("{d}"))
                                }
                            })?
                            .map_err(|e| {
                                TomqError::ChainedFailure(format!("{tqe}"), format!("{e}"))
                            })?;
                    }
                }
                _ => {}
            }
        }
        stdin.flush()?;
    }

    #[cfg(not(feature = "threaded"))]
    match jq.stdout.take() {
        Some(stdout) => {
            print_output(
                stdout,
                pretty_print,
                &root_key,
                &multi_doc_separator,
                skip_invalid,
                null_to_empty_doc,
                bat,
                retranscode,
            )?;
        }
        _ => {}
    };

    #[cfg(feature = "threaded")]
    {
        match tr_join {
            Some(handle) => match handle.join() {
                Ok(r) => {
                    r?;
                }
                Err(e) => {
                    panic::resume_unwind(e);
                }
            },
            None => {}
        }
    }

    Ok(jq.wait()?.code().unwrap_or(0))
}

fn find_jq() -> Result<PathBuf> {
    Ok(find_bin("jq")?)
}

fn print_output<R: Read>(
    read: R,
    pretty_print: bool,
    root_key: &RootKey,
    fallback_key: Option<String>,
    multi_doc_separator: &str,
    skip_invalid: bool,
    null_to_empty_doc: bool,
    bat: bool,
    re_transcode: bool,
) -> Result<()> {
    if re_transcode {
        print_toml_from_json_to_stdout(
            read,
            pretty_print,
            &root_key,
            fallback_key,
            &multi_doc_separator,
            skip_invalid,
            null_to_empty_doc,
            bat,
        )
    } else {
        print_as_json(read, bat)
    }
}

/// Prints all JSON values to the stdout, re-trascoding if requested.
fn print_from_iterator(
    values: impl Iterator<Item = Result<Value>>,
    pretty_print: bool,
    root_key: &RootKey,
    fallback_key: Option<String>,
    multi_doc_separator: &str,
    skip_invalid: bool,
    null_to_empty_doc: bool,
    bat: bool,
    re_transcode: bool,
) -> Result<i32> {
    fn print_from_iterator_to_output(
        values: impl Iterator<Item = Result<Value>>,
        output: impl Output,
        pretty_print: bool,
    ) -> Result<i32> {
        let mut output = output.create_output_writer()?;
        for (idx, value) in values.enumerate() {
            if idx > 0 {
                output.write_all(b"\n")?;
            }

            let value = value?;
            if pretty_print {
                match serde_json::ser::to_writer_pretty(&mut output, &value) {
                    Err(e) if e.is_io() => {
                        return if output.wait()? == 0 {
                            Ok(0)
                        } else {
                            Err(e.into())
                        }
                    }
                    _ => {}
                }
            } else {
                match serde_json::ser::to_writer(&mut output, &value) {
                    Err(e) if e.is_io() => {
                        return if output.wait()? == 0 {
                            Ok(0)
                        } else {
                            Err(e.into())
                        }
                    }
                    _ => {}
                }
            }
        }

        output.wait()
    }

    if re_transcode {
        print_toml_from_json_values(
            values,
            pretty_print,
            root_key,
            fallback_key,
            multi_doc_separator,
            skip_invalid,
            null_to_empty_doc,
            bat,
        )?;
        Ok(0)
    } else {
        if bat {
            print_from_iterator_to_output(values, output_to_bat("json"), pretty_print)
        } else {
            print_from_iterator_to_output(values, output_to_stdout(), pretty_print)
        }
    }
}

fn print_as_json<R: Read>(child_stdout: R, bat: bool) -> Result<()> {
    if bat {
        print_json(child_stdout, output_to_bat("json"))
    } else {
        print_json(child_stdout, output_to_stdout())
    }
}

fn print_json<R: Read>(mut reader: R, mut output: impl CopyFromReader) -> Result<()> {
    Ok(output.copy_from_reader(&mut reader)?)
}