use crate::global::mark_new_run;
use crate::options::SkimOptions;
use crate::{SkimItem, SkimItemReceiver};
use crossbeam_channel::TryRecvError;
use crossbeam_channel::{unbounded, Select, Sender};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak};
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;
#[cfg(feature = "malloc_trim")]
#[cfg(target_os = "linux")]
#[cfg(target_env = "gnu")]
use crate::malloc_trim;
/// Capacity pre-allocated for the shared item buffer when a new run starts.
const ITEMS_INITIAL_CAPACITY: usize = 65536;
/// Pause taken by the collector thread after draining the channel while the
/// channel has been seen empty at most once.
const SLEEP_FAST: Duration = Duration::from_millis(1);
/// Longer pause used once the channel has been seen empty more than once.
const SLEEP_SLOW: Duration = Duration::from_millis(20);
/// Something that can run a command and stream the resulting items back.
pub trait CommandCollector {
/// Starts collecting the output of `cmd`.
///
/// `components_to_stop` is the same counter that the reader compares
/// against zero to decide whether everything has stopped.
/// NOTE(review): implementors presumably increment it while running and
/// decrement it when they finish — confirm against the implementations.
///
/// Returns, in order:
/// * the receiver the reader drains items from,
/// * a sender the reader uses to request an interrupt (it sends `1`),
/// * an optional join handle that the reader joins after killing the run.
fn invoke(
&mut self,
cmd: &str,
components_to_stop: Arc<AtomicUsize>,
) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
}
/// Handle to one reader run: owns the shared item buffer and everything
/// needed to stop the threads feeding it.
pub struct ReaderControl {
/// Interrupt channel of the item-collecting thread.
tx_interrupt: Sender<i32>,
/// Interrupt channel of the command collector; `None` when the items come
/// from a caller-supplied receiver instead of a command.
tx_interrupt_cmd: Option<Sender<i32>>,
/// Counter of components still running; zero means everything stopped.
components_to_stop: Arc<AtomicUsize>,
/// Items collected so far and not yet taken.
items: Arc<RwLock<Vec<Arc<dyn SkimItem>>>>,
/// Join handle of the item-collecting thread, until `kill` takes it.
thread_reader: Option<JoinHandle<()>>,
/// Join handle returned by the command collector, if any.
thread_ingest: Option<JoinHandle<()>>,
}
impl Drop for ReaderControl {
    /// Stops the run and then releases whatever items are still buffered.
    fn drop(&mut self) {
        self.kill();

        // Pull the remaining items out of the shared buffer and free them now.
        let leftover = self.take();
        drop(leftover);
    }
}
impl ReaderControl {
#[allow(dropping_copy_types)]
pub fn kill(&mut self) {
debug!(
"kill reader, components before: {}",
self.components_to_stop.load(Ordering::SeqCst)
);
let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
let _ = self.tx_interrupt.send(1);
while !self.all_stopped() {}
let opt_thread_reader = self.thread_reader.take();
let opt_thread_ingest = self.thread_ingest.take();
rayon::spawn(move || {
let _ = drop(opt_thread_reader.and_then(|reader_handle| reader_handle.join().ok()));
let _ = drop(opt_thread_ingest.and_then(|ingest_handle| ingest_handle.join().ok()));
#[cfg(feature = "malloc_trim")]
#[cfg(target_os = "linux")]
#[cfg(target_env = "gnu")]
malloc_trim();
});
}
pub fn take(&mut self) -> Vec<Arc<dyn SkimItem>> {
if let Ok(mut locked) = self.items.try_write() {
let locked_capacity = locked.capacity();
return std::mem::replace(&mut locked, Vec::with_capacity(locked_capacity));
}
Vec::new()
}
pub fn all_stopped(&self) -> bool {
self.components_to_stop.load(Ordering::SeqCst) == 0
}
pub fn is_empty(&self) -> bool {
if let Ok(locked) = self.items.read() {
return locked.is_empty();
}
false
}
pub fn is_done(&self) -> bool {
self.all_stopped() && self.is_empty()
}
}
/// Starts reader runs, taking items either from a caller-supplied receiver
/// or from the command collector.
pub struct Reader {
/// Collector invoked when no receiver has been supplied for the run.
cmd_collector: Rc<RefCell<dyn CommandCollector>>,
/// Caller-supplied item source; consumed by the next call to `run`.
rx_item: Option<SkimItemReceiver>,
}
impl Reader {
    /// Creates a reader that shares the command collector configured in
    /// `options` and has no item source attached yet.
    pub fn with_options(options: &SkimOptions) -> Self {
        let cmd_collector = options.cmd_collector.clone();
        Self {
            cmd_collector,
            rx_item: None,
        }
    }

    /// Sets (or clears) the receiver the next run should read items from.
    pub fn source(self, rx_item: Option<SkimItemReceiver>) -> Self {
        Self { rx_item, ..self }
    }

