1use metamatch::metamatch;
2use num::{BigInt, BigRational};
3use std::{borrow::Cow, io::BufReader, sync::Arc};
4
5use crate::{
6 cli::call_expr::{Argument, CallExpr, ParsedArgValue, Span},
7 job::JobData,
8 options::{
9 chain_settings::{ChainSetting, SettingUseFloatingPointMath},
10 session_setup::SessionSetupData,
11 },
12 record_data::{
13 array::Array,
14 custom_data::CustomDataBox,
15 field_data::FieldValueRepr,
16 field_value::{FieldValue, FieldValueKind, Object},
17 iter_hall::FieldIterId,
18 push_interface::PushInterface,
19 stream_value::{StreamValue, StreamValueData},
20 },
21 typeline_error::TypelineError,
22 tyson::{parse_tyson, TysonParseError},
23 utils::{cow_to_small_str, cow_to_str},
24};
25
26use super::{
27 errors::{OperatorApplicationError, OperatorCreationError},
28 operator::{Operator, OperatorName, TransformInstatiation},
29 transform::{Transform, TransformId, TransformState},
30 utils::maintain_single_value::{maintain_single_value, ExplicitCount},
31};
32
33#[derive(Clone)]
34pub enum Literal {
35 Bytes(Vec<u8>),
36 StreamBytes(Arc<Vec<u8>>),
37 Text(String),
38 StreamString(Arc<String>),
39 Object(Object),
40 Array(Array),
41 Int(i64),
42 BigInt(BigInt),
43 Float(f64),
44 BigRational(BigRational),
45 Null,
46 Undefined,
47 Error(String),
48 StreamError(String),
49 Custom(CustomDataBox),
50 Argument(Argument),
51}
52
53#[derive(Clone)]
54pub struct OpLiteral {
55 pub data: Literal,
56 pub insert_count: Option<usize>,
57}
58
59pub struct TfLiteral<'a> {
60 data: &'a Literal,
61 explicit_count: Option<ExplicitCount>,
62 value_inserted: bool,
63 iter_id: FieldIterId,
64}
65
66impl Operator for OpLiteral {
67 fn default_name(&self) -> OperatorName {
68 match &self.data {
69 Literal::Null => "null",
70 Literal::Undefined => "undefined",
71 Literal::Text(_) => "str",
72 Literal::StreamString(_) => "~str",
73 Literal::Bytes(_) => "bytes",
74 Literal::StreamBytes(_) => "~bytes",
75 Literal::Error(_) => "error",
76 Literal::StreamError(_) => "~error",
77 Literal::Int(_) => "int",
78 Literal::BigInt(_) => "integer",
79 Literal::Float(_) => "float",
80 Literal::BigRational(_) => "rational",
81 Literal::Object(_) => "object",
82 Literal::Array(_) => "array",
83 Literal::Argument(_) => "argument",
84 Literal::Custom(v) => return cow_to_small_str(v.type_name()),
85 }
86 .into()
87 }
88
89 fn output_count(
90 &self,
91 _sess: &crate::context::SessionData,
92 _op_id: super::operator::OperatorId,
93 ) -> usize {
94 1
95 }
96
97 fn has_dynamic_outputs(
98 &self,
99 _sess: &crate::context::SessionData,
100 _op_id: super::operator::OperatorId,
101 ) -> bool {
102 false
103 }
104
105 fn build_transforms<'a>(
106 &'a self,
107 job: &mut crate::job::Job<'a>,
108 tf_state: &mut TransformState,
109 _op_id: super::operator::OperatorId,
110 _prebound_outputs: &super::operator::PreboundOutputsMap,
111 ) -> super::operator::TransformInstatiation<'a> {
112 let actor_id = job.job_data.add_actor_for_tf_state(tf_state);
113 let iter_id = job.job_data.claim_iter_for_tf_state(tf_state);
114 TransformInstatiation::Single(Box::new(TfLiteral {
115 data: &self.data,
116 explicit_count: self
117 .insert_count
118 .map(|count| ExplicitCount { count, actor_id }),
119 value_inserted: false,
120 iter_id,
121 }))
122 }
123
124 fn update_variable_liveness(
125 &self,
126 _sess: &crate::context::SessionData,
127 _ld: &mut crate::liveness_analysis::LivenessData,
128 _op_offset_after_last_write: super::operator::OffsetInChain,
129 _op_id: super::operator::OperatorId,
130 _bb_id: crate::liveness_analysis::BasicBlockId,
131 _input_field: crate::liveness_analysis::OpOutputIdx,
132 output: &mut crate::liveness_analysis::OperatorLivenessOutput,
133 ) {
134 output.flags.may_dup_or_drop = self.insert_count.is_some();
135 output.flags.input_accessed = false;
136 output.flags.non_stringified_input_access = false;
137 }
138}
139
140pub fn parse_op_literal_zst(
141 expr: &CallExpr,
142 literal: Literal,
143) -> Result<Box<dyn Operator>, TypelineError> {
144 let insert_count = parse_insert_count_reject_value(expr)?;
145 Ok(Box::new(OpLiteral {
146 data: literal,
147 insert_count,
148 }))
149}
150pub fn parse_op_str(
151 sess: &mut SessionSetupData,
152 expr: &CallExpr,
153 stream: bool,
154) -> Result<Box<dyn Operator>, TypelineError> {
155 let (insert_count, value, _value_span) =
156 parse_insert_count_and_value_args_str(sess, expr)?;
157 let value_owned = value.into_owned();
158 Ok(Box::new(OpLiteral {
159 data: if stream {
160 Literal::StreamString(Arc::new(value_owned))
161 } else {
162 Literal::Text(value_owned)
163 },
164 insert_count,
165 }))
166}
167
168pub fn parse_op_error(
169 sess: &mut SessionSetupData,
170 expr: &CallExpr,
171 stream: bool,
172) -> Result<Box<dyn Operator>, TypelineError> {
173 let (insert_count, value, _value_span) =
174 parse_insert_count_and_value_args_str(sess, expr)?;
175 let value_owned = value.into_owned();
176 Ok(Box::new(OpLiteral {
177 data: if stream {
178 Literal::StreamError(value_owned)
179 } else {
180 Literal::Error(value_owned)
181 },
182 insert_count,
183 }))
184}
185
186pub fn parse_insert_count_reject_value(
187 expr: &CallExpr,
188) -> Result<Option<usize>, TypelineError> {
189 let mut insert_count = None;
190 for arg in expr.parsed_args_iter() {
191 match arg.value {
192 ParsedArgValue::NamedArg { key, value } => {
193 if key == "i" || key == "insert_count" {
194 insert_count =
195 Some(value.try_cast_int(false).ok_or_else(|| {
196 expr.error_arg_invalid_int(key, arg.span)
197 })? as usize);
198 continue;
199 }
200 return Err(expr
201 .error_named_arg_unsupported(key, arg.span)
202 .into());
203 }
204 ParsedArgValue::Flag(flag) => {
205 return Err(expr
206 .error_flag_unsupported(flag, arg.span)
207 .into());
208 }
209 ParsedArgValue::PositionalArg { .. } => {
210 return Err(expr
211 .error_positional_args_unsupported(arg.span)
212 .into())
213 }
214 }
215 }
216 Ok(insert_count)
217}
218
219pub fn parse_insert_count_and_value_args<'a>(
220 sess: &mut SessionSetupData,
221 expr: &'a CallExpr<'a>,
222) -> Result<(Option<usize>, Cow<'a, [u8]>, Span), TypelineError> {
223 let mut insert_count = None;
224 let mut value = None;
225 for arg in expr.parsed_args_iter_with_bounded_positionals(1, 1) {
226 let arg = arg?;
227 match arg.value {
228 ParsedArgValue::Flag(flag) => {
229 return Err(expr
230 .error_flag_unsupported(flag, arg.span)
231 .into());
232 }
233 ParsedArgValue::NamedArg { key, value } => {
234 if key == "i" || key == "insert_count" {
235 insert_count =
237 Some(value.try_cast_int(false).ok_or_else(|| {
238 expr.error_arg_invalid_int(key, arg.span)
239 })? as usize);
240 continue;
241 }
242 return Err(expr
243 .error_named_arg_unsupported(key, arg.span)
244 .into());
245 }
246 ParsedArgValue::PositionalArg { arg, .. } => {
247 value = Some((arg.stringify(sess).into_bytes_cow(), arg.span));
249 }
250 }
251 }
252 let (value, value_span) = value.unwrap();
253 Ok((insert_count, value, value_span))
254}
255
256pub fn parse_insert_count_and_value_args_str<'a>(
257 sess: &mut SessionSetupData,
258 expr: &'a CallExpr<'a>,
259) -> Result<(Option<usize>, Cow<'a, str>, Span), TypelineError> {
260 let (insert_count, value, value_span) =
261 parse_insert_count_and_value_args(sess, expr)?;
262 let value_str = cow_to_str(value)
263 .map_err(|_| expr.error_positional_arg_invalid_utf8(value_span))?;
264
265 Ok((insert_count, value_str, value_span))
266}
267
268pub fn parse_op_int(
269 sess: &mut SessionSetupData,
270 expr: &CallExpr,
271) -> Result<Box<dyn Operator>, TypelineError> {
272 let (insert_count, value, value_span) =
273 parse_insert_count_and_value_args_str(sess, expr)?;
274
275 let data = if let Ok(i) = str::parse::<i64>(&value) {
276 Literal::Int(i)
277 } else {
278 let Ok(big_int) = str::parse::<BigInt>(&value) else {
279 return Err(expr
280 .error_positional_arg_invalid_int(value_span)
281 .into());
282 };
283 Literal::BigInt(big_int)
284 };
285 Ok(Box::new(OpLiteral { data, insert_count }))
286}
287pub fn parse_op_bytes(
288 sess: &mut SessionSetupData,
289 arg: &mut Argument,
290 stream: bool,
291) -> Result<Box<dyn Operator>, TypelineError> {
292 let call_expr = CallExpr::from_argument_mut(arg)?;
293 let (insert_count, value, _value_span) =
294 parse_insert_count_and_value_args(sess, &call_expr)?;
295 Ok(Box::new(OpLiteral {
296 data: if stream {
297 Literal::StreamBytes(Arc::new(value.into_owned()))
298 } else {
299 Literal::Bytes(value.into_owned())
300 },
301 insert_count,
302 }))
303}
304pub fn field_value_to_literal(v: FieldValue) -> Literal {
305 metamatch!(match v {
306 #[expand(REP in [Null, Undefined])]
307 FieldValue::REP => Literal::REP,
308
309 #[expand(REP in [Int, Float, Bytes, Text, Array, Custom])]
310 FieldValue::REP(v) => Literal::REP(v),
311
312 #[expand(REP in [BigInt, BigRational, Argument, Object])]
313 FieldValue::REP(v) => Literal::REP(*v),
314
315 FieldValue::Error(v) => Literal::Error(v.message().to_owned()),
316
317 #[expand_pattern(REP in [OpDecl, StreamValueId, FieldReference, SlicedFieldReference])]
318 FieldValue::REP(_) => {
319 panic!("{} is not a valid literal", v.kind().to_str())
320 }
321 })
322}
323pub fn parse_op_tyson(
324 sess: &mut SessionSetupData,
325 expr: &CallExpr,
326 affinity: FieldValueKind,
327) -> Result<Box<dyn Operator>, TypelineError> {
328 let (insert_count, value, value_span) =
329 parse_insert_count_and_value_args(sess, expr)?;
330 let value = parse_tyson(
331 BufReader::new(&*value),
332 use_fpm(&mut Some(sess)),
333 Some(&sess.extensions),
334 )
335 .map_err(|e| {
336 OperatorCreationError::new_s(
337 format!(
338 "failed to parse value as {}: {}",
339 affinity.to_str(),
340 match e {
341 TysonParseError::Io(e) => e.to_string(),
342 TysonParseError::InvalidSyntax { kind, .. } =>
343 kind.to_string(),
344 }
345 ),
346 value_span,
347 )
348 })?;
349 let lit = field_value_to_literal(value);
350 Ok(Box::new(OpLiteral {
351 data: lit,
352 insert_count,
353 }))
354}
355
356pub fn use_fpm(sess: &mut Option<&mut SessionSetupData>) -> bool {
357 sess.as_deref_mut()
358 .map(|sess| {
359 sess.get_chain_setting::<SettingUseFloatingPointMath>(
360 sess.curr_chain,
361 )
362 })
363 .unwrap_or(SettingUseFloatingPointMath::DEFAULT)
364}
365
366pub fn build_op_tyson_value(
367 mut sess: Option<&mut SessionSetupData>,
368 value: &[u8],
369 value_span: Span,
370 insert_count: Option<usize>,
371) -> Result<Box<dyn Operator>, TypelineError> {
372 let value = parse_tyson(
373 value,
374 use_fpm(&mut sess),
375 sess.as_ref().map(|sess| &*sess.extensions),
376 )
377 .map_err(|e| {
378 OperatorCreationError::new_s(format!("invalid tyson: {e}"), value_span)
379 })?;
380 let lit = field_value_to_literal(value);
381 Ok(Box::new(OpLiteral {
382 data: lit,
383 insert_count,
384 }))
385}
386
387pub fn parse_op_tyson_value(
388 sess: &mut SessionSetupData,
389 expr: &CallExpr,
390) -> Result<Box<dyn Operator>, TypelineError> {
391 let (insert_count, value, value_span) =
392 parse_insert_count_and_value_args(sess, expr)?;
393 build_op_tyson_value(Some(sess), &value, value_span, insert_count)
394}
395
396pub fn create_op_literal_with_insert_count(
397 data: Literal,
398 insert_count: Option<usize>,
399) -> Box<dyn Operator> {
400 Box::new(OpLiteral { data, insert_count })
401}
402
403pub fn create_op_literal(data: Literal) -> Box<dyn Operator> {
404 create_op_literal_with_insert_count(data, None)
405}
406pub fn create_op_literal_n(
407 data: Literal,
408 insert_count: usize,
409) -> Box<dyn Operator> {
410 create_op_literal_with_insert_count(data, Some(insert_count))
411}
412
413pub fn create_op_error(str: &str) -> Box<dyn Operator> {
414 create_op_literal(Literal::Error(str.to_owned()))
415}
416pub fn create_op_str(str: &str) -> Box<dyn Operator> {
417 create_op_literal(Literal::Text(str.to_owned()))
418}
419pub fn create_op_stream_bytes(v: &[u8]) -> Box<dyn Operator> {
420 create_op_literal(Literal::StreamBytes(Arc::new(v.to_owned())))
421}
422pub fn create_op_stream_str(v: &str) -> Box<dyn Operator> {
423 create_op_literal(Literal::StreamString(Arc::new(v.to_owned())))
424}
425pub fn create_op_bytes(v: &[u8]) -> Box<dyn Operator> {
426 create_op_literal(Literal::Bytes(v.to_owned()))
427}
428pub fn create_op_stream_error(str: &str) -> Box<dyn Operator> {
429 create_op_literal(Literal::StreamError(str.to_owned()))
430}
431pub fn create_op_int(v: i64) -> Box<dyn Operator> {
432 create_op_literal(Literal::Int(v))
433}
434pub fn create_op_int_big(v: BigInt) -> Box<dyn Operator> {
435 create_op_literal(Literal::BigInt(v))
436}
437pub fn create_op_null() -> Box<dyn Operator> {
438 create_op_literal(Literal::Null)
439}
440pub fn create_op_undefined() -> Box<dyn Operator> {
441 create_op_literal(Literal::Undefined)
442}
443pub fn create_op_v(str: &str) -> Result<Box<dyn Operator>, TypelineError> {
444 build_op_tyson_value(None, str.as_bytes(), Span::Generated, None)
445}
446
447pub fn create_op_error_n(str: &str, insert_count: usize) -> Box<dyn Operator> {
448 create_op_literal_n(Literal::Error(str.to_owned()), insert_count)
449}
450pub fn create_op_str_n(str: &str, insert_count: usize) -> Box<dyn Operator> {
451 create_op_literal_n(Literal::Text(str.to_owned()), insert_count)
452}
453pub fn create_op_stream_bytes_n(
454 v: &[u8],
455 insert_count: usize,
456) -> Box<dyn Operator> {
457 create_op_literal_n(
458 Literal::StreamBytes(Arc::new(v.to_owned())),
459 insert_count,
460 )
461}
462pub fn create_op_stream_str_n(
463 v: &str,
464 insert_count: usize,
465) -> Box<dyn Operator> {
466 create_op_literal_n(
467 Literal::StreamString(Arc::new(v.to_owned())),
468 insert_count,
469 )
470}
471pub fn create_op_bytes_n(v: &[u8], insert_count: usize) -> Box<dyn Operator> {
472 create_op_literal_n(Literal::Bytes(v.to_owned()), insert_count)
473}
474pub fn create_op_stream_error_n(
475 str: &str,
476 insert_count: usize,
477) -> Box<dyn Operator> {
478 create_op_literal_n(Literal::StreamError(str.to_owned()), insert_count)
479}
480pub fn create_op_int_n(v: i64, insert_count: usize) -> Box<dyn Operator> {
481 create_op_literal_n(Literal::Int(v), insert_count)
482}
483pub fn create_op_null_n(insert_count: usize) -> Box<dyn Operator> {
484 create_op_literal_n(Literal::Null, insert_count)
485}
486pub fn create_op_success_n(insert_count: usize) -> Box<dyn Operator> {
487 create_op_literal_n(Literal::Undefined, insert_count)
488}
489pub fn create_op_v_n(
490 str: &str,
491 insert_count: usize,
492) -> Result<Box<dyn Operator>, TypelineError> {
493 build_op_tyson_value(
494 None,
495 str.as_bytes(),
496 Span::Generated,
497 Some(insert_count),
498 )
499}
500
501pub fn insert_value(
502 jd: &mut JobData,
503 tf_id: TransformId,
504 lit: &mut TfLiteral,
505) {
506 let tf = &jd.tf_mgr.transforms[tf_id];
507 let op_id = tf.op_id.unwrap();
508 let of_id = jd.tf_mgr.prepare_output_field(
509 &mut jd.field_mgr,
510 &mut jd.match_set_mgr,
511 tf_id,
512 );
513 let mut output_field = jd.field_mgr.fields[of_id].borrow_mut();
514 metamatch!(match lit.data {
515 #[expand(REP in [Null, Undefined])]
516 Literal::REP => {
517 output_field
518 .iter_hall
519 .push_zst(FieldValueRepr::REP, 1, true)
520 }
521
522 #[expand((REP, PUSH_FN, VAL) in [
523 (Int, push_int, *v),
524 (Float, push_float, *v),
525 (Bytes, push_bytes, v),
526 (Text, push_str, v),
527 (Object, push_object, v.clone()),
528 (Array, push_array, v.clone()),
529 (BigInt, push_big_int, v.clone()),
530 (BigRational, push_big_rational, v.clone()),
531 (Custom, push_custom, v.clone()),
532 (Argument, push_fixed_size_type, v.clone()),
533 ])]
534 Literal::REP(v) => {
535 output_field.iter_hall.PUSH_FN(VAL, 1, true, true)
536 }
537
538 #[expand((LIT, DATA) in [(StreamString, Text), (StreamBytes, Bytes)])]
539 Literal::LIT(ss) => {
540 let sv_id = jd.sv_mgr.claim_stream_value(
541 StreamValue::from_data_done(StreamValueData::DATA {
542 data: ss.clone(),
543 range: 0..ss.len(),
544 }),
545 );
546 output_field
547 .iter_hall
548 .push_stream_value_id(sv_id, 1, true, false);
549 }
550
551 Literal::Error(e) => output_field.iter_hall.push_error(
552 OperatorApplicationError::new_s(e.clone(), op_id),
553 1,
554 true,
555 false,
556 ),
557
558 Literal::StreamError(ss) => {
559 let sv_id = jd.sv_mgr.claim_stream_value(StreamValue {
560 error: Some(Arc::new(OperatorApplicationError::new_s(
561 ss.clone(),
562 op_id,
563 ))),
564 done: true,
565 ..Default::default()
566 });
567 output_field
568 .iter_hall
569 .push_stream_value_id(sv_id, 1, true, false);
570 }
571 })
572}
573
574impl<'a> Transform<'a> for TfLiteral<'a> {
575 fn update(&mut self, jd: &mut JobData<'a>, tf_id: TransformId) {
576 if !self.value_inserted {
577 self.value_inserted = true;
578 insert_value(jd, tf_id, self);
579 }
580 let (batch_size, ps) = maintain_single_value(
581 jd,
582 tf_id,
583 self.explicit_count.as_ref(),
584 self.iter_id,
585 );
586 jd.tf_mgr.submit_batch_ready_for_more(tf_id, batch_size, ps);
587 }
588}