1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
6pub type Register = usize;
7
8#[derive(Debug, Clone)]
9pub enum OpCode {
10 LoadEventField {
11 path: FieldPath,
12 dest: Register,
13 default: Option<Value>,
14 },
15 LoadConstant {
16 value: Value,
17 dest: Register,
18 },
19 CopyRegister {
20 source: Register,
21 dest: Register,
22 },
23 CopyRegisterIfNull {
25 source: Register,
26 dest: Register,
27 },
28 GetEventType {
29 dest: Register,
30 },
31 CreateObject {
32 dest: Register,
33 },
34 SetField {
35 object: Register,
36 path: String,
37 value: Register,
38 },
39 SetFields {
40 object: Register,
41 fields: Vec<(String, Register)>,
42 },
43 GetField {
44 object: Register,
45 path: String,
46 dest: Register,
47 },
48 ReadOrInitState {
49 state_id: u32,
50 key: Register,
51 default: Value,
52 dest: Register,
53 },
54 UpdateState {
55 state_id: u32,
56 key: Register,
57 value: Register,
58 },
59 AppendToArray {
60 object: Register,
61 path: String,
62 value: Register,
63 },
64 GetCurrentTimestamp {
65 dest: Register,
66 },
67 CreateEvent {
68 dest: Register,
69 event_value: Register,
70 },
71 CreateCapture {
72 dest: Register,
73 capture_value: Register,
74 },
75 Transform {
76 source: Register,
77 dest: Register,
78 transformation: Transformation,
79 },
80 EmitMutation {
81 entity_name: String,
82 key: Register,
83 state: Register,
84 },
85 SetFieldIfNull {
86 object: Register,
87 path: String,
88 value: Register,
89 },
90 SetFieldMax {
91 object: Register,
92 path: String,
93 value: Register,
94 },
95 UpdateTemporalIndex {
96 state_id: u32,
97 index_name: String,
98 lookup_value: Register,
99 primary_key: Register,
100 timestamp: Register,
101 },
102 LookupTemporalIndex {
103 state_id: u32,
104 index_name: String,
105 lookup_value: Register,
106 timestamp: Register,
107 dest: Register,
108 },
109 UpdateLookupIndex {
110 state_id: u32,
111 index_name: String,
112 lookup_value: Register,
113 primary_key: Register,
114 },
115 LookupIndex {
116 state_id: u32,
117 index_name: String,
118 lookup_value: Register,
119 dest: Register,
120 },
121 SetFieldSum {
123 object: Register,
124 path: String,
125 value: Register,
126 },
127 SetFieldIncrement {
129 object: Register,
130 path: String,
131 },
132 SetFieldMin {
134 object: Register,
135 path: String,
136 value: Register,
137 },
138 SetFieldWhen {
141 object: Register,
142 path: String,
143 value: Register,
144 when_instruction: String,
145 entity_name: String,
146 key_reg: Register,
147 condition_field: Option<FieldPath>,
148 condition_op: Option<ComparisonOp>,
149 condition_value: Option<Value>,
150 },
151 AddToUniqueSet {
154 state_id: u32,
155 set_name: String,
156 value: Register,
157 count_object: Register,
158 count_path: String,
159 },
160 ConditionalSetField {
162 object: Register,
163 path: String,
164 value: Register,
165 condition_field: FieldPath,
166 condition_op: ComparisonOp,
167 condition_value: Value,
168 },
169 ConditionalIncrement {
171 object: Register,
172 path: String,
173 condition_field: FieldPath,
174 condition_op: ComparisonOp,
175 condition_value: Value,
176 },
177 EvaluateComputedFields {
180 state: Register,
181 computed_paths: Vec<String>,
182 },
183 UpdatePdaReverseLookup {
186 state_id: u32,
187 lookup_name: String,
188 pda_address: Register,
189 primary_key: Register,
190 },
191}
192
193pub struct EntityBytecode {
194 pub state_id: u32,
195 pub handlers: HashMap<String, Vec<OpCode>>,
196 pub entity_name: String,
197 pub when_events: HashSet<String>,
198 pub non_emitted_fields: HashSet<String>,
199 #[allow(clippy::type_complexity)]
201 pub computed_fields_evaluator: Option<
202 Box<
203 dyn Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>> + Send + Sync,
204 >,
205 >,
206}
207
208impl std::fmt::Debug for EntityBytecode {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("EntityBytecode")
211 .field("state_id", &self.state_id)
212 .field("handlers", &self.handlers)
213 .field("entity_name", &self.entity_name)
214 .field("when_events", &self.when_events)
215 .field("non_emitted_fields", &self.non_emitted_fields)
216 .field(
217 "computed_fields_evaluator",
218 &self.computed_fields_evaluator.is_some(),
219 )
220 .finish()
221 }
222}
223
224#[derive(Debug)]
225pub struct MultiEntityBytecode {
226 pub entities: HashMap<String, EntityBytecode>,
227 pub event_routing: HashMap<String, Vec<String>>,
228 pub when_events: HashSet<String>,
229 pub proto_router: crate::proto_router::ProtoRouter,
230}
231
232impl MultiEntityBytecode {
233 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
234 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
235 let entity_bytecode = compiler.compile_entity();
236
237 let mut entities = HashMap::new();
238 let mut event_routing = HashMap::new();
239 let mut when_events = HashSet::new();
240
241 for event_type in entity_bytecode.handlers.keys() {
242 event_routing
243 .entry(event_type.clone())
244 .or_insert_with(Vec::new)
245 .push(entity_name.clone());
246 }
247
248 when_events.extend(entity_bytecode.when_events.iter().cloned());
249
250 entities.insert(entity_name, entity_bytecode);
251
252 MultiEntityBytecode {
253 entities,
254 event_routing,
255 when_events,
256 proto_router: crate::proto_router::ProtoRouter::new(),
257 }
258 }
259
260 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
261 let entities = HashMap::new();
262 let event_routing = HashMap::new();
263 let when_events = HashSet::new();
264
265 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
266 panic!("from_entities requires type information - use builder pattern instead");
267 }
268
269 MultiEntityBytecode {
270 entities,
271 event_routing,
272 when_events,
273 proto_router: crate::proto_router::ProtoRouter::new(),
274 }
275 }
276
277 #[allow(clippy::new_ret_no_self)]
278 pub fn new() -> MultiEntityBytecodeBuilder {
279 MultiEntityBytecodeBuilder {
280 entities: HashMap::new(),
281 event_routing: HashMap::new(),
282 when_events: HashSet::new(),
283 proto_router: crate::proto_router::ProtoRouter::new(),
284 }
285 }
286}
287
288pub struct MultiEntityBytecodeBuilder {
289 entities: HashMap<String, EntityBytecode>,
290 event_routing: HashMap<String, Vec<String>>,
291 when_events: HashSet<String>,
292 proto_router: crate::proto_router::ProtoRouter,
293}
294
295impl MultiEntityBytecodeBuilder {
296 pub fn add_entity<S>(
297 self,
298 entity_name: String,
299 spec: TypedStreamSpec<S>,
300 state_id: u32,
301 ) -> Self {
302 self.add_entity_with_evaluator(
303 entity_name,
304 spec,
305 state_id,
306 None::<fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>>,
307 )
308 }
309
310 pub fn add_entity_with_evaluator<S, F>(
311 mut self,
312 entity_name: String,
313 spec: TypedStreamSpec<S>,
314 state_id: u32,
315 evaluator: Option<F>,
316 ) -> Self
317 where
318 F: Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>
319 + Send
320 + Sync
321 + 'static,
322 {
323 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
324 let mut entity_bytecode = compiler.compile_entity();
325
326 if let Some(eval) = evaluator {
328 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
329 }
330
331 for event_type in entity_bytecode.handlers.keys() {
332 self.event_routing
333 .entry(event_type.clone())
334 .or_default()
335 .push(entity_name.clone());
336 }
337
338 self.when_events
339 .extend(entity_bytecode.when_events.iter().cloned());
340
341 self.entities.insert(entity_name, entity_bytecode);
342 self
343 }
344
345 pub fn build(self) -> MultiEntityBytecode {
346 MultiEntityBytecode {
347 entities: self.entities,
348 event_routing: self.event_routing,
349 when_events: self.when_events,
350 proto_router: self.proto_router,
351 }
352 }
353}
354
355pub struct TypedCompiler<S> {
356 pub spec: TypedStreamSpec<S>,
357 entity_name: String,
358 state_id: u32,
359}
360
361impl<S> TypedCompiler<S> {
362 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
363 TypedCompiler {
364 spec,
365 entity_name,
366 state_id: 0,
367 }
368 }
369
370 pub fn with_state_id(mut self, state_id: u32) -> Self {
371 self.state_id = state_id;
372 self
373 }
374
375 pub fn compile(&self) -> MultiEntityBytecode {
376 let entity_bytecode = self.compile_entity();
377
378 let mut entities = HashMap::new();
379 let mut event_routing = HashMap::new();
380 let mut when_events = HashSet::new();
381
382 for event_type in entity_bytecode.handlers.keys() {
383 event_routing
384 .entry(event_type.clone())
385 .or_insert_with(Vec::new)
386 .push(self.entity_name.clone());
387 }
388
389 when_events.extend(entity_bytecode.when_events.iter().cloned());
390
391 entities.insert(self.entity_name.clone(), entity_bytecode);
392
393 MultiEntityBytecode {
394 entities,
395 event_routing,
396 when_events,
397 proto_router: crate::proto_router::ProtoRouter::new(),
398 }
399 }
400
401 fn compile_entity(&self) -> EntityBytecode {
402 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
403 let mut when_events: HashSet<String> = HashSet::new();
404 let mut emit_by_path: HashMap<String, bool> = HashMap::new();
405
406 let mut debug_info = Vec::new();
408 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
409 let event_type = self.get_event_type(&handler_spec.source);
410 let program_id = match &handler_spec.source {
411 crate::ast::SourceSpec::Source { program_id, .. } => {
412 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
413 }
414 };
415 debug_info.push(format!(
416 " [{}] EventType={}, Mappings={}, ProgramId={}",
417 index,
418 event_type,
419 handler_spec.mappings.len(),
420 program_id
421 ));
422 }
423
424 for handler_spec in &self.spec.handlers {
434 for mapping in &handler_spec.mappings {
435 if let Some(when) = &mapping.when {
436 when_events.insert(when.clone());
437 }
438 let entry = emit_by_path
439 .entry(mapping.target_path.clone())
440 .or_insert(false);
441 *entry |= mapping.emit;
442 }
443 let opcodes = self.compile_handler(handler_spec);
444 let event_type = self.get_event_type(&handler_spec.source);
445
446 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
447 let mut existing_setup = Vec::new();
452 let mut existing_mappings = Vec::new();
453 let mut existing_teardown = Vec::new();
454 let mut section = 0; for opcode in existing_opcodes.iter() {
457 match opcode {
458 OpCode::ReadOrInitState { .. } => {
459 existing_setup.push(opcode.clone());
460 section = 1; }
462 OpCode::UpdateState { .. } => {
463 existing_teardown.push(opcode.clone());
464 section = 2; }
466 OpCode::EmitMutation { .. } => {
467 existing_teardown.push(opcode.clone());
468 }
469 _ if section == 0 => existing_setup.push(opcode.clone()),
470 _ if section == 1 => existing_mappings.push(opcode.clone()),
471 _ => existing_teardown.push(opcode.clone()),
472 }
473 }
474
475 let mut new_mappings = Vec::new();
477 section = 0;
478
479 for opcode in opcodes.iter() {
480 match opcode {
481 OpCode::ReadOrInitState { .. } => {
482 section = 1; }
484 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
485 section = 2; }
487 _ if section == 1 => {
488 new_mappings.push(opcode.clone());
489 }
490 _ => {} }
492 }
493
494 let mut merged = Vec::new();
496 merged.extend(existing_setup);
497 merged.extend(existing_mappings);
498 merged.extend(new_mappings.clone());
499 merged.extend(existing_teardown);
500
501 *existing_opcodes = merged;
502 } else {
503 handlers.insert(event_type, opcodes);
504 }
505 }
506
507 for hook in &self.spec.instruction_hooks {
509 let event_type = hook.instruction_type.clone();
510
511 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
512 let key_reg = 20;
513 let state_reg = 2;
514 let resolved_key_reg = 19;
515 let temp_reg = 18;
516
517 let mut ops = Vec::new();
518
519 ops.push(OpCode::LoadEventField {
521 path: FieldPath::new(&["__resolved_primary_key"]),
522 dest: resolved_key_reg,
523 default: Some(serde_json::json!(null)),
524 });
525
526 ops.push(OpCode::CopyRegister {
528 source: resolved_key_reg,
529 dest: key_reg,
530 });
531
532 if let Some(lookup_path) = &hook.lookup_by {
534 ops.push(OpCode::LoadEventField {
536 path: lookup_path.clone(),
537 dest: temp_reg,
538 default: None,
539 });
540
541 ops.push(OpCode::Transform {
543 source: temp_reg,
544 dest: temp_reg,
545 transformation: Transformation::HexEncode,
546 });
547
548 ops.push(OpCode::CopyRegisterIfNull {
550 source: temp_reg,
551 dest: key_reg,
552 });
553 }
554
555 ops.push(OpCode::ReadOrInitState {
556 state_id: self.state_id,
557 key: key_reg,
558 default: serde_json::json!({}),
559 dest: state_reg,
560 });
561
562 ops.push(OpCode::UpdateState {
563 state_id: self.state_id,
564 key: key_reg,
565 value: state_reg,
566 });
567
568 ops
569 });
570
571 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
573
574 let insert_pos = handler_opcodes
578 .iter()
579 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
580 .or_else(|| {
581 handler_opcodes
582 .iter()
583 .position(|op| matches!(op, OpCode::UpdateState { .. }))
584 });
585
586 if let Some(pos) = insert_pos {
587 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
589 handler_opcodes.insert(pos + i, opcode);
590 }
591 }
592 }
593
594 let non_emitted_fields: HashSet<String> = emit_by_path
595 .into_iter()
596 .filter_map(|(path, emit)| if emit { None } else { Some(path) })
597 .collect();
598
599 EntityBytecode {
600 state_id: self.state_id,
601 handlers,
602 entity_name: self.entity_name.clone(),
603 when_events,
604 non_emitted_fields,
605 computed_fields_evaluator: None,
606 }
607 }
608
609 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
610 let mut ops = Vec::new();
611 let state_reg = 2;
612 let key_reg = 20;
613
614 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
615
616 ops.push(OpCode::ReadOrInitState {
617 state_id: self.state_id,
618 key: key_reg,
619 default: serde_json::json!({}),
620 dest: state_reg,
621 });
622
623 ops.extend(self.compile_temporal_index_update(
630 &spec.key_resolution,
631 key_reg,
632 &spec.mappings,
633 ));
634
635 for mapping in &spec.mappings {
636 ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
637 }
638
639 ops.push(OpCode::EvaluateComputedFields {
641 state: state_reg,
642 computed_paths: self.spec.computed_fields.clone(),
643 });
644
645 ops.push(OpCode::UpdateState {
646 state_id: self.state_id,
647 key: key_reg,
648 value: state_reg,
649 });
650
651 if spec.emit {
652 ops.push(OpCode::EmitMutation {
653 entity_name: self.entity_name.clone(),
654 key: key_reg,
655 state: state_reg,
656 });
657 }
658
659 ops
660 }
661
662 fn compile_mapping(
663 &self,
664 mapping: &TypedFieldMapping<S>,
665 state_reg: Register,
666 key_reg: Register,
667 ) -> Vec<OpCode> {
668 let mut ops = Vec::new();
669 let temp_reg = 10;
670
671 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
672
673 if let Some(transform) = &mapping.transform {
674 ops.push(OpCode::Transform {
675 source: temp_reg,
676 dest: temp_reg,
677 transformation: transform.clone(),
678 });
679 }
680
681 if let Some(when_instruction) = &mapping.when {
682 if !matches!(mapping.population, PopulationStrategy::LastWrite)
683 && !matches!(mapping.population, PopulationStrategy::Merge)
684 {
685 tracing::warn!(
686 "#[map] when ignores population strategy {:?}",
687 mapping.population
688 );
689 }
690 let (condition_field, condition_op, condition_value) = mapping
691 .condition
692 .as_ref()
693 .and_then(|cond| cond.parsed.as_ref())
694 .and_then(|parsed| match parsed {
695 ParsedCondition::Comparison { field, op, value } => {
696 Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
697 }
698 ParsedCondition::Logical { .. } => {
699 tracing::warn!("Logical conditions not yet supported for #[map] when");
700 None
701 }
702 })
703 .unwrap_or((None, None, None));
704
705 ops.push(OpCode::SetFieldWhen {
706 object: state_reg,
707 path: mapping.target_path.clone(),
708 value: temp_reg,
709 when_instruction: when_instruction.clone(),
710 entity_name: self.entity_name.clone(),
711 key_reg,
712 condition_field,
713 condition_op,
714 condition_value,
715 });
716 return ops;
717 }
718
719 if let Some(condition) = &mapping.condition {
720 if let Some(parsed) = &condition.parsed {
721 match parsed {
722 ParsedCondition::Comparison {
723 field,
724 op,
725 value: cond_value,
726 } => {
727 if matches!(mapping.population, PopulationStrategy::LastWrite)
728 || matches!(mapping.population, PopulationStrategy::Merge)
729 {
730 ops.push(OpCode::ConditionalSetField {
731 object: state_reg,
732 path: mapping.target_path.clone(),
733 value: temp_reg,
734 condition_field: field.clone(),
735 condition_op: op.clone(),
736 condition_value: cond_value.clone(),
737 });
738 return ops;
739 }
740
741 if matches!(mapping.population, PopulationStrategy::Count) {
742 ops.push(OpCode::ConditionalIncrement {
743 object: state_reg,
744 path: mapping.target_path.clone(),
745 condition_field: field.clone(),
746 condition_op: op.clone(),
747 condition_value: cond_value.clone(),
748 });
749 return ops;
750 }
751
752 tracing::warn!(
753 "Conditional #[map] not supported for population strategy {:?}",
754 mapping.population
755 );
756 }
757 ParsedCondition::Logical { .. } => {
758 tracing::warn!("Logical conditions not yet supported for #[map]");
759 }
760 }
761 }
762 }
763
764 match &mapping.population {
765 PopulationStrategy::Append => {
766 ops.push(OpCode::AppendToArray {
767 object: state_reg,
768 path: mapping.target_path.clone(),
769 value: temp_reg,
770 });
771 }
772 PopulationStrategy::LastWrite => {
773 ops.push(OpCode::SetField {
774 object: state_reg,
775 path: mapping.target_path.clone(),
776 value: temp_reg,
777 });
778 }
779 PopulationStrategy::SetOnce => {
780 ops.push(OpCode::SetFieldIfNull {
781 object: state_reg,
782 path: mapping.target_path.clone(),
783 value: temp_reg,
784 });
785 }
786 PopulationStrategy::Merge => {
787 ops.push(OpCode::SetField {
788 object: state_reg,
789 path: mapping.target_path.clone(),
790 value: temp_reg,
791 });
792 }
793 PopulationStrategy::Max => {
794 ops.push(OpCode::SetFieldMax {
795 object: state_reg,
796 path: mapping.target_path.clone(),
797 value: temp_reg,
798 });
799 }
800 PopulationStrategy::Sum => {
801 ops.push(OpCode::SetFieldSum {
802 object: state_reg,
803 path: mapping.target_path.clone(),
804 value: temp_reg,
805 });
806 }
807 PopulationStrategy::Count => {
808 ops.push(OpCode::SetFieldIncrement {
810 object: state_reg,
811 path: mapping.target_path.clone(),
812 });
813 }
814 PopulationStrategy::Min => {
815 ops.push(OpCode::SetFieldMin {
816 object: state_reg,
817 path: mapping.target_path.clone(),
818 value: temp_reg,
819 });
820 }
821 PopulationStrategy::UniqueCount => {
822 let set_name = format!("{}_unique_set", mapping.target_path);
825 ops.push(OpCode::AddToUniqueSet {
826 state_id: self.state_id,
827 set_name,
828 value: temp_reg,
829 count_object: state_reg,
830 count_path: mapping.target_path.clone(),
831 });
832 }
833 }
834
835 ops
836 }
837
838 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
839 match source {
840 MappingSource::FromSource {
841 path,
842 default,
843 transform,
844 } => {
845 let mut ops = vec![OpCode::LoadEventField {
846 path: path.clone(),
847 dest,
848 default: default.clone(),
849 }];
850
851 if let Some(transform_type) = transform {
853 ops.push(OpCode::Transform {
854 source: dest,
855 dest,
856 transformation: transform_type.clone(),
857 });
858 }
859
860 ops
861 }
862 MappingSource::Constant(val) => {
863 vec![OpCode::LoadConstant {
864 value: val.clone(),
865 dest,
866 }]
867 }
868 MappingSource::AsEvent { fields } => {
869 let mut ops = Vec::new();
870
871 if fields.is_empty() {
872 let event_data_reg = dest + 1;
873 ops.push(OpCode::LoadEventField {
874 path: FieldPath::new(&[]),
875 dest: event_data_reg,
876 default: Some(serde_json::json!({})),
877 });
878 ops.push(OpCode::CreateEvent {
879 dest,
880 event_value: event_data_reg,
881 });
882 } else {
883 let data_obj_reg = dest + 1;
884 ops.push(OpCode::CreateObject { dest: data_obj_reg });
885
886 let mut field_registers = Vec::new();
887 let mut current_reg = dest + 2;
888
889 for field_source in fields.iter() {
890 if let MappingSource::FromSource {
891 path,
892 default,
893 transform,
894 } = &**field_source
895 {
896 ops.push(OpCode::LoadEventField {
897 path: path.clone(),
898 dest: current_reg,
899 default: default.clone(),
900 });
901
902 if let Some(transform_type) = transform {
903 ops.push(OpCode::Transform {
904 source: current_reg,
905 dest: current_reg,
906 transformation: transform_type.clone(),
907 });
908 }
909
910 if let Some(field_name) = path.segments.last() {
911 field_registers.push((field_name.clone(), current_reg));
912 }
913 current_reg += 1;
914 }
915 }
916
917 if !field_registers.is_empty() {
918 ops.push(OpCode::SetFields {
919 object: data_obj_reg,
920 fields: field_registers,
921 });
922 }
923
924 ops.push(OpCode::CreateEvent {
925 dest,
926 event_value: data_obj_reg,
927 });
928 }
929
930 ops
931 }
932 MappingSource::WholeSource => {
933 vec![OpCode::LoadEventField {
934 path: FieldPath::new(&[]),
935 dest,
936 default: Some(serde_json::json!({})),
937 }]
938 }
939 MappingSource::AsCapture { field_transforms } => {
940 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
943 path: FieldPath::new(&[]),
944 dest: capture_data_reg,
945 default: Some(serde_json::json!({})),
946 }];
947
948 let field_reg = 24;
952 let transformed_reg = 25;
953
954 for (field_name, transform) in field_transforms {
955 ops.push(OpCode::GetField {
958 object: capture_data_reg,
959 path: field_name.clone(),
960 dest: field_reg,
961 });
962
963 ops.push(OpCode::Transform {
965 source: field_reg,
966 dest: transformed_reg,
967 transformation: transform.clone(),
968 });
969
970 ops.push(OpCode::SetField {
972 object: capture_data_reg,
973 path: field_name.clone(),
974 value: transformed_reg,
975 });
976 }
977
978 ops.push(OpCode::CreateCapture {
980 dest,
981 capture_value: capture_data_reg,
982 });
983
984 ops
985 }
986 MappingSource::FromContext { field } => {
987 vec![OpCode::LoadEventField {
989 path: FieldPath::new(&["__update_context", field.as_str()]),
990 dest,
991 default: Some(serde_json::json!(null)),
992 }]
993 }
994 MappingSource::Computed { .. } => {
995 vec![]
996 }
997 MappingSource::FromState { .. } => {
998 vec![]
999 }
1000 }
1001 }
1002
1003 pub fn compile_key_loading(
1004 &self,
1005 resolution: &KeyResolutionStrategy,
1006 key_reg: Register,
1007 mappings: &[TypedFieldMapping<S>],
1008 ) -> Vec<OpCode> {
1009 let mut ops = Vec::new();
1010
1011 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
1015 path: FieldPath::new(&["__resolved_primary_key"]),
1016 dest: resolved_key_reg,
1017 default: Some(serde_json::json!(null)),
1018 });
1019
1020 match resolution {
1022 KeyResolutionStrategy::Embedded { primary_field } => {
1023 ops.push(OpCode::CopyRegister {
1025 source: resolved_key_reg,
1026 dest: key_reg,
1027 });
1028
1029 let effective_primary_field = if primary_field.segments.is_empty() {
1031 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1033 auto_field
1034 } else {
1035 primary_field.clone()
1036 }
1037 } else {
1038 primary_field.clone()
1039 };
1040
1041 if !effective_primary_field.segments.is_empty() {
1045 let temp_reg = 18;
1046 let transform_reg = 23; ops.push(OpCode::LoadEventField {
1049 path: effective_primary_field.clone(),
1050 dest: temp_reg,
1051 default: None,
1052 });
1053
1054 let primary_key_transform = self
1057 .find_primary_key_transformation(mappings)
1058 .or_else(|| self.find_inherited_primary_key_transformation());
1059
1060 if let Some(transform) = primary_key_transform {
1061 ops.push(OpCode::Transform {
1063 source: temp_reg,
1064 dest: transform_reg,
1065 transformation: transform,
1066 });
1067 ops.push(OpCode::CopyRegisterIfNull {
1069 source: transform_reg,
1070 dest: key_reg,
1071 });
1072 } else {
1073 ops.push(OpCode::CopyRegisterIfNull {
1075 source: temp_reg,
1076 dest: key_reg,
1077 });
1078 }
1079 }
1080 }
1083 KeyResolutionStrategy::Lookup { primary_field } => {
1084 let lookup_reg = 15;
1085 let result_reg = 17;
1086
1087 ops.push(OpCode::CopyRegister {
1089 source: resolved_key_reg,
1090 dest: lookup_reg,
1091 });
1092
1093 let temp_reg = 18;
1094 ops.push(OpCode::LoadEventField {
1095 path: primary_field.clone(),
1096 dest: temp_reg,
1097 default: None,
1098 });
1099 ops.push(OpCode::CopyRegisterIfNull {
1100 source: temp_reg,
1101 dest: lookup_reg,
1102 });
1103
1104 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1105 let effective_index_name =
1106 index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1107
1108 ops.push(OpCode::LookupIndex {
1109 state_id: self.state_id,
1110 index_name: effective_index_name,
1111 lookup_value: lookup_reg,
1112 dest: result_reg,
1113 });
1114 ops.push(OpCode::CopyRegister {
1123 source: result_reg,
1124 dest: key_reg,
1125 });
1126 }
1127 KeyResolutionStrategy::Computed {
1128 primary_field,
1129 compute_partition: _,
1130 } => {
1131 ops.push(OpCode::CopyRegister {
1133 source: resolved_key_reg,
1134 dest: key_reg,
1135 });
1136 let temp_reg = 18;
1137 ops.push(OpCode::LoadEventField {
1138 path: primary_field.clone(),
1139 dest: temp_reg,
1140 default: None,
1141 });
1142 ops.push(OpCode::CopyRegisterIfNull {
1143 source: temp_reg,
1144 dest: key_reg,
1145 });
1146 }
1147 KeyResolutionStrategy::TemporalLookup {
1148 lookup_field,
1149 timestamp_field,
1150 index_name,
1151 } => {
1152 ops.push(OpCode::CopyRegister {
1154 source: resolved_key_reg,
1155 dest: key_reg,
1156 });
1157 let lookup_reg = 15;
1158 let timestamp_reg = 16;
1159 let result_reg = 17;
1160
1161 ops.push(OpCode::LoadEventField {
1162 path: lookup_field.clone(),
1163 dest: lookup_reg,
1164 default: None,
1165 });
1166
1167 ops.push(OpCode::LoadEventField {
1168 path: timestamp_field.clone(),
1169 dest: timestamp_reg,
1170 default: None,
1171 });
1172
1173 ops.push(OpCode::LookupTemporalIndex {
1174 state_id: self.state_id,
1175 index_name: index_name.clone(),
1176 lookup_value: lookup_reg,
1177 timestamp: timestamp_reg,
1178 dest: result_reg,
1179 });
1180
1181 ops.push(OpCode::CopyRegisterIfNull {
1182 source: result_reg,
1183 dest: key_reg,
1184 });
1185 }
1186 }
1187
1188 ops
1189 }
1190
1191 fn find_primary_key_transformation(
1192 &self,
1193 mappings: &[TypedFieldMapping<S>],
1194 ) -> Option<Transformation> {
1195 let primary_key = self.spec.identity.primary_keys.first()?;
1197 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1198
1199 for mapping in mappings {
1201 if mapping.target_path == *primary_key
1203 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1204 {
1205 if let Some(transform) = &mapping.transform {
1207 return Some(transform.clone());
1208 }
1209
1210 if let MappingSource::FromSource {
1212 transform: Some(transform),
1213 ..
1214 } = &mapping.source
1215 {
1216 return Some(transform.clone());
1217 }
1218 }
1219 }
1220
1221 for mapping in mappings {
1223 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1224 if let Some(transform) = field_transforms.get(&primary_field_name) {
1225 return Some(transform.clone());
1226 }
1227 }
1228 }
1229
1230 None
1231 }
1232
1233 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1236 let primary_key = self.spec.identity.primary_keys.first()?;
1237
1238 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1240
1241 for handler in &self.spec.handlers {
1243 for mapping in &handler.mappings {
1244 if mapping.target_path == *primary_key
1246 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1247 {
1248 if let MappingSource::FromSource {
1250 path, transform, ..
1251 } = &mapping.source
1252 {
1253 if path.segments.last() == Some(&primary_field_name) {
1254 return mapping.transform.clone().or_else(|| transform.clone());
1256 }
1257 }
1258 }
1259
1260 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1262 if let Some(transform) = field_transforms.get(&primary_field_name) {
1263 return Some(transform.clone());
1264 }
1265 }
1266 }
1267 }
1268
1269 None
1270 }
1271
1272 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1274 primary_key.split('.').next_back().map(|s| s.to_string())
1276 }
1277
1278 pub fn auto_detect_primary_field(
1281 &self,
1282 current_mappings: &[TypedFieldMapping<S>],
1283 ) -> Option<FieldPath> {
1284 let primary_key = self.spec.identity.primary_keys.first()?;
1285
1286 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1288
1289 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1291 return Some(FieldPath::new(&[&primary_field_name]));
1292 }
1293
1294 None
1295 }
1296
1297 fn current_account_has_primary_field(
1300 &self,
1301 field_name: &str,
1302 mappings: &[TypedFieldMapping<S>],
1303 ) -> bool {
1304 for mapping in mappings {
1306 if let MappingSource::FromSource { path, .. } = &mapping.source {
1307 if path.segments.last() == Some(&field_name.to_string()) {
1309 return true;
1310 }
1311 }
1312 }
1313
1314 false
1315 }
1316
1317 #[allow(dead_code)]
1319 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1320 for mapping in mappings {
1321 if let MappingSource::FromSource { path, .. } = &mapping.source {
1322 if path.segments.last() == Some(&field_name.to_string()) {
1323 return true;
1324 }
1325 }
1326 }
1327 false
1328 }
1329
1330 #[allow(dead_code)]
1333 fn field_exists_in_mappings(
1334 &self,
1335 field_name: &str,
1336 mappings: &[TypedFieldMapping<S>],
1337 ) -> bool {
1338 for mapping in mappings {
1340 if let MappingSource::FromSource { path, .. } = &mapping.source {
1341 if path.segments.last() == Some(&field_name.to_string()) {
1342 return true;
1343 }
1344 }
1345 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1347 if field_transforms.contains_key(field_name) {
1348 return true;
1349 }
1350 }
1351 }
1352 false
1353 }
1354
1355 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1356 if field_path.segments.is_empty() {
1357 return None;
1358 }
1359
1360 let lookup_field_name = field_path.segments.last().unwrap();
1361
1362 for lookup_index in &self.spec.identity.lookup_indexes {
1363 let index_field_name = lookup_index
1364 .field_name
1365 .split('.')
1366 .next_back()
1367 .unwrap_or(&lookup_index.field_name);
1368 if index_field_name == lookup_field_name {
1369 return Some(format!("{}_lookup_index", index_field_name));
1370 }
1371 }
1372
1373 None
1374 }
1375
1376 fn find_lookup_index_for_lookup_field(
1379 &self,
1380 primary_field: &FieldPath,
1381 mappings: &[TypedFieldMapping<S>],
1382 ) -> Option<String> {
1383 let primary_path = primary_field.segments.join(".");
1385
1386 for mapping in mappings {
1388 if let MappingSource::FromSource { path, .. } = &mapping.source {
1390 let source_path = path.segments.join(".");
1391 if source_path == primary_path {
1392 for lookup_index in &self.spec.identity.lookup_indexes {
1394 if mapping.target_path == lookup_index.field_name {
1395 let index_field_name = lookup_index
1396 .field_name
1397 .split('.')
1398 .next_back()
1399 .unwrap_or(&lookup_index.field_name);
1400 return Some(format!("{}_lookup_index", index_field_name));
1401 }
1402 }
1403 }
1404 }
1405 }
1406
1407 self.find_lookup_index_for_field(primary_field)
1409 }
1410
1411 fn find_source_path_for_lookup_index(
1415 &self,
1416 mappings: &[TypedFieldMapping<S>],
1417 lookup_field_name: &str,
1418 ) -> Option<Vec<String>> {
1419 for mapping in mappings {
1420 if mapping.target_path == lookup_field_name {
1421 if let MappingSource::FromSource { path, .. } = &mapping.source {
1422 return Some(path.segments.clone());
1423 }
1424 }
1425 }
1426 None
1427 }
1428
1429 fn compile_temporal_index_update(
1430 &self,
1431 resolution: &KeyResolutionStrategy,
1432 key_reg: Register,
1433 mappings: &[TypedFieldMapping<S>],
1434 ) -> Vec<OpCode> {
1435 let mut ops = Vec::new();
1436
1437 for lookup_index in &self.spec.identity.lookup_indexes {
1438 let lookup_reg = 17;
1439 let source_field = lookup_index
1440 .field_name
1441 .split('.')
1442 .next_back()
1443 .unwrap_or(&lookup_index.field_name);
1444
1445 match resolution {
1446 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1447 let source_path_opt =
1450 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1451
1452 let load_path = if let Some(ref path) = source_path_opt {
1453 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1454 } else {
1455 FieldPath::new(&[source_field])
1457 };
1458
1459 ops.push(OpCode::LoadEventField {
1460 path: load_path,
1461 dest: lookup_reg,
1462 default: None,
1463 });
1464
1465 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1466 let timestamp_reg = 18;
1467
1468 ops.push(OpCode::LoadEventField {
1469 path: FieldPath::new(&[temporal_field_name]),
1470 dest: timestamp_reg,
1471 default: None,
1472 });
1473
1474 let index_name = format!("{}_temporal_index", source_field);
1475 ops.push(OpCode::UpdateTemporalIndex {
1476 state_id: self.state_id,
1477 index_name,
1478 lookup_value: lookup_reg,
1479 primary_key: key_reg,
1480 timestamp: timestamp_reg,
1481 });
1482
1483 let simple_index_name = format!("{}_lookup_index", source_field);
1484 ops.push(OpCode::UpdateLookupIndex {
1485 state_id: self.state_id,
1486 index_name: simple_index_name,
1487 lookup_value: lookup_reg,
1488 primary_key: key_reg,
1489 });
1490 } else {
1491 let index_name = format!("{}_lookup_index", source_field);
1492 ops.push(OpCode::UpdateLookupIndex {
1493 state_id: self.state_id,
1494 index_name,
1495 lookup_value: lookup_reg,
1496 primary_key: key_reg,
1497 });
1498 }
1499
1500 if source_path_opt.is_some() {
1504 ops.push(OpCode::UpdatePdaReverseLookup {
1505 state_id: self.state_id,
1506 lookup_name: "default_pda_lookup".to_string(),
1507 pda_address: lookup_reg,
1508 primary_key: key_reg,
1509 });
1510 }
1511 }
1512 KeyResolutionStrategy::Lookup { primary_field } => {
1513 let has_mapping_to_lookup_field = mappings
1516 .iter()
1517 .any(|m| m.target_path == lookup_index.field_name);
1518
1519 if has_mapping_to_lookup_field {
1520 let path_segments: Vec<&str> =
1523 primary_field.segments.iter().map(|s| s.as_str()).collect();
1524 ops.push(OpCode::LoadEventField {
1525 path: FieldPath::new(&path_segments),
1526 dest: lookup_reg,
1527 default: None,
1528 });
1529
1530 let index_name = format!("{}_lookup_index", source_field);
1531 ops.push(OpCode::UpdateLookupIndex {
1532 state_id: self.state_id,
1533 index_name,
1534 lookup_value: lookup_reg,
1535 primary_key: key_reg,
1536 });
1537 }
1538 }
1539 KeyResolutionStrategy::Computed { .. }
1540 | KeyResolutionStrategy::TemporalLookup { .. } => {
1541 }
1543 }
1544 }
1545
1546 ops
1547 }
1548
1549 fn get_event_type(&self, source: &SourceSpec) -> String {
1550 match source {
1551 SourceSpec::Source { type_name, .. } => type_name.clone(),
1552 }
1553 }
1554
1555 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1556 let mut ops = Vec::new();
1557 let state_reg = 2;
1558
1559 for action in actions {
1560 match action {
1561 HookAction::SetField {
1562 target_field,
1563 source,
1564 condition,
1565 } => {
1566 let _ = condition;
1568
1569 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1573 ops.extend(load_ops);
1574
1575 if let MappingSource::FromSource {
1577 transform: Some(transform_type),
1578 ..
1579 } = source
1580 {
1581 ops.push(OpCode::Transform {
1582 source: temp_reg,
1583 dest: temp_reg,
1584 transformation: transform_type.clone(),
1585 });
1586 }
1587
1588 if let Some(cond_expr) = condition {
1590 if let Some(parsed) = &cond_expr.parsed {
1591 let cond_check_ops = self.compile_condition_check(
1593 parsed,
1594 temp_reg,
1595 state_reg,
1596 target_field,
1597 );
1598 ops.extend(cond_check_ops);
1599 } else {
1600 ops.push(OpCode::SetField {
1602 object: state_reg,
1603 path: target_field.clone(),
1604 value: temp_reg,
1605 });
1606 }
1607 } else {
1608 ops.push(OpCode::SetField {
1610 object: state_reg,
1611 path: target_field.clone(),
1612 value: temp_reg,
1613 });
1614 }
1615 }
1616 HookAction::IncrementField {
1617 target_field,
1618 increment_by,
1619 condition,
1620 } => {
1621 if let Some(cond_expr) = condition {
1622 if let Some(parsed) = &cond_expr.parsed {
1623 let cond_check_ops = self.compile_conditional_increment(
1628 parsed,
1629 state_reg,
1630 target_field,
1631 *increment_by,
1632 );
1633 ops.extend(cond_check_ops);
1634 } else {
1635 ops.push(OpCode::SetFieldIncrement {
1637 object: state_reg,
1638 path: target_field.clone(),
1639 });
1640 }
1641 } else {
1642 ops.push(OpCode::SetFieldIncrement {
1644 object: state_reg,
1645 path: target_field.clone(),
1646 });
1647 }
1648 }
1649 HookAction::RegisterPdaMapping { .. } => {
1650 }
1653 }
1654 }
1655
1656 ops
1657 }
1658
1659 fn compile_condition_check(
1660 &self,
1661 condition: &ParsedCondition,
1662 value_reg: Register,
1663 state_reg: Register,
1664 target_field: &str,
1665 ) -> Vec<OpCode> {
1666 match condition {
1667 ParsedCondition::Comparison {
1668 field,
1669 op,
1670 value: cond_value,
1671 } => {
1672 vec![OpCode::ConditionalSetField {
1674 object: state_reg,
1675 path: target_field.to_string(),
1676 value: value_reg,
1677 condition_field: field.clone(),
1678 condition_op: op.clone(),
1679 condition_value: cond_value.clone(),
1680 }]
1681 }
1682 ParsedCondition::Logical { .. } => {
1683 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1685 vec![OpCode::SetField {
1686 object: state_reg,
1687 path: target_field.to_string(),
1688 value: value_reg,
1689 }]
1690 }
1691 }
1692 }
1693
1694 fn compile_conditional_increment(
1695 &self,
1696 condition: &ParsedCondition,
1697 state_reg: Register,
1698 target_field: &str,
1699 _increment_by: i64,
1700 ) -> Vec<OpCode> {
1701 match condition {
1702 ParsedCondition::Comparison {
1703 field,
1704 op,
1705 value: cond_value,
1706 } => {
1707 vec![OpCode::ConditionalIncrement {
1708 object: state_reg,
1709 path: target_field.to_string(),
1710 condition_field: field.clone(),
1711 condition_op: op.clone(),
1712 condition_value: cond_value.clone(),
1713 }]
1714 }
1715 ParsedCondition::Logical { .. } => {
1716 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1717 vec![OpCode::SetFieldIncrement {
1718 object: state_reg,
1719 path: target_field.to_string(),
1720 }]
1721 }
1722 }
1723 }
1724}