Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14/// Context metadata for blockchain updates (accounts and instructions)
15/// This structure is designed to be extended over time with additional metadata
16#[derive(Debug, Clone, Default)]
17pub struct UpdateContext {
18    /// Blockchain slot number
19    pub slot: Option<u64>,
20    /// Transaction signature
21    pub signature: Option<String>,
22    /// Unix timestamp (seconds since epoch)
23    /// If not provided, will default to current system time when accessed
24    pub timestamp: Option<i64>,
25    /// Write version for account updates (monotonically increasing per account within a slot)
26    /// Used for staleness detection to reject out-of-order updates
27    pub write_version: Option<u64>,
28    /// Transaction index for instruction updates (orders transactions within a slot)
29    /// Used for staleness detection to reject out-of-order updates
30    pub txn_index: Option<u64>,
31    /// Additional custom metadata that can be added without breaking changes
32    pub metadata: HashMap<String, Value>,
33}
34
35impl UpdateContext {
36    /// Create a new UpdateContext with slot and signature
37    pub fn new(slot: u64, signature: String) -> Self {
38        Self {
39            slot: Some(slot),
40            signature: Some(signature),
41            timestamp: None,
42            write_version: None,
43            txn_index: None,
44            metadata: HashMap::new(),
45        }
46    }
47
48    /// Create a new UpdateContext with slot, signature, and timestamp
49    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
50        Self {
51            slot: Some(slot),
52            signature: Some(signature),
53            timestamp: Some(timestamp),
54            write_version: None,
55            txn_index: None,
56            metadata: HashMap::new(),
57        }
58    }
59
60    /// Create context for account updates with write_version for staleness detection
61    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
62        Self {
63            slot: Some(slot),
64            signature: Some(signature),
65            timestamp: None,
66            write_version: Some(write_version),
67            txn_index: None,
68            metadata: HashMap::new(),
69        }
70    }
71
72    /// Create context for instruction updates with txn_index for staleness detection
73    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
74        Self {
75            slot: Some(slot),
76            signature: Some(signature),
77            timestamp: None,
78            write_version: None,
79            txn_index: Some(txn_index),
80            metadata: HashMap::new(),
81        }
82    }
83
84    /// Get the timestamp, falling back to current system time if not set
85    pub fn timestamp(&self) -> i64 {
86        self.timestamp.unwrap_or_else(|| {
87            std::time::SystemTime::now()
88                .duration_since(std::time::UNIX_EPOCH)
89                .unwrap()
90                .as_secs() as i64
91        })
92    }
93
94    /// Create an empty context (for testing or when context is not available)
95    pub fn empty() -> Self {
96        Self::default()
97    }
98
99    /// Add custom metadata
100    /// Returns true if this is an account update context (has write_version, no txn_index)
101    pub fn is_account_update(&self) -> bool {
102        self.write_version.is_some() && self.txn_index.is_none()
103    }
104
105    /// Returns true if this is an instruction update context (has txn_index, no write_version)
106    pub fn is_instruction_update(&self) -> bool {
107        self.txn_index.is_some() && self.write_version.is_none()
108    }
109
110    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
111        self.metadata.insert(key, value);
112        self
113    }
114
115    /// Get metadata value
116    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
117        self.metadata.get(key)
118    }
119
120    /// Convert context to JSON value for injection into event data
121    pub fn to_value(&self) -> Value {
122        let mut obj = serde_json::Map::new();
123        if let Some(slot) = self.slot {
124            obj.insert("slot".to_string(), json!(slot));
125        }
126        if let Some(ref sig) = self.signature {
127            obj.insert("signature".to_string(), json!(sig));
128        }
129        // Always include timestamp (use current time if not set)
130        obj.insert("timestamp".to_string(), json!(self.timestamp()));
131        for (key, value) in &self.metadata {
132            obj.insert(key.clone(), value.clone());
133        }
134        Value::Object(obj)
135    }
136}
137
138pub type Register = usize;
139pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
140
141pub type RegisterValue = Value;
142
143/// Trait for evaluating computed fields
144/// Implement this in your generated spec to enable computed field evaluation
145pub trait ComputedFieldsEvaluator {
146    fn evaluate(&self, state: &mut Value) -> Result<()>;
147}
148
149// Pending queue configuration
150const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
151const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
152const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
153
154// Temporal index configuration - prevents unbounded history growth
155const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
156const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
157
158// State table configuration - aligned with downstream EntityCache (500 per view)
159const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
160const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
161
162const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
163
164const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
165
166// Smaller cache for instruction deduplication - provides shorter effective TTL
167// since we don't expect instruction duplicates to arrive much later
168const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
169
170const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
171
172const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
173
174/// Estimate the size of a JSON value in bytes
175fn estimate_json_size(value: &Value) -> usize {
176    match value {
177        Value::Null => 4,
178        Value::Bool(_) => 5,
179        Value::Number(_) => 8,
180        Value::String(s) => s.len() + 2,
181        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
182        Value::Object(obj) => {
183            2 + obj
184                .iter()
185                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
186                .sum::<usize>()
187        }
188    }
189}
190
191#[derive(Debug, Clone)]
192pub struct CompiledPath {
193    pub segments: std::sync::Arc<[String]>,
194}
195
196impl CompiledPath {
197    pub fn new(path: &str) -> Self {
198        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
199        CompiledPath {
200            segments: segments.into(),
201        }
202    }
203
204    fn segments(&self) -> &[String] {
205        &self.segments
206    }
207}
208
209/// Represents the type of change made to a field for granular dirty tracking.
210/// This enables emitting only the actual changes rather than entire field values.
211#[derive(Debug, Clone)]
212pub enum FieldChange {
213    /// Field was replaced with a new value (emit the full value from state)
214    Replaced,
215    /// Items were appended to an array field (emit only the new items)
216    Appended(Vec<Value>),
217}
218
219/// Tracks field modifications during handler execution with granular change information.
220/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
221#[derive(Debug, Clone, Default)]
222pub struct DirtyTracker {
223    changes: HashMap<String, FieldChange>,
224}
225
226impl DirtyTracker {
227    /// Create a new empty DirtyTracker
228    pub fn new() -> Self {
229        Self {
230            changes: HashMap::new(),
231        }
232    }
233
234    /// Mark a field as replaced (full value will be emitted)
235    pub fn mark_replaced(&mut self, path: &str) {
236        // If there was an append, it's now superseded by a full replacement
237        self.changes.insert(path.to_string(), FieldChange::Replaced);
238    }
239
240    /// Record an appended value for a field
241    pub fn mark_appended(&mut self, path: &str, value: Value) {
242        match self.changes.get_mut(path) {
243            Some(FieldChange::Appended(values)) => {
244                // Add to existing appended values
245                values.push(value);
246            }
247            Some(FieldChange::Replaced) => {
248                // Field was already replaced, keep it as replaced
249                // (the full value including the append will be emitted)
250            }
251            None => {
252                // First append to this field
253                self.changes
254                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
255            }
256        }
257    }
258
259    /// Check if there are any changes tracked
260    pub fn is_empty(&self) -> bool {
261        self.changes.is_empty()
262    }
263
264    /// Get the number of changed fields
265    pub fn len(&self) -> usize {
266        self.changes.len()
267    }
268
269    /// Iterate over all changes
270    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
271        self.changes.iter()
272    }
273
274    /// Get a set of all dirty field paths (for backward compatibility)
275    pub fn dirty_paths(&self) -> HashSet<String> {
276        self.changes.keys().cloned().collect()
277    }
278
279    /// Consume the tracker and return the changes map
280    pub fn into_changes(self) -> HashMap<String, FieldChange> {
281        self.changes
282    }
283
284    /// Get a reference to the changes map
285    pub fn changes(&self) -> &HashMap<String, FieldChange> {
286        &self.changes
287    }
288
289    /// Get paths that were appended (not replaced)
290    pub fn appended_paths(&self) -> Vec<String> {
291        self.changes
292            .iter()
293            .filter_map(|(path, change)| match change {
294                FieldChange::Appended(_) => Some(path.clone()),
295                FieldChange::Replaced => None,
296            })
297            .collect()
298    }
299}
300
301pub struct VmContext {
302    registers: Vec<RegisterValue>,
303    states: HashMap<u32, StateTable>,
304    pub instructions_executed: u64,
305    pub cache_hits: u64,
306    path_cache: HashMap<String, CompiledPath>,
307    pub pda_cache_hits: u64,
308    pub pda_cache_misses: u64,
309    pub pending_queue_size: u64,
310    current_context: Option<UpdateContext>,
311    warnings: Vec<String>,
312    last_pda_lookup_miss: Option<String>,
313    last_lookup_index_miss: Option<String>,
314    last_pda_registered: Option<String>,
315    last_lookup_index_keys: Vec<String>,
316}
317
318#[derive(Debug)]
319pub struct LookupIndex {
320    index: std::sync::Mutex<LruCache<String, Value>>,
321}
322
323impl LookupIndex {
324    pub fn new() -> Self {
325        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
326    }
327
328    pub fn with_capacity(capacity: usize) -> Self {
329        LookupIndex {
330            index: std::sync::Mutex::new(LruCache::new(
331                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
332            )),
333        }
334    }
335
336    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
337        let key = value_to_cache_key(lookup_value);
338        self.index.lock().unwrap().get(&key).cloned()
339    }
340
341    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
342        let key = value_to_cache_key(&lookup_value);
343        self.index.lock().unwrap().put(key, primary_key);
344    }
345
346    pub fn len(&self) -> usize {
347        self.index.lock().unwrap().len()
348    }
349
350    pub fn is_empty(&self) -> bool {
351        self.index.lock().unwrap().is_empty()
352    }
353}
354
355impl Default for LookupIndex {
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361fn value_to_cache_key(value: &Value) -> String {
362    match value {
363        Value::String(s) => s.clone(),
364        Value::Number(n) => n.to_string(),
365        Value::Bool(b) => b.to_string(),
366        Value::Null => "null".to_string(),
367        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
368    }
369}
370
371#[derive(Debug)]
372pub struct TemporalIndex {
373    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
374}
375
376impl Default for TemporalIndex {
377    fn default() -> Self {
378        Self::new()
379    }
380}
381
382impl TemporalIndex {
383    pub fn new() -> Self {
384        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
385    }
386
387    pub fn with_capacity(capacity: usize) -> Self {
388        TemporalIndex {
389            index: std::sync::Mutex::new(LruCache::new(
390                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
391            )),
392        }
393    }
394
395    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
396        let key = value_to_cache_key(lookup_value);
397        let mut cache = self.index.lock().unwrap();
398        if let Some(entries) = cache.get(&key) {
399            for i in (0..entries.len()).rev() {
400                if entries[i].1 <= timestamp {
401                    return Some(entries[i].0.clone());
402                }
403            }
404        }
405        None
406    }
407
408    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
409        let key = value_to_cache_key(lookup_value);
410        let mut cache = self.index.lock().unwrap();
411        if let Some(entries) = cache.get(&key) {
412            if let Some(last) = entries.last() {
413                return Some(last.0.clone());
414            }
415        }
416        None
417    }
418
419    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
420        let key = value_to_cache_key(&lookup_value);
421        let mut cache = self.index.lock().unwrap();
422
423        let entries = cache.get_or_insert_mut(key, Vec::new);
424        entries.push((primary_key, timestamp));
425        entries.sort_by_key(|(_, ts)| *ts);
426
427        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
428        entries.retain(|(_, ts)| *ts >= cutoff);
429
430        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
431            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
432            entries.drain(0..excess);
433        }
434    }
435
436    pub fn len(&self) -> usize {
437        self.index.lock().unwrap().len()
438    }
439
440    pub fn is_empty(&self) -> bool {
441        self.index.lock().unwrap().is_empty()
442    }
443
444    pub fn total_entries(&self) -> usize {
445        self.index
446            .lock()
447            .unwrap()
448            .iter()
449            .map(|(_, entries)| entries.len())
450            .sum()
451    }
452
453    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
454        let mut cache = self.index.lock().unwrap();
455        let mut total_removed = 0;
456
457        for (_, entries) in cache.iter_mut() {
458            let original_len = entries.len();
459            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
460            total_removed += original_len - entries.len();
461        }
462
463        total_removed
464    }
465}
466
467#[derive(Debug)]
468pub struct PdaReverseLookup {
469    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
470    index: LruCache<String, String>,
471}
472
473impl PdaReverseLookup {
474    pub fn new(capacity: usize) -> Self {
475        PdaReverseLookup {
476            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
477        }
478    }
479
480    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
481        self.index.get(pda_address).cloned()
482    }
483
484    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
485        let evicted = if self.index.len() >= self.index.cap().get() {
486            self.index.peek_lru().map(|(k, _)| k.clone())
487        } else {
488            None
489        };
490
491        self.index.put(pda_address, seed_value);
492        evicted
493    }
494
495    pub fn len(&self) -> usize {
496        self.index.len()
497    }
498
499    pub fn is_empty(&self) -> bool {
500        self.index.is_empty()
501    }
502
503    pub fn contains(&self, pda_address: &str) -> bool {
504        self.index.peek(pda_address).is_some()
505    }
506}
507
508/// Input for queueing an account update.
509#[derive(Debug, Clone)]
510pub struct QueuedAccountUpdate {
511    pub pda_address: String,
512    pub account_type: String,
513    pub account_data: Value,
514    pub slot: u64,
515    pub write_version: u64,
516    pub signature: String,
517}
518
519/// Internal representation of a pending account update with queue metadata.
520#[derive(Debug, Clone)]
521pub struct PendingAccountUpdate {
522    pub account_type: String,
523    pub pda_address: String,
524    pub account_data: Value,
525    pub slot: u64,
526    pub write_version: u64,
527    pub signature: String,
528    pub queued_at: i64,
529}
530
531/// Input for queueing an instruction event when PDA lookup fails.
532#[derive(Debug, Clone)]
533pub struct QueuedInstructionEvent {
534    pub pda_address: String,
535    pub event_type: String,
536    pub event_data: Value,
537    pub slot: u64,
538    pub signature: String,
539}
540
541/// Internal representation of a pending instruction event with queue metadata.
542#[derive(Debug, Clone)]
543pub struct PendingInstructionEvent {
544    pub event_type: String,
545    pub pda_address: String,
546    pub event_data: Value,
547    pub slot: u64,
548    pub signature: String,
549    pub queued_at: i64,
550}
551
552#[derive(Debug, Clone)]
553pub struct DeferredWhenOperation {
554    pub entity_name: String,
555    pub primary_key: Value,
556    pub field_path: String,
557    pub field_value: Value,
558    pub when_instruction: String,
559    pub signature: String,
560    pub slot: u64,
561    pub deferred_at: i64,
562    pub emit: bool,
563}
564
565#[derive(Debug, Clone)]
566pub struct PendingQueueStats {
567    pub total_updates: usize,
568    pub unique_pdas: usize,
569    pub oldest_age_seconds: i64,
570    pub largest_pda_queue_size: usize,
571    pub estimated_memory_bytes: usize,
572}
573
574#[derive(Debug, Clone, Default)]
575pub struct VmMemoryStats {
576    pub state_table_entity_count: usize,
577    pub state_table_max_entries: usize,
578    pub state_table_at_capacity: bool,
579    pub lookup_index_count: usize,
580    pub lookup_index_total_entries: usize,
581    pub temporal_index_count: usize,
582    pub temporal_index_total_entries: usize,
583    pub pda_reverse_lookup_count: usize,
584    pub pda_reverse_lookup_total_entries: usize,
585    pub version_tracker_entries: usize,
586    pub pending_queue_stats: Option<PendingQueueStats>,
587    pub path_cache_size: usize,
588}
589
590#[derive(Debug, Clone, Default)]
591pub struct CleanupResult {
592    pub pending_updates_removed: usize,
593    pub temporal_entries_removed: usize,
594}
595
596#[derive(Debug, Clone)]
597pub struct CapacityWarning {
598    pub current_entries: usize,
599    pub max_entries: usize,
600    pub entries_over_limit: usize,
601}
602
603#[derive(Debug, Clone)]
604pub struct StateTableConfig {
605    pub max_entries: usize,
606    pub max_array_length: usize,
607}
608
609impl Default for StateTableConfig {
610    fn default() -> Self {
611        Self {
612            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
613            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
614        }
615    }
616}
617
618#[derive(Debug)]
619pub struct VersionTracker {
620    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
621}
622
623impl VersionTracker {
624    pub fn new() -> Self {
625        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
626    }
627
628    pub fn with_capacity(capacity: usize) -> Self {
629        VersionTracker {
630            cache: std::sync::Mutex::new(LruCache::new(
631                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
632            )),
633        }
634    }
635
636    fn make_key(primary_key: &Value, event_type: &str) -> String {
637        format!("{}:{}", primary_key, event_type)
638    }
639
640    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
641        let key = Self::make_key(primary_key, event_type);
642        self.cache.lock().unwrap().get(&key).copied()
643    }
644
645    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
646        let key = Self::make_key(primary_key, event_type);
647        self.cache.lock().unwrap().put(key, (slot, ordering_value));
648    }
649
650    pub fn len(&self) -> usize {
651        self.cache.lock().unwrap().len()
652    }
653
654    pub fn is_empty(&self) -> bool {
655        self.cache.lock().unwrap().is_empty()
656    }
657}
658
659impl Default for VersionTracker {
660    fn default() -> Self {
661        Self::new()
662    }
663}
664
665#[derive(Debug)]
666pub struct StateTable {
667    pub data: DashMap<Value, Value>,
668    access_times: DashMap<Value, i64>,
669    pub lookup_indexes: HashMap<String, LookupIndex>,
670    pub temporal_indexes: HashMap<String, TemporalIndex>,
671    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
672    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
673    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
674    version_tracker: VersionTracker,
675    instruction_dedup_cache: VersionTracker,
676    config: StateTableConfig,
677    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
678    entity_name: String,
679    pub recent_tx_instructions:
680        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
681    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
682}
683
684impl StateTable {
685    pub fn is_at_capacity(&self) -> bool {
686        self.data.len() >= self.config.max_entries
687    }
688
689    pub fn entries_over_limit(&self) -> usize {
690        self.data.len().saturating_sub(self.config.max_entries)
691    }
692
693    pub fn max_array_length(&self) -> usize {
694        self.config.max_array_length
695    }
696
697    fn touch(&self, key: &Value) {
698        let now = std::time::SystemTime::now()
699            .duration_since(std::time::UNIX_EPOCH)
700            .unwrap()
701            .as_secs() as i64;
702        self.access_times.insert(key.clone(), now);
703    }
704
705    fn evict_lru(&self, count: usize) -> usize {
706        if count == 0 || self.data.is_empty() {
707            return 0;
708        }
709
710        let mut entries: Vec<(Value, i64)> = self
711            .access_times
712            .iter()
713            .map(|entry| (entry.key().clone(), *entry.value()))
714            .collect();
715
716        entries.sort_by_key(|(_, ts)| *ts);
717
718        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
719
720        let mut evicted = 0;
721        for key in to_evict {
722            self.data.remove(&key);
723            self.access_times.remove(&key);
724            evicted += 1;
725        }
726
727        #[cfg(feature = "otel")]
728        if evicted > 0 {
729            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
730        }
731
732        evicted
733    }
734
735    pub fn insert_with_eviction(&self, key: Value, value: Value) {
736        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
737            #[cfg(feature = "otel")]
738            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
739            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
740            self.evict_lru(to_evict.max(1));
741        }
742        self.data.insert(key.clone(), value);
743        self.touch(&key);
744    }
745
746    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
747        let result = self.data.get(key).map(|v| v.clone());
748        if result.is_some() {
749            self.touch(key);
750        }
751        result
752    }
753
754    /// Check if an update is fresh and update the version tracker.
755    /// Returns true if the update should be processed (is fresh).
756    /// Returns false if the update is stale and should be skipped.
757    ///
758    /// Comparison is lexicographic on (slot, ordering_value):
759    /// (100, 5) > (100, 3) > (99, 999)
760    pub fn is_fresh_update(
761        &self,
762        primary_key: &Value,
763        event_type: &str,
764        slot: u64,
765        ordering_value: u64,
766    ) -> bool {
767        let dominated = self
768            .version_tracker
769            .get(primary_key, event_type)
770            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
771            .unwrap_or(false);
772
773        if dominated {
774            return false;
775        }
776
777        self.version_tracker
778            .insert(primary_key, event_type, slot, ordering_value);
779        true
780    }
781
782    /// Check if an instruction is a duplicate of one we've seen recently.
783    /// Returns true if this exact instruction has been seen before (is a duplicate).
784    /// Returns false if this is a new instruction that should be processed.
785    ///
786    /// Unlike account updates, instructions don't use recency checks - all
787    /// unique instructions are processed. Only exact duplicates are skipped.
788    /// Uses a smaller cache capacity for shorter effective TTL.
789    pub fn is_duplicate_instruction(
790        &self,
791        primary_key: &Value,
792        event_type: &str,
793        slot: u64,
794        txn_index: u64,
795    ) -> bool {
796        // Check if we've seen this exact instruction before
797        let is_duplicate = self
798            .instruction_dedup_cache
799            .get(primary_key, event_type)
800            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
801            .unwrap_or(false);
802
803        if is_duplicate {
804            return true;
805        }
806
807        // Record this instruction for deduplication
808        self.instruction_dedup_cache
809            .insert(primary_key, event_type, slot, txn_index);
810        false
811    }
812}
813
814impl VmContext {
815    pub fn new() -> Self {
816        let mut vm = VmContext {
817            registers: vec![Value::Null; 256],
818            states: HashMap::new(),
819            instructions_executed: 0,
820            cache_hits: 0,
821            path_cache: HashMap::new(),
822            pda_cache_hits: 0,
823            pda_cache_misses: 0,
824            pending_queue_size: 0,
825            current_context: None,
826            warnings: Vec::new(),
827            last_pda_lookup_miss: None,
828            last_lookup_index_miss: None,
829            last_pda_registered: None,
830            last_lookup_index_keys: Vec::new(),
831        };
832        vm.states.insert(
833            0,
834            StateTable {
835                data: DashMap::new(),
836                access_times: DashMap::new(),
837                lookup_indexes: HashMap::new(),
838                temporal_indexes: HashMap::new(),
839                pda_reverse_lookups: HashMap::new(),
840                pending_updates: DashMap::new(),
841                pending_instruction_events: DashMap::new(),
842                version_tracker: VersionTracker::new(),
843                instruction_dedup_cache: VersionTracker::with_capacity(
844                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
845                ),
846                config: StateTableConfig::default(),
847                entity_name: "default".to_string(),
848                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
849                    NonZeroUsize::new(1000).unwrap(),
850                )),
851                deferred_when_ops: DashMap::new(),
852            },
853        );
854        vm
855    }
856
857    pub fn new_with_config(state_config: StateTableConfig) -> Self {
858        let mut vm = VmContext {
859            registers: vec![Value::Null; 256],
860            states: HashMap::new(),
861            instructions_executed: 0,
862            cache_hits: 0,
863            path_cache: HashMap::new(),
864            pda_cache_hits: 0,
865            pda_cache_misses: 0,
866            pending_queue_size: 0,
867            current_context: None,
868            warnings: Vec::new(),
869            last_pda_lookup_miss: None,
870            last_lookup_index_miss: None,
871            last_pda_registered: None,
872            last_lookup_index_keys: Vec::new(),
873        };
874        vm.states.insert(
875            0,
876            StateTable {
877                data: DashMap::new(),
878                access_times: DashMap::new(),
879                lookup_indexes: HashMap::new(),
880                temporal_indexes: HashMap::new(),
881                pda_reverse_lookups: HashMap::new(),
882                pending_updates: DashMap::new(),
883                pending_instruction_events: DashMap::new(),
884                version_tracker: VersionTracker::new(),
885                instruction_dedup_cache: VersionTracker::with_capacity(
886                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
887                ),
888                config: state_config,
889                entity_name: "default".to_string(),
890                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
891                    NonZeroUsize::new(1000).unwrap(),
892                )),
893                deferred_when_ops: DashMap::new(),
894            },
895        );
896        vm
897    }
898
899    /// Get a mutable reference to a state table by ID
900    /// Returns None if the state ID doesn't exist
901    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
902        self.states.get_mut(&state_id)
903    }
904
905    /// Get public access to registers (for metrics context)
906    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
907        &mut self.registers
908    }
909
910    /// Get public access to path cache (for metrics context)
911    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
912        &self.path_cache
913    }
914
915    /// Get the current update context
916    pub fn current_context(&self) -> Option<&UpdateContext> {
917        self.current_context.as_ref()
918    }
919
920    fn add_warning(&mut self, msg: String) {
921        self.warnings.push(msg);
922    }
923
924    pub fn take_warnings(&mut self) -> Vec<String> {
925        std::mem::take(&mut self.warnings)
926    }
927
928    pub fn has_warnings(&self) -> bool {
929        !self.warnings.is_empty()
930    }
931
932    pub fn update_state_from_register(
933        &mut self,
934        state_id: u32,
935        key: Value,
936        register: Register,
937    ) -> Result<()> {
938        let state = self.states.get(&state_id).ok_or("State table not found")?;
939        let value = self.registers[register].clone();
940        state.insert_with_eviction(key, value);
941        Ok(())
942    }
943
944    fn reset_registers(&mut self) {
945        for reg in &mut self.registers {
946            *reg = Value::Null;
947        }
948    }
949
950    /// Extract only the dirty fields from state (public for use by instruction hooks)
951    pub fn extract_partial_state(
952        &self,
953        state_reg: Register,
954        dirty_fields: &HashSet<String>,
955    ) -> Result<Value> {
956        let full_state = &self.registers[state_reg];
957
958        if dirty_fields.is_empty() {
959            return Ok(json!({}));
960        }
961
962        let mut partial = serde_json::Map::new();
963
964        for path in dirty_fields {
965            let segments: Vec<&str> = path.split('.').collect();
966
967            let mut current = full_state;
968            let mut found = true;
969
970            for segment in &segments {
971                match current.get(segment) {
972                    Some(v) => current = v,
973                    None => {
974                        found = false;
975                        break;
976                    }
977                }
978            }
979
980            if !found {
981                continue;
982            }
983
984            let mut target = &mut partial;
985            for (i, segment) in segments.iter().enumerate() {
986                if i == segments.len() - 1 {
987                    target.insert(segment.to_string(), current.clone());
988                } else {
989                    target
990                        .entry(segment.to_string())
991                        .or_insert_with(|| json!({}));
992                    target = target
993                        .get_mut(*segment)
994                        .and_then(|v| v.as_object_mut())
995                        .ok_or("Failed to build nested structure")?;
996                }
997            }
998        }
999
1000        Ok(Value::Object(partial))
1001    }
1002
1003    /// Extract a patch from state based on the DirtyTracker.
1004    /// For Replaced fields: extracts the full value from state.
1005    /// For Appended fields: emits only the appended values as an array.
1006    pub fn extract_partial_state_with_tracker(
1007        &self,
1008        state_reg: Register,
1009        tracker: &DirtyTracker,
1010    ) -> Result<Value> {
1011        let full_state = &self.registers[state_reg];
1012
1013        if tracker.is_empty() {
1014            return Ok(json!({}));
1015        }
1016
1017        let mut partial = serde_json::Map::new();
1018
1019        for (path, change) in tracker.iter() {
1020            let segments: Vec<&str> = path.split('.').collect();
1021
1022            let value_to_insert = match change {
1023                FieldChange::Replaced => {
1024                    let mut current = full_state;
1025                    let mut found = true;
1026
1027                    for segment in &segments {
1028                        match current.get(*segment) {
1029                            Some(v) => current = v,
1030                            None => {
1031                                found = false;
1032                                break;
1033                            }
1034                        }
1035                    }
1036
1037                    if !found {
1038                        continue;
1039                    }
1040                    current.clone()
1041                }
1042                FieldChange::Appended(values) => Value::Array(values.clone()),
1043            };
1044
1045            let mut target = &mut partial;
1046            for (i, segment) in segments.iter().enumerate() {
1047                if i == segments.len() - 1 {
1048                    target.insert(segment.to_string(), value_to_insert.clone());
1049                } else {
1050                    target
1051                        .entry(segment.to_string())
1052                        .or_insert_with(|| json!({}));
1053                    target = target
1054                        .get_mut(*segment)
1055                        .and_then(|v| v.as_object_mut())
1056                        .ok_or("Failed to build nested structure")?;
1057                }
1058            }
1059        }
1060
1061        Ok(Value::Object(partial))
1062    }
1063
1064    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1065        if let Some(compiled) = self.path_cache.get(path) {
1066            self.cache_hits += 1;
1067            #[cfg(feature = "otel")]
1068            crate::vm_metrics::record_path_cache_hit();
1069            return compiled.clone();
1070        }
1071        #[cfg(feature = "otel")]
1072        crate::vm_metrics::record_path_cache_miss();
1073        let compiled = CompiledPath::new(path);
1074        self.path_cache.insert(path.to_string(), compiled.clone());
1075        compiled
1076    }
1077
1078    /// Process an event with optional context metadata
1079    #[cfg_attr(feature = "otel", instrument(
1080        name = "vm.process_event",
1081        skip(self, bytecode, event_value, log),
1082        level = "info",
1083        fields(
1084            event_type = %event_type,
1085            slot = context.as_ref().and_then(|c| c.slot),
1086        )
1087    ))]
1088    pub fn process_event(
1089        &mut self,
1090        bytecode: &MultiEntityBytecode,
1091        event_value: Value,
1092        event_type: &str,
1093        context: Option<&UpdateContext>,
1094        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1095    ) -> Result<Vec<Mutation>> {
1096        self.current_context = context.cloned();
1097
1098        let mut event_value = event_value;
1099        if let Some(ctx) = context {
1100            if let Some(obj) = event_value.as_object_mut() {
1101                obj.insert("__update_context".to_string(), ctx.to_value());
1102            }
1103        }
1104
1105        let mut all_mutations = Vec::new();
1106
1107        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1108            if let Some(ctx) = context {
1109                if let Some(signature) = ctx.signature.clone() {
1110                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1111                    for state_id in state_ids {
1112                        if let Some(state) = self.states.get(&state_id) {
1113                            {
1114                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1115                                let entry =
1116                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1117                                entry.insert(event_type.to_string());
1118                            }
1119
1120                            let key = (signature.clone(), event_type.to_string());
1121                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1122                                for op in deferred_ops {
1123                                    match self.apply_deferred_when_op(state_id, &op) {
1124                                        Ok(mutations) => all_mutations.extend(mutations),
1125                                        Err(e) => tracing::warn!(
1126                                            "Failed to apply deferred when-op: {}",
1127                                            e
1128                                        ),
1129                                    }
1130                                }
1131                            }
1132                        }
1133                    }
1134                }
1135            }
1136        }
1137
1138        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1139            for entity_name in entity_names {
1140                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1141                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1142                        if let Some(ref mut log) = log {
1143                            log.set("entity", entity_name.clone());
1144                            log.inc("handlers", 1);
1145                        }
1146
1147                        let opcodes_before = self.instructions_executed;
1148                        let cache_before = self.cache_hits;
1149                        let pda_hits_before = self.pda_cache_hits;
1150                        let pda_misses_before = self.pda_cache_misses;
1151
1152                        let mutations = self.execute_handler(
1153                            handler,
1154                            &event_value,
1155                            event_type,
1156                            entity_bytecode.state_id,
1157                            entity_name,
1158                            entity_bytecode.computed_fields_evaluator.as_ref(),
1159                            Some(&entity_bytecode.non_emitted_fields),
1160                        )?;
1161
1162                        if let Some(ref mut log) = log {
1163                            log.inc(
1164                                "opcodes",
1165                                (self.instructions_executed - opcodes_before) as i64,
1166                            );
1167                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1168                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1169                            log.inc(
1170                                "pda_misses",
1171                                (self.pda_cache_misses - pda_misses_before) as i64,
1172                            );
1173                        }
1174
1175                        if mutations.is_empty() {
1176                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1177                                if event_type.ends_with("IxState") {
1178                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1179                                    let signature = context
1180                                        .and_then(|c| c.signature.clone())
1181                                        .unwrap_or_default();
1182                                    let _ = self.queue_instruction_event(
1183                                        entity_bytecode.state_id,
1184                                        QueuedInstructionEvent {
1185                                            pda_address: missed_pda,
1186                                            event_type: event_type.to_string(),
1187                                            event_data: event_value.clone(),
1188                                            slot,
1189                                            signature,
1190                                        },
1191                                    );
1192                                }
1193                            }
1194
1195                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1196                                if !event_type.ends_with("IxState") {
1197                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1198                                    let signature = context
1199                                        .and_then(|c| c.signature.clone())
1200                                        .unwrap_or_default();
1201                                    if let Some(write_version) =
1202                                        context.and_then(|c| c.write_version)
1203                                    {
1204                                        let _ = self.queue_account_update(
1205                                            entity_bytecode.state_id,
1206                                            QueuedAccountUpdate {
1207                                                pda_address: missed_lookup,
1208                                                account_type: event_type.to_string(),
1209                                                account_data: event_value.clone(),
1210                                                slot,
1211                                                write_version,
1212                                                signature,
1213                                            },
1214                                        );
1215                                    }
1216                                }
1217                            }
1218                        }
1219
1220                        all_mutations.extend(mutations);
1221
1222                        if event_type.ends_with("IxState") {
1223                            if let Some(ctx) = context {
1224                                if let Some(ref signature) = ctx.signature {
1225                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1226                                    {
1227                                        {
1228                                            let mut cache =
1229                                                state.recent_tx_instructions.lock().unwrap();
1230                                            let entry = cache
1231                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1232                                            entry.insert(event_type.to_string());
1233                                        }
1234
1235                                        let key = (signature.clone(), event_type.to_string());
1236                                        if let Some((_, deferred_ops)) =
1237                                            state.deferred_when_ops.remove(&key)
1238                                        {
1239                                            for op in deferred_ops {
1240                                                match self.apply_deferred_when_op(
1241                                                    entity_bytecode.state_id,
1242                                                    &op,
1243                                                ) {
1244                                                    Ok(mutations) => {
1245                                                        all_mutations.extend(mutations)
1246                                                    }
1247                                                    Err(e) => {
1248                                                        tracing::warn!(
1249                                                            "Failed to apply deferred when-op: {}",
1250                                                            e
1251                                                        );
1252                                                    }
1253                                                }
1254                                            }
1255                                        }
1256                                    }
1257                                }
1258                            }
1259                        }
1260
1261                        if let Some(registered_pda) = self.take_last_pda_registered() {
1262                            let pending_events = self.flush_pending_instruction_events(
1263                                entity_bytecode.state_id,
1264                                &registered_pda,
1265                            );
1266                            for pending in pending_events {
1267                                if let Some(pending_handler) =
1268                                    entity_bytecode.handlers.get(&pending.event_type)
1269                                {
1270                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1271                                        pending_handler,
1272                                        &pending.event_data,
1273                                        &pending.event_type,
1274                                        entity_bytecode.state_id,
1275                                        entity_name,
1276                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1277                                        Some(&entity_bytecode.non_emitted_fields),
1278                                    ) {
1279                                        all_mutations.extend(reprocessed_mutations);
1280                                    }
1281                                }
1282                            }
1283                        }
1284
1285                        let lookup_keys = self.take_last_lookup_index_keys();
1286                        for lookup_key in lookup_keys {
1287                            if let Ok(pending_updates) =
1288                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1289                            {
1290                                for pending in pending_updates {
1291                                    if let Some(pending_handler) =
1292                                        entity_bytecode.handlers.get(&pending.account_type)
1293                                    {
1294                                        self.current_context = Some(UpdateContext::new_account(
1295                                            pending.slot,
1296                                            pending.signature.clone(),
1297                                            pending.write_version,
1298                                        ));
1299                                        if let Ok(reprocessed) = self.execute_handler(
1300                                            pending_handler,
1301                                            &pending.account_data,
1302                                            &pending.account_type,
1303                                            entity_bytecode.state_id,
1304                                            entity_name,
1305                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1306                                            Some(&entity_bytecode.non_emitted_fields),
1307                                        ) {
1308                                            all_mutations.extend(reprocessed);
1309                                        }
1310                                    }
1311                                }
1312                            }
1313                        }
1314                    } else if let Some(ref mut log) = log {
1315                        log.set("skip_reason", "no_handler");
1316                    }
1317                } else if let Some(ref mut log) = log {
1318                    log.set("skip_reason", "entity_not_found");
1319                }
1320            }
1321        } else if let Some(ref mut log) = log {
1322            log.set("skip_reason", "no_event_routing");
1323        }
1324
1325        if let Some(log) = log {
1326            log.set("mutations", all_mutations.len() as i64);
1327            if let Some(first) = all_mutations.first() {
1328                if let Some(key_str) = first.key.as_str() {
1329                    log.set("primary_key", key_str);
1330                } else if let Some(key_num) = first.key.as_u64() {
1331                    log.set("primary_key", key_num as i64);
1332                }
1333            }
1334            if let Some(state) = self.states.get(&0) {
1335                log.set("state_table_size", state.data.len() as i64);
1336            }
1337
1338            let warnings = self.take_warnings();
1339            if !warnings.is_empty() {
1340                log.set("warnings", warnings.len() as i64);
1341                log.set(
1342                    "warning_messages",
1343                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1344                );
1345                log.set_level(crate::canonical_log::LogLevel::Warn);
1346            }
1347        } else {
1348            self.warnings.clear();
1349        }
1350
1351        if self.instructions_executed.is_multiple_of(1000) {
1352            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1353            for state_id in state_ids {
1354                let expired = self.cleanup_expired_when_ops(state_id, 60);
1355                if expired > 0 {
1356                    tracing::debug!(
1357                        "Cleaned up {} expired deferred when-ops for state {}",
1358                        expired,
1359                        state_id
1360                    );
1361                }
1362            }
1363        }
1364
1365        Ok(all_mutations)
1366    }
1367
1368    pub fn process_any(
1369        &mut self,
1370        bytecode: &MultiEntityBytecode,
1371        any: prost_types::Any,
1372    ) -> Result<Vec<Mutation>> {
1373        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1374        self.process_event(bytecode, event_value, &event_type, None, None)
1375    }
1376
1377    #[cfg_attr(feature = "otel", instrument(
1378        name = "vm.execute_handler",
1379        skip(self, handler, event_value, entity_evaluator),
1380        level = "debug",
1381        fields(
1382            event_type = %event_type,
1383            handler_opcodes = handler.len(),
1384        )
1385    ))]
1386    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1387    fn execute_handler(
1388        &mut self,
1389        handler: &[OpCode],
1390        event_value: &Value,
1391        event_type: &str,
1392        override_state_id: u32,
1393        entity_name: &str,
1394        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1395        non_emitted_fields: Option<&HashSet<String>>,
1396    ) -> Result<Vec<Mutation>> {
1397        self.reset_registers();
1398        self.last_pda_lookup_miss = None;
1399
1400        let mut pc: usize = 0;
1401        let mut output = Vec::new();
1402        let mut dirty_tracker = DirtyTracker::new();
1403        let should_emit = |path: &str| {
1404            non_emitted_fields
1405                .map(|fields| !fields.contains(path))
1406                .unwrap_or(true)
1407        };
1408
1409        while pc < handler.len() {
1410            match &handler[pc] {
1411                OpCode::LoadEventField {
1412                    path,
1413                    dest,
1414                    default,
1415                } => {
1416                    let value = self.load_field(event_value, path, default.as_ref())?;
1417                    self.registers[*dest] = value;
1418                    pc += 1;
1419                }
1420                OpCode::LoadConstant { value, dest } => {
1421                    self.registers[*dest] = value.clone();
1422                    pc += 1;
1423                }
1424                OpCode::CopyRegister { source, dest } => {
1425                    self.registers[*dest] = self.registers[*source].clone();
1426                    pc += 1;
1427                }
1428                OpCode::CopyRegisterIfNull { source, dest } => {
1429                    if self.registers[*dest].is_null() {
1430                        self.registers[*dest] = self.registers[*source].clone();
1431                    }
1432                    pc += 1;
1433                }
1434                OpCode::GetEventType { dest } => {
1435                    self.registers[*dest] = json!(event_type);
1436                    pc += 1;
1437                }
1438                OpCode::CreateObject { dest } => {
1439                    self.registers[*dest] = json!({});
1440                    pc += 1;
1441                }
1442                OpCode::SetField {
1443                    object,
1444                    path,
1445                    value,
1446                } => {
1447                    self.set_field_auto_vivify(*object, path, *value)?;
1448                    if should_emit(path) {
1449                        dirty_tracker.mark_replaced(path);
1450                    }
1451                    pc += 1;
1452                }
1453                OpCode::SetFields { object, fields } => {
1454                    for (path, value_reg) in fields {
1455                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1456                        if should_emit(path) {
1457                            dirty_tracker.mark_replaced(path);
1458                        }
1459                    }
1460                    pc += 1;
1461                }
1462                OpCode::GetField { object, path, dest } => {
1463                    let value = self.get_field(*object, path)?;
1464                    self.registers[*dest] = value;
1465                    pc += 1;
1466                }
1467                OpCode::ReadOrInitState {
1468                    state_id: _,
1469                    key,
1470                    default,
1471                    dest,
1472                } => {
1473                    let actual_state_id = override_state_id;
1474                    let entity_name_owned = entity_name.to_string();
1475                    self.states
1476                        .entry(actual_state_id)
1477                        .or_insert_with(|| StateTable {
1478                            data: DashMap::new(),
1479                            access_times: DashMap::new(),
1480                            lookup_indexes: HashMap::new(),
1481                            temporal_indexes: HashMap::new(),
1482                            pda_reverse_lookups: HashMap::new(),
1483                            pending_updates: DashMap::new(),
1484                            pending_instruction_events: DashMap::new(),
1485                            version_tracker: VersionTracker::new(),
1486                            instruction_dedup_cache: VersionTracker::with_capacity(
1487                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1488                            ),
1489                            config: StateTableConfig::default(),
1490                            entity_name: entity_name_owned,
1491                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1492                                NonZeroUsize::new(1000).unwrap(),
1493                            )),
1494                            deferred_when_ops: DashMap::new(),
1495                        });
1496                    let key_value = self.registers[*key].clone();
1497                    let warn_null_key = key_value.is_null()
1498                        && event_type.ends_with("State")
1499                        && !event_type.ends_with("IxState");
1500
1501                    if warn_null_key {
1502                        self.add_warning(format!(
1503                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1504                            key, event_type
1505                        ));
1506                    }
1507
1508                    let state = self
1509                        .states
1510                        .get(&actual_state_id)
1511                        .ok_or("State table not found")?;
1512
1513                    if !key_value.is_null() {
1514                        if let Some(ctx) = &self.current_context {
1515                            // Account updates: use recency check to discard stale updates
1516                            if ctx.is_account_update() {
1517                                if let (Some(slot), Some(write_version)) =
1518                                    (ctx.slot, ctx.write_version)
1519                                {
1520                                    if !state.is_fresh_update(
1521                                        &key_value,
1522                                        event_type,
1523                                        slot,
1524                                        write_version,
1525                                    ) {
1526                                        self.add_warning(format!(
1527                                            "Stale account update skipped: slot={}, write_version={}",
1528                                            slot, write_version
1529                                        ));
1530                                        return Ok(Vec::new());
1531                                    }
1532                                }
1533                            }
1534                            // Instruction updates: process all, but skip exact duplicates
1535                            else if ctx.is_instruction_update() {
1536                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1537                                    if state.is_duplicate_instruction(
1538                                        &key_value, event_type, slot, txn_index,
1539                                    ) {
1540                                        self.add_warning(format!(
1541                                            "Duplicate instruction skipped: slot={}, txn_index={}",
1542                                            slot, txn_index
1543                                        ));
1544                                        return Ok(Vec::new());
1545                                    }
1546                                }
1547                            }
1548                        }
1549                    }
1550                    let value = state
1551                        .get_and_touch(&key_value)
1552                        .unwrap_or_else(|| default.clone());
1553
1554                    self.registers[*dest] = value;
1555                    pc += 1;
1556                }
1557                OpCode::UpdateState {
1558                    state_id: _,
1559                    key,
1560                    value,
1561                } => {
1562                    let actual_state_id = override_state_id;
1563                    let state = self
1564                        .states
1565                        .get(&actual_state_id)
1566                        .ok_or("State table not found")?;
1567                    let key_value = self.registers[*key].clone();
1568                    let value_data = self.registers[*value].clone();
1569
1570                    state.insert_with_eviction(key_value, value_data);
1571                    pc += 1;
1572                }
1573                OpCode::AppendToArray {
1574                    object,
1575                    path,
1576                    value,
1577                } => {
1578                    let appended_value = self.registers[*value].clone();
1579                    let max_len = self
1580                        .states
1581                        .get(&override_state_id)
1582                        .map(|s| s.max_array_length())
1583                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1584                    self.append_to_array(*object, path, *value, max_len)?;
1585                    if should_emit(path) {
1586                        dirty_tracker.mark_appended(path, appended_value);
1587                    }
1588                    pc += 1;
1589                }
1590                OpCode::GetCurrentTimestamp { dest } => {
1591                    let timestamp = std::time::SystemTime::now()
1592                        .duration_since(std::time::UNIX_EPOCH)
1593                        .unwrap()
1594                        .as_secs() as i64;
1595                    self.registers[*dest] = json!(timestamp);
1596                    pc += 1;
1597                }
1598                OpCode::CreateEvent { dest, event_value } => {
1599                    let timestamp = std::time::SystemTime::now()
1600                        .duration_since(std::time::UNIX_EPOCH)
1601                        .unwrap()
1602                        .as_secs() as i64;
1603
1604                    // Filter out __update_context from the event data
1605                    let mut event_data = self.registers[*event_value].clone();
1606                    if let Some(obj) = event_data.as_object_mut() {
1607                        obj.remove("__update_context");
1608                    }
1609
1610                    // Create event with timestamp, data, and optional slot/signature from context
1611                    let mut event = serde_json::Map::new();
1612                    event.insert("timestamp".to_string(), json!(timestamp));
1613                    event.insert("data".to_string(), event_data);
1614
1615                    // Add slot and signature if available from current context
1616                    if let Some(ref ctx) = self.current_context {
1617                        if let Some(slot) = ctx.slot {
1618                            event.insert("slot".to_string(), json!(slot));
1619                        }
1620                        if let Some(ref signature) = ctx.signature {
1621                            event.insert("signature".to_string(), json!(signature));
1622                        }
1623                    }
1624
1625                    self.registers[*dest] = Value::Object(event);
1626                    pc += 1;
1627                }
1628                OpCode::CreateCapture {
1629                    dest,
1630                    capture_value,
1631                } => {
1632                    let timestamp = std::time::SystemTime::now()
1633                        .duration_since(std::time::UNIX_EPOCH)
1634                        .unwrap()
1635                        .as_secs() as i64;
1636
1637                    // Get the capture data (already filtered by load_field)
1638                    let capture_data = self.registers[*capture_value].clone();
1639
1640                    // Extract account_address from the original event if available
1641                    let account_address = event_value
1642                        .get("__account_address")
1643                        .and_then(|v| v.as_str())
1644                        .unwrap_or("")
1645                        .to_string();
1646
1647                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1648                    let mut capture = serde_json::Map::new();
1649                    capture.insert("timestamp".to_string(), json!(timestamp));
1650                    capture.insert("account_address".to_string(), json!(account_address));
1651                    capture.insert("data".to_string(), capture_data);
1652
1653                    // Add slot and signature if available from current context
1654                    if let Some(ref ctx) = self.current_context {
1655                        if let Some(slot) = ctx.slot {
1656                            capture.insert("slot".to_string(), json!(slot));
1657                        }
1658                        if let Some(ref signature) = ctx.signature {
1659                            capture.insert("signature".to_string(), json!(signature));
1660                        }
1661                    }
1662
1663                    self.registers[*dest] = Value::Object(capture);
1664                    pc += 1;
1665                }
1666                OpCode::Transform {
1667                    source,
1668                    dest,
1669                    transformation,
1670                } => {
1671                    if source == dest {
1672                        self.transform_in_place(*source, transformation)?;
1673                    } else {
1674                        let source_value = &self.registers[*source];
1675                        let value = self.apply_transformation(source_value, transformation)?;
1676                        self.registers[*dest] = value;
1677                    }
1678                    pc += 1;
1679                }
1680                OpCode::EmitMutation {
1681                    entity_name,
1682                    key,
1683                    state,
1684                } => {
1685                    let primary_key = self.registers[*key].clone();
1686
1687                    if primary_key.is_null() || dirty_tracker.is_empty() {
1688                        let reason = if dirty_tracker.is_empty() {
1689                            "no_fields_modified"
1690                        } else {
1691                            "null_primary_key"
1692                        };
1693                        self.add_warning(format!(
1694                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1695                            entity_name,
1696                            reason,
1697                            dirty_tracker.len()
1698                        ));
1699                    } else {
1700                        let patch =
1701                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1702
1703                        let append = dirty_tracker.appended_paths();
1704                        let mutation = Mutation {
1705                            export: entity_name.clone(),
1706                            key: primary_key,
1707                            patch,
1708                            append,
1709                        };
1710                        output.push(mutation);
1711                    }
1712                    pc += 1;
1713                }
1714                OpCode::SetFieldIfNull {
1715                    object,
1716                    path,
1717                    value,
1718                } => {
1719                    let was_set = self.set_field_if_null(*object, path, *value)?;
1720                    if was_set && should_emit(path) {
1721                        dirty_tracker.mark_replaced(path);
1722                    }
1723                    pc += 1;
1724                }
1725                OpCode::SetFieldMax {
1726                    object,
1727                    path,
1728                    value,
1729                } => {
1730                    let was_updated = self.set_field_max(*object, path, *value)?;
1731                    if was_updated && should_emit(path) {
1732                        dirty_tracker.mark_replaced(path);
1733                    }
1734                    pc += 1;
1735                }
1736                OpCode::UpdateTemporalIndex {
1737                    state_id: _,
1738                    index_name,
1739                    lookup_value,
1740                    primary_key,
1741                    timestamp,
1742                } => {
1743                    let actual_state_id = override_state_id;
1744                    let state = self
1745                        .states
1746                        .get_mut(&actual_state_id)
1747                        .ok_or("State table not found")?;
1748                    let index = state
1749                        .temporal_indexes
1750                        .entry(index_name.clone())
1751                        .or_insert_with(TemporalIndex::new);
1752
1753                    let lookup_val = self.registers[*lookup_value].clone();
1754                    let pk_val = self.registers[*primary_key].clone();
1755                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1756                        val
1757                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1758                        val as i64
1759                    } else {
1760                        return Err(format!(
1761                            "Timestamp must be a number (i64 or u64), got: {:?}",
1762                            self.registers[*timestamp]
1763                        )
1764                        .into());
1765                    };
1766
1767                    index.insert(lookup_val, pk_val, ts_val);
1768                    pc += 1;
1769                }
1770                OpCode::LookupTemporalIndex {
1771                    state_id: _,
1772                    index_name,
1773                    lookup_value,
1774                    timestamp,
1775                    dest,
1776                } => {
1777                    let actual_state_id = override_state_id;
1778                    let state = self
1779                        .states
1780                        .get(&actual_state_id)
1781                        .ok_or("State table not found")?;
1782                    let lookup_val = &self.registers[*lookup_value];
1783
1784                    let result = if self.registers[*timestamp].is_null() {
1785                        if let Some(index) = state.temporal_indexes.get(index_name) {
1786                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1787                        } else {
1788                            Value::Null
1789                        }
1790                    } else {
1791                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1792                            val
1793                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1794                            val as i64
1795                        } else {
1796                            return Err(format!(
1797                                "Timestamp must be a number (i64 or u64), got: {:?}",
1798                                self.registers[*timestamp]
1799                            )
1800                            .into());
1801                        };
1802
1803                        if let Some(index) = state.temporal_indexes.get(index_name) {
1804                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1805                        } else {
1806                            Value::Null
1807                        }
1808                    };
1809
1810                    self.registers[*dest] = result;
1811                    pc += 1;
1812                }
1813                OpCode::UpdateLookupIndex {
1814                    state_id: _,
1815                    index_name,
1816                    lookup_value,
1817                    primary_key,
1818                } => {
1819                    let actual_state_id = override_state_id;
1820                    let state = self
1821                        .states
1822                        .get_mut(&actual_state_id)
1823                        .ok_or("State table not found")?;
1824                    let index = state
1825                        .lookup_indexes
1826                        .entry(index_name.clone())
1827                        .or_insert_with(LookupIndex::new);
1828
1829                    let lookup_val = self.registers[*lookup_value].clone();
1830                    let pk_val = self.registers[*primary_key].clone();
1831
1832                    index.insert(lookup_val.clone(), pk_val);
1833
1834                    // Track lookup keys so process_event can flush queued account updates
1835                    if let Some(key_str) = lookup_val.as_str() {
1836                        self.last_lookup_index_keys.push(key_str.to_string());
1837                    }
1838
1839                    pc += 1;
1840                }
1841                OpCode::LookupIndex {
1842                    state_id: _,
1843                    index_name,
1844                    lookup_value,
1845                    dest,
1846                } => {
1847                    let actual_state_id = override_state_id;
1848                    let mut current_value = self.registers[*lookup_value].clone();
1849
1850                    const MAX_CHAIN_DEPTH: usize = 5;
1851                    let mut iterations = 0;
1852
1853                    let final_result = if self.states.contains_key(&actual_state_id) {
1854                        loop {
1855                            iterations += 1;
1856                            if iterations > MAX_CHAIN_DEPTH {
1857                                break current_value;
1858                            }
1859
1860                            let resolved = self
1861                                .states
1862                                .get(&actual_state_id)
1863                                .and_then(|state| {
1864                                    if let Some(index) = state.lookup_indexes.get(index_name) {
1865                                        if let Some(found) = index.lookup(&current_value) {
1866                                            return Some(found);
1867                                        }
1868                                    }
1869
1870                                    for (name, index) in state.lookup_indexes.iter() {
1871                                        if name == index_name {
1872                                            continue;
1873                                        }
1874                                        if let Some(found) = index.lookup(&current_value) {
1875                                            return Some(found);
1876                                        }
1877                                    }
1878
1879                                    None
1880                                })
1881                                .unwrap_or(Value::Null);
1882
1883                            let mut resolved_from_pda = false;
1884                            let resolved = if resolved.is_null() {
1885                                if let Some(pda_str) = current_value.as_str() {
1886                                    resolved_from_pda = true;
1887                                    self.states
1888                                        .get_mut(&actual_state_id)
1889                                        .and_then(|state_mut| {
1890                                            state_mut
1891                                                .pda_reverse_lookups
1892                                                .get_mut("default_pda_lookup")
1893                                        })
1894                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
1895                                        .map(Value::String)
1896                                        .unwrap_or(Value::Null)
1897                                } else {
1898                                    Value::Null
1899                                }
1900                            } else {
1901                                resolved
1902                            };
1903
1904                            if resolved.is_null() {
1905                                if iterations == 1 {
1906                                    if let Some(pda_str) = current_value.as_str() {
1907                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
1908                                    }
1909                                }
1910                                break Value::Null;
1911                            }
1912
1913                            let can_chain =
1914                                self.can_resolve_further(&resolved, actual_state_id, index_name);
1915
1916                            if !can_chain {
1917                                if resolved_from_pda {
1918                                    if let Some(resolved_str) = resolved.as_str() {
1919                                        self.last_lookup_index_miss =
1920                                            Some(resolved_str.to_string());
1921                                    }
1922                                    break Value::Null;
1923                                }
1924                                break resolved;
1925                            }
1926
1927                            current_value = resolved;
1928                        }
1929                    } else {
1930                        Value::Null
1931                    };
1932
1933                    self.registers[*dest] = final_result;
1934                    pc += 1;
1935                }
1936                OpCode::SetFieldSum {
1937                    object,
1938                    path,
1939                    value,
1940                } => {
1941                    let was_updated = self.set_field_sum(*object, path, *value)?;
1942                    if was_updated && should_emit(path) {
1943                        dirty_tracker.mark_replaced(path);
1944                    }
1945                    pc += 1;
1946                }
1947                OpCode::SetFieldIncrement { object, path } => {
1948                    let was_updated = self.set_field_increment(*object, path)?;
1949                    if was_updated && should_emit(path) {
1950                        dirty_tracker.mark_replaced(path);
1951                    }
1952                    pc += 1;
1953                }
1954                OpCode::SetFieldMin {
1955                    object,
1956                    path,
1957                    value,
1958                } => {
1959                    let was_updated = self.set_field_min(*object, path, *value)?;
1960                    if was_updated && should_emit(path) {
1961                        dirty_tracker.mark_replaced(path);
1962                    }
1963                    pc += 1;
1964                }
1965                OpCode::AddToUniqueSet {
1966                    state_id: _,
1967                    set_name,
1968                    value,
1969                    count_object,
1970                    count_path,
1971                } => {
1972                    let value_to_add = self.registers[*value].clone();
1973
1974                    // Store the unique set within the entity object, not in the state table
1975                    // This ensures each entity instance has its own unique set
1976                    let set_field_path = format!("__unique_set:{}", set_name);
1977
1978                    // Get or create the unique set from the entity object
1979                    let mut set: HashSet<Value> =
1980                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1981                            if !existing.is_null() {
1982                                serde_json::from_value(existing).unwrap_or_default()
1983                            } else {
1984                                HashSet::new()
1985                            }
1986                        } else {
1987                            HashSet::new()
1988                        };
1989
1990                    // Add value to set
1991                    let was_new = set.insert(value_to_add);
1992
1993                    // Store updated set back in the entity object
1994                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1995                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1996                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1997
1998                    // Update the count field in the object
1999                    if was_new {
2000                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2001                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2002                        if should_emit(count_path) {
2003                            dirty_tracker.mark_replaced(count_path);
2004                        }
2005                    }
2006
2007                    pc += 1;
2008                }
2009                OpCode::ConditionalSetField {
2010                    object,
2011                    path,
2012                    value,
2013                    condition_field,
2014                    condition_op,
2015                    condition_value,
2016                } => {
2017                    let field_value = self.load_field(event_value, condition_field, None)?;
2018                    let condition_met =
2019                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2020
2021                    if condition_met {
2022                        self.set_field_auto_vivify(*object, path, *value)?;
2023                        if should_emit(path) {
2024                            dirty_tracker.mark_replaced(path);
2025                        }
2026                    }
2027                    pc += 1;
2028                }
2029                OpCode::SetFieldWhen {
2030                    object,
2031                    path,
2032                    value,
2033                    when_instruction,
2034                    entity_name,
2035                    key_reg,
2036                    condition_field,
2037                    condition_op,
2038                    condition_value,
2039                } => {
2040                    let actual_state_id = override_state_id;
2041                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2042                        condition_field.as_ref(),
2043                        condition_op.as_ref(),
2044                        condition_value.as_ref(),
2045                    ) {
2046                        let field_value = self.load_field(event_value, field, None)?;
2047                        self.evaluate_comparison(&field_value, op, cond_value)?
2048                    } else {
2049                        true
2050                    };
2051
2052                    if !condition_met {
2053                        pc += 1;
2054                        continue;
2055                    }
2056
2057                    let signature = self
2058                        .current_context
2059                        .as_ref()
2060                        .and_then(|c| c.signature.clone())
2061                        .unwrap_or_default();
2062
2063                    let emit = should_emit(path);
2064
2065                    let instruction_seen = if !signature.is_empty() {
2066                        if let Some(state) = self.states.get(&actual_state_id) {
2067                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2068                            cache
2069                                .get(&signature)
2070                                .map(|set| set.contains(when_instruction))
2071                                .unwrap_or(false)
2072                        } else {
2073                            false
2074                        }
2075                    } else {
2076                        false
2077                    };
2078
2079                    if instruction_seen {
2080                        self.set_field_auto_vivify(*object, path, *value)?;
2081                        if emit {
2082                            dirty_tracker.mark_replaced(path);
2083                        }
2084                    } else if !signature.is_empty() {
2085                        let deferred = DeferredWhenOperation {
2086                            entity_name: entity_name.clone(),
2087                            primary_key: self.registers[*key_reg].clone(),
2088                            field_path: path.clone(),
2089                            field_value: self.registers[*value].clone(),
2090                            when_instruction: when_instruction.clone(),
2091                            signature: signature.clone(),
2092                            slot: self
2093                                .current_context
2094                                .as_ref()
2095                                .and_then(|c| c.slot)
2096                                .unwrap_or(0),
2097                            deferred_at: std::time::SystemTime::now()
2098                                .duration_since(std::time::UNIX_EPOCH)
2099                                .unwrap()
2100                                .as_secs() as i64,
2101                            emit,
2102                        };
2103
2104                        if let Some(state) = self.states.get(&actual_state_id) {
2105                            let key = (signature, when_instruction.clone());
2106                            state
2107                                .deferred_when_ops
2108                                .entry(key)
2109                                .or_insert_with(Vec::new)
2110                                .push(deferred);
2111                        }
2112                    }
2113
2114                    pc += 1;
2115                }
2116                OpCode::ConditionalIncrement {
2117                    object,
2118                    path,
2119                    condition_field,
2120                    condition_op,
2121                    condition_value,
2122                } => {
2123                    let field_value = self.load_field(event_value, condition_field, None)?;
2124                    let condition_met =
2125                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2126
2127                    if condition_met {
2128                        let was_updated = self.set_field_increment(*object, path)?;
2129                        if was_updated && should_emit(path) {
2130                            dirty_tracker.mark_replaced(path);
2131                        }
2132                    }
2133                    pc += 1;
2134                }
2135                OpCode::EvaluateComputedFields {
2136                    state,
2137                    computed_paths,
2138                } => {
2139                    if let Some(evaluator) = entity_evaluator {
2140                        let old_values: Vec<_> = computed_paths
2141                            .iter()
2142                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2143                            .collect();
2144
2145                        let state_value = &mut self.registers[*state];
2146                        let eval_result = evaluator(state_value);
2147
2148                        if eval_result.is_ok() {
2149                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2150                                let new_value =
2151                                    Self::get_value_at_path(&self.registers[*state], path);
2152
2153                                if new_value != *old_value && should_emit(path) {
2154                                    dirty_tracker.mark_replaced(path);
2155                                }
2156                            }
2157                        }
2158                    }
2159                    pc += 1;
2160                }
2161                OpCode::UpdatePdaReverseLookup {
2162                    state_id: _,
2163                    lookup_name,
2164                    pda_address,
2165                    primary_key,
2166                } => {
2167                    let actual_state_id = override_state_id;
2168                    let state = self
2169                        .states
2170                        .get_mut(&actual_state_id)
2171                        .ok_or("State table not found")?;
2172
2173                    let pda_val = self.registers[*pda_address].clone();
2174                    let pk_val = self.registers[*primary_key].clone();
2175
2176                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2177                        let pda_lookup = state
2178                            .pda_reverse_lookups
2179                            .entry(lookup_name.clone())
2180                            .or_insert_with(|| {
2181                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2182                            });
2183
2184                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2185                        self.last_pda_registered = Some(pda_str.to_string());
2186                    } else if !pk_val.is_null() {
2187                        if let Some(pk_num) = pk_val.as_u64() {
2188                            if let Some(pda_str) = pda_val.as_str() {
2189                                let pda_lookup = state
2190                                    .pda_reverse_lookups
2191                                    .entry(lookup_name.clone())
2192                                    .or_insert_with(|| {
2193                                        PdaReverseLookup::new(
2194                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2195                                        )
2196                                    });
2197
2198                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2199                                self.last_pda_registered = Some(pda_str.to_string());
2200                            }
2201                        }
2202                    }
2203
2204                    pc += 1;
2205                }
2206            }
2207
2208            self.instructions_executed += 1;
2209        }
2210
2211        Ok(output)
2212    }
2213
2214    fn load_field(
2215        &self,
2216        event_value: &Value,
2217        path: &FieldPath,
2218        default: Option<&Value>,
2219    ) -> Result<Value> {
2220        if path.segments.is_empty() {
2221            if let Some(obj) = event_value.as_object() {
2222                let filtered: serde_json::Map<String, Value> = obj
2223                    .iter()
2224                    .filter(|(k, _)| !k.starts_with("__"))
2225                    .map(|(k, v)| (k.clone(), v.clone()))
2226                    .collect();
2227                return Ok(Value::Object(filtered));
2228            }
2229            return Ok(event_value.clone());
2230        }
2231
2232        let mut current = event_value;
2233        for segment in path.segments.iter() {
2234            current = match current.get(segment) {
2235                Some(v) => v,
2236                None => return Ok(default.cloned().unwrap_or(Value::Null)),
2237            };
2238        }
2239
2240        Ok(current.clone())
2241    }
2242
2243    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2244        let mut current = value;
2245        for segment in path.split('.') {
2246            current = current.get(segment)?;
2247        }
2248        Some(current.clone())
2249    }
2250
2251    fn set_field_auto_vivify(
2252        &mut self,
2253        object_reg: Register,
2254        path: &str,
2255        value_reg: Register,
2256    ) -> Result<()> {
2257        let compiled = self.get_compiled_path(path);
2258        let segments = compiled.segments();
2259        let value = self.registers[value_reg].clone();
2260
2261        if !self.registers[object_reg].is_object() {
2262            self.registers[object_reg] = json!({});
2263        }
2264
2265        let obj = self.registers[object_reg]
2266            .as_object_mut()
2267            .ok_or("Not an object")?;
2268
2269        let mut current = obj;
2270        for (i, segment) in segments.iter().enumerate() {
2271            if i == segments.len() - 1 {
2272                current.insert(segment.to_string(), value);
2273                return Ok(());
2274            } else {
2275                current
2276                    .entry(segment.to_string())
2277                    .or_insert_with(|| json!({}));
2278                current = current
2279                    .get_mut(segment)
2280                    .and_then(|v| v.as_object_mut())
2281                    .ok_or("Path collision: expected object")?;
2282            }
2283        }
2284
2285        Ok(())
2286    }
2287
2288    fn set_field_if_null(
2289        &mut self,
2290        object_reg: Register,
2291        path: &str,
2292        value_reg: Register,
2293    ) -> Result<bool> {
2294        let compiled = self.get_compiled_path(path);
2295        let segments = compiled.segments();
2296        let value = self.registers[value_reg].clone();
2297
2298        // SetOnce should only set meaningful values. A null source typically means
2299        // the field doesn't exist in this event type (e.g., instruction events don't
2300        // have account data). Skip to preserve any existing value.
2301        if value.is_null() {
2302            return Ok(false);
2303        }
2304
2305        if !self.registers[object_reg].is_object() {
2306            self.registers[object_reg] = json!({});
2307        }
2308
2309        let obj = self.registers[object_reg]
2310            .as_object_mut()
2311            .ok_or("Not an object")?;
2312
2313        let mut current = obj;
2314        for (i, segment) in segments.iter().enumerate() {
2315            if i == segments.len() - 1 {
2316                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2317                    current.insert(segment.to_string(), value);
2318                    return Ok(true);
2319                }
2320                return Ok(false);
2321            } else {
2322                current
2323                    .entry(segment.to_string())
2324                    .or_insert_with(|| json!({}));
2325                current = current
2326                    .get_mut(segment)
2327                    .and_then(|v| v.as_object_mut())
2328                    .ok_or("Path collision: expected object")?;
2329            }
2330        }
2331
2332        Ok(false)
2333    }
2334
2335    fn set_field_max(
2336        &mut self,
2337        object_reg: Register,
2338        path: &str,
2339        value_reg: Register,
2340    ) -> Result<bool> {
2341        let compiled = self.get_compiled_path(path);
2342        let segments = compiled.segments();
2343        let new_value = self.registers[value_reg].clone();
2344
2345        if !self.registers[object_reg].is_object() {
2346            self.registers[object_reg] = json!({});
2347        }
2348
2349        let obj = self.registers[object_reg]
2350            .as_object_mut()
2351            .ok_or("Not an object")?;
2352
2353        let mut current = obj;
2354        for (i, segment) in segments.iter().enumerate() {
2355            if i == segments.len() - 1 {
2356                let should_update = if let Some(current_value) = current.get(segment) {
2357                    if current_value.is_null() {
2358                        true
2359                    } else {
2360                        match (current_value.as_i64(), new_value.as_i64()) {
2361                            (Some(current_val), Some(new_val)) => new_val > current_val,
2362                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2363                                new_value.as_u64().unwrap() as i64 > current_val
2364                            }
2365                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2366                                new_val > current_value.as_u64().unwrap() as i64
2367                            }
2368                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2369                                (Some(current_val), Some(new_val)) => new_val > current_val,
2370                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2371                                    (Some(current_val), Some(new_val)) => new_val > current_val,
2372                                    _ => false,
2373                                },
2374                            },
2375                            _ => false,
2376                        }
2377                    }
2378                } else {
2379                    true
2380                };
2381
2382                if should_update {
2383                    current.insert(segment.to_string(), new_value);
2384                    return Ok(true);
2385                }
2386                return Ok(false);
2387            } else {
2388                current
2389                    .entry(segment.to_string())
2390                    .or_insert_with(|| json!({}));
2391                current = current
2392                    .get_mut(segment)
2393                    .and_then(|v| v.as_object_mut())
2394                    .ok_or("Path collision: expected object")?;
2395            }
2396        }
2397
2398        Ok(false)
2399    }
2400
2401    fn set_field_sum(
2402        &mut self,
2403        object_reg: Register,
2404        path: &str,
2405        value_reg: Register,
2406    ) -> Result<bool> {
2407        let compiled = self.get_compiled_path(path);
2408        let segments = compiled.segments();
2409        let new_value = &self.registers[value_reg];
2410
2411        // Extract numeric value before borrowing object_reg mutably
2412        let new_val_num = new_value
2413            .as_i64()
2414            .or_else(|| new_value.as_u64().map(|n| n as i64))
2415            .ok_or("Sum requires numeric value")?;
2416
2417        if !self.registers[object_reg].is_object() {
2418            self.registers[object_reg] = json!({});
2419        }
2420
2421        let obj = self.registers[object_reg]
2422            .as_object_mut()
2423            .ok_or("Not an object")?;
2424
2425        let mut current = obj;
2426        for (i, segment) in segments.iter().enumerate() {
2427            if i == segments.len() - 1 {
2428                let current_val = current
2429                    .get(segment)
2430                    .and_then(|v| {
2431                        if v.is_null() {
2432                            None
2433                        } else {
2434                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2435                        }
2436                    })
2437                    .unwrap_or(0);
2438
2439                let sum = current_val + new_val_num;
2440                current.insert(segment.to_string(), json!(sum));
2441                return Ok(true);
2442            } else {
2443                current
2444                    .entry(segment.to_string())
2445                    .or_insert_with(|| json!({}));
2446                current = current
2447                    .get_mut(segment)
2448                    .and_then(|v| v.as_object_mut())
2449                    .ok_or("Path collision: expected object")?;
2450            }
2451        }
2452
2453        Ok(false)
2454    }
2455
2456    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2457        let compiled = self.get_compiled_path(path);
2458        let segments = compiled.segments();
2459
2460        if !self.registers[object_reg].is_object() {
2461            self.registers[object_reg] = json!({});
2462        }
2463
2464        let obj = self.registers[object_reg]
2465            .as_object_mut()
2466            .ok_or("Not an object")?;
2467
2468        let mut current = obj;
2469        for (i, segment) in segments.iter().enumerate() {
2470            if i == segments.len() - 1 {
2471                // Get current value (default to 0 if null/missing)
2472                let current_val = current
2473                    .get(segment)
2474                    .and_then(|v| {
2475                        if v.is_null() {
2476                            None
2477                        } else {
2478                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2479                        }
2480                    })
2481                    .unwrap_or(0);
2482
2483                let incremented = current_val + 1;
2484                current.insert(segment.to_string(), json!(incremented));
2485                return Ok(true);
2486            } else {
2487                current
2488                    .entry(segment.to_string())
2489                    .or_insert_with(|| json!({}));
2490                current = current
2491                    .get_mut(segment)
2492                    .and_then(|v| v.as_object_mut())
2493                    .ok_or("Path collision: expected object")?;
2494            }
2495        }
2496
2497        Ok(false)
2498    }
2499
2500    fn set_field_min(
2501        &mut self,
2502        object_reg: Register,
2503        path: &str,
2504        value_reg: Register,
2505    ) -> Result<bool> {
2506        let compiled = self.get_compiled_path(path);
2507        let segments = compiled.segments();
2508        let new_value = self.registers[value_reg].clone();
2509
2510        if !self.registers[object_reg].is_object() {
2511            self.registers[object_reg] = json!({});
2512        }
2513
2514        let obj = self.registers[object_reg]
2515            .as_object_mut()
2516            .ok_or("Not an object")?;
2517
2518        let mut current = obj;
2519        for (i, segment) in segments.iter().enumerate() {
2520            if i == segments.len() - 1 {
2521                let should_update = if let Some(current_value) = current.get(segment) {
2522                    if current_value.is_null() {
2523                        true
2524                    } else {
2525                        match (current_value.as_i64(), new_value.as_i64()) {
2526                            (Some(current_val), Some(new_val)) => new_val < current_val,
2527                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2528                                (new_value.as_u64().unwrap() as i64) < current_val
2529                            }
2530                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2531                                new_val < current_value.as_u64().unwrap() as i64
2532                            }
2533                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2534                                (Some(current_val), Some(new_val)) => new_val < current_val,
2535                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2536                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2537                                    _ => false,
2538                                },
2539                            },
2540                            _ => false,
2541                        }
2542                    }
2543                } else {
2544                    true
2545                };
2546
2547                if should_update {
2548                    current.insert(segment.to_string(), new_value);
2549                    return Ok(true);
2550                }
2551                return Ok(false);
2552            } else {
2553                current
2554                    .entry(segment.to_string())
2555                    .or_insert_with(|| json!({}));
2556                current = current
2557                    .get_mut(segment)
2558                    .and_then(|v| v.as_object_mut())
2559                    .ok_or("Path collision: expected object")?;
2560            }
2561        }
2562
2563        Ok(false)
2564    }
2565
2566    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2567        let compiled = self.get_compiled_path(path);
2568        let segments = compiled.segments();
2569        let mut current = &self.registers[object_reg];
2570
2571        for segment in segments {
2572            current = current
2573                .get(segment)
2574                .ok_or_else(|| format!("Field not found: {}", segment))?;
2575        }
2576
2577        Ok(current.clone())
2578    }
2579
2580    fn append_to_array(
2581        &mut self,
2582        object_reg: Register,
2583        path: &str,
2584        value_reg: Register,
2585        max_length: usize,
2586    ) -> Result<()> {
2587        let compiled = self.get_compiled_path(path);
2588        let segments = compiled.segments();
2589        let value = self.registers[value_reg].clone();
2590
2591        if !self.registers[object_reg].is_object() {
2592            self.registers[object_reg] = json!({});
2593        }
2594
2595        let obj = self.registers[object_reg]
2596            .as_object_mut()
2597            .ok_or("Not an object")?;
2598
2599        let mut current = obj;
2600        for (i, segment) in segments.iter().enumerate() {
2601            if i == segments.len() - 1 {
2602                current
2603                    .entry(segment.to_string())
2604                    .or_insert_with(|| json!([]));
2605                let arr = current
2606                    .get_mut(segment)
2607                    .and_then(|v| v.as_array_mut())
2608                    .ok_or("Path is not an array")?;
2609                arr.push(value.clone());
2610
2611                if arr.len() > max_length {
2612                    let excess = arr.len() - max_length;
2613                    arr.drain(0..excess);
2614                }
2615            } else {
2616                current
2617                    .entry(segment.to_string())
2618                    .or_insert_with(|| json!({}));
2619                current = current
2620                    .get_mut(segment)
2621                    .and_then(|v| v.as_object_mut())
2622                    .ok_or("Path collision: expected object")?;
2623            }
2624        }
2625
2626        Ok(())
2627    }
2628
2629    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2630        let value = &self.registers[reg];
2631        let transformed = self.apply_transformation(value, transformation)?;
2632        self.registers[reg] = transformed;
2633        Ok(())
2634    }
2635
2636    fn apply_transformation(
2637        &self,
2638        value: &Value,
2639        transformation: &Transformation,
2640    ) -> Result<Value> {
2641        match transformation {
2642            Transformation::HexEncode => {
2643                if let Some(arr) = value.as_array() {
2644                    let bytes: Vec<u8> = arr
2645                        .iter()
2646                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2647                        .collect();
2648                    let hex = hex::encode(&bytes);
2649                    Ok(json!(hex))
2650                } else {
2651                    Err("HexEncode requires an array of numbers".into())
2652                }
2653            }
2654            Transformation::HexDecode => {
2655                if let Some(s) = value.as_str() {
2656                    let s = s.strip_prefix("0x").unwrap_or(s);
2657                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2658                    Ok(json!(bytes))
2659                } else {
2660                    Err("HexDecode requires a string".into())
2661                }
2662            }
2663            Transformation::Base58Encode => {
2664                if let Some(arr) = value.as_array() {
2665                    let bytes: Vec<u8> = arr
2666                        .iter()
2667                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2668                        .collect();
2669                    let encoded = bs58::encode(&bytes).into_string();
2670                    Ok(json!(encoded))
2671                } else if value.is_string() {
2672                    Ok(value.clone())
2673                } else {
2674                    Err("Base58Encode requires an array of numbers".into())
2675                }
2676            }
2677            Transformation::Base58Decode => {
2678                if let Some(s) = value.as_str() {
2679                    let bytes = bs58::decode(s)
2680                        .into_vec()
2681                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2682                    Ok(json!(bytes))
2683                } else {
2684                    Err("Base58Decode requires a string".into())
2685                }
2686            }
2687            Transformation::ToString => Ok(json!(value.to_string())),
2688            Transformation::ToNumber => {
2689                if let Some(s) = value.as_str() {
2690                    let n = s
2691                        .parse::<i64>()
2692                        .map_err(|e| format!("Parse error: {}", e))?;
2693                    Ok(json!(n))
2694                } else {
2695                    Ok(value.clone())
2696                }
2697            }
2698        }
2699    }
2700
2701    fn evaluate_comparison(
2702        &self,
2703        field_value: &Value,
2704        op: &ComparisonOp,
2705        condition_value: &Value,
2706    ) -> Result<bool> {
2707        use ComparisonOp::*;
2708
2709        match op {
2710            Equal => Ok(field_value == condition_value),
2711            NotEqual => Ok(field_value != condition_value),
2712            GreaterThan => {
2713                // Try to compare as numbers
2714                match (field_value.as_i64(), condition_value.as_i64()) {
2715                    (Some(a), Some(b)) => Ok(a > b),
2716                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2717                        (Some(a), Some(b)) => Ok(a > b),
2718                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2719                            (Some(a), Some(b)) => Ok(a > b),
2720                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2721                        },
2722                    },
2723                }
2724            }
2725            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2726                (Some(a), Some(b)) => Ok(a >= b),
2727                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2728                    (Some(a), Some(b)) => Ok(a >= b),
2729                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2730                        (Some(a), Some(b)) => Ok(a >= b),
2731                        _ => {
2732                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2733                        }
2734                    },
2735                },
2736            },
2737            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2738                (Some(a), Some(b)) => Ok(a < b),
2739                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2740                    (Some(a), Some(b)) => Ok(a < b),
2741                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2742                        (Some(a), Some(b)) => Ok(a < b),
2743                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2744                    },
2745                },
2746            },
2747            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2748                (Some(a), Some(b)) => Ok(a <= b),
2749                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2750                    (Some(a), Some(b)) => Ok(a <= b),
2751                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2752                        (Some(a), Some(b)) => Ok(a <= b),
2753                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2754                    },
2755                },
2756            },
2757        }
2758    }
2759
2760    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
2761        if let Some(state) = self.states.get(&state_id) {
2762            if let Some(index) = state.lookup_indexes.get(index_name) {
2763                if index.lookup(value).is_some() {
2764                    return true;
2765                }
2766            }
2767
2768            for (name, index) in state.lookup_indexes.iter() {
2769                if name == index_name {
2770                    continue;
2771                }
2772                if index.lookup(value).is_some() {
2773                    return true;
2774                }
2775            }
2776
2777            if let Some(pda_str) = value.as_str() {
2778                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
2779                    if pda_lookup.contains(pda_str) {
2780                        return true;
2781                    }
2782                }
2783            }
2784        }
2785
2786        false
2787    }
2788
2789    fn apply_deferred_when_op(
2790        &mut self,
2791        state_id: u32,
2792        op: &DeferredWhenOperation,
2793    ) -> Result<Vec<Mutation>> {
2794        let state = self.states.get(&state_id).ok_or("State not found")?;
2795
2796        if op.primary_key.is_null() {
2797            return Ok(vec![]);
2798        }
2799
2800        let mut entity_state = state
2801            .get_and_touch(&op.primary_key)
2802            .unwrap_or_else(|| json!({}));
2803
2804        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
2805
2806        state.insert_with_eviction(op.primary_key.clone(), entity_state);
2807
2808        if !op.emit {
2809            return Ok(vec![]);
2810        }
2811
2812        let mut patch = json!({});
2813        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
2814
2815        Ok(vec![Mutation {
2816            export: op.entity_name.clone(),
2817            key: op.primary_key.clone(),
2818            patch,
2819            append: vec![],
2820        }])
2821    }
2822
2823    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
2824        let parts: Vec<&str> = path.split('.').collect();
2825        let mut current = obj;
2826
2827        for (i, part) in parts.iter().enumerate() {
2828            if i == parts.len() - 1 {
2829                if let Some(map) = current.as_object_mut() {
2830                    map.insert(part.to_string(), value);
2831                    return Ok(());
2832                }
2833                return Err("Cannot set field on non-object".into());
2834            }
2835
2836            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
2837                if let Some(map) = current.as_object_mut() {
2838                    map.insert(part.to_string(), json!({}));
2839                }
2840            }
2841
2842            current = current.get_mut(*part).ok_or("Path navigation failed")?;
2843        }
2844
2845        Ok(())
2846    }
2847
2848    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
2849        let now = std::time::SystemTime::now()
2850            .duration_since(std::time::UNIX_EPOCH)
2851            .unwrap()
2852            .as_secs() as i64;
2853
2854        let state = match self.states.get(&state_id) {
2855            Some(s) => s,
2856            None => return 0,
2857        };
2858
2859        let mut removed = 0;
2860        state.deferred_when_ops.retain(|_, ops| {
2861            let before = ops.len();
2862            ops.retain(|op| now - op.deferred_at < max_age_secs);
2863            removed += before - ops.len();
2864            !ops.is_empty()
2865        });
2866
2867        removed
2868    }
2869
2870    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2871    /// Returns any pending account updates that were queued for this PDA.
2872    /// ```ignore
2873    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2874    /// for update in pending {
2875    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2876    /// }
2877    /// ```
2878    #[cfg_attr(feature = "otel", instrument(
2879        name = "vm.update_pda_lookup",
2880        skip(self),
2881        fields(
2882            pda = %pda_address,
2883            seed = %seed_value,
2884        )
2885    ))]
2886    pub fn update_pda_reverse_lookup(
2887        &mut self,
2888        state_id: u32,
2889        lookup_name: &str,
2890        pda_address: String,
2891        seed_value: String,
2892    ) -> Result<Vec<PendingAccountUpdate>> {
2893        let state = self
2894            .states
2895            .get_mut(&state_id)
2896            .ok_or("State table not found")?;
2897
2898        let lookup = state
2899            .pda_reverse_lookups
2900            .entry(lookup_name.to_string())
2901            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2902
2903        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2904
2905        if let Some(ref evicted) = evicted_pda {
2906            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2907                let count = evicted_updates.len();
2908                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2909            }
2910        }
2911
2912        // Flush and return pending updates for this PDA
2913        self.flush_pending_updates(state_id, &pda_address)
2914    }
2915
2916    /// Clean up expired pending updates that are older than the TTL
2917    ///
2918    /// Returns the number of updates that were removed.
2919    /// This should be called periodically to prevent memory leaks from orphaned updates.
2920    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2921        let state = match self.states.get_mut(&state_id) {
2922            Some(s) => s,
2923            None => return 0,
2924        };
2925
2926        let now = std::time::SystemTime::now()
2927            .duration_since(std::time::UNIX_EPOCH)
2928            .unwrap()
2929            .as_secs() as i64;
2930
2931        let mut removed_count = 0;
2932
2933        // Iterate through all pending updates and remove expired ones
2934        state.pending_updates.retain(|_pda_address, updates| {
2935            let original_len = updates.len();
2936
2937            updates.retain(|update| {
2938                let age = now - update.queued_at;
2939                age <= PENDING_UPDATE_TTL_SECONDS
2940            });
2941
2942            removed_count += original_len - updates.len();
2943
2944            // Remove the entry entirely if no updates remain
2945            !updates.is_empty()
2946        });
2947
2948        // Update the global counter
2949        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2950
2951        if removed_count > 0 {
2952            #[cfg(feature = "otel")]
2953            crate::vm_metrics::record_pending_updates_expired(
2954                removed_count as u64,
2955                &state.entity_name,
2956            );
2957        }
2958
2959        removed_count
2960    }
2961
2962    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2963    ///
2964    /// # Workflow
2965    ///
2966    /// This implements a deferred processing pattern for account updates when the PDA reverse
2967    /// lookup needed to resolve the primary key is not yet available:
2968    ///
2969    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2970    ///    is not available, call `queue_account_update()` to queue it for later.
2971    ///
2972    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2973    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2974    ///
2975    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2976    ///    ```ignore
2977    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2978    ///    for update in pending {
2979    ///        let mutations = vm.process_event(
2980    ///            &bytecode, update.account_data, &update.account_type, None, None
2981    ///        )?;
2982    ///    }
2983    ///    ```
2984    ///
2985    /// # Arguments
2986    ///
2987    /// * `state_id` - The state table ID
2988    /// * `pda_address` - The PDA address that needs reverse lookup
2989    /// * `account_type` - The event type name for reprocessing
2990    /// * `account_data` - The account data (event value) for reprocessing
2991    /// * `slot` - The slot number when this update occurred
2992    /// * `signature` - The transaction signature
2993    #[cfg_attr(feature = "otel", instrument(
2994        name = "vm.queue_account_update",
2995        skip(self, update),
2996        fields(
2997            pda = %update.pda_address,
2998            account_type = %update.account_type,
2999            slot = update.slot,
3000        )
3001    ))]
3002    pub fn queue_account_update(
3003        &mut self,
3004        state_id: u32,
3005        update: QueuedAccountUpdate,
3006    ) -> Result<()> {
3007        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3008            self.cleanup_expired_pending_updates(state_id);
3009            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3010                self.drop_oldest_pending_update(state_id)?;
3011            }
3012        }
3013
3014        let state = self
3015            .states
3016            .get_mut(&state_id)
3017            .ok_or("State table not found")?;
3018
3019        let pending = PendingAccountUpdate {
3020            account_type: update.account_type,
3021            pda_address: update.pda_address.clone(),
3022            account_data: update.account_data,
3023            slot: update.slot,
3024            write_version: update.write_version,
3025            signature: update.signature,
3026            queued_at: std::time::SystemTime::now()
3027                .duration_since(std::time::UNIX_EPOCH)
3028                .unwrap()
3029                .as_secs() as i64,
3030        };
3031
3032        let pda_address = pending.pda_address.clone();
3033        let slot = pending.slot;
3034
3035        let mut updates = state
3036            .pending_updates
3037            .entry(pda_address.clone())
3038            .or_insert_with(Vec::new);
3039
3040        let original_len = updates.len();
3041        updates.retain(|existing| existing.slot > slot);
3042        let removed_by_dedup = original_len - updates.len();
3043
3044        if removed_by_dedup > 0 {
3045            self.pending_queue_size = self
3046                .pending_queue_size
3047                .saturating_sub(removed_by_dedup as u64);
3048        }
3049
3050        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3051            updates.remove(0);
3052            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3053        }
3054
3055        updates.push(pending);
3056        #[cfg(feature = "otel")]
3057        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3058
3059        Ok(())
3060    }
3061
3062    pub fn queue_instruction_event(
3063        &mut self,
3064        state_id: u32,
3065        event: QueuedInstructionEvent,
3066    ) -> Result<()> {
3067        let state = self
3068            .states
3069            .get_mut(&state_id)
3070            .ok_or("State table not found")?;
3071
3072        let pda_address = event.pda_address.clone();
3073
3074        let pending = PendingInstructionEvent {
3075            event_type: event.event_type,
3076            pda_address: event.pda_address,
3077            event_data: event.event_data,
3078            slot: event.slot,
3079            signature: event.signature,
3080            queued_at: std::time::SystemTime::now()
3081                .duration_since(std::time::UNIX_EPOCH)
3082                .unwrap()
3083                .as_secs() as i64,
3084        };
3085
3086        let mut events = state
3087            .pending_instruction_events
3088            .entry(pda_address)
3089            .or_insert_with(Vec::new);
3090
3091        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3092            events.remove(0);
3093        }
3094
3095        events.push(pending);
3096
3097        Ok(())
3098    }
3099
3100    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3101        self.last_pda_lookup_miss.take()
3102    }
3103
3104    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3105        self.last_lookup_index_miss.take()
3106    }
3107
3108    pub fn take_last_pda_registered(&mut self) -> Option<String> {
3109        self.last_pda_registered.take()
3110    }
3111
3112    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3113        std::mem::take(&mut self.last_lookup_index_keys)
3114    }
3115
3116    pub fn flush_pending_instruction_events(
3117        &mut self,
3118        state_id: u32,
3119        pda_address: &str,
3120    ) -> Vec<PendingInstructionEvent> {
3121        let state = match self.states.get_mut(&state_id) {
3122            Some(s) => s,
3123            None => return Vec::new(),
3124        };
3125
3126        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3127            events
3128        } else {
3129            Vec::new()
3130        }
3131    }
3132
3133    /// Get statistics about the pending queue for monitoring
3134    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3135        let state = self.states.get(&state_id)?;
3136
3137        let now = std::time::SystemTime::now()
3138            .duration_since(std::time::UNIX_EPOCH)
3139            .unwrap()
3140            .as_secs() as i64;
3141
3142        let mut total_updates = 0;
3143        let mut oldest_timestamp = now;
3144        let mut largest_pda_queue = 0;
3145        let mut estimated_memory = 0;
3146
3147        for entry in state.pending_updates.iter() {
3148            let (_, updates) = entry.pair();
3149            total_updates += updates.len();
3150            largest_pda_queue = largest_pda_queue.max(updates.len());
3151
3152            for update in updates.iter() {
3153                oldest_timestamp = oldest_timestamp.min(update.queued_at);
3154                // Rough memory estimate
3155                estimated_memory += update.account_type.len() +
3156                                   update.pda_address.len() +
3157                                   update.signature.len() +
3158                                   16 + // slot + queued_at
3159                                   estimate_json_size(&update.account_data);
3160            }
3161        }
3162
3163        Some(PendingQueueStats {
3164            total_updates,
3165            unique_pdas: state.pending_updates.len(),
3166            oldest_age_seconds: now - oldest_timestamp,
3167            largest_pda_queue_size: largest_pda_queue,
3168            estimated_memory_bytes: estimated_memory,
3169        })
3170    }
3171
3172    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3173        let mut stats = VmMemoryStats {
3174            path_cache_size: self.path_cache.len(),
3175            ..Default::default()
3176        };
3177
3178        if let Some(state) = self.states.get(&state_id) {
3179            stats.state_table_entity_count = state.data.len();
3180            stats.state_table_max_entries = state.config.max_entries;
3181            stats.state_table_at_capacity = state.is_at_capacity();
3182
3183            stats.lookup_index_count = state.lookup_indexes.len();
3184            stats.lookup_index_total_entries =
3185                state.lookup_indexes.values().map(|idx| idx.len()).sum();
3186
3187            stats.temporal_index_count = state.temporal_indexes.len();
3188            stats.temporal_index_total_entries = state
3189                .temporal_indexes
3190                .values()
3191                .map(|idx| idx.total_entries())
3192                .sum();
3193
3194            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3195            stats.pda_reverse_lookup_total_entries = state
3196                .pda_reverse_lookups
3197                .values()
3198                .map(|lookup| lookup.len())
3199                .sum();
3200
3201            stats.version_tracker_entries = state.version_tracker.len();
3202
3203            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3204        }
3205
3206        stats
3207    }
3208
3209    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3210        let pending_removed = self.cleanup_expired_pending_updates(state_id);
3211        let temporal_removed = self.cleanup_temporal_indexes(state_id);
3212
3213        #[cfg(feature = "otel")]
3214        if let Some(state) = self.states.get(&state_id) {
3215            crate::vm_metrics::record_cleanup(
3216                pending_removed,
3217                temporal_removed,
3218                &state.entity_name,
3219            );
3220        }
3221
3222        CleanupResult {
3223            pending_updates_removed: pending_removed,
3224            temporal_entries_removed: temporal_removed,
3225        }
3226    }
3227
3228    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3229        let state = match self.states.get_mut(&state_id) {
3230            Some(s) => s,
3231            None => return 0,
3232        };
3233
3234        let now = std::time::SystemTime::now()
3235            .duration_since(std::time::UNIX_EPOCH)
3236            .unwrap()
3237            .as_secs() as i64;
3238
3239        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3240        let mut total_removed = 0;
3241
3242        for (_, index) in state.temporal_indexes.iter_mut() {
3243            total_removed += index.cleanup_expired(cutoff);
3244        }
3245
3246        total_removed
3247    }
3248
3249    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3250        let state = self.states.get(&state_id)?;
3251
3252        if state.is_at_capacity() {
3253            Some(CapacityWarning {
3254                current_entries: state.data.len(),
3255                max_entries: state.config.max_entries,
3256                entries_over_limit: state.entries_over_limit(),
3257            })
3258        } else {
3259            None
3260        }
3261    }
3262
3263    /// Drop the oldest pending update across all PDAs
3264    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3265        let state = self
3266            .states
3267            .get_mut(&state_id)
3268            .ok_or("State table not found")?;
3269
3270        let mut oldest_pda: Option<String> = None;
3271        let mut oldest_timestamp = i64::MAX;
3272
3273        // Find the PDA with the oldest update
3274        for entry in state.pending_updates.iter() {
3275            let (pda, updates) = entry.pair();
3276            if let Some(update) = updates.first() {
3277                if update.queued_at < oldest_timestamp {
3278                    oldest_timestamp = update.queued_at;
3279                    oldest_pda = Some(pda.clone());
3280                }
3281            }
3282        }
3283
3284        // Remove the oldest update
3285        if let Some(pda) = oldest_pda {
3286            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3287                if !updates.is_empty() {
3288                    updates.remove(0);
3289                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3290
3291                    // Remove the entry if it's now empty
3292                    if updates.is_empty() {
3293                        drop(updates);
3294                        state.pending_updates.remove(&pda);
3295                    }
3296                }
3297            }
3298        }
3299
3300        Ok(())
3301    }
3302
3303    /// Flush and return pending updates for a PDA for external reprocessing
3304    ///
3305    /// Returns the pending updates that were queued for this PDA address.
3306    /// The caller should reprocess these through the VM using process_event().
3307    fn flush_pending_updates(
3308        &mut self,
3309        state_id: u32,
3310        pda_address: &str,
3311    ) -> Result<Vec<PendingAccountUpdate>> {
3312        let state = self
3313            .states
3314            .get_mut(&state_id)
3315            .ok_or("State table not found")?;
3316
3317        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3318            let count = pending_updates.len();
3319            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3320            #[cfg(feature = "otel")]
3321            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3322            Ok(pending_updates)
3323        } else {
3324            Ok(Vec::new())
3325        }
3326    }
3327
3328    /// Try to resolve a primary key via PDA reverse lookup
3329    pub fn try_pda_reverse_lookup(
3330        &mut self,
3331        state_id: u32,
3332        lookup_name: &str,
3333        pda_address: &str,
3334    ) -> Option<String> {
3335        let state = self.states.get_mut(&state_id)?;
3336
3337        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3338            if let Some(value) = lookup.lookup(pda_address) {
3339                self.pda_cache_hits += 1;
3340                return Some(value);
3341            }
3342        }
3343
3344        self.pda_cache_misses += 1;
3345        None
3346    }
3347
3348    // ============================================================================
3349    // Computed Expression Evaluator (Task 5)
3350    // ============================================================================
3351
3352    /// Evaluate a computed expression AST against the current state
3353    /// This is the core runtime evaluator for computed fields from the AST
3354    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
3355        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
3356    }
3357
3358    /// Evaluate a computed expression with a variable environment (for let bindings)
3359    fn evaluate_computed_expr_with_env(
3360        &self,
3361        expr: &ComputedExpr,
3362        state: &Value,
3363        env: &std::collections::HashMap<String, Value>,
3364    ) -> Result<Value> {
3365        match expr {
3366            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
3367
3368            ComputedExpr::Var { name } => env
3369                .get(name)
3370                .cloned()
3371                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
3372
3373            ComputedExpr::Let { name, value, body } => {
3374                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
3375                let mut new_env = env.clone();
3376                new_env.insert(name.clone(), val);
3377                self.evaluate_computed_expr_with_env(body, state, &new_env)
3378            }
3379
3380            ComputedExpr::If {
3381                condition,
3382                then_branch,
3383                else_branch,
3384            } => {
3385                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
3386                if self.value_to_bool(&cond_val) {
3387                    self.evaluate_computed_expr_with_env(then_branch, state, env)
3388                } else {
3389                    self.evaluate_computed_expr_with_env(else_branch, state, env)
3390                }
3391            }
3392
3393            ComputedExpr::None => Ok(Value::Null),
3394
3395            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
3396
3397            ComputedExpr::Slice { expr, start, end } => {
3398                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3399                match val {
3400                    Value::Array(arr) => {
3401                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3402                        Ok(Value::Array(slice))
3403                    }
3404                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3405                }
3406            }
3407
3408            ComputedExpr::Index { expr, index } => {
3409                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3410                match val {
3411                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3412                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3413                }
3414            }
3415
3416            ComputedExpr::U64FromLeBytes { bytes } => {
3417                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3418                let byte_vec = self.value_to_bytes(&val)?;
3419                if byte_vec.len() < 8 {
3420                    return Err(format!(
3421                        "u64::from_le_bytes requires 8 bytes, got {}",
3422                        byte_vec.len()
3423                    )
3424                    .into());
3425                }
3426                let arr: [u8; 8] = byte_vec[..8]
3427                    .try_into()
3428                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3429                Ok(json!(u64::from_le_bytes(arr)))
3430            }
3431
3432            ComputedExpr::U64FromBeBytes { bytes } => {
3433                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3434                let byte_vec = self.value_to_bytes(&val)?;
3435                if byte_vec.len() < 8 {
3436                    return Err(format!(
3437                        "u64::from_be_bytes requires 8 bytes, got {}",
3438                        byte_vec.len()
3439                    )
3440                    .into());
3441                }
3442                let arr: [u8; 8] = byte_vec[..8]
3443                    .try_into()
3444                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3445                Ok(json!(u64::from_be_bytes(arr)))
3446            }
3447
3448            ComputedExpr::ByteArray { bytes } => {
3449                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3450            }
3451
3452            ComputedExpr::Closure { param, body } => {
3453                // Closures are stored as-is; they're evaluated when used in map()
3454                // Return a special representation
3455                Ok(json!({
3456                    "__closure": {
3457                        "param": param,
3458                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
3459                    }
3460                }))
3461            }
3462
3463            ComputedExpr::Unary { op, expr } => {
3464                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3465                self.apply_unary_op(op, &val)
3466            }
3467
3468            ComputedExpr::JsonToBytes { expr } => {
3469                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3470                // Convert JSON array of numbers to byte array
3471                let bytes = self.value_to_bytes(&val)?;
3472                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3473            }
3474
3475            ComputedExpr::UnwrapOr { expr, default } => {
3476                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3477                if val.is_null() {
3478                    Ok(default.clone())
3479                } else {
3480                    Ok(val)
3481                }
3482            }
3483
3484            ComputedExpr::Binary { op, left, right } => {
3485                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3486                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3487                self.apply_binary_op(op, &l, &r)
3488            }
3489
3490            ComputedExpr::Cast { expr, to_type } => {
3491                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3492                self.apply_cast(&val, to_type)
3493            }
3494
3495            ComputedExpr::MethodCall { expr, method, args } => {
3496                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3497                // Special handling for map() with closures
3498                if method == "map" && args.len() == 1 {
3499                    if let ComputedExpr::Closure { param, body } = &args[0] {
3500                        // If the value is null, return null (Option::None.map returns None)
3501                        if val.is_null() {
3502                            return Ok(Value::Null);
3503                        }
3504                        // Evaluate the closure body with the value bound to param
3505                        let mut closure_env = env.clone();
3506                        closure_env.insert(param.clone(), val);
3507                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3508                    }
3509                }
3510                let evaluated_args: Vec<Value> = args
3511                    .iter()
3512                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3513                    .collect::<Result<Vec<_>>>()?;
3514                self.apply_method_call(&val, method, &evaluated_args)
3515            }
3516
3517            ComputedExpr::Literal { value } => Ok(value.clone()),
3518
3519            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3520        }
3521    }
3522
3523    /// Convert a JSON value to a byte vector
3524    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3525        match val {
3526            Value::Array(arr) => arr
3527                .iter()
3528                .map(|v| {
3529                    v.as_u64()
3530                        .map(|n| n as u8)
3531                        .ok_or_else(|| "Array element not a valid byte".into())
3532                })
3533                .collect(),
3534            Value::String(s) => {
3535                // Try to decode as hex
3536                if s.starts_with("0x") || s.starts_with("0X") {
3537                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3538                } else {
3539                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3540                }
3541            }
3542            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3543        }
3544    }
3545
3546    /// Apply a unary operation
3547    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3548        use crate::ast::UnaryOp;
3549        match op {
3550            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3551            UnaryOp::ReverseBits => match val.as_u64() {
3552                Some(n) => Ok(json!(n.reverse_bits())),
3553                None => match val.as_i64() {
3554                    Some(n) => Ok(json!((n as u64).reverse_bits())),
3555                    None => Err("reverse_bits requires an integer".into()),
3556                },
3557            },
3558        }
3559    }
3560
3561    /// Get a field value from state by path (e.g., "section.field" or just "field")
3562    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3563        let segments: Vec<&str> = path.split('.').collect();
3564        let mut current = state;
3565
3566        for segment in segments {
3567            match current.get(segment) {
3568                Some(v) => current = v,
3569                None => return Ok(Value::Null),
3570            }
3571        }
3572
3573        Ok(current.clone())
3574    }
3575
3576    /// Apply a binary operation to two values
3577    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3578        match op {
3579            // Arithmetic operations
3580            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3581            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3582            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3583            BinaryOp::Div => {
3584                // Check for division by zero
3585                if let Some(r) = right.as_i64() {
3586                    if r == 0 {
3587                        return Err("Division by zero".into());
3588                    }
3589                }
3590                if let Some(r) = right.as_f64() {
3591                    if r == 0.0 {
3592                        return Err("Division by zero".into());
3593                    }
3594                }
3595                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3596            }
3597            BinaryOp::Mod => {
3598                // Modulo - only for integers
3599                match (left.as_i64(), right.as_i64()) {
3600                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3601                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3602                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3603                        _ => Err("Modulo requires non-zero integer operands".into()),
3604                    },
3605                    _ => Err("Modulo by zero".into()),
3606                }
3607            }
3608
3609            // Comparison operations
3610            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3611            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3612            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3613            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3614            BinaryOp::Eq => Ok(json!(left == right)),
3615            BinaryOp::Ne => Ok(json!(left != right)),
3616
3617            // Logical operations
3618            BinaryOp::And => {
3619                let l_bool = self.value_to_bool(left);
3620                let r_bool = self.value_to_bool(right);
3621                Ok(json!(l_bool && r_bool))
3622            }
3623            BinaryOp::Or => {
3624                let l_bool = self.value_to_bool(left);
3625                let r_bool = self.value_to_bool(right);
3626                Ok(json!(l_bool || r_bool))
3627            }
3628
3629            // Bitwise operations
3630            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3631                (Some(a), Some(b)) => Ok(json!(a ^ b)),
3632                _ => match (left.as_i64(), right.as_i64()) {
3633                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
3634                    _ => Err("XOR requires integer operands".into()),
3635                },
3636            },
3637            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3638                (Some(a), Some(b)) => Ok(json!(a & b)),
3639                _ => match (left.as_i64(), right.as_i64()) {
3640                    (Some(a), Some(b)) => Ok(json!(a & b)),
3641                    _ => Err("BitAnd requires integer operands".into()),
3642                },
3643            },
3644            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3645                (Some(a), Some(b)) => Ok(json!(a | b)),
3646                _ => match (left.as_i64(), right.as_i64()) {
3647                    (Some(a), Some(b)) => Ok(json!(a | b)),
3648                    _ => Err("BitOr requires integer operands".into()),
3649                },
3650            },
3651            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3652                (Some(a), Some(b)) => Ok(json!(a << b)),
3653                _ => match (left.as_i64(), right.as_i64()) {
3654                    (Some(a), Some(b)) => Ok(json!(a << b)),
3655                    _ => Err("Shl requires integer operands".into()),
3656                },
3657            },
3658            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3659                (Some(a), Some(b)) => Ok(json!(a >> b)),
3660                _ => match (left.as_i64(), right.as_i64()) {
3661                    (Some(a), Some(b)) => Ok(json!(a >> b)),
3662                    _ => Err("Shr requires integer operands".into()),
3663                },
3664            },
3665        }
3666    }
3667
3668    /// Helper for numeric operations that can work on integers or floats
3669    fn numeric_op<F1, F2>(
3670        &self,
3671        left: &Value,
3672        right: &Value,
3673        int_op: F1,
3674        float_op: F2,
3675    ) -> Result<Value>
3676    where
3677        F1: Fn(i64, i64) -> i64,
3678        F2: Fn(f64, f64) -> f64,
3679    {
3680        // Try i64 first
3681        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3682            return Ok(json!(int_op(a, b)));
3683        }
3684
3685        // Try u64
3686        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3687            // For u64, we need to be careful with underflow in subtraction
3688            return Ok(json!(int_op(a as i64, b as i64)));
3689        }
3690
3691        // Try f64
3692        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3693            return Ok(json!(float_op(a, b)));
3694        }
3695
3696        // If either is null, return null
3697        if left.is_null() || right.is_null() {
3698            return Ok(Value::Null);
3699        }
3700
3701        Err(format!(
3702            "Cannot perform numeric operation on {:?} and {:?}",
3703            left, right
3704        )
3705        .into())
3706    }
3707
3708    /// Helper for comparison operations
3709    fn comparison_op<F1, F2>(
3710        &self,
3711        left: &Value,
3712        right: &Value,
3713        int_cmp: F1,
3714        float_cmp: F2,
3715    ) -> Result<Value>
3716    where
3717        F1: Fn(i64, i64) -> bool,
3718        F2: Fn(f64, f64) -> bool,
3719    {
3720        // Try i64 first
3721        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3722            return Ok(json!(int_cmp(a, b)));
3723        }
3724
3725        // Try u64
3726        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3727            return Ok(json!(int_cmp(a as i64, b as i64)));
3728        }
3729
3730        // Try f64
3731        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3732            return Ok(json!(float_cmp(a, b)));
3733        }
3734
3735        // If either is null, comparison returns false
3736        if left.is_null() || right.is_null() {
3737            return Ok(json!(false));
3738        }
3739
3740        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3741    }
3742
3743    /// Convert a value to boolean for logical operations
3744    fn value_to_bool(&self, value: &Value) -> bool {
3745        match value {
3746            Value::Null => false,
3747            Value::Bool(b) => *b,
3748            Value::Number(n) => {
3749                if let Some(i) = n.as_i64() {
3750                    i != 0
3751                } else if let Some(f) = n.as_f64() {
3752                    f != 0.0
3753                } else {
3754                    true
3755                }
3756            }
3757            Value::String(s) => !s.is_empty(),
3758            Value::Array(arr) => !arr.is_empty(),
3759            Value::Object(obj) => !obj.is_empty(),
3760        }
3761    }
3762
3763    /// Apply a type cast to a value
3764    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3765        match to_type {
3766            "i8" | "i16" | "i32" | "i64" | "isize" => {
3767                if let Some(n) = value.as_i64() {
3768                    Ok(json!(n))
3769                } else if let Some(n) = value.as_u64() {
3770                    Ok(json!(n as i64))
3771                } else if let Some(n) = value.as_f64() {
3772                    Ok(json!(n as i64))
3773                } else if let Some(s) = value.as_str() {
3774                    s.parse::<i64>()
3775                        .map(|n| json!(n))
3776                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3777                } else {
3778                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3779                }
3780            }
3781            "u8" | "u16" | "u32" | "u64" | "usize" => {
3782                if let Some(n) = value.as_u64() {
3783                    Ok(json!(n))
3784                } else if let Some(n) = value.as_i64() {
3785                    Ok(json!(n as u64))
3786                } else if let Some(n) = value.as_f64() {
3787                    Ok(json!(n as u64))
3788                } else if let Some(s) = value.as_str() {
3789                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3790                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3791                    })
3792                } else {
3793                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3794                }
3795            }
3796            "f32" | "f64" => {
3797                if let Some(n) = value.as_f64() {
3798                    Ok(json!(n))
3799                } else if let Some(n) = value.as_i64() {
3800                    Ok(json!(n as f64))
3801                } else if let Some(n) = value.as_u64() {
3802                    Ok(json!(n as f64))
3803                } else if let Some(s) = value.as_str() {
3804                    s.parse::<f64>()
3805                        .map(|n| json!(n))
3806                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3807                } else {
3808                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3809                }
3810            }
3811            "String" | "string" => Ok(json!(value.to_string())),
3812            "bool" => Ok(json!(self.value_to_bool(value))),
3813            _ => {
3814                // Unknown type, return value as-is
3815                Ok(value.clone())
3816            }
3817        }
3818    }
3819
3820    /// Apply a method call to a value
3821    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3822        match method {
3823            "unwrap_or" => {
3824                if value.is_null() && !args.is_empty() {
3825                    Ok(args[0].clone())
3826                } else {
3827                    Ok(value.clone())
3828                }
3829            }
3830            "unwrap_or_default" => {
3831                if value.is_null() {
3832                    // Return default for common types
3833                    Ok(json!(0))
3834                } else {
3835                    Ok(value.clone())
3836                }
3837            }
3838            "is_some" => Ok(json!(!value.is_null())),
3839            "is_none" => Ok(json!(value.is_null())),
3840            "abs" => {
3841                if let Some(n) = value.as_i64() {
3842                    Ok(json!(n.abs()))
3843                } else if let Some(n) = value.as_f64() {
3844                    Ok(json!(n.abs()))
3845                } else {
3846                    Err(format!("Cannot call abs() on {:?}", value).into())
3847                }
3848            }
3849            "len" => {
3850                if let Some(s) = value.as_str() {
3851                    Ok(json!(s.len()))
3852                } else if let Some(arr) = value.as_array() {
3853                    Ok(json!(arr.len()))
3854                } else if let Some(obj) = value.as_object() {
3855                    Ok(json!(obj.len()))
3856                } else {
3857                    Err(format!("Cannot call len() on {:?}", value).into())
3858                }
3859            }
3860            "to_string" => Ok(json!(value.to_string())),
3861            "min" => {
3862                if args.is_empty() {
3863                    return Err("min() requires an argument".into());
3864                }
3865                let other = &args[0];
3866                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3867                    Ok(json!(a.min(b)))
3868                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3869                    Ok(json!(a.min(b)))
3870                } else {
3871                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3872                }
3873            }
3874            "max" => {
3875                if args.is_empty() {
3876                    return Err("max() requires an argument".into());
3877                }
3878                let other = &args[0];
3879                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3880                    Ok(json!(a.max(b)))
3881                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3882                    Ok(json!(a.max(b)))
3883                } else {
3884                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3885                }
3886            }
3887            "saturating_add" => {
3888                if args.is_empty() {
3889                    return Err("saturating_add() requires an argument".into());
3890                }
3891                let other = &args[0];
3892                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3893                    Ok(json!(a.saturating_add(b)))
3894                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3895                    Ok(json!(a.saturating_add(b)))
3896                } else {
3897                    Err(format!(
3898                        "Cannot call saturating_add() on {:?} and {:?}",
3899                        value, other
3900                    )
3901                    .into())
3902                }
3903            }
3904            "saturating_sub" => {
3905                if args.is_empty() {
3906                    return Err("saturating_sub() requires an argument".into());
3907                }
3908                let other = &args[0];
3909                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3910                    Ok(json!(a.saturating_sub(b)))
3911                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3912                    Ok(json!(a.saturating_sub(b)))
3913                } else {
3914                    Err(format!(
3915                        "Cannot call saturating_sub() on {:?} and {:?}",
3916                        value, other
3917                    )
3918                    .into())
3919                }
3920            }
3921            _ => Err(format!("Unknown method call: {}()", method).into()),
3922        }
3923    }
3924
3925    /// Evaluate all computed fields for an entity and update the state
3926    /// This takes a list of ComputedFieldSpec from the AST and applies them
3927    pub fn evaluate_computed_fields_from_ast(
3928        &self,
3929        state: &mut Value,
3930        computed_field_specs: &[ComputedFieldSpec],
3931    ) -> Result<Vec<String>> {
3932        let mut updated_paths = Vec::new();
3933
3934        for spec in computed_field_specs {
3935            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3936                self.set_field_in_state(state, &spec.target_path, result)?;
3937                updated_paths.push(spec.target_path.clone());
3938            }
3939        }
3940
3941        Ok(updated_paths)
3942    }
3943
3944    /// Set a field value in state by path (e.g., "section.field")
3945    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3946        let segments: Vec<&str> = path.split('.').collect();
3947
3948        if segments.is_empty() {
3949            return Err("Empty path".into());
3950        }
3951
3952        // Navigate to parent, creating intermediate objects as needed
3953        let mut current = state;
3954        for (i, segment) in segments.iter().enumerate() {
3955            if i == segments.len() - 1 {
3956                // Last segment - set the value
3957                if let Some(obj) = current.as_object_mut() {
3958                    obj.insert(segment.to_string(), value);
3959                    return Ok(());
3960                } else {
3961                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3962                }
3963            } else {
3964                // Intermediate segment - navigate or create
3965                if !current.is_object() {
3966                    *current = json!({});
3967                }
3968                let obj = current.as_object_mut().unwrap();
3969                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3970            }
3971        }
3972
3973        Ok(())
3974    }
3975
3976    /// Create a computed fields evaluator closure from AST specs
3977    /// This returns a function that can be passed to the bytecode builder
3978    pub fn create_evaluator_from_specs(
3979        specs: Vec<ComputedFieldSpec>,
3980    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3981        move |state: &mut Value| {
3982            // Create a temporary VmContext just for evaluation
3983            // (We only need the expression evaluation methods)
3984            let vm = VmContext::new();
3985            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3986            Ok(())
3987        }
3988    }
3989}
3990
3991impl Default for VmContext {
3992    fn default() -> Self {
3993        Self::new()
3994    }
3995}
3996
3997// Implement the ReverseLookupUpdater trait for VmContext
3998impl crate::resolvers::ReverseLookupUpdater for VmContext {
3999    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4000        // Use default state_id=0 and default lookup name
4001        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4002            .unwrap_or_else(|e| {
4003                tracing::error!("Failed to update PDA reverse lookup: {}", e);
4004                Vec::new()
4005            })
4006    }
4007
4008    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4009        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
4010        self.flush_pending_updates(0, pda_address)
4011            .unwrap_or_else(|e| {
4012                tracing::error!("Failed to flush pending updates: {}", e);
4013                Vec::new()
4014            })
4015    }
4016}
4017
4018#[cfg(test)]
4019mod tests {
4020    use super::*;
4021    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4022
4023    #[test]
4024    fn test_computed_field_preserves_integer_type() {
4025        let vm = VmContext::new();
4026
4027        let mut state = serde_json::json!({
4028            "trading": {
4029                "total_buy_volume": 20000000000_i64,
4030                "total_sell_volume": 17951316474_i64
4031            }
4032        });
4033
4034        let spec = ComputedFieldSpec {
4035            target_path: "trading.total_volume".to_string(),
4036            result_type: "Option<u64>".to_string(),
4037            expression: ComputedExpr::Binary {
4038                op: BinaryOp::Add,
4039                left: Box::new(ComputedExpr::UnwrapOr {
4040                    expr: Box::new(ComputedExpr::FieldRef {
4041                        path: "trading.total_buy_volume".to_string(),
4042                    }),
4043                    default: serde_json::json!(0),
4044                }),
4045                right: Box::new(ComputedExpr::UnwrapOr {
4046                    expr: Box::new(ComputedExpr::FieldRef {
4047                        path: "trading.total_sell_volume".to_string(),
4048                    }),
4049                    default: serde_json::json!(0),
4050                }),
4051            },
4052        };
4053
4054        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
4055            .unwrap();
4056
4057        let total_volume = state
4058            .get("trading")
4059            .and_then(|t| t.get("total_volume"))
4060            .expect("total_volume should exist");
4061
4062        let serialized = serde_json::to_string(total_volume).unwrap();
4063        assert!(
4064            !serialized.contains('.'),
4065            "Integer should not have decimal point: {}",
4066            serialized
4067        );
4068        assert_eq!(
4069            total_volume.as_i64(),
4070            Some(37951316474),
4071            "Value should be correct sum"
4072        );
4073    }
4074
4075    #[test]
4076    fn test_set_field_sum_preserves_integer_type() {
4077        let mut vm = VmContext::new();
4078        vm.registers[0] = serde_json::json!({});
4079        vm.registers[1] = serde_json::json!(20000000000_i64);
4080        vm.registers[2] = serde_json::json!(17951316474_i64);
4081
4082        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
4083        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
4084
4085        let state = &vm.registers[0];
4086        let buy_vol = state
4087            .get("trading")
4088            .and_then(|t| t.get("total_buy_volume"))
4089            .unwrap();
4090        let sell_vol = state
4091            .get("trading")
4092            .and_then(|t| t.get("total_sell_volume"))
4093            .unwrap();
4094
4095        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
4096        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
4097
4098        assert!(
4099            !buy_serialized.contains('.'),
4100            "Buy volume should not have decimal: {}",
4101            buy_serialized
4102        );
4103        assert!(
4104            !sell_serialized.contains('.'),
4105            "Sell volume should not have decimal: {}",
4106            sell_serialized
4107        );
4108    }
4109
4110    #[test]
4111    fn test_lookup_index_chaining() {
4112        let mut vm = VmContext::new();
4113
4114        let state = vm.states.get_mut(&0).unwrap();
4115
4116        state
4117            .pda_reverse_lookups
4118            .entry("default_pda_lookup".to_string())
4119            .or_insert_with(|| PdaReverseLookup::new(1000))
4120            .insert("pda_123".to_string(), "addr_456".to_string());
4121
4122        state
4123            .lookup_indexes
4124            .entry("round_address_lookup_index".to_string())
4125            .or_insert_with(LookupIndex::new)
4126            .insert(json!("addr_456"), json!(789));
4127
4128        let handler = vec![
4129            OpCode::LoadConstant {
4130                value: json!("pda_123"),
4131                dest: 0,
4132            },
4133            OpCode::LookupIndex {
4134                state_id: 0,
4135                index_name: "round_address_lookup_index".to_string(),
4136                lookup_value: 0,
4137                dest: 1,
4138            },
4139        ];
4140
4141        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4142            .unwrap();
4143
4144        assert_eq!(vm.registers[1], json!(789));
4145    }
4146
4147    #[test]
4148    fn test_lookup_index_no_chain() {
4149        let mut vm = VmContext::new();
4150
4151        let state = vm.states.get_mut(&0).unwrap();
4152        state
4153            .lookup_indexes
4154            .entry("test_index".to_string())
4155            .or_insert_with(LookupIndex::new)
4156            .insert(json!("key_abc"), json!(42));
4157
4158        let handler = vec![
4159            OpCode::LoadConstant {
4160                value: json!("key_abc"),
4161                dest: 0,
4162            },
4163            OpCode::LookupIndex {
4164                state_id: 0,
4165                index_name: "test_index".to_string(),
4166                lookup_value: 0,
4167                dest: 1,
4168            },
4169        ];
4170
4171        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4172            .unwrap();
4173
4174        assert_eq!(vm.registers[1], json!(42));
4175    }
4176
4177    #[test]
4178    fn test_conditional_set_field_with_zero_array() {
4179        let mut vm = VmContext::new();
4180
4181        let event_zeros = json!({
4182            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4183                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4184        });
4185
4186        let event_nonzero = json!({
4187            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
4188                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
4189        });
4190
4191        let zero_32: Value = json!([
4192            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4193            0, 0, 0
4194        ]);
4195
4196        let handler = vec![
4197            OpCode::CreateObject { dest: 2 },
4198            OpCode::LoadEventField {
4199                path: FieldPath::new(&["value"]),
4200                dest: 10,
4201                default: None,
4202            },
4203            OpCode::ConditionalSetField {
4204                object: 2,
4205                path: "captured_value".to_string(),
4206                value: 10,
4207                condition_field: FieldPath::new(&["value"]),
4208                condition_op: ComparisonOp::NotEqual,
4209                condition_value: zero_32,
4210            },
4211        ];
4212
4213        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
4214            .unwrap();
4215        assert!(
4216            vm.registers[2].get("captured_value").is_none(),
4217            "Field should not be set when value is all zeros"
4218        );
4219
4220        vm.reset_registers();
4221        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
4222            .unwrap();
4223        assert!(
4224            vm.registers[2].get("captured_value").is_some(),
4225            "Field should be set when value is non-zero"
4226        );
4227    }
4228
4229    #[test]
4230    fn test_when_instruction_arrives_first() {
4231        let mut vm = VmContext::new();
4232
4233        let signature = "test_sig_123".to_string();
4234
4235        {
4236            let state = vm.states.get(&0).unwrap();
4237            let mut cache = state.recent_tx_instructions.lock().unwrap();
4238            let mut set = HashSet::new();
4239            set.insert("RevealIxState".to_string());
4240            cache.put(signature.clone(), set);
4241        }
4242
4243        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4244
4245        let handler = vec![
4246            OpCode::CreateObject { dest: 2 },
4247            OpCode::LoadConstant {
4248                value: json!("primary_key_value"),
4249                dest: 1,
4250            },
4251            OpCode::LoadConstant {
4252                value: json!("the_revealed_value"),
4253                dest: 10,
4254            },
4255            OpCode::SetFieldWhen {
4256                object: 2,
4257                path: "entropy_value".to_string(),
4258                value: 10,
4259                when_instruction: "RevealIxState".to_string(),
4260                entity_name: "TestEntity".to_string(),
4261                key_reg: 1,
4262                condition_field: None,
4263                condition_op: None,
4264                condition_value: None,
4265            },
4266        ];
4267
4268        vm.execute_handler(
4269            &handler,
4270            &json!({}),
4271            "VarState",
4272            0,
4273            "TestEntity",
4274            None,
4275            None,
4276        )
4277        .unwrap();
4278
4279        assert_eq!(
4280            vm.registers[2].get("entropy_value").unwrap(),
4281            "the_revealed_value",
4282            "Field should be set when instruction was already seen"
4283        );
4284    }
4285
4286    #[test]
4287    fn test_when_account_arrives_first() {
4288        let mut vm = VmContext::new();
4289
4290        let signature = "test_sig_456".to_string();
4291
4292        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4293
4294        let handler = vec![
4295            OpCode::CreateObject { dest: 2 },
4296            OpCode::LoadConstant {
4297                value: json!("pk_123"),
4298                dest: 1,
4299            },
4300            OpCode::LoadConstant {
4301                value: json!("deferred_value"),
4302                dest: 10,
4303            },
4304            OpCode::SetFieldWhen {
4305                object: 2,
4306                path: "entropy_value".to_string(),
4307                value: 10,
4308                when_instruction: "RevealIxState".to_string(),
4309                entity_name: "TestEntity".to_string(),
4310                key_reg: 1,
4311                condition_field: None,
4312                condition_op: None,
4313                condition_value: None,
4314            },
4315        ];
4316
4317        vm.execute_handler(
4318            &handler,
4319            &json!({}),
4320            "VarState",
4321            0,
4322            "TestEntity",
4323            None,
4324            None,
4325        )
4326        .unwrap();
4327
4328        assert!(
4329            vm.registers[2].get("entropy_value").is_none(),
4330            "Field should not be set when instruction hasn't been seen"
4331        );
4332
4333        let state = vm.states.get(&0).unwrap();
4334        let key = (signature.clone(), "RevealIxState".to_string());
4335        assert!(
4336            state.deferred_when_ops.contains_key(&key),
4337            "Operation should be queued"
4338        );
4339
4340        {
4341            let mut cache = state.recent_tx_instructions.lock().unwrap();
4342            let mut set = HashSet::new();
4343            set.insert("RevealIxState".to_string());
4344            cache.put(signature.clone(), set);
4345        }
4346
4347        let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
4348        for op in deferred {
4349            vm.apply_deferred_when_op(0, &op).unwrap();
4350        }
4351
4352        let state = vm.states.get(&0).unwrap();
4353        let entity = state.data.get(&json!("pk_123")).unwrap();
4354        assert_eq!(
4355            entity.get("entropy_value").unwrap(),
4356            "deferred_value",
4357            "Field should be set after instruction arrives"
4358        );
4359    }
4360
4361    #[test]
4362    fn test_when_cleanup_expired() {
4363        let mut vm = VmContext::new();
4364
4365        let state = vm.states.get(&0).unwrap();
4366        let key = ("old_sig".to_string(), "SomeIxState".to_string());
4367        state.deferred_when_ops.insert(
4368            key,
4369            vec![DeferredWhenOperation {
4370                entity_name: "Test".to_string(),
4371                primary_key: json!("pk"),
4372                field_path: "field".to_string(),
4373                field_value: json!("value"),
4374                when_instruction: "SomeIxState".to_string(),
4375                signature: "old_sig".to_string(),
4376                slot: 0,
4377                deferred_at: 0,
4378                emit: true,
4379            }],
4380        );
4381
4382        let removed = vm.cleanup_expired_when_ops(0, 60);
4383
4384        assert_eq!(removed, 1, "Should have removed 1 expired op");
4385        assert!(
4386            vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
4387            "Deferred ops should be empty after cleanup"
4388        );
4389    }
4390}