1use crate::ast::*;
2use serde_json::Value;
3use std::collections::HashMap;
4use tracing;
5
6pub type Register = usize;
7
8#[derive(Debug, Clone)]
9pub enum OpCode {
10 LoadEventField {
11 path: FieldPath,
12 dest: Register,
13 default: Option<Value>,
14 },
15 LoadConstant {
16 value: Value,
17 dest: Register,
18 },
19 CopyRegister {
20 source: Register,
21 dest: Register,
22 },
23 CopyRegisterIfNull {
25 source: Register,
26 dest: Register,
27 },
28 GetEventType {
29 dest: Register,
30 },
31 CreateObject {
32 dest: Register,
33 },
34 SetField {
35 object: Register,
36 path: String,
37 value: Register,
38 },
39 SetFields {
40 object: Register,
41 fields: Vec<(String, Register)>,
42 },
43 GetField {
44 object: Register,
45 path: String,
46 dest: Register,
47 },
48 ReadOrInitState {
49 state_id: u32,
50 key: Register,
51 default: Value,
52 dest: Register,
53 },
54 UpdateState {
55 state_id: u32,
56 key: Register,
57 value: Register,
58 },
59 AppendToArray {
60 object: Register,
61 path: String,
62 value: Register,
63 },
64 GetCurrentTimestamp {
65 dest: Register,
66 },
67 CreateEvent {
68 dest: Register,
69 event_value: Register,
70 },
71 CreateCapture {
72 dest: Register,
73 capture_value: Register,
74 },
75 Transform {
76 source: Register,
77 dest: Register,
78 transformation: Transformation,
79 },
80 EmitMutation {
81 entity_name: String,
82 key: Register,
83 state: Register,
84 },
85 SetFieldIfNull {
86 object: Register,
87 path: String,
88 value: Register,
89 },
90 SetFieldMax {
91 object: Register,
92 path: String,
93 value: Register,
94 },
95 UpdateTemporalIndex {
96 state_id: u32,
97 index_name: String,
98 lookup_value: Register,
99 primary_key: Register,
100 timestamp: Register,
101 },
102 LookupTemporalIndex {
103 state_id: u32,
104 index_name: String,
105 lookup_value: Register,
106 timestamp: Register,
107 dest: Register,
108 },
109 UpdateLookupIndex {
110 state_id: u32,
111 index_name: String,
112 lookup_value: Register,
113 primary_key: Register,
114 },
115 LookupIndex {
116 state_id: u32,
117 index_name: String,
118 lookup_value: Register,
119 dest: Register,
120 },
121 SetFieldSum {
123 object: Register,
124 path: String,
125 value: Register,
126 },
127 SetFieldIncrement {
129 object: Register,
130 path: String,
131 },
132 SetFieldMin {
134 object: Register,
135 path: String,
136 value: Register,
137 },
138 AddToUniqueSet {
141 state_id: u32,
142 set_name: String,
143 value: Register,
144 count_object: Register,
145 count_path: String,
146 },
147 ConditionalSetField {
149 object: Register,
150 path: String,
151 value: Register,
152 condition_field: FieldPath,
153 condition_op: ComparisonOp,
154 condition_value: Value,
155 },
156 ConditionalIncrement {
158 object: Register,
159 path: String,
160 condition_field: FieldPath,
161 condition_op: ComparisonOp,
162 condition_value: Value,
163 },
164 EvaluateComputedFields {
167 state: Register,
168 computed_paths: Vec<String>,
169 },
170 UpdatePdaReverseLookup {
173 state_id: u32,
174 lookup_name: String,
175 pda_address: Register,
176 primary_key: Register,
177 },
178}
179
180pub struct EntityBytecode {
181 pub state_id: u32,
182 pub handlers: HashMap<String, Vec<OpCode>>,
183 pub entity_name: String,
184 #[allow(clippy::type_complexity)]
186 pub computed_fields_evaluator: Option<
187 Box<
188 dyn Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>> + Send + Sync,
189 >,
190 >,
191}
192
193impl std::fmt::Debug for EntityBytecode {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("EntityBytecode")
196 .field("state_id", &self.state_id)
197 .field("handlers", &self.handlers)
198 .field("entity_name", &self.entity_name)
199 .field(
200 "computed_fields_evaluator",
201 &self.computed_fields_evaluator.is_some(),
202 )
203 .finish()
204 }
205}
206
207#[derive(Debug)]
208pub struct MultiEntityBytecode {
209 pub entities: HashMap<String, EntityBytecode>,
210 pub event_routing: HashMap<String, Vec<String>>,
211 pub proto_router: crate::proto_router::ProtoRouter,
212}
213
214impl MultiEntityBytecode {
215 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
216 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
217 let entity_bytecode = compiler.compile_entity();
218
219 let mut entities = HashMap::new();
220 let mut event_routing = HashMap::new();
221
222 for event_type in entity_bytecode.handlers.keys() {
223 event_routing
224 .entry(event_type.clone())
225 .or_insert_with(Vec::new)
226 .push(entity_name.clone());
227 }
228
229 entities.insert(entity_name, entity_bytecode);
230
231 MultiEntityBytecode {
232 entities,
233 event_routing,
234 proto_router: crate::proto_router::ProtoRouter::new(),
235 }
236 }
237
238 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
239 let entities = HashMap::new();
240 let event_routing = HashMap::new();
241
242 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
243 panic!("from_entities requires type information - use builder pattern instead");
244 }
245
246 MultiEntityBytecode {
247 entities,
248 event_routing,
249 proto_router: crate::proto_router::ProtoRouter::new(),
250 }
251 }
252
253 #[allow(clippy::new_ret_no_self)]
254 pub fn new() -> MultiEntityBytecodeBuilder {
255 MultiEntityBytecodeBuilder {
256 entities: HashMap::new(),
257 event_routing: HashMap::new(),
258 proto_router: crate::proto_router::ProtoRouter::new(),
259 }
260 }
261}
262
263pub struct MultiEntityBytecodeBuilder {
264 entities: HashMap<String, EntityBytecode>,
265 event_routing: HashMap<String, Vec<String>>,
266 proto_router: crate::proto_router::ProtoRouter,
267}
268
269impl MultiEntityBytecodeBuilder {
270 pub fn add_entity<S>(
271 self,
272 entity_name: String,
273 spec: TypedStreamSpec<S>,
274 state_id: u32,
275 ) -> Self {
276 self.add_entity_with_evaluator(
277 entity_name,
278 spec,
279 state_id,
280 None::<fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>>,
281 )
282 }
283
284 pub fn add_entity_with_evaluator<S, F>(
285 mut self,
286 entity_name: String,
287 spec: TypedStreamSpec<S>,
288 state_id: u32,
289 evaluator: Option<F>,
290 ) -> Self
291 where
292 F: Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>
293 + Send
294 + Sync
295 + 'static,
296 {
297 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
298 let mut entity_bytecode = compiler.compile_entity();
299
300 if let Some(eval) = evaluator {
302 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
303 }
304
305 for event_type in entity_bytecode.handlers.keys() {
306 self.event_routing
307 .entry(event_type.clone())
308 .or_default()
309 .push(entity_name.clone());
310 }
311
312 self.entities.insert(entity_name, entity_bytecode);
313 self
314 }
315
316 pub fn build(self) -> MultiEntityBytecode {
317 MultiEntityBytecode {
318 entities: self.entities,
319 event_routing: self.event_routing,
320 proto_router: self.proto_router,
321 }
322 }
323}
324
325pub struct TypedCompiler<S> {
326 pub spec: TypedStreamSpec<S>,
327 entity_name: String,
328 state_id: u32,
329}
330
331impl<S> TypedCompiler<S> {
332 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
333 TypedCompiler {
334 spec,
335 entity_name,
336 state_id: 0,
337 }
338 }
339
340 pub fn with_state_id(mut self, state_id: u32) -> Self {
341 self.state_id = state_id;
342 self
343 }
344
345 pub fn compile(&self) -> MultiEntityBytecode {
346 let entity_bytecode = self.compile_entity();
347
348 let mut entities = HashMap::new();
349 let mut event_routing = HashMap::new();
350
351 for event_type in entity_bytecode.handlers.keys() {
352 event_routing
353 .entry(event_type.clone())
354 .or_insert_with(Vec::new)
355 .push(self.entity_name.clone());
356 }
357
358 entities.insert(self.entity_name.clone(), entity_bytecode);
359
360 MultiEntityBytecode {
361 entities,
362 event_routing,
363 proto_router: crate::proto_router::ProtoRouter::new(),
364 }
365 }
366
367 fn compile_entity(&self) -> EntityBytecode {
368 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
369
370 let mut debug_info = Vec::new();
372 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
373 let event_type = self.get_event_type(&handler_spec.source);
374 let program_id = match &handler_spec.source {
375 crate::ast::SourceSpec::Source { program_id, .. } => {
376 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
377 }
378 };
379 debug_info.push(format!(
380 " [{}] EventType={}, Mappings={}, ProgramId={}",
381 index,
382 event_type,
383 handler_spec.mappings.len(),
384 program_id
385 ));
386 }
387
388 for handler_spec in &self.spec.handlers {
398 let opcodes = self.compile_handler(handler_spec);
399 let event_type = self.get_event_type(&handler_spec.source);
400
401 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
402 let mut existing_setup = Vec::new();
407 let mut existing_mappings = Vec::new();
408 let mut existing_teardown = Vec::new();
409 let mut section = 0; for opcode in existing_opcodes.iter() {
412 match opcode {
413 OpCode::ReadOrInitState { .. } => {
414 existing_setup.push(opcode.clone());
415 section = 1; }
417 OpCode::UpdateState { .. } => {
418 existing_teardown.push(opcode.clone());
419 section = 2; }
421 OpCode::EmitMutation { .. } => {
422 existing_teardown.push(opcode.clone());
423 }
424 _ if section == 0 => existing_setup.push(opcode.clone()),
425 _ if section == 1 => existing_mappings.push(opcode.clone()),
426 _ => existing_teardown.push(opcode.clone()),
427 }
428 }
429
430 let mut new_mappings = Vec::new();
432 section = 0;
433
434 for opcode in opcodes.iter() {
435 match opcode {
436 OpCode::ReadOrInitState { .. } => {
437 section = 1; }
439 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
440 section = 2; }
442 _ if section == 1 => {
443 new_mappings.push(opcode.clone());
444 }
445 _ => {} }
447 }
448
449 let mut merged = Vec::new();
451 merged.extend(existing_setup);
452 merged.extend(existing_mappings);
453 merged.extend(new_mappings.clone());
454 merged.extend(existing_teardown);
455
456 *existing_opcodes = merged;
457 } else {
458 handlers.insert(event_type, opcodes);
459 }
460 }
461
462 for hook in &self.spec.instruction_hooks {
464 let event_type = hook.instruction_type.clone();
465
466 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
467 let key_reg = 20;
468 let state_reg = 2;
469 let resolved_key_reg = 19;
470 let temp_reg = 18;
471
472 let mut ops = Vec::new();
473
474 ops.push(OpCode::LoadEventField {
476 path: FieldPath::new(&["__resolved_primary_key"]),
477 dest: resolved_key_reg,
478 default: Some(serde_json::json!(null)),
479 });
480
481 ops.push(OpCode::CopyRegister {
483 source: resolved_key_reg,
484 dest: key_reg,
485 });
486
487 if let Some(lookup_path) = &hook.lookup_by {
489 ops.push(OpCode::LoadEventField {
491 path: lookup_path.clone(),
492 dest: temp_reg,
493 default: None,
494 });
495
496 ops.push(OpCode::Transform {
498 source: temp_reg,
499 dest: temp_reg,
500 transformation: Transformation::HexEncode,
501 });
502
503 ops.push(OpCode::CopyRegisterIfNull {
505 source: temp_reg,
506 dest: key_reg,
507 });
508 }
509
510 ops.push(OpCode::ReadOrInitState {
511 state_id: self.state_id,
512 key: key_reg,
513 default: serde_json::json!({}),
514 dest: state_reg,
515 });
516
517 ops.push(OpCode::UpdateState {
518 state_id: self.state_id,
519 key: key_reg,
520 value: state_reg,
521 });
522
523 ops
524 });
525
526 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
528
529 let insert_pos = handler_opcodes
533 .iter()
534 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
535 .or_else(|| {
536 handler_opcodes
537 .iter()
538 .position(|op| matches!(op, OpCode::UpdateState { .. }))
539 });
540
541 if let Some(pos) = insert_pos {
542 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
544 handler_opcodes.insert(pos + i, opcode);
545 }
546 }
547 }
548
549 EntityBytecode {
550 state_id: self.state_id,
551 handlers,
552 entity_name: self.entity_name.clone(),
553 computed_fields_evaluator: None,
554 }
555 }
556
557 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
558 let mut ops = Vec::new();
559 let state_reg = 2;
560 let key_reg = 20;
561
562 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
563
564 ops.extend(self.compile_temporal_index_update(
565 &spec.key_resolution,
566 key_reg,
567 &spec.mappings,
568 ));
569
570 ops.push(OpCode::ReadOrInitState {
571 state_id: self.state_id,
572 key: key_reg,
573 default: serde_json::json!({}),
574 dest: state_reg,
575 });
576
577 for mapping in &spec.mappings {
578 ops.extend(self.compile_mapping(mapping, state_reg));
579 }
580
581 ops.push(OpCode::EvaluateComputedFields {
583 state: state_reg,
584 computed_paths: self.spec.computed_fields.clone(),
585 });
586
587 ops.push(OpCode::UpdateState {
588 state_id: self.state_id,
589 key: key_reg,
590 value: state_reg,
591 });
592
593 if spec.emit {
594 ops.push(OpCode::EmitMutation {
595 entity_name: self.entity_name.clone(),
596 key: key_reg,
597 state: state_reg,
598 });
599 }
600
601 ops
602 }
603
604 fn compile_mapping(&self, mapping: &TypedFieldMapping<S>, state_reg: Register) -> Vec<OpCode> {
605 let mut ops = Vec::new();
606 let temp_reg = 10;
607
608 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
609
610 if let Some(transform) = &mapping.transform {
611 ops.push(OpCode::Transform {
612 source: temp_reg,
613 dest: temp_reg,
614 transformation: transform.clone(),
615 });
616 }
617
618 match &mapping.population {
619 PopulationStrategy::Append => {
620 ops.push(OpCode::AppendToArray {
621 object: state_reg,
622 path: mapping.target_path.clone(),
623 value: temp_reg,
624 });
625 }
626 PopulationStrategy::LastWrite => {
627 ops.push(OpCode::SetField {
628 object: state_reg,
629 path: mapping.target_path.clone(),
630 value: temp_reg,
631 });
632 }
633 PopulationStrategy::SetOnce => {
634 ops.push(OpCode::SetFieldIfNull {
635 object: state_reg,
636 path: mapping.target_path.clone(),
637 value: temp_reg,
638 });
639 }
640 PopulationStrategy::Merge => {
641 ops.push(OpCode::SetField {
642 object: state_reg,
643 path: mapping.target_path.clone(),
644 value: temp_reg,
645 });
646 }
647 PopulationStrategy::Max => {
648 ops.push(OpCode::SetFieldMax {
649 object: state_reg,
650 path: mapping.target_path.clone(),
651 value: temp_reg,
652 });
653 }
654 PopulationStrategy::Sum => {
655 ops.push(OpCode::SetFieldSum {
656 object: state_reg,
657 path: mapping.target_path.clone(),
658 value: temp_reg,
659 });
660 }
661 PopulationStrategy::Count => {
662 ops.push(OpCode::SetFieldIncrement {
664 object: state_reg,
665 path: mapping.target_path.clone(),
666 });
667 }
668 PopulationStrategy::Min => {
669 ops.push(OpCode::SetFieldMin {
670 object: state_reg,
671 path: mapping.target_path.clone(),
672 value: temp_reg,
673 });
674 }
675 PopulationStrategy::UniqueCount => {
676 let set_name = format!("{}_unique_set", mapping.target_path);
679 ops.push(OpCode::AddToUniqueSet {
680 state_id: self.state_id,
681 set_name,
682 value: temp_reg,
683 count_object: state_reg,
684 count_path: mapping.target_path.clone(),
685 });
686 }
687 }
688
689 ops
690 }
691
692 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
693 match source {
694 MappingSource::FromSource {
695 path,
696 default,
697 transform,
698 } => {
699 let mut ops = vec![OpCode::LoadEventField {
700 path: path.clone(),
701 dest,
702 default: default.clone(),
703 }];
704
705 if let Some(transform_type) = transform {
707 ops.push(OpCode::Transform {
708 source: dest,
709 dest,
710 transformation: transform_type.clone(),
711 });
712 }
713
714 ops
715 }
716 MappingSource::Constant(val) => {
717 vec![OpCode::LoadConstant {
718 value: val.clone(),
719 dest,
720 }]
721 }
722 MappingSource::AsEvent { fields } => {
723 let mut ops = Vec::new();
724
725 if fields.is_empty() {
726 let event_data_reg = dest + 1;
727 ops.push(OpCode::LoadEventField {
728 path: FieldPath::new(&[]),
729 dest: event_data_reg,
730 default: Some(serde_json::json!({})),
731 });
732 ops.push(OpCode::CreateEvent {
733 dest,
734 event_value: event_data_reg,
735 });
736 } else {
737 let data_obj_reg = dest + 1;
738 ops.push(OpCode::CreateObject { dest: data_obj_reg });
739
740 let mut field_registers = Vec::new();
741 let mut current_reg = dest + 2;
742
743 for field_source in fields.iter() {
744 if let MappingSource::FromSource {
745 path,
746 default,
747 transform,
748 } = &**field_source
749 {
750 ops.push(OpCode::LoadEventField {
751 path: path.clone(),
752 dest: current_reg,
753 default: default.clone(),
754 });
755
756 if let Some(transform_type) = transform {
757 ops.push(OpCode::Transform {
758 source: current_reg,
759 dest: current_reg,
760 transformation: transform_type.clone(),
761 });
762 }
763
764 if let Some(field_name) = path.segments.last() {
765 field_registers.push((field_name.clone(), current_reg));
766 }
767 current_reg += 1;
768 }
769 }
770
771 if !field_registers.is_empty() {
772 ops.push(OpCode::SetFields {
773 object: data_obj_reg,
774 fields: field_registers,
775 });
776 }
777
778 ops.push(OpCode::CreateEvent {
779 dest,
780 event_value: data_obj_reg,
781 });
782 }
783
784 ops
785 }
786 MappingSource::WholeSource => {
787 vec![OpCode::LoadEventField {
788 path: FieldPath::new(&[]),
789 dest,
790 default: Some(serde_json::json!({})),
791 }]
792 }
793 MappingSource::AsCapture { field_transforms } => {
794 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
797 path: FieldPath::new(&[]),
798 dest: capture_data_reg,
799 default: Some(serde_json::json!({})),
800 }];
801
802 let field_reg = 24;
806 let transformed_reg = 25;
807
808 for (field_name, transform) in field_transforms {
809 ops.push(OpCode::GetField {
812 object: capture_data_reg,
813 path: field_name.clone(),
814 dest: field_reg,
815 });
816
817 ops.push(OpCode::Transform {
819 source: field_reg,
820 dest: transformed_reg,
821 transformation: transform.clone(),
822 });
823
824 ops.push(OpCode::SetField {
826 object: capture_data_reg,
827 path: field_name.clone(),
828 value: transformed_reg,
829 });
830 }
831
832 ops.push(OpCode::CreateCapture {
834 dest,
835 capture_value: capture_data_reg,
836 });
837
838 ops
839 }
840 MappingSource::FromContext { field } => {
841 vec![OpCode::LoadEventField {
843 path: FieldPath::new(&["__update_context", field.as_str()]),
844 dest,
845 default: Some(serde_json::json!(null)),
846 }]
847 }
848 MappingSource::Computed { .. } => {
849 vec![]
850 }
851 MappingSource::FromState { .. } => {
852 vec![]
853 }
854 }
855 }
856
857 pub fn compile_key_loading(
858 &self,
859 resolution: &KeyResolutionStrategy,
860 key_reg: Register,
861 mappings: &[TypedFieldMapping<S>],
862 ) -> Vec<OpCode> {
863 let mut ops = Vec::new();
864
865 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
869 path: FieldPath::new(&["__resolved_primary_key"]),
870 dest: resolved_key_reg,
871 default: Some(serde_json::json!(null)),
872 });
873
874 ops.push(OpCode::CopyRegister {
876 source: resolved_key_reg,
877 dest: key_reg,
878 });
879
880 match resolution {
882 KeyResolutionStrategy::Embedded { primary_field } => {
883 let effective_primary_field = if primary_field.segments.is_empty() {
885 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
887 auto_field
888 } else {
889 primary_field.clone()
890 }
891 } else {
892 primary_field.clone()
893 };
894
895 if !effective_primary_field.segments.is_empty() {
899 let temp_reg = 18;
900 let transform_reg = 23; ops.push(OpCode::LoadEventField {
903 path: effective_primary_field.clone(),
904 dest: temp_reg,
905 default: None,
906 });
907
908 let primary_key_transform = self
911 .find_primary_key_transformation(mappings)
912 .or_else(|| self.find_inherited_primary_key_transformation());
913
914 if let Some(transform) = primary_key_transform {
915 ops.push(OpCode::Transform {
917 source: temp_reg,
918 dest: transform_reg,
919 transformation: transform,
920 });
921 ops.push(OpCode::CopyRegisterIfNull {
923 source: transform_reg,
924 dest: key_reg,
925 });
926 } else {
927 ops.push(OpCode::CopyRegisterIfNull {
929 source: temp_reg,
930 dest: key_reg,
931 });
932 }
933 }
934 }
937 KeyResolutionStrategy::Lookup { primary_field } => {
938 let lookup_reg = 15;
939 let result_reg = 17;
940
941 ops.push(OpCode::LoadEventField {
942 path: primary_field.clone(),
943 dest: lookup_reg,
944 default: None,
945 });
946
947 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
948 let effective_index_name =
949 index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
950
951 ops.push(OpCode::LookupIndex {
952 state_id: self.state_id,
953 index_name: effective_index_name,
954 lookup_value: lookup_reg,
955 dest: result_reg,
956 });
957 ops.push(OpCode::CopyRegisterIfNull {
966 source: result_reg,
967 dest: key_reg,
968 });
969 }
970 KeyResolutionStrategy::Computed {
971 primary_field,
972 compute_partition: _,
973 } => {
974 let temp_reg = 18;
975 ops.push(OpCode::LoadEventField {
976 path: primary_field.clone(),
977 dest: temp_reg,
978 default: None,
979 });
980 ops.push(OpCode::CopyRegisterIfNull {
981 source: temp_reg,
982 dest: key_reg,
983 });
984 }
985 KeyResolutionStrategy::TemporalLookup {
986 lookup_field,
987 timestamp_field,
988 index_name,
989 } => {
990 let lookup_reg = 15;
991 let timestamp_reg = 16;
992 let result_reg = 17;
993
994 ops.push(OpCode::LoadEventField {
995 path: lookup_field.clone(),
996 dest: lookup_reg,
997 default: None,
998 });
999
1000 ops.push(OpCode::LoadEventField {
1001 path: timestamp_field.clone(),
1002 dest: timestamp_reg,
1003 default: None,
1004 });
1005
1006 ops.push(OpCode::LookupTemporalIndex {
1007 state_id: self.state_id,
1008 index_name: index_name.clone(),
1009 lookup_value: lookup_reg,
1010 timestamp: timestamp_reg,
1011 dest: result_reg,
1012 });
1013
1014 ops.push(OpCode::CopyRegisterIfNull {
1015 source: result_reg,
1016 dest: key_reg,
1017 });
1018 }
1019 }
1020
1021 ops
1022 }
1023
1024 fn find_primary_key_transformation(
1025 &self,
1026 mappings: &[TypedFieldMapping<S>],
1027 ) -> Option<Transformation> {
1028 let primary_key = self.spec.identity.primary_keys.first()?;
1030 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1031
1032 for mapping in mappings {
1034 if mapping.target_path == *primary_key
1036 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1037 {
1038 if let Some(transform) = &mapping.transform {
1040 return Some(transform.clone());
1041 }
1042
1043 if let MappingSource::FromSource {
1045 transform: Some(transform),
1046 ..
1047 } = &mapping.source
1048 {
1049 return Some(transform.clone());
1050 }
1051 }
1052 }
1053
1054 for mapping in mappings {
1056 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1057 if let Some(transform) = field_transforms.get(&primary_field_name) {
1058 return Some(transform.clone());
1059 }
1060 }
1061 }
1062
1063 None
1064 }
1065
1066 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1069 let primary_key = self.spec.identity.primary_keys.first()?;
1070
1071 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1073
1074 for handler in &self.spec.handlers {
1076 for mapping in &handler.mappings {
1077 if mapping.target_path == *primary_key
1079 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1080 {
1081 if let MappingSource::FromSource {
1083 path, transform, ..
1084 } = &mapping.source
1085 {
1086 if path.segments.last() == Some(&primary_field_name) {
1087 return mapping.transform.clone().or_else(|| transform.clone());
1089 }
1090 }
1091 }
1092
1093 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1095 if let Some(transform) = field_transforms.get(&primary_field_name) {
1096 return Some(transform.clone());
1097 }
1098 }
1099 }
1100 }
1101
1102 None
1103 }
1104
1105 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1107 primary_key.split('.').next_back().map(|s| s.to_string())
1109 }
1110
1111 pub fn auto_detect_primary_field(
1114 &self,
1115 current_mappings: &[TypedFieldMapping<S>],
1116 ) -> Option<FieldPath> {
1117 let primary_key = self.spec.identity.primary_keys.first()?;
1118
1119 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1121
1122 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1124 return Some(FieldPath::new(&[&primary_field_name]));
1125 }
1126
1127 None
1128 }
1129
1130 fn current_account_has_primary_field(
1133 &self,
1134 field_name: &str,
1135 mappings: &[TypedFieldMapping<S>],
1136 ) -> bool {
1137 for mapping in mappings {
1139 if let MappingSource::FromSource { path, .. } = &mapping.source {
1140 if path.segments.last() == Some(&field_name.to_string()) {
1142 return true;
1143 }
1144 }
1145 }
1146
1147 false
1148 }
1149
1150 #[allow(dead_code)]
1152 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1153 for mapping in mappings {
1154 if let MappingSource::FromSource { path, .. } = &mapping.source {
1155 if path.segments.last() == Some(&field_name.to_string()) {
1156 return true;
1157 }
1158 }
1159 }
1160 false
1161 }
1162
1163 #[allow(dead_code)]
1166 fn field_exists_in_mappings(
1167 &self,
1168 field_name: &str,
1169 mappings: &[TypedFieldMapping<S>],
1170 ) -> bool {
1171 for mapping in mappings {
1173 if let MappingSource::FromSource { path, .. } = &mapping.source {
1174 if path.segments.last() == Some(&field_name.to_string()) {
1175 return true;
1176 }
1177 }
1178 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1180 if field_transforms.contains_key(field_name) {
1181 return true;
1182 }
1183 }
1184 }
1185 false
1186 }
1187
1188 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1189 if field_path.segments.is_empty() {
1190 return None;
1191 }
1192
1193 let lookup_field_name = field_path.segments.last().unwrap();
1194
1195 for lookup_index in &self.spec.identity.lookup_indexes {
1196 let index_field_name = lookup_index
1197 .field_name
1198 .split('.')
1199 .next_back()
1200 .unwrap_or(&lookup_index.field_name);
1201 if index_field_name == lookup_field_name {
1202 return Some(format!("{}_lookup_index", index_field_name));
1203 }
1204 }
1205
1206 None
1207 }
1208
1209 fn find_lookup_index_for_lookup_field(
1212 &self,
1213 primary_field: &FieldPath,
1214 mappings: &[TypedFieldMapping<S>],
1215 ) -> Option<String> {
1216 let primary_path = primary_field.segments.join(".");
1218
1219 for mapping in mappings {
1221 if let MappingSource::FromSource { path, .. } = &mapping.source {
1223 let source_path = path.segments.join(".");
1224 if source_path == primary_path {
1225 for lookup_index in &self.spec.identity.lookup_indexes {
1227 if mapping.target_path == lookup_index.field_name {
1228 let index_field_name = lookup_index
1229 .field_name
1230 .split('.')
1231 .next_back()
1232 .unwrap_or(&lookup_index.field_name);
1233 return Some(format!("{}_lookup_index", index_field_name));
1234 }
1235 }
1236 }
1237 }
1238 }
1239
1240 self.find_lookup_index_for_field(primary_field)
1242 }
1243
1244 fn find_source_path_for_lookup_index(
1248 &self,
1249 mappings: &[TypedFieldMapping<S>],
1250 lookup_field_name: &str,
1251 ) -> Option<Vec<String>> {
1252 for mapping in mappings {
1253 if mapping.target_path == lookup_field_name {
1254 if let MappingSource::FromSource { path, .. } = &mapping.source {
1255 return Some(path.segments.clone());
1256 }
1257 }
1258 }
1259 None
1260 }
1261
1262 fn compile_temporal_index_update(
1263 &self,
1264 resolution: &KeyResolutionStrategy,
1265 key_reg: Register,
1266 mappings: &[TypedFieldMapping<S>],
1267 ) -> Vec<OpCode> {
1268 let mut ops = Vec::new();
1269
1270 for lookup_index in &self.spec.identity.lookup_indexes {
1271 let lookup_reg = 17;
1272 let source_field = lookup_index
1273 .field_name
1274 .split('.')
1275 .next_back()
1276 .unwrap_or(&lookup_index.field_name);
1277
1278 match resolution {
1279 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1280 let source_path_opt =
1283 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1284
1285 let load_path = if let Some(ref path) = source_path_opt {
1286 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1287 } else {
1288 FieldPath::new(&[source_field])
1290 };
1291
1292 ops.push(OpCode::LoadEventField {
1293 path: load_path,
1294 dest: lookup_reg,
1295 default: None,
1296 });
1297
1298 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1299 let timestamp_reg = 18;
1300
1301 ops.push(OpCode::LoadEventField {
1302 path: FieldPath::new(&[temporal_field_name]),
1303 dest: timestamp_reg,
1304 default: None,
1305 });
1306
1307 let index_name = format!("{}_temporal_index", source_field);
1308 ops.push(OpCode::UpdateTemporalIndex {
1309 state_id: self.state_id,
1310 index_name,
1311 lookup_value: lookup_reg,
1312 primary_key: key_reg,
1313 timestamp: timestamp_reg,
1314 });
1315
1316 let simple_index_name = format!("{}_lookup_index", source_field);
1317 ops.push(OpCode::UpdateLookupIndex {
1318 state_id: self.state_id,
1319 index_name: simple_index_name,
1320 lookup_value: lookup_reg,
1321 primary_key: key_reg,
1322 });
1323 } else {
1324 let index_name = format!("{}_lookup_index", source_field);
1325 ops.push(OpCode::UpdateLookupIndex {
1326 state_id: self.state_id,
1327 index_name,
1328 lookup_value: lookup_reg,
1329 primary_key: key_reg,
1330 });
1331 }
1332
1333 if source_path_opt.is_some() {
1337 ops.push(OpCode::UpdatePdaReverseLookup {
1338 state_id: self.state_id,
1339 lookup_name: "default_pda_lookup".to_string(),
1340 pda_address: lookup_reg,
1341 primary_key: key_reg,
1342 });
1343 }
1344 }
1345 KeyResolutionStrategy::Lookup { primary_field } => {
1346 let has_mapping_to_lookup_field = mappings
1349 .iter()
1350 .any(|m| m.target_path == lookup_index.field_name);
1351
1352 if has_mapping_to_lookup_field {
1353 let path_segments: Vec<&str> =
1356 primary_field.segments.iter().map(|s| s.as_str()).collect();
1357 ops.push(OpCode::LoadEventField {
1358 path: FieldPath::new(&path_segments),
1359 dest: lookup_reg,
1360 default: None,
1361 });
1362
1363 let index_name = format!("{}_lookup_index", source_field);
1364 ops.push(OpCode::UpdateLookupIndex {
1365 state_id: self.state_id,
1366 index_name,
1367 lookup_value: lookup_reg,
1368 primary_key: key_reg,
1369 });
1370 }
1371 }
1372 KeyResolutionStrategy::Computed { .. }
1373 | KeyResolutionStrategy::TemporalLookup { .. } => {
1374 }
1376 }
1377 }
1378
1379 ops
1380 }
1381
1382 fn get_event_type(&self, source: &SourceSpec) -> String {
1383 match source {
1384 SourceSpec::Source { type_name, .. } => type_name.clone(),
1385 }
1386 }
1387
1388 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1389 let mut ops = Vec::new();
1390 let state_reg = 2;
1391
1392 for action in actions {
1393 match action {
1394 HookAction::SetField {
1395 target_field,
1396 source,
1397 condition,
1398 } => {
1399 let _ = condition;
1401
1402 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1406 ops.extend(load_ops);
1407
1408 if let MappingSource::FromSource {
1410 transform: Some(transform_type),
1411 ..
1412 } = source
1413 {
1414 ops.push(OpCode::Transform {
1415 source: temp_reg,
1416 dest: temp_reg,
1417 transformation: transform_type.clone(),
1418 });
1419 }
1420
1421 if let Some(cond_expr) = condition {
1423 if let Some(parsed) = &cond_expr.parsed {
1424 let cond_check_ops = self.compile_condition_check(
1426 parsed,
1427 temp_reg,
1428 state_reg,
1429 target_field,
1430 );
1431 ops.extend(cond_check_ops);
1432 } else {
1433 ops.push(OpCode::SetField {
1435 object: state_reg,
1436 path: target_field.clone(),
1437 value: temp_reg,
1438 });
1439 }
1440 } else {
1441 ops.push(OpCode::SetField {
1443 object: state_reg,
1444 path: target_field.clone(),
1445 value: temp_reg,
1446 });
1447 }
1448 }
1449 HookAction::IncrementField {
1450 target_field,
1451 increment_by,
1452 condition,
1453 } => {
1454 if let Some(cond_expr) = condition {
1455 if let Some(parsed) = &cond_expr.parsed {
1456 let cond_check_ops = self.compile_conditional_increment(
1461 parsed,
1462 state_reg,
1463 target_field,
1464 *increment_by,
1465 );
1466 ops.extend(cond_check_ops);
1467 } else {
1468 ops.push(OpCode::SetFieldIncrement {
1470 object: state_reg,
1471 path: target_field.clone(),
1472 });
1473 }
1474 } else {
1475 ops.push(OpCode::SetFieldIncrement {
1477 object: state_reg,
1478 path: target_field.clone(),
1479 });
1480 }
1481 }
1482 HookAction::RegisterPdaMapping { .. } => {
1483 }
1486 }
1487 }
1488
1489 ops
1490 }
1491
1492 fn compile_condition_check(
1493 &self,
1494 condition: &ParsedCondition,
1495 value_reg: Register,
1496 state_reg: Register,
1497 target_field: &str,
1498 ) -> Vec<OpCode> {
1499 match condition {
1500 ParsedCondition::Comparison {
1501 field,
1502 op,
1503 value: cond_value,
1504 } => {
1505 vec![OpCode::ConditionalSetField {
1507 object: state_reg,
1508 path: target_field.to_string(),
1509 value: value_reg,
1510 condition_field: field.clone(),
1511 condition_op: op.clone(),
1512 condition_value: cond_value.clone(),
1513 }]
1514 }
1515 ParsedCondition::Logical { .. } => {
1516 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1518 vec![OpCode::SetField {
1519 object: state_reg,
1520 path: target_field.to_string(),
1521 value: value_reg,
1522 }]
1523 }
1524 }
1525 }
1526
1527 fn compile_conditional_increment(
1528 &self,
1529 condition: &ParsedCondition,
1530 state_reg: Register,
1531 target_field: &str,
1532 _increment_by: i64,
1533 ) -> Vec<OpCode> {
1534 match condition {
1535 ParsedCondition::Comparison {
1536 field,
1537 op,
1538 value: cond_value,
1539 } => {
1540 vec![OpCode::ConditionalIncrement {
1541 object: state_reg,
1542 path: target_field.to_string(),
1543 condition_field: field.clone(),
1544 condition_op: op.clone(),
1545 condition_value: cond_value.clone(),
1546 }]
1547 }
1548 ParsedCondition::Logical { .. } => {
1549 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1550 vec![OpCode::SetFieldIncrement {
1551 object: state_reg,
1552 path: target_field.to_string(),
1553 }]
1554 }
1555 }
1556 }
1557}