hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102        self.metadata.insert(key, value);
103        self
104    }
105
106    /// Get metadata value
107    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108        self.metadata.get(key)
109    }
110
111    /// Convert context to JSON value for injection into event data
112    pub fn to_value(&self) -> Value {
113        let mut obj = serde_json::Map::new();
114        if let Some(slot) = self.slot {
115            obj.insert("slot".to_string(), json!(slot));
116        }
117        if let Some(ref sig) = self.signature {
118            obj.insert("signature".to_string(), json!(sig));
119        }
120        // Always include timestamp (use current time if not set)
121        obj.insert("timestamp".to_string(), json!(self.timestamp()));
122        for (key, value) in &self.metadata {
123            obj.insert(key.clone(), value.clone());
124        }
125        Value::Object(obj)
126    }
127}
128
129pub type Register = usize;
130pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
131
132pub type RegisterValue = Value;
133
134/// Trait for evaluating computed fields
135/// Implement this in your generated spec to enable computed field evaluation
136pub trait ComputedFieldsEvaluator {
137    fn evaluate(&self, state: &mut Value) -> Result<()>;
138}
139
140// Pending queue configuration
141const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
142const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
143const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
144
145// Temporal index configuration - prevents unbounded history growth
146const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
147const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
148
149// State table configuration - aligned with downstream EntityCache (500 per view)
150const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
151const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
152
153const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
154
155const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
156
157const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
158
159const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161/// Estimate the size of a JSON value in bytes
162fn estimate_json_size(value: &Value) -> usize {
163    match value {
164        Value::Null => 4,
165        Value::Bool(_) => 5,
166        Value::Number(_) => 8,
167        Value::String(s) => s.len() + 2,
168        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169        Value::Object(obj) => {
170            2 + obj
171                .iter()
172                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173                .sum::<usize>()
174        }
175    }
176}
177
178#[derive(Debug, Clone)]
179pub struct CompiledPath {
180    pub segments: std::sync::Arc<[String]>,
181}
182
183impl CompiledPath {
184    pub fn new(path: &str) -> Self {
185        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
186        CompiledPath {
187            segments: segments.into(),
188        }
189    }
190
191    fn segments(&self) -> &[String] {
192        &self.segments
193    }
194}
195
196/// Represents the type of change made to a field for granular dirty tracking.
197/// This enables emitting only the actual changes rather than entire field values.
198#[derive(Debug, Clone)]
199pub enum FieldChange {
200    /// Field was replaced with a new value (emit the full value from state)
201    Replaced,
202    /// Items were appended to an array field (emit only the new items)
203    Appended(Vec<Value>),
204}
205
206/// Tracks field modifications during handler execution with granular change information.
207/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
208#[derive(Debug, Clone, Default)]
209pub struct DirtyTracker {
210    changes: HashMap<String, FieldChange>,
211}
212
213impl DirtyTracker {
214    /// Create a new empty DirtyTracker
215    pub fn new() -> Self {
216        Self {
217            changes: HashMap::new(),
218        }
219    }
220
221    /// Mark a field as replaced (full value will be emitted)
222    pub fn mark_replaced(&mut self, path: &str) {
223        // If there was an append, it's now superseded by a full replacement
224        self.changes.insert(path.to_string(), FieldChange::Replaced);
225    }
226
227    /// Record an appended value for a field
228    pub fn mark_appended(&mut self, path: &str, value: Value) {
229        match self.changes.get_mut(path) {
230            Some(FieldChange::Appended(values)) => {
231                // Add to existing appended values
232                values.push(value);
233            }
234            Some(FieldChange::Replaced) => {
235                // Field was already replaced, keep it as replaced
236                // (the full value including the append will be emitted)
237            }
238            None => {
239                // First append to this field
240                self.changes
241                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
242            }
243        }
244    }
245
246    /// Check if there are any changes tracked
247    pub fn is_empty(&self) -> bool {
248        self.changes.is_empty()
249    }
250
251    /// Get the number of changed fields
252    pub fn len(&self) -> usize {
253        self.changes.len()
254    }
255
256    /// Iterate over all changes
257    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258        self.changes.iter()
259    }
260
261    /// Get a set of all dirty field paths (for backward compatibility)
262    pub fn dirty_paths(&self) -> HashSet<String> {
263        self.changes.keys().cloned().collect()
264    }
265
266    /// Consume the tracker and return the changes map
267    pub fn into_changes(self) -> HashMap<String, FieldChange> {
268        self.changes
269    }
270
271    /// Get a reference to the changes map
272    pub fn changes(&self) -> &HashMap<String, FieldChange> {
273        &self.changes
274    }
275
276    /// Get paths that were appended (not replaced)
277    pub fn appended_paths(&self) -> Vec<String> {
278        self.changes
279            .iter()
280            .filter_map(|(path, change)| match change {
281                FieldChange::Appended(_) => Some(path.clone()),
282                FieldChange::Replaced => None,
283            })
284            .collect()
285    }
286}
287
288pub struct VmContext {
289    registers: Vec<RegisterValue>,
290    states: HashMap<u32, StateTable>,
291    pub instructions_executed: u64,
292    pub cache_hits: u64,
293    path_cache: HashMap<String, CompiledPath>,
294    pub pda_cache_hits: u64,
295    pub pda_cache_misses: u64,
296    pub pending_queue_size: u64,
297    current_context: Option<UpdateContext>,
298    warnings: Vec<String>,
299}
300
301#[derive(Debug)]
302pub struct LookupIndex {
303    index: std::sync::Mutex<LruCache<String, Value>>,
304}
305
306impl LookupIndex {
307    pub fn new() -> Self {
308        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
309    }
310
311    pub fn with_capacity(capacity: usize) -> Self {
312        LookupIndex {
313            index: std::sync::Mutex::new(LruCache::new(
314                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
315            )),
316        }
317    }
318
319    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
320        let key = value_to_cache_key(lookup_value);
321        self.index.lock().unwrap().get(&key).cloned()
322    }
323
324    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
325        let key = value_to_cache_key(&lookup_value);
326        self.index.lock().unwrap().put(key, primary_key);
327    }
328
329    pub fn len(&self) -> usize {
330        self.index.lock().unwrap().len()
331    }
332
333    pub fn is_empty(&self) -> bool {
334        self.index.lock().unwrap().is_empty()
335    }
336}
337
338impl Default for LookupIndex {
339    fn default() -> Self {
340        Self::new()
341    }
342}
343
344fn value_to_cache_key(value: &Value) -> String {
345    match value {
346        Value::String(s) => s.clone(),
347        Value::Number(n) => n.to_string(),
348        Value::Bool(b) => b.to_string(),
349        Value::Null => "null".to_string(),
350        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
351    }
352}
353
354#[derive(Debug)]
355pub struct TemporalIndex {
356    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
357}
358
359impl Default for TemporalIndex {
360    fn default() -> Self {
361        Self::new()
362    }
363}
364
365impl TemporalIndex {
366    pub fn new() -> Self {
367        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
368    }
369
370    pub fn with_capacity(capacity: usize) -> Self {
371        TemporalIndex {
372            index: std::sync::Mutex::new(LruCache::new(
373                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
374            )),
375        }
376    }
377
378    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
379        let key = value_to_cache_key(lookup_value);
380        let mut cache = self.index.lock().unwrap();
381        if let Some(entries) = cache.get(&key) {
382            for i in (0..entries.len()).rev() {
383                if entries[i].1 <= timestamp {
384                    return Some(entries[i].0.clone());
385                }
386            }
387        }
388        None
389    }
390
391    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
392        let key = value_to_cache_key(lookup_value);
393        let mut cache = self.index.lock().unwrap();
394        if let Some(entries) = cache.get(&key) {
395            if let Some(last) = entries.last() {
396                return Some(last.0.clone());
397            }
398        }
399        None
400    }
401
402    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
403        let key = value_to_cache_key(&lookup_value);
404        let mut cache = self.index.lock().unwrap();
405
406        let entries = cache.get_or_insert_mut(key, Vec::new);
407        entries.push((primary_key, timestamp));
408        entries.sort_by_key(|(_, ts)| *ts);
409
410        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
411        entries.retain(|(_, ts)| *ts >= cutoff);
412
413        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
414            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
415            entries.drain(0..excess);
416        }
417    }
418
419    pub fn len(&self) -> usize {
420        self.index.lock().unwrap().len()
421    }
422
423    pub fn is_empty(&self) -> bool {
424        self.index.lock().unwrap().is_empty()
425    }
426
427    pub fn total_entries(&self) -> usize {
428        self.index
429            .lock()
430            .unwrap()
431            .iter()
432            .map(|(_, entries)| entries.len())
433            .sum()
434    }
435
436    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
437        let mut cache = self.index.lock().unwrap();
438        let mut total_removed = 0;
439
440        for (_, entries) in cache.iter_mut() {
441            let original_len = entries.len();
442            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
443            total_removed += original_len - entries.len();
444        }
445
446        total_removed
447    }
448}
449
450#[derive(Debug)]
451pub struct PdaReverseLookup {
452    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
453    index: LruCache<String, String>,
454}
455
456impl PdaReverseLookup {
457    pub fn new(capacity: usize) -> Self {
458        PdaReverseLookup {
459            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
460        }
461    }
462
463    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
464        self.index.get(pda_address).cloned()
465    }
466
467    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
468        let evicted = if self.index.len() >= self.index.cap().get() {
469            self.index.peek_lru().map(|(k, _)| k.clone())
470        } else {
471            None
472        };
473
474        self.index.put(pda_address, seed_value);
475        evicted
476    }
477
478    pub fn len(&self) -> usize {
479        self.index.len()
480    }
481
482    pub fn is_empty(&self) -> bool {
483        self.index.is_empty()
484    }
485}
486
487/// Input for queueing an account update.
488#[derive(Debug, Clone)]
489pub struct QueuedAccountUpdate {
490    pub pda_address: String,
491    pub account_type: String,
492    pub account_data: Value,
493    pub slot: u64,
494    pub write_version: u64,
495    pub signature: String,
496}
497
498/// Internal representation of a pending account update with queue metadata.
499#[derive(Debug, Clone)]
500pub struct PendingAccountUpdate {
501    pub account_type: String,
502    pub pda_address: String,
503    pub account_data: Value,
504    pub slot: u64,
505    pub write_version: u64,
506    pub signature: String,
507    pub queued_at: i64,
508}
509
510#[derive(Debug, Clone)]
511pub struct PendingQueueStats {
512    pub total_updates: usize,
513    pub unique_pdas: usize,
514    pub oldest_age_seconds: i64,
515    pub largest_pda_queue_size: usize,
516    pub estimated_memory_bytes: usize,
517}
518
519#[derive(Debug, Clone, Default)]
520pub struct VmMemoryStats {
521    pub state_table_entity_count: usize,
522    pub state_table_max_entries: usize,
523    pub state_table_at_capacity: bool,
524    pub lookup_index_count: usize,
525    pub lookup_index_total_entries: usize,
526    pub temporal_index_count: usize,
527    pub temporal_index_total_entries: usize,
528    pub pda_reverse_lookup_count: usize,
529    pub pda_reverse_lookup_total_entries: usize,
530    pub version_tracker_entries: usize,
531    pub pending_queue_stats: Option<PendingQueueStats>,
532    pub path_cache_size: usize,
533}
534
535#[derive(Debug, Clone, Default)]
536pub struct CleanupResult {
537    pub pending_updates_removed: usize,
538    pub temporal_entries_removed: usize,
539}
540
541#[derive(Debug, Clone)]
542pub struct CapacityWarning {
543    pub current_entries: usize,
544    pub max_entries: usize,
545    pub entries_over_limit: usize,
546}
547
548#[derive(Debug, Clone)]
549pub struct StateTableConfig {
550    pub max_entries: usize,
551    pub max_array_length: usize,
552}
553
554impl Default for StateTableConfig {
555    fn default() -> Self {
556        Self {
557            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
558            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
559        }
560    }
561}
562
563#[derive(Debug)]
564pub struct VersionTracker {
565    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
566}
567
568impl VersionTracker {
569    pub fn new() -> Self {
570        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
571    }
572
573    pub fn with_capacity(capacity: usize) -> Self {
574        VersionTracker {
575            cache: std::sync::Mutex::new(LruCache::new(
576                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
577            )),
578        }
579    }
580
581    fn make_key(primary_key: &Value, event_type: &str) -> String {
582        format!("{}:{}", primary_key, event_type)
583    }
584
585    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
586        let key = Self::make_key(primary_key, event_type);
587        self.cache.lock().unwrap().get(&key).copied()
588    }
589
590    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
591        let key = Self::make_key(primary_key, event_type);
592        self.cache.lock().unwrap().put(key, (slot, ordering_value));
593    }
594
595    pub fn len(&self) -> usize {
596        self.cache.lock().unwrap().len()
597    }
598
599    pub fn is_empty(&self) -> bool {
600        self.cache.lock().unwrap().is_empty()
601    }
602}
603
604impl Default for VersionTracker {
605    fn default() -> Self {
606        Self::new()
607    }
608}
609
610#[derive(Debug)]
611pub struct StateTable {
612    pub data: DashMap<Value, Value>,
613    access_times: DashMap<Value, i64>,
614    pub lookup_indexes: HashMap<String, LookupIndex>,
615    pub temporal_indexes: HashMap<String, TemporalIndex>,
616    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
617    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
618    version_tracker: VersionTracker,
619    config: StateTableConfig,
620    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
621    entity_name: String,
622}
623
624impl StateTable {
625    pub fn is_at_capacity(&self) -> bool {
626        self.data.len() >= self.config.max_entries
627    }
628
629    pub fn entries_over_limit(&self) -> usize {
630        self.data.len().saturating_sub(self.config.max_entries)
631    }
632
633    pub fn max_array_length(&self) -> usize {
634        self.config.max_array_length
635    }
636
637    fn touch(&self, key: &Value) {
638        let now = std::time::SystemTime::now()
639            .duration_since(std::time::UNIX_EPOCH)
640            .unwrap()
641            .as_secs() as i64;
642        self.access_times.insert(key.clone(), now);
643    }
644
645    fn evict_lru(&self, count: usize) -> usize {
646        if count == 0 || self.data.is_empty() {
647            return 0;
648        }
649
650        let mut entries: Vec<(Value, i64)> = self
651            .access_times
652            .iter()
653            .map(|entry| (entry.key().clone(), *entry.value()))
654            .collect();
655
656        entries.sort_by_key(|(_, ts)| *ts);
657
658        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
659
660        let mut evicted = 0;
661        for key in to_evict {
662            self.data.remove(&key);
663            self.access_times.remove(&key);
664            evicted += 1;
665        }
666
667        #[cfg(feature = "otel")]
668        if evicted > 0 {
669            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
670        }
671
672        evicted
673    }
674
675    pub fn insert_with_eviction(&self, key: Value, value: Value) {
676        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
677            #[cfg(feature = "otel")]
678            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
679            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
680            self.evict_lru(to_evict.max(1));
681        }
682        self.data.insert(key.clone(), value);
683        self.touch(&key);
684    }
685
686    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
687        let result = self.data.get(key).map(|v| v.clone());
688        if result.is_some() {
689            self.touch(key);
690        }
691        result
692    }
693
694    /// Check if an update is fresh and update the version tracker.
695    /// Returns true if the update should be processed (is fresh).
696    /// Returns false if the update is stale and should be skipped.
697    ///
698    /// Comparison is lexicographic on (slot, ordering_value):
699    /// (100, 5) > (100, 3) > (99, 999)
700    pub fn is_fresh_update(
701        &self,
702        primary_key: &Value,
703        event_type: &str,
704        slot: u64,
705        ordering_value: u64,
706    ) -> bool {
707        let dominated = self
708            .version_tracker
709            .get(primary_key, event_type)
710            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
711            .unwrap_or(false);
712
713        if dominated {
714            return false;
715        }
716
717        self.version_tracker
718            .insert(primary_key, event_type, slot, ordering_value);
719        true
720    }
721}
722
723impl VmContext {
724    pub fn new() -> Self {
725        let mut vm = VmContext {
726            registers: vec![Value::Null; 256],
727            states: HashMap::new(),
728            instructions_executed: 0,
729            cache_hits: 0,
730            path_cache: HashMap::new(),
731            pda_cache_hits: 0,
732            pda_cache_misses: 0,
733            pending_queue_size: 0,
734            current_context: None,
735            warnings: Vec::new(),
736        };
737        vm.states.insert(
738            0,
739            StateTable {
740                data: DashMap::new(),
741                access_times: DashMap::new(),
742                lookup_indexes: HashMap::new(),
743                temporal_indexes: HashMap::new(),
744                pda_reverse_lookups: HashMap::new(),
745                pending_updates: DashMap::new(),
746                version_tracker: VersionTracker::new(),
747                config: StateTableConfig::default(),
748                entity_name: "default".to_string(),
749            },
750        );
751        vm
752    }
753
754    pub fn new_with_config(state_config: StateTableConfig) -> Self {
755        let mut vm = VmContext {
756            registers: vec![Value::Null; 256],
757            states: HashMap::new(),
758            instructions_executed: 0,
759            cache_hits: 0,
760            path_cache: HashMap::new(),
761            pda_cache_hits: 0,
762            pda_cache_misses: 0,
763            pending_queue_size: 0,
764            current_context: None,
765            warnings: Vec::new(),
766        };
767        vm.states.insert(
768            0,
769            StateTable {
770                data: DashMap::new(),
771                access_times: DashMap::new(),
772                lookup_indexes: HashMap::new(),
773                temporal_indexes: HashMap::new(),
774                pda_reverse_lookups: HashMap::new(),
775                pending_updates: DashMap::new(),
776                version_tracker: VersionTracker::new(),
777                config: state_config,
778                entity_name: "default".to_string(),
779            },
780        );
781        vm
782    }
783
784    /// Get a mutable reference to a state table by ID
785    /// Returns None if the state ID doesn't exist
786    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
787        self.states.get_mut(&state_id)
788    }
789
790    /// Get public access to registers (for metrics context)
791    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
792        &mut self.registers
793    }
794
795    /// Get public access to path cache (for metrics context)
796    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
797        &self.path_cache
798    }
799
800    /// Get the current update context
801    pub fn current_context(&self) -> Option<&UpdateContext> {
802        self.current_context.as_ref()
803    }
804
805    fn add_warning(&mut self, msg: String) {
806        self.warnings.push(msg);
807    }
808
809    pub fn take_warnings(&mut self) -> Vec<String> {
810        std::mem::take(&mut self.warnings)
811    }
812
813    pub fn has_warnings(&self) -> bool {
814        !self.warnings.is_empty()
815    }
816
817    pub fn update_state_from_register(
818        &mut self,
819        state_id: u32,
820        key: Value,
821        register: Register,
822    ) -> Result<()> {
823        let state = self.states.get(&state_id).ok_or("State table not found")?;
824        let value = self.registers[register].clone();
825        state.insert_with_eviction(key, value);
826        Ok(())
827    }
828
829    fn reset_registers(&mut self) {
830        for reg in &mut self.registers {
831            *reg = Value::Null;
832        }
833    }
834
835    /// Extract only the dirty fields from state (public for use by instruction hooks)
836    pub fn extract_partial_state(
837        &self,
838        state_reg: Register,
839        dirty_fields: &HashSet<String>,
840    ) -> Result<Value> {
841        let full_state = &self.registers[state_reg];
842
843        if dirty_fields.is_empty() {
844            return Ok(json!({}));
845        }
846
847        let mut partial = serde_json::Map::new();
848
849        for path in dirty_fields {
850            let segments: Vec<&str> = path.split('.').collect();
851
852            let mut current = full_state;
853            let mut found = true;
854
855            for segment in &segments {
856                match current.get(segment) {
857                    Some(v) => current = v,
858                    None => {
859                        found = false;
860                        break;
861                    }
862                }
863            }
864
865            if !found {
866                continue;
867            }
868
869            let mut target = &mut partial;
870            for (i, segment) in segments.iter().enumerate() {
871                if i == segments.len() - 1 {
872                    target.insert(segment.to_string(), current.clone());
873                } else {
874                    target
875                        .entry(segment.to_string())
876                        .or_insert_with(|| json!({}));
877                    target = target
878                        .get_mut(*segment)
879                        .and_then(|v| v.as_object_mut())
880                        .ok_or("Failed to build nested structure")?;
881                }
882            }
883        }
884
885        Ok(Value::Object(partial))
886    }
887
888    /// Extract a patch from state based on the DirtyTracker.
889    /// For Replaced fields: extracts the full value from state.
890    /// For Appended fields: emits only the appended values as an array.
891    pub fn extract_partial_state_with_tracker(
892        &self,
893        state_reg: Register,
894        tracker: &DirtyTracker,
895    ) -> Result<Value> {
896        let full_state = &self.registers[state_reg];
897
898        if tracker.is_empty() {
899            return Ok(json!({}));
900        }
901
902        let mut partial = serde_json::Map::new();
903
904        for (path, change) in tracker.iter() {
905            let segments: Vec<&str> = path.split('.').collect();
906
907            let value_to_insert = match change {
908                FieldChange::Replaced => {
909                    let mut current = full_state;
910                    let mut found = true;
911
912                    for segment in &segments {
913                        match current.get(*segment) {
914                            Some(v) => current = v,
915                            None => {
916                                found = false;
917                                break;
918                            }
919                        }
920                    }
921
922                    if !found {
923                        continue;
924                    }
925                    current.clone()
926                }
927                FieldChange::Appended(values) => Value::Array(values.clone()),
928            };
929
930            let mut target = &mut partial;
931            for (i, segment) in segments.iter().enumerate() {
932                if i == segments.len() - 1 {
933                    target.insert(segment.to_string(), value_to_insert.clone());
934                } else {
935                    target
936                        .entry(segment.to_string())
937                        .or_insert_with(|| json!({}));
938                    target = target
939                        .get_mut(*segment)
940                        .and_then(|v| v.as_object_mut())
941                        .ok_or("Failed to build nested structure")?;
942                }
943            }
944        }
945
946        Ok(Value::Object(partial))
947    }
948
949    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
950        if let Some(compiled) = self.path_cache.get(path) {
951            self.cache_hits += 1;
952            #[cfg(feature = "otel")]
953            crate::vm_metrics::record_path_cache_hit();
954            return compiled.clone();
955        }
956        #[cfg(feature = "otel")]
957        crate::vm_metrics::record_path_cache_miss();
958        let compiled = CompiledPath::new(path);
959        self.path_cache.insert(path.to_string(), compiled.clone());
960        compiled
961    }
962
963    /// Process an event with optional context metadata
964    #[cfg_attr(feature = "otel", instrument(
965        name = "vm.process_event",
966        skip(self, bytecode, event_value, log),
967        level = "info",
968        fields(
969            event_type = %event_type,
970            slot = context.as_ref().and_then(|c| c.slot),
971        )
972    ))]
973    pub fn process_event(
974        &mut self,
975        bytecode: &MultiEntityBytecode,
976        event_value: Value,
977        event_type: &str,
978        context: Option<&UpdateContext>,
979        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
980    ) -> Result<Vec<Mutation>> {
981        self.current_context = context.cloned();
982
983        let mut event_value = event_value;
984        if let Some(ctx) = context {
985            if let Some(obj) = event_value.as_object_mut() {
986                obj.insert("__update_context".to_string(), ctx.to_value());
987            }
988        }
989
990        let mut all_mutations = Vec::new();
991
992        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
993            for entity_name in entity_names {
994                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
995                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
996                        if let Some(ref mut log) = log {
997                            log.set("entity", entity_name.clone());
998                            log.inc("handlers", 1);
999                        }
1000
1001                        let opcodes_before = self.instructions_executed;
1002                        let cache_before = self.cache_hits;
1003                        let pda_hits_before = self.pda_cache_hits;
1004                        let pda_misses_before = self.pda_cache_misses;
1005
1006                        let mutations = self.execute_handler(
1007                            handler,
1008                            &event_value,
1009                            event_type,
1010                            entity_bytecode.state_id,
1011                            entity_name,
1012                            entity_bytecode.computed_fields_evaluator.as_ref(),
1013                        )?;
1014
1015                        if let Some(ref mut log) = log {
1016                            log.inc(
1017                                "opcodes",
1018                                (self.instructions_executed - opcodes_before) as i64,
1019                            );
1020                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1021                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1022                            log.inc(
1023                                "pda_misses",
1024                                (self.pda_cache_misses - pda_misses_before) as i64,
1025                            );
1026                        }
1027
1028                        all_mutations.extend(mutations);
1029                    } else if let Some(ref mut log) = log {
1030                        log.set("skip_reason", "no_handler");
1031                    }
1032                } else if let Some(ref mut log) = log {
1033                    log.set("skip_reason", "entity_not_found");
1034                }
1035            }
1036        } else if let Some(ref mut log) = log {
1037            log.set("skip_reason", "no_event_routing");
1038        }
1039
1040        if let Some(log) = log {
1041            log.set("mutations", all_mutations.len() as i64);
1042            if let Some(first) = all_mutations.first() {
1043                if let Some(key_str) = first.key.as_str() {
1044                    log.set("primary_key", key_str);
1045                } else if let Some(key_num) = first.key.as_u64() {
1046                    log.set("primary_key", key_num as i64);
1047                }
1048            }
1049            if let Some(state) = self.states.get(&0) {
1050                log.set("state_table_size", state.data.len() as i64);
1051            }
1052
1053            let warnings = self.take_warnings();
1054            if !warnings.is_empty() {
1055                log.set("warnings", warnings.len() as i64);
1056                log.set(
1057                    "warning_messages",
1058                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1059                );
1060                log.set_level(crate::canonical_log::LogLevel::Warn);
1061            }
1062        } else {
1063            self.warnings.clear();
1064        }
1065
1066        Ok(all_mutations)
1067    }
1068
1069    pub fn process_any(
1070        &mut self,
1071        bytecode: &MultiEntityBytecode,
1072        any: prost_types::Any,
1073    ) -> Result<Vec<Mutation>> {
1074        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1075        self.process_event(bytecode, event_value, &event_type, None, None)
1076    }
1077
1078    #[cfg_attr(feature = "otel", instrument(
1079        name = "vm.execute_handler",
1080        skip(self, handler, event_value, entity_evaluator),
1081        level = "debug",
1082        fields(
1083            event_type = %event_type,
1084            handler_opcodes = handler.len(),
1085        )
1086    ))]
1087    #[allow(clippy::type_complexity)]
1088    fn execute_handler(
1089        &mut self,
1090        handler: &[OpCode],
1091        event_value: &Value,
1092        event_type: &str,
1093        override_state_id: u32,
1094        entity_name: &str,
1095        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1096    ) -> Result<Vec<Mutation>> {
1097        self.reset_registers();
1098
1099        let mut pc: usize = 0;
1100        let mut output = Vec::new();
1101        let mut dirty_tracker = DirtyTracker::new();
1102
1103        while pc < handler.len() {
1104            match &handler[pc] {
1105                OpCode::LoadEventField {
1106                    path,
1107                    dest,
1108                    default,
1109                } => {
1110                    let value = self.load_field(event_value, path, default.as_ref())?;
1111                    self.registers[*dest] = value;
1112                    pc += 1;
1113                }
1114                OpCode::LoadConstant { value, dest } => {
1115                    self.registers[*dest] = value.clone();
1116                    pc += 1;
1117                }
1118                OpCode::CopyRegister { source, dest } => {
1119                    self.registers[*dest] = self.registers[*source].clone();
1120                    pc += 1;
1121                }
1122                OpCode::CopyRegisterIfNull { source, dest } => {
1123                    if self.registers[*dest].is_null() {
1124                        self.registers[*dest] = self.registers[*source].clone();
1125                    }
1126                    pc += 1;
1127                }
1128                OpCode::GetEventType { dest } => {
1129                    self.registers[*dest] = json!(event_type);
1130                    pc += 1;
1131                }
1132                OpCode::CreateObject { dest } => {
1133                    self.registers[*dest] = json!({});
1134                    pc += 1;
1135                }
1136                OpCode::SetField {
1137                    object,
1138                    path,
1139                    value,
1140                } => {
1141                    self.set_field_auto_vivify(*object, path, *value)?;
1142                    dirty_tracker.mark_replaced(path);
1143                    pc += 1;
1144                }
1145                OpCode::SetFields { object, fields } => {
1146                    for (path, value_reg) in fields {
1147                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1148                        dirty_tracker.mark_replaced(path);
1149                    }
1150                    pc += 1;
1151                }
1152                OpCode::GetField { object, path, dest } => {
1153                    let value = self.get_field(*object, path)?;
1154                    self.registers[*dest] = value;
1155                    pc += 1;
1156                }
1157                OpCode::ReadOrInitState {
1158                    state_id: _,
1159                    key,
1160                    default,
1161                    dest,
1162                } => {
1163                    let actual_state_id = override_state_id;
1164                    let entity_name_owned = entity_name.to_string();
1165                    self.states
1166                        .entry(actual_state_id)
1167                        .or_insert_with(|| StateTable {
1168                            data: DashMap::new(),
1169                            access_times: DashMap::new(),
1170                            lookup_indexes: HashMap::new(),
1171                            temporal_indexes: HashMap::new(),
1172                            pda_reverse_lookups: HashMap::new(),
1173                            pending_updates: DashMap::new(),
1174                            version_tracker: VersionTracker::new(),
1175                            config: StateTableConfig::default(),
1176                            entity_name: entity_name_owned,
1177                        });
1178                    let key_value = self.registers[*key].clone();
1179                    let warn_null_key = key_value.is_null()
1180                        && event_type.ends_with("State")
1181                        && !event_type.ends_with("IxState");
1182
1183                    if warn_null_key {
1184                        self.add_warning(format!(
1185                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1186                            key, event_type
1187                        ));
1188                    }
1189
1190                    let state = self
1191                        .states
1192                        .get(&actual_state_id)
1193                        .ok_or("State table not found")?;
1194
1195                    if !key_value.is_null() {
1196                        if let Some(ctx) = &self.current_context {
1197                            let ordering_value = ctx.write_version.or(ctx.txn_index);
1198                            if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1199                                if !state.is_fresh_update(&key_value, event_type, slot, version) {
1200                                    self.add_warning(format!(
1201                                        "Stale update skipped: slot={}, version={}",
1202                                        slot, version
1203                                    ));
1204                                    return Ok(Vec::new());
1205                                }
1206                            }
1207                        }
1208                    }
1209                    let value = state
1210                        .get_and_touch(&key_value)
1211                        .unwrap_or_else(|| default.clone());
1212
1213                    self.registers[*dest] = value;
1214                    pc += 1;
1215                }
1216                OpCode::UpdateState {
1217                    state_id: _,
1218                    key,
1219                    value,
1220                } => {
1221                    let actual_state_id = override_state_id;
1222                    let state = self
1223                        .states
1224                        .get(&actual_state_id)
1225                        .ok_or("State table not found")?;
1226                    let key_value = self.registers[*key].clone();
1227                    let value_data = self.registers[*value].clone();
1228
1229                    state.insert_with_eviction(key_value, value_data);
1230                    pc += 1;
1231                }
1232                OpCode::AppendToArray {
1233                    object,
1234                    path,
1235                    value,
1236                } => {
1237                    let appended_value = self.registers[*value].clone();
1238                    let max_len = self
1239                        .states
1240                        .get(&override_state_id)
1241                        .map(|s| s.max_array_length())
1242                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1243                    self.append_to_array(*object, path, *value, max_len)?;
1244                    dirty_tracker.mark_appended(path, appended_value);
1245                    pc += 1;
1246                }
1247                OpCode::GetCurrentTimestamp { dest } => {
1248                    let timestamp = std::time::SystemTime::now()
1249                        .duration_since(std::time::UNIX_EPOCH)
1250                        .unwrap()
1251                        .as_secs() as i64;
1252                    self.registers[*dest] = json!(timestamp);
1253                    pc += 1;
1254                }
1255                OpCode::CreateEvent { dest, event_value } => {
1256                    let timestamp = std::time::SystemTime::now()
1257                        .duration_since(std::time::UNIX_EPOCH)
1258                        .unwrap()
1259                        .as_secs() as i64;
1260
1261                    // Filter out __update_context from the event data
1262                    let mut event_data = self.registers[*event_value].clone();
1263                    if let Some(obj) = event_data.as_object_mut() {
1264                        obj.remove("__update_context");
1265                    }
1266
1267                    // Create event with timestamp, data, and optional slot/signature from context
1268                    let mut event = serde_json::Map::new();
1269                    event.insert("timestamp".to_string(), json!(timestamp));
1270                    event.insert("data".to_string(), event_data);
1271
1272                    // Add slot and signature if available from current context
1273                    if let Some(ref ctx) = self.current_context {
1274                        if let Some(slot) = ctx.slot {
1275                            event.insert("slot".to_string(), json!(slot));
1276                        }
1277                        if let Some(ref signature) = ctx.signature {
1278                            event.insert("signature".to_string(), json!(signature));
1279                        }
1280                    }
1281
1282                    self.registers[*dest] = Value::Object(event);
1283                    pc += 1;
1284                }
1285                OpCode::CreateCapture {
1286                    dest,
1287                    capture_value,
1288                } => {
1289                    let timestamp = std::time::SystemTime::now()
1290                        .duration_since(std::time::UNIX_EPOCH)
1291                        .unwrap()
1292                        .as_secs() as i64;
1293
1294                    // Get the capture data (already filtered by load_field)
1295                    let capture_data = self.registers[*capture_value].clone();
1296
1297                    // Extract account_address from the original event if available
1298                    let account_address = event_value
1299                        .get("__account_address")
1300                        .and_then(|v| v.as_str())
1301                        .unwrap_or("")
1302                        .to_string();
1303
1304                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1305                    let mut capture = serde_json::Map::new();
1306                    capture.insert("timestamp".to_string(), json!(timestamp));
1307                    capture.insert("account_address".to_string(), json!(account_address));
1308                    capture.insert("data".to_string(), capture_data);
1309
1310                    // Add slot and signature if available from current context
1311                    if let Some(ref ctx) = self.current_context {
1312                        if let Some(slot) = ctx.slot {
1313                            capture.insert("slot".to_string(), json!(slot));
1314                        }
1315                        if let Some(ref signature) = ctx.signature {
1316                            capture.insert("signature".to_string(), json!(signature));
1317                        }
1318                    }
1319
1320                    self.registers[*dest] = Value::Object(capture);
1321                    pc += 1;
1322                }
1323                OpCode::Transform {
1324                    source,
1325                    dest,
1326                    transformation,
1327                } => {
1328                    if source == dest {
1329                        self.transform_in_place(*source, transformation)?;
1330                    } else {
1331                        let source_value = &self.registers[*source];
1332                        let value = self.apply_transformation(source_value, transformation)?;
1333                        self.registers[*dest] = value;
1334                    }
1335                    pc += 1;
1336                }
1337                OpCode::EmitMutation {
1338                    entity_name,
1339                    key,
1340                    state,
1341                } => {
1342                    let primary_key = self.registers[*key].clone();
1343
1344                    if primary_key.is_null() || dirty_tracker.is_empty() {
1345                        let reason = if dirty_tracker.is_empty() {
1346                            "no_fields_modified"
1347                        } else {
1348                            "null_primary_key"
1349                        };
1350                        self.add_warning(format!(
1351                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1352                            entity_name,
1353                            reason,
1354                            dirty_tracker.len()
1355                        ));
1356                    } else {
1357                        let patch =
1358                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1359                        let append = dirty_tracker.appended_paths();
1360                        let mutation = Mutation {
1361                            export: entity_name.clone(),
1362                            key: primary_key,
1363                            patch,
1364                            append,
1365                        };
1366                        output.push(mutation);
1367                    }
1368                    pc += 1;
1369                }
1370                OpCode::SetFieldIfNull {
1371                    object,
1372                    path,
1373                    value,
1374                } => {
1375                    let was_set = self.set_field_if_null(*object, path, *value)?;
1376                    if was_set {
1377                        dirty_tracker.mark_replaced(path);
1378                    }
1379                    pc += 1;
1380                }
1381                OpCode::SetFieldMax {
1382                    object,
1383                    path,
1384                    value,
1385                } => {
1386                    let was_updated = self.set_field_max(*object, path, *value)?;
1387                    if was_updated {
1388                        dirty_tracker.mark_replaced(path);
1389                    }
1390                    pc += 1;
1391                }
1392                OpCode::UpdateTemporalIndex {
1393                    state_id: _,
1394                    index_name,
1395                    lookup_value,
1396                    primary_key,
1397                    timestamp,
1398                } => {
1399                    let actual_state_id = override_state_id;
1400                    let state = self
1401                        .states
1402                        .get_mut(&actual_state_id)
1403                        .ok_or("State table not found")?;
1404                    let index = state
1405                        .temporal_indexes
1406                        .entry(index_name.clone())
1407                        .or_insert_with(TemporalIndex::new);
1408
1409                    let lookup_val = self.registers[*lookup_value].clone();
1410                    let pk_val = self.registers[*primary_key].clone();
1411                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1412                        val
1413                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1414                        val as i64
1415                    } else {
1416                        return Err(format!(
1417                            "Timestamp must be a number (i64 or u64), got: {:?}",
1418                            self.registers[*timestamp]
1419                        )
1420                        .into());
1421                    };
1422
1423                    index.insert(lookup_val, pk_val, ts_val);
1424                    pc += 1;
1425                }
1426                OpCode::LookupTemporalIndex {
1427                    state_id: _,
1428                    index_name,
1429                    lookup_value,
1430                    timestamp,
1431                    dest,
1432                } => {
1433                    let actual_state_id = override_state_id;
1434                    let state = self
1435                        .states
1436                        .get(&actual_state_id)
1437                        .ok_or("State table not found")?;
1438                    let lookup_val = &self.registers[*lookup_value];
1439
1440                    let result = if self.registers[*timestamp].is_null() {
1441                        if let Some(index) = state.temporal_indexes.get(index_name) {
1442                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1443                        } else {
1444                            Value::Null
1445                        }
1446                    } else {
1447                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1448                            val
1449                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1450                            val as i64
1451                        } else {
1452                            return Err(format!(
1453                                "Timestamp must be a number (i64 or u64), got: {:?}",
1454                                self.registers[*timestamp]
1455                            )
1456                            .into());
1457                        };
1458
1459                        if let Some(index) = state.temporal_indexes.get(index_name) {
1460                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1461                        } else {
1462                            Value::Null
1463                        }
1464                    };
1465
1466                    self.registers[*dest] = result;
1467                    pc += 1;
1468                }
1469                OpCode::UpdateLookupIndex {
1470                    state_id: _,
1471                    index_name,
1472                    lookup_value,
1473                    primary_key,
1474                } => {
1475                    let actual_state_id = override_state_id;
1476                    let state = self
1477                        .states
1478                        .get_mut(&actual_state_id)
1479                        .ok_or("State table not found")?;
1480                    let index = state
1481                        .lookup_indexes
1482                        .entry(index_name.clone())
1483                        .or_insert_with(LookupIndex::new);
1484
1485                    let lookup_val = self.registers[*lookup_value].clone();
1486                    let pk_val = self.registers[*primary_key].clone();
1487
1488                    index.insert(lookup_val, pk_val);
1489                    pc += 1;
1490                }
1491                OpCode::LookupIndex {
1492                    state_id: _,
1493                    index_name,
1494                    lookup_value,
1495                    dest,
1496                } => {
1497                    let actual_state_id = override_state_id;
1498                    let lookup_val = self.registers[*lookup_value].clone();
1499
1500                    let result = {
1501                        let state = self
1502                            .states
1503                            .get(&actual_state_id)
1504                            .ok_or("State table not found")?;
1505
1506                        if let Some(index) = state.lookup_indexes.get(index_name) {
1507                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1508                            #[cfg(feature = "otel")]
1509                            if found.is_null() {
1510                                crate::vm_metrics::record_lookup_index_miss(index_name);
1511                            } else {
1512                                crate::vm_metrics::record_lookup_index_hit(index_name);
1513                            }
1514                            found
1515                        } else {
1516                            Value::Null
1517                        }
1518                    };
1519
1520                    let final_result = if result.is_null() {
1521                        if let Some(pda_str) = lookup_val.as_str() {
1522                            let state = self
1523                                .states
1524                                .get_mut(&actual_state_id)
1525                                .ok_or("State table not found")?;
1526
1527                            if let Some(pda_lookup) =
1528                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1529                            {
1530                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1531                                    Value::String(resolved)
1532                                } else {
1533                                    Value::Null
1534                                }
1535                            } else {
1536                                Value::Null
1537                            }
1538                        } else {
1539                            Value::Null
1540                        }
1541                    } else {
1542                        result
1543                    };
1544
1545                    self.registers[*dest] = final_result;
1546                    pc += 1;
1547                }
1548                OpCode::SetFieldSum {
1549                    object,
1550                    path,
1551                    value,
1552                } => {
1553                    let was_updated = self.set_field_sum(*object, path, *value)?;
1554                    if was_updated {
1555                        dirty_tracker.mark_replaced(path);
1556                    }
1557                    pc += 1;
1558                }
1559                OpCode::SetFieldIncrement { object, path } => {
1560                    let was_updated = self.set_field_increment(*object, path)?;
1561                    if was_updated {
1562                        dirty_tracker.mark_replaced(path);
1563                    }
1564                    pc += 1;
1565                }
1566                OpCode::SetFieldMin {
1567                    object,
1568                    path,
1569                    value,
1570                } => {
1571                    let was_updated = self.set_field_min(*object, path, *value)?;
1572                    if was_updated {
1573                        dirty_tracker.mark_replaced(path);
1574                    }
1575                    pc += 1;
1576                }
1577                OpCode::AddToUniqueSet {
1578                    state_id: _,
1579                    set_name,
1580                    value,
1581                    count_object,
1582                    count_path,
1583                } => {
1584                    let value_to_add = self.registers[*value].clone();
1585
1586                    // Store the unique set within the entity object, not in the state table
1587                    // This ensures each entity instance has its own unique set
1588                    let set_field_path = format!("__unique_set:{}", set_name);
1589
1590                    // Get or create the unique set from the entity object
1591                    let mut set: HashSet<Value> =
1592                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1593                            if !existing.is_null() {
1594                                serde_json::from_value(existing).unwrap_or_default()
1595                            } else {
1596                                HashSet::new()
1597                            }
1598                        } else {
1599                            HashSet::new()
1600                        };
1601
1602                    // Add value to set
1603                    let was_new = set.insert(value_to_add);
1604
1605                    // Store updated set back in the entity object
1606                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1607                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1608                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1609
1610                    // Update the count field in the object
1611                    if was_new {
1612                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1613                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1614                        dirty_tracker.mark_replaced(count_path);
1615                    }
1616
1617                    pc += 1;
1618                }
1619                OpCode::ConditionalSetField {
1620                    object,
1621                    path,
1622                    value,
1623                    condition_field,
1624                    condition_op,
1625                    condition_value,
1626                } => {
1627                    let field_value = self.load_field(event_value, condition_field, None)?;
1628                    let condition_met =
1629                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1630
1631                    if condition_met {
1632                        self.set_field_auto_vivify(*object, path, *value)?;
1633                        dirty_tracker.mark_replaced(path);
1634                    }
1635                    pc += 1;
1636                }
1637                OpCode::ConditionalIncrement {
1638                    object,
1639                    path,
1640                    condition_field,
1641                    condition_op,
1642                    condition_value,
1643                } => {
1644                    let field_value = self.load_field(event_value, condition_field, None)?;
1645                    let condition_met =
1646                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1647
1648                    if condition_met {
1649                        let was_updated = self.set_field_increment(*object, path)?;
1650                        if was_updated {
1651                            dirty_tracker.mark_replaced(path);
1652                        }
1653                    }
1654                    pc += 1;
1655                }
1656                OpCode::EvaluateComputedFields {
1657                    state,
1658                    computed_paths,
1659                } => {
1660                    if let Some(evaluator) = entity_evaluator {
1661                        let old_values: Vec<_> = computed_paths
1662                            .iter()
1663                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1664                            .collect();
1665
1666                        let state_value = &mut self.registers[*state];
1667                        if evaluator(state_value).is_ok() {
1668                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1669                                let new_value =
1670                                    Self::get_value_at_path(&self.registers[*state], path);
1671                                if new_value != *old_value {
1672                                    dirty_tracker.mark_replaced(path);
1673                                }
1674                            }
1675                        }
1676                    }
1677                    pc += 1;
1678                }
1679                OpCode::UpdatePdaReverseLookup {
1680                    state_id: _,
1681                    lookup_name,
1682                    pda_address,
1683                    primary_key,
1684                } => {
1685                    let actual_state_id = override_state_id;
1686                    let state = self
1687                        .states
1688                        .get_mut(&actual_state_id)
1689                        .ok_or("State table not found")?;
1690
1691                    let pda_val = self.registers[*pda_address].clone();
1692                    let pk_val = self.registers[*primary_key].clone();
1693
1694                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1695                        let pda_lookup = state
1696                            .pda_reverse_lookups
1697                            .entry(lookup_name.clone())
1698                            .or_insert_with(|| {
1699                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1700                            });
1701
1702                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1703                    } else if !pk_val.is_null() {
1704                        if let Some(pk_num) = pk_val.as_u64() {
1705                            if let Some(pda_str) = pda_val.as_str() {
1706                                let pda_lookup = state
1707                                    .pda_reverse_lookups
1708                                    .entry(lookup_name.clone())
1709                                    .or_insert_with(|| {
1710                                        PdaReverseLookup::new(
1711                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1712                                        )
1713                                    });
1714
1715                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1716                            }
1717                        }
1718                    }
1719
1720                    pc += 1;
1721                }
1722            }
1723
1724            self.instructions_executed += 1;
1725        }
1726
1727        Ok(output)
1728    }
1729
1730    fn load_field(
1731        &self,
1732        event_value: &Value,
1733        path: &FieldPath,
1734        default: Option<&Value>,
1735    ) -> Result<Value> {
1736        if path.segments.is_empty() {
1737            if let Some(obj) = event_value.as_object() {
1738                let filtered: serde_json::Map<String, Value> = obj
1739                    .iter()
1740                    .filter(|(k, _)| !k.starts_with("__"))
1741                    .map(|(k, v)| (k.clone(), v.clone()))
1742                    .collect();
1743                return Ok(Value::Object(filtered));
1744            }
1745            return Ok(event_value.clone());
1746        }
1747
1748        let mut current = event_value;
1749        for segment in path.segments.iter() {
1750            current = match current.get(segment) {
1751                Some(v) => v,
1752                None => return Ok(default.cloned().unwrap_or(Value::Null)),
1753            };
1754        }
1755
1756        Ok(current.clone())
1757    }
1758
1759    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1760        let mut current = value;
1761        for segment in path.split('.') {
1762            current = current.get(segment)?;
1763        }
1764        Some(current.clone())
1765    }
1766
1767    fn set_field_auto_vivify(
1768        &mut self,
1769        object_reg: Register,
1770        path: &str,
1771        value_reg: Register,
1772    ) -> Result<()> {
1773        let compiled = self.get_compiled_path(path);
1774        let segments = compiled.segments();
1775        let value = self.registers[value_reg].clone();
1776
1777        if !self.registers[object_reg].is_object() {
1778            self.registers[object_reg] = json!({});
1779        }
1780
1781        let obj = self.registers[object_reg]
1782            .as_object_mut()
1783            .ok_or("Not an object")?;
1784
1785        let mut current = obj;
1786        for (i, segment) in segments.iter().enumerate() {
1787            if i == segments.len() - 1 {
1788                current.insert(segment.to_string(), value);
1789                return Ok(());
1790            } else {
1791                current
1792                    .entry(segment.to_string())
1793                    .or_insert_with(|| json!({}));
1794                current = current
1795                    .get_mut(segment)
1796                    .and_then(|v| v.as_object_mut())
1797                    .ok_or("Path collision: expected object")?;
1798            }
1799        }
1800
1801        Ok(())
1802    }
1803
1804    fn set_field_if_null(
1805        &mut self,
1806        object_reg: Register,
1807        path: &str,
1808        value_reg: Register,
1809    ) -> Result<bool> {
1810        let compiled = self.get_compiled_path(path);
1811        let segments = compiled.segments();
1812        let value = self.registers[value_reg].clone();
1813
1814        if !self.registers[object_reg].is_object() {
1815            self.registers[object_reg] = json!({});
1816        }
1817
1818        let obj = self.registers[object_reg]
1819            .as_object_mut()
1820            .ok_or("Not an object")?;
1821
1822        let mut current = obj;
1823        for (i, segment) in segments.iter().enumerate() {
1824            if i == segments.len() - 1 {
1825                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1826                    current.insert(segment.to_string(), value);
1827                    return Ok(true);
1828                }
1829                return Ok(false);
1830            } else {
1831                current
1832                    .entry(segment.to_string())
1833                    .or_insert_with(|| json!({}));
1834                current = current
1835                    .get_mut(segment)
1836                    .and_then(|v| v.as_object_mut())
1837                    .ok_or("Path collision: expected object")?;
1838            }
1839        }
1840
1841        Ok(false)
1842    }
1843
1844    fn set_field_max(
1845        &mut self,
1846        object_reg: Register,
1847        path: &str,
1848        value_reg: Register,
1849    ) -> Result<bool> {
1850        let compiled = self.get_compiled_path(path);
1851        let segments = compiled.segments();
1852        let new_value = self.registers[value_reg].clone();
1853
1854        if !self.registers[object_reg].is_object() {
1855            self.registers[object_reg] = json!({});
1856        }
1857
1858        let obj = self.registers[object_reg]
1859            .as_object_mut()
1860            .ok_or("Not an object")?;
1861
1862        let mut current = obj;
1863        for (i, segment) in segments.iter().enumerate() {
1864            if i == segments.len() - 1 {
1865                let should_update = if let Some(current_value) = current.get(segment) {
1866                    if current_value.is_null() {
1867                        true
1868                    } else {
1869                        match (current_value.as_i64(), new_value.as_i64()) {
1870                            (Some(current_val), Some(new_val)) => new_val > current_val,
1871                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1872                                new_value.as_u64().unwrap() as i64 > current_val
1873                            }
1874                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1875                                new_val > current_value.as_u64().unwrap() as i64
1876                            }
1877                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1878                                (Some(current_val), Some(new_val)) => new_val > current_val,
1879                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1880                                    (Some(current_val), Some(new_val)) => new_val > current_val,
1881                                    _ => false,
1882                                },
1883                            },
1884                            _ => false,
1885                        }
1886                    }
1887                } else {
1888                    true
1889                };
1890
1891                if should_update {
1892                    current.insert(segment.to_string(), new_value);
1893                    return Ok(true);
1894                }
1895                return Ok(false);
1896            } else {
1897                current
1898                    .entry(segment.to_string())
1899                    .or_insert_with(|| json!({}));
1900                current = current
1901                    .get_mut(segment)
1902                    .and_then(|v| v.as_object_mut())
1903                    .ok_or("Path collision: expected object")?;
1904            }
1905        }
1906
1907        Ok(false)
1908    }
1909
1910    fn set_field_sum(
1911        &mut self,
1912        object_reg: Register,
1913        path: &str,
1914        value_reg: Register,
1915    ) -> Result<bool> {
1916        let compiled = self.get_compiled_path(path);
1917        let segments = compiled.segments();
1918        let new_value = &self.registers[value_reg];
1919
1920        // Extract numeric value before borrowing object_reg mutably
1921        let new_val_num = new_value
1922            .as_i64()
1923            .or_else(|| new_value.as_u64().map(|n| n as i64))
1924            .ok_or("Sum requires numeric value")?;
1925
1926        if !self.registers[object_reg].is_object() {
1927            self.registers[object_reg] = json!({});
1928        }
1929
1930        let obj = self.registers[object_reg]
1931            .as_object_mut()
1932            .ok_or("Not an object")?;
1933
1934        let mut current = obj;
1935        for (i, segment) in segments.iter().enumerate() {
1936            if i == segments.len() - 1 {
1937                let current_val = current
1938                    .get(segment)
1939                    .and_then(|v| {
1940                        if v.is_null() {
1941                            None
1942                        } else {
1943                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1944                        }
1945                    })
1946                    .unwrap_or(0);
1947
1948                let sum = current_val + new_val_num;
1949                current.insert(segment.to_string(), json!(sum));
1950                return Ok(true);
1951            } else {
1952                current
1953                    .entry(segment.to_string())
1954                    .or_insert_with(|| json!({}));
1955                current = current
1956                    .get_mut(segment)
1957                    .and_then(|v| v.as_object_mut())
1958                    .ok_or("Path collision: expected object")?;
1959            }
1960        }
1961
1962        Ok(false)
1963    }
1964
1965    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1966        let compiled = self.get_compiled_path(path);
1967        let segments = compiled.segments();
1968
1969        if !self.registers[object_reg].is_object() {
1970            self.registers[object_reg] = json!({});
1971        }
1972
1973        let obj = self.registers[object_reg]
1974            .as_object_mut()
1975            .ok_or("Not an object")?;
1976
1977        let mut current = obj;
1978        for (i, segment) in segments.iter().enumerate() {
1979            if i == segments.len() - 1 {
1980                // Get current value (default to 0 if null/missing)
1981                let current_val = current
1982                    .get(segment)
1983                    .and_then(|v| {
1984                        if v.is_null() {
1985                            None
1986                        } else {
1987                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1988                        }
1989                    })
1990                    .unwrap_or(0);
1991
1992                let incremented = current_val + 1;
1993                current.insert(segment.to_string(), json!(incremented));
1994                return Ok(true);
1995            } else {
1996                current
1997                    .entry(segment.to_string())
1998                    .or_insert_with(|| json!({}));
1999                current = current
2000                    .get_mut(segment)
2001                    .and_then(|v| v.as_object_mut())
2002                    .ok_or("Path collision: expected object")?;
2003            }
2004        }
2005
2006        Ok(false)
2007    }
2008
2009    fn set_field_min(
2010        &mut self,
2011        object_reg: Register,
2012        path: &str,
2013        value_reg: Register,
2014    ) -> Result<bool> {
2015        let compiled = self.get_compiled_path(path);
2016        let segments = compiled.segments();
2017        let new_value = self.registers[value_reg].clone();
2018
2019        if !self.registers[object_reg].is_object() {
2020            self.registers[object_reg] = json!({});
2021        }
2022
2023        let obj = self.registers[object_reg]
2024            .as_object_mut()
2025            .ok_or("Not an object")?;
2026
2027        let mut current = obj;
2028        for (i, segment) in segments.iter().enumerate() {
2029            if i == segments.len() - 1 {
2030                let should_update = if let Some(current_value) = current.get(segment) {
2031                    if current_value.is_null() {
2032                        true
2033                    } else {
2034                        match (current_value.as_i64(), new_value.as_i64()) {
2035                            (Some(current_val), Some(new_val)) => new_val < current_val,
2036                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2037                                (new_value.as_u64().unwrap() as i64) < current_val
2038                            }
2039                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2040                                new_val < current_value.as_u64().unwrap() as i64
2041                            }
2042                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2043                                (Some(current_val), Some(new_val)) => new_val < current_val,
2044                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2045                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2046                                    _ => false,
2047                                },
2048                            },
2049                            _ => false,
2050                        }
2051                    }
2052                } else {
2053                    true
2054                };
2055
2056                if should_update {
2057                    current.insert(segment.to_string(), new_value);
2058                    return Ok(true);
2059                }
2060                return Ok(false);
2061            } else {
2062                current
2063                    .entry(segment.to_string())
2064                    .or_insert_with(|| json!({}));
2065                current = current
2066                    .get_mut(segment)
2067                    .and_then(|v| v.as_object_mut())
2068                    .ok_or("Path collision: expected object")?;
2069            }
2070        }
2071
2072        Ok(false)
2073    }
2074
2075    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2076        let compiled = self.get_compiled_path(path);
2077        let segments = compiled.segments();
2078        let mut current = &self.registers[object_reg];
2079
2080        for segment in segments {
2081            current = current
2082                .get(segment)
2083                .ok_or_else(|| format!("Field not found: {}", segment))?;
2084        }
2085
2086        Ok(current.clone())
2087    }
2088
2089    fn append_to_array(
2090        &mut self,
2091        object_reg: Register,
2092        path: &str,
2093        value_reg: Register,
2094        max_length: usize,
2095    ) -> Result<()> {
2096        let compiled = self.get_compiled_path(path);
2097        let segments = compiled.segments();
2098        let value = self.registers[value_reg].clone();
2099
2100        if !self.registers[object_reg].is_object() {
2101            self.registers[object_reg] = json!({});
2102        }
2103
2104        let obj = self.registers[object_reg]
2105            .as_object_mut()
2106            .ok_or("Not an object")?;
2107
2108        let mut current = obj;
2109        for (i, segment) in segments.iter().enumerate() {
2110            if i == segments.len() - 1 {
2111                current
2112                    .entry(segment.to_string())
2113                    .or_insert_with(|| json!([]));
2114                let arr = current
2115                    .get_mut(segment)
2116                    .and_then(|v| v.as_array_mut())
2117                    .ok_or("Path is not an array")?;
2118                arr.push(value.clone());
2119
2120                if arr.len() > max_length {
2121                    let excess = arr.len() - max_length;
2122                    arr.drain(0..excess);
2123                }
2124            } else {
2125                current
2126                    .entry(segment.to_string())
2127                    .or_insert_with(|| json!({}));
2128                current = current
2129                    .get_mut(segment)
2130                    .and_then(|v| v.as_object_mut())
2131                    .ok_or("Path collision: expected object")?;
2132            }
2133        }
2134
2135        Ok(())
2136    }
2137
2138    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2139        let value = &self.registers[reg];
2140        let transformed = self.apply_transformation(value, transformation)?;
2141        self.registers[reg] = transformed;
2142        Ok(())
2143    }
2144
2145    fn apply_transformation(
2146        &self,
2147        value: &Value,
2148        transformation: &Transformation,
2149    ) -> Result<Value> {
2150        match transformation {
2151            Transformation::HexEncode => {
2152                if let Some(arr) = value.as_array() {
2153                    let bytes: Vec<u8> = arr
2154                        .iter()
2155                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2156                        .collect();
2157                    let hex = hex::encode(&bytes);
2158                    Ok(json!(hex))
2159                } else {
2160                    Err("HexEncode requires an array of numbers".into())
2161                }
2162            }
2163            Transformation::HexDecode => {
2164                if let Some(s) = value.as_str() {
2165                    let s = s.strip_prefix("0x").unwrap_or(s);
2166                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2167                    Ok(json!(bytes))
2168                } else {
2169                    Err("HexDecode requires a string".into())
2170                }
2171            }
2172            Transformation::Base58Encode => {
2173                if let Some(arr) = value.as_array() {
2174                    let bytes: Vec<u8> = arr
2175                        .iter()
2176                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2177                        .collect();
2178                    let encoded = bs58::encode(&bytes).into_string();
2179                    Ok(json!(encoded))
2180                } else if value.is_string() {
2181                    Ok(value.clone())
2182                } else {
2183                    Err("Base58Encode requires an array of numbers".into())
2184                }
2185            }
2186            Transformation::Base58Decode => {
2187                if let Some(s) = value.as_str() {
2188                    let bytes = bs58::decode(s)
2189                        .into_vec()
2190                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2191                    Ok(json!(bytes))
2192                } else {
2193                    Err("Base58Decode requires a string".into())
2194                }
2195            }
2196            Transformation::ToString => Ok(json!(value.to_string())),
2197            Transformation::ToNumber => {
2198                if let Some(s) = value.as_str() {
2199                    let n = s
2200                        .parse::<i64>()
2201                        .map_err(|e| format!("Parse error: {}", e))?;
2202                    Ok(json!(n))
2203                } else {
2204                    Ok(value.clone())
2205                }
2206            }
2207        }
2208    }
2209
2210    fn evaluate_comparison(
2211        &self,
2212        field_value: &Value,
2213        op: &ComparisonOp,
2214        condition_value: &Value,
2215    ) -> Result<bool> {
2216        use ComparisonOp::*;
2217
2218        match op {
2219            Equal => Ok(field_value == condition_value),
2220            NotEqual => Ok(field_value != condition_value),
2221            GreaterThan => {
2222                // Try to compare as numbers
2223                match (field_value.as_i64(), condition_value.as_i64()) {
2224                    (Some(a), Some(b)) => Ok(a > b),
2225                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2226                        (Some(a), Some(b)) => Ok(a > b),
2227                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2228                            (Some(a), Some(b)) => Ok(a > b),
2229                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2230                        },
2231                    },
2232                }
2233            }
2234            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2235                (Some(a), Some(b)) => Ok(a >= b),
2236                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2237                    (Some(a), Some(b)) => Ok(a >= b),
2238                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2239                        (Some(a), Some(b)) => Ok(a >= b),
2240                        _ => {
2241                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2242                        }
2243                    },
2244                },
2245            },
2246            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2247                (Some(a), Some(b)) => Ok(a < b),
2248                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2249                    (Some(a), Some(b)) => Ok(a < b),
2250                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2251                        (Some(a), Some(b)) => Ok(a < b),
2252                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2253                    },
2254                },
2255            },
2256            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2257                (Some(a), Some(b)) => Ok(a <= b),
2258                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2259                    (Some(a), Some(b)) => Ok(a <= b),
2260                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2261                        (Some(a), Some(b)) => Ok(a <= b),
2262                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2263                    },
2264                },
2265            },
2266        }
2267    }
2268
2269    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2270    /// Returns any pending account updates that were queued for this PDA.
2271    /// ```ignore
2272    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2273    /// for update in pending {
2274    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2275    /// }
2276    /// ```
2277    #[cfg_attr(feature = "otel", instrument(
2278        name = "vm.update_pda_lookup",
2279        skip(self),
2280        fields(
2281            pda = %pda_address,
2282            seed = %seed_value,
2283        )
2284    ))]
2285    pub fn update_pda_reverse_lookup(
2286        &mut self,
2287        state_id: u32,
2288        lookup_name: &str,
2289        pda_address: String,
2290        seed_value: String,
2291    ) -> Result<Vec<PendingAccountUpdate>> {
2292        let state = self
2293            .states
2294            .get_mut(&state_id)
2295            .ok_or("State table not found")?;
2296
2297        let lookup = state
2298            .pda_reverse_lookups
2299            .entry(lookup_name.to_string())
2300            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2301
2302        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2303
2304        if let Some(ref evicted) = evicted_pda {
2305            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2306                let count = evicted_updates.len();
2307                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2308            }
2309        }
2310
2311        // Flush and return pending updates for this PDA
2312        self.flush_pending_updates(state_id, &pda_address)
2313    }
2314
2315    /// Clean up expired pending updates that are older than the TTL
2316    ///
2317    /// Returns the number of updates that were removed.
2318    /// This should be called periodically to prevent memory leaks from orphaned updates.
2319    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2320        let state = match self.states.get_mut(&state_id) {
2321            Some(s) => s,
2322            None => return 0,
2323        };
2324
2325        let now = std::time::SystemTime::now()
2326            .duration_since(std::time::UNIX_EPOCH)
2327            .unwrap()
2328            .as_secs() as i64;
2329
2330        let mut removed_count = 0;
2331
2332        // Iterate through all pending updates and remove expired ones
2333        state.pending_updates.retain(|_pda_address, updates| {
2334            let original_len = updates.len();
2335
2336            updates.retain(|update| {
2337                let age = now - update.queued_at;
2338                age <= PENDING_UPDATE_TTL_SECONDS
2339            });
2340
2341            removed_count += original_len - updates.len();
2342
2343            // Remove the entry entirely if no updates remain
2344            !updates.is_empty()
2345        });
2346
2347        // Update the global counter
2348        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2349
2350        if removed_count > 0 {
2351            #[cfg(feature = "otel")]
2352            crate::vm_metrics::record_pending_updates_expired(
2353                removed_count as u64,
2354                &state.entity_name,
2355            );
2356        }
2357
2358        removed_count
2359    }
2360
2361    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2362    ///
2363    /// # Workflow
2364    ///
2365    /// This implements a deferred processing pattern for account updates when the PDA reverse
2366    /// lookup needed to resolve the primary key is not yet available:
2367    ///
2368    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2369    ///    is not available, call `queue_account_update()` to queue it for later.
2370    ///
2371    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2372    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2373    ///
2374    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2375    ///    ```ignore
2376    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2377    ///    for update in pending {
2378    ///        let mutations = vm.process_event(
2379    ///            &bytecode, update.account_data, &update.account_type, None, None
2380    ///        )?;
2381    ///    }
2382    ///    ```
2383    ///
2384    /// # Arguments
2385    ///
2386    /// * `state_id` - The state table ID
2387    /// * `pda_address` - The PDA address that needs reverse lookup
2388    /// * `account_type` - The event type name for reprocessing
2389    /// * `account_data` - The account data (event value) for reprocessing
2390    /// * `slot` - The slot number when this update occurred
2391    /// * `signature` - The transaction signature
2392    #[cfg_attr(feature = "otel", instrument(
2393        name = "vm.queue_account_update",
2394        skip(self, update),
2395        fields(
2396            pda = %update.pda_address,
2397            account_type = %update.account_type,
2398            slot = update.slot,
2399        )
2400    ))]
2401    pub fn queue_account_update(
2402        &mut self,
2403        state_id: u32,
2404        update: QueuedAccountUpdate,
2405    ) -> Result<()> {
2406        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2407            self.cleanup_expired_pending_updates(state_id);
2408            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2409                self.drop_oldest_pending_update(state_id)?;
2410            }
2411        }
2412
2413        let state = self
2414            .states
2415            .get_mut(&state_id)
2416            .ok_or("State table not found")?;
2417
2418        let pending = PendingAccountUpdate {
2419            account_type: update.account_type,
2420            pda_address: update.pda_address.clone(),
2421            account_data: update.account_data,
2422            slot: update.slot,
2423            write_version: update.write_version,
2424            signature: update.signature,
2425            queued_at: std::time::SystemTime::now()
2426                .duration_since(std::time::UNIX_EPOCH)
2427                .unwrap()
2428                .as_secs() as i64,
2429        };
2430
2431        let pda_address = pending.pda_address.clone();
2432        let slot = pending.slot;
2433
2434        let mut updates = state
2435            .pending_updates
2436            .entry(pda_address.clone())
2437            .or_insert_with(Vec::new);
2438
2439        let original_len = updates.len();
2440        updates.retain(|existing| existing.slot > slot);
2441        let removed_by_dedup = original_len - updates.len();
2442
2443        if removed_by_dedup > 0 {
2444            self.pending_queue_size = self
2445                .pending_queue_size
2446                .saturating_sub(removed_by_dedup as u64);
2447        }
2448
2449        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2450            updates.remove(0);
2451            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2452        }
2453
2454        updates.push(pending);
2455        #[cfg(feature = "otel")]
2456        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2457
2458        Ok(())
2459    }
2460
2461    /// Get statistics about the pending queue for monitoring
2462    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2463        let state = self.states.get(&state_id)?;
2464
2465        let now = std::time::SystemTime::now()
2466            .duration_since(std::time::UNIX_EPOCH)
2467            .unwrap()
2468            .as_secs() as i64;
2469
2470        let mut total_updates = 0;
2471        let mut oldest_timestamp = now;
2472        let mut largest_pda_queue = 0;
2473        let mut estimated_memory = 0;
2474
2475        for entry in state.pending_updates.iter() {
2476            let (_, updates) = entry.pair();
2477            total_updates += updates.len();
2478            largest_pda_queue = largest_pda_queue.max(updates.len());
2479
2480            for update in updates.iter() {
2481                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2482                // Rough memory estimate
2483                estimated_memory += update.account_type.len() +
2484                                   update.pda_address.len() +
2485                                   update.signature.len() +
2486                                   16 + // slot + queued_at
2487                                   estimate_json_size(&update.account_data);
2488            }
2489        }
2490
2491        Some(PendingQueueStats {
2492            total_updates,
2493            unique_pdas: state.pending_updates.len(),
2494            oldest_age_seconds: now - oldest_timestamp,
2495            largest_pda_queue_size: largest_pda_queue,
2496            estimated_memory_bytes: estimated_memory,
2497        })
2498    }
2499
2500    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2501        let mut stats = VmMemoryStats {
2502            path_cache_size: self.path_cache.len(),
2503            ..Default::default()
2504        };
2505
2506        if let Some(state) = self.states.get(&state_id) {
2507            stats.state_table_entity_count = state.data.len();
2508            stats.state_table_max_entries = state.config.max_entries;
2509            stats.state_table_at_capacity = state.is_at_capacity();
2510
2511            stats.lookup_index_count = state.lookup_indexes.len();
2512            stats.lookup_index_total_entries =
2513                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2514
2515            stats.temporal_index_count = state.temporal_indexes.len();
2516            stats.temporal_index_total_entries = state
2517                .temporal_indexes
2518                .values()
2519                .map(|idx| idx.total_entries())
2520                .sum();
2521
2522            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2523            stats.pda_reverse_lookup_total_entries = state
2524                .pda_reverse_lookups
2525                .values()
2526                .map(|lookup| lookup.len())
2527                .sum();
2528
2529            stats.version_tracker_entries = state.version_tracker.len();
2530
2531            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2532        }
2533
2534        stats
2535    }
2536
2537    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2538        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2539        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2540
2541        #[cfg(feature = "otel")]
2542        if let Some(state) = self.states.get(&state_id) {
2543            crate::vm_metrics::record_cleanup(
2544                pending_removed,
2545                temporal_removed,
2546                &state.entity_name,
2547            );
2548        }
2549
2550        CleanupResult {
2551            pending_updates_removed: pending_removed,
2552            temporal_entries_removed: temporal_removed,
2553        }
2554    }
2555
2556    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2557        let state = match self.states.get_mut(&state_id) {
2558            Some(s) => s,
2559            None => return 0,
2560        };
2561
2562        let now = std::time::SystemTime::now()
2563            .duration_since(std::time::UNIX_EPOCH)
2564            .unwrap()
2565            .as_secs() as i64;
2566
2567        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2568        let mut total_removed = 0;
2569
2570        for (_, index) in state.temporal_indexes.iter_mut() {
2571            total_removed += index.cleanup_expired(cutoff);
2572        }
2573
2574        total_removed
2575    }
2576
2577    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2578        let state = self.states.get(&state_id)?;
2579
2580        if state.is_at_capacity() {
2581            Some(CapacityWarning {
2582                current_entries: state.data.len(),
2583                max_entries: state.config.max_entries,
2584                entries_over_limit: state.entries_over_limit(),
2585            })
2586        } else {
2587            None
2588        }
2589    }
2590
2591    /// Drop the oldest pending update across all PDAs
2592    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2593        let state = self
2594            .states
2595            .get_mut(&state_id)
2596            .ok_or("State table not found")?;
2597
2598        let mut oldest_pda: Option<String> = None;
2599        let mut oldest_timestamp = i64::MAX;
2600
2601        // Find the PDA with the oldest update
2602        for entry in state.pending_updates.iter() {
2603            let (pda, updates) = entry.pair();
2604            if let Some(update) = updates.first() {
2605                if update.queued_at < oldest_timestamp {
2606                    oldest_timestamp = update.queued_at;
2607                    oldest_pda = Some(pda.clone());
2608                }
2609            }
2610        }
2611
2612        // Remove the oldest update
2613        if let Some(pda) = oldest_pda {
2614            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2615                if !updates.is_empty() {
2616                    updates.remove(0);
2617                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2618
2619                    // Remove the entry if it's now empty
2620                    if updates.is_empty() {
2621                        drop(updates);
2622                        state.pending_updates.remove(&pda);
2623                    }
2624                }
2625            }
2626        }
2627
2628        Ok(())
2629    }
2630
2631    /// Flush and return pending updates for a PDA for external reprocessing
2632    ///
2633    /// Returns the pending updates that were queued for this PDA address.
2634    /// The caller should reprocess these through the VM using process_event().
2635    fn flush_pending_updates(
2636        &mut self,
2637        state_id: u32,
2638        pda_address: &str,
2639    ) -> Result<Vec<PendingAccountUpdate>> {
2640        let state = self
2641            .states
2642            .get_mut(&state_id)
2643            .ok_or("State table not found")?;
2644
2645        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2646            let count = pending_updates.len();
2647            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2648            #[cfg(feature = "otel")]
2649            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2650            Ok(pending_updates)
2651        } else {
2652            Ok(Vec::new())
2653        }
2654    }
2655
2656    /// Try to resolve a primary key via PDA reverse lookup
2657    pub fn try_pda_reverse_lookup(
2658        &mut self,
2659        state_id: u32,
2660        lookup_name: &str,
2661        pda_address: &str,
2662    ) -> Option<String> {
2663        let state = self.states.get_mut(&state_id)?;
2664
2665        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2666            if let Some(value) = lookup.lookup(pda_address) {
2667                self.pda_cache_hits += 1;
2668                return Some(value);
2669            }
2670        }
2671
2672        self.pda_cache_misses += 1;
2673        None
2674    }
2675
2676    // ============================================================================
2677    // Computed Expression Evaluator (Task 5)
2678    // ============================================================================
2679
2680    /// Evaluate a computed expression AST against the current state
2681    /// This is the core runtime evaluator for computed fields from the AST
2682    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2683        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2684    }
2685
2686    /// Evaluate a computed expression with a variable environment (for let bindings)
2687    fn evaluate_computed_expr_with_env(
2688        &self,
2689        expr: &ComputedExpr,
2690        state: &Value,
2691        env: &std::collections::HashMap<String, Value>,
2692    ) -> Result<Value> {
2693        match expr {
2694            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2695
2696            ComputedExpr::Var { name } => env
2697                .get(name)
2698                .cloned()
2699                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2700
2701            ComputedExpr::Let { name, value, body } => {
2702                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2703                let mut new_env = env.clone();
2704                new_env.insert(name.clone(), val);
2705                self.evaluate_computed_expr_with_env(body, state, &new_env)
2706            }
2707
2708            ComputedExpr::If {
2709                condition,
2710                then_branch,
2711                else_branch,
2712            } => {
2713                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2714                if self.value_to_bool(&cond_val) {
2715                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2716                } else {
2717                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2718                }
2719            }
2720
2721            ComputedExpr::None => Ok(Value::Null),
2722
2723            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2724
2725            ComputedExpr::Slice { expr, start, end } => {
2726                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2727                match val {
2728                    Value::Array(arr) => {
2729                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2730                        Ok(Value::Array(slice))
2731                    }
2732                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2733                }
2734            }
2735
2736            ComputedExpr::Index { expr, index } => {
2737                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2738                match val {
2739                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2740                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2741                }
2742            }
2743
2744            ComputedExpr::U64FromLeBytes { bytes } => {
2745                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2746                let byte_vec = self.value_to_bytes(&val)?;
2747                if byte_vec.len() < 8 {
2748                    return Err(format!(
2749                        "u64::from_le_bytes requires 8 bytes, got {}",
2750                        byte_vec.len()
2751                    )
2752                    .into());
2753                }
2754                let arr: [u8; 8] = byte_vec[..8]
2755                    .try_into()
2756                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2757                Ok(json!(u64::from_le_bytes(arr)))
2758            }
2759
2760            ComputedExpr::U64FromBeBytes { bytes } => {
2761                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2762                let byte_vec = self.value_to_bytes(&val)?;
2763                if byte_vec.len() < 8 {
2764                    return Err(format!(
2765                        "u64::from_be_bytes requires 8 bytes, got {}",
2766                        byte_vec.len()
2767                    )
2768                    .into());
2769                }
2770                let arr: [u8; 8] = byte_vec[..8]
2771                    .try_into()
2772                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2773                Ok(json!(u64::from_be_bytes(arr)))
2774            }
2775
2776            ComputedExpr::ByteArray { bytes } => {
2777                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2778            }
2779
2780            ComputedExpr::Closure { param, body } => {
2781                // Closures are stored as-is; they're evaluated when used in map()
2782                // Return a special representation
2783                Ok(json!({
2784                    "__closure": {
2785                        "param": param,
2786                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2787                    }
2788                }))
2789            }
2790
2791            ComputedExpr::Unary { op, expr } => {
2792                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2793                self.apply_unary_op(op, &val)
2794            }
2795
2796            ComputedExpr::JsonToBytes { expr } => {
2797                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2798                // Convert JSON array of numbers to byte array
2799                let bytes = self.value_to_bytes(&val)?;
2800                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2801            }
2802
2803            ComputedExpr::UnwrapOr { expr, default } => {
2804                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2805                if val.is_null() {
2806                    Ok(default.clone())
2807                } else {
2808                    Ok(val)
2809                }
2810            }
2811
2812            ComputedExpr::Binary { op, left, right } => {
2813                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2814                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2815                self.apply_binary_op(op, &l, &r)
2816            }
2817
2818            ComputedExpr::Cast { expr, to_type } => {
2819                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2820                self.apply_cast(&val, to_type)
2821            }
2822
2823            ComputedExpr::MethodCall { expr, method, args } => {
2824                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2825                // Special handling for map() with closures
2826                if method == "map" && args.len() == 1 {
2827                    if let ComputedExpr::Closure { param, body } = &args[0] {
2828                        // If the value is null, return null (Option::None.map returns None)
2829                        if val.is_null() {
2830                            return Ok(Value::Null);
2831                        }
2832                        // Evaluate the closure body with the value bound to param
2833                        let mut closure_env = env.clone();
2834                        closure_env.insert(param.clone(), val);
2835                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2836                    }
2837                }
2838                let evaluated_args: Vec<Value> = args
2839                    .iter()
2840                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2841                    .collect::<Result<Vec<_>>>()?;
2842                self.apply_method_call(&val, method, &evaluated_args)
2843            }
2844
2845            ComputedExpr::Literal { value } => Ok(value.clone()),
2846
2847            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2848        }
2849    }
2850
2851    /// Convert a JSON value to a byte vector
2852    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2853        match val {
2854            Value::Array(arr) => arr
2855                .iter()
2856                .map(|v| {
2857                    v.as_u64()
2858                        .map(|n| n as u8)
2859                        .ok_or_else(|| "Array element not a valid byte".into())
2860                })
2861                .collect(),
2862            Value::String(s) => {
2863                // Try to decode as hex
2864                if s.starts_with("0x") || s.starts_with("0X") {
2865                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2866                } else {
2867                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2868                }
2869            }
2870            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2871        }
2872    }
2873
2874    /// Apply a unary operation
2875    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2876        use crate::ast::UnaryOp;
2877        match op {
2878            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2879            UnaryOp::ReverseBits => match val.as_u64() {
2880                Some(n) => Ok(json!(n.reverse_bits())),
2881                None => match val.as_i64() {
2882                    Some(n) => Ok(json!((n as u64).reverse_bits())),
2883                    None => Err("reverse_bits requires an integer".into()),
2884                },
2885            },
2886        }
2887    }
2888
2889    /// Get a field value from state by path (e.g., "section.field" or just "field")
2890    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2891        let segments: Vec<&str> = path.split('.').collect();
2892        let mut current = state;
2893
2894        for segment in segments {
2895            match current.get(segment) {
2896                Some(v) => current = v,
2897                None => return Ok(Value::Null),
2898            }
2899        }
2900
2901        Ok(current.clone())
2902    }
2903
2904    /// Apply a binary operation to two values
2905    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2906        match op {
2907            // Arithmetic operations
2908            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2909            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2910            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2911            BinaryOp::Div => {
2912                // Check for division by zero
2913                if let Some(r) = right.as_i64() {
2914                    if r == 0 {
2915                        return Err("Division by zero".into());
2916                    }
2917                }
2918                if let Some(r) = right.as_f64() {
2919                    if r == 0.0 {
2920                        return Err("Division by zero".into());
2921                    }
2922                }
2923                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2924            }
2925            BinaryOp::Mod => {
2926                // Modulo - only for integers
2927                match (left.as_i64(), right.as_i64()) {
2928                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2929                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2930                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2931                        _ => Err("Modulo requires non-zero integer operands".into()),
2932                    },
2933                    _ => Err("Modulo by zero".into()),
2934                }
2935            }
2936
2937            // Comparison operations
2938            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2939            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2940            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2941            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2942            BinaryOp::Eq => Ok(json!(left == right)),
2943            BinaryOp::Ne => Ok(json!(left != right)),
2944
2945            // Logical operations
2946            BinaryOp::And => {
2947                let l_bool = self.value_to_bool(left);
2948                let r_bool = self.value_to_bool(right);
2949                Ok(json!(l_bool && r_bool))
2950            }
2951            BinaryOp::Or => {
2952                let l_bool = self.value_to_bool(left);
2953                let r_bool = self.value_to_bool(right);
2954                Ok(json!(l_bool || r_bool))
2955            }
2956
2957            // Bitwise operations
2958            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2959                (Some(a), Some(b)) => Ok(json!(a ^ b)),
2960                _ => match (left.as_i64(), right.as_i64()) {
2961                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
2962                    _ => Err("XOR requires integer operands".into()),
2963                },
2964            },
2965            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2966                (Some(a), Some(b)) => Ok(json!(a & b)),
2967                _ => match (left.as_i64(), right.as_i64()) {
2968                    (Some(a), Some(b)) => Ok(json!(a & b)),
2969                    _ => Err("BitAnd requires integer operands".into()),
2970                },
2971            },
2972            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2973                (Some(a), Some(b)) => Ok(json!(a | b)),
2974                _ => match (left.as_i64(), right.as_i64()) {
2975                    (Some(a), Some(b)) => Ok(json!(a | b)),
2976                    _ => Err("BitOr requires integer operands".into()),
2977                },
2978            },
2979            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2980                (Some(a), Some(b)) => Ok(json!(a << b)),
2981                _ => match (left.as_i64(), right.as_i64()) {
2982                    (Some(a), Some(b)) => Ok(json!(a << b)),
2983                    _ => Err("Shl requires integer operands".into()),
2984                },
2985            },
2986            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2987                (Some(a), Some(b)) => Ok(json!(a >> b)),
2988                _ => match (left.as_i64(), right.as_i64()) {
2989                    (Some(a), Some(b)) => Ok(json!(a >> b)),
2990                    _ => Err("Shr requires integer operands".into()),
2991                },
2992            },
2993        }
2994    }
2995
2996    /// Helper for numeric operations that can work on integers or floats
2997    fn numeric_op<F1, F2>(
2998        &self,
2999        left: &Value,
3000        right: &Value,
3001        int_op: F1,
3002        float_op: F2,
3003    ) -> Result<Value>
3004    where
3005        F1: Fn(i64, i64) -> i64,
3006        F2: Fn(f64, f64) -> f64,
3007    {
3008        // Try i64 first
3009        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3010            return Ok(json!(int_op(a, b)));
3011        }
3012
3013        // Try u64
3014        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3015            // For u64, we need to be careful with underflow in subtraction
3016            return Ok(json!(int_op(a as i64, b as i64)));
3017        }
3018
3019        // Try f64
3020        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3021            return Ok(json!(float_op(a, b)));
3022        }
3023
3024        // If either is null, return null
3025        if left.is_null() || right.is_null() {
3026            return Ok(Value::Null);
3027        }
3028
3029        Err(format!(
3030            "Cannot perform numeric operation on {:?} and {:?}",
3031            left, right
3032        )
3033        .into())
3034    }
3035
3036    /// Helper for comparison operations
3037    fn comparison_op<F1, F2>(
3038        &self,
3039        left: &Value,
3040        right: &Value,
3041        int_cmp: F1,
3042        float_cmp: F2,
3043    ) -> Result<Value>
3044    where
3045        F1: Fn(i64, i64) -> bool,
3046        F2: Fn(f64, f64) -> bool,
3047    {
3048        // Try i64 first
3049        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3050            return Ok(json!(int_cmp(a, b)));
3051        }
3052
3053        // Try u64
3054        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3055            return Ok(json!(int_cmp(a as i64, b as i64)));
3056        }
3057
3058        // Try f64
3059        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3060            return Ok(json!(float_cmp(a, b)));
3061        }
3062
3063        // If either is null, comparison returns false
3064        if left.is_null() || right.is_null() {
3065            return Ok(json!(false));
3066        }
3067
3068        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3069    }
3070
3071    /// Convert a value to boolean for logical operations
3072    fn value_to_bool(&self, value: &Value) -> bool {
3073        match value {
3074            Value::Null => false,
3075            Value::Bool(b) => *b,
3076            Value::Number(n) => {
3077                if let Some(i) = n.as_i64() {
3078                    i != 0
3079                } else if let Some(f) = n.as_f64() {
3080                    f != 0.0
3081                } else {
3082                    true
3083                }
3084            }
3085            Value::String(s) => !s.is_empty(),
3086            Value::Array(arr) => !arr.is_empty(),
3087            Value::Object(obj) => !obj.is_empty(),
3088        }
3089    }
3090
3091    /// Apply a type cast to a value
3092    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3093        match to_type {
3094            "i8" | "i16" | "i32" | "i64" | "isize" => {
3095                if let Some(n) = value.as_i64() {
3096                    Ok(json!(n))
3097                } else if let Some(n) = value.as_u64() {
3098                    Ok(json!(n as i64))
3099                } else if let Some(n) = value.as_f64() {
3100                    Ok(json!(n as i64))
3101                } else if let Some(s) = value.as_str() {
3102                    s.parse::<i64>()
3103                        .map(|n| json!(n))
3104                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3105                } else {
3106                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3107                }
3108            }
3109            "u8" | "u16" | "u32" | "u64" | "usize" => {
3110                if let Some(n) = value.as_u64() {
3111                    Ok(json!(n))
3112                } else if let Some(n) = value.as_i64() {
3113                    Ok(json!(n as u64))
3114                } else if let Some(n) = value.as_f64() {
3115                    Ok(json!(n as u64))
3116                } else if let Some(s) = value.as_str() {
3117                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3118                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3119                    })
3120                } else {
3121                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3122                }
3123            }
3124            "f32" | "f64" => {
3125                if let Some(n) = value.as_f64() {
3126                    Ok(json!(n))
3127                } else if let Some(n) = value.as_i64() {
3128                    Ok(json!(n as f64))
3129                } else if let Some(n) = value.as_u64() {
3130                    Ok(json!(n as f64))
3131                } else if let Some(s) = value.as_str() {
3132                    s.parse::<f64>()
3133                        .map(|n| json!(n))
3134                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3135                } else {
3136                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3137                }
3138            }
3139            "String" | "string" => Ok(json!(value.to_string())),
3140            "bool" => Ok(json!(self.value_to_bool(value))),
3141            _ => {
3142                // Unknown type, return value as-is
3143                Ok(value.clone())
3144            }
3145        }
3146    }
3147
3148    /// Apply a method call to a value
3149    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3150        match method {
3151            "unwrap_or" => {
3152                if value.is_null() && !args.is_empty() {
3153                    Ok(args[0].clone())
3154                } else {
3155                    Ok(value.clone())
3156                }
3157            }
3158            "unwrap_or_default" => {
3159                if value.is_null() {
3160                    // Return default for common types
3161                    Ok(json!(0))
3162                } else {
3163                    Ok(value.clone())
3164                }
3165            }
3166            "is_some" => Ok(json!(!value.is_null())),
3167            "is_none" => Ok(json!(value.is_null())),
3168            "abs" => {
3169                if let Some(n) = value.as_i64() {
3170                    Ok(json!(n.abs()))
3171                } else if let Some(n) = value.as_f64() {
3172                    Ok(json!(n.abs()))
3173                } else {
3174                    Err(format!("Cannot call abs() on {:?}", value).into())
3175                }
3176            }
3177            "len" => {
3178                if let Some(s) = value.as_str() {
3179                    Ok(json!(s.len()))
3180                } else if let Some(arr) = value.as_array() {
3181                    Ok(json!(arr.len()))
3182                } else if let Some(obj) = value.as_object() {
3183                    Ok(json!(obj.len()))
3184                } else {
3185                    Err(format!("Cannot call len() on {:?}", value).into())
3186                }
3187            }
3188            "to_string" => Ok(json!(value.to_string())),
3189            "min" => {
3190                if args.is_empty() {
3191                    return Err("min() requires an argument".into());
3192                }
3193                let other = &args[0];
3194                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3195                    Ok(json!(a.min(b)))
3196                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3197                    Ok(json!(a.min(b)))
3198                } else {
3199                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3200                }
3201            }
3202            "max" => {
3203                if args.is_empty() {
3204                    return Err("max() requires an argument".into());
3205                }
3206                let other = &args[0];
3207                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3208                    Ok(json!(a.max(b)))
3209                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3210                    Ok(json!(a.max(b)))
3211                } else {
3212                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3213                }
3214            }
3215            "saturating_add" => {
3216                if args.is_empty() {
3217                    return Err("saturating_add() requires an argument".into());
3218                }
3219                let other = &args[0];
3220                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3221                    Ok(json!(a.saturating_add(b)))
3222                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3223                    Ok(json!(a.saturating_add(b)))
3224                } else {
3225                    Err(format!(
3226                        "Cannot call saturating_add() on {:?} and {:?}",
3227                        value, other
3228                    )
3229                    .into())
3230                }
3231            }
3232            "saturating_sub" => {
3233                if args.is_empty() {
3234                    return Err("saturating_sub() requires an argument".into());
3235                }
3236                let other = &args[0];
3237                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3238                    Ok(json!(a.saturating_sub(b)))
3239                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3240                    Ok(json!(a.saturating_sub(b)))
3241                } else {
3242                    Err(format!(
3243                        "Cannot call saturating_sub() on {:?} and {:?}",
3244                        value, other
3245                    )
3246                    .into())
3247                }
3248            }
3249            _ => Err(format!("Unknown method call: {}()", method).into()),
3250        }
3251    }
3252
3253    /// Evaluate all computed fields for an entity and update the state
3254    /// This takes a list of ComputedFieldSpec from the AST and applies them
3255    pub fn evaluate_computed_fields_from_ast(
3256        &self,
3257        state: &mut Value,
3258        computed_field_specs: &[ComputedFieldSpec],
3259    ) -> Result<Vec<String>> {
3260        let mut updated_paths = Vec::new();
3261
3262        for spec in computed_field_specs {
3263            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3264                self.set_field_in_state(state, &spec.target_path, result)?;
3265                updated_paths.push(spec.target_path.clone());
3266            }
3267        }
3268
3269        Ok(updated_paths)
3270    }
3271
3272    /// Set a field value in state by path (e.g., "section.field")
3273    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3274        let segments: Vec<&str> = path.split('.').collect();
3275
3276        if segments.is_empty() {
3277            return Err("Empty path".into());
3278        }
3279
3280        // Navigate to parent, creating intermediate objects as needed
3281        let mut current = state;
3282        for (i, segment) in segments.iter().enumerate() {
3283            if i == segments.len() - 1 {
3284                // Last segment - set the value
3285                if let Some(obj) = current.as_object_mut() {
3286                    obj.insert(segment.to_string(), value);
3287                    return Ok(());
3288                } else {
3289                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3290                }
3291            } else {
3292                // Intermediate segment - navigate or create
3293                if !current.is_object() {
3294                    *current = json!({});
3295                }
3296                let obj = current.as_object_mut().unwrap();
3297                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3298            }
3299        }
3300
3301        Ok(())
3302    }
3303
3304    /// Create a computed fields evaluator closure from AST specs
3305    /// This returns a function that can be passed to the bytecode builder
3306    pub fn create_evaluator_from_specs(
3307        specs: Vec<ComputedFieldSpec>,
3308    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3309        move |state: &mut Value| {
3310            // Create a temporary VmContext just for evaluation
3311            // (We only need the expression evaluation methods)
3312            let vm = VmContext::new();
3313            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3314            Ok(())
3315        }
3316    }
3317}
3318
3319impl Default for VmContext {
3320    fn default() -> Self {
3321        Self::new()
3322    }
3323}
3324
3325// Implement the ReverseLookupUpdater trait for VmContext
3326impl crate::resolvers::ReverseLookupUpdater for VmContext {
3327    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3328        // Use default state_id=0 and default lookup name
3329        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3330            .unwrap_or_else(|e| {
3331                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3332                Vec::new()
3333            })
3334    }
3335
3336    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3337        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3338        self.flush_pending_updates(0, pda_address)
3339            .unwrap_or_else(|e| {
3340                tracing::error!("Failed to flush pending updates: {}", e);
3341                Vec::new()
3342            })
3343    }
3344}
3345
3346#[cfg(test)]
3347mod tests {
3348    use super::*;
3349    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3350
3351    #[test]
3352    fn test_computed_field_preserves_integer_type() {
3353        let vm = VmContext::new();
3354
3355        let mut state = serde_json::json!({
3356            "trading": {
3357                "total_buy_volume": 20000000000_i64,
3358                "total_sell_volume": 17951316474_i64
3359            }
3360        });
3361
3362        let spec = ComputedFieldSpec {
3363            target_path: "trading.total_volume".to_string(),
3364            result_type: "Option<u64>".to_string(),
3365            expression: ComputedExpr::Binary {
3366                op: BinaryOp::Add,
3367                left: Box::new(ComputedExpr::UnwrapOr {
3368                    expr: Box::new(ComputedExpr::FieldRef {
3369                        path: "trading.total_buy_volume".to_string(),
3370                    }),
3371                    default: serde_json::json!(0),
3372                }),
3373                right: Box::new(ComputedExpr::UnwrapOr {
3374                    expr: Box::new(ComputedExpr::FieldRef {
3375                        path: "trading.total_sell_volume".to_string(),
3376                    }),
3377                    default: serde_json::json!(0),
3378                }),
3379            },
3380        };
3381
3382        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3383            .unwrap();
3384
3385        let total_volume = state
3386            .get("trading")
3387            .and_then(|t| t.get("total_volume"))
3388            .expect("total_volume should exist");
3389
3390        let serialized = serde_json::to_string(total_volume).unwrap();
3391        assert!(
3392            !serialized.contains('.'),
3393            "Integer should not have decimal point: {}",
3394            serialized
3395        );
3396        assert_eq!(
3397            total_volume.as_i64(),
3398            Some(37951316474),
3399            "Value should be correct sum"
3400        );
3401    }
3402
3403    #[test]
3404    fn test_set_field_sum_preserves_integer_type() {
3405        let mut vm = VmContext::new();
3406        vm.registers[0] = serde_json::json!({});
3407        vm.registers[1] = serde_json::json!(20000000000_i64);
3408        vm.registers[2] = serde_json::json!(17951316474_i64);
3409
3410        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3411        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3412
3413        let state = &vm.registers[0];
3414        let buy_vol = state
3415            .get("trading")
3416            .and_then(|t| t.get("total_buy_volume"))
3417            .unwrap();
3418        let sell_vol = state
3419            .get("trading")
3420            .and_then(|t| t.get("total_sell_volume"))
3421            .unwrap();
3422
3423        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3424        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3425
3426        assert!(
3427            !buy_serialized.contains('.'),
3428            "Buy volume should not have decimal: {}",
3429            buy_serialized
3430        );
3431        assert!(
3432            !sell_serialized.contains('.'),
3433            "Sell volume should not have decimal: {}",
3434            sell_serialized
3435        );
3436    }
3437}