1use crate::{
2 chain::ChainId,
3 cli::{
4 call_expr::{Argument, CallExpr, Span},
5 parse_operator_data,
6 },
7 liveness_analysis::OperatorCallEffect,
8 options::session_setup::SessionSetupData,
9 typeline_error::TypelineError,
10 utils::{indexing_type::IndexingType, string_store::StringStoreEntry},
11};
12
13use super::{
14 errors::OperatorCreationError,
15 nop::OpNop,
16 operator::{
17 OffsetInAggregation, Operator, OperatorDataId, OperatorId,
18 OperatorOffsetInChain, OutputFieldKind,
19 },
20 utils::nested_op::{setup_op_outputs_for_nested_op, NestedOp},
21};
22
/// Operator data for the `key` operator: a key name plus an optional nested
/// operator.
pub struct OpKey {
    /// The key name as supplied at construction / parse time.
    pub key: String,
    /// Interned form of `key`. Starts out as `None` and is filled in by
    /// `Operator::setup` through the session's string store.
    pub key_interned: Option<StringStoreEntry>,
    /// The optional nested operator. Holds `NestedOp::Operator` until
    /// `Operator::setup` has run, and `NestedOp::SetUp` (the id of the
    /// registered nested operator) afterwards.
    pub nested_op: Option<NestedOp>,
}
28
29pub fn parse_op_key(
30 sess: &mut SessionSetupData,
31 mut arg: Argument,
32) -> Result<Box<dyn Operator>, TypelineError> {
33 let expr = CallExpr::from_argument_mut(&mut arg)?;
34 let op_name = expr.op_name;
35
36 if expr.args.len() < 2 {
37 return Err(OperatorCreationError::new(
38 "missing label argument for operator `key`",
39 expr.span,
40 )
41 .into());
42 }
43
44 let key_span = expr.args[0].span;
45
46 let key = std::mem::take(&mut expr.args[0].value)
47 .into_maybe_text()
48 .ok_or_else(|| expr.error_positional_arg_not_plaintext(key_span))?;
49
50 let key = key
51 .into_text()
52 .ok_or_else(|| expr.error_arg_invalid_utf8(op_name, key_span))?;
53
54 let mut nested_op = None;
55 if let Some(arg) = expr.args.get_mut(1) {
56 let span = arg.span;
57 let op = parse_operator_data(sess, std::mem::take(arg))?;
58 nested_op = Some(NestedOp::Operator(Box::new((op, span))));
59 }
60
61 if expr.args.len() > 2 {
62 return Err(OperatorCreationError::new(
63 "operator key only accepts two arguments`",
64 expr.args[2].span,
65 )
66 .into());
67 }
68
69 Ok(Box::new(OpKey {
70 key,
71 key_interned: None,
72 nested_op,
73 }))
74}
75
76pub fn create_op_key(key: String) -> Box<dyn Operator> {
77 Box::new(OpKey {
78 key,
79 key_interned: None,
80 nested_op: None,
81 })
82}
83
84pub fn create_op_key_with_op(
85 key: String,
86 op: Box<dyn Operator>,
87) -> Box<dyn Operator> {
88 Box::new(OpKey {
89 key,
90 key_interned: None,
91 nested_op: Some(NestedOp::Operator(Box::new((op, Span::Generated)))),
92 })
93}
94
95impl Operator for OpKey {
96 fn setup(
97 &mut self,
98 sess: &mut SessionSetupData,
99 op_data_id: OperatorDataId,
100 chain_id: ChainId,
101 offset_in_chain: OperatorOffsetInChain,
102 span: Span,
103 ) -> Result<OperatorId, TypelineError> {
104 self.key_interned = Some(sess.string_store.intern_cloned(&self.key));
105 let op_id = sess.add_op(op_data_id, chain_id, offset_in_chain, span);
106 let Some(nested_op) = &mut self.nested_op else {
107 return Ok(op_id);
108 };
109 let NestedOp::Operator(op_span) = nested_op else {
110 panic!("operator was already set up");
111 };
112 let (sub_op, span) = *std::mem::replace(
113 op_span,
114 Box::new((Box::new(OpNop::default()), Span::default())),
115 );
116 let sub_op_id = sess.setup_op_from_data(
117 sub_op,
118 sess.curr_chain,
119 OperatorOffsetInChain::AggregationMember(
120 op_id,
121 OffsetInAggregation::ZERO,
122 ),
123 span,
124 )?;
125 self.nested_op = Some(NestedOp::SetUp(sub_op_id));
126
127 Ok(op_id)
128 }
129 fn update_bb_for_op(
130 &self,
131 sess: &crate::context::SessionData,
132 ld: &mut crate::liveness_analysis::LivenessData,
133 _op_id: OperatorId,
134 op_n: super::operator::OffsetInChain,
135 cn: &crate::chain::Chain,
136 bb_id: crate::liveness_analysis::BasicBlockId,
137 ) -> bool {
138 let Some(nested_op) = &self.nested_op else {
139 return false;
140 };
141 let &NestedOp::SetUp(sub_op_id) = nested_op else {
142 unreachable!()
143 };
144 ld.update_bb_for_op(sess, sub_op_id, op_n, cn, bb_id)
145 }
146
147 fn default_name(&self) -> super::operator::OperatorName {
148 "key".into()
149 }
150
151 fn debug_op_name(&self) -> super::operator::OperatorName {
152 let Some(nested) = &self.nested_op else {
153 return self.default_name();
154 };
155 match nested {
156 NestedOp::Operator(nested_op) => {
157 format!(
158 "[ key '{}' {} ]",
159 self.key,
160 nested_op.0.debug_op_name()
161 )
162 }
163 NestedOp::SetUp(op_id) => {
164 format!("[ key '{}' <op {op_id:02}> ]", self.key)
165 }
166 }
167 .into()
168 }
169
170 fn update_variable_liveness(
171 &self,
172 sess: &crate::context::SessionData,
173 ld: &mut crate::liveness_analysis::LivenessData,
174 op_offset_after_last_write: super::operator::OffsetInChain,
175 _op_id: OperatorId,
176 bb_id: crate::liveness_analysis::BasicBlockId,
177 input_field: crate::liveness_analysis::OpOutputIdx,
178 output: &mut crate::liveness_analysis::OperatorLivenessOutput,
179 ) {
180 if let Some(NestedOp::SetUp(nested_op_id)) = self.nested_op {
181 sess.operator_data[sess.op_data_id(nested_op_id)]
182 .update_variable_liveness(
183 sess,
184 ld,
185 op_offset_after_last_write,
186 nested_op_id,
187 bb_id,
188 input_field,
189 output,
190 );
191 }
192
193 let var_id = ld.var_names[&self.key_interned.unwrap()];
194 ld.vars_to_op_outputs_map[var_id] = output.primary_output;
195 ld.op_outputs[output.primary_output]
196 .field_references
197 .push(input_field);
198 if let Some(prev_tgt) = ld.key_aliases_map.insert(var_id, input_field)
199 {
200 ld.apply_var_remapping(var_id, prev_tgt);
201 }
202 output.primary_output = input_field;
203 output.call_effect = OperatorCallEffect::NoCall;
204 }
205
206 fn output_field_kind(
207 &self,
208 sess: &crate::context::SessionData,
209 _op_id: OperatorId,
210 ) -> super::operator::OutputFieldKind {
211 let Some(nested) = &self.nested_op else {
212 return OutputFieldKind::SameAsInput;
213 };
214 let &NestedOp::SetUp(op_id) = nested else {
215 unreachable!()
216 };
217 sess.operator_data[sess.op_data_id(op_id)]
218 .output_field_kind(sess, op_id)
219 }
220
221 fn register_output_var_names(
222 &self,
223 ld: &mut crate::liveness_analysis::LivenessData,
224 sess: &crate::context::SessionData,
225 _op_id: OperatorId,
226 ) {
227 ld.add_var_name(self.key_interned.unwrap());
228 if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
229 sess.operator_data[sess.op_data_id(op_id)]
230 .register_output_var_names(ld, sess, op_id);
231 }
232 }
233
234 fn output_count(
235 &self,
236 sess: &crate::context::SessionData,
237 _op_id: OperatorId,
238 ) -> usize {
239 let Some(nested) = &self.nested_op else {
240 return 0;
241 };
242 let &NestedOp::SetUp(op_id) = nested else {
243 unreachable!()
244 };
245 sess.operator_data[sess.op_data_id(op_id)].output_count(sess, op_id)
246 }
247
248 fn aggregation_member(
249 &self,
250 agg_offset: OffsetInAggregation,
251 ) -> Option<OperatorId> {
252 if agg_offset != OffsetInAggregation::ZERO {
253 return None;
254 }
255 if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
256 return Some(op_id);
257 }
258 None
259 }
260
261 fn on_liveness_computed(
262 &mut self,
263 sess: &mut crate::context::SessionData,
264 ld: &crate::liveness_analysis::LivenessData,
265 _op_id: OperatorId,
266 ) {
267 if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
268 let op_data_id = sess.op_data_id(op_id);
269 let mut op_data = std::mem::replace(
270 &mut sess.operator_data[op_data_id],
271 Box::new(OpNop::default()),
272 );
273 op_data.on_liveness_computed(sess, ld, op_id);
274 sess.operator_data[op_data_id] = op_data;
275 }
276 }
277
278 fn assign_op_outputs(
279 &mut self,
280 sess: &mut crate::context::SessionData,
281 ld: &mut crate::liveness_analysis::LivenessData,
282 op_id: OperatorId,
283 output_count: &mut crate::liveness_analysis::OpOutputIdx,
284 ) {
285 if let Some(nested_op) = &self.nested_op {
286 setup_op_outputs_for_nested_op(
287 nested_op,
288 sess,
289 ld,
290 op_id,
291 output_count,
292 );
293 return;
294 }
295 let op_base = &mut sess.operator_bases[op_id];
296 op_base.outputs_start = *output_count;
297 op_base.outputs_end = op_base.outputs_start;
298 }
299
300 fn has_dynamic_outputs(
301 &self,
302 sess: &crate::context::SessionData,
303 _op_id: OperatorId,
304 ) -> bool {
305 let Some(nested) = &self.nested_op else {
306 return false;
307 };
308 let &NestedOp::SetUp(op_id) = nested else {
309 unreachable!()
310 };
311 sess.operator_data[sess.op_data_id(op_id)]
312 .has_dynamic_outputs(sess, op_id)
313 }
314
315 fn build_transforms<'a>(
316 &'a self,
317 _job: &mut crate::job::Job<'a>,
318 _tf_state: &mut super::transform::TransformState,
319 _op_id: OperatorId,
320 _prebound_outputs: &super::operator::PreboundOutputsMap,
321 ) -> super::operator::TransformInstatiation<'a> {
322 unreachable!()
323 }
324
325 fn as_any(&self) -> Option<&dyn std::any::Any> {
326 Some(self)
327 }
328
329 fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
330 Some(self)
331 }
332}