skim/
reader.rs

1use crate::global::mark_new_run;
2use crate::item::ItemPool;
///! Reader is used for reading items from a data source (e.g. stdin or command output).
///!
///! After reading in a line, the reader saves an item into the pool (`items`).
6use crate::options::SkimOptions;
7use crate::{SkimItem, SkimItemReceiver};
8use crossbeam_channel::TryRecvError;
9use crossbeam_channel::{unbounded, Select, Sender};
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, RwLock, TryLockError, Weak};
14use std::thread::{self, sleep, JoinHandle};
15use std::time::Duration;
16
17#[cfg(feature = "malloc_trim")]
18#[cfg(target_os = "linux")]
19#[cfg(target_env = "gnu")]
20use crate::malloc_trim;
21
22pub const ITEMS_INITIAL_CAPACITY: usize = 65_536;
23const SLEEP_FAST: Duration = Duration::from_millis(1);
24const SLEEP_SLOW: Duration = Duration::from_millis(10);
25
26pub trait CommandCollector {
27    /// execute the `cmd` and produce a
28    /// - skim item producer
29    /// - a channel sender, any message send would mean to terminate the `cmd` process (for now).
30    ///
31    /// Internally, the command collector may start several threads(components), the collector
32    /// should add `1` on every thread creation and sub `1` on thread termination. reader would use
33    /// this information to determine whether the collector had stopped or not.
34    fn invoke(
35        &mut self,
36        cmd: &str,
37        components_to_stop: Arc<AtomicUsize>,
38    ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
39}
40
41pub struct ReaderControl {
42    tx_interrupt: Sender<i32>,
43    tx_interrupt_cmd: Option<Sender<i32>>,
44    components_to_stop: Arc<AtomicUsize>,
45    items: Arc<RwLock<Vec<Arc<dyn SkimItem>>>>,
46    thread_reader: Option<JoinHandle<()>>,
47    thread_ingest: Option<JoinHandle<()>>,
48}
49
50impl Drop for ReaderControl {
51    fn drop(&mut self) {
52        self.kill();
53        drop(self.take());
54    }
55}
56
57impl ReaderControl {
58    #[allow(dropping_copy_types)]
59    pub fn kill(&mut self) {
60        debug!(
61            "kill reader, components before: {}",
62            self.components_to_stop.load(Ordering::SeqCst)
63        );
64
65        let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
66        let _ = self.tx_interrupt.send(1);
67
68        while !self.all_stopped() {}
69
70        let opt_thread_reader = self.thread_reader.take();
71        let opt_thread_ingest = self.thread_ingest.take();
72
73        rayon::spawn(move || {
74            let _ = drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
75            let _ = drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
76            #[cfg(feature = "malloc_trim")]
77            #[cfg(target_os = "linux")]
78            #[cfg(target_env = "gnu")]
79            malloc_trim();
80        });
81    }
82
83    pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
84        if let Ok(mut locked) = self.items.write() {
85            return std::mem::take(&mut locked);
86        }
87
88        Vec::new()
89    }
90
91    pub fn transfer_items(&mut self, item_pool: &Arc<ItemPool>) {
92        if let Ok(mut locked) = self.items.try_write() {
93            item_pool.append(&mut locked);
94        }
95    }
96
97    pub fn all_stopped(&self) -> bool {
98        self.components_to_stop.load(Ordering::SeqCst) == 0
99    }
100
101    pub fn is_empty(&self) -> bool {
102        if let Ok(locked) = self.items.read() {
103            return locked.is_empty();
104        }
105
106        false
107    }
108
109    pub fn is_done(&self) -> bool {
110        self.all_stopped() && self.is_empty()
111    }
112}
113
114pub struct Reader {
115    cmd_collector: Rc<RefCell<dyn CommandCollector>>,
116    rx_item: Option<SkimItemReceiver>,
117}
118
119impl Reader {
120    pub fn with_options(options: &SkimOptions) -> Self {
121        Self {
122            cmd_collector: options.cmd_collector.clone(),
123            rx_item: None,
124        }
125    }
126
127    pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
128        self.rx_item = rx_item;
129        self
130    }
131
132    pub fn run(&mut self, cmd: &str) -> ReaderControl {
133        mark_new_run(cmd);
134
135        let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
136        let items_strong = Arc::new(RwLock::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
137        let items_weak = Arc::downgrade(&items_strong);
138
139        let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
140            self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
141                let components_to_stop_clone = components_to_stop.clone();
142                let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
143                    self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
144                (rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
145            });
146
147        let components_to_stop_clone = components_to_stop.clone();
148        let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
149
150        ReaderControl {
151            tx_interrupt,
152            tx_interrupt_cmd,
153            components_to_stop,
154            items: items_strong,
155            thread_reader: Some(thread_reader),
156            thread_ingest: opt_ingest_handle,
157        }
158    }
159}
160
161fn collect_item(
162    components_to_stop: Arc<AtomicUsize>,
163    rx_item: SkimItemReceiver,
164    items_weak: Weak<RwLock<Vec<Arc<dyn SkimItem>>>>,
165) -> (Sender<i32>, JoinHandle<()>) {
166    let (tx_interrupt, rx_interrupt) = unbounded();
167
168    let started = Arc::new(AtomicBool::new(false));
169    let started_clone = started.clone();
170    let thread_reader = thread::spawn(move || {
171        debug!("reader: collect_item start");
172        components_to_stop.fetch_add(1, Ordering::SeqCst);
173        started_clone.store(true, Ordering::SeqCst); // notify parent that it is started
174
175        let mut sel = Select::new();
176        let item_channel = sel.recv(&rx_item);
177        let interrupt_channel = sel.recv(&rx_interrupt);
178        let mut empty_count = 0usize;
179
180        if let Some(items_strong) = Weak::upgrade(&items_weak) {
181            loop {
182                match sel.ready() {
183                    i if i == item_channel && !rx_item.is_empty() => {
184                        match items_strong.try_write() {
185                            Ok(mut locked) => {
186                                locked.extend(rx_item.try_iter());
187                                drop(locked);
188
189                                // slow path
190                                if empty_count >= 1 {
191                                    // faster for slow path but not for fast path
192                                    sleep(SLEEP_SLOW);
193                                    continue;
194                                }
195
196                                // fast path
197                                sleep(SLEEP_FAST);
198                                continue;
199                            }
200                            Err(err) => match err {
201                                TryLockError::Poisoned(_) => {
202                                    eprintln!("ERROR: The lock could not be acquired because another thread failed while holding the lock.");
203                                    std::process::exit(1)
204                                }
205                                TryLockError::WouldBlock => {
206                                    sleep(SLEEP_SLOW);
207                                    continue;
208                                }
209                            },
210                        }
211                    }
212                    i if i == item_channel => match rx_item.try_recv() {
213                        Err(TryRecvError::Disconnected) => break,
214                        _ => {
215                            empty_count += 1;
216                            continue;
217                        }
218                    },
219                    i if i == interrupt_channel && !rx_item.is_empty() => continue,
220                    i if i == interrupt_channel => break,
221                    _ => unreachable!(),
222                }
223            }
224        }
225
226        components_to_stop.fetch_sub(1, Ordering::SeqCst);
227        debug!("reader: collect_item stop");
228    });
229
230    while !started.load(Ordering::SeqCst) {
231        // busy waiting for the thread to start. (components_to_stop is added)
232    }
233
234    (tx_interrupt, thread_reader)
235}