Skip to main content

hyperstack_interpreter/
compiler.rs

1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
6pub type Register = usize;
7
8fn stop_field_path(target_path: &str) -> String {
9    format!("__stop:{}", target_path)
10}
11
12#[derive(Debug, Clone)]
13pub enum OpCode {
14    LoadEventField {
15        path: FieldPath,
16        dest: Register,
17        default: Option<Value>,
18    },
19    LoadConstant {
20        value: Value,
21        dest: Register,
22    },
23    CopyRegister {
24        source: Register,
25        dest: Register,
26    },
27    /// Copy from source to dest only if dest is currently null
28    CopyRegisterIfNull {
29        source: Register,
30        dest: Register,
31    },
32    GetEventType {
33        dest: Register,
34    },
35    CreateObject {
36        dest: Register,
37    },
38    SetField {
39        object: Register,
40        path: String,
41        value: Register,
42    },
43    SetFields {
44        object: Register,
45        fields: Vec<(String, Register)>,
46    },
47    GetField {
48        object: Register,
49        path: String,
50        dest: Register,
51    },
52    ReadOrInitState {
53        state_id: u32,
54        key: Register,
55        default: Value,
56        dest: Register,
57    },
58    UpdateState {
59        state_id: u32,
60        key: Register,
61        value: Register,
62    },
63    AppendToArray {
64        object: Register,
65        path: String,
66        value: Register,
67    },
68    GetCurrentTimestamp {
69        dest: Register,
70    },
71    CreateEvent {
72        dest: Register,
73        event_value: Register,
74    },
75    CreateCapture {
76        dest: Register,
77        capture_value: Register,
78    },
79    Transform {
80        source: Register,
81        dest: Register,
82        transformation: Transformation,
83    },
84    EmitMutation {
85        entity_name: String,
86        key: Register,
87        state: Register,
88    },
89    SetFieldIfNull {
90        object: Register,
91        path: String,
92        value: Register,
93    },
94    SetFieldMax {
95        object: Register,
96        path: String,
97        value: Register,
98    },
99    UpdateTemporalIndex {
100        state_id: u32,
101        index_name: String,
102        lookup_value: Register,
103        primary_key: Register,
104        timestamp: Register,
105    },
106    LookupTemporalIndex {
107        state_id: u32,
108        index_name: String,
109        lookup_value: Register,
110        timestamp: Register,
111        dest: Register,
112    },
113    UpdateLookupIndex {
114        state_id: u32,
115        index_name: String,
116        lookup_value: Register,
117        primary_key: Register,
118    },
119    LookupIndex {
120        state_id: u32,
121        index_name: String,
122        lookup_value: Register,
123        dest: Register,
124    },
125    /// Sum a numeric value to a field (accumulator)
126    SetFieldSum {
127        object: Register,
128        path: String,
129        value: Register,
130    },
131    /// Increment a counter field by 1
132    SetFieldIncrement {
133        object: Register,
134        path: String,
135    },
136    /// Set field to minimum value
137    SetFieldMin {
138        object: Register,
139        path: String,
140        value: Register,
141    },
142    /// Set field only if a specific instruction type was seen in the same transaction.
143    /// If not seen yet, defers the operation for later completion.
144    SetFieldWhen {
145        object: Register,
146        path: String,
147        value: Register,
148        when_instruction: String,
149        entity_name: String,
150        key_reg: Register,
151        condition_field: Option<FieldPath>,
152        condition_op: Option<ComparisonOp>,
153        condition_value: Option<Value>,
154    },
155    /// Set field unless stopped by a specific instruction.
156    /// Stop is tracked by a per-entity stop flag.
157    SetFieldUnlessStopped {
158        object: Register,
159        path: String,
160        value: Register,
161        stop_field: String,
162        stop_instruction: String,
163        entity_name: String,
164        key_reg: Register,
165    },
166    /// Add value to unique set and update count
167    /// Maintains internal Set, field stores count
168    AddToUniqueSet {
169        state_id: u32,
170        set_name: String,
171        value: Register,
172        count_object: Register,
173        count_path: String,
174    },
175    /// Conditionally set a field based on a comparison
176    ConditionalSetField {
177        object: Register,
178        path: String,
179        value: Register,
180        condition_field: FieldPath,
181        condition_op: ComparisonOp,
182        condition_value: Value,
183    },
184    /// Conditionally increment a field based on a comparison
185    ConditionalIncrement {
186        object: Register,
187        path: String,
188        condition_field: FieldPath,
189        condition_op: ComparisonOp,
190        condition_value: Value,
191    },
192    /// Evaluate computed fields (calls external hook if provided)
193    /// computed_paths: List of paths that will be computed (for dirty tracking)
194    EvaluateComputedFields {
195        state: Register,
196        computed_paths: Vec<String>,
197    },
198    /// Queue a resolver for asynchronous enrichment
199    QueueResolver {
200        state_id: u32,
201        entity_name: String,
202        resolver: ResolverType,
203        input_path: Option<String>,
204        input_value: Option<Value>,
205        strategy: ResolveStrategy,
206        extracts: Vec<ResolverExtractSpec>,
207        state: Register,
208        key: Register,
209    },
210    /// Update PDA reverse lookup table
211    /// Maps a PDA address to its primary key for reverse lookups
212    UpdatePdaReverseLookup {
213        state_id: u32,
214        lookup_name: String,
215        pda_address: Register,
216        primary_key: Register,
217    },
218}
219
220pub struct EntityBytecode {
221    pub state_id: u32,
222    pub handlers: HashMap<String, Vec<OpCode>>,
223    pub entity_name: String,
224    pub when_events: HashSet<String>,
225    pub non_emitted_fields: HashSet<String>,
226    pub computed_paths: Vec<String>,
227    /// Optional callback for evaluating computed fields
228    /// Parameters: state, context_slot (Option<u64>), context_timestamp (i64)
229    #[allow(clippy::type_complexity)]
230    pub computed_fields_evaluator: Option<
231        Box<
232            dyn Fn(
233                    &mut Value,
234                    Option<u64>,
235                    i64,
236                ) -> std::result::Result<(), Box<dyn std::error::Error>>
237                + Send
238                + Sync,
239        >,
240    >,
241}
242
243impl std::fmt::Debug for EntityBytecode {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        f.debug_struct("EntityBytecode")
246            .field("state_id", &self.state_id)
247            .field("handlers", &self.handlers)
248            .field("entity_name", &self.entity_name)
249            .field("when_events", &self.when_events)
250            .field("non_emitted_fields", &self.non_emitted_fields)
251            .field("computed_paths", &self.computed_paths)
252            .field(
253                "computed_fields_evaluator",
254                &self.computed_fields_evaluator.is_some(),
255            )
256            .finish()
257    }
258}
259
260#[derive(Debug)]
261pub struct MultiEntityBytecode {
262    pub entities: HashMap<String, EntityBytecode>,
263    pub event_routing: HashMap<String, Vec<String>>,
264    pub when_events: HashSet<String>,
265    pub proto_router: crate::proto_router::ProtoRouter,
266}
267
268impl MultiEntityBytecode {
269    pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
270        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
271        let entity_bytecode = compiler.compile_entity();
272
273        let mut entities = HashMap::new();
274        let mut event_routing = HashMap::new();
275        let mut when_events = HashSet::new();
276
277        for event_type in entity_bytecode.handlers.keys() {
278            event_routing
279                .entry(event_type.clone())
280                .or_insert_with(Vec::new)
281                .push(entity_name.clone());
282        }
283
284        when_events.extend(entity_bytecode.when_events.iter().cloned());
285
286        entities.insert(entity_name, entity_bytecode);
287
288        MultiEntityBytecode {
289            entities,
290            event_routing,
291            when_events,
292            proto_router: crate::proto_router::ProtoRouter::new(),
293        }
294    }
295
296    pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
297        let entities = HashMap::new();
298        let event_routing = HashMap::new();
299        let when_events = HashSet::new();
300
301        if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
302            panic!("from_entities requires type information - use builder pattern instead");
303        }
304
305        MultiEntityBytecode {
306            entities,
307            event_routing,
308            when_events,
309            proto_router: crate::proto_router::ProtoRouter::new(),
310        }
311    }
312
313    #[allow(clippy::new_ret_no_self)]
314    pub fn new() -> MultiEntityBytecodeBuilder {
315        MultiEntityBytecodeBuilder {
316            entities: HashMap::new(),
317            event_routing: HashMap::new(),
318            when_events: HashSet::new(),
319            proto_router: crate::proto_router::ProtoRouter::new(),
320        }
321    }
322}
323
324pub struct MultiEntityBytecodeBuilder {
325    entities: HashMap<String, EntityBytecode>,
326    event_routing: HashMap<String, Vec<String>>,
327    when_events: HashSet<String>,
328    proto_router: crate::proto_router::ProtoRouter,
329}
330
331impl MultiEntityBytecodeBuilder {
332    pub fn add_entity<S>(
333        self,
334        entity_name: String,
335        spec: TypedStreamSpec<S>,
336        state_id: u32,
337    ) -> Self {
338        self.add_entity_with_evaluator(
339            entity_name,
340            spec,
341            state_id,
342            None::<
343                fn(
344                    &mut Value,
345                    Option<u64>,
346                    i64,
347                ) -> std::result::Result<(), Box<dyn std::error::Error>>,
348            >,
349        )
350    }
351
352    pub fn add_entity_with_evaluator<S, F>(
353        mut self,
354        entity_name: String,
355        spec: TypedStreamSpec<S>,
356        state_id: u32,
357        evaluator: Option<F>,
358    ) -> Self
359    where
360        F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
361            + Send
362            + Sync
363            + 'static,
364    {
365        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
366        let mut entity_bytecode = compiler.compile_entity();
367
368        // Store the evaluator callback if provided
369        if let Some(eval) = evaluator {
370            entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
371        }
372
373        for event_type in entity_bytecode.handlers.keys() {
374            self.event_routing
375                .entry(event_type.clone())
376                .or_default()
377                .push(entity_name.clone());
378        }
379
380        self.when_events
381            .extend(entity_bytecode.when_events.iter().cloned());
382
383        self.entities.insert(entity_name, entity_bytecode);
384        self
385    }
386
387    pub fn build(self) -> MultiEntityBytecode {
388        MultiEntityBytecode {
389            entities: self.entities,
390            event_routing: self.event_routing,
391            when_events: self.when_events,
392            proto_router: self.proto_router,
393        }
394    }
395}
396
397pub struct TypedCompiler<S> {
398    pub spec: TypedStreamSpec<S>,
399    entity_name: String,
400    state_id: u32,
401}
402
403impl<S> TypedCompiler<S> {
404    pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
405        TypedCompiler {
406            spec,
407            entity_name,
408            state_id: 0,
409        }
410    }
411
412    pub fn with_state_id(mut self, state_id: u32) -> Self {
413        self.state_id = state_id;
414        self
415    }
416
417    pub fn compile(&self) -> MultiEntityBytecode {
418        let entity_bytecode = self.compile_entity();
419
420        let mut entities = HashMap::new();
421        let mut event_routing = HashMap::new();
422        let mut when_events = HashSet::new();
423
424        for event_type in entity_bytecode.handlers.keys() {
425            event_routing
426                .entry(event_type.clone())
427                .or_insert_with(Vec::new)
428                .push(self.entity_name.clone());
429        }
430
431        when_events.extend(entity_bytecode.when_events.iter().cloned());
432
433        entities.insert(self.entity_name.clone(), entity_bytecode);
434
435        MultiEntityBytecode {
436            entities,
437            event_routing,
438            when_events,
439            proto_router: crate::proto_router::ProtoRouter::new(),
440        }
441    }
442
443    fn compile_entity(&self) -> EntityBytecode {
444        let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
445        let mut when_events: HashSet<String> = HashSet::new();
446        let mut emit_by_path: HashMap<String, bool> = HashMap::new();
447
448        // DEBUG: Collect all handler info before processing
449        let mut debug_info = Vec::new();
450        for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
451            let event_type = self.get_event_type(&handler_spec.source);
452            let program_id = match &handler_spec.source {
453                crate::ast::SourceSpec::Source { program_id, .. } => {
454                    program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
455                }
456            };
457            debug_info.push(format!(
458                "  [{}] EventType={}, Mappings={}, ProgramId={}",
459                index,
460                event_type,
461                handler_spec.mappings.len(),
462                program_id
463            ));
464        }
465
466        // DEBUG: Log handler information (optional - can be removed later)
467        // Uncomment to debug handler processing:
468        // if self.entity_name == "PumpfunToken" {
469        //     eprintln!("🔍 Compiling {} handlers for {}", self.spec.handlers.len(), self.entity_name);
470        //     for info in &debug_info {
471        //         eprintln!("{}", info);
472        //     }
473        // }
474
475        for handler_spec in &self.spec.handlers {
476            for mapping in &handler_spec.mappings {
477                if let Some(when) = &mapping.when {
478                    when_events.insert(when.clone());
479                }
480                let entry = emit_by_path
481                    .entry(mapping.target_path.clone())
482                    .or_insert(false);
483                *entry |= mapping.emit;
484                if mapping.stop.is_some() {
485                    emit_by_path
486                        .entry(stop_field_path(&mapping.target_path))
487                        .or_insert(false);
488                }
489            }
490            let opcodes = self.compile_handler(handler_spec);
491            let event_type = self.get_event_type(&handler_spec.source);
492
493            if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
494                // Merge strategy: Take ALL operations from BOTH handlers
495                // Keep setup from first, combine all mappings, keep one teardown
496
497                // Split existing handler into: setup, mappings, teardown
498                let mut existing_setup = Vec::new();
499                let mut existing_mappings = Vec::new();
500                let mut existing_teardown = Vec::new();
501                let mut section = 0; // 0=setup, 1=mappings, 2=teardown
502
503                for opcode in existing_opcodes.iter() {
504                    match opcode {
505                        OpCode::ReadOrInitState { .. } => {
506                            existing_setup.push(opcode.clone());
507                            section = 1; // Next opcodes are mappings
508                        }
509                        OpCode::UpdateState { .. } => {
510                            existing_teardown.push(opcode.clone());
511                            section = 2; // Next opcodes are teardown
512                        }
513                        OpCode::EmitMutation { .. } => {
514                            existing_teardown.push(opcode.clone());
515                        }
516                        _ if section == 0 => existing_setup.push(opcode.clone()),
517                        _ if section == 1 => existing_mappings.push(opcode.clone()),
518                        _ => existing_teardown.push(opcode.clone()),
519                    }
520                }
521
522                // Extract mappings from new handler (skip setup and teardown)
523                let mut new_mappings = Vec::new();
524                section = 0;
525
526                for opcode in opcodes.iter() {
527                    match opcode {
528                        OpCode::ReadOrInitState { .. } => {
529                            section = 1; // Start capturing mappings
530                        }
531                        OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
532                            section = 2; // Stop capturing
533                        }
534                        _ if section == 1 => {
535                            new_mappings.push(opcode.clone());
536                        }
537                        _ => {} // Skip setup and teardown from new handler
538                    }
539                }
540
541                // Rebuild: setup + existing_mappings + new_mappings + teardown
542                let mut merged = Vec::new();
543                merged.extend(existing_setup);
544                merged.extend(existing_mappings);
545                merged.extend(new_mappings.clone());
546                merged.extend(existing_teardown);
547
548                *existing_opcodes = merged;
549            } else {
550                handlers.insert(event_type, opcodes);
551            }
552        }
553
554        // Process instruction_hooks to add SetField/IncrementField operations
555        for hook in &self.spec.instruction_hooks {
556            let event_type = hook.instruction_type.clone();
557
558            let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
559                let key_reg = 20;
560                let state_reg = 2;
561                let resolved_key_reg = 19;
562                let temp_reg = 18;
563
564                let mut ops = Vec::new();
565
566                // First, try to load __resolved_primary_key from resolver
567                ops.push(OpCode::LoadEventField {
568                    path: FieldPath::new(&["__resolved_primary_key"]),
569                    dest: resolved_key_reg,
570                    default: Some(serde_json::json!(null)),
571                });
572
573                // Copy to key_reg (unconditionally, may be null)
574                ops.push(OpCode::CopyRegister {
575                    source: resolved_key_reg,
576                    dest: key_reg,
577                });
578
579                // If hook has lookup_by, use it to load primary key from instruction accounts
580                if let Some(lookup_path) = &hook.lookup_by {
581                    // Load the primary key from the instruction's lookup_by field (e.g., accounts.signer)
582                    ops.push(OpCode::LoadEventField {
583                        path: lookup_path.clone(),
584                        dest: temp_reg,
585                        default: None,
586                    });
587
588                    // Apply HexEncode transformation (accounts are byte arrays)
589                    ops.push(OpCode::Transform {
590                        source: temp_reg,
591                        dest: temp_reg,
592                        transformation: Transformation::HexEncode,
593                    });
594
595                    // Use this as fallback if __resolved_primary_key was null
596                    ops.push(OpCode::CopyRegisterIfNull {
597                        source: temp_reg,
598                        dest: key_reg,
599                    });
600                }
601
602                ops.push(OpCode::ReadOrInitState {
603                    state_id: self.state_id,
604                    key: key_reg,
605                    default: serde_json::json!({}),
606                    dest: state_reg,
607                });
608
609                ops.push(OpCode::UpdateState {
610                    state_id: self.state_id,
611                    key: key_reg,
612                    value: state_reg,
613                });
614
615                ops
616            });
617
618            // Generate opcodes for each action in the hook
619            let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
620
621            // Insert hook opcodes before EvaluateComputedFields (if present) or UpdateState
622            // Hook actions (like whale_trade_count increment) must run before computed fields
623            // are evaluated, since computed fields may depend on the modified state
624            let insert_pos = handler_opcodes
625                .iter()
626                .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
627                .or_else(|| {
628                    handler_opcodes
629                        .iter()
630                        .position(|op| matches!(op, OpCode::UpdateState { .. }))
631                });
632
633            if let Some(pos) = insert_pos {
634                // Insert hook opcodes before EvaluateComputedFields or UpdateState
635                for (i, opcode) in hook_opcodes.into_iter().enumerate() {
636                    handler_opcodes.insert(pos + i, opcode);
637                }
638            }
639        }
640
641        let non_emitted_fields: HashSet<String> = emit_by_path
642            .into_iter()
643            .filter_map(|(path, emit)| if emit { None } else { Some(path) })
644            .collect();
645
646        EntityBytecode {
647            state_id: self.state_id,
648            handlers,
649            entity_name: self.entity_name.clone(),
650            when_events,
651            non_emitted_fields,
652            computed_paths: self.spec.computed_fields.clone(),
653            computed_fields_evaluator: None,
654        }
655    }
656
657    fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
658        let mut ops = Vec::new();
659        let state_reg = 2;
660        let key_reg = 20;
661
662        ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
663
664        ops.push(OpCode::ReadOrInitState {
665            state_id: self.state_id,
666            key: key_reg,
667            default: serde_json::json!({}),
668            dest: state_reg,
669        });
670
671        // Index updates must come AFTER ReadOrInitState so the state table exists.
672        // ReadOrInitState lazily creates the state table via entry().or_insert_with(),
673        // but index opcodes (UpdateLookupIndex, UpdateTemporalIndex, UpdatePdaReverseLookup)
674        // use get_mut() which fails if the table doesn't exist yet.
675        // This ordering also means stale/duplicate updates (caught by ReadOrInitState's
676        // recency check) correctly skip index updates too.
677        ops.extend(self.compile_temporal_index_update(
678            &spec.key_resolution,
679            key_reg,
680            &spec.mappings,
681        ));
682
683        for mapping in &spec.mappings {
684            ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
685        }
686
687        ops.extend(self.compile_resolvers(state_reg, key_reg));
688
689        // Evaluate computed fields after all mappings but before updating state
690        ops.push(OpCode::EvaluateComputedFields {
691            state: state_reg,
692            computed_paths: self.spec.computed_fields.clone(),
693        });
694
695        ops.push(OpCode::UpdateState {
696            state_id: self.state_id,
697            key: key_reg,
698            value: state_reg,
699        });
700
701        if spec.emit {
702            ops.push(OpCode::EmitMutation {
703                entity_name: self.entity_name.clone(),
704                key: key_reg,
705                state: state_reg,
706            });
707        }
708
709        ops
710    }
711
712    fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
713        let mut ops = Vec::new();
714
715        for resolver_spec in &self.spec.resolver_specs {
716            ops.push(OpCode::QueueResolver {
717                state_id: self.state_id,
718                entity_name: self.entity_name.clone(),
719                resolver: resolver_spec.resolver.clone(),
720                input_path: resolver_spec.input_path.clone(),
721                input_value: resolver_spec.input_value.clone(),
722                strategy: resolver_spec.strategy.clone(),
723                extracts: resolver_spec.extracts.clone(),
724                state: state_reg,
725                key: key_reg,
726            });
727        }
728
729        ops
730    }
731
732    fn compile_mapping(
733        &self,
734        mapping: &TypedFieldMapping<S>,
735        state_reg: Register,
736        key_reg: Register,
737    ) -> Vec<OpCode> {
738        let mut ops = Vec::new();
739        let temp_reg = 10;
740
741        ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
742
743        if let Some(transform) = &mapping.transform {
744            ops.push(OpCode::Transform {
745                source: temp_reg,
746                dest: temp_reg,
747                transformation: transform.clone(),
748            });
749        }
750
751        if let Some(stop_instruction) = &mapping.stop {
752            if mapping.when.is_some() {
753                tracing::warn!(
754                    "#[map] stop and when both set for {}. Ignoring when.",
755                    mapping.target_path
756                );
757            }
758            if !matches!(mapping.population, PopulationStrategy::LastWrite)
759                && !matches!(mapping.population, PopulationStrategy::Merge)
760            {
761                tracing::warn!(
762                    "#[map] stop ignores population strategy {:?}",
763                    mapping.population
764                );
765            }
766
767            ops.push(OpCode::SetFieldUnlessStopped {
768                object: state_reg,
769                path: mapping.target_path.clone(),
770                value: temp_reg,
771                stop_field: stop_field_path(&mapping.target_path),
772                stop_instruction: stop_instruction.clone(),
773                entity_name: self.entity_name.clone(),
774                key_reg,
775            });
776            return ops;
777        }
778
779        if let Some(when_instruction) = &mapping.when {
780            if !matches!(mapping.population, PopulationStrategy::LastWrite)
781                && !matches!(mapping.population, PopulationStrategy::Merge)
782            {
783                tracing::warn!(
784                    "#[map] when ignores population strategy {:?}",
785                    mapping.population
786                );
787            }
788            let (condition_field, condition_op, condition_value) = mapping
789                .condition
790                .as_ref()
791                .and_then(|cond| cond.parsed.as_ref())
792                .and_then(|parsed| match parsed {
793                    ParsedCondition::Comparison { field, op, value } => {
794                        Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
795                    }
796                    ParsedCondition::Logical { .. } => {
797                        tracing::warn!("Logical conditions not yet supported for #[map] when");
798                        None
799                    }
800                })
801                .unwrap_or((None, None, None));
802
803            ops.push(OpCode::SetFieldWhen {
804                object: state_reg,
805                path: mapping.target_path.clone(),
806                value: temp_reg,
807                when_instruction: when_instruction.clone(),
808                entity_name: self.entity_name.clone(),
809                key_reg,
810                condition_field,
811                condition_op,
812                condition_value,
813            });
814            return ops;
815        }
816
817        if let Some(condition) = &mapping.condition {
818            if let Some(parsed) = &condition.parsed {
819                match parsed {
820                    ParsedCondition::Comparison {
821                        field,
822                        op,
823                        value: cond_value,
824                    } => {
825                        if matches!(mapping.population, PopulationStrategy::LastWrite)
826                            || matches!(mapping.population, PopulationStrategy::Merge)
827                        {
828                            ops.push(OpCode::ConditionalSetField {
829                                object: state_reg,
830                                path: mapping.target_path.clone(),
831                                value: temp_reg,
832                                condition_field: field.clone(),
833                                condition_op: op.clone(),
834                                condition_value: cond_value.clone(),
835                            });
836                            return ops;
837                        }
838
839                        if matches!(mapping.population, PopulationStrategy::Count) {
840                            ops.push(OpCode::ConditionalIncrement {
841                                object: state_reg,
842                                path: mapping.target_path.clone(),
843                                condition_field: field.clone(),
844                                condition_op: op.clone(),
845                                condition_value: cond_value.clone(),
846                            });
847                            return ops;
848                        }
849
850                        tracing::warn!(
851                            "Conditional #[map] not supported for population strategy {:?}",
852                            mapping.population
853                        );
854                    }
855                    ParsedCondition::Logical { .. } => {
856                        tracing::warn!("Logical conditions not yet supported for #[map]");
857                    }
858                }
859            }
860        }
861
862        match &mapping.population {
863            PopulationStrategy::Append => {
864                ops.push(OpCode::AppendToArray {
865                    object: state_reg,
866                    path: mapping.target_path.clone(),
867                    value: temp_reg,
868                });
869            }
870            PopulationStrategy::LastWrite => {
871                ops.push(OpCode::SetField {
872                    object: state_reg,
873                    path: mapping.target_path.clone(),
874                    value: temp_reg,
875                });
876            }
877            PopulationStrategy::SetOnce => {
878                ops.push(OpCode::SetFieldIfNull {
879                    object: state_reg,
880                    path: mapping.target_path.clone(),
881                    value: temp_reg,
882                });
883            }
884            PopulationStrategy::Merge => {
885                ops.push(OpCode::SetField {
886                    object: state_reg,
887                    path: mapping.target_path.clone(),
888                    value: temp_reg,
889                });
890            }
891            PopulationStrategy::Max => {
892                ops.push(OpCode::SetFieldMax {
893                    object: state_reg,
894                    path: mapping.target_path.clone(),
895                    value: temp_reg,
896                });
897            }
898            PopulationStrategy::Sum => {
899                ops.push(OpCode::SetFieldSum {
900                    object: state_reg,
901                    path: mapping.target_path.clone(),
902                    value: temp_reg,
903                });
904            }
905            PopulationStrategy::Count => {
906                // Count doesn't need the value, just increment
907                ops.push(OpCode::SetFieldIncrement {
908                    object: state_reg,
909                    path: mapping.target_path.clone(),
910                });
911            }
912            PopulationStrategy::Min => {
913                ops.push(OpCode::SetFieldMin {
914                    object: state_reg,
915                    path: mapping.target_path.clone(),
916                    value: temp_reg,
917                });
918            }
919            PopulationStrategy::UniqueCount => {
920                // UniqueCount requires maintaining an internal set
921                // The field stores the count, but we track unique values in a hidden set
922                let set_name = format!("{}_unique_set", mapping.target_path);
923                ops.push(OpCode::AddToUniqueSet {
924                    state_id: self.state_id,
925                    set_name,
926                    value: temp_reg,
927                    count_object: state_reg,
928                    count_path: mapping.target_path.clone(),
929                });
930            }
931        }
932
933        ops
934    }
935
936    fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
937        match source {
938            MappingSource::FromSource {
939                path,
940                default,
941                transform,
942            } => {
943                let mut ops = vec![OpCode::LoadEventField {
944                    path: path.clone(),
945                    dest,
946                    default: default.clone(),
947                }];
948
949                // Apply transform if specified in the source
950                if let Some(transform_type) = transform {
951                    ops.push(OpCode::Transform {
952                        source: dest,
953                        dest,
954                        transformation: transform_type.clone(),
955                    });
956                }
957
958                ops
959            }
960            MappingSource::Constant(val) => {
961                vec![OpCode::LoadConstant {
962                    value: val.clone(),
963                    dest,
964                }]
965            }
966            MappingSource::AsEvent { fields } => {
967                let mut ops = Vec::new();
968
969                if fields.is_empty() {
970                    let event_data_reg = dest + 1;
971                    ops.push(OpCode::LoadEventField {
972                        path: FieldPath::new(&[]),
973                        dest: event_data_reg,
974                        default: Some(serde_json::json!({})),
975                    });
976                    ops.push(OpCode::CreateEvent {
977                        dest,
978                        event_value: event_data_reg,
979                    });
980                } else {
981                    let data_obj_reg = dest + 1;
982                    ops.push(OpCode::CreateObject { dest: data_obj_reg });
983
984                    let mut field_registers = Vec::new();
985                    let mut current_reg = dest + 2;
986
987                    for field_source in fields.iter() {
988                        if let MappingSource::FromSource {
989                            path,
990                            default,
991                            transform,
992                        } = &**field_source
993                        {
994                            ops.push(OpCode::LoadEventField {
995                                path: path.clone(),
996                                dest: current_reg,
997                                default: default.clone(),
998                            });
999
1000                            if let Some(transform_type) = transform {
1001                                ops.push(OpCode::Transform {
1002                                    source: current_reg,
1003                                    dest: current_reg,
1004                                    transformation: transform_type.clone(),
1005                                });
1006                            }
1007
1008                            if let Some(field_name) = path.segments.last() {
1009                                field_registers.push((field_name.clone(), current_reg));
1010                            }
1011                            current_reg += 1;
1012                        }
1013                    }
1014
1015                    if !field_registers.is_empty() {
1016                        ops.push(OpCode::SetFields {
1017                            object: data_obj_reg,
1018                            fields: field_registers,
1019                        });
1020                    }
1021
1022                    ops.push(OpCode::CreateEvent {
1023                        dest,
1024                        event_value: data_obj_reg,
1025                    });
1026                }
1027
1028                ops
1029            }
1030            MappingSource::WholeSource => {
1031                vec![OpCode::LoadEventField {
1032                    path: FieldPath::new(&[]),
1033                    dest,
1034                    default: Some(serde_json::json!({})),
1035                }]
1036            }
1037            MappingSource::AsCapture { field_transforms } => {
1038                // AsCapture loads the whole source, applies field-level transforms, and wraps in CaptureWrapper
1039                let capture_data_reg = 22; // Temp register for capture data before wrapping
1040                let mut ops = vec![OpCode::LoadEventField {
1041                    path: FieldPath::new(&[]),
1042                    dest: capture_data_reg,
1043                    default: Some(serde_json::json!({})),
1044                }];
1045
1046                // Apply transforms to specific fields in the loaded object
1047                // IMPORTANT: Use registers that don't conflict with key_reg (20)
1048                // Using 24 and 25 to avoid conflicts with key loading (uses 18, 19, 20, 23)
1049                let field_reg = 24;
1050                let transformed_reg = 25;
1051
1052                for (field_name, transform) in field_transforms {
1053                    // Load the field from the capture_data_reg (not from event!)
1054                    // Use GetField opcode to read from a register instead of LoadEventField
1055                    ops.push(OpCode::GetField {
1056                        object: capture_data_reg,
1057                        path: field_name.clone(),
1058                        dest: field_reg,
1059                    });
1060
1061                    // Transform it
1062                    ops.push(OpCode::Transform {
1063                        source: field_reg,
1064                        dest: transformed_reg,
1065                        transformation: transform.clone(),
1066                    });
1067
1068                    // Set it back into the capture data object
1069                    ops.push(OpCode::SetField {
1070                        object: capture_data_reg,
1071                        path: field_name.clone(),
1072                        value: transformed_reg,
1073                    });
1074                }
1075
1076                // Wrap the capture data in CaptureWrapper with metadata
1077                ops.push(OpCode::CreateCapture {
1078                    dest,
1079                    capture_value: capture_data_reg,
1080                });
1081
1082                ops
1083            }
1084            MappingSource::FromContext { field } => {
1085                // Load from instruction context (timestamp, slot, signature)
1086                vec![OpCode::LoadEventField {
1087                    path: FieldPath::new(&["__update_context", field.as_str()]),
1088                    dest,
1089                    default: Some(serde_json::json!(null)),
1090                }]
1091            }
1092            MappingSource::Computed { .. } => {
1093                vec![]
1094            }
1095            MappingSource::FromState { .. } => {
1096                vec![]
1097            }
1098        }
1099    }
1100
1101    pub fn compile_key_loading(
1102        &self,
1103        resolution: &KeyResolutionStrategy,
1104        key_reg: Register,
1105        mappings: &[TypedFieldMapping<S>],
1106    ) -> Vec<OpCode> {
1107        let mut ops = Vec::new();
1108
1109        // First, try to load __resolved_primary_key from resolver
1110        // This allows resolvers to override the key resolution
1111        let resolved_key_reg = 19; // Use a temp register
1112        ops.push(OpCode::LoadEventField {
1113            path: FieldPath::new(&["__resolved_primary_key"]),
1114            dest: resolved_key_reg,
1115            default: Some(serde_json::json!(null)),
1116        });
1117
1118        // Now do the normal key resolution
1119        match resolution {
1120            KeyResolutionStrategy::Embedded { primary_field } => {
1121                // Copy resolver result to key_reg (may be null)
1122                ops.push(OpCode::CopyRegister {
1123                    source: resolved_key_reg,
1124                    dest: key_reg,
1125                });
1126
1127                // Enhanced key resolution: check for auto-inheritance when primary_field is empty
1128                let effective_primary_field = if primary_field.segments.is_empty() {
1129                    // Try to auto-detect primary field from account schema
1130                    if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1131                        auto_field
1132                    } else {
1133                        primary_field.clone()
1134                    }
1135                } else {
1136                    primary_field.clone()
1137                };
1138
1139                // Skip fallback key loading if effective primary_field is still empty
1140                // This happens for account types that rely solely on __resolved_primary_key
1141                // (e.g., accounts with #[resolve_key_for] resolvers)
1142                if !effective_primary_field.segments.is_empty() {
1143                    let temp_reg = 18;
1144                    let transform_reg = 23; // Register for transformed key
1145
1146                    ops.push(OpCode::LoadEventField {
1147                        path: effective_primary_field.clone(),
1148                        dest: temp_reg,
1149                        default: None,
1150                    });
1151
1152                    // Check if there's a transformation for the primary key field
1153                    // First try the current mappings, then inherited transformations
1154                    let primary_key_transform = self
1155                        .find_primary_key_transformation(mappings)
1156                        .or_else(|| self.find_inherited_primary_key_transformation());
1157
1158                    if let Some(transform) = primary_key_transform {
1159                        // Apply transformation to the loaded key
1160                        ops.push(OpCode::Transform {
1161                            source: temp_reg,
1162                            dest: transform_reg,
1163                            transformation: transform,
1164                        });
1165                        // Use transformed value as key
1166                        ops.push(OpCode::CopyRegisterIfNull {
1167                            source: transform_reg,
1168                            dest: key_reg,
1169                        });
1170                    } else {
1171                        // No transformation, use raw value
1172                        ops.push(OpCode::CopyRegisterIfNull {
1173                            source: temp_reg,
1174                            dest: key_reg,
1175                        });
1176                    }
1177                }
1178                // If effective_primary_field is empty, key_reg will only contain __resolved_primary_key
1179                // (loaded earlier at line 513-522), or remain null if resolver didn't set it
1180            }
1181            KeyResolutionStrategy::Lookup { primary_field } => {
1182                let lookup_reg = 15;
1183                let result_reg = 17;
1184
1185                // Prefer resolver-provided key as lookup input
1186                ops.push(OpCode::CopyRegister {
1187                    source: resolved_key_reg,
1188                    dest: lookup_reg,
1189                });
1190
1191                let temp_reg = 18;
1192                ops.push(OpCode::LoadEventField {
1193                    path: primary_field.clone(),
1194                    dest: temp_reg,
1195                    default: None,
1196                });
1197                ops.push(OpCode::CopyRegisterIfNull {
1198                    source: temp_reg,
1199                    dest: lookup_reg,
1200                });
1201
1202                let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1203                let effective_index_name =
1204                    index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1205
1206                ops.push(OpCode::LookupIndex {
1207                    state_id: self.state_id,
1208                    index_name: effective_index_name,
1209                    lookup_value: lookup_reg,
1210                    dest: result_reg,
1211                });
1212                // NOTE: We intentionally do NOT fall back to lookup_reg when LookupIndex returns null.
1213                // If the lookup fails (because the RoundState account hasn't been processed yet),
1214                // the result_reg will remain null, and the mutation will be skipped.
1215                // Previously we had: CopyRegisterIfNull { source: lookup_reg, dest: result_reg }
1216                // which caused the PDA address to be used as the key instead of the round_id.
1217                // This resulted in mutations with key = PDA address instead of key = primary_key.
1218
1219                // Use lookup result (may be null). Do not preserve intermediate resolver key.
1220                ops.push(OpCode::CopyRegister {
1221                    source: result_reg,
1222                    dest: key_reg,
1223                });
1224            }
1225            KeyResolutionStrategy::Computed {
1226                primary_field,
1227                compute_partition: _,
1228            } => {
1229                // Copy resolver result to key_reg (may be null)
1230                ops.push(OpCode::CopyRegister {
1231                    source: resolved_key_reg,
1232                    dest: key_reg,
1233                });
1234                let temp_reg = 18;
1235                ops.push(OpCode::LoadEventField {
1236                    path: primary_field.clone(),
1237                    dest: temp_reg,
1238                    default: None,
1239                });
1240                ops.push(OpCode::CopyRegisterIfNull {
1241                    source: temp_reg,
1242                    dest: key_reg,
1243                });
1244            }
1245            KeyResolutionStrategy::TemporalLookup {
1246                lookup_field,
1247                timestamp_field,
1248                index_name,
1249            } => {
1250                // Copy resolver result to key_reg (may be null)
1251                ops.push(OpCode::CopyRegister {
1252                    source: resolved_key_reg,
1253                    dest: key_reg,
1254                });
1255                let lookup_reg = 15;
1256                let timestamp_reg = 16;
1257                let result_reg = 17;
1258
1259                ops.push(OpCode::LoadEventField {
1260                    path: lookup_field.clone(),
1261                    dest: lookup_reg,
1262                    default: None,
1263                });
1264
1265                ops.push(OpCode::LoadEventField {
1266                    path: timestamp_field.clone(),
1267                    dest: timestamp_reg,
1268                    default: None,
1269                });
1270
1271                ops.push(OpCode::LookupTemporalIndex {
1272                    state_id: self.state_id,
1273                    index_name: index_name.clone(),
1274                    lookup_value: lookup_reg,
1275                    timestamp: timestamp_reg,
1276                    dest: result_reg,
1277                });
1278
1279                ops.push(OpCode::CopyRegisterIfNull {
1280                    source: result_reg,
1281                    dest: key_reg,
1282                });
1283            }
1284        }
1285
1286        ops
1287    }
1288
1289    fn find_primary_key_transformation(
1290        &self,
1291        mappings: &[TypedFieldMapping<S>],
1292    ) -> Option<Transformation> {
1293        // Find the first primary key in the identity spec
1294        let primary_key = self.spec.identity.primary_keys.first()?;
1295        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1296
1297        // Look for a mapping that targets this primary key
1298        for mapping in mappings {
1299            // Check if this mapping targets the primary key field
1300            if mapping.target_path == *primary_key
1301                || mapping.target_path.ends_with(&format!(".{}", primary_key))
1302            {
1303                // Check mapping-level transform first
1304                if let Some(transform) = &mapping.transform {
1305                    return Some(transform.clone());
1306                }
1307
1308                // Then check source-level transform
1309                if let MappingSource::FromSource {
1310                    transform: Some(transform),
1311                    ..
1312                } = &mapping.source
1313                {
1314                    return Some(transform.clone());
1315                }
1316            }
1317        }
1318
1319        // If no explicit primary key mapping found, check AsCapture field transforms
1320        for mapping in mappings {
1321            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1322                if let Some(transform) = field_transforms.get(&primary_field_name) {
1323                    return Some(transform.clone());
1324                }
1325            }
1326        }
1327
1328        None
1329    }
1330
1331    /// Look for primary key mappings in other handlers of the same entity
1332    /// This enables cross-handler inheritance of key transformations
1333    pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1334        let primary_key = self.spec.identity.primary_keys.first()?;
1335
1336        // Extract the field name from the primary key path (e.g., "id.authority" -> "authority")
1337        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1338
1339        // Search through all handlers in the spec for primary key mappings
1340        for handler in &self.spec.handlers {
1341            for mapping in &handler.mappings {
1342                // Look for mappings targeting the primary key
1343                if mapping.target_path == *primary_key
1344                    || mapping.target_path.ends_with(&format!(".{}", primary_key))
1345                {
1346                    // Check if this mapping comes from a field matching the primary key name
1347                    if let MappingSource::FromSource {
1348                        path, transform, ..
1349                    } = &mapping.source
1350                    {
1351                        if path.segments.last() == Some(&primary_field_name) {
1352                            // Return mapping-level transform first, then source-level transform
1353                            return mapping.transform.clone().or_else(|| transform.clone());
1354                        }
1355                    }
1356                }
1357
1358                // Also check AsCapture field transforms for the primary field
1359                if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1360                    if let Some(transform) = field_transforms.get(&primary_field_name) {
1361                        return Some(transform.clone());
1362                    }
1363                }
1364            }
1365        }
1366
1367        None
1368    }
1369
1370    /// Extract the field name from a primary key path (e.g., "id.authority" -> "authority")
1371    fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1372        // Split by '.' and take the last segment
1373        primary_key.split('.').next_back().map(|s| s.to_string())
1374    }
1375
1376    /// Auto-detect primary field from account schema when no explicit mapping exists
1377    /// This looks for account types that have an 'authority' field and tries to use it
1378    pub fn auto_detect_primary_field(
1379        &self,
1380        current_mappings: &[TypedFieldMapping<S>],
1381    ) -> Option<FieldPath> {
1382        let primary_key = self.spec.identity.primary_keys.first()?;
1383
1384        // Extract the field name from the primary key (e.g., "id.authority" -> "authority")
1385        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1386
1387        // Check if current handler can access the primary field
1388        if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1389            return Some(FieldPath::new(&[&primary_field_name]));
1390        }
1391
1392        None
1393    }
1394
1395    /// Check if the current account type has the primary field
1396    /// This is determined by looking at the mappings to see what fields are available
1397    fn current_account_has_primary_field(
1398        &self,
1399        field_name: &str,
1400        mappings: &[TypedFieldMapping<S>],
1401    ) -> bool {
1402        // Look through the mappings to see if any reference the primary field
1403        for mapping in mappings {
1404            if let MappingSource::FromSource { path, .. } = &mapping.source {
1405                // Check if this mapping sources from the primary field
1406                if path.segments.last() == Some(&field_name.to_string()) {
1407                    return true;
1408                }
1409            }
1410        }
1411
1412        false
1413    }
1414
1415    /// Check if handler has access to a specific field in its source account
1416    #[allow(dead_code)]
1417    fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1418        for mapping in mappings {
1419            if let MappingSource::FromSource { path, .. } = &mapping.source {
1420                if path.segments.last() == Some(&field_name.to_string()) {
1421                    return true;
1422                }
1423            }
1424        }
1425        false
1426    }
1427
1428    /// Check if field exists by looking at mappings (IDL-agnostic approach)
1429    /// This avoids hardcoding account schemas and uses actual mapping evidence
1430    #[allow(dead_code)]
1431    fn field_exists_in_mappings(
1432        &self,
1433        field_name: &str,
1434        mappings: &[TypedFieldMapping<S>],
1435    ) -> bool {
1436        // Look through current mappings to see if the field is referenced
1437        for mapping in mappings {
1438            if let MappingSource::FromSource { path, .. } = &mapping.source {
1439                if path.segments.last() == Some(&field_name.to_string()) {
1440                    return true;
1441                }
1442            }
1443            // Also check AsCapture field transforms
1444            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1445                if field_transforms.contains_key(field_name) {
1446                    return true;
1447                }
1448            }
1449        }
1450        false
1451    }
1452
1453    fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1454        if field_path.segments.is_empty() {
1455            return None;
1456        }
1457
1458        let lookup_field_name = field_path.segments.last().unwrap();
1459
1460        for lookup_index in &self.spec.identity.lookup_indexes {
1461            let index_field_name = lookup_index
1462                .field_name
1463                .split('.')
1464                .next_back()
1465                .unwrap_or(&lookup_index.field_name);
1466            if index_field_name == lookup_field_name {
1467                return Some(format!("{}_lookup_index", index_field_name));
1468            }
1469        }
1470
1471        None
1472    }
1473
1474    /// Find lookup index for a Lookup key resolution by checking if there's a mapping
1475    /// from the primary_field to a lookup index field.
1476    fn find_lookup_index_for_lookup_field(
1477        &self,
1478        primary_field: &FieldPath,
1479        mappings: &[TypedFieldMapping<S>],
1480    ) -> Option<String> {
1481        // Build the primary field path string
1482        let primary_path = primary_field.segments.join(".");
1483
1484        // Check if there's a mapping from this primary field to a lookup index field
1485        for mapping in mappings {
1486            // Check if the mapping source path matches the primary field
1487            if let MappingSource::FromSource { path, .. } = &mapping.source {
1488                let source_path = path.segments.join(".");
1489                if source_path == primary_path {
1490                    // Check if the target is a lookup index field
1491                    for lookup_index in &self.spec.identity.lookup_indexes {
1492                        if mapping.target_path == lookup_index.field_name {
1493                            let index_field_name = lookup_index
1494                                .field_name
1495                                .split('.')
1496                                .next_back()
1497                                .unwrap_or(&lookup_index.field_name);
1498                            return Some(format!("{}_lookup_index", index_field_name));
1499                        }
1500                    }
1501                }
1502            }
1503        }
1504
1505        // Fall back to direct field name matching
1506        self.find_lookup_index_for_field(primary_field)
1507    }
1508
1509    /// Find the source path for a lookup index field by looking at mappings.
1510    /// For example, if target_path is "id.round_address" and the mapping is
1511    /// `id.round_address <- __account_address`, this returns ["__account_address"].
1512    fn find_source_path_for_lookup_index(
1513        &self,
1514        mappings: &[TypedFieldMapping<S>],
1515        lookup_field_name: &str,
1516    ) -> Option<Vec<String>> {
1517        for mapping in mappings {
1518            if mapping.target_path == lookup_field_name {
1519                if let MappingSource::FromSource { path, .. } = &mapping.source {
1520                    return Some(path.segments.clone());
1521                }
1522            }
1523        }
1524        None
1525    }
1526
1527    fn compile_temporal_index_update(
1528        &self,
1529        resolution: &KeyResolutionStrategy,
1530        key_reg: Register,
1531        mappings: &[TypedFieldMapping<S>],
1532    ) -> Vec<OpCode> {
1533        let mut ops = Vec::new();
1534
1535        for lookup_index in &self.spec.identity.lookup_indexes {
1536            let lookup_reg = 17;
1537            let source_field = lookup_index
1538                .field_name
1539                .split('.')
1540                .next_back()
1541                .unwrap_or(&lookup_index.field_name);
1542
1543            match resolution {
1544                KeyResolutionStrategy::Embedded { primary_field: _ } => {
1545                    // For Embedded handlers, find the mapping that targets this lookup index field
1546                    // and use its source path to load the lookup value
1547                    let source_path_opt =
1548                        self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1549
1550                    let load_path = if let Some(ref path) = source_path_opt {
1551                        FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1552                    } else {
1553                        // Fallback to source_field if no mapping found
1554                        FieldPath::new(&[source_field])
1555                    };
1556
1557                    ops.push(OpCode::LoadEventField {
1558                        path: load_path,
1559                        dest: lookup_reg,
1560                        default: None,
1561                    });
1562
1563                    if let Some(temporal_field_name) = &lookup_index.temporal_field {
1564                        let timestamp_reg = 18;
1565
1566                        ops.push(OpCode::LoadEventField {
1567                            path: FieldPath::new(&[temporal_field_name]),
1568                            dest: timestamp_reg,
1569                            default: None,
1570                        });
1571
1572                        let index_name = format!("{}_temporal_index", source_field);
1573                        ops.push(OpCode::UpdateTemporalIndex {
1574                            state_id: self.state_id,
1575                            index_name,
1576                            lookup_value: lookup_reg,
1577                            primary_key: key_reg,
1578                            timestamp: timestamp_reg,
1579                        });
1580
1581                        let simple_index_name = format!("{}_lookup_index", source_field);
1582                        ops.push(OpCode::UpdateLookupIndex {
1583                            state_id: self.state_id,
1584                            index_name: simple_index_name,
1585                            lookup_value: lookup_reg,
1586                            primary_key: key_reg,
1587                        });
1588                    } else {
1589                        let index_name = format!("{}_lookup_index", source_field);
1590                        ops.push(OpCode::UpdateLookupIndex {
1591                            state_id: self.state_id,
1592                            index_name,
1593                            lookup_value: lookup_reg,
1594                            primary_key: key_reg,
1595                        });
1596                    }
1597
1598                    // Also update PDA reverse lookup table if there's a resolver configured for this entity
1599                    // This allows instruction handlers to look up the primary key from PDA addresses
1600                    // Only do this when the source path is different (e.g., __account_address -> id.round_address)
1601                    if source_path_opt.is_some() {
1602                        ops.push(OpCode::UpdatePdaReverseLookup {
1603                            state_id: self.state_id,
1604                            lookup_name: "default_pda_lookup".to_string(),
1605                            pda_address: lookup_reg,
1606                            primary_key: key_reg,
1607                        });
1608                    }
1609                }
1610                KeyResolutionStrategy::Lookup { primary_field } => {
1611                    // For Lookup handlers, check if there's a mapping that targets this lookup index field
1612                    // If so, the lookup value is the same as the primary_field used for key resolution
1613                    let has_mapping_to_lookup_field = mappings
1614                        .iter()
1615                        .any(|m| m.target_path == lookup_index.field_name);
1616
1617                    if has_mapping_to_lookup_field {
1618                        // Load the lookup value from the event using the primary_field path
1619                        // (this is the same value used for key resolution)
1620                        let path_segments: Vec<&str> =
1621                            primary_field.segments.iter().map(|s| s.as_str()).collect();
1622                        ops.push(OpCode::LoadEventField {
1623                            path: FieldPath::new(&path_segments),
1624                            dest: lookup_reg,
1625                            default: None,
1626                        });
1627
1628                        let index_name = format!("{}_lookup_index", source_field);
1629                        ops.push(OpCode::UpdateLookupIndex {
1630                            state_id: self.state_id,
1631                            index_name,
1632                            lookup_value: lookup_reg,
1633                            primary_key: key_reg,
1634                        });
1635                    }
1636                }
1637                KeyResolutionStrategy::Computed { .. }
1638                | KeyResolutionStrategy::TemporalLookup { .. } => {
1639                    // Computed and TemporalLookup handlers don't populate lookup indexes
1640                }
1641            }
1642        }
1643
1644        ops
1645    }
1646
1647    fn get_event_type(&self, source: &SourceSpec) -> String {
1648        match source {
1649            SourceSpec::Source { type_name, .. } => type_name.clone(),
1650        }
1651    }
1652
1653    fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1654        let mut ops = Vec::new();
1655        let state_reg = 2;
1656
1657        for action in actions {
1658            match action {
1659                HookAction::SetField {
1660                    target_field,
1661                    source,
1662                    condition,
1663                } => {
1664                    // Check if there's a condition - evaluation handled in VM
1665                    let _ = condition;
1666
1667                    let temp_reg = 11; // Use register 11 for hook values
1668
1669                    // Load the source value
1670                    let load_ops = self.compile_mapping_source(source, temp_reg);
1671                    ops.extend(load_ops);
1672
1673                    // Apply transformation if specified in source
1674                    if let MappingSource::FromSource {
1675                        transform: Some(transform_type),
1676                        ..
1677                    } = source
1678                    {
1679                        ops.push(OpCode::Transform {
1680                            source: temp_reg,
1681                            dest: temp_reg,
1682                            transformation: transform_type.clone(),
1683                        });
1684                    }
1685
1686                    // Conditionally set the field based on parsed condition
1687                    if let Some(cond_expr) = condition {
1688                        if let Some(parsed) = &cond_expr.parsed {
1689                            // Generate condition check opcodes
1690                            let cond_check_ops = self.compile_condition_check(
1691                                parsed,
1692                                temp_reg,
1693                                state_reg,
1694                                target_field,
1695                            );
1696                            ops.extend(cond_check_ops);
1697                        } else {
1698                            // No parsed condition, set unconditionally
1699                            ops.push(OpCode::SetField {
1700                                object: state_reg,
1701                                path: target_field.clone(),
1702                                value: temp_reg,
1703                            });
1704                        }
1705                    } else {
1706                        // No condition, set unconditionally
1707                        ops.push(OpCode::SetField {
1708                            object: state_reg,
1709                            path: target_field.clone(),
1710                            value: temp_reg,
1711                        });
1712                    }
1713                }
1714                HookAction::IncrementField {
1715                    target_field,
1716                    increment_by,
1717                    condition,
1718                } => {
1719                    if let Some(cond_expr) = condition {
1720                        if let Some(parsed) = &cond_expr.parsed {
1721                            // For increment with condition, we need to:
1722                            // 1. Load the condition field
1723                            // 2. Check the condition
1724                            // 3. Conditionally increment
1725                            let cond_check_ops = self.compile_conditional_increment(
1726                                parsed,
1727                                state_reg,
1728                                target_field,
1729                                *increment_by,
1730                            );
1731                            ops.extend(cond_check_ops);
1732                        } else {
1733                            // No parsed condition, increment unconditionally
1734                            ops.push(OpCode::SetFieldIncrement {
1735                                object: state_reg,
1736                                path: target_field.clone(),
1737                            });
1738                        }
1739                    } else {
1740                        // No condition, increment unconditionally
1741                        ops.push(OpCode::SetFieldIncrement {
1742                            object: state_reg,
1743                            path: target_field.clone(),
1744                        });
1745                    }
1746                }
1747                HookAction::RegisterPdaMapping { .. } => {
1748                    // PDA registration is handled elsewhere (in resolvers)
1749                    // Skip for now
1750                }
1751            }
1752        }
1753
1754        ops
1755    }
1756
1757    fn compile_condition_check(
1758        &self,
1759        condition: &ParsedCondition,
1760        value_reg: Register,
1761        state_reg: Register,
1762        target_field: &str,
1763    ) -> Vec<OpCode> {
1764        match condition {
1765            ParsedCondition::Comparison {
1766                field,
1767                op,
1768                value: cond_value,
1769            } => {
1770                // Generate ConditionalSetField opcode
1771                vec![OpCode::ConditionalSetField {
1772                    object: state_reg,
1773                    path: target_field.to_string(),
1774                    value: value_reg,
1775                    condition_field: field.clone(),
1776                    condition_op: op.clone(),
1777                    condition_value: cond_value.clone(),
1778                }]
1779            }
1780            ParsedCondition::Logical { .. } => {
1781                // Logical conditions not yet supported, fall back to unconditional
1782                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1783                vec![OpCode::SetField {
1784                    object: state_reg,
1785                    path: target_field.to_string(),
1786                    value: value_reg,
1787                }]
1788            }
1789        }
1790    }
1791
1792    fn compile_conditional_increment(
1793        &self,
1794        condition: &ParsedCondition,
1795        state_reg: Register,
1796        target_field: &str,
1797        _increment_by: i64,
1798    ) -> Vec<OpCode> {
1799        match condition {
1800            ParsedCondition::Comparison {
1801                field,
1802                op,
1803                value: cond_value,
1804            } => {
1805                vec![OpCode::ConditionalIncrement {
1806                    object: state_reg,
1807                    path: target_field.to_string(),
1808                    condition_field: field.clone(),
1809                    condition_op: op.clone(),
1810                    condition_value: cond_value.clone(),
1811                }]
1812            }
1813            ParsedCondition::Logical { .. } => {
1814                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1815                vec![OpCode::SetFieldIncrement {
1816                    object: state_reg,
1817                    path: target_field.to_string(),
1818                }]
1819            }
1820        }
1821    }
1822}