skim/
reader.rs

1use crate::global::mark_new_run;
2use crate::item::ItemPool;
3///! Reader is used for reading items from datasource (e.g. stdin or command output)
4///!
5///! After reading in a line, reader will save an item into the pool(items)
6use crate::options::SkimOptions;
7use crate::{SkimItem, SkimItemReceiver};
8use crossbeam_channel::TryRecvError;
9use crossbeam_channel::{unbounded, Select, Sender};
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex, TryLockError, Weak};
14use std::thread::{self, sleep, JoinHandle};
15use std::time::Duration;
16
17#[cfg(feature = "malloc_trim")]
18#[cfg(target_os = "linux")]
19#[cfg(target_env = "gnu")]
20use crate::malloc_trim;
21
22pub const ITEMS_INITIAL_CAPACITY: usize = 65_536;
23const SLEEP_FAST: Duration = Duration::from_millis(1);
24const SLEEP_SLOW: Duration = Duration::from_millis(10);
25
26pub trait CommandCollector {
27    /// execute the `cmd` and produce a
28    /// - skim item producer
29    /// - a channel sender, any message send would mean to terminate the `cmd` process (for now).
30    ///
31    /// Internally, the command collector may start several threads(components), the collector
32    /// should add `1` on every thread creation and sub `1` on thread termination. reader would use
33    /// this information to determine whether the collector had stopped or not.
34    fn invoke(
35        &mut self,
36        cmd: &str,
37        components_to_stop: Arc<AtomicUsize>,
38    ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
39}
40
41pub struct ReaderControl {
42    tx_interrupt: Sender<i32>,
43    tx_interrupt_cmd: Option<Sender<i32>>,
44    components_to_stop: Arc<AtomicUsize>,
45    items: Arc<Mutex<Vec<Arc<dyn SkimItem>>>>,
46    thread_reader: Option<JoinHandle<()>>,
47    thread_ingest: Option<JoinHandle<()>>,
48}
49
50impl Drop for ReaderControl {
51    fn drop(&mut self) {
52        self.kill();
53        drop(self.take());
54    }
55}
56
57impl ReaderControl {
58    #[allow(dropping_copy_types)]
59    pub fn kill(&mut self) {
60        debug!(
61            "kill reader, components before: {}",
62            self.components_to_stop.load(Ordering::SeqCst)
63        );
64
65        let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
66        let _ = self.tx_interrupt.send(1);
67
68        while !self.all_stopped() {}
69
70        let opt_thread_reader = self.thread_reader.take();
71        let opt_thread_ingest = self.thread_ingest.take();
72
73        rayon::spawn(move || {
74            drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
75            drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
76
77            #[cfg(feature = "malloc_trim")]
78            #[cfg(target_os = "linux")]
79            #[cfg(target_env = "gnu")]
80            malloc_trim();
81        });
82    }
83
84    pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
85        if let Ok(mut locked) = self.items.try_lock() {
86            return std::mem::take(&mut locked);
87        }
88
89        Vec::new()
90    }
91
92    pub fn transfer_items(&mut self, item_pool: &Arc<ItemPool>) {
93        if let Ok(mut locked) = self.items.try_lock() {
94            item_pool.append(&mut locked);
95        }
96    }
97
98    pub fn all_stopped(&self) -> bool {
99        self.components_to_stop.load(Ordering::SeqCst) == 0
100    }
101
102    pub fn is_empty(&self) -> bool {
103        if let Ok(locked) = self.items.try_lock() {
104            return locked.is_empty();
105        }
106
107        false
108    }
109
110    pub fn is_done(&self) -> bool {
111        self.all_stopped() && self.is_empty()
112    }
113}
114
115pub struct Reader {
116    cmd_collector: Rc<RefCell<dyn CommandCollector>>,
117    rx_item: Option<SkimItemReceiver>,
118}
119
120impl Reader {
121    pub fn with_options(options: &SkimOptions) -> Self {
122        Self {
123            cmd_collector: options.cmd_collector.clone(),
124            rx_item: None,
125        }
126    }
127
128    pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
129        self.rx_item = rx_item;
130        self
131    }
132
133    pub fn run(&mut self, cmd: &str) -> ReaderControl {
134        mark_new_run(cmd);
135
136        let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
137        let items_strong = Arc::new(Mutex::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
138        let items_weak = Arc::downgrade(&items_strong);
139
140        let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
141            self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
142                let components_to_stop_clone = components_to_stop.clone();
143                let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
144                    self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
145                (rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
146            });
147
148        let components_to_stop_clone = components_to_stop.clone();
149        let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
150
151        ReaderControl {
152            tx_interrupt,
153            tx_interrupt_cmd,
154            components_to_stop,
155            items: items_strong,
156            thread_reader: Some(thread_reader),
157            thread_ingest: opt_ingest_handle,
158        }
159    }
160}
161
162fn collect_item(
163    components_to_stop: Arc<AtomicUsize>,
164    rx_item: SkimItemReceiver,
165    items_weak: Weak<Mutex<Vec<Arc<dyn SkimItem>>>>,
166) -> (Sender<i32>, JoinHandle<()>) {
167    let (tx_interrupt, rx_interrupt) = unbounded();
168
169    let started = Arc::new(AtomicBool::new(false));
170    let started_clone = started.clone();
171    let thread_reader = thread::spawn(move || {
172        debug!("reader: collect_item start");
173        components_to_stop.fetch_add(1, Ordering::SeqCst);
174        started_clone.store(true, Ordering::SeqCst); // notify parent that it is started
175
176        let mut sel = Select::new();
177        let item_channel = sel.recv(&rx_item);
178        let interrupt_channel = sel.recv(&rx_interrupt);
179        let mut empty_count = 0usize;
180        let Some(items_strong) = Weak::upgrade(&items_weak) else {
181            return;
182        };
183
184        loop {
185            match sel.ready() {
186                i if i == item_channel && !rx_item.is_empty() => {
187                    match items_strong.try_lock() {
188                        Ok(mut locked) => {
189                            let mut flattened = rx_item.try_iter().flatten().collect();
190                            locked.append(&mut flattened);
191                            drop(locked);
192
193                            // slow path
194                            if empty_count >= 1 {
195                                // faster for slow path but not for fast path
196                                sleep(SLEEP_SLOW);
197                                continue;
198                            }
199
200                            // fast path
201                            sleep(SLEEP_FAST);
202                            continue;
203                        }
204                        Err(err) => match err {
205                            TryLockError::Poisoned(_) => {
206                                eprintln!("ERROR: The lock could not be acquired because another thread failed while holding the lock.");
207                                std::process::exit(1)
208                            }
209                            TryLockError::WouldBlock => {
210                                sleep(SLEEP_SLOW);
211                                continue;
212                            }
213                        },
214                    }
215                }
216                i if i == item_channel => match rx_item.try_recv() {
217                    Err(TryRecvError::Disconnected) => break,
218                    Err(TryRecvError::Empty) | _ => {
219                        empty_count += 1;
220                        continue;
221                    }
222                },
223                i if i == interrupt_channel => break,
224                _ => unreachable!(),
225            }
226        }
227
228        components_to_stop.fetch_sub(1, Ordering::SeqCst);
229        debug!("reader: collect_item stop");
230    });
231
232    while !started.load(Ordering::SeqCst) {
233        // busy waiting for the thread to start. (components_to_stop is added)
234    }
235
236    (tx_interrupt, thread_reader)
237}