noir_compute/
environment.rs1use parking_lot::Mutex;
2use std::any::TypeId;
3use std::sync::Arc;
4use std::thread::available_parallelism;
5
6use crate::block::{Block, Scheduling};
7use crate::config::RuntimeConfig;
8use crate::operator::iteration::IterationStateLock;
9use crate::operator::source::Source;
10use crate::operator::{Data, Operator};
11#[cfg(feature = "ssh")]
12use crate::scheduler::{BlockId, Scheduler};
13use crate::stream::Stream;
14use crate::{BatchMode, CoordUInt};
15
16pub(crate) struct StreamContextInner {
21 pub(crate) config: RuntimeConfig,
23 block_count: CoordUInt,
25 scheduler: Option<Scheduler>,
28}
29
30pub struct StreamContext {
44 inner: Arc<Mutex<StreamContextInner>>,
46}
47
48impl Default for StreamContext {
49 fn default() -> Self {
50 Self::new(RuntimeConfig::local(
51 available_parallelism().map(|q| q.get()).unwrap_or(1) as u64,
52 ))
53 }
54}
55
56impl StreamContext {
57 pub fn new(config: RuntimeConfig) -> Self {
59 debug!("new environment");
60 StreamContext {
61 inner: Arc::new(Mutex::new(StreamContextInner::new(config))),
62 }
63 }
64
65 pub fn stream<S>(&self, source: S) -> Stream<S>
67 where
68 S: Source + Send + 'static,
69 {
70 let mut inner = self.inner.lock();
71 if let RuntimeConfig::Remote(remote) = &inner.config {
72 assert!(remote.host_id.is_some(), "remote config must be started using RuntimeConfig::spawn_remote_workers(). (Or initialize `host_id` correctly)");
73 }
74
75 let block = inner.new_block(source, Default::default(), Default::default());
76 Stream::new(self.inner.clone(), block)
77 }
78
79 #[cfg(feature = "async-tokio")]
81 pub async fn execute(self) {
82 let mut env = self.inner.lock();
83 info!("starting execution ({} blocks)", env.block_count);
84 let scheduler = env.scheduler.take().unwrap();
85 let block_count = env.block_count;
86 drop(env);
87 scheduler.start(block_count).await;
88 info!("finished execution");
89 }
90
91 pub fn execute_blocking(self) {
96 let mut env = self.inner.lock();
97 info!("starting execution ({} blocks)", env.block_count);
98 let scheduler = env.scheduler.take().unwrap();
99 scheduler.start_blocking(env.block_count);
100 info!("finished execution");
101 }
102
103 pub fn parallelism(&self) -> CoordUInt {
105 match &self.inner.lock().config {
106 RuntimeConfig::Local(local) => local.num_cores,
107 RuntimeConfig::Remote(remote) => remote.hosts.iter().map(|h| h.num_cores).sum(),
108 }
109 }
110}
111
112impl StreamContextInner {
113 fn new(config: RuntimeConfig) -> Self {
114 Self {
115 config: config.clone(),
116 block_count: 0,
117 scheduler: Some(Scheduler::new(config)),
118 }
119 }
120
121 pub(crate) fn new_block<S: Source>(
122 &mut self,
123 source: S,
124 batch_mode: BatchMode,
125 iteration_ctx: Vec<Arc<IterationStateLock>>,
126 ) -> Block<S> {
127 let new_id = self.new_block_id();
128 let replication = source.replication();
129 let scheduling = Scheduling { replication };
130 info!("new block (b{new_id:02}), replication {replication:?}",);
131 Block::new(new_id, source, batch_mode, iteration_ctx, scheduling)
132 }
133
134 pub(crate) fn close_block<Out: Data, Op: Operator<Out = Out> + 'static>(
135 &mut self,
136 block: Block<Op>,
137 ) -> BlockId {
138 let id = block.id;
139 let scheduler = self.scheduler_mut();
140 scheduler.schedule_block(block);
141 id
142 }
143
144 pub(crate) fn connect_blocks<Out: 'static>(&mut self, from: BlockId, to: BlockId) {
145 let scheduler = self.scheduler_mut();
146 scheduler.connect_blocks(from, to, TypeId::of::<Out>());
147 }
148
149 pub(crate) fn clone_block<Op: Operator>(&mut self, block: &Block<Op>) -> Block<Op> {
150 let mut new_block = block.clone();
151 new_block.id = self.new_block_id();
152
153 let prev_nodes = self.scheduler_mut().prev_blocks(block.id).unwrap();
154 for (prev_node, typ) in prev_nodes.into_iter() {
155 self.scheduler_mut()
156 .connect_blocks(prev_node, new_block.id, typ);
157 }
158
159 new_block
160 }
161
162 fn new_block_id(&mut self) -> BlockId {
164 let new_id = self.block_count;
165 self.block_count += 1;
166 debug!("new block_id (b{new_id:02})");
167 new_id
168 }
169
170 pub(crate) fn scheduler_mut(&mut self) -> &mut Scheduler {
173 self.scheduler
174 .as_mut()
175 .expect("The environment has already been started, cannot access the scheduler")
176 }
177}