1use crate::global::mark_new_run;
2use crate::item::ItemPool;
3use crate::options::SkimOptions;
7use crate::{SkimItem, SkimItemReceiver};
8use crossbeam_channel::TryRecvError;
9use crossbeam_channel::{unbounded, Select, Sender};
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, RwLock, TryLockError, Weak};
14use std::thread::{self, sleep, JoinHandle};
15use std::time::Duration;
16
17#[cfg(feature = "malloc_trim")]
18#[cfg(target_os = "linux")]
19#[cfg(target_env = "gnu")]
20use crate::malloc_trim;
21
/// Initial capacity reserved for the shared item buffer created by `Reader::run`.
pub const ITEMS_INITIAL_CAPACITY: usize = 65_536;
/// Short pause the collector thread takes after draining items while it has not
/// yet seen an empty read.
const SLEEP_FAST: Duration = Duration::from_millis(1);
/// Longer pause the collector thread takes when the buffer lock is contended or
/// once at least one empty read has been seen.
const SLEEP_SLOW: Duration = Duration::from_millis(10);
25
/// Source of items produced by running a command.
///
/// `Reader::run` calls [`CommandCollector::invoke`] when no item receiver was
/// supplied up front.
pub trait CommandCollector {
    /// Starts collecting items for `cmd`.
    ///
    /// Returns a tuple of:
    /// - the receiver on which items arrive,
    /// - a sender the caller uses to request an interrupt
    ///   (`ReaderControl::kill` sends `1` on it),
    /// - an optional join handle that `ReaderControl::kill` joins in the background.
    ///
    /// NOTE(review): implementors are presumably expected to increment
    /// `components_to_stop` for each component they start and decrement it when
    /// that component finishes, since `ReaderControl::kill` waits for the counter
    /// to reach zero — confirm against the implementations.
    fn invoke(
        &mut self,
        cmd: &str,
        components_to_stop: Arc<AtomicUsize>,
    ) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
}
40
/// Handle to a read started by `Reader::run`.
///
/// Owns the shared item buffer and the means to stop the background threads;
/// dropping it calls `kill` and empties the buffer.
pub struct ReaderControl {
    /// Interrupt channel to the collector thread spawned by `collect_item`.
    tx_interrupt: Sender<i32>,
    /// Interrupt channel returned by `CommandCollector::invoke`; `None` when the
    /// items come from a receiver supplied through `Reader::source`.
    tx_interrupt_cmd: Option<Sender<i32>>,
    /// Count of components still running; `all_stopped` reports true at zero.
    components_to_stop: Arc<AtomicUsize>,
    /// Items collected so far and not yet handed over via `take`/`transfer_items`.
    items: Arc<RwLock<Vec<Arc<dyn SkimItem>>>>,
    /// Join handle of the collector thread; taken by `kill`.
    thread_reader: Option<JoinHandle<()>>,
    /// Join handle returned by `CommandCollector::invoke`, if any; taken by `kill`.
    thread_ingest: Option<JoinHandle<()>>,
}
49
50impl Drop for ReaderControl {
51 fn drop(&mut self) {
52 self.kill();
53 drop(self.take());
54 }
55}
56
57impl ReaderControl {
58 #[allow(dropping_copy_types)]
59 pub fn kill(&mut self) {
60 debug!(
61 "kill reader, components before: {}",
62 self.components_to_stop.load(Ordering::SeqCst)
63 );
64
65 let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
66 let _ = self.tx_interrupt.send(1);
67
68 while !self.all_stopped() {}
69
70 let opt_thread_reader = self.thread_reader.take();
71 let opt_thread_ingest = self.thread_ingest.take();
72
73 rayon::spawn(move || {
74 let _ = drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
75 let _ = drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
76 #[cfg(feature = "malloc_trim")]
77 #[cfg(target_os = "linux")]
78 #[cfg(target_env = "gnu")]
79 malloc_trim();
80 });
81 }
82
83 pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
84 if let Ok(mut locked) = self.items.write() {
85 return std::mem::take(&mut locked);
86 }
87
88 Vec::new()
89 }
90
91 pub fn transfer_items(&mut self, item_pool: &Arc<ItemPool>) {
92 if let Ok(mut locked) = self.items.try_write() {
93 item_pool.append(&mut locked);
94 }
95 }
96
97 pub fn all_stopped(&self) -> bool {
98 self.components_to_stop.load(Ordering::SeqCst) == 0
99 }
100
101 pub fn is_empty(&self) -> bool {
102 if let Ok(locked) = self.items.read() {
103 return locked.is_empty();
104 }
105
106 false
107 }
108
109 pub fn is_done(&self) -> bool {
110 self.all_stopped() && self.is_empty()
111 }
112}
113
/// Entry point for reading items, either from a pre-supplied receiver or by
/// invoking the configured command collector.
pub struct Reader {
    /// Collector used by `run` when no receiver has been supplied.
    cmd_collector: Rc<RefCell<dyn CommandCollector>>,
    /// Receiver supplied through `source`; consumed by the next call to `run`.
    rx_item: Option<SkimItemReceiver>,
}
118
119impl Reader {
120 pub fn with_options(options: &SkimOptions) -> Self {
121 Self {
122 cmd_collector: options.cmd_collector.clone(),
123 rx_item: None,
124 }
125 }
126
127 pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
128 self.rx_item = rx_item;
129 self
130 }
131
132 pub fn run(&mut self, cmd: &str) -> ReaderControl {
133 mark_new_run(cmd);
134
135 let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
136 let items_strong = Arc::new(RwLock::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
137 let items_weak = Arc::downgrade(&items_strong);
138
139 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
140 self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
141 let components_to_stop_clone = components_to_stop.clone();
142 let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
143 self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
144 (rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
145 });
146
147 let components_to_stop_clone = components_to_stop.clone();
148 let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
149
150 ReaderControl {
151 tx_interrupt,
152 tx_interrupt_cmd,
153 components_to_stop,
154 items: items_strong,
155 thread_reader: Some(thread_reader),
156 thread_ingest: opt_ingest_handle,
157 }
158 }
159}
160
161fn collect_item(
162 components_to_stop: Arc<AtomicUsize>,
163 rx_item: SkimItemReceiver,
164 items_weak: Weak<RwLock<Vec<Arc<dyn SkimItem>>>>,
165) -> (Sender<i32>, JoinHandle<()>) {
166 let (tx_interrupt, rx_interrupt) = unbounded();
167
168 let started = Arc::new(AtomicBool::new(false));
169 let started_clone = started.clone();
170 let thread_reader = thread::spawn(move || {
171 debug!("reader: collect_item start");
172 components_to_stop.fetch_add(1, Ordering::SeqCst);
173 started_clone.store(true, Ordering::SeqCst); let mut sel = Select::new();
176 let item_channel = sel.recv(&rx_item);
177 let interrupt_channel = sel.recv(&rx_interrupt);
178 let mut empty_count = 0usize;
179
180 if let Some(items_strong) = Weak::upgrade(&items_weak) {
181 loop {
182 match sel.ready() {
183 i if i == item_channel && !rx_item.is_empty() => {
184 match items_strong.try_write() {
185 Ok(mut locked) => {
186 locked.extend(rx_item.try_iter());
187 drop(locked);
188
189 if empty_count >= 1 {
191 sleep(SLEEP_SLOW);
193 continue;
194 }
195
196 sleep(SLEEP_FAST);
198 continue;
199 }
200 Err(err) => match err {
201 TryLockError::Poisoned(_) => {
202 eprintln!("ERROR: The lock could not be acquired because another thread failed while holding the lock.");
203 std::process::exit(1)
204 }
205 TryLockError::WouldBlock => {
206 sleep(SLEEP_SLOW);
207 continue;
208 }
209 },
210 }
211 }
212 i if i == item_channel => match rx_item.try_recv() {
213 Err(TryRecvError::Disconnected) => break,
214 _ => {
215 empty_count += 1;
216 continue;
217 }
218 },
219 i if i == interrupt_channel && !rx_item.is_empty() => continue,
220 i if i == interrupt_channel => break,
221 _ => unreachable!(),
222 }
223 }
224 }
225
226 components_to_stop.fetch_sub(1, Ordering::SeqCst);
227 debug!("reader: collect_item stop");
228 });
229
230 while !started.load(Ordering::SeqCst) {
231 }
233
234 (tx_interrupt, thread_reader)
235}