1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
6pub type Register = usize;
7
/// Returns `target_path` prefixed with `__stop:`.
///
/// The compiler uses the result as the `stop_field` of
/// `OpCode::SetFieldUnlessStopped` and registers it in the set of paths that
/// are not emitted.
fn stop_field_path(target_path: &str) -> String {
    let mut marker = String::from("__stop:");
    marker.push_str(target_path);
    marker
}
11
/// Instructions produced by `TypedCompiler` for one event handler.
///
/// Operands of type `Register` are register indices. The per-variant notes
/// describe how the compiler in this file uses each opcode; statements marked
/// "presumably" are inferred from names only and must be confirmed against
/// the code that executes these opcodes.
#[derive(Debug, Clone)]
pub enum OpCode {
    /// Emitted for event-field reads: `path` into `dest`, with an optional
    /// `default`. An empty `path` is used where the compiler wants the whole
    /// event payload (presumably; confirm in the executor).
    LoadEventField {
        path: FieldPath,
        dest: Register,
        default: Option<Value>,
    },
    /// Emitted for `MappingSource::Constant`: places `value` in `dest`.
    LoadConstant {
        value: Value,
        dest: Register,
    },
    /// Emitted during key resolution to copy `source` into `dest`.
    CopyRegister {
        source: Register,
        dest: Register,
    },
    /// Emitted during key resolution as a fallback: presumably copies `source`
    /// into `dest` only when `dest` currently holds null.
    CopyRegisterIfNull {
        source: Register,
        dest: Register,
    },
    /// Presumably stores the current event's type in `dest`.
    GetEventType {
        dest: Register,
    },
    /// Emitted for `AsEvent` mappings with explicit fields, before `SetFields`.
    CreateObject {
        dest: Register,
    },
    /// Emitted for `LastWrite` and `Merge` mappings, and to write transformed
    /// capture fields back into the capture data.
    SetField {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `AsEvent` mappings: `(field name, register)` pairs to store
    /// on `object`.
    SetFields {
        object: Register,
        fields: Vec<(String, Register)>,
    },
    /// Emitted for `AsCapture` mappings to read one field of the capture data.
    GetField {
        object: Register,
        path: String,
        dest: Register,
    },
    /// Emitted once per handler right after key loading; `default` is `{}`.
    /// Presumably loads the entity state for `key`, initialising it when absent.
    ReadOrInitState {
        state_id: u32,
        key: Register,
        default: Value,
        dest: Register,
    },
    /// Emitted at the end of every handler, after `EvaluateComputedFields`.
    UpdateState {
        state_id: u32,
        key: Register,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Append` mappings.
    AppendToArray {
        object: Register,
        path: String,
        value: Register,
    },
    /// Presumably stores the current timestamp in `dest`.
    GetCurrentTimestamp {
        dest: Register,
    },
    /// Emitted for `AsEvent` mappings; `event_value` holds the event data.
    CreateEvent {
        dest: Register,
        event_value: Register,
    },
    /// Emitted for `AsCapture` mappings; `capture_value` holds the capture data.
    CreateCapture {
        dest: Register,
        capture_value: Register,
    },
    /// Applies `transformation` to `source`, writing to `dest`. The compiler
    /// often uses the same register for both.
    Transform {
        source: Register,
        dest: Register,
        transformation: Transformation,
    },
    /// Emitted last in a handler, and only when the handler spec has `emit` set.
    EmitMutation {
        entity_name: String,
        key: Register,
        state: Register,
    },
    /// Emitted for `PopulationStrategy::SetOnce` mappings.
    SetFieldIfNull {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Max` mappings.
    SetFieldMax {
        object: Register,
        path: String,
        value: Register,
    },
    /// Presumably records `lookup_value -> primary_key` at `timestamp` in the
    /// named temporal index.
    UpdateTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
        timestamp: Register,
    },
    /// Emitted for `KeyResolutionStrategy::TemporalLookup`; the result in
    /// `dest` is used as a fallback for the key register.
    LookupTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        timestamp: Register,
        dest: Register,
    },
    /// Presumably records `lookup_value -> primary_key` in the named index.
    UpdateLookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
    },
    /// Emitted for `KeyResolutionStrategy::Lookup`; the result in `dest` is
    /// copied into the key register.
    LookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        dest: Register,
    },
    /// Emitted for `PopulationStrategy::Sum` mappings.
    SetFieldSum {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for unconditional `PopulationStrategy::Count` mappings; takes
    /// no value operand.
    SetFieldIncrement {
        object: Register,
        path: String,
    },
    /// Emitted for `PopulationStrategy::Min` mappings.
    SetFieldMin {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for mappings with `when` set (and no `stop`). The three
    /// `condition_*` operands are all `Some` when the mapping has a parsed
    /// comparison condition and all `None` otherwise.
    SetFieldWhen {
        object: Register,
        path: String,
        value: Register,
        when_instruction: String,
        entity_name: String,
        key_reg: Register,
        condition_field: Option<FieldPath>,
        condition_op: Option<ComparisonOp>,
        condition_value: Option<Value>,
    },
    /// Emitted for mappings with `stop` set; `stop_field` is the `__stop:`
    /// marker path derived from the target path.
    SetFieldUnlessStopped {
        object: Register,
        path: String,
        value: Register,
        stop_field: String,
        stop_instruction: String,
        entity_name: String,
        key_reg: Register,
    },
    /// Emitted for `PopulationStrategy::UniqueCount`; `set_name` is the target
    /// path suffixed with `_unique_set` and `count_path` is the target path.
    AddToUniqueSet {
        state_id: u32,
        set_name: String,
        value: Register,
        count_object: Register,
        count_path: String,
    },
    /// Emitted for `LastWrite`/`Merge` mappings that carry a parsed comparison
    /// condition (and neither `stop` nor `when`).
    ConditionalSetField {
        object: Register,
        path: String,
        value: Register,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted for `Count` mappings that carry a parsed comparison condition.
    ConditionalIncrement {
        object: Register,
        path: String,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted once per handler just before `UpdateState`; `computed_paths`
    /// is a copy of the spec's `computed_fields`.
    EvaluateComputedFields {
        state: Register,
        computed_paths: Vec<String>,
    },
    /// Emitted once per resolver spec in every handler, after the mappings.
    QueueResolver {
        state_id: u32,
        entity_name: String,
        resolver: ResolverType,
        input_path: Option<String>,
        input_value: Option<Value>,
        strategy: ResolveStrategy,
        extracts: Vec<ResolverExtractSpec>,
        state: Register,
        key: Register,
    },
    /// Presumably records `pda_address -> primary_key` in the named reverse
    /// lookup table.
    UpdatePdaReverseLookup {
        state_id: u32,
        lookup_name: String,
        pda_address: Register,
        primary_key: Register,
    },
}
219
/// Compiled output for a single entity, as produced by
/// `TypedCompiler::compile_entity`.
pub struct EntityBytecode {
    /// State identifier the compiler stamps into state-related opcodes.
    pub state_id: u32,
    /// Opcode program per event type (one entry per distinct event type or
    /// instruction-hook type).
    pub handlers: HashMap<String, Vec<OpCode>>,
    /// Name of the entity this bytecode was compiled for.
    pub entity_name: String,
    /// Every `when` value referenced by any mapping of this entity.
    pub when_events: HashSet<String>,
    /// Target paths for which no mapping has `emit` set, plus the `__stop:`
    /// marker paths of mappings that use `stop`.
    pub non_emitted_fields: HashSet<String>,
    /// Copy of the spec's `computed_fields`.
    pub computed_paths: Vec<String>,
    /// Optional callback for computed fields. `compile_entity` leaves this as
    /// `None`; `MultiEntityBytecodeBuilder::add_entity_with_evaluator` fills it.
    /// NOTE(review): the meaning of the `Option<u64>` and `i64` arguments is
    /// not visible here — confirm against the caller.
    // The boxed closure type is inherently long; a type alias would change the
    // public field type's spelling.
    #[allow(clippy::type_complexity)]
    pub computed_fields_evaluator: Option<
        Box<
            dyn Fn(
                    &mut Value,
                    Option<u64>,
                    i64,
                ) -> std::result::Result<(), Box<dyn std::error::Error>>
                + Send
                + Sync,
        >,
    >,
}
242
243impl std::fmt::Debug for EntityBytecode {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 f.debug_struct("EntityBytecode")
246 .field("state_id", &self.state_id)
247 .field("handlers", &self.handlers)
248 .field("entity_name", &self.entity_name)
249 .field("when_events", &self.when_events)
250 .field("non_emitted_fields", &self.non_emitted_fields)
251 .field("computed_paths", &self.computed_paths)
252 .field(
253 "computed_fields_evaluator",
254 &self.computed_fields_evaluator.is_some(),
255 )
256 .finish()
257 }
258}
259
/// Bytecode for a set of entities together with the routing information
/// derived from their handlers.
#[derive(Debug)]
pub struct MultiEntityBytecode {
    /// Compiled bytecode keyed by entity name.
    pub entities: HashMap<String, EntityBytecode>,
    /// For each event type, the names of the entities that have a handler for it.
    pub event_routing: HashMap<String, Vec<String>>,
    /// Union of the `when_events` of all contained entities.
    pub when_events: HashSet<String>,
    /// Router created with `ProtoRouter::new()` by every constructor in this
    /// file. NOTE(review): its role is defined in `crate::proto_router`.
    pub proto_router: crate::proto_router::ProtoRouter,
}
267
268impl MultiEntityBytecode {
269 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
270 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
271 let entity_bytecode = compiler.compile_entity();
272
273 let mut entities = HashMap::new();
274 let mut event_routing = HashMap::new();
275 let mut when_events = HashSet::new();
276
277 for event_type in entity_bytecode.handlers.keys() {
278 event_routing
279 .entry(event_type.clone())
280 .or_insert_with(Vec::new)
281 .push(entity_name.clone());
282 }
283
284 when_events.extend(entity_bytecode.when_events.iter().cloned());
285
286 entities.insert(entity_name, entity_bytecode);
287
288 MultiEntityBytecode {
289 entities,
290 event_routing,
291 when_events,
292 proto_router: crate::proto_router::ProtoRouter::new(),
293 }
294 }
295
296 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
297 let entities = HashMap::new();
298 let event_routing = HashMap::new();
299 let when_events = HashSet::new();
300
301 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
302 panic!("from_entities requires type information - use builder pattern instead");
303 }
304
305 MultiEntityBytecode {
306 entities,
307 event_routing,
308 when_events,
309 proto_router: crate::proto_router::ProtoRouter::new(),
310 }
311 }
312
313 #[allow(clippy::new_ret_no_self)]
314 pub fn new() -> MultiEntityBytecodeBuilder {
315 MultiEntityBytecodeBuilder {
316 entities: HashMap::new(),
317 event_routing: HashMap::new(),
318 when_events: HashSet::new(),
319 proto_router: crate::proto_router::ProtoRouter::new(),
320 }
321 }
322}
323
/// Accumulates compiled entities before producing a `MultiEntityBytecode`.
/// The fields mirror those of `MultiEntityBytecode` one-to-one and are moved
/// into it unchanged by `build`.
pub struct MultiEntityBytecodeBuilder {
    // Compiled bytecode keyed by entity name.
    entities: HashMap<String, EntityBytecode>,
    // Event type -> names of entities with a handler for that type.
    event_routing: HashMap<String, Vec<String>>,
    // Union of the `when_events` of all entities added so far.
    when_events: HashSet<String>,
    // Created with `ProtoRouter::new()` and passed through untouched.
    proto_router: crate::proto_router::ProtoRouter,
}
330
331impl MultiEntityBytecodeBuilder {
332 pub fn add_entity<S>(
333 self,
334 entity_name: String,
335 spec: TypedStreamSpec<S>,
336 state_id: u32,
337 ) -> Self {
338 self.add_entity_with_evaluator(
339 entity_name,
340 spec,
341 state_id,
342 None::<
343 fn(
344 &mut Value,
345 Option<u64>,
346 i64,
347 ) -> std::result::Result<(), Box<dyn std::error::Error>>,
348 >,
349 )
350 }
351
352 pub fn add_entity_with_evaluator<S, F>(
353 mut self,
354 entity_name: String,
355 spec: TypedStreamSpec<S>,
356 state_id: u32,
357 evaluator: Option<F>,
358 ) -> Self
359 where
360 F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
361 + Send
362 + Sync
363 + 'static,
364 {
365 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
366 let mut entity_bytecode = compiler.compile_entity();
367
368 if let Some(eval) = evaluator {
370 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
371 }
372
373 for event_type in entity_bytecode.handlers.keys() {
374 self.event_routing
375 .entry(event_type.clone())
376 .or_default()
377 .push(entity_name.clone());
378 }
379
380 self.when_events
381 .extend(entity_bytecode.when_events.iter().cloned());
382
383 self.entities.insert(entity_name, entity_bytecode);
384 self
385 }
386
387 pub fn build(self) -> MultiEntityBytecode {
388 MultiEntityBytecode {
389 entities: self.entities,
390 event_routing: self.event_routing,
391 when_events: self.when_events,
392 proto_router: self.proto_router,
393 }
394 }
395}
396
/// Compiles one entity's `TypedStreamSpec` into opcode programs.
pub struct TypedCompiler<S> {
    /// The specification being compiled (handlers, identity, computed fields,
    /// resolver specs and instruction hooks are read from it).
    pub spec: TypedStreamSpec<S>,
    // Name stamped into emitted opcodes and used as the routing target.
    entity_name: String,
    // State identifier stamped into state opcodes; 0 unless overridden with
    // `with_state_id`.
    state_id: u32,
}
402
403impl<S> TypedCompiler<S> {
404 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
405 TypedCompiler {
406 spec,
407 entity_name,
408 state_id: 0,
409 }
410 }
411
412 pub fn with_state_id(mut self, state_id: u32) -> Self {
413 self.state_id = state_id;
414 self
415 }
416
417 pub fn compile(&self) -> MultiEntityBytecode {
418 let entity_bytecode = self.compile_entity();
419
420 let mut entities = HashMap::new();
421 let mut event_routing = HashMap::new();
422 let mut when_events = HashSet::new();
423
424 for event_type in entity_bytecode.handlers.keys() {
425 event_routing
426 .entry(event_type.clone())
427 .or_insert_with(Vec::new)
428 .push(self.entity_name.clone());
429 }
430
431 when_events.extend(entity_bytecode.when_events.iter().cloned());
432
433 entities.insert(self.entity_name.clone(), entity_bytecode);
434
435 MultiEntityBytecode {
436 entities,
437 event_routing,
438 when_events,
439 proto_router: crate::proto_router::ProtoRouter::new(),
440 }
441 }
442
443 fn compile_entity(&self) -> EntityBytecode {
444 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
445 let mut when_events: HashSet<String> = HashSet::new();
446 let mut emit_by_path: HashMap<String, bool> = HashMap::new();
447
448 let mut debug_info = Vec::new();
450 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
451 let event_type = self.get_event_type(&handler_spec.source);
452 let program_id = match &handler_spec.source {
453 crate::ast::SourceSpec::Source { program_id, .. } => {
454 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
455 }
456 };
457 debug_info.push(format!(
458 " [{}] EventType={}, Mappings={}, ProgramId={}",
459 index,
460 event_type,
461 handler_spec.mappings.len(),
462 program_id
463 ));
464 }
465
466 for handler_spec in &self.spec.handlers {
476 for mapping in &handler_spec.mappings {
477 if let Some(when) = &mapping.when {
478 when_events.insert(when.clone());
479 }
480 let entry = emit_by_path
481 .entry(mapping.target_path.clone())
482 .or_insert(false);
483 *entry |= mapping.emit;
484 if mapping.stop.is_some() {
485 emit_by_path
486 .entry(stop_field_path(&mapping.target_path))
487 .or_insert(false);
488 }
489 }
490 let opcodes = self.compile_handler(handler_spec);
491 let event_type = self.get_event_type(&handler_spec.source);
492
493 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
494 let mut existing_setup = Vec::new();
499 let mut existing_mappings = Vec::new();
500 let mut existing_teardown = Vec::new();
501 let mut section = 0; for opcode in existing_opcodes.iter() {
504 match opcode {
505 OpCode::ReadOrInitState { .. } => {
506 existing_setup.push(opcode.clone());
507 section = 1; }
509 OpCode::UpdateState { .. } => {
510 existing_teardown.push(opcode.clone());
511 section = 2; }
513 OpCode::EmitMutation { .. } => {
514 existing_teardown.push(opcode.clone());
515 }
516 _ if section == 0 => existing_setup.push(opcode.clone()),
517 _ if section == 1 => existing_mappings.push(opcode.clone()),
518 _ => existing_teardown.push(opcode.clone()),
519 }
520 }
521
522 let mut new_mappings = Vec::new();
524 section = 0;
525
526 for opcode in opcodes.iter() {
527 match opcode {
528 OpCode::ReadOrInitState { .. } => {
529 section = 1; }
531 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
532 section = 2; }
534 _ if section == 1 => {
535 new_mappings.push(opcode.clone());
536 }
537 _ => {} }
539 }
540
541 let mut merged = Vec::new();
543 merged.extend(existing_setup);
544 merged.extend(existing_mappings);
545 merged.extend(new_mappings.clone());
546 merged.extend(existing_teardown);
547
548 *existing_opcodes = merged;
549 } else {
550 handlers.insert(event_type, opcodes);
551 }
552 }
553
554 for hook in &self.spec.instruction_hooks {
556 let event_type = hook.instruction_type.clone();
557
558 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
559 let key_reg = 20;
560 let state_reg = 2;
561 let resolved_key_reg = 19;
562 let temp_reg = 18;
563
564 let mut ops = Vec::new();
565
566 ops.push(OpCode::LoadEventField {
568 path: FieldPath::new(&["__resolved_primary_key"]),
569 dest: resolved_key_reg,
570 default: Some(serde_json::json!(null)),
571 });
572
573 ops.push(OpCode::CopyRegister {
575 source: resolved_key_reg,
576 dest: key_reg,
577 });
578
579 if let Some(lookup_path) = &hook.lookup_by {
581 ops.push(OpCode::LoadEventField {
583 path: lookup_path.clone(),
584 dest: temp_reg,
585 default: None,
586 });
587
588 ops.push(OpCode::Transform {
590 source: temp_reg,
591 dest: temp_reg,
592 transformation: Transformation::HexEncode,
593 });
594
595 ops.push(OpCode::CopyRegisterIfNull {
597 source: temp_reg,
598 dest: key_reg,
599 });
600 }
601
602 ops.push(OpCode::ReadOrInitState {
603 state_id: self.state_id,
604 key: key_reg,
605 default: serde_json::json!({}),
606 dest: state_reg,
607 });
608
609 ops.push(OpCode::UpdateState {
610 state_id: self.state_id,
611 key: key_reg,
612 value: state_reg,
613 });
614
615 ops
616 });
617
618 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
620
621 let insert_pos = handler_opcodes
625 .iter()
626 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
627 .or_else(|| {
628 handler_opcodes
629 .iter()
630 .position(|op| matches!(op, OpCode::UpdateState { .. }))
631 });
632
633 if let Some(pos) = insert_pos {
634 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
636 handler_opcodes.insert(pos + i, opcode);
637 }
638 }
639 }
640
641 let non_emitted_fields: HashSet<String> = emit_by_path
642 .into_iter()
643 .filter_map(|(path, emit)| if emit { None } else { Some(path) })
644 .collect();
645
646 EntityBytecode {
647 state_id: self.state_id,
648 handlers,
649 entity_name: self.entity_name.clone(),
650 when_events,
651 non_emitted_fields,
652 computed_paths: self.spec.computed_fields.clone(),
653 computed_fields_evaluator: None,
654 }
655 }
656
657 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
658 let mut ops = Vec::new();
659 let state_reg = 2;
660 let key_reg = 20;
661
662 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
663
664 ops.push(OpCode::ReadOrInitState {
665 state_id: self.state_id,
666 key: key_reg,
667 default: serde_json::json!({}),
668 dest: state_reg,
669 });
670
671 ops.extend(self.compile_temporal_index_update(
678 &spec.key_resolution,
679 key_reg,
680 &spec.mappings,
681 ));
682
683 for mapping in &spec.mappings {
684 ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
685 }
686
687 ops.extend(self.compile_resolvers(state_reg, key_reg));
688
689 ops.push(OpCode::EvaluateComputedFields {
691 state: state_reg,
692 computed_paths: self.spec.computed_fields.clone(),
693 });
694
695 ops.push(OpCode::UpdateState {
696 state_id: self.state_id,
697 key: key_reg,
698 value: state_reg,
699 });
700
701 if spec.emit {
702 ops.push(OpCode::EmitMutation {
703 entity_name: self.entity_name.clone(),
704 key: key_reg,
705 state: state_reg,
706 });
707 }
708
709 ops
710 }
711
712 fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
713 let mut ops = Vec::new();
714
715 for resolver_spec in &self.spec.resolver_specs {
716 ops.push(OpCode::QueueResolver {
717 state_id: self.state_id,
718 entity_name: self.entity_name.clone(),
719 resolver: resolver_spec.resolver.clone(),
720 input_path: resolver_spec.input_path.clone(),
721 input_value: resolver_spec.input_value.clone(),
722 strategy: resolver_spec.strategy.clone(),
723 extracts: resolver_spec.extracts.clone(),
724 state: state_reg,
725 key: key_reg,
726 });
727 }
728
729 ops
730 }
731
732 fn compile_mapping(
733 &self,
734 mapping: &TypedFieldMapping<S>,
735 state_reg: Register,
736 key_reg: Register,
737 ) -> Vec<OpCode> {
738 let mut ops = Vec::new();
739 let temp_reg = 10;
740
741 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
742
743 if let Some(transform) = &mapping.transform {
744 ops.push(OpCode::Transform {
745 source: temp_reg,
746 dest: temp_reg,
747 transformation: transform.clone(),
748 });
749 }
750
751 if let Some(stop_instruction) = &mapping.stop {
752 if mapping.when.is_some() {
753 tracing::warn!(
754 "#[map] stop and when both set for {}. Ignoring when.",
755 mapping.target_path
756 );
757 }
758 if !matches!(mapping.population, PopulationStrategy::LastWrite)
759 && !matches!(mapping.population, PopulationStrategy::Merge)
760 {
761 tracing::warn!(
762 "#[map] stop ignores population strategy {:?}",
763 mapping.population
764 );
765 }
766
767 ops.push(OpCode::SetFieldUnlessStopped {
768 object: state_reg,
769 path: mapping.target_path.clone(),
770 value: temp_reg,
771 stop_field: stop_field_path(&mapping.target_path),
772 stop_instruction: stop_instruction.clone(),
773 entity_name: self.entity_name.clone(),
774 key_reg,
775 });
776 return ops;
777 }
778
779 if let Some(when_instruction) = &mapping.when {
780 if !matches!(mapping.population, PopulationStrategy::LastWrite)
781 && !matches!(mapping.population, PopulationStrategy::Merge)
782 {
783 tracing::warn!(
784 "#[map] when ignores population strategy {:?}",
785 mapping.population
786 );
787 }
788 let (condition_field, condition_op, condition_value) = mapping
789 .condition
790 .as_ref()
791 .and_then(|cond| cond.parsed.as_ref())
792 .and_then(|parsed| match parsed {
793 ParsedCondition::Comparison { field, op, value } => {
794 Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
795 }
796 ParsedCondition::Logical { .. } => {
797 tracing::warn!("Logical conditions not yet supported for #[map] when");
798 None
799 }
800 })
801 .unwrap_or((None, None, None));
802
803 ops.push(OpCode::SetFieldWhen {
804 object: state_reg,
805 path: mapping.target_path.clone(),
806 value: temp_reg,
807 when_instruction: when_instruction.clone(),
808 entity_name: self.entity_name.clone(),
809 key_reg,
810 condition_field,
811 condition_op,
812 condition_value,
813 });
814 return ops;
815 }
816
817 if let Some(condition) = &mapping.condition {
818 if let Some(parsed) = &condition.parsed {
819 match parsed {
820 ParsedCondition::Comparison {
821 field,
822 op,
823 value: cond_value,
824 } => {
825 if matches!(mapping.population, PopulationStrategy::LastWrite)
826 || matches!(mapping.population, PopulationStrategy::Merge)
827 {
828 ops.push(OpCode::ConditionalSetField {
829 object: state_reg,
830 path: mapping.target_path.clone(),
831 value: temp_reg,
832 condition_field: field.clone(),
833 condition_op: op.clone(),
834 condition_value: cond_value.clone(),
835 });
836 return ops;
837 }
838
839 if matches!(mapping.population, PopulationStrategy::Count) {
840 ops.push(OpCode::ConditionalIncrement {
841 object: state_reg,
842 path: mapping.target_path.clone(),
843 condition_field: field.clone(),
844 condition_op: op.clone(),
845 condition_value: cond_value.clone(),
846 });
847 return ops;
848 }
849
850 tracing::warn!(
851 "Conditional #[map] not supported for population strategy {:?}",
852 mapping.population
853 );
854 }
855 ParsedCondition::Logical { .. } => {
856 tracing::warn!("Logical conditions not yet supported for #[map]");
857 }
858 }
859 }
860 }
861
862 match &mapping.population {
863 PopulationStrategy::Append => {
864 ops.push(OpCode::AppendToArray {
865 object: state_reg,
866 path: mapping.target_path.clone(),
867 value: temp_reg,
868 });
869 }
870 PopulationStrategy::LastWrite => {
871 ops.push(OpCode::SetField {
872 object: state_reg,
873 path: mapping.target_path.clone(),
874 value: temp_reg,
875 });
876 }
877 PopulationStrategy::SetOnce => {
878 ops.push(OpCode::SetFieldIfNull {
879 object: state_reg,
880 path: mapping.target_path.clone(),
881 value: temp_reg,
882 });
883 }
884 PopulationStrategy::Merge => {
885 ops.push(OpCode::SetField {
886 object: state_reg,
887 path: mapping.target_path.clone(),
888 value: temp_reg,
889 });
890 }
891 PopulationStrategy::Max => {
892 ops.push(OpCode::SetFieldMax {
893 object: state_reg,
894 path: mapping.target_path.clone(),
895 value: temp_reg,
896 });
897 }
898 PopulationStrategy::Sum => {
899 ops.push(OpCode::SetFieldSum {
900 object: state_reg,
901 path: mapping.target_path.clone(),
902 value: temp_reg,
903 });
904 }
905 PopulationStrategy::Count => {
906 ops.push(OpCode::SetFieldIncrement {
908 object: state_reg,
909 path: mapping.target_path.clone(),
910 });
911 }
912 PopulationStrategy::Min => {
913 ops.push(OpCode::SetFieldMin {
914 object: state_reg,
915 path: mapping.target_path.clone(),
916 value: temp_reg,
917 });
918 }
919 PopulationStrategy::UniqueCount => {
920 let set_name = format!("{}_unique_set", mapping.target_path);
923 ops.push(OpCode::AddToUniqueSet {
924 state_id: self.state_id,
925 set_name,
926 value: temp_reg,
927 count_object: state_reg,
928 count_path: mapping.target_path.clone(),
929 });
930 }
931 }
932
933 ops
934 }
935
    /// Emits the opcodes that place the value described by `source` into
    /// register `dest`.
    ///
    /// Some variants use scratch registers: `AsEvent` uses `dest + 1` and
    /// upwards, `AsCapture` uses the fixed registers 22, 24 and 25.
    /// `Computed` and `FromState` emit nothing, so `dest` is left untouched
    /// for them.
    fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
        match source {
            // Plain field read, optionally followed by an in-place transform.
            MappingSource::FromSource {
                path,
                default,
                transform,
            } => {
                let mut ops = vec![OpCode::LoadEventField {
                    path: path.clone(),
                    dest,
                    default: default.clone(),
                }];

                if let Some(transform_type) = transform {
                    ops.push(OpCode::Transform {
                        source: dest,
                        dest,
                        transformation: transform_type.clone(),
                    });
                }

                ops
            }
            MappingSource::Constant(val) => {
                vec![OpCode::LoadConstant {
                    value: val.clone(),
                    dest,
                }]
            }
            MappingSource::AsEvent { fields } => {
                let mut ops = Vec::new();

                if fields.is_empty() {
                    // No field list: load the empty path (default `{}`) and
                    // wrap it as the event value.
                    let event_data_reg = dest + 1;
                    ops.push(OpCode::LoadEventField {
                        path: FieldPath::new(&[]),
                        dest: event_data_reg,
                        default: Some(serde_json::json!({})),
                    });
                    ops.push(OpCode::CreateEvent {
                        dest,
                        event_value: event_data_reg,
                    });
                } else {
                    // Build a fresh object in `dest + 1` from the listed
                    // fields, each loaded into its own register from
                    // `dest + 2` upwards.
                    let data_obj_reg = dest + 1;
                    ops.push(OpCode::CreateObject { dest: data_obj_reg });

                    let mut field_registers = Vec::new();
                    let mut current_reg = dest + 2;

                    for field_source in fields.iter() {
                        // Only `FromSource` entries are supported; any other
                        // kind of entry is skipped silently.
                        if let MappingSource::FromSource {
                            path,
                            default,
                            transform,
                        } = &**field_source
                        {
                            ops.push(OpCode::LoadEventField {
                                path: path.clone(),
                                dest: current_reg,
                                default: default.clone(),
                            });

                            if let Some(transform_type) = transform {
                                ops.push(OpCode::Transform {
                                    source: current_reg,
                                    dest: current_reg,
                                    transformation: transform_type.clone(),
                                });
                            }

                            // The object key is the last path segment; a path
                            // with no segments is loaded but not stored.
                            if let Some(field_name) = path.segments.last() {
                                field_registers.push((field_name.clone(), current_reg));
                            }
                            current_reg += 1;
                        }
                    }

                    if !field_registers.is_empty() {
                        ops.push(OpCode::SetFields {
                            object: data_obj_reg,
                            fields: field_registers,
                        });
                    }

                    ops.push(OpCode::CreateEvent {
                        dest,
                        event_value: data_obj_reg,
                    });
                }

                ops
            }
            // Loads the empty path, falling back to `{}`.
            MappingSource::WholeSource => {
                vec![OpCode::LoadEventField {
                    path: FieldPath::new(&[]),
                    dest,
                    default: Some(serde_json::json!({})),
                }]
            }
            MappingSource::AsCapture { field_transforms } => {
                // Fixed scratch registers, independent of `dest`.
                let capture_data_reg = 22;
                let mut ops = vec![OpCode::LoadEventField {
                    path: FieldPath::new(&[]),
                    dest: capture_data_reg,
                    default: Some(serde_json::json!({})),
                }];

                let field_reg = 24;
                let transformed_reg = 25;

                // For each configured field: read it, transform it, and write
                // the transformed value back over the original.
                for (field_name, transform) in field_transforms {
                    ops.push(OpCode::GetField {
                        object: capture_data_reg,
                        path: field_name.clone(),
                        dest: field_reg,
                    });

                    ops.push(OpCode::Transform {
                        source: field_reg,
                        dest: transformed_reg,
                        transformation: transform.clone(),
                    });

                    ops.push(OpCode::SetField {
                        object: capture_data_reg,
                        path: field_name.clone(),
                        value: transformed_reg,
                    });
                }

                ops.push(OpCode::CreateCapture {
                    dest,
                    capture_value: capture_data_reg,
                });

                ops
            }
            // Context values are read from the event under `__update_context`,
            // defaulting to null.
            MappingSource::FromContext { field } => {
                vec![OpCode::LoadEventField {
                    path: FieldPath::new(&["__update_context", field.as_str()]),
                    dest,
                    default: Some(serde_json::json!(null)),
                }]
            }
            // NOTE(review): no load is emitted for these two variants, yet
            // `compile_mapping` still emits a write from its temp register —
            // confirm such mappings are filtered out or handled elsewhere.
            MappingSource::Computed { .. } => {
                vec![]
            }
            MappingSource::FromState { .. } => {
                vec![]
            }
        }
    }
