Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    /// Returns true if this is an account update context (has write_version, no txn_index)
102    pub fn is_account_update(&self) -> bool {
103        self.write_version.is_some() && self.txn_index.is_none()
104    }
105
106    /// Returns true if this is an instruction update context (has txn_index, no write_version)
107    pub fn is_instruction_update(&self) -> bool {
108        self.txn_index.is_some() && self.write_version.is_none()
109    }
110
111    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112        self.metadata.insert(key, value);
113        self
114    }
115
116    /// Get metadata value
117    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118        self.metadata.get(key)
119    }
120
121    /// Convert context to JSON value for injection into event data
122    pub fn to_value(&self) -> Value {
123        let mut obj = serde_json::Map::new();
124        if let Some(slot) = self.slot {
125            obj.insert("slot".to_string(), json!(slot));
126        }
127        if let Some(ref sig) = self.signature {
128            obj.insert("signature".to_string(), json!(sig));
129        }
130        // Always include timestamp (use current time if not set)
131        obj.insert("timestamp".to_string(), json!(self.timestamp()));
132        for (key, value) in &self.metadata {
133            obj.insert(key.clone(), value.clone());
134        }
135        Value::Object(obj)
136    }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144/// Trait for evaluating computed fields
145/// Implement this in your generated spec to enable computed field evaluation
146pub trait ComputedFieldsEvaluator {
147    fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
150// Pending queue configuration
151const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
152const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
153const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
154
155// Temporal index configuration - prevents unbounded history growth
156const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
157const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
158
159// State table configuration - aligned with downstream EntityCache (500 per view)
160const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
161const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
162
163const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
164
165const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
166
167// Smaller cache for instruction deduplication - provides shorter effective TTL
168// since we don't expect instruction duplicates to arrive much later
169const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
170
171const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
172
173const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
174
175/// Estimate the size of a JSON value in bytes
176fn estimate_json_size(value: &Value) -> usize {
177    match value {
178        Value::Null => 4,
179        Value::Bool(_) => 5,
180        Value::Number(_) => 8,
181        Value::String(s) => s.len() + 2,
182        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
183        Value::Object(obj) => {
184            2 + obj
185                .iter()
186                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
187                .sum::<usize>()
188        }
189    }
190}
191
192#[derive(Debug, Clone)]
193pub struct CompiledPath {
194    pub segments: std::sync::Arc<[String]>,
195}
196
197impl CompiledPath {
198    pub fn new(path: &str) -> Self {
199        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
200        CompiledPath {
201            segments: segments.into(),
202        }
203    }
204
205    fn segments(&self) -> &[String] {
206        &self.segments
207    }
208}
209
210/// Represents the type of change made to a field for granular dirty tracking.
211/// This enables emitting only the actual changes rather than entire field values.
212#[derive(Debug, Clone)]
213pub enum FieldChange {
214    /// Field was replaced with a new value (emit the full value from state)
215    Replaced,
216    /// Items were appended to an array field (emit only the new items)
217    Appended(Vec<Value>),
218}
219
220/// Tracks field modifications during handler execution with granular change information.
221/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
222#[derive(Debug, Clone, Default)]
223pub struct DirtyTracker {
224    changes: HashMap<String, FieldChange>,
225}
226
227impl DirtyTracker {
228    /// Create a new empty DirtyTracker
229    pub fn new() -> Self {
230        Self {
231            changes: HashMap::new(),
232        }
233    }
234
235    /// Mark a field as replaced (full value will be emitted)
236    pub fn mark_replaced(&mut self, path: &str) {
237        // If there was an append, it's now superseded by a full replacement
238        self.changes.insert(path.to_string(), FieldChange::Replaced);
239    }
240
241    /// Record an appended value for a field
242    pub fn mark_appended(&mut self, path: &str, value: Value) {
243        match self.changes.get_mut(path) {
244            Some(FieldChange::Appended(values)) => {
245                // Add to existing appended values
246                values.push(value);
247            }
248            Some(FieldChange::Replaced) => {
249                // Field was already replaced, keep it as replaced
250                // (the full value including the append will be emitted)
251            }
252            None => {
253                // First append to this field
254                self.changes
255                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
256            }
257        }
258    }
259
260    /// Check if there are any changes tracked
261    pub fn is_empty(&self) -> bool {
262        self.changes.is_empty()
263    }
264
265    /// Get the number of changed fields
266    pub fn len(&self) -> usize {
267        self.changes.len()
268    }
269
270    /// Iterate over all changes
271    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
272        self.changes.iter()
273    }
274
275    /// Get a set of all dirty field paths (for backward compatibility)
276    pub fn dirty_paths(&self) -> HashSet<String> {
277        self.changes.keys().cloned().collect()
278    }
279
280    /// Consume the tracker and return the changes map
281    pub fn into_changes(self) -> HashMap<String, FieldChange> {
282        self.changes
283    }
284
285    /// Get a reference to the changes map
286    pub fn changes(&self) -> &HashMap<String, FieldChange> {
287        &self.changes
288    }
289
290    /// Get paths that were appended (not replaced)
291    pub fn appended_paths(&self) -> Vec<String> {
292        self.changes
293            .iter()
294            .filter_map(|(path, change)| match change {
295                FieldChange::Appended(_) => Some(path.clone()),
296                FieldChange::Replaced => None,
297            })
298            .collect()
299    }
300}
301
302pub struct VmContext {
303    registers: Vec<RegisterValue>,
304    states: HashMap<u32, StateTable>,
305    pub instructions_executed: u64,
306    pub cache_hits: u64,
307    path_cache: HashMap<String, CompiledPath>,
308    pub pda_cache_hits: u64,
309    pub pda_cache_misses: u64,
310    pub pending_queue_size: u64,
311    current_context: Option<UpdateContext>,
312    warnings: Vec<String>,
313    last_pda_lookup_miss: Option<String>,
314    last_pda_registered: Option<String>,
315}
316
317#[derive(Debug)]
318pub struct LookupIndex {
319    index: std::sync::Mutex<LruCache<String, Value>>,
320}
321
322impl LookupIndex {
323    pub fn new() -> Self {
324        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
325    }
326
327    pub fn with_capacity(capacity: usize) -> Self {
328        LookupIndex {
329            index: std::sync::Mutex::new(LruCache::new(
330                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
331            )),
332        }
333    }
334
335    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
336        let key = value_to_cache_key(lookup_value);
337        self.index.lock().unwrap().get(&key).cloned()
338    }
339
340    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
341        let key = value_to_cache_key(&lookup_value);
342        self.index.lock().unwrap().put(key, primary_key);
343    }
344
345    pub fn len(&self) -> usize {
346        self.index.lock().unwrap().len()
347    }
348
349    pub fn is_empty(&self) -> bool {
350        self.index.lock().unwrap().is_empty()
351    }
352}
353
354impl Default for LookupIndex {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360fn value_to_cache_key(value: &Value) -> String {
361    match value {
362        Value::String(s) => s.clone(),
363        Value::Number(n) => n.to_string(),
364        Value::Bool(b) => b.to_string(),
365        Value::Null => "null".to_string(),
366        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
367    }
368}
369
370#[derive(Debug)]
371pub struct TemporalIndex {
372    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
373}
374
375impl Default for TemporalIndex {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381impl TemporalIndex {
382    pub fn new() -> Self {
383        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
384    }
385
386    pub fn with_capacity(capacity: usize) -> Self {
387        TemporalIndex {
388            index: std::sync::Mutex::new(LruCache::new(
389                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
390            )),
391        }
392    }
393
394    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
395        let key = value_to_cache_key(lookup_value);
396        let mut cache = self.index.lock().unwrap();
397        if let Some(entries) = cache.get(&key) {
398            for i in (0..entries.len()).rev() {
399                if entries[i].1 <= timestamp {
400                    return Some(entries[i].0.clone());
401                }
402            }
403        }
404        None
405    }
406
407    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
408        let key = value_to_cache_key(lookup_value);
409        let mut cache = self.index.lock().unwrap();
410        if let Some(entries) = cache.get(&key) {
411            if let Some(last) = entries.last() {
412                return Some(last.0.clone());
413            }
414        }
415        None
416    }
417
418    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
419        let key = value_to_cache_key(&lookup_value);
420        let mut cache = self.index.lock().unwrap();
421
422        let entries = cache.get_or_insert_mut(key, Vec::new);
423        entries.push((primary_key, timestamp));
424        entries.sort_by_key(|(_, ts)| *ts);
425
426        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
427        entries.retain(|(_, ts)| *ts >= cutoff);
428
429        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
430            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
431            entries.drain(0..excess);
432        }
433    }
434
435    pub fn len(&self) -> usize {
436        self.index.lock().unwrap().len()
437    }
438
439    pub fn is_empty(&self) -> bool {
440        self.index.lock().unwrap().is_empty()
441    }
442
443    pub fn total_entries(&self) -> usize {
444        self.index
445            .lock()
446            .unwrap()
447            .iter()
448            .map(|(_, entries)| entries.len())
449            .sum()
450    }
451
452    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
453        let mut cache = self.index.lock().unwrap();
454        let mut total_removed = 0;
455
456        for (_, entries) in cache.iter_mut() {
457            let original_len = entries.len();
458            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
459            total_removed += original_len - entries.len();
460        }
461
462        total_removed
463    }
464}
465
466#[derive(Debug)]
467pub struct PdaReverseLookup {
468    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
469    index: LruCache<String, String>,
470}
471
472impl PdaReverseLookup {
473    pub fn new(capacity: usize) -> Self {
474        PdaReverseLookup {
475            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
476        }
477    }
478
479    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
480        self.index.get(pda_address).cloned()
481    }
482
483    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
484        let evicted = if self.index.len() >= self.index.cap().get() {
485            self.index.peek_lru().map(|(k, _)| k.clone())
486        } else {
487            None
488        };
489
490        self.index.put(pda_address, seed_value);
491        evicted
492    }
493
494    pub fn len(&self) -> usize {
495        self.index.len()
496    }
497
498    pub fn is_empty(&self) -> bool {
499        self.index.is_empty()
500    }
501}
502
503/// Input for queueing an account update.
504#[derive(Debug, Clone)]
505pub struct QueuedAccountUpdate {
506    pub pda_address: String,
507    pub account_type: String,
508    pub account_data: Value,
509    pub slot: u64,
510    pub write_version: u64,
511    pub signature: String,
512}
513
514/// Internal representation of a pending account update with queue metadata.
515#[derive(Debug, Clone)]
516pub struct PendingAccountUpdate {
517    pub account_type: String,
518    pub pda_address: String,
519    pub account_data: Value,
520    pub slot: u64,
521    pub write_version: u64,
522    pub signature: String,
523    pub queued_at: i64,
524}
525
526/// Input for queueing an instruction event when PDA lookup fails.
527#[derive(Debug, Clone)]
528pub struct QueuedInstructionEvent {
529    pub pda_address: String,
530    pub event_type: String,
531    pub event_data: Value,
532    pub slot: u64,
533    pub signature: String,
534}
535
536/// Internal representation of a pending instruction event with queue metadata.
537#[derive(Debug, Clone)]
538pub struct PendingInstructionEvent {
539    pub event_type: String,
540    pub pda_address: String,
541    pub event_data: Value,
542    pub slot: u64,
543    pub signature: String,
544    pub queued_at: i64,
545}
546
547#[derive(Debug, Clone)]
548pub struct PendingQueueStats {
549    pub total_updates: usize,
550    pub unique_pdas: usize,
551    pub oldest_age_seconds: i64,
552    pub largest_pda_queue_size: usize,
553    pub estimated_memory_bytes: usize,
554}
555
556#[derive(Debug, Clone, Default)]
557pub struct VmMemoryStats {
558    pub state_table_entity_count: usize,
559    pub state_table_max_entries: usize,
560    pub state_table_at_capacity: bool,
561    pub lookup_index_count: usize,
562    pub lookup_index_total_entries: usize,
563    pub temporal_index_count: usize,
564    pub temporal_index_total_entries: usize,
565    pub pda_reverse_lookup_count: usize,
566    pub pda_reverse_lookup_total_entries: usize,
567    pub version_tracker_entries: usize,
568    pub pending_queue_stats: Option<PendingQueueStats>,
569    pub path_cache_size: usize,
570}
571
572#[derive(Debug, Clone, Default)]
573pub struct CleanupResult {
574    pub pending_updates_removed: usize,
575    pub temporal_entries_removed: usize,
576}
577
578#[derive(Debug, Clone)]
579pub struct CapacityWarning {
580    pub current_entries: usize,
581    pub max_entries: usize,
582    pub entries_over_limit: usize,
583}
584
585#[derive(Debug, Clone)]
586pub struct StateTableConfig {
587    pub max_entries: usize,
588    pub max_array_length: usize,
589}
590
591impl Default for StateTableConfig {
592    fn default() -> Self {
593        Self {
594            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
595            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
596        }
597    }
598}
599
600#[derive(Debug)]
601pub struct VersionTracker {
602    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
603}
604
605impl VersionTracker {
606    pub fn new() -> Self {
607        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
608    }
609
610    pub fn with_capacity(capacity: usize) -> Self {
611        VersionTracker {
612            cache: std::sync::Mutex::new(LruCache::new(
613                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
614            )),
615        }
616    }
617
618    fn make_key(primary_key: &Value, event_type: &str) -> String {
619        format!("{}:{}", primary_key, event_type)
620    }
621
622    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
623        let key = Self::make_key(primary_key, event_type);
624        self.cache.lock().unwrap().get(&key).copied()
625    }
626
627    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
628        let key = Self::make_key(primary_key, event_type);
629        self.cache.lock().unwrap().put(key, (slot, ordering_value));
630    }
631
632    pub fn len(&self) -> usize {
633        self.cache.lock().unwrap().len()
634    }
635
636    pub fn is_empty(&self) -> bool {
637        self.cache.lock().unwrap().is_empty()
638    }
639}
640
641impl Default for VersionTracker {
642    fn default() -> Self {
643        Self::new()
644    }
645}
646
647#[derive(Debug)]
648pub struct StateTable {
649    pub data: DashMap<Value, Value>,
650    access_times: DashMap<Value, i64>,
651    pub lookup_indexes: HashMap<String, LookupIndex>,
652    pub temporal_indexes: HashMap<String, TemporalIndex>,
653    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
654    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
655    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
656    version_tracker: VersionTracker,
657    instruction_dedup_cache: VersionTracker,
658    config: StateTableConfig,
659    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
660    entity_name: String,
661}
662
663impl StateTable {
664    pub fn is_at_capacity(&self) -> bool {
665        self.data.len() >= self.config.max_entries
666    }
667
668    pub fn entries_over_limit(&self) -> usize {
669        self.data.len().saturating_sub(self.config.max_entries)
670    }
671
672    pub fn max_array_length(&self) -> usize {
673        self.config.max_array_length
674    }
675
676    fn touch(&self, key: &Value) {
677        let now = std::time::SystemTime::now()
678            .duration_since(std::time::UNIX_EPOCH)
679            .unwrap()
680            .as_secs() as i64;
681        self.access_times.insert(key.clone(), now);
682    }
683
684    fn evict_lru(&self, count: usize) -> usize {
685        if count == 0 || self.data.is_empty() {
686            return 0;
687        }
688
689        let mut entries: Vec<(Value, i64)> = self
690            .access_times
691            .iter()
692            .map(|entry| (entry.key().clone(), *entry.value()))
693            .collect();
694
695        entries.sort_by_key(|(_, ts)| *ts);
696
697        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
698
699        let mut evicted = 0;
700        for key in to_evict {
701            self.data.remove(&key);
702            self.access_times.remove(&key);
703            evicted += 1;
704        }
705
706        #[cfg(feature = "otel")]
707        if evicted > 0 {
708            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
709        }
710
711        evicted
712    }
713
714    pub fn insert_with_eviction(&self, key: Value, value: Value) {
715        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
716            #[cfg(feature = "otel")]
717            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
718            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
719            self.evict_lru(to_evict.max(1));
720        }
721        self.data.insert(key.clone(), value);
722        self.touch(&key);
723    }
724
725    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
726        let result = self.data.get(key).map(|v| v.clone());
727        if result.is_some() {
728            self.touch(key);
729        }
730        result
731    }
732
733    /// Check if an update is fresh and update the version tracker.
734    /// Returns true if the update should be processed (is fresh).
735    /// Returns false if the update is stale and should be skipped.
736    ///
737    /// Comparison is lexicographic on (slot, ordering_value):
738    /// (100, 5) > (100, 3) > (99, 999)
739    pub fn is_fresh_update(
740        &self,
741        primary_key: &Value,
742        event_type: &str,
743        slot: u64,
744        ordering_value: u64,
745    ) -> bool {
746        let dominated = self
747            .version_tracker
748            .get(primary_key, event_type)
749            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
750            .unwrap_or(false);
751
752        if dominated {
753            return false;
754        }
755
756        self.version_tracker
757            .insert(primary_key, event_type, slot, ordering_value);
758        true
759    }
760
761    /// Check if an instruction is a duplicate of one we've seen recently.
762    /// Returns true if this exact instruction has been seen before (is a duplicate).
763    /// Returns false if this is a new instruction that should be processed.
764    ///
765    /// Unlike account updates, instructions don't use recency checks - all
766    /// unique instructions are processed. Only exact duplicates are skipped.
767    /// Uses a smaller cache capacity for shorter effective TTL.
768    pub fn is_duplicate_instruction(
769        &self,
770        primary_key: &Value,
771        event_type: &str,
772        slot: u64,
773        txn_index: u64,
774    ) -> bool {
775        // Check if we've seen this exact instruction before
776        let is_duplicate = self
777            .instruction_dedup_cache
778            .get(primary_key, event_type)
779            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
780            .unwrap_or(false);
781
782        if is_duplicate {
783            return true;
784        }
785
786        // Record this instruction for deduplication
787        self.instruction_dedup_cache
788            .insert(primary_key, event_type, slot, txn_index);
789        false
790    }
791}
792
793impl VmContext {
794    pub fn new() -> Self {
795        let mut vm = VmContext {
796            registers: vec![Value::Null; 256],
797            states: HashMap::new(),
798            instructions_executed: 0,
799            cache_hits: 0,
800            path_cache: HashMap::new(),
801            pda_cache_hits: 0,
802            pda_cache_misses: 0,
803            pending_queue_size: 0,
804            current_context: None,
805            warnings: Vec::new(),
806            last_pda_lookup_miss: None,
807            last_pda_registered: None,
808        };
809        vm.states.insert(
810            0,
811            StateTable {
812                data: DashMap::new(),
813                access_times: DashMap::new(),
814                lookup_indexes: HashMap::new(),
815                temporal_indexes: HashMap::new(),
816                pda_reverse_lookups: HashMap::new(),
817                pending_updates: DashMap::new(),
818                pending_instruction_events: DashMap::new(),
819                version_tracker: VersionTracker::new(),
820                instruction_dedup_cache: VersionTracker::with_capacity(
821                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
822                ),
823                config: StateTableConfig::default(),
824                entity_name: "default".to_string(),
825            },
826        );
827        vm
828    }
829
830    pub fn new_with_config(state_config: StateTableConfig) -> Self {
831        let mut vm = VmContext {
832            registers: vec![Value::Null; 256],
833            states: HashMap::new(),
834            instructions_executed: 0,
835            cache_hits: 0,
836            path_cache: HashMap::new(),
837            pda_cache_hits: 0,
838            pda_cache_misses: 0,
839            pending_queue_size: 0,
840            current_context: None,
841            warnings: Vec::new(),
842            last_pda_lookup_miss: None,
843            last_pda_registered: None,
844        };
845        vm.states.insert(
846            0,
847            StateTable {
848                data: DashMap::new(),
849                access_times: DashMap::new(),
850                lookup_indexes: HashMap::new(),
851                temporal_indexes: HashMap::new(),
852                pda_reverse_lookups: HashMap::new(),
853                pending_updates: DashMap::new(),
854                pending_instruction_events: DashMap::new(),
855                version_tracker: VersionTracker::new(),
856                instruction_dedup_cache: VersionTracker::with_capacity(
857                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
858                ),
859                config: state_config,
860                entity_name: "default".to_string(),
861            },
862        );
863        vm
864    }
865
866    /// Get a mutable reference to a state table by ID
867    /// Returns None if the state ID doesn't exist
868    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
869        self.states.get_mut(&state_id)
870    }
871
872    /// Get public access to registers (for metrics context)
873    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
874        &mut self.registers
875    }
876
877    /// Get public access to path cache (for metrics context)
878    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
879        &self.path_cache
880    }
881
882    /// Get the current update context
883    pub fn current_context(&self) -> Option<&UpdateContext> {
884        self.current_context.as_ref()
885    }
886
887    fn add_warning(&mut self, msg: String) {
888        self.warnings.push(msg);
889    }
890
891    pub fn take_warnings(&mut self) -> Vec<String> {
892        std::mem::take(&mut self.warnings)
893    }
894
895    pub fn has_warnings(&self) -> bool {
896        !self.warnings.is_empty()
897    }
898
899    pub fn update_state_from_register(
900        &mut self,
901        state_id: u32,
902        key: Value,
903        register: Register,
904    ) -> Result<()> {
905        let state = self.states.get(&state_id).ok_or("State table not found")?;
906        let value = self.registers[register].clone();
907        state.insert_with_eviction(key, value);
908        Ok(())
909    }
910
911    fn reset_registers(&mut self) {
912        for reg in &mut self.registers {
913            *reg = Value::Null;
914        }
915    }
916
917    /// Extract only the dirty fields from state (public for use by instruction hooks)
918    pub fn extract_partial_state(
919        &self,
920        state_reg: Register,
921        dirty_fields: &HashSet<String>,
922    ) -> Result<Value> {
923        let full_state = &self.registers[state_reg];
924
925        if dirty_fields.is_empty() {
926            return Ok(json!({}));
927        }
928
929        let mut partial = serde_json::Map::new();
930
931        for path in dirty_fields {
932            let segments: Vec<&str> = path.split('.').collect();
933
934            let mut current = full_state;
935            let mut found = true;
936
937            for segment in &segments {
938                match current.get(segment) {
939                    Some(v) => current = v,
940                    None => {
941                        found = false;
942                        break;
943                    }
944                }
945            }
946
947            if !found {
948                continue;
949            }
950
951            let mut target = &mut partial;
952            for (i, segment) in segments.iter().enumerate() {
953                if i == segments.len() - 1 {
954                    target.insert(segment.to_string(), current.clone());
955                } else {
956                    target
957                        .entry(segment.to_string())
958                        .or_insert_with(|| json!({}));
959                    target = target
960                        .get_mut(*segment)
961                        .and_then(|v| v.as_object_mut())
962                        .ok_or("Failed to build nested structure")?;
963                }
964            }
965        }
966
967        Ok(Value::Object(partial))
968    }
969
970    /// Extract a patch from state based on the DirtyTracker.
971    /// For Replaced fields: extracts the full value from state.
972    /// For Appended fields: emits only the appended values as an array.
973    pub fn extract_partial_state_with_tracker(
974        &self,
975        state_reg: Register,
976        tracker: &DirtyTracker,
977    ) -> Result<Value> {
978        let full_state = &self.registers[state_reg];
979
980        if tracker.is_empty() {
981            return Ok(json!({}));
982        }
983
984        let mut partial = serde_json::Map::new();
985
986        for (path, change) in tracker.iter() {
987            let segments: Vec<&str> = path.split('.').collect();
988
989            let value_to_insert = match change {
990                FieldChange::Replaced => {
991                    let mut current = full_state;
992                    let mut found = true;
993
994                    for segment in &segments {
995                        match current.get(*segment) {
996                            Some(v) => current = v,
997                            None => {
998                                found = false;
999                                break;
1000                            }
1001                        }
1002                    }
1003
1004                    if !found {
1005                        continue;
1006                    }
1007                    current.clone()
1008                }
1009                FieldChange::Appended(values) => Value::Array(values.clone()),
1010            };
1011
1012            let mut target = &mut partial;
1013            for (i, segment) in segments.iter().enumerate() {
1014                if i == segments.len() - 1 {
1015                    target.insert(segment.to_string(), value_to_insert.clone());
1016                } else {
1017                    target
1018                        .entry(segment.to_string())
1019                        .or_insert_with(|| json!({}));
1020                    target = target
1021                        .get_mut(*segment)
1022                        .and_then(|v| v.as_object_mut())
1023                        .ok_or("Failed to build nested structure")?;
1024                }
1025            }
1026        }
1027
1028        Ok(Value::Object(partial))
1029    }
1030
1031    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1032        if let Some(compiled) = self.path_cache.get(path) {
1033            self.cache_hits += 1;
1034            #[cfg(feature = "otel")]
1035            crate::vm_metrics::record_path_cache_hit();
1036            return compiled.clone();
1037        }
1038        #[cfg(feature = "otel")]
1039        crate::vm_metrics::record_path_cache_miss();
1040        let compiled = CompiledPath::new(path);
1041        self.path_cache.insert(path.to_string(), compiled.clone());
1042        compiled
1043    }
1044
1045    /// Process an event with optional context metadata
1046    #[cfg_attr(feature = "otel", instrument(
1047        name = "vm.process_event",
1048        skip(self, bytecode, event_value, log),
1049        level = "info",
1050        fields(
1051            event_type = %event_type,
1052            slot = context.as_ref().and_then(|c| c.slot),
1053        )
1054    ))]
1055    pub fn process_event(
1056        &mut self,
1057        bytecode: &MultiEntityBytecode,
1058        event_value: Value,
1059        event_type: &str,
1060        context: Option<&UpdateContext>,
1061        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1062    ) -> Result<Vec<Mutation>> {
1063        self.current_context = context.cloned();
1064
1065        let mut event_value = event_value;
1066        if let Some(ctx) = context {
1067            if let Some(obj) = event_value.as_object_mut() {
1068                obj.insert("__update_context".to_string(), ctx.to_value());
1069            }
1070        }
1071
1072        let mut all_mutations = Vec::new();
1073
1074        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1075            for entity_name in entity_names {
1076                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1077                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1078                        if let Some(ref mut log) = log {
1079                            log.set("entity", entity_name.clone());
1080                            log.inc("handlers", 1);
1081                        }
1082
1083                        let opcodes_before = self.instructions_executed;
1084                        let cache_before = self.cache_hits;
1085                        let pda_hits_before = self.pda_cache_hits;
1086                        let pda_misses_before = self.pda_cache_misses;
1087
1088                        let mutations = self.execute_handler(
1089                            handler,
1090                            &event_value,
1091                            event_type,
1092                            entity_bytecode.state_id,
1093                            entity_name,
1094                            entity_bytecode.computed_fields_evaluator.as_ref(),
1095                        )?;
1096
1097                        if let Some(ref mut log) = log {
1098                            log.inc(
1099                                "opcodes",
1100                                (self.instructions_executed - opcodes_before) as i64,
1101                            );
1102                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1103                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1104                            log.inc(
1105                                "pda_misses",
1106                                (self.pda_cache_misses - pda_misses_before) as i64,
1107                            );
1108                        }
1109
1110                        if mutations.is_empty() {
1111                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1112                                if event_type.ends_with("IxState") {
1113                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1114                                    let signature = context
1115                                        .and_then(|c| c.signature.clone())
1116                                        .unwrap_or_default();
1117                                    let _ = self.queue_instruction_event(
1118                                        entity_bytecode.state_id,
1119                                        QueuedInstructionEvent {
1120                                            pda_address: missed_pda,
1121                                            event_type: event_type.to_string(),
1122                                            event_data: event_value.clone(),
1123                                            slot,
1124                                            signature,
1125                                        },
1126                                    );
1127                                }
1128                            }
1129                        }
1130
1131                        all_mutations.extend(mutations);
1132
1133                        if let Some(registered_pda) = self.take_last_pda_registered() {
1134                            let pending_events = self.flush_pending_instruction_events(
1135                                entity_bytecode.state_id,
1136                                &registered_pda,
1137                            );
1138                            for pending in pending_events {
1139                                if let Some(pending_handler) =
1140                                    entity_bytecode.handlers.get(&pending.event_type)
1141                                {
1142                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1143                                        pending_handler,
1144                                        &pending.event_data,
1145                                        &pending.event_type,
1146                                        entity_bytecode.state_id,
1147                                        entity_name,
1148                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1149                                    ) {
1150                                        all_mutations.extend(reprocessed_mutations);
1151                                    }
1152                                }
1153                            }
1154                        }
1155                    } else if let Some(ref mut log) = log {
1156                        log.set("skip_reason", "no_handler");
1157                    }
1158                } else if let Some(ref mut log) = log {
1159                    log.set("skip_reason", "entity_not_found");
1160                }
1161            }
1162        } else if let Some(ref mut log) = log {
1163            log.set("skip_reason", "no_event_routing");
1164        }
1165
1166        if let Some(log) = log {
1167            log.set("mutations", all_mutations.len() as i64);
1168            if let Some(first) = all_mutations.first() {
1169                if let Some(key_str) = first.key.as_str() {
1170                    log.set("primary_key", key_str);
1171                } else if let Some(key_num) = first.key.as_u64() {
1172                    log.set("primary_key", key_num as i64);
1173                }
1174            }
1175            if let Some(state) = self.states.get(&0) {
1176                log.set("state_table_size", state.data.len() as i64);
1177            }
1178
1179            let warnings = self.take_warnings();
1180            if !warnings.is_empty() {
1181                log.set("warnings", warnings.len() as i64);
1182                log.set(
1183                    "warning_messages",
1184                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1185                );
1186                log.set_level(crate::canonical_log::LogLevel::Warn);
1187            }
1188        } else {
1189            self.warnings.clear();
1190        }
1191
1192        Ok(all_mutations)
1193    }
1194
1195    pub fn process_any(
1196        &mut self,
1197        bytecode: &MultiEntityBytecode,
1198        any: prost_types::Any,
1199    ) -> Result<Vec<Mutation>> {
1200        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1201        self.process_event(bytecode, event_value, &event_type, None, None)
1202    }
1203
1204    #[cfg_attr(feature = "otel", instrument(
1205        name = "vm.execute_handler",
1206        skip(self, handler, event_value, entity_evaluator),
1207        level = "debug",
1208        fields(
1209            event_type = %event_type,
1210            handler_opcodes = handler.len(),
1211        )
1212    ))]
1213    #[allow(clippy::type_complexity)]
1214    fn execute_handler(
1215        &mut self,
1216        handler: &[OpCode],
1217        event_value: &Value,
1218        event_type: &str,
1219        override_state_id: u32,
1220        entity_name: &str,
1221        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1222    ) -> Result<Vec<Mutation>> {
1223        self.reset_registers();
1224        self.last_pda_lookup_miss = None;
1225
1226        let mut pc: usize = 0;
1227        let mut output = Vec::new();
1228        let mut dirty_tracker = DirtyTracker::new();
1229
1230        while pc < handler.len() {
1231            match &handler[pc] {
1232                OpCode::LoadEventField {
1233                    path,
1234                    dest,
1235                    default,
1236                } => {
1237                    let value = self.load_field(event_value, path, default.as_ref())?;
1238                    self.registers[*dest] = value;
1239                    pc += 1;
1240                }
1241                OpCode::LoadConstant { value, dest } => {
1242                    self.registers[*dest] = value.clone();
1243                    pc += 1;
1244                }
1245                OpCode::CopyRegister { source, dest } => {
1246                    self.registers[*dest] = self.registers[*source].clone();
1247                    pc += 1;
1248                }
1249                OpCode::CopyRegisterIfNull { source, dest } => {
1250                    if self.registers[*dest].is_null() {
1251                        self.registers[*dest] = self.registers[*source].clone();
1252                    }
1253                    pc += 1;
1254                }
1255                OpCode::GetEventType { dest } => {
1256                    self.registers[*dest] = json!(event_type);
1257                    pc += 1;
1258                }
1259                OpCode::CreateObject { dest } => {
1260                    self.registers[*dest] = json!({});
1261                    pc += 1;
1262                }
1263                OpCode::SetField {
1264                    object,
1265                    path,
1266                    value,
1267                } => {
1268                    self.set_field_auto_vivify(*object, path, *value)?;
1269                    dirty_tracker.mark_replaced(path);
1270                    pc += 1;
1271                }
1272                OpCode::SetFields { object, fields } => {
1273                    for (path, value_reg) in fields {
1274                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1275                        dirty_tracker.mark_replaced(path);
1276                    }
1277                    pc += 1;
1278                }
1279                OpCode::GetField { object, path, dest } => {
1280                    let value = self.get_field(*object, path)?;
1281                    self.registers[*dest] = value;
1282                    pc += 1;
1283                }
1284                OpCode::ReadOrInitState {
1285                    state_id: _,
1286                    key,
1287                    default,
1288                    dest,
1289                } => {
1290                    let actual_state_id = override_state_id;
1291                    let entity_name_owned = entity_name.to_string();
1292                    self.states
1293                        .entry(actual_state_id)
1294                        .or_insert_with(|| StateTable {
1295                            data: DashMap::new(),
1296                            access_times: DashMap::new(),
1297                            lookup_indexes: HashMap::new(),
1298                            temporal_indexes: HashMap::new(),
1299                            pda_reverse_lookups: HashMap::new(),
1300                            pending_updates: DashMap::new(),
1301                            pending_instruction_events: DashMap::new(),
1302                            version_tracker: VersionTracker::new(),
1303                            instruction_dedup_cache: VersionTracker::with_capacity(
1304                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1305                            ),
1306                            config: StateTableConfig::default(),
1307                            entity_name: entity_name_owned,
1308                        });
1309                    let key_value = self.registers[*key].clone();
1310                    let warn_null_key = key_value.is_null()
1311                        && event_type.ends_with("State")
1312                        && !event_type.ends_with("IxState");
1313
1314                    if warn_null_key {
1315                        self.add_warning(format!(
1316                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1317                            key, event_type
1318                        ));
1319                    }
1320
1321                    let state = self
1322                        .states
1323                        .get(&actual_state_id)
1324                        .ok_or("State table not found")?;
1325
1326                    if !key_value.is_null() {
1327                        if let Some(ctx) = &self.current_context {
1328                            // Account updates: use recency check to discard stale updates
1329                            if ctx.is_account_update() {
1330                                if let (Some(slot), Some(write_version)) =
1331                                    (ctx.slot, ctx.write_version)
1332                                {
1333                                    if !state.is_fresh_update(
1334                                        &key_value,
1335                                        event_type,
1336                                        slot,
1337                                        write_version,
1338                                    ) {
1339                                        self.add_warning(format!(
1340                                            "Stale account update skipped: slot={}, write_version={}",
1341                                            slot, write_version
1342                                        ));
1343                                        return Ok(Vec::new());
1344                                    }
1345                                }
1346                            }
1347                            // Instruction updates: process all, but skip exact duplicates
1348                            else if ctx.is_instruction_update() {
1349                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1350                                    if state.is_duplicate_instruction(
1351                                        &key_value, event_type, slot, txn_index,
1352                                    ) {
1353                                        self.add_warning(format!(
1354                                            "Duplicate instruction skipped: slot={}, txn_index={}",
1355                                            slot, txn_index
1356                                        ));
1357                                        return Ok(Vec::new());
1358                                    }
1359                                }
1360                            }
1361                        }
1362                    }
1363                    let value = state
1364                        .get_and_touch(&key_value)
1365                        .unwrap_or_else(|| default.clone());
1366
1367                    self.registers[*dest] = value;
1368                    pc += 1;
1369                }
1370                OpCode::UpdateState {
1371                    state_id: _,
1372                    key,
1373                    value,
1374                } => {
1375                    let actual_state_id = override_state_id;
1376                    let state = self
1377                        .states
1378                        .get(&actual_state_id)
1379                        .ok_or("State table not found")?;
1380                    let key_value = self.registers[*key].clone();
1381                    let value_data = self.registers[*value].clone();
1382
1383                    state.insert_with_eviction(key_value, value_data);
1384                    pc += 1;
1385                }
1386                OpCode::AppendToArray {
1387                    object,
1388                    path,
1389                    value,
1390                } => {
1391                    let appended_value = self.registers[*value].clone();
1392                    let max_len = self
1393                        .states
1394                        .get(&override_state_id)
1395                        .map(|s| s.max_array_length())
1396                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1397                    self.append_to_array(*object, path, *value, max_len)?;
1398                    dirty_tracker.mark_appended(path, appended_value);
1399                    pc += 1;
1400                }
1401                OpCode::GetCurrentTimestamp { dest } => {
1402                    let timestamp = std::time::SystemTime::now()
1403                        .duration_since(std::time::UNIX_EPOCH)
1404                        .unwrap()
1405                        .as_secs() as i64;
1406                    self.registers[*dest] = json!(timestamp);
1407                    pc += 1;
1408                }
1409                OpCode::CreateEvent { dest, event_value } => {
1410                    let timestamp = std::time::SystemTime::now()
1411                        .duration_since(std::time::UNIX_EPOCH)
1412                        .unwrap()
1413                        .as_secs() as i64;
1414
1415                    // Filter out __update_context from the event data
1416                    let mut event_data = self.registers[*event_value].clone();
1417                    if let Some(obj) = event_data.as_object_mut() {
1418                        obj.remove("__update_context");
1419                    }
1420
1421                    // Create event with timestamp, data, and optional slot/signature from context
1422                    let mut event = serde_json::Map::new();
1423                    event.insert("timestamp".to_string(), json!(timestamp));
1424                    event.insert("data".to_string(), event_data);
1425
1426                    // Add slot and signature if available from current context
1427                    if let Some(ref ctx) = self.current_context {
1428                        if let Some(slot) = ctx.slot {
1429                            event.insert("slot".to_string(), json!(slot));
1430                        }
1431                        if let Some(ref signature) = ctx.signature {
1432                            event.insert("signature".to_string(), json!(signature));
1433                        }
1434                    }
1435
1436                    self.registers[*dest] = Value::Object(event);
1437                    pc += 1;
1438                }
1439                OpCode::CreateCapture {
1440                    dest,
1441                    capture_value,
1442                } => {
1443                    let timestamp = std::time::SystemTime::now()
1444                        .duration_since(std::time::UNIX_EPOCH)
1445                        .unwrap()
1446                        .as_secs() as i64;
1447
1448                    // Get the capture data (already filtered by load_field)
1449                    let capture_data = self.registers[*capture_value].clone();
1450
1451                    // Extract account_address from the original event if available
1452                    let account_address = event_value
1453                        .get("__account_address")
1454                        .and_then(|v| v.as_str())
1455                        .unwrap_or("")
1456                        .to_string();
1457
1458                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1459                    let mut capture = serde_json::Map::new();
1460                    capture.insert("timestamp".to_string(), json!(timestamp));
1461                    capture.insert("account_address".to_string(), json!(account_address));
1462                    capture.insert("data".to_string(), capture_data);
1463
1464                    // Add slot and signature if available from current context
1465                    if let Some(ref ctx) = self.current_context {
1466                        if let Some(slot) = ctx.slot {
1467                            capture.insert("slot".to_string(), json!(slot));
1468                        }
1469                        if let Some(ref signature) = ctx.signature {
1470                            capture.insert("signature".to_string(), json!(signature));
1471                        }
1472                    }
1473
1474                    self.registers[*dest] = Value::Object(capture);
1475                    pc += 1;
1476                }
1477                OpCode::Transform {
1478                    source,
1479                    dest,
1480                    transformation,
1481                } => {
1482                    if source == dest {
1483                        self.transform_in_place(*source, transformation)?;
1484                    } else {
1485                        let source_value = &self.registers[*source];
1486                        let value = self.apply_transformation(source_value, transformation)?;
1487                        self.registers[*dest] = value;
1488                    }
1489                    pc += 1;
1490                }
1491                OpCode::EmitMutation {
1492                    entity_name,
1493                    key,
1494                    state,
1495                } => {
1496                    let primary_key = self.registers[*key].clone();
1497
1498                    if primary_key.is_null() || dirty_tracker.is_empty() {
1499                        let reason = if dirty_tracker.is_empty() {
1500                            "no_fields_modified"
1501                        } else {
1502                            "null_primary_key"
1503                        };
1504                        self.add_warning(format!(
1505                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1506                            entity_name,
1507                            reason,
1508                            dirty_tracker.len()
1509                        ));
1510                    } else {
1511                        let patch =
1512                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1513
1514                        let append = dirty_tracker.appended_paths();
1515                        let mutation = Mutation {
1516                            export: entity_name.clone(),
1517                            key: primary_key,
1518                            patch,
1519                            append,
1520                        };
1521                        output.push(mutation);
1522                    }
1523                    pc += 1;
1524                }
1525                OpCode::SetFieldIfNull {
1526                    object,
1527                    path,
1528                    value,
1529                } => {
1530                    let was_set = self.set_field_if_null(*object, path, *value)?;
1531                    if was_set {
1532                        dirty_tracker.mark_replaced(path);
1533                    }
1534                    pc += 1;
1535                }
1536                OpCode::SetFieldMax {
1537                    object,
1538                    path,
1539                    value,
1540                } => {
1541                    let was_updated = self.set_field_max(*object, path, *value)?;
1542                    if was_updated {
1543                        dirty_tracker.mark_replaced(path);
1544                    }
1545                    pc += 1;
1546                }
1547                OpCode::UpdateTemporalIndex {
1548                    state_id: _,
1549                    index_name,
1550                    lookup_value,
1551                    primary_key,
1552                    timestamp,
1553                } => {
1554                    let actual_state_id = override_state_id;
1555                    let state = self
1556                        .states
1557                        .get_mut(&actual_state_id)
1558                        .ok_or("State table not found")?;
1559                    let index = state
1560                        .temporal_indexes
1561                        .entry(index_name.clone())
1562                        .or_insert_with(TemporalIndex::new);
1563
1564                    let lookup_val = self.registers[*lookup_value].clone();
1565                    let pk_val = self.registers[*primary_key].clone();
1566                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1567                        val
1568                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1569                        val as i64
1570                    } else {
1571                        return Err(format!(
1572                            "Timestamp must be a number (i64 or u64), got: {:?}",
1573                            self.registers[*timestamp]
1574                        )
1575                        .into());
1576                    };
1577
1578                    index.insert(lookup_val, pk_val, ts_val);
1579                    pc += 1;
1580                }
1581                OpCode::LookupTemporalIndex {
1582                    state_id: _,
1583                    index_name,
1584                    lookup_value,
1585                    timestamp,
1586                    dest,
1587                } => {
1588                    let actual_state_id = override_state_id;
1589                    let state = self
1590                        .states
1591                        .get(&actual_state_id)
1592                        .ok_or("State table not found")?;
1593                    let lookup_val = &self.registers[*lookup_value];
1594
1595                    let result = if self.registers[*timestamp].is_null() {
1596                        if let Some(index) = state.temporal_indexes.get(index_name) {
1597                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1598                        } else {
1599                            Value::Null
1600                        }
1601                    } else {
1602                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1603                            val
1604                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1605                            val as i64
1606                        } else {
1607                            return Err(format!(
1608                                "Timestamp must be a number (i64 or u64), got: {:?}",
1609                                self.registers[*timestamp]
1610                            )
1611                            .into());
1612                        };
1613
1614                        if let Some(index) = state.temporal_indexes.get(index_name) {
1615                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1616                        } else {
1617                            Value::Null
1618                        }
1619                    };
1620
1621                    self.registers[*dest] = result;
1622                    pc += 1;
1623                }
1624                OpCode::UpdateLookupIndex {
1625                    state_id: _,
1626                    index_name,
1627                    lookup_value,
1628                    primary_key,
1629                } => {
1630                    let actual_state_id = override_state_id;
1631                    let state = self
1632                        .states
1633                        .get_mut(&actual_state_id)
1634                        .ok_or("State table not found")?;
1635                    let index = state
1636                        .lookup_indexes
1637                        .entry(index_name.clone())
1638                        .or_insert_with(LookupIndex::new);
1639
1640                    let lookup_val = self.registers[*lookup_value].clone();
1641                    let pk_val = self.registers[*primary_key].clone();
1642
1643                    index.insert(lookup_val, pk_val);
1644                    pc += 1;
1645                }
1646                OpCode::LookupIndex {
1647                    state_id: _,
1648                    index_name,
1649                    lookup_value,
1650                    dest,
1651                } => {
1652                    let actual_state_id = override_state_id;
1653                    let lookup_val = self.registers[*lookup_value].clone();
1654
1655                    let result = {
1656                        let state = self
1657                            .states
1658                            .get(&actual_state_id)
1659                            .ok_or("State table not found")?;
1660
1661                        if let Some(index) = state.lookup_indexes.get(index_name) {
1662                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1663                            #[cfg(feature = "otel")]
1664                            if found.is_null() {
1665                                crate::vm_metrics::record_lookup_index_miss(index_name);
1666                            } else {
1667                                crate::vm_metrics::record_lookup_index_hit(index_name);
1668                            }
1669                            found
1670                        } else {
1671                            Value::Null
1672                        }
1673                    };
1674
1675                    let final_result = if result.is_null() {
1676                        if let Some(pda_str) = lookup_val.as_str() {
1677                            let state = self
1678                                .states
1679                                .get_mut(&actual_state_id)
1680                                .ok_or("State table not found")?;
1681
1682                            if let Some(pda_lookup) =
1683                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1684                            {
1685                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1686                                    Value::String(resolved)
1687                                } else {
1688                                    self.last_pda_lookup_miss = Some(pda_str.to_string());
1689                                    Value::Null
1690                                }
1691                            } else {
1692                                self.last_pda_lookup_miss = Some(pda_str.to_string());
1693                                Value::Null
1694                            }
1695                        } else {
1696                            Value::Null
1697                        }
1698                    } else {
1699                        result
1700                    };
1701
1702                    self.registers[*dest] = final_result;
1703                    pc += 1;
1704                }
1705                OpCode::SetFieldSum {
1706                    object,
1707                    path,
1708                    value,
1709                } => {
1710                    let was_updated = self.set_field_sum(*object, path, *value)?;
1711                    if was_updated {
1712                        dirty_tracker.mark_replaced(path);
1713                    }
1714                    pc += 1;
1715                }
1716                OpCode::SetFieldIncrement { object, path } => {
1717                    let was_updated = self.set_field_increment(*object, path)?;
1718                    if was_updated {
1719                        dirty_tracker.mark_replaced(path);
1720                    }
1721                    pc += 1;
1722                }
1723                OpCode::SetFieldMin {
1724                    object,
1725                    path,
1726                    value,
1727                } => {
1728                    let was_updated = self.set_field_min(*object, path, *value)?;
1729                    if was_updated {
1730                        dirty_tracker.mark_replaced(path);
1731                    }
1732                    pc += 1;
1733                }
1734                OpCode::AddToUniqueSet {
1735                    state_id: _,
1736                    set_name,
1737                    value,
1738                    count_object,
1739                    count_path,
1740                } => {
1741                    let value_to_add = self.registers[*value].clone();
1742
1743                    // Store the unique set within the entity object, not in the state table
1744                    // This ensures each entity instance has its own unique set
1745                    let set_field_path = format!("__unique_set:{}", set_name);
1746
1747                    // Get or create the unique set from the entity object
1748                    let mut set: HashSet<Value> =
1749                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1750                            if !existing.is_null() {
1751                                serde_json::from_value(existing).unwrap_or_default()
1752                            } else {
1753                                HashSet::new()
1754                            }
1755                        } else {
1756                            HashSet::new()
1757                        };
1758
1759                    // Add value to set
1760                    let was_new = set.insert(value_to_add);
1761
1762                    // Store updated set back in the entity object
1763                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1764                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1765                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1766
1767                    // Update the count field in the object
1768                    if was_new {
1769                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1770                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1771                        dirty_tracker.mark_replaced(count_path);
1772                    }
1773
1774                    pc += 1;
1775                }
1776                OpCode::ConditionalSetField {
1777                    object,
1778                    path,
1779                    value,
1780                    condition_field,
1781                    condition_op,
1782                    condition_value,
1783                } => {
1784                    let field_value = self.load_field(event_value, condition_field, None)?;
1785                    let condition_met =
1786                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1787
1788                    if condition_met {
1789                        self.set_field_auto_vivify(*object, path, *value)?;
1790                        dirty_tracker.mark_replaced(path);
1791                    }
1792                    pc += 1;
1793                }
1794                OpCode::ConditionalIncrement {
1795                    object,
1796                    path,
1797                    condition_field,
1798                    condition_op,
1799                    condition_value,
1800                } => {
1801                    let field_value = self.load_field(event_value, condition_field, None)?;
1802                    let condition_met =
1803                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1804
1805                    if condition_met {
1806                        let was_updated = self.set_field_increment(*object, path)?;
1807                        if was_updated {
1808                            dirty_tracker.mark_replaced(path);
1809                        }
1810                    }
1811                    pc += 1;
1812                }
1813                OpCode::EvaluateComputedFields {
1814                    state,
1815                    computed_paths,
1816                } => {
1817                    if let Some(evaluator) = entity_evaluator {
1818                        let old_values: Vec<_> = computed_paths
1819                            .iter()
1820                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1821                            .collect();
1822
1823                        let state_value = &mut self.registers[*state];
1824                        let eval_result = evaluator(state_value);
1825
1826                        if eval_result.is_ok() {
1827                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1828                                let new_value =
1829                                    Self::get_value_at_path(&self.registers[*state], path);
1830
1831                                if new_value != *old_value {
1832                                    dirty_tracker.mark_replaced(path);
1833                                }
1834                            }
1835                        }
1836                    }
1837                    pc += 1;
1838                }
1839                OpCode::UpdatePdaReverseLookup {
1840                    state_id: _,
1841                    lookup_name,
1842                    pda_address,
1843                    primary_key,
1844                } => {
1845                    let actual_state_id = override_state_id;
1846                    let state = self
1847                        .states
1848                        .get_mut(&actual_state_id)
1849                        .ok_or("State table not found")?;
1850
1851                    let pda_val = self.registers[*pda_address].clone();
1852                    let pk_val = self.registers[*primary_key].clone();
1853
1854                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1855                        let pda_lookup = state
1856                            .pda_reverse_lookups
1857                            .entry(lookup_name.clone())
1858                            .or_insert_with(|| {
1859                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1860                            });
1861
1862                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1863                        self.last_pda_registered = Some(pda_str.to_string());
1864                    } else if !pk_val.is_null() {
1865                        if let Some(pk_num) = pk_val.as_u64() {
1866                            if let Some(pda_str) = pda_val.as_str() {
1867                                let pda_lookup = state
1868                                    .pda_reverse_lookups
1869                                    .entry(lookup_name.clone())
1870                                    .or_insert_with(|| {
1871                                        PdaReverseLookup::new(
1872                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1873                                        )
1874                                    });
1875
1876                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1877                                self.last_pda_registered = Some(pda_str.to_string());
1878                            }
1879                        }
1880                    }
1881
1882                    pc += 1;
1883                }
1884            }
1885
1886            self.instructions_executed += 1;
1887        }
1888
1889        Ok(output)
1890    }
1891
1892    fn load_field(
1893        &self,
1894        event_value: &Value,
1895        path: &FieldPath,
1896        default: Option<&Value>,
1897    ) -> Result<Value> {
1898        if path.segments.is_empty() {
1899            if let Some(obj) = event_value.as_object() {
1900                let filtered: serde_json::Map<String, Value> = obj
1901                    .iter()
1902                    .filter(|(k, _)| !k.starts_with("__"))
1903                    .map(|(k, v)| (k.clone(), v.clone()))
1904                    .collect();
1905                return Ok(Value::Object(filtered));
1906            }
1907            return Ok(event_value.clone());
1908        }
1909
1910        let mut current = event_value;
1911        for segment in path.segments.iter() {
1912            current = match current.get(segment) {
1913                Some(v) => v,
1914                None => return Ok(default.cloned().unwrap_or(Value::Null)),
1915            };
1916        }
1917
1918        Ok(current.clone())
1919    }
1920
1921    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1922        let mut current = value;
1923        for segment in path.split('.') {
1924            current = current.get(segment)?;
1925        }
1926        Some(current.clone())
1927    }
1928
1929    fn set_field_auto_vivify(
1930        &mut self,
1931        object_reg: Register,
1932        path: &str,
1933        value_reg: Register,
1934    ) -> Result<()> {
1935        let compiled = self.get_compiled_path(path);
1936        let segments = compiled.segments();
1937        let value = self.registers[value_reg].clone();
1938
1939        if !self.registers[object_reg].is_object() {
1940            self.registers[object_reg] = json!({});
1941        }
1942
1943        let obj = self.registers[object_reg]
1944            .as_object_mut()
1945            .ok_or("Not an object")?;
1946
1947        let mut current = obj;
1948        for (i, segment) in segments.iter().enumerate() {
1949            if i == segments.len() - 1 {
1950                current.insert(segment.to_string(), value);
1951                return Ok(());
1952            } else {
1953                current
1954                    .entry(segment.to_string())
1955                    .or_insert_with(|| json!({}));
1956                current = current
1957                    .get_mut(segment)
1958                    .and_then(|v| v.as_object_mut())
1959                    .ok_or("Path collision: expected object")?;
1960            }
1961        }
1962
1963        Ok(())
1964    }
1965
1966    fn set_field_if_null(
1967        &mut self,
1968        object_reg: Register,
1969        path: &str,
1970        value_reg: Register,
1971    ) -> Result<bool> {
1972        let compiled = self.get_compiled_path(path);
1973        let segments = compiled.segments();
1974        let value = self.registers[value_reg].clone();
1975
1976        // SetOnce should only set meaningful values. A null source typically means
1977        // the field doesn't exist in this event type (e.g., instruction events don't
1978        // have account data). Skip to preserve any existing value.
1979        if value.is_null() {
1980            return Ok(false);
1981        }
1982
1983        if !self.registers[object_reg].is_object() {
1984            self.registers[object_reg] = json!({});
1985        }
1986
1987        let obj = self.registers[object_reg]
1988            .as_object_mut()
1989            .ok_or("Not an object")?;
1990
1991        let mut current = obj;
1992        for (i, segment) in segments.iter().enumerate() {
1993            if i == segments.len() - 1 {
1994                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1995                    current.insert(segment.to_string(), value);
1996                    return Ok(true);
1997                }
1998                return Ok(false);
1999            } else {
2000                current
2001                    .entry(segment.to_string())
2002                    .or_insert_with(|| json!({}));
2003                current = current
2004                    .get_mut(segment)
2005                    .and_then(|v| v.as_object_mut())
2006                    .ok_or("Path collision: expected object")?;
2007            }
2008        }
2009
2010        Ok(false)
2011    }
2012
2013    fn set_field_max(
2014        &mut self,
2015        object_reg: Register,
2016        path: &str,
2017        value_reg: Register,
2018    ) -> Result<bool> {
2019        let compiled = self.get_compiled_path(path);
2020        let segments = compiled.segments();
2021        let new_value = self.registers[value_reg].clone();
2022
2023        if !self.registers[object_reg].is_object() {
2024            self.registers[object_reg] = json!({});
2025        }
2026
2027        let obj = self.registers[object_reg]
2028            .as_object_mut()
2029            .ok_or("Not an object")?;
2030
2031        let mut current = obj;
2032        for (i, segment) in segments.iter().enumerate() {
2033            if i == segments.len() - 1 {
2034                let should_update = if let Some(current_value) = current.get(segment) {
2035                    if current_value.is_null() {
2036                        true
2037                    } else {
2038                        match (current_value.as_i64(), new_value.as_i64()) {
2039                            (Some(current_val), Some(new_val)) => new_val > current_val,
2040                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2041                                new_value.as_u64().unwrap() as i64 > current_val
2042                            }
2043                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2044                                new_val > current_value.as_u64().unwrap() as i64
2045                            }
2046                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2047                                (Some(current_val), Some(new_val)) => new_val > current_val,
2048                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2049                                    (Some(current_val), Some(new_val)) => new_val > current_val,
2050                                    _ => false,
2051                                },
2052                            },
2053                            _ => false,
2054                        }
2055                    }
2056                } else {
2057                    true
2058                };
2059
2060                if should_update {
2061                    current.insert(segment.to_string(), new_value);
2062                    return Ok(true);
2063                }
2064                return Ok(false);
2065            } else {
2066                current
2067                    .entry(segment.to_string())
2068                    .or_insert_with(|| json!({}));
2069                current = current
2070                    .get_mut(segment)
2071                    .and_then(|v| v.as_object_mut())
2072                    .ok_or("Path collision: expected object")?;
2073            }
2074        }
2075
2076        Ok(false)
2077    }
2078
2079    fn set_field_sum(
2080        &mut self,
2081        object_reg: Register,
2082        path: &str,
2083        value_reg: Register,
2084    ) -> Result<bool> {
2085        let compiled = self.get_compiled_path(path);
2086        let segments = compiled.segments();
2087        let new_value = &self.registers[value_reg];
2088
2089        // Extract numeric value before borrowing object_reg mutably
2090        let new_val_num = new_value
2091            .as_i64()
2092            .or_else(|| new_value.as_u64().map(|n| n as i64))
2093            .ok_or("Sum requires numeric value")?;
2094
2095        if !self.registers[object_reg].is_object() {
2096            self.registers[object_reg] = json!({});
2097        }
2098
2099        let obj = self.registers[object_reg]
2100            .as_object_mut()
2101            .ok_or("Not an object")?;
2102
2103        let mut current = obj;
2104        for (i, segment) in segments.iter().enumerate() {
2105            if i == segments.len() - 1 {
2106                let current_val = current
2107                    .get(segment)
2108                    .and_then(|v| {
2109                        if v.is_null() {
2110                            None
2111                        } else {
2112                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2113                        }
2114                    })
2115                    .unwrap_or(0);
2116
2117                let sum = current_val + new_val_num;
2118                current.insert(segment.to_string(), json!(sum));
2119                return Ok(true);
2120            } else {
2121                current
2122                    .entry(segment.to_string())
2123                    .or_insert_with(|| json!({}));
2124                current = current
2125                    .get_mut(segment)
2126                    .and_then(|v| v.as_object_mut())
2127                    .ok_or("Path collision: expected object")?;
2128            }
2129        }
2130
2131        Ok(false)
2132    }
2133
2134    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2135        let compiled = self.get_compiled_path(path);
2136        let segments = compiled.segments();
2137
2138        if !self.registers[object_reg].is_object() {
2139            self.registers[object_reg] = json!({});
2140        }
2141
2142        let obj = self.registers[object_reg]
2143            .as_object_mut()
2144            .ok_or("Not an object")?;
2145
2146        let mut current = obj;
2147        for (i, segment) in segments.iter().enumerate() {
2148            if i == segments.len() - 1 {
2149                // Get current value (default to 0 if null/missing)
2150                let current_val = current
2151                    .get(segment)
2152                    .and_then(|v| {
2153                        if v.is_null() {
2154                            None
2155                        } else {
2156                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2157                        }
2158                    })
2159                    .unwrap_or(0);
2160
2161                let incremented = current_val + 1;
2162                current.insert(segment.to_string(), json!(incremented));
2163                return Ok(true);
2164            } else {
2165                current
2166                    .entry(segment.to_string())
2167                    .or_insert_with(|| json!({}));
2168                current = current
2169                    .get_mut(segment)
2170                    .and_then(|v| v.as_object_mut())
2171                    .ok_or("Path collision: expected object")?;
2172            }
2173        }
2174
2175        Ok(false)
2176    }
2177
2178    fn set_field_min(
2179        &mut self,
2180        object_reg: Register,
2181        path: &str,
2182        value_reg: Register,
2183    ) -> Result<bool> {
2184        let compiled = self.get_compiled_path(path);
2185        let segments = compiled.segments();
2186        let new_value = self.registers[value_reg].clone();
2187
2188        if !self.registers[object_reg].is_object() {
2189            self.registers[object_reg] = json!({});
2190        }
2191
2192        let obj = self.registers[object_reg]
2193            .as_object_mut()
2194            .ok_or("Not an object")?;
2195
2196        let mut current = obj;
2197        for (i, segment) in segments.iter().enumerate() {
2198            if i == segments.len() - 1 {
2199                let should_update = if let Some(current_value) = current.get(segment) {
2200                    if current_value.is_null() {
2201                        true
2202                    } else {
2203                        match (current_value.as_i64(), new_value.as_i64()) {
2204                            (Some(current_val), Some(new_val)) => new_val < current_val,
2205                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2206                                (new_value.as_u64().unwrap() as i64) < current_val
2207                            }
2208                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2209                                new_val < current_value.as_u64().unwrap() as i64
2210                            }
2211                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2212                                (Some(current_val), Some(new_val)) => new_val < current_val,
2213                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2214                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2215                                    _ => false,
2216                                },
2217                            },
2218                            _ => false,
2219                        }
2220                    }
2221                } else {
2222                    true
2223                };
2224
2225                if should_update {
2226                    current.insert(segment.to_string(), new_value);
2227                    return Ok(true);
2228                }
2229                return Ok(false);
2230            } else {
2231                current
2232                    .entry(segment.to_string())
2233                    .or_insert_with(|| json!({}));
2234                current = current
2235                    .get_mut(segment)
2236                    .and_then(|v| v.as_object_mut())
2237                    .ok_or("Path collision: expected object")?;
2238            }
2239        }
2240
2241        Ok(false)
2242    }
2243
2244    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2245        let compiled = self.get_compiled_path(path);
2246        let segments = compiled.segments();
2247        let mut current = &self.registers[object_reg];
2248
2249        for segment in segments {
2250            current = current
2251                .get(segment)
2252                .ok_or_else(|| format!("Field not found: {}", segment))?;
2253        }
2254
2255        Ok(current.clone())
2256    }
2257
2258    fn append_to_array(
2259        &mut self,
2260        object_reg: Register,
2261        path: &str,
2262        value_reg: Register,
2263        max_length: usize,
2264    ) -> Result<()> {
2265        let compiled = self.get_compiled_path(path);
2266        let segments = compiled.segments();
2267        let value = self.registers[value_reg].clone();
2268
2269        if !self.registers[object_reg].is_object() {
2270            self.registers[object_reg] = json!({});
2271        }
2272
2273        let obj = self.registers[object_reg]
2274            .as_object_mut()
2275            .ok_or("Not an object")?;
2276
2277        let mut current = obj;
2278        for (i, segment) in segments.iter().enumerate() {
2279            if i == segments.len() - 1 {
2280                current
2281                    .entry(segment.to_string())
2282                    .or_insert_with(|| json!([]));
2283                let arr = current
2284                    .get_mut(segment)
2285                    .and_then(|v| v.as_array_mut())
2286                    .ok_or("Path is not an array")?;
2287                arr.push(value.clone());
2288
2289                if arr.len() > max_length {
2290                    let excess = arr.len() - max_length;
2291                    arr.drain(0..excess);
2292                }
2293            } else {
2294                current
2295                    .entry(segment.to_string())
2296                    .or_insert_with(|| json!({}));
2297                current = current
2298                    .get_mut(segment)
2299                    .and_then(|v| v.as_object_mut())
2300                    .ok_or("Path collision: expected object")?;
2301            }
2302        }
2303
2304        Ok(())
2305    }
2306
2307    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2308        let value = &self.registers[reg];
2309        let transformed = self.apply_transformation(value, transformation)?;
2310        self.registers[reg] = transformed;
2311        Ok(())
2312    }
2313
2314    fn apply_transformation(
2315        &self,
2316        value: &Value,
2317        transformation: &Transformation,
2318    ) -> Result<Value> {
2319        match transformation {
2320            Transformation::HexEncode => {
2321                if let Some(arr) = value.as_array() {
2322                    let bytes: Vec<u8> = arr
2323                        .iter()
2324                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2325                        .collect();
2326                    let hex = hex::encode(&bytes);
2327                    Ok(json!(hex))
2328                } else {
2329                    Err("HexEncode requires an array of numbers".into())
2330                }
2331            }
2332            Transformation::HexDecode => {
2333                if let Some(s) = value.as_str() {
2334                    let s = s.strip_prefix("0x").unwrap_or(s);
2335                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2336                    Ok(json!(bytes))
2337                } else {
2338                    Err("HexDecode requires a string".into())
2339                }
2340            }
2341            Transformation::Base58Encode => {
2342                if let Some(arr) = value.as_array() {
2343                    let bytes: Vec<u8> = arr
2344                        .iter()
2345                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2346                        .collect();
2347                    let encoded = bs58::encode(&bytes).into_string();
2348                    Ok(json!(encoded))
2349                } else if value.is_string() {
2350                    Ok(value.clone())
2351                } else {
2352                    Err("Base58Encode requires an array of numbers".into())
2353                }
2354            }
2355            Transformation::Base58Decode => {
2356                if let Some(s) = value.as_str() {
2357                    let bytes = bs58::decode(s)
2358                        .into_vec()
2359                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2360                    Ok(json!(bytes))
2361                } else {
2362                    Err("Base58Decode requires a string".into())
2363                }
2364            }
2365            Transformation::ToString => Ok(json!(value.to_string())),
2366            Transformation::ToNumber => {
2367                if let Some(s) = value.as_str() {
2368                    let n = s
2369                        .parse::<i64>()
2370                        .map_err(|e| format!("Parse error: {}", e))?;
2371                    Ok(json!(n))
2372                } else {
2373                    Ok(value.clone())
2374                }
2375            }
2376        }
2377    }
2378
2379    fn evaluate_comparison(
2380        &self,
2381        field_value: &Value,
2382        op: &ComparisonOp,
2383        condition_value: &Value,
2384    ) -> Result<bool> {
2385        use ComparisonOp::*;
2386
2387        match op {
2388            Equal => Ok(field_value == condition_value),
2389            NotEqual => Ok(field_value != condition_value),
2390            GreaterThan => {
2391                // Try to compare as numbers
2392                match (field_value.as_i64(), condition_value.as_i64()) {
2393                    (Some(a), Some(b)) => Ok(a > b),
2394                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2395                        (Some(a), Some(b)) => Ok(a > b),
2396                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2397                            (Some(a), Some(b)) => Ok(a > b),
2398                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2399                        },
2400                    },
2401                }
2402            }
2403            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2404                (Some(a), Some(b)) => Ok(a >= b),
2405                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2406                    (Some(a), Some(b)) => Ok(a >= b),
2407                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2408                        (Some(a), Some(b)) => Ok(a >= b),
2409                        _ => {
2410                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2411                        }
2412                    },
2413                },
2414            },
2415            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2416                (Some(a), Some(b)) => Ok(a < b),
2417                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2418                    (Some(a), Some(b)) => Ok(a < b),
2419                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2420                        (Some(a), Some(b)) => Ok(a < b),
2421                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2422                    },
2423                },
2424            },
2425            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2426                (Some(a), Some(b)) => Ok(a <= b),
2427                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2428                    (Some(a), Some(b)) => Ok(a <= b),
2429                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2430                        (Some(a), Some(b)) => Ok(a <= b),
2431                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2432                    },
2433                },
2434            },
2435        }
2436    }
2437
2438    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2439    /// Returns any pending account updates that were queued for this PDA.
2440    /// ```ignore
2441    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2442    /// for update in pending {
2443    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2444    /// }
2445    /// ```
2446    #[cfg_attr(feature = "otel", instrument(
2447        name = "vm.update_pda_lookup",
2448        skip(self),
2449        fields(
2450            pda = %pda_address,
2451            seed = %seed_value,
2452        )
2453    ))]
2454    pub fn update_pda_reverse_lookup(
2455        &mut self,
2456        state_id: u32,
2457        lookup_name: &str,
2458        pda_address: String,
2459        seed_value: String,
2460    ) -> Result<Vec<PendingAccountUpdate>> {
2461        let state = self
2462            .states
2463            .get_mut(&state_id)
2464            .ok_or("State table not found")?;
2465
2466        let lookup = state
2467            .pda_reverse_lookups
2468            .entry(lookup_name.to_string())
2469            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2470
2471        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2472
2473        if let Some(ref evicted) = evicted_pda {
2474            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2475                let count = evicted_updates.len();
2476                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2477            }
2478        }
2479
2480        // Flush and return pending updates for this PDA
2481        self.flush_pending_updates(state_id, &pda_address)
2482    }
2483
2484    /// Clean up expired pending updates that are older than the TTL
2485    ///
2486    /// Returns the number of updates that were removed.
2487    /// This should be called periodically to prevent memory leaks from orphaned updates.
2488    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2489        let state = match self.states.get_mut(&state_id) {
2490            Some(s) => s,
2491            None => return 0,
2492        };
2493
2494        let now = std::time::SystemTime::now()
2495            .duration_since(std::time::UNIX_EPOCH)
2496            .unwrap()
2497            .as_secs() as i64;
2498
2499        let mut removed_count = 0;
2500
2501        // Iterate through all pending updates and remove expired ones
2502        state.pending_updates.retain(|_pda_address, updates| {
2503            let original_len = updates.len();
2504
2505            updates.retain(|update| {
2506                let age = now - update.queued_at;
2507                age <= PENDING_UPDATE_TTL_SECONDS
2508            });
2509
2510            removed_count += original_len - updates.len();
2511
2512            // Remove the entry entirely if no updates remain
2513            !updates.is_empty()
2514        });
2515
2516        // Update the global counter
2517        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2518
2519        if removed_count > 0 {
2520            #[cfg(feature = "otel")]
2521            crate::vm_metrics::record_pending_updates_expired(
2522                removed_count as u64,
2523                &state.entity_name,
2524            );
2525        }
2526
2527        removed_count
2528    }
2529
2530    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2531    ///
2532    /// # Workflow
2533    ///
2534    /// This implements a deferred processing pattern for account updates when the PDA reverse
2535    /// lookup needed to resolve the primary key is not yet available:
2536    ///
2537    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2538    ///    is not available, call `queue_account_update()` to queue it for later.
2539    ///
2540    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2541    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2542    ///
2543    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2544    ///    ```ignore
2545    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2546    ///    for update in pending {
2547    ///        let mutations = vm.process_event(
2548    ///            &bytecode, update.account_data, &update.account_type, None, None
2549    ///        )?;
2550    ///    }
2551    ///    ```
2552    ///
2553    /// # Arguments
2554    ///
2555    /// * `state_id` - The state table ID
2556    /// * `pda_address` - The PDA address that needs reverse lookup
2557    /// * `account_type` - The event type name for reprocessing
2558    /// * `account_data` - The account data (event value) for reprocessing
2559    /// * `slot` - The slot number when this update occurred
2560    /// * `signature` - The transaction signature
2561    #[cfg_attr(feature = "otel", instrument(
2562        name = "vm.queue_account_update",
2563        skip(self, update),
2564        fields(
2565            pda = %update.pda_address,
2566            account_type = %update.account_type,
2567            slot = update.slot,
2568        )
2569    ))]
2570    pub fn queue_account_update(
2571        &mut self,
2572        state_id: u32,
2573        update: QueuedAccountUpdate,
2574    ) -> Result<()> {
2575        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2576            self.cleanup_expired_pending_updates(state_id);
2577            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2578                self.drop_oldest_pending_update(state_id)?;
2579            }
2580        }
2581
2582        let state = self
2583            .states
2584            .get_mut(&state_id)
2585            .ok_or("State table not found")?;
2586
2587        let pending = PendingAccountUpdate {
2588            account_type: update.account_type,
2589            pda_address: update.pda_address.clone(),
2590            account_data: update.account_data,
2591            slot: update.slot,
2592            write_version: update.write_version,
2593            signature: update.signature,
2594            queued_at: std::time::SystemTime::now()
2595                .duration_since(std::time::UNIX_EPOCH)
2596                .unwrap()
2597                .as_secs() as i64,
2598        };
2599
2600        let pda_address = pending.pda_address.clone();
2601        let slot = pending.slot;
2602
2603        let mut updates = state
2604            .pending_updates
2605            .entry(pda_address.clone())
2606            .or_insert_with(Vec::new);
2607
2608        let original_len = updates.len();
2609        updates.retain(|existing| existing.slot > slot);
2610        let removed_by_dedup = original_len - updates.len();
2611
2612        if removed_by_dedup > 0 {
2613            self.pending_queue_size = self
2614                .pending_queue_size
2615                .saturating_sub(removed_by_dedup as u64);
2616        }
2617
2618        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2619            updates.remove(0);
2620            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2621        }
2622
2623        updates.push(pending);
2624        #[cfg(feature = "otel")]
2625        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2626
2627        Ok(())
2628    }
2629
2630    pub fn queue_instruction_event(
2631        &mut self,
2632        state_id: u32,
2633        event: QueuedInstructionEvent,
2634    ) -> Result<()> {
2635        let state = self
2636            .states
2637            .get_mut(&state_id)
2638            .ok_or("State table not found")?;
2639
2640        let pda_address = event.pda_address.clone();
2641
2642        let pending = PendingInstructionEvent {
2643            event_type: event.event_type,
2644            pda_address: event.pda_address,
2645            event_data: event.event_data,
2646            slot: event.slot,
2647            signature: event.signature,
2648            queued_at: std::time::SystemTime::now()
2649                .duration_since(std::time::UNIX_EPOCH)
2650                .unwrap()
2651                .as_secs() as i64,
2652        };
2653
2654        let mut events = state
2655            .pending_instruction_events
2656            .entry(pda_address)
2657            .or_insert_with(Vec::new);
2658
2659        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2660            events.remove(0);
2661        }
2662
2663        events.push(pending);
2664
2665        Ok(())
2666    }
2667
2668    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2669        self.last_pda_lookup_miss.take()
2670    }
2671
2672    pub fn take_last_pda_registered(&mut self) -> Option<String> {
2673        self.last_pda_registered.take()
2674    }
2675
2676    pub fn flush_pending_instruction_events(
2677        &mut self,
2678        state_id: u32,
2679        pda_address: &str,
2680    ) -> Vec<PendingInstructionEvent> {
2681        let state = match self.states.get_mut(&state_id) {
2682            Some(s) => s,
2683            None => return Vec::new(),
2684        };
2685
2686        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2687            events
2688        } else {
2689            Vec::new()
2690        }
2691    }
2692
2693    /// Get statistics about the pending queue for monitoring
2694    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2695        let state = self.states.get(&state_id)?;
2696
2697        let now = std::time::SystemTime::now()
2698            .duration_since(std::time::UNIX_EPOCH)
2699            .unwrap()
2700            .as_secs() as i64;
2701
2702        let mut total_updates = 0;
2703        let mut oldest_timestamp = now;
2704        let mut largest_pda_queue = 0;
2705        let mut estimated_memory = 0;
2706
2707        for entry in state.pending_updates.iter() {
2708            let (_, updates) = entry.pair();
2709            total_updates += updates.len();
2710            largest_pda_queue = largest_pda_queue.max(updates.len());
2711
2712            for update in updates.iter() {
2713                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2714                // Rough memory estimate
2715                estimated_memory += update.account_type.len() +
2716                                   update.pda_address.len() +
2717                                   update.signature.len() +
2718                                   16 + // slot + queued_at
2719                                   estimate_json_size(&update.account_data);
2720            }
2721        }
2722
2723        Some(PendingQueueStats {
2724            total_updates,
2725            unique_pdas: state.pending_updates.len(),
2726            oldest_age_seconds: now - oldest_timestamp,
2727            largest_pda_queue_size: largest_pda_queue,
2728            estimated_memory_bytes: estimated_memory,
2729        })
2730    }
2731
2732    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2733        let mut stats = VmMemoryStats {
2734            path_cache_size: self.path_cache.len(),
2735            ..Default::default()
2736        };
2737
2738        if let Some(state) = self.states.get(&state_id) {
2739            stats.state_table_entity_count = state.data.len();
2740            stats.state_table_max_entries = state.config.max_entries;
2741            stats.state_table_at_capacity = state.is_at_capacity();
2742
2743            stats.lookup_index_count = state.lookup_indexes.len();
2744            stats.lookup_index_total_entries =
2745                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2746
2747            stats.temporal_index_count = state.temporal_indexes.len();
2748            stats.temporal_index_total_entries = state
2749                .temporal_indexes
2750                .values()
2751                .map(|idx| idx.total_entries())
2752                .sum();
2753
2754            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2755            stats.pda_reverse_lookup_total_entries = state
2756                .pda_reverse_lookups
2757                .values()
2758                .map(|lookup| lookup.len())
2759                .sum();
2760
2761            stats.version_tracker_entries = state.version_tracker.len();
2762
2763            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2764        }
2765
2766        stats
2767    }
2768
2769    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2770        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2771        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2772
2773        #[cfg(feature = "otel")]
2774        if let Some(state) = self.states.get(&state_id) {
2775            crate::vm_metrics::record_cleanup(
2776                pending_removed,
2777                temporal_removed,
2778                &state.entity_name,
2779            );
2780        }
2781
2782        CleanupResult {
2783            pending_updates_removed: pending_removed,
2784            temporal_entries_removed: temporal_removed,
2785        }
2786    }
2787
2788    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2789        let state = match self.states.get_mut(&state_id) {
2790            Some(s) => s,
2791            None => return 0,
2792        };
2793
2794        let now = std::time::SystemTime::now()
2795            .duration_since(std::time::UNIX_EPOCH)
2796            .unwrap()
2797            .as_secs() as i64;
2798
2799        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2800        let mut total_removed = 0;
2801
2802        for (_, index) in state.temporal_indexes.iter_mut() {
2803            total_removed += index.cleanup_expired(cutoff);
2804        }
2805
2806        total_removed
2807    }
2808
2809    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2810        let state = self.states.get(&state_id)?;
2811
2812        if state.is_at_capacity() {
2813            Some(CapacityWarning {
2814                current_entries: state.data.len(),
2815                max_entries: state.config.max_entries,
2816                entries_over_limit: state.entries_over_limit(),
2817            })
2818        } else {
2819            None
2820        }
2821    }
2822
2823    /// Drop the oldest pending update across all PDAs
2824    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2825        let state = self
2826            .states
2827            .get_mut(&state_id)
2828            .ok_or("State table not found")?;
2829
2830        let mut oldest_pda: Option<String> = None;
2831        let mut oldest_timestamp = i64::MAX;
2832
2833        // Find the PDA with the oldest update
2834        for entry in state.pending_updates.iter() {
2835            let (pda, updates) = entry.pair();
2836            if let Some(update) = updates.first() {
2837                if update.queued_at < oldest_timestamp {
2838                    oldest_timestamp = update.queued_at;
2839                    oldest_pda = Some(pda.clone());
2840                }
2841            }
2842        }
2843
2844        // Remove the oldest update
2845        if let Some(pda) = oldest_pda {
2846            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2847                if !updates.is_empty() {
2848                    updates.remove(0);
2849                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2850
2851                    // Remove the entry if it's now empty
2852                    if updates.is_empty() {
2853                        drop(updates);
2854                        state.pending_updates.remove(&pda);
2855                    }
2856                }
2857            }
2858        }
2859
2860        Ok(())
2861    }
2862
2863    /// Flush and return pending updates for a PDA for external reprocessing
2864    ///
2865    /// Returns the pending updates that were queued for this PDA address.
2866    /// The caller should reprocess these through the VM using process_event().
2867    fn flush_pending_updates(
2868        &mut self,
2869        state_id: u32,
2870        pda_address: &str,
2871    ) -> Result<Vec<PendingAccountUpdate>> {
2872        let state = self
2873            .states
2874            .get_mut(&state_id)
2875            .ok_or("State table not found")?;
2876
2877        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2878            let count = pending_updates.len();
2879            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2880            #[cfg(feature = "otel")]
2881            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2882            Ok(pending_updates)
2883        } else {
2884            Ok(Vec::new())
2885        }
2886    }
2887
2888    /// Try to resolve a primary key via PDA reverse lookup
2889    pub fn try_pda_reverse_lookup(
2890        &mut self,
2891        state_id: u32,
2892        lookup_name: &str,
2893        pda_address: &str,
2894    ) -> Option<String> {
2895        let state = self.states.get_mut(&state_id)?;
2896
2897        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2898            if let Some(value) = lookup.lookup(pda_address) {
2899                self.pda_cache_hits += 1;
2900                return Some(value);
2901            }
2902        }
2903
2904        self.pda_cache_misses += 1;
2905        None
2906    }
2907
2908    // ============================================================================
2909    // Computed Expression Evaluator (Task 5)
2910    // ============================================================================
2911
2912    /// Evaluate a computed expression AST against the current state
2913    /// This is the core runtime evaluator for computed fields from the AST
2914    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2915        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2916    }
2917
2918    /// Evaluate a computed expression with a variable environment (for let bindings)
2919    fn evaluate_computed_expr_with_env(
2920        &self,
2921        expr: &ComputedExpr,
2922        state: &Value,
2923        env: &std::collections::HashMap<String, Value>,
2924    ) -> Result<Value> {
2925        match expr {
2926            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2927
2928            ComputedExpr::Var { name } => env
2929                .get(name)
2930                .cloned()
2931                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2932
2933            ComputedExpr::Let { name, value, body } => {
2934                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2935                let mut new_env = env.clone();
2936                new_env.insert(name.clone(), val);
2937                self.evaluate_computed_expr_with_env(body, state, &new_env)
2938            }
2939
2940            ComputedExpr::If {
2941                condition,
2942                then_branch,
2943                else_branch,
2944            } => {
2945                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2946                if self.value_to_bool(&cond_val) {
2947                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2948                } else {
2949                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2950                }
2951            }
2952
2953            ComputedExpr::None => Ok(Value::Null),
2954
2955            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2956
2957            ComputedExpr::Slice { expr, start, end } => {
2958                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2959                match val {
2960                    Value::Array(arr) => {
2961                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2962                        Ok(Value::Array(slice))
2963                    }
2964                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2965                }
2966            }
2967
2968            ComputedExpr::Index { expr, index } => {
2969                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2970                match val {
2971                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2972                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2973                }
2974            }
2975
2976            ComputedExpr::U64FromLeBytes { bytes } => {
2977                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2978                let byte_vec = self.value_to_bytes(&val)?;
2979                if byte_vec.len() < 8 {
2980                    return Err(format!(
2981                        "u64::from_le_bytes requires 8 bytes, got {}",
2982                        byte_vec.len()
2983                    )
2984                    .into());
2985                }
2986                let arr: [u8; 8] = byte_vec[..8]
2987                    .try_into()
2988                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2989                Ok(json!(u64::from_le_bytes(arr)))
2990            }
2991
2992            ComputedExpr::U64FromBeBytes { bytes } => {
2993                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2994                let byte_vec = self.value_to_bytes(&val)?;
2995                if byte_vec.len() < 8 {
2996                    return Err(format!(
2997                        "u64::from_be_bytes requires 8 bytes, got {}",
2998                        byte_vec.len()
2999                    )
3000                    .into());
3001                }
3002                let arr: [u8; 8] = byte_vec[..8]
3003                    .try_into()
3004                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3005                Ok(json!(u64::from_be_bytes(arr)))
3006            }
3007
3008            ComputedExpr::ByteArray { bytes } => {
3009                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3010            }
3011
3012            ComputedExpr::Closure { param, body } => {
3013                // Closures are stored as-is; they're evaluated when used in map()
3014                // Return a special representation
3015                Ok(json!({
3016                    "__closure": {
3017                        "param": param,
3018                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
3019                    }
3020                }))
3021            }
3022
3023            ComputedExpr::Unary { op, expr } => {
3024                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3025                self.apply_unary_op(op, &val)
3026            }
3027
3028            ComputedExpr::JsonToBytes { expr } => {
3029                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3030                // Convert JSON array of numbers to byte array
3031                let bytes = self.value_to_bytes(&val)?;
3032                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3033            }
3034
3035            ComputedExpr::UnwrapOr { expr, default } => {
3036                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3037                if val.is_null() {
3038                    Ok(default.clone())
3039                } else {
3040                    Ok(val)
3041                }
3042            }
3043
3044            ComputedExpr::Binary { op, left, right } => {
3045                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3046                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3047                self.apply_binary_op(op, &l, &r)
3048            }
3049
3050            ComputedExpr::Cast { expr, to_type } => {
3051                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3052                self.apply_cast(&val, to_type)
3053            }
3054
3055            ComputedExpr::MethodCall { expr, method, args } => {
3056                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3057                // Special handling for map() with closures
3058                if method == "map" && args.len() == 1 {
3059                    if let ComputedExpr::Closure { param, body } = &args[0] {
3060                        // If the value is null, return null (Option::None.map returns None)
3061                        if val.is_null() {
3062                            return Ok(Value::Null);
3063                        }
3064                        // Evaluate the closure body with the value bound to param
3065                        let mut closure_env = env.clone();
3066                        closure_env.insert(param.clone(), val);
3067                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3068                    }
3069                }
3070                let evaluated_args: Vec<Value> = args
3071                    .iter()
3072                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3073                    .collect::<Result<Vec<_>>>()?;
3074                self.apply_method_call(&val, method, &evaluated_args)
3075            }
3076
3077            ComputedExpr::Literal { value } => Ok(value.clone()),
3078
3079            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3080        }
3081    }
3082
3083    /// Convert a JSON value to a byte vector
3084    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3085        match val {
3086            Value::Array(arr) => arr
3087                .iter()
3088                .map(|v| {
3089                    v.as_u64()
3090                        .map(|n| n as u8)
3091                        .ok_or_else(|| "Array element not a valid byte".into())
3092                })
3093                .collect(),
3094            Value::String(s) => {
3095                // Try to decode as hex
3096                if s.starts_with("0x") || s.starts_with("0X") {
3097                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3098                } else {
3099                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3100                }
3101            }
3102            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3103        }
3104    }
3105
3106    /// Apply a unary operation
3107    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3108        use crate::ast::UnaryOp;
3109        match op {
3110            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3111            UnaryOp::ReverseBits => match val.as_u64() {
3112                Some(n) => Ok(json!(n.reverse_bits())),
3113                None => match val.as_i64() {
3114                    Some(n) => Ok(json!((n as u64).reverse_bits())),
3115                    None => Err("reverse_bits requires an integer".into()),
3116                },
3117            },
3118        }
3119    }
3120
3121    /// Get a field value from state by path (e.g., "section.field" or just "field")
3122    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3123        let segments: Vec<&str> = path.split('.').collect();
3124        let mut current = state;
3125
3126        for segment in segments {
3127            match current.get(segment) {
3128                Some(v) => current = v,
3129                None => return Ok(Value::Null),
3130            }
3131        }
3132
3133        Ok(current.clone())
3134    }
3135
3136    /// Apply a binary operation to two values
3137    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3138        match op {
3139            // Arithmetic operations
3140            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3141            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3142            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3143            BinaryOp::Div => {
3144                // Check for division by zero
3145                if let Some(r) = right.as_i64() {
3146                    if r == 0 {
3147                        return Err("Division by zero".into());
3148                    }
3149                }
3150                if let Some(r) = right.as_f64() {
3151                    if r == 0.0 {
3152                        return Err("Division by zero".into());
3153                    }
3154                }
3155                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3156            }
3157            BinaryOp::Mod => {
3158                // Modulo - only for integers
3159                match (left.as_i64(), right.as_i64()) {
3160                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3161                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3162                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3163                        _ => Err("Modulo requires non-zero integer operands".into()),
3164                    },
3165                    _ => Err("Modulo by zero".into()),
3166                }
3167            }
3168
3169            // Comparison operations
3170            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3171            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3172            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3173            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3174            BinaryOp::Eq => Ok(json!(left == right)),
3175            BinaryOp::Ne => Ok(json!(left != right)),
3176
3177            // Logical operations
3178            BinaryOp::And => {
3179                let l_bool = self.value_to_bool(left);
3180                let r_bool = self.value_to_bool(right);
3181                Ok(json!(l_bool && r_bool))
3182            }
3183            BinaryOp::Or => {
3184                let l_bool = self.value_to_bool(left);
3185                let r_bool = self.value_to_bool(right);
3186                Ok(json!(l_bool || r_bool))
3187            }
3188
3189            // Bitwise operations
3190            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3191                (Some(a), Some(b)) => Ok(json!(a ^ b)),
3192                _ => match (left.as_i64(), right.as_i64()) {
3193                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
3194                    _ => Err("XOR requires integer operands".into()),
3195                },
3196            },
3197            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3198                (Some(a), Some(b)) => Ok(json!(a & b)),
3199                _ => match (left.as_i64(), right.as_i64()) {
3200                    (Some(a), Some(b)) => Ok(json!(a & b)),
3201                    _ => Err("BitAnd requires integer operands".into()),
3202                },
3203            },
3204            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3205                (Some(a), Some(b)) => Ok(json!(a | b)),
3206                _ => match (left.as_i64(), right.as_i64()) {
3207                    (Some(a), Some(b)) => Ok(json!(a | b)),
3208                    _ => Err("BitOr requires integer operands".into()),
3209                },
3210            },
3211            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3212                (Some(a), Some(b)) => Ok(json!(a << b)),
3213                _ => match (left.as_i64(), right.as_i64()) {
3214                    (Some(a), Some(b)) => Ok(json!(a << b)),
3215                    _ => Err("Shl requires integer operands".into()),
3216                },
3217            },
3218            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3219                (Some(a), Some(b)) => Ok(json!(a >> b)),
3220                _ => match (left.as_i64(), right.as_i64()) {
3221                    (Some(a), Some(b)) => Ok(json!(a >> b)),
3222                    _ => Err("Shr requires integer operands".into()),
3223                },
3224            },
3225        }
3226    }
3227
3228    /// Helper for numeric operations that can work on integers or floats
3229    fn numeric_op<F1, F2>(
3230        &self,
3231        left: &Value,
3232        right: &Value,
3233        int_op: F1,
3234        float_op: F2,
3235    ) -> Result<Value>
3236    where
3237        F1: Fn(i64, i64) -> i64,
3238        F2: Fn(f64, f64) -> f64,
3239    {
3240        // Try i64 first
3241        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3242            return Ok(json!(int_op(a, b)));
3243        }
3244
3245        // Try u64
3246        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3247            // For u64, we need to be careful with underflow in subtraction
3248            return Ok(json!(int_op(a as i64, b as i64)));
3249        }
3250
3251        // Try f64
3252        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3253            return Ok(json!(float_op(a, b)));
3254        }
3255
3256        // If either is null, return null
3257        if left.is_null() || right.is_null() {
3258            return Ok(Value::Null);
3259        }
3260
3261        Err(format!(
3262            "Cannot perform numeric operation on {:?} and {:?}",
3263            left, right
3264        )
3265        .into())
3266    }
3267
3268    /// Helper for comparison operations
3269    fn comparison_op<F1, F2>(
3270        &self,
3271        left: &Value,
3272        right: &Value,
3273        int_cmp: F1,
3274        float_cmp: F2,
3275    ) -> Result<Value>
3276    where
3277        F1: Fn(i64, i64) -> bool,
3278        F2: Fn(f64, f64) -> bool,
3279    {
3280        // Try i64 first
3281        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3282            return Ok(json!(int_cmp(a, b)));
3283        }
3284
3285        // Try u64
3286        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3287            return Ok(json!(int_cmp(a as i64, b as i64)));
3288        }
3289
3290        // Try f64
3291        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3292            return Ok(json!(float_cmp(a, b)));
3293        }
3294
3295        // If either is null, comparison returns false
3296        if left.is_null() || right.is_null() {
3297            return Ok(json!(false));
3298        }
3299
3300        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3301    }
3302
3303    /// Convert a value to boolean for logical operations
3304    fn value_to_bool(&self, value: &Value) -> bool {
3305        match value {
3306            Value::Null => false,
3307            Value::Bool(b) => *b,
3308            Value::Number(n) => {
3309                if let Some(i) = n.as_i64() {
3310                    i != 0
3311                } else if let Some(f) = n.as_f64() {
3312                    f != 0.0
3313                } else {
3314                    true
3315                }
3316            }
3317            Value::String(s) => !s.is_empty(),
3318            Value::Array(arr) => !arr.is_empty(),
3319            Value::Object(obj) => !obj.is_empty(),
3320        }
3321    }
3322
3323    /// Apply a type cast to a value
3324    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3325        match to_type {
3326            "i8" | "i16" | "i32" | "i64" | "isize" => {
3327                if let Some(n) = value.as_i64() {
3328                    Ok(json!(n))
3329                } else if let Some(n) = value.as_u64() {
3330                    Ok(json!(n as i64))
3331                } else if let Some(n) = value.as_f64() {
3332                    Ok(json!(n as i64))
3333                } else if let Some(s) = value.as_str() {
3334                    s.parse::<i64>()
3335                        .map(|n| json!(n))
3336                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3337                } else {
3338                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3339                }
3340            }
3341            "u8" | "u16" | "u32" | "u64" | "usize" => {
3342                if let Some(n) = value.as_u64() {
3343                    Ok(json!(n))
3344                } else if let Some(n) = value.as_i64() {
3345                    Ok(json!(n as u64))
3346                } else if let Some(n) = value.as_f64() {
3347                    Ok(json!(n as u64))
3348                } else if let Some(s) = value.as_str() {
3349                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3350                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3351                    })
3352                } else {
3353                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3354                }
3355            }
3356            "f32" | "f64" => {
3357                if let Some(n) = value.as_f64() {
3358                    Ok(json!(n))
3359                } else if let Some(n) = value.as_i64() {
3360                    Ok(json!(n as f64))
3361                } else if let Some(n) = value.as_u64() {
3362                    Ok(json!(n as f64))
3363                } else if let Some(s) = value.as_str() {
3364                    s.parse::<f64>()
3365                        .map(|n| json!(n))
3366                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3367                } else {
3368                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3369                }
3370            }
3371            "String" | "string" => Ok(json!(value.to_string())),
3372            "bool" => Ok(json!(self.value_to_bool(value))),
3373            _ => {
3374                // Unknown type, return value as-is
3375                Ok(value.clone())
3376            }
3377        }
3378    }
3379
3380    /// Apply a method call to a value
3381    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3382        match method {
3383            "unwrap_or" => {
3384                if value.is_null() && !args.is_empty() {
3385                    Ok(args[0].clone())
3386                } else {
3387                    Ok(value.clone())
3388                }
3389            }
3390            "unwrap_or_default" => {
3391                if value.is_null() {
3392                    // Return default for common types
3393                    Ok(json!(0))
3394                } else {
3395                    Ok(value.clone())
3396                }
3397            }
3398            "is_some" => Ok(json!(!value.is_null())),
3399            "is_none" => Ok(json!(value.is_null())),
3400            "abs" => {
3401                if let Some(n) = value.as_i64() {
3402                    Ok(json!(n.abs()))
3403                } else if let Some(n) = value.as_f64() {
3404                    Ok(json!(n.abs()))
3405                } else {
3406                    Err(format!("Cannot call abs() on {:?}", value).into())
3407                }
3408            }
3409            "len" => {
3410                if let Some(s) = value.as_str() {
3411                    Ok(json!(s.len()))
3412                } else if let Some(arr) = value.as_array() {
3413                    Ok(json!(arr.len()))
3414                } else if let Some(obj) = value.as_object() {
3415                    Ok(json!(obj.len()))
3416                } else {
3417                    Err(format!("Cannot call len() on {:?}", value).into())
3418                }
3419            }
3420            "to_string" => Ok(json!(value.to_string())),
3421            "min" => {
3422                if args.is_empty() {
3423                    return Err("min() requires an argument".into());
3424                }
3425                let other = &args[0];
3426                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3427                    Ok(json!(a.min(b)))
3428                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3429                    Ok(json!(a.min(b)))
3430                } else {
3431                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3432                }
3433            }
3434            "max" => {
3435                if args.is_empty() {
3436                    return Err("max() requires an argument".into());
3437                }
3438                let other = &args[0];
3439                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3440                    Ok(json!(a.max(b)))
3441                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3442                    Ok(json!(a.max(b)))
3443                } else {
3444                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3445                }
3446            }
3447            "saturating_add" => {
3448                if args.is_empty() {
3449                    return Err("saturating_add() requires an argument".into());
3450                }
3451                let other = &args[0];
3452                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3453                    Ok(json!(a.saturating_add(b)))
3454                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3455                    Ok(json!(a.saturating_add(b)))
3456                } else {
3457                    Err(format!(
3458                        "Cannot call saturating_add() on {:?} and {:?}",
3459                        value, other
3460                    )
3461                    .into())
3462                }
3463            }
3464            "saturating_sub" => {
3465                if args.is_empty() {
3466                    return Err("saturating_sub() requires an argument".into());
3467                }
3468                let other = &args[0];
3469                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3470                    Ok(json!(a.saturating_sub(b)))
3471                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3472                    Ok(json!(a.saturating_sub(b)))
3473                } else {
3474                    Err(format!(
3475                        "Cannot call saturating_sub() on {:?} and {:?}",
3476                        value, other
3477                    )
3478                    .into())
3479                }
3480            }
3481            _ => Err(format!("Unknown method call: {}()", method).into()),
3482        }
3483    }
3484
3485    /// Evaluate all computed fields for an entity and update the state
3486    /// This takes a list of ComputedFieldSpec from the AST and applies them
3487    pub fn evaluate_computed_fields_from_ast(
3488        &self,
3489        state: &mut Value,
3490        computed_field_specs: &[ComputedFieldSpec],
3491    ) -> Result<Vec<String>> {
3492        let mut updated_paths = Vec::new();
3493
3494        for spec in computed_field_specs {
3495            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3496                self.set_field_in_state(state, &spec.target_path, result)?;
3497                updated_paths.push(spec.target_path.clone());
3498            }
3499        }
3500
3501        Ok(updated_paths)
3502    }
3503
3504    /// Set a field value in state by path (e.g., "section.field")
3505    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3506        let segments: Vec<&str> = path.split('.').collect();
3507
3508        if segments.is_empty() {
3509            return Err("Empty path".into());
3510        }
3511
3512        // Navigate to parent, creating intermediate objects as needed
3513        let mut current = state;
3514        for (i, segment) in segments.iter().enumerate() {
3515            if i == segments.len() - 1 {
3516                // Last segment - set the value
3517                if let Some(obj) = current.as_object_mut() {
3518                    obj.insert(segment.to_string(), value);
3519                    return Ok(());
3520                } else {
3521                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3522                }
3523            } else {
3524                // Intermediate segment - navigate or create
3525                if !current.is_object() {
3526                    *current = json!({});
3527                }
3528                let obj = current.as_object_mut().unwrap();
3529                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3530            }
3531        }
3532
3533        Ok(())
3534    }
3535
3536    /// Create a computed fields evaluator closure from AST specs
3537    /// This returns a function that can be passed to the bytecode builder
3538    pub fn create_evaluator_from_specs(
3539        specs: Vec<ComputedFieldSpec>,
3540    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3541        move |state: &mut Value| {
3542            // Create a temporary VmContext just for evaluation
3543            // (We only need the expression evaluation methods)
3544            let vm = VmContext::new();
3545            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3546            Ok(())
3547        }
3548    }
3549}
3550
3551impl Default for VmContext {
3552    fn default() -> Self {
3553        Self::new()
3554    }
3555}
3556
3557// Implement the ReverseLookupUpdater trait for VmContext
3558impl crate::resolvers::ReverseLookupUpdater for VmContext {
3559    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3560        // Use default state_id=0 and default lookup name
3561        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3562            .unwrap_or_else(|e| {
3563                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3564                Vec::new()
3565            })
3566    }
3567
3568    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3569        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3570        self.flush_pending_updates(0, pda_address)
3571            .unwrap_or_else(|e| {
3572                tracing::error!("Failed to flush pending updates: {}", e);
3573                Vec::new()
3574            })
3575    }
3576}
3577
3578#[cfg(test)]
3579mod tests {
3580    use super::*;
3581    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3582
3583    #[test]
3584    fn test_computed_field_preserves_integer_type() {
3585        let vm = VmContext::new();
3586
3587        let mut state = serde_json::json!({
3588            "trading": {
3589                "total_buy_volume": 20000000000_i64,
3590                "total_sell_volume": 17951316474_i64
3591            }
3592        });
3593
3594        let spec = ComputedFieldSpec {
3595            target_path: "trading.total_volume".to_string(),
3596            result_type: "Option<u64>".to_string(),
3597            expression: ComputedExpr::Binary {
3598                op: BinaryOp::Add,
3599                left: Box::new(ComputedExpr::UnwrapOr {
3600                    expr: Box::new(ComputedExpr::FieldRef {
3601                        path: "trading.total_buy_volume".to_string(),
3602                    }),
3603                    default: serde_json::json!(0),
3604                }),
3605                right: Box::new(ComputedExpr::UnwrapOr {
3606                    expr: Box::new(ComputedExpr::FieldRef {
3607                        path: "trading.total_sell_volume".to_string(),
3608                    }),
3609                    default: serde_json::json!(0),
3610                }),
3611            },
3612        };
3613
3614        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3615            .unwrap();
3616
3617        let total_volume = state
3618            .get("trading")
3619            .and_then(|t| t.get("total_volume"))
3620            .expect("total_volume should exist");
3621
3622        let serialized = serde_json::to_string(total_volume).unwrap();
3623        assert!(
3624            !serialized.contains('.'),
3625            "Integer should not have decimal point: {}",
3626            serialized
3627        );
3628        assert_eq!(
3629            total_volume.as_i64(),
3630            Some(37951316474),
3631            "Value should be correct sum"
3632        );
3633    }
3634
3635    #[test]
3636    fn test_set_field_sum_preserves_integer_type() {
3637        let mut vm = VmContext::new();
3638        vm.registers[0] = serde_json::json!({});
3639        vm.registers[1] = serde_json::json!(20000000000_i64);
3640        vm.registers[2] = serde_json::json!(17951316474_i64);
3641
3642        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3643        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3644
3645        let state = &vm.registers[0];
3646        let buy_vol = state
3647            .get("trading")
3648            .and_then(|t| t.get("total_buy_volume"))
3649            .unwrap();
3650        let sell_vol = state
3651            .get("trading")
3652            .and_then(|t| t.get("total_sell_volume"))
3653            .unwrap();
3654
3655        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3656        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3657
3658        assert!(
3659            !buy_serialized.contains('.'),
3660            "Buy volume should not have decimal: {}",
3661            buy_serialized
3662        );
3663        assert!(
3664            !sell_serialized.contains('.'),
3665            "Sell volume should not have decimal: {}",
3666            sell_serialized
3667        );
3668    }
3669}