use std::env;
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use regex::Regex;
use crate::field::FieldRange;
use crate::helper::ingest::{ingest_loop, BuildOptions, SendRawOrBuild};
use crate::reader::CommandCollector;
use crate::{SkimItem, SkimItemReceiver, SkimItemSender};
const CMD_CHANNEL_SIZE: usize = 1_024;
const DELIMITER_STR: &str = r"[\t\n ]+";
pub enum CollectorInput {
Pipe(Box<dyn BufRead + Send>),
Command(String),
}
#[derive(Debug)]
pub struct SkimItemReaderOption {
use_ansi_color: bool,
transform_fields: Vec<FieldRange>,
matching_fields: Vec<FieldRange>,
delimiter: Regex,
line_ending: u8,
show_error: bool,
}
impl Default for SkimItemReaderOption {
fn default() -> Self {
Self {
line_ending: b'\n',
use_ansi_color: false,
transform_fields: Vec::new(),
matching_fields: Vec::new(),
delimiter: Regex::new(DELIMITER_STR).unwrap(),
show_error: false,
}
}
}
impl SkimItemReaderOption {
pub fn line_ending(mut self, line_ending: u8) -> Self {
self.line_ending = line_ending;
self
}
pub fn ansi(mut self, enable: bool) -> Self {
self.use_ansi_color = enable;
self
}
pub fn delimiter(mut self, delimiter: &str) -> Self {
if !delimiter.is_empty() {
self.delimiter = Regex::new(delimiter).unwrap_or_else(|_| Regex::new(DELIMITER_STR).unwrap());
}
self
}
pub fn with_nth(mut self, with_nth: &str) -> Self {
if !with_nth.is_empty() {
self.transform_fields = with_nth.split(',').filter_map(FieldRange::from_str).collect();
}
self
}
pub fn transform_fields(mut self, transform_fields: Vec<FieldRange>) -> Self {
self.transform_fields = transform_fields;
self
}
pub fn nth(mut self, nth: &str) -> Self {
if !nth.is_empty() {
self.matching_fields = nth.split(',').filter_map(FieldRange::from_str).collect();
}
self
}
pub fn matching_fields(mut self, matching_fields: Vec<FieldRange>) -> Self {
self.matching_fields = matching_fields;
self
}
pub fn read0(mut self, enable: bool) -> Self {
if enable {
self.line_ending = b'\0';
} else {
self.line_ending = b'\n';
}
self
}
pub fn show_error(mut self, show_error: bool) -> Self {
self.show_error = show_error;
self
}
pub fn build(self) -> Self {
self
}
pub fn is_simple(&self) -> bool {
!self.use_ansi_color && self.matching_fields.is_empty() && self.transform_fields.is_empty()
}
}
pub struct SkimItemReader {
option: Arc<SkimItemReaderOption>,
}
impl Default for SkimItemReader {
fn default() -> Self {
Self {
option: Arc::new(Default::default()),
}
}
}
impl SkimItemReader {
pub fn new(option: SkimItemReaderOption) -> Self {
Self {
option: Arc::new(option),
}
}
pub fn option(mut self, option: SkimItemReaderOption) -> Self {
self.option = Arc::new(option);
self
}
}
impl SkimItemReader {
pub fn of_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
if self.option.is_simple() {
self.raw_bufread(source)
} else {
let (rx_item, _tx_item, opt_ingest_handle) = self
.read_and_collect_from_command(Arc::new(AtomicUsize::new(0)), CollectorInput::Pipe(Box::new(source)));
(rx_item, opt_ingest_handle)
}
}
fn raw_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
let line_ending = self.option.line_ending;
let ingest_handle = thread::spawn(move || {
ingest_loop(source, line_ending, tx_item, SendRawOrBuild::Raw);
});
(rx_item, Some(ingest_handle))
}
#[allow(clippy::type_complexity)]
fn read_and_collect_from_command(
&self,
components_to_stop: Arc<AtomicUsize>,
input: CollectorInput,
) -> (Receiver<Arc<dyn SkimItem>>, Sender<i32>, Option<JoinHandle<()>>) {
let (tx_interrupt, rx_interrupt) = bounded(CMD_CHANNEL_SIZE);
let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
match input {
CollectorInput::Pipe(source) => {
let started = Arc::new(AtomicBool::new(false));
let started_clone = started.clone();
let tx_interrupt_clone = tx_interrupt.clone();
let option = self.option.clone();
let ingest_handle = thread::spawn(move || {
debug!("collector: command collector start");
components_to_stop.fetch_add(1, Ordering::SeqCst);
started_clone.store(true, Ordering::SeqCst);
let opts = BuildOptions {
ansi_enabled: option.use_ansi_color,
trans_fields: &option.transform_fields,
matching_fields: &option.matching_fields,
delimiter: &option.delimiter,
};
ingest_loop(source, option.line_ending, tx_item, SendRawOrBuild::Build(opts));
let _ = tx_interrupt_clone.send(1); components_to_stop.fetch_sub(1, Ordering::SeqCst);
debug!("collector: command collector stop");
});
while !started.load(Ordering::SeqCst) {
}
(rx_item, tx_interrupt, Some(ingest_handle))
}
CollectorInput::Command(cmd) => {
let command = get_command_output(&cmd).expect("command not found").0;
let started = Arc::new(AtomicBool::new(false));
let started_clone = started.clone();
let components_to_stop_clone = components_to_stop;
let send_error = self.option.show_error;
let ingest_handle = thread::spawn(move || {
debug!("collector: command killer start");
components_to_stop_clone.fetch_add(1, Ordering::SeqCst);
started_clone.store(true, Ordering::SeqCst);
let _ = rx_interrupt.recv(); if let Some(mut child) = command {
let _ = child.kill();
let _ = child.wait();
if send_error {
let has_error = child
.try_wait()
.map(|os| os.map(|s| !s.success()).unwrap_or(true))
.unwrap_or(false);
if has_error {
let output = child.wait_with_output().expect("could not retrieve error message");
for line in String::from_utf8_lossy(&output.stderr).lines() {
let _ = tx_item.send(Arc::new(line.to_string()));
}
}
}
}
components_to_stop_clone.fetch_sub(1, Ordering::SeqCst);
debug!("collector: command killer stop");
});
while !started.load(Ordering::SeqCst) {
}
(rx_item, tx_interrupt, Some(ingest_handle))
}
}
}
}
impl CommandCollector for SkimItemReader {
fn invoke(
&mut self,
cmd: &str,
components_to_stop: Arc<AtomicUsize>,
) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>) {
self.read_and_collect_from_command(components_to_stop, CollectorInput::Command(cmd.to_string()))
}
}
type CommandOutput = (Option<Child>, Box<dyn BufRead + Send>);
fn get_command_output(cmd: &str) -> Result<CommandOutput, Box<dyn Error>> {
let shell = env::var("SHELL").unwrap_or_else(|_| "sh".to_string());
let mut command: Child = Command::new(shell)
.arg("-c")
.arg(cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = command
.stdout
.take()
.ok_or_else(|| "command output: unwrap failed".to_owned())?;
Ok((Some(command), Box::new(BufReader::new(stdout))))
}