hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102        self.metadata.insert(key, value);
103        self
104    }
105
106    /// Get metadata value
107    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108        self.metadata.get(key)
109    }
110
111    /// Convert context to JSON value for injection into event data
112    pub fn to_value(&self) -> Value {
113        let mut obj = serde_json::Map::new();
114        if let Some(slot) = self.slot {
115            obj.insert("slot".to_string(), json!(slot));
116        }
117        if let Some(ref sig) = self.signature {
118            obj.insert("signature".to_string(), json!(sig));
119        }
120        // Always include timestamp (use current time if not set)
121        obj.insert("timestamp".to_string(), json!(self.timestamp()));
122        for (key, value) in &self.metadata {
123            obj.insert(key.clone(), value.clone());
124        }
125        Value::Object(obj)
126    }
127}
128
129pub type Register = usize;
130pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
131
132pub type RegisterValue = Value;
133
134/// Trait for evaluating computed fields
135/// Implement this in your generated spec to enable computed field evaluation
136pub trait ComputedFieldsEvaluator {
137    fn evaluate(&self, state: &mut Value) -> Result<()>;
138}
139
140// Pending queue configuration
141const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
142const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
143const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
144
145// Temporal index configuration - prevents unbounded history growth
146const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
147const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 1000; // Hard cap as safety net
148
149// State table configuration
150const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 100_000;
151const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
152
153/// Estimate the size of a JSON value in bytes
154fn estimate_json_size(value: &Value) -> usize {
155    match value {
156        Value::Null => 4,
157        Value::Bool(_) => 5,
158        Value::Number(_) => 8,
159        Value::String(s) => s.len() + 2,
160        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
161        Value::Object(obj) => {
162            2 + obj
163                .iter()
164                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
165                .sum::<usize>()
166        }
167    }
168}
169
170#[derive(Debug, Clone)]
171pub struct CompiledPath {
172    pub segments: std::sync::Arc<[String]>,
173}
174
175impl CompiledPath {
176    pub fn new(path: &str) -> Self {
177        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
178        CompiledPath {
179            segments: segments.into(),
180        }
181    }
182
183    fn segments(&self) -> &[String] {
184        &self.segments
185    }
186}
187
188pub struct VmContext {
189    registers: Vec<RegisterValue>,
190    states: HashMap<u32, StateTable>,
191    pub instructions_executed: u64,
192    pub cache_hits: u64,
193    path_cache: HashMap<String, CompiledPath>,
194    pub pda_cache_hits: u64,
195    pub pda_cache_misses: u64,
196    pub pending_queue_size: u64,
197    /// Current update context (set during execute_handler)
198    current_context: Option<UpdateContext>,
199}
200
201#[derive(Debug)]
202pub struct LookupIndex {
203    index: DashMap<Value, Value>,
204}
205
206impl Default for LookupIndex {
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
212impl LookupIndex {
213    pub fn new() -> Self {
214        LookupIndex {
215            index: DashMap::new(),
216        }
217    }
218
219    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
220        self.index.get(lookup_value).map(|v| v.clone())
221    }
222
223    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
224        self.index.insert(lookup_value, primary_key);
225    }
226
227    pub fn len(&self) -> usize {
228        self.index.len()
229    }
230
231    pub fn is_empty(&self) -> bool {
232        self.index.is_empty()
233    }
234}
235
236#[derive(Debug)]
237pub struct TemporalIndex {
238    index: DashMap<Value, Vec<(Value, i64)>>,
239}
240
241impl Default for TemporalIndex {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247impl TemporalIndex {
248    pub fn new() -> Self {
249        TemporalIndex {
250            index: DashMap::new(),
251        }
252    }
253
254    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
255        if let Some(entries) = self.index.get(lookup_value) {
256            let entries_vec = entries.value();
257            for i in (0..entries_vec.len()).rev() {
258                if entries_vec[i].1 <= timestamp {
259                    return Some(entries_vec[i].0.clone());
260                }
261            }
262        }
263        None
264    }
265
266    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
267        if let Some(entries) = self.index.get(lookup_value) {
268            let entries_vec = entries.value();
269            if let Some(last) = entries_vec.last() {
270                return Some(last.0.clone());
271            }
272        }
273        None
274    }
275
276    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
277        let lookup_key = lookup_value.clone();
278        self.index
279            .entry(lookup_value)
280            .or_default()
281            .push((primary_key, timestamp));
282
283        if let Some(mut entries) = self.index.get_mut(&lookup_key) {
284            entries.sort_by_key(|(_, ts)| *ts);
285
286            let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
287            entries.retain(|(_, ts)| *ts >= cutoff);
288
289            if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
290                let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
291                entries.drain(0..excess);
292            }
293        }
294    }
295
296    pub fn len(&self) -> usize {
297        self.index.len()
298    }
299
300    pub fn is_empty(&self) -> bool {
301        self.index.is_empty()
302    }
303
304    pub fn total_entries(&self) -> usize {
305        self.index.iter().map(|entry| entry.value().len()).sum()
306    }
307}
308
309#[derive(Debug)]
310pub struct PdaReverseLookup {
311    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
312    index: LruCache<String, String>,
313}
314
315impl PdaReverseLookup {
316    pub fn new(capacity: usize) -> Self {
317        PdaReverseLookup {
318            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
319        }
320    }
321
322    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
323        self.index.get(pda_address).cloned()
324    }
325
326    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
327        let evicted = if self.index.len() >= self.index.cap().get() {
328            self.index.peek_lru().map(|(k, _)| k.clone())
329        } else {
330            None
331        };
332
333        self.index.put(pda_address, seed_value);
334        evicted
335    }
336
337    pub fn len(&self) -> usize {
338        self.index.len()
339    }
340
341    pub fn is_empty(&self) -> bool {
342        self.index.is_empty()
343    }
344}
345
346/// Input for queueing an account update.
347#[derive(Debug, Clone)]
348pub struct QueuedAccountUpdate {
349    pub pda_address: String,
350    pub account_type: String,
351    pub account_data: Value,
352    pub slot: u64,
353    pub write_version: u64,
354    pub signature: String,
355}
356
357/// Internal representation of a pending account update with queue metadata.
358#[derive(Debug, Clone)]
359pub struct PendingAccountUpdate {
360    pub account_type: String,
361    pub pda_address: String,
362    pub account_data: Value,
363    pub slot: u64,
364    pub write_version: u64,
365    pub signature: String,
366    pub queued_at: i64,
367}
368
369#[derive(Debug, Clone)]
370pub struct PendingQueueStats {
371    pub total_updates: usize,
372    pub unique_pdas: usize,
373    pub oldest_age_seconds: i64,
374    pub largest_pda_queue_size: usize,
375    pub estimated_memory_bytes: usize,
376}
377
378#[derive(Debug, Clone, Default)]
379pub struct VmMemoryStats {
380    pub state_table_entity_count: usize,
381    pub state_table_max_entries: usize,
382    pub state_table_at_capacity: bool,
383    pub lookup_index_count: usize,
384    pub lookup_index_total_entries: usize,
385    pub temporal_index_count: usize,
386    pub temporal_index_total_entries: usize,
387    pub pda_reverse_lookup_count: usize,
388    pub pda_reverse_lookup_total_entries: usize,
389    pub pending_queue_stats: Option<PendingQueueStats>,
390    pub path_cache_size: usize,
391}
392
393#[derive(Debug, Clone, Default)]
394pub struct CleanupResult {
395    pub pending_updates_removed: usize,
396    pub temporal_entries_removed: usize,
397}
398
399#[derive(Debug, Clone)]
400pub struct CapacityWarning {
401    pub current_entries: usize,
402    pub max_entries: usize,
403    pub entries_over_limit: usize,
404}
405
406#[derive(Debug, Clone)]
407pub struct StateTableConfig {
408    pub max_entries: usize,
409    pub max_array_length: usize,
410}
411
412impl Default for StateTableConfig {
413    fn default() -> Self {
414        Self {
415            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
416            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
417        }
418    }
419}
420
421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
422struct VersionKey {
423    primary_key: String,
424    event_type: String,
425}
426
427#[derive(Debug)]
428pub struct StateTable {
429    pub data: DashMap<Value, Value>,
430    access_times: DashMap<Value, i64>,
431    pub lookup_indexes: HashMap<String, LookupIndex>,
432    pub temporal_indexes: HashMap<String, TemporalIndex>,
433    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
434    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
435    /// Tracks (slot, ordering_value) per (primary_key, event_type) for staleness detection
436    version_tracker: DashMap<VersionKey, (u64, u64)>,
437    config: StateTableConfig,
438}
439
440impl StateTable {
441    pub fn is_at_capacity(&self) -> bool {
442        self.data.len() >= self.config.max_entries
443    }
444
445    pub fn entries_over_limit(&self) -> usize {
446        self.data.len().saturating_sub(self.config.max_entries)
447    }
448
449    pub fn max_array_length(&self) -> usize {
450        self.config.max_array_length
451    }
452
453    fn touch(&self, key: &Value) {
454        let now = std::time::SystemTime::now()
455            .duration_since(std::time::UNIX_EPOCH)
456            .unwrap()
457            .as_secs() as i64;
458        self.access_times.insert(key.clone(), now);
459    }
460
461    fn evict_lru(&self, count: usize) -> usize {
462        if count == 0 || self.data.is_empty() {
463            return 0;
464        }
465
466        let mut entries: Vec<(Value, i64)> = self
467            .access_times
468            .iter()
469            .map(|entry| (entry.key().clone(), *entry.value()))
470            .collect();
471
472        entries.sort_by_key(|(_, ts)| *ts);
473
474        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
475
476        let mut evicted = 0;
477        for key in to_evict {
478            self.data.remove(&key);
479            self.access_times.remove(&key);
480            evicted += 1;
481        }
482
483        if evicted > 0 {
484            tracing::info!("Evicted {} LRU entries from state table", evicted);
485        }
486
487        evicted
488    }
489
490    pub fn insert_with_eviction(&self, key: Value, value: Value) {
491        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
492            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
493            self.evict_lru(to_evict.max(1));
494        }
495        self.data.insert(key.clone(), value);
496        self.touch(&key);
497    }
498
499    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
500        let result = self.data.get(key).map(|v| v.clone());
501        if result.is_some() {
502            self.touch(key);
503        }
504        result
505    }
506
507    /// Check if an update is fresh and update the version tracker.
508    /// Returns true if the update should be processed (is fresh).
509    /// Returns false if the update is stale and should be skipped.
510    ///
511    /// Comparison is lexicographic on (slot, ordering_value):
512    /// (100, 5) > (100, 3) > (99, 999)
513    pub fn is_fresh_update(
514        &self,
515        primary_key: &Value,
516        event_type: &str,
517        slot: u64,
518        ordering_value: u64,
519    ) -> bool {
520        let key = VersionKey {
521            primary_key: primary_key.to_string(),
522            event_type: event_type.to_string(),
523        };
524
525        let dominated = self
526            .version_tracker
527            .get(&key)
528            .map(|entry| {
529                let (last_slot, last_version) = *entry;
530                (slot, ordering_value) <= (last_slot, last_version)
531            })
532            .unwrap_or(false);
533
534        if dominated {
535            return false;
536        }
537
538        self.version_tracker.insert(key, (slot, ordering_value));
539        true
540    }
541}
542
543impl VmContext {
544    pub fn new() -> Self {
545        let mut vm = VmContext {
546            registers: vec![Value::Null; 256],
547            states: HashMap::new(),
548            instructions_executed: 0,
549            cache_hits: 0,
550            path_cache: HashMap::new(),
551            pda_cache_hits: 0,
552            pda_cache_misses: 0,
553            pending_queue_size: 0,
554            current_context: None,
555        };
556        vm.states.insert(
557            0,
558            StateTable {
559                data: DashMap::new(),
560                access_times: DashMap::new(),
561                lookup_indexes: HashMap::new(),
562                temporal_indexes: HashMap::new(),
563                pda_reverse_lookups: HashMap::new(),
564                pending_updates: DashMap::new(),
565                version_tracker: DashMap::new(),
566                config: StateTableConfig::default(),
567            },
568        );
569        vm
570    }
571
572    pub fn new_with_config(state_config: StateTableConfig) -> Self {
573        let mut vm = VmContext {
574            registers: vec![Value::Null; 256],
575            states: HashMap::new(),
576            instructions_executed: 0,
577            cache_hits: 0,
578            path_cache: HashMap::new(),
579            pda_cache_hits: 0,
580            pda_cache_misses: 0,
581            pending_queue_size: 0,
582            current_context: None,
583        };
584        vm.states.insert(
585            0,
586            StateTable {
587                data: DashMap::new(),
588                access_times: DashMap::new(),
589                lookup_indexes: HashMap::new(),
590                temporal_indexes: HashMap::new(),
591                pda_reverse_lookups: HashMap::new(),
592                pending_updates: DashMap::new(),
593                version_tracker: DashMap::new(),
594                config: state_config,
595            },
596        );
597        vm
598    }
599
600    /// Get a mutable reference to a state table by ID
601    /// Returns None if the state ID doesn't exist
602    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
603        self.states.get_mut(&state_id)
604    }
605
606    /// Get public access to registers (for metrics context)
607    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
608        &mut self.registers
609    }
610
611    /// Get public access to path cache (for metrics context)
612    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
613        &self.path_cache
614    }
615
616    /// Get the current update context
617    pub fn current_context(&self) -> Option<&UpdateContext> {
618        self.current_context.as_ref()
619    }
620
621    pub fn update_state_from_register(
622        &mut self,
623        state_id: u32,
624        key: Value,
625        register: Register,
626    ) -> Result<()> {
627        let state = self.states.get(&state_id).ok_or("State table not found")?;
628        let value = self.registers[register].clone();
629        state.insert_with_eviction(key, value);
630        Ok(())
631    }
632
633    fn reset_registers(&mut self) {
634        for reg in &mut self.registers {
635            *reg = Value::Null;
636        }
637    }
638
639    /// Extract only the dirty fields from state (public for use by instruction hooks)
640    pub fn extract_partial_state(
641        &self,
642        state_reg: Register,
643        dirty_fields: &HashSet<String>,
644    ) -> Result<Value> {
645        let full_state = &self.registers[state_reg];
646
647        if dirty_fields.is_empty() {
648            return Ok(json!({}));
649        }
650
651        let mut partial = serde_json::Map::new();
652
653        for path in dirty_fields {
654            let segments: Vec<&str> = path.split('.').collect();
655
656            let mut current = full_state;
657            let mut found = true;
658
659            for segment in &segments {
660                match current.get(segment) {
661                    Some(v) => current = v,
662                    None => {
663                        found = false;
664                        break;
665                    }
666                }
667            }
668
669            if !found {
670                continue;
671            }
672
673            let mut target = &mut partial;
674            for (i, segment) in segments.iter().enumerate() {
675                if i == segments.len() - 1 {
676                    target.insert(segment.to_string(), current.clone());
677                } else {
678                    target
679                        .entry(segment.to_string())
680                        .or_insert_with(|| json!({}));
681                    target = target
682                        .get_mut(*segment)
683                        .and_then(|v| v.as_object_mut())
684                        .ok_or("Failed to build nested structure")?;
685                }
686            }
687        }
688
689        Ok(Value::Object(partial))
690    }
691
692    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
693        if let Some(compiled) = self.path_cache.get(path) {
694            self.cache_hits += 1;
695            return compiled.clone();
696        }
697        let compiled = CompiledPath::new(path);
698        self.path_cache.insert(path.to_string(), compiled.clone());
699        compiled
700    }
701
702    /// Process an event with optional context metadata
703    #[cfg_attr(feature = "otel", instrument(
704        name = "vm.process_event",
705        skip(self, bytecode, event_value, context),
706        fields(
707            event_type = %event_type,
708            slot = context.as_ref().and_then(|c| c.slot),
709        )
710    ))]
711    pub fn process_event_with_context(
712        &mut self,
713        bytecode: &MultiEntityBytecode,
714        mut event_value: Value,
715        event_type: &str,
716        context: Option<&UpdateContext>,
717    ) -> Result<Vec<Mutation>> {
718        // Store context for use during handler execution
719        self.current_context = context.cloned();
720
721        // Inject context metadata into event value if provided
722        if let Some(ctx) = context {
723            if let Some(obj) = event_value.as_object_mut() {
724                obj.insert("__update_context".to_string(), ctx.to_value());
725            }
726        }
727
728        let mut all_mutations = Vec::new();
729
730        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
731            tracing::debug!(
732                "🔀 Event type '{}' routes to {} entity/entities",
733                event_type,
734                entity_names.len()
735            );
736
737            for entity_name in entity_names {
738                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
739                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
740                        tracing::debug!(
741                            "   ▶️  Executing handler for entity '{}', event '{}'",
742                            entity_name,
743                            event_type
744                        );
745                        tracing::debug!("      Handler has {} opcodes", handler.len());
746
747                        let mutations = self.execute_handler(
748                            handler,
749                            event_value.clone(),
750                            event_type,
751                            entity_bytecode.state_id,
752                            entity_bytecode.computed_fields_evaluator.as_ref(),
753                        )?;
754
755                        tracing::debug!("      Handler produced {} mutation(s)", mutations.len());
756                        all_mutations.extend(mutations);
757                    } else {
758                        tracing::debug!(
759                            "   ⊘ No handler found for entity '{}', event '{}'",
760                            entity_name,
761                            event_type
762                        );
763                    }
764                } else {
765                    tracing::debug!("   ⊘ Entity '{}' not found in bytecode", entity_name);
766                }
767            }
768        } else {
769            tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
770        }
771
772        Ok(all_mutations)
773    }
774
775    /// Process an event without context (backward compatibility)
776    pub fn process_event(
777        &mut self,
778        bytecode: &MultiEntityBytecode,
779        event_value: Value,
780        event_type: &str,
781    ) -> Result<Vec<Mutation>> {
782        self.process_event_with_context(bytecode, event_value, event_type, None)
783    }
784
785    pub fn process_any(
786        &mut self,
787        bytecode: &MultiEntityBytecode,
788        any: prost_types::Any,
789    ) -> Result<Vec<Mutation>> {
790        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
791        self.process_event(bytecode, event_value, &event_type)
792    }
793
794    #[cfg_attr(feature = "otel", instrument(
795        name = "vm.execute_handler", 
796        skip(self, handler, event_value, entity_evaluator),
797        level = "debug",
798        fields(
799            event_type = %event_type,
800            handler_opcodes = handler.len(),
801        )
802    ))]
803    #[allow(clippy::type_complexity)]
804    fn execute_handler(
805        &mut self,
806        handler: &[OpCode],
807        event_value: Value,
808        event_type: &str,
809        override_state_id: u32,
810        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
811    ) -> Result<Vec<Mutation>> {
812        self.reset_registers();
813
814        tracing::trace!(
815            "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
816            event_type,
817            handler.len(),
818            event_value
819        );
820
821        let mut pc: usize = 0;
822        let mut output = Vec::new();
823        let mut dirty_fields: HashSet<String> = HashSet::new();
824
825        while pc < handler.len() {
826            tracing::debug!(
827                "Executing opcode {}/{}: {:?}",
828                pc + 1,
829                handler.len(),
830                &handler[pc]
831            );
832            match &handler[pc] {
833                OpCode::LoadEventField {
834                    path,
835                    dest,
836                    default,
837                } => {
838                    let value = self.load_field(&event_value, path, default.as_ref())?;
839                    tracing::trace!(
840                        "LoadEventField: path={:?}, dest={}, value={:?}",
841                        path.segments,
842                        dest,
843                        value
844                    );
845                    // Warn if accounts.* path returns null - this helps debug key resolution issues
846                    if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts"
847                    {
848                        tracing::warn!(
849                            "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
850                            path.segments,
851                            dest
852                        );
853                    }
854                    self.registers[*dest] = value;
855                    pc += 1;
856                }
857                OpCode::LoadConstant { value, dest } => {
858                    self.registers[*dest] = value.clone();
859                    pc += 1;
860                }
861                OpCode::CopyRegister { source, dest } => {
862                    let value = self.registers[*source].clone();
863                    tracing::trace!(
864                        "CopyRegister: source={}, dest={}, value={:?}",
865                        source,
866                        dest,
867                        if value.is_null() {
868                            "NULL".to_string()
869                        } else {
870                            format!("{}", value)
871                        }
872                    );
873                    self.registers[*dest] = value;
874                    pc += 1;
875                }
876                OpCode::CopyRegisterIfNull { source, dest } => {
877                    if self.registers[*dest].is_null() {
878                        self.registers[*dest] = self.registers[*source].clone();
879                        tracing::trace!(
880                            "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
881                            source,
882                            dest,
883                            self.registers[*source]
884                        );
885                    } else {
886                        tracing::trace!(
887                            "CopyRegisterIfNull: dest reg {} not null, skipping copy",
888                            dest
889                        );
890                    }
891                    pc += 1;
892                }
893                OpCode::GetEventType { dest } => {
894                    self.registers[*dest] = json!(event_type);
895                    pc += 1;
896                }
897                OpCode::CreateObject { dest } => {
898                    self.registers[*dest] = json!({});
899                    pc += 1;
900                }
901                OpCode::SetField {
902                    object,
903                    path,
904                    value,
905                } => {
906                    let val = self.registers[*value].clone();
907                    tracing::trace!("SetField: path={}, value={:?}", path, val);
908
909                    self.set_field_auto_vivify(*object, path, *value)?;
910                    dirty_fields.insert(path.to_string());
911                    pc += 1;
912                }
913                OpCode::SetFields { object, fields } => {
914                    for (path, value_reg) in fields {
915                        self.set_field_auto_vivify(*object, path, *value_reg)?;
916                        dirty_fields.insert(path.to_string());
917                    }
918                    pc += 1;
919                }
920                OpCode::GetField { object, path, dest } => {
921                    let value = self.get_field(*object, path)?;
922                    self.registers[*dest] = value;
923                    pc += 1;
924                }
925                OpCode::ReadOrInitState {
926                    state_id: _,
927                    key,
928                    default,
929                    dest,
930                } => {
931                    let actual_state_id = override_state_id;
932                    self.states
933                        .entry(actual_state_id)
934                        .or_insert_with(|| StateTable {
935                            data: DashMap::new(),
936                            access_times: DashMap::new(),
937                            lookup_indexes: HashMap::new(),
938                            temporal_indexes: HashMap::new(),
939                            pda_reverse_lookups: HashMap::new(),
940                            pending_updates: DashMap::new(),
941                            version_tracker: DashMap::new(),
942                            config: StateTableConfig::default(),
943                        });
944                    let state = self
945                        .states
946                        .get(&actual_state_id)
947                        .ok_or("State table not found")?;
948                    let key_value = &self.registers[*key];
949                    if key_value.is_null() {
950                        // Only warn for account state updates (not instruction hooks that just register PDAs)
951                        // Instruction types ending in "IxState" that don't have lookup_by are expected to have null keys
952                        if event_type.ends_with("State") && !event_type.ends_with("IxState") {
953                            tracing::warn!(
954                                "ReadOrInitState: key register {} is NULL for account state, event_type={}",
955                                key,
956                                event_type
957                            );
958                        } else {
959                            tracing::debug!(
960                                "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
961                                key,
962                                event_type
963                            );
964                        }
965                    } else if let Some(ctx) = &self.current_context {
966                        // Check for staleness if we have a valid key and version info
967                        let ordering_value = ctx.write_version.or(ctx.txn_index);
968                        if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
969                            if !state.is_fresh_update(key_value, event_type, slot, version) {
970                                tracing::debug!(
971                                    "Skipping stale update for key={}, event_type={}, slot={}, version={}",
972                                    key_value,
973                                    event_type,
974                                    slot,
975                                    version
976                                );
977                                return Ok(Vec::new());
978                            }
979                        }
980                    }
981                    let value = state
982                        .get_and_touch(key_value)
983                        .unwrap_or_else(|| default.clone());
984
985                    self.registers[*dest] = value;
986                    pc += 1;
987                }
988                OpCode::UpdateState {
989                    state_id: _,
990                    key,
991                    value,
992                } => {
993                    let actual_state_id = override_state_id;
994                    let state = self
995                        .states
996                        .get(&actual_state_id)
997                        .ok_or("State table not found")?;
998                    let key_value = self.registers[*key].clone();
999                    let value_data = self.registers[*value].clone();
1000
1001                    state.insert_with_eviction(key_value, value_data);
1002                    pc += 1;
1003                }
1004                OpCode::AppendToArray {
1005                    object,
1006                    path,
1007                    value,
1008                } => {
1009                    tracing::trace!(
1010                        "AppendToArray: path={}, value={:?}",
1011                        path,
1012                        self.registers[*value]
1013                    );
1014                    let max_len = self
1015                        .states
1016                        .get(&override_state_id)
1017                        .map(|s| s.max_array_length())
1018                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1019                    self.append_to_array(*object, path, *value, max_len)?;
1020                    dirty_fields.insert(path.to_string());
1021                    pc += 1;
1022                }
1023                OpCode::GetCurrentTimestamp { dest } => {
1024                    let timestamp = std::time::SystemTime::now()
1025                        .duration_since(std::time::UNIX_EPOCH)
1026                        .unwrap()
1027                        .as_secs() as i64;
1028                    self.registers[*dest] = json!(timestamp);
1029                    pc += 1;
1030                }
1031                OpCode::CreateEvent { dest, event_value } => {
1032                    tracing::trace!(
1033                        "CreateEvent: event_value_reg={}, event_value={:?}",
1034                        event_value,
1035                        self.registers[*event_value]
1036                    );
1037                    let timestamp = std::time::SystemTime::now()
1038                        .duration_since(std::time::UNIX_EPOCH)
1039                        .unwrap()
1040                        .as_secs() as i64;
1041
1042                    // Filter out __update_context from the event data
1043                    let mut event_data = self.registers[*event_value].clone();
1044                    if let Some(obj) = event_data.as_object_mut() {
1045                        obj.remove("__update_context");
1046                    }
1047
1048                    // Create event with timestamp, data, and optional slot/signature from context
1049                    let mut event = serde_json::Map::new();
1050                    event.insert("timestamp".to_string(), json!(timestamp));
1051                    event.insert("data".to_string(), event_data);
1052
1053                    // Add slot and signature if available from current context
1054                    if let Some(ref ctx) = self.current_context {
1055                        if let Some(slot) = ctx.slot {
1056                            event.insert("slot".to_string(), json!(slot));
1057                        }
1058                        if let Some(ref signature) = ctx.signature {
1059                            event.insert("signature".to_string(), json!(signature));
1060                        }
1061                    }
1062
1063                    self.registers[*dest] = Value::Object(event);
1064                    pc += 1;
1065                }
1066                OpCode::CreateCapture {
1067                    dest,
1068                    capture_value,
1069                } => {
1070                    let timestamp = std::time::SystemTime::now()
1071                        .duration_since(std::time::UNIX_EPOCH)
1072                        .unwrap()
1073                        .as_secs() as i64;
1074
1075                    // Get the capture data (already filtered by load_field)
1076                    let capture_data = self.registers[*capture_value].clone();
1077
1078                    // Extract account_address from the original event if available
1079                    let account_address = event_value
1080                        .get("__account_address")
1081                        .and_then(|v| v.as_str())
1082                        .unwrap_or("")
1083                        .to_string();
1084
1085                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1086                    let mut capture = serde_json::Map::new();
1087                    capture.insert("timestamp".to_string(), json!(timestamp));
1088                    capture.insert("account_address".to_string(), json!(account_address));
1089                    capture.insert("data".to_string(), capture_data);
1090
1091                    // Add slot and signature if available from current context
1092                    if let Some(ref ctx) = self.current_context {
1093                        if let Some(slot) = ctx.slot {
1094                            capture.insert("slot".to_string(), json!(slot));
1095                        }
1096                        if let Some(ref signature) = ctx.signature {
1097                            capture.insert("signature".to_string(), json!(signature));
1098                        }
1099                    }
1100
1101                    self.registers[*dest] = Value::Object(capture);
1102                    pc += 1;
1103                }
1104                OpCode::Transform {
1105                    source,
1106                    dest,
1107                    transformation,
1108                } => {
1109                    if source == dest {
1110                        let result = self.transform_in_place(*source, transformation);
1111                        if let Err(ref e) = result {
1112                            tracing::error!(
1113                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
1114                                transformation, pc, e, source, &self.registers[*source]
1115                            );
1116                        }
1117                        result?;
1118                    } else {
1119                        let source_value = &self.registers[*source];
1120                        let result = self.apply_transformation(source_value, transformation);
1121                        if let Err(ref e) = result {
1122                            tracing::error!(
1123                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
1124                                transformation, pc, e, source, source_value
1125                            );
1126                        }
1127                        let value = result?;
1128                        self.registers[*dest] = value;
1129                    }
1130                    pc += 1;
1131                }
1132                OpCode::EmitMutation {
1133                    entity_name,
1134                    key,
1135                    state,
1136                } => {
1137                    let primary_key = self.registers[*key].clone();
1138
1139                    if primary_key.is_null() || dirty_fields.is_empty() {
1140                        // Skip mutation if no primary key or no dirty fields
1141                        tracing::warn!(
1142                            "   ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
1143                            entity_name,
1144                            if primary_key.is_null() { "NULL" } else { "present" },
1145                            dirty_fields.len()
1146                        );
1147                        if dirty_fields.is_empty() {
1148                            tracing::warn!("      Reason: No fields were modified by this handler");
1149                        }
1150                        if primary_key.is_null() {
1151                            tracing::warn!("      Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
1152                            // Debug: dump register 15, 17, 19, 20 to understand key loading state
1153                            tracing::warn!("      Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
1154                                self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1155                                self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1156                                self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1157                                self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
1158                            );
1159                        }
1160                    } else {
1161                        let patch = self.extract_partial_state(*state, &dirty_fields)?;
1162                        tracing::debug!(
1163                            "   Patch structure: {}",
1164                            serde_json::to_string_pretty(&patch).unwrap_or_default()
1165                        );
1166                        let mutation = Mutation {
1167                            export: entity_name.clone(),
1168                            key: primary_key,
1169                            patch,
1170                        };
1171                        output.push(mutation);
1172                    }
1173                    pc += 1;
1174                }
1175                OpCode::SetFieldIfNull {
1176                    object,
1177                    path,
1178                    value,
1179                } => {
1180                    let val = self.registers[*value].clone();
1181                    let was_set = self.set_field_if_null(*object, path, *value)?;
1182                    tracing::trace!(
1183                        "SetFieldIfNull: path={}, value={:?}, was_set={}",
1184                        path,
1185                        val,
1186                        was_set
1187                    );
1188                    if was_set {
1189                        dirty_fields.insert(path.to_string());
1190                    }
1191                    pc += 1;
1192                }
1193                OpCode::SetFieldMax {
1194                    object,
1195                    path,
1196                    value,
1197                } => {
1198                    let was_updated = self.set_field_max(*object, path, *value)?;
1199                    if was_updated {
1200                        dirty_fields.insert(path.to_string());
1201                    }
1202                    pc += 1;
1203                }
1204                OpCode::UpdateTemporalIndex {
1205                    state_id: _,
1206                    index_name,
1207                    lookup_value,
1208                    primary_key,
1209                    timestamp,
1210                } => {
1211                    let actual_state_id = override_state_id;
1212                    let state = self
1213                        .states
1214                        .get_mut(&actual_state_id)
1215                        .ok_or("State table not found")?;
1216                    let index = state
1217                        .temporal_indexes
1218                        .entry(index_name.clone())
1219                        .or_insert_with(TemporalIndex::new);
1220
1221                    let lookup_val = self.registers[*lookup_value].clone();
1222                    let pk_val = self.registers[*primary_key].clone();
1223                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1224                        val
1225                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1226                        val as i64
1227                    } else {
1228                        return Err(format!(
1229                            "Timestamp must be a number (i64 or u64), got: {:?}",
1230                            self.registers[*timestamp]
1231                        )
1232                        .into());
1233                    };
1234
1235                    index.insert(lookup_val, pk_val, ts_val);
1236                    pc += 1;
1237                }
1238                OpCode::LookupTemporalIndex {
1239                    state_id: _,
1240                    index_name,
1241                    lookup_value,
1242                    timestamp,
1243                    dest,
1244                } => {
1245                    let actual_state_id = override_state_id;
1246                    let state = self
1247                        .states
1248                        .get(&actual_state_id)
1249                        .ok_or("State table not found")?;
1250                    let lookup_val = &self.registers[*lookup_value];
1251
1252                    let result = if self.registers[*timestamp].is_null() {
1253                        if let Some(index) = state.temporal_indexes.get(index_name) {
1254                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1255                        } else {
1256                            Value::Null
1257                        }
1258                    } else {
1259                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1260                            val
1261                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1262                            val as i64
1263                        } else {
1264                            return Err(format!(
1265                                "Timestamp must be a number (i64 or u64), got: {:?}",
1266                                self.registers[*timestamp]
1267                            )
1268                            .into());
1269                        };
1270
1271                        if let Some(index) = state.temporal_indexes.get(index_name) {
1272                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1273                        } else {
1274                            Value::Null
1275                        }
1276                    };
1277
1278                    self.registers[*dest] = result;
1279                    pc += 1;
1280                }
1281                OpCode::UpdateLookupIndex {
1282                    state_id: _,
1283                    index_name,
1284                    lookup_value,
1285                    primary_key,
1286                } => {
1287                    let actual_state_id = override_state_id;
1288                    let state = self
1289                        .states
1290                        .get_mut(&actual_state_id)
1291                        .ok_or("State table not found")?;
1292                    let index = state
1293                        .lookup_indexes
1294                        .entry(index_name.clone())
1295                        .or_insert_with(LookupIndex::new);
1296
1297                    let lookup_val = self.registers[*lookup_value].clone();
1298                    let pk_val = self.registers[*primary_key].clone();
1299
1300                    tracing::debug!(
1301                        "📝 UpdateLookupIndex: {} -> {} (index: {})",
1302                        lookup_val,
1303                        pk_val,
1304                        index_name
1305                    );
1306
1307                    index.insert(lookup_val, pk_val);
1308                    pc += 1;
1309                }
1310                OpCode::LookupIndex {
1311                    state_id: _,
1312                    index_name,
1313                    lookup_value,
1314                    dest,
1315                } => {
1316                    let actual_state_id = override_state_id;
1317                    let lookup_val = self.registers[*lookup_value].clone();
1318
1319                    let result = {
1320                        let state = self
1321                            .states
1322                            .get(&actual_state_id)
1323                            .ok_or("State table not found")?;
1324
1325                        if let Some(index) = state.lookup_indexes.get(index_name) {
1326                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1327                            tracing::debug!(
1328                                "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1329                                lookup_val,
1330                                found,
1331                                index_name,
1332                                index.len()
1333                            );
1334                            found
1335                        } else {
1336                            Value::Null
1337                        }
1338                    };
1339
1340                    // Fallback: Check PDA reverse lookup table if regular index returned NULL or doesn't exist
1341                    // This handles cases where Lookup key resolution uses PDA addresses
1342                    let final_result = if result.is_null() {
1343                        tracing::debug!(
1344                            "🔍 LookupIndex failed for {}, attempting PDA reverse lookup fallback",
1345                            lookup_val
1346                        );
1347                        if let Some(pda_str) = lookup_val.as_str() {
1348                            let state = self
1349                                .states
1350                                .get_mut(&actual_state_id)
1351                                .ok_or("State table not found")?;
1352
1353                            if let Some(pda_lookup) =
1354                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1355                            {
1356                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1357                                    tracing::info!(
1358                                        "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1359                                        &pda_str[..pda_str.len().min(8)], resolved
1360                                    );
1361                                    Value::String(resolved)
1362                                } else {
1363                                    tracing::debug!(
1364                                        "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1365                                        &pda_str[..pda_str.len().min(8)]
1366                                    );
1367                                    Value::Null
1368                                }
1369                            } else {
1370                                tracing::debug!(
1371                                    "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1372                                    lookup_val, index_name
1373                                );
1374                                Value::Null
1375                            }
1376                        } else {
1377                            tracing::debug!(
1378                                "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1379                                lookup_val,
1380                                index_name
1381                            );
1382                            Value::Null
1383                        }
1384                    } else {
1385                        result
1386                    };
1387
1388                    self.registers[*dest] = final_result;
1389                    pc += 1;
1390                }
1391                OpCode::SetFieldSum {
1392                    object,
1393                    path,
1394                    value,
1395                } => {
1396                    let was_updated = self.set_field_sum(*object, path, *value)?;
1397                    if was_updated {
1398                        dirty_fields.insert(path.to_string());
1399                    }
1400                    pc += 1;
1401                }
1402                OpCode::SetFieldIncrement { object, path } => {
1403                    let was_updated = self.set_field_increment(*object, path)?;
1404                    if was_updated {
1405                        dirty_fields.insert(path.to_string());
1406                    }
1407                    pc += 1;
1408                }
1409                OpCode::SetFieldMin {
1410                    object,
1411                    path,
1412                    value,
1413                } => {
1414                    let was_updated = self.set_field_min(*object, path, *value)?;
1415                    if was_updated {
1416                        dirty_fields.insert(path.to_string());
1417                    }
1418                    pc += 1;
1419                }
1420                OpCode::AddToUniqueSet {
1421                    state_id: _,
1422                    set_name,
1423                    value,
1424                    count_object,
1425                    count_path,
1426                } => {
1427                    let value_to_add = self.registers[*value].clone();
1428
1429                    // Store the unique set within the entity object, not in the state table
1430                    // This ensures each entity instance has its own unique set
1431                    let set_field_path = format!("__unique_set:{}", set_name);
1432
1433                    // Get or create the unique set from the entity object
1434                    let mut set: HashSet<Value> =
1435                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1436                            if !existing.is_null() {
1437                                serde_json::from_value(existing).unwrap_or_default()
1438                            } else {
1439                                HashSet::new()
1440                            }
1441                        } else {
1442                            HashSet::new()
1443                        };
1444
1445                    // Add value to set
1446                    let was_new = set.insert(value_to_add);
1447
1448                    // Store updated set back in the entity object
1449                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1450                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1451                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1452
1453                    // Update the count field in the object
1454                    if was_new {
1455                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1456                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1457                        dirty_fields.insert(count_path.to_string());
1458                    }
1459
1460                    pc += 1;
1461                }
1462                OpCode::ConditionalSetField {
1463                    object,
1464                    path,
1465                    value,
1466                    condition_field,
1467                    condition_op,
1468                    condition_value,
1469                } => {
1470                    // Load the field value from the event
1471                    let field_value = self.load_field(&event_value, condition_field, None)?;
1472
1473                    // Evaluate the condition
1474                    let condition_met =
1475                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1476
1477                    if condition_met {
1478                        let val = self.registers[*value].clone();
1479                        tracing::trace!(
1480                            "ConditionalSetField: condition met, setting {}={:?}",
1481                            path,
1482                            val
1483                        );
1484                        self.set_field_auto_vivify(*object, path, *value)?;
1485                        dirty_fields.insert(path.to_string());
1486                    } else {
1487                        tracing::trace!(
1488                            "ConditionalSetField: condition not met, skipping {}",
1489                            path
1490                        );
1491                    }
1492                    pc += 1;
1493                }
1494                OpCode::ConditionalIncrement {
1495                    object,
1496                    path,
1497                    condition_field,
1498                    condition_op,
1499                    condition_value,
1500                } => {
1501                    // Load the field value from the event
1502                    let field_value = self.load_field(&event_value, condition_field, None)?;
1503
1504                    // Evaluate the condition
1505                    let condition_met =
1506                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1507
1508                    if condition_met {
1509                        tracing::trace!(
1510                            "ConditionalIncrement: condition met, incrementing {}",
1511                            path
1512                        );
1513                        let was_updated = self.set_field_increment(*object, path)?;
1514                        if was_updated {
1515                            dirty_fields.insert(path.to_string());
1516                        }
1517                    } else {
1518                        tracing::trace!(
1519                            "ConditionalIncrement: condition not met, skipping {}",
1520                            path
1521                        );
1522                    }
1523                    pc += 1;
1524                }
1525                OpCode::EvaluateComputedFields {
1526                    state,
1527                    computed_paths,
1528                } => {
1529                    // Call the registered evaluator if one exists
1530                    if let Some(evaluator) = entity_evaluator {
1531                        let state_value = &mut self.registers[*state];
1532                        match evaluator(state_value) {
1533                            Ok(()) => {
1534                                // Mark computed fields as dirty so they're included in mutations
1535                                for path in computed_paths {
1536                                    dirty_fields.insert(path.clone());
1537                                }
1538                            }
1539                            Err(_e) => {
1540                                // Silently ignore evaluation errors
1541                            }
1542                        }
1543                    }
1544                    pc += 1;
1545                }
1546                OpCode::UpdatePdaReverseLookup {
1547                    state_id: _,
1548                    lookup_name,
1549                    pda_address,
1550                    primary_key,
1551                } => {
1552                    let actual_state_id = override_state_id;
1553                    let state = self
1554                        .states
1555                        .get_mut(&actual_state_id)
1556                        .ok_or("State table not found")?;
1557
1558                    let pda_val = self.registers[*pda_address].clone();
1559                    let pk_val = self.registers[*primary_key].clone();
1560
1561                    // Only update if both values are non-null strings
1562                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1563                        // Get or create the PDA reverse lookup table
1564                        let pda_lookup = state
1565                            .pda_reverse_lookups
1566                            .entry(lookup_name.clone())
1567                            .or_insert_with(|| PdaReverseLookup::new(10000));
1568
1569                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1570
1571                        tracing::debug!(
1572                            "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1573                            pda_str,
1574                            pk_str,
1575                            lookup_name
1576                        );
1577                    } else if !pk_val.is_null() {
1578                        // Primary key might be a u64, convert to string
1579                        if let Some(pk_num) = pk_val.as_u64() {
1580                            if let Some(pda_str) = pda_val.as_str() {
1581                                let pda_lookup = state
1582                                    .pda_reverse_lookups
1583                                    .entry(lookup_name.clone())
1584                                    .or_insert_with(|| PdaReverseLookup::new(10000));
1585
1586                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1587
1588                                tracing::debug!(
1589                                    "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1590                                    pda_str,
1591                                    pk_num,
1592                                    lookup_name
1593                                );
1594                            }
1595                        }
1596                    }
1597
1598                    pc += 1;
1599                }
1600            }
1601
1602            self.instructions_executed += 1;
1603        }
1604
1605        tracing::info!(
1606            "🏁 Handler completed: produced {} mutation(s)",
1607            output.len()
1608        );
1609        Ok(output)
1610    }
1611
1612    fn load_field(
1613        &self,
1614        event_value: &Value,
1615        path: &FieldPath,
1616        default: Option<&Value>,
1617    ) -> Result<Value> {
1618        // Warn if path contains empty segments - this indicates a bug in AST generation
1619        if path.segments.iter().any(|s| s.is_empty()) {
1620            tracing::warn!(
1621                "load_field: path contains empty segment! path={:?}",
1622                path.segments
1623            );
1624        }
1625
1626        if path.segments.is_empty() {
1627            // For WholeSource (empty path), filter out internal metadata fields
1628            // This prevents __account_address, __resolved_primary_key, __update_context
1629            // from appearing in captured account data
1630            if let Some(obj) = event_value.as_object() {
1631                let filtered: serde_json::Map<String, Value> = obj
1632                    .iter()
1633                    .filter(|(k, _)| !k.starts_with("__"))
1634                    .map(|(k, v)| (k.clone(), v.clone()))
1635                    .collect();
1636                return Ok(Value::Object(filtered));
1637            }
1638            return Ok(event_value.clone());
1639        }
1640
1641        let mut current = event_value;
1642        for (i, segment) in path.segments.iter().enumerate() {
1643            current = match current.get(segment) {
1644                Some(v) => v,
1645                None => {
1646                    // Log when we can't find a path segment - this helps debug key loading issues
1647                    if path.segments.len() == 2 && path.segments[0] == "accounts" {
1648                        tracing::warn!(
1649                            "load_field: could not find segment '{}' at index {} in path {:?}",
1650                            segment,
1651                            i,
1652                            path.segments,
1653                        );
1654                        tracing::warn!(
1655                            "  event has top-level keys: {:?}",
1656                            event_value
1657                                .as_object()
1658                                .map(|o| o.keys().collect::<Vec<_>>())
1659                        );
1660                        // Show what's in the accounts object if it exists
1661                        if let Some(accounts) = event_value.get("accounts") {
1662                            tracing::warn!(
1663                                "  'accounts' exists with keys: {:?}",
1664                                accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1665                            );
1666                        } else {
1667                            tracing::warn!("  'accounts' key does NOT exist in event_value");
1668                        }
1669                    }
1670                    return Ok(default.cloned().unwrap_or(Value::Null));
1671                }
1672            };
1673        }
1674
1675        Ok(current.clone())
1676    }
1677
1678    fn set_field_auto_vivify(
1679        &mut self,
1680        object_reg: Register,
1681        path: &str,
1682        value_reg: Register,
1683    ) -> Result<()> {
1684        let compiled = self.get_compiled_path(path);
1685        let segments = compiled.segments();
1686        let value = self.registers[value_reg].clone();
1687
1688        if !self.registers[object_reg].is_object() {
1689            self.registers[object_reg] = json!({});
1690        }
1691
1692        let obj = self.registers[object_reg]
1693            .as_object_mut()
1694            .ok_or("Not an object")?;
1695
1696        let mut current = obj;
1697        for (i, segment) in segments.iter().enumerate() {
1698            if i == segments.len() - 1 {
1699                current.insert(segment.to_string(), value.clone());
1700            } else {
1701                current
1702                    .entry(segment.to_string())
1703                    .or_insert_with(|| json!({}));
1704                current = current
1705                    .get_mut(segment)
1706                    .and_then(|v| v.as_object_mut())
1707                    .ok_or("Path collision: expected object")?;
1708            }
1709        }
1710
1711        Ok(())
1712    }
1713
1714    fn set_field_if_null(
1715        &mut self,
1716        object_reg: Register,
1717        path: &str,
1718        value_reg: Register,
1719    ) -> Result<bool> {
1720        let compiled = self.get_compiled_path(path);
1721        let segments = compiled.segments();
1722        let value = self.registers[value_reg].clone();
1723
1724        if !self.registers[object_reg].is_object() {
1725            self.registers[object_reg] = json!({});
1726        }
1727
1728        let obj = self.registers[object_reg]
1729            .as_object_mut()
1730            .ok_or("Not an object")?;
1731
1732        let mut current = obj;
1733        let mut was_set = false;
1734        for (i, segment) in segments.iter().enumerate() {
1735            if i == segments.len() - 1 {
1736                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1737                    current.insert(segment.to_string(), value.clone());
1738                    was_set = true;
1739                }
1740            } else {
1741                current
1742                    .entry(segment.to_string())
1743                    .or_insert_with(|| json!({}));
1744                current = current
1745                    .get_mut(segment)
1746                    .and_then(|v| v.as_object_mut())
1747                    .ok_or("Path collision: expected object")?;
1748            }
1749        }
1750
1751        Ok(was_set)
1752    }
1753
1754    fn set_field_max(
1755        &mut self,
1756        object_reg: Register,
1757        path: &str,
1758        value_reg: Register,
1759    ) -> Result<bool> {
1760        let compiled = self.get_compiled_path(path);
1761        let segments = compiled.segments();
1762        let new_value = self.registers[value_reg].clone();
1763
1764        if !self.registers[object_reg].is_object() {
1765            self.registers[object_reg] = json!({});
1766        }
1767
1768        let obj = self.registers[object_reg]
1769            .as_object_mut()
1770            .ok_or("Not an object")?;
1771
1772        let mut current = obj;
1773        let mut was_updated = false;
1774        for (i, segment) in segments.iter().enumerate() {
1775            if i == segments.len() - 1 {
1776                let should_update = if let Some(current_value) = current.get(segment) {
1777                    if current_value.is_null() {
1778                        true
1779                    } else {
1780                        match (current_value.as_i64(), new_value.as_i64()) {
1781                            (Some(current_val), Some(new_val)) => new_val > current_val,
1782                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1783                                new_value.as_u64().unwrap() as i64 > current_val
1784                            }
1785                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1786                                new_val > current_value.as_u64().unwrap() as i64
1787                            }
1788                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1789                                (Some(current_val), Some(new_val)) => new_val > current_val,
1790                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1791                                    (Some(current_val), Some(new_val)) => new_val > current_val,
1792                                    _ => false,
1793                                },
1794                            },
1795                            _ => false,
1796                        }
1797                    }
1798                } else {
1799                    true
1800                };
1801
1802                if should_update {
1803                    current.insert(segment.to_string(), new_value.clone());
1804                    was_updated = true;
1805                }
1806            } else {
1807                current
1808                    .entry(segment.to_string())
1809                    .or_insert_with(|| json!({}));
1810                current = current
1811                    .get_mut(segment)
1812                    .and_then(|v| v.as_object_mut())
1813                    .ok_or("Path collision: expected object")?;
1814            }
1815        }
1816
1817        Ok(was_updated)
1818    }
1819
1820    fn set_field_sum(
1821        &mut self,
1822        object_reg: Register,
1823        path: &str,
1824        value_reg: Register,
1825    ) -> Result<bool> {
1826        let compiled = self.get_compiled_path(path);
1827        let segments = compiled.segments();
1828        let new_value = self.registers[value_reg].clone();
1829
1830        if !self.registers[object_reg].is_object() {
1831            self.registers[object_reg] = json!({});
1832        }
1833
1834        let obj = self.registers[object_reg]
1835            .as_object_mut()
1836            .ok_or("Not an object")?;
1837
1838        let mut current = obj;
1839        for (i, segment) in segments.iter().enumerate() {
1840            if i == segments.len() - 1 {
1841                // Get current value (default to 0 if null/missing)
1842                let current_val = current
1843                    .get(segment)
1844                    .and_then(|v| {
1845                        if v.is_null() {
1846                            None
1847                        } else {
1848                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1849                        }
1850                    })
1851                    .unwrap_or(0);
1852
1853                // Add new value
1854                let new_val_num = new_value
1855                    .as_i64()
1856                    .or_else(|| new_value.as_u64().map(|n| n as i64))
1857                    .ok_or("Sum requires numeric value")?;
1858
1859                let sum = current_val + new_val_num;
1860                current.insert(segment.to_string(), json!(sum));
1861                return Ok(true);
1862            } else {
1863                current
1864                    .entry(segment.to_string())
1865                    .or_insert_with(|| json!({}));
1866                current = current
1867                    .get_mut(segment)
1868                    .and_then(|v| v.as_object_mut())
1869                    .ok_or("Path collision: expected object")?;
1870            }
1871        }
1872
1873        Ok(false)
1874    }
1875
1876    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1877        let compiled = self.get_compiled_path(path);
1878        let segments = compiled.segments();
1879
1880        if !self.registers[object_reg].is_object() {
1881            self.registers[object_reg] = json!({});
1882        }
1883
1884        let obj = self.registers[object_reg]
1885            .as_object_mut()
1886            .ok_or("Not an object")?;
1887
1888        let mut current = obj;
1889        for (i, segment) in segments.iter().enumerate() {
1890            if i == segments.len() - 1 {
1891                // Get current value (default to 0 if null/missing)
1892                let current_val = current
1893                    .get(segment)
1894                    .and_then(|v| {
1895                        if v.is_null() {
1896                            None
1897                        } else {
1898                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1899                        }
1900                    })
1901                    .unwrap_or(0);
1902
1903                let incremented = current_val + 1;
1904                current.insert(segment.to_string(), json!(incremented));
1905                return Ok(true);
1906            } else {
1907                current
1908                    .entry(segment.to_string())
1909                    .or_insert_with(|| json!({}));
1910                current = current
1911                    .get_mut(segment)
1912                    .and_then(|v| v.as_object_mut())
1913                    .ok_or("Path collision: expected object")?;
1914            }
1915        }
1916
1917        Ok(false)
1918    }
1919
1920    fn set_field_min(
1921        &mut self,
1922        object_reg: Register,
1923        path: &str,
1924        value_reg: Register,
1925    ) -> Result<bool> {
1926        let compiled = self.get_compiled_path(path);
1927        let segments = compiled.segments();
1928        let new_value = self.registers[value_reg].clone();
1929
1930        if !self.registers[object_reg].is_object() {
1931            self.registers[object_reg] = json!({});
1932        }
1933
1934        let obj = self.registers[object_reg]
1935            .as_object_mut()
1936            .ok_or("Not an object")?;
1937
1938        let mut current = obj;
1939        let mut was_updated = false;
1940        for (i, segment) in segments.iter().enumerate() {
1941            if i == segments.len() - 1 {
1942                let should_update = if let Some(current_value) = current.get(segment) {
1943                    if current_value.is_null() {
1944                        true
1945                    } else {
1946                        match (current_value.as_i64(), new_value.as_i64()) {
1947                            (Some(current_val), Some(new_val)) => new_val < current_val,
1948                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1949                                (new_value.as_u64().unwrap() as i64) < current_val
1950                            }
1951                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1952                                new_val < current_value.as_u64().unwrap() as i64
1953                            }
1954                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1955                                (Some(current_val), Some(new_val)) => new_val < current_val,
1956                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1957                                    (Some(current_val), Some(new_val)) => new_val < current_val,
1958                                    _ => false,
1959                                },
1960                            },
1961                            _ => false,
1962                        }
1963                    }
1964                } else {
1965                    true
1966                };
1967
1968                if should_update {
1969                    current.insert(segment.to_string(), new_value.clone());
1970                    was_updated = true;
1971                }
1972            } else {
1973                current
1974                    .entry(segment.to_string())
1975                    .or_insert_with(|| json!({}));
1976                current = current
1977                    .get_mut(segment)
1978                    .and_then(|v| v.as_object_mut())
1979                    .ok_or("Path collision: expected object")?;
1980            }
1981        }
1982
1983        Ok(was_updated)
1984    }
1985
1986    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1987        let compiled = self.get_compiled_path(path);
1988        let segments = compiled.segments();
1989        let mut current = &self.registers[object_reg];
1990
1991        for segment in segments {
1992            current = current
1993                .get(segment)
1994                .ok_or_else(|| format!("Field not found: {}", segment))?;
1995        }
1996
1997        Ok(current.clone())
1998    }
1999
2000    fn append_to_array(
2001        &mut self,
2002        object_reg: Register,
2003        path: &str,
2004        value_reg: Register,
2005        max_length: usize,
2006    ) -> Result<()> {
2007        let compiled = self.get_compiled_path(path);
2008        let segments = compiled.segments();
2009        let value = self.registers[value_reg].clone();
2010
2011        if !self.registers[object_reg].is_object() {
2012            self.registers[object_reg] = json!({});
2013        }
2014
2015        let obj = self.registers[object_reg]
2016            .as_object_mut()
2017            .ok_or("Not an object")?;
2018
2019        let mut current = obj;
2020        for (i, segment) in segments.iter().enumerate() {
2021            if i == segments.len() - 1 {
2022                current
2023                    .entry(segment.to_string())
2024                    .or_insert_with(|| json!([]));
2025                let arr = current
2026                    .get_mut(segment)
2027                    .and_then(|v| v.as_array_mut())
2028                    .ok_or("Path is not an array")?;
2029                arr.push(value.clone());
2030
2031                if arr.len() > max_length {
2032                    let excess = arr.len() - max_length;
2033                    arr.drain(0..excess);
2034                }
2035            } else {
2036                current
2037                    .entry(segment.to_string())
2038                    .or_insert_with(|| json!({}));
2039                current = current
2040                    .get_mut(segment)
2041                    .and_then(|v| v.as_object_mut())
2042                    .ok_or("Path collision: expected object")?;
2043            }
2044        }
2045
2046        Ok(())
2047    }
2048
2049    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2050        let value = &self.registers[reg];
2051        let transformed = self.apply_transformation(value, transformation)?;
2052        self.registers[reg] = transformed;
2053        Ok(())
2054    }
2055
2056    fn apply_transformation(
2057        &self,
2058        value: &Value,
2059        transformation: &Transformation,
2060    ) -> Result<Value> {
2061        match transformation {
2062            Transformation::HexEncode => {
2063                if let Some(arr) = value.as_array() {
2064                    let bytes: Vec<u8> = arr
2065                        .iter()
2066                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2067                        .collect();
2068                    let hex = hex::encode(&bytes);
2069                    Ok(json!(hex))
2070                } else {
2071                    Err("HexEncode requires an array of numbers".into())
2072                }
2073            }
2074            Transformation::HexDecode => {
2075                if let Some(s) = value.as_str() {
2076                    let s = s.strip_prefix("0x").unwrap_or(s);
2077                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2078                    Ok(json!(bytes))
2079                } else {
2080                    Err("HexDecode requires a string".into())
2081                }
2082            }
2083            Transformation::Base58Encode => {
2084                if let Some(arr) = value.as_array() {
2085                    let bytes: Vec<u8> = arr
2086                        .iter()
2087                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2088                        .collect();
2089                    let encoded = bs58::encode(&bytes).into_string();
2090                    Ok(json!(encoded))
2091                } else if value.is_string() {
2092                    // Value is already a string (likely already base58 encoded), return as-is
2093                    tracing::debug!(
2094                        "Base58Encode: value is already a string, passing through: {:?}",
2095                        value
2096                    );
2097                    Ok(value.clone())
2098                } else {
2099                    tracing::error!(
2100                        "Base58Encode failed: value type is {:?}, value: {:?}",
2101                        if value.is_null() {
2102                            "null"
2103                        } else if value.is_boolean() {
2104                            "boolean"
2105                        } else if value.is_number() {
2106                            "number"
2107                        } else if value.is_object() {
2108                            "object"
2109                        } else {
2110                            "unknown"
2111                        },
2112                        value
2113                    );
2114                    Err("Base58Encode requires an array of numbers".into())
2115                }
2116            }
2117            Transformation::Base58Decode => {
2118                if let Some(s) = value.as_str() {
2119                    let bytes = bs58::decode(s)
2120                        .into_vec()
2121                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2122                    Ok(json!(bytes))
2123                } else {
2124                    Err("Base58Decode requires a string".into())
2125                }
2126            }
2127            Transformation::ToString => Ok(json!(value.to_string())),
2128            Transformation::ToNumber => {
2129                if let Some(s) = value.as_str() {
2130                    let n = s
2131                        .parse::<i64>()
2132                        .map_err(|e| format!("Parse error: {}", e))?;
2133                    Ok(json!(n))
2134                } else {
2135                    Ok(value.clone())
2136                }
2137            }
2138        }
2139    }
2140
2141    fn evaluate_comparison(
2142        &self,
2143        field_value: &Value,
2144        op: &ComparisonOp,
2145        condition_value: &Value,
2146    ) -> Result<bool> {
2147        use ComparisonOp::*;
2148
2149        match op {
2150            Equal => Ok(field_value == condition_value),
2151            NotEqual => Ok(field_value != condition_value),
2152            GreaterThan => {
2153                // Try to compare as numbers
2154                match (field_value.as_i64(), condition_value.as_i64()) {
2155                    (Some(a), Some(b)) => Ok(a > b),
2156                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2157                        (Some(a), Some(b)) => Ok(a > b),
2158                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2159                            (Some(a), Some(b)) => Ok(a > b),
2160                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2161                        },
2162                    },
2163                }
2164            }
2165            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2166                (Some(a), Some(b)) => Ok(a >= b),
2167                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2168                    (Some(a), Some(b)) => Ok(a >= b),
2169                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2170                        (Some(a), Some(b)) => Ok(a >= b),
2171                        _ => {
2172                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2173                        }
2174                    },
2175                },
2176            },
2177            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2178                (Some(a), Some(b)) => Ok(a < b),
2179                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2180                    (Some(a), Some(b)) => Ok(a < b),
2181                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2182                        (Some(a), Some(b)) => Ok(a < b),
2183                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2184                    },
2185                },
2186            },
2187            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2188                (Some(a), Some(b)) => Ok(a <= b),
2189                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2190                    (Some(a), Some(b)) => Ok(a <= b),
2191                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2192                        (Some(a), Some(b)) => Ok(a <= b),
2193                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2194                    },
2195                },
2196            },
2197        }
2198    }
2199
2200    /// Update a PDA reverse lookup and return pending updates for reprocessing
2201    ///
2202    /// After registering the PDA reverse lookup, this returns any pending account updates
2203    /// that were queued for this PDA. The caller should reprocess these through the VM
2204    /// by calling process_event() for each update.
2205    ///
2206    /// # Example
2207    /// ```ignore
2208    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2209    /// for update in pending {
2210    ///     vm.process_event(&bytecode, update.account_data, &update.account_type)?;
2211    /// }
2212    /// ```
2213    #[cfg_attr(feature = "otel", instrument(
2214        name = "vm.update_pda_lookup",
2215        skip(self),
2216        fields(
2217            pda = %pda_address,
2218            seed = %seed_value,
2219        )
2220    ))]
2221    pub fn update_pda_reverse_lookup(
2222        &mut self,
2223        state_id: u32,
2224        lookup_name: &str,
2225        pda_address: String,
2226        seed_value: String,
2227    ) -> Result<Vec<PendingAccountUpdate>> {
2228        let state = self
2229            .states
2230            .get_mut(&state_id)
2231            .ok_or("State table not found")?;
2232
2233        let lookup = state
2234            .pda_reverse_lookups
2235            .entry(lookup_name.to_string())
2236            .or_insert_with(|| PdaReverseLookup::new(10000));
2237
2238        tracing::info!(
2239            "📝 Registering PDA reverse lookup: {} -> {} (lookup: {})",
2240            pda_address,
2241            seed_value,
2242            lookup_name
2243        );
2244
2245        // Insert and check if an entry was evicted
2246        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2247
2248        // Clean up pending updates for the evicted PDA
2249        if let Some(ref evicted) = evicted_pda {
2250            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2251                let count = evicted_updates.len();
2252                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2253                tracing::info!(
2254                    "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
2255                    count,
2256                    evicted
2257                );
2258            }
2259        }
2260
2261        // Flush and return pending updates for this PDA
2262        self.flush_pending_updates(state_id, &pda_address)
2263    }
2264
2265    /// Clean up expired pending updates that are older than the TTL
2266    ///
2267    /// Returns the number of updates that were removed.
2268    /// This should be called periodically to prevent memory leaks from orphaned updates.
2269    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2270        let state = match self.states.get_mut(&state_id) {
2271            Some(s) => s,
2272            None => return 0,
2273        };
2274
2275        let now = std::time::SystemTime::now()
2276            .duration_since(std::time::UNIX_EPOCH)
2277            .unwrap()
2278            .as_secs() as i64;
2279
2280        let mut removed_count = 0;
2281
2282        // Iterate through all pending updates and remove expired ones
2283        state.pending_updates.retain(|_pda_address, updates| {
2284            let original_len = updates.len();
2285
2286            updates.retain(|update| {
2287                let age = now - update.queued_at;
2288                age <= PENDING_UPDATE_TTL_SECONDS
2289            });
2290
2291            removed_count += original_len - updates.len();
2292
2293            // Remove the entry entirely if no updates remain
2294            !updates.is_empty()
2295        });
2296
2297        // Update the global counter
2298        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2299
2300        if removed_count > 0 {
2301            tracing::info!(
2302                "Cleaned up {} expired pending updates (TTL: {}s)",
2303                removed_count,
2304                PENDING_UPDATE_TTL_SECONDS
2305            );
2306        }
2307
2308        removed_count
2309    }
2310
2311    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2312    ///
2313    /// # Workflow
2314    ///
2315    /// This implements a deferred processing pattern for account updates when the PDA reverse
2316    /// lookup needed to resolve the primary key is not yet available:
2317    ///
2318    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2319    ///    is not available, call `queue_account_update()` to queue it for later.
2320    ///
2321    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2322    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2323    ///
2324    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2325    ///    ```ignore
2326    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2327    ///    for update in pending {
2328    ///        let mutations = vm.process_event(
2329    ///            &bytecode,
2330    ///            update.account_data,
2331    ///            &update.account_type
2332    ///        )?;
2333    ///        // Handle mutations...
2334    ///    }
2335    ///    ```
2336    ///
2337    /// # Arguments
2338    ///
2339    /// * `state_id` - The state table ID
2340    /// * `pda_address` - The PDA address that needs reverse lookup
2341    /// * `account_type` - The event type name for reprocessing
2342    /// * `account_data` - The account data (event value) for reprocessing
2343    /// * `slot` - The slot number when this update occurred
2344    /// * `signature` - The transaction signature
2345    #[cfg_attr(feature = "otel", instrument(
2346        name = "vm.queue_account_update",
2347        skip(self, update),
2348        fields(
2349            pda = %update.pda_address,
2350            account_type = %update.account_type,
2351            slot = update.slot,
2352        )
2353    ))]
2354    pub fn queue_account_update(
2355        &mut self,
2356        state_id: u32,
2357        update: QueuedAccountUpdate,
2358    ) -> Result<()> {
2359        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2360            tracing::warn!(
2361                "Pending queue size limit reached ({}), cleaning up expired updates",
2362                MAX_PENDING_UPDATES_TOTAL
2363            );
2364            let removed = self.cleanup_expired_pending_updates(state_id);
2365
2366            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2367                tracing::warn!(
2368                    "Still at limit after cleanup (removed {}), will drop oldest update",
2369                    removed
2370                );
2371                self.drop_oldest_pending_update(state_id)?;
2372            }
2373        }
2374
2375        let state = self
2376            .states
2377            .get_mut(&state_id)
2378            .ok_or("State table not found")?;
2379
2380        tracing::debug!(
2381            "📋 Queued account update for PDA {} (type: {}, slot: {})",
2382            update.pda_address,
2383            update.account_type,
2384            update.slot
2385        );
2386        let pending = PendingAccountUpdate {
2387            account_type: update.account_type,
2388            pda_address: update.pda_address.clone(),
2389            account_data: update.account_data,
2390            slot: update.slot,
2391            write_version: update.write_version,
2392            signature: update.signature,
2393            queued_at: std::time::SystemTime::now()
2394                .duration_since(std::time::UNIX_EPOCH)
2395                .unwrap()
2396                .as_secs() as i64,
2397        };
2398
2399        let pda_address = pending.pda_address.clone();
2400        let slot = pending.slot;
2401
2402        let mut updates = state
2403            .pending_updates
2404            .entry(pda_address.clone())
2405            .or_insert_with(Vec::new);
2406
2407        let original_len = updates.len();
2408        updates.retain(|existing| existing.slot > slot);
2409        let removed_by_dedup = original_len - updates.len();
2410
2411        if removed_by_dedup > 0 {
2412            self.pending_queue_size = self
2413                .pending_queue_size
2414                .saturating_sub(removed_by_dedup as u64);
2415            tracing::debug!(
2416                "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2417                removed_by_dedup,
2418                pda_address,
2419                slot
2420            );
2421        }
2422
2423        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2424            tracing::warn!(
2425                "Per-PDA limit reached for {} ({} updates), dropping oldest",
2426                pda_address,
2427                updates.len()
2428            );
2429            updates.remove(0);
2430            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2431        }
2432
2433        updates.push(pending);
2434
2435        Ok(())
2436    }
2437
2438    /// Get statistics about the pending queue for monitoring
2439    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2440        let state = self.states.get(&state_id)?;
2441
2442        let now = std::time::SystemTime::now()
2443            .duration_since(std::time::UNIX_EPOCH)
2444            .unwrap()
2445            .as_secs() as i64;
2446
2447        let mut total_updates = 0;
2448        let mut oldest_timestamp = now;
2449        let mut largest_pda_queue = 0;
2450        let mut estimated_memory = 0;
2451
2452        for entry in state.pending_updates.iter() {
2453            let (_, updates) = entry.pair();
2454            total_updates += updates.len();
2455            largest_pda_queue = largest_pda_queue.max(updates.len());
2456
2457            for update in updates.iter() {
2458                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2459                // Rough memory estimate
2460                estimated_memory += update.account_type.len() +
2461                                   update.pda_address.len() +
2462                                   update.signature.len() +
2463                                   16 + // slot + queued_at
2464                                   estimate_json_size(&update.account_data);
2465            }
2466        }
2467
2468        Some(PendingQueueStats {
2469            total_updates,
2470            unique_pdas: state.pending_updates.len(),
2471            oldest_age_seconds: now - oldest_timestamp,
2472            largest_pda_queue_size: largest_pda_queue,
2473            estimated_memory_bytes: estimated_memory,
2474        })
2475    }
2476
2477    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2478        let mut stats = VmMemoryStats {
2479            path_cache_size: self.path_cache.len(),
2480            ..Default::default()
2481        };
2482
2483        if let Some(state) = self.states.get(&state_id) {
2484            stats.state_table_entity_count = state.data.len();
2485            stats.state_table_max_entries = state.config.max_entries;
2486            stats.state_table_at_capacity = state.is_at_capacity();
2487
2488            stats.lookup_index_count = state.lookup_indexes.len();
2489            stats.lookup_index_total_entries =
2490                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2491
2492            stats.temporal_index_count = state.temporal_indexes.len();
2493            stats.temporal_index_total_entries = state
2494                .temporal_indexes
2495                .values()
2496                .map(|idx| idx.total_entries())
2497                .sum();
2498
2499            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2500            stats.pda_reverse_lookup_total_entries = state
2501                .pda_reverse_lookups
2502                .values()
2503                .map(|lookup| lookup.len())
2504                .sum();
2505
2506            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2507        }
2508
2509        stats
2510    }
2511
2512    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2513        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2514        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2515
2516        CleanupResult {
2517            pending_updates_removed: pending_removed,
2518            temporal_entries_removed: temporal_removed,
2519        }
2520    }
2521
2522    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2523        let state = match self.states.get_mut(&state_id) {
2524            Some(s) => s,
2525            None => return 0,
2526        };
2527
2528        let now = std::time::SystemTime::now()
2529            .duration_since(std::time::UNIX_EPOCH)
2530            .unwrap()
2531            .as_secs() as i64;
2532
2533        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2534        let mut total_removed = 0;
2535
2536        for (_, index) in state.temporal_indexes.iter_mut() {
2537            for mut entry in index.index.iter_mut() {
2538                let original_len = entry.value().len();
2539                entry.value_mut().retain(|(_, ts)| *ts >= cutoff);
2540                total_removed += original_len - entry.value().len();
2541            }
2542            index.index.retain(|_, entries| !entries.is_empty());
2543        }
2544
2545        if total_removed > 0 {
2546            tracing::info!(
2547                "Cleaned up {} expired temporal index entries (TTL: {}s)",
2548                total_removed,
2549                TEMPORAL_HISTORY_TTL_SECONDS
2550            );
2551        }
2552
2553        total_removed
2554    }
2555
2556    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2557        let state = self.states.get(&state_id)?;
2558
2559        if state.is_at_capacity() {
2560            Some(CapacityWarning {
2561                current_entries: state.data.len(),
2562                max_entries: state.config.max_entries,
2563                entries_over_limit: state.entries_over_limit(),
2564            })
2565        } else {
2566            None
2567        }
2568    }
2569
2570    /// Drop the oldest pending update across all PDAs
2571    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2572        let state = self
2573            .states
2574            .get_mut(&state_id)
2575            .ok_or("State table not found")?;
2576
2577        let mut oldest_pda: Option<String> = None;
2578        let mut oldest_timestamp = i64::MAX;
2579
2580        // Find the PDA with the oldest update
2581        for entry in state.pending_updates.iter() {
2582            let (pda, updates) = entry.pair();
2583            if let Some(update) = updates.first() {
2584                if update.queued_at < oldest_timestamp {
2585                    oldest_timestamp = update.queued_at;
2586                    oldest_pda = Some(pda.clone());
2587                }
2588            }
2589        }
2590
2591        // Remove the oldest update
2592        if let Some(pda) = oldest_pda {
2593            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2594                if !updates.is_empty() {
2595                    updates.remove(0);
2596                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2597
2598                    // Remove the entry if it's now empty
2599                    if updates.is_empty() {
2600                        drop(updates);
2601                        state.pending_updates.remove(&pda);
2602                    }
2603                }
2604            }
2605        }
2606
2607        Ok(())
2608    }
2609
2610    /// Flush and return pending updates for a PDA for external reprocessing
2611    ///
2612    /// Returns the pending updates that were queued for this PDA address.
2613    /// The caller should reprocess these through the VM using process_event().
2614    fn flush_pending_updates(
2615        &mut self,
2616        state_id: u32,
2617        pda_address: &str,
2618    ) -> Result<Vec<PendingAccountUpdate>> {
2619        let state = self
2620            .states
2621            .get_mut(&state_id)
2622            .ok_or("State table not found")?;
2623
2624        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2625            let count = pending_updates.len();
2626            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2627            tracing::info!(
2628                "🔄 Flushed {} pending account update(s) for PDA {}",
2629                count,
2630                pda_address
2631            );
2632            Ok(pending_updates)
2633        } else {
2634            Ok(Vec::new())
2635        }
2636    }
2637
2638    /// Try to resolve a primary key via PDA reverse lookup
2639    pub fn try_pda_reverse_lookup(
2640        &mut self,
2641        state_id: u32,
2642        lookup_name: &str,
2643        pda_address: &str,
2644    ) -> Option<String> {
2645        let state = self.states.get_mut(&state_id)?;
2646
2647        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2648            if let Some(value) = lookup.lookup(pda_address) {
2649                self.pda_cache_hits += 1;
2650                tracing::debug!(
2651                    "✓ PDA reverse lookup cache hit: {} -> {}",
2652                    pda_address,
2653                    value
2654                );
2655                return Some(value);
2656            }
2657        }
2658
2659        self.pda_cache_misses += 1;
2660        tracing::debug!("✗ PDA reverse lookup cache miss: {}", pda_address);
2661        None
2662    }
2663
2664    // ============================================================================
2665    // Computed Expression Evaluator (Task 5)
2666    // ============================================================================
2667
2668    /// Evaluate a computed expression AST against the current state
2669    /// This is the core runtime evaluator for computed fields from the AST
2670    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2671        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2672    }
2673
2674    /// Evaluate a computed expression with a variable environment (for let bindings)
2675    fn evaluate_computed_expr_with_env(
2676        &self,
2677        expr: &ComputedExpr,
2678        state: &Value,
2679        env: &std::collections::HashMap<String, Value>,
2680    ) -> Result<Value> {
2681        match expr {
2682            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2683
2684            ComputedExpr::Var { name } => env
2685                .get(name)
2686                .cloned()
2687                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2688
2689            ComputedExpr::Let { name, value, body } => {
2690                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2691                let mut new_env = env.clone();
2692                new_env.insert(name.clone(), val);
2693                self.evaluate_computed_expr_with_env(body, state, &new_env)
2694            }
2695
2696            ComputedExpr::If {
2697                condition,
2698                then_branch,
2699                else_branch,
2700            } => {
2701                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2702                if self.value_to_bool(&cond_val) {
2703                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2704                } else {
2705                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2706                }
2707            }
2708
2709            ComputedExpr::None => Ok(Value::Null),
2710
2711            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2712
2713            ComputedExpr::Slice { expr, start, end } => {
2714                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2715                match val {
2716                    Value::Array(arr) => {
2717                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2718                        Ok(Value::Array(slice))
2719                    }
2720                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2721                }
2722            }
2723
2724            ComputedExpr::Index { expr, index } => {
2725                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2726                match val {
2727                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2728                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2729                }
2730            }
2731
2732            ComputedExpr::U64FromLeBytes { bytes } => {
2733                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2734                let byte_vec = self.value_to_bytes(&val)?;
2735                if byte_vec.len() < 8 {
2736                    return Err(format!(
2737                        "u64::from_le_bytes requires 8 bytes, got {}",
2738                        byte_vec.len()
2739                    )
2740                    .into());
2741                }
2742                let arr: [u8; 8] = byte_vec[..8]
2743                    .try_into()
2744                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2745                Ok(json!(u64::from_le_bytes(arr)))
2746            }
2747
2748            ComputedExpr::U64FromBeBytes { bytes } => {
2749                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2750                let byte_vec = self.value_to_bytes(&val)?;
2751                if byte_vec.len() < 8 {
2752                    return Err(format!(
2753                        "u64::from_be_bytes requires 8 bytes, got {}",
2754                        byte_vec.len()
2755                    )
2756                    .into());
2757                }
2758                let arr: [u8; 8] = byte_vec[..8]
2759                    .try_into()
2760                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2761                Ok(json!(u64::from_be_bytes(arr)))
2762            }
2763
2764            ComputedExpr::ByteArray { bytes } => {
2765                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2766            }
2767
2768            ComputedExpr::Closure { param, body } => {
2769                // Closures are stored as-is; they're evaluated when used in map()
2770                // Return a special representation
2771                Ok(json!({
2772                    "__closure": {
2773                        "param": param,
2774                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2775                    }
2776                }))
2777            }
2778
2779            ComputedExpr::Unary { op, expr } => {
2780                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2781                self.apply_unary_op(op, &val)
2782            }
2783
2784            ComputedExpr::JsonToBytes { expr } => {
2785                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2786                // Convert JSON array of numbers to byte array
2787                let bytes = self.value_to_bytes(&val)?;
2788                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2789            }
2790
2791            ComputedExpr::UnwrapOr { expr, default } => {
2792                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2793                if val.is_null() {
2794                    Ok(default.clone())
2795                } else {
2796                    Ok(val)
2797                }
2798            }
2799
2800            ComputedExpr::Binary { op, left, right } => {
2801                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2802                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2803                self.apply_binary_op(op, &l, &r)
2804            }
2805
2806            ComputedExpr::Cast { expr, to_type } => {
2807                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2808                self.apply_cast(&val, to_type)
2809            }
2810
2811            ComputedExpr::MethodCall { expr, method, args } => {
2812                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2813                // Special handling for map() with closures
2814                if method == "map" && args.len() == 1 {
2815                    if let ComputedExpr::Closure { param, body } = &args[0] {
2816                        // If the value is null, return null (Option::None.map returns None)
2817                        if val.is_null() {
2818                            return Ok(Value::Null);
2819                        }
2820                        // Evaluate the closure body with the value bound to param
2821                        let mut closure_env = env.clone();
2822                        closure_env.insert(param.clone(), val);
2823                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2824                    }
2825                }
2826                let evaluated_args: Vec<Value> = args
2827                    .iter()
2828                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2829                    .collect::<Result<Vec<_>>>()?;
2830                self.apply_method_call(&val, method, &evaluated_args)
2831            }
2832
2833            ComputedExpr::Literal { value } => Ok(value.clone()),
2834
2835            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2836        }
2837    }
2838
2839    /// Convert a JSON value to a byte vector
2840    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2841        match val {
2842            Value::Array(arr) => arr
2843                .iter()
2844                .map(|v| {
2845                    v.as_u64()
2846                        .map(|n| n as u8)
2847                        .ok_or_else(|| "Array element not a valid byte".into())
2848                })
2849                .collect(),
2850            Value::String(s) => {
2851                // Try to decode as hex
2852                if s.starts_with("0x") || s.starts_with("0X") {
2853                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2854                } else {
2855                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2856                }
2857            }
2858            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2859        }
2860    }
2861
2862    /// Apply a unary operation
2863    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2864        use crate::ast::UnaryOp;
2865        match op {
2866            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2867            UnaryOp::ReverseBits => match val.as_u64() {
2868                Some(n) => Ok(json!(n.reverse_bits())),
2869                None => match val.as_i64() {
2870                    Some(n) => Ok(json!((n as u64).reverse_bits())),
2871                    None => Err("reverse_bits requires an integer".into()),
2872                },
2873            },
2874        }
2875    }
2876
2877    /// Get a field value from state by path (e.g., "section.field" or just "field")
2878    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2879        let segments: Vec<&str> = path.split('.').collect();
2880        let mut current = state;
2881
2882        for segment in segments {
2883            match current.get(segment) {
2884                Some(v) => current = v,
2885                None => return Ok(Value::Null),
2886            }
2887        }
2888
2889        Ok(current.clone())
2890    }
2891
2892    /// Apply a binary operation to two values
2893    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2894        match op {
2895            // Arithmetic operations
2896            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2897            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2898            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2899            BinaryOp::Div => {
2900                // Check for division by zero
2901                if let Some(r) = right.as_i64() {
2902                    if r == 0 {
2903                        return Err("Division by zero".into());
2904                    }
2905                }
2906                if let Some(r) = right.as_f64() {
2907                    if r == 0.0 {
2908                        return Err("Division by zero".into());
2909                    }
2910                }
2911                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2912            }
2913            BinaryOp::Mod => {
2914                // Modulo - only for integers
2915                match (left.as_i64(), right.as_i64()) {
2916                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2917                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2918                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2919                        _ => Err("Modulo requires non-zero integer operands".into()),
2920                    },
2921                    _ => Err("Modulo by zero".into()),
2922                }
2923            }
2924
2925            // Comparison operations
2926            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2927            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2928            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2929            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2930            BinaryOp::Eq => Ok(json!(left == right)),
2931            BinaryOp::Ne => Ok(json!(left != right)),
2932
2933            // Logical operations
2934            BinaryOp::And => {
2935                let l_bool = self.value_to_bool(left);
2936                let r_bool = self.value_to_bool(right);
2937                Ok(json!(l_bool && r_bool))
2938            }
2939            BinaryOp::Or => {
2940                let l_bool = self.value_to_bool(left);
2941                let r_bool = self.value_to_bool(right);
2942                Ok(json!(l_bool || r_bool))
2943            }
2944
2945            // Bitwise operations
2946            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2947                (Some(a), Some(b)) => Ok(json!(a ^ b)),
2948                _ => match (left.as_i64(), right.as_i64()) {
2949                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
2950                    _ => Err("XOR requires integer operands".into()),
2951                },
2952            },
2953            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2954                (Some(a), Some(b)) => Ok(json!(a & b)),
2955                _ => match (left.as_i64(), right.as_i64()) {
2956                    (Some(a), Some(b)) => Ok(json!(a & b)),
2957                    _ => Err("BitAnd requires integer operands".into()),
2958                },
2959            },
2960            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2961                (Some(a), Some(b)) => Ok(json!(a | b)),
2962                _ => match (left.as_i64(), right.as_i64()) {
2963                    (Some(a), Some(b)) => Ok(json!(a | b)),
2964                    _ => Err("BitOr requires integer operands".into()),
2965                },
2966            },
2967            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2968                (Some(a), Some(b)) => Ok(json!(a << b)),
2969                _ => match (left.as_i64(), right.as_i64()) {
2970                    (Some(a), Some(b)) => Ok(json!(a << b)),
2971                    _ => Err("Shl requires integer operands".into()),
2972                },
2973            },
2974            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2975                (Some(a), Some(b)) => Ok(json!(a >> b)),
2976                _ => match (left.as_i64(), right.as_i64()) {
2977                    (Some(a), Some(b)) => Ok(json!(a >> b)),
2978                    _ => Err("Shr requires integer operands".into()),
2979                },
2980            },
2981        }
2982    }
2983
2984    /// Helper for numeric operations that can work on integers or floats
2985    fn numeric_op<F1, F2>(
2986        &self,
2987        left: &Value,
2988        right: &Value,
2989        int_op: F1,
2990        float_op: F2,
2991    ) -> Result<Value>
2992    where
2993        F1: Fn(i64, i64) -> i64,
2994        F2: Fn(f64, f64) -> f64,
2995    {
2996        // Try i64 first
2997        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2998            return Ok(json!(int_op(a, b)));
2999        }
3000
3001        // Try u64
3002        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3003            // For u64, we need to be careful with underflow in subtraction
3004            return Ok(json!(int_op(a as i64, b as i64)));
3005        }
3006
3007        // Try f64
3008        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3009            return Ok(json!(float_op(a, b)));
3010        }
3011
3012        // If either is null, return null
3013        if left.is_null() || right.is_null() {
3014            return Ok(Value::Null);
3015        }
3016
3017        Err(format!(
3018            "Cannot perform numeric operation on {:?} and {:?}",
3019            left, right
3020        )
3021        .into())
3022    }
3023
3024    /// Helper for comparison operations
3025    fn comparison_op<F1, F2>(
3026        &self,
3027        left: &Value,
3028        right: &Value,
3029        int_cmp: F1,
3030        float_cmp: F2,
3031    ) -> Result<Value>
3032    where
3033        F1: Fn(i64, i64) -> bool,
3034        F2: Fn(f64, f64) -> bool,
3035    {
3036        // Try i64 first
3037        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3038            return Ok(json!(int_cmp(a, b)));
3039        }
3040
3041        // Try u64
3042        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3043            return Ok(json!(int_cmp(a as i64, b as i64)));
3044        }
3045
3046        // Try f64
3047        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3048            return Ok(json!(float_cmp(a, b)));
3049        }
3050
3051        // If either is null, comparison returns false
3052        if left.is_null() || right.is_null() {
3053            return Ok(json!(false));
3054        }
3055
3056        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3057    }
3058
3059    /// Convert a value to boolean for logical operations
3060    fn value_to_bool(&self, value: &Value) -> bool {
3061        match value {
3062            Value::Null => false,
3063            Value::Bool(b) => *b,
3064            Value::Number(n) => {
3065                if let Some(i) = n.as_i64() {
3066                    i != 0
3067                } else if let Some(f) = n.as_f64() {
3068                    f != 0.0
3069                } else {
3070                    true
3071                }
3072            }
3073            Value::String(s) => !s.is_empty(),
3074            Value::Array(arr) => !arr.is_empty(),
3075            Value::Object(obj) => !obj.is_empty(),
3076        }
3077    }
3078
3079    /// Apply a type cast to a value
3080    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3081        match to_type {
3082            "i8" | "i16" | "i32" | "i64" | "isize" => {
3083                if let Some(n) = value.as_i64() {
3084                    Ok(json!(n))
3085                } else if let Some(n) = value.as_u64() {
3086                    Ok(json!(n as i64))
3087                } else if let Some(n) = value.as_f64() {
3088                    Ok(json!(n as i64))
3089                } else if let Some(s) = value.as_str() {
3090                    s.parse::<i64>()
3091                        .map(|n| json!(n))
3092                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3093                } else {
3094                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3095                }
3096            }
3097            "u8" | "u16" | "u32" | "u64" | "usize" => {
3098                if let Some(n) = value.as_u64() {
3099                    Ok(json!(n))
3100                } else if let Some(n) = value.as_i64() {
3101                    Ok(json!(n as u64))
3102                } else if let Some(n) = value.as_f64() {
3103                    Ok(json!(n as u64))
3104                } else if let Some(s) = value.as_str() {
3105                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3106                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3107                    })
3108                } else {
3109                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3110                }
3111            }
3112            "f32" | "f64" => {
3113                if let Some(n) = value.as_f64() {
3114                    Ok(json!(n))
3115                } else if let Some(n) = value.as_i64() {
3116                    Ok(json!(n as f64))
3117                } else if let Some(n) = value.as_u64() {
3118                    Ok(json!(n as f64))
3119                } else if let Some(s) = value.as_str() {
3120                    s.parse::<f64>()
3121                        .map(|n| json!(n))
3122                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3123                } else {
3124                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3125                }
3126            }
3127            "String" | "string" => Ok(json!(value.to_string())),
3128            "bool" => Ok(json!(self.value_to_bool(value))),
3129            _ => {
3130                // Unknown type, return value as-is
3131                Ok(value.clone())
3132            }
3133        }
3134    }
3135
3136    /// Apply a method call to a value
3137    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3138        match method {
3139            "unwrap_or" => {
3140                if value.is_null() && !args.is_empty() {
3141                    Ok(args[0].clone())
3142                } else {
3143                    Ok(value.clone())
3144                }
3145            }
3146            "unwrap_or_default" => {
3147                if value.is_null() {
3148                    // Return default for common types
3149                    Ok(json!(0))
3150                } else {
3151                    Ok(value.clone())
3152                }
3153            }
3154            "is_some" => Ok(json!(!value.is_null())),
3155            "is_none" => Ok(json!(value.is_null())),
3156            "abs" => {
3157                if let Some(n) = value.as_i64() {
3158                    Ok(json!(n.abs()))
3159                } else if let Some(n) = value.as_f64() {
3160                    Ok(json!(n.abs()))
3161                } else {
3162                    Err(format!("Cannot call abs() on {:?}", value).into())
3163                }
3164            }
3165            "len" => {
3166                if let Some(s) = value.as_str() {
3167                    Ok(json!(s.len()))
3168                } else if let Some(arr) = value.as_array() {
3169                    Ok(json!(arr.len()))
3170                } else if let Some(obj) = value.as_object() {
3171                    Ok(json!(obj.len()))
3172                } else {
3173                    Err(format!("Cannot call len() on {:?}", value).into())
3174                }
3175            }
3176            "to_string" => Ok(json!(value.to_string())),
3177            "min" => {
3178                if args.is_empty() {
3179                    return Err("min() requires an argument".into());
3180                }
3181                let other = &args[0];
3182                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3183                    Ok(json!(a.min(b)))
3184                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3185                    Ok(json!(a.min(b)))
3186                } else {
3187                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3188                }
3189            }
3190            "max" => {
3191                if args.is_empty() {
3192                    return Err("max() requires an argument".into());
3193                }
3194                let other = &args[0];
3195                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3196                    Ok(json!(a.max(b)))
3197                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3198                    Ok(json!(a.max(b)))
3199                } else {
3200                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3201                }
3202            }
3203            "saturating_add" => {
3204                if args.is_empty() {
3205                    return Err("saturating_add() requires an argument".into());
3206                }
3207                let other = &args[0];
3208                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3209                    Ok(json!(a.saturating_add(b)))
3210                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3211                    Ok(json!(a.saturating_add(b)))
3212                } else {
3213                    Err(format!(
3214                        "Cannot call saturating_add() on {:?} and {:?}",
3215                        value, other
3216                    )
3217                    .into())
3218                }
3219            }
3220            "saturating_sub" => {
3221                if args.is_empty() {
3222                    return Err("saturating_sub() requires an argument".into());
3223                }
3224                let other = &args[0];
3225                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3226                    Ok(json!(a.saturating_sub(b)))
3227                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3228                    Ok(json!(a.saturating_sub(b)))
3229                } else {
3230                    Err(format!(
3231                        "Cannot call saturating_sub() on {:?} and {:?}",
3232                        value, other
3233                    )
3234                    .into())
3235                }
3236            }
3237            _ => {
3238                // Unknown method - return value unchanged with a warning
3239                tracing::warn!("Unknown method call: {}() on {:?}", method, value);
3240                Ok(value.clone())
3241            }
3242        }
3243    }
3244
3245    /// Evaluate all computed fields for an entity and update the state
3246    /// This takes a list of ComputedFieldSpec from the AST and applies them
3247    pub fn evaluate_computed_fields_from_ast(
3248        &self,
3249        state: &mut Value,
3250        computed_field_specs: &[ComputedFieldSpec],
3251    ) -> Result<Vec<String>> {
3252        let mut updated_paths = Vec::new();
3253
3254        for spec in computed_field_specs {
3255            match self.evaluate_computed_expr(&spec.expression, state) {
3256                Ok(result) => {
3257                    // Set the computed value in state
3258                    self.set_field_in_state(state, &spec.target_path, result)?;
3259                    updated_paths.push(spec.target_path.clone());
3260                }
3261                Err(e) => {
3262                    // Log error but continue with other computed fields
3263                    tracing::warn!(
3264                        "Failed to evaluate computed field '{}': {}",
3265                        spec.target_path,
3266                        e
3267                    );
3268                }
3269            }
3270        }
3271
3272        Ok(updated_paths)
3273    }
3274
3275    /// Set a field value in state by path (e.g., "section.field")
3276    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3277        let segments: Vec<&str> = path.split('.').collect();
3278
3279        if segments.is_empty() {
3280            return Err("Empty path".into());
3281        }
3282
3283        // Navigate to parent, creating intermediate objects as needed
3284        let mut current = state;
3285        for (i, segment) in segments.iter().enumerate() {
3286            if i == segments.len() - 1 {
3287                // Last segment - set the value
3288                if let Some(obj) = current.as_object_mut() {
3289                    obj.insert(segment.to_string(), value);
3290                    return Ok(());
3291                } else {
3292                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3293                }
3294            } else {
3295                // Intermediate segment - navigate or create
3296                if !current.is_object() {
3297                    *current = json!({});
3298                }
3299                let obj = current.as_object_mut().unwrap();
3300                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3301            }
3302        }
3303
3304        Ok(())
3305    }
3306
3307    /// Create a computed fields evaluator closure from AST specs
3308    /// This returns a function that can be passed to the bytecode builder
3309    pub fn create_evaluator_from_specs(
3310        specs: Vec<ComputedFieldSpec>,
3311    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3312        move |state: &mut Value| {
3313            // Create a temporary VmContext just for evaluation
3314            // (We only need the expression evaluation methods)
3315            let vm = VmContext::new();
3316            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3317            Ok(())
3318        }
3319    }
3320}
3321
3322impl Default for VmContext {
3323    fn default() -> Self {
3324        Self::new()
3325    }
3326}
3327
3328// Implement the ReverseLookupUpdater trait for VmContext
3329impl crate::resolvers::ReverseLookupUpdater for VmContext {
3330    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3331        // Use default state_id=0 and default lookup name
3332        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3333            .unwrap_or_else(|e| {
3334                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3335                Vec::new()
3336            })
3337    }
3338
3339    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3340        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3341        self.flush_pending_updates(0, pda_address)
3342            .unwrap_or_else(|e| {
3343                tracing::error!("Failed to flush pending updates: {}", e);
3344                Vec::new()
3345            })
3346    }
3347}
3348
3349#[cfg(test)]
3350mod tests {
3351    use super::*;
3352    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3353
3354    #[test]
3355    fn test_computed_field_preserves_integer_type() {
3356        let vm = VmContext::new();
3357
3358        let mut state = serde_json::json!({
3359            "trading": {
3360                "total_buy_volume": 20000000000_i64,
3361                "total_sell_volume": 17951316474_i64
3362            }
3363        });
3364
3365        let spec = ComputedFieldSpec {
3366            target_path: "trading.total_volume".to_string(),
3367            result_type: "Option<u64>".to_string(),
3368            expression: ComputedExpr::Binary {
3369                op: BinaryOp::Add,
3370                left: Box::new(ComputedExpr::UnwrapOr {
3371                    expr: Box::new(ComputedExpr::FieldRef {
3372                        path: "trading.total_buy_volume".to_string(),
3373                    }),
3374                    default: serde_json::json!(0),
3375                }),
3376                right: Box::new(ComputedExpr::UnwrapOr {
3377                    expr: Box::new(ComputedExpr::FieldRef {
3378                        path: "trading.total_sell_volume".to_string(),
3379                    }),
3380                    default: serde_json::json!(0),
3381                }),
3382            },
3383        };
3384
3385        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3386            .unwrap();
3387
3388        let total_volume = state
3389            .get("trading")
3390            .and_then(|t| t.get("total_volume"))
3391            .expect("total_volume should exist");
3392
3393        let serialized = serde_json::to_string(total_volume).unwrap();
3394        assert!(
3395            !serialized.contains('.'),
3396            "Integer should not have decimal point: {}",
3397            serialized
3398        );
3399        assert_eq!(
3400            total_volume.as_i64(),
3401            Some(37951316474),
3402            "Value should be correct sum"
3403        );
3404    }
3405
3406    #[test]
3407    fn test_set_field_sum_preserves_integer_type() {
3408        let mut vm = VmContext::new();
3409        vm.registers[0] = serde_json::json!({});
3410        vm.registers[1] = serde_json::json!(20000000000_i64);
3411        vm.registers[2] = serde_json::json!(17951316474_i64);
3412
3413        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3414        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3415
3416        let state = &vm.registers[0];
3417        let buy_vol = state
3418            .get("trading")
3419            .and_then(|t| t.get("total_buy_volume"))
3420            .unwrap();
3421        let sell_vol = state
3422            .get("trading")
3423            .and_then(|t| t.get("total_sell_volume"))
3424            .unwrap();
3425
3426        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3427        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3428
3429        assert!(
3430            !buy_serialized.contains('.'),
3431            "Buy volume should not have decimal: {}",
3432            buy_serialized
3433        );
3434        assert!(
3435            !sell_serialized.contains('.'),
3436            "Sell volume should not have decimal: {}",
3437            sell_serialized
3438        );
3439    }
3440}