use crate::item::ItemPool;
use crate::options::SkimOptions;
use crate::prelude::{Sender, SkimItemReader};
use crate::spinlock::SpinLock;
use crate::{SkimItem, SkimItemReceiver};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
/// A source that can execute a command and stream the resulting items.
///
/// In this file the trait object is invoked by `Reader::run` / `Reader::collect`
/// when no item receiver was supplied up front.
pub trait CommandCollector {
/// Starts collecting items for `cmd`.
///
/// `components_to_stop` is the shared counter that `ReaderControl::kill`
/// waits on until it reaches zero; implementations presumably register
/// their own running components on it (not visible here — confirm against
/// the implementors).
///
/// Returns the receiver on which item batches arrive, together with a
/// sender that `ReaderControl::kill` uses to signal an interrupt (it sends `1`).
fn invoke(
&mut self,
cmd: &str,
components_to_stop: Arc<AtomicUsize>,
) -> (SkimItemReceiver, crate::prelude::Sender<i32>);
}
/// Handle to a running read operation, returned by `Reader::run` and
/// `Reader::collect`. Dropping the handle kills the reader.
pub struct ReaderControl {
// Interrupt channel for the thread spawned by `collect_items`.
tx_interrupt: Sender<i32>,
// Interrupt channel for the command collector; `None` when the items came
// from a receiver supplied through `Reader::source`.
tx_interrupt_cmd: Option<Sender<i32>>,
// Number of components still running; `kill` waits for it to reach zero.
components_to_stop: Arc<AtomicUsize>,
// Buffer drained by `take` and inspected by `is_done`.
// NOTE(review): in the code visible here it is created empty and nothing
// ever pushes into it — confirm whether a producer exists elsewhere.
items: Arc<SpinLock<Vec<Arc<dyn SkimItem>>>>,
}
impl ReaderControl {
    /// Interrupts the reader and blocks until every component has stopped.
    ///
    /// An interrupt (`1`) is sent first to the command collector, if there is
    /// one, and then to the item-collecting thread. Send errors are ignored on
    /// purpose: a failed send is treated as best-effort, since the method goes
    /// on to wait for the counter regardless.
    pub fn kill(&mut self) {
        debug!(
            "kill reader, components before: {}",
            self.components_to_stop.load(Ordering::SeqCst)
        );

        // Borrow the sender instead of cloning it just to call `send`.
        if let Some(tx) = &self.tx_interrupt_cmd {
            let _ = tx.send(1);
        }
        let _ = self.tx_interrupt.send(1);

        // Yield while waiting so the threads we are waiting for can actually
        // get scheduled, rather than burning a core in an empty loop.
        while self.components_to_stop.load(Ordering::SeqCst) != 0 {
            std::thread::yield_now();
        }
    }

    /// Moves every buffered item out of the shared buffer and returns them,
    /// leaving the buffer empty.
    pub fn take(&self) -> Vec<Arc<dyn SkimItem>> {
        let mut items = self.items.lock();
        let mut ret = Vec::with_capacity(items.len());
        ret.append(&mut items);
        ret
    }

    /// Returns `true` when no component is running anymore and the shared
    /// buffer holds no items.
    pub fn is_done(&self) -> bool {
        // The lock is held while the counter is read, as in the original.
        let items = self.items.lock();
        self.components_to_stop.load(Ordering::SeqCst) == 0 && items.is_empty()
    }
}
/// Dropping the control handle stops the reader: it delegates to
/// `ReaderControl::kill`, which blocks until all components have stopped.
impl Drop for ReaderControl {
    fn drop(&mut self) {
        Self::kill(self);
    }
}
/// Entry point for reading items, either from a receiver supplied by the
/// caller or by invoking a command through a `CommandCollector`.
pub struct Reader {
// Used to run the command when no receiver has been supplied.
cmd_collector: Rc<RefCell<dyn CommandCollector>>,
// Optional pre-supplied item receiver; `run` / `collect` take it out,
// so it is used by at most one of those calls.
rx_item: Option<SkimItemReceiver>,
}
impl Reader {
    /// Builds a reader that shares the command collector configured in
    /// `options` and has no pre-supplied item receiver.
    pub fn from_options(options: &SkimOptions) -> Self {
        Self {
            cmd_collector: options.cmd_collector.clone(),
            rx_item: None,
        }
    }

    /// Builder-style setter for the item receiver to read from instead of
    /// invoking a command. Passing `None` clears any previous receiver.
    pub fn source(mut self, rx_item: Option<SkimItemReceiver>) -> Self {
        self.rx_item = rx_item;
        self
    }