1100
    /// Emits the opcodes that resolve the entity key into `key_reg`.
    ///
    /// Every strategy first loads `__resolved_primary_key` from the event
    /// (default null) into register 19. What happens next depends on
    /// `resolution`:
    ///
    /// - `Embedded`: copy the resolved key, then fall back (via
    ///   `CopyRegisterIfNull`) to the primary field read from the event,
    ///   transformed if a primary-key transformation is known.
    /// - `Lookup`: build a lookup value (resolved key, falling back to the
    ///   primary field), look it up in an index, and copy the result into
    ///   `key_reg` unconditionally.
    /// - `Computed`: copy the resolved key, falling back to the primary field.
    /// - `TemporalLookup`: copy the resolved key, falling back to the result of
    ///   a temporal index lookup.
    ///
    /// Scratch registers used: 15, 16, 17, 18, 19 and 23.
    pub fn compile_key_loading(
        &self,
        resolution: &KeyResolutionStrategy,
        key_reg: Register,
        mappings: &[TypedFieldMapping<S>],
    ) -> Vec<OpCode> {
        let mut ops = Vec::new();

        let resolved_key_reg = 19;
        ops.push(OpCode::LoadEventField {
            path: FieldPath::new(&["__resolved_primary_key"]),
            dest: resolved_key_reg,
            default: Some(serde_json::json!(null)),
        });

        match resolution {
            KeyResolutionStrategy::Embedded { primary_field } => {
                ops.push(OpCode::CopyRegister {
                    source: resolved_key_reg,
                    dest: key_reg,
                });

                // An empty `primary_field` means "not specified": try to infer
                // it from this handler's mappings, keeping the empty path when
                // nothing is detected.
                let effective_primary_field = if primary_field.segments.is_empty() {
                    if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
                        auto_field
                    } else {
                        primary_field.clone()
                    }
                } else {
                    primary_field.clone()
                };

                // With no usable field there is no fallback; the key is
                // whatever `__resolved_primary_key` provided.
                if !effective_primary_field.segments.is_empty() {
                    let temp_reg = 18;
                    let transform_reg = 23;
                    ops.push(OpCode::LoadEventField {
                        path: effective_primary_field.clone(),
                        dest: temp_reg,
                        default: None,
                    });

                    // Prefer a transformation declared in this handler's
                    // mappings, then one declared by any handler of the spec.
                    let primary_key_transform = self
                        .find_primary_key_transformation(mappings)
                        .or_else(|| self.find_inherited_primary_key_transformation());

                    if let Some(transform) = primary_key_transform {
                        ops.push(OpCode::Transform {
                            source: temp_reg,
                            dest: transform_reg,
                            transformation: transform,
                        });
                        ops.push(OpCode::CopyRegisterIfNull {
                            source: transform_reg,
                            dest: key_reg,
                        });
                    } else {
                        ops.push(OpCode::CopyRegisterIfNull {
                            source: temp_reg,
                            dest: key_reg,
                        });
                    }
                }
            }
            KeyResolutionStrategy::Lookup { primary_field } => {
                let lookup_reg = 15;
                let result_reg = 17;

                // Here the resolved key seeds the lookup value, not the key
                // register itself.
                ops.push(OpCode::CopyRegister {
                    source: resolved_key_reg,
                    dest: lookup_reg,
                });

                let temp_reg = 18;
                ops.push(OpCode::LoadEventField {
                    path: primary_field.clone(),
                    dest: temp_reg,
                    default: None,
                });
                ops.push(OpCode::CopyRegisterIfNull {
                    source: temp_reg,
                    dest: lookup_reg,
                });

                // Falls back to the `default_pda_lookup` index when no lookup
                // index matches the field.
                let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
                let effective_index_name =
                    index_name.unwrap_or_else(|| "default_pda_lookup".to_string());

                ops.push(OpCode::LookupIndex {
                    state_id: self.state_id,
                    index_name: effective_index_name,
                    lookup_value: lookup_reg,
                    dest: result_reg,
                });
                // Unconditional copy: the lookup result always becomes the key.
                ops.push(OpCode::CopyRegister {
                    source: result_reg,
                    dest: key_reg,
                });
            }
            KeyResolutionStrategy::Computed {
                primary_field,
                compute_partition: _,
            } => {
                ops.push(OpCode::CopyRegister {
                    source: resolved_key_reg,
                    dest: key_reg,
                });
                let temp_reg = 18;
                ops.push(OpCode::LoadEventField {
                    path: primary_field.clone(),
                    dest: temp_reg,
                    default: None,
                });
                ops.push(OpCode::CopyRegisterIfNull {
                    source: temp_reg,
                    dest: key_reg,
                });
            }
            KeyResolutionStrategy::TemporalLookup {
                lookup_field,
                timestamp_field,
                index_name,
            } => {
                ops.push(OpCode::CopyRegister {
                    source: resolved_key_reg,
                    dest: key_reg,
                });
                let lookup_reg = 15;
                let timestamp_reg = 16;
                let result_reg = 17;

                ops.push(OpCode::LoadEventField {
                    path: lookup_field.clone(),
                    dest: lookup_reg,
                    default: None,
                });

                ops.push(OpCode::LoadEventField {
                    path: timestamp_field.clone(),
                    dest: timestamp_reg,
                    default: None,
                });

                ops.push(OpCode::LookupTemporalIndex {
                    state_id: self.state_id,
                    index_name: index_name.clone(),
                    lookup_value: lookup_reg,
                    timestamp: timestamp_reg,
                    dest: result_reg,
                });

                // The temporal lookup is only a fallback for the resolved key.
                ops.push(OpCode::CopyRegisterIfNull {
                    source: result_reg,
                    dest: key_reg,
                });
            }
        }

        ops
    }
