1use crate::ast::*;
2use serde_json::Value;
3use std::collections::HashMap;
4use tracing;
5
6pub type Register = usize;
7
8#[derive(Debug, Clone)]
9pub enum OpCode {
10 LoadEventField {
11 path: FieldPath,
12 dest: Register,
13 default: Option<Value>,
14 },
15 LoadConstant {
16 value: Value,
17 dest: Register,
18 },
19 CopyRegister {
20 source: Register,
21 dest: Register,
22 },
23 CopyRegisterIfNull {
25 source: Register,
26 dest: Register,
27 },
28 GetEventType {
29 dest: Register,
30 },
31 CreateObject {
32 dest: Register,
33 },
34 SetField {
35 object: Register,
36 path: String,
37 value: Register,
38 },
39 SetFields {
40 object: Register,
41 fields: Vec<(String, Register)>,
42 },
43 GetField {
44 object: Register,
45 path: String,
46 dest: Register,
47 },
48 ReadOrInitState {
49 state_id: u32,
50 key: Register,
51 default: Value,
52 dest: Register,
53 },
54 UpdateState {
55 state_id: u32,
56 key: Register,
57 value: Register,
58 },
59 AppendToArray {
60 object: Register,
61 path: String,
62 value: Register,
63 },
64 GetCurrentTimestamp {
65 dest: Register,
66 },
67 CreateEvent {
68 dest: Register,
69 event_value: Register,
70 },
71 CreateCapture {
72 dest: Register,
73 capture_value: Register,
74 },
75 Transform {
76 source: Register,
77 dest: Register,
78 transformation: Transformation,
79 },
80 EmitMutation {
81 entity_name: String,
82 key: Register,
83 state: Register,
84 },
85 SetFieldIfNull {
86 object: Register,
87 path: String,
88 value: Register,
89 },
90 SetFieldMax {
91 object: Register,
92 path: String,
93 value: Register,
94 },
95 UpdateTemporalIndex {
96 state_id: u32,
97 index_name: String,
98 lookup_value: Register,
99 primary_key: Register,
100 timestamp: Register,
101 },
102 LookupTemporalIndex {
103 state_id: u32,
104 index_name: String,
105 lookup_value: Register,
106 timestamp: Register,
107 dest: Register,
108 },
109 UpdateLookupIndex {
110 state_id: u32,
111 index_name: String,
112 lookup_value: Register,
113 primary_key: Register,
114 },
115 LookupIndex {
116 state_id: u32,
117 index_name: String,
118 lookup_value: Register,
119 dest: Register,
120 },
121 SetFieldSum {
123 object: Register,
124 path: String,
125 value: Register,
126 },
127 SetFieldIncrement {
129 object: Register,
130 path: String,
131 },
132 SetFieldMin {
134 object: Register,
135 path: String,
136 value: Register,
137 },
138 AddToUniqueSet {
141 state_id: u32,
142 set_name: String,
143 value: Register,
144 count_object: Register,
145 count_path: String,
146 },
147 ConditionalSetField {
149 object: Register,
150 path: String,
151 value: Register,
152 condition_field: FieldPath,
153 condition_op: ComparisonOp,
154 condition_value: Value,
155 },
156 ConditionalIncrement {
158 object: Register,
159 path: String,
160 condition_field: FieldPath,
161 condition_op: ComparisonOp,
162 condition_value: Value,
163 },
164 EvaluateComputedFields {
167 state: Register,
168 computed_paths: Vec<String>,
169 },
170 UpdatePdaReverseLookup {
173 state_id: u32,
174 lookup_name: String,
175 pda_address: Register,
176 primary_key: Register,
177 },
178}
179
180pub struct EntityBytecode {
181 pub state_id: u32,
182 pub handlers: HashMap<String, Vec<OpCode>>,
183 pub entity_name: String,
184 #[allow(clippy::type_complexity)]
186 pub computed_fields_evaluator: Option<
187 Box<
188 dyn Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>> + Send + Sync,
189 >,
190 >,
191}
192
193impl std::fmt::Debug for EntityBytecode {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("EntityBytecode")
196 .field("state_id", &self.state_id)
197 .field("handlers", &self.handlers)
198 .field("entity_name", &self.entity_name)
199 .field(
200 "computed_fields_evaluator",
201 &self.computed_fields_evaluator.is_some(),
202 )
203 .finish()
204 }
205}
206
207#[derive(Debug)]
208pub struct MultiEntityBytecode {
209 pub entities: HashMap<String, EntityBytecode>,
210 pub event_routing: HashMap<String, Vec<String>>,
211 pub proto_router: crate::proto_router::ProtoRouter,
212}
213
214impl MultiEntityBytecode {
215 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
216 tracing::info!(
217 "🔨 Compiling entity {} with {} handlers",
218 entity_name,
219 spec.handlers.len()
220 );
221 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
222 let entity_bytecode = compiler.compile_entity();
223 tracing::info!(
224 "🔨 Compiled {} handlers for {}",
225 entity_bytecode.handlers.len(),
226 entity_name
227 );
228
229 let mut entities = HashMap::new();
230 let mut event_routing = HashMap::new();
231
232 for event_type in entity_bytecode.handlers.keys() {
233 event_routing
234 .entry(event_type.clone())
235 .or_insert_with(Vec::new)
236 .push(entity_name.clone());
237 }
238
239 entities.insert(entity_name, entity_bytecode);
240
241 MultiEntityBytecode {
242 entities,
243 event_routing,
244 proto_router: crate::proto_router::ProtoRouter::new(),
245 }
246 }
247
248 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
249 let entities = HashMap::new();
250 let event_routing = HashMap::new();
251
252 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
253 panic!("from_entities requires type information - use builder pattern instead");
254 }
255
256 MultiEntityBytecode {
257 entities,
258 event_routing,
259 proto_router: crate::proto_router::ProtoRouter::new(),
260 }
261 }
262
263 #[allow(clippy::new_ret_no_self)]
264 pub fn new() -> MultiEntityBytecodeBuilder {
265 MultiEntityBytecodeBuilder {
266 entities: HashMap::new(),
267 event_routing: HashMap::new(),
268 proto_router: crate::proto_router::ProtoRouter::new(),
269 }
270 }
271}
272
273pub struct MultiEntityBytecodeBuilder {
274 entities: HashMap<String, EntityBytecode>,
275 event_routing: HashMap<String, Vec<String>>,
276 proto_router: crate::proto_router::ProtoRouter,
277}
278
279impl MultiEntityBytecodeBuilder {
280 pub fn add_entity<S>(
281 self,
282 entity_name: String,
283 spec: TypedStreamSpec<S>,
284 state_id: u32,
285 ) -> Self {
286 self.add_entity_with_evaluator(
287 entity_name,
288 spec,
289 state_id,
290 None::<fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>>,
291 )
292 }
293
294 pub fn add_entity_with_evaluator<S, F>(
295 mut self,
296 entity_name: String,
297 spec: TypedStreamSpec<S>,
298 state_id: u32,
299 evaluator: Option<F>,
300 ) -> Self
301 where
302 F: Fn(&mut Value) -> std::result::Result<(), Box<dyn std::error::Error>>
303 + Send
304 + Sync
305 + 'static,
306 {
307 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
308 let mut entity_bytecode = compiler.compile_entity();
309
310 if let Some(eval) = evaluator {
312 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
313 }
314
315 for event_type in entity_bytecode.handlers.keys() {
316 self.event_routing
317 .entry(event_type.clone())
318 .or_default()
319 .push(entity_name.clone());
320 }
321
322 self.entities.insert(entity_name, entity_bytecode);
323 self
324 }
325
326 pub fn build(self) -> MultiEntityBytecode {
327 MultiEntityBytecode {
328 entities: self.entities,
329 event_routing: self.event_routing,
330 proto_router: self.proto_router,
331 }
332 }
333}
334
335pub struct TypedCompiler<S> {
336 pub spec: TypedStreamSpec<S>,
337 entity_name: String,
338 state_id: u32,
339}
340
341impl<S> TypedCompiler<S> {
342 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
343 TypedCompiler {
344 spec,
345 entity_name,
346 state_id: 0,
347 }
348 }
349
350 pub fn with_state_id(mut self, state_id: u32) -> Self {
351 self.state_id = state_id;
352 self
353 }
354
355 pub fn compile(&self) -> MultiEntityBytecode {
356 let entity_bytecode = self.compile_entity();
357
358 let mut entities = HashMap::new();
359 let mut event_routing = HashMap::new();
360
361 for event_type in entity_bytecode.handlers.keys() {
362 event_routing
363 .entry(event_type.clone())
364 .or_insert_with(Vec::new)
365 .push(self.entity_name.clone());
366 }
367
368 entities.insert(self.entity_name.clone(), entity_bytecode);
369
370 MultiEntityBytecode {
371 entities,
372 event_routing,
373 proto_router: crate::proto_router::ProtoRouter::new(),
374 }
375 }
376
377 fn compile_entity(&self) -> EntityBytecode {
378 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
379
380 let mut debug_info = Vec::new();
382 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
383 let event_type = self.get_event_type(&handler_spec.source);
384 let program_id = match &handler_spec.source {
385 crate::ast::SourceSpec::Source { program_id, .. } => {
386 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
387 }
388 };
389 debug_info.push(format!(
390 " [{}] EventType={}, Mappings={}, ProgramId={}",
391 index,
392 event_type,
393 handler_spec.mappings.len(),
394 program_id
395 ));
396 }
397
398 for handler_spec in &self.spec.handlers {
408 let opcodes = self.compile_handler(handler_spec);
409 let event_type = self.get_event_type(&handler_spec.source);
410
411 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
412 let mut existing_setup = Vec::new();
417 let mut existing_mappings = Vec::new();
418 let mut existing_teardown = Vec::new();
419 let mut section = 0; for opcode in existing_opcodes.iter() {
422 match opcode {
423 OpCode::ReadOrInitState { .. } => {
424 existing_setup.push(opcode.clone());
425 section = 1; }
427 OpCode::UpdateState { .. } => {
428 existing_teardown.push(opcode.clone());
429 section = 2; }
431 OpCode::EmitMutation { .. } => {
432 existing_teardown.push(opcode.clone());
433 }
434 _ if section == 0 => existing_setup.push(opcode.clone()),
435 _ if section == 1 => existing_mappings.push(opcode.clone()),
436 _ => existing_teardown.push(opcode.clone()),
437 }
438 }
439
440 let mut new_mappings = Vec::new();
442 section = 0;
443
444 for opcode in opcodes.iter() {
445 match opcode {
446 OpCode::ReadOrInitState { .. } => {
447 section = 1; }
449 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
450 section = 2; }
452 _ if section == 1 => {
453 new_mappings.push(opcode.clone());
454 }
455 _ => {} }
457 }
458
459 let mut merged = Vec::new();
461 merged.extend(existing_setup);
462 merged.extend(existing_mappings);
463 merged.extend(new_mappings.clone());
464 merged.extend(existing_teardown);
465
466 *existing_opcodes = merged;
467 } else {
468 handlers.insert(event_type, opcodes);
469 }
470 }
471
472 for hook in &self.spec.instruction_hooks {
474 let event_type = hook.instruction_type.clone();
475
476 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
478 tracing::debug!(
479 " Creating new handler for {} with key loading from lookup_by",
480 event_type
481 );
482 let key_reg = 20;
484 let state_reg = 2;
485 let resolved_key_reg = 19;
486 let temp_reg = 18;
487
488 let mut ops = Vec::new();
489
490 ops.push(OpCode::LoadEventField {
492 path: FieldPath::new(&["__resolved_primary_key"]),
493 dest: resolved_key_reg,
494 default: Some(serde_json::json!(null)),
495 });
496
497 ops.push(OpCode::CopyRegister {
499 source: resolved_key_reg,
500 dest: key_reg,
501 });
502
503 if let Some(lookup_path) = &hook.lookup_by {
505 ops.push(OpCode::LoadEventField {
507 path: lookup_path.clone(),
508 dest: temp_reg,
509 default: None,
510 });
511
512 ops.push(OpCode::Transform {
514 source: temp_reg,
515 dest: temp_reg,
516 transformation: Transformation::HexEncode,
517 });
518
519 ops.push(OpCode::CopyRegisterIfNull {
521 source: temp_reg,
522 dest: key_reg,
523 });
524 }
525
526 ops.push(OpCode::ReadOrInitState {
527 state_id: self.state_id,
528 key: key_reg,
529 default: serde_json::json!({}),
530 dest: state_reg,
531 });
532
533 ops.push(OpCode::UpdateState {
534 state_id: self.state_id,
535 key: key_reg,
536 value: state_reg,
537 });
538
539 ops
540 });
541
542 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
544
545 let insert_pos = handler_opcodes
549 .iter()
550 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
551 .or_else(|| {
552 handler_opcodes
553 .iter()
554 .position(|op| matches!(op, OpCode::UpdateState { .. }))
555 });
556
557 if let Some(pos) = insert_pos {
558 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
560 handler_opcodes.insert(pos + i, opcode);
561 }
562 }
563 }
564
565 EntityBytecode {
566 state_id: self.state_id,
567 handlers,
568 entity_name: self.entity_name.clone(),
569 computed_fields_evaluator: None,
570 }
571 }
572
573 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
574 let mut ops = Vec::new();
575 let state_reg = 2;
576 let key_reg = 20;
577
578 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
579
580 ops.extend(self.compile_temporal_index_update(
581 &spec.key_resolution,
582 key_reg,
583 &spec.mappings,
584 ));
585
586 ops.push(OpCode::ReadOrInitState {
587 state_id: self.state_id,
588 key: key_reg,
589 default: serde_json::json!({}),
590 dest: state_reg,
591 });
592
593 for mapping in &spec.mappings {
594 ops.extend(self.compile_mapping(mapping, state_reg));
595 }
596
597 ops.push(OpCode::EvaluateComputedFields {
599 state: state_reg,
600 computed_paths: self.spec.computed_fields.clone(),
601 });
602
603 ops.push(OpCode::UpdateState {
604 state_id: self.state_id,
605 key: key_reg,
606 value: state_reg,
607 });
608
609 if spec.emit {
610 ops.push(OpCode::EmitMutation {
611 entity_name: self.entity_name.clone(),
612 key: key_reg,
613 state: state_reg,
614 });
615 }
616
617 ops
618 }
619
620 fn compile_mapping(&self, mapping: &TypedFieldMapping<S>, state_reg: Register) -> Vec<OpCode> {
621 let mut ops = Vec::new();
622 let temp_reg = 10;
623
624 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
625
626 if let Some(transform) = &mapping.transform {
627 ops.push(OpCode::Transform {
628 source: temp_reg,
629 dest: temp_reg,
630 transformation: transform.clone(),
631 });
632 }
633
634 match &mapping.population {
635 PopulationStrategy::Append => {
636 ops.push(OpCode::AppendToArray {
637 object: state_reg,
638 path: mapping.target_path.clone(),
639 value: temp_reg,
640 });
641 }
642 PopulationStrategy::LastWrite => {
643 ops.push(OpCode::SetField {
644 object: state_reg,
645 path: mapping.target_path.clone(),
646 value: temp_reg,
647 });
648 }
649 PopulationStrategy::SetOnce => {
650 ops.push(OpCode::SetFieldIfNull {
651 object: state_reg,
652 path: mapping.target_path.clone(),
653 value: temp_reg,
654 });
655 }
656 PopulationStrategy::Merge => {
657 ops.push(OpCode::SetField {
658 object: state_reg,
659 path: mapping.target_path.clone(),
660 value: temp_reg,
661 });
662 }
663 PopulationStrategy::Max => {
664 ops.push(OpCode::SetFieldMax {
665 object: state_reg,
666 path: mapping.target_path.clone(),
667 value: temp_reg,
668 });
669 }
670 PopulationStrategy::Sum => {
671 ops.push(OpCode::SetFieldSum {
672 object: state_reg,
673 path: mapping.target_path.clone(),
674 value: temp_reg,
675 });
676 }
677 PopulationStrategy::Count => {
678 ops.push(OpCode::SetFieldIncrement {
680 object: state_reg,
681 path: mapping.target_path.clone(),
682 });
683 }
684 PopulationStrategy::Min => {
685 ops.push(OpCode::SetFieldMin {
686 object: state_reg,
687 path: mapping.target_path.clone(),
688 value: temp_reg,
689 });
690 }
691 PopulationStrategy::UniqueCount => {
692 let set_name = format!("{}_unique_set", mapping.target_path);
695 ops.push(OpCode::AddToUniqueSet {
696 state_id: self.state_id,
697 set_name,
698 value: temp_reg,
699 count_object: state_reg,
700 count_path: mapping.target_path.clone(),
701 });
702 }
703 }
704
705 ops
706 }
707
708 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
709 match source {
710 MappingSource::FromSource {
711 path,
712 default,
713 transform,
714 } => {
715 let mut ops = vec![OpCode::LoadEventField {
716 path: path.clone(),
717 dest,
718 default: default.clone(),
719 }];
720
721 if let Some(transform_type) = transform {
723 ops.push(OpCode::Transform {
724 source: dest,
725 dest,
726 transformation: transform_type.clone(),
727 });
728 }
729
730 ops
731 }
732 MappingSource::Constant(val) => {
733 vec![OpCode::LoadConstant {
734 value: val.clone(),
735 dest,
736 }]
737 }
738 MappingSource::AsEvent { fields } => {
739 let mut ops = Vec::new();
740
741 if fields.is_empty() {
742 let event_data_reg = dest + 1;
743 ops.push(OpCode::LoadEventField {
744 path: FieldPath::new(&[]),
745 dest: event_data_reg,
746 default: Some(serde_json::json!({})),
747 });
748 ops.push(OpCode::CreateEvent {
749 dest,
750 event_value: event_data_reg,
751 });
752 } else {
753 let data_obj_reg = dest + 1;
754 ops.push(OpCode::CreateObject { dest: data_obj_reg });
755
756 let mut field_registers = Vec::new();
757 let mut current_reg = dest + 2;
758
759 for field_source in fields.iter() {
760 if let MappingSource::FromSource {
761 path,
762 default,
763 transform,
764 } = &**field_source
765 {
766 ops.push(OpCode::LoadEventField {
767 path: path.clone(),
768 dest: current_reg,
769 default: default.clone(),
770 });
771
772 if let Some(transform_type) = transform {
773 ops.push(OpCode::Transform {
774 source: current_reg,
775 dest: current_reg,
776 transformation: transform_type.clone(),
777 });
778 }
779
780 if let Some(field_name) = path.segments.last() {
781 field_registers.push((field_name.clone(), current_reg));
782 }
783 current_reg += 1;
784 }
785 }
786
787 if !field_registers.is_empty() {
788 ops.push(OpCode::SetFields {
789 object: data_obj_reg,
790 fields: field_registers,
791 });
792 }
793
794 ops.push(OpCode::CreateEvent {
795 dest,
796 event_value: data_obj_reg,
797 });
798 }
799
800 ops
801 }
802 MappingSource::WholeSource => {
803 vec![OpCode::LoadEventField {
804 path: FieldPath::new(&[]),
805 dest,
806 default: Some(serde_json::json!({})),
807 }]
808 }
809 MappingSource::AsCapture { field_transforms } => {
810 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
813 path: FieldPath::new(&[]),
814 dest: capture_data_reg,
815 default: Some(serde_json::json!({})),
816 }];
817
818 let field_reg = 24;
822 let transformed_reg = 25;
823
824 for (field_name, transform) in field_transforms {
825 ops.push(OpCode::GetField {
828 object: capture_data_reg,
829 path: field_name.clone(),
830 dest: field_reg,
831 });
832
833 ops.push(OpCode::Transform {
835 source: field_reg,
836 dest: transformed_reg,
837 transformation: transform.clone(),
838 });
839
840 ops.push(OpCode::SetField {
842 object: capture_data_reg,
843 path: field_name.clone(),
844 value: transformed_reg,
845 });
846 }
847
848 ops.push(OpCode::CreateCapture {
850 dest,
851 capture_value: capture_data_reg,
852 });
853
854 ops
855 }
856 MappingSource::FromContext { field } => {
857 vec![OpCode::LoadEventField {
859 path: FieldPath::new(&["__update_context", field.as_str()]),
860 dest,
861 default: Some(serde_json::json!(null)),
862 }]
863 }
864 MappingSource::Computed { .. } => {
865 vec![]
866 }
867 MappingSource::FromState { .. } => {
868 vec![]
869 }
870 }
871 }
872
873 pub fn compile_key_loading(
874 &self,
875 resolution: &KeyResolutionStrategy,
876 key_reg: Register,
877 mappings: &[TypedFieldMapping<S>],
878 ) -> Vec<OpCode> {
879 let mut ops = Vec::new();
880
881 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
885 path: FieldPath::new(&["__resolved_primary_key"]),
886 dest: resolved_key_reg,
887 default: Some(serde_json::json!(null)),
888 });
889
890 ops.push(OpCode::CopyRegister {
892 source: resolved_key_reg,
893 dest: key_reg,
894 });
895
896 match resolution {
898 KeyResolutionStrategy::Embedded { primary_field } => {
899 let effective_primary_field = if primary_field.segments.is_empty() {
901 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
903 auto_field
904 } else {
905 primary_field.clone()
906 }
907 } else {
908 primary_field.clone()
909 };
910
911 if !effective_primary_field.segments.is_empty() {
915 let temp_reg = 18;
916 let transform_reg = 23; ops.push(OpCode::LoadEventField {
919 path: effective_primary_field.clone(),
920 dest: temp_reg,
921 default: None,
922 });
923
924 let primary_key_transform = self
927 .find_primary_key_transformation(mappings)
928 .or_else(|| self.find_inherited_primary_key_transformation());
929
930 if let Some(transform) = primary_key_transform {
931 ops.push(OpCode::Transform {
933 source: temp_reg,
934 dest: transform_reg,
935 transformation: transform,
936 });
937 ops.push(OpCode::CopyRegisterIfNull {
939 source: transform_reg,
940 dest: key_reg,
941 });
942 } else {
943 ops.push(OpCode::CopyRegisterIfNull {
945 source: temp_reg,
946 dest: key_reg,
947 });
948 }
949 }
950 }
953 KeyResolutionStrategy::Lookup { primary_field } => {
954 let lookup_reg = 15;
955 let result_reg = 17;
956
957 tracing::debug!(
958 "Compiling Lookup key_resolution: primary_field={:?}",
959 primary_field.segments
960 );
961
962 ops.push(OpCode::LoadEventField {
963 path: primary_field.clone(),
964 dest: lookup_reg,
965 default: None,
966 });
967
968 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
970
971 tracing::debug!(" Lookup index search result: {:?}", index_name);
972
973 let effective_index_name = index_name.unwrap_or_else(|| {
976 tracing::debug!(
977 "No lookup index configured for primary_field={:?}, using default_pda_lookup",
978 primary_field.segments
979 );
980 "default_pda_lookup".to_string()
981 });
982
983 ops.push(OpCode::LookupIndex {
984 state_id: self.state_id,
985 index_name: effective_index_name,
986 lookup_value: lookup_reg,
987 dest: result_reg,
988 });
989 ops.push(OpCode::CopyRegisterIfNull {
998 source: result_reg,
999 dest: key_reg,
1000 });
1001 }
1002 KeyResolutionStrategy::Computed {
1003 primary_field,
1004 compute_partition: _,
1005 } => {
1006 let temp_reg = 18;
1007 ops.push(OpCode::LoadEventField {
1008 path: primary_field.clone(),
1009 dest: temp_reg,
1010 default: None,
1011 });
1012 ops.push(OpCode::CopyRegisterIfNull {
1013 source: temp_reg,
1014 dest: key_reg,
1015 });
1016 }
1017 KeyResolutionStrategy::TemporalLookup {
1018 lookup_field,
1019 timestamp_field,
1020 index_name,
1021 } => {
1022 let lookup_reg = 15;
1023 let timestamp_reg = 16;
1024 let result_reg = 17;
1025
1026 ops.push(OpCode::LoadEventField {
1027 path: lookup_field.clone(),
1028 dest: lookup_reg,
1029 default: None,
1030 });
1031
1032 ops.push(OpCode::LoadEventField {
1033 path: timestamp_field.clone(),
1034 dest: timestamp_reg,
1035 default: None,
1036 });
1037
1038 ops.push(OpCode::LookupTemporalIndex {
1039 state_id: self.state_id,
1040 index_name: index_name.clone(),
1041 lookup_value: lookup_reg,
1042 timestamp: timestamp_reg,
1043 dest: result_reg,
1044 });
1045
1046 ops.push(OpCode::CopyRegisterIfNull {
1047 source: result_reg,
1048 dest: key_reg,
1049 });
1050 }
1051 }
1052
1053 ops
1054 }
1055
1056 fn find_primary_key_transformation(
1057 &self,
1058 mappings: &[TypedFieldMapping<S>],
1059 ) -> Option<Transformation> {
1060 let primary_key = self.spec.identity.primary_keys.first()?;
1062 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1063
1064 for mapping in mappings {
1066 if mapping.target_path == *primary_key
1068 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1069 {
1070 if let Some(transform) = &mapping.transform {
1072 return Some(transform.clone());
1073 }
1074
1075 if let MappingSource::FromSource {
1077 transform: Some(transform),
1078 ..
1079 } = &mapping.source
1080 {
1081 return Some(transform.clone());
1082 }
1083 }
1084 }
1085
1086 for mapping in mappings {
1088 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1089 if let Some(transform) = field_transforms.get(&primary_field_name) {
1090 return Some(transform.clone());
1091 }
1092 }
1093 }
1094
1095 None
1096 }
1097
1098 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1101 let primary_key = self.spec.identity.primary_keys.first()?;
1102
1103 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1105
1106 for handler in &self.spec.handlers {
1108 for mapping in &handler.mappings {
1109 if mapping.target_path == *primary_key
1111 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1112 {
1113 if let MappingSource::FromSource {
1115 path, transform, ..
1116 } = &mapping.source
1117 {
1118 if path.segments.last() == Some(&primary_field_name) {
1119 return mapping.transform.clone().or_else(|| transform.clone());
1121 }
1122 }
1123 }
1124
1125 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1127 if let Some(transform) = field_transforms.get(&primary_field_name) {
1128 return Some(transform.clone());
1129 }
1130 }
1131 }
1132 }
1133
1134 None
1135 }
1136
1137 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1139 primary_key.split('.').next_back().map(|s| s.to_string())
1141 }
1142
1143 pub fn auto_detect_primary_field(
1146 &self,
1147 current_mappings: &[TypedFieldMapping<S>],
1148 ) -> Option<FieldPath> {
1149 let primary_key = self.spec.identity.primary_keys.first()?;
1150
1151 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1153
1154 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1156 return Some(FieldPath::new(&[&primary_field_name]));
1157 }
1158
1159 None
1160 }
1161
1162 fn current_account_has_primary_field(
1165 &self,
1166 field_name: &str,
1167 mappings: &[TypedFieldMapping<S>],
1168 ) -> bool {
1169 for mapping in mappings {
1171 if let MappingSource::FromSource { path, .. } = &mapping.source {
1172 if path.segments.last() == Some(&field_name.to_string()) {
1174 return true;
1175 }
1176 }
1177 }
1178
1179 false
1180 }
1181
1182 #[allow(dead_code)]
1184 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1185 for mapping in mappings {
1186 if let MappingSource::FromSource { path, .. } = &mapping.source {
1187 if path.segments.last() == Some(&field_name.to_string()) {
1188 return true;
1189 }
1190 }
1191 }
1192 false
1193 }
1194
1195 #[allow(dead_code)]
1198 fn field_exists_in_mappings(
1199 &self,
1200 field_name: &str,
1201 mappings: &[TypedFieldMapping<S>],
1202 ) -> bool {
1203 for mapping in mappings {
1205 if let MappingSource::FromSource { path, .. } = &mapping.source {
1206 if path.segments.last() == Some(&field_name.to_string()) {
1207 return true;
1208 }
1209 }
1210 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1212 if field_transforms.contains_key(field_name) {
1213 return true;
1214 }
1215 }
1216 }
1217 false
1218 }
1219
1220 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1221 if field_path.segments.is_empty() {
1222 return None;
1223 }
1224
1225 let lookup_field_name = field_path.segments.last().unwrap();
1226
1227 for lookup_index in &self.spec.identity.lookup_indexes {
1228 let index_field_name = lookup_index
1229 .field_name
1230 .split('.')
1231 .next_back()
1232 .unwrap_or(&lookup_index.field_name);
1233 if index_field_name == lookup_field_name {
1234 return Some(format!("{}_lookup_index", index_field_name));
1235 }
1236 }
1237
1238 None
1239 }
1240
1241 fn find_lookup_index_for_lookup_field(
1244 &self,
1245 primary_field: &FieldPath,
1246 mappings: &[TypedFieldMapping<S>],
1247 ) -> Option<String> {
1248 let primary_path = primary_field.segments.join(".");
1250
1251 for mapping in mappings {
1253 if let MappingSource::FromSource { path, .. } = &mapping.source {
1255 let source_path = path.segments.join(".");
1256 if source_path == primary_path {
1257 for lookup_index in &self.spec.identity.lookup_indexes {
1259 if mapping.target_path == lookup_index.field_name {
1260 let index_field_name = lookup_index
1261 .field_name
1262 .split('.')
1263 .next_back()
1264 .unwrap_or(&lookup_index.field_name);
1265 return Some(format!("{}_lookup_index", index_field_name));
1266 }
1267 }
1268 }
1269 }
1270 }
1271
1272 self.find_lookup_index_for_field(primary_field)
1274 }
1275
1276 fn find_source_path_for_lookup_index(
1280 &self,
1281 mappings: &[TypedFieldMapping<S>],
1282 lookup_field_name: &str,
1283 ) -> Option<Vec<String>> {
1284 for mapping in mappings {
1285 if mapping.target_path == lookup_field_name {
1286 if let MappingSource::FromSource { path, .. } = &mapping.source {
1287 return Some(path.segments.clone());
1288 }
1289 }
1290 }
1291 None
1292 }
1293
1294 fn compile_temporal_index_update(
1295 &self,
1296 resolution: &KeyResolutionStrategy,
1297 key_reg: Register,
1298 mappings: &[TypedFieldMapping<S>],
1299 ) -> Vec<OpCode> {
1300 let mut ops = Vec::new();
1301
1302 for lookup_index in &self.spec.identity.lookup_indexes {
1303 let lookup_reg = 17;
1304 let source_field = lookup_index
1305 .field_name
1306 .split('.')
1307 .next_back()
1308 .unwrap_or(&lookup_index.field_name);
1309
1310 match resolution {
1311 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1312 let source_path_opt =
1315 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1316
1317 let load_path = if let Some(ref path) = source_path_opt {
1318 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1319 } else {
1320 FieldPath::new(&[source_field])
1322 };
1323
1324 ops.push(OpCode::LoadEventField {
1325 path: load_path,
1326 dest: lookup_reg,
1327 default: None,
1328 });
1329
1330 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1331 let timestamp_reg = 18;
1332
1333 ops.push(OpCode::LoadEventField {
1334 path: FieldPath::new(&[temporal_field_name]),
1335 dest: timestamp_reg,
1336 default: None,
1337 });
1338
1339 let index_name = format!("{}_temporal_index", source_field);
1340 ops.push(OpCode::UpdateTemporalIndex {
1341 state_id: self.state_id,
1342 index_name,
1343 lookup_value: lookup_reg,
1344 primary_key: key_reg,
1345 timestamp: timestamp_reg,
1346 });
1347
1348 let simple_index_name = format!("{}_lookup_index", source_field);
1349 ops.push(OpCode::UpdateLookupIndex {
1350 state_id: self.state_id,
1351 index_name: simple_index_name,
1352 lookup_value: lookup_reg,
1353 primary_key: key_reg,
1354 });
1355 } else {
1356 let index_name = format!("{}_lookup_index", source_field);
1357 ops.push(OpCode::UpdateLookupIndex {
1358 state_id: self.state_id,
1359 index_name,
1360 lookup_value: lookup_reg,
1361 primary_key: key_reg,
1362 });
1363 }
1364
1365 if source_path_opt.is_some() {
1369 ops.push(OpCode::UpdatePdaReverseLookup {
1370 state_id: self.state_id,
1371 lookup_name: "default_pda_lookup".to_string(),
1372 pda_address: lookup_reg,
1373 primary_key: key_reg,
1374 });
1375 }
1376 }
1377 KeyResolutionStrategy::Lookup { primary_field } => {
1378 let has_mapping_to_lookup_field = mappings
1381 .iter()
1382 .any(|m| m.target_path == lookup_index.field_name);
1383
1384 if has_mapping_to_lookup_field {
1385 let path_segments: Vec<&str> =
1388 primary_field.segments.iter().map(|s| s.as_str()).collect();
1389 ops.push(OpCode::LoadEventField {
1390 path: FieldPath::new(&path_segments),
1391 dest: lookup_reg,
1392 default: None,
1393 });
1394
1395 let index_name = format!("{}_lookup_index", source_field);
1396 ops.push(OpCode::UpdateLookupIndex {
1397 state_id: self.state_id,
1398 index_name,
1399 lookup_value: lookup_reg,
1400 primary_key: key_reg,
1401 });
1402 }
1403 }
1404 KeyResolutionStrategy::Computed { .. }
1405 | KeyResolutionStrategy::TemporalLookup { .. } => {
1406 }
1408 }
1409 }
1410
1411 ops
1412 }
1413
1414 fn get_event_type(&self, source: &SourceSpec) -> String {
1415 match source {
1416 SourceSpec::Source { type_name, .. } => type_name.clone(),
1417 }
1418 }
1419
1420 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1421 let mut ops = Vec::new();
1422 let state_reg = 2;
1423
1424 for action in actions {
1425 match action {
1426 HookAction::SetField {
1427 target_field,
1428 source,
1429 condition,
1430 } => {
1431 let _ = condition;
1433
1434 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1438 ops.extend(load_ops);
1439
1440 if let MappingSource::FromSource {
1442 transform: Some(transform_type),
1443 ..
1444 } = source
1445 {
1446 ops.push(OpCode::Transform {
1447 source: temp_reg,
1448 dest: temp_reg,
1449 transformation: transform_type.clone(),
1450 });
1451 }
1452
1453 if let Some(cond_expr) = condition {
1455 if let Some(parsed) = &cond_expr.parsed {
1456 let cond_check_ops = self.compile_condition_check(
1458 parsed,
1459 temp_reg,
1460 state_reg,
1461 target_field,
1462 );
1463 ops.extend(cond_check_ops);
1464 } else {
1465 ops.push(OpCode::SetField {
1467 object: state_reg,
1468 path: target_field.clone(),
1469 value: temp_reg,
1470 });
1471 }
1472 } else {
1473 ops.push(OpCode::SetField {
1475 object: state_reg,
1476 path: target_field.clone(),
1477 value: temp_reg,
1478 });
1479 }
1480 }
1481 HookAction::IncrementField {
1482 target_field,
1483 increment_by,
1484 condition,
1485 } => {
1486 if let Some(cond_expr) = condition {
1487 if let Some(parsed) = &cond_expr.parsed {
1488 let cond_check_ops = self.compile_conditional_increment(
1493 parsed,
1494 state_reg,
1495 target_field,
1496 *increment_by,
1497 );
1498 ops.extend(cond_check_ops);
1499 } else {
1500 ops.push(OpCode::SetFieldIncrement {
1502 object: state_reg,
1503 path: target_field.clone(),
1504 });
1505 }
1506 } else {
1507 ops.push(OpCode::SetFieldIncrement {
1509 object: state_reg,
1510 path: target_field.clone(),
1511 });
1512 }
1513 }
1514 HookAction::RegisterPdaMapping { .. } => {
1515 }
1518 }
1519 }
1520
1521 ops
1522 }
1523
1524 fn compile_condition_check(
1525 &self,
1526 condition: &ParsedCondition,
1527 value_reg: Register,
1528 state_reg: Register,
1529 target_field: &str,
1530 ) -> Vec<OpCode> {
1531 match condition {
1532 ParsedCondition::Comparison {
1533 field,
1534 op,
1535 value: cond_value,
1536 } => {
1537 vec![OpCode::ConditionalSetField {
1539 object: state_reg,
1540 path: target_field.to_string(),
1541 value: value_reg,
1542 condition_field: field.clone(),
1543 condition_op: op.clone(),
1544 condition_value: cond_value.clone(),
1545 }]
1546 }
1547 ParsedCondition::Logical { .. } => {
1548 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1550 vec![OpCode::SetField {
1551 object: state_reg,
1552 path: target_field.to_string(),
1553 value: value_reg,
1554 }]
1555 }
1556 }
1557 }
1558
1559 fn compile_conditional_increment(
1560 &self,
1561 condition: &ParsedCondition,
1562 state_reg: Register,
1563 target_field: &str,
1564 _increment_by: i64,
1565 ) -> Vec<OpCode> {
1566 match condition {
1567 ParsedCondition::Comparison {
1568 field,
1569 op,
1570 value: cond_value,
1571 } => {
1572 vec![OpCode::ConditionalIncrement {
1573 object: state_reg,
1574 path: target_field.to_string(),
1575 condition_field: field.clone(),
1576 condition_op: op.clone(),
1577 condition_value: cond_value.clone(),
1578 }]
1579 }
1580 ParsedCondition::Logical { .. } => {
1581 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1582 vec![OpCode::SetFieldIncrement {
1583 object: state_reg,
1584 path: target_field.to_string(),
1585 }]
1586 }
1587 }
1588 }
1589}