hyperstack_interpreter/
vm.rs

1use crate::ast::{BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation};
2use crate::compiler::{MultiEntityBytecode, OpCode};
3use crate::Mutation;
4use dashmap::DashMap;
5use lru::LruCache;
6use serde_json::{json, Value};
7use std::collections::{HashMap, HashSet};
8use std::num::NonZeroUsize;
9
10#[cfg(feature = "otel")]
11use tracing::instrument;
12
13/// Context metadata for blockchain updates (accounts and instructions)
14/// This structure is designed to be extended over time with additional metadata
15#[derive(Debug, Clone, Default)]
16pub struct UpdateContext {
17    /// Blockchain slot number
18    pub slot: Option<u64>,
19    /// Transaction signature
20    pub signature: Option<String>,
21    /// Unix timestamp (seconds since epoch)
22    /// If not provided, will default to current system time when accessed
23    pub timestamp: Option<i64>,
24    /// Additional custom metadata that can be added without breaking changes
25    pub metadata: HashMap<String, Value>,
26}
27
28impl UpdateContext {
29    /// Create a new UpdateContext with slot and signature
30    pub fn new(slot: u64, signature: String) -> Self {
31        Self {
32            slot: Some(slot),
33            signature: Some(signature),
34            timestamp: None,
35            metadata: HashMap::new(),
36        }
37    }
38
39    /// Create a new UpdateContext with slot, signature, and timestamp
40    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
41        Self {
42            slot: Some(slot),
43            signature: Some(signature),
44            timestamp: Some(timestamp),
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Get the timestamp, falling back to current system time if not set
50    pub fn timestamp(&self) -> i64 {
51        self.timestamp.unwrap_or_else(|| {
52            std::time::SystemTime::now()
53                .duration_since(std::time::UNIX_EPOCH)
54                .unwrap()
55                .as_secs() as i64
56        })
57    }
58
59    /// Create an empty context (for testing or when context is not available)
60    pub fn empty() -> Self {
61        Self::default()
62    }
63
64    /// Add custom metadata
65    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
66        self.metadata.insert(key, value);
67        self
68    }
69
70    /// Get metadata value
71    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
72        self.metadata.get(key)
73    }
74
75    /// Convert context to JSON value for injection into event data
76    pub fn to_value(&self) -> Value {
77        let mut obj = serde_json::Map::new();
78        if let Some(slot) = self.slot {
79            obj.insert("slot".to_string(), json!(slot));
80        }
81        if let Some(ref sig) = self.signature {
82            obj.insert("signature".to_string(), json!(sig));
83        }
84        // Always include timestamp (use current time if not set)
85        obj.insert("timestamp".to_string(), json!(self.timestamp()));
86        for (key, value) in &self.metadata {
87            obj.insert(key.clone(), value.clone());
88        }
89        Value::Object(obj)
90    }
91}
92
93pub type Register = usize;
94pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
95
96pub type RegisterValue = Value;
97
98/// Trait for evaluating computed fields
99/// Implement this in your generated spec to enable computed field evaluation
100pub trait ComputedFieldsEvaluator {
101    fn evaluate(&self, state: &mut Value) -> Result<()>;
102}
103
104// Pending queue configuration
105const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
106const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
107const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
108
109/// Estimate the size of a JSON value in bytes
110fn estimate_json_size(value: &Value) -> usize {
111    match value {
112        Value::Null => 4,
113        Value::Bool(_) => 5,
114        Value::Number(_) => 8,
115        Value::String(s) => s.len() + 2,
116        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
117        Value::Object(obj) => {
118            2 + obj
119                .iter()
120                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
121                .sum::<usize>()
122        }
123    }
124}
125
126#[derive(Debug, Clone)]
127pub struct CompiledPath {
128    pub segments: std::sync::Arc<[String]>,
129}
130
131impl CompiledPath {
132    pub fn new(path: &str) -> Self {
133        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
134        CompiledPath {
135            segments: segments.into(),
136        }
137    }
138
139    fn segments(&self) -> &[String] {
140        &self.segments
141    }
142}
143
144pub struct VmContext {
145    registers: Vec<RegisterValue>,
146    states: HashMap<u32, StateTable>,
147    pub instructions_executed: u64,
148    pub cache_hits: u64,
149    path_cache: HashMap<String, CompiledPath>,
150    pub pda_cache_hits: u64,
151    pub pda_cache_misses: u64,
152    pub pending_queue_size: u64,
153    /// Current update context (set during execute_handler)
154    current_context: Option<UpdateContext>,
155}
156
157#[derive(Debug)]
158pub struct LookupIndex {
159    index: DashMap<Value, Value>,
160}
161
162impl Default for LookupIndex {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168impl LookupIndex {
169    pub fn new() -> Self {
170        LookupIndex {
171            index: DashMap::new(),
172        }
173    }
174
175    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
176        self.index.get(lookup_value).map(|v| v.clone())
177    }
178
179    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
180        self.index.insert(lookup_value, primary_key);
181    }
182
183    pub fn len(&self) -> usize {
184        self.index.len()
185    }
186
187    pub fn is_empty(&self) -> bool {
188        self.index.is_empty()
189    }
190}
191
192#[derive(Debug)]
193pub struct TemporalIndex {
194    index: DashMap<Value, Vec<(Value, i64)>>,
195}
196
197impl Default for TemporalIndex {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203impl TemporalIndex {
204    pub fn new() -> Self {
205        TemporalIndex {
206            index: DashMap::new(),
207        }
208    }
209
210    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
211        if let Some(entries) = self.index.get(lookup_value) {
212            let entries_vec = entries.value();
213            for i in (0..entries_vec.len()).rev() {
214                if entries_vec[i].1 <= timestamp {
215                    return Some(entries_vec[i].0.clone());
216                }
217            }
218        }
219        None
220    }
221
222    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
223        if let Some(entries) = self.index.get(lookup_value) {
224            let entries_vec = entries.value();
225            if let Some(last) = entries_vec.last() {
226                return Some(last.0.clone());
227            }
228        }
229        None
230    }
231
232    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
233        let lookup_key = lookup_value.clone();
234        self.index
235            .entry(lookup_value)
236            .or_default()
237            .push((primary_key, timestamp));
238
239        if let Some(mut entries) = self.index.get_mut(&lookup_key) {
240            entries.sort_by_key(|(_, ts)| *ts);
241        }
242    }
243}
244
245#[derive(Debug)]
246pub struct PdaReverseLookup {
247    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
248    index: LruCache<String, String>,
249}
250
251impl PdaReverseLookup {
252    pub fn new(capacity: usize) -> Self {
253        PdaReverseLookup {
254            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
255        }
256    }
257
258    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
259        self.index.get(pda_address).cloned()
260    }
261
262    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
263        // Check if we're at capacity and will evict
264        let evicted = if self.index.len() >= self.index.cap().get() {
265            // Get the LRU key that will be evicted
266            self.index.peek_lru().map(|(k, _)| k.clone())
267        } else {
268            None
269        };
270
271        self.index.put(pda_address, seed_value);
272        evicted
273    }
274}
275
276#[derive(Debug, Clone)]
277pub struct PendingAccountUpdate {
278    pub account_type: String,
279    pub pda_address: String,
280    pub account_data: Value,
281    pub slot: u64,
282    pub signature: String,
283    pub queued_at: i64,
284}
285
286#[derive(Debug, Clone)]
287pub struct PendingQueueStats {
288    pub total_updates: usize,
289    pub unique_pdas: usize,
290    pub oldest_age_seconds: i64,
291    pub largest_pda_queue_size: usize,
292    pub estimated_memory_bytes: usize,
293}
294
295#[derive(Debug)]
296pub struct StateTable {
297    pub data: DashMap<Value, Value>,
298    pub lookup_indexes: HashMap<String, LookupIndex>,
299    pub temporal_indexes: HashMap<String, TemporalIndex>,
300    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
301    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
302}
303
304impl VmContext {
305    pub fn new() -> Self {
306        let mut vm = VmContext {
307            registers: vec![Value::Null; 256],
308            states: HashMap::new(),
309            instructions_executed: 0,
310            cache_hits: 0,
311            path_cache: HashMap::new(),
312            pda_cache_hits: 0,
313            pda_cache_misses: 0,
314            pending_queue_size: 0,
315            current_context: None,
316        };
317        vm.states.insert(
318            0,
319            StateTable {
320                data: DashMap::new(),
321                lookup_indexes: HashMap::new(),
322                temporal_indexes: HashMap::new(),
323                pda_reverse_lookups: HashMap::new(),
324                pending_updates: DashMap::new(),
325            },
326        );
327        vm
328    }
329
330    /// Get a mutable reference to a state table by ID
331    /// Returns None if the state ID doesn't exist
332    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
333        self.states.get_mut(&state_id)
334    }
335
336    /// Get public access to registers (for metrics context)
337    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
338        &mut self.registers
339    }
340
341    /// Get public access to path cache (for metrics context)
342    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
343        &self.path_cache
344    }
345
346    /// Get the current update context
347    pub fn current_context(&self) -> Option<&UpdateContext> {
348        self.current_context.as_ref()
349    }
350
351    /// Update the state table with the current value in a register
352    /// This allows imperative hooks to persist their changes to the state table
353    pub fn update_state_from_register(
354        &mut self,
355        state_id: u32,
356        key: Value,
357        register: Register,
358    ) -> Result<()> {
359        let state = self.states.get(&state_id).ok_or("State table not found")?;
360        let value = self.registers[register].clone();
361        state.data.insert(key, value);
362        Ok(())
363    }
364
365    fn reset_registers(&mut self) {
366        for reg in &mut self.registers {
367            *reg = Value::Null;
368        }
369    }
370
371    /// Extract only the dirty fields from state (public for use by instruction hooks)
372    pub fn extract_partial_state(
373        &self,
374        state_reg: Register,
375        dirty_fields: &HashSet<String>,
376    ) -> Result<Value> {
377        let full_state = &self.registers[state_reg];
378
379        if dirty_fields.is_empty() {
380            return Ok(json!({}));
381        }
382
383        let mut partial = serde_json::Map::new();
384
385        for path in dirty_fields {
386            let segments: Vec<&str> = path.split('.').collect();
387
388            let mut current = full_state;
389            let mut found = true;
390
391            for segment in &segments {
392                match current.get(segment) {
393                    Some(v) => current = v,
394                    None => {
395                        found = false;
396                        break;
397                    }
398                }
399            }
400
401            if !found {
402                continue;
403            }
404
405            let mut target = &mut partial;
406            for (i, segment) in segments.iter().enumerate() {
407                if i == segments.len() - 1 {
408                    target.insert(segment.to_string(), current.clone());
409                } else {
410                    target
411                        .entry(segment.to_string())
412                        .or_insert_with(|| json!({}));
413                    target = target
414                        .get_mut(*segment)
415                        .and_then(|v| v.as_object_mut())
416                        .ok_or("Failed to build nested structure")?;
417                }
418            }
419        }
420
421        Ok(Value::Object(partial))
422    }
423
424    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
425        if let Some(compiled) = self.path_cache.get(path) {
426            self.cache_hits += 1;
427            return compiled.clone();
428        }
429        let compiled = CompiledPath::new(path);
430        self.path_cache.insert(path.to_string(), compiled.clone());
431        compiled
432    }
433
434    /// Process an event with optional context metadata
435    #[cfg_attr(feature = "otel", instrument(
436        name = "vm.process_event",
437        skip(self, bytecode, event_value, context),
438        fields(
439            event_type = %event_type,
440            slot = context.as_ref().and_then(|c| c.slot),
441        )
442    ))]
443    pub fn process_event_with_context(
444        &mut self,
445        bytecode: &MultiEntityBytecode,
446        mut event_value: Value,
447        event_type: &str,
448        context: Option<&UpdateContext>,
449    ) -> Result<Vec<Mutation>> {
450        // Store context for use during handler execution
451        self.current_context = context.cloned();
452
453        // Inject context metadata into event value if provided
454        if let Some(ctx) = context {
455            if let Some(obj) = event_value.as_object_mut() {
456                obj.insert("__update_context".to_string(), ctx.to_value());
457            }
458        }
459
460        let mut all_mutations = Vec::new();
461
462        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
463            tracing::debug!(
464                "🔀 Event type '{}' routes to {} entity/entities",
465                event_type,
466                entity_names.len()
467            );
468
469            for entity_name in entity_names {
470                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
471                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
472                        tracing::debug!(
473                            "   ▶️  Executing handler for entity '{}', event '{}'",
474                            entity_name,
475                            event_type
476                        );
477                        tracing::debug!("      Handler has {} opcodes", handler.len());
478
479                        let mutations = self.execute_handler(
480                            handler,
481                            event_value.clone(),
482                            event_type,
483                            entity_bytecode.state_id,
484                            entity_bytecode.computed_fields_evaluator.as_ref(),
485                        )?;
486
487                        tracing::debug!("      Handler produced {} mutation(s)", mutations.len());
488                        all_mutations.extend(mutations);
489                    } else {
490                        tracing::debug!(
491                            "   ⊘ No handler found for entity '{}', event '{}'",
492                            entity_name,
493                            event_type
494                        );
495                    }
496                } else {
497                    tracing::debug!("   ⊘ Entity '{}' not found in bytecode", entity_name);
498                }
499            }
500        } else {
501            tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
502        }
503
504        Ok(all_mutations)
505    }
506
507    /// Process an event without context (backward compatibility)
508    pub fn process_event(
509        &mut self,
510        bytecode: &MultiEntityBytecode,
511        event_value: Value,
512        event_type: &str,
513    ) -> Result<Vec<Mutation>> {
514        self.process_event_with_context(bytecode, event_value, event_type, None)
515    }
516
517    pub fn process_any(
518        &mut self,
519        bytecode: &MultiEntityBytecode,
520        any: prost_types::Any,
521    ) -> Result<Vec<Mutation>> {
522        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
523        self.process_event(bytecode, event_value, &event_type)
524    }
525
526    #[cfg_attr(feature = "otel", instrument(
527        name = "vm.execute_handler", 
528        skip(self, handler, event_value, entity_evaluator),
529        level = "debug",
530        fields(
531            event_type = %event_type,
532            handler_opcodes = handler.len(),
533        )
534    ))]
535    #[allow(clippy::type_complexity)]
536    fn execute_handler(
537        &mut self,
538        handler: &[OpCode],
539        event_value: Value,
540        event_type: &str,
541        override_state_id: u32,
542        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
543    ) -> Result<Vec<Mutation>> {
544        self.reset_registers();
545
546        tracing::trace!(
547            "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
548            event_type,
549            handler.len(),
550            event_value
551        );
552
553        let mut pc: usize = 0;
554        let mut output = Vec::new();
555        let mut dirty_fields: HashSet<String> = HashSet::new();
556
557        while pc < handler.len() {
558            match &handler[pc] {
559                OpCode::LoadEventField {
560                    path,
561                    dest,
562                    default,
563                } => {
564                    let value = self.load_field(&event_value, path, default.as_ref())?;
565                    tracing::trace!(
566                        "LoadEventField: path={:?}, dest={}, value={:?}",
567                        path.segments,
568                        dest,
569                        value
570                    );
571                    // Warn if accounts.* path returns null - this helps debug key resolution issues
572                    if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts" {
573                        tracing::warn!(
574                            "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
575                            path.segments, dest
576                        );
577                    }
578                    self.registers[*dest] = value;
579                    pc += 1;
580                }
581                OpCode::LoadConstant { value, dest } => {
582                    self.registers[*dest] = value.clone();
583                    pc += 1;
584                }
585                OpCode::CopyRegister { source, dest } => {
586                    let value = self.registers[*source].clone();
587                    tracing::trace!(
588                        "CopyRegister: source={}, dest={}, value={:?}",
589                        source,
590                        dest,
591                        if value.is_null() { "NULL".to_string() } else { format!("{}", value) }
592                    );
593                    self.registers[*dest] = value;
594                    pc += 1;
595                }
596                OpCode::CopyRegisterIfNull { source, dest } => {
597                    if self.registers[*dest].is_null() {
598                        self.registers[*dest] = self.registers[*source].clone();
599                        tracing::trace!(
600                            "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
601                            source,
602                            dest,
603                            self.registers[*source]
604                        );
605                    } else {
606                        tracing::trace!(
607                            "CopyRegisterIfNull: dest reg {} not null, skipping copy",
608                            dest
609                        );
610                    }
611                    pc += 1;
612                }
613                OpCode::GetEventType { dest } => {
614                    self.registers[*dest] = json!(event_type);
615                    pc += 1;
616                }
617                OpCode::CreateObject { dest } => {
618                    self.registers[*dest] = json!({});
619                    pc += 1;
620                }
621                OpCode::SetField {
622                    object,
623                    path,
624                    value,
625                } => {
626                    let val = self.registers[*value].clone();
627                    tracing::trace!("SetField: path={}, value={:?}", path, val);
628
629                    self.set_field_auto_vivify(*object, path, *value)?;
630                    dirty_fields.insert(path.to_string());
631                    pc += 1;
632                }
633                OpCode::SetFields { object, fields } => {
634                    for (path, value_reg) in fields {
635                        self.set_field_auto_vivify(*object, path, *value_reg)?;
636                        dirty_fields.insert(path.to_string());
637                    }
638                    pc += 1;
639                }
640                OpCode::GetField { object, path, dest } => {
641                    let value = self.get_field(*object, path)?;
642                    self.registers[*dest] = value;
643                    pc += 1;
644                }
645                OpCode::ReadOrInitState {
646                    state_id: _,
647                    key,
648                    default,
649                    dest,
650                } => {
651                    let actual_state_id = override_state_id;
652                    self.states
653                        .entry(actual_state_id)
654                        .or_insert_with(|| StateTable {
655                            data: DashMap::new(),
656                            lookup_indexes: HashMap::new(),
657                            temporal_indexes: HashMap::new(),
658                            pda_reverse_lookups: HashMap::new(),
659                            pending_updates: DashMap::new(),
660                        });
661                    let state = self
662                        .states
663                        .get(&actual_state_id)
664                        .ok_or("State table not found")?;
665                    let key_value = &self.registers[*key];
666                    if key_value.is_null() {
667                        // Only warn for account state updates (not instruction hooks that just register PDAs)
668                        // Instruction types ending in "IxState" that don't have lookup_by are expected to have null keys
669                        if event_type.ends_with("State") && !event_type.ends_with("IxState") {
670                            tracing::warn!(
671                                "ReadOrInitState: key register {} is NULL for account state, event_type={}",
672                                key,
673                                event_type
674                            );
675                        } else {
676                            tracing::debug!(
677                                "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
678                                key,
679                                event_type
680                            );
681                        }
682                    }
683                    let value = state
684                        .data
685                        .get(key_value)
686                        .map(|v| v.clone())
687                        .unwrap_or_else(|| default.clone());
688
689                    self.registers[*dest] = value;
690                    pc += 1;
691                }
692                OpCode::UpdateState {
693                    state_id: _,
694                    key,
695                    value,
696                } => {
697                    let actual_state_id = override_state_id;
698                    let state = self
699                        .states
700                        .get(&actual_state_id)
701                        .ok_or("State table not found")?;
702                    let key_value = self.registers[*key].clone();
703                    let value_data = self.registers[*value].clone();
704
705                    state.data.insert(key_value, value_data);
706                    pc += 1;
707                }
708                OpCode::AppendToArray {
709                    object,
710                    path,
711                    value,
712                } => {
713                    tracing::trace!("AppendToArray: path={}, value={:?}", path, self.registers[*value]);
714                    self.append_to_array(*object, path, *value)?;
715                    dirty_fields.insert(path.to_string());
716                    pc += 1;
717                }
718                OpCode::GetCurrentTimestamp { dest } => {
719                    let timestamp = std::time::SystemTime::now()
720                        .duration_since(std::time::UNIX_EPOCH)
721                        .unwrap()
722                        .as_secs() as i64;
723                    self.registers[*dest] = json!(timestamp);
724                    pc += 1;
725                }
726                OpCode::CreateEvent { dest, event_value } => {
727                    tracing::trace!("CreateEvent: event_value_reg={}, event_value={:?}", event_value, self.registers[*event_value]);
728                    let timestamp = std::time::SystemTime::now()
729                        .duration_since(std::time::UNIX_EPOCH)
730                        .unwrap()
731                        .as_secs() as i64;
732
733                    // Filter out __update_context from the event data
734                    let mut event_data = self.registers[*event_value].clone();
735                    if let Some(obj) = event_data.as_object_mut() {
736                        obj.remove("__update_context");
737                    }
738
739                    // Create event with timestamp, data, and optional slot/signature from context
740                    let mut event = serde_json::Map::new();
741                    event.insert("timestamp".to_string(), json!(timestamp));
742                    event.insert("data".to_string(), event_data);
743
744                    // Add slot and signature if available from current context
745                    if let Some(ref ctx) = self.current_context {
746                        if let Some(slot) = ctx.slot {
747                            event.insert("slot".to_string(), json!(slot));
748                        }
749                        if let Some(ref signature) = ctx.signature {
750                            event.insert("signature".to_string(), json!(signature));
751                        }
752                    }
753
754                    self.registers[*dest] = Value::Object(event);
755                    pc += 1;
756                }
757                OpCode::CreateCapture {
758                    dest,
759                    capture_value,
760                } => {
761                    let timestamp = std::time::SystemTime::now()
762                        .duration_since(std::time::UNIX_EPOCH)
763                        .unwrap()
764                        .as_secs() as i64;
765
766                    // Get the capture data (already filtered by load_field)
767                    let capture_data = self.registers[*capture_value].clone();
768
769                    // Extract account_address from the original event if available
770                    let account_address = event_value
771                        .get("__account_address")
772                        .and_then(|v| v.as_str())
773                        .unwrap_or("")
774                        .to_string();
775
776                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
777                    let mut capture = serde_json::Map::new();
778                    capture.insert("timestamp".to_string(), json!(timestamp));
779                    capture.insert("account_address".to_string(), json!(account_address));
780                    capture.insert("data".to_string(), capture_data);
781
782                    // Add slot and signature if available from current context
783                    if let Some(ref ctx) = self.current_context {
784                        if let Some(slot) = ctx.slot {
785                            capture.insert("slot".to_string(), json!(slot));
786                        }
787                        if let Some(ref signature) = ctx.signature {
788                            capture.insert("signature".to_string(), json!(signature));
789                        }
790                    }
791
792                    self.registers[*dest] = Value::Object(capture);
793                    pc += 1;
794                }
795                OpCode::Transform {
796                    source,
797                    dest,
798                    transformation,
799                } => {
800                    if source == dest {
801                        let result = self.transform_in_place(*source, transformation);
802                        if let Err(ref e) = result {
803                            tracing::error!(
804                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
805                                transformation, pc, e, source, &self.registers[*source]
806                            );
807                        }
808                        result?;
809                    } else {
810                        let source_value = &self.registers[*source];
811                        let result = self.apply_transformation(source_value, transformation);
812                        if let Err(ref e) = result {
813                            tracing::error!(
814                                "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
815                                transformation, pc, e, source, source_value
816                            );
817                        }
818                        let value = result?;
819                        self.registers[*dest] = value;
820                    }
821                    pc += 1;
822                }
823                OpCode::EmitMutation {
824                    entity_name,
825                    key,
826                    state,
827                } => {
828                    let primary_key = self.registers[*key].clone();
829
830                    if primary_key.is_null() || dirty_fields.is_empty() {
831                        // Skip mutation if no primary key or no dirty fields
832                        tracing::warn!(
833                            "   ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
834                            entity_name,
835                            if primary_key.is_null() { "NULL" } else { "present" },
836                            dirty_fields.len()
837                        );
838                        if dirty_fields.is_empty() {
839                            tracing::warn!("      Reason: No fields were modified by this handler");
840                        }
841                        if primary_key.is_null() {
842                            tracing::warn!("      Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
843                            // Debug: dump register 15, 17, 19, 20 to understand key loading state
844                            tracing::warn!("      Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
845                                self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
846                                self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
847                                self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
848                                self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
849                            );
850                        }
851                    } else {
852                        let patch = self.extract_partial_state(*state, &dirty_fields)?;
853                        tracing::debug!(
854                            "   Patch structure: {}",
855                            serde_json::to_string_pretty(&patch).unwrap_or_default()
856                        );
857                        let mutation = Mutation {
858                            export: entity_name.clone(),
859                            key: primary_key,
860                            patch,
861                        };
862                        output.push(mutation);
863                    }
864                    pc += 1;
865                }
866                OpCode::SetFieldIfNull {
867                    object,
868                    path,
869                    value,
870                } => {
871                    let val = self.registers[*value].clone();
872                    let was_set = self.set_field_if_null(*object, path, *value)?;
873                    tracing::trace!(
874                        "SetFieldIfNull: path={}, value={:?}, was_set={}",
875                        path,
876                        val,
877                        was_set
878                    );
879                    if was_set {
880                        dirty_fields.insert(path.to_string());
881                    }
882                    pc += 1;
883                }
884                OpCode::SetFieldMax {
885                    object,
886                    path,
887                    value,
888                } => {
889                    let was_updated = self.set_field_max(*object, path, *value)?;
890                    if was_updated {
891                        dirty_fields.insert(path.to_string());
892                    }
893                    pc += 1;
894                }
895                OpCode::UpdateTemporalIndex {
896                    state_id: _,
897                    index_name,
898                    lookup_value,
899                    primary_key,
900                    timestamp,
901                } => {
902                    let actual_state_id = override_state_id;
903                    let state = self
904                        .states
905                        .get_mut(&actual_state_id)
906                        .ok_or("State table not found")?;
907                    let index = state
908                        .temporal_indexes
909                        .entry(index_name.clone())
910                        .or_insert_with(TemporalIndex::new);
911
912                    let lookup_val = self.registers[*lookup_value].clone();
913                    let pk_val = self.registers[*primary_key].clone();
914                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
915                        val
916                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
917                        val as i64
918                    } else {
919                        return Err(format!(
920                            "Timestamp must be a number (i64 or u64), got: {:?}",
921                            self.registers[*timestamp]
922                        )
923                        .into());
924                    };
925
926                    index.insert(lookup_val, pk_val, ts_val);
927                    pc += 1;
928                }
929                OpCode::LookupTemporalIndex {
930                    state_id: _,
931                    index_name,
932                    lookup_value,
933                    timestamp,
934                    dest,
935                } => {
936                    let actual_state_id = override_state_id;
937                    let state = self
938                        .states
939                        .get(&actual_state_id)
940                        .ok_or("State table not found")?;
941                    let lookup_val = &self.registers[*lookup_value];
942
943                    let result = if self.registers[*timestamp].is_null() {
944                        if let Some(index) = state.temporal_indexes.get(index_name) {
945                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
946                        } else {
947                            Value::Null
948                        }
949                    } else {
950                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
951                            val
952                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
953                            val as i64
954                        } else {
955                            return Err(format!(
956                                "Timestamp must be a number (i64 or u64), got: {:?}",
957                                self.registers[*timestamp]
958                            )
959                            .into());
960                        };
961
962                        if let Some(index) = state.temporal_indexes.get(index_name) {
963                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
964                        } else {
965                            Value::Null
966                        }
967                    };
968
969                    self.registers[*dest] = result;
970                    pc += 1;
971                }
972                OpCode::UpdateLookupIndex {
973                    state_id: _,
974                    index_name,
975                    lookup_value,
976                    primary_key,
977                } => {
978                    let actual_state_id = override_state_id;
979                    let state = self
980                        .states
981                        .get_mut(&actual_state_id)
982                        .ok_or("State table not found")?;
983                    let index = state
984                        .lookup_indexes
985                        .entry(index_name.clone())
986                        .or_insert_with(LookupIndex::new);
987
988                    let lookup_val = self.registers[*lookup_value].clone();
989                    let pk_val = self.registers[*primary_key].clone();
990
991                    tracing::debug!(
992                        "📝 UpdateLookupIndex: {} -> {} (index: {})",
993                        lookup_val, pk_val, index_name
994                    );
995
996                    index.insert(lookup_val, pk_val);
997                    pc += 1;
998                }
999                OpCode::LookupIndex {
1000                    state_id: _,
1001                    index_name,
1002                    lookup_value,
1003                    dest,
1004                } => {
1005                    let actual_state_id = override_state_id;
1006                    let lookup_val = self.registers[*lookup_value].clone();
1007
1008                    let result = {
1009                        let state = self
1010                            .states
1011                            .get(&actual_state_id)
1012                            .ok_or("State table not found")?;
1013                        
1014                        if let Some(index) = state.lookup_indexes.get(index_name) {
1015                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1016                            tracing::debug!(
1017                                "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1018                                lookup_val, found, index_name, index.len()
1019                            );
1020                            found
1021                        } else {
1022                            Value::Null
1023                        }
1024                    };
1025                    
1026                    // Fallback: Check PDA reverse lookup table if regular index returned NULL or doesn't exist
1027                    // This handles cases where Lookup key resolution uses PDA addresses
1028                    let final_result = if result.is_null() {
1029                        if let Some(pda_str) = lookup_val.as_str() {
1030                            let state = self
1031                                .states
1032                                .get_mut(&actual_state_id)
1033                                .ok_or("State table not found")?;
1034                                
1035                            if let Some(pda_lookup) = state.pda_reverse_lookups.get_mut("default_pda_lookup") {
1036                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1037                                    tracing::info!(
1038                                        "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1039                                        &pda_str[..pda_str.len().min(8)], resolved
1040                                    );
1041                                    Value::String(resolved)
1042                                } else {
1043                                    tracing::debug!(
1044                                        "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1045                                        &pda_str[..pda_str.len().min(8)]
1046                                    );
1047                                    Value::Null
1048                                }
1049                            } else {
1050                                tracing::debug!(
1051                                    "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1052                                    lookup_val, index_name
1053                                );
1054                                Value::Null
1055                            }
1056                        } else {
1057                            tracing::debug!(
1058                                "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1059                                lookup_val, index_name
1060                            );
1061                            Value::Null
1062                        }
1063                    } else {
1064                        result
1065                    };
1066
1067                    self.registers[*dest] = final_result;
1068                    pc += 1;
1069                }
1070                OpCode::SetFieldSum {
1071                    object,
1072                    path,
1073                    value,
1074                } => {
1075                    let was_updated = self.set_field_sum(*object, path, *value)?;
1076                    if was_updated {
1077                        dirty_fields.insert(path.to_string());
1078                    }
1079                    pc += 1;
1080                }
1081                OpCode::SetFieldIncrement { object, path } => {
1082                    let was_updated = self.set_field_increment(*object, path)?;
1083                    if was_updated {
1084                        dirty_fields.insert(path.to_string());
1085                    }
1086                    pc += 1;
1087                }
1088                OpCode::SetFieldMin {
1089                    object,
1090                    path,
1091                    value,
1092                } => {
1093                    let was_updated = self.set_field_min(*object, path, *value)?;
1094                    if was_updated {
1095                        dirty_fields.insert(path.to_string());
1096                    }
1097                    pc += 1;
1098                }
1099                OpCode::AddToUniqueSet {
1100                    state_id: _,
1101                    set_name,
1102                    value,
1103                    count_object,
1104                    count_path,
1105                } => {
1106                    let value_to_add = self.registers[*value].clone();
1107
1108                    // Store the unique set within the entity object, not in the state table
1109                    // This ensures each entity instance has its own unique set
1110                    let set_field_path = format!("__unique_set:{}", set_name);
1111
1112                    // Get or create the unique set from the entity object
1113                    let mut set: HashSet<Value> =
1114                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1115                            if !existing.is_null() {
1116                                serde_json::from_value(existing).unwrap_or_default()
1117                            } else {
1118                                HashSet::new()
1119                            }
1120                        } else {
1121                            HashSet::new()
1122                        };
1123
1124                    // Add value to set
1125                    let was_new = set.insert(value_to_add);
1126
1127                    // Store updated set back in the entity object
1128                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1129                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1130                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1131
1132                    // Update the count field in the object
1133                    if was_new {
1134                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1135                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1136                        dirty_fields.insert(count_path.to_string());
1137                    }
1138
1139                    pc += 1;
1140                }
1141                OpCode::ConditionalSetField {
1142                    object,
1143                    path,
1144                    value,
1145                    condition_field,
1146                    condition_op,
1147                    condition_value,
1148                } => {
1149                    // Load the field value from the event
1150                    let field_value = self.load_field(&event_value, condition_field, None)?;
1151
1152                    // Evaluate the condition
1153                    let condition_met =
1154                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1155
1156                    if condition_met {
1157                        let val = self.registers[*value].clone();
1158                        tracing::trace!(
1159                            "ConditionalSetField: condition met, setting {}={:?}",
1160                            path,
1161                            val
1162                        );
1163                        self.set_field_auto_vivify(*object, path, *value)?;
1164                        dirty_fields.insert(path.to_string());
1165                    } else {
1166                        tracing::trace!(
1167                            "ConditionalSetField: condition not met, skipping {}",
1168                            path
1169                        );
1170                    }
1171                    pc += 1;
1172                }
1173                OpCode::ConditionalIncrement {
1174                    object,
1175                    path,
1176                    condition_field,
1177                    condition_op,
1178                    condition_value,
1179                } => {
1180                    // Load the field value from the event
1181                    let field_value = self.load_field(&event_value, condition_field, None)?;
1182
1183                    // Evaluate the condition
1184                    let condition_met =
1185                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1186
1187                    if condition_met {
1188                        tracing::trace!(
1189                            "ConditionalIncrement: condition met, incrementing {}",
1190                            path
1191                        );
1192                        let was_updated = self.set_field_increment(*object, path)?;
1193                        if was_updated {
1194                            dirty_fields.insert(path.to_string());
1195                        }
1196                    } else {
1197                        tracing::trace!(
1198                            "ConditionalIncrement: condition not met, skipping {}",
1199                            path
1200                        );
1201                    }
1202                    pc += 1;
1203                }
1204                OpCode::EvaluateComputedFields {
1205                    state,
1206                    computed_paths,
1207                } => {
1208                    // Call the registered evaluator if one exists
1209                    if let Some(evaluator) = entity_evaluator {
1210                        let state_value = &mut self.registers[*state];
1211                        match evaluator(state_value) {
1212                            Ok(()) => {
1213                                // Mark computed fields as dirty so they're included in mutations
1214                                for path in computed_paths {
1215                                    dirty_fields.insert(path.clone());
1216                                }
1217                            }
1218                            Err(_e) => {
1219                                // Silently ignore evaluation errors
1220                            }
1221                        }
1222                    }
1223                    pc += 1;
1224                }
1225                OpCode::UpdatePdaReverseLookup {
1226                    state_id: _,
1227                    lookup_name,
1228                    pda_address,
1229                    primary_key,
1230                } => {
1231                    let actual_state_id = override_state_id;
1232                    let state = self
1233                        .states
1234                        .get_mut(&actual_state_id)
1235                        .ok_or("State table not found")?;
1236                    
1237                    let pda_val = self.registers[*pda_address].clone();
1238                    let pk_val = self.registers[*primary_key].clone();
1239                    
1240                    // Only update if both values are non-null strings
1241                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1242                        // Get or create the PDA reverse lookup table
1243                        let pda_lookup = state
1244                            .pda_reverse_lookups
1245                            .entry(lookup_name.clone())
1246                            .or_insert_with(|| PdaReverseLookup::new(10000));
1247                        
1248                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1249                        
1250                        tracing::debug!(
1251                            "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1252                            pda_str, pk_str, lookup_name
1253                        );
1254                    } else if !pk_val.is_null() {
1255                        // Primary key might be a u64, convert to string
1256                        if let Some(pk_num) = pk_val.as_u64() {
1257                            if let Some(pda_str) = pda_val.as_str() {
1258                                let pda_lookup = state
1259                                    .pda_reverse_lookups
1260                                    .entry(lookup_name.clone())
1261                                    .or_insert_with(|| PdaReverseLookup::new(10000));
1262                                
1263                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1264                                
1265                                tracing::debug!(
1266                                    "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1267                                    pda_str, pk_num, lookup_name
1268                                );
1269                            }
1270                        }
1271                    }
1272                    
1273                    pc += 1;
1274                }
1275            }
1276
1277            self.instructions_executed += 1;
1278        }
1279
1280        Ok(output)
1281    }
1282
1283    fn load_field(
1284        &self,
1285        event_value: &Value,
1286        path: &FieldPath,
1287        default: Option<&Value>,
1288    ) -> Result<Value> {
1289        // Warn if path contains empty segments - this indicates a bug in AST generation
1290        if path.segments.iter().any(|s| s.is_empty()) {
1291            tracing::warn!(
1292                "load_field: path contains empty segment! path={:?}",
1293                path.segments
1294            );
1295        }
1296        
1297        if path.segments.is_empty() {
1298            // For WholeSource (empty path), filter out internal metadata fields
1299            // This prevents __account_address, __resolved_primary_key, __update_context
1300            // from appearing in captured account data
1301            if let Some(obj) = event_value.as_object() {
1302                let filtered: serde_json::Map<String, Value> = obj
1303                    .iter()
1304                    .filter(|(k, _)| !k.starts_with("__"))
1305                    .map(|(k, v)| (k.clone(), v.clone()))
1306                    .collect();
1307                return Ok(Value::Object(filtered));
1308            }
1309            return Ok(event_value.clone());
1310        }
1311
1312        let mut current = event_value;
1313        for (i, segment) in path.segments.iter().enumerate() {
1314            current = match current.get(segment) {
1315                Some(v) => v,
1316                None => {
1317                    // Log when we can't find a path segment - this helps debug key loading issues
1318                    if path.segments.len() == 2 && path.segments[0] == "accounts" {
1319                        tracing::warn!(
1320                            "load_field: could not find segment '{}' at index {} in path {:?}",
1321                            segment,
1322                            i,
1323                            path.segments,
1324                        );
1325                        tracing::warn!(
1326                            "  event has top-level keys: {:?}",
1327                            event_value.as_object().map(|o| o.keys().collect::<Vec<_>>())
1328                        );
1329                        // Show what's in the accounts object if it exists
1330                        if let Some(accounts) = event_value.get("accounts") {
1331                            tracing::warn!(
1332                                "  'accounts' exists with keys: {:?}",
1333                                accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1334                            );
1335                        } else {
1336                            tracing::warn!("  'accounts' key does NOT exist in event_value");
1337                        }
1338                    }
1339                    return Ok(default.cloned().unwrap_or(Value::Null));
1340                }
1341            };
1342        }
1343
1344        Ok(current.clone())
1345    }
1346
1347    fn set_field_auto_vivify(
1348        &mut self,
1349        object_reg: Register,
1350        path: &str,
1351        value_reg: Register,
1352    ) -> Result<()> {
1353        let compiled = self.get_compiled_path(path);
1354        let segments = compiled.segments();
1355        let value = self.registers[value_reg].clone();
1356
1357        if !self.registers[object_reg].is_object() {
1358            self.registers[object_reg] = json!({});
1359        }
1360
1361        let obj = self.registers[object_reg]
1362            .as_object_mut()
1363            .ok_or("Not an object")?;
1364
1365        let mut current = obj;
1366        for (i, segment) in segments.iter().enumerate() {
1367            if i == segments.len() - 1 {
1368                current.insert(segment.to_string(), value.clone());
1369            } else {
1370                current
1371                    .entry(segment.to_string())
1372                    .or_insert_with(|| json!({}));
1373                current = current
1374                    .get_mut(segment)
1375                    .and_then(|v| v.as_object_mut())
1376                    .ok_or("Path collision: expected object")?;
1377            }
1378        }
1379
1380        Ok(())
1381    }
1382
1383    fn set_field_if_null(
1384        &mut self,
1385        object_reg: Register,
1386        path: &str,
1387        value_reg: Register,
1388    ) -> Result<bool> {
1389        let compiled = self.get_compiled_path(path);
1390        let segments = compiled.segments();
1391        let value = self.registers[value_reg].clone();
1392
1393        if !self.registers[object_reg].is_object() {
1394            self.registers[object_reg] = json!({});
1395        }
1396
1397        let obj = self.registers[object_reg]
1398            .as_object_mut()
1399            .ok_or("Not an object")?;
1400
1401        let mut current = obj;
1402        let mut was_set = false;
1403        for (i, segment) in segments.iter().enumerate() {
1404            if i == segments.len() - 1 {
1405                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1406                    current.insert(segment.to_string(), value.clone());
1407                    was_set = true;
1408                }
1409            } else {
1410                current
1411                    .entry(segment.to_string())
1412                    .or_insert_with(|| json!({}));
1413                current = current
1414                    .get_mut(segment)
1415                    .and_then(|v| v.as_object_mut())
1416                    .ok_or("Path collision: expected object")?;
1417            }
1418        }
1419
1420        Ok(was_set)
1421    }
1422
1423    fn set_field_max(
1424        &mut self,
1425        object_reg: Register,
1426        path: &str,
1427        value_reg: Register,
1428    ) -> Result<bool> {
1429        let compiled = self.get_compiled_path(path);
1430        let segments = compiled.segments();
1431        let new_value = self.registers[value_reg].clone();
1432
1433        if !self.registers[object_reg].is_object() {
1434            self.registers[object_reg] = json!({});
1435        }
1436
1437        let obj = self.registers[object_reg]
1438            .as_object_mut()
1439            .ok_or("Not an object")?;
1440
1441        let mut current = obj;
1442        let mut was_updated = false;
1443        for (i, segment) in segments.iter().enumerate() {
1444            if i == segments.len() - 1 {
1445                let should_update = if let Some(current_value) = current.get(segment) {
1446                    if current_value.is_null() {
1447                        true
1448                    } else {
1449                        match (current_value.as_i64(), new_value.as_i64()) {
1450                            (Some(current_val), Some(new_val)) => new_val > current_val,
1451                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1452                                new_value.as_u64().unwrap() as i64 > current_val
1453                            }
1454                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1455                                new_val > current_value.as_u64().unwrap() as i64
1456                            }
1457                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1458                                (Some(current_val), Some(new_val)) => new_val > current_val,
1459                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1460                                    (Some(current_val), Some(new_val)) => new_val > current_val,
1461                                    _ => false,
1462                                },
1463                            },
1464                            _ => false,
1465                        }
1466                    }
1467                } else {
1468                    true
1469                };
1470
1471                if should_update {
1472                    current.insert(segment.to_string(), new_value.clone());
1473                    was_updated = true;
1474                }
1475            } else {
1476                current
1477                    .entry(segment.to_string())
1478                    .or_insert_with(|| json!({}));
1479                current = current
1480                    .get_mut(segment)
1481                    .and_then(|v| v.as_object_mut())
1482                    .ok_or("Path collision: expected object")?;
1483            }
1484        }
1485
1486        Ok(was_updated)
1487    }
1488
1489    fn set_field_sum(
1490        &mut self,
1491        object_reg: Register,
1492        path: &str,
1493        value_reg: Register,
1494    ) -> Result<bool> {
1495        let compiled = self.get_compiled_path(path);
1496        let segments = compiled.segments();
1497        let new_value = self.registers[value_reg].clone();
1498
1499        if !self.registers[object_reg].is_object() {
1500            self.registers[object_reg] = json!({});
1501        }
1502
1503        let obj = self.registers[object_reg]
1504            .as_object_mut()
1505            .ok_or("Not an object")?;
1506
1507        let mut current = obj;
1508        for (i, segment) in segments.iter().enumerate() {
1509            if i == segments.len() - 1 {
1510                // Get current value (default to 0 if null/missing)
1511                let current_val = current
1512                    .get(segment)
1513                    .and_then(|v| {
1514                        if v.is_null() {
1515                            None
1516                        } else {
1517                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1518                        }
1519                    })
1520                    .unwrap_or(0);
1521
1522                // Add new value
1523                let new_val_num = new_value
1524                    .as_i64()
1525                    .or_else(|| new_value.as_u64().map(|n| n as i64))
1526                    .ok_or("Sum requires numeric value")?;
1527
1528                let sum = current_val + new_val_num;
1529                current.insert(segment.to_string(), json!(sum));
1530                return Ok(true);
1531            } else {
1532                current
1533                    .entry(segment.to_string())
1534                    .or_insert_with(|| json!({}));
1535                current = current
1536                    .get_mut(segment)
1537                    .and_then(|v| v.as_object_mut())
1538                    .ok_or("Path collision: expected object")?;
1539            }
1540        }
1541
1542        Ok(false)
1543    }
1544
1545    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1546        let compiled = self.get_compiled_path(path);
1547        let segments = compiled.segments();
1548
1549        if !self.registers[object_reg].is_object() {
1550            self.registers[object_reg] = json!({});
1551        }
1552
1553        let obj = self.registers[object_reg]
1554            .as_object_mut()
1555            .ok_or("Not an object")?;
1556
1557        let mut current = obj;
1558        for (i, segment) in segments.iter().enumerate() {
1559            if i == segments.len() - 1 {
1560                // Get current value (default to 0 if null/missing)
1561                let current_val = current
1562                    .get(segment)
1563                    .and_then(|v| {
1564                        if v.is_null() {
1565                            None
1566                        } else {
1567                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1568                        }
1569                    })
1570                    .unwrap_or(0);
1571
1572                let incremented = current_val + 1;
1573                current.insert(segment.to_string(), json!(incremented));
1574                return Ok(true);
1575            } else {
1576                current
1577                    .entry(segment.to_string())
1578                    .or_insert_with(|| json!({}));
1579                current = current
1580                    .get_mut(segment)
1581                    .and_then(|v| v.as_object_mut())
1582                    .ok_or("Path collision: expected object")?;
1583            }
1584        }
1585
1586        Ok(false)
1587    }
1588
1589    fn set_field_min(
1590        &mut self,
1591        object_reg: Register,
1592        path: &str,
1593        value_reg: Register,
1594    ) -> Result<bool> {
1595        let compiled = self.get_compiled_path(path);
1596        let segments = compiled.segments();
1597        let new_value = self.registers[value_reg].clone();
1598
1599        if !self.registers[object_reg].is_object() {
1600            self.registers[object_reg] = json!({});
1601        }
1602
1603        let obj = self.registers[object_reg]
1604            .as_object_mut()
1605            .ok_or("Not an object")?;
1606
1607        let mut current = obj;
1608        let mut was_updated = false;
1609        for (i, segment) in segments.iter().enumerate() {
1610            if i == segments.len() - 1 {
1611                let should_update = if let Some(current_value) = current.get(segment) {
1612                    if current_value.is_null() {
1613                        true
1614                    } else {
1615                        match (current_value.as_i64(), new_value.as_i64()) {
1616                            (Some(current_val), Some(new_val)) => new_val < current_val,
1617                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1618                                (new_value.as_u64().unwrap() as i64) < current_val
1619                            }
1620                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1621                                new_val < current_value.as_u64().unwrap() as i64
1622                            }
1623                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1624                                (Some(current_val), Some(new_val)) => new_val < current_val,
1625                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1626                                    (Some(current_val), Some(new_val)) => new_val < current_val,
1627                                    _ => false,
1628                                },
1629                            },
1630                            _ => false,
1631                        }
1632                    }
1633                } else {
1634                    true
1635                };
1636
1637                if should_update {
1638                    current.insert(segment.to_string(), new_value.clone());
1639                    was_updated = true;
1640                }
1641            } else {
1642                current
1643                    .entry(segment.to_string())
1644                    .or_insert_with(|| json!({}));
1645                current = current
1646                    .get_mut(segment)
1647                    .and_then(|v| v.as_object_mut())
1648                    .ok_or("Path collision: expected object")?;
1649            }
1650        }
1651
1652        Ok(was_updated)
1653    }
1654
1655    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1656        let compiled = self.get_compiled_path(path);
1657        let segments = compiled.segments();
1658        let mut current = &self.registers[object_reg];
1659
1660        for segment in segments {
1661            current = current
1662                .get(segment)
1663                .ok_or_else(|| format!("Field not found: {}", segment))?;
1664        }
1665
1666        Ok(current.clone())
1667    }
1668
1669    fn append_to_array(
1670        &mut self,
1671        object_reg: Register,
1672        path: &str,
1673        value_reg: Register,
1674    ) -> Result<()> {
1675        let compiled = self.get_compiled_path(path);
1676        let segments = compiled.segments();
1677        let value = self.registers[value_reg].clone();
1678
1679        if !self.registers[object_reg].is_object() {
1680            self.registers[object_reg] = json!({});
1681        }
1682
1683        let obj = self.registers[object_reg]
1684            .as_object_mut()
1685            .ok_or("Not an object")?;
1686
1687        let mut current = obj;
1688        for (i, segment) in segments.iter().enumerate() {
1689            if i == segments.len() - 1 {
1690                current
1691                    .entry(segment.to_string())
1692                    .or_insert_with(|| json!([]));
1693                let arr = current
1694                    .get_mut(segment)
1695                    .and_then(|v| v.as_array_mut())
1696                    .ok_or("Path is not an array")?;
1697                arr.push(value.clone());
1698            } else {
1699                current
1700                    .entry(segment.to_string())
1701                    .or_insert_with(|| json!({}));
1702                current = current
1703                    .get_mut(segment)
1704                    .and_then(|v| v.as_object_mut())
1705                    .ok_or("Path collision: expected object")?;
1706            }
1707        }
1708
1709        Ok(())
1710    }
1711
1712    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
1713        let value = &self.registers[reg];
1714        let transformed = self.apply_transformation(value, transformation)?;
1715        self.registers[reg] = transformed;
1716        Ok(())
1717    }
1718
1719    fn apply_transformation(
1720        &self,
1721        value: &Value,
1722        transformation: &Transformation,
1723    ) -> Result<Value> {
1724        match transformation {
1725            Transformation::HexEncode => {
1726                if let Some(arr) = value.as_array() {
1727                    let bytes: Vec<u8> = arr
1728                        .iter()
1729                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1730                        .collect();
1731                    let hex = hex::encode(&bytes);
1732                    Ok(json!(hex))
1733                } else {
1734                    Err("HexEncode requires an array of numbers".into())
1735                }
1736            }
1737            Transformation::HexDecode => {
1738                if let Some(s) = value.as_str() {
1739                    let s = s.strip_prefix("0x").unwrap_or(s);
1740                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
1741                    Ok(json!(bytes))
1742                } else {
1743                    Err("HexDecode requires a string".into())
1744                }
1745            }
1746            Transformation::Base58Encode => {
1747                if let Some(arr) = value.as_array() {
1748                    let bytes: Vec<u8> = arr
1749                        .iter()
1750                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1751                        .collect();
1752                    let encoded = bs58::encode(&bytes).into_string();
1753                    Ok(json!(encoded))
1754                } else if value.is_string() {
1755                    // Value is already a string (likely already base58 encoded), return as-is
1756                    tracing::debug!("Base58Encode: value is already a string, passing through: {:?}", value);
1757                    Ok(value.clone())
1758                } else {
1759                    tracing::error!("Base58Encode failed: value type is {:?}, value: {:?}", 
1760                        if value.is_null() { "null" }
1761                        else if value.is_boolean() { "boolean" }
1762                        else if value.is_number() { "number" }
1763                        else if value.is_object() { "object" }
1764                        else { "unknown" },
1765                        value
1766                    );
1767                    Err("Base58Encode requires an array of numbers".into())
1768                }
1769            }
1770            Transformation::Base58Decode => {
1771                if let Some(s) = value.as_str() {
1772                    let bytes = bs58::decode(s).into_vec().map_err(|e| format!("Base58 decode error: {}", e))?;
1773                    Ok(json!(bytes))
1774                } else {
1775                    Err("Base58Decode requires a string".into())
1776                }
1777            }
1778            Transformation::ToString => Ok(json!(value.to_string())),
1779            Transformation::ToNumber => {
1780                if let Some(s) = value.as_str() {
1781                    let n = s
1782                        .parse::<i64>()
1783                        .map_err(|e| format!("Parse error: {}", e))?;
1784                    Ok(json!(n))
1785                } else {
1786                    Ok(value.clone())
1787                }
1788            }
1789        }
1790    }
1791
1792    fn evaluate_comparison(
1793        &self,
1794        field_value: &Value,
1795        op: &ComparisonOp,
1796        condition_value: &Value,
1797    ) -> Result<bool> {
1798        use ComparisonOp::*;
1799
1800        match op {
1801            Equal => Ok(field_value == condition_value),
1802            NotEqual => Ok(field_value != condition_value),
1803            GreaterThan => {
1804                // Try to compare as numbers
1805                match (field_value.as_i64(), condition_value.as_i64()) {
1806                    (Some(a), Some(b)) => Ok(a > b),
1807                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
1808                        (Some(a), Some(b)) => Ok(a > b),
1809                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
1810                            (Some(a), Some(b)) => Ok(a > b),
1811                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
1812                        },
1813                    },
1814                }
1815            }
1816            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1817                (Some(a), Some(b)) => Ok(a >= b),
1818                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1819                    (Some(a), Some(b)) => Ok(a >= b),
1820                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1821                        (Some(a), Some(b)) => Ok(a >= b),
1822                        _ => {
1823                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
1824                        }
1825                    },
1826                },
1827            },
1828            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
1829                (Some(a), Some(b)) => Ok(a < b),
1830                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1831                    (Some(a), Some(b)) => Ok(a < b),
1832                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1833                        (Some(a), Some(b)) => Ok(a < b),
1834                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
1835                    },
1836                },
1837            },
1838            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1839                (Some(a), Some(b)) => Ok(a <= b),
1840                _ => match (field_value.as_u64(), condition_value.as_u64()) {
1841                    (Some(a), Some(b)) => Ok(a <= b),
1842                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
1843                        (Some(a), Some(b)) => Ok(a <= b),
1844                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
1845                    },
1846                },
1847            },
1848        }
1849    }
1850
1851    /// Update a PDA reverse lookup and return pending updates for reprocessing
1852    ///
1853    /// After registering the PDA reverse lookup, this returns any pending account updates
1854    /// that were queued for this PDA. The caller should reprocess these through the VM
1855    /// by calling process_event() for each update.
1856    ///
1857    /// # Example
1858    /// ```ignore
1859    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
1860    /// for update in pending {
1861    ///     vm.process_event(&bytecode, update.account_data, &update.account_type)?;
1862    /// }
1863    /// ```
1864    #[cfg_attr(feature = "otel", instrument(
1865        name = "vm.update_pda_lookup",
1866        skip(self),
1867        fields(
1868            pda = %pda_address,
1869            seed = %seed_value,
1870        )
1871    ))]
1872    pub fn update_pda_reverse_lookup(
1873        &mut self,
1874        state_id: u32,
1875        lookup_name: &str,
1876        pda_address: String,
1877        seed_value: String,
1878    ) -> Result<Vec<PendingAccountUpdate>> {
1879        let state = self
1880            .states
1881            .get_mut(&state_id)
1882            .ok_or("State table not found")?;
1883
1884        let lookup = state
1885            .pda_reverse_lookups
1886            .entry(lookup_name.to_string())
1887            .or_insert_with(|| PdaReverseLookup::new(10000));
1888
1889        // Insert and check if an entry was evicted
1890        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
1891
1892        // Clean up pending updates for the evicted PDA
1893        if let Some(ref evicted) = evicted_pda {
1894            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
1895                let count = evicted_updates.len();
1896                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
1897                tracing::info!(
1898                    "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
1899                    count,
1900                    evicted
1901                );
1902            }
1903        }
1904
1905        // Flush and return pending updates for this PDA
1906        self.flush_pending_updates(state_id, &pda_address)
1907    }
1908
1909    /// Clean up expired pending updates that are older than the TTL
1910    ///
1911    /// Returns the number of updates that were removed.
1912    /// This should be called periodically to prevent memory leaks from orphaned updates.
1913    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
1914        let state = match self.states.get_mut(&state_id) {
1915            Some(s) => s,
1916            None => return 0,
1917        };
1918
1919        let now = std::time::SystemTime::now()
1920            .duration_since(std::time::UNIX_EPOCH)
1921            .unwrap()
1922            .as_secs() as i64;
1923
1924        let mut removed_count = 0;
1925
1926        // Iterate through all pending updates and remove expired ones
1927        state.pending_updates.retain(|_pda_address, updates| {
1928            let original_len = updates.len();
1929
1930            updates.retain(|update| {
1931                let age = now - update.queued_at;
1932                age <= PENDING_UPDATE_TTL_SECONDS
1933            });
1934
1935            removed_count += original_len - updates.len();
1936
1937            // Remove the entry entirely if no updates remain
1938            !updates.is_empty()
1939        });
1940
1941        // Update the global counter
1942        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
1943
1944        if removed_count > 0 {
1945            tracing::info!(
1946                "Cleaned up {} expired pending updates (TTL: {}s)",
1947                removed_count,
1948                PENDING_UPDATE_TTL_SECONDS
1949            );
1950        }
1951
1952        removed_count
1953    }
1954
1955    /// Queue an account update for later processing when PDA reverse lookup is not yet available
1956    ///
1957    /// # Workflow
1958    ///
1959    /// This implements a deferred processing pattern for account updates when the PDA reverse
1960    /// lookup needed to resolve the primary key is not yet available:
1961    ///
1962    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
1963    ///    is not available, call `queue_account_update()` to queue it for later.
1964    ///
1965    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
1966    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
1967    ///
1968    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
1969    ///    ```ignore
1970    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
1971    ///    for update in pending {
1972    ///        let mutations = vm.process_event(
1973    ///            &bytecode,
1974    ///            update.account_data,
1975    ///            &update.account_type
1976    ///        )?;
1977    ///        // Handle mutations...
1978    ///    }
1979    ///    ```
1980    ///
1981    /// # Arguments
1982    ///
1983    /// * `state_id` - The state table ID
1984    /// * `pda_address` - The PDA address that needs reverse lookup
1985    /// * `account_type` - The event type name for reprocessing
1986    /// * `account_data` - The account data (event value) for reprocessing
1987    /// * `slot` - The slot number when this update occurred
1988    /// * `signature` - The transaction signature
1989    #[cfg_attr(feature = "otel", instrument(
1990        name = "vm.queue_account_update",
1991        skip(self, account_data),
1992        fields(
1993            pda = %pda_address,
1994            account_type = %account_type,
1995            slot = slot,
1996        )
1997    ))]
1998    pub fn queue_account_update(
1999        &mut self,
2000        state_id: u32,
2001        pda_address: String,
2002        account_type: String,
2003        account_data: Value,
2004        slot: u64,
2005        signature: String,
2006    ) -> Result<()> {
2007        // Check if we've exceeded the global limit
2008        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2009            tracing::warn!(
2010                "Pending queue size limit reached ({}), cleaning up expired updates",
2011                MAX_PENDING_UPDATES_TOTAL
2012            );
2013            let removed = self.cleanup_expired_pending_updates(state_id);
2014
2015            // If still at limit after cleanup, drop the oldest update
2016            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2017                tracing::warn!(
2018                    "Still at limit after cleanup (removed {}), will drop oldest update",
2019                    removed
2020                );
2021                self.drop_oldest_pending_update(state_id)?;
2022            }
2023        }
2024
2025        let state = self
2026            .states
2027            .get_mut(&state_id)
2028            .ok_or("State table not found")?;
2029
2030        let pending = PendingAccountUpdate {
2031            account_type,
2032            pda_address: pda_address.clone(),
2033            account_data,
2034            slot,
2035            signature,
2036            queued_at: std::time::SystemTime::now()
2037                .duration_since(std::time::UNIX_EPOCH)
2038                .unwrap()
2039                .as_secs() as i64,
2040        };
2041
2042        // Get or create the vector for this PDA and enforce limits
2043        {
2044            let mut updates = state
2045                .pending_updates
2046                .entry(pda_address.clone())
2047                .or_insert_with(Vec::new);
2048
2049            // Deduplication: Remove any updates with the same or older slot
2050            // Keep only updates with newer slots than the incoming one
2051            let original_len = updates.len();
2052            updates.retain(|existing| {
2053                // Keep if existing has a newer slot
2054                existing.slot > slot
2055            });
2056            let removed_by_dedup = original_len - updates.len();
2057
2058            if removed_by_dedup > 0 {
2059                self.pending_queue_size = self
2060                    .pending_queue_size
2061                    .saturating_sub(removed_by_dedup as u64);
2062                tracing::debug!(
2063                    "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2064                    removed_by_dedup,
2065                    pda_address,
2066                    slot
2067                );
2068            }
2069
2070            // Enforce per-PDA limit after deduplication
2071            if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2072                tracing::warn!(
2073                    "Per-PDA limit reached for {} ({} updates), dropping oldest",
2074                    pda_address,
2075                    updates.len()
2076                );
2077                updates.remove(0);
2078                self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2079            }
2080
2081            updates.push(pending);
2082            self.pending_queue_size += 1;
2083        }
2084
2085        Ok(())
2086    }
2087
2088    /// Get statistics about the pending queue for monitoring
2089    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2090        let state = self.states.get(&state_id)?;
2091
2092        let now = std::time::SystemTime::now()
2093            .duration_since(std::time::UNIX_EPOCH)
2094            .unwrap()
2095            .as_secs() as i64;
2096
2097        let mut total_updates = 0;
2098        let mut oldest_timestamp = now;
2099        let mut largest_pda_queue = 0;
2100        let mut estimated_memory = 0;
2101
2102        for entry in state.pending_updates.iter() {
2103            let (_, updates) = entry.pair();
2104            total_updates += updates.len();
2105            largest_pda_queue = largest_pda_queue.max(updates.len());
2106
2107            for update in updates.iter() {
2108                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2109                // Rough memory estimate
2110                estimated_memory += update.account_type.len() +
2111                                   update.pda_address.len() +
2112                                   update.signature.len() +
2113                                   16 + // slot + queued_at
2114                                   estimate_json_size(&update.account_data);
2115            }
2116        }
2117
2118        Some(PendingQueueStats {
2119            total_updates,
2120            unique_pdas: state.pending_updates.len(),
2121            oldest_age_seconds: now - oldest_timestamp,
2122            largest_pda_queue_size: largest_pda_queue,
2123            estimated_memory_bytes: estimated_memory,
2124        })
2125    }
2126
2127    /// Drop the oldest pending update across all PDAs
2128    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2129        let state = self
2130            .states
2131            .get_mut(&state_id)
2132            .ok_or("State table not found")?;
2133
2134        let mut oldest_pda: Option<String> = None;
2135        let mut oldest_timestamp = i64::MAX;
2136
2137        // Find the PDA with the oldest update
2138        for entry in state.pending_updates.iter() {
2139            let (pda, updates) = entry.pair();
2140            if let Some(update) = updates.first() {
2141                if update.queued_at < oldest_timestamp {
2142                    oldest_timestamp = update.queued_at;
2143                    oldest_pda = Some(pda.clone());
2144                }
2145            }
2146        }
2147
2148        // Remove the oldest update
2149        if let Some(pda) = oldest_pda {
2150            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2151                if !updates.is_empty() {
2152                    updates.remove(0);
2153                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2154
2155                    // Remove the entry if it's now empty
2156                    if updates.is_empty() {
2157                        drop(updates);
2158                        state.pending_updates.remove(&pda);
2159                    }
2160                }
2161            }
2162        }
2163
2164        Ok(())
2165    }
2166
2167    /// Flush and return pending updates for a PDA for external reprocessing
2168    ///
2169    /// Returns the pending updates that were queued for this PDA address.
2170    /// The caller should reprocess these through the VM using process_event().
2171    fn flush_pending_updates(
2172        &mut self,
2173        state_id: u32,
2174        pda_address: &str,
2175    ) -> Result<Vec<PendingAccountUpdate>> {
2176        let state = self
2177            .states
2178            .get_mut(&state_id)
2179            .ok_or("State table not found")?;
2180
2181        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2182            let count = pending_updates.len();
2183            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2184            Ok(pending_updates)
2185        } else {
2186            Ok(Vec::new())
2187        }
2188    }
2189
2190    /// Try to resolve a primary key via PDA reverse lookup
2191    pub fn try_pda_reverse_lookup(
2192        &mut self,
2193        state_id: u32,
2194        lookup_name: &str,
2195        pda_address: &str,
2196    ) -> Option<String> {
2197        let state = self.states.get_mut(&state_id)?;
2198
2199        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2200            if let Some(value) = lookup.lookup(pda_address) {
2201                self.pda_cache_hits += 1;
2202                return Some(value);
2203            }
2204        }
2205
2206        self.pda_cache_misses += 1;
2207        None
2208    }
2209
2210    // ============================================================================
2211    // Computed Expression Evaluator (Task 5)
2212    // ============================================================================
2213
2214    /// Evaluate a computed expression AST against the current state
2215    /// This is the core runtime evaluator for computed fields from the AST
2216    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2217        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2218    }
2219
2220    /// Evaluate a computed expression with a variable environment (for let bindings)
2221    fn evaluate_computed_expr_with_env(
2222        &self,
2223        expr: &ComputedExpr,
2224        state: &Value,
2225        env: &std::collections::HashMap<String, Value>,
2226    ) -> Result<Value> {
2227        match expr {
2228            ComputedExpr::FieldRef { path } => {
2229                self.get_field_from_state(state, path)
2230            }
2231
2232            ComputedExpr::Var { name } => {
2233                env.get(name)
2234                    .cloned()
2235                    .ok_or_else(|| format!("Undefined variable: {}", name).into())
2236            }
2237
2238            ComputedExpr::Let { name, value, body } => {
2239                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2240                let mut new_env = env.clone();
2241                new_env.insert(name.clone(), val);
2242                self.evaluate_computed_expr_with_env(body, state, &new_env)
2243            }
2244
2245            ComputedExpr::If { condition, then_branch, else_branch } => {
2246                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2247                if self.value_to_bool(&cond_val) {
2248                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2249                } else {
2250                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2251                }
2252            }
2253
2254            ComputedExpr::None => Ok(Value::Null),
2255
2256            ComputedExpr::Some { value } => {
2257                self.evaluate_computed_expr_with_env(value, state, env)
2258            }
2259
2260            ComputedExpr::Slice { expr, start, end } => {
2261                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2262                match val {
2263                    Value::Array(arr) => {
2264                        let slice: Vec<Value> = arr.get(*start..*end)
2265                            .unwrap_or(&[])
2266                            .to_vec();
2267                        Ok(Value::Array(slice))
2268                    }
2269                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2270                }
2271            }
2272
2273            ComputedExpr::Index { expr, index } => {
2274                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2275                match val {
2276                    Value::Array(arr) => {
2277                        Ok(arr.get(*index).cloned().unwrap_or(Value::Null))
2278                    }
2279                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2280                }
2281            }
2282
2283            ComputedExpr::U64FromLeBytes { bytes } => {
2284                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2285                let byte_vec = self.value_to_bytes(&val)?;
2286                if byte_vec.len() < 8 {
2287                    return Err(format!("u64::from_le_bytes requires 8 bytes, got {}", byte_vec.len()).into());
2288                }
2289                let arr: [u8; 8] = byte_vec[..8].try_into()
2290                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2291                Ok(json!(u64::from_le_bytes(arr)))
2292            }
2293
2294            ComputedExpr::U64FromBeBytes { bytes } => {
2295                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2296                let byte_vec = self.value_to_bytes(&val)?;
2297                if byte_vec.len() < 8 {
2298                    return Err(format!("u64::from_be_bytes requires 8 bytes, got {}", byte_vec.len()).into());
2299                }
2300                let arr: [u8; 8] = byte_vec[..8].try_into()
2301                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2302                Ok(json!(u64::from_be_bytes(arr)))
2303            }
2304
2305            ComputedExpr::ByteArray { bytes } => {
2306                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2307            }
2308
2309            ComputedExpr::Closure { param, body } => {
2310                // Closures are stored as-is; they're evaluated when used in map()
2311                // Return a special representation
2312                Ok(json!({
2313                    "__closure": {
2314                        "param": param,
2315                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2316                    }
2317                }))
2318            }
2319
2320            ComputedExpr::Unary { op, expr } => {
2321                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2322                self.apply_unary_op(op, &val)
2323            }
2324
2325            ComputedExpr::JsonToBytes { expr } => {
2326                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2327                // Convert JSON array of numbers to byte array
2328                let bytes = self.value_to_bytes(&val)?;
2329                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2330            }
2331
2332            ComputedExpr::UnwrapOr { expr, default } => {
2333                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2334                if val.is_null() {
2335                    Ok(default.clone())
2336                } else {
2337                    Ok(val)
2338                }
2339            }
2340
2341            ComputedExpr::Binary { op, left, right } => {
2342                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2343                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2344                self.apply_binary_op(op, &l, &r)
2345            }
2346
2347            ComputedExpr::Cast { expr, to_type } => {
2348                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2349                self.apply_cast(&val, to_type)
2350            }
2351
2352            ComputedExpr::MethodCall { expr, method, args } => {
2353                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2354                // Special handling for map() with closures
2355                if method == "map" && args.len() == 1 {
2356                    if let ComputedExpr::Closure { param, body } = &args[0] {
2357                        // If the value is null, return null (Option::None.map returns None)
2358                        if val.is_null() {
2359                            return Ok(Value::Null);
2360                        }
2361                        // Evaluate the closure body with the value bound to param
2362                        let mut closure_env = env.clone();
2363                        closure_env.insert(param.clone(), val);
2364                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2365                    }
2366                }
2367                let evaluated_args: Vec<Value> = args
2368                    .iter()
2369                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2370                    .collect::<Result<Vec<_>>>()?;
2371                self.apply_method_call(&val, method, &evaluated_args)
2372            }
2373
2374            ComputedExpr::Literal { value } => {
2375                Ok(value.clone())
2376            }
2377
2378            ComputedExpr::Paren { expr } => {
2379                self.evaluate_computed_expr_with_env(expr, state, env)
2380            }
2381        }
2382    }
2383
2384    /// Convert a JSON value to a byte vector
2385    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2386        match val {
2387            Value::Array(arr) => {
2388                arr.iter()
2389                    .map(|v| v.as_u64().map(|n| n as u8).ok_or_else(|| "Array element not a valid byte".into()))
2390                    .collect()
2391            }
2392            Value::String(s) => {
2393                // Try to decode as hex
2394                if s.starts_with("0x") || s.starts_with("0X") {
2395                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2396                } else {
2397                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2398                }
2399            }
2400            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2401        }
2402    }
2403
2404    /// Apply a unary operation
2405    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2406        use crate::ast::UnaryOp;
2407        match op {
2408            UnaryOp::Not => {
2409                Ok(json!(!self.value_to_bool(val)))
2410            }
2411            UnaryOp::ReverseBits => {
2412                match val.as_u64() {
2413                    Some(n) => Ok(json!(n.reverse_bits())),
2414                    None => match val.as_i64() {
2415                        Some(n) => Ok(json!((n as u64).reverse_bits())),
2416                        None => Err("reverse_bits requires an integer".into()),
2417                    }
2418                }
2419            }
2420        }
2421    }
2422
2423    /// Get a field value from state by path (e.g., "section.field" or just "field")
2424    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2425        let segments: Vec<&str> = path.split('.').collect();
2426        let mut current = state;
2427
2428        for segment in segments {
2429            match current.get(segment) {
2430                Some(v) => current = v,
2431                None => return Ok(Value::Null),
2432            }
2433        }
2434
2435        Ok(current.clone())
2436    }
2437
2438    /// Apply a binary operation to two values
2439    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2440        match op {
2441            // Arithmetic operations
2442            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2443            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2444            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2445            BinaryOp::Div => {
2446                // Check for division by zero
2447                if let Some(r) = right.as_i64() {
2448                    if r == 0 {
2449                        return Err("Division by zero".into());
2450                    }
2451                }
2452                if let Some(r) = right.as_f64() {
2453                    if r == 0.0 {
2454                        return Err("Division by zero".into());
2455                    }
2456                }
2457                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2458            }
2459            BinaryOp::Mod => {
2460                // Modulo - only for integers
2461                match (left.as_i64(), right.as_i64()) {
2462                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2463                    (None, _) | (_, None) => {
2464                        match (left.as_u64(), right.as_u64()) {
2465                            (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2466                            _ => Err("Modulo requires non-zero integer operands".into()),
2467                        }
2468                    }
2469                    _ => Err("Modulo by zero".into()),
2470                }
2471            }
2472
2473            // Comparison operations
2474            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2475            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2476            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2477            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2478            BinaryOp::Eq => Ok(json!(left == right)),
2479            BinaryOp::Ne => Ok(json!(left != right)),
2480
2481            // Logical operations
2482            BinaryOp::And => {
2483                let l_bool = self.value_to_bool(left);
2484                let r_bool = self.value_to_bool(right);
2485                Ok(json!(l_bool && r_bool))
2486            }
2487            BinaryOp::Or => {
2488                let l_bool = self.value_to_bool(left);
2489                let r_bool = self.value_to_bool(right);
2490                Ok(json!(l_bool || r_bool))
2491            }
2492
2493            // Bitwise operations
2494            BinaryOp::Xor => {
2495                match (left.as_u64(), right.as_u64()) {
2496                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
2497                    _ => match (left.as_i64(), right.as_i64()) {
2498                        (Some(a), Some(b)) => Ok(json!(a ^ b)),
2499                        _ => Err("XOR requires integer operands".into()),
2500                    }
2501                }
2502            }
2503            BinaryOp::BitAnd => {
2504                match (left.as_u64(), right.as_u64()) {
2505                    (Some(a), Some(b)) => Ok(json!(a & b)),
2506                    _ => match (left.as_i64(), right.as_i64()) {
2507                        (Some(a), Some(b)) => Ok(json!(a & b)),
2508                        _ => Err("BitAnd requires integer operands".into()),
2509                    }
2510                }
2511            }
2512            BinaryOp::BitOr => {
2513                match (left.as_u64(), right.as_u64()) {
2514                    (Some(a), Some(b)) => Ok(json!(a | b)),
2515                    _ => match (left.as_i64(), right.as_i64()) {
2516                        (Some(a), Some(b)) => Ok(json!(a | b)),
2517                        _ => Err("BitOr requires integer operands".into()),
2518                    }
2519                }
2520            }
2521            BinaryOp::Shl => {
2522                match (left.as_u64(), right.as_u64()) {
2523                    (Some(a), Some(b)) => Ok(json!(a << b)),
2524                    _ => match (left.as_i64(), right.as_i64()) {
2525                        (Some(a), Some(b)) => Ok(json!(a << b)),
2526                        _ => Err("Shl requires integer operands".into()),
2527                    }
2528                }
2529            }
2530            BinaryOp::Shr => {
2531                match (left.as_u64(), right.as_u64()) {
2532                    (Some(a), Some(b)) => Ok(json!(a >> b)),
2533                    _ => match (left.as_i64(), right.as_i64()) {
2534                        (Some(a), Some(b)) => Ok(json!(a >> b)),
2535                        _ => Err("Shr requires integer operands".into()),
2536                    }
2537                }
2538            }
2539        }
2540    }
2541
2542    /// Helper for numeric operations that can work on integers or floats
2543    fn numeric_op<F1, F2>(
2544        &self,
2545        left: &Value,
2546        right: &Value,
2547        int_op: F1,
2548        float_op: F2,
2549    ) -> Result<Value>
2550    where
2551        F1: Fn(i64, i64) -> i64,
2552        F2: Fn(f64, f64) -> f64,
2553    {
2554        // Try i64 first
2555        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2556            return Ok(json!(int_op(a, b)));
2557        }
2558
2559        // Try u64
2560        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2561            // For u64, we need to be careful with underflow in subtraction
2562            return Ok(json!(int_op(a as i64, b as i64)));
2563        }
2564
2565        // Try f64
2566        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2567            return Ok(json!(float_op(a, b)));
2568        }
2569
2570        // If either is null, return null
2571        if left.is_null() || right.is_null() {
2572            return Ok(Value::Null);
2573        }
2574
2575        Err(format!(
2576            "Cannot perform numeric operation on {:?} and {:?}",
2577            left, right
2578        )
2579        .into())
2580    }
2581
2582    /// Helper for comparison operations
2583    fn comparison_op<F1, F2>(
2584        &self,
2585        left: &Value,
2586        right: &Value,
2587        int_cmp: F1,
2588        float_cmp: F2,
2589    ) -> Result<Value>
2590    where
2591        F1: Fn(i64, i64) -> bool,
2592        F2: Fn(f64, f64) -> bool,
2593    {
2594        // Try i64 first
2595        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2596            return Ok(json!(int_cmp(a, b)));
2597        }
2598
2599        // Try u64
2600        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2601            return Ok(json!(int_cmp(a as i64, b as i64)));
2602        }
2603
2604        // Try f64
2605        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2606            return Ok(json!(float_cmp(a, b)));
2607        }
2608
2609        // If either is null, comparison returns false
2610        if left.is_null() || right.is_null() {
2611            return Ok(json!(false));
2612        }
2613
2614        Err(format!(
2615            "Cannot compare {:?} and {:?}",
2616            left, right
2617        )
2618        .into())
2619    }
2620
2621    /// Convert a value to boolean for logical operations
2622    fn value_to_bool(&self, value: &Value) -> bool {
2623        match value {
2624            Value::Null => false,
2625            Value::Bool(b) => *b,
2626            Value::Number(n) => {
2627                if let Some(i) = n.as_i64() {
2628                    i != 0
2629                } else if let Some(f) = n.as_f64() {
2630                    f != 0.0
2631                } else {
2632                    true
2633                }
2634            }
2635            Value::String(s) => !s.is_empty(),
2636            Value::Array(arr) => !arr.is_empty(),
2637            Value::Object(obj) => !obj.is_empty(),
2638        }
2639    }
2640
2641    /// Apply a type cast to a value
2642    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
2643        match to_type {
2644            "i8" | "i16" | "i32" | "i64" | "isize" => {
2645                if let Some(n) = value.as_i64() {
2646                    Ok(json!(n))
2647                } else if let Some(n) = value.as_u64() {
2648                    Ok(json!(n as i64))
2649                } else if let Some(n) = value.as_f64() {
2650                    Ok(json!(n as i64))
2651                } else if let Some(s) = value.as_str() {
2652                    s.parse::<i64>()
2653                        .map(|n| json!(n))
2654                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
2655                } else {
2656                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2657                }
2658            }
2659            "u8" | "u16" | "u32" | "u64" | "usize" => {
2660                if let Some(n) = value.as_u64() {
2661                    Ok(json!(n))
2662                } else if let Some(n) = value.as_i64() {
2663                    Ok(json!(n as u64))
2664                } else if let Some(n) = value.as_f64() {
2665                    Ok(json!(n as u64))
2666                } else if let Some(s) = value.as_str() {
2667                    s.parse::<u64>()
2668                        .map(|n| json!(n))
2669                        .map_err(|e| format!("Cannot parse '{}' as unsigned integer: {}", s, e).into())
2670                } else {
2671                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2672                }
2673            }
2674            "f32" | "f64" => {
2675                if let Some(n) = value.as_f64() {
2676                    Ok(json!(n))
2677                } else if let Some(n) = value.as_i64() {
2678                    Ok(json!(n as f64))
2679                } else if let Some(n) = value.as_u64() {
2680                    Ok(json!(n as f64))
2681                } else if let Some(s) = value.as_str() {
2682                    s.parse::<f64>()
2683                        .map(|n| json!(n))
2684                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
2685                } else {
2686                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2687                }
2688            }
2689            "String" | "string" => {
2690                Ok(json!(value.to_string()))
2691            }
2692            "bool" => {
2693                Ok(json!(self.value_to_bool(value)))
2694            }
2695            _ => {
2696                // Unknown type, return value as-is
2697                Ok(value.clone())
2698            }
2699        }
2700    }
2701
2702    /// Apply a method call to a value
2703    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
2704        match method {
2705            "unwrap_or" => {
2706                if value.is_null() && !args.is_empty() {
2707                    Ok(args[0].clone())
2708                } else {
2709                    Ok(value.clone())
2710                }
2711            }
2712            "unwrap_or_default" => {
2713                if value.is_null() {
2714                    // Return default for common types
2715                    Ok(json!(0))
2716                } else {
2717                    Ok(value.clone())
2718                }
2719            }
2720            "is_some" => {
2721                Ok(json!(!value.is_null()))
2722            }
2723            "is_none" => {
2724                Ok(json!(value.is_null()))
2725            }
2726            "abs" => {
2727                if let Some(n) = value.as_i64() {
2728                    Ok(json!(n.abs()))
2729                } else if let Some(n) = value.as_f64() {
2730                    Ok(json!(n.abs()))
2731                } else {
2732                    Err(format!("Cannot call abs() on {:?}", value).into())
2733                }
2734            }
2735            "len" => {
2736                if let Some(s) = value.as_str() {
2737                    Ok(json!(s.len()))
2738                } else if let Some(arr) = value.as_array() {
2739                    Ok(json!(arr.len()))
2740                } else if let Some(obj) = value.as_object() {
2741                    Ok(json!(obj.len()))
2742                } else {
2743                    Err(format!("Cannot call len() on {:?}", value).into())
2744                }
2745            }
2746            "to_string" => {
2747                Ok(json!(value.to_string()))
2748            }
2749            "min" => {
2750                if args.is_empty() {
2751                    return Err("min() requires an argument".into());
2752                }
2753                let other = &args[0];
2754                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2755                    Ok(json!(a.min(b)))
2756                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2757                    Ok(json!(a.min(b)))
2758                } else {
2759                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
2760                }
2761            }
2762            "max" => {
2763                if args.is_empty() {
2764                    return Err("max() requires an argument".into());
2765                }
2766                let other = &args[0];
2767                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2768                    Ok(json!(a.max(b)))
2769                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2770                    Ok(json!(a.max(b)))
2771                } else {
2772                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
2773                }
2774            }
2775            "saturating_add" => {
2776                if args.is_empty() {
2777                    return Err("saturating_add() requires an argument".into());
2778                }
2779                let other = &args[0];
2780                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2781                    Ok(json!(a.saturating_add(b)))
2782                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2783                    Ok(json!(a.saturating_add(b)))
2784                } else {
2785                    Err(format!("Cannot call saturating_add() on {:?} and {:?}", value, other).into())
2786                }
2787            }
2788            "saturating_sub" => {
2789                if args.is_empty() {
2790                    return Err("saturating_sub() requires an argument".into());
2791                }
2792                let other = &args[0];
2793                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2794                    Ok(json!(a.saturating_sub(b)))
2795                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2796                    Ok(json!(a.saturating_sub(b)))
2797                } else {
2798                    Err(format!("Cannot call saturating_sub() on {:?} and {:?}", value, other).into())
2799                }
2800            }
2801            _ => {
2802                // Unknown method - return value unchanged with a warning
2803                tracing::warn!("Unknown method call: {}() on {:?}", method, value);
2804                Ok(value.clone())
2805            }
2806        }
2807    }
2808
2809    /// Evaluate all computed fields for an entity and update the state
2810    /// This takes a list of ComputedFieldSpec from the AST and applies them
2811    pub fn evaluate_computed_fields_from_ast(
2812        &self,
2813        state: &mut Value,
2814        computed_field_specs: &[ComputedFieldSpec],
2815    ) -> Result<Vec<String>> {
2816        let mut updated_paths = Vec::new();
2817
2818        for spec in computed_field_specs {
2819            match self.evaluate_computed_expr(&spec.expression, state) {
2820                Ok(result) => {
2821                    // Set the computed value in state
2822                    self.set_field_in_state(state, &spec.target_path, result)?;
2823                    updated_paths.push(spec.target_path.clone());
2824                }
2825                Err(e) => {
2826                    // Log error but continue with other computed fields
2827                    tracing::warn!(
2828                        "Failed to evaluate computed field '{}': {}",
2829                        spec.target_path,
2830                        e
2831                    );
2832                }
2833            }
2834        }
2835
2836        Ok(updated_paths)
2837    }
2838
2839    /// Set a field value in state by path (e.g., "section.field")
2840    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
2841        let segments: Vec<&str> = path.split('.').collect();
2842        
2843        if segments.is_empty() {
2844            return Err("Empty path".into());
2845        }
2846
2847        // Navigate to parent, creating intermediate objects as needed
2848        let mut current = state;
2849        for (i, segment) in segments.iter().enumerate() {
2850            if i == segments.len() - 1 {
2851                // Last segment - set the value
2852                if let Some(obj) = current.as_object_mut() {
2853                    obj.insert(segment.to_string(), value);
2854                    return Ok(());
2855                } else {
2856                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
2857                }
2858            } else {
2859                // Intermediate segment - navigate or create
2860                if !current.is_object() {
2861                    *current = json!({});
2862                }
2863                let obj = current.as_object_mut().unwrap();
2864                current = obj
2865                    .entry(segment.to_string())
2866                    .or_insert_with(|| json!({}));
2867            }
2868        }
2869
2870        Ok(())
2871    }
2872
2873    /// Create a computed fields evaluator closure from AST specs
2874    /// This returns a function that can be passed to the bytecode builder
2875    pub fn create_evaluator_from_specs(
2876        specs: Vec<ComputedFieldSpec>,
2877    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
2878        move |state: &mut Value| {
2879            // Create a temporary VmContext just for evaluation
2880            // (We only need the expression evaluation methods)
2881            let vm = VmContext::new();
2882            vm.evaluate_computed_fields_from_ast(state, &specs)?;
2883            Ok(())
2884        }
2885    }
2886}
2887
2888impl Default for VmContext {
2889    fn default() -> Self {
2890        Self::new()
2891    }
2892}
2893
2894// Implement the ReverseLookupUpdater trait for VmContext
2895impl crate::resolvers::ReverseLookupUpdater for VmContext {
2896    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
2897        // Use default state_id=0 and default lookup name
2898        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
2899            .unwrap_or_else(|e| {
2900                tracing::error!("Failed to update PDA reverse lookup: {}", e);
2901                Vec::new()
2902            })
2903    }
2904
2905    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
2906        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
2907        self.flush_pending_updates(0, pda_address)
2908            .unwrap_or_else(|e| {
2909                tracing::error!("Failed to flush pending updates: {}", e);
2910                Vec::new()
2911            })
2912    }
2913}