noir_compute/block/
mod.rs

1use std::fmt::{Debug, Display, Formatter};
2use std::hash::{Hash, Hasher};
3use std::sync::Arc;
4
5pub use batcher::BatchMode;
6pub(crate) use batcher::*;
7pub(crate) use graph_generator::*;
8pub(crate) use next_strategy::*;
9pub(crate) use structure::*;
10
11use crate::operator::iteration::IterationStateLock;
12use crate::operator::Operator;
13use crate::scheduler::BlockId;
14use crate::CoordUInt;
15
16mod batcher;
17mod graph_generator;
18mod next_strategy;
19pub mod structure;
20
21/// A chain of operators that will be run inside the same host. The block takes as input elements of
22/// type `In` and produces elements of type `Out`.
23///
24/// The type `In` is used to make sure the blocks are connected following the correct type.
25///
26/// `OperatorChain` is the type of the chain of operators inside the block. It must be an operator
27/// that yields values of type `Out`.
28#[derive(Debug)]
29pub(crate) struct Block<OperatorChain>
30where
31    OperatorChain: Operator,
32{
33    /// The identifier of the block inside the environment.
34    pub(crate) id: BlockId,
35    /// The current chain of operators.
36    pub(crate) operators: OperatorChain,
37    /// The batch mode of this block.
38    pub(crate) batch_mode: BatchMode,
39    /// This block may be inside a number of iteration loops, this stack keeps track of the state
40    /// lock for each of them.
41    pub(crate) iteration_ctx: Vec<Arc<IterationStateLock>>,
42    /// Whether this block has `NextStrategy::OnlyOne`.
43    pub(crate) is_only_one_strategy: bool,
44    /// The set of requirements that the block imposes on the scheduler.
45    pub(crate) scheduling: Scheduling,
46}
47
48impl<OperatorChain> Clone for Block<OperatorChain>
49where
50    OperatorChain: Operator,
51{
52    fn clone(&self) -> Self {
53        Self {
54            id: self.id,
55            operators: self.operators.clone(),
56            batch_mode: self.batch_mode,
57            iteration_ctx: self.iteration_ctx.clone(),
58            is_only_one_strategy: self.is_only_one_strategy,
59            scheduling: self.scheduling.clone(),
60        }
61    }
62}
63
64impl<OperatorChain> Block<OperatorChain>
65where
66    OperatorChain: Operator,
67{
68    /// Add an operator to the end of the block
69    pub fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> Block<Op2>
70    where
71        Op2: Operator,
72        GetOp: FnOnce(OperatorChain) -> Op2,
73    {
74        Block {
75            id: self.id,
76            operators: get_operator(self.operators),
77            batch_mode: self.batch_mode,
78            iteration_ctx: self.iteration_ctx,
79            is_only_one_strategy: false,
80            scheduling: self.scheduling,
81        }
82    }
83}
84
85#[derive(Clone, Debug, Default)]
86pub(crate) struct Scheduling {
87    /// If some of the operators inside the chain require a limit on the parallelism of this node,
88    /// it is stored here. `None` means that the scheduler is allowed to spawn as many copies of
89    /// this block as it likes.
90    ///
91    /// The value specified is only an upper bound, the scheduler is allowed to spawn less blocks,
92    pub(crate) replication: Replication,
93}
94
95/// Replication factor for a block
96#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
97pub enum Replication {
98    /// The number of replicas is unlimited and will be determined by the launch configuration.
99    #[default]
100    Unlimited,
101    /// The number of replicas is limited to a fixed number.
102    Limited(CoordUInt),
103    /// The number of replicas is limited to one per host.
104    Host,
105    /// The number of replicas is limited to one across all the hosts.
106    One,
107}
108
109impl Replication {
110    pub fn new_unlimited() -> Self {
111        Self::Unlimited
112    }
113
114    pub fn new_limited(size: CoordUInt) -> Self {
115        assert!(size > 0, "Replication limit must be greater than zero!");
116        Self::Limited(size)
117    }
118
119    pub fn new_host() -> Self {
120        Self::Host
121    }
122
123    pub fn new_one() -> Self {
124        Self::One
125    }
126
127    pub fn is_unlimited(&self) -> bool {
128        matches!(self, Replication::Unlimited)
129    }
130    pub fn intersect(&self, rhs: Self) -> Self {
131        match (*self, rhs) {
132            (Replication::One, _) | (_, Replication::One) => Replication::One,
133            (Replication::Host, _) | (_, Replication::Host) => Replication::Host,
134            (Replication::Limited(n), Replication::Limited(m)) => Replication::Limited(n.min(m)),
135            (Replication::Limited(n), _) | (_, Replication::Limited(n)) => Replication::Limited(n),
136            (Replication::Unlimited, Replication::Unlimited) => Replication::Unlimited,
137        }
138    }
139
140    pub(crate) fn clamp(&self, n: CoordUInt) -> CoordUInt {
141        match self {
142            Replication::Unlimited => n,
143            Replication::Limited(q) => n.min(*q),
144            Replication::Host => 1,
145            Replication::One => 1,
146        }
147    }
148}
149
150impl<OperatorChain> Block<OperatorChain>
151where
152    OperatorChain: Operator,
153{
154    pub fn new(
155        id: BlockId,
156        operators: OperatorChain,
157        batch_mode: BatchMode,
158        iteration_ctx: Vec<Arc<IterationStateLock>>,
159        scheduling: Scheduling,
160    ) -> Self {
161        Self {
162            id,
163            operators,
164            batch_mode,
165            iteration_ctx,
166            is_only_one_strategy: false,
167            scheduling,
168        }
169    }
170
171    /// Obtain a vector of opaque items representing the stack of iterations.
172    ///
173    /// An empty vector is returned when the block is outside any iterations, more than one element
174    /// if it's inside nested iterations.
175    pub(crate) fn iteration_ctx(&self) -> Vec<*const ()> {
176        self.iteration_ctx
177            .iter()
178            .map(|s| Arc::as_ptr(s) as *const ())
179            .collect()
180    }
181}
182
183impl<OperatorChain> Display for Block<OperatorChain>
184where
185    OperatorChain: Operator,
186{
187    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
188        write!(f, "{}", self.operators)
189    }
190}
191
192impl Scheduling {
193    /// Limit the maximum parallelism of this block.
194    pub(crate) fn replication(&mut self, replication: Replication) {
195        self.replication = self.replication.intersect(replication);
196    }
197}
198
199/// Hashing function for group by operations
200pub fn group_by_hash<T: Hash>(item: &T) -> u64 {
201    let mut hasher = wyhash::WyHash::with_seed(0x0123456789abcdef);
202    item.hash(&mut hasher);
203    hasher.finish()
204}
205
206/// Hasher used for internal hashmaps that have coordinates as keys
207/// (optimized for small keys)
208pub type CoordHasherBuilder = fxhash::FxBuildHasher;
209
210/// Hasher used for StreamElement keys
211/// (for all around good performance)
212pub type GroupHasherBuilder = core::hash::BuildHasherDefault<wyhash::WyHash>;