skim/helper/
item_reader.rs

/// Helper for turning a `BufRead` into a stream of skim items.
2use std::env;
3use std::error::Error;
4use std::io::{BufRead, BufReader};
5
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10
11use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
12use regex::Regex;
13
14use crate::field::FieldRange;
15use crate::helper::ingest::{ingest_loop, BuildOptions, SendRawOrBuild};
16use crate::reader::CommandCollector;
17use crate::{SkimItem, SkimItemReceiver, SkimItemSender};
18
19#[cfg(feature = "malloc_trim")]
20#[cfg(target_os = "linux")]
21#[cfg(target_env = "gnu")]
22use crate::malloc_trim;
23
/// Capacity of the bounded channel used to send interrupt signals to the collector threads.
const CMD_CHANNEL_SIZE: usize = 1_024;
/// Default field delimiter pattern: one or more tabs, newlines or spaces.
const DELIMITER_STR: &str = r"[\t\n ]+";
26
/// Source that the collector reads its input from.
pub enum CollectorInput {
    /// An already opened buffered reader.
    Pipe(Box<dyn BufRead + Send>),
    /// A command line that is spawned through the shell; see `get_command_output`.
    Command(String),
}
31
/// Options that control how input records are split and turned into skim items.
///
/// Values are configured through the chained setter methods on this type.
#[derive(Debug)]
pub struct SkimItemReaderOption {
    /// Handed to the item builder as `ansi_enabled`.
    use_ansi_color: bool,
    /// Field ranges handed to the item builder as `trans_fields` (set via `with_nth`).
    transform_fields: Vec<FieldRange>,
    /// Field ranges handed to the item builder as `matching_fields` (set via `nth`).
    matching_fields: Vec<FieldRange>,
    /// Pattern used to split a record into fields; defaults to `DELIMITER_STR`.
    delimiter: Regex,
    /// Byte that terminates one input record (`b'\n'` by default, `b'\0'` with `read0`).
    line_ending: u8,
    /// When true, stderr lines of a failed command are forwarded as items.
    show_error: bool,
}
41
42impl Default for SkimItemReaderOption {
43    fn default() -> Self {
44        Self {
45            line_ending: b'\n',
46            use_ansi_color: false,
47            transform_fields: Vec::new(),
48            matching_fields: Vec::new(),
49            delimiter: Regex::new(DELIMITER_STR).unwrap(),
50            show_error: false,
51        }
52    }
53}
54
55impl SkimItemReaderOption {
56    pub fn line_ending(mut self, line_ending: u8) -> Self {
57        self.line_ending = line_ending;
58        self
59    }
60
61    pub fn ansi(mut self, enable: bool) -> Self {
62        self.use_ansi_color = enable;
63        self
64    }
65
66    pub fn delimiter(mut self, delimiter: &str) -> Self {
67        if !delimiter.is_empty() {
68            self.delimiter = Regex::new(delimiter).unwrap_or_else(|_| Regex::new(DELIMITER_STR).unwrap());
69        }
70        self
71    }
72
73    pub fn with_nth(mut self, with_nth: &str) -> Self {
74        if !with_nth.is_empty() {
75            self.transform_fields = with_nth.split(',').filter_map(FieldRange::from_str).collect();
76        }
77        self
78    }
79
80    pub fn transform_fields(mut self, transform_fields: Vec<FieldRange>) -> Self {
81        self.transform_fields = transform_fields;
82        self
83    }
84
85    pub fn nth(mut self, nth: &str) -> Self {
86        if !nth.is_empty() {
87            self.matching_fields = nth.split(',').filter_map(FieldRange::from_str).collect();
88        }
89        self
90    }
91
92    pub fn matching_fields(mut self, matching_fields: Vec<FieldRange>) -> Self {
93        self.matching_fields = matching_fields;
94        self
95    }
96
97    pub fn read0(mut self, enable: bool) -> Self {
98        if enable {
99            self.line_ending = b'\0';
100        } else {
101            self.line_ending = b'\n';
102        }
103        self
104    }
105
106    pub fn show_error(mut self, show_error: bool) -> Self {
107        self.show_error = show_error;
108        self
109    }
110
111    pub fn build(self) -> Self {
112        self
113    }
114
115    pub fn is_simple(&self) -> bool {
116        !self.use_ansi_color && self.matching_fields.is_empty() && self.transform_fields.is_empty()
117    }
118}
119
/// Reads input from a buffered reader or a shell command and delivers it as skim items.
pub struct SkimItemReader {
    /// Reader configuration; reference counted so it can be shared with the ingest thread.
    option: Arc<SkimItemReaderOption>,
}
123
124impl Default for SkimItemReader {
125    fn default() -> Self {
126        Self {
127            option: Arc::new(Default::default()),
128        }
129    }
130}
131
132impl SkimItemReader {
133    pub fn new(option: SkimItemReaderOption) -> Self {
134        Self {
135            option: Arc::new(option),
136        }
137    }
138
139    pub fn option(mut self, option: SkimItemReaderOption) -> Self {
140        self.option = Arc::new(option);
141        self
142    }
143}
144
145impl SkimItemReader {
146    pub fn of_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
147        if self.option.is_simple() {
148            self.raw_bufread(source)
149        } else {
150            let (rx_item, _tx_item, opt_ingest_handle) = self
151                .read_and_collect_from_command(Arc::new(AtomicUsize::new(0)), CollectorInput::Pipe(Box::new(source)));
152            (rx_item, opt_ingest_handle)
153        }
154    }
155
156    /// helper: convert bufread into SkimItemReceiver
157    fn raw_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
158        let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
159        let line_ending = self.option.line_ending;
160
161        let ingest_handle = thread::spawn(move || {
162            ingest_loop(source, line_ending, &tx_item, &SendRawOrBuild::Raw);
163
164            #[cfg(feature = "malloc_trim")]
165            #[cfg(target_os = "linux")]
166            #[cfg(target_env = "gnu")]
167            malloc_trim();
168        });
169
170        (rx_item, Some(ingest_handle))
171    }
172
173    /// components_to_stop == 0 => all the threads have been stopped
174    /// return (channel_for_receive_item, channel_to_stop_command)
175    #[allow(clippy::type_complexity)]
176    fn read_and_collect_from_command(
177        &self,
178        components_to_stop: Arc<AtomicUsize>,
179        input: CollectorInput,
180    ) -> (Receiver<Vec<Arc<dyn SkimItem>>>, Sender<i32>, Option<JoinHandle<()>>) {
181        let (tx_interrupt, rx_interrupt) = bounded(CMD_CHANNEL_SIZE);
182        let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
183
184        match input {
185            CollectorInput::Pipe(source) => {
186                let started = Arc::new(AtomicBool::new(false));
187                let started_clone = started.clone();
188                let tx_interrupt_clone = tx_interrupt.clone();
189                let option = self.option.clone();
190                let ingest_handle = thread::spawn(move || {
191                    debug!("collector: command collector start");
192                    components_to_stop.fetch_add(1, Ordering::SeqCst);
193                    started_clone.store(true, Ordering::SeqCst);
194                    // notify parent that it is started
195
196                    let opts = BuildOptions {
197                        ansi_enabled: option.use_ansi_color,
198                        trans_fields: &option.transform_fields,
199                        matching_fields: &option.matching_fields,
200                        delimiter: &option.delimiter,
201                    };
202
203                    ingest_loop(source, option.line_ending, &tx_item, &SendRawOrBuild::Build(opts));
204
205                    let _ = tx_interrupt_clone.send(1); // ensure the waiting thread will exit
206                    components_to_stop.fetch_sub(1, Ordering::SeqCst);
207                    debug!("collector: command collector stop");
208                });
209
210                while !started.load(Ordering::SeqCst) {
211                    // busy waiting for the thread to start. (components_to_stop is added)
212                }
213
214                (rx_item, tx_interrupt, Some(ingest_handle))
215            }
216            CollectorInput::Command(cmd) => {
217                let command = get_command_output(&cmd).expect("command not found").0;
218
219                let started = Arc::new(AtomicBool::new(false));
220                let started_clone = started.clone();
221                let components_to_stop_clone = components_to_stop;
222                let send_error = self.option.show_error;
223                // listening to close signal and kill command if needed
224                let ingest_handle = thread::spawn(move || {
225                    debug!("collector: command killer start");
226                    components_to_stop_clone.fetch_add(1, Ordering::SeqCst);
227                    started_clone.store(true, Ordering::SeqCst); // notify parent that it is started
228
229                    let _ = rx_interrupt.recv(); // block waiting
230                    if let Some(mut child) = command {
231                        // clean up resources
232                        let _ = child.kill();
233                        let _ = child.wait();
234
235                        if send_error {
236                            let has_error = child
237                                .try_wait()
238                                .map(|os| os.map(|s| !s.success()).unwrap_or(true))
239                                .unwrap_or(false);
240                            if has_error {
241                                let output = child.wait_with_output().expect("could not retrieve error message");
242                                for line in String::from_utf8_lossy(&output.stderr).lines() {
243                                    let _ = tx_item.send(vec![Arc::new(line.to_string())]);
244                                }
245                            }
246                        }
247                    }
248
249                    components_to_stop_clone.fetch_sub(1, Ordering::SeqCst);
250                    debug!("collector: command killer stop");
251                });
252
253                while !started.load(Ordering::SeqCst) {
254                    // busy waiting for the thread to start. (components_to_stop is added)
255                }
256
257                (rx_item, tx_interrupt, Some(ingest_handle))
258            }
259        }
260    }
261}
262
263impl CommandCollector for SkimItemReader {
264    fn invoke(
265        &mut self,
266        cmd: &str,
267        components_to_stop: Arc<AtomicUsize>,
268    ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>) {
269        self.read_and_collect_from_command(components_to_stop, CollectorInput::Command(cmd.to_string()))
270    }
271}
272
/// The spawned child process (if any) paired with a buffered reader over its stdout.
type CommandOutput = (Option<Child>, Box<dyn BufRead + Send>);

/// Runs `cmd` through `$SHELL -c` (falling back to `sh` when `SHELL` is not set) with stdout and
/// stderr piped.
///
/// Returns the child process together with a buffered reader over its stdout. Fails when the
/// shell cannot be spawned or the stdout pipe is unavailable.
fn get_command_output(cmd: &str) -> Result<CommandOutput, Box<dyn Error>> {
    let shell = match env::var("SHELL") {
        Ok(path) => path,
        Err(_) => String::from("sh"),
    };

    let mut invocation = Command::new(shell);
    invocation
        .arg("-c")
        .arg(cmd)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child: Child = invocation.spawn()?;

    // Take ownership of the pipe so the reader can outlive the borrow of `child`.
    let reader = match child.stdout.take() {
        Some(stdout) => BufReader::new(stdout),
        None => return Err("command output: unwrap failed".to_owned().into()),
    };

    Ok((Some(child), Box::new(reader)))
}