1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
/// Index of a register slot referenced by [`OpCode`] operands.
pub type Register = usize;
7
/// Builds the bookkeeping path under which a mapping's "stop" marker is
/// tracked: the literal prefix `__stop:` followed by `target_path`.
fn stop_field_path(target_path: &str) -> String {
    let mut marker = String::from("__stop:");
    marker.push_str(target_path);
    marker
}
11
/// Instructions emitted by [`TypedCompiler`] for each event handler.
///
/// Operands of type [`Register`] name register slots. The per-variant notes
/// below are derived from the variant/field names and from how the compiler in
/// this file emits them; the executor is not part of this file's visible
/// portion, so treat runtime semantics as assumptions to confirm against it.
#[derive(Debug, Clone)]
pub enum OpCode {
    /// Emitted by `compile_handler` immediately after key loading.
    /// Presumably stops the handler when `key` holds null — confirm in the executor.
    AbortIfNullKey {
        key: Register,
        /// `true` when the handler's source is `SourceSpec::Source { is_account: true, .. }`.
        is_account_event: bool,
    },
    /// Loads the event field at `path` into `dest`, with an optional `default`
    /// value (how the default is applied is decided by the executor).
    LoadEventField {
        path: FieldPath,
        dest: Register,
        default: Option<Value>,
    },
    /// Places the constant `value` into `dest`.
    LoadConstant {
        value: Value,
        dest: Register,
    },
    /// Copies `source` into `dest`.
    CopyRegister {
        source: Register,
        dest: Register,
    },
    /// Copies `source` into `dest`; by its name, only when `dest` is currently
    /// null. The compiler uses it to apply fallbacks after a preferred key source.
    CopyRegisterIfNull {
        source: Register,
        dest: Register,
    },
    /// Presumably stores the current event's type into `dest`; not emitted in
    /// the visible part of this file.
    GetEventType {
        dest: Register,
    },
    /// Creates an object in `dest`; the compiler emits it before `SetFields`
    /// when assembling event payloads.
    CreateObject {
        dest: Register,
    },
    /// Writes register `value` to `path` on the object in register `object`.
    SetField {
        object: Register,
        path: String,
        value: Register,
    },
    /// Writes several `(field name, register)` pairs onto the object in `object`.
    SetFields {
        object: Register,
        fields: Vec<(String, Register)>,
    },
    /// Reads `path` from the object in `object` into `dest`.
    GetField {
        object: Register,
        path: String,
        dest: Register,
    },
    /// Loads the state stored for `key` in table `state_id` into `dest`; going
    /// by the name, `default` is used when no state exists yet.
    ReadOrInitState {
        state_id: u32,
        key: Register,
        default: Value,
        dest: Register,
    },
    /// Stores register `value` as the state for `key` in table `state_id`.
    UpdateState {
        state_id: u32,
        key: Register,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Append` mappings.
    AppendToArray {
        object: Register,
        path: String,
        value: Register,
    },
    /// Presumably stores the current timestamp into `dest`; not emitted in the
    /// visible part of this file.
    GetCurrentTimestamp {
        dest: Register,
    },
    /// Emitted for `MappingSource::AsEvent`: builds an event value in `dest`
    /// from the payload held in `event_value`.
    CreateEvent {
        dest: Register,
        event_value: Register,
    },
    /// Emitted for `MappingSource::AsCapture`: builds a capture value in
    /// `dest` from the payload held in `capture_value`.
    CreateCapture {
        dest: Register,
        capture_value: Register,
    },
    /// Applies `transformation` to `source` and stores the result in `dest`
    /// (the compiler frequently uses the same register for both).
    Transform {
        source: Register,
        dest: Register,
        transformation: Transformation,
    },
    /// Emitted at the end of a handler whose spec has `emit` set.
    EmitMutation {
        entity_name: String,
        key: Register,
        state: Register,
    },
    /// Emitted for `PopulationStrategy::SetOnce` mappings.
    SetFieldIfNull {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Max` mappings.
    SetFieldMax {
        object: Register,
        path: String,
        value: Register,
    },
    /// Temporal-index maintenance; operand semantics are defined by the
    /// executor (not visible here).
    UpdateTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
        timestamp: Register,
    },
    /// Emitted for `KeyResolutionStrategy::TemporalLookup`: looks up
    /// `lookup_value`/`timestamp` in the named index and stores the result in `dest`.
    LookupTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        timestamp: Register,
        dest: Register,
    },
    /// Lookup-index maintenance; operand semantics are defined by the
    /// executor (not visible here).
    UpdateLookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
    },
    /// Emitted for `KeyResolutionStrategy::Lookup`: looks up `lookup_value`
    /// in the named index and stores the result in `dest`.
    LookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        dest: Register,
    },
    /// Emitted for `PopulationStrategy::Sum` mappings.
    SetFieldSum {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Count` mappings (no value operand).
    SetFieldIncrement {
        object: Register,
        path: String,
    },
    /// Emitted for `PopulationStrategy::Min` mappings.
    SetFieldMin {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for mappings that declare `when`. The three `condition_*`
    /// fields are either all `Some` (a parsed comparison condition) or all `None`.
    SetFieldWhen {
        object: Register,
        path: String,
        value: Register,
        when_instruction: String,
        entity_name: String,
        key_reg: Register,
        condition_field: Option<FieldPath>,
        condition_op: Option<ComparisonOp>,
        condition_value: Option<Value>,
    },
    /// Emitted for mappings that declare `stop`. `stop_field` is the
    /// `__stop:`-prefixed bookkeeping path for `path`.
    SetFieldUnlessStopped {
        object: Register,
        path: String,
        value: Register,
        stop_field: String,
        stop_instruction: String,
        entity_name: String,
        key_reg: Register,
    },
    /// Emitted for `PopulationStrategy::UniqueCount` mappings; `set_name` is
    /// `"<target_path>_unique_set"` and `count_path` is the mapping's target path.
    AddToUniqueSet {
        state_id: u32,
        set_name: String,
        value: Register,
        count_object: Register,
        count_path: String,
    },
    /// Emitted for comparison-conditioned mappings whose population strategy
    /// is `LastWrite` or `Merge`.
    ConditionalSetField {
        object: Register,
        path: String,
        value: Register,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted for comparison-conditioned mappings whose population strategy
    /// is `Count`.
    ConditionalIncrement {
        object: Register,
        path: String,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Emitted once per handler, just before `UpdateState`; `computed_paths`
    /// is a copy of the spec's `computed_fields`.
    EvaluateComputedFields {
        state: Register,
        computed_paths: Vec<String>,
    },
    /// Emitted once per resolver spec. `url_template` is `Some` only for
    /// `ResolverType::Url` resolvers whose URL source is a template.
    QueueResolver {
        state_id: u32,
        entity_name: String,
        resolver: ResolverType,
        input_path: Option<String>,
        input_value: Option<Value>,
        url_template: Option<Vec<UrlTemplatePart>>,
        strategy: ResolveStrategy,
        extracts: Vec<ResolverExtractSpec>,
        condition: Option<ResolverCondition>,
        schedule_at: Option<String>,
        state: Register,
        key: Register,
    },
    /// PDA reverse-lookup maintenance; operand semantics are defined by the
    /// executor (not visible here).
    UpdatePdaReverseLookup {
        state_id: u32,
        lookup_name: String,
        pda_address: Register,
        primary_key: Register,
    },
}
232
/// Compiled program for a single entity, as produced by
/// `TypedCompiler::compile_entity`.
pub struct EntityBytecode {
    /// State table id copied from the compiler (`with_state_id`, default 0).
    pub state_id: u32,
    /// Opcode program per event type.
    pub handlers: HashMap<String, Vec<OpCode>>,
    /// Name of the entity this bytecode belongs to.
    pub entity_name: String,
    /// Every `when` value declared by any mapping of this entity.
    pub when_events: HashSet<String>,
    /// Target paths for which no mapping sets `emit`, plus the `__stop:`
    /// bookkeeping path of every mapping that declares `stop`.
    pub non_emitted_fields: HashSet<String>,
    /// Copy of the spec's `computed_fields`.
    pub computed_paths: Vec<String>,
    /// Optional callback for computed fields. `compile_entity` leaves it as
    /// `None`; `add_entity_with_evaluator` installs one when provided.
    /// NOTE(review): the meaning of the `Option<u64>` and `i64` arguments is
    /// not visible in this file — confirm against the executor.
    #[allow(clippy::type_complexity)]
    pub computed_fields_evaluator: Option<
        Box<
            dyn Fn(
                    &mut Value,
                    Option<u64>,
                    i64,
                ) -> std::result::Result<(), Box<dyn std::error::Error>>
                + Send
                + Sync,
        >,
    >,
}
255
256impl std::fmt::Debug for EntityBytecode {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("EntityBytecode")
259 .field("state_id", &self.state_id)
260 .field("handlers", &self.handlers)
261 .field("entity_name", &self.entity_name)
262 .field("when_events", &self.when_events)
263 .field("non_emitted_fields", &self.non_emitted_fields)
264 .field("computed_paths", &self.computed_paths)
265 .field(
266 "computed_fields_evaluator",
267 &self.computed_fields_evaluator.is_some(),
268 )
269 .finish()
270 }
271}
272
/// Bytecode for a set of entities plus the routing tables derived from it.
#[derive(Debug)]
pub struct MultiEntityBytecode {
    /// Compiled bytecode keyed by entity name.
    pub entities: HashMap<String, EntityBytecode>,
    /// For each event type, the names of the entities that have a handler for it.
    pub event_routing: HashMap<String, Vec<String>>,
    /// Union of the `when_events` of all contained entities.
    pub when_events: HashSet<String>,
    /// Proto router; every constructor in this file starts from `ProtoRouter::new()`.
    pub proto_router: crate::proto_router::ProtoRouter,
}
280
281impl MultiEntityBytecode {
282 pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
283 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
284 let entity_bytecode = compiler.compile_entity();
285
286 let mut entities = HashMap::new();
287 let mut event_routing = HashMap::new();
288 let mut when_events = HashSet::new();
289
290 for event_type in entity_bytecode.handlers.keys() {
291 event_routing
292 .entry(event_type.clone())
293 .or_insert_with(Vec::new)
294 .push(entity_name.clone());
295 }
296
297 when_events.extend(entity_bytecode.when_events.iter().cloned());
298
299 entities.insert(entity_name, entity_bytecode);
300
301 MultiEntityBytecode {
302 entities,
303 event_routing,
304 when_events,
305 proto_router: crate::proto_router::ProtoRouter::new(),
306 }
307 }
308
309 pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
310 let entities = HashMap::new();
311 let event_routing = HashMap::new();
312 let when_events = HashSet::new();
313
314 if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
315 panic!("from_entities requires type information - use builder pattern instead");
316 }
317
318 MultiEntityBytecode {
319 entities,
320 event_routing,
321 when_events,
322 proto_router: crate::proto_router::ProtoRouter::new(),
323 }
324 }
325
326 #[allow(clippy::new_ret_no_self)]
327 pub fn new() -> MultiEntityBytecodeBuilder {
328 MultiEntityBytecodeBuilder {
329 entities: HashMap::new(),
330 event_routing: HashMap::new(),
331 when_events: HashSet::new(),
332 proto_router: crate::proto_router::ProtoRouter::new(),
333 }
334 }
335}
336
/// Accumulates compiled entities and their routing tables; created by
/// `MultiEntityBytecode::new` and finished with `build`.
pub struct MultiEntityBytecodeBuilder {
    // Compiled bytecode keyed by entity name.
    entities: HashMap<String, EntityBytecode>,
    // Event type -> names of the entities handling it, in insertion order.
    event_routing: HashMap<String, Vec<String>>,
    // Union of the `when_events` of every entity added so far.
    when_events: HashSet<String>,
    // Handed to the final `MultiEntityBytecode` unchanged.
    proto_router: crate::proto_router::ProtoRouter,
}
343
344impl MultiEntityBytecodeBuilder {
345 pub fn add_entity<S>(
346 self,
347 entity_name: String,
348 spec: TypedStreamSpec<S>,
349 state_id: u32,
350 ) -> Self {
351 self.add_entity_with_evaluator(
352 entity_name,
353 spec,
354 state_id,
355 None::<
356 fn(
357 &mut Value,
358 Option<u64>,
359 i64,
360 ) -> std::result::Result<(), Box<dyn std::error::Error>>,
361 >,
362 )
363 }
364
365 pub fn add_entity_with_evaluator<S, F>(
366 mut self,
367 entity_name: String,
368 spec: TypedStreamSpec<S>,
369 state_id: u32,
370 evaluator: Option<F>,
371 ) -> Self
372 where
373 F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
374 + Send
375 + Sync
376 + 'static,
377 {
378 let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
379 let mut entity_bytecode = compiler.compile_entity();
380
381 if let Some(eval) = evaluator {
383 entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
384 }
385
386 for event_type in entity_bytecode.handlers.keys() {
387 self.event_routing
388 .entry(event_type.clone())
389 .or_default()
390 .push(entity_name.clone());
391 }
392
393 self.when_events
394 .extend(entity_bytecode.when_events.iter().cloned());
395
396 self.entities.insert(entity_name, entity_bytecode);
397 self
398 }
399
400 pub fn build(self) -> MultiEntityBytecode {
401 MultiEntityBytecode {
402 entities: self.entities,
403 event_routing: self.event_routing,
404 when_events: self.when_events,
405 proto_router: self.proto_router,
406 }
407 }
408}
409
/// Compiles one entity's `TypedStreamSpec` into [`OpCode`] programs.
pub struct TypedCompiler<S> {
    /// The stream specification being compiled.
    pub spec: TypedStreamSpec<S>,
    // Name stamped into the produced bytecode and into entity-aware opcodes.
    entity_name: String,
    // State table id used by state/index opcodes; 0 unless `with_state_id` is called.
    state_id: u32,
}
415
416impl<S> TypedCompiler<S> {
417 pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
418 TypedCompiler {
419 spec,
420 entity_name,
421 state_id: 0,
422 }
423 }
424
425 pub fn with_state_id(mut self, state_id: u32) -> Self {
426 self.state_id = state_id;
427 self
428 }
429
430 pub fn compile(&self) -> MultiEntityBytecode {
431 let entity_bytecode = self.compile_entity();
432
433 let mut entities = HashMap::new();
434 let mut event_routing = HashMap::new();
435 let mut when_events = HashSet::new();
436
437 for event_type in entity_bytecode.handlers.keys() {
438 event_routing
439 .entry(event_type.clone())
440 .or_insert_with(Vec::new)
441 .push(self.entity_name.clone());
442 }
443
444 when_events.extend(entity_bytecode.when_events.iter().cloned());
445
446 entities.insert(self.entity_name.clone(), entity_bytecode);
447
448 MultiEntityBytecode {
449 entities,
450 event_routing,
451 when_events,
452 proto_router: crate::proto_router::ProtoRouter::new(),
453 }
454 }
455
456 fn compile_entity(&self) -> EntityBytecode {
457 let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
458 let mut when_events: HashSet<String> = HashSet::new();
459 let mut emit_by_path: HashMap<String, bool> = HashMap::new();
460
461 let mut debug_info = Vec::new();
463 for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
464 let event_type = self.get_event_type(&handler_spec.source);
465 let program_id = match &handler_spec.source {
466 crate::ast::SourceSpec::Source { program_id, .. } => {
467 program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
468 }
469 };
470 debug_info.push(format!(
471 " [{}] EventType={}, Mappings={}, ProgramId={}",
472 index,
473 event_type,
474 handler_spec.mappings.len(),
475 program_id
476 ));
477 }
478
479 for handler_spec in &self.spec.handlers {
489 for mapping in &handler_spec.mappings {
490 if let Some(when) = &mapping.when {
491 when_events.insert(when.clone());
492 }
493 let entry = emit_by_path
494 .entry(mapping.target_path.clone())
495 .or_insert(false);
496 *entry |= mapping.emit;
497 if mapping.stop.is_some() {
498 emit_by_path
499 .entry(stop_field_path(&mapping.target_path))
500 .or_insert(false);
501 }
502 }
503 let opcodes = self.compile_handler(handler_spec);
504 let event_type = self.get_event_type(&handler_spec.source);
505
506 if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
507 let mut existing_setup = Vec::new();
512 let mut existing_mappings = Vec::new();
513 let mut existing_teardown = Vec::new();
514 let mut section = 0; for opcode in existing_opcodes.iter() {
517 match opcode {
518 OpCode::ReadOrInitState { .. } => {
519 existing_setup.push(opcode.clone());
520 section = 1; }
522 OpCode::UpdateState { .. } => {
523 existing_teardown.push(opcode.clone());
524 section = 2; }
526 OpCode::EmitMutation { .. } => {
527 existing_teardown.push(opcode.clone());
528 }
529 _ if section == 0 => existing_setup.push(opcode.clone()),
530 _ if section == 1 => existing_mappings.push(opcode.clone()),
531 _ => existing_teardown.push(opcode.clone()),
532 }
533 }
534
535 let mut new_mappings = Vec::new();
537 section = 0;
538
539 for opcode in opcodes.iter() {
540 match opcode {
541 OpCode::ReadOrInitState { .. } => {
542 section = 1; }
544 OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
545 section = 2; }
547 _ if section == 1 => {
548 new_mappings.push(opcode.clone());
549 }
550 _ => {} }
552 }
553
554 let mut merged = Vec::new();
556 merged.extend(existing_setup);
557 merged.extend(existing_mappings);
558 merged.extend(new_mappings.clone());
559 merged.extend(existing_teardown);
560
561 *existing_opcodes = merged;
562 } else {
563 handlers.insert(event_type, opcodes);
564 }
565 }
566
567 for hook in &self.spec.instruction_hooks {
569 let event_type = hook.instruction_type.clone();
570
571 let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
572 let key_reg = 20;
573 let state_reg = 2;
574 let resolved_key_reg = 19;
575 let temp_reg = 18;
576
577 let mut ops = Vec::new();
578
579 ops.push(OpCode::LoadEventField {
581 path: FieldPath::new(&["__resolved_primary_key"]),
582 dest: resolved_key_reg,
583 default: Some(serde_json::json!(null)),
584 });
585
586 ops.push(OpCode::CopyRegister {
588 source: resolved_key_reg,
589 dest: key_reg,
590 });
591
592 if let Some(lookup_path) = &hook.lookup_by {
594 ops.push(OpCode::LoadEventField {
596 path: lookup_path.clone(),
597 dest: temp_reg,
598 default: None,
599 });
600
601 ops.push(OpCode::Transform {
603 source: temp_reg,
604 dest: temp_reg,
605 transformation: Transformation::HexEncode,
606 });
607
608 ops.push(OpCode::CopyRegisterIfNull {
610 source: temp_reg,
611 dest: key_reg,
612 });
613 }
614
615 ops.push(OpCode::ReadOrInitState {
616 state_id: self.state_id,
617 key: key_reg,
618 default: serde_json::json!({}),
619 dest: state_reg,
620 });
621
622 ops.push(OpCode::UpdateState {
623 state_id: self.state_id,
624 key: key_reg,
625 value: state_reg,
626 });
627
628 ops
629 });
630
631 let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
633
634 let insert_pos = handler_opcodes
638 .iter()
639 .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
640 .or_else(|| {
641 handler_opcodes
642 .iter()
643 .position(|op| matches!(op, OpCode::UpdateState { .. }))
644 });
645
646 if let Some(pos) = insert_pos {
647 for (i, opcode) in hook_opcodes.into_iter().enumerate() {
649 handler_opcodes.insert(pos + i, opcode);
650 }
651 }
652 }
653
654 let non_emitted_fields: HashSet<String> = emit_by_path
655 .into_iter()
656 .filter_map(|(path, emit)| if emit { None } else { Some(path) })
657 .collect();
658
659 EntityBytecode {
660 state_id: self.state_id,
661 handlers,
662 entity_name: self.entity_name.clone(),
663 when_events,
664 non_emitted_fields,
665 computed_paths: self.spec.computed_fields.clone(),
666 computed_fields_evaluator: None,
667 }
668 }
669
670 fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
671 let mut ops = Vec::new();
672 let state_reg = 2;
673 let key_reg = 20;
674
675 ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
676
677 let is_account_event = matches!(
683 spec.source,
684 SourceSpec::Source {
685 is_account: true,
686 ..
687 }
688 );
689 ops.push(OpCode::AbortIfNullKey {
690 key: key_reg,
691 is_account_event,
692 });
693
694 ops.push(OpCode::ReadOrInitState {
695 state_id: self.state_id,
696 key: key_reg,
697 default: serde_json::json!({}),
698 dest: state_reg,
699 });
700
701 ops.extend(self.compile_temporal_index_update(
708 &spec.key_resolution,
709 key_reg,
710 &spec.mappings,
711 ));
712
713 for mapping in &spec.mappings {
714 ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
715 }
716
717 ops.extend(self.compile_resolvers(state_reg, key_reg));
718
719 ops.push(OpCode::EvaluateComputedFields {
721 state: state_reg,
722 computed_paths: self.spec.computed_fields.clone(),
723 });
724
725 ops.push(OpCode::UpdateState {
726 state_id: self.state_id,
727 key: key_reg,
728 value: state_reg,
729 });
730
731 if spec.emit {
732 ops.push(OpCode::EmitMutation {
733 entity_name: self.entity_name.clone(),
734 key: key_reg,
735 state: state_reg,
736 });
737 }
738
739 ops
740 }
741
742 fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
743 let mut ops = Vec::new();
744
745 for resolver_spec in &self.spec.resolver_specs {
746 let url_template = match &resolver_spec.resolver {
747 ResolverType::Url(config) => match &config.url_source {
748 UrlSource::Template(parts) => Some(parts.clone()),
749 _ => None,
750 },
751 _ => None,
752 };
753
754 ops.push(OpCode::QueueResolver {
755 state_id: self.state_id,
756 entity_name: self.entity_name.clone(),
757 resolver: resolver_spec.resolver.clone(),
758 input_path: resolver_spec.input_path.clone(),
759 input_value: resolver_spec.input_value.clone(),
760 url_template,
761 strategy: resolver_spec.strategy.clone(),
762 extracts: resolver_spec.extracts.clone(),
763 condition: resolver_spec.condition.clone(),
764 schedule_at: resolver_spec.schedule_at.clone(),
765 state: state_reg,
766 key: key_reg,
767 });
768 }
769
770 ops
771 }
772
773 fn compile_mapping(
774 &self,
775 mapping: &TypedFieldMapping<S>,
776 state_reg: Register,
777 key_reg: Register,
778 ) -> Vec<OpCode> {
779 let mut ops = Vec::new();
780 let temp_reg = 10;
781
782 ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
783
784 if let Some(transform) = &mapping.transform {
785 ops.push(OpCode::Transform {
786 source: temp_reg,
787 dest: temp_reg,
788 transformation: transform.clone(),
789 });
790 }
791
792 if let Some(stop_instruction) = &mapping.stop {
793 if mapping.when.is_some() {
794 tracing::warn!(
795 "#[map] stop and when both set for {}. Ignoring when.",
796 mapping.target_path
797 );
798 }
799 if !matches!(mapping.population, PopulationStrategy::LastWrite)
800 && !matches!(mapping.population, PopulationStrategy::Merge)
801 {
802 tracing::warn!(
803 "#[map] stop ignores population strategy {:?}",
804 mapping.population
805 );
806 }
807
808 ops.push(OpCode::SetFieldUnlessStopped {
809 object: state_reg,
810 path: mapping.target_path.clone(),
811 value: temp_reg,
812 stop_field: stop_field_path(&mapping.target_path),
813 stop_instruction: stop_instruction.clone(),
814 entity_name: self.entity_name.clone(),
815 key_reg,
816 });
817 return ops;
818 }
819
820 if let Some(when_instruction) = &mapping.when {
821 if !matches!(mapping.population, PopulationStrategy::LastWrite)
822 && !matches!(mapping.population, PopulationStrategy::Merge)
823 {
824 tracing::warn!(
825 "#[map] when ignores population strategy {:?}",
826 mapping.population
827 );
828 }
829 let (condition_field, condition_op, condition_value) = mapping
830 .condition
831 .as_ref()
832 .and_then(|cond| cond.parsed.as_ref())
833 .and_then(|parsed| match parsed {
834 ParsedCondition::Comparison { field, op, value } => {
835 Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
836 }
837 ParsedCondition::Logical { .. } => {
838 tracing::warn!("Logical conditions not yet supported for #[map] when");
839 None
840 }
841 })
842 .unwrap_or((None, None, None));
843
844 ops.push(OpCode::SetFieldWhen {
845 object: state_reg,
846 path: mapping.target_path.clone(),
847 value: temp_reg,
848 when_instruction: when_instruction.clone(),
849 entity_name: self.entity_name.clone(),
850 key_reg,
851 condition_field,
852 condition_op,
853 condition_value,
854 });
855 return ops;
856 }
857
858 if let Some(condition) = &mapping.condition {
859 if let Some(parsed) = &condition.parsed {
860 match parsed {
861 ParsedCondition::Comparison {
862 field,
863 op,
864 value: cond_value,
865 } => {
866 if matches!(mapping.population, PopulationStrategy::LastWrite)
867 || matches!(mapping.population, PopulationStrategy::Merge)
868 {
869 ops.push(OpCode::ConditionalSetField {
870 object: state_reg,
871 path: mapping.target_path.clone(),
872 value: temp_reg,
873 condition_field: field.clone(),
874 condition_op: op.clone(),
875 condition_value: cond_value.clone(),
876 });
877 return ops;
878 }
879
880 if matches!(mapping.population, PopulationStrategy::Count) {
881 ops.push(OpCode::ConditionalIncrement {
882 object: state_reg,
883 path: mapping.target_path.clone(),
884 condition_field: field.clone(),
885 condition_op: op.clone(),
886 condition_value: cond_value.clone(),
887 });
888 return ops;
889 }
890
891 tracing::warn!(
892 "Conditional #[map] not supported for population strategy {:?}",
893 mapping.population
894 );
895 }
896 ParsedCondition::Logical { .. } => {
897 tracing::warn!("Logical conditions not yet supported for #[map]");
898 }
899 }
900 }
901 }
902
903 match &mapping.population {
904 PopulationStrategy::Append => {
905 ops.push(OpCode::AppendToArray {
906 object: state_reg,
907 path: mapping.target_path.clone(),
908 value: temp_reg,
909 });
910 }
911 PopulationStrategy::LastWrite => {
912 ops.push(OpCode::SetField {
913 object: state_reg,
914 path: mapping.target_path.clone(),
915 value: temp_reg,
916 });
917 }
918 PopulationStrategy::SetOnce => {
919 ops.push(OpCode::SetFieldIfNull {
920 object: state_reg,
921 path: mapping.target_path.clone(),
922 value: temp_reg,
923 });
924 }
925 PopulationStrategy::Merge => {
926 ops.push(OpCode::SetField {
927 object: state_reg,
928 path: mapping.target_path.clone(),
929 value: temp_reg,
930 });
931 }
932 PopulationStrategy::Max => {
933 ops.push(OpCode::SetFieldMax {
934 object: state_reg,
935 path: mapping.target_path.clone(),
936 value: temp_reg,
937 });
938 }
939 PopulationStrategy::Sum => {
940 ops.push(OpCode::SetFieldSum {
941 object: state_reg,
942 path: mapping.target_path.clone(),
943 value: temp_reg,
944 });
945 }
946 PopulationStrategy::Count => {
947 ops.push(OpCode::SetFieldIncrement {
949 object: state_reg,
950 path: mapping.target_path.clone(),
951 });
952 }
953 PopulationStrategy::Min => {
954 ops.push(OpCode::SetFieldMin {
955 object: state_reg,
956 path: mapping.target_path.clone(),
957 value: temp_reg,
958 });
959 }
960 PopulationStrategy::UniqueCount => {
961 let set_name = format!("{}_unique_set", mapping.target_path);
964 ops.push(OpCode::AddToUniqueSet {
965 state_id: self.state_id,
966 set_name,
967 value: temp_reg,
968 count_object: state_reg,
969 count_path: mapping.target_path.clone(),
970 });
971 }
972 }
973
974 ops
975 }
976
977 fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
978 match source {
979 MappingSource::FromSource {
980 path,
981 default,
982 transform,
983 } => {
984 let mut ops = vec![OpCode::LoadEventField {
985 path: path.clone(),
986 dest,
987 default: default.clone(),
988 }];
989
990 if let Some(transform_type) = transform {
992 ops.push(OpCode::Transform {
993 source: dest,
994 dest,
995 transformation: transform_type.clone(),
996 });
997 }
998
999 ops
1000 }
1001 MappingSource::Constant(val) => {
1002 vec![OpCode::LoadConstant {
1003 value: val.clone(),
1004 dest,
1005 }]
1006 }
1007 MappingSource::AsEvent { fields } => {
1008 let mut ops = Vec::new();
1009
1010 if fields.is_empty() {
1011 let event_data_reg = dest + 1;
1012 ops.push(OpCode::LoadEventField {
1013 path: FieldPath::new(&[]),
1014 dest: event_data_reg,
1015 default: Some(serde_json::json!({})),
1016 });
1017 ops.push(OpCode::CreateEvent {
1018 dest,
1019 event_value: event_data_reg,
1020 });
1021 } else {
1022 let data_obj_reg = dest + 1;
1023 ops.push(OpCode::CreateObject { dest: data_obj_reg });
1024
1025 let mut field_registers = Vec::new();
1026 let mut current_reg = dest + 2;
1027
1028 for field_source in fields.iter() {
1029 if let MappingSource::FromSource {
1030 path,
1031 default,
1032 transform,
1033 } = &**field_source
1034 {
1035 ops.push(OpCode::LoadEventField {
1036 path: path.clone(),
1037 dest: current_reg,
1038 default: default.clone(),
1039 });
1040
1041 if let Some(transform_type) = transform {
1042 ops.push(OpCode::Transform {
1043 source: current_reg,
1044 dest: current_reg,
1045 transformation: transform_type.clone(),
1046 });
1047 }
1048
1049 if let Some(field_name) = path.segments.last() {
1050 field_registers.push((field_name.clone(), current_reg));
1051 }
1052 current_reg += 1;
1053 }
1054 }
1055
1056 if !field_registers.is_empty() {
1057 ops.push(OpCode::SetFields {
1058 object: data_obj_reg,
1059 fields: field_registers,
1060 });
1061 }
1062
1063 ops.push(OpCode::CreateEvent {
1064 dest,
1065 event_value: data_obj_reg,
1066 });
1067 }
1068
1069 ops
1070 }
1071 MappingSource::WholeSource => {
1072 vec![OpCode::LoadEventField {
1073 path: FieldPath::new(&[]),
1074 dest,
1075 default: Some(serde_json::json!({})),
1076 }]
1077 }
1078 MappingSource::AsCapture { field_transforms } => {
1079 let capture_data_reg = 22; let mut ops = vec![OpCode::LoadEventField {
1082 path: FieldPath::new(&[]),
1083 dest: capture_data_reg,
1084 default: Some(serde_json::json!({})),
1085 }];
1086
1087 let field_reg = 24;
1091 let transformed_reg = 25;
1092
1093 for (field_name, transform) in field_transforms {
1094 ops.push(OpCode::GetField {
1097 object: capture_data_reg,
1098 path: field_name.clone(),
1099 dest: field_reg,
1100 });
1101
1102 ops.push(OpCode::Transform {
1104 source: field_reg,
1105 dest: transformed_reg,
1106 transformation: transform.clone(),
1107 });
1108
1109 ops.push(OpCode::SetField {
1111 object: capture_data_reg,
1112 path: field_name.clone(),
1113 value: transformed_reg,
1114 });
1115 }
1116
1117 ops.push(OpCode::CreateCapture {
1119 dest,
1120 capture_value: capture_data_reg,
1121 });
1122
1123 ops
1124 }
1125 MappingSource::FromContext { field } => {
1126 vec![OpCode::LoadEventField {
1128 path: FieldPath::new(&["__update_context", field.as_str()]),
1129 dest,
1130 default: Some(serde_json::json!(null)),
1131 }]
1132 }
1133 MappingSource::Computed { .. } => {
1134 vec![]
1135 }
1136 MappingSource::FromState { .. } => {
1137 vec![]
1138 }
1139 }
1140 }
1141
1142 pub fn compile_key_loading(
1143 &self,
1144 resolution: &KeyResolutionStrategy,
1145 key_reg: Register,
1146 mappings: &[TypedFieldMapping<S>],
1147 ) -> Vec<OpCode> {
1148 let mut ops = Vec::new();
1149
1150 let resolved_key_reg = 19; ops.push(OpCode::LoadEventField {
1154 path: FieldPath::new(&["__resolved_primary_key"]),
1155 dest: resolved_key_reg,
1156 default: Some(serde_json::json!(null)),
1157 });
1158
1159 match resolution {
1161 KeyResolutionStrategy::Embedded { primary_field } => {
1162 ops.push(OpCode::CopyRegister {
1164 source: resolved_key_reg,
1165 dest: key_reg,
1166 });
1167
1168 let effective_primary_field = if primary_field.segments.is_empty() {
1170 if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1172 auto_field
1173 } else {
1174 primary_field.clone()
1175 }
1176 } else {
1177 primary_field.clone()
1178 };
1179
1180 if !effective_primary_field.segments.is_empty() {
1184 let temp_reg = 18;
1185 let transform_reg = 23; ops.push(OpCode::LoadEventField {
1188 path: effective_primary_field.clone(),
1189 dest: temp_reg,
1190 default: None,
1191 });
1192
1193 let primary_key_transform = self
1196 .find_primary_key_transformation(mappings)
1197 .or_else(|| self.find_inherited_primary_key_transformation());
1198
1199 if let Some(transform) = primary_key_transform {
1200 ops.push(OpCode::Transform {
1202 source: temp_reg,
1203 dest: transform_reg,
1204 transformation: transform,
1205 });
1206 ops.push(OpCode::CopyRegisterIfNull {
1208 source: transform_reg,
1209 dest: key_reg,
1210 });
1211 } else {
1212 ops.push(OpCode::CopyRegisterIfNull {
1214 source: temp_reg,
1215 dest: key_reg,
1216 });
1217 }
1218 }
1219 }
1222 KeyResolutionStrategy::Lookup { primary_field } => {
1223 let lookup_reg = 15;
1224 let result_reg = 17;
1225
1226 ops.push(OpCode::CopyRegister {
1232 source: resolved_key_reg,
1233 dest: lookup_reg,
1234 });
1235
1236 let temp_reg = 18;
1237 ops.push(OpCode::LoadEventField {
1238 path: primary_field.clone(),
1239 dest: temp_reg,
1240 default: None,
1241 });
1242 ops.push(OpCode::CopyRegisterIfNull {
1243 source: temp_reg,
1244 dest: lookup_reg,
1245 });
1246
1247 let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1248 let effective_index_name =
1249 index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1250
1251 ops.push(OpCode::LookupIndex {
1252 state_id: self.state_id,
1253 index_name: effective_index_name,
1254 lookup_value: lookup_reg,
1255 dest: result_reg,
1256 });
1257 ops.push(OpCode::CopyRegister {
1264 source: result_reg,
1265 dest: key_reg,
1266 });
1267 ops.push(OpCode::CopyRegister {
1282 source: result_reg,
1283 dest: key_reg,
1284 });
1285 }
1286 KeyResolutionStrategy::Computed {
1287 primary_field,
1288 compute_partition: _,
1289 } => {
1290 ops.push(OpCode::CopyRegister {
1292 source: resolved_key_reg,
1293 dest: key_reg,
1294 });
1295 let temp_reg = 18;
1296 ops.push(OpCode::LoadEventField {
1297 path: primary_field.clone(),
1298 dest: temp_reg,
1299 default: None,
1300 });
1301 ops.push(OpCode::CopyRegisterIfNull {
1302 source: temp_reg,
1303 dest: key_reg,
1304 });
1305 }
1306 KeyResolutionStrategy::TemporalLookup {
1307 lookup_field,
1308 timestamp_field,
1309 index_name,
1310 } => {
1311 ops.push(OpCode::CopyRegister {
1313 source: resolved_key_reg,
1314 dest: key_reg,
1315 });
1316 let lookup_reg = 15;
1317 let timestamp_reg = 16;
1318 let result_reg = 17;
1319
1320 ops.push(OpCode::LoadEventField {
1321 path: lookup_field.clone(),
1322 dest: lookup_reg,
1323 default: None,
1324 });
1325
1326 ops.push(OpCode::LoadEventField {
1327 path: timestamp_field.clone(),
1328 dest: timestamp_reg,
1329 default: None,
1330 });
1331
1332 ops.push(OpCode::LookupTemporalIndex {
1333 state_id: self.state_id,
1334 index_name: index_name.clone(),
1335 lookup_value: lookup_reg,
1336 timestamp: timestamp_reg,
1337 dest: result_reg,
1338 });
1339
1340 ops.push(OpCode::CopyRegisterIfNull {
1341 source: result_reg,
1342 dest: key_reg,
1343 });
1344 }
1345 }
1346
1347 ops
1348 }
1349
1350 fn find_primary_key_transformation(
1351 &self,
1352 mappings: &[TypedFieldMapping<S>],
1353 ) -> Option<Transformation> {
1354 let primary_key = self.spec.identity.primary_keys.first()?;
1356 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1357
1358 for mapping in mappings {
1360 if mapping.target_path == *primary_key
1362 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1363 {
1364 if let Some(transform) = &mapping.transform {
1366 return Some(transform.clone());
1367 }
1368
1369 if let MappingSource::FromSource {
1371 transform: Some(transform),
1372 ..
1373 } = &mapping.source
1374 {
1375 return Some(transform.clone());
1376 }
1377 }
1378 }
1379
1380 for mapping in mappings {
1382 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1383 if let Some(transform) = field_transforms.get(&primary_field_name) {
1384 return Some(transform.clone());
1385 }
1386 }
1387 }
1388
1389 None
1390 }
1391
1392 pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1395 let primary_key = self.spec.identity.primary_keys.first()?;
1396
1397 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1399
1400 for handler in &self.spec.handlers {
1402 for mapping in &handler.mappings {
1403 if mapping.target_path == *primary_key
1405 || mapping.target_path.ends_with(&format!(".{}", primary_key))
1406 {
1407 if let MappingSource::FromSource {
1409 path, transform, ..
1410 } = &mapping.source
1411 {
1412 if path.segments.last() == Some(&primary_field_name) {
1413 return mapping.transform.clone().or_else(|| transform.clone());
1415 }
1416 }
1417 }
1418
1419 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1421 if let Some(transform) = field_transforms.get(&primary_field_name) {
1422 return Some(transform.clone());
1423 }
1424 }
1425 }
1426 }
1427
1428 None
1429 }
1430
1431 fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1433 primary_key.split('.').next_back().map(|s| s.to_string())
1435 }
1436
1437 pub fn auto_detect_primary_field(
1440 &self,
1441 current_mappings: &[TypedFieldMapping<S>],
1442 ) -> Option<FieldPath> {
1443 let primary_key = self.spec.identity.primary_keys.first()?;
1444
1445 let primary_field_name = self.extract_primary_field_name(primary_key)?;
1447
1448 if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1450 return Some(FieldPath::new(&[&primary_field_name]));
1451 }
1452
1453 None
1454 }
1455
1456 fn current_account_has_primary_field(
1459 &self,
1460 field_name: &str,
1461 mappings: &[TypedFieldMapping<S>],
1462 ) -> bool {
1463 for mapping in mappings {
1465 if let MappingSource::FromSource { path, .. } = &mapping.source {
1466 if path.segments.last() == Some(&field_name.to_string()) {
1468 return true;
1469 }
1470 }
1471 }
1472
1473 false
1474 }
1475
1476 #[allow(dead_code)]
1478 fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1479 for mapping in mappings {
1480 if let MappingSource::FromSource { path, .. } = &mapping.source {
1481 if path.segments.last() == Some(&field_name.to_string()) {
1482 return true;
1483 }
1484 }
1485 }
1486 false
1487 }
1488
1489 #[allow(dead_code)]
1492 fn field_exists_in_mappings(
1493 &self,
1494 field_name: &str,
1495 mappings: &[TypedFieldMapping<S>],
1496 ) -> bool {
1497 for mapping in mappings {
1499 if let MappingSource::FromSource { path, .. } = &mapping.source {
1500 if path.segments.last() == Some(&field_name.to_string()) {
1501 return true;
1502 }
1503 }
1504 if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1506 if field_transforms.contains_key(field_name) {
1507 return true;
1508 }
1509 }
1510 }
1511 false
1512 }
1513
1514 fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1515 if field_path.segments.is_empty() {
1516 return None;
1517 }
1518
1519 let lookup_field_name = field_path.segments.last().unwrap();
1520
1521 for lookup_index in &self.spec.identity.lookup_indexes {
1522 let index_field_name = lookup_index
1523 .field_name
1524 .split('.')
1525 .next_back()
1526 .unwrap_or(&lookup_index.field_name);
1527 if index_field_name == lookup_field_name {
1528 return Some(format!("{}_lookup_index", index_field_name));
1529 }
1530 }
1531
1532 None
1533 }
1534
1535 fn find_lookup_index_for_lookup_field(
1538 &self,
1539 primary_field: &FieldPath,
1540 mappings: &[TypedFieldMapping<S>],
1541 ) -> Option<String> {
1542 let primary_path = primary_field.segments.join(".");
1544
1545 for mapping in mappings {
1547 if let MappingSource::FromSource { path, .. } = &mapping.source {
1549 let source_path = path.segments.join(".");
1550 if source_path == primary_path {
1551 for lookup_index in &self.spec.identity.lookup_indexes {
1553 if mapping.target_path == lookup_index.field_name {
1554 let index_field_name = lookup_index
1555 .field_name
1556 .split('.')
1557 .next_back()
1558 .unwrap_or(&lookup_index.field_name);
1559 return Some(format!("{}_lookup_index", index_field_name));
1560 }
1561 }
1562 }
1563 }
1564 }
1565
1566 self.find_lookup_index_for_field(primary_field)
1568 }
1569
1570 fn find_source_path_for_lookup_index(
1574 &self,
1575 mappings: &[TypedFieldMapping<S>],
1576 lookup_field_name: &str,
1577 ) -> Option<Vec<String>> {
1578 for mapping in mappings {
1579 if mapping.target_path == lookup_field_name {
1580 if let MappingSource::FromSource { path, .. } = &mapping.source {
1581 return Some(path.segments.clone());
1582 }
1583 }
1584 }
1585 None
1586 }
1587
1588 fn compile_temporal_index_update(
1589 &self,
1590 resolution: &KeyResolutionStrategy,
1591 key_reg: Register,
1592 mappings: &[TypedFieldMapping<S>],
1593 ) -> Vec<OpCode> {
1594 let mut ops = Vec::new();
1595
1596 for lookup_index in &self.spec.identity.lookup_indexes {
1597 let lookup_reg = 17;
1598 let source_field = lookup_index
1599 .field_name
1600 .split('.')
1601 .next_back()
1602 .unwrap_or(&lookup_index.field_name);
1603
1604 match resolution {
1605 KeyResolutionStrategy::Embedded { primary_field: _ } => {
1606 let source_path_opt =
1609 self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1610
1611 let load_path = if let Some(ref path) = source_path_opt {
1612 FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1613 } else {
1614 FieldPath::new(&[source_field])
1616 };
1617
1618 ops.push(OpCode::LoadEventField {
1619 path: load_path,
1620 dest: lookup_reg,
1621 default: None,
1622 });
1623
1624 if let Some(temporal_field_name) = &lookup_index.temporal_field {
1625 let timestamp_reg = 18;
1626
1627 ops.push(OpCode::LoadEventField {
1628 path: FieldPath::new(&[temporal_field_name]),
1629 dest: timestamp_reg,
1630 default: None,
1631 });
1632
1633 let index_name = format!("{}_temporal_index", source_field);
1634 ops.push(OpCode::UpdateTemporalIndex {
1635 state_id: self.state_id,
1636 index_name,
1637 lookup_value: lookup_reg,
1638 primary_key: key_reg,
1639 timestamp: timestamp_reg,
1640 });
1641
1642 let simple_index_name = format!("{}_lookup_index", source_field);
1643 ops.push(OpCode::UpdateLookupIndex {
1644 state_id: self.state_id,
1645 index_name: simple_index_name,
1646 lookup_value: lookup_reg,
1647 primary_key: key_reg,
1648 });
1649 } else {
1650 let index_name = format!("{}_lookup_index", source_field);
1651 ops.push(OpCode::UpdateLookupIndex {
1652 state_id: self.state_id,
1653 index_name,
1654 lookup_value: lookup_reg,
1655 primary_key: key_reg,
1656 });
1657 }
1658
1659 if source_path_opt.is_some() {
1663 ops.push(OpCode::UpdatePdaReverseLookup {
1664 state_id: self.state_id,
1665 lookup_name: "default_pda_lookup".to_string(),
1666 pda_address: lookup_reg,
1667 primary_key: key_reg,
1668 });
1669 }
1670 }
1671 KeyResolutionStrategy::Lookup { primary_field } => {
1672 let has_mapping_to_lookup_field = mappings
1675 .iter()
1676 .any(|m| m.target_path == lookup_index.field_name);
1677
1678 if has_mapping_to_lookup_field {
1679 let path_segments: Vec<&str> =
1682 primary_field.segments.iter().map(|s| s.as_str()).collect();
1683 ops.push(OpCode::LoadEventField {
1684 path: FieldPath::new(&path_segments),
1685 dest: lookup_reg,
1686 default: None,
1687 });
1688
1689 let index_name = format!("{}_lookup_index", source_field);
1690 ops.push(OpCode::UpdateLookupIndex {
1691 state_id: self.state_id,
1692 index_name,
1693 lookup_value: lookup_reg,
1694 primary_key: key_reg,
1695 });
1696 }
1697 }
1698 KeyResolutionStrategy::Computed { .. }
1699 | KeyResolutionStrategy::TemporalLookup { .. } => {
1700 }
1702 }
1703 }
1704
1705 ops
1706 }
1707
1708 fn get_event_type(&self, source: &SourceSpec) -> String {
1709 match source {
1710 SourceSpec::Source { type_name, .. } => type_name.clone(),
1711 }
1712 }
1713
1714 fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1715 let mut ops = Vec::new();
1716 let state_reg = 2;
1717
1718 for action in actions {
1719 match action {
1720 HookAction::SetField {
1721 target_field,
1722 source,
1723 condition,
1724 } => {
1725 let _ = condition;
1727
1728 let temp_reg = 11; let load_ops = self.compile_mapping_source(source, temp_reg);
1732 ops.extend(load_ops);
1733
1734 if let MappingSource::FromSource {
1736 transform: Some(transform_type),
1737 ..
1738 } = source
1739 {
1740 ops.push(OpCode::Transform {
1741 source: temp_reg,
1742 dest: temp_reg,
1743 transformation: transform_type.clone(),
1744 });
1745 }
1746
1747 if let Some(cond_expr) = condition {
1749 if let Some(parsed) = &cond_expr.parsed {
1750 let cond_check_ops = self.compile_condition_check(
1752 parsed,
1753 temp_reg,
1754 state_reg,
1755 target_field,
1756 );
1757 ops.extend(cond_check_ops);
1758 } else {
1759 ops.push(OpCode::SetField {
1761 object: state_reg,
1762 path: target_field.clone(),
1763 value: temp_reg,
1764 });
1765 }
1766 } else {
1767 ops.push(OpCode::SetField {
1769 object: state_reg,
1770 path: target_field.clone(),
1771 value: temp_reg,
1772 });
1773 }
1774 }
1775 HookAction::IncrementField {
1776 target_field,
1777 increment_by,
1778 condition,
1779 } => {
1780 if let Some(cond_expr) = condition {
1781 if let Some(parsed) = &cond_expr.parsed {
1782 let cond_check_ops = self.compile_conditional_increment(
1787 parsed,
1788 state_reg,
1789 target_field,
1790 *increment_by,
1791 );
1792 ops.extend(cond_check_ops);
1793 } else {
1794 ops.push(OpCode::SetFieldIncrement {
1796 object: state_reg,
1797 path: target_field.clone(),
1798 });
1799 }
1800 } else {
1801 ops.push(OpCode::SetFieldIncrement {
1803 object: state_reg,
1804 path: target_field.clone(),
1805 });
1806 }
1807 }
1808 HookAction::RegisterPdaMapping { .. } => {
1809 }
1812 }
1813 }
1814
1815 ops
1816 }
1817
1818 fn compile_condition_check(
1819 &self,
1820 condition: &ParsedCondition,
1821 value_reg: Register,
1822 state_reg: Register,
1823 target_field: &str,
1824 ) -> Vec<OpCode> {
1825 match condition {
1826 ParsedCondition::Comparison {
1827 field,
1828 op,
1829 value: cond_value,
1830 } => {
1831 vec![OpCode::ConditionalSetField {
1833 object: state_reg,
1834 path: target_field.to_string(),
1835 value: value_reg,
1836 condition_field: field.clone(),
1837 condition_op: op.clone(),
1838 condition_value: cond_value.clone(),
1839 }]
1840 }
1841 ParsedCondition::Logical { .. } => {
1842 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1844 vec![OpCode::SetField {
1845 object: state_reg,
1846 path: target_field.to_string(),
1847 value: value_reg,
1848 }]
1849 }
1850 }
1851 }
1852
1853 fn compile_conditional_increment(
1854 &self,
1855 condition: &ParsedCondition,
1856 state_reg: Register,
1857 target_field: &str,
1858 _increment_by: i64,
1859 ) -> Vec<OpCode> {
1860 match condition {
1861 ParsedCondition::Comparison {
1862 field,
1863 op,
1864 value: cond_value,
1865 } => {
1866 vec![OpCode::ConditionalIncrement {
1867 object: state_reg,
1868 path: target_field.to_string(),
1869 condition_field: field.clone(),
1870 condition_op: op.clone(),
1871 condition_value: cond_value.clone(),
1872 }]
1873 }
1874 ParsedCondition::Logical { .. } => {
1875 tracing::warn!("Logical conditions not yet supported in instruction hooks");
1876 vec![OpCode::SetFieldIncrement {
1877 object: state_reg,
1878 path: target_field.to_string(),
1879 }]
1880 }
1881 }
1882 }
1883}