Skip to main content

arete_interpreter/
vm.rs

1use crate::ast::{
2    self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3    ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::debugger::{VmDebugEvent, VmDebugger, VmLookupHop};
7use crate::Mutation;
8use dashmap::DashMap;
9use lru::LruCache;
10use once_cell::sync::Lazy;
11use serde_json::{json, Value};
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::num::NonZeroUsize;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17#[cfg(feature = "otel")]
18use tracing::instrument;
19/// Context metadata for blockchain updates (accounts and instructions)
20/// This structure is designed to be extended over time with additional metadata
21#[derive(Debug, Clone, Default)]
22pub struct UpdateContext {
23    /// Blockchain slot number
24    pub slot: Option<u64>,
25    /// Transaction signature
26    pub signature: Option<String>,
27    /// Unix timestamp (seconds since epoch)
28    /// If not provided, will default to current system time when accessed
29    pub timestamp: Option<i64>,
30    /// Write version for account updates (monotonically increasing per account within a slot)
31    /// Used for staleness detection to reject out-of-order updates
32    pub write_version: Option<u64>,
33    /// Transaction index for instruction updates (orders transactions within a slot)
34    /// Used for staleness detection to reject out-of-order updates
35    pub txn_index: Option<u64>,
36    /// When true, QueueResolver opcodes are skipped during handler execution.
37    /// Set for reprocessed cached data from PDA mapping changes to prevent
38    /// stale data from triggering resolvers or locking in wrong values via SetOnce.
39    pub skip_resolvers: bool,
40    /// Additional custom metadata that can be added without breaking changes
41    pub metadata: HashMap<String, Value>,
42}
43
44impl UpdateContext {
45    /// Create a new UpdateContext with slot and signature
46    pub fn new(slot: u64, signature: String) -> Self {
47        Self {
48            slot: Some(slot),
49            signature: Some(signature),
50            timestamp: None,
51            write_version: None,
52            txn_index: None,
53            skip_resolvers: false,
54            metadata: HashMap::new(),
55        }
56    }
57
58    /// Create a new UpdateContext with slot, signature, and timestamp
59    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
60        Self {
61            slot: Some(slot),
62            signature: Some(signature),
63            timestamp: Some(timestamp),
64            write_version: None,
65            txn_index: None,
66            skip_resolvers: false,
67            metadata: HashMap::new(),
68        }
69    }
70
71    /// Create context for account updates with write_version for staleness detection
72    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
73        Self {
74            slot: Some(slot),
75            signature: Some(signature),
76            timestamp: None,
77            write_version: Some(write_version),
78            txn_index: None,
79            skip_resolvers: false,
80            metadata: HashMap::new(),
81        }
82    }
83
84    /// Create context for instruction updates with txn_index for staleness detection
85    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
86        Self {
87            slot: Some(slot),
88            signature: Some(signature),
89            timestamp: None,
90            write_version: None,
91            txn_index: Some(txn_index),
92            skip_resolvers: false,
93            metadata: HashMap::new(),
94        }
95    }
96
97    /// Create context for reprocessed cached account data from PDA mapping changes.
98    /// Uses empty signature to prevent `when` guards from matching stale instructions,
99    /// and sets skip_resolvers to prevent stale scheduling/SetOnce lock-in.
100    pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
101        Self {
102            slot: Some(slot),
103            signature: None,
104            timestamp: None,
105            write_version: Some(write_version),
106            txn_index: None,
107            skip_resolvers: true,
108            metadata: HashMap::new(),
109        }
110    }
111
112    /// Get the timestamp, falling back to current system time if not set
113    pub fn timestamp(&self) -> i64 {
114        self.timestamp.unwrap_or_else(|| {
115            std::time::SystemTime::now()
116                .duration_since(std::time::UNIX_EPOCH)
117                .unwrap()
118                .as_secs() as i64
119        })
120    }
121
122    /// Create an empty context (for testing or when context is not available)
123    pub fn empty() -> Self {
124        Self::default()
125    }
126
127    /// Add custom metadata
128    /// Returns true if this is an account update context (has write_version, no txn_index)
129    pub fn is_account_update(&self) -> bool {
130        self.write_version.is_some() && self.txn_index.is_none()
131    }
132
133    /// Returns true if this is an instruction update context (has txn_index, no write_version)
134    pub fn is_instruction_update(&self) -> bool {
135        self.txn_index.is_some() && self.write_version.is_none()
136    }
137
138    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
139        self.metadata.insert(key, value);
140        self
141    }
142
143    /// Get metadata value
144    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
145        self.metadata.get(key)
146    }
147
148    /// Convert context to JSON value for injection into event data
149    pub fn to_value(&self) -> Value {
150        let mut obj = serde_json::Map::new();
151        if let Some(slot) = self.slot {
152            obj.insert("slot".to_string(), json!(slot));
153        }
154        if let Some(ref sig) = self.signature {
155            obj.insert("signature".to_string(), json!(sig));
156        }
157        // Always include timestamp (use current time if not set)
158        obj.insert("timestamp".to_string(), json!(self.timestamp()));
159        for (key, value) in &self.metadata {
160            obj.insert(key.clone(), value.clone());
161        }
162        Value::Object(obj)
163    }
164}
165
166pub type Register = usize;
167pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
168
169pub type RegisterValue = Value;
170
171/// Trait for evaluating computed fields
172/// Implement this in your generated spec to enable computed field evaluation
173pub trait ComputedFieldsEvaluator {
174    fn evaluate(&self, state: &mut Value) -> Result<()>;
175}
176
177// Pending queue configuration
178const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
179const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
180const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
181
182// Temporal index configuration - prevents unbounded history growth
183const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
184const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
185
186// State table configuration - aligned with downstream EntityCache (500 per view)
187const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
188const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
189
190const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
191
192const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
193
194// Smaller cache for instruction deduplication - provides shorter effective TTL
195// since we don't expect instruction duplicates to arrive much later
196const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
197
198const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
199
200const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
201
202const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 20_000;
203const DEFAULT_RESOLVER_CACHE_TTL_SECS: u64 = 3600; // 1 hour
204
205static RESOLVER_CACHE_CAPACITY: Lazy<NonZeroUsize> = Lazy::new(|| {
206    NonZeroUsize::new(
207        std::env::var("ARETE_RESOLVER_CACHE_CAPACITY")
208            .ok()
209            .and_then(|value| value.parse::<usize>().ok())
210            .filter(|value| *value > 0)
211            .unwrap_or(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES),
212    )
213    .expect("resolver cache capacity must be > 0")
214});
215
216static RESOLVER_CACHE_TTL: Lazy<Duration> = Lazy::new(|| {
217    let ttl_secs = match std::env::var("ARETE_RESOLVER_CACHE_TTL_SECS") {
218        Ok(value) => match value.parse::<u64>() {
219            Ok(0) => {
220                tracing::warn!(
221                    default_ttl_secs = DEFAULT_RESOLVER_CACHE_TTL_SECS,
222                    "ARETE_RESOLVER_CACHE_TTL_SECS=0 is not supported; using default"
223                );
224                DEFAULT_RESOLVER_CACHE_TTL_SECS
225            }
226            Ok(value) => value,
227            Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
228        },
229        Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
230    };
231
232    Duration::from_secs(ttl_secs)
233});
234
235#[derive(Debug, Clone)]
236struct ResolverCacheEntry {
237    value: Value,
238    cached_at: Instant,
239}
240
241fn resolver_cache_capacity() -> NonZeroUsize {
242    *RESOLVER_CACHE_CAPACITY
243}
244
245fn resolver_cache_ttl() -> Duration {
246    *RESOLVER_CACHE_TTL
247}
248
249/// Estimate the size of a JSON value in bytes
250fn estimate_json_size(value: &Value) -> usize {
251    match value {
252        Value::Null => 4,
253        Value::Bool(_) => 5,
254        Value::Number(_) => 8,
255        Value::String(s) => s.len() + 2,
256        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
257        Value::Object(obj) => {
258            2 + obj
259                .iter()
260                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
261                .sum::<usize>()
262        }
263    }
264}
265
266#[derive(Debug, Clone)]
267pub struct CompiledPath {
268    pub segments: std::sync::Arc<[String]>,
269}
270
271impl CompiledPath {
272    pub fn new(path: &str) -> Self {
273        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
274        CompiledPath {
275            segments: segments.into(),
276        }
277    }
278
279    fn segments(&self) -> &[String] {
280        &self.segments
281    }
282}
283
284/// Represents the type of change made to a field for granular dirty tracking.
285/// This enables emitting only the actual changes rather than entire field values.
286#[derive(Debug, Clone)]
287pub enum FieldChange {
288    /// Field was replaced with a new value (emit the full value from state)
289    Replaced,
290    /// Items were appended to an array field (emit only the new items)
291    Appended(Vec<Value>),
292}
293
294/// Tracks field modifications during handler execution with granular change information.
295/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
296#[derive(Debug, Clone, Default)]
297pub struct DirtyTracker {
298    changes: HashMap<String, FieldChange>,
299}
300
301impl DirtyTracker {
302    /// Create a new empty DirtyTracker
303    pub fn new() -> Self {
304        Self {
305            changes: HashMap::new(),
306        }
307    }
308
309    /// Mark a field as replaced (full value will be emitted)
310    pub fn mark_replaced(&mut self, path: &str) {
311        // If there was an append, it's now superseded by a full replacement
312        self.changes.insert(path.to_string(), FieldChange::Replaced);
313    }
314
315    /// Record an appended value for a field
316    pub fn mark_appended(&mut self, path: &str, value: Value) {
317        match self.changes.get_mut(path) {
318            Some(FieldChange::Appended(values)) => {
319                // Add to existing appended values
320                values.push(value);
321            }
322            Some(FieldChange::Replaced) => {
323                // Field was already replaced, keep it as replaced
324                // (the full value including the append will be emitted)
325            }
326            None => {
327                // First append to this field
328                self.changes
329                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
330            }
331        }
332    }
333
334    /// Check if there are any changes tracked
335    pub fn is_empty(&self) -> bool {
336        self.changes.is_empty()
337    }
338
339    /// Get the number of changed fields
340    pub fn len(&self) -> usize {
341        self.changes.len()
342    }
343
344    /// Iterate over all changes
345    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
346        self.changes.iter()
347    }
348
349    /// Get a set of all dirty field paths (for backward compatibility)
350    pub fn dirty_paths(&self) -> HashSet<String> {
351        self.changes.keys().cloned().collect()
352    }
353
354    /// Consume the tracker and return the changes map
355    pub fn into_changes(self) -> HashMap<String, FieldChange> {
356        self.changes
357    }
358
359    /// Get a reference to the changes map
360    pub fn changes(&self) -> &HashMap<String, FieldChange> {
361        &self.changes
362    }
363
364    /// Get paths that were appended (not replaced)
365    pub fn appended_paths(&self) -> Vec<String> {
366        self.changes
367            .iter()
368            .filter_map(|(path, change)| match change {
369                FieldChange::Appended(_) => Some(path.clone()),
370                FieldChange::Replaced => None,
371            })
372            .collect()
373    }
374}
375
376pub struct VmContext {
377    registers: Vec<RegisterValue>,
378    states: HashMap<u32, StateTable>,
379    debugger: Option<Arc<dyn VmDebugger>>,
380    pub instructions_executed: u64,
381    pub cache_hits: u64,
382    path_cache: HashMap<String, CompiledPath>,
383    pub pda_cache_hits: u64,
384    pub pda_cache_misses: u64,
385    pub pending_queue_size: u64,
386    resolver_requests: VecDeque<ResolverRequest>,
387    resolver_pending: HashMap<String, PendingResolverEntry>,
388    resolver_cache: LruCache<String, ResolverCacheEntry>,
389    pub resolver_cache_hits: u64,
390    pub resolver_cache_misses: u64,
391    current_context: Option<UpdateContext>,
392    warnings: Vec<String>,
393    last_pda_lookup_miss: Option<String>,
394    last_lookup_index_miss: Option<String>,
395    last_pda_registered: Option<String>,
396    last_lookup_index_keys: Vec<String>,
397    pending_pda_reprocess_updates: Vec<PendingAccountUpdate>,
398    scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
399}
400
401#[derive(Debug)]
402pub struct LookupIndex {
403    index: std::sync::Mutex<LruCache<String, Value>>,
404}
405
406impl LookupIndex {
407    pub fn new() -> Self {
408        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
409    }
410
411    pub fn with_capacity(capacity: usize) -> Self {
412        LookupIndex {
413            index: std::sync::Mutex::new(LruCache::new(
414                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
415            )),
416        }
417    }
418
419    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
420        let key = value_to_cache_key(lookup_value);
421        self.index.lock().unwrap().get(&key).cloned()
422    }
423
424    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
425        let key = value_to_cache_key(&lookup_value);
426        self.index.lock().unwrap().put(key, primary_key);
427    }
428
429    /// Remove an entry by lookup value.  Used when a PDA mapping changes to
430    /// clear stale entries that would otherwise shadow the updated PDA mapping.
431    pub fn remove(&self, lookup_value: &Value) {
432        let key = value_to_cache_key(lookup_value);
433        self.index.lock().unwrap().pop(&key);
434    }
435
436    pub fn len(&self) -> usize {
437        self.index.lock().unwrap().len()
438    }
439
440    pub fn is_empty(&self) -> bool {
441        self.index.lock().unwrap().is_empty()
442    }
443}
444
445impl Default for LookupIndex {
446    fn default() -> Self {
447        Self::new()
448    }
449}
450
451fn value_to_cache_key(value: &Value) -> String {
452    match value {
453        Value::String(s) => s.clone(),
454        Value::Number(n) => n.to_string(),
455        Value::Bool(b) => b.to_string(),
456        Value::Null => "null".to_string(),
457        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
458    }
459}
460
461pub(crate) fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
462    match resolver {
463        ResolverType::Token => format!("token:{}", value_to_cache_key(input)),
464        ResolverType::Url(config) => {
465            let method = match config.method {
466                ast::HttpMethod::Get => "get",
467                ast::HttpMethod::Post => "post",
468            };
469            format!("url:{}:{}", method, value_to_cache_key(input))
470        }
471    }
472}
473
474#[derive(Debug)]
475pub struct TemporalIndex {
476    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
477}
478
479impl Default for TemporalIndex {
480    fn default() -> Self {
481        Self::new()
482    }
483}
484
485impl TemporalIndex {
486    pub fn new() -> Self {
487        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
488    }
489
490    pub fn with_capacity(capacity: usize) -> Self {
491        TemporalIndex {
492            index: std::sync::Mutex::new(LruCache::new(
493                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
494            )),
495        }
496    }
497
498    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
499        let key = value_to_cache_key(lookup_value);
500        let mut cache = self.index.lock().unwrap();
501        if let Some(entries) = cache.get(&key) {
502            for i in (0..entries.len()).rev() {
503                if entries[i].1 <= timestamp {
504                    return Some(entries[i].0.clone());
505                }
506            }
507        }
508        None
509    }
510
511    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
512        let key = value_to_cache_key(lookup_value);
513        let mut cache = self.index.lock().unwrap();
514        if let Some(entries) = cache.get(&key) {
515            if let Some(last) = entries.last() {
516                return Some(last.0.clone());
517            }
518        }
519        None
520    }
521
522    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
523        let key = value_to_cache_key(&lookup_value);
524        let mut cache = self.index.lock().unwrap();
525
526        let entries = cache.get_or_insert_mut(key, Vec::new);
527        entries.push((primary_key, timestamp));
528        entries.sort_by_key(|(_, ts)| *ts);
529
530        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
531        entries.retain(|(_, ts)| *ts >= cutoff);
532
533        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
534            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
535            entries.drain(0..excess);
536        }
537    }
538
539    pub fn len(&self) -> usize {
540        self.index.lock().unwrap().len()
541    }
542
543    pub fn is_empty(&self) -> bool {
544        self.index.lock().unwrap().is_empty()
545    }
546
547    pub fn total_entries(&self) -> usize {
548        self.index
549            .lock()
550            .unwrap()
551            .iter()
552            .map(|(_, entries)| entries.len())
553            .sum()
554    }
555
556    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
557        let mut cache = self.index.lock().unwrap();
558        let mut total_removed = 0;
559
560        for (_, entries) in cache.iter_mut() {
561            let original_len = entries.len();
562            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
563            total_removed += original_len - entries.len();
564        }
565
566        total_removed
567    }
568}
569
570#[derive(Debug)]
571pub struct PdaReverseLookup {
572    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
573    index: LruCache<String, String>,
574}
575
576impl PdaReverseLookup {
577    pub fn new(capacity: usize) -> Self {
578        PdaReverseLookup {
579            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
580        }
581    }
582
583    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
584        self.index.get(pda_address).cloned()
585    }
586
587    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
588        let evicted = if self.index.len() >= self.index.cap().get() {
589            self.index.peek_lru().map(|(k, _)| k.clone())
590        } else {
591            None
592        };
593
594        self.index.put(pda_address, seed_value);
595        evicted
596    }
597
598    pub fn len(&self) -> usize {
599        self.index.len()
600    }
601
602    pub fn is_empty(&self) -> bool {
603        self.index.is_empty()
604    }
605
606    pub fn contains(&self, pda_address: &str) -> bool {
607        self.index.peek(pda_address).is_some()
608    }
609}
610
611/// Input for queueing an account update.
612#[derive(Debug, Clone)]
613pub struct QueuedAccountUpdate {
614    pub pda_address: String,
615    pub account_type: String,
616    pub account_data: Value,
617    pub slot: u64,
618    pub write_version: u64,
619    pub signature: String,
620}
621
622/// Internal representation of a pending account update with queue metadata.
623#[derive(Debug, Clone)]
624pub struct PendingAccountUpdate {
625    pub account_type: String,
626    pub pda_address: String,
627    pub account_data: Value,
628    pub slot: u64,
629    pub write_version: u64,
630    pub signature: String,
631    pub queued_at: i64,
632    /// When true, this update was pulled from the `last_account_data` cache
633    /// during a PDA mapping change. It carries stale data from a previous
634    /// entity mapping and should use `UpdateContext::new_reprocessed` to
635    /// prevent `when` guards from matching stale instruction signatures
636    /// and to skip resolver scheduling.
637    pub is_stale_reprocess: bool,
638}
639
640/// Input for queueing an instruction event when PDA lookup fails.
641#[derive(Debug, Clone)]
642pub struct QueuedInstructionEvent {
643    pub pda_address: String,
644    pub event_type: String,
645    pub event_data: Value,
646    pub slot: u64,
647    pub signature: String,
648}
649
650/// Internal representation of a pending instruction event with queue metadata.
651#[derive(Debug, Clone)]
652pub struct PendingInstructionEvent {
653    pub event_type: String,
654    pub pda_address: String,
655    pub event_data: Value,
656    pub slot: u64,
657    pub signature: String,
658    pub queued_at: i64,
659}
660
661#[derive(Debug, Clone)]
662pub struct DeferredWhenOperation {
663    pub entity_name: String,
664    pub primary_key: Value,
665    pub field_path: String,
666    pub field_value: Value,
667    pub when_instruction: String,
668    pub signature: String,
669    pub slot: u64,
670    pub deferred_at: i64,
671    pub emit: bool,
672}
673
674#[derive(Debug, Clone)]
675pub struct ResolverRequest {
676    pub cache_key: String,
677    pub resolver: ResolverType,
678    pub input: Value,
679}
680
681#[derive(Debug, Clone)]
682pub struct ResolverTarget {
683    pub state_id: u32,
684    pub entity_name: String,
685    pub primary_key: Value,
686    pub extracts: Vec<ResolverExtractSpec>,
687}
688
689#[derive(Debug, Clone)]
690pub struct PendingResolverEntry {
691    pub resolver: ResolverType,
692    pub input: Value,
693    pub targets: Vec<ResolverTarget>,
694    pub queued_at: i64,
695}
696
697#[derive(Debug, Clone, PartialEq)]
698pub struct ScheduledCallback {
699    pub state_id: u32,
700    pub entity_name: String,
701    pub primary_key: Value,
702    pub resolver: ResolverType,
703    pub url_template: Option<Vec<ast::UrlTemplatePart>>,
704    pub input_value: Option<Value>,
705    pub input_path: Option<String>,
706    pub condition: Option<ast::ResolverCondition>,
707    pub strategy: ResolveStrategy,
708    pub extracts: Vec<ResolverExtractSpec>,
709    pub retry_count: u32,
710}
711
712impl PendingResolverEntry {
713    fn add_target(&mut self, target: ResolverTarget) {
714        if let Some(existing) = self.targets.iter_mut().find(|t| {
715            t.state_id == target.state_id
716                && t.entity_name == target.entity_name
717                && t.primary_key == target.primary_key
718        }) {
719            let mut seen = HashSet::new();
720            for extract in &existing.extracts {
721                seen.insert((extract.target_path.clone(), extract.source_path.clone()));
722            }
723
724            for extract in target.extracts {
725                let key = (extract.target_path.clone(), extract.source_path.clone());
726                if seen.insert(key) {
727                    existing.extracts.push(extract);
728                }
729            }
730        } else {
731            self.targets.push(target);
732        }
733    }
734}
735
736#[derive(Debug, Clone)]
737pub struct PendingQueueStats {
738    pub total_updates: usize,
739    pub unique_pdas: usize,
740    pub oldest_age_seconds: i64,
741    pub largest_pda_queue_size: usize,
742    pub estimated_memory_bytes: usize,
743}
744
745#[derive(Debug, Clone, Default)]
746pub struct VmMemoryStats {
747    pub state_table_entity_count: usize,
748    pub state_table_max_entries: usize,
749    pub state_table_at_capacity: bool,
750    pub lookup_index_count: usize,
751    pub lookup_index_total_entries: usize,
752    pub temporal_index_count: usize,
753    pub temporal_index_total_entries: usize,
754    pub pda_reverse_lookup_count: usize,
755    pub pda_reverse_lookup_total_entries: usize,
756    pub version_tracker_entries: usize,
757    pub pending_queue_stats: Option<PendingQueueStats>,
758    pub path_cache_size: usize,
759}
760
761#[derive(Debug, Clone, Default)]
762pub struct CleanupResult {
763    pub pending_updates_removed: usize,
764    pub temporal_entries_removed: usize,
765}
766
767#[derive(Debug, Clone)]
768pub struct CapacityWarning {
769    pub current_entries: usize,
770    pub max_entries: usize,
771    pub entries_over_limit: usize,
772}
773
774#[derive(Debug, Clone)]
775pub struct StateTableConfig {
776    pub max_entries: usize,
777    pub max_array_length: usize,
778}
779
780impl Default for StateTableConfig {
781    fn default() -> Self {
782        Self {
783            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
784            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
785        }
786    }
787}
788
789#[derive(Debug)]
790pub struct VersionTracker {
791    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
792}
793
794impl VersionTracker {
795    pub fn new() -> Self {
796        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
797    }
798
799    pub fn with_capacity(capacity: usize) -> Self {
800        VersionTracker {
801            cache: std::sync::Mutex::new(LruCache::new(
802                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
803            )),
804        }
805    }
806
807    fn make_key(primary_key: &Value, event_type: &str) -> String {
808        format!("{}:{}", primary_key, event_type)
809    }
810
811    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
812        let key = Self::make_key(primary_key, event_type);
813        self.cache.lock().unwrap().get(&key).copied()
814    }
815
816    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
817        let key = Self::make_key(primary_key, event_type);
818        self.cache.lock().unwrap().put(key, (slot, ordering_value));
819    }
820
821    pub fn len(&self) -> usize {
822        self.cache.lock().unwrap().len()
823    }
824
825    pub fn is_empty(&self) -> bool {
826        self.cache.lock().unwrap().is_empty()
827    }
828}
829
830impl Default for VersionTracker {
831    fn default() -> Self {
832        Self::new()
833    }
834}
835
836#[derive(Debug)]
837pub struct StateTable {
838    pub data: DashMap<Value, Value>,
839    access_times: DashMap<Value, i64>,
840    pub lookup_indexes: HashMap<String, LookupIndex>,
841    pub temporal_indexes: HashMap<String, TemporalIndex>,
842    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
843    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
844    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
845    /// Cache of the most recent account data per PDA address.  When a PDA
846    /// mapping changes (same PDA, different seed) the cached data is returned
847    /// for reprocessing so cross-account Lookup handlers resolve to the new key.
848    pub last_account_data: DashMap<String, PendingAccountUpdate>,
849    version_tracker: VersionTracker,
850    instruction_dedup_cache: VersionTracker,
851    config: StateTableConfig,
852    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
853    entity_name: String,
854    pub recent_tx_instructions:
855        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
856    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
857}
858
859impl StateTable {
860    pub fn is_at_capacity(&self) -> bool {
861        self.data.len() >= self.config.max_entries
862    }
863
864    pub fn entries_over_limit(&self) -> usize {
865        self.data.len().saturating_sub(self.config.max_entries)
866    }
867
868    pub fn max_array_length(&self) -> usize {
869        self.config.max_array_length
870    }
871
872    fn touch(&self, key: &Value) {
873        let now = std::time::SystemTime::now()
874            .duration_since(std::time::UNIX_EPOCH)
875            .unwrap()
876            .as_secs() as i64;
877        self.access_times.insert(key.clone(), now);
878    }
879
880    fn evict_lru(&self, count: usize) -> usize {
881        if count == 0 || self.data.is_empty() {
882            return 0;
883        }
884
885        let mut entries: Vec<(Value, i64)> = self
886            .access_times
887            .iter()
888            .map(|entry| (entry.key().clone(), *entry.value()))
889            .collect();
890
891        entries.sort_by_key(|(_, ts)| *ts);
892
893        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
894
895        let mut evicted = 0;
896        for key in to_evict {
897            self.data.remove(&key);
898            self.access_times.remove(&key);
899            evicted += 1;
900        }
901
902        #[cfg(feature = "otel")]
903        if evicted > 0 {
904            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
905        }
906
907        evicted
908    }
909
910    pub fn insert_with_eviction(&self, key: Value, value: Value) {
911        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
912            #[cfg(feature = "otel")]
913            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
914            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
915            self.evict_lru(to_evict.max(1));
916        }
917        self.data.insert(key.clone(), value);
918        self.touch(&key);
919    }
920
921    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
922        let result = self.data.get(key).map(|v| v.clone());
923        if result.is_some() {
924            self.touch(key);
925        }
926        result
927    }
928
929    /// Check if an update is fresh and update the version tracker.
930    /// Returns true if the update should be processed (is fresh).
931    /// Returns false if the update is stale and should be skipped.
932    ///
933    /// Comparison is lexicographic on (slot, ordering_value):
934    /// (100, 5) > (100, 3) > (99, 999)
935    pub fn is_fresh_update(
936        &self,
937        primary_key: &Value,
938        event_type: &str,
939        slot: u64,
940        ordering_value: u64,
941    ) -> bool {
942        let dominated = self
943            .version_tracker
944            .get(primary_key, event_type)
945            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
946            .unwrap_or(false);
947
948        if dominated {
949            return false;
950        }
951
952        self.version_tracker
953            .insert(primary_key, event_type, slot, ordering_value);
954        true
955    }
956
957    /// Check if an instruction is a duplicate of one we've seen recently.
958    /// Returns true if this exact instruction has been seen before (is a duplicate).
959    /// Returns false if this is a new instruction that should be processed.
960    ///
961    /// Unlike account updates, instructions don't use recency checks - all
962    /// unique instructions are processed. Only exact duplicates are skipped.
963    /// Uses a smaller cache capacity for shorter effective TTL.
964    pub fn is_duplicate_instruction(
965        &self,
966        primary_key: &Value,
967        event_type: &str,
968        slot: u64,
969        txn_index: u64,
970    ) -> bool {
971        // Check if we've seen this exact instruction before
972        let is_duplicate = self
973            .instruction_dedup_cache
974            .get(primary_key, event_type)
975            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
976            .unwrap_or(false);
977
978        if is_duplicate {
979            return true;
980        }
981
982        // Record this instruction for deduplication
983        self.instruction_dedup_cache
984            .insert(primary_key, event_type, slot, txn_index);
985        false
986    }
987}
988
989impl VmContext {
990    pub fn new() -> Self {
991        let mut vm = VmContext {
992            registers: vec![Value::Null; 256],
993            states: HashMap::new(),
994            debugger: None,
995            instructions_executed: 0,
996            cache_hits: 0,
997            path_cache: HashMap::new(),
998            pda_cache_hits: 0,
999            pda_cache_misses: 0,
1000            pending_queue_size: 0,
1001            resolver_requests: VecDeque::new(),
1002            resolver_pending: HashMap::new(),
1003            resolver_cache: LruCache::new(resolver_cache_capacity()),
1004            resolver_cache_hits: 0,
1005            resolver_cache_misses: 0,
1006            current_context: None,
1007            warnings: Vec::new(),
1008            last_pda_lookup_miss: None,
1009            last_lookup_index_miss: None,
1010            last_pda_registered: None,
1011            last_lookup_index_keys: Vec::new(),
1012            pending_pda_reprocess_updates: Vec::new(),
1013            scheduled_callbacks: Vec::new(),
1014        };
1015        vm.states.insert(
1016            0,
1017            StateTable {
1018                data: DashMap::new(),
1019                access_times: DashMap::new(),
1020                lookup_indexes: HashMap::new(),
1021                temporal_indexes: HashMap::new(),
1022                pda_reverse_lookups: HashMap::new(),
1023                pending_updates: DashMap::new(),
1024                pending_instruction_events: DashMap::new(),
1025                last_account_data: DashMap::new(),
1026                version_tracker: VersionTracker::new(),
1027                instruction_dedup_cache: VersionTracker::with_capacity(
1028                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1029                ),
1030                config: StateTableConfig::default(),
1031                entity_name: String::new(),
1032                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1033                    NonZeroUsize::new(1000).unwrap(),
1034                )),
1035                deferred_when_ops: DashMap::new(),
1036            },
1037        );
1038
1039        vm
1040    }
1041
1042    pub fn set_debugger(&mut self, debugger: Arc<dyn VmDebugger>) {
1043        self.debugger = Some(debugger);
1044    }
1045
1046    pub fn clear_debugger(&mut self) {
1047        self.debugger = None;
1048    }
1049
1050    fn emit_debug<F>(&self, builder: F)
1051    where
1052        F: FnOnce() -> VmDebugEvent,
1053    {
1054        if let Some(debugger) = &self.debugger {
1055            debugger.record(builder());
1056        }
1057    }
1058
1059    fn is_debug_enabled(&self) -> bool {
1060        self.debugger.is_some()
1061    }
1062
1063    fn take_pending_pda_reprocess_updates(&mut self) -> Vec<PendingAccountUpdate> {
1064        std::mem::take(&mut self.pending_pda_reprocess_updates)
1065    }
1066
1067    /// Create a new VmContext specifically for multi-entity operation.
1068    pub fn new_multi_entity() -> Self {
1069        VmContext {
1070            registers: vec![Value::Null; 256],
1071            states: HashMap::new(),
1072            debugger: None,
1073            instructions_executed: 0,
1074            cache_hits: 0,
1075            path_cache: HashMap::new(),
1076            pda_cache_hits: 0,
1077            pda_cache_misses: 0,
1078            pending_queue_size: 0,
1079            resolver_requests: VecDeque::new(),
1080            resolver_pending: HashMap::new(),
1081            resolver_cache: LruCache::new(resolver_cache_capacity()),
1082            resolver_cache_hits: 0,
1083            resolver_cache_misses: 0,
1084            current_context: None,
1085            warnings: Vec::new(),
1086            last_pda_lookup_miss: None,
1087            last_lookup_index_miss: None,
1088            last_pda_registered: None,
1089            last_lookup_index_keys: Vec::new(),
1090            pending_pda_reprocess_updates: Vec::new(),
1091            scheduled_callbacks: Vec::new(),
1092        }
1093    }
1094
1095    pub fn new_with_config(state_config: StateTableConfig) -> Self {
1096        let mut vm = VmContext {
1097            registers: vec![Value::Null; 256],
1098            states: HashMap::new(),
1099            debugger: None,
1100            instructions_executed: 0,
1101            cache_hits: 0,
1102            path_cache: HashMap::new(),
1103            pda_cache_hits: 0,
1104            pda_cache_misses: 0,
1105            pending_queue_size: 0,
1106            resolver_requests: VecDeque::new(),
1107            resolver_pending: HashMap::new(),
1108            resolver_cache: LruCache::new(resolver_cache_capacity()),
1109            resolver_cache_hits: 0,
1110            resolver_cache_misses: 0,
1111            current_context: None,
1112            warnings: Vec::new(),
1113            last_pda_lookup_miss: None,
1114            last_lookup_index_miss: None,
1115            last_pda_registered: None,
1116            last_lookup_index_keys: Vec::new(),
1117            pending_pda_reprocess_updates: Vec::new(),
1118            scheduled_callbacks: Vec::new(),
1119        };
1120        vm.states.insert(
1121            0,
1122            StateTable {
1123                data: DashMap::new(),
1124                access_times: DashMap::new(),
1125                lookup_indexes: HashMap::new(),
1126                temporal_indexes: HashMap::new(),
1127                pda_reverse_lookups: HashMap::new(),
1128                pending_updates: DashMap::new(),
1129                pending_instruction_events: DashMap::new(),
1130                last_account_data: DashMap::new(),
1131                version_tracker: VersionTracker::new(),
1132                instruction_dedup_cache: VersionTracker::with_capacity(
1133                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1134                ),
1135                config: state_config,
1136                entity_name: "default".to_string(),
1137                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1138                    NonZeroUsize::new(1000).unwrap(),
1139                )),
1140                deferred_when_ops: DashMap::new(),
1141            },
1142        );
1143        vm
1144    }
1145
1146    pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1147        self.resolver_requests.drain(..).collect()
1148    }
1149
1150    pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1151        std::mem::take(&mut self.scheduled_callbacks)
1152    }
1153
1154    pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1155        self.states.get(&state_id)?.get_and_touch(key)
1156    }
1157
1158    pub fn snapshot_state_table(&self, state_id: u32) -> Vec<(Value, Value)> {
1159        self.states
1160            .get(&state_id)
1161            .map(|state| {
1162                state
1163                    .data
1164                    .iter()
1165                    .map(|entry| (entry.key().clone(), entry.value().clone()))
1166                    .collect()
1167            })
1168            .unwrap_or_default()
1169    }
1170
1171    pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1172        if requests.is_empty() {
1173            return;
1174        }
1175
1176        self.resolver_requests.extend(requests);
1177    }
1178
1179    pub(crate) fn get_cached_resolver_value(&mut self, cache_key: &str) -> Option<Value> {
1180        let cached = self.resolver_cache.get(cache_key).cloned();
1181
1182        match cached {
1183            Some(entry) if entry.cached_at.elapsed() <= resolver_cache_ttl() => {
1184                self.resolver_cache_hits += 1;
1185                Some(entry.value)
1186            }
1187            Some(_) => {
1188                self.resolver_cache.pop(cache_key);
1189                self.resolver_cache_misses += 1;
1190                None
1191            }
1192            None => {
1193                self.resolver_cache_misses += 1;
1194                None
1195            }
1196        }
1197    }
1198
1199    fn cache_resolver_value(
1200        &mut self,
1201        resolver: &ResolverType,
1202        input: &Value,
1203        resolved_value: &Value,
1204    ) {
1205        self.resolver_cache.put(
1206            resolver_cache_key(resolver, input),
1207            ResolverCacheEntry {
1208                value: resolved_value.clone(),
1209                cached_at: Instant::now(),
1210            },
1211        );
1212    }
1213
1214    pub fn apply_resolver_result(
1215        &mut self,
1216        bytecode: &MultiEntityBytecode,
1217        cache_key: &str,
1218        resolved_value: Value,
1219    ) -> Result<Vec<Mutation>> {
1220        let entry = match self.resolver_pending.remove(cache_key) {
1221            Some(entry) => entry,
1222            None => return Ok(Vec::new()),
1223        };
1224
1225        self.cache_resolver_value(&entry.resolver, &entry.input, &resolved_value);
1226
1227        let mut mutations = Vec::new();
1228
1229        for target in entry.targets {
1230            if target.primary_key.is_null() {
1231                continue;
1232            }
1233
1234            let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1235                Some(bc) => bc,
1236                None => continue,
1237            };
1238
1239            let state = match self.states.get(&target.state_id) {
1240                Some(s) => s,
1241                None => continue,
1242            };
1243
1244            let mut entity_state = state
1245                .get_and_touch(&target.primary_key)
1246                .unwrap_or_else(|| json!({}));
1247
1248            let mut dirty_tracker = DirtyTracker::new();
1249            let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1250
1251            Self::apply_resolver_extractions_to_value(
1252                &mut entity_state,
1253                &resolved_value,
1254                &target.extracts,
1255                &mut dirty_tracker,
1256                &should_emit,
1257            )?;
1258
1259            if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1260                let old_values: Vec<_> = entity_bytecode
1261                    .computed_paths
1262                    .iter()
1263                    .map(|path| Self::get_value_at_path(&entity_state, path))
1264                    .collect();
1265
1266                let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1267                let context_timestamp = self
1268                    .current_context
1269                    .as_ref()
1270                    .map(|c| c.timestamp())
1271                    .unwrap_or_else(|| {
1272                        std::time::SystemTime::now()
1273                            .duration_since(std::time::UNIX_EPOCH)
1274                            .unwrap()
1275                            .as_secs() as i64
1276                    });
1277                let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1278
1279                if eval_result.is_ok() {
1280                    let mut changed_fields = Vec::new();
1281                    for (path, old_value) in
1282                        entity_bytecode.computed_paths.iter().zip(old_values.iter())
1283                    {
1284                        let new_value = Self::get_value_at_path(&entity_state, path);
1285                        let changed = new_value != *old_value;
1286                        let will_emit = should_emit(path);
1287
1288                        if changed && will_emit {
1289                            dirty_tracker.mark_replaced(path);
1290                            changed_fields.push(path.clone());
1291                        }
1292                    }
1293                }
1294            }
1295
1296            state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1297
1298            if dirty_tracker.is_empty() {
1299                continue;
1300            }
1301
1302            let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1303
1304            mutations.push(Mutation {
1305                export: target.entity_name.clone(),
1306                key: target.primary_key.clone(),
1307                patch,
1308                append: vec![],
1309            });
1310        }
1311
1312        Ok(mutations)
1313    }
1314
1315    pub fn enqueue_resolver_request(
1316        &mut self,
1317        _cache_key: String,
1318        resolver: ResolverType,
1319        input: Value,
1320        target: ResolverTarget,
1321    ) {
1322        let cache_key = resolver_cache_key(&resolver, &input);
1323
1324        if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1325            entry.add_target(target);
1326            return;
1327        }
1328
1329        let queued_at = std::time::SystemTime::now()
1330            .duration_since(std::time::UNIX_EPOCH)
1331            .unwrap()
1332            .as_secs() as i64;
1333
1334        self.resolver_pending.insert(
1335            cache_key.clone(),
1336            PendingResolverEntry {
1337                resolver: resolver.clone(),
1338                input: input.clone(),
1339                targets: vec![target],
1340                queued_at,
1341            },
1342        );
1343
1344        self.resolver_requests.push_back(ResolverRequest {
1345            cache_key,
1346            resolver,
1347            input,
1348        });
1349    }
1350
1351    fn apply_resolver_extractions_to_value<F>(
1352        state: &mut Value,
1353        resolved_value: &Value,
1354        extracts: &[ResolverExtractSpec],
1355        dirty_tracker: &mut DirtyTracker,
1356        should_emit: &F,
1357    ) -> Result<()>
1358    where
1359        F: Fn(&str) -> bool,
1360    {
1361        for extract in extracts {
1362            let resolved = match extract.source_path.as_deref() {
1363                Some(path) => Self::get_value_at_path(resolved_value, path),
1364                None => Some(resolved_value.clone()),
1365            };
1366
1367            let Some(value) = resolved else {
1368                continue;
1369            };
1370
1371            let value = if let Some(transform) = &extract.transform {
1372                Self::apply_transformation(&value, transform)?
1373            } else {
1374                value
1375            };
1376
1377            Self::set_nested_field_value(state, &extract.target_path, value)?;
1378            if should_emit(&extract.target_path) {
1379                dirty_tracker.mark_replaced(&extract.target_path);
1380            }
1381        }
1382
1383        Ok(())
1384    }
1385
1386    fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1387        if tracker.is_empty() {
1388            return Ok(json!({}));
1389        }
1390
1391        let mut partial = serde_json::Map::new();
1392
1393        for (path, change) in tracker.iter() {
1394            let segments: Vec<&str> = path.split('.').collect();
1395
1396            let value_to_insert = match change {
1397                FieldChange::Replaced => {
1398                    let mut current = state;
1399                    let mut found = true;
1400
1401                    for segment in &segments {
1402                        match current.get(*segment) {
1403                            Some(v) => current = v,
1404                            None => {
1405                                found = false;
1406                                break;
1407                            }
1408                        }
1409                    }
1410
1411                    if !found {
1412                        continue;
1413                    }
1414                    current.clone()
1415                }
1416                FieldChange::Appended(values) => Value::Array(values.clone()),
1417            };
1418
1419            let mut target = &mut partial;
1420            for (i, segment) in segments.iter().enumerate() {
1421                if i == segments.len() - 1 {
1422                    target.insert(segment.to_string(), value_to_insert.clone());
1423                } else {
1424                    target
1425                        .entry(segment.to_string())
1426                        .or_insert_with(|| json!({}));
1427                    target = target
1428                        .get_mut(*segment)
1429                        .and_then(|v| v.as_object_mut())
1430                        .ok_or("Failed to build nested structure")?;
1431                }
1432            }
1433        }
1434
1435        Ok(Value::Object(partial))
1436    }
1437
1438    /// Get a mutable reference to a state table by ID
1439    /// Returns None if the state ID doesn't exist
1440    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1441        self.states.get_mut(&state_id)
1442    }
1443
1444    /// Get public access to registers (for metrics context)
1445    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1446        &mut self.registers
1447    }
1448
1449    /// Get public access to path cache (for metrics context)
1450    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1451        &self.path_cache
1452    }
1453
1454    /// Get the current update context
1455    pub fn current_context(&self) -> Option<&UpdateContext> {
1456        self.current_context.as_ref()
1457    }
1458
1459    /// Set the current update context for computed field evaluation
1460    pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1461        self.current_context = context;
1462    }
1463
1464    fn add_warning(&mut self, msg: String) {
1465        self.warnings.push(msg);
1466    }
1467
1468    pub fn take_warnings(&mut self) -> Vec<String> {
1469        std::mem::take(&mut self.warnings)
1470    }
1471
1472    pub fn has_warnings(&self) -> bool {
1473        !self.warnings.is_empty()
1474    }
1475
1476    pub fn update_state_from_register(
1477        &mut self,
1478        state_id: u32,
1479        key: Value,
1480        register: Register,
1481    ) -> Result<()> {
1482        let state = self.states.get(&state_id).ok_or("State table not found")?;
1483        let value = self.registers[register].clone();
1484        state.insert_with_eviction(key, value);
1485        Ok(())
1486    }
1487
1488    fn reset_registers(&mut self) {
1489        for reg in &mut self.registers {
1490            *reg = Value::Null;
1491        }
1492    }
1493
1494    /// Extract only the dirty fields from state (public for use by instruction hooks)
1495    pub fn extract_partial_state(
1496        &self,
1497        state_reg: Register,
1498        dirty_fields: &HashSet<String>,
1499    ) -> Result<Value> {
1500        let full_state = &self.registers[state_reg];
1501
1502        if dirty_fields.is_empty() {
1503            return Ok(json!({}));
1504        }
1505
1506        let mut partial = serde_json::Map::new();
1507
1508        for path in dirty_fields {
1509            let segments: Vec<&str> = path.split('.').collect();
1510
1511            let mut current = full_state;
1512            let mut found = true;
1513
1514            for segment in &segments {
1515                match current.get(segment) {
1516                    Some(v) => current = v,
1517                    None => {
1518                        found = false;
1519                        break;
1520                    }
1521                }
1522            }
1523
1524            if !found {
1525                continue;
1526            }
1527
1528            let mut target = &mut partial;
1529            for (i, segment) in segments.iter().enumerate() {
1530                if i == segments.len() - 1 {
1531                    target.insert(segment.to_string(), current.clone());
1532                } else {
1533                    target
1534                        .entry(segment.to_string())
1535                        .or_insert_with(|| json!({}));
1536                    target = target
1537                        .get_mut(*segment)
1538                        .and_then(|v| v.as_object_mut())
1539                        .ok_or("Failed to build nested structure")?;
1540                }
1541            }
1542        }
1543
1544        Ok(Value::Object(partial))
1545    }
1546
1547    /// Extract a patch from state based on the DirtyTracker.
1548    /// For Replaced fields: extracts the full value from state.
1549    /// For Appended fields: emits only the appended values as an array.
1550    pub fn extract_partial_state_with_tracker(
1551        &self,
1552        state_reg: Register,
1553        tracker: &DirtyTracker,
1554    ) -> Result<Value> {
1555        let full_state = &self.registers[state_reg];
1556
1557        if tracker.is_empty() {
1558            return Ok(json!({}));
1559        }
1560
1561        let mut partial = serde_json::Map::new();
1562
1563        for (path, change) in tracker.iter() {
1564            let segments: Vec<&str> = path.split('.').collect();
1565
1566            let value_to_insert = match change {
1567                FieldChange::Replaced => {
1568                    let mut current = full_state;
1569                    let mut found = true;
1570
1571                    for segment in &segments {
1572                        match current.get(*segment) {
1573                            Some(v) => current = v,
1574                            None => {
1575                                found = false;
1576                                break;
1577                            }
1578                        }
1579                    }
1580
1581                    if !found {
1582                        continue;
1583                    }
1584                    current.clone()
1585                }
1586                FieldChange::Appended(values) => Value::Array(values.clone()),
1587            };
1588
1589            let mut target = &mut partial;
1590            for (i, segment) in segments.iter().enumerate() {
1591                if i == segments.len() - 1 {
1592                    target.insert(segment.to_string(), value_to_insert.clone());
1593                } else {
1594                    target
1595                        .entry(segment.to_string())
1596                        .or_insert_with(|| json!({}));
1597                    target = target
1598                        .get_mut(*segment)
1599                        .and_then(|v| v.as_object_mut())
1600                        .ok_or("Failed to build nested structure")?;
1601                }
1602            }
1603        }
1604
1605        Ok(Value::Object(partial))
1606    }
1607
1608    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1609        if let Some(compiled) = self.path_cache.get(path) {
1610            self.cache_hits += 1;
1611            #[cfg(feature = "otel")]
1612            crate::vm_metrics::record_path_cache_hit();
1613            return compiled.clone();
1614        }
1615        #[cfg(feature = "otel")]
1616        crate::vm_metrics::record_path_cache_miss();
1617        let compiled = CompiledPath::new(path);
1618        self.path_cache.insert(path.to_string(), compiled.clone());
1619        compiled
1620    }
1621
1622    /// Process an event with optional context metadata
1623    #[cfg_attr(feature = "otel", instrument(
1624        name = "vm.process_event",
1625        skip(self, bytecode, event_value, log),
1626        level = "info",
1627        fields(
1628            event_type = %event_type,
1629            slot = context.as_ref().and_then(|c| c.slot),
1630        )
1631    ))]
1632    pub fn process_event(
1633        &mut self,
1634        bytecode: &MultiEntityBytecode,
1635        event_value: Value,
1636        event_type: &str,
1637        context: Option<&UpdateContext>,
1638        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1639    ) -> Result<Vec<Mutation>> {
1640        self.current_context = context.cloned();
1641        self.emit_debug(|| VmDebugEvent::ProcessEventStart {
1642            event_type: event_type.to_string(),
1643            context: context.map(|ctx| ctx.to_value()),
1644        });
1645
1646        let mut event_value = event_value;
1647        if let Some(ctx) = context {
1648            if let Some(obj) = event_value.as_object_mut() {
1649                obj.insert("__update_context".to_string(), ctx.to_value());
1650            }
1651        }
1652
1653        let mut all_mutations = Vec::new();
1654
1655        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1656            if let Some(ctx) = context {
1657                if let Some(signature) = ctx.signature.clone() {
1658                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1659                    for state_id in state_ids {
1660                        if let Some(state) = self.states.get(&state_id) {
1661                            {
1662                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1663                                let entry =
1664                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1665                                entry.insert(event_type.to_string());
1666                            }
1667
1668                            let key = (signature.clone(), event_type.to_string());
1669                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1670                                tracing::debug!(
1671                                    event_type = %event_type,
1672                                    signature = %signature,
1673                                    deferred_count = deferred_ops.len(),
1674                                    "flushing deferred when-ops"
1675                                );
1676                                for op in deferred_ops {
1677                                    // Look up the entity bytecode to get the computed fields evaluator
1678                                    let (evaluator, computed_paths) = bytecode
1679                                        .entities
1680                                        .get(&op.entity_name)
1681                                        .map(|eb| {
1682                                            (
1683                                                eb.computed_fields_evaluator.as_ref(),
1684                                                eb.computed_paths.as_slice(),
1685                                            )
1686                                        })
1687                                        .unwrap_or((None, &[]));
1688
1689                                    match self.apply_deferred_when_op(
1690                                        state_id,
1691                                        &op,
1692                                        evaluator,
1693                                        Some(computed_paths),
1694                                    ) {
1695                                        Ok(mutations) => all_mutations.extend(mutations),
1696                                        Err(e) => tracing::warn!(
1697                                            "Failed to apply deferred when-op: {}",
1698                                            e
1699                                        ),
1700                                    }
1701                                }
1702                            }
1703                        }
1704                    }
1705                }
1706            }
1707        }
1708
1709        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1710            for entity_name in entity_names {
1711                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1712                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1713                        self.emit_debug(|| VmDebugEvent::HandlerStart {
1714                            entity_name: entity_name.clone(),
1715                            event_type: event_type.to_string(),
1716                            state_id: entity_bytecode.state_id,
1717                        });
1718
1719                        if let Some(ref mut log) = log {
1720                            log.set("entity", entity_name.clone());
1721                            log.inc("handlers", 1);
1722                        }
1723
1724                        let opcodes_before = self.instructions_executed;
1725                        let cache_before = self.cache_hits;
1726                        let pda_hits_before = self.pda_cache_hits;
1727                        let pda_misses_before = self.pda_cache_misses;
1728
1729                        let mutations = self.execute_handler(
1730                            handler,
1731                            &event_value,
1732                            event_type,
1733                            entity_bytecode.state_id,
1734                            entity_name,
1735                            entity_bytecode.computed_fields_evaluator.as_ref(),
1736                            Some(&entity_bytecode.non_emitted_fields),
1737                        )?;
1738                        let direct_mutation_count = mutations.len();
1739
1740                        if let Some(ref mut log) = log {
1741                            log.inc(
1742                                "opcodes",
1743                                (self.instructions_executed - opcodes_before) as i64,
1744                            );
1745                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1746                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1747                            log.inc(
1748                                "pda_misses",
1749                                (self.pda_cache_misses - pda_misses_before) as i64,
1750                            );
1751                        }
1752
1753                        if mutations.is_empty() {
1754                            // CPI events (suffix "CpiEvent") are transaction-scoped like instructions
1755                            // (suffix "IxState") and should be queued the same way when PDA lookup fails.
1756                            let is_tx_event =
1757                                event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1758                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1759                                if is_tx_event {
1760                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1761                                    let signature = context
1762                                        .and_then(|c| c.signature.clone())
1763                                        .unwrap_or_default();
1764                                    let _ = self.queue_instruction_event(
1765                                        entity_bytecode.state_id,
1766                                        QueuedInstructionEvent {
1767                                            pda_address: missed_pda.clone(),
1768                                            event_type: event_type.to_string(),
1769                                            event_data: event_value.clone(),
1770                                            slot,
1771                                            signature,
1772                                        },
1773                                    );
1774                                    self.emit_debug(|| VmDebugEvent::QueueAction {
1775                                        entity_name: entity_name.clone(),
1776                                        event_type: event_type.to_string(),
1777                                        queue_kind: "instruction_pda_lookup_miss".to_string(),
1778                                        lookup_value: missed_pda,
1779                                    });
1780                                } else {
1781                                    // Queue account updates (e.g. BondingCurve) when PDA
1782                                    // reverse lookup fails. These will be flushed when an
1783                                    // instruction registers the PDA mapping via
1784                                    // UpdateLookupIndex.
1785                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1786                                    let signature = context
1787                                        .and_then(|c| c.signature.clone())
1788                                        .unwrap_or_default();
1789                                    if let Some(write_version) =
1790                                        context.and_then(|c| c.write_version)
1791                                    {
1792                                        let _ = self.queue_account_update(
1793                                            entity_bytecode.state_id,
1794                                            QueuedAccountUpdate {
1795                                                pda_address: missed_pda.clone(),
1796                                                account_type: event_type.to_string(),
1797                                                account_data: event_value.clone(),
1798                                                slot,
1799                                                write_version,
1800                                                signature,
1801                                            },
1802                                        );
1803                                        self.emit_debug(|| VmDebugEvent::QueueAction {
1804                                            entity_name: entity_name.clone(),
1805                                            event_type: event_type.to_string(),
1806                                            queue_kind: "account_pda_lookup_miss".to_string(),
1807                                            lookup_value: missed_pda,
1808                                        });
1809                                    } else {
1810                                        tracing::warn!(
1811                                            event_type = %event_type,
1812                                            "Dropping queued account update: write_version missing from context"
1813                                        );
1814                                    }
1815                                }
1816                            }
1817                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1818                                if !is_tx_event {
1819                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1820                                    let signature = context
1821                                        .and_then(|c| c.signature.clone())
1822                                        .unwrap_or_default();
1823                                    if let Some(write_version) =
1824                                        context.and_then(|c| c.write_version)
1825                                    {
1826                                        let _ = self.queue_account_update(
1827                                            entity_bytecode.state_id,
1828                                            QueuedAccountUpdate {
1829                                                pda_address: missed_lookup.clone(),
1830                                                account_type: event_type.to_string(),
1831                                                account_data: event_value.clone(),
1832                                                slot,
1833                                                write_version,
1834                                                signature,
1835                                            },
1836                                        );
1837                                        self.emit_debug(|| VmDebugEvent::QueueAction {
1838                                            entity_name: entity_name.clone(),
1839                                            event_type: event_type.to_string(),
1840                                            queue_kind: "account_lookup_index_miss".to_string(),
1841                                            lookup_value: missed_lookup,
1842                                        });
1843                                    } else {
1844                                        tracing::trace!(
1845                                            event_type = %event_type,
1846                                            "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1847                                        );
1848                                    }
1849                                }
1850                            }
1851                        }
1852
1853                        all_mutations.extend(mutations);
1854
1855                        if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1856                            if let Some(ctx) = context {
1857                                if let Some(ref signature) = ctx.signature {
1858                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1859                                    {
1860                                        {
1861                                            let mut cache =
1862                                                state.recent_tx_instructions.lock().unwrap();
1863                                            let entry = cache
1864                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1865                                            entry.insert(event_type.to_string());
1866                                        }
1867
1868                                        let key = (signature.clone(), event_type.to_string());
1869                                        if let Some((_, deferred_ops)) =
1870                                            state.deferred_when_ops.remove(&key)
1871                                        {
1872                                            tracing::debug!(
1873                                                event_type = %event_type,
1874                                                signature = %signature,
1875                                                deferred_count = deferred_ops.len(),
1876                                                "flushing deferred when-ops"
1877                                            );
1878                                            for op in deferred_ops {
1879                                                match self.apply_deferred_when_op(
1880                                                    entity_bytecode.state_id,
1881                                                    &op,
1882                                                    entity_bytecode
1883                                                        .computed_fields_evaluator
1884                                                        .as_ref(),
1885                                                    Some(&entity_bytecode.computed_paths),
1886                                                ) {
1887                                                    Ok(mutations) => {
1888                                                        all_mutations.extend(mutations)
1889                                                    }
1890                                                    Err(e) => {
1891                                                        tracing::warn!(
1892                                                            "Failed to apply deferred when-op: {}",
1893                                                            e
1894                                                        );
1895                                                    }
1896                                                }
1897                                            }
1898                                        }
1899                                    }
1900                                }
1901                            }
1902                        }
1903
1904                        if let Some(registered_pda) = self.take_last_pda_registered() {
1905                            let pending_events = self.flush_pending_instruction_events(
1906                                entity_bytecode.state_id,
1907                                &registered_pda,
1908                            );
1909                            let pending_count = pending_events.len();
1910                            if pending_count > 0 {
1911                                self.emit_debug(|| VmDebugEvent::FlushAction {
1912                                    entity_name: entity_name.clone(),
1913                                    event_type: event_type.to_string(),
1914                                    flush_kind: "pending_instruction_events".to_string(),
1915                                    trigger: registered_pda.clone(),
1916                                    count: pending_count,
1917                                });
1918                            }
1919                            for pending in pending_events {
1920                                if let Some(pending_handler) =
1921                                    entity_bytecode.handlers.get(&pending.event_type)
1922                                {
1923                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1924                                        pending_handler,
1925                                        &pending.event_data,
1926                                        &pending.event_type,
1927                                        entity_bytecode.state_id,
1928                                        entity_name,
1929                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1930                                        Some(&entity_bytecode.non_emitted_fields),
1931                                    ) {
1932                                        all_mutations.extend(reprocessed_mutations);
1933                                    }
1934                                }
1935                            }
1936                        }
1937
1938                        for pending in self.take_pending_pda_reprocess_updates() {
1939                            if let Some(pending_handler) =
1940                                entity_bytecode.handlers.get(&pending.account_type)
1941                            {
1942                                self.current_context = Some(if pending.is_stale_reprocess {
1943                                    UpdateContext::new_reprocessed(
1944                                        pending.slot,
1945                                        pending.write_version,
1946                                    )
1947                                } else {
1948                                    UpdateContext::new_account(
1949                                        pending.slot,
1950                                        pending.signature.clone(),
1951                                        pending.write_version,
1952                                    )
1953                                });
1954                                match self.execute_handler(
1955                                    pending_handler,
1956                                    &pending.account_data,
1957                                    &pending.account_type,
1958                                    entity_bytecode.state_id,
1959                                    entity_name,
1960                                    entity_bytecode.computed_fields_evaluator.as_ref(),
1961                                    Some(&entity_bytecode.non_emitted_fields),
1962                                ) {
1963                                    Ok(reprocessed) => {
1964                                        all_mutations.extend(reprocessed);
1965                                    }
1966                                    Err(e) => {
1967                                        tracing::warn!(
1968                                            error = %e,
1969                                            account_type = %pending.account_type,
1970                                            "PDA remap reprocessing failed"
1971                                        );
1972                                    }
1973                                }
1974                            }
1975                        }
1976
1977                        let lookup_keys = self.take_last_lookup_index_keys();
1978                        if !lookup_keys.is_empty() {
1979                            tracing::info!(
1980                                keys = ?lookup_keys,
1981                                entity = %entity_name,
1982                                "vm.process_event: flushing pending updates for lookup_keys"
1983                            );
1984                        }
1985                        for lookup_key in lookup_keys {
1986                            if let Ok(pending_updates) =
1987                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1988                            {
1989                                let pending_count = pending_updates.len();
1990                                if pending_count > 0 {
1991                                    self.emit_debug(|| VmDebugEvent::FlushAction {
1992                                        entity_name: entity_name.clone(),
1993                                        event_type: event_type.to_string(),
1994                                        flush_kind: "pending_account_updates".to_string(),
1995                                        trigger: lookup_key.clone(),
1996                                        count: pending_count,
1997                                    });
1998                                }
1999                                for pending in pending_updates {
2000                                    if let Some(pending_handler) =
2001                                        entity_bytecode.handlers.get(&pending.account_type)
2002                                    {
2003                                        self.current_context = Some(UpdateContext::new_account(
2004                                            pending.slot,
2005                                            pending.signature.clone(),
2006                                            pending.write_version,
2007                                        ));
2008                                        match self.execute_handler(
2009                                            pending_handler,
2010                                            &pending.account_data,
2011                                            &pending.account_type,
2012                                            entity_bytecode.state_id,
2013                                            entity_name,
2014                                            entity_bytecode.computed_fields_evaluator.as_ref(),
2015                                            Some(&entity_bytecode.non_emitted_fields),
2016                                        ) {
2017                                            Ok(reprocessed) => {
2018                                                all_mutations.extend(reprocessed);
2019                                            }
2020                                            Err(e) => {
2021                                                tracing::warn!(
2022                                                    error = %e,
2023                                                    account_type = %pending.account_type,
2024                                                    "Flushed event reprocessing failed"
2025                                                );
2026                                            }
2027                                        }
2028                                    }
2029                                }
2030                            }
2031                        }
2032
2033                        self.emit_debug(|| VmDebugEvent::HandlerEnd {
2034                            entity_name: entity_name.clone(),
2035                            event_type: event_type.to_string(),
2036                            mutations: direct_mutation_count,
2037                        });
2038                    } else if let Some(ref mut log) = log {
2039                        log.set("skip_reason", "no_handler");
2040                    }
2041                } else if let Some(ref mut log) = log {
2042                    log.set("skip_reason", "entity_not_found");
2043                }
2044            }
2045        } else if let Some(ref mut log) = log {
2046            log.set("skip_reason", "no_event_routing");
2047        }
2048
2049        let debug_warnings = self.warnings.clone();
2050
2051        if let Some(log) = log {
2052            log.set("mutations", all_mutations.len() as i64);
2053            if let Some(first) = all_mutations.first() {
2054                if let Some(key_str) = first.key.as_str() {
2055                    log.set("primary_key", key_str);
2056                } else if let Some(key_num) = first.key.as_u64() {
2057                    log.set("primary_key", key_num as i64);
2058                }
2059            }
2060            if let Some(state) = self.states.get(&0) {
2061                log.set("state_table_size", state.data.len() as i64);
2062            }
2063
2064            let warnings = self.take_warnings();
2065            if !warnings.is_empty() {
2066                log.set("warnings", warnings.len() as i64);
2067                log.set(
2068                    "warning_messages",
2069                    Value::Array(warnings.into_iter().map(Value::String).collect()),
2070                );
2071                log.set_level(crate::canonical_log::LogLevel::Warn);
2072            }
2073        } else {
2074            self.warnings.clear();
2075        }
2076
2077        self.emit_debug(|| VmDebugEvent::ProcessEventEnd {
2078            event_type: event_type.to_string(),
2079            mutations: all_mutations.len(),
2080            warnings: debug_warnings,
2081        });
2082
2083        if self.instructions_executed.is_multiple_of(1000) {
2084            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
2085            for state_id in state_ids {
2086                let expired = self.cleanup_expired_when_ops(state_id, 60);
2087                if expired > 0 {
2088                    tracing::debug!(
2089                        "Cleaned up {} expired deferred when-ops for state {}",
2090                        expired,
2091                        state_id
2092                    );
2093                }
2094            }
2095        }
2096
2097        Ok(all_mutations)
2098    }
2099
2100    pub fn process_any(
2101        &mut self,
2102        bytecode: &MultiEntityBytecode,
2103        any: prost_types::Any,
2104    ) -> Result<Vec<Mutation>> {
2105        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
2106        self.process_event(bytecode, event_value, &event_type, None, None)
2107    }
2108
2109    #[cfg_attr(feature = "otel", instrument(
2110        name = "vm.execute_handler",
2111        skip(self, handler, event_value, entity_evaluator),
2112        level = "debug",
2113        fields(
2114            event_type = %event_type,
2115            handler_opcodes = handler.len(),
2116        )
2117    ))]
2118    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
2119    fn execute_handler(
2120        &mut self,
2121        handler: &[OpCode],
2122        event_value: &Value,
2123        event_type: &str,
2124        override_state_id: u32,
2125        entity_name: &str,
2126        entity_evaluator: Option<
2127            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
2128        >,
2129        non_emitted_fields: Option<&HashSet<String>>,
2130    ) -> Result<Vec<Mutation>> {
2131        self.reset_registers();
2132        self.last_pda_lookup_miss = None;
2133
2134        let mut pc: usize = 0;
2135        let mut output = Vec::new();
2136        let mut dirty_tracker = DirtyTracker::new();
2137        let should_emit = |path: &str| {
2138            non_emitted_fields
2139                .map(|fields| !fields.contains(path))
2140                .unwrap_or(true)
2141        };
2142
2143        while pc < handler.len() {
2144            match &handler[pc] {
2145                OpCode::LoadEventField {
2146                    path,
2147                    dest,
2148                    default,
2149                } => {
2150                    let value = self.load_field(event_value, path, default.as_ref())?;
2151                    if self.is_debug_enabled() {
2152                        self.emit_debug(|| VmDebugEvent::LoadEventField {
2153                            entity_name: entity_name.to_string(),
2154                            event_type: event_type.to_string(),
2155                            path: path.segments.clone(),
2156                            value: value.clone(),
2157                        });
2158                    }
2159                    self.registers[*dest] = value;
2160                    pc += 1;
2161                }
2162                OpCode::LoadConstant { value, dest } => {
2163                    self.registers[*dest] = value.clone();
2164                    pc += 1;
2165                }
2166                OpCode::CopyRegister { source, dest } => {
2167                    self.registers[*dest] = self.registers[*source].clone();
2168                    pc += 1;
2169                }
2170                OpCode::CopyRegisterIfNull { source, dest } => {
2171                    if self.registers[*dest].is_null() {
2172                        self.registers[*dest] = self.registers[*source].clone();
2173                    }
2174                    pc += 1;
2175                }
2176                OpCode::GetEventType { dest } => {
2177                    self.registers[*dest] = json!(event_type);
2178                    pc += 1;
2179                }
2180                OpCode::CreateObject { dest } => {
2181                    self.registers[*dest] = json!({});
2182                    pc += 1;
2183                }
2184                OpCode::SetField {
2185                    object,
2186                    path,
2187                    value,
2188                } => {
2189                    let old_value = self
2190                        .is_debug_enabled()
2191                        .then(|| Self::get_value_at_path(&self.registers[*object], path))
2192                        .flatten();
2193                    self.set_field_auto_vivify(*object, path, *value)?;
2194                    if should_emit(path) {
2195                        dirty_tracker.mark_replaced(path);
2196                    }
2197                    if self.is_debug_enabled() {
2198                        self.emit_debug(|| VmDebugEvent::FieldWrite {
2199                            entity_name: entity_name.to_string(),
2200                            event_type: event_type.to_string(),
2201                            op: "set_field".to_string(),
2202                            path: path.clone(),
2203                            old_value,
2204                            new_value: Self::get_value_at_path(&self.registers[*object], path),
2205                            applied: true,
2206                            reason: None,
2207                        });
2208                    }
2209                    pc += 1;
2210                }
2211                OpCode::SetFields { object, fields } => {
2212                    for (path, value_reg) in fields {
2213                        let old_value = self
2214                            .is_debug_enabled()
2215                            .then(|| Self::get_value_at_path(&self.registers[*object], path))
2216                            .flatten();
2217                        self.set_field_auto_vivify(*object, path, *value_reg)?;
2218                        if should_emit(path) {
2219                            dirty_tracker.mark_replaced(path);
2220                        }
2221                        if self.is_debug_enabled() {
2222                            self.emit_debug(|| VmDebugEvent::FieldWrite {
2223                                entity_name: entity_name.to_string(),
2224                                event_type: event_type.to_string(),
2225                                op: "set_fields".to_string(),
2226                                path: path.clone(),
2227                                old_value,
2228                                new_value: Self::get_value_at_path(&self.registers[*object], path),
2229                                applied: true,
2230                                reason: None,
2231                            });
2232                        }
2233                    }
2234                    pc += 1;
2235                }
2236                OpCode::GetField { object, path, dest } => {
2237                    let value = self.get_field(*object, path)?;
2238                    self.registers[*dest] = value;
2239                    pc += 1;
2240                }
2241                OpCode::AbortIfNullKey {
2242                    key,
2243                    is_account_event,
2244                } => {
2245                    let key_value = &self.registers[*key];
2246                    if key_value.is_null() && *is_account_event {
2247                        self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2248                            entity_name: entity_name.to_string(),
2249                            event_type: event_type.to_string(),
2250                            key: key_value.clone(),
2251                            existing_state: None,
2252                            loaded_state: Value::Null,
2253                            skipped_reason: Some("null_primary_key".to_string()),
2254                        });
2255                        tracing::debug!(
2256                            event_type = %event_type,
2257                            "AbortIfNullKey: key is null for account state event, \
2258                             returning empty mutations for queueing"
2259                        );
2260                        return Ok(Vec::new());
2261                    }
2262
2263                    pc += 1;
2264                }
2265                OpCode::ReadOrInitState {
2266                    state_id: _,
2267                    key,
2268                    default,
2269                    dest,
2270                } => {
2271                    let actual_state_id = override_state_id;
2272                    let entity_name_owned = entity_name.to_string();
2273                    self.states
2274                        .entry(actual_state_id)
2275                        .or_insert_with(|| StateTable {
2276                            data: DashMap::new(),
2277                            access_times: DashMap::new(),
2278                            lookup_indexes: HashMap::new(),
2279                            temporal_indexes: HashMap::new(),
2280                            pda_reverse_lookups: HashMap::new(),
2281                            pending_updates: DashMap::new(),
2282                            pending_instruction_events: DashMap::new(),
2283                            last_account_data: DashMap::new(),
2284                            version_tracker: VersionTracker::new(),
2285                            instruction_dedup_cache: VersionTracker::with_capacity(
2286                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2287                            ),
2288                            config: StateTableConfig::default(),
2289                            entity_name: entity_name_owned,
2290                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2291                                NonZeroUsize::new(1000).unwrap(),
2292                            )),
2293                            deferred_when_ops: DashMap::new(),
2294                        });
2295                    let key_value = self.registers[*key].clone();
2296                    // Warn if key is null for account state events (not instruction events or CPI events)
2297                    let warn_null_key = key_value.is_null()
2298                        && event_type.ends_with("State")
2299                        && !event_type.ends_with("IxState")
2300                        && !event_type.ends_with("CpiEvent");
2301
2302                    if warn_null_key {
2303                        self.add_warning(format!(
2304                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2305                            key, event_type
2306                        ));
2307                    }
2308
2309                    let state = self
2310                        .states
2311                        .get(&actual_state_id)
2312                        .ok_or("State table not found")?;
2313
2314                    if !key_value.is_null() {
2315                        if let Some(ctx) = &self.current_context {
2316                            // Account updates: use recency check to discard stale updates
2317                            // IMPORTANT: Use account address (PDA) as the key, not entity key,
2318                            // because multiple accounts can update the same entity and each
2319                            // has its own independent write_version
2320                            if ctx.is_account_update() {
2321                                if let (Some(slot), Some(write_version)) =
2322                                    (ctx.slot, ctx.write_version)
2323                                {
2324                                    // Get account address from event_value for proper tracking
2325                                    let account_address = event_value
2326                                        .get("__account_address")
2327                                        .cloned()
2328                                        .unwrap_or_else(|| key_value.clone());
2329
2330                                    if !state.is_fresh_update(
2331                                        &account_address,
2332                                        event_type,
2333                                        slot,
2334                                        write_version,
2335                                    ) {
2336                                        self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2337                                            entity_name: entity_name.to_string(),
2338                                            event_type: event_type.to_string(),
2339                                            key: key_value.clone(),
2340                                            existing_state: None,
2341                                            loaded_state: Value::Null,
2342                                            skipped_reason: Some(
2343                                                "stale_account_update".to_string(),
2344                                            ),
2345                                        });
2346                                        self.add_warning(format!(
2347                                            "Stale account update skipped: slot={}, write_version={}, account={}",
2348                                            slot, write_version, account_address
2349                                        ));
2350                                        return Ok(Vec::new());
2351                                    }
2352                                }
2353                            }
2354                            // Instruction updates: process all, but skip exact duplicates
2355                            else if ctx.is_instruction_update() {
2356                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2357                                    if state.is_duplicate_instruction(
2358                                        &key_value, event_type, slot, txn_index,
2359                                    ) {
2360                                        self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2361                                            entity_name: entity_name.to_string(),
2362                                            event_type: event_type.to_string(),
2363                                            key: key_value.clone(),
2364                                            existing_state: None,
2365                                            loaded_state: Value::Null,
2366                                            skipped_reason: Some(
2367                                                "duplicate_instruction".to_string(),
2368                                            ),
2369                                        });
2370                                        self.add_warning(format!(
2371                                            "Duplicate instruction skipped: slot={}, txn_index={}",
2372                                            slot, txn_index
2373                                        ));
2374                                        return Ok(Vec::new());
2375                                    }
2376                                }
2377                            }
2378                        }
2379                    }
2380                    let existing_state = state.get_and_touch(&key_value);
2381                    let value = existing_state.clone().unwrap_or_else(|| default.clone());
2382
2383                    self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2384                        entity_name: entity_name.to_string(),
2385                        event_type: event_type.to_string(),
2386                        key: key_value,
2387                        existing_state,
2388                        loaded_state: value.clone(),
2389                        skipped_reason: None,
2390                    });
2391
2392                    self.registers[*dest] = value;
2393                    pc += 1;
2394                }
2395                OpCode::UpdateState {
2396                    state_id: _,
2397                    key,
2398                    value,
2399                } => {
2400                    let actual_state_id = override_state_id;
2401                    let state = self
2402                        .states
2403                        .get(&actual_state_id)
2404                        .ok_or("State table not found")?;
2405                    let key_value = self.registers[*key].clone();
2406                    let value_data = self.registers[*value].clone();
2407
2408                    state.insert_with_eviction(key_value, value_data);
2409                    pc += 1;
2410                }
2411                OpCode::AppendToArray {
2412                    object,
2413                    path,
2414                    value,
2415                } => {
2416                    let appended_value = self.registers[*value].clone();
2417                    let max_len = self
2418                        .states
2419                        .get(&override_state_id)
2420                        .map(|s| s.max_array_length())
2421                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2422                    self.append_to_array(*object, path, *value, max_len)?;
2423                    if should_emit(path) {
2424                        dirty_tracker.mark_appended(path, appended_value);
2425                    }
2426                    pc += 1;
2427                }
2428                OpCode::GetCurrentTimestamp { dest } => {
2429                    let timestamp = std::time::SystemTime::now()
2430                        .duration_since(std::time::UNIX_EPOCH)
2431                        .unwrap()
2432                        .as_secs() as i64;
2433                    self.registers[*dest] = json!(timestamp);
2434                    pc += 1;
2435                }
2436                OpCode::CreateEvent { dest, event_value } => {
2437                    let timestamp = std::time::SystemTime::now()
2438                        .duration_since(std::time::UNIX_EPOCH)
2439                        .unwrap()
2440                        .as_secs() as i64;
2441
2442                    // Filter out __update_context from the event data
2443                    let mut event_data = self.registers[*event_value].clone();
2444                    if let Some(obj) = event_data.as_object_mut() {
2445                        obj.remove("__update_context");
2446                    }
2447
2448                    // Create event with timestamp, data, and optional slot/signature from context
2449                    let mut event = serde_json::Map::new();
2450                    event.insert("timestamp".to_string(), json!(timestamp));
2451                    event.insert("data".to_string(), event_data);
2452
2453                    // Add slot and signature if available from current context
2454                    if let Some(ref ctx) = self.current_context {
2455                        if let Some(slot) = ctx.slot {
2456                            event.insert("slot".to_string(), json!(slot));
2457                        }
2458                        if let Some(ref signature) = ctx.signature {
2459                            event.insert("signature".to_string(), json!(signature));
2460                        }
2461                    }
2462
2463                    self.registers[*dest] = Value::Object(event);
2464                    pc += 1;
2465                }
2466                OpCode::CreateCapture {
2467                    dest,
2468                    capture_value,
2469                } => {
2470                    let timestamp = std::time::SystemTime::now()
2471                        .duration_since(std::time::UNIX_EPOCH)
2472                        .unwrap()
2473                        .as_secs() as i64;
2474
2475                    // Get the capture data (already filtered by load_field)
2476                    let capture_data = self.registers[*capture_value].clone();
2477
2478                    // Extract account_address from the original event if available
2479                    let account_address = event_value
2480                        .get("__account_address")
2481                        .and_then(|v| v.as_str())
2482                        .unwrap_or("")
2483                        .to_string();
2484
2485                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
2486                    let mut capture = serde_json::Map::new();
2487                    capture.insert("timestamp".to_string(), json!(timestamp));
2488                    capture.insert("account_address".to_string(), json!(account_address));
2489                    capture.insert("data".to_string(), capture_data);
2490
2491                    // Add slot and signature if available from current context
2492                    if let Some(ref ctx) = self.current_context {
2493                        if let Some(slot) = ctx.slot {
2494                            capture.insert("slot".to_string(), json!(slot));
2495                        }
2496                        if let Some(ref signature) = ctx.signature {
2497                            capture.insert("signature".to_string(), json!(signature));
2498                        }
2499                    }
2500
2501                    self.registers[*dest] = Value::Object(capture);
2502                    pc += 1;
2503                }
2504                OpCode::Transform {
2505                    source,
2506                    dest,
2507                    transformation,
2508                } => {
2509                    if source == dest {
2510                        self.transform_in_place(*source, transformation)?;
2511                    } else {
2512                        let source_value = &self.registers[*source];
2513                        let value = Self::apply_transformation(source_value, transformation)?;
2514                        self.registers[*dest] = value;
2515                    }
2516                    pc += 1;
2517                }
2518                OpCode::EmitMutation {
2519                    entity_name,
2520                    key,
2521                    state,
2522                } => {
2523                    let primary_key = self.registers[*key].clone();
2524                    let dirty_fields: Vec<String> =
2525                        dirty_tracker.dirty_paths().into_iter().collect();
2526
2527                    if primary_key.is_null() || dirty_tracker.is_empty() {
2528                        let reason = if dirty_tracker.is_empty() {
2529                            "no_fields_modified"
2530                        } else {
2531                            "null_primary_key"
2532                        };
2533                        self.emit_debug(|| VmDebugEvent::EmitMutation {
2534                            entity_name: entity_name.clone(),
2535                            event_type: event_type.to_string(),
2536                            key: primary_key.clone(),
2537                            emitted: false,
2538                            reason: Some(reason.to_string()),
2539                            patch: None,
2540                            dirty_fields,
2541                        });
2542                        self.add_warning(format!(
2543                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
2544                            entity_name,
2545                            reason,
2546                            dirty_tracker.len()
2547                        ));
2548                    } else {
2549                        let patch =
2550                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2551
2552                        let append = dirty_tracker.appended_paths();
2553                        let mutation = Mutation {
2554                            export: entity_name.clone(),
2555                            key: primary_key,
2556                            patch,
2557                            append,
2558                        };
2559                        self.emit_debug(|| VmDebugEvent::EmitMutation {
2560                            entity_name: entity_name.clone(),
2561                            event_type: event_type.to_string(),
2562                            key: mutation.key.clone(),
2563                            emitted: true,
2564                            reason: None,
2565                            patch: Some(mutation.patch.clone()),
2566                            dirty_fields,
2567                        });
2568                        output.push(mutation);
2569                    }
2570                    pc += 1;
2571                }
2572                OpCode::SetFieldIfNull {
2573                    object,
2574                    path,
2575                    value,
2576                } => {
2577                    let old_value = self
2578                        .is_debug_enabled()
2579                        .then(|| Self::get_value_at_path(&self.registers[*object], path))
2580                        .flatten();
2581                    let was_set = self.set_field_if_null(*object, path, *value)?;
2582                    if was_set && should_emit(path) {
2583                        dirty_tracker.mark_replaced(path);
2584                    }
2585                    if self.is_debug_enabled() {
2586                        self.emit_debug(|| VmDebugEvent::FieldWrite {
2587                            entity_name: entity_name.to_string(),
2588                            event_type: event_type.to_string(),
2589                            op: "set_field_if_null".to_string(),
2590                            path: path.clone(),
2591                            old_value,
2592                            new_value: Self::get_value_at_path(&self.registers[*object], path),
2593                            applied: was_set,
2594                            reason: (!was_set).then_some("already_set".to_string()),
2595                        });
2596                    }
2597                    pc += 1;
2598                }
2599                OpCode::SetFieldMax {
2600                    object,
2601                    path,
2602                    value,
2603                } => {
2604                    let was_updated = self.set_field_max(*object, path, *value)?;
2605                    if was_updated && should_emit(path) {
2606                        dirty_tracker.mark_replaced(path);
2607                    }
2608                    pc += 1;
2609                }
2610                OpCode::UpdateTemporalIndex {
2611                    state_id: _,
2612                    index_name,
2613                    lookup_value,
2614                    primary_key,
2615                    timestamp,
2616                } => {
2617                    let actual_state_id = override_state_id;
2618                    let state = self
2619                        .states
2620                        .get_mut(&actual_state_id)
2621                        .ok_or("State table not found")?;
2622                    let index = state
2623                        .temporal_indexes
2624                        .entry(index_name.clone())
2625                        .or_insert_with(TemporalIndex::new);
2626
2627                    let lookup_val = self.registers[*lookup_value].clone();
2628                    let pk_val = self.registers[*primary_key].clone();
2629                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2630                        val
2631                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
2632                        val as i64
2633                    } else {
2634                        return Err(format!(
2635                            "Timestamp must be a number (i64 or u64), got: {:?}",
2636                            self.registers[*timestamp]
2637                        )
2638                        .into());
2639                    };
2640
2641                    index.insert(lookup_val, pk_val, ts_val);
2642                    pc += 1;
2643                }
2644                OpCode::LookupTemporalIndex {
2645                    state_id: _,
2646                    index_name,
2647                    lookup_value,
2648                    timestamp,
2649                    dest,
2650                } => {
2651                    let actual_state_id = override_state_id;
2652                    let state = self
2653                        .states
2654                        .get(&actual_state_id)
2655                        .ok_or("State table not found")?;
2656                    let lookup_val = &self.registers[*lookup_value];
2657
2658                    let result = if self.registers[*timestamp].is_null() {
2659                        if let Some(index) = state.temporal_indexes.get(index_name) {
2660                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2661                        } else {
2662                            Value::Null
2663                        }
2664                    } else {
2665                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2666                            val
2667                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
2668                            val as i64
2669                        } else {
2670                            return Err(format!(
2671                                "Timestamp must be a number (i64 or u64), got: {:?}",
2672                                self.registers[*timestamp]
2673                            )
2674                            .into());
2675                        };
2676
2677                        if let Some(index) = state.temporal_indexes.get(index_name) {
2678                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2679                        } else {
2680                            Value::Null
2681                        }
2682                    };
2683
2684                    self.registers[*dest] = result;
2685                    pc += 1;
2686                }
2687                OpCode::UpdateLookupIndex {
2688                    state_id: _,
2689                    index_name,
2690                    lookup_value,
2691                    primary_key,
2692                } => {
2693                    let actual_state_id = override_state_id;
2694                    let state = self
2695                        .states
2696                        .get_mut(&actual_state_id)
2697                        .ok_or("State table not found")?;
2698                    let index = state
2699                        .lookup_indexes
2700                        .entry(index_name.clone())
2701                        .or_insert_with(LookupIndex::new);
2702
2703                    let lookup_val = self.registers[*lookup_value].clone();
2704                    let pk_val = self.registers[*primary_key].clone();
2705
2706                    index.insert(lookup_val.clone(), pk_val);
2707
2708                    // Track lookup keys so process_event can flush queued account updates
2709                    if let Some(key_str) = lookup_val.as_str() {
2710                        self.last_lookup_index_keys.push(key_str.to_string());
2711                    }
2712
2713                    pc += 1;
2714                }
2715                OpCode::LookupIndex {
2716                    state_id: _,
2717                    index_name,
2718                    lookup_value,
2719                    dest,
2720                } => {
2721                    let actual_state_id = override_state_id;
2722                    let mut current_value = self.registers[*lookup_value].clone();
2723                    let original_lookup_value = current_value.clone();
2724                    let mut hops = if self.is_debug_enabled() {
2725                        Some(Vec::<VmLookupHop>::new())
2726                    } else {
2727                        None
2728                    };
2729                    let mut miss_kind: Option<String> = None;
2730
2731                    const MAX_CHAIN_DEPTH: usize = 5;
2732                    let mut iterations = 0;
2733
2734                    let final_result = if self.states.contains_key(&actual_state_id) {
2735                        loop {
2736                            iterations += 1;
2737                            if iterations > MAX_CHAIN_DEPTH {
2738                                break current_value;
2739                            }
2740
2741                            let resolved = self
2742                                .states
2743                                .get(&actual_state_id)
2744                                .and_then(|state| {
2745                                    if let Some(index) = state.lookup_indexes.get(index_name) {
2746                                        if let Some(found) = index.lookup(&current_value) {
2747                                            return Some(found);
2748                                        }
2749                                    }
2750
2751                                    for (name, index) in state.lookup_indexes.iter() {
2752                                        if name == index_name {
2753                                            continue;
2754                                        }
2755                                        if let Some(found) = index.lookup(&current_value) {
2756                                            return Some(found);
2757                                        }
2758                                    }
2759
2760                                    None
2761                                })
2762                                .unwrap_or(Value::Null);
2763
2764                            let lookup_result = resolved.clone();
2765                            if let Some(hops) = hops.as_mut() {
2766                                hops.push(VmLookupHop {
2767                                    source: "lookup_index".to_string(),
2768                                    input: current_value.clone(),
2769                                    result: lookup_result.clone(),
2770                                    chained: false,
2771                                });
2772                            }
2773
2774                            let mut resolved_from_pda = false;
2775                            let resolved = if resolved.is_null() {
2776                                if let Some(pda_str) = current_value.as_str() {
2777                                    resolved_from_pda = true;
2778                                    let pda_result = self
2779                                        .states
2780                                        .get_mut(&actual_state_id)
2781                                        .and_then(|state_mut| {
2782                                            state_mut
2783                                                .pda_reverse_lookups
2784                                                .get_mut("default_pda_lookup")
2785                                        })
2786                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2787                                        .map(Value::String)
2788                                        .unwrap_or(Value::Null);
2789                                    if let Some(hops) = hops.as_mut() {
2790                                        hops.push(VmLookupHop {
2791                                            source: "pda_reverse_lookup".to_string(),
2792                                            input: current_value.clone(),
2793                                            result: pda_result.clone(),
2794                                            chained: false,
2795                                        });
2796                                    }
2797                                    pda_result
2798                                } else {
2799                                    Value::Null
2800                                }
2801                            } else {
2802                                resolved
2803                            };
2804
2805                            if resolved.is_null() {
2806                                if iterations == 1 {
2807                                    if let Some(pda_str) = current_value.as_str() {
2808                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
2809                                        miss_kind = Some("pda_lookup_miss".to_string());
2810                                    } else {
2811                                        miss_kind = Some("lookup_index_miss".to_string());
2812                                    }
2813                                }
2814                                break Value::Null;
2815                            }
2816
2817                            let can_chain =
2818                                self.can_resolve_further(&resolved, actual_state_id, index_name);
2819
2820                            if !can_chain {
2821                                if resolved_from_pda {
2822                                    if let Some(resolved_str) = resolved.as_str() {
2823                                        self.last_lookup_index_miss =
2824                                            Some(resolved_str.to_string());
2825                                    }
2826                                    miss_kind = Some("lookup_chain_incomplete".to_string());
2827                                    break Value::Null;
2828                                }
2829                                break resolved;
2830                            }
2831
2832                            if let Some(hops) = hops.as_mut() {
2833                                if let Some(last) = hops.last_mut() {
2834                                    last.chained = true;
2835                                }
2836                            }
2837
2838                            current_value = resolved;
2839                        }
2840                    } else {
2841                        miss_kind = Some("state_not_initialized".to_string());
2842                        Value::Null
2843                    };
2844
2845                    self.emit_debug(|| VmDebugEvent::LookupIndex {
2846                        entity_name: entity_name.to_string(),
2847                        event_type: event_type.to_string(),
2848                        index_name: index_name.clone(),
2849                        lookup_value: original_lookup_value,
2850                        hops: hops.unwrap_or_default(),
2851                        final_result: final_result.clone(),
2852                        miss_kind,
2853                    });
2854
2855                    self.registers[*dest] = final_result;
2856                    pc += 1;
2857                }
2858                OpCode::SetFieldSum {
2859                    object,
2860                    path,
2861                    value,
2862                } => {
2863                    let was_updated = self.set_field_sum(*object, path, *value)?;
2864                    if was_updated && should_emit(path) {
2865                        dirty_tracker.mark_replaced(path);
2866                    }
2867                    pc += 1;
2868                }
2869                OpCode::SetFieldIncrement { object, path } => {
2870                    let was_updated = self.set_field_increment(*object, path)?;
2871                    if was_updated && should_emit(path) {
2872                        dirty_tracker.mark_replaced(path);
2873                    }
2874                    pc += 1;
2875                }
2876                OpCode::SetFieldMin {
2877                    object,
2878                    path,
2879                    value,
2880                } => {
2881                    let was_updated = self.set_field_min(*object, path, *value)?;
2882                    if was_updated && should_emit(path) {
2883                        dirty_tracker.mark_replaced(path);
2884                    }
2885                    pc += 1;
2886                }
2887                OpCode::AddToUniqueSet {
2888                    state_id: _,
2889                    set_name,
2890                    value,
2891                    count_object,
2892                    count_path,
2893                } => {
2894                    let value_to_add = self.registers[*value].clone();
2895
2896                    // Store the unique set within the entity object, not in the state table
2897                    // This ensures each entity instance has its own unique set
2898                    let set_field_path = format!("__unique_set:{}", set_name);
2899
2900                    // Get or create the unique set from the entity object
2901                    let mut set: HashSet<Value> =
2902                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2903                            if !existing.is_null() {
2904                                serde_json::from_value(existing).unwrap_or_default()
2905                            } else {
2906                                HashSet::new()
2907                            }
2908                        } else {
2909                            HashSet::new()
2910                        };
2911
2912                    // Add value to set
2913                    let was_new = set.insert(value_to_add);
2914
2915                    // Store updated set back in the entity object
2916                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2917                    self.registers[100] = serde_json::to_value(set_as_vec)?;
2918                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2919
2920                    // Update the count field in the object
2921                    if was_new {
2922                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2923                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2924                        if should_emit(count_path) {
2925                            dirty_tracker.mark_replaced(count_path);
2926                        }
2927                    }
2928
2929                    pc += 1;
2930                }
2931                OpCode::ConditionalSetField {
2932                    object,
2933                    path,
2934                    value,
2935                    condition_field,
2936                    condition_op,
2937                    condition_value,
2938                } => {
2939                    let field_value = self.load_field(event_value, condition_field, None)?;
2940                    let condition_met =
2941                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2942
2943                    if condition_met {
2944                        self.set_field_auto_vivify(*object, path, *value)?;
2945                        if should_emit(path) {
2946                            dirty_tracker.mark_replaced(path);
2947                        }
2948                    }
2949                    pc += 1;
2950                }
2951                OpCode::SetFieldWhen {
2952                    object,
2953                    path,
2954                    value,
2955                    when_instruction,
2956                    entity_name,
2957                    key_reg,
2958                    condition_field,
2959                    condition_op,
2960                    condition_value,
2961                } => {
2962                    let actual_state_id = override_state_id;
2963                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2964                        condition_field.as_ref(),
2965                        condition_op.as_ref(),
2966                        condition_value.as_ref(),
2967                    ) {
2968                        let field_value = self.load_field(event_value, field, None)?;
2969                        self.evaluate_comparison(&field_value, op, cond_value)?
2970                    } else {
2971                        true
2972                    };
2973
2974                    if !condition_met {
2975                        pc += 1;
2976                        continue;
2977                    }
2978
2979                    let signature = self
2980                        .current_context
2981                        .as_ref()
2982                        .and_then(|c| c.signature.clone())
2983                        .unwrap_or_default();
2984
2985                    let emit = should_emit(path);
2986
2987                    let instruction_seen = if !signature.is_empty() {
2988                        if let Some(state) = self.states.get(&actual_state_id) {
2989                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2990                            cache
2991                                .get(&signature)
2992                                .map(|set| set.contains(when_instruction))
2993                                .unwrap_or(false)
2994                        } else {
2995                            false
2996                        }
2997                    } else {
2998                        false
2999                    };
3000
3001                    if instruction_seen {
3002                        self.set_field_auto_vivify(*object, path, *value)?;
3003                        if emit {
3004                            dirty_tracker.mark_replaced(path);
3005                        }
3006                    } else if !signature.is_empty() {
3007                        let deferred = DeferredWhenOperation {
3008                            entity_name: entity_name.clone(),
3009                            primary_key: self.registers[*key_reg].clone(),
3010                            field_path: path.clone(),
3011                            field_value: self.registers[*value].clone(),
3012                            when_instruction: when_instruction.clone(),
3013                            signature: signature.clone(),
3014                            slot: self
3015                                .current_context
3016                                .as_ref()
3017                                .and_then(|c| c.slot)
3018                                .unwrap_or(0),
3019                            deferred_at: std::time::SystemTime::now()
3020                                .duration_since(std::time::UNIX_EPOCH)
3021                                .unwrap()
3022                                .as_secs() as i64,
3023                            emit,
3024                        };
3025
3026                        if let Some(state) = self.states.get(&actual_state_id) {
3027                            let key = (signature, when_instruction.clone());
3028                            state
3029                                .deferred_when_ops
3030                                .entry(key)
3031                                .or_insert_with(Vec::new)
3032                                .push(deferred);
3033                        }
3034                    }
3035
3036                    pc += 1;
3037                }
3038                OpCode::SetFieldUnlessStopped {
3039                    object,
3040                    path,
3041                    value,
3042                    stop_field,
3043                    stop_instruction,
3044                    entity_name,
3045                    key_reg: _,
3046                } => {
3047                    let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
3048                    let stopped = stop_value.as_bool().unwrap_or(false);
3049                    let old_value = self
3050                        .is_debug_enabled()
3051                        .then(|| Self::get_value_at_path(&self.registers[*object], path))
3052                        .flatten();
3053
3054                    if stopped {
3055                        self.emit_debug(|| VmDebugEvent::FieldWrite {
3056                            entity_name: entity_name.clone(),
3057                            event_type: event_type.to_string(),
3058                            op: "set_field_unless_stopped".to_string(),
3059                            path: path.clone(),
3060                            old_value,
3061                            new_value: Self::get_value_at_path(&self.registers[*object], path),
3062                            applied: false,
3063                            reason: Some(format!("stop_flag:{}", stop_instruction)),
3064                        });
3065                        tracing::debug!(
3066                            entity = %entity_name,
3067                            field = %path,
3068                            stop_field = %stop_field,
3069                            stop_instruction = %stop_instruction,
3070                            "stop flag set; skipping field update"
3071                        );
3072                        pc += 1;
3073                        continue;
3074                    }
3075
3076                    self.set_field_auto_vivify(*object, path, *value)?;
3077                    if should_emit(path) {
3078                        dirty_tracker.mark_replaced(path);
3079                    }
3080                    if self.is_debug_enabled() {
3081                        self.emit_debug(|| VmDebugEvent::FieldWrite {
3082                            entity_name: entity_name.clone(),
3083                            event_type: event_type.to_string(),
3084                            op: "set_field_unless_stopped".to_string(),
3085                            path: path.clone(),
3086                            old_value,
3087                            new_value: Self::get_value_at_path(&self.registers[*object], path),
3088                            applied: true,
3089                            reason: None,
3090                        });
3091                    }
3092                    pc += 1;
3093                }
3094                OpCode::ConditionalIncrement {
3095                    object,
3096                    path,
3097                    condition_field,
3098                    condition_op,
3099                    condition_value,
3100                } => {
3101                    let field_value = self.load_field(event_value, condition_field, None)?;
3102                    let condition_met =
3103                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
3104
3105                    if condition_met {
3106                        let was_updated = self.set_field_increment(*object, path)?;
3107                        if was_updated && should_emit(path) {
3108                            dirty_tracker.mark_replaced(path);
3109                        }
3110                    }
3111                    pc += 1;
3112                }
3113                OpCode::EvaluateComputedFields {
3114                    state,
3115                    computed_paths,
3116                } => {
3117                    if let Some(evaluator) = entity_evaluator {
3118                        let old_values: Vec<_> = computed_paths
3119                            .iter()
3120                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
3121                            .collect();
3122
3123                        let state_value = &mut self.registers[*state];
3124                        let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3125                        let context_timestamp = self
3126                            .current_context
3127                            .as_ref()
3128                            .map(|c| c.timestamp())
3129                            .unwrap_or_else(|| {
3130                                std::time::SystemTime::now()
3131                                    .duration_since(std::time::UNIX_EPOCH)
3132                                    .unwrap()
3133                                    .as_secs() as i64
3134                            });
3135                        let eval_result = evaluator(state_value, context_slot, context_timestamp);
3136
3137                        if eval_result.is_ok() {
3138                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
3139                                let new_value =
3140                                    Self::get_value_at_path(&self.registers[*state], path);
3141
3142                                if new_value != *old_value && should_emit(path) {
3143                                    dirty_tracker.mark_replaced(path);
3144                                }
3145                            }
3146                        }
3147                    }
3148                    pc += 1;
3149                }
3150                OpCode::QueueResolver {
3151                    state_id: _,
3152                    entity_name,
3153                    resolver,
3154                    input_path,
3155                    input_value,
3156                    url_template,
3157                    strategy,
3158                    extracts,
3159                    condition,
3160                    schedule_at,
3161                    state,
3162                    key,
3163                } => {
3164                    let actual_state_id = override_state_id;
3165
3166                    // Skip resolvers for reprocessed cached data from PDA mapping changes.
3167                    // Stale data can carry wrong field values (e.g. old entropy_value) and
3168                    // wrong schedule_at slots that would lock in incorrect results via SetOnce.
3169                    if self
3170                        .current_context
3171                        .as_ref()
3172                        .map(|c| c.skip_resolvers)
3173                        .unwrap_or(false)
3174                    {
3175                        pc += 1;
3176                        continue;
3177                    }
3178
3179                    // Evaluate condition if present
3180                    if let Some(cond) = condition {
3181                        let field_val =
3182                            Self::get_value_at_path(&self.registers[*state], &cond.field_path)
3183                                .unwrap_or(Value::Null);
3184                        let condition_met =
3185                            self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
3186
3187                        if !condition_met {
3188                            pc += 1;
3189                            continue;
3190                        }
3191                    }
3192
3193                    // Check schedule_at: defer if target slot is in the future
3194                    if let Some(schedule_path) = schedule_at {
3195                        let target_val =
3196                            Self::get_value_at_path(&self.registers[*state], schedule_path);
3197
3198                        match target_val.and_then(|v| v.as_u64()) {
3199                            Some(target_slot) => {
3200                                let current_slot = self
3201                                    .current_context
3202                                    .as_ref()
3203                                    .and_then(|ctx| ctx.slot)
3204                                    .unwrap_or(0);
3205                                if current_slot < target_slot {
3206                                    let key_value = &self.registers[*key];
3207                                    if !key_value.is_null() {
3208                                        self.scheduled_callbacks.push((
3209                                            target_slot,
3210                                            ScheduledCallback {
3211                                                state_id: actual_state_id,
3212                                                entity_name: entity_name.clone(),
3213                                                primary_key: key_value.clone(),
3214                                                resolver: resolver.clone(),
3215                                                url_template: url_template.clone(),
3216                                                input_value: input_value.clone(),
3217                                                input_path: input_path.clone(),
3218                                                condition: condition.clone(),
3219                                                strategy: strategy.clone(),
3220                                                extracts: extracts.clone(),
3221                                                retry_count: 0,
3222                                            },
3223                                        ));
3224                                    }
3225                                    pc += 1;
3226                                    continue;
3227                                }
3228                                // current_slot >= target_slot: fall through to immediate resolution
3229                            }
3230                            None => {
3231                                // schedule_at path is missing or value is not a u64 —
3232                                // skip resolver entirely rather than executing immediately,
3233                                // since the state likely hasn't been fully populated yet.
3234                                pc += 1;
3235                                continue;
3236                            }
3237                        }
3238                    }
3239
3240                    // Build resolver input from template, literal value, or field path
3241                    let resolved_input = if let Some(template) = url_template {
3242                        crate::scheduler::build_url_from_template(template, &self.registers[*state])
3243                            .map(Value::String)
3244                    } else if let Some(value) = input_value {
3245                        Some(value.clone())
3246                    } else if let Some(path) = input_path.as_ref() {
3247                        Self::get_value_at_path(&self.registers[*state], path)
3248                    } else {
3249                        None
3250                    };
3251
3252                    if let Some(input) = resolved_input {
3253                        let key_value = &self.registers[*key];
3254
3255                        if input.is_null() || key_value.is_null() {
3256                            pc += 1;
3257                            continue;
3258                        }
3259
3260                        if matches!(strategy, ResolveStrategy::SetOnce)
3261                            // all(): fire resolver if any target is still null.
3262                            // Already-set fields are protected from overwrite by
3263                            // set_value_at_path's SetOnce null-source guard.
3264                            && extracts.iter().all(|extract| {
3265                                match Self::get_value_at_path(
3266                                    &self.registers[*state],
3267                                    &extract.target_path,
3268                                ) {
3269                                    Some(value) => !value.is_null(),
3270                                    None => false,
3271                                }
3272                            })
3273                        {
3274                            pc += 1;
3275                            continue;
3276                        }
3277
3278                        let cache_key = resolver_cache_key(resolver, &input);
3279
3280                        if let Some(cached) = self.get_cached_resolver_value(&cache_key) {
3281                            Self::apply_resolver_extractions_to_value(
3282                                &mut self.registers[*state],
3283                                &cached,
3284                                extracts,
3285                                &mut dirty_tracker,
3286                                &should_emit,
3287                            )?;
3288                        } else {
3289                            let target = ResolverTarget {
3290                                state_id: actual_state_id,
3291                                entity_name: entity_name.clone(),
3292                                primary_key: self.registers[*key].clone(),
3293                                extracts: extracts.clone(),
3294                            };
3295
3296                            self.enqueue_resolver_request(
3297                                cache_key,
3298                                resolver.clone(),
3299                                input,
3300                                target,
3301                            );
3302                        }
3303                    }
3304
3305                    pc += 1;
3306                }
3307                OpCode::UpdatePdaReverseLookup {
3308                    state_id: _,
3309                    lookup_name,
3310                    pda_address,
3311                    primary_key,
3312                } => {
3313                    let actual_state_id = override_state_id;
3314                    let pda_val = self.registers[*pda_address].clone();
3315                    let pk_val = self.registers[*primary_key].clone();
3316
3317                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
3318                        let pending = self.update_pda_reverse_lookup(
3319                            actual_state_id,
3320                            lookup_name,
3321                            pda_str.to_string(),
3322                            pk_str.to_string(),
3323                        )?;
3324                        self.pending_pda_reprocess_updates.extend(pending);
3325                        self.last_pda_registered = Some(pda_str.to_string());
3326                        self.emit_debug(|| VmDebugEvent::PdaReverseLookupUpdate {
3327                            entity_name: entity_name.to_string(),
3328                            event_type: event_type.to_string(),
3329                            lookup_name: lookup_name.clone(),
3330                            pda_address: pda_str.to_string(),
3331                            primary_key: pk_val.clone(),
3332                        });
3333                    } else if !pk_val.is_null() {
3334                        if let Some(pk_num) = pk_val.as_u64() {
3335                            if let Some(pda_str) = pda_val.as_str() {
3336                                let pending = self.update_pda_reverse_lookup(
3337                                    actual_state_id,
3338                                    lookup_name,
3339                                    pda_str.to_string(),
3340                                    pk_num.to_string(),
3341                                )?;
3342                                self.pending_pda_reprocess_updates.extend(pending);
3343                                self.last_pda_registered = Some(pda_str.to_string());
3344                                self.emit_debug(|| VmDebugEvent::PdaReverseLookupUpdate {
3345                                    entity_name: entity_name.to_string(),
3346                                    event_type: event_type.to_string(),
3347                                    lookup_name: lookup_name.clone(),
3348                                    pda_address: pda_str.to_string(),
3349                                    primary_key: pk_val.clone(),
3350                                });
3351                            }
3352                        }
3353                    }
3354
3355                    pc += 1;
3356                }
3357            }
3358
3359            self.instructions_executed += 1;
3360        }
3361
3362        Ok(output)
3363    }
3364
3365    fn load_field(
3366        &self,
3367        event_value: &Value,
3368        path: &FieldPath,
3369        default: Option<&Value>,
3370    ) -> Result<Value> {
3371        if path.segments.is_empty() {
3372            if let Some(obj) = event_value.as_object() {
3373                let filtered: serde_json::Map<String, Value> = obj
3374                    .iter()
3375                    .filter(|(k, _)| !k.starts_with("__"))
3376                    .map(|(k, v)| (k.clone(), v.clone()))
3377                    .collect();
3378                return Ok(Value::Object(filtered));
3379            }
3380            return Ok(event_value.clone());
3381        }
3382
3383        let mut current = event_value;
3384        for segment in path.segments.iter() {
3385            current = match current.get(segment) {
3386                Some(v) => v,
3387                None => {
3388                    tracing::trace!(
3389                        "load_field: segment={:?} not found in {:?}, returning default",
3390                        segment,
3391                        current
3392                    );
3393                    return Ok(default.cloned().unwrap_or(Value::Null));
3394                }
3395            };
3396        }
3397
3398        tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3399        Ok(current.clone())
3400    }
3401
3402    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3403        let mut current = value;
3404        for segment in path.split('.') {
3405            current = current.get(segment)?;
3406        }
3407        Some(current.clone())
3408    }
3409
3410    fn set_field_auto_vivify(
3411        &mut self,
3412        object_reg: Register,
3413        path: &str,
3414        value_reg: Register,
3415    ) -> Result<()> {
3416        let compiled = self.get_compiled_path(path);
3417        let segments = compiled.segments();
3418        let value = self.registers[value_reg].clone();
3419
3420        if !self.registers[object_reg].is_object() {
3421            self.registers[object_reg] = json!({});
3422        }
3423
3424        let obj = self.registers[object_reg]
3425            .as_object_mut()
3426            .ok_or("Not an object")?;
3427
3428        let mut current = obj;
3429        for (i, segment) in segments.iter().enumerate() {
3430            if i == segments.len() - 1 {
3431                current.insert(segment.to_string(), value);
3432                return Ok(());
3433            } else {
3434                current
3435                    .entry(segment.to_string())
3436                    .or_insert_with(|| json!({}));
3437                current = current
3438                    .get_mut(segment)
3439                    .and_then(|v| v.as_object_mut())
3440                    .ok_or("Path collision: expected object")?;
3441            }
3442        }
3443
3444        Ok(())
3445    }
3446
3447    fn set_field_if_null(
3448        &mut self,
3449        object_reg: Register,
3450        path: &str,
3451        value_reg: Register,
3452    ) -> Result<bool> {
3453        let compiled = self.get_compiled_path(path);
3454        let segments = compiled.segments();
3455        let value = self.registers[value_reg].clone();
3456
3457        // SetOnce should only set meaningful values. A null source typically means
3458        // the field doesn't exist in this event type (e.g., instruction events don't
3459        // have account data). Skip to preserve any existing value.
3460        if value.is_null() {
3461            return Ok(false);
3462        }
3463
3464        if !self.registers[object_reg].is_object() {
3465            self.registers[object_reg] = json!({});
3466        }
3467
3468        let obj = self.registers[object_reg]
3469            .as_object_mut()
3470            .ok_or("Not an object")?;
3471
3472        let mut current = obj;
3473        for (i, segment) in segments.iter().enumerate() {
3474            if i == segments.len() - 1 {
3475                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3476                    current.insert(segment.to_string(), value);
3477                    return Ok(true);
3478                }
3479                return Ok(false);
3480            } else {
3481                current
3482                    .entry(segment.to_string())
3483                    .or_insert_with(|| json!({}));
3484                current = current
3485                    .get_mut(segment)
3486                    .and_then(|v| v.as_object_mut())
3487                    .ok_or("Path collision: expected object")?;
3488            }
3489        }
3490
3491        Ok(false)
3492    }
3493
3494    fn set_field_max(
3495        &mut self,
3496        object_reg: Register,
3497        path: &str,
3498        value_reg: Register,
3499    ) -> Result<bool> {
3500        let compiled = self.get_compiled_path(path);
3501        let segments = compiled.segments();
3502        let new_value = self.registers[value_reg].clone();
3503
3504        if !self.registers[object_reg].is_object() {
3505            self.registers[object_reg] = json!({});
3506        }
3507
3508        let obj = self.registers[object_reg]
3509            .as_object_mut()
3510            .ok_or("Not an object")?;
3511
3512        let mut current = obj;
3513        for (i, segment) in segments.iter().enumerate() {
3514            if i == segments.len() - 1 {
3515                let should_update = if let Some(current_value) = current.get(segment) {
3516                    if current_value.is_null() {
3517                        true
3518                    } else {
3519                        match (current_value.as_i64(), new_value.as_i64()) {
3520                            (Some(current_val), Some(new_val)) => new_val > current_val,
3521                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3522                                new_value.as_u64().unwrap() as i64 > current_val
3523                            }
3524                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3525                                new_val > current_value.as_u64().unwrap() as i64
3526                            }
3527                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3528                                (Some(current_val), Some(new_val)) => new_val > current_val,
3529                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3530                                    (Some(current_val), Some(new_val)) => new_val > current_val,
3531                                    _ => false,
3532                                },
3533                            },
3534                            _ => false,
3535                        }
3536                    }
3537                } else {
3538                    true
3539                };
3540
3541                if should_update {
3542                    current.insert(segment.to_string(), new_value);
3543                    return Ok(true);
3544                }
3545                return Ok(false);
3546            } else {
3547                current
3548                    .entry(segment.to_string())
3549                    .or_insert_with(|| json!({}));
3550                current = current
3551                    .get_mut(segment)
3552                    .and_then(|v| v.as_object_mut())
3553                    .ok_or("Path collision: expected object")?;
3554            }
3555        }
3556
3557        Ok(false)
3558    }
3559
3560    fn set_field_sum(
3561        &mut self,
3562        object_reg: Register,
3563        path: &str,
3564        value_reg: Register,
3565    ) -> Result<bool> {
3566        let compiled = self.get_compiled_path(path);
3567        let segments = compiled.segments();
3568        let new_value = &self.registers[value_reg];
3569
3570        // Extract numeric value before borrowing object_reg mutably
3571        tracing::trace!(
3572            "set_field_sum: path={:?}, value={:?}, value_type={}",
3573            path,
3574            new_value,
3575            match new_value {
3576                serde_json::Value::Null => "null",
3577                serde_json::Value::Bool(_) => "bool",
3578                serde_json::Value::Number(_) => "number",
3579                serde_json::Value::String(_) => "string",
3580                serde_json::Value::Array(_) => "array",
3581                serde_json::Value::Object(_) => "object",
3582            }
3583        );
3584        let new_val_num = new_value
3585            .as_i64()
3586            .or_else(|| new_value.as_u64().map(|n| n as i64))
3587            .ok_or("Sum requires numeric value")?;
3588
3589        if !self.registers[object_reg].is_object() {
3590            self.registers[object_reg] = json!({});
3591        }
3592
3593        let obj = self.registers[object_reg]
3594            .as_object_mut()
3595            .ok_or("Not an object")?;
3596
3597        let mut current = obj;
3598        for (i, segment) in segments.iter().enumerate() {
3599            if i == segments.len() - 1 {
3600                let current_val = current
3601                    .get(segment)
3602                    .and_then(|v| {
3603                        if v.is_null() {
3604                            None
3605                        } else {
3606                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3607                        }
3608                    })
3609                    .unwrap_or(0);
3610
3611                let sum = current_val + new_val_num;
3612                current.insert(segment.to_string(), json!(sum));
3613                return Ok(true);
3614            } else {
3615                current
3616                    .entry(segment.to_string())
3617                    .or_insert_with(|| json!({}));
3618                current = current
3619                    .get_mut(segment)
3620                    .and_then(|v| v.as_object_mut())
3621                    .ok_or("Path collision: expected object")?;
3622            }
3623        }
3624
3625        Ok(false)
3626    }
3627
3628    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3629        let compiled = self.get_compiled_path(path);
3630        let segments = compiled.segments();
3631
3632        if !self.registers[object_reg].is_object() {
3633            self.registers[object_reg] = json!({});
3634        }
3635
3636        let obj = self.registers[object_reg]
3637            .as_object_mut()
3638            .ok_or("Not an object")?;
3639
3640        let mut current = obj;
3641        for (i, segment) in segments.iter().enumerate() {
3642            if i == segments.len() - 1 {
3643                // Get current value (default to 0 if null/missing)
3644                let current_val = current
3645                    .get(segment)
3646                    .and_then(|v| {
3647                        if v.is_null() {
3648                            None
3649                        } else {
3650                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3651                        }
3652                    })
3653                    .unwrap_or(0);
3654
3655                let incremented = current_val + 1;
3656                current.insert(segment.to_string(), json!(incremented));
3657                return Ok(true);
3658            } else {
3659                current
3660                    .entry(segment.to_string())
3661                    .or_insert_with(|| json!({}));
3662                current = current
3663                    .get_mut(segment)
3664                    .and_then(|v| v.as_object_mut())
3665                    .ok_or("Path collision: expected object")?;
3666            }
3667        }
3668
3669        Ok(false)
3670    }
3671
3672    fn set_field_min(
3673        &mut self,
3674        object_reg: Register,
3675        path: &str,
3676        value_reg: Register,
3677    ) -> Result<bool> {
3678        let compiled = self.get_compiled_path(path);
3679        let segments = compiled.segments();
3680        let new_value = self.registers[value_reg].clone();
3681
3682        if !self.registers[object_reg].is_object() {
3683            self.registers[object_reg] = json!({});
3684        }
3685
3686        let obj = self.registers[object_reg]
3687            .as_object_mut()
3688            .ok_or("Not an object")?;
3689
3690        let mut current = obj;
3691        for (i, segment) in segments.iter().enumerate() {
3692            if i == segments.len() - 1 {
3693                let should_update = if let Some(current_value) = current.get(segment) {
3694                    if current_value.is_null() {
3695                        true
3696                    } else {
3697                        match (current_value.as_i64(), new_value.as_i64()) {
3698                            (Some(current_val), Some(new_val)) => new_val < current_val,
3699                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3700                                (new_value.as_u64().unwrap() as i64) < current_val
3701                            }
3702                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3703                                new_val < current_value.as_u64().unwrap() as i64
3704                            }
3705                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3706                                (Some(current_val), Some(new_val)) => new_val < current_val,
3707                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3708                                    (Some(current_val), Some(new_val)) => new_val < current_val,
3709                                    _ => false,
3710                                },
3711                            },
3712                            _ => false,
3713                        }
3714                    }
3715                } else {
3716                    true
3717                };
3718
3719                if should_update {
3720                    current.insert(segment.to_string(), new_value);
3721                    return Ok(true);
3722                }
3723                return Ok(false);
3724            } else {
3725                current
3726                    .entry(segment.to_string())
3727                    .or_insert_with(|| json!({}));
3728                current = current
3729                    .get_mut(segment)
3730                    .and_then(|v| v.as_object_mut())
3731                    .ok_or("Path collision: expected object")?;
3732            }
3733        }
3734
3735        Ok(false)
3736    }
3737
3738    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3739        let compiled = self.get_compiled_path(path);
3740        let segments = compiled.segments();
3741        let mut current = &self.registers[object_reg];
3742
3743        for segment in segments {
3744            current = current
3745                .get(segment)
3746                .ok_or_else(|| format!("Field not found: {}", segment))?;
3747        }
3748
3749        Ok(current.clone())
3750    }
3751
3752    fn append_to_array(
3753        &mut self,
3754        object_reg: Register,
3755        path: &str,
3756        value_reg: Register,
3757        max_length: usize,
3758    ) -> Result<()> {
3759        let compiled = self.get_compiled_path(path);
3760        let segments = compiled.segments();
3761        let value = self.registers[value_reg].clone();
3762
3763        if !self.registers[object_reg].is_object() {
3764            self.registers[object_reg] = json!({});
3765        }
3766
3767        let obj = self.registers[object_reg]
3768            .as_object_mut()
3769            .ok_or("Not an object")?;
3770
3771        let mut current = obj;
3772        for (i, segment) in segments.iter().enumerate() {
3773            if i == segments.len() - 1 {
3774                current
3775                    .entry(segment.to_string())
3776                    .or_insert_with(|| json!([]));
3777                let arr = current
3778                    .get_mut(segment)
3779                    .and_then(|v| v.as_array_mut())
3780                    .ok_or("Path is not an array")?;
3781                arr.push(value.clone());
3782
3783                if arr.len() > max_length {
3784                    let excess = arr.len() - max_length;
3785                    arr.drain(0..excess);
3786                }
3787            } else {
3788                current
3789                    .entry(segment.to_string())
3790                    .or_insert_with(|| json!({}));
3791                current = current
3792                    .get_mut(segment)
3793                    .and_then(|v| v.as_object_mut())
3794                    .ok_or("Path collision: expected object")?;
3795            }
3796        }
3797
3798        Ok(())
3799    }
3800
3801    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3802        let value = &self.registers[reg];
3803        let transformed = Self::apply_transformation(value, transformation)?;
3804        self.registers[reg] = transformed;
3805        Ok(())
3806    }
3807
3808    fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3809        match transformation {
3810            Transformation::HexEncode => {
3811                if let Some(arr) = value.as_array() {
3812                    let bytes: Vec<u8> = arr
3813                        .iter()
3814                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3815                        .collect();
3816                    let hex = hex::encode(&bytes);
3817                    Ok(json!(hex))
3818                } else if value.is_string() {
3819                    Ok(value.clone())
3820                } else {
3821                    Err("HexEncode requires an array of numbers".into())
3822                }
3823            }
3824            Transformation::HexDecode => {
3825                if let Some(s) = value.as_str() {
3826                    let s = s.strip_prefix("0x").unwrap_or(s);
3827                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3828                    Ok(json!(bytes))
3829                } else {
3830                    Err("HexDecode requires a string".into())
3831                }
3832            }
3833            Transformation::Base58Encode => {
3834                if let Some(arr) = value.as_array() {
3835                    let bytes: Vec<u8> = arr
3836                        .iter()
3837                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3838                        .collect();
3839                    let encoded = bs58::encode(&bytes).into_string();
3840                    Ok(json!(encoded))
3841                } else if value.is_string() {
3842                    Ok(value.clone())
3843                } else {
3844                    Err("Base58Encode requires an array of numbers".into())
3845                }
3846            }
3847            Transformation::Base58Decode => {
3848                if let Some(s) = value.as_str() {
3849                    let bytes = bs58::decode(s)
3850                        .into_vec()
3851                        .map_err(|e| format!("Base58 decode error: {}", e))?;
3852                    Ok(json!(bytes))
3853                } else {
3854                    Err("Base58Decode requires a string".into())
3855                }
3856            }
3857            Transformation::ToString => Ok(json!(value.to_string())),
3858            Transformation::ToNumber => {
3859                if let Some(s) = value.as_str() {
3860                    let n = s
3861                        .parse::<i64>()
3862                        .map_err(|e| format!("Parse error: {}", e))?;
3863                    Ok(json!(n))
3864                } else {
3865                    Ok(value.clone())
3866                }
3867            }
3868        }
3869    }
3870
3871    fn evaluate_comparison(
3872        &self,
3873        field_value: &Value,
3874        op: &ComparisonOp,
3875        condition_value: &Value,
3876    ) -> Result<bool> {
3877        use ComparisonOp::*;
3878
3879        match op {
3880            Equal => Ok(field_value == condition_value),
3881            NotEqual => Ok(field_value != condition_value),
3882            GreaterThan => {
3883                // Try to compare as numbers
3884                match (field_value.as_i64(), condition_value.as_i64()) {
3885                    (Some(a), Some(b)) => Ok(a > b),
3886                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
3887                        (Some(a), Some(b)) => Ok(a > b),
3888                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
3889                            (Some(a), Some(b)) => Ok(a > b),
3890                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3891                        },
3892                    },
3893                }
3894            }
3895            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3896                (Some(a), Some(b)) => Ok(a >= b),
3897                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3898                    (Some(a), Some(b)) => Ok(a >= b),
3899                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3900                        (Some(a), Some(b)) => Ok(a >= b),
3901                        _ => {
3902                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3903                        }
3904                    },
3905                },
3906            },
3907            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3908                (Some(a), Some(b)) => Ok(a < b),
3909                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3910                    (Some(a), Some(b)) => Ok(a < b),
3911                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3912                        (Some(a), Some(b)) => Ok(a < b),
3913                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
3914                    },
3915                },
3916            },
3917            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3918                (Some(a), Some(b)) => Ok(a <= b),
3919                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3920                    (Some(a), Some(b)) => Ok(a <= b),
3921                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3922                        (Some(a), Some(b)) => Ok(a <= b),
3923                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3924                    },
3925                },
3926            },
3927        }
3928    }
3929
3930    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3931        if let Some(state) = self.states.get(&state_id) {
3932            if let Some(index) = state.lookup_indexes.get(index_name) {
3933                if index.lookup(value).is_some() {
3934                    return true;
3935                }
3936            }
3937
3938            for (name, index) in state.lookup_indexes.iter() {
3939                if name == index_name {
3940                    continue;
3941                }
3942                if index.lookup(value).is_some() {
3943                    return true;
3944                }
3945            }
3946
3947            if let Some(pda_str) = value.as_str() {
3948                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3949                    if pda_lookup.contains(pda_str) {
3950                        return true;
3951                    }
3952                }
3953            }
3954        }
3955
3956        false
3957    }
3958
3959    #[allow(clippy::type_complexity)]
3960    fn apply_deferred_when_op(
3961        &mut self,
3962        state_id: u32,
3963        op: &DeferredWhenOperation,
3964        entity_evaluator: Option<
3965            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3966        >,
3967        computed_paths: Option<&[String]>,
3968    ) -> Result<Vec<Mutation>> {
3969        let state = self.states.get(&state_id).ok_or("State not found")?;
3970
3971        if op.primary_key.is_null() {
3972            return Ok(vec![]);
3973        }
3974
3975        let mut entity_state = state
3976            .get_and_touch(&op.primary_key)
3977            .unwrap_or_else(|| json!({}));
3978
3979        // Track old values of computed fields before setting the new value
3980        let old_computed_values: Vec<_> = computed_paths
3981            .map(|paths| {
3982                paths
3983                    .iter()
3984                    .map(|path| Self::get_value_at_path(&entity_state, path))
3985                    .collect()
3986            })
3987            .unwrap_or_default();
3988
3989        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3990
3991        // Re-evaluate computed fields if an evaluator is provided
3992        if let Some(evaluator) = entity_evaluator {
3993            let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3994            let context_timestamp = self
3995                .current_context
3996                .as_ref()
3997                .map(|c| c.timestamp())
3998                .unwrap_or_else(|| {
3999                    std::time::SystemTime::now()
4000                        .duration_since(std::time::UNIX_EPOCH)
4001                        .unwrap()
4002                        .as_secs() as i64
4003                });
4004
4005            tracing::debug!(
4006                entity_name = %op.entity_name,
4007                primary_key = %op.primary_key,
4008                field_path = %op.field_path,
4009                "Re-evaluating computed fields after deferred when-op"
4010            );
4011
4012            if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
4013                tracing::warn!(
4014                    entity_name = %op.entity_name,
4015                    primary_key = %op.primary_key,
4016                    error = %e,
4017                    "Failed to evaluate computed fields after deferred when-op"
4018                );
4019            }
4020        }
4021
4022        state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
4023
4024        if !op.emit {
4025            return Ok(vec![]);
4026        }
4027
4028        let mut patch = json!({});
4029        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
4030
4031        // Add computed field changes to the patch
4032        if let Some(paths) = computed_paths {
4033            tracing::debug!(
4034                entity_name = %op.entity_name,
4035                primary_key = %op.primary_key,
4036                computed_paths_count = paths.len(),
4037                "Checking computed fields for changes after deferred when-op"
4038            );
4039            for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
4040                let new_value = Self::get_value_at_path(&entity_state, path);
4041                tracing::debug!(
4042                    entity_name = %op.entity_name,
4043                    primary_key = %op.primary_key,
4044                    field_path = %path,
4045                    old_value = ?old_value,
4046                    new_value = ?new_value,
4047                    "Comparing computed field values"
4048                );
4049                if let Some(ref new_val) = new_value {
4050                    if Some(new_val) != old_value.as_ref() {
4051                        Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
4052                        tracing::info!(
4053                            entity_name = %op.entity_name,
4054                            primary_key = %op.primary_key,
4055                            field_path = %path,
4056                            "Computed field changed after deferred when-op, including in mutation"
4057                        );
4058                    }
4059                }
4060            }
4061        }
4062
4063        Ok(vec![Mutation {
4064            export: op.entity_name.clone(),
4065            key: op.primary_key.clone(),
4066            patch,
4067            append: vec![],
4068        }])
4069    }
4070
4071    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
4072        let parts: Vec<&str> = path.split('.').collect();
4073        let mut current = obj;
4074
4075        for (i, part) in parts.iter().enumerate() {
4076            if i == parts.len() - 1 {
4077                if let Some(map) = current.as_object_mut() {
4078                    map.insert(part.to_string(), value);
4079                    return Ok(());
4080                }
4081                return Err("Cannot set field on non-object".into());
4082            }
4083
4084            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
4085                if let Some(map) = current.as_object_mut() {
4086                    map.insert(part.to_string(), json!({}));
4087                }
4088            }
4089
4090            current = current.get_mut(*part).ok_or("Path navigation failed")?;
4091        }
4092
4093        Ok(())
4094    }
4095
4096    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
4097        let now = std::time::SystemTime::now()
4098            .duration_since(std::time::UNIX_EPOCH)
4099            .unwrap()
4100            .as_secs() as i64;
4101
4102        let state = match self.states.get(&state_id) {
4103            Some(s) => s,
4104            None => return 0,
4105        };
4106
4107        let mut removed = 0;
4108        state.deferred_when_ops.retain(|_, ops| {
4109            let before = ops.len();
4110            ops.retain(|op| now - op.deferred_at < max_age_secs);
4111            removed += before - ops.len();
4112            !ops.is_empty()
4113        });
4114
4115        removed
4116    }
4117
4118    /// Update a PDA reverse lookup and return pending updates for reprocessing.
4119    /// Returns any pending account updates that were queued for this PDA.
4120    /// ```ignore
4121    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
4122    /// for update in pending {
4123    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
4124    /// }
4125    /// ```
4126    #[cfg_attr(feature = "otel", instrument(
4127        name = "vm.update_pda_lookup",
4128        skip(self),
4129        fields(
4130            pda = %pda_address,
4131            seed = %seed_value,
4132        )
4133    ))]
4134    pub fn update_pda_reverse_lookup(
4135        &mut self,
4136        state_id: u32,
4137        lookup_name: &str,
4138        pda_address: String,
4139        seed_value: String,
4140    ) -> Result<Vec<PendingAccountUpdate>> {
4141        let state = self
4142            .states
4143            .get_mut(&state_id)
4144            .ok_or("State table not found")?;
4145
4146        let lookup = state
4147            .pda_reverse_lookups
4148            .entry(lookup_name.to_string())
4149            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
4150
4151        // Detect if the PDA mapping is CHANGING (same PDA, different seed).
4152        // This happens at round boundaries when e.g. entropyVar is remapped
4153        // from old_round to new_round by a Reset instruction.
4154        let old_seed = lookup.index.peek(&pda_address).cloned();
4155        let mapping_changed = old_seed
4156            .as_ref()
4157            .map(|old| old != &seed_value)
4158            .unwrap_or(false);
4159
4160        if !mapping_changed && old_seed.is_none() {
4161            tracing::info!(
4162                pda = %pda_address,
4163                seed = %seed_value,
4164                "[PDA] First-time PDA reverse lookup established"
4165            );
4166        } else if !mapping_changed {
4167            tracing::debug!(
4168                pda = %pda_address,
4169                seed = %seed_value,
4170                "[PDA] PDA reverse lookup re-registered (same mapping)"
4171            );
4172        }
4173
4174        let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());
4175
4176        if let Some(ref evicted) = evicted_pda {
4177            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
4178                let count = evicted_updates.len();
4179                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4180            }
4181        }
4182
4183        // Flush pending updates from QueueUntil for this PDA
4184        let mut pending = self.flush_pending_updates(state_id, &pda_address)?;
4185
4186        // When the mapping changed, the last account update for this PDA was
4187        // processed with the OLD seed (wrong key).  We need to:
4188        // 1. Remove stale lookup-index entries that map this PDA address to the
4189        //    old primary key — otherwise LookupIndex resolves the stale entry
4190        //    before PDA reverse lookup is even tried.
4191        // 2. Pull the cached account data and return it for reprocessing with
4192        //    the new mapping.
4193        if mapping_changed {
4194            if let Some(state) = self.states.get(&state_id) {
4195                // Clear stale lookup-index entries for this PDA address
4196                for index in state.lookup_indexes.values() {
4197                    index.remove(&Value::String(pda_address.clone()));
4198                }
4199
4200                if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
4201                    tracing::info!(
4202                        pda = %pda_address,
4203                        old_seed = ?old_seed,
4204                        new_seed = %seed_value,
4205                        account_type = %cached.account_type,
4206                        "PDA mapping changed — clearing stale indexes and reprocessing cached data"
4207                    );
4208                    cached.is_stale_reprocess = true;
4209                    pending.push(cached);
4210                }
4211            }
4212        }
4213
4214        Ok(pending)
4215    }
4216
4217    /// Clean up expired pending updates that are older than the TTL
4218    ///
4219    /// Returns the number of updates that were removed.
4220    /// This should be called periodically to prevent memory leaks from orphaned updates.
4221    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
4222        let state = match self.states.get_mut(&state_id) {
4223            Some(s) => s,
4224            None => return 0,
4225        };
4226
4227        let now = std::time::SystemTime::now()
4228            .duration_since(std::time::UNIX_EPOCH)
4229            .unwrap()
4230            .as_secs() as i64;
4231
4232        let mut removed_count = 0;
4233
4234        // Iterate through all pending updates and remove expired ones
4235        state.pending_updates.retain(|_pda_address, updates| {
4236            let original_len = updates.len();
4237
4238            updates.retain(|update| {
4239                let age = now - update.queued_at;
4240                age <= PENDING_UPDATE_TTL_SECONDS
4241            });
4242
4243            removed_count += original_len - updates.len();
4244
4245            // Remove the entry entirely if no updates remain
4246            !updates.is_empty()
4247        });
4248
4249        // Update the global counter
4250        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
4251
4252        if removed_count > 0 {
4253            #[cfg(feature = "otel")]
4254            crate::vm_metrics::record_pending_updates_expired(
4255                removed_count as u64,
4256                &state.entity_name,
4257            );
4258        }
4259
4260        removed_count
4261    }
4262
4263    /// Queue an account update for later processing when PDA reverse lookup is not yet available
4264    ///
4265    /// # Workflow
4266    ///
4267    /// This implements a deferred processing pattern for account updates when the PDA reverse
4268    /// lookup needed to resolve the primary key is not yet available:
4269    ///
4270    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
4271    ///    is not available, call `queue_account_update()` to queue it for later.
4272    ///
4273    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
4274    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
4275    ///
4276    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
4277    ///    ```ignore
4278    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
4279    ///    for update in pending {
4280    ///        let mutations = vm.process_event(
4281    ///            &bytecode, update.account_data, &update.account_type, None, None
4282    ///        )?;
4283    ///    }
4284    ///    ```
4285    ///
4286    /// # Arguments
4287    ///
4288    /// * `state_id` - The state table ID
4289    /// * `pda_address` - The PDA address that needs reverse lookup
4290    /// * `account_type` - The event type name for reprocessing
4291    /// * `account_data` - The account data (event value) for reprocessing
4292    /// * `slot` - The slot number when this update occurred
4293    /// * `signature` - The transaction signature
4294    #[cfg_attr(feature = "otel", instrument(
4295        name = "vm.queue_account_update",
4296        skip(self, update),
4297        fields(
4298            pda = %update.pda_address,
4299            account_type = %update.account_type,
4300            slot = update.slot,
4301        )
4302    ))]
4303    pub fn queue_account_update(
4304        &mut self,
4305        state_id: u32,
4306        update: QueuedAccountUpdate,
4307    ) -> Result<()> {
4308        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
4309            self.cleanup_expired_pending_updates(state_id);
4310            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
4311                self.drop_oldest_pending_update(state_id)?;
4312            }
4313        }
4314
4315        let state = self
4316            .states
4317            .get_mut(&state_id)
4318            .ok_or("State table not found")?;
4319
4320        let pending = PendingAccountUpdate {
4321            account_type: update.account_type,
4322            pda_address: update.pda_address.clone(),
4323            account_data: update.account_data,
4324            slot: update.slot,
4325            write_version: update.write_version,
4326            signature: update.signature,
4327            queued_at: std::time::SystemTime::now()
4328                .duration_since(std::time::UNIX_EPOCH)
4329                .unwrap()
4330                .as_secs() as i64,
4331            is_stale_reprocess: false,
4332        };
4333
4334        let pda_address = pending.pda_address.clone();
4335        let slot = pending.slot;
4336
4337        let mut updates = state
4338            .pending_updates
4339            .entry(pda_address.clone())
4340            .or_insert_with(Vec::new);
4341
4342        let original_len = updates.len();
4343        updates.retain(|existing| existing.slot > slot);
4344        let removed_by_dedup = original_len - updates.len();
4345
4346        if removed_by_dedup > 0 {
4347            self.pending_queue_size = self
4348                .pending_queue_size
4349                .saturating_sub(removed_by_dedup as u64);
4350        }
4351
4352        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
4353            updates.remove(0);
4354            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4355        }
4356
4357        updates.push(pending);
4358        #[cfg(feature = "otel")]
4359        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
4360
4361        Ok(())
4362    }
4363
4364    pub fn queue_instruction_event(
4365        &mut self,
4366        state_id: u32,
4367        event: QueuedInstructionEvent,
4368    ) -> Result<()> {
4369        let state = self
4370            .states
4371            .get_mut(&state_id)
4372            .ok_or("State table not found")?;
4373
4374        let pda_address = event.pda_address.clone();
4375
4376        let pending = PendingInstructionEvent {
4377            event_type: event.event_type,
4378            pda_address: event.pda_address,
4379            event_data: event.event_data,
4380            slot: event.slot,
4381            signature: event.signature,
4382            queued_at: std::time::SystemTime::now()
4383                .duration_since(std::time::UNIX_EPOCH)
4384                .unwrap()
4385                .as_secs() as i64,
4386        };
4387
4388        let mut events = state
4389            .pending_instruction_events
4390            .entry(pda_address)
4391            .or_insert_with(Vec::new);
4392
4393        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
4394            events.remove(0);
4395        }
4396
4397        events.push(pending);
4398
4399        Ok(())
4400    }
4401
4402    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4403        self.last_pda_lookup_miss.take()
4404    }
4405
4406    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4407        self.last_lookup_index_miss.take()
4408    }
4409
4410    pub fn take_last_pda_registered(&mut self) -> Option<String> {
4411        self.last_pda_registered.take()
4412    }
4413
4414    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4415        std::mem::take(&mut self.last_lookup_index_keys)
4416    }
4417
4418    pub fn flush_pending_instruction_events(
4419        &mut self,
4420        state_id: u32,
4421        pda_address: &str,
4422    ) -> Vec<PendingInstructionEvent> {
4423        let state = match self.states.get_mut(&state_id) {
4424            Some(s) => s,
4425            None => return Vec::new(),
4426        };
4427
4428        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4429            events
4430        } else {
4431            Vec::new()
4432        }
4433    }
4434
4435    /// Get statistics about the pending queue for monitoring
4436    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4437        let state = self.states.get(&state_id)?;
4438
4439        let now = std::time::SystemTime::now()
4440            .duration_since(std::time::UNIX_EPOCH)
4441            .unwrap()
4442            .as_secs() as i64;
4443
4444        let mut total_updates = 0;
4445        let mut oldest_timestamp = now;
4446        let mut largest_pda_queue = 0;
4447        let mut estimated_memory = 0;
4448
4449        for entry in state.pending_updates.iter() {
4450            let (_, updates) = entry.pair();
4451            total_updates += updates.len();
4452            largest_pda_queue = largest_pda_queue.max(updates.len());
4453
4454            for update in updates.iter() {
4455                oldest_timestamp = oldest_timestamp.min(update.queued_at);
4456                // Rough memory estimate
4457                estimated_memory += update.account_type.len() +
4458                                   update.pda_address.len() +
4459                                   update.signature.len() +
4460                                   16 + // slot + queued_at
4461                                   estimate_json_size(&update.account_data);
4462            }
4463        }
4464
4465        Some(PendingQueueStats {
4466            total_updates,
4467            unique_pdas: state.pending_updates.len(),
4468            oldest_age_seconds: now - oldest_timestamp,
4469            largest_pda_queue_size: largest_pda_queue,
4470            estimated_memory_bytes: estimated_memory,
4471        })
4472    }
4473
4474    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4475        let mut stats = VmMemoryStats {
4476            path_cache_size: self.path_cache.len(),
4477            ..Default::default()
4478        };
4479
4480        if let Some(state) = self.states.get(&state_id) {
4481            stats.state_table_entity_count = state.data.len();
4482            stats.state_table_max_entries = state.config.max_entries;
4483            stats.state_table_at_capacity = state.is_at_capacity();
4484
4485            stats.lookup_index_count = state.lookup_indexes.len();
4486            stats.lookup_index_total_entries =
4487                state.lookup_indexes.values().map(|idx| idx.len()).sum();
4488
4489            stats.temporal_index_count = state.temporal_indexes.len();
4490            stats.temporal_index_total_entries = state
4491                .temporal_indexes
4492                .values()
4493                .map(|idx| idx.total_entries())
4494                .sum();
4495
4496            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4497            stats.pda_reverse_lookup_total_entries = state
4498                .pda_reverse_lookups
4499                .values()
4500                .map(|lookup| lookup.len())
4501                .sum();
4502
4503            stats.version_tracker_entries = state.version_tracker.len();
4504
4505            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4506        }
4507
4508        stats
4509    }
4510
4511    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4512        let pending_removed = self.cleanup_expired_pending_updates(state_id);
4513        let temporal_removed = self.cleanup_temporal_indexes(state_id);
4514
4515        #[cfg(feature = "otel")]
4516        if let Some(state) = self.states.get(&state_id) {
4517            crate::vm_metrics::record_cleanup(
4518                pending_removed,
4519                temporal_removed,
4520                &state.entity_name,
4521            );
4522        }
4523
4524        CleanupResult {
4525            pending_updates_removed: pending_removed,
4526            temporal_entries_removed: temporal_removed,
4527        }
4528    }
4529
4530    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4531        let state = match self.states.get_mut(&state_id) {
4532            Some(s) => s,
4533            None => return 0,
4534        };
4535
4536        let now = std::time::SystemTime::now()
4537            .duration_since(std::time::UNIX_EPOCH)
4538            .unwrap()
4539            .as_secs() as i64;
4540
4541        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4542        let mut total_removed = 0;
4543
4544        for (_, index) in state.temporal_indexes.iter_mut() {
4545            total_removed += index.cleanup_expired(cutoff);
4546        }
4547
4548        total_removed
4549    }
4550
4551    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4552        let state = self.states.get(&state_id)?;
4553
4554        if state.is_at_capacity() {
4555            Some(CapacityWarning {
4556                current_entries: state.data.len(),
4557                max_entries: state.config.max_entries,
4558                entries_over_limit: state.entries_over_limit(),
4559            })
4560        } else {
4561            None
4562        }
4563    }
4564
4565    /// Drop the oldest pending update across all PDAs
4566    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4567        let state = self
4568            .states
4569            .get_mut(&state_id)
4570            .ok_or("State table not found")?;
4571
4572        let mut oldest_pda: Option<String> = None;
4573        let mut oldest_timestamp = i64::MAX;
4574
4575        // Find the PDA with the oldest update
4576        for entry in state.pending_updates.iter() {
4577            let (pda, updates) = entry.pair();
4578            if let Some(update) = updates.first() {
4579                if update.queued_at < oldest_timestamp {
4580                    oldest_timestamp = update.queued_at;
4581                    oldest_pda = Some(pda.clone());
4582                }
4583            }
4584        }
4585
4586        // Remove the oldest update
4587        if let Some(pda) = oldest_pda {
4588            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4589                if !updates.is_empty() {
4590                    updates.remove(0);
4591                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4592
4593                    // Remove the entry if it's now empty
4594                    if updates.is_empty() {
4595                        drop(updates);
4596                        state.pending_updates.remove(&pda);
4597                    }
4598                }
4599            }
4600        }
4601
4602        Ok(())
4603    }
4604
4605    /// Flush and return pending updates for a PDA for external reprocessing
4606    ///
4607    /// Returns the pending updates that were queued for this PDA address.
4608    /// The caller should reprocess these through the VM using process_event().
4609    fn flush_pending_updates(
4610        &mut self,
4611        state_id: u32,
4612        pda_address: &str,
4613    ) -> Result<Vec<PendingAccountUpdate>> {
4614        let state = self
4615            .states
4616            .get_mut(&state_id)
4617            .ok_or("State table not found")?;
4618
4619        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4620            let count = pending_updates.len();
4621            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4622            #[cfg(feature = "otel")]
4623            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4624            Ok(pending_updates)
4625        } else {
4626            Ok(Vec::new())
4627        }
4628    }
4629
4630    /// Cache the most recent account data for a PDA address.
4631    /// Called by the vixen runtime after a Lookup-handler account update is
4632    /// successfully processed.  When a PDA mapping later changes (e.g. at a
4633    /// round boundary), the cached data is returned for reprocessing with the
4634    /// new mapping.
4635    pub fn cache_last_account_data(
4636        &mut self,
4637        state_id: u32,
4638        pda_address: &str,
4639        update: PendingAccountUpdate,
4640    ) {
4641        if let Some(state) = self.states.get(&state_id) {
4642            state
4643                .last_account_data
4644                .insert(pda_address.to_string(), update);
4645        }
4646    }
4647
4648    /// Try to resolve a primary key via PDA reverse lookup
4649    pub fn try_pda_reverse_lookup(
4650        &mut self,
4651        state_id: u32,
4652        lookup_name: &str,
4653        pda_address: &str,
4654    ) -> Option<String> {
4655        let state = self.states.get_mut(&state_id)?;
4656
4657        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4658            if let Some(value) = lookup.lookup(pda_address) {
4659                self.pda_cache_hits += 1;
4660                return Some(value);
4661            }
4662        }
4663
4664        self.pda_cache_misses += 1;
4665        None
4666    }
4667
4668    /// Try to resolve a value through lookup indexes.
4669    /// This attempts to resolve the value using any lookup index in the state.
4670    pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4671        let state = self.states.get(&state_id)?;
4672
4673        for index in state.lookup_indexes.values() {
4674            if let Some(resolved) = index.lookup(value) {
4675                return Some(resolved);
4676            }
4677        }
4678
4679        None
4680    }
4681
4682    /// Try to resolve a primary key via chained PDA + lookup index resolution.
4683    /// First tries PDA reverse lookup, then tries to resolve the result through lookup indexes.
4684    /// This is useful when the PDA maps to an intermediate value (e.g., round address)
4685    /// that needs to be resolved to the actual primary key (e.g., round_id).
4686    pub fn try_chained_pda_lookup(
4687        &mut self,
4688        state_id: u32,
4689        lookup_name: &str,
4690        pda_address: &str,
4691    ) -> Option<String> {
4692        // First, try PDA reverse lookup
4693        let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4694
4695        // Try to resolve the PDA result through lookup indexes
4696        // (e.g., round address -> round_id)
4697        let pda_value = Value::String(pda_result.clone());
4698        if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4699            resolved.as_str().map(|s| s.to_string())
4700        } else {
4701            // Return the PDA result if we can't resolve further
4702            pda_value.as_str().map(|s| s.to_string())
4703        }
4704    }
4705
4706    // ============================================================================
4707    // Computed Expression Evaluator (Task 5)
4708    // ============================================================================
4709
4710    /// Evaluate a computed expression AST against the current state
4711    /// This is the core runtime evaluator for computed fields from the AST
4712    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4713        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4714    }
4715
4716    /// Evaluate a computed expression with a variable environment (for let bindings)
4717    fn evaluate_computed_expr_with_env(
4718        &self,
4719        expr: &ComputedExpr,
4720        state: &Value,
4721        env: &std::collections::HashMap<String, Value>,
4722    ) -> Result<Value> {
4723        match expr {
4724            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4725
4726            ComputedExpr::Var { name } => env
4727                .get(name)
4728                .cloned()
4729                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4730
4731            ComputedExpr::Let { name, value, body } => {
4732                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4733                let mut new_env = env.clone();
4734                new_env.insert(name.clone(), val);
4735                self.evaluate_computed_expr_with_env(body, state, &new_env)
4736            }
4737
4738            ComputedExpr::If {
4739                condition,
4740                then_branch,
4741                else_branch,
4742            } => {
4743                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4744                if self.value_to_bool(&cond_val) {
4745                    self.evaluate_computed_expr_with_env(then_branch, state, env)
4746                } else {
4747                    self.evaluate_computed_expr_with_env(else_branch, state, env)
4748                }
4749            }
4750
4751            ComputedExpr::None => Ok(Value::Null),
4752
4753            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4754
4755            ComputedExpr::Slice { expr, start, end } => {
4756                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4757                match val {
4758                    Value::Array(arr) => {
4759                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4760                        Ok(Value::Array(slice))
4761                    }
4762                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4763                }
4764            }
4765
4766            ComputedExpr::Index { expr, index } => {
4767                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4768                match val {
4769                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4770                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4771                }
4772            }
4773
4774            ComputedExpr::U64FromLeBytes { bytes } => {
4775                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4776                let byte_vec = self.value_to_bytes(&val)?;
4777                if byte_vec.len() < 8 {
4778                    return Err(format!(
4779                        "u64::from_le_bytes requires 8 bytes, got {}",
4780                        byte_vec.len()
4781                    )
4782                    .into());
4783                }
4784                let arr: [u8; 8] = byte_vec[..8]
4785                    .try_into()
4786                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4787                Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
4788                    arr,
4789                ))))
4790            }
4791
4792            ComputedExpr::U64FromBeBytes { bytes } => {
4793                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4794                let byte_vec = self.value_to_bytes(&val)?;
4795                if byte_vec.len() < 8 {
4796                    return Err(format!(
4797                        "u64::from_be_bytes requires 8 bytes, got {}",
4798                        byte_vec.len()
4799                    )
4800                    .into());
4801                }
4802                let arr: [u8; 8] = byte_vec[..8]
4803                    .try_into()
4804                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4805                Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
4806                    arr,
4807                ))))
4808            }
4809
4810            ComputedExpr::ByteArray { bytes } => {
4811                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4812            }
4813
4814            ComputedExpr::Closure { param, body } => {
4815                // Closures are stored as-is; they're evaluated when used in map()
4816                // Return a special representation
4817                Ok(json!({
4818                    "__closure": {
4819                        "param": param,
4820                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
4821                    }
4822                }))
4823            }
4824
4825            ComputedExpr::Unary { op, expr } => {
4826                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4827                self.apply_unary_op(op, &val)
4828            }
4829
4830            ComputedExpr::JsonToBytes { expr } => {
4831                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4832                // Convert JSON array of numbers to byte array
4833                let bytes = self.value_to_bytes(&val)?;
4834                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4835            }
4836
4837            ComputedExpr::UnwrapOr { expr, default } => {
4838                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4839                if val.is_null() {
4840                    Ok(default.clone())
4841                } else {
4842                    Ok(val)
4843                }
4844            }
4845
4846            ComputedExpr::Binary { op, left, right } => {
4847                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4848                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4849                self.apply_binary_op(op, &l, &r)
4850            }
4851
4852            ComputedExpr::Cast { expr, to_type } => {
4853                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4854                self.apply_cast(&val, to_type)
4855            }
4856
4857            ComputedExpr::ResolverComputed {
4858                resolver,
4859                method,
4860                args,
4861            } => {
4862                let evaluated_args: Vec<Value> = args
4863                    .iter()
4864                    .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4865                    .collect::<Result<Vec<_>>>()?;
4866                crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4867            }
4868
4869            ComputedExpr::MethodCall { expr, method, args } => {
4870                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4871                // Special handling for map() with closures
4872                if method == "map" && args.len() == 1 {
4873                    if let ComputedExpr::Closure { param, body } = &args[0] {
4874                        // If the value is null, return null (Option::None.map returns None)
4875                        if val.is_null() {
4876                            return Ok(Value::Null);
4877                        }
4878
4879                        if let Value::Array(arr) = &val {
4880                            let results: Result<Vec<Value>> = arr
4881                                .iter()
4882                                .map(|elem| {
4883                                    let mut closure_env = env.clone();
4884                                    closure_env.insert(param.clone(), elem.clone());
4885                                    self.evaluate_computed_expr_with_env(body, state, &closure_env)
4886                                })
4887                                .collect();
4888                            return Ok(Value::Array(results?));
4889                        }
4890
4891                        let mut closure_env = env.clone();
4892                        closure_env.insert(param.clone(), val);
4893                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4894                    }
4895                }
4896                let evaluated_args: Vec<Value> = args
4897                    .iter()
4898                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4899                    .collect::<Result<Vec<_>>>()?;
4900                self.apply_method_call(&val, method, &evaluated_args)
4901            }
4902
4903            ComputedExpr::Literal { value } => Ok(value.clone()),
4904
4905            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4906
4907            ComputedExpr::ContextSlot => Ok(self
4908                .current_context
4909                .as_ref()
4910                .and_then(|ctx| ctx.slot)
4911                .map(|s| json!(s))
4912                .unwrap_or(Value::Null)),
4913
4914            ComputedExpr::ContextTimestamp => Ok(self
4915                .current_context
4916                .as_ref()
4917                .map(|ctx| json!(ctx.timestamp()))
4918                .unwrap_or(Value::Null)),
4919
4920            ComputedExpr::Keccak256 { expr } => {
4921                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4922                let bytes = self.value_to_bytes(&val)?;
4923                use sha3::{Digest, Keccak256};
4924                let hash = Keccak256::digest(&bytes);
4925                Ok(Value::Array(
4926                    hash.to_vec().iter().map(|b| json!(*b)).collect(),
4927                ))
4928            }
4929        }
4930    }
4931
4932    /// Convert a JSON value to a byte vector
4933    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4934        match val {
4935            Value::Array(arr) => arr
4936                .iter()
4937                .map(|v| {
4938                    v.as_u64()
4939                        .map(|n| n as u8)
4940                        .ok_or_else(|| "Array element not a valid byte".into())
4941                })
4942                .collect(),
4943            Value::String(s) => {
4944                // Try to decode as hex
4945                if s.starts_with("0x") || s.starts_with("0X") {
4946                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4947                } else {
4948                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4949                }
4950            }
4951            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4952        }
4953    }
4954
4955    /// Apply a unary operation
4956    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4957        use crate::ast::UnaryOp;
4958        match op {
4959            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4960            UnaryOp::ReverseBits => match val.as_u64() {
4961                Some(n) => Ok(json!(n.reverse_bits())),
4962                None => match val.as_i64() {
4963                    Some(n) => Ok(json!((n as u64).reverse_bits())),
4964                    None => Err("reverse_bits requires an integer".into()),
4965                },
4966            },
4967        }
4968    }
4969
4970    /// Get a field value from state by path (e.g., "section.field" or just "field")
4971    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4972        let segments: Vec<&str> = path.split('.').collect();
4973        let mut current = state;
4974
4975        for segment in segments {
4976            match current.get(segment) {
4977                Some(v) => current = v,
4978                None => return Ok(Value::Null),
4979            }
4980        }
4981
4982        Ok(current.clone())
4983    }
4984
4985    /// Apply a binary operation to two values
4986    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4987        match op {
4988            // Arithmetic operations
4989            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4990            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4991            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4992            BinaryOp::Div => {
4993                // Check for division by zero
4994                if let Some(r) = right.as_i64() {
4995                    if r == 0 {
4996                        return Err("Division by zero".into());
4997                    }
4998                }
4999                if let Some(r) = right.as_f64() {
5000                    if r == 0.0 {
5001                        return Err("Division by zero".into());
5002                    }
5003                }
5004                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
5005            }
5006            BinaryOp::Mod => {
5007                // Modulo - only for integers
5008                match (left.as_i64(), right.as_i64()) {
5009                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
5010                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
5011                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
5012                        _ => Err("Modulo requires non-zero integer operands".into()),
5013                    },
5014                    _ => Err("Modulo by zero".into()),
5015                }
5016            }
5017
5018            // Comparison operations
5019            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
5020            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
5021            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
5022            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
5023            BinaryOp::Eq => Ok(json!(left == right)),
5024            BinaryOp::Ne => Ok(json!(left != right)),
5025
5026            // Logical operations
5027            BinaryOp::And => {
5028                let l_bool = self.value_to_bool(left);
5029                let r_bool = self.value_to_bool(right);
5030                Ok(json!(l_bool && r_bool))
5031            }
5032            BinaryOp::Or => {
5033                let l_bool = self.value_to_bool(left);
5034                let r_bool = self.value_to_bool(right);
5035                Ok(json!(l_bool || r_bool))
5036            }
5037
5038            // Bitwise operations
5039            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
5040                (Some(a), Some(b)) => Ok(json!(a ^ b)),
5041                _ => match (left.as_i64(), right.as_i64()) {
5042                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
5043                    _ => Err("XOR requires integer operands".into()),
5044                },
5045            },
5046            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
5047                (Some(a), Some(b)) => Ok(json!(a & b)),
5048                _ => match (left.as_i64(), right.as_i64()) {
5049                    (Some(a), Some(b)) => Ok(json!(a & b)),
5050                    _ => Err("BitAnd requires integer operands".into()),
5051                },
5052            },
5053            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
5054                (Some(a), Some(b)) => Ok(json!(a | b)),
5055                _ => match (left.as_i64(), right.as_i64()) {
5056                    (Some(a), Some(b)) => Ok(json!(a | b)),
5057                    _ => Err("BitOr requires integer operands".into()),
5058                },
5059            },
5060            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
5061                (Some(a), Some(b)) => Ok(json!(a << b)),
5062                _ => match (left.as_i64(), right.as_i64()) {
5063                    (Some(a), Some(b)) => Ok(json!(a << b)),
5064                    _ => Err("Shl requires integer operands".into()),
5065                },
5066            },
5067            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
5068                (Some(a), Some(b)) => Ok(json!(a >> b)),
5069                _ => match (left.as_i64(), right.as_i64()) {
5070                    (Some(a), Some(b)) => Ok(json!(a >> b)),
5071                    _ => Err("Shr requires integer operands".into()),
5072                },
5073            },
5074        }
5075    }
5076
5077    /// Helper for numeric operations that can work on integers or floats
5078    fn numeric_op<F1, F2>(
5079        &self,
5080        left: &Value,
5081        right: &Value,
5082        int_op: F1,
5083        float_op: F2,
5084    ) -> Result<Value>
5085    where
5086        F1: Fn(i64, i64) -> i64,
5087        F2: Fn(f64, f64) -> f64,
5088    {
5089        // Try i64 first
5090        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
5091            return Ok(json!(int_op(a, b)));
5092        }
5093
5094        // Try u64
5095        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
5096            // For u64, we need to be careful with underflow in subtraction
5097            return Ok(json!(int_op(a as i64, b as i64)));
5098        }
5099
5100        // Try f64
5101        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
5102            return Ok(json!(float_op(a, b)));
5103        }
5104
5105        // If either is null, return null
5106        if left.is_null() || right.is_null() {
5107            return Ok(Value::Null);
5108        }
5109
5110        Err(format!(
5111            "Cannot perform numeric operation on {:?} and {:?}",
5112            left, right
5113        )
5114        .into())
5115    }
5116
5117    /// Helper for comparison operations
5118    fn comparison_op<F1, F2>(
5119        &self,
5120        left: &Value,
5121        right: &Value,
5122        int_cmp: F1,
5123        float_cmp: F2,
5124    ) -> Result<Value>
5125    where
5126        F1: Fn(i64, i64) -> bool,
5127        F2: Fn(f64, f64) -> bool,
5128    {
5129        // Try i64 first
5130        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
5131            return Ok(json!(int_cmp(a, b)));
5132        }
5133
5134        // Try u64
5135        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
5136            return Ok(json!(int_cmp(a as i64, b as i64)));
5137        }
5138
5139        // Try f64
5140        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
5141            return Ok(json!(float_cmp(a, b)));
5142        }
5143
5144        // If either is null, comparison returns false
5145        if left.is_null() || right.is_null() {
5146            return Ok(json!(false));
5147        }
5148
5149        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
5150    }
5151
5152    /// Convert a value to boolean for logical operations
5153    fn value_to_bool(&self, value: &Value) -> bool {
5154        match value {
5155            Value::Null => false,
5156            Value::Bool(b) => *b,
5157            Value::Number(n) => {
5158                if let Some(i) = n.as_i64() {
5159                    i != 0
5160                } else if let Some(f) = n.as_f64() {
5161                    f != 0.0
5162                } else {
5163                    true
5164                }
5165            }
5166            Value::String(s) => !s.is_empty(),
5167            Value::Array(arr) => !arr.is_empty(),
5168            Value::Object(obj) => !obj.is_empty(),
5169        }
5170    }
5171
5172    /// Apply a type cast to a value
5173    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
5174        match to_type {
5175            "i8" | "i16" | "i32" | "i64" | "isize" => {
5176                if let Some(n) = value.as_i64() {
5177                    Ok(json!(n))
5178                } else if let Some(n) = value.as_u64() {
5179                    Ok(json!(n as i64))
5180                } else if let Some(n) = value.as_f64() {
5181                    Ok(json!(n as i64))
5182                } else if let Some(s) = value.as_str() {
5183                    s.parse::<i64>()
5184                        .map(|n| json!(n))
5185                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
5186                } else {
5187                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5188                }
5189            }
5190            "u8" | "u16" | "u32" | "u64" | "usize" => {
5191                if let Some(n) = value.as_u64() {
5192                    Ok(json!(n))
5193                } else if let Some(n) = value.as_i64() {
5194                    Ok(json!(n as u64))
5195                } else if let Some(n) = value.as_f64() {
5196                    Ok(json!(n as u64))
5197                } else if let Some(s) = value.as_str() {
5198                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
5199                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
5200                    })
5201                } else {
5202                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5203                }
5204            }
5205            "f32" | "f64" => {
5206                if let Some(n) = value.as_f64() {
5207                    Ok(json!(n))
5208                } else if let Some(n) = value.as_i64() {
5209                    Ok(json!(n as f64))
5210                } else if let Some(n) = value.as_u64() {
5211                    Ok(json!(n as f64))
5212                } else if let Some(s) = value.as_str() {
5213                    s.parse::<f64>()
5214                        .map(|n| json!(n))
5215                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
5216                } else {
5217                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5218                }
5219            }
5220            "String" | "string" => Ok(json!(value.to_string())),
5221            "bool" => Ok(json!(self.value_to_bool(value))),
5222            _ => {
5223                // Unknown type, return value as-is
5224                Ok(value.clone())
5225            }
5226        }
5227    }
5228
5229    /// Apply a method call to a value
5230    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
5231        match method {
5232            "unwrap_or" => {
5233                if value.is_null() && !args.is_empty() {
5234                    Ok(args[0].clone())
5235                } else {
5236                    Ok(value.clone())
5237                }
5238            }
5239            "unwrap_or_default" => {
5240                if value.is_null() {
5241                    // Return default for common types
5242                    Ok(json!(0))
5243                } else {
5244                    Ok(value.clone())
5245                }
5246            }
5247            "is_some" => Ok(json!(!value.is_null())),
5248            "is_none" => Ok(json!(value.is_null())),
5249            "abs" => {
5250                if let Some(n) = value.as_i64() {
5251                    Ok(json!(n.abs()))
5252                } else if let Some(n) = value.as_f64() {
5253                    Ok(json!(n.abs()))
5254                } else {
5255                    Err(format!("Cannot call abs() on {:?}", value).into())
5256                }
5257            }
5258            "len" => {
5259                if let Some(s) = value.as_str() {
5260                    Ok(json!(s.len()))
5261                } else if let Some(arr) = value.as_array() {
5262                    Ok(json!(arr.len()))
5263                } else if let Some(obj) = value.as_object() {
5264                    Ok(json!(obj.len()))
5265                } else {
5266                    Err(format!("Cannot call len() on {:?}", value).into())
5267                }
5268            }
5269            "to_string" => Ok(json!(value.to_string())),
5270            "min" => {
5271                if args.is_empty() {
5272                    return Err("min() requires an argument".into());
5273                }
5274                let other = &args[0];
5275                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5276                    Ok(json!(a.min(b)))
5277                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
5278                    Ok(json!(a.min(b)))
5279                } else {
5280                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
5281                }
5282            }
5283            "max" => {
5284                if args.is_empty() {
5285                    return Err("max() requires an argument".into());
5286                }
5287                let other = &args[0];
5288                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5289                    Ok(json!(a.max(b)))
5290                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
5291                    Ok(json!(a.max(b)))
5292                } else {
5293                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
5294                }
5295            }
5296            "saturating_add" => {
5297                if args.is_empty() {
5298                    return Err("saturating_add() requires an argument".into());
5299                }
5300                let other = &args[0];
5301                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5302                    Ok(json!(a.saturating_add(b)))
5303                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
5304                    Ok(json!(a.saturating_add(b)))
5305                } else {
5306                    Err(format!(
5307                        "Cannot call saturating_add() on {:?} and {:?}",
5308                        value, other
5309                    )
5310                    .into())
5311                }
5312            }
5313            "saturating_sub" => {
5314                if args.is_empty() {
5315                    return Err("saturating_sub() requires an argument".into());
5316                }
5317                let other = &args[0];
5318                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5319                    Ok(json!(a.saturating_sub(b)))
5320                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
5321                    Ok(json!(a.saturating_sub(b)))
5322                } else {
5323                    Err(format!(
5324                        "Cannot call saturating_sub() on {:?} and {:?}",
5325                        value, other
5326                    )
5327                    .into())
5328                }
5329            }
5330            _ => Err(format!("Unknown method call: {}()", method).into()),
5331        }
5332    }
5333
5334    /// Evaluate all computed fields for an entity and update the state
5335    /// This takes a list of ComputedFieldSpec from the AST and applies them
5336    pub fn evaluate_computed_fields_from_ast(
5337        &self,
5338        state: &mut Value,
5339        computed_field_specs: &[ComputedFieldSpec],
5340    ) -> Result<Vec<String>> {
5341        let mut updated_paths = Vec::new();
5342
5343        crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
5344
5345        for spec in computed_field_specs {
5346            match self.evaluate_computed_expr(&spec.expression, state) {
5347                Ok(result) => {
5348                    self.set_field_in_state(state, &spec.target_path, result)?;
5349                    updated_paths.push(spec.target_path.clone());
5350                }
5351                Err(e) => {
5352                    tracing::warn!(
5353                        target_path = %spec.target_path,
5354                        error = %e,
5355                        "Failed to evaluate computed field"
5356                    );
5357                }
5358            }
5359        }
5360
5361        Ok(updated_paths)
5362    }
5363
5364    /// Set a field value in state by path (e.g., "section.field")
5365    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
5366        let segments: Vec<&str> = path.split('.').collect();
5367
5368        if segments.is_empty() {
5369            return Err("Empty path".into());
5370        }
5371
5372        // Navigate to parent, creating intermediate objects as needed
5373        let mut current = state;
5374        for (i, segment) in segments.iter().enumerate() {
5375            if i == segments.len() - 1 {
5376                // Last segment - set the value
5377                if let Some(obj) = current.as_object_mut() {
5378                    obj.insert(segment.to_string(), value);
5379                    return Ok(());
5380                } else {
5381                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
5382                }
5383            } else {
5384                // Intermediate segment - navigate or create
5385                if !current.is_object() {
5386                    *current = json!({});
5387                }
5388                let obj = current.as_object_mut().unwrap();
5389                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
5390            }
5391        }
5392
5393        Ok(())
5394    }
5395
5396    /// Create a computed fields evaluator closure from AST specs
5397    /// This returns a function that can be passed to the bytecode builder
5398    pub fn create_evaluator_from_specs(
5399        specs: Vec<ComputedFieldSpec>,
5400    ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5401        move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5402            let mut vm = VmContext::new();
5403            vm.current_context = Some(UpdateContext {
5404                slot: context_slot,
5405                timestamp: Some(context_timestamp),
5406                ..Default::default()
5407            });
5408            vm.evaluate_computed_fields_from_ast(state, &specs)?;
5409            Ok(())
5410        }
5411    }
5412}
5413
5414impl Default for VmContext {
5415    fn default() -> Self {
5416        Self::new()
5417    }
5418}
5419
5420// Implement the ReverseLookupUpdater trait for VmContext
5421impl crate::resolvers::ReverseLookupUpdater for VmContext {
5422    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5423        // Use default state_id=0 and default lookup name
5424        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5425            .unwrap_or_else(|e| {
5426                tracing::error!("Failed to update PDA reverse lookup: {}", e);
5427                Vec::new()
5428            })
5429    }
5430
5431    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5432        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
5433        self.flush_pending_updates(0, pda_address)
5434            .unwrap_or_else(|e| {
5435                tracing::error!("Failed to flush pending updates: {}", e);
5436                Vec::new()
5437            })
5438    }
5439}
5440
5441#[cfg(test)]
5442mod tests {
5443    use super::*;
5444    use crate::ast::{
5445        BinaryOp, ComputedExpr, ComputedFieldSpec, HttpMethod, UrlResolverConfig, UrlSource,
5446    };
5447
5448    #[test]
5449    fn test_url_resolver_cache_key_uses_method_and_resolved_url() {
5450        let field_path_resolver = ResolverType::Url(UrlResolverConfig {
5451            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5452            method: HttpMethod::Get,
5453            extract_path: None,
5454        });
5455        let template_resolver = ResolverType::Url(UrlResolverConfig {
5456            url_source: UrlSource::Template(vec![ast::UrlTemplatePart::Literal(
5457                "https://example.com/metadata".to_string(),
5458            )]),
5459            method: HttpMethod::Get,
5460            extract_path: Some("data".to_string()),
5461        });
5462        let input = json!("https://cdn.example.com/token.json");
5463
5464        assert_eq!(
5465            resolver_cache_key(&field_path_resolver, &input),
5466            resolver_cache_key(&template_resolver, &input)
5467        );
5468    }
5469
5470    #[test]
5471    fn test_url_resolver_cache_key_distinguishes_http_method() {
5472        let get_resolver = ResolverType::Url(UrlResolverConfig {
5473            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5474            method: HttpMethod::Get,
5475            extract_path: None,
5476        });
5477        let post_resolver = ResolverType::Url(UrlResolverConfig {
5478            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5479            method: HttpMethod::Post,
5480            extract_path: None,
5481        });
5482        let input = json!("https://api.example.com/round");
5483
5484        assert_ne!(
5485            resolver_cache_key(&get_resolver, &input),
5486            resolver_cache_key(&post_resolver, &input)
5487        );
5488    }
5489
5490    #[test]
5491    fn test_expired_resolver_cache_entry_is_dropped() {
5492        let mut vm = VmContext::new();
5493        let resolver = ResolverType::Url(UrlResolverConfig {
5494            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5495            method: HttpMethod::Get,
5496            extract_path: None,
5497        });
5498        let input = json!("https://cdn.example.com/token.json");
5499        let cache_key = resolver_cache_key(&resolver, &input);
5500
5501        vm.resolver_cache.put(
5502            cache_key.clone(),
5503            ResolverCacheEntry {
5504                value: json!({ "name": "Token" }),
5505                cached_at: Instant::now() - resolver_cache_ttl() - Duration::from_secs(1),
5506            },
5507        );
5508
5509        assert!(vm.get_cached_resolver_value(&cache_key).is_none());
5510        assert!(vm.resolver_cache.get(&cache_key).is_none());
5511    }
5512
5513    #[test]
5514    fn test_computed_field_preserves_integer_type() {
5515        let vm = VmContext::new();
5516
5517        let mut state = serde_json::json!({
5518            "trading": {
5519                "total_buy_volume": 20000000000_i64,
5520                "total_sell_volume": 17951316474_i64
5521            }
5522        });
5523
5524        let spec = ComputedFieldSpec {
5525            target_path: "trading.total_volume".to_string(),
5526            result_type: "Option<u64>".to_string(),
5527            expression: ComputedExpr::Binary {
5528                op: BinaryOp::Add,
5529                left: Box::new(ComputedExpr::UnwrapOr {
5530                    expr: Box::new(ComputedExpr::FieldRef {
5531                        path: "trading.total_buy_volume".to_string(),
5532                    }),
5533                    default: serde_json::json!(0),
5534                }),
5535                right: Box::new(ComputedExpr::UnwrapOr {
5536                    expr: Box::new(ComputedExpr::FieldRef {
5537                        path: "trading.total_sell_volume".to_string(),
5538                    }),
5539                    default: serde_json::json!(0),
5540                }),
5541            },
5542        };
5543
5544        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
5545            .unwrap();
5546
5547        let total_volume = state
5548            .get("trading")
5549            .and_then(|t| t.get("total_volume"))
5550            .expect("total_volume should exist");
5551
5552        let serialized = serde_json::to_string(total_volume).unwrap();
5553        assert!(
5554            !serialized.contains('.'),
5555            "Integer should not have decimal point: {}",
5556            serialized
5557        );
5558        assert_eq!(
5559            total_volume.as_i64(),
5560            Some(37951316474),
5561            "Value should be correct sum"
5562        );
5563    }
5564
5565    #[test]
5566    fn test_set_field_sum_preserves_integer_type() {
5567        let mut vm = VmContext::new();
5568        vm.registers[0] = serde_json::json!({});
5569        vm.registers[1] = serde_json::json!(20000000000_i64);
5570        vm.registers[2] = serde_json::json!(17951316474_i64);
5571
5572        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
5573        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
5574
5575        let state = &vm.registers[0];
5576        let buy_vol = state
5577            .get("trading")
5578            .and_then(|t| t.get("total_buy_volume"))
5579            .unwrap();
5580        let sell_vol = state
5581            .get("trading")
5582            .and_then(|t| t.get("total_sell_volume"))
5583            .unwrap();
5584
5585        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
5586        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
5587
5588        assert!(
5589            !buy_serialized.contains('.'),
5590            "Buy volume should not have decimal: {}",
5591            buy_serialized
5592        );
5593        assert!(
5594            !sell_serialized.contains('.'),
5595            "Sell volume should not have decimal: {}",
5596            sell_serialized
5597        );
5598    }
5599
5600    #[test]
5601    fn test_lookup_index_chaining() {
5602        let mut vm = VmContext::new();
5603
5604        let state = vm.states.get_mut(&0).unwrap();
5605
5606        state
5607            .pda_reverse_lookups
5608            .entry("default_pda_lookup".to_string())
5609            .or_insert_with(|| PdaReverseLookup::new(1000))
5610            .insert("pda_123".to_string(), "addr_456".to_string());
5611
5612        state
5613            .lookup_indexes
5614            .entry("round_address_lookup_index".to_string())
5615            .or_insert_with(LookupIndex::new)
5616            .insert(json!("addr_456"), json!(789));
5617
5618        let handler = vec![
5619            OpCode::LoadConstant {
5620                value: json!("pda_123"),
5621                dest: 0,
5622            },
5623            OpCode::LookupIndex {
5624                state_id: 0,
5625                index_name: "round_address_lookup_index".to_string(),
5626                lookup_value: 0,
5627                dest: 1,
5628            },
5629        ];
5630
5631        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5632            .unwrap();
5633
5634        assert_eq!(vm.registers[1], json!(789));
5635    }
5636
5637    #[test]
5638    fn test_lookup_index_no_chain() {
5639        let mut vm = VmContext::new();
5640
5641        let state = vm.states.get_mut(&0).unwrap();
5642        state
5643            .lookup_indexes
5644            .entry("test_index".to_string())
5645            .or_insert_with(LookupIndex::new)
5646            .insert(json!("key_abc"), json!(42));
5647
5648        let handler = vec![
5649            OpCode::LoadConstant {
5650                value: json!("key_abc"),
5651                dest: 0,
5652            },
5653            OpCode::LookupIndex {
5654                state_id: 0,
5655                index_name: "test_index".to_string(),
5656                lookup_value: 0,
5657                dest: 1,
5658            },
5659        ];
5660
5661        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5662            .unwrap();
5663
5664        assert_eq!(vm.registers[1], json!(42));
5665    }
5666
5667    #[test]
5668    fn test_conditional_set_field_with_zero_array() {
5669        let mut vm = VmContext::new();
5670
5671        let event_zeros = json!({
5672            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5673                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5674        });
5675
5676        let event_nonzero = json!({
5677            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
5678                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
5679        });
5680
5681        let zero_32: Value = json!([
5682            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5683            0, 0, 0
5684        ]);
5685
5686        let handler = vec![
5687            OpCode::CreateObject { dest: 2 },
5688            OpCode::LoadEventField {
5689                path: FieldPath::new(&["value"]),
5690                dest: 10,
5691                default: None,
5692            },
5693            OpCode::ConditionalSetField {
5694                object: 2,
5695                path: "captured_value".to_string(),
5696                value: 10,
5697                condition_field: FieldPath::new(&["value"]),
5698                condition_op: ComparisonOp::NotEqual,
5699                condition_value: zero_32,
5700            },
5701        ];
5702
5703        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
5704            .unwrap();
5705        assert!(
5706            vm.registers[2].get("captured_value").is_none(),
5707            "Field should not be set when value is all zeros"
5708        );
5709
5710        vm.reset_registers();
5711        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
5712            .unwrap();
5713        assert!(
5714            vm.registers[2].get("captured_value").is_some(),
5715            "Field should be set when value is non-zero"
5716        );
5717    }
5718
5719    #[test]
5720    fn test_when_instruction_arrives_first() {
5721        let mut vm = VmContext::new();
5722
5723        let signature = "test_sig_123".to_string();
5724
5725        {
5726            let state = vm.states.get(&0).unwrap();
5727            let mut cache = state.recent_tx_instructions.lock().unwrap();
5728            let mut set = HashSet::new();
5729            set.insert("RevealIxState".to_string());
5730            cache.put(signature.clone(), set);
5731        }
5732
5733        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5734
5735        let handler = vec![
5736            OpCode::CreateObject { dest: 2 },
5737            OpCode::LoadConstant {
5738                value: json!("primary_key_value"),
5739                dest: 1,
5740            },
5741            OpCode::LoadConstant {
5742                value: json!("the_revealed_value"),
5743                dest: 10,
5744            },
5745            OpCode::SetFieldWhen {
5746                object: 2,
5747                path: "entropy_value".to_string(),
5748                value: 10,
5749                when_instruction: "RevealIxState".to_string(),
5750                entity_name: "TestEntity".to_string(),
5751                key_reg: 1,
5752                condition_field: None,
5753                condition_op: None,
5754                condition_value: None,
5755            },
5756        ];
5757
5758        vm.execute_handler(
5759            &handler,
5760            &json!({}),
5761            "VarState",
5762            0,
5763            "TestEntity",
5764            None,
5765            None,
5766        )
5767        .unwrap();
5768
5769        assert_eq!(
5770            vm.registers[2].get("entropy_value").unwrap(),
5771            "the_revealed_value",
5772            "Field should be set when instruction was already seen"
5773        );
5774    }
5775
5776    #[test]
5777    fn test_when_account_arrives_first() {
5778        let mut vm = VmContext::new();
5779
5780        let signature = "test_sig_456".to_string();
5781
5782        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5783
5784        let handler = vec![
5785            OpCode::CreateObject { dest: 2 },
5786            OpCode::LoadConstant {
5787                value: json!("pk_123"),
5788                dest: 1,
5789            },
5790            OpCode::LoadConstant {
5791                value: json!("deferred_value"),
5792                dest: 10,
5793            },
5794            OpCode::SetFieldWhen {
5795                object: 2,
5796                path: "entropy_value".to_string(),
5797                value: 10,
5798                when_instruction: "RevealIxState".to_string(),
5799                entity_name: "TestEntity".to_string(),
5800                key_reg: 1,
5801                condition_field: None,
5802                condition_op: None,
5803                condition_value: None,
5804            },
5805        ];
5806
5807        vm.execute_handler(
5808            &handler,
5809            &json!({}),
5810            "VarState",
5811            0,
5812            "TestEntity",
5813            None,
5814            None,
5815        )
5816        .unwrap();
5817
5818        assert!(
5819            vm.registers[2].get("entropy_value").is_none(),
5820            "Field should not be set when instruction hasn't been seen"
5821        );
5822
5823        let state = vm.states.get(&0).unwrap();
5824        let key = (signature.clone(), "RevealIxState".to_string());
5825        assert!(
5826            state.deferred_when_ops.contains_key(&key),
5827            "Operation should be queued"
5828        );
5829
5830        {
5831            let mut cache = state.recent_tx_instructions.lock().unwrap();
5832            let mut set = HashSet::new();
5833            set.insert("RevealIxState".to_string());
5834            cache.put(signature.clone(), set);
5835        }
5836
5837        let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
5838        for op in deferred {
5839            vm.apply_deferred_when_op(0, &op, None, None).unwrap();
5840        }
5841
5842        let state = vm.states.get(&0).unwrap();
5843        let entity = state.data.get(&json!("pk_123")).unwrap();
5844        assert_eq!(
5845            entity.get("entropy_value").unwrap(),
5846            "deferred_value",
5847            "Field should be set after instruction arrives"
5848        );
5849    }
5850
5851    #[test]
5852    fn test_when_cleanup_expired() {
5853        let mut vm = VmContext::new();
5854
5855        let state = vm.states.get(&0).unwrap();
5856        let key = ("old_sig".to_string(), "SomeIxState".to_string());
5857        state.deferred_when_ops.insert(
5858            key,
5859            vec![DeferredWhenOperation {
5860                entity_name: "Test".to_string(),
5861                primary_key: json!("pk"),
5862                field_path: "field".to_string(),
5863                field_value: json!("value"),
5864                when_instruction: "SomeIxState".to_string(),
5865                signature: "old_sig".to_string(),
5866                slot: 0,
5867                deferred_at: 0,
5868                emit: true,
5869            }],
5870        );
5871
5872        let removed = vm.cleanup_expired_when_ops(0, 60);
5873
5874        assert_eq!(removed, 1, "Should have removed 1 expired op");
5875        assert!(
5876            vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
5877            "Deferred ops should be empty after cleanup"
5878        );
5879    }
5880
5881    #[test]
5882    fn test_deferred_when_op_recomputes_dependent_fields() {
5883        use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
5884
5885        let mut vm = VmContext::new();
5886
5887        // Create computed field specs similar to the ore stack:
5888        // pre_reveal_rng depends on base_value
5889        // pre_reveal_winning_square depends on pre_reveal_rng
5890        let computed_specs = vec![
5891            ComputedFieldSpec {
5892                target_path: "results.pre_reveal_rng".to_string(),
5893                result_type: "Option<u64>".to_string(),
5894                expression: ComputedExpr::FieldRef {
5895                    path: "entropy.base_value".to_string(),
5896                },
5897            },
5898            ComputedFieldSpec {
5899                target_path: "results.pre_reveal_winning_square".to_string(),
5900                result_type: "Option<u64>".to_string(),
5901                expression: ComputedExpr::MethodCall {
5902                    expr: Box::new(ComputedExpr::FieldRef {
5903                        path: "results.pre_reveal_rng".to_string(),
5904                    }),
5905                    method: "map".to_string(),
5906                    args: vec![ComputedExpr::Closure {
5907                        param: "r".to_string(),
5908                        body: Box::new(ComputedExpr::Binary {
5909                            op: BinaryOp::Mod,
5910                            left: Box::new(ComputedExpr::Var {
5911                                name: "r".to_string(),
5912                            }),
5913                            right: Box::new(ComputedExpr::Literal {
5914                                value: serde_json::json!(25),
5915                            }),
5916                        }),
5917                    }],
5918                },
5919            },
5920        ];
5921
5922        let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
5923            Box::new(VmContext::create_evaluator_from_specs(computed_specs));
5924
5925        // Test that when we set entropy.base_value via deferred when-op,
5926        // both computed fields are updated
5927        let primary_key = json!("test_pk");
5928        let op = DeferredWhenOperation {
5929            entity_name: "TestEntity".to_string(),
5930            primary_key: primary_key.clone(),
5931            field_path: "entropy.base_value".to_string(),
5932            field_value: json!(100),
5933            when_instruction: "TestIxState".to_string(),
5934            signature: "test_sig".to_string(),
5935            slot: 100,
5936            deferred_at: 0,
5937            emit: true,
5938        };
5939
5940        // Store the entity in state first
5941        let initial_state = json!({
5942            "results": {}
5943        });
5944        vm.states
5945            .get(&0)
5946            .unwrap()
5947            .insert_with_eviction(primary_key.clone(), initial_state);
5948
5949        // Apply the deferred when-op with the evaluator
5950        let mutations = vm
5951            .apply_deferred_when_op(
5952                0,
5953                &op,
5954                Some(&evaluator),
5955                Some(&[
5956                    "results.pre_reveal_rng".to_string(),
5957                    "results.pre_reveal_winning_square".to_string(),
5958                ]),
5959            )
5960            .unwrap();
5961
5962        // Check the entity state
5963        let state = vm.states.get(&0).unwrap();
5964        let entity = state.data.get(&primary_key).unwrap();
5965
5966        println!(
5967            "Entity state: {}",
5968            serde_json::to_string_pretty(&*entity).unwrap()
5969        );
5970
5971        // Verify base value was set
5972        assert_eq!(
5973            entity.get("entropy").and_then(|e| e.get("base_value")),
5974            Some(&json!(100)),
5975            "Base value should be set"
5976        );
5977
5978        // Verify computed fields were calculated
5979        let pre_reveal_rng = entity
5980            .get("results")
5981            .and_then(|r| r.get("pre_reveal_rng"))
5982            .cloned();
5983        let pre_reveal_winning_square = entity
5984            .get("results")
5985            .and_then(|r| r.get("pre_reveal_winning_square"))
5986            .cloned();
5987
5988        assert_eq!(
5989            pre_reveal_rng,
5990            Some(json!(100)),
5991            "pre_reveal_rng should be computed"
5992        );
5993        assert_eq!(
5994            pre_reveal_winning_square,
5995            Some(json!(0)),
5996            "pre_reveal_winning_square should be 100 % 25 = 0"
5997        );
5998
5999        // Verify mutations include computed fields
6000        assert!(!mutations.is_empty(), "Should have mutations");
6001        let mutation = &mutations[0];
6002        let patch = &mutation.patch;
6003
6004        assert!(
6005            patch
6006                .get("results")
6007                .and_then(|r| r.get("pre_reveal_rng"))
6008                .is_some(),
6009            "Mutation should include pre_reveal_rng"
6010        );
6011        assert!(
6012            patch
6013                .get("results")
6014                .and_then(|r| r.get("pre_reveal_winning_square"))
6015                .is_some(),
6016            "Mutation should include pre_reveal_winning_square"
6017        );
6018    }
6019}