    /// Starts a new run for `cmd` and returns the handle controlling it.
    ///
    /// A receiver supplied through `source` is used, and consumed, if
    /// present. Otherwise the command collector is invoked with `cmd`. Either
    /// way, a collector thread is started to move the items into the buffer
    /// owned by the returned `ReaderControl`.
    pub fn run(&mut self, cmd: &str) -> ReaderControl {
        mark_new_run(cmd);

        let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let items = Arc::new(RwLock::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));

        let (rx_item, tx_interrupt_cmd, thread_ingest) = match self.rx_item.take() {
            // Caller-provided source: no command to interrupt or join.
            Some(rx) => (rx, None, None),
            None => {
                let (rx, tx, handle) = self
                    .cmd_collector
                    .borrow_mut()
                    .invoke(cmd, components_to_stop.clone());
                (rx, Some(tx), handle)
            }
        };

        // The thread only gets a weak reference to the buffer.
        let (tx_interrupt, thread_reader) =
            collect_item(components_to_stop.clone(), rx_item, Arc::downgrade(&items));

        ReaderControl {
            tx_interrupt,
            tx_interrupt_cmd,
            components_to_stop,
            items,
            thread_reader: Some(thread_reader),
            thread_ingest,
        }
    }
}
/// Spawns the thread that moves items from `rx_item` into the shared buffer.
///
/// The thread increments `components_to_stop` when it starts and decrements
/// it when it finishes. This function returns only after the increment has
/// happened, so the counter is already non-zero when the caller gets the
/// handle back.
///
/// The thread stops when `rx_item` is disconnected, or when an interrupt
/// arrives and no items were pending when the channel was last sampled. If
/// the buffer behind `items_weak` is already gone, the thread exits
/// immediately.
///
/// Returns the sender used to interrupt the thread and its join handle.
fn collect_item(
    components_to_stop: Arc<AtomicUsize>,
    rx_item: SkimItemReceiver,
    items_weak: Weak<RwLock<Vec<Arc<dyn SkimItem>>>>,
) -> (Sender<i32>, JoinHandle<()>) {
    let (tx_interrupt, rx_interrupt) = unbounded();

    let started = Arc::new(AtomicBool::new(false));
    let started_clone = started.clone();

    let thread_reader = thread::spawn(move || {
        debug!("reader: collect_item start");
        components_to_stop.fetch_add(1, Ordering::SeqCst);
        // Tell the parent that the counter update above has happened.
        started_clone.store(true, Ordering::SeqCst);

        let mut sel = Select::new();
        let item_channel = sel.recv(&rx_item);
        let interrupt_channel = sel.recv(&rx_interrupt);
        // Number of wake-ups on the item channel that found it empty when it
        // was sampled; used to switch to the slower polling interval.
        let mut empty_count = 0usize;

        if let Some(items_strong) = Weak::upgrade(&items_weak) {
            loop {
                // Sampled before blocking, so it can be stale by the time
                // `ready()` returns.
                let rx_item_is_not_empty = !rx_item.is_empty();

                match sel.ready() {
                    i if i == item_channel && rx_item_is_not_empty => {
                        // Leave the items queued and retry if the buffer is
                        // busy right now.
                        let Ok(mut locked) = items_strong.try_write() else {
                            continue;
                        };
                        locked.extend(rx_item.try_iter());
                        drop(locked);

                        if empty_count > 1 {
                            sleep(SLEEP_SLOW);
                            continue;
                        }
                        sleep(SLEEP_FAST);
                    }
                    i if i == item_channel => match rx_item.try_recv() {
                        Err(TryRecvError::Disconnected) => break,
                        Err(TryRecvError::Empty) => {
                            empty_count += 1;
                            continue;
                        }
                        Ok(item) => {
                            // The item arrived after the emptiness sample was
                            // taken. It has already been removed from the
                            // channel, so it must be stored here or it is
                            // lost; a blocking write is used for that reason.
                            empty_count += 1;
                            let mut locked = items_strong
                                .write()
                                .unwrap_or_else(|poisoned| poisoned.into_inner());
                            locked.push(item);
                        }
                    },
                    // Interrupted while items were pending: keep draining.
                    i if i == interrupt_channel && rx_item_is_not_empty => continue,
                    i if i == interrupt_channel => break,
                    _ => unreachable!(),
                }
            }
        }

        components_to_stop.fetch_sub(1, Ordering::SeqCst);
        debug!("reader: collect_item stop");

        #[cfg(feature = "malloc_trim")]
        #[cfg(target_os = "linux")]
        #[cfg(target_env = "gnu")]
        malloc_trim();
    });

    // Wait for the thread to register itself. The wait is short, so spin, but
    // hint the CPU that this is a busy-wait.
    while !started.load(Ordering::SeqCst) {
        std::hint::spin_loop();
    }

    (tx_interrupt, thread_reader)
}