    /// Starts reading and forwards every received batch of items to `app_tx`.
    ///
    /// Errors from sending on `app_tx` are ignored. `cmd` is only used when no
    /// receiver was supplied through [`Reader::source`].
    pub fn run(&mut self, app_tx: Sender<Vec<Arc<dyn SkimItem>>>, cmd: &str) -> ReaderControl {
        self.spawn_collector(cmd, move |items| _ = app_tx.send(items))
    }

    /// Starts reading and appends every received batch of items to `item_pool`.
    ///
    /// `cmd` is only used when no receiver was supplied through
    /// [`Reader::source`].
    pub fn collect(&mut self, item_pool: Arc<ItemPool>, cmd: &str) -> ReaderControl {
        let control = self.spawn_collector(cmd, move |items| {
            item_pool.append(items);
        });
        debug!("collect: started ({:?} components)", control.components_to_stop);
        control
    }

    /// Shared setup for `run` and `collect`: picks the item source, spawns the
    /// collecting thread with `callback`, and wraps everything in a `ReaderControl`.
    fn spawn_collector<F>(&mut self, cmd: &str, callback: F) -> ReaderControl
    where
        F: Fn(Vec<Arc<dyn SkimItem>>) + Send + 'static,
    {
        let components_to_stop = Arc::new(AtomicUsize::new(0));
        let items = Arc::new(SpinLock::new(Vec::new()));

        // Prefer a receiver supplied by the caller; otherwise run the command.
        // Only the command path yields an interrupt sender for the collector.
        let (rx_item, tx_interrupt_cmd) = match self.rx_item.take() {
            Some(rx) => (rx, None),
            None => {
                let (rx, tx) = self
                    .cmd_collector
                    .borrow_mut()
                    .invoke(cmd, Arc::clone(&components_to_stop));
                (rx, Some(tx))
            }
        };

        let tx_interrupt = collect_items(Arc::clone(&components_to_stop), rx_item, callback);

        ReaderControl {
            tx_interrupt,
            tx_interrupt_cmd,
            components_to_stop,
            items,
        }
    }
}
/// The default reader uses a default `SkimItemReader` as its command
/// collector and has no pre-supplied item receiver.
impl Default for Reader {
    fn default() -> Self {
        let cmd_collector: Rc<RefCell<dyn CommandCollector>> =
            Rc::new(RefCell::new(SkimItemReader::default()));
        Self {
            cmd_collector,
            rx_item: None,
        }
    }
}
/// Spawns a thread that pulls item batches from `rx_item` and hands each one
/// to `callback`.
///
/// The thread registers itself on `components_to_stop` (incrementing it by
/// one) and unregisters when it exits. It exits when a message arrives on the
/// returned interrupt sender or when receiving from `rx_item` fails. When no
/// batch is available it sleeps for 10 ms before polling again.
///
/// This function does not return until the thread has registered itself, so
/// the counter already reflects the thread once the caller gets the sender.
fn collect_items<F>(components_to_stop: Arc<AtomicUsize>, rx_item: SkimItemReceiver, callback: F) -> Sender<i32>
where
    F: Fn(Vec<Arc<dyn SkimItem>>) + Send + 'static,
{
    /// Decrements the component counter when dropped, so the counter is
    /// released even if `callback` panics and the thread unwinds. Without
    /// this, anyone waiting for the counter to reach zero would wait forever.
    struct ComponentGuard(Arc<AtomicUsize>);

    impl Drop for ComponentGuard {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::SeqCst);
        }
    }

    let (tx_interrupt, rx_interrupt) = crate::prelude::bounded(8);
    let started = Arc::new(AtomicBool::new(false));
    let started_clone = Arc::clone(&started);

    std::thread::spawn(move || {
        debug!("collect_item start");
        components_to_stop.fetch_add(1, Ordering::SeqCst);
        let guard = ComponentGuard(components_to_stop);
        started_clone.store(true, Ordering::SeqCst);

        loop {
            // An interrupt takes priority over pending items.
            if let Ok(Some(msg)) = rx_interrupt.try_recv() {
                debug!("interrupt: {msg}");
                break;
            }

            match rx_item.try_recv() {
                Ok(Some(items)) => {
                    trace!("collect_item: got {} items", items.len());
                    callback(items);
                }
                Ok(None) => {
                    // Nothing available right now: back off before polling again.
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
                Err(_) => {
                    break;
                }
            }
        }

        // Release the counter before logging, matching the original order.
        drop(guard);
        debug!("collect_item stop");
    });

    // The wait is expected to be short (the thread only needs to start), so a
    // spin with a CPU hint is enough here.
    while !started.load(Ordering::SeqCst) {
        std::hint::spin_loop();
    }

    tx_interrupt
}