1use crate::global::mark_new_run;
2use crate::item::ItemPool;
3use crate::options::SkimOptions;
7use crate::{SkimItem, SkimItemReceiver};
8use crossbeam_channel::TryRecvError;
9use crossbeam_channel::{unbounded, Select, Sender};
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex, TryLockError, Weak};
14use std::thread::{self, sleep, JoinHandle};
15use std::time::Duration;
16
17#[cfg(feature = "malloc_trim")]
18#[cfg(target_os = "linux")]
19#[cfg(target_env = "gnu")]
20use crate::malloc_trim;
21
22pub const ITEMS_INITIAL_CAPACITY: usize = 65_536;
23const SLEEP_FAST: Duration = Duration::from_millis(1);
24const SLEEP_SLOW: Duration = Duration::from_millis(10);
25
26pub trait CommandCollector {
27 fn invoke(
35 &mut self,
36 cmd: &str,
37 components_to_stop: Arc<AtomicUsize>,
38 ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
39}
40
41pub struct ReaderControl {
42 tx_interrupt: Sender<i32>,
43 tx_interrupt_cmd: Option<Sender<i32>>,
44 components_to_stop: Arc<AtomicUsize>,
45 items: Arc<Mutex<Vec<Arc<dyn SkimItem>>>>,
46 thread_reader: Option<JoinHandle<()>>,
47 thread_ingest: Option<JoinHandle<()>>,
48}
49
50impl Drop for ReaderControl {
51 fn drop(&mut self) {
52 self.kill();
53 drop(self.take());
54 }
55}
56
57impl ReaderControl {
58 #[allow(dropping_copy_types)]
59 pub fn kill(&mut self) {
60 debug!(
61 "kill reader, components before: {}",
62 self.components_to_stop.load(Ordering::SeqCst)
63 );
64
65 let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
66 let _ = self.tx_interrupt.send(1);
67
68 while !self.all_stopped() {}
69
70 let opt_thread_reader = self.thread_reader.take();
71 let opt_thread_ingest = self.thread_ingest.take();
72
73 rayon::spawn(move || {
74 drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
75 drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
76
77 #[cfg(feature = "malloc_trim")]
78 #[cfg(target_os = "linux")]
79 #[cfg(target_env = "gnu")]
80 malloc_trim();
81 });
82 }
83
84 pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
85 if let Ok(mut locked) = self.items.try_lock() {
86 return std::mem::take(&mut locked);
87 }
88
89 Vec::new()
90 }
91
92 pub fn transfer_items(&mut self, item_pool: &Arc<ItemPool>) {
93 if let Ok(mut locked) = self.items.try_lock() {
94 item_pool.append(&mut locked);
95 }
96 }
97
98 pub fn all_stopped(&self) -> bool {
99 self.components_to_stop.load(Ordering::SeqCst) == 0
100 }
101
102 pub fn is_empty(&self) -> bool {
103 if let Ok(locked) = self.items.try_lock() {
104 return locked.is_empty();
105 }
106
107 false
108 }
109
110 pub fn is_done(&self) -> bool {
111 self.all_stopped() && self.is_empty()
112 }
113}
114
115pub struct Reader {
116 cmd_collector: Rc<RefCell<dyn CommandCollector>>,
117 rx_item: Option<SkimItemReceiver>,
118}
119
120impl Reader {
121 pub fn with_options(options: &SkimOptions) -> Self {
122 Self {
123 cmd_collector: options.cmd_collector.clone(),
124 rx_item: None,
125 }
126 }
127
128 pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
129 self.rx_item = rx_item;
130 self
131 }
132
133 pub fn run(&mut self, cmd: &str) -> ReaderControl {
134 mark_new_run(cmd);
135
136 let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
137 let items_strong = Arc::new(Mutex::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
138 let items_weak = Arc::downgrade(&items_strong);
139
140 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
141 self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
142 let components_to_stop_clone = components_to_stop.clone();
143 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
144 self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
145 (rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
146 });
147
148 let components_to_stop_clone = components_to_stop.clone();
149 let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
150
151 ReaderControl {
152 tx_interrupt,
153 tx_interrupt_cmd,
154 components_to_stop,
155 items: items_strong,
156 thread_reader: Some(thread_reader),
157 thread_ingest: opt_ingest_handle,
158 }
159 }
160}
161
162fn collect_item(
163 components_to_stop: Arc<AtomicUsize>,
164 rx_item: SkimItemReceiver,
165 items_weak: Weak<Mutex<Vec<Arc<dyn SkimItem>>>>,
166) -> (Sender<i32>, JoinHandle<()>) {
167 let (tx_interrupt, rx_interrupt) = unbounded();
168
169 let started = Arc::new(AtomicBool::new(false));
170 let started_clone = started.clone();
171 let thread_reader = thread::spawn(move || {
172 debug!("reader: collect_item start");
173 components_to_stop.fetch_add(1, Ordering::SeqCst);
174 started_clone.store(true, Ordering::SeqCst); let mut sel = Select::new();
177 let item_channel = sel.recv(&rx_item);
178 let interrupt_channel = sel.recv(&rx_interrupt);
179 let mut empty_count = 0usize;
180 let Some(items_strong) = Weak::upgrade(&items_weak) else {
181 return;
182 };
183
184 loop {
185 match sel.ready() {
186 i if i == item_channel && !rx_item.is_empty() => {
187 match items_strong.try_lock() {
188 Ok(mut locked) => {
189 let mut flattened = rx_item.try_iter().flatten().collect();
190 locked.append(&mut flattened);
191 drop(locked);
192
193 if empty_count >= 1 {
195 sleep(SLEEP_SLOW);
197 continue;
198 }
199
200 sleep(SLEEP_FAST);
202 continue;
203 }
204 Err(err) => match err {
205 TryLockError::Poisoned(_) => {
206 eprintln!("ERROR: The lock could not be acquired because another thread failed while holding the lock.");
207 std::process::exit(1)
208 }
209 TryLockError::WouldBlock => {
210 sleep(SLEEP_SLOW);
211 continue;
212 }
213 },
214 }
215 }
216 i if i == item_channel => match rx_item.try_recv() {
217 Err(TryRecvError::Disconnected) => break,
218 Err(TryRecvError::Empty) | _ => {
219 empty_count += 1;
220 continue;
221 }
222 },
223 i if i == interrupt_channel => break,
224 _ => unreachable!(),
225 }
226 }
227
228 components_to_stop.fetch_sub(1, Ordering::SeqCst);
229 debug!("reader: collect_item stop");
230 });
231
232 while !started.load(Ordering::SeqCst) {
233 }
235
236 (tx_interrupt, thread_reader)
237}