1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
/// Index of a register referenced by [`OpCode`] operands.
///
/// The compiler in this file only ever uses small fixed indices (for example
/// 2, 10 and 15..=25); the size of the register file itself is defined by the
/// executing VM, which is not part of this file.
pub type Register = usize;
7
/// Builds the bookkeeping path that accompanies a `stop`-guarded mapping.
///
/// The result is `target_path` prefixed with the literal `__stop:`.
fn stop_field_path(target_path: &str) -> String {
    const STOP_PREFIX: &str = "__stop:";

    let mut path = String::with_capacity(STOP_PREFIX.len() + target_path.len());
    path.push_str(STOP_PREFIX);
    path.push_str(target_path);
    path
}
11
/// One instruction of the register-based program produced by [`TypedCompiler`].
///
/// NOTE(review): the interpreter that executes these instructions is not part
/// of this file. The per-variant notes below describe where the compiler in
/// this file emits each instruction; anything about runtime behaviour is
/// inferred from names and should be confirmed against the VM.
#[derive(Debug, Clone)]
pub enum OpCode {
    /// Emitted to read `path` from the incoming event into `dest`, with an
    /// optional `default`. An empty path is used by the compiler to refer to
    /// the whole event payload.
    LoadEventField {
        path: FieldPath,
        dest: Register,
        default: Option<Value>,
    },
    /// Emitted for `MappingSource::Constant`: places `value` in `dest`.
    LoadConstant {
        value: Value,
        dest: Register,
    },
    /// Emitted to seed a key register from the `__resolved_primary_key`
    /// register during key loading.
    CopyRegister {
        source: Register,
        dest: Register,
    },
    /// Emitted as the fallback step of key loading, after a `CopyRegister`
    /// into the same `dest` (presumably copies only when `dest` is null).
    CopyRegisterIfNull {
        source: Register,
        dest: Register,
    },
    /// Not emitted by the code visible in this file.
    GetEventType {
        dest: Register,
    },
    /// Emitted for `MappingSource::AsEvent` with explicit fields, before the
    /// selected fields are attached via `SetFields`.
    CreateObject {
        dest: Register,
    },
    /// Emitted for `LastWrite` and `Merge` mappings, and to write transformed
    /// capture fields back into the capture payload.
    SetField {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `MappingSource::AsEvent`: `(field name, register)` pairs to
    /// store on `object`.
    SetFields {
        object: Register,
        fields: Vec<(String, Register)>,
    },
    /// Emitted for `MappingSource::AsCapture` to read a field of the capture
    /// payload before transforming it.
    GetField {
        object: Register,
        path: String,
        dest: Register,
    },
    /// Emitted once per handler, after key loading, with `{}` as `default`.
    ReadOrInitState {
        state_id: u32,
        key: Register,
        default: Value,
        dest: Register,
    },
    /// Emitted once per handler after all mapping instructions. It is also the
    /// fallback anchor for inserting instruction-hook opcodes.
    UpdateState {
        state_id: u32,
        key: Register,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Append` mappings.
    AppendToArray {
        object: Register,
        path: String,
        value: Register,
    },
    /// Not emitted by the code visible in this file.
    GetCurrentTimestamp {
        dest: Register,
    },
    /// Emitted as the final step of `MappingSource::AsEvent`.
    CreateEvent {
        dest: Register,
        event_value: Register,
    },
    /// Emitted as the final step of `MappingSource::AsCapture`.
    CreateCapture {
        dest: Register,
        capture_value: Register,
    },
    /// Emitted wherever a mapping, a source, a capture field or the primary
    /// key carries a [`Transformation`]. `source` and `dest` may be the same
    /// register.
    Transform {
        source: Register,
        dest: Register,
        transformation: Transformation,
    },
    /// Emitted as the last instruction of a handler when the handler spec has
    /// `emit` set.
    EmitMutation {
        entity_name: String,
        key: Register,
        state: Register,
    },
    /// Emitted for `PopulationStrategy::SetOnce` mappings.
    SetFieldIfNull {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Max` mappings.
    SetFieldMax {
        object: Register,
        path: String,
        value: Register,
    },
    /// Not emitted by the code visible in this file (the handler compiler
    /// delegates to `compile_temporal_index_update`, defined elsewhere).
    UpdateTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
        timestamp: Register,
    },
    /// Emitted for `KeyResolutionStrategy::TemporalLookup`.
    LookupTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        timestamp: Register,
        dest: Register,
    },
    /// Not emitted by the code visible in this file.
    UpdateLookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
    },
    /// Emitted for `KeyResolutionStrategy::Lookup`; the compiler falls back to
    /// the index name `default_pda_lookup` when no matching index is found.
    LookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        dest: Register,
    },
    /// Emitted for `PopulationStrategy::Sum` mappings.
    SetFieldSum {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for unconditional `PopulationStrategy::Count` mappings.
    SetFieldIncrement {
        object: Register,
        path: String,
    },
    /// Emitted for `PopulationStrategy::Min` mappings.
    SetFieldMin {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for mappings with `when` set (and no `stop`). The optional
    /// condition triple is filled only from a parsed comparison condition.
    SetFieldWhen {
        object: Register,
        path: String,
        value: Register,
        when_instruction: String,
        entity_name: String,
        key_reg: Register,
        condition_field: Option<FieldPath>,
        condition_op: Option<ComparisonOp>,
        condition_value: Option<Value>,
    },
    /// Emitted for mappings with `stop` set. `stop_field` is the target path
    /// prefixed with `__stop:`.
    SetFieldUnlessStopped {
        object: Register,
        path: String,
        value: Register,
        stop_field: String,
        stop_instruction: String,
        entity_name: String,
        key_reg: Register,
    },
    /// Emitted for `PopulationStrategy::UniqueCount` mappings. `set_name` is
    /// the target path suffixed with `_unique_set`, and `count_path` is the
    /// target path itself.
    AddToUniqueSet {
        state_id: u32,
        set_name: String,
        value: Register,
        count_object: Register,
        count_path: String,
    },
    /// Emitted for `LastWrite`/`Merge` mappings that carry a parsed comparison
    /// condition (and neither `stop` nor `when`).
    ConditionalSetField {
        object: Register,
        path: String,
        value: Register,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted for `Count` mappings that carry a parsed comparison condition
    /// (and neither `stop` nor `when`).
    ConditionalIncrement {
        object: Register,
        path: String,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted once per handler right before `UpdateState`. It is also the
    /// preferred anchor for inserting instruction-hook opcodes.
    EvaluateComputedFields {
        state: Register,
        computed_paths: Vec<String>,
    },
    /// Emitted once per resolver spec of the entity, in every handler.
    /// `url_template` is filled only for URL resolvers whose source is a
    /// template.
    QueueResolver {
        state_id: u32,
        entity_name: String,
        resolver: ResolverType,
        input_path: Option<String>,
        input_value: Option<Value>,
        url_template: Option<Vec<UrlTemplatePart>>,
        strategy: ResolveStrategy,
        extracts: Vec<ResolverExtractSpec>,
        condition: Option<ResolverCondition>,
        schedule_at: Option<String>,
        state: Register,
        key: Register,
    },
    /// Not emitted by the code visible in this file.
    UpdatePdaReverseLookup {
        state_id: u32,
        lookup_name: String,
        pda_address: Register,
        primary_key: Register,
    },
}
222
/// Compiled program for a single entity, as produced by
/// `TypedCompiler::compile_entity`.
pub struct EntityBytecode {
    /// State table identifier copied from the compiler (`with_state_id`).
    pub state_id: u32,
    /// Opcode program per event type. Handlers that share an event type are
    /// merged into one program, and instruction hooks are spliced in.
    pub handlers: HashMap<String, Vec<OpCode>>,
    /// Name of the entity this bytecode was compiled for.
    pub entity_name: String,
    /// Every `when` value found on the entity's mappings.
    pub when_events: HashSet<String>,
    /// Target paths for which no mapping has `emit` set, plus the `__stop:`
    /// bookkeeping path of every mapping that has `stop` set.
    pub non_emitted_fields: HashSet<String>,
    /// Copy of the spec's `computed_fields`.
    pub computed_paths: Vec<String>,
    /// Optional callback for computed fields. `compile_entity` always leaves
    /// this as `None`; the builder's `add_entity_with_evaluator` fills it in.
    /// NOTE(review): the meaning of the `Option<u64>` and `i64` arguments is
    /// defined by the caller of this callback, which is not in this file.
    #[allow(clippy::type_complexity)]
    pub computed_fields_evaluator: Option<
        Box<
            dyn Fn(
                    &mut Value,
                    Option<u64>,
                    i64,
                ) -> std::result::Result<(), Box<dyn std::error::Error>>
                + Send
                + Sync,
        >,
    >,
}
245
246impl std::fmt::Debug for EntityBytecode {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("EntityBytecode")
249 .field("state_id", &self.state_id)
250 .field("handlers", &self.handlers)
251 .field("entity_name", &self.entity_name)
252 .field("when_events", &self.when_events)
253 .field("non_emitted_fields", &self.non_emitted_fields)
254 .field("computed_paths", &self.computed_paths)
255 .field(
256 "computed_fields_evaluator",
257 &self.computed_fields_evaluator.is_some(),
258 )
259 .finish()
260 }
261}
262
/// Bytecode for a set of entities together with event routing information.
#[derive(Debug)]
pub struct MultiEntityBytecode {
    /// Compiled bytecode keyed by entity name.
    pub entities: HashMap<String, EntityBytecode>,
    /// For each event type, the names of the entities that have a handler
    /// program for it.
    pub event_routing: HashMap<String, Vec<String>>,
    /// Union of the `when_events` of all contained entities.
    pub when_events: HashSet<String>,
    /// Router created with `ProtoRouter::new()` by every constructor in this
    /// file; nothing here populates it further.
    pub proto_router: crate::proto_router::ProtoRouter,
}
270
271impl MultiEntityBytecode {
272 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
273 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
274 let entity_bytecode = compiler.compile_entity();
275
276 let mut entities = HashMap::new();
277 let mut event_routing = HashMap::new();
278 let mut when_events = HashSet::new();
279
280 for event_type in entity_bytecode.handlers.keys() {
281 event_routing
282 .entry(event_type.clone())
283 .or_insert_with(Vec::new)
284 .push(entity_name.clone());
285 }
286
287 when_events.extend(entity_bytecode.when_events.iter().cloned());
288
289 entities.insert(entity_name, entity_bytecode);
290
291 MultiEntityBytecode {
292 entities,
293 event_routing,
294 when_events,
295 proto_router: crate::proto_router::ProtoRouter::new(),
296 }
297 }
298
299 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
300 let entities = HashMap::new();
301 let event_routing = HashMap::new();
302 let when_events = HashSet::new();
303
304 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
305 panic!("from_entities requires type information - use builder pattern instead");
306 }
307
308 MultiEntityBytecode {
309 entities,
310 event_routing,
311 when_events,
312 proto_router: crate::proto_router::ProtoRouter::new(),
313 }
314 }
315
316 #[allow(clippy::new_ret_no_self)]
317 pub fn new() -> MultiEntityBytecodeBuilder {
318 MultiEntityBytecodeBuilder {
319 entities: HashMap::new(),
320 event_routing: HashMap::new(),
321 when_events: HashSet::new(),
322 proto_router: crate::proto_router::ProtoRouter::new(),
323 }
324 }
325}
326
/// Incrementally assembles a [`MultiEntityBytecode`] from typed entity specs.
///
/// Obtained from `MultiEntityBytecode::new()`; the fields mirror those of
/// [`MultiEntityBytecode`] and are moved into it by `build`.
pub struct MultiEntityBytecodeBuilder {
    // Compiled bytecode keyed by entity name.
    entities: HashMap<String, EntityBytecode>,
    // Event type -> names of entities handling it, in insertion order.
    event_routing: HashMap<String, Vec<String>>,
    // Union of the `when_events` of all added entities.
    when_events: HashSet<String>,
    // Passed through to the built value unchanged.
    proto_router: crate::proto_router::ProtoRouter,
}
333
334impl MultiEntityBytecodeBuilder {
335 pub fn add_entity<S>(
336 self,
337 entity_name: String,
338 spec: TypedStreamSpec<S>,
339 state_id: u32,
340 ) -> Self {
341 self.add_entity_with_evaluator(
342 entity_name,
343 spec,
344 state_id,
345 None::<
346 fn(
347 &mut Value,
348 Option<u64>,
349 i64,
350 ) -> std::result::Result<(), Box<dyn std::error::Error>>,
351 >,
352 )
353 }
354
355 pub fn add_entity_with_evaluator<S, F>(
356 mut self,
357 entity_name: String,
358 spec: TypedStreamSpec<S>,
359 state_id: u32,
360 evaluator: Option<F>,
361 ) -> Self
362 where
363 F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
364 + Send
365 + Sync
366 + 'static,
367 {
368 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
369 let mut entity_bytecode = compiler.compile_entity();
370
371 if let Some(eval) = evaluator {
373 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
374 }
375
376 for event_type in entity_bytecode.handlers.keys() {
377 self.event_routing
378 .entry(event_type.clone())
379 .or_default()
380 .push(entity_name.clone());
381 }
382
383 self.when_events
384 .extend(entity_bytecode.when_events.iter().cloned());
385
386 self.entities.insert(entity_name, entity_bytecode);
387 self
388 }
389
390 pub fn build(self) -> MultiEntityBytecode {
391 MultiEntityBytecode {
392 entities: self.entities,
393 event_routing: self.event_routing,
394 when_events: self.when_events,
395 proto_router: self.proto_router,
396 }
397 }
398}
399
/// Compiles one `TypedStreamSpec<S>` into [`OpCode`] programs for an entity.
pub struct TypedCompiler<S> {
    /// The spec being compiled (handlers, instruction hooks, resolver specs,
    /// identity and computed fields are read from it).
    pub spec: TypedStreamSpec<S>,
    // Name stamped into emitted opcodes and used as the routing target.
    entity_name: String,
    // State table identifier stamped into state/index opcodes; defaults to 0.
    state_id: u32,
}
405
406impl<S> TypedCompiler<S> {
407 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
408 TypedCompiler {
409 spec,
410 entity_name,
411 state_id: 0,
412 }
413 }
414
415 pub fn with_state_id(mut self, state_id: u32) -> Self {
416 self.state_id = state_id;
417 self
418 }
419
420 pub fn compile(&self) -> MultiEntityBytecode {
421 let entity_bytecode = self.compile_entity();
422
423 let mut entities = HashMap::new();
424 let mut event_routing = HashMap::new();
425 let mut when_events = HashSet::new();
426
427 for event_type in entity_bytecode.handlers.keys() {
428 event_routing
429 .entry(event_type.clone())
430 .or_insert_with(Vec::new)
431 .push(self.entity_name.clone());
432 }
433
434 when_events.extend(entity_bytecode.when_events.iter().cloned());
435
436 entities.insert(self.entity_name.clone(), entity_bytecode);
437
438 MultiEntityBytecode {
439 entities,
440 event_routing,
441 when_events,
442 proto_router: crate::proto_router::ProtoRouter::new(),
443 }
444 }
445
446 fn compile_entity(&self) -> EntityBytecode {
447 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
448 let mut when_events: HashSet<String> = HashSet::new();
449 let mut emit_by_path: HashMap<String, bool> = HashMap::new();
450
451 let mut debug_info = Vec::new();
453 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
454 let event_type = self.get_event_type(&handler_spec.source);
455 let program_id = match &handler_spec.source {
456 crate::ast::SourceSpec::Source { program_id, .. } => {
457 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
458 }
459 };
460 debug_info.push(format!(
461 " [{}] EventType={}, Mappings={}, ProgramId={}",
462 index,
463 event_type,
464 handler_spec.mappings.len(),
465 program_id
466 ));
467 }
468
469 for handler_spec in &self.spec.handlers {
479 for mapping in &handler_spec.mappings {
480 if let Some(when) = &mapping.when {
481 when_events.insert(when.clone());
482 }
483 let entry = emit_by_path
484 .entry(mapping.target_path.clone())
485 .or_insert(false);
486 *entry |= mapping.emit;
487 if mapping.stop.is_some() {
488 emit_by_path
489 .entry(stop_field_path(&mapping.target_path))
490 .or_insert(false);
491 }
492 }
493 let opcodes = self.compile_handler(handler_spec);
494 let event_type = self.get_event_type(&handler_spec.source);
495
496 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
497 let mut existing_setup = Vec::new();
502 let mut existing_mappings = Vec::new();
503 let mut existing_teardown = Vec::new();
504 let mut section = 0; for opcode in existing_opcodes.iter() {
507 match opcode {
508 OpCode::ReadOrInitState { .. } => {
509 existing_setup.push(opcode.clone());
510 section = 1; }
512 OpCode::UpdateState { .. } => {
513 existing_teardown.push(opcode.clone());
514 section = 2; }
516 OpCode::EmitMutation { .. } => {
517 existing_teardown.push(opcode.clone());
518 }
519 _ if section == 0 => existing_setup.push(opcode.clone()),
520 _ if section == 1 => existing_mappings.push(opcode.clone()),
521 _ => existing_teardown.push(opcode.clone()),
522 }
523 }
524
525 let mut new_mappings = Vec::new();
527 section = 0;
528
529 for opcode in opcodes.iter() {
530 match opcode {
531 OpCode::ReadOrInitState { .. } => {
532 section = 1; }
534 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
535 section = 2; }
537 _ if section == 1 => {
538 new_mappings.push(opcode.clone());
539 }
540 _ => {} }
542 }
543
544 let mut merged = Vec::new();
546 merged.extend(existing_setup);
547 merged.extend(existing_mappings);
548 merged.extend(new_mappings.clone());
549 merged.extend(existing_teardown);
550
551 *existing_opcodes = merged;
552 } else {
553 handlers.insert(event_type, opcodes);
554 }
555 }
556
557 for hook in &self.spec.instruction_hooks {
559 let event_type = hook.instruction_type.clone();
560
561 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
562 let key_reg = 20;
563 let state_reg = 2;
564 let resolved_key_reg = 19;
565 let temp_reg = 18;
566
567 let mut ops = Vec::new();
568
569 ops.push(OpCode::LoadEventField {
571 path: FieldPath::new(&["__resolved_primary_key"]),
572 dest: resolved_key_reg,
573 default: Some(serde_json::json!(null)),
574 });
575
576 ops.push(OpCode::CopyRegister {
578 source: resolved_key_reg,
579 dest: key_reg,
580 });
581
582 if let Some(lookup_path) = &hook.lookup_by {
584 ops.push(OpCode::LoadEventField {
586 path: lookup_path.clone(),
587 dest: temp_reg,
588 default: None,
589 });
590
591 ops.push(OpCode::Transform {
593 source: temp_reg,
594 dest: temp_reg,
595 transformation: Transformation::HexEncode,
596 });
597
598 ops.push(OpCode::CopyRegisterIfNull {
600 source: temp_reg,
601 dest: key_reg,
602 });
603 }
604
605 ops.push(OpCode::ReadOrInitState {
606 state_id: self.state_id,
607 key: key_reg,
608 default: serde_json::json!({}),
609 dest: state_reg,
610 });
611
612 ops.push(OpCode::UpdateState {
613 state_id: self.state_id,
614 key: key_reg,
615 value: state_reg,
616 });
617
618 ops
619 });
620
621 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
623
624 let insert_pos = handler_opcodes
628 .iter()
629 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
630 .or_else(|| {
631 handler_opcodes
632 .iter()
633 .position(|op| matches!(op, OpCode::UpdateState { .. }))
634 });
635
636 if let Some(pos) = insert_pos {
637 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
639 handler_opcodes.insert(pos + i, opcode);
640 }
641 }
642 }
643
644 let non_emitted_fields: HashSet<String> = emit_by_path
645 .into_iter()
646 .filter_map(|(path, emit)| if emit { None } else { Some(path) })
647 .collect();
648
649 EntityBytecode {
650 state_id: self.state_id,
651 handlers,
652 entity_name: self.entity_name.clone(),
653 when_events,
654 non_emitted_fields,
655 computed_paths: self.spec.computed_fields.clone(),
656 computed_fields_evaluator: None,
657 }
658 }
659
660 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
661 let mut ops = Vec::new();
662 let state_reg = 2;
663 let key_reg = 20;
664
665 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
666
667 ops.push(OpCode::ReadOrInitState {
668 state_id: self.state_id,
669 key: key_reg,
670 default: serde_json::json!({}),
671 dest: state_reg,
672 });
673
674 ops.extend(self.compile_temporal_index_update(
681 &spec.key_resolution,
682 key_reg,
683 &spec.mappings,
684 ));
685
686 for mapping in &spec.mappings {
687 ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
688 }
689
690 ops.extend(self.compile_resolvers(state_reg, key_reg));
691
692 ops.push(OpCode::EvaluateComputedFields {
694 state: state_reg,
695 computed_paths: self.spec.computed_fields.clone(),
696 });
697
698 ops.push(OpCode::UpdateState {
699 state_id: self.state_id,
700 key: key_reg,
701 value: state_reg,
702 });
703
704 if spec.emit {
705 ops.push(OpCode::EmitMutation {
706 entity_name: self.entity_name.clone(),
707 key: key_reg,
708 state: state_reg,
709 });
710 }
711
712 ops
713 }
714
715 fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
716 let mut ops = Vec::new();
717
718 for resolver_spec in &self.spec.resolver_specs {
719 let url_template = match &resolver_spec.resolver {
720 ResolverType::Url(config) => match &config.url_source {
721 UrlSource::Template(parts) => Some(parts.clone()),
722 _ => None,
723 },
724 _ => None,
725 };
726
727 ops.push(OpCode::QueueResolver {
728 state_id: self.state_id,
729 entity_name: self.entity_name.clone(),
730 resolver: resolver_spec.resolver.clone(),
731 input_path: resolver_spec.input_path.clone(),
732 input_value: resolver_spec.input_value.clone(),
733 url_template,
734 strategy: resolver_spec.strategy.clone(),
735 extracts: resolver_spec.extracts.clone(),
736 condition: resolver_spec.condition.clone(),
737 schedule_at: resolver_spec.schedule_at.clone(),
738 state: state_reg,
739 key: key_reg,
740 });
741 }
742
743 ops
744 }
745
746 fn compile_mapping(
747 &self,
748 mapping: &TypedFieldMapping<S>,
749 state_reg: Register,
750 key_reg: Register,
751 ) -> Vec<OpCode> {
752 let mut ops = Vec::new();
753 let temp_reg = 10;
754
755 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
756
757 if let Some(transform) = &mapping.transform {
758 ops.push(OpCode::Transform {
759 source: temp_reg,
760 dest: temp_reg,
761 transformation: transform.clone(),
762 });
763 }
764
765 if let Some(stop_instruction) = &mapping.stop {
766 if mapping.when.is_some() {
767 tracing::warn!(
768 "#[map] stop and when both set for {}. Ignoring when.",
769 mapping.target_path
770 );
771 }
772 if !matches!(mapping.population, PopulationStrategy::LastWrite)
773 && !matches!(mapping.population, PopulationStrategy::Merge)
774 {
775 tracing::warn!(
776 "#[map] stop ignores population strategy {:?}",
777 mapping.population
778 );
779 }
780
781 ops.push(OpCode::SetFieldUnlessStopped {
782 object: state_reg,
783 path: mapping.target_path.clone(),
784 value: temp_reg,
785 stop_field: stop_field_path(&mapping.target_path),
786 stop_instruction: stop_instruction.clone(),
787 entity_name: self.entity_name.clone(),
788 key_reg,
789 });
790 return ops;
791 }
792
793 if let Some(when_instruction) = &mapping.when {
794 if !matches!(mapping.population, PopulationStrategy::LastWrite)
795 && !matches!(mapping.population, PopulationStrategy::Merge)
796 {
797 tracing::warn!(
798 "#[map] when ignores population strategy {:?}",
799 mapping.population
800 );
801 }
802 let (condition_field, condition_op, condition_value) = mapping
803 .condition
804 .as_ref()
805 .and_then(|cond| cond.parsed.as_ref())
806 .and_then(|parsed| match parsed {
807 ParsedCondition::Comparison { field, op, value } => {
808 Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
809 }
810 ParsedCondition::Logical { .. } => {
811 tracing::warn!("Logical conditions not yet supported for #[map] when");
812 None
813 }
814 })
815 .unwrap_or((None, None, None));
816
817 ops.push(OpCode::SetFieldWhen {
818 object: state_reg,
819 path: mapping.target_path.clone(),
820 value: temp_reg,
821 when_instruction: when_instruction.clone(),
822 entity_name: self.entity_name.clone(),
823 key_reg,
824 condition_field,
825 condition_op,
826 condition_value,
827 });
828 return ops;
829 }
830
831 if let Some(condition) = &mapping.condition {
832 if let Some(parsed) = &condition.parsed {
833 match parsed {
834 ParsedCondition::Comparison {
835 field,
836 op,
837 value: cond_value,
838 } => {
839 if matches!(mapping.population, PopulationStrategy::LastWrite)
840 || matches!(mapping.population, PopulationStrategy::Merge)
841 {
842 ops.push(OpCode::ConditionalSetField {
843 object: state_reg,
844 path: mapping.target_path.clone(),
845 value: temp_reg,
846 condition_field: field.clone(),
847 condition_op: op.clone(),
848 condition_value: cond_value.clone(),
849 });
850 return ops;
851 }
852
853 if matches!(mapping.population, PopulationStrategy::Count) {
854 ops.push(OpCode::ConditionalIncrement {
855 object: state_reg,
856 path: mapping.target_path.clone(),
857 condition_field: field.clone(),
858 condition_op: op.clone(),
859 condition_value: cond_value.clone(),
860 });
861 return ops;
862 }
863
864 tracing::warn!(
865 "Conditional #[map] not supported for population strategy {:?}",
866 mapping.population
867 );
868 }
869 ParsedCondition::Logical { .. } => {
870 tracing::warn!("Logical conditions not yet supported for #[map]");
871 }
872 }
873 }
874 }
875
876 match &mapping.population {
877 PopulationStrategy::Append => {
878 ops.push(OpCode::AppendToArray {
879 object: state_reg,
880 path: mapping.target_path.clone(),
881 value: temp_reg,
882 });
883 }
884 PopulationStrategy::LastWrite => {
885 ops.push(OpCode::SetField {
886 object: state_reg,
887 path: mapping.target_path.clone(),
888 value: temp_reg,
889 });
890 }
891 PopulationStrategy::SetOnce => {
892 ops.push(OpCode::SetFieldIfNull {
893 object: state_reg,
894 path: mapping.target_path.clone(),
895 value: temp_reg,
896 });
897 }
898 PopulationStrategy::Merge => {
899 ops.push(OpCode::SetField {
900 object: state_reg,
901 path: mapping.target_path.clone(),
902 value: temp_reg,
903 });
904 }
905 PopulationStrategy::Max => {
906 ops.push(OpCode::SetFieldMax {
907 object: state_reg,
908 path: mapping.target_path.clone(),
909 value: temp_reg,
910 });
911 }
912 PopulationStrategy::Sum => {
913 ops.push(OpCode::SetFieldSum {
914 object: state_reg,
915 path: mapping.target_path.clone(),
916 value: temp_reg,
917 });
918 }
919 PopulationStrategy::Count => {
920 ops.push(OpCode::SetFieldIncrement {
922 object: state_reg,
923 path: mapping.target_path.clone(),
924 });
925 }
926 PopulationStrategy::Min => {
927 ops.push(OpCode::SetFieldMin {
928 object: state_reg,
929 path: mapping.target_path.clone(),
930 value: temp_reg,
931 });
932 }
933 PopulationStrategy::UniqueCount => {
934 let set_name = format!("{}_unique_set", mapping.target_path);
937 ops.push(OpCode::AddToUniqueSet {
938 state_id: self.state_id,
939 set_name,
940 value: temp_reg,
941 count_object: state_reg,
942 count_path: mapping.target_path.clone(),
943 });
944 }
945 }
946
947 ops
948 }
949
950 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
951 match source {
952 MappingSource::FromSource {
953 path,
954 default,
955 transform,
956 } => {
957 let mut ops = vec![OpCode::LoadEventField {
958 path: path.clone(),
959 dest,
960 default: default.clone(),
961 }];
962
963 if let Some(transform_type) = transform {
965 ops.push(OpCode::Transform {
966 source: dest,
967 dest,
968 transformation: transform_type.clone(),
969 });
970 }
971
972 ops
973 }
974 MappingSource::Constant(val) => {
975 vec![OpCode::LoadConstant {
976 value: val.clone(),
977 dest,
978 }]
979 }
980 MappingSource::AsEvent { fields } => {
981 let mut ops = Vec::new();
982
983 if fields.is_empty() {
984 let event_data_reg = dest + 1;
985 ops.push(OpCode::LoadEventField {
986 path: FieldPath::new(&[]),
987 dest: event_data_reg,
988 default: Some(serde_json::json!({})),
989 });
990 ops.push(OpCode::CreateEvent {
991 dest,
992 event_value: event_data_reg,
993 });
994 } else {
995 let data_obj_reg = dest + 1;
996 ops.push(OpCode::CreateObject { dest: data_obj_reg });
997
998 let mut field_registers = Vec::new();
999 let mut current_reg = dest + 2;
1000
1001 for field_source in fields.iter() {
1002 if let MappingSource::FromSource {
1003 path,
1004 default,
1005 transform,
1006 } = &**field_source
1007 {
1008 ops.push(OpCode::LoadEventField {
1009 path: path.clone(),
1010 dest: current_reg,
1011 default: default.clone(),
1012 });
1013
1014 if let Some(transform_type) = transform {
1015 ops.push(OpCode::Transform {
1016 source: current_reg,
1017 dest: current_reg,
1018 transformation: transform_type.clone(),
1019 });
1020 }
1021
1022 if let Some(field_name) = path.segments.last() {
1023 field_registers.push((field_name.clone(), current_reg));
1024 }
1025 current_reg += 1;
1026 }
1027 }
1028
1029 if !field_registers.is_empty() {
1030 ops.push(OpCode::SetFields {
1031 object: data_obj_reg,
1032 fields: field_registers,
1033 });
1034 }
1035
1036 ops.push(OpCode::CreateEvent {
1037 dest,
1038 event_value: data_obj_reg,
1039 });
1040 }
1041
1042 ops
1043 }
1044 MappingSource::WholeSource => {
1045 vec![OpCode::LoadEventField {
1046 path: FieldPath::new(&[]),
1047 dest,
1048 default: Some(serde_json::json!({})),
1049 }]
1050 }
1051 MappingSource::AsCapture { field_transforms } => {
1052 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
1055 path: FieldPath::new(&[]),
1056 dest: capture_data_reg,
1057 default: Some(serde_json::json!({})),
1058 }];
1059
1060 let field_reg = 24;
1064 let transformed_reg = 25;
1065
1066 for (field_name, transform) in field_transforms {
1067 ops.push(OpCode::GetField {
1070 object: capture_data_reg,
1071 path: field_name.clone(),
1072 dest: field_reg,
1073 });
1074
1075 ops.push(OpCode::Transform {
1077 source: field_reg,
1078 dest: transformed_reg,
1079 transformation: transform.clone(),
1080 });
1081
1082 ops.push(OpCode::SetField {
1084 object: capture_data_reg,
1085 path: field_name.clone(),
1086 value: transformed_reg,
1087 });
1088 }
1089
1090 ops.push(OpCode::CreateCapture {
1092 dest,
1093 capture_value: capture_data_reg,
1094 });
1095
1096 ops
1097 }
1098 MappingSource::FromContext { field } => {
1099 vec![OpCode::LoadEventField {
1101 path: FieldPath::new(&["__update_context", field.as_str()]),
1102 dest,
1103 default: Some(serde_json::json!(null)),
1104 }]
1105 }
1106 MappingSource::Computed { .. } => {
1107 vec![]
1108 }
1109 MappingSource::FromState { .. } => {
1110 vec![]
1111 }
1112 }
1113 }
1114
1115 pub fn compile_key_loading(
1116 &self,
1117 resolution: &KeyResolutionStrategy,
1118 key_reg: Register,
1119 mappings: &[TypedFieldMapping<S>],
1120 ) -> Vec<OpCode> {
1121 let mut ops = Vec::new();
1122
1123 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
1127 path: FieldPath::new(&["__resolved_primary_key"]),
1128 dest: resolved_key_reg,
1129 default: Some(serde_json::json!(null)),
1130 });
1131
1132 match resolution {
1134 KeyResolutionStrategy::Embedded { primary_field } => {
1135 ops.push(OpCode::CopyRegister {
1137 source: resolved_key_reg,
1138 dest: key_reg,
1139 });
1140
1141 let effective_primary_field = if primary_field.segments.is_empty() {
1143 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1145 auto_field
1146 } else {
1147 primary_field.clone()
1148 }
1149 } else {
1150 primary_field.clone()
1151 };
1152
1153 if !effective_primary_field.segments.is_empty() {
1157 let temp_reg = 18;
1158 let transform_reg = 23; ops.push(OpCode::LoadEventField {
1161 path: effective_primary_field.clone(),
1162 dest: temp_reg,
1163 default: None,
1164 });
1165
1166 let primary_key_transform = self
1169 .find_primary_key_transformation(mappings)
1170 .or_else(|| self.find_inherited_primary_key_transformation());
1171
1172 if let Some(transform) = primary_key_transform {
1173 ops.push(OpCode::Transform {
1175 source: temp_reg,
1176 dest: transform_reg,
1177 transformation: transform,
1178 });
1179 ops.push(OpCode::CopyRegisterIfNull {
1181 source: transform_reg,
1182 dest: key_reg,
1183 });
1184 } else {
1185 ops.push(OpCode::CopyRegisterIfNull {
1187 source: temp_reg,
1188 dest: key_reg,
1189 });
1190 }
1191 }
1192 }
1195 KeyResolutionStrategy::Lookup { primary_field } => {
1196 let lookup_reg = 15;
1197 let result_reg = 17;
1198
1199 ops.push(OpCode::CopyRegister {
1201 source: resolved_key_reg,
1202 dest: lookup_reg,
1203 });
1204
1205 let temp_reg = 18;
1206 ops.push(OpCode::LoadEventField {
1207 path: primary_field.clone(),
1208 dest: temp_reg,
1209 default: None,
1210 });
1211 ops.push(OpCode::CopyRegisterIfNull {
1212 source: temp_reg,
1213 dest: lookup_reg,
1214 });
1215
1216 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1217 let effective_index_name =
1218 index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1219
1220 ops.push(OpCode::LookupIndex {
1221 state_id: self.state_id,
1222 index_name: effective_index_name,
1223 lookup_value: lookup_reg,
1224 dest: result_reg,
1225 });
1226 ops.push(OpCode::CopyRegister {
1238 source: resolved_key_reg,
1239 dest: key_reg,
1240 });
1241 ops.push(OpCode::CopyRegisterIfNull {
1245 source: result_reg,
1246 dest: key_reg,
1247 });
1248 }
1249 KeyResolutionStrategy::Computed {
1250 primary_field,
1251 compute_partition: _,
1252 } => {
1253 ops.push(OpCode::CopyRegister {
1255 source: resolved_key_reg,
1256 dest: key_reg,
1257 });
1258 let temp_reg = 18;
1259 ops.push(OpCode::LoadEventField {
1260 path: primary_field.clone(),
1261 dest: temp_reg,
1262 default: None,
1263 });
1264 ops.push(OpCode::CopyRegisterIfNull {
1265 source: temp_reg,
1266 dest: key_reg,
1267 });
1268 }
1269 KeyResolutionStrategy::TemporalLookup {
1270 lookup_field,
1271 timestamp_field,
1272 index_name,
1273 } => {
1274 ops.push(OpCode::CopyRegister {
1276 source: resolved_key_reg,
1277 dest: key_reg,
1278 });
1279 let lookup_reg = 15;
1280 let timestamp_reg = 16;
1281 let result_reg = 17;
1282
1283 ops.push(OpCode::LoadEventField {
1284 path: lookup_field.clone(),
1285 dest: lookup_reg,
1286 default: None,
1287 });
1288
1289 ops.push(OpCode::LoadEventField {
1290 path: timestamp_field.clone(),
1291 dest: timestamp_reg,
1292 default: None,
1293 });
1294
1295 ops.push(OpCode::LookupTemporalIndex {
1296 state_id: self.state_id,
1297 index_name: index_name.clone(),
1298 lookup_value: lookup_reg,
1299 timestamp: timestamp_reg,
1300 dest: result_reg,
1301 });
1302
1303 ops.push(OpCode::CopyRegisterIfNull {
1304 source: result_reg,
1305 dest: key_reg,
1306 });
1307 }
1308 }
1309
1310 ops
1311 }
1312
1313 fn find_primary_key_transformation(
1314 &self,
1315 mappings: &[TypedFieldMapping<S>],
1316 ) -> Option<Transformation> {
1317 let primary_key = self.spec.identity.primary_keys.first()?;
1319 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1320
1321 for mapping in mappings {
1323 if mapping.target_path == *primary_key
1325 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1326 {
1327 if let Some(transform) = &mapping.transform {
1329 return Some(transform.clone());
1330 }
1331
1332 if let MappingSource::FromSource {
1334 transform: Some(transform),
1335 ..
1336 } = &mapping.source
1337 {
1338 return Some(transform.clone());
1339 }
1340 }
1341 }
1342
1343 for mapping in mappings {
1345 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1346 if let Some(transform) = field_transforms.get(&primary_field_name) {
1347 return Some(transform.clone());
1348 }
1349 }
1350 }
1351
1352 None
1353 }
1354
1355 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1358 let primary_key = self.spec.identity.primary_keys.first()?;
1359
1360 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1362
1363 for handler in &self.spec.handlers {
1365 for mapping in &handler.mappings {
1366 if mapping.target_path == *primary_key
1368 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1369 {
1370 if let MappingSource::FromSource {
1372 path, transform, ..
1373 } = &mapping.source
1374 {
1375 if path.segments.last() == Some(&primary_field_name) {
1376 return mapping.transform.clone().or_else(|| transform.clone());
1378 }
1379 }
1380 }
1381
1382 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1384 if let Some(transform) = field_transforms.get(&primary_field_name) {
1385 return Some(transform.clone());
1386 }
1387 }
1388 }
1389 }
1390
1391 None
1392 }
1393
1394 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1396 primary_key.split('.').next_back().map(|s| s.to_string())
1398 }
1399
1400 pub fn auto_detect_primary_field(
1403 &self,
1404 current_mappings: &[TypedFieldMapping<S>],
1405 ) -> Option<FieldPath> {
1406 let primary_key = self.spec.identity.primary_keys.first()?;
1407
1408 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1410
1411 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1413 return Some(FieldPath::new(&[&primary_field_name]));
1414 }
1415
1416 None
1417 }
1418
1419 fn current_account_has_primary_field(
1422 &self,
1423 field_name: &str,
1424 mappings: &[TypedFieldMapping<S>],
1425 ) -> bool {
1426 for mapping in mappings {
1428 if let MappingSource::FromSource { path, .. } = &mapping.source {
1429 if path.segments.last() == Some(&field_name.to_string()) {
1431 return true;
1432 }
1433 }
1434 }
1435
1436 false
1437 }
1438
1439 #[allow(dead_code)]
1441 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1442 for mapping in mappings {
1443 if let MappingSource::FromSource { path, .. } = &mapping.source {
1444 if path.segments.last() == Some(&field_name.to_string()) {
1445 return true;
1446 }
1447 }
1448 }
1449 false
1450 }
1451
1452 #[allow(dead_code)]
1455 fn field_exists_in_mappings(
1456 &self,
1457 field_name: &str,
1458 mappings: &[TypedFieldMapping<S>],
1459 ) -> bool {
1460 for mapping in mappings {
1462 if let MappingSource::FromSource { path, .. } = &mapping.source {
1463 if path.segments.last() == Some(&field_name.to_string()) {
1464 return true;
1465 }
1466 }
1467 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1469 if field_transforms.contains_key(field_name) {
1470 return true;
1471 }
1472 }
1473 }
1474 false
1475 }
1476
1477 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1478 if field_path.segments.is_empty() {
1479 return None;
1480 }
1481
1482 let lookup_field_name = field_path.segments.last().unwrap();
1483
1484 for lookup_index in &self.spec.identity.lookup_indexes {
1485 let index_field_name = lookup_index
1486 .field_name
1487 .split('.')
1488 .next_back()
1489 .unwrap_or(&lookup_index.field_name);
1490 if index_field_name == lookup_field_name {
1491 return Some(format!("{}_lookup_index", index_field_name));
1492 }
1493 }
1494
1495 None
1496 }
1497
1498 fn find_lookup_index_for_lookup_field(
1501 &self,
1502 primary_field: &FieldPath,
1503 mappings: &[TypedFieldMapping<S>],
1504 ) -> Option<String> {
1505 let primary_path = primary_field.segments.join(".");
1507
1508 for mapping in mappings {
1510 if let MappingSource::FromSource { path, .. } = &mapping.source {
1512 let source_path = path.segments.join(".");
1513 if source_path == primary_path {
1514 for lookup_index in &self.spec.identity.lookup_indexes {
1516 if mapping.target_path == lookup_index.field_name {
1517 let index_field_name = lookup_index
1518 .field_name
1519 .split('.')
1520 .next_back()
1521 .unwrap_or(&lookup_index.field_name);
1522 return Some(format!("{}_lookup_index", index_field_name));
1523 }
1524 }
1525 }
1526 }
1527 }
1528
1529 self.find_lookup_index_for_field(primary_field)
1531 }
1532
1533 fn find_source_path_for_lookup_index(
1537 &self,
1538 mappings: &[TypedFieldMapping<S>],
1539 lookup_field_name: &str,
1540 ) -> Option<Vec<String>> {
1541 for mapping in mappings {
1542 if mapping.target_path == lookup_field_name {
1543 if let MappingSource::FromSource { path, .. } = &mapping.source {
1544 return Some(path.segments.clone());
1545 }
1546 }
1547 }
1548 None
1549 }
1550
1551 fn compile_temporal_index_update(
1552 &self,
1553 resolution: &KeyResolutionStrategy,
1554 key_reg: Register,
1555 mappings: &[TypedFieldMapping<S>],
1556 ) -> Vec<OpCode> {
1557 let mut ops = Vec::new();
1558
1559 for lookup_index in &self.spec.identity.lookup_indexes {
1560 let lookup_reg = 17;
1561 let source_field = lookup_index
1562 .field_name
1563 .split('.')
1564 .next_back()
1565 .unwrap_or(&lookup_index.field_name);
1566
1567 match resolution {
1568 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1569 let source_path_opt =
1572 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1573
1574 let load_path = if let Some(ref path) = source_path_opt {
1575 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1576 } else {
1577 FieldPath::new(&[source_field])
1579 };
1580
1581 ops.push(OpCode::LoadEventField {
1582 path: load_path,
1583 dest: lookup_reg,
1584 default: None,
1585 });
1586
1587 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1588 let timestamp_reg = 18;
1589
1590 ops.push(OpCode::LoadEventField {
1591 path: FieldPath::new(&[temporal_field_name]),
1592 dest: timestamp_reg,
1593 default: None,
1594 });
1595
1596 let index_name = format!("{}_temporal_index", source_field);
1597 ops.push(OpCode::UpdateTemporalIndex {
1598 state_id: self.state_id,
1599 index_name,
1600 lookup_value: lookup_reg,
1601 primary_key: key_reg,
1602 timestamp: timestamp_reg,
1603 });
1604
1605 let simple_index_name = format!("{}_lookup_index", source_field);
1606 ops.push(OpCode::UpdateLookupIndex {
1607 state_id: self.state_id,
1608 index_name: simple_index_name,
1609 lookup_value: lookup_reg,
1610 primary_key: key_reg,
1611 });
1612 } else {
1613 let index_name = format!("{}_lookup_index", source_field);
1614 ops.push(OpCode::UpdateLookupIndex {
1615 state_id: self.state_id,
1616 index_name,
1617 lookup_value: lookup_reg,
1618 primary_key: key_reg,
1619 });
1620 }
1621
1622 if source_path_opt.is_some() {
1626 ops.push(OpCode::UpdatePdaReverseLookup {
1627 state_id: self.state_id,
1628 lookup_name: "default_pda_lookup".to_string(),
1629 pda_address: lookup_reg,
1630 primary_key: key_reg,
1631 });
1632 }
1633 }
1634 KeyResolutionStrategy::Lookup { primary_field } => {
1635 let has_mapping_to_lookup_field = mappings
1638 .iter()
1639 .any(|m| m.target_path == lookup_index.field_name);
1640
1641 if has_mapping_to_lookup_field {
1642 let path_segments: Vec<&str> =
1645 primary_field.segments.iter().map(|s| s.as_str()).collect();
1646 ops.push(OpCode::LoadEventField {
1647 path: FieldPath::new(&path_segments),
1648 dest: lookup_reg,
1649 default: None,
1650 });
1651
1652 let index_name = format!("{}_lookup_index", source_field);
1653 ops.push(OpCode::UpdateLookupIndex {
1654 state_id: self.state_id,
1655 index_name,
1656 lookup_value: lookup_reg,
1657 primary_key: key_reg,
1658 });
1659 }
1660 }
1661 KeyResolutionStrategy::Computed { .. }
1662 | KeyResolutionStrategy::TemporalLookup { .. } => {
1663 }
1665 }
1666 }
1667
1668 ops
1669 }
1670
1671 fn get_event_type(&self, source: &SourceSpec) -> String {
1672 match source {
1673 SourceSpec::Source { type_name, .. } => type_name.clone(),
1674 }
1675 }
1676
1677 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1678 let mut ops = Vec::new();
1679 let state_reg = 2;
1680
1681 for action in actions {
1682 match action {
1683 HookAction::SetField {
1684 target_field,
1685 source,
1686 condition,
1687 } => {
1688 let _ = condition;
1690
1691 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1695 ops.extend(load_ops);
1696
1697 if let MappingSource::FromSource {
1699 transform: Some(transform_type),
1700 ..
1701 } = source
1702 {
1703 ops.push(OpCode::Transform {
1704 source: temp_reg,
1705 dest: temp_reg,
1706 transformation: transform_type.clone(),
1707 });
1708 }
1709
1710 if let Some(cond_expr) = condition {
1712 if let Some(parsed) = &cond_expr.parsed {
1713 let cond_check_ops = self.compile_condition_check(
1715 parsed,
1716 temp_reg,
1717 state_reg,
1718 target_field,
1719 );
1720 ops.extend(cond_check_ops);
1721 } else {
1722 ops.push(OpCode::SetField {
1724 object: state_reg,
1725 path: target_field.clone(),
1726 value: temp_reg,
1727 });
1728 }
1729 } else {
1730 ops.push(OpCode::SetField {
1732 object: state_reg,
1733 path: target_field.clone(),
1734 value: temp_reg,
1735 });
1736 }
1737 }
1738 HookAction::IncrementField {
1739 target_field,
1740 increment_by,
1741 condition,
1742 } => {
1743 if let Some(cond_expr) = condition {
1744 if let Some(parsed) = &cond_expr.parsed {
1745 let cond_check_ops = self.compile_conditional_increment(
1750 parsed,
1751 state_reg,
1752 target_field,
1753 *increment_by,
1754 );
1755 ops.extend(cond_check_ops);
1756 } else {
1757 ops.push(OpCode::SetFieldIncrement {
1759 object: state_reg,
1760 path: target_field.clone(),
1761 });
1762 }
1763 } else {
1764 ops.push(OpCode::SetFieldIncrement {
1766 object: state_reg,
1767 path: target_field.clone(),
1768 });
1769 }
1770 }
1771 HookAction::RegisterPdaMapping { .. } => {
1772 }
1775 }
1776 }
1777
1778 ops
1779 }
1780
1781 fn compile_condition_check(
1782 &self,
1783 condition: &ParsedCondition,
1784 value_reg: Register,
1785 state_reg: Register,
1786 target_field: &str,
1787 ) -> Vec<OpCode> {
1788 match condition {
1789 ParsedCondition::Comparison {
1790 field,
1791 op,
1792 value: cond_value,
1793 } => {
1794 vec![OpCode::ConditionalSetField {
1796 object: state_reg,
1797 path: target_field.to_string(),
1798 value: value_reg,
1799 condition_field: field.clone(),
1800 condition_op: op.clone(),
1801 condition_value: cond_value.clone(),
1802 }]
1803 }
1804 ParsedCondition::Logical { .. } => {
1805 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1807 vec![OpCode::SetField {
1808 object: state_reg,
1809 path: target_field.to_string(),
1810 value: value_reg,
1811 }]
1812 }
1813 }
1814 }
1815
1816 fn compile_conditional_increment(
1817 &self,
1818 condition: &ParsedCondition,
1819 state_reg: Register,
1820 target_field: &str,
1821 _increment_by: i64,
1822 ) -> Vec<OpCode> {
1823 match condition {
1824 ParsedCondition::Comparison {
1825 field,
1826 op,
1827 value: cond_value,
1828 } => {
1829 vec![OpCode::ConditionalIncrement {
1830 object: state_reg,
1831 path: target_field.to_string(),
1832 condition_field: field.clone(),
1833 condition_op: op.clone(),
1834 condition_value: cond_value.clone(),
1835 }]
1836 }
1837 ParsedCondition::Logical { .. } => {
1838 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1839 vec![OpCode::SetFieldIncrement {
1840 object: state_reg,
1841 path: target_field.to_string(),
1842 }]
1843 }
1844 }
1845 }
1846}