Skip to main content

hyperstack_interpreter/
compiler.rs

1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
/// Index identifying a register operand of an [`OpCode`] (for example its
/// `source`, `dest`, `key` or `state` fields).
pub type Register = usize;
7
/// Builds the name of the stop-flag entry that accompanies `target_path`:
/// the literal prefix `__stop:` followed by the path, unchanged.
fn stop_field_path(target_path: &str) -> String {
    const STOP_PREFIX: &str = "__stop:";

    let mut flag_path = String::with_capacity(STOP_PREFIX.len() + target_path.len());
    flag_path.push_str(STOP_PREFIX);
    flag_path.push_str(target_path);
    flag_path
}
11
/// Instruction set produced by the compiler in this file.
///
/// Operands typed [`Register`] name registers. How each instruction executes is
/// defined by whatever consumes the bytecode, which is not part of this file;
/// notes of the form "Emitted ..." below only describe where the visible
/// compiler code produces the variant.
#[derive(Debug, Clone)]
pub enum OpCode {
    /// Emitted to read the event field at `path` into `dest`. The compiler passes
    /// `Some(null)` as `default` when reading `__resolved_primary_key` and `None`
    /// when reading an instruction hook's `lookup_by` path.
    LoadEventField {
        path: FieldPath,
        dest: Register,
        default: Option<Value>,
    },
    LoadConstant {
        value: Value,
        dest: Register,
    },
    /// Emitted for hook-created handlers to copy the resolved-primary-key
    /// register into the key register.
    CopyRegister {
        source: Register,
        dest: Register,
    },
    /// Copy from source to dest only if dest is currently null
    CopyRegisterIfNull {
        source: Register,
        dest: Register,
    },
    GetEventType {
        dest: Register,
    },
    CreateObject {
        dest: Register,
    },
    /// Emitted for `PopulationStrategy::LastWrite` and `PopulationStrategy::Merge`
    /// mappings that have no `stop`, `when` or supported condition.
    SetField {
        object: Register,
        path: String,
        value: Register,
    },
    SetFields {
        object: Register,
        fields: Vec<(String, Register)>,
    },
    GetField {
        object: Register,
        path: String,
        dest: Register,
    },
    /// Emitted once per handler, after key loading and before any mapping
    /// opcode, with `{}` as `default`. When handlers are merged it marks the
    /// boundary between the setup and mapping sections.
    ReadOrInitState {
        state_id: u32,
        key: Register,
        default: Value,
        dest: Register,
    },
    /// Emitted after a handler's mapping opcodes. When handlers are merged it
    /// marks the start of the teardown section.
    UpdateState {
        state_id: u32,
        key: Register,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Append` mappings.
    AppendToArray {
        object: Register,
        path: String,
        value: Register,
    },
    GetCurrentTimestamp {
        dest: Register,
    },
    CreateEvent {
        dest: Register,
        event_value: Register,
    },
    CreateCapture {
        dest: Register,
        capture_value: Register,
    },
    /// Emitted with `source == dest` for a mapping's `transform`, and with
    /// `Transformation::HexEncode` for a hook's `lookup_by` value.
    Transform {
        source: Register,
        dest: Register,
        transformation: Transformation,
    },
    /// Emitted as the last opcode of a handler whose spec has `emit` set.
    EmitMutation {
        entity_name: String,
        key: Register,
        state: Register,
    },
    /// Emitted for `PopulationStrategy::SetOnce` mappings.
    SetFieldIfNull {
        object: Register,
        path: String,
        value: Register,
    },
    /// Emitted for `PopulationStrategy::Max` mappings.
    SetFieldMax {
        object: Register,
        path: String,
        value: Register,
    },
    UpdateTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
        timestamp: Register,
    },
    LookupTemporalIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        timestamp: Register,
        dest: Register,
    },
    UpdateLookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        primary_key: Register,
    },
    LookupIndex {
        state_id: u32,
        index_name: String,
        lookup_value: Register,
        dest: Register,
    },
    /// Sum a numeric value to a field (accumulator)
    SetFieldSum {
        object: Register,
        path: String,
        value: Register,
    },
    /// Increment a counter field by 1
    SetFieldIncrement {
        object: Register,
        path: String,
    },
    /// Set field to minimum value
    SetFieldMin {
        object: Register,
        path: String,
        value: Register,
    },
    /// Set field only if a specific instruction type was seen in the same transaction.
    /// If not seen yet, defers the operation for later completion.
    SetFieldWhen {
        object: Register,
        path: String,
        value: Register,
        when_instruction: String,
        entity_name: String,
        key_reg: Register,
        condition_field: Option<FieldPath>,
        condition_op: Option<ComparisonOp>,
        condition_value: Option<Value>,
    },
    /// Set field unless stopped by a specific instruction.
    /// Stop is tracked by a per-entity stop flag.
    SetFieldUnlessStopped {
        object: Register,
        path: String,
        value: Register,
        stop_field: String,
        stop_instruction: String,
        entity_name: String,
        key_reg: Register,
    },
    /// Add value to unique set and update count
    /// Maintains internal Set, field stores count
    AddToUniqueSet {
        state_id: u32,
        set_name: String,
        value: Register,
        count_object: Register,
        count_path: String,
    },
    /// Conditionally set a field based on a comparison
    ConditionalSetField {
        object: Register,
        path: String,
        value: Register,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Conditionally increment a field based on a comparison
    ConditionalIncrement {
        object: Register,
        path: String,
        condition_field: FieldPath,
        condition_op: ComparisonOp,
        condition_value: Value,
    },
    /// Evaluate computed fields (calls external hook if provided)
    /// computed_paths: List of paths that will be computed (for dirty tracking)
    EvaluateComputedFields {
        state: Register,
        computed_paths: Vec<String>,
    },
    /// Queue a resolver for asynchronous enrichment
    QueueResolver {
        state_id: u32,
        entity_name: String,
        resolver: ResolverType,
        input_path: Option<String>,
        input_value: Option<Value>,
        url_template: Option<Vec<UrlTemplatePart>>,
        strategy: ResolveStrategy,
        extracts: Vec<ResolverExtractSpec>,
        condition: Option<ResolverCondition>,
        schedule_at: Option<String>,
        state: Register,
        key: Register,
    },
    /// Update PDA reverse lookup table
    /// Maps a PDA address to its primary key for reverse lookups
    UpdatePdaReverseLookup {
        state_id: u32,
        lookup_name: String,
        pda_address: Register,
        primary_key: Register,
    },
}
222
/// Compiled output for a single entity, as produced by `TypedCompiler::compile_entity`.
pub struct EntityBytecode {
    /// State id the compiler was configured with (`0` unless overridden).
    pub state_id: u32,
    /// Opcode program for each event type that has a handler or instruction hook.
    pub handlers: HashMap<String, Vec<OpCode>>,
    /// Name of the entity this bytecode was compiled for.
    pub entity_name: String,
    /// Every `when` value found on the entity's field mappings.
    pub when_events: HashSet<String>,
    /// Target paths (and their `__stop:` flag paths) for which no mapping set `emit`.
    pub non_emitted_fields: HashSet<String>,
    /// Copy of the spec's `computed_fields`.
    pub computed_paths: Vec<String>,
    /// Optional callback for evaluating computed fields
    /// Parameters: state, context_slot (Option<u64>), context_timestamp (i64)
    #[allow(clippy::type_complexity)]
    pub computed_fields_evaluator: Option<
        Box<
            dyn Fn(
                    &mut Value,
                    Option<u64>,
                    i64,
                ) -> std::result::Result<(), Box<dyn std::error::Error>>
                + Send
                + Sync,
        >,
    >,
}
245
246impl std::fmt::Debug for EntityBytecode {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        f.debug_struct("EntityBytecode")
249            .field("state_id", &self.state_id)
250            .field("handlers", &self.handlers)
251            .field("entity_name", &self.entity_name)
252            .field("when_events", &self.when_events)
253            .field("non_emitted_fields", &self.non_emitted_fields)
254            .field("computed_paths", &self.computed_paths)
255            .field(
256                "computed_fields_evaluator",
257                &self.computed_fields_evaluator.is_some(),
258            )
259            .finish()
260    }
261}
262
/// Bytecode for a set of entities plus the tables used to dispatch events to them.
#[derive(Debug)]
pub struct MultiEntityBytecode {
    /// Compiled bytecode keyed by entity name.
    pub entities: HashMap<String, EntityBytecode>,
    /// For each event type, the names of the entities that have a handler for it.
    pub event_routing: HashMap<String, Vec<String>>,
    /// Union of the `when_events` of every registered entity.
    pub when_events: HashSet<String>,
    /// Always created with `ProtoRouter::new()` in this file; its role is defined
    /// in `crate::proto_router`.
    pub proto_router: crate::proto_router::ProtoRouter,
}
270
271impl MultiEntityBytecode {
272    pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
273        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
274        let entity_bytecode = compiler.compile_entity();
275
276        let mut entities = HashMap::new();
277        let mut event_routing = HashMap::new();
278        let mut when_events = HashSet::new();
279
280        for event_type in entity_bytecode.handlers.keys() {
281            event_routing
282                .entry(event_type.clone())
283                .or_insert_with(Vec::new)
284                .push(entity_name.clone());
285        }
286
287        when_events.extend(entity_bytecode.when_events.iter().cloned());
288
289        entities.insert(entity_name, entity_bytecode);
290
291        MultiEntityBytecode {
292            entities,
293            event_routing,
294            when_events,
295            proto_router: crate::proto_router::ProtoRouter::new(),
296        }
297    }
298
299    pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
300        let entities = HashMap::new();
301        let event_routing = HashMap::new();
302        let when_events = HashSet::new();
303
304        if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
305            panic!("from_entities requires type information - use builder pattern instead");
306        }
307
308        MultiEntityBytecode {
309            entities,
310            event_routing,
311            when_events,
312            proto_router: crate::proto_router::ProtoRouter::new(),
313        }
314    }
315
316    #[allow(clippy::new_ret_no_self)]
317    pub fn new() -> MultiEntityBytecodeBuilder {
318        MultiEntityBytecodeBuilder {
319            entities: HashMap::new(),
320            event_routing: HashMap::new(),
321            when_events: HashSet::new(),
322            proto_router: crate::proto_router::ProtoRouter::new(),
323        }
324    }
325}
326
/// Accumulates compiled entities and their routing tables until `build()` turns
/// them into a [`MultiEntityBytecode`]. The fields mirror that struct's fields.
pub struct MultiEntityBytecodeBuilder {
    // Compiled bytecode keyed by entity name.
    entities: HashMap<String, EntityBytecode>,
    // Event type -> names of the entities handling it, in registration order.
    event_routing: HashMap<String, Vec<String>>,
    // Union of the `when_events` of every entity added so far.
    when_events: HashSet<String>,
    proto_router: crate::proto_router::ProtoRouter,
}
333
334impl MultiEntityBytecodeBuilder {
335    pub fn add_entity<S>(
336        self,
337        entity_name: String,
338        spec: TypedStreamSpec<S>,
339        state_id: u32,
340    ) -> Self {
341        self.add_entity_with_evaluator(
342            entity_name,
343            spec,
344            state_id,
345            None::<
346                fn(
347                    &mut Value,
348                    Option<u64>,
349                    i64,
350                ) -> std::result::Result<(), Box<dyn std::error::Error>>,
351            >,
352        )
353    }
354
355    pub fn add_entity_with_evaluator<S, F>(
356        mut self,
357        entity_name: String,
358        spec: TypedStreamSpec<S>,
359        state_id: u32,
360        evaluator: Option<F>,
361    ) -> Self
362    where
363        F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
364            + Send
365            + Sync
366            + 'static,
367    {
368        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
369        let mut entity_bytecode = compiler.compile_entity();
370
371        // Store the evaluator callback if provided
372        if let Some(eval) = evaluator {
373            entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
374        }
375
376        for event_type in entity_bytecode.handlers.keys() {
377            self.event_routing
378                .entry(event_type.clone())
379                .or_default()
380                .push(entity_name.clone());
381        }
382
383        self.when_events
384            .extend(entity_bytecode.when_events.iter().cloned());
385
386        self.entities.insert(entity_name, entity_bytecode);
387        self
388    }
389
390    pub fn build(self) -> MultiEntityBytecode {
391        MultiEntityBytecode {
392            entities: self.entities,
393            event_routing: self.event_routing,
394            when_events: self.when_events,
395            proto_router: self.proto_router,
396        }
397    }
398}
399
/// Compiles one entity's [`TypedStreamSpec`] into bytecode.
pub struct TypedCompiler<S> {
    /// The stream specification being compiled.
    pub spec: TypedStreamSpec<S>,
    // Name stamped on the emitted bytecode and used as its routing target.
    entity_name: String,
    // State id written into state-related opcodes; `0` unless `with_state_id` is called.
    state_id: u32,
}
405
406impl<S> TypedCompiler<S> {
407    pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
408        TypedCompiler {
409            spec,
410            entity_name,
411            state_id: 0,
412        }
413    }
414
415    pub fn with_state_id(mut self, state_id: u32) -> Self {
416        self.state_id = state_id;
417        self
418    }
419
420    pub fn compile(&self) -> MultiEntityBytecode {
421        let entity_bytecode = self.compile_entity();
422
423        let mut entities = HashMap::new();
424        let mut event_routing = HashMap::new();
425        let mut when_events = HashSet::new();
426
427        for event_type in entity_bytecode.handlers.keys() {
428            event_routing
429                .entry(event_type.clone())
430                .or_insert_with(Vec::new)
431                .push(self.entity_name.clone());
432        }
433
434        when_events.extend(entity_bytecode.when_events.iter().cloned());
435
436        entities.insert(self.entity_name.clone(), entity_bytecode);
437
438        MultiEntityBytecode {
439            entities,
440            event_routing,
441            when_events,
442            proto_router: crate::proto_router::ProtoRouter::new(),
443        }
444    }
445
446    fn compile_entity(&self) -> EntityBytecode {
447        let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
448        let mut when_events: HashSet<String> = HashSet::new();
449        let mut emit_by_path: HashMap<String, bool> = HashMap::new();
450
451        // DEBUG: Collect all handler info before processing
452        let mut debug_info = Vec::new();
453        for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
454            let event_type = self.get_event_type(&handler_spec.source);
455            let program_id = match &handler_spec.source {
456                crate::ast::SourceSpec::Source { program_id, .. } => {
457                    program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
458                }
459            };
460            debug_info.push(format!(
461                "  [{}] EventType={}, Mappings={}, ProgramId={}",
462                index,
463                event_type,
464                handler_spec.mappings.len(),
465                program_id
466            ));
467        }
468
469        // DEBUG: Log handler information (optional - can be removed later)
470        // Uncomment to debug handler processing:
471        // if self.entity_name == "PumpfunToken" {
472        //     eprintln!("🔍 Compiling {} handlers for {}", self.spec.handlers.len(), self.entity_name);
473        //     for info in &debug_info {
474        //         eprintln!("{}", info);
475        //     }
476        // }
477
478        for handler_spec in &self.spec.handlers {
479            for mapping in &handler_spec.mappings {
480                if let Some(when) = &mapping.when {
481                    when_events.insert(when.clone());
482                }
483                let entry = emit_by_path
484                    .entry(mapping.target_path.clone())
485                    .or_insert(false);
486                *entry |= mapping.emit;
487                if mapping.stop.is_some() {
488                    emit_by_path
489                        .entry(stop_field_path(&mapping.target_path))
490                        .or_insert(false);
491                }
492            }
493            let opcodes = self.compile_handler(handler_spec);
494            let event_type = self.get_event_type(&handler_spec.source);
495
496            if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
497                // Merge strategy: Take ALL operations from BOTH handlers
498                // Keep setup from first, combine all mappings, keep one teardown
499
500                // Split existing handler into: setup, mappings, teardown
501                let mut existing_setup = Vec::new();
502                let mut existing_mappings = Vec::new();
503                let mut existing_teardown = Vec::new();
504                let mut section = 0; // 0=setup, 1=mappings, 2=teardown
505
506                for opcode in existing_opcodes.iter() {
507                    match opcode {
508                        OpCode::ReadOrInitState { .. } => {
509                            existing_setup.push(opcode.clone());
510                            section = 1; // Next opcodes are mappings
511                        }
512                        OpCode::UpdateState { .. } => {
513                            existing_teardown.push(opcode.clone());
514                            section = 2; // Next opcodes are teardown
515                        }
516                        OpCode::EmitMutation { .. } => {
517                            existing_teardown.push(opcode.clone());
518                        }
519                        _ if section == 0 => existing_setup.push(opcode.clone()),
520                        _ if section == 1 => existing_mappings.push(opcode.clone()),
521                        _ => existing_teardown.push(opcode.clone()),
522                    }
523                }
524
525                // Extract mappings from new handler (skip setup and teardown)
526                let mut new_mappings = Vec::new();
527                section = 0;
528
529                for opcode in opcodes.iter() {
530                    match opcode {
531                        OpCode::ReadOrInitState { .. } => {
532                            section = 1; // Start capturing mappings
533                        }
534                        OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
535                            section = 2; // Stop capturing
536                        }
537                        _ if section == 1 => {
538                            new_mappings.push(opcode.clone());
539                        }
540                        _ => {} // Skip setup and teardown from new handler
541                    }
542                }
543
544                // Rebuild: setup + existing_mappings + new_mappings + teardown
545                let mut merged = Vec::new();
546                merged.extend(existing_setup);
547                merged.extend(existing_mappings);
548                merged.extend(new_mappings.clone());
549                merged.extend(existing_teardown);
550
551                *existing_opcodes = merged;
552            } else {
553                handlers.insert(event_type, opcodes);
554            }
555        }
556
557        // Process instruction_hooks to add SetField/IncrementField operations
558        for hook in &self.spec.instruction_hooks {
559            let event_type = hook.instruction_type.clone();
560
561            let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
562                let key_reg = 20;
563                let state_reg = 2;
564                let resolved_key_reg = 19;
565                let temp_reg = 18;
566
567                let mut ops = Vec::new();
568
569                // First, try to load __resolved_primary_key from resolver
570                ops.push(OpCode::LoadEventField {
571                    path: FieldPath::new(&["__resolved_primary_key"]),
572                    dest: resolved_key_reg,
573                    default: Some(serde_json::json!(null)),
574                });
575
576                // Copy to key_reg (unconditionally, may be null)
577                ops.push(OpCode::CopyRegister {
578                    source: resolved_key_reg,
579                    dest: key_reg,
580                });
581
582                // If hook has lookup_by, use it to load primary key from instruction accounts
583                if let Some(lookup_path) = &hook.lookup_by {
584                    // Load the primary key from the instruction's lookup_by field (e.g., accounts.signer)
585                    ops.push(OpCode::LoadEventField {
586                        path: lookup_path.clone(),
587                        dest: temp_reg,
588                        default: None,
589                    });
590
591                    // Apply HexEncode transformation (accounts are byte arrays)
592                    ops.push(OpCode::Transform {
593                        source: temp_reg,
594                        dest: temp_reg,
595                        transformation: Transformation::HexEncode,
596                    });
597
598                    // Use this as fallback if __resolved_primary_key was null
599                    ops.push(OpCode::CopyRegisterIfNull {
600                        source: temp_reg,
601                        dest: key_reg,
602                    });
603                }
604
605                ops.push(OpCode::ReadOrInitState {
606                    state_id: self.state_id,
607                    key: key_reg,
608                    default: serde_json::json!({}),
609                    dest: state_reg,
610                });
611
612                ops.push(OpCode::UpdateState {
613                    state_id: self.state_id,
614                    key: key_reg,
615                    value: state_reg,
616                });
617
618                ops
619            });
620
621            // Generate opcodes for each action in the hook
622            let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
623
624            // Insert hook opcodes before EvaluateComputedFields (if present) or UpdateState
625            // Hook actions (like whale_trade_count increment) must run before computed fields
626            // are evaluated, since computed fields may depend on the modified state
627            let insert_pos = handler_opcodes
628                .iter()
629                .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
630                .or_else(|| {
631                    handler_opcodes
632                        .iter()
633                        .position(|op| matches!(op, OpCode::UpdateState { .. }))
634                });
635
636            if let Some(pos) = insert_pos {
637                // Insert hook opcodes before EvaluateComputedFields or UpdateState
638                for (i, opcode) in hook_opcodes.into_iter().enumerate() {
639                    handler_opcodes.insert(pos + i, opcode);
640                }
641            }
642        }
643
644        let non_emitted_fields: HashSet<String> = emit_by_path
645            .into_iter()
646            .filter_map(|(path, emit)| if emit { None } else { Some(path) })
647            .collect();
648
649        EntityBytecode {
650            state_id: self.state_id,
651            handlers,
652            entity_name: self.entity_name.clone(),
653            when_events,
654            non_emitted_fields,
655            computed_paths: self.spec.computed_fields.clone(),
656            computed_fields_evaluator: None,
657        }
658    }
659
660    fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
661        let mut ops = Vec::new();
662        let state_reg = 2;
663        let key_reg = 20;
664
665        ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
666
667        ops.push(OpCode::ReadOrInitState {
668            state_id: self.state_id,
669            key: key_reg,
670            default: serde_json::json!({}),
671            dest: state_reg,
672        });
673
674        // Index updates must come AFTER ReadOrInitState so the state table exists.
675        // ReadOrInitState lazily creates the state table via entry().or_insert_with(),
676        // but index opcodes (UpdateLookupIndex, UpdateTemporalIndex, UpdatePdaReverseLookup)
677        // use get_mut() which fails if the table doesn't exist yet.
678        // This ordering also means stale/duplicate updates (caught by ReadOrInitState's
679        // recency check) correctly skip index updates too.
680        ops.extend(self.compile_temporal_index_update(
681            &spec.key_resolution,
682            key_reg,
683            &spec.mappings,
684        ));
685
686        for mapping in &spec.mappings {
687            ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
688        }
689
690        ops.extend(self.compile_resolvers(state_reg, key_reg));
691
692        // Evaluate computed fields after all mappings but before updating state
693        ops.push(OpCode::EvaluateComputedFields {
694            state: state_reg,
695            computed_paths: self.spec.computed_fields.clone(),
696        });
697
698        ops.push(OpCode::UpdateState {
699            state_id: self.state_id,
700            key: key_reg,
701            value: state_reg,
702        });
703
704        if spec.emit {
705            ops.push(OpCode::EmitMutation {
706                entity_name: self.entity_name.clone(),
707                key: key_reg,
708                state: state_reg,
709            });
710        }
711
712        ops
713    }
714
715    fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
716        let mut ops = Vec::new();
717
718        for resolver_spec in &self.spec.resolver_specs {
719            let url_template = match &resolver_spec.resolver {
720                ResolverType::Url(config) => match &config.url_source {
721                    UrlSource::Template(parts) => Some(parts.clone()),
722                    _ => None,
723                },
724                _ => None,
725            };
726
727            ops.push(OpCode::QueueResolver {
728                state_id: self.state_id,
729                entity_name: self.entity_name.clone(),
730                resolver: resolver_spec.resolver.clone(),
731                input_path: resolver_spec.input_path.clone(),
732                input_value: resolver_spec.input_value.clone(),
733                url_template,
734                strategy: resolver_spec.strategy.clone(),
735                extracts: resolver_spec.extracts.clone(),
736                condition: resolver_spec.condition.clone(),
737                schedule_at: resolver_spec.schedule_at.clone(),
738                state: state_reg,
739                key: key_reg,
740            });
741        }
742
743        ops
744    }
745
746    fn compile_mapping(
747        &self,
748        mapping: &TypedFieldMapping<S>,
749        state_reg: Register,
750        key_reg: Register,
751    ) -> Vec<OpCode> {
752        let mut ops = Vec::new();
753        let temp_reg = 10;
754
755        ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
756
757        if let Some(transform) = &mapping.transform {
758            ops.push(OpCode::Transform {
759                source: temp_reg,
760                dest: temp_reg,
761                transformation: transform.clone(),
762            });
763        }
764
765        if let Some(stop_instruction) = &mapping.stop {
766            if mapping.when.is_some() {
767                tracing::warn!(
768                    "#[map] stop and when both set for {}. Ignoring when.",
769                    mapping.target_path
770                );
771            }
772            if !matches!(mapping.population, PopulationStrategy::LastWrite)
773                && !matches!(mapping.population, PopulationStrategy::Merge)
774            {
775                tracing::warn!(
776                    "#[map] stop ignores population strategy {:?}",
777                    mapping.population
778                );
779            }
780
781            ops.push(OpCode::SetFieldUnlessStopped {
782                object: state_reg,
783                path: mapping.target_path.clone(),
784                value: temp_reg,
785                stop_field: stop_field_path(&mapping.target_path),
786                stop_instruction: stop_instruction.clone(),
787                entity_name: self.entity_name.clone(),
788                key_reg,
789            });
790            return ops;
791        }
792
793        if let Some(when_instruction) = &mapping.when {
794            if !matches!(mapping.population, PopulationStrategy::LastWrite)
795                && !matches!(mapping.population, PopulationStrategy::Merge)
796            {
797                tracing::warn!(
798                    "#[map] when ignores population strategy {:?}",
799                    mapping.population
800                );
801            }
802            let (condition_field, condition_op, condition_value) = mapping
803                .condition
804                .as_ref()
805                .and_then(|cond| cond.parsed.as_ref())
806                .and_then(|parsed| match parsed {
807                    ParsedCondition::Comparison { field, op, value } => {
808                        Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
809                    }
810                    ParsedCondition::Logical { .. } => {
811                        tracing::warn!("Logical conditions not yet supported for #[map] when");
812                        None
813                    }
814                })
815                .unwrap_or((None, None, None));
816
817            ops.push(OpCode::SetFieldWhen {
818                object: state_reg,
819                path: mapping.target_path.clone(),
820                value: temp_reg,
821                when_instruction: when_instruction.clone(),
822                entity_name: self.entity_name.clone(),
823                key_reg,
824                condition_field,
825                condition_op,
826                condition_value,
827            });
828            return ops;
829        }
830
831        if let Some(condition) = &mapping.condition {
832            if let Some(parsed) = &condition.parsed {
833                match parsed {
834                    ParsedCondition::Comparison {
835                        field,
836                        op,
837                        value: cond_value,
838                    } => {
839                        if matches!(mapping.population, PopulationStrategy::LastWrite)
840                            || matches!(mapping.population, PopulationStrategy::Merge)
841                        {
842                            ops.push(OpCode::ConditionalSetField {
843                                object: state_reg,
844                                path: mapping.target_path.clone(),
845                                value: temp_reg,
846                                condition_field: field.clone(),
847                                condition_op: op.clone(),
848                                condition_value: cond_value.clone(),
849                            });
850                            return ops;
851                        }
852
853                        if matches!(mapping.population, PopulationStrategy::Count) {
854                            ops.push(OpCode::ConditionalIncrement {
855                                object: state_reg,
856                                path: mapping.target_path.clone(),
857                                condition_field: field.clone(),
858                                condition_op: op.clone(),
859                                condition_value: cond_value.clone(),
860                            });
861                            return ops;
862                        }
863
864                        tracing::warn!(
865                            "Conditional #[map] not supported for population strategy {:?}",
866                            mapping.population
867                        );
868                    }
869                    ParsedCondition::Logical { .. } => {
870                        tracing::warn!("Logical conditions not yet supported for #[map]");
871                    }
872                }
873            }
874        }
875
876        match &mapping.population {
877            PopulationStrategy::Append => {
878                ops.push(OpCode::AppendToArray {
879                    object: state_reg,
880                    path: mapping.target_path.clone(),
881                    value: temp_reg,
882                });
883            }
884            PopulationStrategy::LastWrite => {
885                ops.push(OpCode::SetField {
886                    object: state_reg,
887                    path: mapping.target_path.clone(),
888                    value: temp_reg,
889                });
890            }
891            PopulationStrategy::SetOnce => {
892                ops.push(OpCode::SetFieldIfNull {
893                    object: state_reg,
894                    path: mapping.target_path.clone(),
895                    value: temp_reg,
896                });
897            }
898            PopulationStrategy::Merge => {
899                ops.push(OpCode::SetField {
900                    object: state_reg,
901                    path: mapping.target_path.clone(),
902                    value: temp_reg,
903                });
904            }
905            PopulationStrategy::Max => {
906                ops.push(OpCode::SetFieldMax {
907                    object: state_reg,
908                    path: mapping.target_path.clone(),
909                    value: temp_reg,
910                });
911            }
912            PopulationStrategy::Sum => {
913                ops.push(OpCode::SetFieldSum {
914                    object: state_reg,
915                    path: mapping.target_path.clone(),
916                    value: temp_reg,
917                });
918            }
919            PopulationStrategy::Count => {
920                // Count doesn't need the value, just increment
921                ops.push(OpCode::SetFieldIncrement {
922                    object: state_reg,
923                    path: mapping.target_path.clone(),
924                });
925            }
926            PopulationStrategy::Min => {
927                ops.push(OpCode::SetFieldMin {
928                    object: state_reg,
929                    path: mapping.target_path.clone(),
930                    value: temp_reg,
931                });
932            }
933            PopulationStrategy::UniqueCount => {
934                // UniqueCount requires maintaining an internal set
935                // The field stores the count, but we track unique values in a hidden set
936                let set_name = format!("{}_unique_set", mapping.target_path);
937                ops.push(OpCode::AddToUniqueSet {
938                    state_id: self.state_id,
939                    set_name,
940                    value: temp_reg,
941                    count_object: state_reg,
942                    count_path: mapping.target_path.clone(),
943                });
944            }
945        }
946
947        ops
948    }
949
950    fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
951        match source {
952            MappingSource::FromSource {
953                path,
954                default,
955                transform,
956            } => {
957                let mut ops = vec![OpCode::LoadEventField {
958                    path: path.clone(),
959                    dest,
960                    default: default.clone(),
961                }];
962
963                // Apply transform if specified in the source
964                if let Some(transform_type) = transform {
965                    ops.push(OpCode::Transform {
966                        source: dest,
967                        dest,
968                        transformation: transform_type.clone(),
969                    });
970                }
971
972                ops
973            }
974            MappingSource::Constant(val) => {
975                vec![OpCode::LoadConstant {
976                    value: val.clone(),
977                    dest,
978                }]
979            }
980            MappingSource::AsEvent { fields } => {
981                let mut ops = Vec::new();
982
983                if fields.is_empty() {
984                    let event_data_reg = dest + 1;
985                    ops.push(OpCode::LoadEventField {
986                        path: FieldPath::new(&[]),
987                        dest: event_data_reg,
988                        default: Some(serde_json::json!({})),
989                    });
990                    ops.push(OpCode::CreateEvent {
991                        dest,
992                        event_value: event_data_reg,
993                    });
994                } else {
995                    let data_obj_reg = dest + 1;
996                    ops.push(OpCode::CreateObject { dest: data_obj_reg });
997
998                    let mut field_registers = Vec::new();
999                    let mut current_reg = dest + 2;
1000
1001                    for field_source in fields.iter() {
1002                        if let MappingSource::FromSource {
1003                            path,
1004                            default,
1005                            transform,
1006                        } = &**field_source
1007                        {
1008                            ops.push(OpCode::LoadEventField {
1009                                path: path.clone(),
1010                                dest: current_reg,
1011                                default: default.clone(),
1012                            });
1013
1014                            if let Some(transform_type) = transform {
1015                                ops.push(OpCode::Transform {
1016                                    source: current_reg,
1017                                    dest: current_reg,
1018                                    transformation: transform_type.clone(),
1019                                });
1020                            }
1021
1022                            if let Some(field_name) = path.segments.last() {
1023                                field_registers.push((field_name.clone(), current_reg));
1024                            }
1025                            current_reg += 1;
1026                        }
1027                    }
1028
1029                    if !field_registers.is_empty() {
1030                        ops.push(OpCode::SetFields {
1031                            object: data_obj_reg,
1032                            fields: field_registers,
1033                        });
1034                    }
1035
1036                    ops.push(OpCode::CreateEvent {
1037                        dest,
1038                        event_value: data_obj_reg,
1039                    });
1040                }
1041
1042                ops
1043            }
1044            MappingSource::WholeSource => {
1045                vec![OpCode::LoadEventField {
1046                    path: FieldPath::new(&[]),
1047                    dest,
1048                    default: Some(serde_json::json!({})),
1049                }]
1050            }
1051            MappingSource::AsCapture { field_transforms } => {
1052                // AsCapture loads the whole source, applies field-level transforms, and wraps in CaptureWrapper
1053                let capture_data_reg = 22; // Temp register for capture data before wrapping
1054                let mut ops = vec![OpCode::LoadEventField {
1055                    path: FieldPath::new(&[]),
1056                    dest: capture_data_reg,
1057                    default: Some(serde_json::json!({})),
1058                }];
1059
1060                // Apply transforms to specific fields in the loaded object
1061                // IMPORTANT: Use registers that don't conflict with key_reg (20)
1062                // Using 24 and 25 to avoid conflicts with key loading (uses 18, 19, 20, 23)
1063                let field_reg = 24;
1064                let transformed_reg = 25;
1065
1066                for (field_name, transform) in field_transforms {
1067                    // Load the field from the capture_data_reg (not from event!)
1068                    // Use GetField opcode to read from a register instead of LoadEventField
1069                    ops.push(OpCode::GetField {
1070                        object: capture_data_reg,
1071                        path: field_name.clone(),
1072                        dest: field_reg,
1073                    });
1074
1075                    // Transform it
1076                    ops.push(OpCode::Transform {
1077                        source: field_reg,
1078                        dest: transformed_reg,
1079                        transformation: transform.clone(),
1080                    });
1081
1082                    // Set it back into the capture data object
1083                    ops.push(OpCode::SetField {
1084                        object: capture_data_reg,
1085                        path: field_name.clone(),
1086                        value: transformed_reg,
1087                    });
1088                }
1089
1090                // Wrap the capture data in CaptureWrapper with metadata
1091                ops.push(OpCode::CreateCapture {
1092                    dest,
1093                    capture_value: capture_data_reg,
1094                });
1095
1096                ops
1097            }
1098            MappingSource::FromContext { field } => {
1099                // Load from instruction context (timestamp, slot, signature)
1100                vec![OpCode::LoadEventField {
1101                    path: FieldPath::new(&["__update_context", field.as_str()]),
1102                    dest,
1103                    default: Some(serde_json::json!(null)),
1104                }]
1105            }
1106            MappingSource::Computed { .. } => {
1107                vec![]
1108            }
1109            MappingSource::FromState { .. } => {
1110                vec![]
1111            }
1112        }
1113    }
1114
1115    pub fn compile_key_loading(
1116        &self,
1117        resolution: &KeyResolutionStrategy,
1118        key_reg: Register,
1119        mappings: &[TypedFieldMapping<S>],
1120    ) -> Vec<OpCode> {
1121        let mut ops = Vec::new();
1122
1123        // First, try to load __resolved_primary_key from resolver
1124        // This allows resolvers to override the key resolution
1125        let resolved_key_reg = 19; // Use a temp register
1126        ops.push(OpCode::LoadEventField {
1127            path: FieldPath::new(&["__resolved_primary_key"]),
1128            dest: resolved_key_reg,
1129            default: Some(serde_json::json!(null)),
1130        });
1131
1132        // Now do the normal key resolution
1133        match resolution {
1134            KeyResolutionStrategy::Embedded { primary_field } => {
1135                // Copy resolver result to key_reg (may be null)
1136                ops.push(OpCode::CopyRegister {
1137                    source: resolved_key_reg,
1138                    dest: key_reg,
1139                });
1140
1141                // Enhanced key resolution: check for auto-inheritance when primary_field is empty
1142                let effective_primary_field = if primary_field.segments.is_empty() {
1143                    // Try to auto-detect primary field from account schema
1144                    if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1145                        auto_field
1146                    } else {
1147                        primary_field.clone()
1148                    }
1149                } else {
1150                    primary_field.clone()
1151                };
1152
1153                // Skip fallback key loading if effective primary_field is still empty
1154                // This happens for account types that rely solely on __resolved_primary_key
1155                // (e.g., accounts with #[resolve_key_for] resolvers)
1156                if !effective_primary_field.segments.is_empty() {
1157                    let temp_reg = 18;
1158                    let transform_reg = 23; // Register for transformed key
1159
1160                    ops.push(OpCode::LoadEventField {
1161                        path: effective_primary_field.clone(),
1162                        dest: temp_reg,
1163                        default: None,
1164                    });
1165
1166                    // Check if there's a transformation for the primary key field
1167                    // First try the current mappings, then inherited transformations
1168                    let primary_key_transform = self
1169                        .find_primary_key_transformation(mappings)
1170                        .or_else(|| self.find_inherited_primary_key_transformation());
1171
1172                    if let Some(transform) = primary_key_transform {
1173                        // Apply transformation to the loaded key
1174                        ops.push(OpCode::Transform {
1175                            source: temp_reg,
1176                            dest: transform_reg,
1177                            transformation: transform,
1178                        });
1179                        // Use transformed value as key
1180                        ops.push(OpCode::CopyRegisterIfNull {
1181                            source: transform_reg,
1182                            dest: key_reg,
1183                        });
1184                    } else {
1185                        // No transformation, use raw value
1186                        ops.push(OpCode::CopyRegisterIfNull {
1187                            source: temp_reg,
1188                            dest: key_reg,
1189                        });
1190                    }
1191                }
1192                // If effective_primary_field is empty, key_reg will only contain __resolved_primary_key
1193                // (loaded earlier at line 513-522), or remain null if resolver didn't set it
1194            }
1195            KeyResolutionStrategy::Lookup { primary_field } => {
1196                let lookup_reg = 15;
1197                let result_reg = 17;
1198
1199                // Prefer resolver-provided key as lookup input
1200                ops.push(OpCode::CopyRegister {
1201                    source: resolved_key_reg,
1202                    dest: lookup_reg,
1203                });
1204
1205                let temp_reg = 18;
1206                ops.push(OpCode::LoadEventField {
1207                    path: primary_field.clone(),
1208                    dest: temp_reg,
1209                    default: None,
1210                });
1211                ops.push(OpCode::CopyRegisterIfNull {
1212                    source: temp_reg,
1213                    dest: lookup_reg,
1214                });
1215
1216                let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1217                let effective_index_name =
1218                    index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1219
1220                ops.push(OpCode::LookupIndex {
1221                    state_id: self.state_id,
1222                    index_name: effective_index_name,
1223                    lookup_value: lookup_reg,
1224                    dest: result_reg,
1225                });
1226                // NOTE: We intentionally do NOT fall back to lookup_reg when LookupIndex returns null.
1227                // If the lookup fails (because the RoundState account hasn't been processed yet),
1228                // the result_reg will remain null, and the mutation will be skipped.
1229                // Previously we had: CopyRegisterIfNull { source: lookup_reg, dest: result_reg }
1230                // which caused the PDA address to be used as the key instead of the round_id.
1231                // This resulted in mutations with key = PDA address instead of key = primary_key.
1232
1233                // First, set key_reg to __resolved_primary_key (may be null).
1234                // When the flush path provides a resolved key via PDA reverse lookup,
1235                // this gives it priority over the LookupIndex result.
1236                // This matches the pattern used by Embedded and Computed strategies.
1237                ops.push(OpCode::CopyRegister {
1238                    source: resolved_key_reg,
1239                    dest: key_reg,
1240                });
1241                // If key_reg is still null (no resolved key from flush), use the LookupIndex result.
1242                // This preserves the original behavior for first-arrival events where
1243                // __resolved_primary_key is not yet set.
1244                ops.push(OpCode::CopyRegisterIfNull {
1245                    source: result_reg,
1246                    dest: key_reg,
1247                });
1248            }
1249            KeyResolutionStrategy::Computed {
1250                primary_field,
1251                compute_partition: _,
1252            } => {
1253                // Copy resolver result to key_reg (may be null)
1254                ops.push(OpCode::CopyRegister {
1255                    source: resolved_key_reg,
1256                    dest: key_reg,
1257                });
1258                let temp_reg = 18;
1259                ops.push(OpCode::LoadEventField {
1260                    path: primary_field.clone(),
1261                    dest: temp_reg,
1262                    default: None,
1263                });
1264                ops.push(OpCode::CopyRegisterIfNull {
1265                    source: temp_reg,
1266                    dest: key_reg,
1267                });
1268            }
1269            KeyResolutionStrategy::TemporalLookup {
1270                lookup_field,
1271                timestamp_field,
1272                index_name,
1273            } => {
1274                // Copy resolver result to key_reg (may be null)
1275                ops.push(OpCode::CopyRegister {
1276                    source: resolved_key_reg,
1277                    dest: key_reg,
1278                });
1279                let lookup_reg = 15;
1280                let timestamp_reg = 16;
1281                let result_reg = 17;
1282
1283                ops.push(OpCode::LoadEventField {
1284                    path: lookup_field.clone(),
1285                    dest: lookup_reg,
1286                    default: None,
1287                });
1288
1289                ops.push(OpCode::LoadEventField {
1290                    path: timestamp_field.clone(),
1291                    dest: timestamp_reg,
1292                    default: None,
1293                });
1294
1295                ops.push(OpCode::LookupTemporalIndex {
1296                    state_id: self.state_id,
1297                    index_name: index_name.clone(),
1298                    lookup_value: lookup_reg,
1299                    timestamp: timestamp_reg,
1300                    dest: result_reg,
1301                });
1302
1303                ops.push(OpCode::CopyRegisterIfNull {
1304                    source: result_reg,
1305                    dest: key_reg,
1306                });
1307            }
1308        }
1309
1310        ops
1311    }
1312
1313    fn find_primary_key_transformation(
1314        &self,
1315        mappings: &[TypedFieldMapping<S>],
1316    ) -> Option<Transformation> {
1317        // Find the first primary key in the identity spec
1318        let primary_key = self.spec.identity.primary_keys.first()?;
1319        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1320
1321        // Look for a mapping that targets this primary key
1322        for mapping in mappings {
1323            // Check if this mapping targets the primary key field
1324            if mapping.target_path == *primary_key
1325                || mapping.target_path.ends_with(&format!(".{}", primary_key))
1326            {
1327                // Check mapping-level transform first
1328                if let Some(transform) = &mapping.transform {
1329                    return Some(transform.clone());
1330                }
1331
1332                // Then check source-level transform
1333                if let MappingSource::FromSource {
1334                    transform: Some(transform),
1335                    ..
1336                } = &mapping.source
1337                {
1338                    return Some(transform.clone());
1339                }
1340            }
1341        }
1342
1343        // If no explicit primary key mapping found, check AsCapture field transforms
1344        for mapping in mappings {
1345            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1346                if let Some(transform) = field_transforms.get(&primary_field_name) {
1347                    return Some(transform.clone());
1348                }
1349            }
1350        }
1351
1352        None
1353    }
1354
1355    /// Look for primary key mappings in other handlers of the same entity
1356    /// This enables cross-handler inheritance of key transformations
1357    pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1358        let primary_key = self.spec.identity.primary_keys.first()?;
1359
1360        // Extract the field name from the primary key path (e.g., "id.authority" -> "authority")
1361        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1362
1363        // Search through all handlers in the spec for primary key mappings
1364        for handler in &self.spec.handlers {
1365            for mapping in &handler.mappings {
1366                // Look for mappings targeting the primary key
1367                if mapping.target_path == *primary_key
1368                    || mapping.target_path.ends_with(&format!(".{}", primary_key))
1369                {
1370                    // Check if this mapping comes from a field matching the primary key name
1371                    if let MappingSource::FromSource {
1372                        path, transform, ..
1373                    } = &mapping.source
1374                    {
1375                        if path.segments.last() == Some(&primary_field_name) {
1376                            // Return mapping-level transform first, then source-level transform
1377                            return mapping.transform.clone().or_else(|| transform.clone());
1378                        }
1379                    }
1380                }
1381
1382                // Also check AsCapture field transforms for the primary field
1383                if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1384                    if let Some(transform) = field_transforms.get(&primary_field_name) {
1385                        return Some(transform.clone());
1386                    }
1387                }
1388            }
1389        }
1390
1391        None
1392    }
1393
1394    /// Extract the field name from a primary key path (e.g., "id.authority" -> "authority")
1395    fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1396        // Split by '.' and take the last segment
1397        primary_key.split('.').next_back().map(|s| s.to_string())
1398    }
1399
1400    /// Auto-detect primary field from account schema when no explicit mapping exists
1401    /// This looks for account types that have an 'authority' field and tries to use it
1402    pub fn auto_detect_primary_field(
1403        &self,
1404        current_mappings: &[TypedFieldMapping<S>],
1405    ) -> Option<FieldPath> {
1406        let primary_key = self.spec.identity.primary_keys.first()?;
1407
1408        // Extract the field name from the primary key (e.g., "id.authority" -> "authority")
1409        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1410
1411        // Check if current handler can access the primary field
1412        if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1413            return Some(FieldPath::new(&[&primary_field_name]));
1414        }
1415
1416        None
1417    }
1418
1419    /// Check if the current account type has the primary field
1420    /// This is determined by looking at the mappings to see what fields are available
1421    fn current_account_has_primary_field(
1422        &self,
1423        field_name: &str,
1424        mappings: &[TypedFieldMapping<S>],
1425    ) -> bool {
1426        // Look through the mappings to see if any reference the primary field
1427        for mapping in mappings {
1428            if let MappingSource::FromSource { path, .. } = &mapping.source {
1429                // Check if this mapping sources from the primary field
1430                if path.segments.last() == Some(&field_name.to_string()) {
1431                    return true;
1432                }
1433            }
1434        }
1435
1436        false
1437    }
1438
1439    /// Check if handler has access to a specific field in its source account
1440    #[allow(dead_code)]
1441    fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1442        for mapping in mappings {
1443            if let MappingSource::FromSource { path, .. } = &mapping.source {
1444                if path.segments.last() == Some(&field_name.to_string()) {
1445                    return true;
1446                }
1447            }
1448        }
1449        false
1450    }
1451
1452    /// Check if field exists by looking at mappings (IDL-agnostic approach)
1453    /// This avoids hardcoding account schemas and uses actual mapping evidence
1454    #[allow(dead_code)]
1455    fn field_exists_in_mappings(
1456        &self,
1457        field_name: &str,
1458        mappings: &[TypedFieldMapping<S>],
1459    ) -> bool {
1460        // Look through current mappings to see if the field is referenced
1461        for mapping in mappings {
1462            if let MappingSource::FromSource { path, .. } = &mapping.source {
1463                if path.segments.last() == Some(&field_name.to_string()) {
1464                    return true;
1465                }
1466            }
1467            // Also check AsCapture field transforms
1468            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1469                if field_transforms.contains_key(field_name) {
1470                    return true;
1471                }
1472            }
1473        }
1474        false
1475    }
1476
1477    fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1478        if field_path.segments.is_empty() {
1479            return None;
1480        }
1481
1482        let lookup_field_name = field_path.segments.last().unwrap();
1483
1484        for lookup_index in &self.spec.identity.lookup_indexes {
1485            let index_field_name = lookup_index
1486                .field_name
1487                .split('.')
1488                .next_back()
1489                .unwrap_or(&lookup_index.field_name);
1490            if index_field_name == lookup_field_name {
1491                return Some(format!("{}_lookup_index", index_field_name));
1492            }
1493        }
1494
1495        None
1496    }
1497
1498    /// Find lookup index for a Lookup key resolution by checking if there's a mapping
1499    /// from the primary_field to a lookup index field.
1500    fn find_lookup_index_for_lookup_field(
1501        &self,
1502        primary_field: &FieldPath,
1503        mappings: &[TypedFieldMapping<S>],
1504    ) -> Option<String> {
1505        // Build the primary field path string
1506        let primary_path = primary_field.segments.join(".");
1507
1508        // Check if there's a mapping from this primary field to a lookup index field
1509        for mapping in mappings {
1510            // Check if the mapping source path matches the primary field
1511            if let MappingSource::FromSource { path, .. } = &mapping.source {
1512                let source_path = path.segments.join(".");
1513                if source_path == primary_path {
1514                    // Check if the target is a lookup index field
1515                    for lookup_index in &self.spec.identity.lookup_indexes {
1516                        if mapping.target_path == lookup_index.field_name {
1517                            let index_field_name = lookup_index
1518                                .field_name
1519                                .split('.')
1520                                .next_back()
1521                                .unwrap_or(&lookup_index.field_name);
1522                            return Some(format!("{}_lookup_index", index_field_name));
1523                        }
1524                    }
1525                }
1526            }
1527        }
1528
1529        // Fall back to direct field name matching
1530        self.find_lookup_index_for_field(primary_field)
1531    }
1532
1533    /// Find the source path for a lookup index field by looking at mappings.
1534    /// For example, if target_path is "id.round_address" and the mapping is
1535    /// `id.round_address <- __account_address`, this returns ["__account_address"].
1536    fn find_source_path_for_lookup_index(
1537        &self,
1538        mappings: &[TypedFieldMapping<S>],
1539        lookup_field_name: &str,
1540    ) -> Option<Vec<String>> {
1541        for mapping in mappings {
1542            if mapping.target_path == lookup_field_name {
1543                if let MappingSource::FromSource { path, .. } = &mapping.source {
1544                    return Some(path.segments.clone());
1545                }
1546            }
1547        }
1548        None
1549    }
1550
1551    fn compile_temporal_index_update(
1552        &self,
1553        resolution: &KeyResolutionStrategy,
1554        key_reg: Register,
1555        mappings: &[TypedFieldMapping<S>],
1556    ) -> Vec<OpCode> {
1557        let mut ops = Vec::new();
1558
1559        for lookup_index in &self.spec.identity.lookup_indexes {
1560            let lookup_reg = 17;
1561            let source_field = lookup_index
1562                .field_name
1563                .split('.')
1564                .next_back()
1565                .unwrap_or(&lookup_index.field_name);
1566
1567            match resolution {
1568                KeyResolutionStrategy::Embedded { primary_field: _ } => {
1569                    // For Embedded handlers, find the mapping that targets this lookup index field
1570                    // and use its source path to load the lookup value
1571                    let source_path_opt =
1572                        self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1573
1574                    let load_path = if let Some(ref path) = source_path_opt {
1575                        FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1576                    } else {
1577                        // Fallback to source_field if no mapping found
1578                        FieldPath::new(&[source_field])
1579                    };
1580
1581                    ops.push(OpCode::LoadEventField {
1582                        path: load_path,
1583                        dest: lookup_reg,
1584                        default: None,
1585                    });
1586
1587                    if let Some(temporal_field_name) = &lookup_index.temporal_field {
1588                        let timestamp_reg = 18;
1589
1590                        ops.push(OpCode::LoadEventField {
1591                            path: FieldPath::new(&[temporal_field_name]),
1592                            dest: timestamp_reg,
1593                            default: None,
1594                        });
1595
1596                        let index_name = format!("{}_temporal_index", source_field);
1597                        ops.push(OpCode::UpdateTemporalIndex {
1598                            state_id: self.state_id,
1599                            index_name,
1600                            lookup_value: lookup_reg,
1601                            primary_key: key_reg,
1602                            timestamp: timestamp_reg,
1603                        });
1604
1605                        let simple_index_name = format!("{}_lookup_index", source_field);
1606                        ops.push(OpCode::UpdateLookupIndex {
1607                            state_id: self.state_id,
1608                            index_name: simple_index_name,
1609                            lookup_value: lookup_reg,
1610                            primary_key: key_reg,
1611                        });
1612                    } else {
1613                        let index_name = format!("{}_lookup_index", source_field);
1614                        ops.push(OpCode::UpdateLookupIndex {
1615                            state_id: self.state_id,
1616                            index_name,
1617                            lookup_value: lookup_reg,
1618                            primary_key: key_reg,
1619                        });
1620                    }
1621
1622                    // Also update PDA reverse lookup table if there's a resolver configured for this entity
1623                    // This allows instruction handlers to look up the primary key from PDA addresses
1624                    // Only do this when the source path is different (e.g., __account_address -> id.round_address)
1625                    if source_path_opt.is_some() {
1626                        ops.push(OpCode::UpdatePdaReverseLookup {
1627                            state_id: self.state_id,
1628                            lookup_name: "default_pda_lookup".to_string(),
1629                            pda_address: lookup_reg,
1630                            primary_key: key_reg,
1631                        });
1632                    }
1633                }
1634                KeyResolutionStrategy::Lookup { primary_field } => {
1635                    // For Lookup handlers, check if there's a mapping that targets this lookup index field
1636                    // If so, the lookup value is the same as the primary_field used for key resolution
1637                    let has_mapping_to_lookup_field = mappings
1638                        .iter()
1639                        .any(|m| m.target_path == lookup_index.field_name);
1640
1641                    if has_mapping_to_lookup_field {
1642                        // Load the lookup value from the event using the primary_field path
1643                        // (this is the same value used for key resolution)
1644                        let path_segments: Vec<&str> =
1645                            primary_field.segments.iter().map(|s| s.as_str()).collect();
1646                        ops.push(OpCode::LoadEventField {
1647                            path: FieldPath::new(&path_segments),
1648                            dest: lookup_reg,
1649                            default: None,
1650                        });
1651
1652                        let index_name = format!("{}_lookup_index", source_field);
1653                        ops.push(OpCode::UpdateLookupIndex {
1654                            state_id: self.state_id,
1655                            index_name,
1656                            lookup_value: lookup_reg,
1657                            primary_key: key_reg,
1658                        });
1659                    }
1660                }
1661                KeyResolutionStrategy::Computed { .. }
1662                | KeyResolutionStrategy::TemporalLookup { .. } => {
1663                    // Computed and TemporalLookup handlers don't populate lookup indexes
1664                }
1665            }
1666        }
1667
1668        ops
1669    }
1670
1671    fn get_event_type(&self, source: &SourceSpec) -> String {
1672        match source {
1673            SourceSpec::Source { type_name, .. } => type_name.clone(),
1674        }
1675    }
1676
1677    fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1678        let mut ops = Vec::new();
1679        let state_reg = 2;
1680
1681        for action in actions {
1682            match action {
1683                HookAction::SetField {
1684                    target_field,
1685                    source,
1686                    condition,
1687                } => {
1688                    // Check if there's a condition - evaluation handled in VM
1689                    let _ = condition;
1690
1691                    let temp_reg = 11; // Use register 11 for hook values
1692
1693                    // Load the source value
1694                    let load_ops = self.compile_mapping_source(source, temp_reg);
1695                    ops.extend(load_ops);
1696
1697                    // Apply transformation if specified in source
1698                    if let MappingSource::FromSource {
1699                        transform: Some(transform_type),
1700                        ..
1701                    } = source
1702                    {
1703                        ops.push(OpCode::Transform {
1704                            source: temp_reg,
1705                            dest: temp_reg,
1706                            transformation: transform_type.clone(),
1707                        });
1708                    }
1709
1710                    // Conditionally set the field based on parsed condition
1711                    if let Some(cond_expr) = condition {
1712                        if let Some(parsed) = &cond_expr.parsed {
1713                            // Generate condition check opcodes
1714                            let cond_check_ops = self.compile_condition_check(
1715                                parsed,
1716                                temp_reg,
1717                                state_reg,
1718                                target_field,
1719                            );
1720                            ops.extend(cond_check_ops);
1721                        } else {
1722                            // No parsed condition, set unconditionally
1723                            ops.push(OpCode::SetField {
1724                                object: state_reg,
1725                                path: target_field.clone(),
1726                                value: temp_reg,
1727                            });
1728                        }
1729                    } else {
1730                        // No condition, set unconditionally
1731                        ops.push(OpCode::SetField {
1732                            object: state_reg,
1733                            path: target_field.clone(),
1734                            value: temp_reg,
1735                        });
1736                    }
1737                }
1738                HookAction::IncrementField {
1739                    target_field,
1740                    increment_by,
1741                    condition,
1742                } => {
1743                    if let Some(cond_expr) = condition {
1744                        if let Some(parsed) = &cond_expr.parsed {
1745                            // For increment with condition, we need to:
1746                            // 1. Load the condition field
1747                            // 2. Check the condition
1748                            // 3. Conditionally increment
1749                            let cond_check_ops = self.compile_conditional_increment(
1750                                parsed,
1751                                state_reg,
1752                                target_field,
1753                                *increment_by,
1754                            );
1755                            ops.extend(cond_check_ops);
1756                        } else {
1757                            // No parsed condition, increment unconditionally
1758                            ops.push(OpCode::SetFieldIncrement {
1759                                object: state_reg,
1760                                path: target_field.clone(),
1761                            });
1762                        }
1763                    } else {
1764                        // No condition, increment unconditionally
1765                        ops.push(OpCode::SetFieldIncrement {
1766                            object: state_reg,
1767                            path: target_field.clone(),
1768                        });
1769                    }
1770                }
1771                HookAction::RegisterPdaMapping { .. } => {
1772                    // PDA registration is handled elsewhere (in resolvers)
1773                    // Skip for now
1774                }
1775            }
1776        }
1777
1778        ops
1779    }
1780
1781    fn compile_condition_check(
1782        &self,
1783        condition: &ParsedCondition,
1784        value_reg: Register,
1785        state_reg: Register,
1786        target_field: &str,
1787    ) -> Vec<OpCode> {
1788        match condition {
1789            ParsedCondition::Comparison {
1790                field,
1791                op,
1792                value: cond_value,
1793            } => {
1794                // Generate ConditionalSetField opcode
1795                vec![OpCode::ConditionalSetField {
1796                    object: state_reg,
1797                    path: target_field.to_string(),
1798                    value: value_reg,
1799                    condition_field: field.clone(),
1800                    condition_op: op.clone(),
1801                    condition_value: cond_value.clone(),
1802                }]
1803            }
1804            ParsedCondition::Logical { .. } => {
1805                // Logical conditions not yet supported, fall back to unconditional
1806                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1807                vec![OpCode::SetField {
1808                    object: state_reg,
1809                    path: target_field.to_string(),
1810                    value: value_reg,
1811                }]
1812            }
1813        }
1814    }
1815
1816    fn compile_conditional_increment(
1817        &self,
1818        condition: &ParsedCondition,
1819        state_reg: Register,
1820        target_field: &str,
1821        _increment_by: i64,
1822    ) -> Vec<OpCode> {
1823        match condition {
1824            ParsedCondition::Comparison {
1825                field,
1826                op,
1827                value: cond_value,
1828            } => {
1829                vec![OpCode::ConditionalIncrement {
1830                    object: state_reg,
1831                    path: target_field.to_string(),
1832                    condition_field: field.clone(),
1833                    condition_op: op.clone(),
1834                    condition_value: cond_value.clone(),
1835                }]
1836            }
1837            ParsedCondition::Logical { .. } => {
1838                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1839                vec![OpCode::SetFieldIncrement {
1840                    object: state_reg,
1841                    path: target_field.to_string(),
1842                }]
1843            }
1844        }
1845    }
1846}