1use crate::global::mark_new_run;
2use crate::item::ItemPool;
3use crate::options::SkimOptions;
7use crate::{SkimItem, SkimItemReceiver};
8use crossbeam_channel::TryRecvError;
9use crossbeam_channel::{unbounded, Select, Sender};
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, RwLock, TryLockError, Weak};
14use std::thread::{self, sleep, JoinHandle};
15use std::time::Duration;
16
17#[cfg(feature = "malloc_trim")]
18#[cfg(target_os = "linux")]
19#[cfg(target_env = "gnu")]
20use crate::malloc_trim;
21
22pub const ITEMS_INITIAL_CAPACITY: usize = 65_536;
23const SLEEP_FAST: Duration = Duration::from_millis(1);
24const SLEEP_SLOW: Duration = Duration::from_millis(10);
25
26pub trait CommandCollector {
27 fn invoke(
35 &mut self,
36 cmd: &str,
37 components_to_stop: Arc<AtomicUsize>,
38 ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
39}
40
41pub struct ReaderControl {
42 tx_interrupt: Sender<i32>,
43 tx_interrupt_cmd: Option<Sender<i32>>,
44 components_to_stop: Arc<AtomicUsize>,
45 items: Arc<RwLock<Vec<Arc<dyn SkimItem>>>>,
46 thread_reader: Option<JoinHandle<()>>,
47 thread_ingest: Option<JoinHandle<()>>,
48}
49
50impl Drop for ReaderControl {
51 fn drop(&mut self) {
52 self.kill();
53 drop(self.take());
54 }
55}
56
57impl ReaderControl {
58 #[allow(dropping_copy_types)]
59 pub fn kill(&mut self) {
60 debug!(
61 "kill reader, components before: {}",
62 self.components_to_stop.load(Ordering::SeqCst)
63 );
64
65 let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
66 let _ = self.tx_interrupt.send(1);
67
68 while !self.all_stopped() {}
69
70 let opt_thread_reader = self.thread_reader.take();
71 let opt_thread_ingest = self.thread_ingest.take();
72
73 rayon::spawn(move || {
74 drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
75 drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
76
77 #[cfg(feature = "malloc_trim")]
78 #[cfg(target_os = "linux")]
79 #[cfg(target_env = "gnu")]
80 malloc_trim();
81 });
82 }
83
84 pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
85 if let Ok(mut locked) = self.items.write() {
86 return std::mem::take(&mut locked);
87 }
88
89 Vec::new()
90 }
91
92 pub fn transfer_items(&mut self, item_pool: &Arc<ItemPool>) {
93 if let Ok(mut locked) = self.items.try_write() {
94 item_pool.append(&mut locked);
95 }
96 }
97
98 pub fn all_stopped(&self) -> bool {
99 self.components_to_stop.load(Ordering::SeqCst) == 0
100 }
101
102 pub fn is_empty(&self) -> bool {
103 if let Ok(locked) = self.items.read() {
104 return locked.is_empty();
105 }
106
107 false
108 }
109
110 pub fn is_done(&self) -> bool {
111 self.all_stopped() && self.is_empty()
112 }
113}
114
115pub struct Reader {
116 cmd_collector: Rc<RefCell<dyn CommandCollector>>,
117 rx_item: Option<SkimItemReceiver>,
118}
119
120impl Reader {
121 pub fn with_options(options: &SkimOptions) -> Self {
122 Self {
123 cmd_collector: options.cmd_collector.clone(),
124 rx_item: None,
125 }
126 }
127
128 pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
129 self.rx_item = rx_item;
130 self
131 }
132
133 pub fn run(&mut self, cmd: &str) -> ReaderControl {
134 mark_new_run(cmd);
135
136 let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
137 let items_strong = Arc::new(RwLock::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
138 let items_weak = Arc::downgrade(&items_strong);
139
140 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
141 self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
142 let components_to_stop_clone = components_to_stop.clone();
143 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
144 self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
145 (rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
146 });
147
148 let components_to_stop_clone = components_to_stop.clone();
149 let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
150
151 ReaderControl {
152 tx_interrupt,
153 tx_interrupt_cmd,
154 components_to_stop,
155 items: items_strong,
156 thread_reader: Some(thread_reader),
157 thread_ingest: opt_ingest_handle,
158 }
159 }
160}
161
162fn collect_item(
163 components_to_stop: Arc<AtomicUsize>,
164 rx_item: SkimItemReceiver,
165 items_weak: Weak<RwLock<Vec<Arc<dyn SkimItem>>>>,
166) -> (Sender<i32>, JoinHandle<()>) {
167 let (tx_interrupt, rx_interrupt) = unbounded();
168
169 let started = Arc::new(AtomicBool::new(false));
170 let started_clone = started.clone();
171 let thread_reader = thread::spawn(move || {
172 debug!("reader: collect_item start");
173 components_to_stop.fetch_add(1, Ordering::SeqCst);
174 started_clone.store(true, Ordering::SeqCst); let mut sel = Select::new();
177 let item_channel = sel.recv(&rx_item);
178 let interrupt_channel = sel.recv(&rx_interrupt);
179 let mut empty_count = 0usize;
180
181 if let Some(items_strong) = Weak::upgrade(&items_weak) {
182 loop {
183 match sel.ready() {
184 i if i == item_channel && !rx_item.is_empty() => {
185 match items_strong.try_write() {
186 Ok(mut locked) => {
187 rx_item.try_iter().for_each(|mut vec| locked.append(&mut vec));
188 drop(locked);
189
190 if empty_count >= 1 {
192 sleep(SLEEP_SLOW);
194 continue;
195 }
196
197 sleep(SLEEP_FAST);
199 continue;
200 }
201 Err(err) => match err {
202 TryLockError::Poisoned(_) => {
203 eprintln!("ERROR: The lock could not be acquired because another thread failed while holding the lock.");
204 std::process::exit(1)
205 }
206 TryLockError::WouldBlock => {
207 sleep(SLEEP_SLOW);
208 continue;
209 }
210 },
211 }
212 }
213 i if i == item_channel => match rx_item.try_recv() {
214 Err(TryRecvError::Disconnected) => break,
215 _ => {
216 empty_count += 1;
217 continue;
218 }
219 },
220 i if i == interrupt_channel && !rx_item.is_empty() => continue,
221 i if i == interrupt_channel => break,
222 _ => unreachable!(),
223 }
224 }
225 }
226
227 components_to_stop.fetch_sub(1, Ordering::SeqCst);
228 debug!("reader: collect_item stop");
229 });
230
231 while !started.load(Ordering::SeqCst) {
232 }
234
235 (tx_interrupt, thread_reader)
236}