hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Additional custom metadata that can be added without breaking changes
27    pub metadata: HashMap<String, Value>,
28}
29
30impl UpdateContext {
31    /// Create a new UpdateContext with slot and signature
32    pub fn new(slot: u64, signature: String) -> Self {
33        Self {
34            slot: Some(slot),
35            signature: Some(signature),
36            timestamp: None,
37            metadata: HashMap::new(),
38        }
39    }
40
41    /// Create a new UpdateContext with slot, signature, and timestamp
42    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
43        Self {
44            slot: Some(slot),
45            signature: Some(signature),
46            timestamp: Some(timestamp),
47            metadata: HashMap::new(),
48        }
49    }
50
51    /// Get the timestamp, falling back to current system time if not set
52    pub fn timestamp(&self) -> i64 {
53        self.timestamp.unwrap_or_else(|| {
54            std::time::SystemTime::now()
55                .duration_since(std::time::UNIX_EPOCH)
56                .unwrap()
57                .as_secs() as i64
58        })
59    }
60
61    /// Create an empty context (for testing or when context is not available)
62    pub fn empty() -> Self {
63        Self::default()
64    }
65
66    /// Add custom metadata
67    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
68        self.metadata.insert(key, value);
69        self
70    }
71
72    /// Get metadata value
73    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
74        self.metadata.get(key)
75    }
76
77    /// Convert context to JSON value for injection into event data
78    pub fn to_value(&self) -> Value {
79        let mut obj = serde_json::Map::new();
80        if let Some(slot) = self.slot {
81            obj.insert("slot".to_string(), json!(slot));
82        }
83        if let Some(ref sig) = self.signature {
84            obj.insert("signature".to_string(), json!(sig));
85        }
86        // Always include timestamp (use current time if not set)
87        obj.insert("timestamp".to_string(), json!(self.timestamp()));
88        for (key, value) in &self.metadata {
89            obj.insert(key.clone(), value.clone());
90        }
91        Value::Object(obj)
92    }
93}
94
95pub type Register = usize;
96pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
97
98pub type RegisterValue = Value;
99
100/// Trait for evaluating computed fields
101/// Implement this in your generated spec to enable computed field evaluation
102pub trait ComputedFieldsEvaluator {
103    fn evaluate(&self, state: &mut Value) -> Result<()>;
104}
105
106// Pending queue configuration
107const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
108const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
109const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
110
111/// Estimate the size of a JSON value in bytes
112fn estimate_json_size(value: &Value) -> usize {
113    match value {
114        Value::Null => 4,
115        Value::Bool(_) => 5,
116        Value::Number(_) => 8,
117        Value::String(s) => s.len() + 2,
118        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
119        Value::Object(obj) => {
120            2 + obj
121                .iter()
122                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
123                .sum::<usize>()
124        }
125    }
126}
127
128#[derive(Debug, Clone)]
129pub struct CompiledPath {
130    pub segments: std::sync::Arc<[String]>,
131}
132
133impl CompiledPath {
134    pub fn new(path: &str) -> Self {
135        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
136        CompiledPath {
137            segments: segments.into(),
138        }
139    }
140
141    fn segments(&self) -> &[String] {
142        &self.segments
143    }
144}
145
146pub struct VmContext {
147    registers: Vec<RegisterValue>,
148    states: HashMap<u32, StateTable>,
149    pub instructions_executed: u64,
150    pub cache_hits: u64,
151    path_cache: HashMap<String, CompiledPath>,
152    pub pda_cache_hits: u64,
153    pub pda_cache_misses: u64,
154    pub pending_queue_size: u64,
155    /// Current update context (set during execute_handler)
156    current_context: Option<UpdateContext>,
157}
158
159#[derive(Debug)]
160pub struct LookupIndex {
161    index: DashMap<Value, Value>,
162}
163
164impl Default for LookupIndex {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170impl LookupIndex {
171    pub fn new() -> Self {
172        LookupIndex {
173            index: DashMap::new(),
174        }
175    }
176
177    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
178        self.index.get(lookup_value).map(|v| v.clone())
179    }
180
181    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
182        self.index.insert(lookup_value, primary_key);
183    }
184
185    pub fn len(&self) -> usize {
186        self.index.len()
187    }
188
189    pub fn is_empty(&self) -> bool {
190        self.index.is_empty()
191    }
192}
193
194#[derive(Debug)]
195pub struct TemporalIndex {
196    index: DashMap<Value, Vec<(Value, i64)>>,
197}
198
199impl Default for TemporalIndex {
200    fn default() -> Self {
201        Self::new()
202    }
203}
204
205impl TemporalIndex {
206    pub fn new() -> Self {
207        TemporalIndex {
208            index: DashMap::new(),
209        }
210    }
211
212    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
213        if let Some(entries) = self.index.get(lookup_value) {
214            let entries_vec = entries.value();
215            for i in (0..entries_vec.len()).rev() {
216                if entries_vec[i].1 <= timestamp {
217                    return Some(entries_vec[i].0.clone());
218                }
219            }
220        }
221        None
222    }
223
224    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
225        if let Some(entries) = self.index.get(lookup_value) {
226            let entries_vec = entries.value();
227            if let Some(last) = entries_vec.last() {
228                return Some(last.0.clone());
229            }
230        }
231        None
232    }
233
234    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
235        let lookup_key = lookup_value.clone();
236        self.index
237            .entry(lookup_value)
238            .or_default()
239            .push((primary_key, timestamp));
240
241        if let Some(mut entries) = self.index.get_mut(&lookup_key) {
242            entries.sort_by_key(|(_, ts)| *ts);
243        }
244    }
245}
246
247#[derive(Debug)]
248pub struct PdaReverseLookup {
249    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
250    index: LruCache<String, String>,
251}
252
253impl PdaReverseLookup {
254    pub fn new(capacity: usize) -> Self {
255        PdaReverseLookup {
256            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
257        }
258    }
259
260    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
261        self.index.get(pda_address).cloned()
262    }
263
264    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
265        // Check if we're at capacity and will evict
266        let evicted = if self.index.len() >= self.index.cap().get() {
267            // Get the LRU key that will be evicted
268            self.index.peek_lru().map(|(k, _)| k.clone())
269        } else {
270            None
271        };
272
273        self.index.put(pda_address, seed_value);
274        evicted
275    }
276}
277
278#[derive(Debug, Clone)]
279pub struct PendingAccountUpdate {
280    pub account_type: String,
281    pub pda_address: String,
282    pub account_data: Value,
283    pub slot: u64,
284    pub signature: String,
285    pub queued_at: i64,
286}
287
288#[derive(Debug, Clone)]
289pub struct PendingQueueStats {
290    pub total_updates: usize,
291    pub unique_pdas: usize,
292    pub oldest_age_seconds: i64,
293    pub largest_pda_queue_size: usize,
294    pub estimated_memory_bytes: usize,
295}
296
297#[derive(Debug)]
298pub struct StateTable {
299    pub data: DashMap<Value, Value>,
300    pub lookup_indexes: HashMap<String, LookupIndex>,
301    pub temporal_indexes: HashMap<String, TemporalIndex>,
302    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
303    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
304}
305
306impl VmContext {
307    pub fn new() -> Self {
308        let mut vm = VmContext {
309            registers: vec![Value::Null; 256],
310            states: HashMap::new(),
311            instructions_executed: 0,
312            cache_hits: 0,
313            path_cache: HashMap::new(),
314            pda_cache_hits: 0,
315            pda_cache_misses: 0,
316            pending_queue_size: 0,
317            current_context: None,
318        };
319        vm.states.insert(
320            0,
321            StateTable {
322                data: DashMap::new(),
323                lookup_indexes: HashMap::new(),
324                temporal_indexes: HashMap::new(),
325                pda_reverse_lookups: HashMap::new(),
326                pending_updates: DashMap::new(),
327            },
328        );
329        vm
330    }
331
332    /// Get a mutable reference to a state table by ID
333    /// Returns None if the state ID doesn't exist
334    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
335        self.states.get_mut(&state_id)
336    }
337
338    /// Get public access to registers (for metrics context)
339    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
340        &mut self.registers
341    }
342
343    /// Get public access to path cache (for metrics context)
344    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
345        &self.path_cache
346    }
347
348    /// Get the current update context
349    pub fn current_context(&self) -> Option<&UpdateContext> {
350        self.current_context.as_ref()
351    }
352
353    /// Update the state table with the current value in a register
354    /// This allows imperative hooks to persist their changes to the state table
355    pub fn update_state_from_register(
356        &mut self,
357        state_id: u32,
358        key: Value,
359        register: Register,
360    ) -> Result<()> {
361        let state = self.states.get(&state_id).ok_or("State table not found")?;
362        let value = self.registers[register].clone();
363        state.data.insert(key, value);
364        Ok(())
365    }
366
367    fn reset_registers(&mut self) {
368        for reg in &mut self.registers {
369            *reg = Value::Null;
370        }
371    }
372
373    /// Extract only the dirty fields from state (public for use by instruction hooks)
374    pub fn extract_partial_state(
375        &self,
376        state_reg: Register,
377        dirty_fields: &HashSet<String>,
378    ) -> Result<Value> {
379        let full_state = &self.registers[state_reg];
380
381        if dirty_fields.is_empty() {
382            return Ok(json!({}));
383        }
384
385        let mut partial = serde_json::Map::new();
386
387        for path in dirty_fields {
388            let segments: Vec<&str> = path.split('.').collect();
389
390            let mut current = full_state;
391            let mut found = true;
392
393            for segment in &segments {
394                match current.get(segment) {
395                    Some(v) => current = v,
396                    None => {
397                        found = false;
398                        break;
399                    }
400                }
401            }
402
403            if !found {
404                continue;
405            }
406
407            let mut target = &mut partial;
408            for (i, segment) in segments.iter().enumerate() {
409                if i == segments.len() - 1 {
410                    target.insert(segment.to_string(), current.clone());
411                } else {
412                    target
413                        .entry(segment.to_string())
414                        .or_insert_with(|| json!({}));
415                    target = target
416                        .get_mut(*segment)
417                        .and_then(|v| v.as_object_mut())
418                        .ok_or("Failed to build nested structure")?;
419                }
420            }
421        }
422
423        Ok(Value::Object(partial))
424    }
425
426    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
427        if let Some(compiled) = self.path_cache.get(path) {
428            self.cache_hits += 1;
429            return compiled.clone();
430        }
431        let compiled = CompiledPath::new(path);
432        self.path_cache.insert(path.to_string(), compiled.clone());
433        compiled
434    }
435
436    /// Process an event with optional context metadata
437    #[cfg_attr(feature = "otel", instrument(
438        name = "vm.process_event",
439        skip(self, bytecode, event_value, context),
440        fields(
441            event_type = %event_type,
442            slot = context.as_ref().and_then(|c| c.slot),
443        )
444    ))]
445    pub fn process_event_with_context(
446        &mut self,
447        bytecode: &MultiEntityBytecode,
448        mut event_value: Value,
449        event_type: &str,
450        context: Option<&UpdateContext>,
451    ) -> Result<Vec<Mutation>> {
452        // Store context for use during handler execution
453        self.current_context = context.cloned();
454
455        // Inject context metadata into event value if provided
456        if let Some(ctx) = context {
457            if let Some(obj) = event_value.as_object_mut() {
458                obj.insert("__update_context".to_string(), ctx.to_value());
459            }
460        }
461
462        let mut all_mutations = Vec::new();
463
464        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
465            tracing::debug!(
466                "🔀 Event type '{}' routes to {} entity/entities",
467                event_type,
468                entity_names.len()
469            );
470
471            for entity_name in entity_names {
472                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
473                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
474                        tracing::debug!(
475                            "   ▶️  Executing handler for entity '{}', event '{}'",
476                            entity_name,
477                            event_type
478                        );
479                        tracing::debug!("      Handler has {} opcodes", handler.len());
480
481                        let mutations = self.execute_handler(
482                            handler,
483                            event_value.clone(),
484                            event_type,
485                            entity_bytecode.state_id,
486                            entity_bytecode.computed_fields_evaluator.as_ref(),
487                        )?;
488
489                        tracing::debug!("      Handler produced {} mutation(s)", mutations.len());
490                        all_mutations.extend(mutations);
491                    } else {
492                        tracing::debug!(
493                            "   ⊘ No handler found for entity '{}', event '{}'",
494                            entity_name,
495                            event_type
496                        );
497                    }
498                } else {
499                    tracing::debug!("   ⊘ Entity '{}' not found in bytecode", entity_name);
500                }
501            }
502        } else {
503            tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
504        }
505
506        Ok(all_mutations)
507    }
508
509    /// Process an event without context (backward compatibility)
510    pub fn process_event(
511        &mut self,
512        bytecode: &MultiEntityBytecode,
513        event_value: Value,
514        event_type: &str,
515    ) -> Result<Vec<Mutation>> {
516        self.process_event_with_context(bytecode, event_value, event_type, None)
517    }
518
519    pub fn process_any(
520        &mut self,
521        bytecode: &MultiEntityBytecode,
522        any: prost_types::Any,
523    ) -> Result<Vec<Mutation>> {
524        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
525        self.process_event(bytecode, event_value, &event_type)
526    }
527
528    #[cfg_attr(feature = "otel", instrument(
529        name = "vm.execute_handler", 
530        skip(self, handler, event_value, entity_evaluator),
531        level = "debug",
532        fields(
533            event_type = %event_type,
534            handler_opcodes = handler.len(),
535        )
536    ))]
537    #[allow(clippy::type_complexity)]
538    fn execute_handler(
539        &mut self,
540        handler: &[OpCode],
541        event_value: Value,
542        event_type: &str,
543        override_state_id: u32,
544        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
545    ) -> Result<Vec<Mutation>> {
546        self.reset_registers();
547
548        tracing::trace!(
549            "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
550            event_type,
551            handler.len(),
552            event_value
553        );
554
555        let mut pc: usize = 0;
556        let mut output = Vec::new();
557        let mut dirty_fields: HashSet<String> = HashSet::new();
558
559        while pc < handler.len() {
560            tracing::debug!(
561                "Executing opcode {}/{}: {:?}",
562                pc + 1,
563                handler.len(),
564                &handler[pc]
565            );
566            match &handler[pc] {
567                OpCode::LoadEventField {
568                    path,
569                    dest,
570                    default,
571                } => {
572                    let value = self.load_field(&event_value, path, default.as_ref())?;
573                    tracing::trace!(
574                        "LoadEventField: path={:?}, dest={}, value={:?}",
575                        path.segments,
576                        dest,
577                        value
578                    );
579                    // Warn if accounts.* path returns null - this helps debug key resolution issues
580                    if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts"
581                    {
582                        tracing::warn!(
583                            "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
584                            path.segments,
585                            dest
586                        );
587                    }
588                    self.registers[*dest] = value;
589                    pc += 1;
590                }
591                OpCode::LoadConstant { value, dest } => {
592                    self.registers[*dest] = value.clone();
593                    pc += 1;
594                }
595                OpCode::CopyRegister { source, dest } => {
596                    let value = self.registers[*source].clone();
597                    tracing::trace!(
598                        "CopyRegister: source={}, dest={}, value={:?}",
599                        source,
600                        dest,
601                        if value.is_null() {
602                            "NULL".to_string()
603                        } else {
604                            format!("{}", value)
605                        }
606                    );
607                    self.registers[*dest] = value;
608                    pc += 1;
609                }
610                OpCode::CopyRegisterIfNull { source, dest } => {
611                    if self.registers[*dest].is_null() {
612                        self.registers[*dest] = self.registers[*source].clone();
613                        tracing::trace!(
614                            "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
615                            source,
616                            dest,
617                            self.registers[*source]
618                        );
619                    } else {
620                        tracing::trace!(
621                            "CopyRegisterIfNull: dest reg {} not null, skipping copy",
622                            dest
623                        );
624                    }
625                    pc += 1;
626                }
627                OpCode::GetEventType { dest } => {
628                    self.registers[*dest] = json!(event_type);
629                    pc += 1;
630                }
631                OpCode::CreateObject { dest } => {
632                    self.registers[*dest] = json!({});
633                    pc += 1;
634                }
635                OpCode::SetField {
636                    object,
637                    path,
638                    value,
639                } => {
640                    let val = self.registers[*value].clone();
641                    tracing::trace!("SetField: path={}, value={:?}", path, val);
642
643                    self.set_field_auto_vivify(*object, path, *value)?;
644                    dirty_fields.insert(path.to_string());
645                    pc += 1;
646                }
647                OpCode::SetFields { object, fields } => {
648                    for (path, value_reg) in fields {
649                        self.set_field_auto_vivify(*object, path, *value_reg)?;
650                        dirty_fields.insert(path.to_string());
651                    }
652                    pc += 1;
653                }
654                OpCode::GetField { object, path, dest } => {
655                    let value = self.get_field(*object, path)?;
656                    self.registers[*dest] = value;
657                    pc += 1;
658                }
659                OpCode::ReadOrInitState {
660                    state_id: _,
661                    key,
662                    default,
663                    dest,
664                } => {
665                    let actual_state_id = override_state_id;
666                    self.states
667                        .entry(actual_state_id)
668                        .or_insert_with(|| StateTable {
669                            data: DashMap::new(),
670                            lookup_indexes: HashMap::new(),
671                            temporal_indexes: HashMap::new(),
672                            pda_reverse_lookups: HashMap::new(),
673                            pending_updates: DashMap::new(),
674                        });
675                    let state = self
676                        .states
677                        .get(&actual_state_id)
678                        .ok_or("State table not found")?;
679                    let key_value = &self.registers[*key];
680                    if key_value.is_null() {
681                        // Only warn for account state updates (not instruction hooks that just register PDAs)
682                        // Instruction types ending in "IxState" that don't have lookup_by are expected to have null keys
683                        if event_type.ends_with("State") && !event_type.ends_with("IxState") {
684                            tracing::warn!(
685                                "ReadOrInitState: key register {} is NULL for account state, event_type={}",
686                                key,
687                                event_type
688                            );
689                        } else {
690                            tracing::debug!(
691                                "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
692                                key,
693                                event_type
694                            );
695                        }
696                    }
697                    let value = state
698                        .data
699                        .get(key_value)
700                        .map(|v| v.clone())
701                        .unwrap_or_else(|| default.clone());
702
703                    self.registers[*dest] = value;
704                    pc += 1;
705                }
706                OpCode::UpdateState {
707                    state_id: _,
708                    key,
709                    value,
710                } => {
711                    let actual_state_id = override_state_id;
712                    let state = self
713                        .states
714                        .get(&actual_state_id)
715                        .ok_or("State table not found")?;
716                    let key_value = self.registers[*key].clone();
717                    let value_data = self.registers[*value].clone();
718
719                    state.data.insert(key_value, value_data);
720                    pc += 1;
721                }
722                OpCode::AppendToArray {
723                    object,
724                    path,
725                    value,
726                } => {
727                    tracing::trace!(
728                        "AppendToArray: path={}, value={:?}",
729                        path,
730                        self.registers[*value]
731                    );
732                    self.append_to_array(*object, path, *value)?;
733                    dirty_fields.insert(path.to_string());
734                    pc += 1;
735                }
736                OpCode::GetCurrentTimestamp { dest } => {
737                    let timestamp = std::time::SystemTime::now()
738                        .duration_since(std::time::UNIX_EPOCH)
739                        .unwrap()
740                        .as_secs() as i64;
741                    self.registers[*dest] = json!(timestamp);
742                    pc += 1;
743                }
744                OpCode::CreateEvent { dest, event_value } => {
745                    tracing::trace!(
746                        "CreateEvent: event_value_reg={}, event_value={:?}",
747                        event_value,
748                        self.registers[*event_value]
749                    );
750                    let timestamp = std::time::SystemTime::now()
751                        .duration_since(std::time::UNIX_EPOCH)
752                        .unwrap()
753                        .as_secs() as i64;
754
755                    // Filter out __update_context from the event data
756                    let mut event_data = self.registers[*event_value].clone();
757                    if let Some(obj) = event_data.as_object_mut() {
758                        obj.remove("__update_context");
759                    }
760
761                    // Create event with timestamp, data, and optional slot/signature from context
762                    let mut event = serde_json::Map::new();
763                    event.insert("timestamp".to_string(), json!(timestamp));
764                    event.insert("data".to_string(), event_data);
765
766                    // Add slot and signature if available from current context
767                    if let Some(ref ctx) = self.current_context {
768                        if let Some(slot) = ctx.slot {
769                            event.insert("slot".to_string(), json!(slot));
770                        }
771                        if let Some(ref signature) = ctx.signature {
772                            event.insert("signature".to_string(), json!(signature));
773                        }
774                    }
775
776                    self.registers[*dest] = Value::Object(event);
777                    pc += 1;
778                }
779                OpCode::CreateCapture {
780                    dest,
781                    capture_value,
782                } => {
783                    let timestamp = std::time::SystemTime::now()
784                        .duration_since(std::time::UNIX_EPOCH)
785                        .unwrap()
786                        .as_secs() as i64;
787
788                    // Get the capture data (already filtered by load_field)
789                    let capture_data = self.registers[*capture_value].clone();
790
791                    // Extract account_address from the original event if available
792                    let account_address = event_value
793                        .get("__account_address")
794                        .and_then(|v| v.as_str())
795                        .unwrap_or("")
796                        .to_string();
797
798                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
799                    let mut capture = serde_json::Map::new();
800                    capture.insert("timestamp".to_string(), json!(timestamp));
801                    capture.insert("account_address".to_string(), json!(account_address));
802                    capture.insert("data".to_string(), capture_data);
803
804                    // Add slot and signature if available from current context
805                    if let Some(ref ctx) = self.current_context {
806                        if let Some(slot) = ctx.slot {
807                            capture.insert("slot".to_string(), json!(slot));
808                        }
809                        if let Some(ref signature) = ctx.signature {
810                            capture.insert("signature".to_string(), json!(signature));
811                        }
812                    }
813
814                    self.registers[*dest] = Value::Object(capture);
815                    pc += 1;
816                }
817                OpCode::Transform {
818                    source,
819                    dest,
820                    transformation,
821                } => {
822                    if source == dest {
823                        let result = self.transform_in_place(*source, transformation);
824                        if let Err(ref e) = result {
825                            tracing::error!(
826                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
827                                transformation, pc, e, source, &self.registers[*source]
828                            );
829                        }
830                        result?;
831                    } else {
832                        let source_value = &self.registers[*source];
833                        let result = self.apply_transformation(source_value, transformation);
834                        if let Err(ref e) = result {
835                            tracing::error!(
836                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
837                                transformation, pc, e, source, source_value
838                            );
839                        }
840                        let value = result?;
841                        self.registers[*dest] = value;
842                    }
843                    pc += 1;
844                }
845                OpCode::EmitMutation {
846                    entity_name,
847                    key,
848                    state,
849                } => {
850                    let primary_key = self.registers[*key].clone();
851
852                    if primary_key.is_null() || dirty_fields.is_empty() {
853                        // Skip mutation if no primary key or no dirty fields
854                        tracing::warn!(
855                            "   ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
856                            entity_name,
857                            if primary_key.is_null() { "NULL" } else { "present" },
858                            dirty_fields.len()
859                        );
860                        if dirty_fields.is_empty() {
861                            tracing::warn!("      Reason: No fields were modified by this handler");
862                        }
863                        if primary_key.is_null() {
864                            tracing::warn!("      Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
865                            // Debug: dump register 15, 17, 19, 20 to understand key loading state
866                            tracing::warn!("      Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
867                                self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
868                                self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
869                                self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
870                                self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
871                            );
872                        }
873                    } else {
874                        let patch = self.extract_partial_state(*state, &dirty_fields)?;
875                        tracing::debug!(
876                            "   Patch structure: {}",
877                            serde_json::to_string_pretty(&patch).unwrap_or_default()
878                        );
879                        let mutation = Mutation {
880                            export: entity_name.clone(),
881                            key: primary_key,
882                            patch,
883                        };
884                        output.push(mutation);
885                    }
886                    pc += 1;
887                }
888                OpCode::SetFieldIfNull {
889                    object,
890                    path,
891                    value,
892                } => {
893                    let val = self.registers[*value].clone();
894                    let was_set = self.set_field_if_null(*object, path, *value)?;
895                    tracing::trace!(
896                        "SetFieldIfNull: path={}, value={:?}, was_set={}",
897                        path,
898                        val,
899                        was_set
900                    );
901                    if was_set {
902                        dirty_fields.insert(path.to_string());
903                    }
904                    pc += 1;
905                }
906                OpCode::SetFieldMax {
907                    object,
908                    path,
909                    value,
910                } => {
911                    let was_updated = self.set_field_max(*object, path, *value)?;
912                    if was_updated {
913                        dirty_fields.insert(path.to_string());
914                    }
915                    pc += 1;
916                }
917                OpCode::UpdateTemporalIndex {
918                    state_id: _,
919                    index_name,
920                    lookup_value,
921                    primary_key,
922                    timestamp,
923                } => {
924                    let actual_state_id = override_state_id;
925                    let state = self
926                        .states
927                        .get_mut(&actual_state_id)
928                        .ok_or("State table not found")?;
929                    let index = state
930                        .temporal_indexes
931                        .entry(index_name.clone())
932                        .or_insert_with(TemporalIndex::new);
933
934                    let lookup_val = self.registers[*lookup_value].clone();
935                    let pk_val = self.registers[*primary_key].clone();
936                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
937                        val
938                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
939                        val as i64
940                    } else {
941                        return Err(format!(
942                            "Timestamp must be a number (i64 or u64), got: {:?}",
943                            self.registers[*timestamp]
944                        )
945                        .into());
946                    };
947
948                    index.insert(lookup_val, pk_val, ts_val);
949                    pc += 1;
950                }
951                OpCode::LookupTemporalIndex {
952                    state_id: _,
953                    index_name,
954                    lookup_value,
955                    timestamp,
956                    dest,
957                } => {
958                    let actual_state_id = override_state_id;
959                    let state = self
960                        .states
961                        .get(&actual_state_id)
962                        .ok_or("State table not found")?;
963                    let lookup_val = &self.registers[*lookup_value];
964
965                    let result = if self.registers[*timestamp].is_null() {
966                        if let Some(index) = state.temporal_indexes.get(index_name) {
967                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
968                        } else {
969                            Value::Null
970                        }
971                    } else {
972                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
973                            val
974                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
975                            val as i64
976                        } else {
977                            return Err(format!(
978                                "Timestamp must be a number (i64 or u64), got: {:?}",
979                                self.registers[*timestamp]
980                            )
981                            .into());
982                        };
983
984                        if let Some(index) = state.temporal_indexes.get(index_name) {
985                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
986                        } else {
987                            Value::Null
988                        }
989                    };
990
991                    self.registers[*dest] = result;
992                    pc += 1;
993                }
994                OpCode::UpdateLookupIndex {
995                    state_id: _,
996                    index_name,
997                    lookup_value,
998                    primary_key,
999                } => {
1000                    let actual_state_id = override_state_id;
1001                    let state = self
1002                        .states
1003                        .get_mut(&actual_state_id)
1004                        .ok_or("State table not found")?;
1005                    let index = state
1006                        .lookup_indexes
1007                        .entry(index_name.clone())
1008                        .or_insert_with(LookupIndex::new);
1009
1010                    let lookup_val = self.registers[*lookup_value].clone();
1011                    let pk_val = self.registers[*primary_key].clone();
1012
1013                    tracing::debug!(
1014                        "📝 UpdateLookupIndex: {} -> {} (index: {})",
1015                        lookup_val,
1016                        pk_val,
1017                        index_name
1018                    );
1019
1020                    index.insert(lookup_val, pk_val);
1021                    pc += 1;
1022                }
1023                OpCode::LookupIndex {
1024                    state_id: _,
1025                    index_name,
1026                    lookup_value,
1027                    dest,
1028                } => {
1029                    let actual_state_id = override_state_id;
1030                    let lookup_val = self.registers[*lookup_value].clone();
1031
1032                    let result = {
1033                        let state = self
1034                            .states
1035                            .get(&actual_state_id)
1036                            .ok_or("State table not found")?;
1037
1038                        if let Some(index) = state.lookup_indexes.get(index_name) {
1039                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1040                            tracing::debug!(
1041                                "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1042                                lookup_val,
1043                                found,
1044                                index_name,
1045                                index.len()
1046                            );
1047                            found
1048                        } else {
1049                            Value::Null
1050                        }
1051                    };
1052
1053                    // Fallback: Check PDA reverse lookup table if regular index returned NULL or doesn't exist
1054                    // This handles cases where Lookup key resolution uses PDA addresses
1055                    let final_result = if result.is_null() {
1056                        tracing::debug!(
1057                            "🔍 LookupIndex failed for {}, attempting PDA reverse lookup fallback",
1058                            lookup_val
1059                        );
1060                        if let Some(pda_str) = lookup_val.as_str() {
1061                            let state = self
1062                                .states
1063                                .get_mut(&actual_state_id)
1064                                .ok_or("State table not found")?;
1065
1066                            if let Some(pda_lookup) =
1067                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1068                            {
1069                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1070                                    tracing::info!(
1071                                        "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1072                                        &pda_str[..pda_str.len().min(8)], resolved
1073                                    );
1074                                    Value::String(resolved)
1075                                } else {
1076                                    tracing::debug!(
1077                                        "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1078                                        &pda_str[..pda_str.len().min(8)]
1079                                    );
1080                                    Value::Null
1081                                }
1082                            } else {
1083                                tracing::debug!(
1084                                    "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1085                                    lookup_val, index_name
1086                                );
1087                                Value::Null
1088                            }
1089                        } else {
1090                            tracing::debug!(
1091                                "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1092                                lookup_val,
1093                                index_name
1094                            );
1095                            Value::Null
1096                        }
1097                    } else {
1098                        result
1099                    };
1100
1101                    self.registers[*dest] = final_result;
1102                    pc += 1;
1103                }
1104                OpCode::SetFieldSum {
1105                    object,
1106                    path,
1107                    value,
1108                } => {
1109                    let was_updated = self.set_field_sum(*object, path, *value)?;
1110                    if was_updated {
1111                        dirty_fields.insert(path.to_string());
1112                    }
1113                    pc += 1;
1114                }
1115                OpCode::SetFieldIncrement { object, path } => {
1116                    let was_updated = self.set_field_increment(*object, path)?;
1117                    if was_updated {
1118                        dirty_fields.insert(path.to_string());
1119                    }
1120                    pc += 1;
1121                }
1122                OpCode::SetFieldMin {
1123                    object,
1124                    path,
1125                    value,
1126                } => {
1127                    let was_updated = self.set_field_min(*object, path, *value)?;
1128                    if was_updated {
1129                        dirty_fields.insert(path.to_string());
1130                    }
1131                    pc += 1;
1132                }
1133                OpCode::AddToUniqueSet {
1134                    state_id: _,
1135                    set_name,
1136                    value,
1137                    count_object,
1138                    count_path,
1139                } => {
1140                    let value_to_add = self.registers[*value].clone();
1141
1142                    // Store the unique set within the entity object, not in the state table
1143                    // This ensures each entity instance has its own unique set
1144                    let set_field_path = format!("__unique_set:{}", set_name);
1145
1146                    // Get or create the unique set from the entity object
1147                    let mut set: HashSet<Value> =
1148                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1149                            if !existing.is_null() {
1150                                serde_json::from_value(existing).unwrap_or_default()
1151                            } else {
1152                                HashSet::new()
1153                            }
1154                        } else {
1155                            HashSet::new()
1156                        };
1157
1158                    // Add value to set
1159                    let was_new = set.insert(value_to_add);
1160
1161                    // Store updated set back in the entity object
1162                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1163                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1164                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1165
1166                    // Update the count field in the object
1167                    if was_new {
1168                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1169                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1170                        dirty_fields.insert(count_path.to_string());
1171                    }
1172
1173                    pc += 1;
1174                }
1175                OpCode::ConditionalSetField {
1176                    object,
1177                    path,
1178                    value,
1179                    condition_field,
1180                    condition_op,
1181                    condition_value,
1182                } => {
1183                    // Load the field value from the event
1184                    let field_value = self.load_field(&event_value, condition_field, None)?;
1185
1186                    // Evaluate the condition
1187                    let condition_met =
1188                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1189
1190                    if condition_met {
1191                        let val = self.registers[*value].clone();
1192                        tracing::trace!(
1193                            "ConditionalSetField: condition met, setting {}={:?}",
1194                            path,
1195                            val
1196                        );
1197                        self.set_field_auto_vivify(*object, path, *value)?;
1198                        dirty_fields.insert(path.to_string());
1199                    } else {
1200                        tracing::trace!(
1201                            "ConditionalSetField: condition not met, skipping {}",
1202                            path
1203                        );
1204                    }
1205                    pc += 1;
1206                }
1207                OpCode::ConditionalIncrement {
1208                    object,
1209                    path,
1210                    condition_field,
1211                    condition_op,
1212                    condition_value,
1213                } => {
1214                    // Load the field value from the event
1215                    let field_value = self.load_field(&event_value, condition_field, None)?;
1216
1217                    // Evaluate the condition
1218                    let condition_met =
1219                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1220
1221                    if condition_met {
1222                        tracing::trace!(
1223                            "ConditionalIncrement: condition met, incrementing {}",
1224                            path
1225                        );
1226                        let was_updated = self.set_field_increment(*object, path)?;
1227                        if was_updated {
1228                            dirty_fields.insert(path.to_string());
1229                        }
1230                    } else {
1231                        tracing::trace!(
1232                            "ConditionalIncrement: condition not met, skipping {}",
1233                            path
1234                        );
1235                    }
1236                    pc += 1;
1237                }
1238                OpCode::EvaluateComputedFields {
1239                    state,
1240                    computed_paths,
1241                } => {
1242                    // Call the registered evaluator if one exists
1243                    if let Some(evaluator) = entity_evaluator {
1244                        let state_value = &mut self.registers[*state];
1245                        match evaluator(state_value) {
1246                            Ok(()) => {
1247                                // Mark computed fields as dirty so they're included in mutations
1248                                for path in computed_paths {
1249                                    dirty_fields.insert(path.clone());
1250                                }
1251                            }
1252                            Err(_e) => {
1253                                // Silently ignore evaluation errors
1254                            }
1255                        }
1256                    }
1257                    pc += 1;
1258                }
1259                OpCode::UpdatePdaReverseLookup {
1260                    state_id: _,
1261                    lookup_name,
1262                    pda_address,
1263                    primary_key,
1264                } => {
1265                    let actual_state_id = override_state_id;
1266                    let state = self
1267                        .states
1268                        .get_mut(&actual_state_id)
1269                        .ok_or("State table not found")?;
1270
1271                    let pda_val = self.registers[*pda_address].clone();
1272                    let pk_val = self.registers[*primary_key].clone();
1273
1274                    // Only update if both values are non-null strings
1275                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1276                        // Get or create the PDA reverse lookup table
1277                        let pda_lookup = state
1278                            .pda_reverse_lookups
1279                            .entry(lookup_name.clone())
1280                            .or_insert_with(|| PdaReverseLookup::new(10000));
1281
1282                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1283
1284                        tracing::debug!(
1285                            "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1286                            pda_str,
1287                            pk_str,
1288                            lookup_name
1289                        );
1290                    } else if !pk_val.is_null() {
1291                        // Primary key might be a u64, convert to string
1292                        if let Some(pk_num) = pk_val.as_u64() {
1293                            if let Some(pda_str) = pda_val.as_str() {
1294                                let pda_lookup = state
1295                                    .pda_reverse_lookups
1296                                    .entry(lookup_name.clone())
1297                                    .or_insert_with(|| PdaReverseLookup::new(10000));
1298
1299                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1300
1301                                tracing::debug!(
1302                                    "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1303                                    pda_str,
1304                                    pk_num,
1305                                    lookup_name
1306                                );
1307                            }
1308                        }
1309                    }
1310
1311                    pc += 1;
1312                }
1313            }
1314
1315            self.instructions_executed += 1;
1316        }
1317
1318        tracing::info!(
1319            "🏁 Handler completed: produced {} mutation(s)",
1320            output.len()
1321        );
1322        Ok(output)
1323    }
1324
1325    fn load_field(
1326        &self,
1327        event_value: &Value,
1328        path: &FieldPath,
1329        default: Option<&Value>,
1330    ) -> Result<Value> {
1331        // Warn if path contains empty segments - this indicates a bug in AST generation
1332        if path.segments.iter().any(|s| s.is_empty()) {
1333            tracing::warn!(
1334                "load_field: path contains empty segment! path={:?}",
1335                path.segments
1336            );
1337        }
1338
1339        if path.segments.is_empty() {
1340            // For WholeSource (empty path), filter out internal metadata fields
1341            // This prevents __account_address, __resolved_primary_key, __update_context
1342            // from appearing in captured account data
1343            if let Some(obj) = event_value.as_object() {
1344                let filtered: serde_json::Map<String, Value> = obj
1345                    .iter()
1346                    .filter(|(k, _)| !k.starts_with("__"))
1347                    .map(|(k, v)| (k.clone(), v.clone()))
1348                    .collect();
1349                return Ok(Value::Object(filtered));
1350            }
1351            return Ok(event_value.clone());
1352        }
1353
1354        let mut current = event_value;
1355        for (i, segment) in path.segments.iter().enumerate() {
1356            current = match current.get(segment) {
1357                Some(v) => v,
1358                None => {
1359                    // Log when we can't find a path segment - this helps debug key loading issues
1360                    if path.segments.len() == 2 && path.segments[0] == "accounts" {
1361                        tracing::warn!(
1362                            "load_field: could not find segment '{}' at index {} in path {:?}",
1363                            segment,
1364                            i,
1365                            path.segments,
1366                        );
1367                        tracing::warn!(
1368                            "  event has top-level keys: {:?}",
1369                            event_value
1370                                .as_object()
1371                                .map(|o| o.keys().collect::<Vec<_>>())
1372                        );
1373                        // Show what's in the accounts object if it exists
1374                        if let Some(accounts) = event_value.get("accounts") {
1375                            tracing::warn!(
1376                                "  'accounts' exists with keys: {:?}",
1377                                accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1378                            );
1379                        } else {
1380                            tracing::warn!("  'accounts' key does NOT exist in event_value");
1381                        }
1382                    }
1383                    return Ok(default.cloned().unwrap_or(Value::Null));
1384                }
1385            };
1386        }
1387
1388        Ok(current.clone())
1389    }
1390
1391    fn set_field_auto_vivify(
1392        &mut self,
1393        object_reg: Register,
1394        path: &str,
1395        value_reg: Register,
1396    ) -> Result<()> {
1397        let compiled = self.get_compiled_path(path);
1398        let segments = compiled.segments();
1399        let value = self.registers[value_reg].clone();
1400
1401        if !self.registers[object_reg].is_object() {
1402            self.registers[object_reg] = json!({});
1403        }
1404
1405        let obj = self.registers[object_reg]
1406            .as_object_mut()
1407            .ok_or("Not an object")?;
1408
1409        let mut current = obj;
1410        for (i, segment) in segments.iter().enumerate() {
1411            if i == segments.len() - 1 {
1412                current.insert(segment.to_string(), value.clone());
1413            } else {
1414                current
1415                    .entry(segment.to_string())
1416                    .or_insert_with(|| json!({}));
1417                current = current
1418                    .get_mut(segment)
1419                    .and_then(|v| v.as_object_mut())
1420                    .ok_or("Path collision: expected object")?;
1421            }
1422        }
1423
1424        Ok(())
1425    }
1426
1427    fn set_field_if_null(
1428        &mut self,
1429        object_reg: Register,
1430        path: &str,
1431        value_reg: Register,
1432    ) -> Result<bool> {
1433        let compiled = self.get_compiled_path(path);
1434        let segments = compiled.segments();
1435        let value = self.registers[value_reg].clone();
1436
1437        if !self.registers[object_reg].is_object() {
1438            self.registers[object_reg] = json!({});
1439        }
1440
1441        let obj = self.registers[object_reg]
1442            .as_object_mut()
1443            .ok_or("Not an object")?;
1444
1445        let mut current = obj;
1446        let mut was_set = false;
1447        for (i, segment) in segments.iter().enumerate() {
1448            if i == segments.len() - 1 {
1449                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1450                    current.insert(segment.to_string(), value.clone());
1451                    was_set = true;
1452                }
1453            } else {
1454                current
1455                    .entry(segment.to_string())
1456                    .or_insert_with(|| json!({}));
1457                current = current
1458                    .get_mut(segment)
1459                    .and_then(|v| v.as_object_mut())
1460                    .ok_or("Path collision: expected object")?;
1461            }
1462        }
1463
1464        Ok(was_set)
1465    }
1466
1467    fn set_field_max(
1468        &mut self,
1469        object_reg: Register,
1470        path: &str,
1471        value_reg: Register,
1472    ) -> Result<bool> {
1473        let compiled = self.get_compiled_path(path);
1474        let segments = compiled.segments();
1475        let new_value = self.registers[value_reg].clone();
1476
1477        if !self.registers[object_reg].is_object() {
1478            self.registers[object_reg] = json!({});
1479        }
1480
1481        let obj = self.registers[object_reg]
1482            .as_object_mut()
1483            .ok_or("Not an object")?;
1484
1485        let mut current = obj;
1486        let mut was_updated = false;
1487        for (i, segment) in segments.iter().enumerate() {
1488            if i == segments.len() - 1 {
1489                let should_update = if let Some(current_value) = current.get(segment) {
1490                    if current_value.is_null() {
1491                        true
1492                    } else {
1493                        match (current_value.as_i64(), new_value.as_i64()) {
1494                            (Some(current_val), Some(new_val)) => new_val > current_val,
1495                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1496                                new_value.as_u64().unwrap() as i64 > current_val
1497                            }
1498                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1499                                new_val > current_value.as_u64().unwrap() as i64
1500                            }
1501                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1502                                (Some(current_val), Some(new_val)) => new_val > current_val,
1503                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1504                                    (Some(current_val), Some(new_val)) => new_val > current_val,
1505                                    _ => false,
1506                                },
1507                            },
1508                            _ => false,
1509                        }
1510                    }
1511                } else {
1512                    true
1513                };
1514
1515                if should_update {
1516                    current.insert(segment.to_string(), new_value.clone());
1517                    was_updated = true;
1518                }
1519            } else {
1520                current
1521                    .entry(segment.to_string())
1522                    .or_insert_with(|| json!({}));
1523                current = current
1524                    .get_mut(segment)
1525                    .and_then(|v| v.as_object_mut())
1526                    .ok_or("Path collision: expected object")?;
1527            }
1528        }
1529
1530        Ok(was_updated)
1531    }
1532
1533    fn set_field_sum(
1534        &mut self,
1535        object_reg: Register,
1536        path: &str,
1537        value_reg: Register,
1538    ) -> Result<bool> {
1539        let compiled = self.get_compiled_path(path);
1540        let segments = compiled.segments();
1541        let new_value = self.registers[value_reg].clone();
1542
1543        if !self.registers[object_reg].is_object() {
1544            self.registers[object_reg] = json!({});
1545        }
1546
1547        let obj = self.registers[object_reg]
1548            .as_object_mut()
1549            .ok_or("Not an object")?;
1550
1551        let mut current = obj;
1552        for (i, segment) in segments.iter().enumerate() {
1553            if i == segments.len() - 1 {
1554                // Get current value (default to 0 if null/missing)
1555                let current_val = current
1556                    .get(segment)
1557                    .and_then(|v| {
1558                        if v.is_null() {
1559                            None
1560                        } else {
1561                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1562                        }
1563                    })
1564                    .unwrap_or(0);
1565
1566                // Add new value
1567                let new_val_num = new_value
1568                    .as_i64()
1569                    .or_else(|| new_value.as_u64().map(|n| n as i64))
1570                    .ok_or("Sum requires numeric value")?;
1571
1572                let sum = current_val + new_val_num;
1573                current.insert(segment.to_string(), json!(sum));
1574                return Ok(true);
1575            } else {
1576                current
1577                    .entry(segment.to_string())
1578                    .or_insert_with(|| json!({}));
1579                current = current
1580                    .get_mut(segment)
1581                    .and_then(|v| v.as_object_mut())
1582                    .ok_or("Path collision: expected object")?;
1583            }
1584        }
1585
1586        Ok(false)
1587    }
1588
1589    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1590        let compiled = self.get_compiled_path(path);
1591        let segments = compiled.segments();
1592
1593        if !self.registers[object_reg].is_object() {
1594            self.registers[object_reg] = json!({});
1595        }
1596
1597        let obj = self.registers[object_reg]
1598            .as_object_mut()
1599            .ok_or("Not an object")?;
1600
1601        let mut current = obj;
1602        for (i, segment) in segments.iter().enumerate() {
1603            if i == segments.len() - 1 {
1604                // Get current value (default to 0 if null/missing)
1605                let current_val = current
1606                    .get(segment)
1607                    .and_then(|v| {
1608                        if v.is_null() {
1609                            None
1610                        } else {
1611                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1612                        }
1613                    })
1614                    .unwrap_or(0);
1615
1616                let incremented = current_val + 1;
1617                current.insert(segment.to_string(), json!(incremented));
1618                return Ok(true);
1619            } else {
1620                current
1621                    .entry(segment.to_string())
1622                    .or_insert_with(|| json!({}));
1623                current = current
1624                    .get_mut(segment)
1625                    .and_then(|v| v.as_object_mut())
1626                    .ok_or("Path collision: expected object")?;
1627            }
1628        }
1629
1630        Ok(false)
1631    }
1632
1633    fn set_field_min(
1634        &mut self,
1635        object_reg: Register,
1636        path: &str,
1637        value_reg: Register,
1638    ) -> Result<bool> {
1639        let compiled = self.get_compiled_path(path);
1640        let segments = compiled.segments();
1641        let new_value = self.registers[value_reg].clone();
1642
1643        if !self.registers[object_reg].is_object() {
1644            self.registers[object_reg] = json!({});
1645        }
1646
1647        let obj = self.registers[object_reg]
1648            .as_object_mut()
1649            .ok_or("Not an object")?;
1650
1651        let mut current = obj;
1652        let mut was_updated = false;
1653        for (i, segment) in segments.iter().enumerate() {
1654            if i == segments.len() - 1 {
1655                let should_update = if let Some(current_value) = current.get(segment) {
1656                    if current_value.is_null() {
1657                        true
1658                    } else {
1659                        match (current_value.as_i64(), new_value.as_i64()) {
1660                            (Some(current_val), Some(new_val)) => new_val < current_val,
1661                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1662                                (new_value.as_u64().unwrap() as i64) < current_val
1663                            }
1664                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1665                                new_val < current_value.as_u64().unwrap() as i64
1666                            }
1667                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1668                                (Some(current_val), Some(new_val)) => new_val < current_val,
1669                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1670                                    (Some(current_val), Some(new_val)) => new_val < current_val,
1671                                    _ => false,
1672                                },
1673                            },
1674                            _ => false,
1675                        }
1676                    }
1677                } else {
1678                    true
1679                };
1680
1681                if should_update {
1682                    current.insert(segment.to_string(), new_value.clone());
1683                    was_updated = true;
1684                }
1685            } else {
1686                current
1687                    .entry(segment.to_string())
1688                    .or_insert_with(|| json!({}));
1689                current = current
1690                    .get_mut(segment)
1691                    .and_then(|v| v.as_object_mut())
1692                    .ok_or("Path collision: expected object")?;
1693            }
1694        }
1695
1696        Ok(was_updated)
1697    }
1698
1699    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1700        let compiled = self.get_compiled_path(path);
1701        let segments = compiled.segments();
1702        let mut current = &self.registers[object_reg];
1703
1704        for segment in segments {
1705            current = current
1706                .get(segment)
1707                .ok_or_else(|| format!("Field not found: {}", segment))?;
1708        }
1709
1710        Ok(current.clone())
1711    }
1712
1713    fn append_to_array(
1714        &mut self,
1715        object_reg: Register,
1716        path: &str,
1717        value_reg: Register,
1718    ) -> Result<()> {
1719        let compiled = self.get_compiled_path(path);
1720        let segments = compiled.segments();
1721        let value = self.registers[value_reg].clone();
1722
1723        if !self.registers[object_reg].is_object() {
1724            self.registers[object_reg] = json!({});
1725        }
1726
1727        let obj = self.registers[object_reg]
1728            .as_object_mut()
1729            .ok_or("Not an object")?;
1730
1731        let mut current = obj;
1732        for (i, segment) in segments.iter().enumerate() {
1733            if i == segments.len() - 1 {
1734                current
1735                    .entry(segment.to_string())
1736                    .or_insert_with(|| json!([]));
1737                let arr = current
1738                    .get_mut(segment)
1739                    .and_then(|v| v.as_array_mut())
1740                    .ok_or("Path is not an array")?;
1741                arr.push(value.clone());
1742            } else {
1743                current
1744                    .entry(segment.to_string())
1745                    .or_insert_with(|| json!({}));
1746                current = current
1747                    .get_mut(segment)
1748                    .and_then(|v| v.as_object_mut())
1749                    .ok_or("Path collision: expected object")?;
1750            }
1751        }
1752
1753        Ok(())
1754    }
1755
1756    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
1757        let value = &self.registers[reg];
1758        let transformed = self.apply_transformation(value, transformation)?;
1759        self.registers[reg] = transformed;
1760        Ok(())
1761    }
1762
1763    fn apply_transformation(
1764        &self,
1765        value: &Value,
1766        transformation: &Transformation,
1767    ) -> Result<Value> {
1768        match transformation {
1769            Transformation::HexEncode => {
1770                if let Some(arr) = value.as_array() {
1771                    let bytes: Vec<u8> = arr
1772                        .iter()
1773                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1774                        .collect();
1775                    let hex = hex::encode(&bytes);
1776                    Ok(json!(hex))
1777                } else {
1778                    Err("HexEncode requires an array of numbers".into())
1779                }
1780            }
1781            Transformation::HexDecode => {
1782                if let Some(s) = value.as_str() {
1783                    let s = s.strip_prefix("0x").unwrap_or(s);
1784                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
1785                    Ok(json!(bytes))
1786                } else {
1787                    Err("HexDecode requires a string".into())
1788                }
1789            }
1790            Transformation::Base58Encode => {
1791                if let Some(arr) = value.as_array() {
1792                    let bytes: Vec<u8> = arr
1793                        .iter()
1794                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1795                        .collect();
1796                    let encoded = bs58::encode(&bytes).into_string();
1797                    Ok(json!(encoded))
1798                } else if value.is_string() {
1799                    // Value is already a string (likely already base58 encoded), return as-is
1800                    tracing::debug!(
1801                        "Base58Encode: value is already a string, passing through: {:?}",
1802                        value
1803                    );
1804                    Ok(value.clone())
1805                } else {
1806                    tracing::error!(
1807                        "Base58Encode failed: value type is {:?}, value: {:?}",
1808                        if value.is_null() {
1809                            "null"
1810                        } else if value.is_boolean() {
1811                            "boolean"
1812                        } else if value.is_number() {
1813                            "number"
1814                        } else if value.is_object() {
1815                            "object"
1816                        } else {
1817                            "unknown"
1818                        },
1819                        value
1820                    );
1821                    Err("Base58Encode requires an array of numbers".into())
1822                }
1823            }
1824            Transformation::Base58Decode => {
1825                if let Some(s) = value.as_str() {
1826                    let bytes = bs58::decode(s)
1827                        .into_vec()
1828                        .map_err(|e| format!("Base58 decode error: {}", e))?;
1829                    Ok(json!(bytes))
1830                } else {
1831                    Err("Base58Decode requires a string".into())
1832                }
1833            }
1834            Transformation::ToString => Ok(json!(value.to_string())),
1835            Transformation::ToNumber => {
1836                if let Some(s) = value.as_str() {
1837                    let n = s
1838                        .parse::<i64>()
1839                        .map_err(|e| format!("Parse error: {}", e))?;
1840                    Ok(json!(n))
1841                } else {
1842                    Ok(value.clone())
1843                }
1844            }
1845        }
1846    }
1847
1848    fn evaluate_comparison(
1849        &self,
1850        field_value: &Value,
1851        op: &ComparisonOp,
1852        condition_value: &Value,
1853    ) -> Result<bool> {
1854        use ComparisonOp::*;
1855
1856        match op {
1857            Equal => Ok(field_value == condition_value),
1858            NotEqual => Ok(field_value != condition_value),
1859            GreaterThan => {
1860                // Try to compare as numbers
1861                match (field_value.as_i64(), condition_value.as_i64()) {
1862                    (Some(a), Some(b)) => Ok(a > b),
1863                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
1864                        (Some(a), Some(b)) => Ok(a > b),
1865                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
1866                            (Some(a), Some(b)) => Ok(a > b),
1867                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
1868                        },
1869                    },
1870                }
1871            }
1872            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1873                (Some(a), Some(b)) => Ok(a >= b),
1874                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1875                    (Some(a), Some(b)) => Ok(a >= b),
1876                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1877                        (Some(a), Some(b)) => Ok(a >= b),
1878                        _ => {
1879                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
1880                        }
1881                    },
1882                },
1883            },
1884            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
1885                (Some(a), Some(b)) => Ok(a < b),
1886                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1887                    (Some(a), Some(b)) => Ok(a < b),
1888                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1889                        (Some(a), Some(b)) => Ok(a < b),
1890                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
1891                    },
1892                },
1893            },
1894            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1895                (Some(a), Some(b)) => Ok(a <= b),
1896                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1897                    (Some(a), Some(b)) => Ok(a <= b),
1898                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1899                        (Some(a), Some(b)) => Ok(a <= b),
1900                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
1901                    },
1902                },
1903            },
1904        }
1905    }
1906
1907    /// Update a PDA reverse lookup and return pending updates for reprocessing
1908    ///
1909    /// After registering the PDA reverse lookup, this returns any pending account updates
1910    /// that were queued for this PDA. The caller should reprocess these through the VM
1911    /// by calling process_event() for each update.
1912    ///
1913    /// # Example
1914    /// ```ignore
1915    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
1916    /// for update in pending {
1917    ///     vm.process_event(&bytecode, update.account_data, &update.account_type)?;
1918    /// }
1919    /// ```
1920    #[cfg_attr(feature = "otel", instrument(
1921        name = "vm.update_pda_lookup",
1922        skip(self),
1923        fields(
1924            pda = %pda_address,
1925            seed = %seed_value,
1926        )
1927    ))]
1928    pub fn update_pda_reverse_lookup(
1929        &mut self,
1930        state_id: u32,
1931        lookup_name: &str,
1932        pda_address: String,
1933        seed_value: String,
1934    ) -> Result<Vec<PendingAccountUpdate>> {
1935        let state = self
1936            .states
1937            .get_mut(&state_id)
1938            .ok_or("State table not found")?;
1939
1940        let lookup = state
1941            .pda_reverse_lookups
1942            .entry(lookup_name.to_string())
1943            .or_insert_with(|| PdaReverseLookup::new(10000));
1944
1945        tracing::info!(
1946            "📝 Registering PDA reverse lookup: {} -> {} (lookup: {})",
1947            pda_address,
1948            seed_value,
1949            lookup_name
1950        );
1951
1952        // Insert and check if an entry was evicted
1953        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
1954
1955        // Clean up pending updates for the evicted PDA
1956        if let Some(ref evicted) = evicted_pda {
1957            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
1958                let count = evicted_updates.len();
1959                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
1960                tracing::info!(
1961                    "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
1962                    count,
1963                    evicted
1964                );
1965            }
1966        }
1967
1968        // Flush and return pending updates for this PDA
1969        self.flush_pending_updates(state_id, &pda_address)
1970    }
1971
1972    /// Clean up expired pending updates that are older than the TTL
1973    ///
1974    /// Returns the number of updates that were removed.
1975    /// This should be called periodically to prevent memory leaks from orphaned updates.
1976    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
1977        let state = match self.states.get_mut(&state_id) {
1978            Some(s) => s,
1979            None => return 0,
1980        };
1981
1982        let now = std::time::SystemTime::now()
1983            .duration_since(std::time::UNIX_EPOCH)
1984            .unwrap()
1985            .as_secs() as i64;
1986
1987        let mut removed_count = 0;
1988
1989        // Iterate through all pending updates and remove expired ones
1990        state.pending_updates.retain(|_pda_address, updates| {
1991            let original_len = updates.len();
1992
1993            updates.retain(|update| {
1994                let age = now - update.queued_at;
1995                age <= PENDING_UPDATE_TTL_SECONDS
1996            });
1997
1998            removed_count += original_len - updates.len();
1999
2000            // Remove the entry entirely if no updates remain
2001            !updates.is_empty()
2002        });
2003
2004        // Update the global counter
2005        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2006
2007        if removed_count > 0 {
2008            tracing::info!(
2009                "Cleaned up {} expired pending updates (TTL: {}s)",
2010                removed_count,
2011                PENDING_UPDATE_TTL_SECONDS
2012            );
2013        }
2014
2015        removed_count
2016    }
2017
2018    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2019    ///
2020    /// # Workflow
2021    ///
2022    /// This implements a deferred processing pattern for account updates when the PDA reverse
2023    /// lookup needed to resolve the primary key is not yet available:
2024    ///
2025    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2026    ///    is not available, call `queue_account_update()` to queue it for later.
2027    ///
2028    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2029    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2030    ///
2031    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2032    ///    ```ignore
2033    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2034    ///    for update in pending {
2035    ///        let mutations = vm.process_event(
2036    ///            &bytecode,
2037    ///            update.account_data,
2038    ///            &update.account_type
2039    ///        )?;
2040    ///        // Handle mutations...
2041    ///    }
2042    ///    ```
2043    ///
2044    /// # Arguments
2045    ///
2046    /// * `state_id` - The state table ID
2047    /// * `pda_address` - The PDA address that needs reverse lookup
2048    /// * `account_type` - The event type name for reprocessing
2049    /// * `account_data` - The account data (event value) for reprocessing
2050    /// * `slot` - The slot number when this update occurred
2051    /// * `signature` - The transaction signature
2052    #[cfg_attr(feature = "otel", instrument(
2053        name = "vm.queue_account_update",
2054        skip(self, account_data),
2055        fields(
2056            pda = %pda_address,
2057            account_type = %account_type,
2058            slot = slot,
2059        )
2060    ))]
2061    pub fn queue_account_update(
2062        &mut self,
2063        state_id: u32,
2064        pda_address: String,
2065        account_type: String,
2066        account_data: Value,
2067        slot: u64,
2068        signature: String,
2069    ) -> Result<()> {
2070        // Check if we've exceeded the global limit
2071        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2072            tracing::warn!(
2073                "Pending queue size limit reached ({}), cleaning up expired updates",
2074                MAX_PENDING_UPDATES_TOTAL
2075            );
2076            let removed = self.cleanup_expired_pending_updates(state_id);
2077
2078            // If still at limit after cleanup, drop the oldest update
2079            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2080                tracing::warn!(
2081                    "Still at limit after cleanup (removed {}), will drop oldest update",
2082                    removed
2083                );
2084                self.drop_oldest_pending_update(state_id)?;
2085            }
2086        }
2087
2088        let state = self
2089            .states
2090            .get_mut(&state_id)
2091            .ok_or("State table not found")?;
2092
2093        tracing::debug!(
2094            "📋 Queued account update for PDA {} (type: {}, slot: {})",
2095            pda_address,
2096            account_type,
2097            slot
2098        );
2099        let pending = PendingAccountUpdate {
2100            account_type,
2101            pda_address: pda_address.clone(),
2102            account_data,
2103            slot,
2104            signature,
2105            queued_at: std::time::SystemTime::now()
2106                .duration_since(std::time::UNIX_EPOCH)
2107                .unwrap()
2108                .as_secs() as i64,
2109        };
2110
2111        // Get or create the vector for this PDA and enforce limits
2112        {
2113            let mut updates = state
2114                .pending_updates
2115                .entry(pda_address.clone())
2116                .or_insert_with(Vec::new);
2117
2118            // Deduplication: Remove any updates with the same or older slot
2119            // Keep only updates with newer slots than the incoming one
2120            let original_len = updates.len();
2121            updates.retain(|existing| {
2122                // Keep if existing has a newer slot
2123                existing.slot > slot
2124            });
2125            let removed_by_dedup = original_len - updates.len();
2126
2127            if removed_by_dedup > 0 {
2128                self.pending_queue_size = self
2129                    .pending_queue_size
2130                    .saturating_sub(removed_by_dedup as u64);
2131                tracing::debug!(
2132                    "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2133                    removed_by_dedup,
2134                    pda_address,
2135                    slot
2136                );
2137            }
2138
2139            // Enforce per-PDA limit after deduplication
2140            if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2141                tracing::warn!(
2142                    "Per-PDA limit reached for {} ({} updates), dropping oldest",
2143                    pda_address,
2144                    updates.len()
2145                );
2146                updates.remove(0);
2147                self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2148            }
2149
2150            updates.push(pending);
2151        }
2152
2153        Ok(())
2154    }
2155
2156    /// Get statistics about the pending queue for monitoring
2157    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2158        let state = self.states.get(&state_id)?;
2159
2160        let now = std::time::SystemTime::now()
2161            .duration_since(std::time::UNIX_EPOCH)
2162            .unwrap()
2163            .as_secs() as i64;
2164
2165        let mut total_updates = 0;
2166        let mut oldest_timestamp = now;
2167        let mut largest_pda_queue = 0;
2168        let mut estimated_memory = 0;
2169
2170        for entry in state.pending_updates.iter() {
2171            let (_, updates) = entry.pair();
2172            total_updates += updates.len();
2173            largest_pda_queue = largest_pda_queue.max(updates.len());
2174
2175            for update in updates.iter() {
2176                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2177                // Rough memory estimate
2178                estimated_memory += update.account_type.len() +
2179                                   update.pda_address.len() +
2180                                   update.signature.len() +
2181                                   16 + // slot + queued_at
2182                                   estimate_json_size(&update.account_data);
2183            }
2184        }
2185
2186        Some(PendingQueueStats {
2187            total_updates,
2188            unique_pdas: state.pending_updates.len(),
2189            oldest_age_seconds: now - oldest_timestamp,
2190            largest_pda_queue_size: largest_pda_queue,
2191            estimated_memory_bytes: estimated_memory,
2192        })
2193    }
2194
2195    /// Drop the oldest pending update across all PDAs
2196    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2197        let state = self
2198            .states
2199            .get_mut(&state_id)
2200            .ok_or("State table not found")?;
2201
2202        let mut oldest_pda: Option<String> = None;
2203        let mut oldest_timestamp = i64::MAX;
2204
2205        // Find the PDA with the oldest update
2206        for entry in state.pending_updates.iter() {
2207            let (pda, updates) = entry.pair();
2208            if let Some(update) = updates.first() {
2209                if update.queued_at < oldest_timestamp {
2210                    oldest_timestamp = update.queued_at;
2211                    oldest_pda = Some(pda.clone());
2212                }
2213            }
2214        }
2215
2216        // Remove the oldest update
2217        if let Some(pda) = oldest_pda {
2218            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2219                if !updates.is_empty() {
2220                    updates.remove(0);
2221                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2222
2223                    // Remove the entry if it's now empty
2224                    if updates.is_empty() {
2225                        drop(updates);
2226                        state.pending_updates.remove(&pda);
2227                    }
2228                }
2229            }
2230        }
2231
2232        Ok(())
2233    }
2234
2235    /// Flush and return pending updates for a PDA for external reprocessing
2236    ///
2237    /// Returns the pending updates that were queued for this PDA address.
2238    /// The caller should reprocess these through the VM using process_event().
2239    fn flush_pending_updates(
2240        &mut self,
2241        state_id: u32,
2242        pda_address: &str,
2243    ) -> Result<Vec<PendingAccountUpdate>> {
2244        let state = self
2245            .states
2246            .get_mut(&state_id)
2247            .ok_or("State table not found")?;
2248
2249        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2250            let count = pending_updates.len();
2251            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2252            tracing::info!(
2253                "🔄 Flushed {} pending account update(s) for PDA {}",
2254                count,
2255                pda_address
2256            );
2257            Ok(pending_updates)
2258        } else {
2259            Ok(Vec::new())
2260        }
2261    }
2262
2263    /// Try to resolve a primary key via PDA reverse lookup
2264    pub fn try_pda_reverse_lookup(
2265        &mut self,
2266        state_id: u32,
2267        lookup_name: &str,
2268        pda_address: &str,
2269    ) -> Option<String> {
2270        let state = self.states.get_mut(&state_id)?;
2271
2272        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2273            if let Some(value) = lookup.lookup(pda_address) {
2274                self.pda_cache_hits += 1;
2275                tracing::debug!(
2276                    "✓ PDA reverse lookup cache hit: {} -> {}",
2277                    pda_address,
2278                    value
2279                );
2280                return Some(value);
2281            }
2282        }
2283
2284        self.pda_cache_misses += 1;
2285        tracing::debug!("✗ PDA reverse lookup cache miss: {}", pda_address);
2286        None
2287    }
2288
2289    // ============================================================================
2290    // Computed Expression Evaluator (Task 5)
2291    // ============================================================================
2292
2293    /// Evaluate a computed expression AST against the current state
2294    /// This is the core runtime evaluator for computed fields from the AST
2295    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2296        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2297    }
2298
2299    /// Evaluate a computed expression with a variable environment (for let bindings)
2300    fn evaluate_computed_expr_with_env(
2301        &self,
2302        expr: &ComputedExpr,
2303        state: &Value,
2304        env: &std::collections::HashMap<String, Value>,
2305    ) -> Result<Value> {
2306        match expr {
2307            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2308
2309            ComputedExpr::Var { name } => env
2310                .get(name)
2311                .cloned()
2312                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2313
2314            ComputedExpr::Let { name, value, body } => {
2315                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2316                let mut new_env = env.clone();
2317                new_env.insert(name.clone(), val);
2318                self.evaluate_computed_expr_with_env(body, state, &new_env)
2319            }
2320
2321            ComputedExpr::If {
2322                condition,
2323                then_branch,
2324                else_branch,
2325            } => {
2326                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2327                if self.value_to_bool(&cond_val) {
2328                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2329                } else {
2330                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2331                }
2332            }
2333
2334            ComputedExpr::None => Ok(Value::Null),
2335
2336            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2337
2338            ComputedExpr::Slice { expr, start, end } => {
2339                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2340                match val {
2341                    Value::Array(arr) => {
2342                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2343                        Ok(Value::Array(slice))
2344                    }
2345                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2346                }
2347            }
2348
2349            ComputedExpr::Index { expr, index } => {
2350                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2351                match val {
2352                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2353                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2354                }
2355            }
2356
2357            ComputedExpr::U64FromLeBytes { bytes } => {
2358                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2359                let byte_vec = self.value_to_bytes(&val)?;
2360                if byte_vec.len() < 8 {
2361                    return Err(format!(
2362                        "u64::from_le_bytes requires 8 bytes, got {}",
2363                        byte_vec.len()
2364                    )
2365                    .into());
2366                }
2367                let arr: [u8; 8] = byte_vec[..8]
2368                    .try_into()
2369                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2370                Ok(json!(u64::from_le_bytes(arr)))
2371            }
2372
2373            ComputedExpr::U64FromBeBytes { bytes } => {
2374                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2375                let byte_vec = self.value_to_bytes(&val)?;
2376                if byte_vec.len() < 8 {
2377                    return Err(format!(
2378                        "u64::from_be_bytes requires 8 bytes, got {}",
2379                        byte_vec.len()
2380                    )
2381                    .into());
2382                }
2383                let arr: [u8; 8] = byte_vec[..8]
2384                    .try_into()
2385                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2386                Ok(json!(u64::from_be_bytes(arr)))
2387            }
2388
2389            ComputedExpr::ByteArray { bytes } => {
2390                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2391            }
2392
2393            ComputedExpr::Closure { param, body } => {
2394                // Closures are stored as-is; they're evaluated when used in map()
2395                // Return a special representation
2396                Ok(json!({
2397                    "__closure": {
2398                        "param": param,
2399                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2400                    }
2401                }))
2402            }
2403
2404            ComputedExpr::Unary { op, expr } => {
2405                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2406                self.apply_unary_op(op, &val)
2407            }
2408
2409            ComputedExpr::JsonToBytes { expr } => {
2410                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2411                // Convert JSON array of numbers to byte array
2412                let bytes = self.value_to_bytes(&val)?;
2413                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2414            }
2415
2416            ComputedExpr::UnwrapOr { expr, default } => {
2417                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2418                if val.is_null() {
2419                    Ok(default.clone())
2420                } else {
2421                    Ok(val)
2422                }
2423            }
2424
2425            ComputedExpr::Binary { op, left, right } => {
2426                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2427                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2428                self.apply_binary_op(op, &l, &r)
2429            }
2430
2431            ComputedExpr::Cast { expr, to_type } => {
2432                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2433                self.apply_cast(&val, to_type)
2434            }
2435
2436            ComputedExpr::MethodCall { expr, method, args } => {
2437                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2438                // Special handling for map() with closures
2439                if method == "map" && args.len() == 1 {
2440                    if let ComputedExpr::Closure { param, body } = &args[0] {
2441                        // If the value is null, return null (Option::None.map returns None)
2442                        if val.is_null() {
2443                            return Ok(Value::Null);
2444                        }
2445                        // Evaluate the closure body with the value bound to param
2446                        let mut closure_env = env.clone();
2447                        closure_env.insert(param.clone(), val);
2448                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2449                    }
2450                }
2451                let evaluated_args: Vec<Value> = args
2452                    .iter()
2453                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2454                    .collect::<Result<Vec<_>>>()?;
2455                self.apply_method_call(&val, method, &evaluated_args)
2456            }
2457
2458            ComputedExpr::Literal { value } => Ok(value.clone()),
2459
2460            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2461        }
2462    }
2463
2464    /// Convert a JSON value to a byte vector
2465    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2466        match val {
2467            Value::Array(arr) => arr
2468                .iter()
2469                .map(|v| {
2470                    v.as_u64()
2471                        .map(|n| n as u8)
2472                        .ok_or_else(|| "Array element not a valid byte".into())
2473                })
2474                .collect(),
2475            Value::String(s) => {
2476                // Try to decode as hex
2477                if s.starts_with("0x") || s.starts_with("0X") {
2478                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2479                } else {
2480                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2481                }
2482            }
2483            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2484        }
2485    }
2486
2487    /// Apply a unary operation
2488    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2489        use crate::ast::UnaryOp;
2490        match op {
2491            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2492            UnaryOp::ReverseBits => match val.as_u64() {
2493                Some(n) => Ok(json!(n.reverse_bits())),
2494                None => match val.as_i64() {
2495                    Some(n) => Ok(json!((n as u64).reverse_bits())),
2496                    None => Err("reverse_bits requires an integer".into()),
2497                },
2498            },
2499        }
2500    }
2501
2502    /// Get a field value from state by path (e.g., "section.field" or just "field")
2503    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2504        let segments: Vec<&str> = path.split('.').collect();
2505        let mut current = state;
2506
2507        for segment in segments {
2508            match current.get(segment) {
2509                Some(v) => current = v,
2510                None => return Ok(Value::Null),
2511            }
2512        }
2513
2514        Ok(current.clone())
2515    }
2516
2517    /// Apply a binary operation to two values
2518    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2519        match op {
2520            // Arithmetic operations
2521            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2522            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2523            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2524            BinaryOp::Div => {
2525                // Check for division by zero
2526                if let Some(r) = right.as_i64() {
2527                    if r == 0 {
2528                        return Err("Division by zero".into());
2529                    }
2530                }
2531                if let Some(r) = right.as_f64() {
2532                    if r == 0.0 {
2533                        return Err("Division by zero".into());
2534                    }
2535                }
2536                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2537            }
2538            BinaryOp::Mod => {
2539                // Modulo - only for integers
2540                match (left.as_i64(), right.as_i64()) {
2541                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2542                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2543                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2544                        _ => Err("Modulo requires non-zero integer operands".into()),
2545                    },
2546                    _ => Err("Modulo by zero".into()),
2547                }
2548            }
2549
2550            // Comparison operations
2551            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2552            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2553            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2554            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2555            BinaryOp::Eq => Ok(json!(left == right)),
2556            BinaryOp::Ne => Ok(json!(left != right)),
2557
2558            // Logical operations
2559            BinaryOp::And => {
2560                let l_bool = self.value_to_bool(left);
2561                let r_bool = self.value_to_bool(right);
2562                Ok(json!(l_bool && r_bool))
2563            }
2564            BinaryOp::Or => {
2565                let l_bool = self.value_to_bool(left);
2566                let r_bool = self.value_to_bool(right);
2567                Ok(json!(l_bool || r_bool))
2568            }
2569
2570            // Bitwise operations
2571            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2572                (Some(a), Some(b)) => Ok(json!(a ^ b)),
2573                _ => match (left.as_i64(), right.as_i64()) {
2574                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
2575                    _ => Err("XOR requires integer operands".into()),
2576                },
2577            },
2578            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2579                (Some(a), Some(b)) => Ok(json!(a & b)),
2580                _ => match (left.as_i64(), right.as_i64()) {
2581                    (Some(a), Some(b)) => Ok(json!(a & b)),
2582                    _ => Err("BitAnd requires integer operands".into()),
2583                },
2584            },
2585            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2586                (Some(a), Some(b)) => Ok(json!(a | b)),
2587                _ => match (left.as_i64(), right.as_i64()) {
2588                    (Some(a), Some(b)) => Ok(json!(a | b)),
2589                    _ => Err("BitOr requires integer operands".into()),
2590                },
2591            },
2592            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2593                (Some(a), Some(b)) => Ok(json!(a << b)),
2594                _ => match (left.as_i64(), right.as_i64()) {
2595                    (Some(a), Some(b)) => Ok(json!(a << b)),
2596                    _ => Err("Shl requires integer operands".into()),
2597                },
2598            },
2599            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2600                (Some(a), Some(b)) => Ok(json!(a >> b)),
2601                _ => match (left.as_i64(), right.as_i64()) {
2602                    (Some(a), Some(b)) => Ok(json!(a >> b)),
2603                    _ => Err("Shr requires integer operands".into()),
2604                },
2605            },
2606        }
2607    }
2608
2609    /// Helper for numeric operations that can work on integers or floats
2610    fn numeric_op<F1, F2>(
2611        &self,
2612        left: &Value,
2613        right: &Value,
2614        int_op: F1,
2615        float_op: F2,
2616    ) -> Result<Value>
2617    where
2618        F1: Fn(i64, i64) -> i64,
2619        F2: Fn(f64, f64) -> f64,
2620    {
2621        // Try i64 first
2622        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2623            return Ok(json!(int_op(a, b)));
2624        }
2625
2626        // Try u64
2627        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2628            // For u64, we need to be careful with underflow in subtraction
2629            return Ok(json!(int_op(a as i64, b as i64)));
2630        }
2631
2632        // Try f64
2633        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2634            return Ok(json!(float_op(a, b)));
2635        }
2636
2637        // If either is null, return null
2638        if left.is_null() || right.is_null() {
2639            return Ok(Value::Null);
2640        }
2641
2642        Err(format!(
2643            "Cannot perform numeric operation on {:?} and {:?}",
2644            left, right
2645        )
2646        .into())
2647    }
2648
2649    /// Helper for comparison operations
2650    fn comparison_op<F1, F2>(
2651        &self,
2652        left: &Value,
2653        right: &Value,
2654        int_cmp: F1,
2655        float_cmp: F2,
2656    ) -> Result<Value>
2657    where
2658        F1: Fn(i64, i64) -> bool,
2659        F2: Fn(f64, f64) -> bool,
2660    {
2661        // Try i64 first
2662        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2663            return Ok(json!(int_cmp(a, b)));
2664        }
2665
2666        // Try u64
2667        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2668            return Ok(json!(int_cmp(a as i64, b as i64)));
2669        }
2670
2671        // Try f64
2672        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2673            return Ok(json!(float_cmp(a, b)));
2674        }
2675
2676        // If either is null, comparison returns false
2677        if left.is_null() || right.is_null() {
2678            return Ok(json!(false));
2679        }
2680
2681        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
2682    }
2683
2684    /// Convert a value to boolean for logical operations
2685    fn value_to_bool(&self, value: &Value) -> bool {
2686        match value {
2687            Value::Null => false,
2688            Value::Bool(b) => *b,
2689            Value::Number(n) => {
2690                if let Some(i) = n.as_i64() {
2691                    i != 0
2692                } else if let Some(f) = n.as_f64() {
2693                    f != 0.0
2694                } else {
2695                    true
2696                }
2697            }
2698            Value::String(s) => !s.is_empty(),
2699            Value::Array(arr) => !arr.is_empty(),
2700            Value::Object(obj) => !obj.is_empty(),
2701        }
2702    }
2703
2704    /// Apply a type cast to a value
2705    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
2706        match to_type {
2707            "i8" | "i16" | "i32" | "i64" | "isize" => {
2708                if let Some(n) = value.as_i64() {
2709                    Ok(json!(n))
2710                } else if let Some(n) = value.as_u64() {
2711                    Ok(json!(n as i64))
2712                } else if let Some(n) = value.as_f64() {
2713                    Ok(json!(n as i64))
2714                } else if let Some(s) = value.as_str() {
2715                    s.parse::<i64>()
2716                        .map(|n| json!(n))
2717                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
2718                } else {
2719                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2720                }
2721            }
2722            "u8" | "u16" | "u32" | "u64" | "usize" => {
2723                if let Some(n) = value.as_u64() {
2724                    Ok(json!(n))
2725                } else if let Some(n) = value.as_i64() {
2726                    Ok(json!(n as u64))
2727                } else if let Some(n) = value.as_f64() {
2728                    Ok(json!(n as u64))
2729                } else if let Some(s) = value.as_str() {
2730                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
2731                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
2732                    })
2733                } else {
2734                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2735                }
2736            }
2737            "f32" | "f64" => {
2738                if let Some(n) = value.as_f64() {
2739                    Ok(json!(n))
2740                } else if let Some(n) = value.as_i64() {
2741                    Ok(json!(n as f64))
2742                } else if let Some(n) = value.as_u64() {
2743                    Ok(json!(n as f64))
2744                } else if let Some(s) = value.as_str() {
2745                    s.parse::<f64>()
2746                        .map(|n| json!(n))
2747                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
2748                } else {
2749                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2750                }
2751            }
2752            "String" | "string" => Ok(json!(value.to_string())),
2753            "bool" => Ok(json!(self.value_to_bool(value))),
2754            _ => {
2755                // Unknown type, return value as-is
2756                Ok(value.clone())
2757            }
2758        }
2759    }
2760
2761    /// Apply a method call to a value
2762    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
2763        match method {
2764            "unwrap_or" => {
2765                if value.is_null() && !args.is_empty() {
2766                    Ok(args[0].clone())
2767                } else {
2768                    Ok(value.clone())
2769                }
2770            }
2771            "unwrap_or_default" => {
2772                if value.is_null() {
2773                    // Return default for common types
2774                    Ok(json!(0))
2775                } else {
2776                    Ok(value.clone())
2777                }
2778            }
2779            "is_some" => Ok(json!(!value.is_null())),
2780            "is_none" => Ok(json!(value.is_null())),
2781            "abs" => {
2782                if let Some(n) = value.as_i64() {
2783                    Ok(json!(n.abs()))
2784                } else if let Some(n) = value.as_f64() {
2785                    Ok(json!(n.abs()))
2786                } else {
2787                    Err(format!("Cannot call abs() on {:?}", value).into())
2788                }
2789            }
2790            "len" => {
2791                if let Some(s) = value.as_str() {
2792                    Ok(json!(s.len()))
2793                } else if let Some(arr) = value.as_array() {
2794                    Ok(json!(arr.len()))
2795                } else if let Some(obj) = value.as_object() {
2796                    Ok(json!(obj.len()))
2797                } else {
2798                    Err(format!("Cannot call len() on {:?}", value).into())
2799                }
2800            }
2801            "to_string" => Ok(json!(value.to_string())),
2802            "min" => {
2803                if args.is_empty() {
2804                    return Err("min() requires an argument".into());
2805                }
2806                let other = &args[0];
2807                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2808                    Ok(json!(a.min(b)))
2809                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2810                    Ok(json!(a.min(b)))
2811                } else {
2812                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
2813                }
2814            }
2815            "max" => {
2816                if args.is_empty() {
2817                    return Err("max() requires an argument".into());
2818                }
2819                let other = &args[0];
2820                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2821                    Ok(json!(a.max(b)))
2822                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2823                    Ok(json!(a.max(b)))
2824                } else {
2825                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
2826                }
2827            }
2828            "saturating_add" => {
2829                if args.is_empty() {
2830                    return Err("saturating_add() requires an argument".into());
2831                }
2832                let other = &args[0];
2833                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2834                    Ok(json!(a.saturating_add(b)))
2835                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2836                    Ok(json!(a.saturating_add(b)))
2837                } else {
2838                    Err(format!(
2839                        "Cannot call saturating_add() on {:?} and {:?}",
2840                        value, other
2841                    )
2842                    .into())
2843                }
2844            }
2845            "saturating_sub" => {
2846                if args.is_empty() {
2847                    return Err("saturating_sub() requires an argument".into());
2848                }
2849                let other = &args[0];
2850                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2851                    Ok(json!(a.saturating_sub(b)))
2852                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2853                    Ok(json!(a.saturating_sub(b)))
2854                } else {
2855                    Err(format!(
2856                        "Cannot call saturating_sub() on {:?} and {:?}",
2857                        value, other
2858                    )
2859                    .into())
2860                }
2861            }
2862            _ => {
2863                // Unknown method - return value unchanged with a warning
2864                tracing::warn!("Unknown method call: {}() on {:?}", method, value);
2865                Ok(value.clone())
2866            }
2867        }
2868    }
2869
2870    /// Evaluate all computed fields for an entity and update the state
2871    /// This takes a list of ComputedFieldSpec from the AST and applies them
2872    pub fn evaluate_computed_fields_from_ast(
2873        &self,
2874        state: &mut Value,
2875        computed_field_specs: &[ComputedFieldSpec],
2876    ) -> Result<Vec<String>> {
2877        let mut updated_paths = Vec::new();
2878
2879        for spec in computed_field_specs {
2880            match self.evaluate_computed_expr(&spec.expression, state) {
2881                Ok(result) => {
2882                    // Set the computed value in state
2883                    self.set_field_in_state(state, &spec.target_path, result)?;
2884                    updated_paths.push(spec.target_path.clone());
2885                }
2886                Err(e) => {
2887                    // Log error but continue with other computed fields
2888                    tracing::warn!(
2889                        "Failed to evaluate computed field '{}': {}",
2890                        spec.target_path,
2891                        e
2892                    );
2893                }
2894            }
2895        }
2896
2897        Ok(updated_paths)
2898    }
2899
2900    /// Set a field value in state by path (e.g., "section.field")
2901    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
2902        let segments: Vec<&str> = path.split('.').collect();
2903
2904        if segments.is_empty() {
2905            return Err("Empty path".into());
2906        }
2907
2908        // Navigate to parent, creating intermediate objects as needed
2909        let mut current = state;
2910        for (i, segment) in segments.iter().enumerate() {
2911            if i == segments.len() - 1 {
2912                // Last segment - set the value
2913                if let Some(obj) = current.as_object_mut() {
2914                    obj.insert(segment.to_string(), value);
2915                    return Ok(());
2916                } else {
2917                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
2918                }
2919            } else {
2920                // Intermediate segment - navigate or create
2921                if !current.is_object() {
2922                    *current = json!({});
2923                }
2924                let obj = current.as_object_mut().unwrap();
2925                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
2926            }
2927        }
2928
2929        Ok(())
2930    }
2931
2932    /// Create a computed fields evaluator closure from AST specs
2933    /// This returns a function that can be passed to the bytecode builder
2934    pub fn create_evaluator_from_specs(
2935        specs: Vec<ComputedFieldSpec>,
2936    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
2937        move |state: &mut Value| {
2938            // Create a temporary VmContext just for evaluation
2939            // (We only need the expression evaluation methods)
2940            let vm = VmContext::new();
2941            vm.evaluate_computed_fields_from_ast(state, &specs)?;
2942            Ok(())
2943        }
2944    }
2945}
2946
2947impl Default for VmContext {
2948    fn default() -> Self {
2949        Self::new()
2950    }
2951}
2952
2953// Implement the ReverseLookupUpdater trait for VmContext
2954impl crate::resolvers::ReverseLookupUpdater for VmContext {
2955    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
2956        // Use default state_id=0 and default lookup name
2957        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
2958            .unwrap_or_else(|e| {
2959                tracing::error!("Failed to update PDA reverse lookup: {}", e);
2960                Vec::new()
2961            })
2962    }
2963
2964    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
2965        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
2966        self.flush_pending_updates(0, pda_address)
2967            .unwrap_or_else(|e| {
2968                tracing::error!("Failed to flush pending updates: {}", e);
2969                Vec::new()
2970            })
2971    }
2972}