// hyperstack_interpreter/compiler.rs

1use crate::ast::*;
2use serde_json::Value;
3use std::collections::{HashMap, HashSet};
4use tracing;
5
6pub type Register = usize;
7
/// Returns the path of the stop flag that belongs to `target_path`.
///
/// The flag path is simply `target_path` prefixed with `__stop:`.
fn stop_field_path(target_path: &str) -> String {
    const STOP_PREFIX: &str = "__stop:";

    let mut flag_path = String::with_capacity(STOP_PREFIX.len() + target_path.len());
    flag_path.push_str(STOP_PREFIX);
    flag_path.push_str(target_path);
    flag_path
}
11
12#[derive(Debug, Clone)]
13pub enum OpCode {
14    /// Abort the handler with empty mutations when the key register is null
15    /// and the event is an account-state update (not IxState / CpiEvent).
16    /// Placed immediately after key resolution so that downstream opcodes
17    /// (index updates, field mappings, resolvers, emit) never execute with
18    /// a garbage key. The null-key event is then eligible for queueing by
19    /// process_event's miss-handling logic.
20    AbortIfNullKey {
21        key: Register,
22        is_account_event: bool,
23    },
24    LoadEventField {
25        path: FieldPath,
26        dest: Register,
27        default: Option<Value>,
28    },
29    LoadConstant {
30        value: Value,
31        dest: Register,
32    },
33    CopyRegister {
34        source: Register,
35        dest: Register,
36    },
37    /// Copy from source to dest only if dest is currently null
38    CopyRegisterIfNull {
39        source: Register,
40        dest: Register,
41    },
42    GetEventType {
43        dest: Register,
44    },
45    CreateObject {
46        dest: Register,
47    },
48    SetField {
49        object: Register,
50        path: String,
51        value: Register,
52    },
53    SetFields {
54        object: Register,
55        fields: Vec<(String, Register)>,
56    },
57    GetField {
58        object: Register,
59        path: String,
60        dest: Register,
61    },
62    ReadOrInitState {
63        state_id: u32,
64        key: Register,
65        default: Value,
66        dest: Register,
67    },
68    UpdateState {
69        state_id: u32,
70        key: Register,
71        value: Register,
72    },
73    AppendToArray {
74        object: Register,
75        path: String,
76        value: Register,
77    },
78    GetCurrentTimestamp {
79        dest: Register,
80    },
81    CreateEvent {
82        dest: Register,
83        event_value: Register,
84    },
85    CreateCapture {
86        dest: Register,
87        capture_value: Register,
88    },
89    Transform {
90        source: Register,
91        dest: Register,
92        transformation: Transformation,
93    },
94    EmitMutation {
95        entity_name: String,
96        key: Register,
97        state: Register,
98    },
99    SetFieldIfNull {
100        object: Register,
101        path: String,
102        value: Register,
103    },
104    SetFieldMax {
105        object: Register,
106        path: String,
107        value: Register,
108    },
109    UpdateTemporalIndex {
110        state_id: u32,
111        index_name: String,
112        lookup_value: Register,
113        primary_key: Register,
114        timestamp: Register,
115    },
116    LookupTemporalIndex {
117        state_id: u32,
118        index_name: String,
119        lookup_value: Register,
120        timestamp: Register,
121        dest: Register,
122    },
123    UpdateLookupIndex {
124        state_id: u32,
125        index_name: String,
126        lookup_value: Register,
127        primary_key: Register,
128    },
129    LookupIndex {
130        state_id: u32,
131        index_name: String,
132        lookup_value: Register,
133        dest: Register,
134    },
135    /// Sum a numeric value to a field (accumulator)
136    SetFieldSum {
137        object: Register,
138        path: String,
139        value: Register,
140    },
141    /// Increment a counter field by 1
142    SetFieldIncrement {
143        object: Register,
144        path: String,
145    },
146    /// Set field to minimum value
147    SetFieldMin {
148        object: Register,
149        path: String,
150        value: Register,
151    },
152    /// Set field only if a specific instruction type was seen in the same transaction.
153    /// If not seen yet, defers the operation for later completion.
154    SetFieldWhen {
155        object: Register,
156        path: String,
157        value: Register,
158        when_instruction: String,
159        entity_name: String,
160        key_reg: Register,
161        condition_field: Option<FieldPath>,
162        condition_op: Option<ComparisonOp>,
163        condition_value: Option<Value>,
164    },
165    /// Set field unless stopped by a specific instruction.
166    /// Stop is tracked by a per-entity stop flag.
167    SetFieldUnlessStopped {
168        object: Register,
169        path: String,
170        value: Register,
171        stop_field: String,
172        stop_instruction: String,
173        entity_name: String,
174        key_reg: Register,
175    },
176    /// Add value to unique set and update count
177    /// Maintains internal Set, field stores count
178    AddToUniqueSet {
179        state_id: u32,
180        set_name: String,
181        value: Register,
182        count_object: Register,
183        count_path: String,
184    },
185    /// Conditionally set a field based on a comparison
186    ConditionalSetField {
187        object: Register,
188        path: String,
189        value: Register,
190        condition_field: FieldPath,
191        condition_op: ComparisonOp,
192        condition_value: Value,
193    },
194    /// Conditionally increment a field based on a comparison
195    ConditionalIncrement {
196        object: Register,
197        path: String,
198        condition_field: FieldPath,
199        condition_op: ComparisonOp,
200        condition_value: Value,
201    },
202    /// Evaluate computed fields (calls external hook if provided)
203    /// computed_paths: List of paths that will be computed (for dirty tracking)
204    EvaluateComputedFields {
205        state: Register,
206        computed_paths: Vec<String>,
207    },
208    /// Queue a resolver for asynchronous enrichment
209    QueueResolver {
210        state_id: u32,
211        entity_name: String,
212        resolver: ResolverType,
213        input_path: Option<String>,
214        input_value: Option<Value>,
215        url_template: Option<Vec<UrlTemplatePart>>,
216        strategy: ResolveStrategy,
217        extracts: Vec<ResolverExtractSpec>,
218        condition: Option<ResolverCondition>,
219        schedule_at: Option<String>,
220        state: Register,
221        key: Register,
222    },
223    /// Update PDA reverse lookup table
224    /// Maps a PDA address to its primary key for reverse lookups
225    UpdatePdaReverseLookup {
226        state_id: u32,
227        lookup_name: String,
228        pda_address: Register,
229        primary_key: Register,
230    },
231}
232
233pub struct EntityBytecode {
234    pub state_id: u32,
235    pub handlers: HashMap<String, Vec<OpCode>>,
236    pub entity_name: String,
237    pub when_events: HashSet<String>,
238    pub non_emitted_fields: HashSet<String>,
239    pub computed_paths: Vec<String>,
240    /// Optional callback for evaluating computed fields
241    /// Parameters: state, context_slot (Option<u64>), context_timestamp (i64)
242    #[allow(clippy::type_complexity)]
243    pub computed_fields_evaluator: Option<
244        Box<
245            dyn Fn(
246                    &mut Value,
247                    Option<u64>,
248                    i64,
249                ) -> std::result::Result<(), Box<dyn std::error::Error>>
250                + Send
251                + Sync,
252        >,
253    >,
254}
255
256impl std::fmt::Debug for EntityBytecode {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.debug_struct("EntityBytecode")
259            .field("state_id", &self.state_id)
260            .field("handlers", &self.handlers)
261            .field("entity_name", &self.entity_name)
262            .field("when_events", &self.when_events)
263            .field("non_emitted_fields", &self.non_emitted_fields)
264            .field("computed_paths", &self.computed_paths)
265            .field(
266                "computed_fields_evaluator",
267                &self.computed_fields_evaluator.is_some(),
268            )
269            .finish()
270    }
271}
272
273#[derive(Debug)]
274pub struct MultiEntityBytecode {
275    pub entities: HashMap<String, EntityBytecode>,
276    pub event_routing: HashMap<String, Vec<String>>,
277    pub when_events: HashSet<String>,
278    pub proto_router: crate::proto_router::ProtoRouter,
279}
280
281impl MultiEntityBytecode {
282    pub fn from_single<S>(entity_name: String, spec: TypedStreamSpec<S>, state_id: u32) -> Self {
283        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
284        let entity_bytecode = compiler.compile_entity();
285
286        let mut entities = HashMap::new();
287        let mut event_routing = HashMap::new();
288        let mut when_events = HashSet::new();
289
290        for event_type in entity_bytecode.handlers.keys() {
291            event_routing
292                .entry(event_type.clone())
293                .or_insert_with(Vec::new)
294                .push(entity_name.clone());
295        }
296
297        when_events.extend(entity_bytecode.when_events.iter().cloned());
298
299        entities.insert(entity_name, entity_bytecode);
300
301        MultiEntityBytecode {
302            entities,
303            event_routing,
304            when_events,
305            proto_router: crate::proto_router::ProtoRouter::new(),
306        }
307    }
308
309    pub fn from_entities(entities_vec: Vec<(String, Box<dyn std::any::Any>, u32)>) -> Self {
310        let entities = HashMap::new();
311        let event_routing = HashMap::new();
312        let when_events = HashSet::new();
313
314        if let Some((_entity_name, _spec_any, _state_id)) = entities_vec.into_iter().next() {
315            panic!("from_entities requires type information - use builder pattern instead");
316        }
317
318        MultiEntityBytecode {
319            entities,
320            event_routing,
321            when_events,
322            proto_router: crate::proto_router::ProtoRouter::new(),
323        }
324    }
325
326    #[allow(clippy::new_ret_no_self)]
327    pub fn new() -> MultiEntityBytecodeBuilder {
328        MultiEntityBytecodeBuilder {
329            entities: HashMap::new(),
330            event_routing: HashMap::new(),
331            when_events: HashSet::new(),
332            proto_router: crate::proto_router::ProtoRouter::new(),
333        }
334    }
335}
336
337pub struct MultiEntityBytecodeBuilder {
338    entities: HashMap<String, EntityBytecode>,
339    event_routing: HashMap<String, Vec<String>>,
340    when_events: HashSet<String>,
341    proto_router: crate::proto_router::ProtoRouter,
342}
343
344impl MultiEntityBytecodeBuilder {
345    pub fn add_entity<S>(
346        self,
347        entity_name: String,
348        spec: TypedStreamSpec<S>,
349        state_id: u32,
350    ) -> Self {
351        self.add_entity_with_evaluator(
352            entity_name,
353            spec,
354            state_id,
355            None::<
356                fn(
357                    &mut Value,
358                    Option<u64>,
359                    i64,
360                ) -> std::result::Result<(), Box<dyn std::error::Error>>,
361            >,
362        )
363    }
364
365    pub fn add_entity_with_evaluator<S, F>(
366        mut self,
367        entity_name: String,
368        spec: TypedStreamSpec<S>,
369        state_id: u32,
370        evaluator: Option<F>,
371    ) -> Self
372    where
373        F: Fn(&mut Value, Option<u64>, i64) -> std::result::Result<(), Box<dyn std::error::Error>>
374            + Send
375            + Sync
376            + 'static,
377    {
378        let compiler = TypedCompiler::new(spec, entity_name.clone()).with_state_id(state_id);
379        let mut entity_bytecode = compiler.compile_entity();
380
381        // Store the evaluator callback if provided
382        if let Some(eval) = evaluator {
383            entity_bytecode.computed_fields_evaluator = Some(Box::new(eval));
384        }
385
386        for event_type in entity_bytecode.handlers.keys() {
387            self.event_routing
388                .entry(event_type.clone())
389                .or_default()
390                .push(entity_name.clone());
391        }
392
393        self.when_events
394            .extend(entity_bytecode.when_events.iter().cloned());
395
396        self.entities.insert(entity_name, entity_bytecode);
397        self
398    }
399
400    pub fn build(self) -> MultiEntityBytecode {
401        MultiEntityBytecode {
402            entities: self.entities,
403            event_routing: self.event_routing,
404            when_events: self.when_events,
405            proto_router: self.proto_router,
406        }
407    }
408}
409
410pub struct TypedCompiler<S> {
411    pub spec: TypedStreamSpec<S>,
412    entity_name: String,
413    state_id: u32,
414}
415
416impl<S> TypedCompiler<S> {
417    pub fn new(spec: TypedStreamSpec<S>, entity_name: String) -> Self {
418        TypedCompiler {
419            spec,
420            entity_name,
421            state_id: 0,
422        }
423    }
424
425    pub fn with_state_id(mut self, state_id: u32) -> Self {
426        self.state_id = state_id;
427        self
428    }
429
430    pub fn compile(&self) -> MultiEntityBytecode {
431        let entity_bytecode = self.compile_entity();
432
433        let mut entities = HashMap::new();
434        let mut event_routing = HashMap::new();
435        let mut when_events = HashSet::new();
436
437        for event_type in entity_bytecode.handlers.keys() {
438            event_routing
439                .entry(event_type.clone())
440                .or_insert_with(Vec::new)
441                .push(self.entity_name.clone());
442        }
443
444        when_events.extend(entity_bytecode.when_events.iter().cloned());
445
446        entities.insert(self.entity_name.clone(), entity_bytecode);
447
448        MultiEntityBytecode {
449            entities,
450            event_routing,
451            when_events,
452            proto_router: crate::proto_router::ProtoRouter::new(),
453        }
454    }
455
456    fn compile_entity(&self) -> EntityBytecode {
457        let mut handlers: HashMap<String, Vec<OpCode>> = HashMap::new();
458        let mut when_events: HashSet<String> = HashSet::new();
459        let mut emit_by_path: HashMap<String, bool> = HashMap::new();
460
461        // DEBUG: Collect all handler info before processing
462        let mut debug_info = Vec::new();
463        for (index, handler_spec) in self.spec.handlers.iter().enumerate() {
464            let event_type = self.get_event_type(&handler_spec.source);
465            let program_id = match &handler_spec.source {
466                crate::ast::SourceSpec::Source { program_id, .. } => {
467                    program_id.as_ref().map(|s| s.as_str()).unwrap_or("null")
468                }
469            };
470            debug_info.push(format!(
471                "  [{}] EventType={}, Mappings={}, ProgramId={}",
472                index,
473                event_type,
474                handler_spec.mappings.len(),
475                program_id
476            ));
477        }
478
479        // DEBUG: Log handler information (optional - can be removed later)
480        // Uncomment to debug handler processing:
481        // if self.entity_name == "PumpfunToken" {
482        //     eprintln!("🔍 Compiling {} handlers for {}", self.spec.handlers.len(), self.entity_name);
483        //     for info in &debug_info {
484        //         eprintln!("{}", info);
485        //     }
486        // }
487
488        for handler_spec in &self.spec.handlers {
489            for mapping in &handler_spec.mappings {
490                if let Some(when) = &mapping.when {
491                    when_events.insert(when.clone());
492                }
493                let entry = emit_by_path
494                    .entry(mapping.target_path.clone())
495                    .or_insert(false);
496                *entry |= mapping.emit;
497                if mapping.stop.is_some() {
498                    emit_by_path
499                        .entry(stop_field_path(&mapping.target_path))
500                        .or_insert(false);
501                }
502            }
503            let opcodes = self.compile_handler(handler_spec);
504            let event_type = self.get_event_type(&handler_spec.source);
505
506            if let Some(existing_opcodes) = handlers.get_mut(&event_type) {
507                // Merge strategy: Take ALL operations from BOTH handlers
508                // Keep setup from first, combine all mappings, keep one teardown
509
510                // Split existing handler into: setup, mappings, teardown
511                let mut existing_setup = Vec::new();
512                let mut existing_mappings = Vec::new();
513                let mut existing_teardown = Vec::new();
514                let mut section = 0; // 0=setup, 1=mappings, 2=teardown
515
516                for opcode in existing_opcodes.iter() {
517                    match opcode {
518                        OpCode::ReadOrInitState { .. } => {
519                            existing_setup.push(opcode.clone());
520                            section = 1; // Next opcodes are mappings
521                        }
522                        OpCode::UpdateState { .. } => {
523                            existing_teardown.push(opcode.clone());
524                            section = 2; // Next opcodes are teardown
525                        }
526                        OpCode::EmitMutation { .. } => {
527                            existing_teardown.push(opcode.clone());
528                        }
529                        _ if section == 0 => existing_setup.push(opcode.clone()),
530                        _ if section == 1 => existing_mappings.push(opcode.clone()),
531                        _ => existing_teardown.push(opcode.clone()),
532                    }
533                }
534
535                // Extract mappings from new handler (skip setup and teardown)
536                let mut new_mappings = Vec::new();
537                section = 0;
538
539                for opcode in opcodes.iter() {
540                    match opcode {
541                        OpCode::ReadOrInitState { .. } => {
542                            section = 1; // Start capturing mappings
543                        }
544                        OpCode::UpdateState { .. } | OpCode::EmitMutation { .. } => {
545                            section = 2; // Stop capturing
546                        }
547                        _ if section == 1 => {
548                            new_mappings.push(opcode.clone());
549                        }
550                        _ => {} // Skip setup and teardown from new handler
551                    }
552                }
553
554                // Rebuild: setup + existing_mappings + new_mappings + teardown
555                let mut merged = Vec::new();
556                merged.extend(existing_setup);
557                merged.extend(existing_mappings);
558                merged.extend(new_mappings.clone());
559                merged.extend(existing_teardown);
560
561                *existing_opcodes = merged;
562            } else {
563                handlers.insert(event_type, opcodes);
564            }
565        }
566
567        // Process instruction_hooks to add SetField/IncrementField operations
568        for hook in &self.spec.instruction_hooks {
569            let event_type = hook.instruction_type.clone();
570
571            let handler_opcodes = handlers.entry(event_type.clone()).or_insert_with(|| {
572                let key_reg = 20;
573                let state_reg = 2;
574                let resolved_key_reg = 19;
575                let temp_reg = 18;
576
577                let mut ops = Vec::new();
578
579                // First, try to load __resolved_primary_key from resolver
580                ops.push(OpCode::LoadEventField {
581                    path: FieldPath::new(&["__resolved_primary_key"]),
582                    dest: resolved_key_reg,
583                    default: Some(serde_json::json!(null)),
584                });
585
586                // Copy to key_reg (unconditionally, may be null)
587                ops.push(OpCode::CopyRegister {
588                    source: resolved_key_reg,
589                    dest: key_reg,
590                });
591
592                // If hook has lookup_by, use it to load primary key from instruction accounts
593                if let Some(lookup_path) = &hook.lookup_by {
594                    // Load the primary key from the instruction's lookup_by field (e.g., accounts.signer)
595                    ops.push(OpCode::LoadEventField {
596                        path: lookup_path.clone(),
597                        dest: temp_reg,
598                        default: None,
599                    });
600
601                    // Apply HexEncode transformation (accounts are byte arrays)
602                    ops.push(OpCode::Transform {
603                        source: temp_reg,
604                        dest: temp_reg,
605                        transformation: Transformation::HexEncode,
606                    });
607
608                    // Use this as fallback if __resolved_primary_key was null
609                    ops.push(OpCode::CopyRegisterIfNull {
610                        source: temp_reg,
611                        dest: key_reg,
612                    });
613                }
614
615                ops.push(OpCode::ReadOrInitState {
616                    state_id: self.state_id,
617                    key: key_reg,
618                    default: serde_json::json!({}),
619                    dest: state_reg,
620                });
621
622                ops.push(OpCode::UpdateState {
623                    state_id: self.state_id,
624                    key: key_reg,
625                    value: state_reg,
626                });
627
628                ops
629            });
630
631            // Generate opcodes for each action in the hook
632            let hook_opcodes = self.compile_instruction_hook_actions(&hook.actions);
633
634            // Insert hook opcodes before EvaluateComputedFields (if present) or UpdateState
635            // Hook actions (like whale_trade_count increment) must run before computed fields
636            // are evaluated, since computed fields may depend on the modified state
637            let insert_pos = handler_opcodes
638                .iter()
639                .position(|op| matches!(op, OpCode::EvaluateComputedFields { .. }))
640                .or_else(|| {
641                    handler_opcodes
642                        .iter()
643                        .position(|op| matches!(op, OpCode::UpdateState { .. }))
644                });
645
646            if let Some(pos) = insert_pos {
647                // Insert hook opcodes before EvaluateComputedFields or UpdateState
648                for (i, opcode) in hook_opcodes.into_iter().enumerate() {
649                    handler_opcodes.insert(pos + i, opcode);
650                }
651            }
652        }
653
654        let non_emitted_fields: HashSet<String> = emit_by_path
655            .into_iter()
656            .filter_map(|(path, emit)| if emit { None } else { Some(path) })
657            .collect();
658
659        EntityBytecode {
660            state_id: self.state_id,
661            handlers,
662            entity_name: self.entity_name.clone(),
663            when_events,
664            non_emitted_fields,
665            computed_paths: self.spec.computed_fields.clone(),
666            computed_fields_evaluator: None,
667        }
668    }
669
670    fn compile_handler(&self, spec: &TypedHandlerSpec<S>) -> Vec<OpCode> {
671        let mut ops = Vec::new();
672        let state_reg = 2;
673        let key_reg = 20;
674
675        ops.extend(self.compile_key_loading(&spec.key_resolution, key_reg, &spec.mappings));
676
677        // Guard: if key resolved to null on an account-state event, abort
678        // early with empty mutations so process_event can queue the update
679        // for later reprocessing.  Without this, downstream opcodes would
680        // create a phantom entity keyed by null and produce non-empty
681        // mutations that prevent queueing.
682        let is_account_event = matches!(
683            spec.source,
684            SourceSpec::Source {
685                is_account: true,
686                ..
687            }
688        );
689        ops.push(OpCode::AbortIfNullKey {
690            key: key_reg,
691            is_account_event,
692        });
693
694        ops.push(OpCode::ReadOrInitState {
695            state_id: self.state_id,
696            key: key_reg,
697            default: serde_json::json!({}),
698            dest: state_reg,
699        });
700
701        // Index updates must come AFTER ReadOrInitState so the state table exists.
702        // ReadOrInitState lazily creates the state table via entry().or_insert_with(),
703        // but index opcodes (UpdateLookupIndex, UpdateTemporalIndex, UpdatePdaReverseLookup)
704        // use get_mut() which fails if the table doesn't exist yet.
705        // This ordering also means stale/duplicate updates (caught by ReadOrInitState's
706        // recency check) correctly skip index updates too.
707        ops.extend(self.compile_temporal_index_update(
708            &spec.key_resolution,
709            key_reg,
710            &spec.mappings,
711        ));
712
713        for mapping in &spec.mappings {
714            ops.extend(self.compile_mapping(mapping, state_reg, key_reg));
715        }
716
717        ops.extend(self.compile_resolvers(state_reg, key_reg));
718
719        // Evaluate computed fields after all mappings but before updating state
720        ops.push(OpCode::EvaluateComputedFields {
721            state: state_reg,
722            computed_paths: self.spec.computed_fields.clone(),
723        });
724
725        ops.push(OpCode::UpdateState {
726            state_id: self.state_id,
727            key: key_reg,
728            value: state_reg,
729        });
730
731        if spec.emit {
732            ops.push(OpCode::EmitMutation {
733                entity_name: self.entity_name.clone(),
734                key: key_reg,
735                state: state_reg,
736            });
737        }
738
739        ops
740    }
741
742    fn compile_resolvers(&self, state_reg: Register, key_reg: Register) -> Vec<OpCode> {
743        let mut ops = Vec::new();
744
745        for resolver_spec in &self.spec.resolver_specs {
746            let url_template = match &resolver_spec.resolver {
747                ResolverType::Url(config) => match &config.url_source {
748                    UrlSource::Template(parts) => Some(parts.clone()),
749                    _ => None,
750                },
751                _ => None,
752            };
753
754            ops.push(OpCode::QueueResolver {
755                state_id: self.state_id,
756                entity_name: self.entity_name.clone(),
757                resolver: resolver_spec.resolver.clone(),
758                input_path: resolver_spec.input_path.clone(),
759                input_value: resolver_spec.input_value.clone(),
760                url_template,
761                strategy: resolver_spec.strategy.clone(),
762                extracts: resolver_spec.extracts.clone(),
763                condition: resolver_spec.condition.clone(),
764                schedule_at: resolver_spec.schedule_at.clone(),
765                state: state_reg,
766                key: key_reg,
767            });
768        }
769
770        ops
771    }
772
773    fn compile_mapping(
774        &self,
775        mapping: &TypedFieldMapping<S>,
776        state_reg: Register,
777        key_reg: Register,
778    ) -> Vec<OpCode> {
779        let mut ops = Vec::new();
780        let temp_reg = 10;
781
782        ops.extend(self.compile_mapping_source(&mapping.source, temp_reg));
783
784        if let Some(transform) = &mapping.transform {
785            ops.push(OpCode::Transform {
786                source: temp_reg,
787                dest: temp_reg,
788                transformation: transform.clone(),
789            });
790        }
791
792        if let Some(stop_instruction) = &mapping.stop {
793            if mapping.when.is_some() {
794                tracing::warn!(
795                    "#[map] stop and when both set for {}. Ignoring when.",
796                    mapping.target_path
797                );
798            }
799            if !matches!(mapping.population, PopulationStrategy::LastWrite)
800                && !matches!(mapping.population, PopulationStrategy::Merge)
801            {
802                tracing::warn!(
803                    "#[map] stop ignores population strategy {:?}",
804                    mapping.population
805                );
806            }
807
808            ops.push(OpCode::SetFieldUnlessStopped {
809                object: state_reg,
810                path: mapping.target_path.clone(),
811                value: temp_reg,
812                stop_field: stop_field_path(&mapping.target_path),
813                stop_instruction: stop_instruction.clone(),
814                entity_name: self.entity_name.clone(),
815                key_reg,
816            });
817            return ops;
818        }
819
820        if let Some(when_instruction) = &mapping.when {
821            if !matches!(mapping.population, PopulationStrategy::LastWrite)
822                && !matches!(mapping.population, PopulationStrategy::Merge)
823            {
824                tracing::warn!(
825                    "#[map] when ignores population strategy {:?}",
826                    mapping.population
827                );
828            }
829            let (condition_field, condition_op, condition_value) = mapping
830                .condition
831                .as_ref()
832                .and_then(|cond| cond.parsed.as_ref())
833                .and_then(|parsed| match parsed {
834                    ParsedCondition::Comparison { field, op, value } => {
835                        Some((Some(field.clone()), Some(op.clone()), Some(value.clone())))
836                    }
837                    ParsedCondition::Logical { .. } => {
838                        tracing::warn!("Logical conditions not yet supported for #[map] when");
839                        None
840                    }
841                })
842                .unwrap_or((None, None, None));
843
844            ops.push(OpCode::SetFieldWhen {
845                object: state_reg,
846                path: mapping.target_path.clone(),
847                value: temp_reg,
848                when_instruction: when_instruction.clone(),
849                entity_name: self.entity_name.clone(),
850                key_reg,
851                condition_field,
852                condition_op,
853                condition_value,
854            });
855            return ops;
856        }
857
858        if let Some(condition) = &mapping.condition {
859            if let Some(parsed) = &condition.parsed {
860                match parsed {
861                    ParsedCondition::Comparison {
862                        field,
863                        op,
864                        value: cond_value,
865                    } => {
866                        if matches!(mapping.population, PopulationStrategy::LastWrite)
867                            || matches!(mapping.population, PopulationStrategy::Merge)
868                        {
869                            ops.push(OpCode::ConditionalSetField {
870                                object: state_reg,
871                                path: mapping.target_path.clone(),
872                                value: temp_reg,
873                                condition_field: field.clone(),
874                                condition_op: op.clone(),
875                                condition_value: cond_value.clone(),
876                            });
877                            return ops;
878                        }
879
880                        if matches!(mapping.population, PopulationStrategy::Count) {
881                            ops.push(OpCode::ConditionalIncrement {
882                                object: state_reg,
883                                path: mapping.target_path.clone(),
884                                condition_field: field.clone(),
885                                condition_op: op.clone(),
886                                condition_value: cond_value.clone(),
887                            });
888                            return ops;
889                        }
890
891                        tracing::warn!(
892                            "Conditional #[map] not supported for population strategy {:?}",
893                            mapping.population
894                        );
895                    }
896                    ParsedCondition::Logical { .. } => {
897                        tracing::warn!("Logical conditions not yet supported for #[map]");
898                    }
899                }
900            }
901        }
902
903        match &mapping.population {
904            PopulationStrategy::Append => {
905                ops.push(OpCode::AppendToArray {
906                    object: state_reg,
907                    path: mapping.target_path.clone(),
908                    value: temp_reg,
909                });
910            }
911            PopulationStrategy::LastWrite => {
912                ops.push(OpCode::SetField {
913                    object: state_reg,
914                    path: mapping.target_path.clone(),
915                    value: temp_reg,
916                });
917            }
918            PopulationStrategy::SetOnce => {
919                ops.push(OpCode::SetFieldIfNull {
920                    object: state_reg,
921                    path: mapping.target_path.clone(),
922                    value: temp_reg,
923                });
924            }
925            PopulationStrategy::Merge => {
926                ops.push(OpCode::SetField {
927                    object: state_reg,
928                    path: mapping.target_path.clone(),
929                    value: temp_reg,
930                });
931            }
932            PopulationStrategy::Max => {
933                ops.push(OpCode::SetFieldMax {
934                    object: state_reg,
935                    path: mapping.target_path.clone(),
936                    value: temp_reg,
937                });
938            }
939            PopulationStrategy::Sum => {
940                ops.push(OpCode::SetFieldSum {
941                    object: state_reg,
942                    path: mapping.target_path.clone(),
943                    value: temp_reg,
944                });
945            }
946            PopulationStrategy::Count => {
947                // Count doesn't need the value, just increment
948                ops.push(OpCode::SetFieldIncrement {
949                    object: state_reg,
950                    path: mapping.target_path.clone(),
951                });
952            }
953            PopulationStrategy::Min => {
954                ops.push(OpCode::SetFieldMin {
955                    object: state_reg,
956                    path: mapping.target_path.clone(),
957                    value: temp_reg,
958                });
959            }
960            PopulationStrategy::UniqueCount => {
961                // UniqueCount requires maintaining an internal set
962                // The field stores the count, but we track unique values in a hidden set
963                let set_name = format!("{}_unique_set", mapping.target_path);
964                ops.push(OpCode::AddToUniqueSet {
965                    state_id: self.state_id,
966                    set_name,
967                    value: temp_reg,
968                    count_object: state_reg,
969                    count_path: mapping.target_path.clone(),
970                });
971            }
972        }
973
974        ops
975    }
976
977    fn compile_mapping_source(&self, source: &MappingSource, dest: Register) -> Vec<OpCode> {
978        match source {
979            MappingSource::FromSource {
980                path,
981                default,
982                transform,
983            } => {
984                let mut ops = vec![OpCode::LoadEventField {
985                    path: path.clone(),
986                    dest,
987                    default: default.clone(),
988                }];
989
990                // Apply transform if specified in the source
991                if let Some(transform_type) = transform {
992                    ops.push(OpCode::Transform {
993                        source: dest,
994                        dest,
995                        transformation: transform_type.clone(),
996                    });
997                }
998
999                ops
1000            }
1001            MappingSource::Constant(val) => {
1002                vec![OpCode::LoadConstant {
1003                    value: val.clone(),
1004                    dest,
1005                }]
1006            }
1007            MappingSource::AsEvent { fields } => {
1008                let mut ops = Vec::new();
1009
1010                if fields.is_empty() {
1011                    let event_data_reg = dest + 1;
1012                    ops.push(OpCode::LoadEventField {
1013                        path: FieldPath::new(&[]),
1014                        dest: event_data_reg,
1015                        default: Some(serde_json::json!({})),
1016                    });
1017                    ops.push(OpCode::CreateEvent {
1018                        dest,
1019                        event_value: event_data_reg,
1020                    });
1021                } else {
1022                    let data_obj_reg = dest + 1;
1023                    ops.push(OpCode::CreateObject { dest: data_obj_reg });
1024
1025                    let mut field_registers = Vec::new();
1026                    let mut current_reg = dest + 2;
1027
1028                    for field_source in fields.iter() {
1029                        if let MappingSource::FromSource {
1030                            path,
1031                            default,
1032                            transform,
1033                        } = &**field_source
1034                        {
1035                            ops.push(OpCode::LoadEventField {
1036                                path: path.clone(),
1037                                dest: current_reg,
1038                                default: default.clone(),
1039                            });
1040
1041                            if let Some(transform_type) = transform {
1042                                ops.push(OpCode::Transform {
1043                                    source: current_reg,
1044                                    dest: current_reg,
1045                                    transformation: transform_type.clone(),
1046                                });
1047                            }
1048
1049                            if let Some(field_name) = path.segments.last() {
1050                                field_registers.push((field_name.clone(), current_reg));
1051                            }
1052                            current_reg += 1;
1053                        }
1054                    }
1055
1056                    if !field_registers.is_empty() {
1057                        ops.push(OpCode::SetFields {
1058                            object: data_obj_reg,
1059                            fields: field_registers,
1060                        });
1061                    }
1062
1063                    ops.push(OpCode::CreateEvent {
1064                        dest,
1065                        event_value: data_obj_reg,
1066                    });
1067                }
1068
1069                ops
1070            }
1071            MappingSource::WholeSource => {
1072                vec![OpCode::LoadEventField {
1073                    path: FieldPath::new(&[]),
1074                    dest,
1075                    default: Some(serde_json::json!({})),
1076                }]
1077            }
1078            MappingSource::AsCapture { field_transforms } => {
1079                // AsCapture loads the whole source, applies field-level transforms, and wraps in CaptureWrapper
1080                let capture_data_reg = 22; // Temp register for capture data before wrapping
1081                let mut ops = vec![OpCode::LoadEventField {
1082                    path: FieldPath::new(&[]),
1083                    dest: capture_data_reg,
1084                    default: Some(serde_json::json!({})),
1085                }];
1086
1087                // Apply transforms to specific fields in the loaded object
1088                // IMPORTANT: Use registers that don't conflict with key_reg (20)
1089                // Using 24 and 25 to avoid conflicts with key loading (uses 18, 19, 20, 23)
1090                let field_reg = 24;
1091                let transformed_reg = 25;
1092
1093                for (field_name, transform) in field_transforms {
1094                    // Load the field from the capture_data_reg (not from event!)
1095                    // Use GetField opcode to read from a register instead of LoadEventField
1096                    ops.push(OpCode::GetField {
1097                        object: capture_data_reg,
1098                        path: field_name.clone(),
1099                        dest: field_reg,
1100                    });
1101
1102                    // Transform it
1103                    ops.push(OpCode::Transform {
1104                        source: field_reg,
1105                        dest: transformed_reg,
1106                        transformation: transform.clone(),
1107                    });
1108
1109                    // Set it back into the capture data object
1110                    ops.push(OpCode::SetField {
1111                        object: capture_data_reg,
1112                        path: field_name.clone(),
1113                        value: transformed_reg,
1114                    });
1115                }
1116
1117                // Wrap the capture data in CaptureWrapper with metadata
1118                ops.push(OpCode::CreateCapture {
1119                    dest,
1120                    capture_value: capture_data_reg,
1121                });
1122
1123                ops
1124            }
1125            MappingSource::FromContext { field } => {
1126                // Load from instruction context (timestamp, slot, signature)
1127                vec![OpCode::LoadEventField {
1128                    path: FieldPath::new(&["__update_context", field.as_str()]),
1129                    dest,
1130                    default: Some(serde_json::json!(null)),
1131                }]
1132            }
1133            MappingSource::Computed { .. } => {
1134                vec![]
1135            }
1136            MappingSource::FromState { .. } => {
1137                vec![]
1138            }
1139        }
1140    }
1141
1142    pub fn compile_key_loading(
1143        &self,
1144        resolution: &KeyResolutionStrategy,
1145        key_reg: Register,
1146        mappings: &[TypedFieldMapping<S>],
1147    ) -> Vec<OpCode> {
1148        let mut ops = Vec::new();
1149
1150        // First, try to load __resolved_primary_key from resolver
1151        // This allows resolvers to override the key resolution
1152        let resolved_key_reg = 19; // Use a temp register
1153        ops.push(OpCode::LoadEventField {
1154            path: FieldPath::new(&["__resolved_primary_key"]),
1155            dest: resolved_key_reg,
1156            default: Some(serde_json::json!(null)),
1157        });
1158
1159        // Now do the normal key resolution
1160        match resolution {
1161            KeyResolutionStrategy::Embedded { primary_field } => {
1162                // Copy resolver result to key_reg (may be null)
1163                ops.push(OpCode::CopyRegister {
1164                    source: resolved_key_reg,
1165                    dest: key_reg,
1166                });
1167
1168                // Enhanced key resolution: check for auto-inheritance when primary_field is empty
1169                let effective_primary_field = if primary_field.segments.is_empty() {
1170                    // Try to auto-detect primary field from account schema
1171                    if let Some(auto_field) = self.auto_detect_primary_field(mappings) {
1172                        auto_field
1173                    } else {
1174                        primary_field.clone()
1175                    }
1176                } else {
1177                    primary_field.clone()
1178                };
1179
1180                // Skip fallback key loading if effective primary_field is still empty
1181                // This happens for account types that rely solely on __resolved_primary_key
1182                // (e.g., accounts with #[resolve_key_for] resolvers)
1183                if !effective_primary_field.segments.is_empty() {
1184                    let temp_reg = 18;
1185                    let transform_reg = 23; // Register for transformed key
1186
1187                    ops.push(OpCode::LoadEventField {
1188                        path: effective_primary_field.clone(),
1189                        dest: temp_reg,
1190                        default: None,
1191                    });
1192
1193                    // Check if there's a transformation for the primary key field
1194                    // First try the current mappings, then inherited transformations
1195                    let primary_key_transform = self
1196                        .find_primary_key_transformation(mappings)
1197                        .or_else(|| self.find_inherited_primary_key_transformation());
1198
1199                    if let Some(transform) = primary_key_transform {
1200                        // Apply transformation to the loaded key
1201                        ops.push(OpCode::Transform {
1202                            source: temp_reg,
1203                            dest: transform_reg,
1204                            transformation: transform,
1205                        });
1206                        // Use transformed value as key
1207                        ops.push(OpCode::CopyRegisterIfNull {
1208                            source: transform_reg,
1209                            dest: key_reg,
1210                        });
1211                    } else {
1212                        // No transformation, use raw value
1213                        ops.push(OpCode::CopyRegisterIfNull {
1214                            source: temp_reg,
1215                            dest: key_reg,
1216                        });
1217                    }
1218                }
1219                // If effective_primary_field is empty, key_reg will only contain __resolved_primary_key
1220                // (loaded earlier at line 513-522), or remain null if resolver didn't set it
1221            }
1222            KeyResolutionStrategy::Lookup { primary_field } => {
1223                let lookup_reg = 15;
1224                let result_reg = 17;
1225
1226                // Prefer resolver-provided key as lookup input.
1227                // When __resolved_primary_key is set (e.g. round_address from
1228                // PDA reverse lookup), use it directly — this gives a one-hop
1229                // lookup (round_address → round_id) instead of a two-hop chain
1230                // (Var address → PDA → round_address → round_id).
1231                ops.push(OpCode::CopyRegister {
1232                    source: resolved_key_reg,
1233                    dest: lookup_reg,
1234                });
1235
1236                let temp_reg = 18;
1237                ops.push(OpCode::LoadEventField {
1238                    path: primary_field.clone(),
1239                    dest: temp_reg,
1240                    default: None,
1241                });
1242                ops.push(OpCode::CopyRegisterIfNull {
1243                    source: temp_reg,
1244                    dest: lookup_reg,
1245                });
1246
1247                let index_name = self.find_lookup_index_for_lookup_field(primary_field, mappings);
1248                let effective_index_name =
1249                    index_name.unwrap_or_else(|| "default_pda_lookup".to_string());
1250
1251                ops.push(OpCode::LookupIndex {
1252                    state_id: self.state_id,
1253                    index_name: effective_index_name,
1254                    lookup_value: lookup_reg,
1255                    dest: result_reg,
1256                });
1257                // CRITICAL: For Lookup resolution, we ONLY use the lookup result.
1258                // If the lookup fails (result_reg is null), the mutation will be skipped.
1259                // We do NOT fall back to __resolved_primary_key or the raw lookup value,
1260                // because that would create a separate entity with the wrong key.
1261                // The resolver-provided key (e.g., PDA address) is only used as the lookup input,
1262                // NOT as the entity key. The entity key must come from the lookup index.
1263                ops.push(OpCode::CopyRegister {
1264                    source: result_reg,
1265                    dest: key_reg,
1266                });
1267                // NOTE: We intentionally do NOT fall back to lookup_reg when LookupIndex returns null.
1268                // If the lookup fails (because the RoundState account hasn't been processed yet),
1269                // the result_reg will remain null, and the mutation will be skipped.
1270                // Previously we had: CopyRegisterIfNull { source: lookup_reg, dest: result_reg }
1271                // which caused the PDA address to be used as the key instead of the round_id.
1272                // This resulted in mutations with key = PDA address instead of key = primary_key.
1273
1274                // Use LookupIndex result as the primary key. Do NOT fall back to
1275                // resolved_key_reg (__resolved_primary_key) because for Lookup handlers
1276                // it contains the PDA reverse-lookup result (e.g. round_address), which
1277                // is an intermediate value, not the actual primary key (e.g. round_id).
1278                // If the LookupIndex chain returns null (round not yet indexed), key
1279                // stays null so the update is queued and reprocessed once the lookup
1280                // index is populated—matching the pre-23503ac behaviour.
1281                ops.push(OpCode::CopyRegister {
1282                    source: result_reg,
1283                    dest: key_reg,
1284                });
1285            }
1286            KeyResolutionStrategy::Computed {
1287                primary_field,
1288                compute_partition: _,
1289            } => {
1290                // Copy resolver result to key_reg (may be null)
1291                ops.push(OpCode::CopyRegister {
1292                    source: resolved_key_reg,
1293                    dest: key_reg,
1294                });
1295                let temp_reg = 18;
1296                ops.push(OpCode::LoadEventField {
1297                    path: primary_field.clone(),
1298                    dest: temp_reg,
1299                    default: None,
1300                });
1301                ops.push(OpCode::CopyRegisterIfNull {
1302                    source: temp_reg,
1303                    dest: key_reg,
1304                });
1305            }
1306            KeyResolutionStrategy::TemporalLookup {
1307                lookup_field,
1308                timestamp_field,
1309                index_name,
1310            } => {
1311                // Copy resolver result to key_reg (may be null)
1312                ops.push(OpCode::CopyRegister {
1313                    source: resolved_key_reg,
1314                    dest: key_reg,
1315                });
1316                let lookup_reg = 15;
1317                let timestamp_reg = 16;
1318                let result_reg = 17;
1319
1320                ops.push(OpCode::LoadEventField {
1321                    path: lookup_field.clone(),
1322                    dest: lookup_reg,
1323                    default: None,
1324                });
1325
1326                ops.push(OpCode::LoadEventField {
1327                    path: timestamp_field.clone(),
1328                    dest: timestamp_reg,
1329                    default: None,
1330                });
1331
1332                ops.push(OpCode::LookupTemporalIndex {
1333                    state_id: self.state_id,
1334                    index_name: index_name.clone(),
1335                    lookup_value: lookup_reg,
1336                    timestamp: timestamp_reg,
1337                    dest: result_reg,
1338                });
1339
1340                ops.push(OpCode::CopyRegisterIfNull {
1341                    source: result_reg,
1342                    dest: key_reg,
1343                });
1344            }
1345        }
1346
1347        ops
1348    }
1349
1350    fn find_primary_key_transformation(
1351        &self,
1352        mappings: &[TypedFieldMapping<S>],
1353    ) -> Option<Transformation> {
1354        // Find the first primary key in the identity spec
1355        let primary_key = self.spec.identity.primary_keys.first()?;
1356        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1357
1358        // Look for a mapping that targets this primary key
1359        for mapping in mappings {
1360            // Check if this mapping targets the primary key field
1361            if mapping.target_path == *primary_key
1362                || mapping.target_path.ends_with(&format!(".{}", primary_key))
1363            {
1364                // Check mapping-level transform first
1365                if let Some(transform) = &mapping.transform {
1366                    return Some(transform.clone());
1367                }
1368
1369                // Then check source-level transform
1370                if let MappingSource::FromSource {
1371                    transform: Some(transform),
1372                    ..
1373                } = &mapping.source
1374                {
1375                    return Some(transform.clone());
1376                }
1377            }
1378        }
1379
1380        // If no explicit primary key mapping found, check AsCapture field transforms
1381        for mapping in mappings {
1382            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1383                if let Some(transform) = field_transforms.get(&primary_field_name) {
1384                    return Some(transform.clone());
1385                }
1386            }
1387        }
1388
1389        None
1390    }
1391
1392    /// Look for primary key mappings in other handlers of the same entity
1393    /// This enables cross-handler inheritance of key transformations
1394    pub fn find_inherited_primary_key_transformation(&self) -> Option<Transformation> {
1395        let primary_key = self.spec.identity.primary_keys.first()?;
1396
1397        // Extract the field name from the primary key path (e.g., "id.authority" -> "authority")
1398        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1399
1400        // Search through all handlers in the spec for primary key mappings
1401        for handler in &self.spec.handlers {
1402            for mapping in &handler.mappings {
1403                // Look for mappings targeting the primary key
1404                if mapping.target_path == *primary_key
1405                    || mapping.target_path.ends_with(&format!(".{}", primary_key))
1406                {
1407                    // Check if this mapping comes from a field matching the primary key name
1408                    if let MappingSource::FromSource {
1409                        path, transform, ..
1410                    } = &mapping.source
1411                    {
1412                        if path.segments.last() == Some(&primary_field_name) {
1413                            // Return mapping-level transform first, then source-level transform
1414                            return mapping.transform.clone().or_else(|| transform.clone());
1415                        }
1416                    }
1417                }
1418
1419                // Also check AsCapture field transforms for the primary field
1420                if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1421                    if let Some(transform) = field_transforms.get(&primary_field_name) {
1422                        return Some(transform.clone());
1423                    }
1424                }
1425            }
1426        }
1427
1428        None
1429    }
1430
1431    /// Extract the field name from a primary key path (e.g., "id.authority" -> "authority")
1432    fn extract_primary_field_name(&self, primary_key: &str) -> Option<String> {
1433        // Split by '.' and take the last segment
1434        primary_key.split('.').next_back().map(|s| s.to_string())
1435    }
1436
1437    /// Auto-detect primary field from account schema when no explicit mapping exists
1438    /// This looks for account types that have an 'authority' field and tries to use it
1439    pub fn auto_detect_primary_field(
1440        &self,
1441        current_mappings: &[TypedFieldMapping<S>],
1442    ) -> Option<FieldPath> {
1443        let primary_key = self.spec.identity.primary_keys.first()?;
1444
1445        // Extract the field name from the primary key (e.g., "id.authority" -> "authority")
1446        let primary_field_name = self.extract_primary_field_name(primary_key)?;
1447
1448        // Check if current handler can access the primary field
1449        if self.current_account_has_primary_field(&primary_field_name, current_mappings) {
1450            return Some(FieldPath::new(&[&primary_field_name]));
1451        }
1452
1453        None
1454    }
1455
1456    /// Check if the current account type has the primary field
1457    /// This is determined by looking at the mappings to see what fields are available
1458    fn current_account_has_primary_field(
1459        &self,
1460        field_name: &str,
1461        mappings: &[TypedFieldMapping<S>],
1462    ) -> bool {
1463        // Look through the mappings to see if any reference the primary field
1464        for mapping in mappings {
1465            if let MappingSource::FromSource { path, .. } = &mapping.source {
1466                // Check if this mapping sources from the primary field
1467                if path.segments.last() == Some(&field_name.to_string()) {
1468                    return true;
1469                }
1470            }
1471        }
1472
1473        false
1474    }
1475
1476    /// Check if handler has access to a specific field in its source account
1477    #[allow(dead_code)]
1478    fn handler_has_field(&self, field_name: &str, mappings: &[TypedFieldMapping<S>]) -> bool {
1479        for mapping in mappings {
1480            if let MappingSource::FromSource { path, .. } = &mapping.source {
1481                if path.segments.last() == Some(&field_name.to_string()) {
1482                    return true;
1483                }
1484            }
1485        }
1486        false
1487    }
1488
1489    /// Check if field exists by looking at mappings (IDL-agnostic approach)
1490    /// This avoids hardcoding account schemas and uses actual mapping evidence
1491    #[allow(dead_code)]
1492    fn field_exists_in_mappings(
1493        &self,
1494        field_name: &str,
1495        mappings: &[TypedFieldMapping<S>],
1496    ) -> bool {
1497        // Look through current mappings to see if the field is referenced
1498        for mapping in mappings {
1499            if let MappingSource::FromSource { path, .. } = &mapping.source {
1500                if path.segments.last() == Some(&field_name.to_string()) {
1501                    return true;
1502                }
1503            }
1504            // Also check AsCapture field transforms
1505            if let MappingSource::AsCapture { field_transforms } = &mapping.source {
1506                if field_transforms.contains_key(field_name) {
1507                    return true;
1508                }
1509            }
1510        }
1511        false
1512    }
1513
1514    fn find_lookup_index_for_field(&self, field_path: &FieldPath) -> Option<String> {
1515        if field_path.segments.is_empty() {
1516            return None;
1517        }
1518
1519        let lookup_field_name = field_path.segments.last().unwrap();
1520
1521        for lookup_index in &self.spec.identity.lookup_indexes {
1522            let index_field_name = lookup_index
1523                .field_name
1524                .split('.')
1525                .next_back()
1526                .unwrap_or(&lookup_index.field_name);
1527            if index_field_name == lookup_field_name {
1528                return Some(format!("{}_lookup_index", index_field_name));
1529            }
1530        }
1531
1532        None
1533    }
1534
1535    /// Find lookup index for a Lookup key resolution by checking if there's a mapping
1536    /// from the primary_field to a lookup index field.
1537    fn find_lookup_index_for_lookup_field(
1538        &self,
1539        primary_field: &FieldPath,
1540        mappings: &[TypedFieldMapping<S>],
1541    ) -> Option<String> {
1542        // Build the primary field path string
1543        let primary_path = primary_field.segments.join(".");
1544
1545        // Check if there's a mapping from this primary field to a lookup index field
1546        for mapping in mappings {
1547            // Check if the mapping source path matches the primary field
1548            if let MappingSource::FromSource { path, .. } = &mapping.source {
1549                let source_path = path.segments.join(".");
1550                if source_path == primary_path {
1551                    // Check if the target is a lookup index field
1552                    for lookup_index in &self.spec.identity.lookup_indexes {
1553                        if mapping.target_path == lookup_index.field_name {
1554                            let index_field_name = lookup_index
1555                                .field_name
1556                                .split('.')
1557                                .next_back()
1558                                .unwrap_or(&lookup_index.field_name);
1559                            return Some(format!("{}_lookup_index", index_field_name));
1560                        }
1561                    }
1562                }
1563            }
1564        }
1565
1566        // Fall back to direct field name matching
1567        self.find_lookup_index_for_field(primary_field)
1568    }
1569
1570    /// Find the source path for a lookup index field by looking at mappings.
1571    /// For example, if target_path is "id.round_address" and the mapping is
1572    /// `id.round_address <- __account_address`, this returns ["__account_address"].
1573    fn find_source_path_for_lookup_index(
1574        &self,
1575        mappings: &[TypedFieldMapping<S>],
1576        lookup_field_name: &str,
1577    ) -> Option<Vec<String>> {
1578        for mapping in mappings {
1579            if mapping.target_path == lookup_field_name {
1580                if let MappingSource::FromSource { path, .. } = &mapping.source {
1581                    return Some(path.segments.clone());
1582                }
1583            }
1584        }
1585        None
1586    }
1587
1588    fn compile_temporal_index_update(
1589        &self,
1590        resolution: &KeyResolutionStrategy,
1591        key_reg: Register,
1592        mappings: &[TypedFieldMapping<S>],
1593    ) -> Vec<OpCode> {
1594        let mut ops = Vec::new();
1595
1596        for lookup_index in &self.spec.identity.lookup_indexes {
1597            let lookup_reg = 17;
1598            let source_field = lookup_index
1599                .field_name
1600                .split('.')
1601                .next_back()
1602                .unwrap_or(&lookup_index.field_name);
1603
1604            match resolution {
1605                KeyResolutionStrategy::Embedded { primary_field: _ } => {
1606                    // For Embedded handlers, find the mapping that targets this lookup index field
1607                    // and use its source path to load the lookup value
1608                    let source_path_opt =
1609                        self.find_source_path_for_lookup_index(mappings, &lookup_index.field_name);
1610
1611                    let load_path = if let Some(ref path) = source_path_opt {
1612                        FieldPath::new(&path.iter().map(|s| s.as_str()).collect::<Vec<_>>())
1613                    } else {
1614                        // Fallback to source_field if no mapping found
1615                        FieldPath::new(&[source_field])
1616                    };
1617
1618                    ops.push(OpCode::LoadEventField {
1619                        path: load_path,
1620                        dest: lookup_reg,
1621                        default: None,
1622                    });
1623
1624                    if let Some(temporal_field_name) = &lookup_index.temporal_field {
1625                        let timestamp_reg = 18;
1626
1627                        ops.push(OpCode::LoadEventField {
1628                            path: FieldPath::new(&[temporal_field_name]),
1629                            dest: timestamp_reg,
1630                            default: None,
1631                        });
1632
1633                        let index_name = format!("{}_temporal_index", source_field);
1634                        ops.push(OpCode::UpdateTemporalIndex {
1635                            state_id: self.state_id,
1636                            index_name,
1637                            lookup_value: lookup_reg,
1638                            primary_key: key_reg,
1639                            timestamp: timestamp_reg,
1640                        });
1641
1642                        let simple_index_name = format!("{}_lookup_index", source_field);
1643                        ops.push(OpCode::UpdateLookupIndex {
1644                            state_id: self.state_id,
1645                            index_name: simple_index_name,
1646                            lookup_value: lookup_reg,
1647                            primary_key: key_reg,
1648                        });
1649                    } else {
1650                        let index_name = format!("{}_lookup_index", source_field);
1651                        ops.push(OpCode::UpdateLookupIndex {
1652                            state_id: self.state_id,
1653                            index_name,
1654                            lookup_value: lookup_reg,
1655                            primary_key: key_reg,
1656                        });
1657                    }
1658
1659                    // Also update PDA reverse lookup table if there's a resolver configured for this entity
1660                    // This allows instruction handlers to look up the primary key from PDA addresses
1661                    // Only do this when the source path is different (e.g., __account_address -> id.round_address)
1662                    if source_path_opt.is_some() {
1663                        ops.push(OpCode::UpdatePdaReverseLookup {
1664                            state_id: self.state_id,
1665                            lookup_name: "default_pda_lookup".to_string(),
1666                            pda_address: lookup_reg,
1667                            primary_key: key_reg,
1668                        });
1669                    }
1670                }
1671                KeyResolutionStrategy::Lookup { primary_field } => {
1672                    // For Lookup handlers, check if there's a mapping that targets this lookup index field
1673                    // If so, the lookup value is the same as the primary_field used for key resolution
1674                    let has_mapping_to_lookup_field = mappings
1675                        .iter()
1676                        .any(|m| m.target_path == lookup_index.field_name);
1677
1678                    if has_mapping_to_lookup_field {
1679                        // Load the lookup value from the event using the primary_field path
1680                        // (this is the same value used for key resolution)
1681                        let path_segments: Vec<&str> =
1682                            primary_field.segments.iter().map(|s| s.as_str()).collect();
1683                        ops.push(OpCode::LoadEventField {
1684                            path: FieldPath::new(&path_segments),
1685                            dest: lookup_reg,
1686                            default: None,
1687                        });
1688
1689                        let index_name = format!("{}_lookup_index", source_field);
1690                        ops.push(OpCode::UpdateLookupIndex {
1691                            state_id: self.state_id,
1692                            index_name,
1693                            lookup_value: lookup_reg,
1694                            primary_key: key_reg,
1695                        });
1696                    }
1697                }
1698                KeyResolutionStrategy::Computed { .. }
1699                | KeyResolutionStrategy::TemporalLookup { .. } => {
1700                    // Computed and TemporalLookup handlers don't populate lookup indexes
1701                }
1702            }
1703        }
1704
1705        ops
1706    }
1707
1708    fn get_event_type(&self, source: &SourceSpec) -> String {
1709        match source {
1710            SourceSpec::Source { type_name, .. } => type_name.clone(),
1711        }
1712    }
1713
1714    fn compile_instruction_hook_actions(&self, actions: &[HookAction]) -> Vec<OpCode> {
1715        let mut ops = Vec::new();
1716        let state_reg = 2;
1717
1718        for action in actions {
1719            match action {
1720                HookAction::SetField {
1721                    target_field,
1722                    source,
1723                    condition,
1724                } => {
1725                    // Check if there's a condition - evaluation handled in VM
1726                    let _ = condition;
1727
1728                    let temp_reg = 11; // Use register 11 for hook values
1729
1730                    // Load the source value
1731                    let load_ops = self.compile_mapping_source(source, temp_reg);
1732                    ops.extend(load_ops);
1733
1734                    // Apply transformation if specified in source
1735                    if let MappingSource::FromSource {
1736                        transform: Some(transform_type),
1737                        ..
1738                    } = source
1739                    {
1740                        ops.push(OpCode::Transform {
1741                            source: temp_reg,
1742                            dest: temp_reg,
1743                            transformation: transform_type.clone(),
1744                        });
1745                    }
1746
1747                    // Conditionally set the field based on parsed condition
1748                    if let Some(cond_expr) = condition {
1749                        if let Some(parsed) = &cond_expr.parsed {
1750                            // Generate condition check opcodes
1751                            let cond_check_ops = self.compile_condition_check(
1752                                parsed,
1753                                temp_reg,
1754                                state_reg,
1755                                target_field,
1756                            );
1757                            ops.extend(cond_check_ops);
1758                        } else {
1759                            // No parsed condition, set unconditionally
1760                            ops.push(OpCode::SetField {
1761                                object: state_reg,
1762                                path: target_field.clone(),
1763                                value: temp_reg,
1764                            });
1765                        }
1766                    } else {
1767                        // No condition, set unconditionally
1768                        ops.push(OpCode::SetField {
1769                            object: state_reg,
1770                            path: target_field.clone(),
1771                            value: temp_reg,
1772                        });
1773                    }
1774                }
1775                HookAction::IncrementField {
1776                    target_field,
1777                    increment_by,
1778                    condition,
1779                } => {
1780                    if let Some(cond_expr) = condition {
1781                        if let Some(parsed) = &cond_expr.parsed {
1782                            // For increment with condition, we need to:
1783                            // 1. Load the condition field
1784                            // 2. Check the condition
1785                            // 3. Conditionally increment
1786                            let cond_check_ops = self.compile_conditional_increment(
1787                                parsed,
1788                                state_reg,
1789                                target_field,
1790                                *increment_by,
1791                            );
1792                            ops.extend(cond_check_ops);
1793                        } else {
1794                            // No parsed condition, increment unconditionally
1795                            ops.push(OpCode::SetFieldIncrement {
1796                                object: state_reg,
1797                                path: target_field.clone(),
1798                            });
1799                        }
1800                    } else {
1801                        // No condition, increment unconditionally
1802                        ops.push(OpCode::SetFieldIncrement {
1803                            object: state_reg,
1804                            path: target_field.clone(),
1805                        });
1806                    }
1807                }
1808                HookAction::RegisterPdaMapping { .. } => {
1809                    // PDA registration is handled elsewhere (in resolvers)
1810                    // Skip for now
1811                }
1812            }
1813        }
1814
1815        ops
1816    }
1817
1818    fn compile_condition_check(
1819        &self,
1820        condition: &ParsedCondition,
1821        value_reg: Register,
1822        state_reg: Register,
1823        target_field: &str,
1824    ) -> Vec<OpCode> {
1825        match condition {
1826            ParsedCondition::Comparison {
1827                field,
1828                op,
1829                value: cond_value,
1830            } => {
1831                // Generate ConditionalSetField opcode
1832                vec![OpCode::ConditionalSetField {
1833                    object: state_reg,
1834                    path: target_field.to_string(),
1835                    value: value_reg,
1836                    condition_field: field.clone(),
1837                    condition_op: op.clone(),
1838                    condition_value: cond_value.clone(),
1839                }]
1840            }
1841            ParsedCondition::Logical { .. } => {
1842                // Logical conditions not yet supported, fall back to unconditional
1843                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1844                vec![OpCode::SetField {
1845                    object: state_reg,
1846                    path: target_field.to_string(),
1847                    value: value_reg,
1848                }]
1849            }
1850        }
1851    }
1852
1853    fn compile_conditional_increment(
1854        &self,
1855        condition: &ParsedCondition,
1856        state_reg: Register,
1857        target_field: &str,
1858        _increment_by: i64,
1859    ) -> Vec<OpCode> {
1860        match condition {
1861            ParsedCondition::Comparison {
1862                field,
1863                op,
1864                value: cond_value,
1865            } => {
1866                vec![OpCode::ConditionalIncrement {
1867                    object: state_reg,
1868                    path: target_field.to_string(),
1869                    condition_field: field.clone(),
1870                    condition_op: op.clone(),
1871                    condition_value: cond_value.clone(),
1872                }]
1873            }
1874            ParsedCondition::Logical { .. } => {
1875                tracing::warn!("Logical conditions not yet supported in instruction hooks");
1876                vec![OpCode::SetFieldIncrement {
1877                    object: state_reg,
1878                    path: target_field.to_string(),
1879                }]
1880            }
1881        }
1882    }
1883}