1use crate::ast::*;
2use serde_json::Value;
3use std::collections::HashMap;
4use tracing;
5
6pub type Register = usize;
7
8#[derive(Debug, Clone)]
9pub enum OpCode {
10 LoadEventField {
11 path: FieldPath,
12 dest: Register,
13 default: Option<Value>,
14 },
15 LoadConstant {
16 value: Value,
17 dest: Register,
18 },
19 CopyRegister {
20 source: Register,
21 dest: Register,
22 },
23 CopyRegisterIfNull {
25 source: Register,
26 dest: Register,
27 },
28 GetEventType {
29 dest: Register,
30 },
31 CreateObject {
32 dest: Register,
33 },
34 SetField {
35 object: Register,
36 path: String,
37 value: Register,
38 },
39 SetFields {
40 object: Register,
41 fields: Vec<(String, Register)>,
42 },
43 GetField {
44 object: Register,
45 path: String,
46 dest: Register,
47 },
48 ReadOrInitState {
49 state_id: u32,
50 key: Register,
51 default: Value,
52 dest: Register,
53 },
54 UpdateState {
55 state_id: u32,
56 key: Register,
57 value: Register,
58 },
59 AppendToArray {
60 object: Register,
61 path: String,
62 value: Register,
63 },
64 GetCurrentTimestamp {
65 dest: Register,
66 },
67 CreateEvent {
68 dest: Register,
69 event_value: Register,
70 },
71 CreateCapture {
72 dest: Register,
73 capture_value: Register,
74 },
75 Transform {
76 source: Register,
77 dest: Register,
78 transformation: Transformation,
79 },
80 EmitMutation {
81 entity_name: String,
82 key: Register,
83 state: Register,
84 },
85 SetFieldIfNull {
86 object: Register,
87 path: String,
88 value: Register,
89 },
90 SetFieldMax {
91 object: Register,
92 path: String,
93 value: Register,
94 },
95 UpdateTemporalIndex {
96 state_id: u32,
97 index_name: String,
98 lookup_value: Register,
99 primary_key: Register,
100 timestamp: Register,
101 },
102 LookupTemporalIndex {
103 state_id: u32,
104 index_name: String,
105 lookup_value: Register,
106 timestamp: Register,
107 dest: Register,
108 },
109 UpdateLookupIndex {
110 state_id: u32,
111 index_name: String,
112 lookup_value: Register,
113 primary_key: Register,
114 },
115 LookupIndex {
116 state_id: u32,
117 index_name: String,
118 lookup_value: Register,
119 dest: Register,
120 },
121 SetFieldSum {
123 object: Register,
124 path: String,
125 value: Register,
126 },
127 SetFieldIncrement {
129 object: Register,
130 path: String,
131 },
132 SetFieldMin {
134 object: Register,
135 path: String,
136 value: Register,
137 },
138 AddToUniqueSet {
141 state_id: u32,
142 set_name: String,
143 value: Register,
144 count_object: Register,
145 count_path: String,
146 },
147 ConditionalSetField {
149 object: Register,
150 path: String,
151 value: Register,
152 condition_field: FieldPath,
153 condition_op: ComparisonOp,
154 condition_value: Value,
155 },
156 ConditionalIncrement {
158 object: Register,
159 path: String,
160 condition_field: FieldPath,
161 condition_op: ComparisonOp,
162 condition_value: Value,
163 },
164 EvaluateComputedFields {
167 state: Register,
168 computed_paths: Vec<String>,
169 },
170 UpdatePdaReverseLookup {
173 state_id: u32,
174 lookup_name: String,
175 pda_address: Register,
176 primary_key: Register,
177 },
178}
179
180pub struct EntityBytecode {
181 pub state_id: u32,
182 pub handlers: HashMap<String, Vec<OpCode>>,
183 pub entity_name: String,
184 #[allow(clippy::type_complexity)]
186 pub computed_fields_evaluator: Option<
187 Box<
188 dyn Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>> + Send + Sync,
189 >,
190 >,
191}
192
193impl std::fmt::Debug for EntityBytecode {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("EntityBytecode")
196 .field("state_id", &self.state_id)
197 .field("handlers", &self.handlers)
198 .field("entity_name", &self.entity_name)
199 .field(
200 "computed_fields_evaluator",
201 &self.computed_fields_evaluator.is_some(),
202 )
203 .finish()
204 }
205}
206
207#[derive(Debug)]
208pub struct MultiEntityBytecode {
209 pub entities: HashMap<String, EntityBytecode>,
210 pub event_routing: HashMap<String, Vec<String>>,
211 pub proto_router: crate::proto_router::ProtoRouter,
212}
213
214impl MultiEntityBytecode {
215 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
216 tracing::info!(
217 "🔨 Compiling entity {} with {} handlers",
218 entity_name,
219 spec.handlers.len()
220 );
221 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
222 let entity_bytecode = compiler.compile_entity();
223 tracing::info!(
224 "🔨 Compiled {} handlers for {}",
225 entity_bytecode.handlers.len(),
226 entity_name
227 );
228
229 let mut entities = HashMap::new();
230 let mut event_routing = HashMap::new();
231
232 for event_type in entity_bytecode.handlers.keys() {
233 event_routing
234 .entry(event_type.clone())
235 .or_insert_with(Vec::new)
236 .push(entity_name.clone());
237 }
238
239 entities.insert(entity_name, entity_bytecode);
240
241 MultiEntityBytecode {
242 entities,
243 event_routing,
244 proto_router: crate::proto_router::ProtoRouter::new(),
245 }
246 }
247
248 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
249 let entities = HashMap::new();
250 let event_routing = HashMap::new();
251
252 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
253 panic!("from_entities requires type information - use builder pattern instead");
254 }
255
256 MultiEntityBytecode {
257 entities,
258 event_routing,
259 proto_router: crate::proto_router::ProtoRouter::new(),
260 }
261 }
262
263 #[allow(clippy::new_ret_no_self)]
264 pub fn new() -> MultiEntityBytecodeBuilder {
265 MultiEntityBytecodeBuilder {
266 entities: HashMap::new(),
267 event_routing: HashMap::new(),
268 proto_router: crate::proto_router::ProtoRouter::new(),
269 }
270 }
271}
272
273pub struct MultiEntityBytecodeBuilder {
274 entities: HashMap<String, EntityBytecode>,
275 event_routing: HashMap<String, Vec<String>>,
276 proto_router: crate::proto_router::ProtoRouter,
277}
278
279impl MultiEntityBytecodeBuilder {
280 pub fn add_entity<S>(
281 self,
282 entity_name: String,
283 spec: TypedStreamSpec<S>,
284 state_id: u32,
285 ) -> Self {
286 self.add_entity_with_evaluator(
287 entity_name,
288 spec,
289 state_id,
290 None::<fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>>,
291 )
292 }
293
294 pub fn add_entity_with_evaluator<S, F>(
295 mut self,
296 entity_name: String,
297 spec: TypedStreamSpec<S>,
298 state_id: u32,
299 evaluator: Option<F>,
300 ) -> Self
301 where
302 F: Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>
303 + Send
304 + Sync
305 + 'static,
306 {
307 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
308 let mut entity_bytecode = compiler.compile_entity();
309
310 if let Some(eval) = evaluator {
312 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
313 }
314
315 for event_type in entity_bytecode.handlers.keys() {
316 self.event_routing
317 .entry(event_type.clone())
318 .or_default()
319 .push(entity_name.clone());
320 }
321
322 self.entities.insert(entity_name, entity_bytecode);
323 self
324 }
325
326 pub fn build(self) -> MultiEntityBytecode {
327 MultiEntityBytecode {
328 entities: self.entities,
329 event_routing: self.event_routing,
330 proto_router: self.proto_router,
331 }
332 }
333}
334
335pub struct TypedCompiler<S> {
336 pub spec: TypedStreamSpec<S>,
337 entity_name: String,
338 state_id: u32,
339}
340
341impl<S> TypedCompiler<S> {
342 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
343 TypedCompiler {
344 spec,
345 entity_name,
346 state_id: 0,
347 }
348 }
349
350 pub fn with_state_id(mut self, state_id: u32) -> Self {
351 self.state_id = state_id;
352 self
353 }
354
355 pub fn compile(&self) -> MultiEntityBytecode {
356 let entity_bytecode = self.compile_entity();
357
358 let mut entities = HashMap::new();
359 let mut event_routing = HashMap::new();
360
361 for event_type in entity_bytecode.handlers.keys() {
362 event_routing
363 .entry(event_type.clone())
364 .or_insert_with(Vec::new)
365 .push(self.entity_name.clone());
366 }
367
368 entities.insert(self.entity_name.clone(), entity_bytecode);
369
370 MultiEntityBytecode {
371 entities,
372 event_routing,
373 proto_router: crate::proto_router::ProtoRouter::new(),
374 }
375 }
376
377 fn compile_entity(&self) -> EntityBytecode {
378 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
379
380 let mut debug_info = Vec::new();
382 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
383 let event_type = self.get_event_type(&handler_spec.source);
384 let program_id = match &handler_spec.source {
385 crate::ast::SourceSpec::Source { program_id, .. } => {
386 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
387 }
388 };
389 debug_info.push(format!(
390 " [{}] EventType={}, Mappings={}, ProgramId={}",
391 index,
392 event_type,
393 handler_spec.mappings.len(),
394 program_id
395 ));
396 }
397
398 for handler_spec in &self.spec.handlers {
408 let opcodes = self.compile_handler(handler_spec);
409 let event_type = self.get_event_type(&handler_spec.source);
410
411 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
412 let mut existing_setup = Vec::new();
417 let mut existing_mappings = Vec::new();
418 let mut existing_teardown = Vec::new();
419 let mut section = 0; for opcode in existing_opcodes.iter() {
422 match opcode {
423 OpCode::ReadOrInitState { .. } => {
424 existing_setup.push(opcode.clone());
425 section = 1; }
427 OpCode::UpdateState { .. } => {
428 existing_teardown.push(opcode.clone());
429 section = 2; }
431 OpCode::EmitMutation { .. } => {
432 existing_teardown.push(opcode.clone());
433 }
434 _ if section == 0 => existing_setup.push(opcode.clone()),
435 _ if section == 1 => existing_mappings.push(opcode.clone()),
436 _ => existing_teardown.push(opcode.clone()),
437 }
438 }
439
440 let mut new_mappings = Vec::new();
442 section = 0;
443
444 for opcode in opcodes.iter() {
445 match opcode {
446 OpCode::ReadOrInitState { .. } => {
447 section = 1; }
449 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
450 section = 2; }
452 _ if section == 1 => {
453 new_mappings.push(opcode.clone());
454 }
455 _ => {} }
457 }
458
459 let mut merged = Vec::new();
461 merged.extend(existing_setup);
462 merged.extend(existing_mappings);
463 merged.extend(new_mappings.clone());
464 merged.extend(existing_teardown);
465
466 *existing_opcodes = merged;
467 } else {
468 handlers.insert(event_type, opcodes);
469 }
470 }
471
472 for hook in &self.spec.instruction_hooks {
474 let event_type = hook.instruction_type.clone();
475
476 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
478 tracing::debug!(
479 " Creating new handler for {} with key loading from lookup_by",
480 event_type
481 );
482 let key_reg = 20;
484 let state_reg = 2;
485 let resolved_key_reg = 19;
486 let temp_reg = 18;
487
488 let mut ops = Vec::new();
489
490 ops.push(OpCode::LoadEventField {
492 path: FieldPath::new(&["__resolved_primary_key"]),
493 dest: resolved_key_reg,
494 default: Some(serde_json::json!(null)),
495 });
496
497 ops.push(OpCode::CopyRegister {
499 source: resolved_key_reg,
500 dest: key_reg,
501 });
502
503 if let Some(lookup_path) = &hook.lookup_by {
505 ops.push(OpCode::LoadEventField {
507 path: lookup_path.clone(),
508 dest: temp_reg,
509 default: None,
510 });
511
512 ops.push(OpCode::Transform {
514 source: temp_reg,
515 dest: temp_reg,
516 transformation: Transformation::HexEncode,
517 });
518
519 ops.push(OpCode::CopyRegisterIfNull {
521 source: temp_reg,
522 dest: key_reg,
523 });
524 }
525
526 ops.push(OpCode::ReadOrInitState {
527 state_id: self.state_id,
528 key: key_reg,
529 default: serde_json::json!({}),
530 dest: state_reg,
531 });
532
533 ops.push(OpCode::UpdateState {
534 state_id: self.state_id,
535 key: key_reg,
536 value: state_reg,
537 });
538
539 ops
540 });
541
542 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
544
545 let insert_pos = handler_opcodes
549 .iter()
550 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
551 .or_else(|| {
552 handler_opcodes
553 .iter()
554 .position(|op| matches!(op, OpCode::UpdateState { .. }))
555 });
556
557 if let Some(pos) = insert_pos {
558 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
560 handler_opcodes.insert(pos + i, opcode);
561 }
562 }
563 }
564
565 EntityBytecode {
566 state_id: self.state_id,
567 handlers,
568 entity_name: self.entity_name.clone(),
569 computed_fields_evaluator: None,
570 }
571 }
572
573 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
574 let mut ops = Vec::new();
575 let state_reg = 2;
576 let key_reg = 20;
577
578 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
579
580 ops.extend(self.compile_temporal_index_update(&spec.key_resolution, key_reg, &spec.mappings));
581
582 ops.push(OpCode::ReadOrInitState {
583 state_id: self.state_id,
584 key: key_reg,
585 default: serde_json::json!({}),
586 dest: state_reg,
587 });
588
589 for mapping in &spec.mappings {
590 ops.extend(self.compile_mapping(mapping, state_reg));
591 }
592
593 ops.push(OpCode::EvaluateComputedFields {
595 state: state_reg,
596 computed_paths: self.spec.computed_fields.clone(),
597 });
598
599 ops.push(OpCode::UpdateState {
600 state_id: self.state_id,
601 key: key_reg,
602 value: state_reg,
603 });
604
605 if spec.emit {
606 ops.push(OpCode::EmitMutation {
607 entity_name: self.entity_name.clone(),
608 key: key_reg,
609 state: state_reg,
610 });
611 }
612
613 ops
614 }
615
616 fn compile_mapping(&self, mapping: &TypedFieldMapping<S>, state_reg: Register) -> Vec<OpCode> {
617 let mut ops = Vec::new();
618 let temp_reg = 10;
619
620 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
621
622 if let Some(transform) = &mapping.transform {
623 ops.push(OpCode::Transform {
624 source: temp_reg,
625 dest: temp_reg,
626 transformation: transform.clone(),
627 });
628 }
629
630 match &mapping.population {
631 PopulationStrategy::Append => {
632 ops.push(OpCode::AppendToArray {
633 object: state_reg,
634 path: mapping.target_path.clone(),
635 value: temp_reg,
636 });
637 }
638 PopulationStrategy::LastWrite => {
639 ops.push(OpCode::SetField {
640 object: state_reg,
641 path: mapping.target_path.clone(),
642 value: temp_reg,
643 });
644 }
645 PopulationStrategy::SetOnce => {
646 ops.push(OpCode::SetFieldIfNull {
647 object: state_reg,
648 path: mapping.target_path.clone(),
649 value: temp_reg,
650 });
651 }
652 PopulationStrategy::Merge => {
653 ops.push(OpCode::SetField {
654 object: state_reg,
655 path: mapping.target_path.clone(),
656 value: temp_reg,
657 });
658 }
659 PopulationStrategy::Max => {
660 ops.push(OpCode::SetFieldMax {
661 object: state_reg,
662 path: mapping.target_path.clone(),
663 value: temp_reg,
664 });
665 }
666 PopulationStrategy::Sum => {
667 ops.push(OpCode::SetFieldSum {
668 object: state_reg,
669 path: mapping.target_path.clone(),
670 value: temp_reg,
671 });
672 }
673 PopulationStrategy::Count => {
674 ops.push(OpCode::SetFieldIncrement {
676 object: state_reg,
677 path: mapping.target_path.clone(),
678 });
679 }
680 PopulationStrategy::Min => {
681 ops.push(OpCode::SetFieldMin {
682 object: state_reg,
683 path: mapping.target_path.clone(),
684 value: temp_reg,
685 });
686 }
687 PopulationStrategy::UniqueCount => {
688 let set_name = format!("{}_unique_set", mapping.target_path);
691 ops.push(OpCode::AddToUniqueSet {
692 state_id: self.state_id,
693 set_name,
694 value: temp_reg,
695 count_object: state_reg,
696 count_path: mapping.target_path.clone(),
697 });
698 }
699 }
700
701 ops
702 }
703
704 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
705 match source {
706 MappingSource::FromSource {
707 path,
708 default,
709 transform,
710 } => {
711 let mut ops = vec![OpCode::LoadEventField {
712 path: path.clone(),
713 dest,
714 default: default.clone(),
715 }];
716
717 if let Some(transform_type) = transform {
719 ops.push(OpCode::Transform {
720 source: dest,
721 dest,
722 transformation: transform_type.clone(),
723 });
724 }
725
726 ops
727 }
728 MappingSource::Constant(val) => {
729 vec![OpCode::LoadConstant {
730 value: val.clone(),
731 dest,
732 }]
733 }
734 MappingSource::AsEvent { fields } => {
735 let mut ops = Vec::new();
736
737 if fields.is_empty() {
738 let event_data_reg = dest + 1;
739 ops.push(OpCode::LoadEventField {
740 path: FieldPath::new(&[]),
741 dest: event_data_reg,
742 default: Some(serde_json::json!({})),
743 });
744 ops.push(OpCode::CreateEvent {
745 dest,
746 event_value: event_data_reg,
747 });
748 } else {
749 let data_obj_reg = dest + 1;
750 ops.push(OpCode::CreateObject { dest: data_obj_reg });
751
752 let mut field_registers = Vec::new();
753 let mut current_reg = dest + 2;
754
755 for field_source in fields.iter() {
756 if let MappingSource::FromSource {
757 path,
758 default,
759 transform,
760 } = &**field_source
761 {
762 ops.push(OpCode::LoadEventField {
763 path: path.clone(),
764 dest: current_reg,
765 default: default.clone(),
766 });
767
768 if let Some(transform_type) = transform {
769 ops.push(OpCode::Transform {
770 source: current_reg,
771 dest: current_reg,
772 transformation: transform_type.clone(),
773 });
774 }
775
776 if let Some(field_name) = path.segments.last() {
777 field_registers.push((field_name.clone(), current_reg));
778 }
779 current_reg += 1;
780 }
781 }
782
783 if !field_registers.is_empty() {
784 ops.push(OpCode::SetFields {
785 object: data_obj_reg,
786 fields: field_registers,
787 });
788 }
789
790 ops.push(OpCode::CreateEvent {
791 dest,
792 event_value: data_obj_reg,
793 });
794 }
795
796 ops
797 }
798 MappingSource::WholeSource => {
799 vec![OpCode::LoadEventField {
800 path: FieldPath::new(&[]),
801 dest,
802 default: Some(serde_json::json!({})),
803 }]
804 }
805 MappingSource::AsCapture { field_transforms } => {
806 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
809 path: FieldPath::new(&[]),
810 dest: capture_data_reg,
811 default: Some(serde_json::json!({})),
812 }];
813
814 let field_reg = 24;
818 let transformed_reg = 25;
819
820 for (field_name, transform) in field_transforms {
821 ops.push(OpCode::GetField {
824 object: capture_data_reg,
825 path: field_name.clone(),
826 dest: field_reg,
827 });
828
829 ops.push(OpCode::Transform {
831 source: field_reg,
832 dest: transformed_reg,
833 transformation: transform.clone(),
834 });
835
836 ops.push(OpCode::SetField {
838 object: capture_data_reg,
839 path: field_name.clone(),
840 value: transformed_reg,
841 });
842 }
843
844 ops.push(OpCode::CreateCapture {
846 dest,
847 capture_value: capture_data_reg,
848 });
849
850 ops
851 }
852 MappingSource::FromContext { field } => {
853 vec![OpCode::LoadEventField {
855 path: FieldPath::new(&["__update_context", field.as_str()]),
856 dest,
857 default: Some(serde_json::json!(null)),
858 }]
859 }
860 MappingSource::Computed { .. } => {
861 vec![]
862 }
863 MappingSource::FromState { .. } => {
864 vec![]
865 }
866 }
867 }
868
869 pub fn compile_key_loading(
870 &self,
871 resolution: &KeyResolutionStrategy,
872 key_reg: Register,
873 mappings: &[TypedFieldMapping<S>],
874 ) -> Vec<OpCode> {
875 let mut ops = Vec::new();
876
877 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
881 path: FieldPath::new(&["__resolved_primary_key"]),
882 dest: resolved_key_reg,
883 default: Some(serde_json::json!(null)),
884 });
885
886 ops.push(OpCode::CopyRegister {
888 source: resolved_key_reg,
889 dest: key_reg,
890 });
891
892 match resolution {
894 KeyResolutionStrategy::Embedded { primary_field } => {
895 let effective_primary_field = if primary_field.segments.is_empty() {
897 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
899 auto_field
900 } else {
901 primary_field.clone()
902 }
903 } else {
904 primary_field.clone()
905 };
906
907 if !effective_primary_field.segments.is_empty() {
911 let temp_reg = 18;
912 let transform_reg = 23; ops.push(OpCode::LoadEventField {
915 path: effective_primary_field.clone(),
916 dest: temp_reg,
917 default: None,
918 });
919
920 let primary_key_transform = self
923 .find_primary_key_transformation(mappings)
924 .or_else(|| self.find_inherited_primary_key_transformation());
925
926 if let Some(transform) = primary_key_transform {
927 ops.push(OpCode::Transform {
929 source: temp_reg,
930 dest: transform_reg,
931 transformation: transform,
932 });
933 ops.push(OpCode::CopyRegisterIfNull {
935 source: transform_reg,
936 dest: key_reg,
937 });
938 } else {
939 ops.push(OpCode::CopyRegisterIfNull {
941 source: temp_reg,
942 dest: key_reg,
943 });
944 }
945 }
946 }
949 KeyResolutionStrategy::Lookup { primary_field } => {
950 let lookup_reg = 15;
951 let result_reg = 17;
952
953 tracing::debug!(
954 "Compiling Lookup key_resolution: primary_field={:?}",
955 primary_field.segments
956 );
957
958 ops.push(OpCode::LoadEventField {
959 path: primary_field.clone(),
960 dest: lookup_reg,
961 default: None,
962 });
963
964 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
966
967 tracing::debug!(" Lookup index search result: {:?}", index_name);
968
969 let effective_index_name = index_name.unwrap_or_else(|| {
972 tracing::debug!(
973 "No lookup index configured for primary_field={:?}, using default_pda_lookup",
974 primary_field.segments
975 );
976 "default_pda_lookup".to_string()
977 });
978
979 ops.push(OpCode::LookupIndex {
980 state_id: self.state_id,
981 index_name: effective_index_name,
982 lookup_value: lookup_reg,
983 dest: result_reg,
984 });
985 ops.push(OpCode::CopyRegisterIfNull {
994 source: result_reg,
995 dest: key_reg,
996 });
997 }
998 KeyResolutionStrategy::Computed {
999 primary_field,
1000 compute_partition: _,
1001 } => {
1002 let temp_reg = 18;
1003 ops.push(OpCode::LoadEventField {
1004 path: primary_field.clone(),
1005 dest: temp_reg,
1006 default: None,
1007 });
1008 ops.push(OpCode::CopyRegisterIfNull {
1009 source: temp_reg,
1010 dest: key_reg,
1011 });
1012 }
1013 KeyResolutionStrategy::TemporalLookup {
1014 lookup_field,
1015 timestamp_field,
1016 index_name,
1017 } => {
1018 let lookup_reg = 15;
1019 let timestamp_reg = 16;
1020 let result_reg = 17;
1021
1022 ops.push(OpCode::LoadEventField {
1023 path: lookup_field.clone(),
1024 dest: lookup_reg,
1025 default: None,
1026 });
1027
1028 ops.push(OpCode::LoadEventField {
1029 path: timestamp_field.clone(),
1030 dest: timestamp_reg,
1031 default: None,
1032 });
1033
1034 ops.push(OpCode::LookupTemporalIndex {
1035 state_id: self.state_id,
1036 index_name: index_name.clone(),
1037 lookup_value: lookup_reg,
1038 timestamp: timestamp_reg,
1039 dest: result_reg,
1040 });
1041
1042 ops.push(OpCode::CopyRegisterIfNull {
1043 source: result_reg,
1044 dest: key_reg,
1045 });
1046 }
1047 }
1048
1049 ops
1050 }
1051
1052 fn find_primary_key_transformation(
1053 &self,
1054 mappings: &[TypedFieldMapping<S>],
1055 ) -> Option<Transformation> {
1056 let primary_key = self.spec.identity.primary_keys.first()?;
1058 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1059
1060 for mapping in mappings {
1062 if mapping.target_path == *primary_key
1064 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1065 {
1066 if let Some(transform) = &mapping.transform {
1068 return Some(transform.clone());
1069 }
1070
1071 if let MappingSource::FromSource {
1073 transform: Some(transform),
1074 ..
1075 } = &mapping.source
1076 {
1077 return Some(transform.clone());
1078 }
1079 }
1080 }
1081
1082 for mapping in mappings {
1084 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1085 if let Some(transform) = field_transforms.get(&primary_field_name) {
1086 return Some(transform.clone());
1087 }
1088 }
1089 }
1090
1091 None
1092 }
1093
1094 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1097 let primary_key = self.spec.identity.primary_keys.first()?;
1098
1099 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1101
1102 for handler in &self.spec.handlers {
1104 for mapping in &handler.mappings {
1105 if mapping.target_path == *primary_key
1107 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1108 {
1109 if let MappingSource::FromSource {
1111 path, transform, ..
1112 } = &mapping.source
1113 {
1114 if path.segments.last() == Some(&primary_field_name) {
1115 return mapping.transform.clone().or_else(|| transform.clone());
1117 }
1118 }
1119 }
1120
1121 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1123 if let Some(transform) = field_transforms.get(&primary_field_name) {
1124 return Some(transform.clone());
1125 }
1126 }
1127 }
1128 }
1129
1130 None
1131 }
1132
1133 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1135 primary_key.split('.').next_back().map(|s| s.to_string())
1137 }
1138
1139 pub fn auto_detect_primary_field(
1142 &self,
1143 current_mappings: &[TypedFieldMapping<S>],
1144 ) -> Option<FieldPath> {
1145 let primary_key = self.spec.identity.primary_keys.first()?;
1146
1147 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1149
1150 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1152 return Some(FieldPath::new(&[&primary_field_name]));
1153 }
1154
1155 None
1156 }
1157
1158 fn current_account_has_primary_field(
1161 &self,
1162 field_name: &str,
1163 mappings: &[TypedFieldMapping<S>],
1164 ) -> bool {
1165 for mapping in mappings {
1167 if let MappingSource::FromSource { path, .. } = &mapping.source {
1168 if path.segments.last() == Some(&field_name.to_string()) {
1170 return true;
1171 }
1172 }
1173 }
1174
1175 false
1176 }
1177
1178 #[allow(dead_code)]
1180 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1181 for mapping in mappings {
1182 if let MappingSource::FromSource { path, .. } = &mapping.source {
1183 if path.segments.last() == Some(&field_name.to_string()) {
1184 return true;
1185 }
1186 }
1187 }
1188 false
1189 }
1190
1191 #[allow(dead_code)]
1194 fn field_exists_in_mappings(
1195 &self,
1196 field_name: &str,
1197 mappings: &[TypedFieldMapping<S>],
1198 ) -> bool {
1199 for mapping in mappings {
1201 if let MappingSource::FromSource { path, .. } = &mapping.source {
1202 if path.segments.last() == Some(&field_name.to_string()) {
1203 return true;
1204 }
1205 }
1206 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1208 if field_transforms.contains_key(field_name) {
1209 return true;
1210 }
1211 }
1212 }
1213 false
1214 }
1215
1216 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1217 if field_path.segments.is_empty() {
1218 return None;
1219 }
1220
1221 let lookup_field_name = field_path.segments.last().unwrap();
1222
1223 for lookup_index in &self.spec.identity.lookup_indexes {
1224 let index_field_name = lookup_index
1225 .field_name
1226 .split('.')
1227 .next_back()
1228 .unwrap_or(&lookup_index.field_name);
1229 if index_field_name == lookup_field_name {
1230 return Some(format!("{}_lookup_index", index_field_name));
1231 }
1232 }
1233
1234 None
1235 }
1236
1237 fn find_lookup_index_for_lookup_field(
1240 &self,
1241 primary_field: &FieldPath,
1242 mappings: &[TypedFieldMapping<S>],
1243 ) -> Option<String> {
1244 let primary_path = primary_field.segments.join(".");
1246
1247 for mapping in mappings {
1249 if let MappingSource::FromSource { path, .. } = &mapping.source {
1251 let source_path = path.segments.join(".");
1252 if source_path == primary_path {
1253 for lookup_index in &self.spec.identity.lookup_indexes {
1255 if mapping.target_path == lookup_index.field_name {
1256 let index_field_name = lookup_index
1257 .field_name
1258 .split('.')
1259 .next_back()
1260 .unwrap_or(&lookup_index.field_name);
1261 return Some(format!("{}_lookup_index", index_field_name));
1262 }
1263 }
1264 }
1265 }
1266 }
1267
1268 self.find_lookup_index_for_field(primary_field)
1270 }
1271
1272 fn find_source_path_for_lookup_index(
1276 &self,
1277 mappings: &[TypedFieldMapping<S>],
1278 lookup_field_name: &str,
1279 ) -> Option<Vec<String>> {
1280 for mapping in mappings {
1281 if mapping.target_path == lookup_field_name {
1282 if let MappingSource::FromSource { path, .. } = &mapping.source {
1283 return Some(path.segments.clone());
1284 }
1285 }
1286 }
1287 None
1288 }
1289
1290 fn compile_temporal_index_update(
1291 &self,
1292 resolution: &KeyResolutionStrategy,
1293 key_reg: Register,
1294 mappings: &[TypedFieldMapping<S>],
1295 ) -> Vec<OpCode> {
1296 let mut ops = Vec::new();
1297
1298 for lookup_index in &self.spec.identity.lookup_indexes {
1299 let lookup_reg = 17;
1300 let source_field = lookup_index
1301 .field_name
1302 .split('.')
1303 .next_back()
1304 .unwrap_or(&lookup_index.field_name);
1305
1306 match resolution {
1307 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1308 let source_path_opt = self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1311
1312 let load_path = if let Some(ref path) = source_path_opt {
1313 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1314 } else {
1315 FieldPath::new(&[source_field])
1317 };
1318
1319 ops.push(OpCode::LoadEventField {
1320 path: load_path,
1321 dest: lookup_reg,
1322 default: None,
1323 });
1324
1325 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1326 let timestamp_reg = 18;
1327
1328 ops.push(OpCode::LoadEventField {
1329 path: FieldPath::new(&[temporal_field_name]),
1330 dest: timestamp_reg,
1331 default: None,
1332 });
1333
1334 let index_name = format!("{}_temporal_index", source_field);
1335 ops.push(OpCode::UpdateTemporalIndex {
1336 state_id: self.state_id,
1337 index_name,
1338 lookup_value: lookup_reg,
1339 primary_key: key_reg,
1340 timestamp: timestamp_reg,
1341 });
1342
1343 let simple_index_name = format!("{}_lookup_index", source_field);
1344 ops.push(OpCode::UpdateLookupIndex {
1345 state_id: self.state_id,
1346 index_name: simple_index_name,
1347 lookup_value: lookup_reg,
1348 primary_key: key_reg,
1349 });
1350 } else {
1351 let index_name = format!("{}_lookup_index", source_field);
1352 ops.push(OpCode::UpdateLookupIndex {
1353 state_id: self.state_id,
1354 index_name,
1355 lookup_value: lookup_reg,
1356 primary_key: key_reg,
1357 });
1358 }
1359
1360 if source_path_opt.is_some() {
1364 ops.push(OpCode::UpdatePdaReverseLookup {
1365 state_id: self.state_id,
1366 lookup_name: "default_pda_lookup".to_string(),
1367 pda_address: lookup_reg,
1368 primary_key: key_reg,
1369 });
1370 }
1371 }
1372 KeyResolutionStrategy::Lookup { primary_field } => {
1373 let has_mapping_to_lookup_field = mappings.iter().any(|m| {
1376 m.target_path == lookup_index.field_name
1377 });
1378
1379 if has_mapping_to_lookup_field {
1380 let path_segments: Vec<&str> = primary_field.segments.iter().map(|s| s.as_str()).collect();
1383 ops.push(OpCode::LoadEventField {
1384 path: FieldPath::new(&path_segments),
1385 dest: lookup_reg,
1386 default: None,
1387 });
1388
1389 let index_name = format!("{}_lookup_index", source_field);
1390 ops.push(OpCode::UpdateLookupIndex {
1391 state_id: self.state_id,
1392 index_name,
1393 lookup_value: lookup_reg,
1394 primary_key: key_reg,
1395 });
1396 }
1397 }
1398 KeyResolutionStrategy::Computed { .. } | KeyResolutionStrategy::TemporalLookup { .. } => {
1399 }
1401 }
1402 }
1403
1404 ops
1405 }
1406
1407 fn get_event_type(&self, source: &SourceSpec) -> String {
1408 match source {
1409 SourceSpec::Source { type_name, .. } => type_name.clone(),
1410 }
1411 }
1412
1413 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1414 let mut ops = Vec::new();
1415 let state_reg = 2;
1416
1417 for action in actions {
1418 match action {
1419 HookAction::SetField {
1420 target_field,
1421 source,
1422 condition,
1423 } => {
1424 let _ = condition;
1426
1427 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1431 ops.extend(load_ops);
1432
1433 if let MappingSource::FromSource { transform: Some(transform_type), .. } = source {
1435 ops.push(OpCode::Transform {
1436 source: temp_reg,
1437 dest: temp_reg,
1438 transformation: transform_type.clone(),
1439 });
1440 }
1441
1442 if let Some(cond_expr) = condition {
1444 if let Some(parsed) = &cond_expr.parsed {
1445 let cond_check_ops = self.compile_condition_check(
1447 parsed,
1448 temp_reg,
1449 state_reg,
1450 target_field,
1451 );
1452 ops.extend(cond_check_ops);
1453 } else {
1454 ops.push(OpCode::SetField {
1456 object: state_reg,
1457 path: target_field.clone(),
1458 value: temp_reg,
1459 });
1460 }
1461 } else {
1462 ops.push(OpCode::SetField {
1464 object: state_reg,
1465 path: target_field.clone(),
1466 value: temp_reg,
1467 });
1468 }
1469 }
1470 HookAction::IncrementField {
1471 target_field,
1472 increment_by,
1473 condition,
1474 } => {
1475 if let Some(cond_expr) = condition {
1476 if let Some(parsed) = &cond_expr.parsed {
1477 let cond_check_ops = self.compile_conditional_increment(
1482 parsed,
1483 state_reg,
1484 target_field,
1485 *increment_by,
1486 );
1487 ops.extend(cond_check_ops);
1488 } else {
1489 ops.push(OpCode::SetFieldIncrement {
1491 object: state_reg,
1492 path: target_field.clone(),
1493 });
1494 }
1495 } else {
1496 ops.push(OpCode::SetFieldIncrement {
1498 object: state_reg,
1499 path: target_field.clone(),
1500 });
1501 }
1502 }
1503 HookAction::RegisterPdaMapping { .. } => {
1504 }
1507 }
1508 }
1509
1510 ops
1511 }
1512
1513 fn compile_condition_check(
1514 &self,
1515 condition: &ParsedCondition,
1516 value_reg: Register,
1517 state_reg: Register,
1518 target_field: &str,
1519 ) -> Vec<OpCode> {
1520 match condition {
1521 ParsedCondition::Comparison {
1522 field,
1523 op,
1524 value: cond_value,
1525 } => {
1526 vec![OpCode::ConditionalSetField {
1528 object: state_reg,
1529 path: target_field.to_string(),
1530 value: value_reg,
1531 condition_field: field.clone(),
1532 condition_op: op.clone(),
1533 condition_value: cond_value.clone(),
1534 }]
1535 }
1536 ParsedCondition::Logical { .. } => {
1537 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1539 vec![OpCode::SetField {
1540 object: state_reg,
1541 path: target_field.to_string(),
1542 value: value_reg,
1543 }]
1544 }
1545 }
1546 }
1547
1548 fn compile_conditional_increment(
1549 &self,
1550 condition: &ParsedCondition,
1551 state_reg: Register,
1552 target_field: &str,
1553 _increment_by: i64,
1554 ) -> Vec<OpCode> {
1555 match condition {
1556 ParsedCondition::Comparison {
1557 field,
1558 op,
1559 value: cond_value,
1560 } => {
1561 vec![OpCode::ConditionalIncrement {
1562 object: state_reg,
1563 path: target_field.to_string(),
1564 condition_field: field.clone(),
1565 condition_op: op.clone(),
1566 condition_value: cond_value.clone(),
1567 }]
1568 }
1569 ParsedCondition::Logical { .. } => {
1570 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1571 vec![OpCode::SetFieldIncrement {
1572 object: state_reg,
1573 path: target_field.to_string(),
1574 }]
1575 }
1576 }
1577 }
1578}