use std::cell::RefCell;
use std::thread::JoinHandle;
use crate::block::{Block, BlockStructure};
use crate::network::Coord;
use crate::operator::{Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;
// Per-thread slot holding the coordinates of the replica that runs on this thread.
// Every thread starts with `None`; `spawn_worker` stores the worker's coordinates here
// at the very beginning of each thread it spawns, and `replica_coord` reads it back.
thread_local! {
static COORD: RefCell<Option<Coord>> = RefCell::new(None);
}
/// Returns the coordinates stored in the calling thread's `COORD` slot.
///
/// The result is `None` when nothing has been stored for this thread, i.e. the thread
/// was not started through `spawn_worker`.
pub fn replica_coord() -> Option<Coord> {
    COORD.with(|cell| {
        // Copy the value out while the shared borrow is alive; the guard is released
        // when the closure returns.
        let current = cell.borrow();
        *current
    })
}
/// Drop guard that runs `handler` when it goes out of scope, unless [`defuse`] was
/// called first.
///
/// Creating the guard at the top of a scope and defusing it at the end makes the
/// handler fire only when the scope is left early, for example while unwinding
/// from a panic.
///
/// [`defuse`]: CatchPanic::defuse
// The bound lives on the struct because the `Drop` impl below needs it, and a `Drop`
// impl cannot carry bounds that the type itself does not declare.
struct CatchPanic<F: FnOnce()> {
    /// `true` while the handler should still be invoked on drop.
    primed: bool,
    /// The callback to invoke; wrapped in an `Option` so `drop` can move it out of
    /// `&mut self`.
    handler: Option<F>,
}

impl<F: FnOnce()> CatchPanic<F> {
    /// Creates a primed guard that will call `handler` when dropped.
    fn new(handler: F) -> Self {
        let handler = Some(handler);
        CatchPanic {
            primed: true,
            handler,
        }
    }

    /// Disarms the guard: dropping it afterwards no longer calls the handler.
    fn defuse(&mut self) {
        self.primed = false;
    }
}

impl<F: FnOnce()> Drop for CatchPanic<F> {
    fn drop(&mut self) {
        if !self.primed {
            return;
        }
        // `handler` is set in `new` and only taken here, so it is always present.
        let handler = self.handler.take().unwrap();
        handler();
    }
}
/// Sets up `block` and starts a dedicated thread that runs it.
///
/// On the calling thread the block's operators are set up with `metadata` and their
/// structure is extracted. The block is then moved into a new thread named
/// `block-<id>`, which first records the worker's coordinates in the thread-local
/// `COORD` and then runs `do_work`.
///
/// Returns the join handle of the spawned thread together with the block structure.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub(crate) fn spawn_worker<OperatorChain>(
    mut block: Block<OperatorChain>,
    metadata: &mut ExecutionMetadata,
) -> (JoinHandle<()>, BlockStructure)
where
    OperatorChain: Operator + 'static,
    OperatorChain::Out: Send,
{
    let coord = metadata.coord;
    debug!("starting worker {}: {}", coord, block.to_string());

    // Setup must happen before the structure is read and before the block is moved
    // into the worker thread.
    block.operators.setup(metadata);
    let structure = block.operators.structure();

    let thread_name = format!("block-{}", block.id);
    let worker = move || {
        // Make the coordinates available to `replica_coord` on this thread.
        COORD.with(|cell| {
            *cell.borrow_mut() = Some(coord);
        });
        do_work(block, coord)
    };
    let join_handle = std::thread::Builder::new()
        .name(thread_name)
        .spawn(worker)
        .unwrap();

    (join_handle, structure)
}
/// Drives `block` to completion on the current thread.
///
/// Elements are pulled from the block's operators and discarded until a
/// `StreamElement::Terminate` is produced. If the function is left before that point
/// (e.g. by a panic unwinding through it), the guard logs that the worker crashed;
/// otherwise the completion is logged at info level.
fn do_work<Op: Operator>(mut block: Block<Op>, coord: Coord) {
    // Armed for the whole loop: it only stays silent if we reach `defuse` below.
    let mut crash_guard = CatchPanic::new(|| {
        error!("worker {} crashed!", coord);
    });

    loop {
        let element = block.operators.next();
        if matches!(element, StreamElement::Terminate) {
            break;
        }
    }

    crash_guard.defuse();
    info!("worker {} completed", coord);
}