typeline_core/operators/utils/
basic_generator.rs1use crate::{
2 job::JobData,
3 liveness_analysis::OperatorLivenessOutput,
4 operators::{
5 operator::{Operator, OperatorName, TransformInstatiation},
6 transform::{Transform, TransformId},
7 },
8 record_data::{
9 action_buffer::ActorId, group_track::GroupTrackIterRef,
10 iter_hall::FieldIterId,
11 },
12};
13
14use super::generator_transform_update::{
15 handle_generator_transform_update, GeneratorMode, GeneratorSequence,
16};
17
18pub trait BasicGenerator: Send + Sync {
19 type Gen: GeneratorSequence + Send;
20 fn default_name(&self) -> OperatorName;
21 fn debug_op_name(&self) -> crate::operators::operator::OperatorName {
22 self.default_name()
23 }
24 fn generator_mode(&self) -> GeneratorMode;
25 fn create_generator(&self) -> Self::Gen;
26 fn update_variable_liveness(
27 &self,
28 _sess: &crate::context::SessionData,
29 _ld: &mut crate::liveness_analysis::LivenessData,
30 _op_offset_after_last_write: crate::operators::operator::OffsetInChain,
31 _op_id: crate::operators::operator::OperatorId,
32 _bb_id: crate::liveness_analysis::BasicBlockId,
33 _input_field: crate::liveness_analysis::OpOutputIdx,
34 output: &mut OperatorLivenessOutput,
35 ) {
36 output.flags.non_stringified_input_access = false;
37 output.flags.may_dup_or_drop = true;
38 }
39
40 fn on_liveness_computed(
41 &mut self,
42 _sess: &mut crate::context::SessionData,
43 _ld: &crate::liveness_analysis::LivenessData,
44 _op_id: crate::operators::operator::OperatorId,
45 ) {
46 }
47}
48
49pub struct BasicGeneratorWrapper<T> {
50 base: T,
51}
52
53pub struct BasicGeneratorTransform<'a, Op: BasicGenerator> {
54 op: &'a BasicGeneratorWrapper<Op>,
55 input_iter: FieldIterId,
56 group_track_iter: GroupTrackIterRef,
57 actor_id: ActorId,
58 generator: Op::Gen,
59}
60
61impl<'a, Op: BasicGenerator> Transform<'a>
62 for BasicGeneratorTransform<'a, Op>
63{
64 fn update(&mut self, jd: &mut JobData<'a>, tf_id: TransformId) {
65 handle_generator_transform_update(
66 jd,
67 tf_id,
68 self.input_iter,
69 self.actor_id,
70 self.group_track_iter,
71 &mut self.generator,
72 self.op.base.generator_mode(),
73 )
74 }
75}
76
77impl<T: BasicGenerator> Operator for BasicGeneratorWrapper<T> {
78 fn default_name(&self) -> OperatorName {
79 BasicGenerator::default_name(&self.base)
80 }
81
82 fn debug_op_name(&self) -> crate::operators::operator::OperatorName {
83 BasicGenerator::default_name(&self.base)
84 }
85
86 fn output_count(
87 &self,
88 _sess: &crate::context::SessionData,
89 _op_id: crate::operators::operator::OperatorId,
90 ) -> usize {
91 1
92 }
93
94 fn has_dynamic_outputs(
95 &self,
96 _sess: &crate::context::SessionData,
97 _op_id: crate::operators::operator::OperatorId,
98 ) -> bool {
99 false
100 }
101
102 fn build_transforms<'a>(
103 &'a self,
104 job: &mut crate::job::Job<'a>,
105 tf_state: &mut crate::operators::transform::TransformState,
106 _op_id: crate::operators::operator::OperatorId,
107 _prebound_outputs: &crate::operators::operator::PreboundOutputsMap,
108 ) -> TransformInstatiation<'a> {
109 let jd = &mut job.job_data;
110 let actor_id = jd.add_actor_for_tf_state(tf_state);
111 let input_iter = jd.claim_iter_for_tf_state(tf_state);
112 let group_track_iter =
113 jd.claim_group_track_iter_for_tf_state(tf_state);
114 TransformInstatiation::Single(Box::new(BasicGeneratorTransform {
115 op: self,
116 input_iter,
117 group_track_iter,
118 actor_id,
119 generator: self.base.create_generator(),
120 }))
121 }
122
123 fn output_field_kind(
124 &self,
125 _sess: &crate::context::SessionData,
126 _op_id: crate::operators::operator::OperatorId,
127 ) -> crate::operators::operator::OutputFieldKind {
128 crate::operators::operator::OutputFieldKind::Unique
129 }
130
131 fn update_variable_liveness(
132 &self,
133 sess: &crate::context::SessionData,
134 ld: &mut crate::liveness_analysis::LivenessData,
135 op_offset_after_last_write: crate::operators::operator::OffsetInChain,
136 op_id: crate::operators::operator::OperatorId,
137 bb_id: crate::liveness_analysis::BasicBlockId,
138 input_field: crate::liveness_analysis::OpOutputIdx,
139 output: &mut OperatorLivenessOutput,
140 ) {
141 self.base.update_variable_liveness(
142 sess,
143 ld,
144 op_offset_after_last_write,
145 op_id,
146 bb_id,
147 input_field,
148 output,
149 )
150 }
151
152 fn on_liveness_computed(
153 &mut self,
154 sess: &mut crate::context::SessionData,
155 ld: &crate::liveness_analysis::LivenessData,
156 op_id: crate::operators::operator::OperatorId,
157 ) {
158 self.base.on_liveness_computed(sess, ld, op_id)
159 }
160}
161
162impl<T: BasicGenerator + 'static> BasicGeneratorWrapper<T> {
163 pub fn new(base: T) -> Self {
164 Self { base }
165 }
166 pub fn new_operator(base: T) -> Box<dyn Operator> {
167 Box::new(Self::new(base))
168 }
169}