use std::{
cmp,
collections::{BinaryHeap, HashMap},
sync::Arc,
time::Instant,
};
use parking_lot::Mutex;
use tracing::{info, span, warn, Level};
use super::shell;
pub fn run(
new_sess: crossbeam_channel::Receiver<(String, Instant)>,
shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
) -> anyhow::Result<()> {
let _s = span!(Level::INFO, "ttl_reaper").entered();
let mut heap = BinaryHeap::new();
let mut gen_ids = HashMap::new();
loop {
while heap.is_empty() {
match new_sess.recv() {
Ok((session_name, reap_at)) => {
let gen_id = gen_ids.entry(session_name.clone()).or_insert(0);
*gen_id += 1;
info!(
"scheduling first sess {}:{} to be reaped at {:?}",
&session_name, *gen_id, reap_at
);
heap.push(Reapable { session_name, gen_id: *gen_id, reap_at });
}
Err(crossbeam_channel::RecvError) => {
info!("bailing due to RecvError in empty heap loop");
return Ok(());
}
}
}
while !heap.is_empty() {
let wake_at = if let Some(reapable) = heap.peek() {
reapable.reap_at
} else {
warn!("no reapable even with heap len {}, should be impossible", heap.len());
continue;
};
crossbeam_channel::select! {
recv(new_sess) -> new_sess_msg => {
match new_sess_msg {
Ok((session_name, reap_at)) => {
let gen_id = gen_ids.entry(session_name.clone()).or_insert(0);
*gen_id += 1;
info!("scheduling {}:{} to be reaped at {:?}",
&session_name, *gen_id, reap_at);
heap.push(Reapable {
session_name,
gen_id: *gen_id,
reap_at,
});
}
Err(crossbeam_channel::RecvError) => {
info!("bailing due to RecvError");
return Ok(())
},
}
}
recv(crossbeam_channel::at(wake_at)) -> _ => {
let reapable = heap.pop()
.expect("there to be an entry in a non-empty heap");
info!("waking up to reap {:?}", reapable);
let current_gen = gen_ids.get(&reapable.session_name)
.copied().unwrap_or(0);
if current_gen != reapable.gen_id {
info!("ignoring {}:{} because current gen is {:?}",
&reapable.session_name, reapable.gen_id, current_gen);
continue;
}
let _s = span!(Level::INFO, "lock(shells)").entered();
let mut shells = shells.lock();
if let Some(sess) = shells.get(&reapable.session_name) {
if let Err(e) = sess.kill() {
warn!("error trying to kill '{}': {:?}",
reapable.session_name, e);
}
} else {
warn!("tried to kill '{}' but it wasn't in the shells tab",
reapable.session_name);
continue;
}
shells.remove(&reapable.session_name);
}
}
}
}
}
/// A pending request to reap one session at a given instant.
///
/// Comparison and equality look at `reap_at` only, and the ordering is
/// reversed: an earlier deadline compares as *greater*. Placed in a
/// `BinaryHeap` (a max-heap), this makes the entry that is due soonest the
/// one returned by `peek()` / `pop()`.
#[derive(Debug)]
struct Reapable {
    /// Name of the session to reap; used as the key into the shells table.
    session_name: String,
    /// Generation of this schedule for the session; compared against the
    /// session's current generation so superseded entries can be skipped.
    gen_id: usize,
    /// Instant at which the session should be reaped.
    reap_at: Instant,
}

impl cmp::Ord for Reapable {
    /// Orders by `reap_at`, reversed, so sooner deadlines sort higher.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.reap_at.cmp(&other.reap_at).reverse()
    }
}

impl cmp::PartialOrd for Reapable {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl cmp::PartialEq for Reapable {
    /// Two entries are equal exactly when their deadlines are equal, which
    /// keeps `==` in agreement with `Ord::cmp`.
    fn eq(&self, other: &Self) -> bool {
        cmp::Ord::cmp(self, other) == cmp::Ordering::Equal
    }
}

impl cmp::Eq for Reapable {}