use std::any::TypeId;
use std::sync::Arc;
use std::thread::available_parallelism;

use parking_lot::Mutex;

use crate::block::{Block, Scheduling};
use crate::config::RuntimeConfig;
use crate::operator::iteration::IterationStateLock;
use crate::operator::source::Source;
use crate::operator::{Data, Operator};
// NOTE: this import must NOT be gated behind `#[cfg(feature = "ssh")]`:
// `Scheduler` and `BlockId` are used unconditionally throughout this file
// (struct field, `execute*`, `close_block`, `connect_blocks`, ...), so a
// conditional import breaks compilation when the feature is disabled.
use crate::scheduler::{BlockId, Scheduler};
use crate::stream::Stream;
use crate::{BatchMode, CoordUInt};
/// Shared mutable state behind a [`StreamContext`]: the runtime configuration,
/// a counter for assigning unique block ids, and the scheduler (which is taken
/// out of the `Option` once execution starts).
pub(crate) struct StreamContextInner {
    /// Runtime configuration (local or remote) this context was created with.
    pub(crate) config: RuntimeConfig,
    /// Number of blocks created so far; doubles as the next block id to hand out.
    block_count: CoordUInt,
    /// `Some` until execution starts; `execute`/`execute_blocking` take it,
    /// after which `scheduler_mut` panics.
    scheduler: Option<Scheduler>,
}
/// Entry point for building and executing a dataflow of blocks.
///
/// Wraps the shared inner state so that streams created from this context
/// (each holds a clone of `inner`) can register new blocks and wire them into
/// the same scheduler.
pub struct StreamContext {
    inner: Arc<Mutex<StreamContextInner>>,
}
impl Default for StreamContext {
    /// Create a context backed by a local runtime that uses every available
    /// core, falling back to a single core if the parallelism query fails.
    fn default() -> Self {
        let cores = match available_parallelism() {
            Ok(n) => n.get() as u64,
            Err(_) => 1,
        };
        Self::new(RuntimeConfig::local(cores))
    }
}
impl StreamContext {
    /// Build a new context from the given runtime configuration.
    pub fn new(config: RuntimeConfig) -> Self {
        debug!("new environment");
        StreamContext {
            inner: Arc::new(Mutex::new(StreamContextInner::new(config))),
        }
    }

    /// Register `source` as a new block and return a [`Stream`] rooted at it.
    ///
    /// # Panics
    /// Panics if the context uses a remote configuration whose `host_id` has
    /// not been initialized (i.e. `spawn_remote_workers` was never called).
    pub fn stream<S>(&self, source: S) -> Stream<S>
    where
        S: Source + Send + 'static,
    {
        let mut inner = self.inner.lock();
        if let RuntimeConfig::Remote(remote) = &inner.config {
            assert!(remote.host_id.is_some(), "remote config must be started using RuntimeConfig::spawn_remote_workers(). (Or initialize `host_id` correctly)");
        }
        let block = inner.new_block(source, Default::default(), Default::default());
        Stream::new(self.inner.clone(), block)
    }

    /// Start the execution asynchronously, consuming the context.
    ///
    /// Takes the scheduler out of the inner state (further scheduler access
    /// panics) and releases the lock before awaiting completion.
    #[cfg(feature = "async-tokio")]
    pub async fn execute(self) {
        let mut env = self.inner.lock();
        info!("starting execution ({} blocks)", env.block_count);
        let scheduler = env.scheduler.take().unwrap();
        let block_count = env.block_count;
        drop(env);
        scheduler.start(block_count).await;
        info!("finished execution");
    }

    /// Start the execution, blocking until it completes; consumes the context.
    pub fn execute_blocking(self) {
        let mut env = self.inner.lock();
        info!("starting execution ({} blocks)", env.block_count);
        let scheduler = env.scheduler.take().unwrap();
        let block_count = env.block_count;
        // Release the context lock before running the blocking execution,
        // mirroring the async `execute` path. Holding the mutex for the whole
        // job would deadlock anything that locks the context mid-execution.
        drop(env);
        scheduler.start_blocking(block_count);
        info!("finished execution");
    }

    /// Total parallelism of the runtime: the local core count, or the sum of
    /// cores across all remote hosts.
    pub fn parallelism(&self) -> CoordUInt {
        match &self.inner.lock().config {
            RuntimeConfig::Local(local) => local.num_cores,
            RuntimeConfig::Remote(remote) => remote.hosts.iter().map(|h| h.num_cores).sum(),
        }
    }
}
impl StreamContextInner {
    /// Build the inner state: zero blocks and a fresh scheduler for `config`.
    fn new(config: RuntimeConfig) -> Self {
        let scheduler = Scheduler::new(config.clone());
        Self {
            config,
            block_count: 0,
            scheduler: Some(scheduler),
        }
    }

    /// Allocate a fresh id and wrap `source` into a new [`Block`] with the
    /// given batch mode and iteration context.
    pub(crate) fn new_block<S: Source>(
        &mut self,
        source: S,
        batch_mode: BatchMode,
        iteration_ctx: Vec<Arc<IterationStateLock>>,
    ) -> Block<S> {
        let new_id = self.new_block_id();
        let replication = source.replication();
        info!("new block (b{new_id:02}), replication {replication:?}",);
        let scheduling = Scheduling { replication };
        Block::new(new_id, source, batch_mode, iteration_ctx, scheduling)
    }

    /// Hand `block` over to the scheduler and return its id.
    pub(crate) fn close_block<Out: Data, Op: Operator<Out = Out> + 'static>(
        &mut self,
        block: Block<Op>,
    ) -> BlockId {
        let block_id = block.id;
        self.scheduler_mut().schedule_block(block);
        block_id
    }

    /// Register in the scheduler an edge `from -> to` carrying items of type `Out`.
    pub(crate) fn connect_blocks<Out: 'static>(&mut self, from: BlockId, to: BlockId) {
        let typ = TypeId::of::<Out>();
        self.scheduler_mut().connect_blocks(from, to, typ);
    }

    /// Duplicate `block` under a fresh id, re-creating every incoming edge of
    /// the original so the copy has the same predecessors.
    pub(crate) fn clone_block<Op: Operator>(&mut self, block: &Block<Op>) -> Block<Op> {
        let mut copy = block.clone();
        copy.id = self.new_block_id();
        let predecessors = self.scheduler_mut().prev_blocks(block.id).unwrap();
        for (pred, typ) in predecessors {
            self.scheduler_mut().connect_blocks(pred, copy.id, typ);
        }
        copy
    }

    /// Return the next unused block id and advance the counter.
    fn new_block_id(&mut self) -> BlockId {
        let new_id = self.block_count;
        self.block_count = new_id + 1;
        debug!("new block_id (b{new_id:02})");
        new_id
    }

    /// Mutable access to the scheduler.
    ///
    /// # Panics
    /// Panics if execution has already started (the scheduler was taken).
    pub(crate) fn scheduler_mut(&mut self) -> &mut Scheduler {
        match self.scheduler.as_mut() {
            Some(scheduler) => scheduler,
            None => panic!("The environment has already been started, cannot access the scheduler"),
        }
    }
}