// noir_compute — environment.rs

1use parking_lot::Mutex;
2use std::any::TypeId;
3use std::sync::Arc;
4use std::thread::available_parallelism;
5
6use crate::block::{Block, Scheduling};
7use crate::config::RuntimeConfig;
8use crate::operator::iteration::IterationStateLock;
9use crate::operator::source::Source;
10use crate::operator::{Data, Operator};
11#[cfg(feature = "ssh")]
12use crate::scheduler::{BlockId, Scheduler};
13use crate::stream::Stream;
14use crate::{BatchMode, CoordUInt};
15
16// static LAST_REMOTE_CONFIG: Lazy<Mutex<Option<RemoteConfig>>> = Lazy::new(|| Mutex::new(None));
17
/// Actual content of the [`StreamContext`]. This is stored inside an `Arc<Mutex<_>>` (see
/// `StreamContext::inner`) and it's shared among all the blocks.
// NOTE(review): `Scheduler` is imported above under `#[cfg(feature = "ssh")]` but is used
// unconditionally here — confirm the feature gating on that import is intended.
pub(crate) struct StreamContextInner {
    /// The configuration of the environment.
    pub(crate) config: RuntimeConfig,
    /// The number of blocks in the job graph; used to assign unique ids to new blocks
    /// (see `new_block_id`).
    block_count: CoordUInt,
    /// The scheduler that will start the computation. It's an option because it will be moved out
    /// of this struct when the computation starts; accessing it afterwards panics
    /// (see `scheduler_mut`).
    scheduler: Option<Scheduler>,
}
29
/// Streaming environment from which it's possible to register new streams and start the
/// computation.
///
/// This is the entrypoint for the library: construct an environment providing an
/// [`RuntimeConfig`], then you can ask new streams providing the source from where to read from.
///
/// If you want to use a distributed environment (i.e. using remote workers) you have to spawn them
/// using [`spawn_remote_workers`](StreamContext::spawn_remote_workers) before asking for some stream.
///
/// When all the streams have been registered you have to call
/// [`execute`](StreamContext::execute_blocking) that will consume the environment and start the
/// computation. This function will return when the computation ends.
///
/// TODO: example usage
pub struct StreamContext {
    /// Shared, lock-protected content of the environment; every [`Stream`] created from this
    /// context holds a clone of this `Arc`.
    inner: Arc<Mutex<StreamContextInner>>,
}
47
48impl Default for StreamContext {
49    fn default() -> Self {
50        Self::new(RuntimeConfig::local(
51            available_parallelism().map(|q| q.get()).unwrap_or(1) as u64,
52        ))
53    }
54}
55
56impl StreamContext {
57    /// Construct a new environment from the config.
58    pub fn new(config: RuntimeConfig) -> Self {
59        debug!("new environment");
60        StreamContext {
61            inner: Arc::new(Mutex::new(StreamContextInner::new(config))),
62        }
63    }
64
65    /// Construct a new stream bound to this environment starting with the specified source.
66    pub fn stream<S>(&self, source: S) -> Stream<S>
67    where
68        S: Source + Send + 'static,
69    {
70        let mut inner = self.inner.lock();
71        if let RuntimeConfig::Remote(remote) = &inner.config {
72            assert!(remote.host_id.is_some(), "remote config must be started using RuntimeConfig::spawn_remote_workers(). (Or initialize `host_id` correctly)");
73        }
74
75        let block = inner.new_block(source, Default::default(), Default::default());
76        Stream::new(self.inner.clone(), block)
77    }
78
79    /// Start the computation. Await on the returned future to actually start the computation.
80    #[cfg(feature = "async-tokio")]
81    pub async fn execute(self) {
82        let mut env = self.inner.lock();
83        info!("starting execution ({} blocks)", env.block_count);
84        let scheduler = env.scheduler.take().unwrap();
85        let block_count = env.block_count;
86        drop(env);
87        scheduler.start(block_count).await;
88        info!("finished execution");
89    }
90
91    /// Start the computation. Blocks until the computation is complete.
92    ///
93    /// Execute on a thread or use the async version [`execute`]
94    /// for non-blocking alternatives
95    pub fn execute_blocking(self) {
96        let mut env = self.inner.lock();
97        info!("starting execution ({} blocks)", env.block_count);
98        let scheduler = env.scheduler.take().unwrap();
99        scheduler.start_blocking(env.block_count);
100        info!("finished execution");
101    }
102
103    /// Get the total number of processing cores in the cluster.
104    pub fn parallelism(&self) -> CoordUInt {
105        match &self.inner.lock().config {
106            RuntimeConfig::Local(local) => local.num_cores,
107            RuntimeConfig::Remote(remote) => remote.hosts.iter().map(|h| h.num_cores).sum(),
108        }
109    }
110}
111
112impl StreamContextInner {
113    fn new(config: RuntimeConfig) -> Self {
114        Self {
115            config: config.clone(),
116            block_count: 0,
117            scheduler: Some(Scheduler::new(config)),
118        }
119    }
120
121    pub(crate) fn new_block<S: Source>(
122        &mut self,
123        source: S,
124        batch_mode: BatchMode,
125        iteration_ctx: Vec<Arc<IterationStateLock>>,
126    ) -> Block<S> {
127        let new_id = self.new_block_id();
128        let replication = source.replication();
129        let scheduling = Scheduling { replication };
130        info!("new block (b{new_id:02}), replication {replication:?}",);
131        Block::new(new_id, source, batch_mode, iteration_ctx, scheduling)
132    }
133
134    pub(crate) fn close_block<Out: Data, Op: Operator<Out = Out> + 'static>(
135        &mut self,
136        block: Block<Op>,
137    ) -> BlockId {
138        let id = block.id;
139        let scheduler = self.scheduler_mut();
140        scheduler.schedule_block(block);
141        id
142    }
143
144    pub(crate) fn connect_blocks<Out: 'static>(&mut self, from: BlockId, to: BlockId) {
145        let scheduler = self.scheduler_mut();
146        scheduler.connect_blocks(from, to, TypeId::of::<Out>());
147    }
148
149    pub(crate) fn clone_block<Op: Operator>(&mut self, block: &Block<Op>) -> Block<Op> {
150        let mut new_block = block.clone();
151        new_block.id = self.new_block_id();
152
153        let prev_nodes = self.scheduler_mut().prev_blocks(block.id).unwrap();
154        for (prev_node, typ) in prev_nodes.into_iter() {
155            self.scheduler_mut()
156                .connect_blocks(prev_node, new_block.id, typ);
157        }
158
159        new_block
160    }
161
162    /// Allocate a new BlockId inside the environment.
163    fn new_block_id(&mut self) -> BlockId {
164        let new_id = self.block_count;
165        self.block_count += 1;
166        debug!("new block_id (b{new_id:02})");
167        new_id
168    }
169
170    /// Return a mutable reference to the scheduler. This method will panic if the computation has
171    /// already been started.
172    pub(crate) fn scheduler_mut(&mut self) -> &mut Scheduler {
173        self.scheduler
174            .as_mut()
175            .expect("The environment has already been started, cannot access the scheduler")
176    }
177}