1288
1289 fn find_primary_key_transformation(
1290 &self,
1291 mappings: &[TypedFieldMapping<S>],
1292 ) -> Option<Transformation> {
1293 let primary_key = self.spec.identity.primary_keys.first()?;
1295 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1296
1297 for mapping in mappings {
1299 if mapping.target_path == *primary_key
1301 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1302 {
1303 if let Some(transform) = &mapping.transform {
1305 return Some(transform.clone());
1306 }
1307
1308 if let MappingSource::FromSource {
1310 transform: Some(transform),
1311 ..
1312 } = &mapping.source
1313 {
1314 return Some(transform.clone());
1315 }
1316 }
1317 }
1318
1319 for mapping in mappings {
1321 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1322 if let Some(transform) = field_transforms.get(&primary_field_name) {
1323 return Some(transform.clone());
1324 }
1325 }
1326 }
1327
1328 None
1329 }
1330
1331 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1334 let primary_key = self.spec.identity.primary_keys.first()?;
1335
1336 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1338
1339 for handler in &self.spec.handlers {
1341 for mapping in &handler.mappings {
1342 if mapping.target_path == *primary_key
1344 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1345 {
1346 if let MappingSource::FromSource {
1348 path, transform, ..
1349 } = &mapping.source
1350 {
1351 if path.segments.last() == Some(&primary_field_name) {
1352 return mapping.transform.clone().or_else(|| transform.clone());
1354 }
1355 }
1356 }
1357
1358 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1360 if let Some(transform) = field_transforms.get(&primary_field_name) {
1361 return Some(transform.clone());
1362 }
1363 }
1364 }
1365 }
1366
1367 None
1368 }
1369
1370 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1372 primary_key.split('.').next_back().map(|s| s.to_string())
1374 }
1375
1376 pub fn auto_detect_primary_field(
1379 &self,
1380 current_mappings: &[TypedFieldMapping<S>],
1381 ) -> Option<FieldPath> {
1382 let primary_key = self.spec.identity.primary_keys.first()?;
1383
1384 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1386
1387 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1389 return Some(FieldPath::new(&[&primary_field_name]));
1390 }
1391
1392 None
1393 }
1394
1395 fn current_account_has_primary_field(
1398 &self,
1399 field_name: &str,
1400 mappings: &[TypedFieldMapping<S>],
1401 ) -> bool {
1402 for mapping in mappings {
1404 if let MappingSource::FromSource { path, .. } = &mapping.source {
1405 if path.segments.last() == Some(&field_name.to_string()) {
1407 return true;
1408 }
1409 }
1410 }
1411
1412 false
1413 }
1414
1415 #[allow(dead_code)]
1417 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1418 for mapping in mappings {
1419 if let MappingSource::FromSource { path, .. } = &mapping.source {
1420 if path.segments.last() == Some(&field_name.to_string()) {
1421 return true;
1422 }
1423 }
1424 }
1425 false
1426 }
1427
1428 #[allow(dead_code)]
1431 fn field_exists_in_mappings(
1432 &self,
1433 field_name: &str,
1434 mappings: &[TypedFieldMapping<S>],
1435 ) -> bool {
1436 for mapping in mappings {
1438 if let MappingSource::FromSource { path, .. } = &mapping.source {
1439 if path.segments.last() == Some(&field_name.to_string()) {
1440 return true;
1441 }
1442 }
1443 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1445 if field_transforms.contains_key(field_name) {
1446 return true;
1447 }
1448 }
1449 }
1450 false
1451 }
1452
1453 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1454 if field_path.segments.is_empty() {
1455 return None;
1456 }
1457
1458 let lookup_field_name = field_path.segments.last().unwrap();
1459
1460 for lookup_index in &self.spec.identity.lookup_indexes {
1461 let index_field_name = lookup_index
1462 .field_name
1463 .split('.')
1464 .next_back()
1465 .unwrap_or(&lookup_index.field_name);
1466 if index_field_name == lookup_field_name {
1467 return Some(format!("{}_lookup_index", index_field_name));
1468 }
1469 }
1470
1471 None
1472 }
1473
1474 fn find_lookup_index_for_lookup_field(
1477 &self,
1478 primary_field: &FieldPath,
1479 mappings: &[TypedFieldMapping<S>],
1480 ) -> Option<String> {
1481 let primary_path = primary_field.segments.join(".");
1483
1484 for mapping in mappings {
1486 if let MappingSource::FromSource { path, .. } = &mapping.source {
1488 let source_path = path.segments.join(".");
1489 if source_path == primary_path {
1490 for lookup_index in &self.spec.identity.lookup_indexes {
1492 if mapping.target_path == lookup_index.field_name {
1493 let index_field_name = lookup_index
1494 .field_name
1495 .split('.')
1496 .next_back()
1497 .unwrap_or(&lookup_index.field_name);
1498 return Some(format!("{}_lookup_index", index_field_name));
1499 }
1500 }
1501 }
1502 }
1503 }
1504
1505 self.find_lookup_index_for_field(primary_field)
1507 }
1508
1509 fn find_source_path_for_lookup_index(
1513 &self,
1514 mappings: &[TypedFieldMapping<S>],
1515 lookup_field_name: &str,
1516 ) -> Option<Vec<String>> {
1517 for mapping in mappings {
1518 if mapping.target_path == lookup_field_name {
1519 if let MappingSource::FromSource { path, .. } = &mapping.source {
1520 return Some(path.segments.clone());
1521 }
1522 }
1523 }
1524 None
1525 }
1526
1527 fn compile_temporal_index_update(
1528 &self,
1529 resolution: &KeyResolutionStrategy,
1530 key_reg: Register,
1531 mappings: &[TypedFieldMapping<S>],
1532 ) -> Vec<OpCode> {
1533 let mut ops = Vec::new();
1534
1535 for lookup_index in &self.spec.identity.lookup_indexes {
1536 let lookup_reg = 17;
1537 let source_field = lookup_index
1538 .field_name
1539 .split('.')
1540 .next_back()
1541 .unwrap_or(&lookup_index.field_name);
1542
1543 match resolution {
1544 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1545 let source_path_opt =
1548 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1549
1550 let load_path = if let Some(ref path) = source_path_opt {
1551 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1552 } else {
1553 FieldPath::new(&[source_field])
1555 };
1556
1557 ops.push(OpCode::LoadEventField {
1558 path: load_path,
1559 dest: lookup_reg,
1560 default: None,
1561 });
1562
1563 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1564 let timestamp_reg = 18;
1565
1566 ops.push(OpCode::LoadEventField {
1567 path: FieldPath::new(&[temporal_field_name]),
1568 dest: timestamp_reg,
1569 default: None,
1570 });
1571
1572 let index_name = format!("{}_temporal_index", source_field);
1573 ops.push(OpCode::UpdateTemporalIndex {
1574 state_id: self.state_id,
1575 index_name,
1576 lookup_value: lookup_reg,
1577 primary_key: key_reg,
1578 timestamp: timestamp_reg,
1579 });
1580
1581 let simple_index_name = format!("{}_lookup_index", source_field);
1582 ops.push(OpCode::UpdateLookupIndex {
1583 state_id: self.state_id,
1584 index_name: simple_index_name,
1585 lookup_value: lookup_reg,
1586 primary_key: key_reg,
1587 });
1588 } else {
1589 let index_name = format!("{}_lookup_index", source_field);
1590 ops.push(OpCode::UpdateLookupIndex {
1591 state_id: self.state_id,
1592 index_name,
1593 lookup_value: lookup_reg,
1594 primary_key: key_reg,
1595 });
1596 }
1597
1598 if source_path_opt.is_some() {
1602 ops.push(OpCode::UpdatePdaReverseLookup {
1603 state_id: self.state_id,
1604 lookup_name: "default_pda_lookup".to_string(),
1605 pda_address: lookup_reg,
1606 primary_key: key_reg,
1607 });
1608 }
1609 }
1610 KeyResolutionStrategy::Lookup { primary_field } => {
1611 let has_mapping_to_lookup_field = mappings
1614 .iter()
1615 .any(|m| m.target_path == lookup_index.field_name);
1616
1617 if has_mapping_to_lookup_field {
1618 let path_segments: Vec<&str> =
1621 primary_field.segments.iter().map(|s| s.as_str()).collect();
1622 ops.push(OpCode::LoadEventField {
1623 path: FieldPath::new(&path_segments),
1624 dest: lookup_reg,
1625 default: None,
1626 });
1627
1628 let index_name = format!("{}_lookup_index", source_field);
1629 ops.push(OpCode::UpdateLookupIndex {
1630 state_id: self.state_id,
1631 index_name,
1632 lookup_value: lookup_reg,
1633 primary_key: key_reg,
1634 });
1635 }
1636 }
1637 KeyResolutionStrategy::Computed { .. }
1638 | KeyResolutionStrategy::TemporalLookup { .. } => {
1639 }
1641 }
1642 }
1643
1644 ops
1645 }
1646
1647 fn get_event_type(&self, source: &SourceSpec) -> String {
1648 match source {
1649 SourceSpec::Source { type_name, .. } => type_name.clone(),
1650 }
1651 }
1652
1653 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1654 let mut ops = Vec::new();
1655 let state_reg = 2;
1656
1657 for action in actions {
1658 match action {
1659 HookAction::SetField {
1660 target_field,
1661 source,
1662 condition,
1663 } => {
1664 let _ = condition;
1666
1667 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1671 ops.extend(load_ops);
1672
1673 if let MappingSource::FromSource {
1675 transform: Some(transform_type),
1676 ..
1677 } = source
1678 {
1679 ops.push(OpCode::Transform {
1680 source: temp_reg,
1681 dest: temp_reg,
1682 transformation: transform_type.clone(),
1683 });
1684 }
1685
1686 if let Some(cond_expr) = condition {
1688 if let Some(parsed) = &cond_expr.parsed {
1689 let cond_check_ops = self.compile_condition_check(
1691 parsed,
1692 temp_reg,
1693 state_reg,
1694 target_field,
1695 );
1696 ops.extend(cond_check_ops);
1697 } else {
1698 ops.push(OpCode::SetField {
1700 object: state_reg,
1701 path: target_field.clone(),
1702 value: temp_reg,
1703 });
1704 }
1705 } else {
1706 ops.push(OpCode::SetField {
1708 object: state_reg,
1709 path: target_field.clone(),
1710 value: temp_reg,
1711 });
1712 }
1713 }
1714 HookAction::IncrementField {
1715 target_field,
1716 increment_by,
1717 condition,
1718 } => {
1719 if let Some(cond_expr) = condition {
1720 if let Some(parsed) = &cond_expr.parsed {
1721 let cond_check_ops = self.compile_conditional_increment(
1726 parsed,
1727 state_reg,
1728 target_field,
1729 *increment_by,
1730 );
1731 ops.extend(cond_check_ops);
1732 } else {
1733 ops.push(OpCode::SetFieldIncrement {
1735 object: state_reg,
1736 path: target_field.clone(),
1737 });
1738 }
1739 } else {
1740 ops.push(OpCode::SetFieldIncrement {
1742 object: state_reg,
1743 path: target_field.clone(),
1744 });
1745 }
1746 }
1747 HookAction::RegisterPdaMapping { .. } => {
1748 }
1751 }
1752 }
1753
1754 ops
1755 }
1756
1757 fn compile_condition_check(
1758 &self,
1759 condition: &ParsedCondition,
1760 value_reg: Register,
1761 state_reg: Register,
1762 target_field: &str,
1763 ) -> Vec<OpCode> {
1764 match condition {
1765 ParsedCondition::Comparison {
1766 field,
1767 op,
1768 value: cond_value,
1769 } => {
1770 vec![OpCode::ConditionalSetField {
1772 object: state_reg,
1773 path: target_field.to_string(),
1774 value: value_reg,
1775 condition_field: field.clone(),
1776 condition_op: op.clone(),
1777 condition_value: cond_value.clone(),
1778 }]
1779 }
1780 ParsedCondition::Logical { .. } => {
1781 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1783 vec![OpCode::SetField {
1784 object: state_reg,
1785 path: target_field.to_string(),
1786 value: value_reg,
1787 }]
1788 }
1789 }
1790 }
1791
1792 fn compile_conditional_increment(
1793 &self,
1794 condition: &ParsedCondition,
1795 state_reg: Register,
1796 target_field: &str,
1797 _increment_by: i64,
1798 ) -> Vec<OpCode> {
1799 match condition {
1800 ParsedCondition::Comparison {
1801 field,
1802 op,
1803 value: cond_value,
1804 } => {
1805 vec![OpCode::ConditionalIncrement {
1806 object: state_reg,
1807 path: target_field.to_string(),
1808 condition_field: field.clone(),
1809 condition_op: op.clone(),
1810 condition_value: cond_value.clone(),
1811 }]
1812 }
1813 ParsedCondition::Logical { .. } => {
1814 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1815 vec![OpCode::SetFieldIncrement {
1816 object: state_reg,
1817 path: target_field.to_string(),
1818 }]
1819 }
1820 }
1821 }
1822}