use crate::global::mark_new_run;
use crate::options::SkimOptions;
use crate::spinlock::SpinLock;
use crate::{SkimItem, SkimItemReceiver};
use crossbeam_channel::{bounded, Select, Sender};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
const CHANNEL_SIZE: usize = 1024;
const ITEMS_INITIAL_CAPACITY: usize = 65536;
pub trait CommandCollector {
fn invoke(
&mut self,
cmd: &str,
components_to_stop: Arc<AtomicUsize>,
) -> (SkimItemReceiver, Sender<i32>, Option<JoinHandle<()>>);
}
pub struct ReaderControl {
tx_interrupt: Sender<i32>,
tx_interrupt_cmd: Option<Sender<i32>>,
components_to_stop: Arc<AtomicUsize>,
items: Arc<SpinLock<Vec<Arc<dyn SkimItem>>>>,
thread_reader: Option<JoinHandle<()>>,
thread_ingest: Option<JoinHandle<()>>,
}
impl Drop for ReaderControl {
fn drop(&mut self) {
self.kill();
drop(self.take());
#[cfg(target_os = "linux")]
#[cfg(target_env = "gnu")]
unsafe {
let _ = libc::malloc_trim(0);
};
}
}
impl ReaderControl {
pub fn kill(&mut self) {
debug!(
"kill reader, components before: {}",
self.components_to_stop.load(Ordering::SeqCst)
);
let _ = self.tx_interrupt_cmd.as_ref().map(|tx| tx.send(1));
let _ = self.tx_interrupt.send(1);
if let Some(handle) = self.thread_reader.take() {
let _ = handle.join();
}
if let Some(handle) = self.thread_ingest.take() {
let _ = handle.join();
}
while self.components_to_stop.load(Ordering::SeqCst) != 0 {}
}
pub fn take(&self) -> Vec<Arc<dyn SkimItem>> {
let mut locked = self.items.lock();
std::mem::take(&mut *locked)
}
pub fn is_done(&self) -> bool {
let locked = self.items.lock();
self.components_to_stop.load(Ordering::SeqCst) == 0 && locked.is_empty()
}
}
pub struct Reader {
cmd_collector: Rc<RefCell<dyn CommandCollector>>,
rx_item: Option<SkimItemReceiver>,
}
impl Reader {
pub fn with_options(options: &SkimOptions) -> Self {
Self {
cmd_collector: options.cmd_collector.clone(),
rx_item: None,
}
}
pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
self.rx_item = rx_item;
self
}
pub fn run(&mut self, cmd: &str) -> ReaderControl {
mark_new_run(cmd);
let components_to_stop: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let items_strong = Arc::new(SpinLock::new(Vec::with_capacity(ITEMS_INITIAL_CAPACITY)));
let items_weak = Arc::downgrade(&items_strong);
let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
self.rx_item.take().map(|rx| (rx, None, None)).unwrap_or_else(|| {
let components_to_stop_clone = components_to_stop.clone();
let (rx_item, tx_interrupt_cmd, opt_ingest_handle) =
self.cmd_collector.borrow_mut().invoke(cmd, components_to_stop_clone);
(rx_item, Some(tx_interrupt_cmd), opt_ingest_handle)
});
let components_to_stop_clone = components_to_stop.clone();
let (tx_interrupt, thread_reader) = collect_item(components_to_stop_clone, rx_item, items_weak);
ReaderControl {
tx_interrupt,
tx_interrupt_cmd,
components_to_stop,
items: items_strong,
thread_reader: Some(thread_reader),
thread_ingest: opt_ingest_handle,
}
}
}
fn collect_item(
components_to_stop: Arc<AtomicUsize>,
rx_item: SkimItemReceiver,
items_weak: Weak<SpinLock<Vec<Arc<dyn SkimItem>>>>,
) -> (Sender<i32>, JoinHandle<()>) {
let (tx_interrupt, rx_interrupt) = bounded(CHANNEL_SIZE);
let started = Arc::new(AtomicBool::new(false));
let started_clone = started.clone();
let thread_reader = thread::spawn(move || {
debug!("reader: collect_item start");
components_to_stop.fetch_add(1, Ordering::SeqCst);
started_clone.store(true, Ordering::SeqCst);
let mut sel = Select::new();
let item_channel = sel.recv(&rx_item);
let interrupt_channel = sel.recv(&rx_interrupt);
if let Some(items_strong) = items_weak.upgrade() {
'outer: loop {
match sel.ready() {
i if i == item_channel => {
let mut locked = items_strong.lock();
'inner: for _ in 0..128 {
match rx_item.try_recv() {
Ok(item) => locked.push(item),
Err(err) => {
if err.is_disconnected() {
break 'outer;
}
if err.is_empty() {
break 'inner;
}
}
}
}
}
i if i == interrupt_channel => break 'outer,
_ => unreachable!(),
}
}
}
components_to_stop.fetch_sub(1, Ordering::SeqCst);
debug!("reader: collect_item stop");
});
while !started.load(Ordering::SeqCst) {
}
(tx_interrupt, thread_reader)
}