1use std::env;
3use std::error::Error;
4use std::io::{BufRead, BufReader};
5
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10
11use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
12use regex::Regex;
13
14use crate::field::FieldRange;
15use crate::helper::ingest::{ingest_loop, BuildOptions, SendRawOrBuild};
16use crate::reader::CommandCollector;
17use crate::{SkimItem, SkimItemReceiver, SkimItemSender};
18
19#[cfg(feature = "malloc_trim")]
20#[cfg(target_os = "linux")]
21#[cfg(target_env = "gnu")]
22use crate::malloc_trim;
23
/// Capacity of the bounded channel that carries interrupt signals to a
/// running collector thread.
const CMD_CHANNEL_SIZE: usize = 1024;

/// Pattern compiled into the default field delimiter: one or more tabs,
/// newlines or spaces.
const DELIMITER_STR: &str = r"[\t\n ]+";
26
/// Where a collector takes its raw input from.
pub enum CollectorInput {
    /// An already-open buffered reader whose contents are ingested directly.
    Pipe(Box<dyn BufRead + Send>),
    /// A command line; it is handed to `get_command_output`, which runs it
    /// through a shell.
    Command(String),
}
31
/// Settings that control how `SkimItemReader` splits input into records and
/// turns them into items.
#[derive(Debug)]
pub struct SkimItemReaderOption {
    /// Forwarded to the item builder as `ansi_enabled`.
    use_ansi_color: bool,
    /// Field ranges forwarded to the item builder as `trans_fields`
    /// (populated by `with_nth` / `transform_fields`).
    transform_fields: Vec<FieldRange>,
    /// Field ranges forwarded to the item builder as `matching_fields`
    /// (populated by `nth` / `matching_fields`).
    matching_fields: Vec<FieldRange>,
    /// Regex forwarded to the item builder as the field delimiter.
    delimiter: Regex,
    /// Byte passed to `ingest_loop` as the record terminator.
    line_ending: u8,
    /// When true, the command collector forwards the stderr lines of a
    /// command that did not exit successfully as items.
    show_error: bool,
}
41
42impl Default for SkimItemReaderOption {
43 fn default() -> Self {
44 Self {
45 line_ending: b'\n',
46 use_ansi_color: false,
47 transform_fields: Vec::new(),
48 matching_fields: Vec::new(),
49 delimiter: Regex::new(DELIMITER_STR).unwrap(),
50 show_error: false,
51 }
52 }
53}
54
55impl SkimItemReaderOption {
56 pub fn line_ending(mut self, line_ending: u8) -> Self {
57 self.line_ending = line_ending;
58 self
59 }
60
61 pub fn ansi(mut self, enable: bool) -> Self {
62 self.use_ansi_color = enable;
63 self
64 }
65
66 pub fn delimiter(mut self, delimiter: &str) -> Self {
67 if !delimiter.is_empty() {
68 self.delimiter = Regex::new(delimiter).unwrap_or_else(|_| Regex::new(DELIMITER_STR).unwrap());
69 }
70 self
71 }
72
73 pub fn with_nth(mut self, with_nth: &str) -> Self {
74 if !with_nth.is_empty() {
75 self.transform_fields = with_nth.split(',').filter_map(FieldRange::from_str).collect();
76 }
77 self
78 }
79
80 pub fn transform_fields(mut self, transform_fields: Vec<FieldRange>) -> Self {
81 self.transform_fields = transform_fields;
82 self
83 }
84
85 pub fn nth(mut self, nth: &str) -> Self {
86 if !nth.is_empty() {
87 self.matching_fields = nth.split(',').filter_map(FieldRange::from_str).collect();
88 }
89 self
90 }
91
92 pub fn matching_fields(mut self, matching_fields: Vec<FieldRange>) -> Self {
93 self.matching_fields = matching_fields;
94 self
95 }
96
97 pub fn read0(mut self, enable: bool) -> Self {
98 if enable {
99 self.line_ending = b'\0';
100 } else {
101 self.line_ending = b'\n';
102 }
103 self
104 }
105
106 pub fn show_error(mut self, show_error: bool) -> Self {
107 self.show_error = show_error;
108 self
109 }
110
111 pub fn build(self) -> Self {
112 self
113 }
114
115 pub fn is_simple(&self) -> bool {
116 !self.use_ansi_color && self.matching_fields.is_empty() && self.transform_fields.is_empty()
117 }
118}
119
/// Reads input on background threads and delivers it as items, configured by
/// a `SkimItemReaderOption`.
pub struct SkimItemReader {
    /// Reader configuration; kept in an `Arc` so the ingest thread can hold
    /// its own handle to it.
    option: Arc<SkimItemReaderOption>,
}
123
124impl Default for SkimItemReader {
125 fn default() -> Self {
126 Self {
127 option: Arc::new(Default::default()),
128 }
129 }
130}
131
132impl SkimItemReader {
133 pub fn new(option: SkimItemReaderOption) -> Self {
134 Self {
135 option: Arc::new(option),
136 }
137 }
138
139 pub fn option(mut self, option: SkimItemReaderOption) -> Self {
140 self.option = Arc::new(option);
141 self
142 }
143}
144
145impl SkimItemReader {
146 pub fn of_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
147 if self.option.is_simple() {
148 self.raw_bufread(source)
149 } else {
150 let (rx_item, _tx_item, opt_ingest_handle) = self
151 .read_and_collect_from_command(Arc::new(AtomicUsize::new(0)), CollectorInput::Pipe(Box::new(source)));
152 (rx_item, opt_ingest_handle)
153 }
154 }
155
156 fn raw_bufread(&self, source: Box<dyn BufRead + Send>) -> (SkimItemReceiver, Option<JoinHandle<()>>) {
158 let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
159 let line_ending = self.option.line_ending;
160
161 let ingest_handle = thread::spawn(move || {
162 ingest_loop(source, line_ending, &tx_item, &SendRawOrBuild::Raw);
163
164 #[cfg(feature = "malloc_trim")]
165 #[cfg(target_os = "linux")]
166 #[cfg(target_env = "gnu")]
167 malloc_trim();
168 });
169
170 (rx_item, Some(ingest_handle))
171 }
172
173 #[allow(clippy::type_complexity)]
176 fn read_and_collect_from_command(
177 &self,
178 components_to_stop: Arc<AtomicUsize>,
179 input: CollectorInput,
180 ) -> (Receiver<Vec<Arc<dyn SkimItem>>>, Sender<i32>, Option<JoinHandle<()>>) {
181 let (tx_interrupt, rx_interrupt) = bounded(CMD_CHANNEL_SIZE);
182 let (tx_item, rx_item): (SkimItemSender, SkimItemReceiver) = unbounded();
183
184 match input {
185 CollectorInput::Pipe(source) => {
186 let started = Arc::new(AtomicBool::new(false));
187 let started_clone = started.clone();
188 let tx_interrupt_clone = tx_interrupt.clone();
189 let option = self.option.clone();
190 let ingest_handle = thread::spawn(move || {
191 debug!("collector: command collector start");
192 components_to_stop.fetch_add(1, Ordering::SeqCst);
193 started_clone.store(true, Ordering::SeqCst);
194 let opts = BuildOptions {
197 ansi_enabled: option.use_ansi_color,
198 trans_fields: &option.transform_fields,
199 matching_fields: &option.matching_fields,
200 delimiter: &option.delimiter,
201 };
202
203 ingest_loop(source, option.line_ending, &tx_item, &SendRawOrBuild::Build(opts));
204
205 let _ = tx_interrupt_clone.send(1); components_to_stop.fetch_sub(1, Ordering::SeqCst);
207 debug!("collector: command collector stop");
208 });
209
210 while !started.load(Ordering::SeqCst) {
211 }
213
214 (rx_item, tx_interrupt, Some(ingest_handle))
215 }
216 CollectorInput::Command(cmd) => {
217 let command = get_command_output(&cmd).expect("command not found").0;
218
219 let started = Arc::new(AtomicBool::new(false));
220 let started_clone = started.clone();
221 let components_to_stop_clone = components_to_stop;
222 let send_error = self.option.show_error;
223 let ingest_handle = thread::spawn(move || {
225 debug!("collector: command killer start");
226 components_to_stop_clone.fetch_add(1, Ordering::SeqCst);
227 started_clone.store(true, Ordering::SeqCst); let _ = rx_interrupt.recv(); if let Some(mut child) = command {
231 let _ = child.kill();
233 let _ = child.wait();
234
235 if send_error {
236 let has_error = child
237 .try_wait()
238 .map(|os| os.map(|s| !s.success()).unwrap_or(true))
239 .unwrap_or(false);
240 if has_error {
241 let output = child.wait_with_output().expect("could not retrieve error message");
242 for line in String::from_utf8_lossy(&output.stderr).lines() {
243 let _ = tx_item.send(vec![Arc::new(line.to_string())]);
244 }
245 }
246 }
247 }
248
249 components_to_stop_clone.fetch_sub(1, Ordering::SeqCst);
250 debug!("collector: command killer stop");
251 });
252
253 while !started.load(Ordering::SeqCst) {
254 }
256
257 (rx_item, tx_interrupt, Some(ingest_handle))
258 }
259 }
260 }
261}
262
263impl CommandCollector for SkimItemReader {
264 fn invoke(
265 &mut self,
266 cmd: &str,
267 components_to_stop: Arc<AtomicUsize>,
268 ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>) {
269 self.read_and_collect_from_command(components_to_stop, CollectorInput::Command(cmd.to_string()))
270 }
271}
272
/// Handle of the spawned child (if any) paired with a buffered reader over
/// its standard output.
type CommandOutput = (Option<Child>, Box<dyn BufRead + Send>);

/// Runs `cmd` through a shell (`<shell> -c <cmd>`) with stdout and stderr
/// piped.
///
/// The shell is read from the `SHELL` environment variable. `sh` is used when
/// the variable is unset, not valid Unicode, or empty.
///
/// # Errors
///
/// Returns an error when the shell process cannot be spawned or when the
/// child's stdout handle is unavailable.
fn get_command_output(cmd: &str) -> Result<CommandOutput, Box<dyn Error>> {
    // An empty `SHELL` would be passed to `Command::new("")` and make the spawn fail,
    // so it is treated the same as an unset variable.
    let shell = env::var("SHELL")
        .ok()
        .filter(|shell| !shell.is_empty())
        .unwrap_or_else(|| "sh".to_string());

    let mut child: Child = Command::new(shell)
        .arg("-c")
        .arg(cmd)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // stdout was configured as piped above, so the handle is expected to be present.
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| "command output: unwrap failed".to_owned())?;

    Ok((Some(child), Box::new(BufReader::new(stdout))))
}