noir_compute/block/
structure.rs

//! Types that describe the structure of an execution graph, for debugging purposes.
2
3use std::fmt::{Display, Formatter};
4
5use serde::{Deserialize, Serialize};
6
7use crate::block::NextStrategy;
8use crate::operator::{ExchangeData, KeyerFn};
9use crate::scheduler::BlockId;
10
11/// Wrapper type that contains a string representing the type.
12///
13/// The internal representation should not be considered unique nor exact. Its purpose is to be
14/// nice to look at.
15#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
16pub struct DataType(String);
17
18/// The structural information about a block.
19///
20/// This contains the structural information about the block and the operators it contains.
21#[derive(Clone, Debug, Default, Serialize, Deserialize)]
22pub struct BlockStructure {
23    /// The structural information about the operators inside the block.
24    ///
25    /// The first in the list is the start of the block, while the last is the operator that ends
26    /// the block.
27    pub operators: Vec<OperatorStructure>,
28}
29
30/// The structural information about an operator.
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub struct OperatorStructure {
33    /// The title of the operator.
34    pub title: String,
35    /// The subtitle of the operator.
36    pub subtitle: String,
37    /// The kind of operator: `Operator`, `Source` or `Sink`.
38    pub kind: OperatorKind,
39    /// The list of receivers this operator registers for the block.
40    ///
41    /// This does not contain the receiver from the previous operator in the block.
42    pub receivers: Vec<OperatorReceiver>,
43    /// The list of connections this operator makes.
44    ///
45    /// This does not count the connection to the next operator in the block: that connection is
46    /// added automatically.
47    pub connections: Vec<Connection>,
48    /// The type of the data that comes out of this operator.
49    pub out_type: DataType,
50}
51
52/// The kind of operator: either `Operator`, `Source` or `Sink`.
53///
54/// This value can be used for customizing the look of the operator.
55#[derive(Clone, Debug, Serialize, Deserialize)]
56pub enum OperatorKind {
57    /// This operator is a normal operator.
58    Operator,
59    /// This operator is a sink, i.e. no message will leave it.
60    Sink,
61    /// This operator is a source, i.e. no message will enter it.
62    Source,
63}
64
65/// A receiver registered by an operator.
66///
67/// This receiver tells that an operator will receive some data from the network from the specified
68/// block. Inside a block there cannot be two operators that register a receiver from the same block
69/// id.
70#[derive(Clone, Debug, Serialize, Deserialize)]
71pub struct OperatorReceiver {
72    /// The identifier of the block from which the data is arriving.
73    pub previous_block_id: BlockId,
74    /// The type of the data coming from the channel.
75    pub data_type: DataType,
76}
77
78/// A connection registered by an operator.
79///
80/// This tell that an operator will establish a connection with an external block. That block should
81/// have registered the corresponding receiver. The strategy can be used to customize the look of
82/// this connection.
83#[derive(Clone, Debug, Serialize, Deserialize)]
84pub struct Connection {
85    /// The id of the block that this operator is connecting to.
86    pub to_block_id: BlockId,
87    /// The type of data going in the channel.
88    pub data_type: DataType,
89    /// The strategy used for sending the data in the channel.
90    pub strategy: ConnectionStrategy,
91}
92
93/// The strategy used for sending the data in a channel.
94#[derive(Clone, Debug, Serialize, Deserialize)]
95pub enum ConnectionStrategy {
96    /// The data will sent to the only replica possible. Refer to `NextStrategy::OnlyOne`.
97    OnlyOne,
98    /// A random replica is chosen for sending the data.
99    Random,
100    /// A key-based approach is used for choosing the next replica.
101    GroupBy,
102    /// All the replicas receive all the elements of the stream.
103    All,
104}
105
106impl DataType {
107    /// Construct the `DataType` for the specified type.
108    pub fn of<T: ?Sized>() -> Self {
109        let type_name = std::any::type_name::<T>();
110        Self(DataType::clean_str(type_name))
111    }
112
113    /// Cleanup the type information returned by `std::any::type_name`. This will remove a lot of
114    /// unnecessary information from the type (like the path), keeping just the final name and the
115    /// type parameters.
116    fn clean_str(s: &str) -> String {
117        let mut result = "".to_string();
118        let mut current_ident = "".to_string();
119        for c in s.chars() {
120            // c is part of an identifier
121            if c.is_alphanumeric() {
122                current_ident.push(c);
123                // the current identifier was just a type path
124            } else if c == ':' {
125                current_ident = "".to_string();
126                // other characters like space, <, >
127            } else {
128                // if there was an identifier, this character marks its end
129                if !current_ident.is_empty() {
130                    result += &current_ident;
131                    current_ident = "".to_string();
132                }
133                result.push(c);
134            }
135        }
136        if !current_ident.is_empty() {
137            result += &current_ident;
138        }
139        result
140    }
141}
142
143impl Display for DataType {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(f, "{}", self.0)
146    }
147}
148
149impl BlockStructure {
150    /// Add a new operator inside this block.
151    pub fn add_operator(mut self, operator: OperatorStructure) -> Self {
152        self.operators.push(operator);
153        self
154    }
155}
156
157impl OperatorStructure {
158    /// Crate a new [`OperatorStructure`] with the given title that produces the given elements.
159    pub fn new<Out: ?Sized, S: Into<String>>(title: S) -> Self {
160        Self {
161            title: title.into(),
162            subtitle: "".into(),
163            kind: OperatorKind::Operator,
164            receivers: Default::default(),
165            connections: Default::default(),
166            out_type: DataType::of::<Out>(),
167        }
168    }
169}
170
171impl OperatorReceiver {
172    /// Crate a new [`OperatorReceiver`] with the given type that will receive from the given block.
173    pub fn new<T: ?Sized>(previous_block_id: BlockId) -> Self {
174        Self {
175            previous_block_id,
176            data_type: DataType::of::<T>(),
177        }
178    }
179}
180
181impl Connection {
182    pub(crate) fn new<T: ExchangeData, IndexFn>(
183        to_block_id: BlockId,
184        strategy: &NextStrategy<T, IndexFn>,
185    ) -> Self
186    where
187        IndexFn: KeyerFn<u64, T>,
188    {
189        Self {
190            to_block_id,
191            data_type: DataType::of::<T>(),
192            strategy: strategy.into(),
193        }
194    }
195}
196
197impl<Out: ExchangeData, IndexFn> From<&NextStrategy<Out, IndexFn>> for ConnectionStrategy
198where
199    IndexFn: KeyerFn<u64, Out>,
200{
201    fn from(strategy: &NextStrategy<Out, IndexFn>) -> Self {
202        match strategy {
203            NextStrategy::OnlyOne => ConnectionStrategy::OnlyOne,
204            NextStrategy::Random => ConnectionStrategy::Random,
205            NextStrategy::GroupBy(_, _) => ConnectionStrategy::GroupBy,
206            NextStrategy::All => ConnectionStrategy::All,
207        }
208    }
209}
210
#[cfg(test)]
mod tests {
    use crate::block::DataType;

    /// `clean_str` drops path prefixes while keeping generics, tuples and separators intact.
    #[test]
    fn test_data_type_clean() {
        let cases = [
            ("aaa", "aaa"),
            ("aaa::bbb::ccc", "ccc"),
            ("(aaa, bbb::ccc::ddd)", "(aaa, ddd)"),
            ("aaa<bbb>", "aaa<bbb>"),
            ("aaa::bbb::ccc<ddd::eee>", "ccc<eee>"),
            ("aaa::bbb<ccc::ddd<eee>>", "bbb<ddd<eee>>"),
        ];
        for (input, expected) in cases.iter() {
            let cleaned = DataType::clean_str(input);
            assert_eq!(cleaned, *expected, "cleaning {:?}", input);
        }
    }
}