Skip to main content

hyperstack_interpreter/
vm.rs

use crate::ast::{
    self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
    ResolverExtractSpec, ResolverType, Transformation,
};
use crate::compiler::{MultiEntityBytecode, OpCode};
use crate::Mutation;
use dashmap::DashMap;
use lru::LruCache;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;

#[cfg(feature = "otel")]
use tracing::instrument;
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    /// Returns true if this is an account update context (has write_version, no txn_index)
102    pub fn is_account_update(&self) -> bool {
103        self.write_version.is_some() && self.txn_index.is_none()
104    }
105
106    /// Returns true if this is an instruction update context (has txn_index, no write_version)
107    pub fn is_instruction_update(&self) -> bool {
108        self.txn_index.is_some() && self.write_version.is_none()
109    }
110
111    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112        self.metadata.insert(key, value);
113        self
114    }
115
116    /// Get metadata value
117    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118        self.metadata.get(key)
119    }
120
121    /// Convert context to JSON value for injection into event data
122    pub fn to_value(&self) -> Value {
123        let mut obj = serde_json::Map::new();
124        if let Some(slot) = self.slot {
125            obj.insert("slot".to_string(), json!(slot));
126        }
127        if let Some(ref sig) = self.signature {
128            obj.insert("signature".to_string(), json!(sig));
129        }
130        // Always include timestamp (use current time if not set)
131        obj.insert("timestamp".to_string(), json!(self.timestamp()));
132        for (key, value) in &self.metadata {
133            obj.insert(key.clone(), value.clone());
134        }
135        Value::Object(obj)
136    }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144/// Trait for evaluating computed fields
145/// Implement this in your generated spec to enable computed field evaluation
146pub trait ComputedFieldsEvaluator {
147    fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
// Pending queue configuration
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes

// Temporal index configuration - prevents unbounded history growth
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

// State table configuration - aligned with downstream EntityCache (500 per view)
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

// Default number of keys held by a LookupIndex
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

// Default number of keys held by a VersionTracker
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

// Smaller cache for instruction deduplication - provides shorter effective TTL
// since we don't expect instruction duplicates to arrive much later
const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;

// Default number of keys held by a TemporalIndex
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// Default PDA reverse-lookup capacity (not referenced in this part of the file)
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;

// Capacity of the VM's resolver result cache
const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
176
177/// Estimate the size of a JSON value in bytes
178fn estimate_json_size(value: &Value) -> usize {
179    match value {
180        Value::Null => 4,
181        Value::Bool(_) => 5,
182        Value::Number(_) => 8,
183        Value::String(s) => s.len() + 2,
184        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
185        Value::Object(obj) => {
186            2 + obj
187                .iter()
188                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
189                .sum::<usize>()
190        }
191    }
192}
193
/// A dotted field path split once into its segments.
///
/// Segments live behind an `Arc`, so cloning a `CompiledPath` shares the
/// allocation instead of copying the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path components in order, e.g. `"a.b.c"` becomes `["a", "b", "c"]`.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` into owned segments.
    ///
    /// No validation is performed: an empty `path` yields a single empty
    /// segment, and consecutive dots yield empty segments.
    pub fn new(path: &str) -> Self {
        CompiledPath {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        &self.segments
    }
}
211
212/// Represents the type of change made to a field for granular dirty tracking.
213/// This enables emitting only the actual changes rather than entire field values.
214#[derive(Debug, Clone)]
215pub enum FieldChange {
216    /// Field was replaced with a new value (emit the full value from state)
217    Replaced,
218    /// Items were appended to an array field (emit only the new items)
219    Appended(Vec<Value>),
220}
221
222/// Tracks field modifications during handler execution with granular change information.
223/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
224#[derive(Debug, Clone, Default)]
225pub struct DirtyTracker {
226    changes: HashMap<String, FieldChange>,
227}
228
229impl DirtyTracker {
230    /// Create a new empty DirtyTracker
231    pub fn new() -> Self {
232        Self {
233            changes: HashMap::new(),
234        }
235    }
236
237    /// Mark a field as replaced (full value will be emitted)
238    pub fn mark_replaced(&mut self, path: &str) {
239        // If there was an append, it's now superseded by a full replacement
240        self.changes.insert(path.to_string(), FieldChange::Replaced);
241    }
242
243    /// Record an appended value for a field
244    pub fn mark_appended(&mut self, path: &str, value: Value) {
245        match self.changes.get_mut(path) {
246            Some(FieldChange::Appended(values)) => {
247                // Add to existing appended values
248                values.push(value);
249            }
250            Some(FieldChange::Replaced) => {
251                // Field was already replaced, keep it as replaced
252                // (the full value including the append will be emitted)
253            }
254            None => {
255                // First append to this field
256                self.changes
257                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
258            }
259        }
260    }
261
262    /// Check if there are any changes tracked
263    pub fn is_empty(&self) -> bool {
264        self.changes.is_empty()
265    }
266
267    /// Get the number of changed fields
268    pub fn len(&self) -> usize {
269        self.changes.len()
270    }
271
272    /// Iterate over all changes
273    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
274        self.changes.iter()
275    }
276
277    /// Get a set of all dirty field paths (for backward compatibility)
278    pub fn dirty_paths(&self) -> HashSet<String> {
279        self.changes.keys().cloned().collect()
280    }
281
282    /// Consume the tracker and return the changes map
283    pub fn into_changes(self) -> HashMap<String, FieldChange> {
284        self.changes
285    }
286
287    /// Get a reference to the changes map
288    pub fn changes(&self) -> &HashMap<String, FieldChange> {
289        &self.changes
290    }
291
292    /// Get paths that were appended (not replaced)
293    pub fn appended_paths(&self) -> Vec<String> {
294        self.changes
295            .iter()
296            .filter_map(|(path, change)| match change {
297                FieldChange::Appended(_) => Some(path.clone()),
298                FieldChange::Replaced => None,
299            })
300            .collect()
301    }
302}
303
304pub struct VmContext {
305    registers: Vec<RegisterValue>,
306    states: HashMap<u32, StateTable>,
307    pub instructions_executed: u64,
308    pub cache_hits: u64,
309    path_cache: HashMap<String, CompiledPath>,
310    pub pda_cache_hits: u64,
311    pub pda_cache_misses: u64,
312    pub pending_queue_size: u64,
313    resolver_requests: VecDeque<ResolverRequest>,
314    resolver_pending: HashMap<String, PendingResolverEntry>,
315    resolver_cache: LruCache<String, Value>,
316    pub resolver_cache_hits: u64,
317    pub resolver_cache_misses: u64,
318    current_context: Option<UpdateContext>,
319    warnings: Vec<String>,
320    last_pda_lookup_miss: Option<String>,
321    last_lookup_index_miss: Option<String>,
322    last_pda_registered: Option<String>,
323    last_lookup_index_keys: Vec<String>,
324    scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
325}
326
327#[derive(Debug)]
328pub struct LookupIndex {
329    index: std::sync::Mutex<LruCache<String, Value>>,
330}
331
332impl LookupIndex {
333    pub fn new() -> Self {
334        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
335    }
336
337    pub fn with_capacity(capacity: usize) -> Self {
338        LookupIndex {
339            index: std::sync::Mutex::new(LruCache::new(
340                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
341            )),
342        }
343    }
344
345    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
346        let key = value_to_cache_key(lookup_value);
347        self.index.lock().unwrap().get(&key).cloned()
348    }
349
350    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
351        let key = value_to_cache_key(&lookup_value);
352        self.index.lock().unwrap().put(key, primary_key);
353    }
354
355    pub fn len(&self) -> usize {
356        self.index.lock().unwrap().len()
357    }
358
359    pub fn is_empty(&self) -> bool {
360        self.index.lock().unwrap().is_empty()
361    }
362}
363
364impl Default for LookupIndex {
365    fn default() -> Self {
366        Self::new()
367    }
368}
369
370fn value_to_cache_key(value: &Value) -> String {
371    match value {
372        Value::String(s) => s.clone(),
373        Value::Number(n) => n.to_string(),
374        Value::Bool(b) => b.to_string(),
375        Value::Null => "null".to_string(),
376        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
377    }
378}
379
380fn resolver_type_key(resolver: &ResolverType) -> String {
381    match resolver {
382        ResolverType::Token => "token".to_string(),
383        ResolverType::Url(config) => match &config.url_source {
384            ast::UrlSource::FieldPath(path) => format!("url:{}", path),
385            ast::UrlSource::Template(parts) => {
386                let key: String = parts
387                    .iter()
388                    .map(|p| match p {
389                        ast::UrlTemplatePart::Literal(s) => s.clone(),
390                        ast::UrlTemplatePart::FieldRef(f) => format!("{{{}}}", f),
391                    })
392                    .collect();
393                format!("url:{}", key)
394            }
395        },
396    }
397}
398
399fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
400    format!(
401        "{}:{}",
402        resolver_type_key(resolver),
403        value_to_cache_key(input)
404    )
405}
406
407#[derive(Debug)]
408pub struct TemporalIndex {
409    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
410}
411
412impl Default for TemporalIndex {
413    fn default() -> Self {
414        Self::new()
415    }
416}
417
418impl TemporalIndex {
419    pub fn new() -> Self {
420        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
421    }
422
423    pub fn with_capacity(capacity: usize) -> Self {
424        TemporalIndex {
425            index: std::sync::Mutex::new(LruCache::new(
426                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
427            )),
428        }
429    }
430
431    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
432        let key = value_to_cache_key(lookup_value);
433        let mut cache = self.index.lock().unwrap();
434        if let Some(entries) = cache.get(&key) {
435            for i in (0..entries.len()).rev() {
436                if entries[i].1 <= timestamp {
437                    return Some(entries[i].0.clone());
438                }
439            }
440        }
441        None
442    }
443
444    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
445        let key = value_to_cache_key(lookup_value);
446        let mut cache = self.index.lock().unwrap();
447        if let Some(entries) = cache.get(&key) {
448            if let Some(last) = entries.last() {
449                return Some(last.0.clone());
450            }
451        }
452        None
453    }
454
455    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
456        let key = value_to_cache_key(&lookup_value);
457        let mut cache = self.index.lock().unwrap();
458
459        let entries = cache.get_or_insert_mut(key, Vec::new);
460        entries.push((primary_key, timestamp));
461        entries.sort_by_key(|(_, ts)| *ts);
462
463        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
464        entries.retain(|(_, ts)| *ts >= cutoff);
465
466        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
467            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
468            entries.drain(0..excess);
469        }
470    }
471
472    pub fn len(&self) -> usize {
473        self.index.lock().unwrap().len()
474    }
475
476    pub fn is_empty(&self) -> bool {
477        self.index.lock().unwrap().is_empty()
478    }
479
480    pub fn total_entries(&self) -> usize {
481        self.index
482            .lock()
483            .unwrap()
484            .iter()
485            .map(|(_, entries)| entries.len())
486            .sum()
487    }
488
489    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
490        let mut cache = self.index.lock().unwrap();
491        let mut total_removed = 0;
492
493        for (_, entries) in cache.iter_mut() {
494            let original_len = entries.len();
495            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
496            total_removed += original_len - entries.len();
497        }
498
499        total_removed
500    }
501}
502
503#[derive(Debug)]
504pub struct PdaReverseLookup {
505    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
506    index: LruCache<String, String>,
507}
508
509impl PdaReverseLookup {
510    pub fn new(capacity: usize) -> Self {
511        PdaReverseLookup {
512            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
513        }
514    }
515
516    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
517        self.index.get(pda_address).cloned()
518    }
519
520    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
521        let evicted = if self.index.len() >= self.index.cap().get() {
522            self.index.peek_lru().map(|(k, _)| k.clone())
523        } else {
524            None
525        };
526
527        self.index.put(pda_address, seed_value);
528        evicted
529    }
530
531    pub fn len(&self) -> usize {
532        self.index.len()
533    }
534
535    pub fn is_empty(&self) -> bool {
536        self.index.is_empty()
537    }
538
539    pub fn contains(&self, pda_address: &str) -> bool {
540        self.index.peek(pda_address).is_some()
541    }
542}
543
544/// Input for queueing an account update.
545#[derive(Debug, Clone)]
546pub struct QueuedAccountUpdate {
547    pub pda_address: String,
548    pub account_type: String,
549    pub account_data: Value,
550    pub slot: u64,
551    pub write_version: u64,
552    pub signature: String,
553}
554
555/// Internal representation of a pending account update with queue metadata.
556#[derive(Debug, Clone)]
557pub struct PendingAccountUpdate {
558    pub account_type: String,
559    pub pda_address: String,
560    pub account_data: Value,
561    pub slot: u64,
562    pub write_version: u64,
563    pub signature: String,
564    pub queued_at: i64,
565}
566
567/// Input for queueing an instruction event when PDA lookup fails.
568#[derive(Debug, Clone)]
569pub struct QueuedInstructionEvent {
570    pub pda_address: String,
571    pub event_type: String,
572    pub event_data: Value,
573    pub slot: u64,
574    pub signature: String,
575}
576
577/// Internal representation of a pending instruction event with queue metadata.
578#[derive(Debug, Clone)]
579pub struct PendingInstructionEvent {
580    pub event_type: String,
581    pub pda_address: String,
582    pub event_data: Value,
583    pub slot: u64,
584    pub signature: String,
585    pub queued_at: i64,
586}
587
588#[derive(Debug, Clone)]
589pub struct DeferredWhenOperation {
590    pub entity_name: String,
591    pub primary_key: Value,
592    pub field_path: String,
593    pub field_value: Value,
594    pub when_instruction: String,
595    pub signature: String,
596    pub slot: u64,
597    pub deferred_at: i64,
598    pub emit: bool,
599}
600
601#[derive(Debug, Clone)]
602pub struct ResolverRequest {
603    pub cache_key: String,
604    pub resolver: ResolverType,
605    pub input: Value,
606}
607
608#[derive(Debug, Clone)]
609pub struct ResolverTarget {
610    pub state_id: u32,
611    pub entity_name: String,
612    pub primary_key: Value,
613    pub extracts: Vec<ResolverExtractSpec>,
614}
615
616#[derive(Debug, Clone)]
617pub struct PendingResolverEntry {
618    pub resolver: ResolverType,
619    pub input: Value,
620    pub targets: Vec<ResolverTarget>,
621    pub queued_at: i64,
622}
623
624#[derive(Debug, Clone, PartialEq)]
625pub struct ScheduledCallback {
626    pub state_id: u32,
627    pub entity_name: String,
628    pub primary_key: Value,
629    pub resolver: ResolverType,
630    pub url_template: Option<Vec<ast::UrlTemplatePart>>,
631    pub input_value: Option<Value>,
632    pub input_path: Option<String>,
633    pub condition: Option<ast::ResolverCondition>,
634    pub strategy: ResolveStrategy,
635    pub extracts: Vec<ResolverExtractSpec>,
636    pub retry_count: u32,
637}
638
639impl PendingResolverEntry {
640    fn add_target(&mut self, target: ResolverTarget) {
641        if let Some(existing) = self.targets.iter_mut().find(|t| {
642            t.state_id == target.state_id
643                && t.entity_name == target.entity_name
644                && t.primary_key == target.primary_key
645        }) {
646            let mut seen = HashSet::new();
647            for extract in &existing.extracts {
648                seen.insert((extract.target_path.clone(), extract.source_path.clone()));
649            }
650
651            for extract in target.extracts {
652                let key = (extract.target_path.clone(), extract.source_path.clone());
653                if seen.insert(key) {
654                    existing.extracts.push(extract);
655                }
656            }
657        } else {
658            self.targets.push(target);
659        }
660    }
661}
662
/// Snapshot of pending-queue statistics.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of pending updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with pending updates.
    pub unique_pdas: usize,
    /// Age of the oldest pending update, in seconds.
    pub oldest_age_seconds: i64,
    /// Size of the largest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory used by the pending updates, in bytes.
    pub estimated_memory_bytes: usize,
}
671
672#[derive(Debug, Clone, Default)]
673pub struct VmMemoryStats {
674    pub state_table_entity_count: usize,
675    pub state_table_max_entries: usize,
676    pub state_table_at_capacity: bool,
677    pub lookup_index_count: usize,
678    pub lookup_index_total_entries: usize,
679    pub temporal_index_count: usize,
680    pub temporal_index_total_entries: usize,
681    pub pda_reverse_lookup_count: usize,
682    pub pda_reverse_lookup_total_entries: usize,
683    pub version_tracker_entries: usize,
684    pub pending_queue_stats: Option<PendingQueueStats>,
685    pub path_cache_size: usize,
686}
687
/// Counts of items removed by a cleanup pass.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal-index entries removed.
    pub temporal_entries_removed: usize,
}
693
/// Describes a container that has reached or exceeded its configured capacity.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// How many entries exceed the maximum.
    pub entries_over_limit: usize,
}
700
701#[derive(Debug, Clone)]
702pub struct StateTableConfig {
703    pub max_entries: usize,
704    pub max_array_length: usize,
705}
706
707impl Default for StateTableConfig {
708    fn default() -> Self {
709        Self {
710            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
711            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
712        }
713    }
714}
715
716#[derive(Debug)]
717pub struct VersionTracker {
718    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
719}
720
721impl VersionTracker {
722    pub fn new() -> Self {
723        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
724    }
725
726    pub fn with_capacity(capacity: usize) -> Self {
727        VersionTracker {
728            cache: std::sync::Mutex::new(LruCache::new(
729                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
730            )),
731        }
732    }
733
734    fn make_key(primary_key: &Value, event_type: &str) -> String {
735        format!("{}:{}", primary_key, event_type)
736    }
737
738    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
739        let key = Self::make_key(primary_key, event_type);
740        self.cache.lock().unwrap().get(&key).copied()
741    }
742
743    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
744        let key = Self::make_key(primary_key, event_type);
745        self.cache.lock().unwrap().put(key, (slot, ordering_value));
746    }
747
748    pub fn len(&self) -> usize {
749        self.cache.lock().unwrap().len()
750    }
751
752    pub fn is_empty(&self) -> bool {
753        self.cache.lock().unwrap().is_empty()
754    }
755}
756
757impl Default for VersionTracker {
758    fn default() -> Self {
759        Self::new()
760    }
761}
762
763#[derive(Debug)]
764pub struct StateTable {
765    pub data: DashMap<Value, Value>,
766    access_times: DashMap<Value, i64>,
767    pub lookup_indexes: HashMap<String, LookupIndex>,
768    pub temporal_indexes: HashMap<String, TemporalIndex>,
769    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
770    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
771    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
772    version_tracker: VersionTracker,
773    instruction_dedup_cache: VersionTracker,
774    config: StateTableConfig,
775    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
776    entity_name: String,
777    pub recent_tx_instructions:
778        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
779    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
780}
781
782impl StateTable {
783    pub fn is_at_capacity(&self) -> bool {
784        self.data.len() >= self.config.max_entries
785    }
786
787    pub fn entries_over_limit(&self) -> usize {
788        self.data.len().saturating_sub(self.config.max_entries)
789    }
790
791    pub fn max_array_length(&self) -> usize {
792        self.config.max_array_length
793    }
794
795    fn touch(&self, key: &Value) {
796        let now = std::time::SystemTime::now()
797            .duration_since(std::time::UNIX_EPOCH)
798            .unwrap()
799            .as_secs() as i64;
800        self.access_times.insert(key.clone(), now);
801    }
802
803    fn evict_lru(&self, count: usize) -> usize {
804        if count == 0 || self.data.is_empty() {
805            return 0;
806        }
807
808        let mut entries: Vec<(Value, i64)> = self
809            .access_times
810            .iter()
811            .map(|entry| (entry.key().clone(), *entry.value()))
812            .collect();
813
814        entries.sort_by_key(|(_, ts)| *ts);
815
816        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
817
818        let mut evicted = 0;
819        for key in to_evict {
820            self.data.remove(&key);
821            self.access_times.remove(&key);
822            evicted += 1;
823        }
824
825        #[cfg(feature = "otel")]
826        if evicted > 0 {
827            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
828        }
829
830        evicted
831    }
832
833    pub fn insert_with_eviction(&self, key: Value, value: Value) {
834        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
835            #[cfg(feature = "otel")]
836            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
837            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
838            self.evict_lru(to_evict.max(1));
839        }
840        self.data.insert(key.clone(), value);
841        self.touch(&key);
842    }
843
844    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
845        let result = self.data.get(key).map(|v| v.clone());
846        if result.is_some() {
847            self.touch(key);
848        }
849        result
850    }
851
852    /// Check if an update is fresh and update the version tracker.
853    /// Returns true if the update should be processed (is fresh).
854    /// Returns false if the update is stale and should be skipped.
855    ///
856    /// Comparison is lexicographic on (slot, ordering_value):
857    /// (100, 5) > (100, 3) > (99, 999)
858    pub fn is_fresh_update(
859        &self,
860        primary_key: &Value,
861        event_type: &str,
862        slot: u64,
863        ordering_value: u64,
864    ) -> bool {
865        let dominated = self
866            .version_tracker
867            .get(primary_key, event_type)
868            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
869            .unwrap_or(false);
870
871        if dominated {
872            return false;
873        }
874
875        self.version_tracker
876            .insert(primary_key, event_type, slot, ordering_value);
877        true
878    }
879
880    /// Check if an instruction is a duplicate of one we've seen recently.
881    /// Returns true if this exact instruction has been seen before (is a duplicate).
882    /// Returns false if this is a new instruction that should be processed.
883    ///
884    /// Unlike account updates, instructions don't use recency checks - all
885    /// unique instructions are processed. Only exact duplicates are skipped.
886    /// Uses a smaller cache capacity for shorter effective TTL.
887    pub fn is_duplicate_instruction(
888        &self,
889        primary_key: &Value,
890        event_type: &str,
891        slot: u64,
892        txn_index: u64,
893    ) -> bool {
894        // Check if we've seen this exact instruction before
895        let is_duplicate = self
896            .instruction_dedup_cache
897            .get(primary_key, event_type)
898            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
899            .unwrap_or(false);
900
901        if is_duplicate {
902            return true;
903        }
904
905        // Record this instruction for deduplication
906        self.instruction_dedup_cache
907            .insert(primary_key, event_type, slot, txn_index);
908        false
909    }
910}
911
912impl VmContext {
913    pub fn new() -> Self {
914        let mut vm = VmContext {
915            registers: vec![Value::Null; 256],
916            states: HashMap::new(),
917            instructions_executed: 0,
918            cache_hits: 0,
919            path_cache: HashMap::new(),
920            pda_cache_hits: 0,
921            pda_cache_misses: 0,
922            pending_queue_size: 0,
923            resolver_requests: VecDeque::new(),
924            resolver_pending: HashMap::new(),
925            resolver_cache: LruCache::new(
926                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
927                    .expect("capacity must be > 0"),
928            ),
929            resolver_cache_hits: 0,
930            resolver_cache_misses: 0,
931            current_context: None,
932            warnings: Vec::new(),
933            last_pda_lookup_miss: None,
934            last_lookup_index_miss: None,
935            last_pda_registered: None,
936            last_lookup_index_keys: Vec::new(),
937            scheduled_callbacks: Vec::new(),
938        };
939        vm.states.insert(
940            0,
941            StateTable {
942                data: DashMap::new(),
943                access_times: DashMap::new(),
944                lookup_indexes: HashMap::new(),
945                temporal_indexes: HashMap::new(),
946                pda_reverse_lookups: HashMap::new(),
947                pending_updates: DashMap::new(),
948                pending_instruction_events: DashMap::new(),
949                version_tracker: VersionTracker::new(),
950                instruction_dedup_cache: VersionTracker::with_capacity(
951                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
952                ),
953                config: StateTableConfig::default(),
954                entity_name: "default".to_string(),
955                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
956                    NonZeroUsize::new(1000).unwrap(),
957                )),
958                deferred_when_ops: DashMap::new(),
959            },
960        );
961        vm
962    }
963
964    pub fn new_with_config(state_config: StateTableConfig) -> Self {
965        let mut vm = VmContext {
966            registers: vec![Value::Null; 256],
967            states: HashMap::new(),
968            instructions_executed: 0,
969            cache_hits: 0,
970            path_cache: HashMap::new(),
971            pda_cache_hits: 0,
972            pda_cache_misses: 0,
973            pending_queue_size: 0,
974            resolver_requests: VecDeque::new(),
975            resolver_pending: HashMap::new(),
976            resolver_cache: LruCache::new(
977                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
978                    .expect("capacity must be > 0"),
979            ),
980            resolver_cache_hits: 0,
981            resolver_cache_misses: 0,
982            current_context: None,
983            warnings: Vec::new(),
984            last_pda_lookup_miss: None,
985            last_lookup_index_miss: None,
986            last_pda_registered: None,
987            last_lookup_index_keys: Vec::new(),
988            scheduled_callbacks: Vec::new(),
989        };
990        vm.states.insert(
991            0,
992            StateTable {
993                data: DashMap::new(),
994                access_times: DashMap::new(),
995                lookup_indexes: HashMap::new(),
996                temporal_indexes: HashMap::new(),
997                pda_reverse_lookups: HashMap::new(),
998                pending_updates: DashMap::new(),
999                pending_instruction_events: DashMap::new(),
1000                version_tracker: VersionTracker::new(),
1001                instruction_dedup_cache: VersionTracker::with_capacity(
1002                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1003                ),
1004                config: state_config,
1005                entity_name: "default".to_string(),
1006                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1007                    NonZeroUsize::new(1000).unwrap(),
1008                )),
1009                deferred_when_ops: DashMap::new(),
1010            },
1011        );
1012        vm
1013    }
1014
1015    pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1016        self.resolver_requests.drain(..).collect()
1017    }
1018
1019    pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1020        std::mem::take(&mut self.scheduled_callbacks)
1021    }
1022
1023    pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1024        self.states.get(&state_id)?.get_and_touch(key)
1025    }
1026
1027    pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1028        if requests.is_empty() {
1029            return;
1030        }
1031
1032        self.resolver_requests.extend(requests);
1033    }
1034
1035    pub fn apply_resolver_result(
1036        &mut self,
1037        bytecode: &MultiEntityBytecode,
1038        cache_key: &str,
1039        resolved_value: Value,
1040    ) -> Result<Vec<Mutation>> {
1041        self.resolver_cache
1042            .put(cache_key.to_string(), resolved_value.clone());
1043
1044        let entry = match self.resolver_pending.remove(cache_key) {
1045            Some(entry) => entry,
1046            None => return Ok(Vec::new()),
1047        };
1048
1049        let mut mutations = Vec::new();
1050
1051        for target in entry.targets {
1052            if target.primary_key.is_null() {
1053                continue;
1054            }
1055
1056            let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1057                Some(bc) => bc,
1058                None => continue,
1059            };
1060
1061            let state = match self.states.get(&target.state_id) {
1062                Some(s) => s,
1063                None => continue,
1064            };
1065
1066            let mut entity_state = state
1067                .get_and_touch(&target.primary_key)
1068                .unwrap_or_else(|| json!({}));
1069
1070            let mut dirty_tracker = DirtyTracker::new();
1071            let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1072
1073            Self::apply_resolver_extractions_to_value(
1074                &mut entity_state,
1075                &resolved_value,
1076                &target.extracts,
1077                &mut dirty_tracker,
1078                &should_emit,
1079            )?;
1080
1081            if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1082                let old_values: Vec<_> = entity_bytecode
1083                    .computed_paths
1084                    .iter()
1085                    .map(|path| Self::get_value_at_path(&entity_state, path))
1086                    .collect();
1087
1088                let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1089                let context_timestamp = self
1090                    .current_context
1091                    .as_ref()
1092                    .map(|c| c.timestamp())
1093                    .unwrap_or_else(|| {
1094                        std::time::SystemTime::now()
1095                            .duration_since(std::time::UNIX_EPOCH)
1096                            .unwrap()
1097                            .as_secs() as i64
1098                    });
1099                let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1100                if eval_result.is_ok() {
1101                    for (path, old_value) in
1102                        entity_bytecode.computed_paths.iter().zip(old_values.iter())
1103                    {
1104                        let new_value = Self::get_value_at_path(&entity_state, path);
1105                        if new_value != *old_value && should_emit(path) {
1106                            dirty_tracker.mark_replaced(path);
1107                        }
1108                    }
1109                }
1110            }
1111
1112            state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1113
1114            if dirty_tracker.is_empty() {
1115                continue;
1116            }
1117
1118            let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1119
1120            mutations.push(Mutation {
1121                export: target.entity_name.clone(),
1122                key: target.primary_key.clone(),
1123                patch,
1124                append: vec![],
1125            });
1126        }
1127
1128        Ok(mutations)
1129    }
1130
1131    pub fn enqueue_resolver_request(
1132        &mut self,
1133        cache_key: String,
1134        resolver: ResolverType,
1135        input: Value,
1136        target: ResolverTarget,
1137    ) {
1138        if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1139            entry.add_target(target);
1140            return;
1141        }
1142
1143        let queued_at = std::time::SystemTime::now()
1144            .duration_since(std::time::UNIX_EPOCH)
1145            .unwrap()
1146            .as_secs() as i64;
1147
1148        self.resolver_pending.insert(
1149            cache_key.clone(),
1150            PendingResolverEntry {
1151                resolver: resolver.clone(),
1152                input: input.clone(),
1153                targets: vec![target],
1154                queued_at,
1155            },
1156        );
1157
1158        self.resolver_requests.push_back(ResolverRequest {
1159            cache_key,
1160            resolver,
1161            input,
1162        });
1163    }
1164
1165    fn apply_resolver_extractions_to_value<F>(
1166        state: &mut Value,
1167        resolved_value: &Value,
1168        extracts: &[ResolverExtractSpec],
1169        dirty_tracker: &mut DirtyTracker,
1170        should_emit: &F,
1171    ) -> Result<()>
1172    where
1173        F: Fn(&str) -> bool,
1174    {
1175        for extract in extracts {
1176            let resolved = match extract.source_path.as_deref() {
1177                Some(path) => Self::get_value_at_path(resolved_value, path),
1178                None => Some(resolved_value.clone()),
1179            };
1180
1181            let Some(value) = resolved else {
1182                continue;
1183            };
1184
1185            let value = if let Some(transform) = &extract.transform {
1186                Self::apply_transformation(&value, transform)?
1187            } else {
1188                value
1189            };
1190
1191            Self::set_nested_field_value(state, &extract.target_path, value)?;
1192            if should_emit(&extract.target_path) {
1193                dirty_tracker.mark_replaced(&extract.target_path);
1194            }
1195        }
1196
1197        Ok(())
1198    }
1199
1200    fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1201        if tracker.is_empty() {
1202            return Ok(json!({}));
1203        }
1204
1205        let mut partial = serde_json::Map::new();
1206
1207        for (path, change) in tracker.iter() {
1208            let segments: Vec<&str> = path.split('.').collect();
1209
1210            let value_to_insert = match change {
1211                FieldChange::Replaced => {
1212                    let mut current = state;
1213                    let mut found = true;
1214
1215                    for segment in &segments {
1216                        match current.get(*segment) {
1217                            Some(v) => current = v,
1218                            None => {
1219                                found = false;
1220                                break;
1221                            }
1222                        }
1223                    }
1224
1225                    if !found {
1226                        continue;
1227                    }
1228                    current.clone()
1229                }
1230                FieldChange::Appended(values) => Value::Array(values.clone()),
1231            };
1232
1233            let mut target = &mut partial;
1234            for (i, segment) in segments.iter().enumerate() {
1235                if i == segments.len() - 1 {
1236                    target.insert(segment.to_string(), value_to_insert.clone());
1237                } else {
1238                    target
1239                        .entry(segment.to_string())
1240                        .or_insert_with(|| json!({}));
1241                    target = target
1242                        .get_mut(*segment)
1243                        .and_then(|v| v.as_object_mut())
1244                        .ok_or("Failed to build nested structure")?;
1245                }
1246            }
1247        }
1248
1249        Ok(Value::Object(partial))
1250    }
1251
1252    /// Get a mutable reference to a state table by ID
1253    /// Returns None if the state ID doesn't exist
1254    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1255        self.states.get_mut(&state_id)
1256    }
1257
1258    /// Get public access to registers (for metrics context)
1259    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1260        &mut self.registers
1261    }
1262
1263    /// Get public access to path cache (for metrics context)
1264    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1265        &self.path_cache
1266    }
1267
1268    /// Get the current update context
1269    pub fn current_context(&self) -> Option<&UpdateContext> {
1270        self.current_context.as_ref()
1271    }
1272
1273    /// Set the current update context for computed field evaluation
1274    pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1275        self.current_context = context;
1276    }
1277
1278    fn add_warning(&mut self, msg: String) {
1279        self.warnings.push(msg);
1280    }
1281
1282    pub fn take_warnings(&mut self) -> Vec<String> {
1283        std::mem::take(&mut self.warnings)
1284    }
1285
1286    pub fn has_warnings(&self) -> bool {
1287        !self.warnings.is_empty()
1288    }
1289
1290    pub fn update_state_from_register(
1291        &mut self,
1292        state_id: u32,
1293        key: Value,
1294        register: Register,
1295    ) -> Result<()> {
1296        let state = self.states.get(&state_id).ok_or("State table not found")?;
1297        let value = self.registers[register].clone();
1298        state.insert_with_eviction(key, value);
1299        Ok(())
1300    }
1301
1302    fn reset_registers(&mut self) {
1303        for reg in &mut self.registers {
1304            *reg = Value::Null;
1305        }
1306    }
1307
1308    /// Extract only the dirty fields from state (public for use by instruction hooks)
1309    pub fn extract_partial_state(
1310        &self,
1311        state_reg: Register,
1312        dirty_fields: &HashSet<String>,
1313    ) -> Result<Value> {
1314        let full_state = &self.registers[state_reg];
1315
1316        if dirty_fields.is_empty() {
1317            return Ok(json!({}));
1318        }
1319
1320        let mut partial = serde_json::Map::new();
1321
1322        for path in dirty_fields {
1323            let segments: Vec<&str> = path.split('.').collect();
1324
1325            let mut current = full_state;
1326            let mut found = true;
1327
1328            for segment in &segments {
1329                match current.get(segment) {
1330                    Some(v) => current = v,
1331                    None => {
1332                        found = false;
1333                        break;
1334                    }
1335                }
1336            }
1337
1338            if !found {
1339                continue;
1340            }
1341
1342            let mut target = &mut partial;
1343            for (i, segment) in segments.iter().enumerate() {
1344                if i == segments.len() - 1 {
1345                    target.insert(segment.to_string(), current.clone());
1346                } else {
1347                    target
1348                        .entry(segment.to_string())
1349                        .or_insert_with(|| json!({}));
1350                    target = target
1351                        .get_mut(*segment)
1352                        .and_then(|v| v.as_object_mut())
1353                        .ok_or("Failed to build nested structure")?;
1354                }
1355            }
1356        }
1357
1358        Ok(Value::Object(partial))
1359    }
1360
1361    /// Extract a patch from state based on the DirtyTracker.
1362    /// For Replaced fields: extracts the full value from state.
1363    /// For Appended fields: emits only the appended values as an array.
1364    pub fn extract_partial_state_with_tracker(
1365        &self,
1366        state_reg: Register,
1367        tracker: &DirtyTracker,
1368    ) -> Result<Value> {
1369        let full_state = &self.registers[state_reg];
1370
1371        if tracker.is_empty() {
1372            return Ok(json!({}));
1373        }
1374
1375        let mut partial = serde_json::Map::new();
1376
1377        for (path, change) in tracker.iter() {
1378            let segments: Vec<&str> = path.split('.').collect();
1379
1380            let value_to_insert = match change {
1381                FieldChange::Replaced => {
1382                    let mut current = full_state;
1383                    let mut found = true;
1384
1385                    for segment in &segments {
1386                        match current.get(*segment) {
1387                            Some(v) => current = v,
1388                            None => {
1389                                found = false;
1390                                break;
1391                            }
1392                        }
1393                    }
1394
1395                    if !found {
1396                        continue;
1397                    }
1398                    current.clone()
1399                }
1400                FieldChange::Appended(values) => Value::Array(values.clone()),
1401            };
1402
1403            let mut target = &mut partial;
1404            for (i, segment) in segments.iter().enumerate() {
1405                if i == segments.len() - 1 {
1406                    target.insert(segment.to_string(), value_to_insert.clone());
1407                } else {
1408                    target
1409                        .entry(segment.to_string())
1410                        .or_insert_with(|| json!({}));
1411                    target = target
1412                        .get_mut(*segment)
1413                        .and_then(|v| v.as_object_mut())
1414                        .ok_or("Failed to build nested structure")?;
1415                }
1416            }
1417        }
1418
1419        Ok(Value::Object(partial))
1420    }
1421
1422    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1423        if let Some(compiled) = self.path_cache.get(path) {
1424            self.cache_hits += 1;
1425            #[cfg(feature = "otel")]
1426            crate::vm_metrics::record_path_cache_hit();
1427            return compiled.clone();
1428        }
1429        #[cfg(feature = "otel")]
1430        crate::vm_metrics::record_path_cache_miss();
1431        let compiled = CompiledPath::new(path);
1432        self.path_cache.insert(path.to_string(), compiled.clone());
1433        compiled
1434    }
1435
1436    /// Process an event with optional context metadata
1437    #[cfg_attr(feature = "otel", instrument(
1438        name = "vm.process_event",
1439        skip(self, bytecode, event_value, log),
1440        level = "info",
1441        fields(
1442            event_type = %event_type,
1443            slot = context.as_ref().and_then(|c| c.slot),
1444        )
1445    ))]
1446    pub fn process_event(
1447        &mut self,
1448        bytecode: &MultiEntityBytecode,
1449        event_value: Value,
1450        event_type: &str,
1451        context: Option<&UpdateContext>,
1452        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1453    ) -> Result<Vec<Mutation>> {
1454        self.current_context = context.cloned();
1455
1456        let mut event_value = event_value;
1457        if let Some(ctx) = context {
1458            if let Some(obj) = event_value.as_object_mut() {
1459                obj.insert("__update_context".to_string(), ctx.to_value());
1460            }
1461        }
1462
1463        let mut all_mutations = Vec::new();
1464
1465        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1466            if let Some(ctx) = context {
1467                if let Some(signature) = ctx.signature.clone() {
1468                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1469                    for state_id in state_ids {
1470                        if let Some(state) = self.states.get(&state_id) {
1471                            {
1472                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1473                                let entry =
1474                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1475                                entry.insert(event_type.to_string());
1476                            }
1477
1478                            let key = (signature.clone(), event_type.to_string());
1479                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1480                                tracing::debug!(
1481                                    event_type = %event_type,
1482                                    signature = %signature,
1483                                    deferred_count = deferred_ops.len(),
1484                                    "flushing deferred when-ops"
1485                                );
1486                                for op in deferred_ops {
1487                                    match self.apply_deferred_when_op(state_id, &op) {
1488                                        Ok(mutations) => all_mutations.extend(mutations),
1489                                        Err(e) => tracing::warn!(
1490                                            "Failed to apply deferred when-op: {}",
1491                                            e
1492                                        ),
1493                                    }
1494                                }
1495                            }
1496                        }
1497                    }
1498                }
1499            }
1500        }
1501
1502        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1503            for entity_name in entity_names {
1504                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1505                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1506                        if let Some(ref mut log) = log {
1507                            log.set("entity", entity_name.clone());
1508                            log.inc("handlers", 1);
1509                        }
1510
1511                        let opcodes_before = self.instructions_executed;
1512                        let cache_before = self.cache_hits;
1513                        let pda_hits_before = self.pda_cache_hits;
1514                        let pda_misses_before = self.pda_cache_misses;
1515
1516                        let mutations = self.execute_handler(
1517                            handler,
1518                            &event_value,
1519                            event_type,
1520                            entity_bytecode.state_id,
1521                            entity_name,
1522                            entity_bytecode.computed_fields_evaluator.as_ref(),
1523                            Some(&entity_bytecode.non_emitted_fields),
1524                        )?;
1525
1526                        if let Some(ref mut log) = log {
1527                            log.inc(
1528                                "opcodes",
1529                                (self.instructions_executed - opcodes_before) as i64,
1530                            );
1531                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1532                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1533                            log.inc(
1534                                "pda_misses",
1535                                (self.pda_cache_misses - pda_misses_before) as i64,
1536                            );
1537                        }
1538
1539                        if mutations.is_empty() {
1540                            // CPI events (suffix "CpiEvent") are transaction-scoped like instructions
1541                            // (suffix "IxState") and should be queued the same way when PDA lookup fails.
1542                            let is_tx_event =
1543                                event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1544                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1545                                if is_tx_event {
1546                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1547                                    let signature = context
1548                                        .and_then(|c| c.signature.clone())
1549                                        .unwrap_or_default();
1550                                    let _ = self.queue_instruction_event(
1551                                        entity_bytecode.state_id,
1552                                        QueuedInstructionEvent {
1553                                            pda_address: missed_pda,
1554                                            event_type: event_type.to_string(),
1555                                            event_data: event_value.clone(),
1556                                            slot,
1557                                            signature,
1558                                        },
1559                                    );
1560                                } else {
1561                                    // Queue account updates (e.g. BondingCurve) when PDA
1562                                    // reverse lookup fails. These will be flushed when an
1563                                    // instruction registers the PDA mapping via
1564                                    // UpdateLookupIndex.
1565                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1566                                    let signature = context
1567                                        .and_then(|c| c.signature.clone())
1568                                        .unwrap_or_default();
1569                                    if let Some(write_version) =
1570                                        context.and_then(|c| c.write_version)
1571                                    {
1572                                        let _ = self.queue_account_update(
1573                                            entity_bytecode.state_id,
1574                                            QueuedAccountUpdate {
1575                                                pda_address: missed_pda,
1576                                                account_type: event_type.to_string(),
1577                                                account_data: event_value.clone(),
1578                                                slot,
1579                                                write_version,
1580                                                signature,
1581                                            },
1582                                        );
1583                                    } else {
1584                                        tracing::warn!(
1585                                            event_type = %event_type,
1586                                            "Dropping queued account update: write_version missing from context"
1587                                        );
1588                                    }
1589                                }
1590                            }
1591                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1592                                if !is_tx_event {
1593                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1594                                    let signature = context
1595                                        .and_then(|c| c.signature.clone())
1596                                        .unwrap_or_default();
1597                                    if let Some(write_version) =
1598                                        context.and_then(|c| c.write_version)
1599                                    {
1600                                        let _ = self.queue_account_update(
1601                                            entity_bytecode.state_id,
1602                                            QueuedAccountUpdate {
1603                                                pda_address: missed_lookup,
1604                                                account_type: event_type.to_string(),
1605                                                account_data: event_value.clone(),
1606                                                slot,
1607                                                write_version,
1608                                                signature,
1609                                            },
1610                                        );
1611                                    } else {
1612                                        tracing::trace!(
1613                                            event_type = %event_type,
1614                                            "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1615                                        );
1616                                    }
1617                                }
1618                            }
1619                        }
1620
1621                        all_mutations.extend(mutations);
1622
1623                        if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1624                            if let Some(ctx) = context {
1625                                if let Some(ref signature) = ctx.signature {
1626                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1627                                    {
1628                                        {
1629                                            let mut cache =
1630                                                state.recent_tx_instructions.lock().unwrap();
1631                                            let entry = cache
1632                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1633                                            entry.insert(event_type.to_string());
1634                                        }
1635
1636                                        let key = (signature.clone(), event_type.to_string());
1637                                        if let Some((_, deferred_ops)) =
1638                                            state.deferred_when_ops.remove(&key)
1639                                        {
1640                                            tracing::debug!(
1641                                                event_type = %event_type,
1642                                                signature = %signature,
1643                                                deferred_count = deferred_ops.len(),
1644                                                "flushing deferred when-ops"
1645                                            );
1646                                            for op in deferred_ops {
1647                                                match self.apply_deferred_when_op(
1648                                                    entity_bytecode.state_id,
1649                                                    &op,
1650                                                ) {
1651                                                    Ok(mutations) => {
1652                                                        all_mutations.extend(mutations)
1653                                                    }
1654                                                    Err(e) => {
1655                                                        tracing::warn!(
1656                                                            "Failed to apply deferred when-op: {}",
1657                                                            e
1658                                                        );
1659                                                    }
1660                                                }
1661                                            }
1662                                        }
1663                                    }
1664                                }
1665                            }
1666                        }
1667
1668                        if let Some(registered_pda) = self.take_last_pda_registered() {
1669                            let pending_events = self.flush_pending_instruction_events(
1670                                entity_bytecode.state_id,
1671                                &registered_pda,
1672                            );
1673                            for pending in pending_events {
1674                                if let Some(pending_handler) =
1675                                    entity_bytecode.handlers.get(&pending.event_type)
1676                                {
1677                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1678                                        pending_handler,
1679                                        &pending.event_data,
1680                                        &pending.event_type,
1681                                        entity_bytecode.state_id,
1682                                        entity_name,
1683                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1684                                        Some(&entity_bytecode.non_emitted_fields),
1685                                    ) {
1686                                        all_mutations.extend(reprocessed_mutations);
1687                                    }
1688                                }
1689                            }
1690                        }
1691
1692                        let lookup_keys = self.take_last_lookup_index_keys();
1693                        if !lookup_keys.is_empty() {
1694                            tracing::info!(
1695                                keys = ?lookup_keys,
1696                                entity = %entity_name,
1697                                "vm.process_event: flushing pending updates for lookup_keys"
1698                            );
1699                        }
1700                        for lookup_key in lookup_keys {
1701                            if let Ok(pending_updates) =
1702                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1703                            {
1704                                for pending in pending_updates {
1705                                    if let Some(pending_handler) =
1706                                        entity_bytecode.handlers.get(&pending.account_type)
1707                                    {
1708                                        self.current_context = Some(UpdateContext::new_account(
1709                                            pending.slot,
1710                                            pending.signature.clone(),
1711                                            pending.write_version,
1712                                        ));
1713                                        match self.execute_handler(
1714                                            pending_handler,
1715                                            &pending.account_data,
1716                                            &pending.account_type,
1717                                            entity_bytecode.state_id,
1718                                            entity_name,
1719                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1720                                            Some(&entity_bytecode.non_emitted_fields),
1721                                        ) {
1722                                            Ok(reprocessed) => {
1723                                                all_mutations.extend(reprocessed);
1724                                            }
1725                                            Err(e) => {
1726                                                tracing::warn!(
1727                                                    error = %e,
1728                                                    account_type = %pending.account_type,
1729                                                    "Flushed event reprocessing failed"
1730                                                );
1731                                            }
1732                                        }
1733                                    }
1734                                }
1735                            }
1736                        }
1737                    } else if let Some(ref mut log) = log {
1738                        log.set("skip_reason", "no_handler");
1739                    }
1740                } else if let Some(ref mut log) = log {
1741                    log.set("skip_reason", "entity_not_found");
1742                }
1743            }
1744        } else if let Some(ref mut log) = log {
1745            log.set("skip_reason", "no_event_routing");
1746        }
1747
1748        if let Some(log) = log {
1749            log.set("mutations", all_mutations.len() as i64);
1750            if let Some(first) = all_mutations.first() {
1751                if let Some(key_str) = first.key.as_str() {
1752                    log.set("primary_key", key_str);
1753                } else if let Some(key_num) = first.key.as_u64() {
1754                    log.set("primary_key", key_num as i64);
1755                }
1756            }
1757            if let Some(state) = self.states.get(&0) {
1758                log.set("state_table_size", state.data.len() as i64);
1759            }
1760
1761            let warnings = self.take_warnings();
1762            if !warnings.is_empty() {
1763                log.set("warnings", warnings.len() as i64);
1764                log.set(
1765                    "warning_messages",
1766                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1767                );
1768                log.set_level(crate::canonical_log::LogLevel::Warn);
1769            }
1770        } else {
1771            self.warnings.clear();
1772        }
1773
1774        if self.instructions_executed.is_multiple_of(1000) {
1775            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1776            for state_id in state_ids {
1777                let expired = self.cleanup_expired_when_ops(state_id, 60);
1778                if expired > 0 {
1779                    tracing::debug!(
1780                        "Cleaned up {} expired deferred when-ops for state {}",
1781                        expired,
1782                        state_id
1783                    );
1784                }
1785            }
1786        }
1787
1788        Ok(all_mutations)
1789    }
1790
1791    pub fn process_any(
1792        &mut self,
1793        bytecode: &MultiEntityBytecode,
1794        any: prost_types::Any,
1795    ) -> Result<Vec<Mutation>> {
1796        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1797        self.process_event(bytecode, event_value, &event_type, None, None)
1798    }
1799
1800    #[cfg_attr(feature = "otel", instrument(
1801        name = "vm.execute_handler",
1802        skip(self, handler, event_value, entity_evaluator),
1803        level = "debug",
1804        fields(
1805            event_type = %event_type,
1806            handler_opcodes = handler.len(),
1807        )
1808    ))]
1809    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1810    fn execute_handler(
1811        &mut self,
1812        handler: &[OpCode],
1813        event_value: &Value,
1814        event_type: &str,
1815        override_state_id: u32,
1816        entity_name: &str,
1817        entity_evaluator: Option<
1818            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1819        >,
1820        non_emitted_fields: Option<&HashSet<String>>,
1821    ) -> Result<Vec<Mutation>> {
1822        self.reset_registers();
1823        self.last_pda_lookup_miss = None;
1824
1825        let mut pc: usize = 0;
1826        let mut output = Vec::new();
1827        let mut dirty_tracker = DirtyTracker::new();
1828        let should_emit = |path: &str| {
1829            non_emitted_fields
1830                .map(|fields| !fields.contains(path))
1831                .unwrap_or(true)
1832        };
1833
1834        while pc < handler.len() {
1835            match &handler[pc] {
1836                OpCode::LoadEventField {
1837                    path,
1838                    dest,
1839                    default,
1840                } => {
1841                    let value = self.load_field(event_value, path, default.as_ref())?;
1842                    self.registers[*dest] = value;
1843                    pc += 1;
1844                }
1845                OpCode::LoadConstant { value, dest } => {
1846                    self.registers[*dest] = value.clone();
1847                    pc += 1;
1848                }
1849                OpCode::CopyRegister { source, dest } => {
1850                    self.registers[*dest] = self.registers[*source].clone();
1851                    pc += 1;
1852                }
1853                OpCode::CopyRegisterIfNull { source, dest } => {
1854                    if self.registers[*dest].is_null() {
1855                        self.registers[*dest] = self.registers[*source].clone();
1856                    }
1857                    pc += 1;
1858                }
1859                OpCode::GetEventType { dest } => {
1860                    self.registers[*dest] = json!(event_type);
1861                    pc += 1;
1862                }
1863                OpCode::CreateObject { dest } => {
1864                    self.registers[*dest] = json!({});
1865                    pc += 1;
1866                }
1867                OpCode::SetField {
1868                    object,
1869                    path,
1870                    value,
1871                } => {
1872                    self.set_field_auto_vivify(*object, path, *value)?;
1873                    if should_emit(path) {
1874                        dirty_tracker.mark_replaced(path);
1875                    }
1876                    pc += 1;
1877                }
1878                OpCode::SetFields { object, fields } => {
1879                    for (path, value_reg) in fields {
1880                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1881                        if should_emit(path) {
1882                            dirty_tracker.mark_replaced(path);
1883                        }
1884                    }
1885                    pc += 1;
1886                }
1887                OpCode::GetField { object, path, dest } => {
1888                    let value = self.get_field(*object, path)?;
1889                    self.registers[*dest] = value;
1890                    pc += 1;
1891                }
1892                OpCode::ReadOrInitState {
1893                    state_id: _,
1894                    key,
1895                    default,
1896                    dest,
1897                } => {
1898                    let actual_state_id = override_state_id;
1899                    let entity_name_owned = entity_name.to_string();
1900                    self.states
1901                        .entry(actual_state_id)
1902                        .or_insert_with(|| StateTable {
1903                            data: DashMap::new(),
1904                            access_times: DashMap::new(),
1905                            lookup_indexes: HashMap::new(),
1906                            temporal_indexes: HashMap::new(),
1907                            pda_reverse_lookups: HashMap::new(),
1908                            pending_updates: DashMap::new(),
1909                            pending_instruction_events: DashMap::new(),
1910                            version_tracker: VersionTracker::new(),
1911                            instruction_dedup_cache: VersionTracker::with_capacity(
1912                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1913                            ),
1914                            config: StateTableConfig::default(),
1915                            entity_name: entity_name_owned,
1916                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1917                                NonZeroUsize::new(1000).unwrap(),
1918                            )),
1919                            deferred_when_ops: DashMap::new(),
1920                        });
1921                    let key_value = self.registers[*key].clone();
1922                    // Warn if key is null for account state events (not instruction events or CPI events)
1923                    let warn_null_key = key_value.is_null()
1924                        && event_type.ends_with("State")
1925                        && !event_type.ends_with("IxState")
1926                        && !event_type.ends_with("CpiEvent");
1927
1928                    if warn_null_key {
1929                        self.add_warning(format!(
1930                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1931                            key, event_type
1932                        ));
1933                    }
1934
1935                    let state = self
1936                        .states
1937                        .get(&actual_state_id)
1938                        .ok_or("State table not found")?;
1939
1940                    if !key_value.is_null() {
1941                        if let Some(ctx) = &self.current_context {
1942                            // Account updates: use recency check to discard stale updates
1943                            if ctx.is_account_update() {
1944                                if let (Some(slot), Some(write_version)) =
1945                                    (ctx.slot, ctx.write_version)
1946                                {
1947                                    if !state.is_fresh_update(
1948                                        &key_value,
1949                                        event_type,
1950                                        slot,
1951                                        write_version,
1952                                    ) {
1953                                        self.add_warning(format!(
1954                                            "Stale account update skipped: slot={}, write_version={}",
1955                                            slot, write_version
1956                                        ));
1957                                        return Ok(Vec::new());
1958                                    }
1959                                }
1960                            }
1961                            // Instruction updates: process all, but skip exact duplicates
1962                            else if ctx.is_instruction_update() {
1963                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1964                                    if state.is_duplicate_instruction(
1965                                        &key_value, event_type, slot, txn_index,
1966                                    ) {
1967                                        self.add_warning(format!(
1968                                            "Duplicate instruction skipped: slot={}, txn_index={}",
1969                                            slot, txn_index
1970                                        ));
1971                                        return Ok(Vec::new());
1972                                    }
1973                                }
1974                            }
1975                        }
1976                    }
1977                    let value = state
1978                        .get_and_touch(&key_value)
1979                        .unwrap_or_else(|| default.clone());
1980
1981                    self.registers[*dest] = value;
1982                    pc += 1;
1983                }
1984                OpCode::UpdateState {
1985                    state_id: _,
1986                    key,
1987                    value,
1988                } => {
1989                    let actual_state_id = override_state_id;
1990                    let state = self
1991                        .states
1992                        .get(&actual_state_id)
1993                        .ok_or("State table not found")?;
1994                    let key_value = self.registers[*key].clone();
1995                    let value_data = self.registers[*value].clone();
1996
1997                    state.insert_with_eviction(key_value, value_data);
1998                    pc += 1;
1999                }
2000                OpCode::AppendToArray {
2001                    object,
2002                    path,
2003                    value,
2004                } => {
2005                    let appended_value = self.registers[*value].clone();
2006                    let max_len = self
2007                        .states
2008                        .get(&override_state_id)
2009                        .map(|s| s.max_array_length())
2010                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2011                    self.append_to_array(*object, path, *value, max_len)?;
2012                    if should_emit(path) {
2013                        dirty_tracker.mark_appended(path, appended_value);
2014                    }
2015                    pc += 1;
2016                }
2017                OpCode::GetCurrentTimestamp { dest } => {
2018                    let timestamp = std::time::SystemTime::now()
2019                        .duration_since(std::time::UNIX_EPOCH)
2020                        .unwrap()
2021                        .as_secs() as i64;
2022                    self.registers[*dest] = json!(timestamp);
2023                    pc += 1;
2024                }
2025                OpCode::CreateEvent { dest, event_value } => {
2026                    let timestamp = std::time::SystemTime::now()
2027                        .duration_since(std::time::UNIX_EPOCH)
2028                        .unwrap()
2029                        .as_secs() as i64;
2030
2031                    // Filter out __update_context from the event data
2032                    let mut event_data = self.registers[*event_value].clone();
2033                    if let Some(obj) = event_data.as_object_mut() {
2034                        obj.remove("__update_context");
2035                    }
2036
2037                    // Create event with timestamp, data, and optional slot/signature from context
2038                    let mut event = serde_json::Map::new();
2039                    event.insert("timestamp".to_string(), json!(timestamp));
2040                    event.insert("data".to_string(), event_data);
2041
2042                    // Add slot and signature if available from current context
2043                    if let Some(ref ctx) = self.current_context {
2044                        if let Some(slot) = ctx.slot {
2045                            event.insert("slot".to_string(), json!(slot));
2046                        }
2047                        if let Some(ref signature) = ctx.signature {
2048                            event.insert("signature".to_string(), json!(signature));
2049                        }
2050                    }
2051
2052                    self.registers[*dest] = Value::Object(event);
2053                    pc += 1;
2054                }
2055                OpCode::CreateCapture {
2056                    dest,
2057                    capture_value,
2058                } => {
2059                    let timestamp = std::time::SystemTime::now()
2060                        .duration_since(std::time::UNIX_EPOCH)
2061                        .unwrap()
2062                        .as_secs() as i64;
2063
2064                    // Get the capture data (already filtered by load_field)
2065                    let capture_data = self.registers[*capture_value].clone();
2066
2067                    // Extract account_address from the original event if available
2068                    let account_address = event_value
2069                        .get("__account_address")
2070                        .and_then(|v| v.as_str())
2071                        .unwrap_or("")
2072                        .to_string();
2073
2074                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
2075                    let mut capture = serde_json::Map::new();
2076                    capture.insert("timestamp".to_string(), json!(timestamp));
2077                    capture.insert("account_address".to_string(), json!(account_address));
2078                    capture.insert("data".to_string(), capture_data);
2079
2080                    // Add slot and signature if available from current context
2081                    if let Some(ref ctx) = self.current_context {
2082                        if let Some(slot) = ctx.slot {
2083                            capture.insert("slot".to_string(), json!(slot));
2084                        }
2085                        if let Some(ref signature) = ctx.signature {
2086                            capture.insert("signature".to_string(), json!(signature));
2087                        }
2088                    }
2089
2090                    self.registers[*dest] = Value::Object(capture);
2091                    pc += 1;
2092                }
2093                OpCode::Transform {
2094                    source,
2095                    dest,
2096                    transformation,
2097                } => {
2098                    if source == dest {
2099                        self.transform_in_place(*source, transformation)?;
2100                    } else {
2101                        let source_value = &self.registers[*source];
2102                        let value = Self::apply_transformation(source_value, transformation)?;
2103                        self.registers[*dest] = value;
2104                    }
2105                    pc += 1;
2106                }
2107                OpCode::EmitMutation {
2108                    entity_name,
2109                    key,
2110                    state,
2111                } => {
2112                    let primary_key = self.registers[*key].clone();
2113
2114                    if primary_key.is_null() || dirty_tracker.is_empty() {
2115                        let reason = if dirty_tracker.is_empty() {
2116                            "no_fields_modified"
2117                        } else {
2118                            "null_primary_key"
2119                        };
2120                        self.add_warning(format!(
2121                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
2122                            entity_name,
2123                            reason,
2124                            dirty_tracker.len()
2125                        ));
2126                    } else {
2127                        let patch =
2128                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2129
2130                        let append = dirty_tracker.appended_paths();
2131                        let mutation = Mutation {
2132                            export: entity_name.clone(),
2133                            key: primary_key,
2134                            patch,
2135                            append,
2136                        };
2137                        output.push(mutation);
2138                    }
2139                    pc += 1;
2140                }
2141                OpCode::SetFieldIfNull {
2142                    object,
2143                    path,
2144                    value,
2145                } => {
2146                    let was_set = self.set_field_if_null(*object, path, *value)?;
2147                    if was_set && should_emit(path) {
2148                        dirty_tracker.mark_replaced(path);
2149                    }
2150                    pc += 1;
2151                }
2152                OpCode::SetFieldMax {
2153                    object,
2154                    path,
2155                    value,
2156                } => {
2157                    let was_updated = self.set_field_max(*object, path, *value)?;
2158                    if was_updated && should_emit(path) {
2159                        dirty_tracker.mark_replaced(path);
2160                    }
2161                    pc += 1;
2162                }
2163                OpCode::UpdateTemporalIndex {
2164                    state_id: _,
2165                    index_name,
2166                    lookup_value,
2167                    primary_key,
2168                    timestamp,
2169                } => {
2170                    let actual_state_id = override_state_id;
2171                    let state = self
2172                        .states
2173                        .get_mut(&actual_state_id)
2174                        .ok_or("State table not found")?;
2175                    let index = state
2176                        .temporal_indexes
2177                        .entry(index_name.clone())
2178                        .or_insert_with(TemporalIndex::new);
2179
2180                    let lookup_val = self.registers[*lookup_value].clone();
2181                    let pk_val = self.registers[*primary_key].clone();
2182                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2183                        val
2184                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
2185                        val as i64
2186                    } else {
2187                        return Err(format!(
2188                            "Timestamp must be a number (i64 or u64), got: {:?}",
2189                            self.registers[*timestamp]
2190                        )
2191                        .into());
2192                    };
2193
2194                    index.insert(lookup_val, pk_val, ts_val);
2195                    pc += 1;
2196                }
2197                OpCode::LookupTemporalIndex {
2198                    state_id: _,
2199                    index_name,
2200                    lookup_value,
2201                    timestamp,
2202                    dest,
2203                } => {
2204                    let actual_state_id = override_state_id;
2205                    let state = self
2206                        .states
2207                        .get(&actual_state_id)
2208                        .ok_or("State table not found")?;
2209                    let lookup_val = &self.registers[*lookup_value];
2210
2211                    let result = if self.registers[*timestamp].is_null() {
2212                        if let Some(index) = state.temporal_indexes.get(index_name) {
2213                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2214                        } else {
2215                            Value::Null
2216                        }
2217                    } else {
2218                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2219                            val
2220                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
2221                            val as i64
2222                        } else {
2223                            return Err(format!(
2224                                "Timestamp must be a number (i64 or u64), got: {:?}",
2225                                self.registers[*timestamp]
2226                            )
2227                            .into());
2228                        };
2229
2230                        if let Some(index) = state.temporal_indexes.get(index_name) {
2231                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2232                        } else {
2233                            Value::Null
2234                        }
2235                    };
2236
2237                    self.registers[*dest] = result;
2238                    pc += 1;
2239                }
2240                OpCode::UpdateLookupIndex {
2241                    state_id: _,
2242                    index_name,
2243                    lookup_value,
2244                    primary_key,
2245                } => {
2246                    let actual_state_id = override_state_id;
2247                    let state = self
2248                        .states
2249                        .get_mut(&actual_state_id)
2250                        .ok_or("State table not found")?;
2251                    let index = state
2252                        .lookup_indexes
2253                        .entry(index_name.clone())
2254                        .or_insert_with(LookupIndex::new);
2255
2256                    let lookup_val = self.registers[*lookup_value].clone();
2257                    let pk_val = self.registers[*primary_key].clone();
2258
2259                    index.insert(lookup_val.clone(), pk_val);
2260
2261                    // Track lookup keys so process_event can flush queued account updates
2262                    if let Some(key_str) = lookup_val.as_str() {
2263                        self.last_lookup_index_keys.push(key_str.to_string());
2264                    }
2265
2266                    pc += 1;
2267                }
2268                OpCode::LookupIndex {
2269                    state_id: _,
2270                    index_name,
2271                    lookup_value,
2272                    dest,
2273                } => {
2274                    let actual_state_id = override_state_id;
2275                    let mut current_value = self.registers[*lookup_value].clone();
2276
2277                    const MAX_CHAIN_DEPTH: usize = 5;
2278                    let mut iterations = 0;
2279
2280                    let final_result = if self.states.contains_key(&actual_state_id) {
2281                        loop {
2282                            iterations += 1;
2283                            if iterations > MAX_CHAIN_DEPTH {
2284                                break current_value;
2285                            }
2286
2287                            let resolved = self
2288                                .states
2289                                .get(&actual_state_id)
2290                                .and_then(|state| {
2291                                    if let Some(index) = state.lookup_indexes.get(index_name) {
2292                                        if let Some(found) = index.lookup(&current_value) {
2293                                            return Some(found);
2294                                        }
2295                                    }
2296
2297                                    for (name, index) in state.lookup_indexes.iter() {
2298                                        if name == index_name {
2299                                            continue;
2300                                        }
2301                                        if let Some(found) = index.lookup(&current_value) {
2302                                            return Some(found);
2303                                        }
2304                                    }
2305
2306                                    None
2307                                })
2308                                .unwrap_or(Value::Null);
2309
2310                            let mut resolved_from_pda = false;
2311                            let resolved = if resolved.is_null() {
2312                                if let Some(pda_str) = current_value.as_str() {
2313                                    resolved_from_pda = true;
2314                                    self.states
2315                                        .get_mut(&actual_state_id)
2316                                        .and_then(|state_mut| {
2317                                            state_mut
2318                                                .pda_reverse_lookups
2319                                                .get_mut("default_pda_lookup")
2320                                        })
2321                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2322                                        .map(Value::String)
2323                                        .unwrap_or(Value::Null)
2324                                } else {
2325                                    Value::Null
2326                                }
2327                            } else {
2328                                resolved
2329                            };
2330
2331                            if resolved.is_null() {
2332                                if iterations == 1 {
2333                                    if let Some(pda_str) = current_value.as_str() {
2334                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
2335                                    }
2336                                }
2337                                break Value::Null;
2338                            }
2339
2340                            let can_chain =
2341                                self.can_resolve_further(&resolved, actual_state_id, index_name);
2342
2343                            if !can_chain {
2344                                if resolved_from_pda {
2345                                    if let Some(resolved_str) = resolved.as_str() {
2346                                        self.last_lookup_index_miss =
2347                                            Some(resolved_str.to_string());
2348                                    }
2349                                    break Value::Null;
2350                                }
2351                                break resolved;
2352                            }
2353
2354                            current_value = resolved;
2355                        }
2356                    } else {
2357                        Value::Null
2358                    };
2359
2360                    self.registers[*dest] = final_result;
2361                    pc += 1;
2362                }
2363                OpCode::SetFieldSum {
2364                    object,
2365                    path,
2366                    value,
2367                } => {
2368                    let was_updated = self.set_field_sum(*object, path, *value)?;
2369                    if was_updated && should_emit(path) {
2370                        dirty_tracker.mark_replaced(path);
2371                    }
2372                    pc += 1;
2373                }
2374                OpCode::SetFieldIncrement { object, path } => {
2375                    let was_updated = self.set_field_increment(*object, path)?;
2376                    if was_updated && should_emit(path) {
2377                        dirty_tracker.mark_replaced(path);
2378                    }
2379                    pc += 1;
2380                }
2381                OpCode::SetFieldMin {
2382                    object,
2383                    path,
2384                    value,
2385                } => {
2386                    let was_updated = self.set_field_min(*object, path, *value)?;
2387                    if was_updated && should_emit(path) {
2388                        dirty_tracker.mark_replaced(path);
2389                    }
2390                    pc += 1;
2391                }
2392                OpCode::AddToUniqueSet {
2393                    state_id: _,
2394                    set_name,
2395                    value,
2396                    count_object,
2397                    count_path,
2398                } => {
2399                    let value_to_add = self.registers[*value].clone();
2400
2401                    // Store the unique set within the entity object, not in the state table
2402                    // This ensures each entity instance has its own unique set
2403                    let set_field_path = format!("__unique_set:{}", set_name);
2404
2405                    // Get or create the unique set from the entity object
2406                    let mut set: HashSet<Value> =
2407                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2408                            if !existing.is_null() {
2409                                serde_json::from_value(existing).unwrap_or_default()
2410                            } else {
2411                                HashSet::new()
2412                            }
2413                        } else {
2414                            HashSet::new()
2415                        };
2416
2417                    // Add value to set
2418                    let was_new = set.insert(value_to_add);
2419
2420                    // Store updated set back in the entity object
2421                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2422                    self.registers[100] = serde_json::to_value(set_as_vec)?;
2423                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2424
2425                    // Update the count field in the object
2426                    if was_new {
2427                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2428                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2429                        if should_emit(count_path) {
2430                            dirty_tracker.mark_replaced(count_path);
2431                        }
2432                    }
2433
2434                    pc += 1;
2435                }
2436                OpCode::ConditionalSetField {
2437                    object,
2438                    path,
2439                    value,
2440                    condition_field,
2441                    condition_op,
2442                    condition_value,
2443                } => {
2444                    let field_value = self.load_field(event_value, condition_field, None)?;
2445                    let condition_met =
2446                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2447
2448                    if condition_met {
2449                        self.set_field_auto_vivify(*object, path, *value)?;
2450                        if should_emit(path) {
2451                            dirty_tracker.mark_replaced(path);
2452                        }
2453                    }
2454                    pc += 1;
2455                }
2456                OpCode::SetFieldWhen {
2457                    object,
2458                    path,
2459                    value,
2460                    when_instruction,
2461                    entity_name,
2462                    key_reg,
2463                    condition_field,
2464                    condition_op,
2465                    condition_value,
2466                } => {
2467                    let actual_state_id = override_state_id;
2468                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2469                        condition_field.as_ref(),
2470                        condition_op.as_ref(),
2471                        condition_value.as_ref(),
2472                    ) {
2473                        let field_value = self.load_field(event_value, field, None)?;
2474                        self.evaluate_comparison(&field_value, op, cond_value)?
2475                    } else {
2476                        true
2477                    };
2478
2479                    if !condition_met {
2480                        pc += 1;
2481                        continue;
2482                    }
2483
2484                    let signature = self
2485                        .current_context
2486                        .as_ref()
2487                        .and_then(|c| c.signature.clone())
2488                        .unwrap_or_default();
2489
2490                    let emit = should_emit(path);
2491
2492                    let instruction_seen = if !signature.is_empty() {
2493                        if let Some(state) = self.states.get(&actual_state_id) {
2494                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2495                            cache
2496                                .get(&signature)
2497                                .map(|set| set.contains(when_instruction))
2498                                .unwrap_or(false)
2499                        } else {
2500                            false
2501                        }
2502                    } else {
2503                        false
2504                    };
2505
2506                    if instruction_seen {
2507                        self.set_field_auto_vivify(*object, path, *value)?;
2508                        if emit {
2509                            dirty_tracker.mark_replaced(path);
2510                        }
2511                    } else if !signature.is_empty() {
2512                        let deferred = DeferredWhenOperation {
2513                            entity_name: entity_name.clone(),
2514                            primary_key: self.registers[*key_reg].clone(),
2515                            field_path: path.clone(),
2516                            field_value: self.registers[*value].clone(),
2517                            when_instruction: when_instruction.clone(),
2518                            signature: signature.clone(),
2519                            slot: self
2520                                .current_context
2521                                .as_ref()
2522                                .and_then(|c| c.slot)
2523                                .unwrap_or(0),
2524                            deferred_at: std::time::SystemTime::now()
2525                                .duration_since(std::time::UNIX_EPOCH)
2526                                .unwrap()
2527                                .as_secs() as i64,
2528                            emit,
2529                        };
2530
2531                        if let Some(state) = self.states.get(&actual_state_id) {
2532                            let key = (signature, when_instruction.clone());
2533                            state
2534                                .deferred_when_ops
2535                                .entry(key)
2536                                .or_insert_with(Vec::new)
2537                                .push(deferred);
2538                        }
2539                    }
2540
2541                    pc += 1;
2542                }
2543                OpCode::SetFieldUnlessStopped {
2544                    object,
2545                    path,
2546                    value,
2547                    stop_field,
2548                    stop_instruction,
2549                    entity_name,
2550                    key_reg: _,
2551                } => {
2552                    let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2553                    let stopped = stop_value.as_bool().unwrap_or(false);
2554
2555                    if stopped {
2556                        tracing::debug!(
2557                            entity = %entity_name,
2558                            field = %path,
2559                            stop_field = %stop_field,
2560                            stop_instruction = %stop_instruction,
2561                            "stop flag set; skipping field update"
2562                        );
2563                        pc += 1;
2564                        continue;
2565                    }
2566
2567                    self.set_field_auto_vivify(*object, path, *value)?;
2568                    if should_emit(path) {
2569                        dirty_tracker.mark_replaced(path);
2570                    }
2571                    pc += 1;
2572                }
2573                OpCode::ConditionalIncrement {
2574                    object,
2575                    path,
2576                    condition_field,
2577                    condition_op,
2578                    condition_value,
2579                } => {
2580                    let field_value = self.load_field(event_value, condition_field, None)?;
2581                    let condition_met =
2582                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2583
2584                    if condition_met {
2585                        let was_updated = self.set_field_increment(*object, path)?;
2586                        if was_updated && should_emit(path) {
2587                            dirty_tracker.mark_replaced(path);
2588                        }
2589                    }
2590                    pc += 1;
2591                }
2592                OpCode::EvaluateComputedFields {
2593                    state,
2594                    computed_paths,
2595                } => {
2596                    if let Some(evaluator) = entity_evaluator {
2597                        let old_values: Vec<_> = computed_paths
2598                            .iter()
2599                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2600                            .collect();
2601
2602                        let state_value = &mut self.registers[*state];
2603                        let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2604                        let context_timestamp = self
2605                            .current_context
2606                            .as_ref()
2607                            .map(|c| c.timestamp())
2608                            .unwrap_or_else(|| {
2609                                std::time::SystemTime::now()
2610                                    .duration_since(std::time::UNIX_EPOCH)
2611                                    .unwrap()
2612                                    .as_secs() as i64
2613                            });
2614                        let eval_result = evaluator(state_value, context_slot, context_timestamp);
2615
2616                        if eval_result.is_ok() {
2617                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2618                                let new_value =
2619                                    Self::get_value_at_path(&self.registers[*state], path);
2620
2621                                if new_value != *old_value && should_emit(path) {
2622                                    dirty_tracker.mark_replaced(path);
2623                                }
2624                            }
2625                        }
2626                    }
2627                    pc += 1;
2628                }
2629                OpCode::QueueResolver {
2630                    state_id: _,
2631                    entity_name,
2632                    resolver,
2633                    input_path,
2634                    input_value,
2635                    url_template,
2636                    strategy,
2637                    extracts,
2638                    condition,
2639                    schedule_at,
2640                    state,
2641                    key,
2642                } => {
2643                    let actual_state_id = override_state_id;
2644
2645                    // Evaluate condition if present
2646                    if let Some(cond) = condition {
2647                        let field_val =
2648                            Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2649                                .unwrap_or(Value::Null);
2650                        if !self.evaluate_comparison(&field_val, &cond.op, &cond.value)? {
2651                            pc += 1;
2652                            continue;
2653                        }
2654                    }
2655
2656                    // Check schedule_at: defer if target slot is in the future
2657                    if let Some(schedule_path) = schedule_at {
2658                        let target_val =
2659                            Self::get_value_at_path(&self.registers[*state], schedule_path);
2660
2661                        match target_val.and_then(|v| v.as_u64()) {
2662                            Some(target_slot) => {
2663                                let current_slot = self
2664                                    .current_context
2665                                    .as_ref()
2666                                    .and_then(|ctx| ctx.slot)
2667                                    .unwrap_or(0);
2668                                if current_slot < target_slot {
2669                                    let key_value = &self.registers[*key];
2670                                    if !key_value.is_null() {
2671                                        self.scheduled_callbacks.push((
2672                                            target_slot,
2673                                            ScheduledCallback {
2674                                                state_id: actual_state_id,
2675                                                entity_name: entity_name.clone(),
2676                                                primary_key: key_value.clone(),
2677                                                resolver: resolver.clone(),
2678                                                url_template: url_template.clone(),
2679                                                input_value: input_value.clone(),
2680                                                input_path: input_path.clone(),
2681                                                condition: condition.clone(),
2682                                                strategy: strategy.clone(),
2683                                                extracts: extracts.clone(),
2684                                                retry_count: 0,
2685                                            },
2686                                        ));
2687                                    }
2688                                    pc += 1;
2689                                    continue;
2690                                }
2691                                // current_slot >= target_slot: fall through to immediate resolution
2692                            }
2693                            None => {
2694                                // schedule_at path is missing or value is not a u64 —
2695                                // skip resolver entirely rather than executing immediately,
2696                                // since the state likely hasn't been fully populated yet.
2697                                tracing::warn!(
2698                                    schedule_at_path = %schedule_path,
2699                                    entity = %entity_name,
2700                                    "schedule_at field path is missing or not a valid u64 in state; \
2701                                     skipping resolver (state may not be fully populated yet)"
2702                                );
2703                                pc += 1;
2704                                continue;
2705                            }
2706                        }
2707                    }
2708
2709                    // Build resolver input from template, literal value, or field path
2710                    let resolved_input = if let Some(template) = url_template {
2711                        crate::scheduler::build_url_from_template(template, &self.registers[*state])
2712                            .map(Value::String)
2713                    } else if let Some(value) = input_value {
2714                        Some(value.clone())
2715                    } else if let Some(path) = input_path.as_ref() {
2716                        Self::get_value_at_path(&self.registers[*state], path)
2717                    } else {
2718                        None
2719                    };
2720
2721                    if let Some(input) = resolved_input {
2722                        let key_value = &self.registers[*key];
2723
2724                        if input.is_null() || key_value.is_null() {
2725                            tracing::warn!(
2726                                entity = %entity_name,
2727                                resolver = ?resolver,
2728                                input_path = %input_path.as_deref().unwrap_or("<literal>"),
2729                                input_is_null = input.is_null(),
2730                                key_is_null = key_value.is_null(),
2731                                input = ?input,
2732                                key = ?key_value,
2733                                "Resolver skipped: null input or key"
2734                            );
2735                            pc += 1;
2736                            continue;
2737                        }
2738
2739                        if matches!(strategy, ResolveStrategy::SetOnce)
2740                            // all(): fire resolver if any target is still null.
2741                            // Already-set fields are protected from overwrite by
2742                            // set_value_at_path's SetOnce null-source guard.
2743                            && extracts.iter().all(|extract| {
2744                                match Self::get_value_at_path(
2745                                    &self.registers[*state],
2746                                    &extract.target_path,
2747                                ) {
2748                                    Some(value) => !value.is_null(),
2749                                    None => false,
2750                                }
2751                            })
2752                        {
2753                            pc += 1;
2754                            continue;
2755                        }
2756
2757                        let cache_key = resolver_cache_key(resolver, &input);
2758
2759                        let cached_value = self.resolver_cache.get(&cache_key).cloned();
2760                        if let Some(cached) = cached_value {
2761                            self.resolver_cache_hits += 1;
2762                            Self::apply_resolver_extractions_to_value(
2763                                &mut self.registers[*state],
2764                                &cached,
2765                                extracts,
2766                                &mut dirty_tracker,
2767                                &should_emit,
2768                            )?;
2769                        } else {
2770                            self.resolver_cache_misses += 1;
2771                            let target = ResolverTarget {
2772                                state_id: actual_state_id,
2773                                entity_name: entity_name.clone(),
2774                                primary_key: self.registers[*key].clone(),
2775                                extracts: extracts.clone(),
2776                            };
2777
2778                            self.enqueue_resolver_request(
2779                                cache_key,
2780                                resolver.clone(),
2781                                input,
2782                                target,
2783                            );
2784                        }
2785                    } else {
2786                        tracing::warn!(
2787                            entity = %entity_name,
2788                            resolver = ?resolver,
2789                            input_path = %input_path.as_deref().unwrap_or("<literal>"),
2790                            state = ?self.registers[*state],
2791                            "Resolver skipped: input path not found in state"
2792                        );
2793                    }
2794
2795                    pc += 1;
2796                }
2797                OpCode::UpdatePdaReverseLookup {
2798                    state_id: _,
2799                    lookup_name,
2800                    pda_address,
2801                    primary_key,
2802                } => {
2803                    let actual_state_id = override_state_id;
2804                    let state = self
2805                        .states
2806                        .get_mut(&actual_state_id)
2807                        .ok_or("State table not found")?;
2808
2809                    let pda_val = self.registers[*pda_address].clone();
2810                    let pk_val = self.registers[*primary_key].clone();
2811
2812                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2813                        let pda_lookup = state
2814                            .pda_reverse_lookups
2815                            .entry(lookup_name.clone())
2816                            .or_insert_with(|| {
2817                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2818                            });
2819
2820                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2821                        self.last_pda_registered = Some(pda_str.to_string());
2822                    } else if !pk_val.is_null() {
2823                        if let Some(pk_num) = pk_val.as_u64() {
2824                            if let Some(pda_str) = pda_val.as_str() {
2825                                let pda_lookup = state
2826                                    .pda_reverse_lookups
2827                                    .entry(lookup_name.clone())
2828                                    .or_insert_with(|| {
2829                                        PdaReverseLookup::new(
2830                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2831                                        )
2832                                    });
2833
2834                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2835                                self.last_pda_registered = Some(pda_str.to_string());
2836                            }
2837                        }
2838                    }
2839
2840                    pc += 1;
2841                }
2842            }
2843
2844            self.instructions_executed += 1;
2845        }
2846
2847        Ok(output)
2848    }
2849
2850    fn load_field(
2851        &self,
2852        event_value: &Value,
2853        path: &FieldPath,
2854        default: Option<&Value>,
2855    ) -> Result<Value> {
2856        if path.segments.is_empty() {
2857            if let Some(obj) = event_value.as_object() {
2858                let filtered: serde_json::Map<String, Value> = obj
2859                    .iter()
2860                    .filter(|(k, _)| !k.starts_with("__"))
2861                    .map(|(k, v)| (k.clone(), v.clone()))
2862                    .collect();
2863                return Ok(Value::Object(filtered));
2864            }
2865            return Ok(event_value.clone());
2866        }
2867
2868        let mut current = event_value;
2869        for segment in path.segments.iter() {
2870            current = match current.get(segment) {
2871                Some(v) => v,
2872                None => {
2873                    tracing::trace!(
2874                        "load_field: segment={:?} not found in {:?}, returning default",
2875                        segment,
2876                        current
2877                    );
2878                    return Ok(default.cloned().unwrap_or(Value::Null));
2879                }
2880            };
2881        }
2882
2883        tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
2884        Ok(current.clone())
2885    }
2886
2887    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2888        let mut current = value;
2889        for segment in path.split('.') {
2890            current = current.get(segment)?;
2891        }
2892        Some(current.clone())
2893    }
2894
2895    fn set_field_auto_vivify(
2896        &mut self,
2897        object_reg: Register,
2898        path: &str,
2899        value_reg: Register,
2900    ) -> Result<()> {
2901        let compiled = self.get_compiled_path(path);
2902        let segments = compiled.segments();
2903        let value = self.registers[value_reg].clone();
2904
2905        if !self.registers[object_reg].is_object() {
2906            self.registers[object_reg] = json!({});
2907        }
2908
2909        let obj = self.registers[object_reg]
2910            .as_object_mut()
2911            .ok_or("Not an object")?;
2912
2913        let mut current = obj;
2914        for (i, segment) in segments.iter().enumerate() {
2915            if i == segments.len() - 1 {
2916                current.insert(segment.to_string(), value);
2917                return Ok(());
2918            } else {
2919                current
2920                    .entry(segment.to_string())
2921                    .or_insert_with(|| json!({}));
2922                current = current
2923                    .get_mut(segment)
2924                    .and_then(|v| v.as_object_mut())
2925                    .ok_or("Path collision: expected object")?;
2926            }
2927        }
2928
2929        Ok(())
2930    }
2931
2932    fn set_field_if_null(
2933        &mut self,
2934        object_reg: Register,
2935        path: &str,
2936        value_reg: Register,
2937    ) -> Result<bool> {
2938        let compiled = self.get_compiled_path(path);
2939        let segments = compiled.segments();
2940        let value = self.registers[value_reg].clone();
2941
2942        // SetOnce should only set meaningful values. A null source typically means
2943        // the field doesn't exist in this event type (e.g., instruction events don't
2944        // have account data). Skip to preserve any existing value.
2945        if value.is_null() {
2946            return Ok(false);
2947        }
2948
2949        if !self.registers[object_reg].is_object() {
2950            self.registers[object_reg] = json!({});
2951        }
2952
2953        let obj = self.registers[object_reg]
2954            .as_object_mut()
2955            .ok_or("Not an object")?;
2956
2957        let mut current = obj;
2958        for (i, segment) in segments.iter().enumerate() {
2959            if i == segments.len() - 1 {
2960                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2961                    current.insert(segment.to_string(), value);
2962                    return Ok(true);
2963                }
2964                return Ok(false);
2965            } else {
2966                current
2967                    .entry(segment.to_string())
2968                    .or_insert_with(|| json!({}));
2969                current = current
2970                    .get_mut(segment)
2971                    .and_then(|v| v.as_object_mut())
2972                    .ok_or("Path collision: expected object")?;
2973            }
2974        }
2975
2976        Ok(false)
2977    }
2978
2979    fn set_field_max(
2980        &mut self,
2981        object_reg: Register,
2982        path: &str,
2983        value_reg: Register,
2984    ) -> Result<bool> {
2985        let compiled = self.get_compiled_path(path);
2986        let segments = compiled.segments();
2987        let new_value = self.registers[value_reg].clone();
2988
2989        if !self.registers[object_reg].is_object() {
2990            self.registers[object_reg] = json!({});
2991        }
2992
2993        let obj = self.registers[object_reg]
2994            .as_object_mut()
2995            .ok_or("Not an object")?;
2996
2997        let mut current = obj;
2998        for (i, segment) in segments.iter().enumerate() {
2999            if i == segments.len() - 1 {
3000                let should_update = if let Some(current_value) = current.get(segment) {
3001                    if current_value.is_null() {
3002                        true
3003                    } else {
3004                        match (current_value.as_i64(), new_value.as_i64()) {
3005                            (Some(current_val), Some(new_val)) => new_val > current_val,
3006                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3007                                new_value.as_u64().unwrap() as i64 > current_val
3008                            }
3009                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3010                                new_val > current_value.as_u64().unwrap() as i64
3011                            }
3012                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3013                                (Some(current_val), Some(new_val)) => new_val > current_val,
3014                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3015                                    (Some(current_val), Some(new_val)) => new_val > current_val,
3016                                    _ => false,
3017                                },
3018                            },
3019                            _ => false,
3020                        }
3021                    }
3022                } else {
3023                    true
3024                };
3025
3026                if should_update {
3027                    current.insert(segment.to_string(), new_value);
3028                    return Ok(true);
3029                }
3030                return Ok(false);
3031            } else {
3032                current
3033                    .entry(segment.to_string())
3034                    .or_insert_with(|| json!({}));
3035                current = current
3036                    .get_mut(segment)
3037                    .and_then(|v| v.as_object_mut())
3038                    .ok_or("Path collision: expected object")?;
3039            }
3040        }
3041
3042        Ok(false)
3043    }
3044
3045    fn set_field_sum(
3046        &mut self,
3047        object_reg: Register,
3048        path: &str,
3049        value_reg: Register,
3050    ) -> Result<bool> {
3051        let compiled = self.get_compiled_path(path);
3052        let segments = compiled.segments();
3053        let new_value = &self.registers[value_reg];
3054
3055        // Extract numeric value before borrowing object_reg mutably
3056        tracing::trace!(
3057            "set_field_sum: path={:?}, value={:?}, value_type={}",
3058            path,
3059            new_value,
3060            match new_value {
3061                serde_json::Value::Null => "null",
3062                serde_json::Value::Bool(_) => "bool",
3063                serde_json::Value::Number(_) => "number",
3064                serde_json::Value::String(_) => "string",
3065                serde_json::Value::Array(_) => "array",
3066                serde_json::Value::Object(_) => "object",
3067            }
3068        );
3069        let new_val_num = new_value
3070            .as_i64()
3071            .or_else(|| new_value.as_u64().map(|n| n as i64))
3072            .ok_or("Sum requires numeric value")?;
3073
3074        if !self.registers[object_reg].is_object() {
3075            self.registers[object_reg] = json!({});
3076        }
3077
3078        let obj = self.registers[object_reg]
3079            .as_object_mut()
3080            .ok_or("Not an object")?;
3081
3082        let mut current = obj;
3083        for (i, segment) in segments.iter().enumerate() {
3084            if i == segments.len() - 1 {
3085                let current_val = current
3086                    .get(segment)
3087                    .and_then(|v| {
3088                        if v.is_null() {
3089                            None
3090                        } else {
3091                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3092                        }
3093                    })
3094                    .unwrap_or(0);
3095
3096                let sum = current_val + new_val_num;
3097                current.insert(segment.to_string(), json!(sum));
3098                return Ok(true);
3099            } else {
3100                current
3101                    .entry(segment.to_string())
3102                    .or_insert_with(|| json!({}));
3103                current = current
3104                    .get_mut(segment)
3105                    .and_then(|v| v.as_object_mut())
3106                    .ok_or("Path collision: expected object")?;
3107            }
3108        }
3109
3110        Ok(false)
3111    }
3112
3113    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3114        let compiled = self.get_compiled_path(path);
3115        let segments = compiled.segments();
3116
3117        if !self.registers[object_reg].is_object() {
3118            self.registers[object_reg] = json!({});
3119        }
3120
3121        let obj = self.registers[object_reg]
3122            .as_object_mut()
3123            .ok_or("Not an object")?;
3124
3125        let mut current = obj;
3126        for (i, segment) in segments.iter().enumerate() {
3127            if i == segments.len() - 1 {
3128                // Get current value (default to 0 if null/missing)
3129                let current_val = current
3130                    .get(segment)
3131                    .and_then(|v| {
3132                        if v.is_null() {
3133                            None
3134                        } else {
3135                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3136                        }
3137                    })
3138                    .unwrap_or(0);
3139
3140                let incremented = current_val + 1;
3141                current.insert(segment.to_string(), json!(incremented));
3142                return Ok(true);
3143            } else {
3144                current
3145                    .entry(segment.to_string())
3146                    .or_insert_with(|| json!({}));
3147                current = current
3148                    .get_mut(segment)
3149                    .and_then(|v| v.as_object_mut())
3150                    .ok_or("Path collision: expected object")?;
3151            }
3152        }
3153
3154        Ok(false)
3155    }
3156
3157    fn set_field_min(
3158        &mut self,
3159        object_reg: Register,
3160        path: &str,
3161        value_reg: Register,
3162    ) -> Result<bool> {
3163        let compiled = self.get_compiled_path(path);
3164        let segments = compiled.segments();
3165        let new_value = self.registers[value_reg].clone();
3166
3167        if !self.registers[object_reg].is_object() {
3168            self.registers[object_reg] = json!({});
3169        }
3170
3171        let obj = self.registers[object_reg]
3172            .as_object_mut()
3173            .ok_or("Not an object")?;
3174
3175        let mut current = obj;
3176        for (i, segment) in segments.iter().enumerate() {
3177            if i == segments.len() - 1 {
3178                let should_update = if let Some(current_value) = current.get(segment) {
3179                    if current_value.is_null() {
3180                        true
3181                    } else {
3182                        match (current_value.as_i64(), new_value.as_i64()) {
3183                            (Some(current_val), Some(new_val)) => new_val < current_val,
3184                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3185                                (new_value.as_u64().unwrap() as i64) < current_val
3186                            }
3187                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3188                                new_val < current_value.as_u64().unwrap() as i64
3189                            }
3190                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3191                                (Some(current_val), Some(new_val)) => new_val < current_val,
3192                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3193                                    (Some(current_val), Some(new_val)) => new_val < current_val,
3194                                    _ => false,
3195                                },
3196                            },
3197                            _ => false,
3198                        }
3199                    }
3200                } else {
3201                    true
3202                };
3203
3204                if should_update {
3205                    current.insert(segment.to_string(), new_value);
3206                    return Ok(true);
3207                }
3208                return Ok(false);
3209            } else {
3210                current
3211                    .entry(segment.to_string())
3212                    .or_insert_with(|| json!({}));
3213                current = current
3214                    .get_mut(segment)
3215                    .and_then(|v| v.as_object_mut())
3216                    .ok_or("Path collision: expected object")?;
3217            }
3218        }
3219
3220        Ok(false)
3221    }
3222
3223    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3224        let compiled = self.get_compiled_path(path);
3225        let segments = compiled.segments();
3226        let mut current = &self.registers[object_reg];
3227
3228        for segment in segments {
3229            current = current
3230                .get(segment)
3231                .ok_or_else(|| format!("Field not found: {}", segment))?;
3232        }
3233
3234        Ok(current.clone())
3235    }
3236
3237    fn append_to_array(
3238        &mut self,
3239        object_reg: Register,
3240        path: &str,
3241        value_reg: Register,
3242        max_length: usize,
3243    ) -> Result<()> {
3244        let compiled = self.get_compiled_path(path);
3245        let segments = compiled.segments();
3246        let value = self.registers[value_reg].clone();
3247
3248        if !self.registers[object_reg].is_object() {
3249            self.registers[object_reg] = json!({});
3250        }
3251
3252        let obj = self.registers[object_reg]
3253            .as_object_mut()
3254            .ok_or("Not an object")?;
3255
3256        let mut current = obj;
3257        for (i, segment) in segments.iter().enumerate() {
3258            if i == segments.len() - 1 {
3259                current
3260                    .entry(segment.to_string())
3261                    .or_insert_with(|| json!([]));
3262                let arr = current
3263                    .get_mut(segment)
3264                    .and_then(|v| v.as_array_mut())
3265                    .ok_or("Path is not an array")?;
3266                arr.push(value.clone());
3267
3268                if arr.len() > max_length {
3269                    let excess = arr.len() - max_length;
3270                    arr.drain(0..excess);
3271                }
3272            } else {
3273                current
3274                    .entry(segment.to_string())
3275                    .or_insert_with(|| json!({}));
3276                current = current
3277                    .get_mut(segment)
3278                    .and_then(|v| v.as_object_mut())
3279                    .ok_or("Path collision: expected object")?;
3280            }
3281        }
3282
3283        Ok(())
3284    }
3285
3286    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3287        let value = &self.registers[reg];
3288        let transformed = Self::apply_transformation(value, transformation)?;
3289        self.registers[reg] = transformed;
3290        Ok(())
3291    }
3292
3293    fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3294        match transformation {
3295            Transformation::HexEncode => {
3296                if let Some(arr) = value.as_array() {
3297                    let bytes: Vec<u8> = arr
3298                        .iter()
3299                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3300                        .collect();
3301                    let hex = hex::encode(&bytes);
3302                    Ok(json!(hex))
3303                } else if value.is_string() {
3304                    Ok(value.clone())
3305                } else {
3306                    Err("HexEncode requires an array of numbers".into())
3307                }
3308            }
3309            Transformation::HexDecode => {
3310                if let Some(s) = value.as_str() {
3311                    let s = s.strip_prefix("0x").unwrap_or(s);
3312                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3313                    Ok(json!(bytes))
3314                } else {
3315                    Err("HexDecode requires a string".into())
3316                }
3317            }
3318            Transformation::Base58Encode => {
3319                if let Some(arr) = value.as_array() {
3320                    let bytes: Vec<u8> = arr
3321                        .iter()
3322                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3323                        .collect();
3324                    let encoded = bs58::encode(&bytes).into_string();
3325                    Ok(json!(encoded))
3326                } else if value.is_string() {
3327                    Ok(value.clone())
3328                } else {
3329                    Err("Base58Encode requires an array of numbers".into())
3330                }
3331            }
3332            Transformation::Base58Decode => {
3333                if let Some(s) = value.as_str() {
3334                    let bytes = bs58::decode(s)
3335                        .into_vec()
3336                        .map_err(|e| format!("Base58 decode error: {}", e))?;
3337                    Ok(json!(bytes))
3338                } else {
3339                    Err("Base58Decode requires a string".into())
3340                }
3341            }
3342            Transformation::ToString => Ok(json!(value.to_string())),
3343            Transformation::ToNumber => {
3344                if let Some(s) = value.as_str() {
3345                    let n = s
3346                        .parse::<i64>()
3347                        .map_err(|e| format!("Parse error: {}", e))?;
3348                    Ok(json!(n))
3349                } else {
3350                    Ok(value.clone())
3351                }
3352            }
3353        }
3354    }
3355
3356    fn evaluate_comparison(
3357        &self,
3358        field_value: &Value,
3359        op: &ComparisonOp,
3360        condition_value: &Value,
3361    ) -> Result<bool> {
3362        use ComparisonOp::*;
3363
3364        match op {
3365            Equal => Ok(field_value == condition_value),
3366            NotEqual => Ok(field_value != condition_value),
3367            GreaterThan => {
3368                // Try to compare as numbers
3369                match (field_value.as_i64(), condition_value.as_i64()) {
3370                    (Some(a), Some(b)) => Ok(a > b),
3371                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
3372                        (Some(a), Some(b)) => Ok(a > b),
3373                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
3374                            (Some(a), Some(b)) => Ok(a > b),
3375                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3376                        },
3377                    },
3378                }
3379            }
3380            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3381                (Some(a), Some(b)) => Ok(a >= b),
3382                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3383                    (Some(a), Some(b)) => Ok(a >= b),
3384                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3385                        (Some(a), Some(b)) => Ok(a >= b),
3386                        _ => {
3387                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3388                        }
3389                    },
3390                },
3391            },
3392            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3393                (Some(a), Some(b)) => Ok(a < b),
3394                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3395                    (Some(a), Some(b)) => Ok(a < b),
3396                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3397                        (Some(a), Some(b)) => Ok(a < b),
3398                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
3399                    },
3400                },
3401            },
3402            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3403                (Some(a), Some(b)) => Ok(a <= b),
3404                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3405                    (Some(a), Some(b)) => Ok(a <= b),
3406                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3407                        (Some(a), Some(b)) => Ok(a <= b),
3408                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3409                    },
3410                },
3411            },
3412        }
3413    }
3414
3415    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3416        if let Some(state) = self.states.get(&state_id) {
3417            if let Some(index) = state.lookup_indexes.get(index_name) {
3418                if index.lookup(value).is_some() {
3419                    return true;
3420                }
3421            }
3422
3423            for (name, index) in state.lookup_indexes.iter() {
3424                if name == index_name {
3425                    continue;
3426                }
3427                if index.lookup(value).is_some() {
3428                    return true;
3429                }
3430            }
3431
3432            if let Some(pda_str) = value.as_str() {
3433                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3434                    if pda_lookup.contains(pda_str) {
3435                        return true;
3436                    }
3437                }
3438            }
3439        }
3440
3441        false
3442    }
3443
3444    fn apply_deferred_when_op(
3445        &mut self,
3446        state_id: u32,
3447        op: &DeferredWhenOperation,
3448    ) -> Result<Vec<Mutation>> {
3449        let state = self.states.get(&state_id).ok_or("State not found")?;
3450
3451        if op.primary_key.is_null() {
3452            return Ok(vec![]);
3453        }
3454
3455        let mut entity_state = state
3456            .get_and_touch(&op.primary_key)
3457            .unwrap_or_else(|| json!({}));
3458
3459        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3460
3461        state.insert_with_eviction(op.primary_key.clone(), entity_state);
3462
3463        if !op.emit {
3464            return Ok(vec![]);
3465        }
3466
3467        let mut patch = json!({});
3468        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3469
3470        Ok(vec![Mutation {
3471            export: op.entity_name.clone(),
3472            key: op.primary_key.clone(),
3473            patch,
3474            append: vec![],
3475        }])
3476    }
3477
3478    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3479        let parts: Vec<&str> = path.split('.').collect();
3480        let mut current = obj;
3481
3482        for (i, part) in parts.iter().enumerate() {
3483            if i == parts.len() - 1 {
3484                if let Some(map) = current.as_object_mut() {
3485                    map.insert(part.to_string(), value);
3486                    return Ok(());
3487                }
3488                return Err("Cannot set field on non-object".into());
3489            }
3490
3491            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3492                if let Some(map) = current.as_object_mut() {
3493                    map.insert(part.to_string(), json!({}));
3494                }
3495            }
3496
3497            current = current.get_mut(*part).ok_or("Path navigation failed")?;
3498        }
3499
3500        Ok(())
3501    }
3502
3503    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3504        let now = std::time::SystemTime::now()
3505            .duration_since(std::time::UNIX_EPOCH)
3506            .unwrap()
3507            .as_secs() as i64;
3508
3509        let state = match self.states.get(&state_id) {
3510            Some(s) => s,
3511            None => return 0,
3512        };
3513
3514        let mut removed = 0;
3515        state.deferred_when_ops.retain(|_, ops| {
3516            let before = ops.len();
3517            ops.retain(|op| now - op.deferred_at < max_age_secs);
3518            removed += before - ops.len();
3519            !ops.is_empty()
3520        });
3521
3522        removed
3523    }
3524
3525    /// Update a PDA reverse lookup and return pending updates for reprocessing.
3526    /// Returns any pending account updates that were queued for this PDA.
3527    /// ```ignore
3528    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3529    /// for update in pending {
3530    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
3531    /// }
3532    /// ```
3533    #[cfg_attr(feature = "otel", instrument(
3534        name = "vm.update_pda_lookup",
3535        skip(self),
3536        fields(
3537            pda = %pda_address,
3538            seed = %seed_value,
3539        )
3540    ))]
3541    pub fn update_pda_reverse_lookup(
3542        &mut self,
3543        state_id: u32,
3544        lookup_name: &str,
3545        pda_address: String,
3546        seed_value: String,
3547    ) -> Result<Vec<PendingAccountUpdate>> {
3548        let state = self
3549            .states
3550            .get_mut(&state_id)
3551            .ok_or("State table not found")?;
3552
3553        let lookup = state
3554            .pda_reverse_lookups
3555            .entry(lookup_name.to_string())
3556            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3557
3558        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
3559
3560        if let Some(ref evicted) = evicted_pda {
3561            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3562                let count = evicted_updates.len();
3563                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3564            }
3565        }
3566
3567        // Flush and return pending updates for this PDA
3568        self.flush_pending_updates(state_id, &pda_address)
3569    }
3570
3571    /// Clean up expired pending updates that are older than the TTL
3572    ///
3573    /// Returns the number of updates that were removed.
3574    /// This should be called periodically to prevent memory leaks from orphaned updates.
3575    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3576        let state = match self.states.get_mut(&state_id) {
3577            Some(s) => s,
3578            None => return 0,
3579        };
3580
3581        let now = std::time::SystemTime::now()
3582            .duration_since(std::time::UNIX_EPOCH)
3583            .unwrap()
3584            .as_secs() as i64;
3585
3586        let mut removed_count = 0;
3587
3588        // Iterate through all pending updates and remove expired ones
3589        state.pending_updates.retain(|_pda_address, updates| {
3590            let original_len = updates.len();
3591
3592            updates.retain(|update| {
3593                let age = now - update.queued_at;
3594                age <= PENDING_UPDATE_TTL_SECONDS
3595            });
3596
3597            removed_count += original_len - updates.len();
3598
3599            // Remove the entry entirely if no updates remain
3600            !updates.is_empty()
3601        });
3602
3603        // Update the global counter
3604        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3605
3606        if removed_count > 0 {
3607            #[cfg(feature = "otel")]
3608            crate::vm_metrics::record_pending_updates_expired(
3609                removed_count as u64,
3610                &state.entity_name,
3611            );
3612        }
3613
3614        removed_count
3615    }
3616
3617    /// Queue an account update for later processing when PDA reverse lookup is not yet available
3618    ///
3619    /// # Workflow
3620    ///
3621    /// This implements a deferred processing pattern for account updates when the PDA reverse
3622    /// lookup needed to resolve the primary key is not yet available:
3623    ///
3624    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
3625    ///    is not available, call `queue_account_update()` to queue it for later.
3626    ///
3627    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
3628    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
3629    ///
3630    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
3631    ///    ```ignore
3632    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3633    ///    for update in pending {
3634    ///        let mutations = vm.process_event(
3635    ///            &bytecode, update.account_data, &update.account_type, None, None
3636    ///        )?;
3637    ///    }
3638    ///    ```
3639    ///
3640    /// # Arguments
3641    ///
3642    /// * `state_id` - The state table ID
3643    /// * `pda_address` - The PDA address that needs reverse lookup
3644    /// * `account_type` - The event type name for reprocessing
3645    /// * `account_data` - The account data (event value) for reprocessing
3646    /// * `slot` - The slot number when this update occurred
3647    /// * `signature` - The transaction signature
3648    #[cfg_attr(feature = "otel", instrument(
3649        name = "vm.queue_account_update",
3650        skip(self, update),
3651        fields(
3652            pda = %update.pda_address,
3653            account_type = %update.account_type,
3654            slot = update.slot,
3655        )
3656    ))]
3657    pub fn queue_account_update(
3658        &mut self,
3659        state_id: u32,
3660        update: QueuedAccountUpdate,
3661    ) -> Result<()> {
3662        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3663            self.cleanup_expired_pending_updates(state_id);
3664            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3665                self.drop_oldest_pending_update(state_id)?;
3666            }
3667        }
3668
3669        let state = self
3670            .states
3671            .get_mut(&state_id)
3672            .ok_or("State table not found")?;
3673
3674        let pending = PendingAccountUpdate {
3675            account_type: update.account_type,
3676            pda_address: update.pda_address.clone(),
3677            account_data: update.account_data,
3678            slot: update.slot,
3679            write_version: update.write_version,
3680            signature: update.signature,
3681            queued_at: std::time::SystemTime::now()
3682                .duration_since(std::time::UNIX_EPOCH)
3683                .unwrap()
3684                .as_secs() as i64,
3685        };
3686
3687        let pda_address = pending.pda_address.clone();
3688        let slot = pending.slot;
3689
3690        let mut updates = state
3691            .pending_updates
3692            .entry(pda_address.clone())
3693            .or_insert_with(Vec::new);
3694
3695        let original_len = updates.len();
3696        updates.retain(|existing| existing.slot > slot);
3697        let removed_by_dedup = original_len - updates.len();
3698
3699        if removed_by_dedup > 0 {
3700            self.pending_queue_size = self
3701                .pending_queue_size
3702                .saturating_sub(removed_by_dedup as u64);
3703        }
3704
3705        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3706            updates.remove(0);
3707            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3708        }
3709
3710        updates.push(pending);
3711        #[cfg(feature = "otel")]
3712        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3713
3714        Ok(())
3715    }
3716
3717    pub fn queue_instruction_event(
3718        &mut self,
3719        state_id: u32,
3720        event: QueuedInstructionEvent,
3721    ) -> Result<()> {
3722        let state = self
3723            .states
3724            .get_mut(&state_id)
3725            .ok_or("State table not found")?;
3726
3727        let pda_address = event.pda_address.clone();
3728
3729        let pending = PendingInstructionEvent {
3730            event_type: event.event_type,
3731            pda_address: event.pda_address,
3732            event_data: event.event_data,
3733            slot: event.slot,
3734            signature: event.signature,
3735            queued_at: std::time::SystemTime::now()
3736                .duration_since(std::time::UNIX_EPOCH)
3737                .unwrap()
3738                .as_secs() as i64,
3739        };
3740
3741        let mut events = state
3742            .pending_instruction_events
3743            .entry(pda_address)
3744            .or_insert_with(Vec::new);
3745
3746        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3747            events.remove(0);
3748        }
3749
3750        events.push(pending);
3751
3752        Ok(())
3753    }
3754
3755    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3756        self.last_pda_lookup_miss.take()
3757    }
3758
3759    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3760        self.last_lookup_index_miss.take()
3761    }
3762
3763    pub fn take_last_pda_registered(&mut self) -> Option<String> {
3764        self.last_pda_registered.take()
3765    }
3766
3767    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3768        std::mem::take(&mut self.last_lookup_index_keys)
3769    }
3770
3771    pub fn flush_pending_instruction_events(
3772        &mut self,
3773        state_id: u32,
3774        pda_address: &str,
3775    ) -> Vec<PendingInstructionEvent> {
3776        let state = match self.states.get_mut(&state_id) {
3777            Some(s) => s,
3778            None => return Vec::new(),
3779        };
3780
3781        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3782            events
3783        } else {
3784            Vec::new()
3785        }
3786    }
3787
3788    /// Get statistics about the pending queue for monitoring
3789    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3790        let state = self.states.get(&state_id)?;
3791
3792        let now = std::time::SystemTime::now()
3793            .duration_since(std::time::UNIX_EPOCH)
3794            .unwrap()
3795            .as_secs() as i64;
3796
3797        let mut total_updates = 0;
3798        let mut oldest_timestamp = now;
3799        let mut largest_pda_queue = 0;
3800        let mut estimated_memory = 0;
3801
3802        for entry in state.pending_updates.iter() {
3803            let (_, updates) = entry.pair();
3804            total_updates += updates.len();
3805            largest_pda_queue = largest_pda_queue.max(updates.len());
3806
3807            for update in updates.iter() {
3808                oldest_timestamp = oldest_timestamp.min(update.queued_at);
3809                // Rough memory estimate
3810                estimated_memory += update.account_type.len() +
3811                                   update.pda_address.len() +
3812                                   update.signature.len() +
3813                                   16 + // slot + queued_at
3814                                   estimate_json_size(&update.account_data);
3815            }
3816        }
3817
3818        Some(PendingQueueStats {
3819            total_updates,
3820            unique_pdas: state.pending_updates.len(),
3821            oldest_age_seconds: now - oldest_timestamp,
3822            largest_pda_queue_size: largest_pda_queue,
3823            estimated_memory_bytes: estimated_memory,
3824        })
3825    }
3826
3827    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3828        let mut stats = VmMemoryStats {
3829            path_cache_size: self.path_cache.len(),
3830            ..Default::default()
3831        };
3832
3833        if let Some(state) = self.states.get(&state_id) {
3834            stats.state_table_entity_count = state.data.len();
3835            stats.state_table_max_entries = state.config.max_entries;
3836            stats.state_table_at_capacity = state.is_at_capacity();
3837
3838            stats.lookup_index_count = state.lookup_indexes.len();
3839            stats.lookup_index_total_entries =
3840                state.lookup_indexes.values().map(|idx| idx.len()).sum();
3841
3842            stats.temporal_index_count = state.temporal_indexes.len();
3843            stats.temporal_index_total_entries = state
3844                .temporal_indexes
3845                .values()
3846                .map(|idx| idx.total_entries())
3847                .sum();
3848
3849            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3850            stats.pda_reverse_lookup_total_entries = state
3851                .pda_reverse_lookups
3852                .values()
3853                .map(|lookup| lookup.len())
3854                .sum();
3855
3856            stats.version_tracker_entries = state.version_tracker.len();
3857
3858            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3859        }
3860
3861        stats
3862    }
3863
3864    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3865        let pending_removed = self.cleanup_expired_pending_updates(state_id);
3866        let temporal_removed = self.cleanup_temporal_indexes(state_id);
3867
3868        #[cfg(feature = "otel")]
3869        if let Some(state) = self.states.get(&state_id) {
3870            crate::vm_metrics::record_cleanup(
3871                pending_removed,
3872                temporal_removed,
3873                &state.entity_name,
3874            );
3875        }
3876
3877        CleanupResult {
3878            pending_updates_removed: pending_removed,
3879            temporal_entries_removed: temporal_removed,
3880        }
3881    }
3882
3883    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3884        let state = match self.states.get_mut(&state_id) {
3885            Some(s) => s,
3886            None => return 0,
3887        };
3888
3889        let now = std::time::SystemTime::now()
3890            .duration_since(std::time::UNIX_EPOCH)
3891            .unwrap()
3892            .as_secs() as i64;
3893
3894        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3895        let mut total_removed = 0;
3896
3897        for (_, index) in state.temporal_indexes.iter_mut() {
3898            total_removed += index.cleanup_expired(cutoff);
3899        }
3900
3901        total_removed
3902    }
3903
3904    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3905        let state = self.states.get(&state_id)?;
3906
3907        if state.is_at_capacity() {
3908            Some(CapacityWarning {
3909                current_entries: state.data.len(),
3910                max_entries: state.config.max_entries,
3911                entries_over_limit: state.entries_over_limit(),
3912            })
3913        } else {
3914            None
3915        }
3916    }
3917
3918    /// Drop the oldest pending update across all PDAs
3919    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3920        let state = self
3921            .states
3922            .get_mut(&state_id)
3923            .ok_or("State table not found")?;
3924
3925        let mut oldest_pda: Option<String> = None;
3926        let mut oldest_timestamp = i64::MAX;
3927
3928        // Find the PDA with the oldest update
3929        for entry in state.pending_updates.iter() {
3930            let (pda, updates) = entry.pair();
3931            if let Some(update) = updates.first() {
3932                if update.queued_at < oldest_timestamp {
3933                    oldest_timestamp = update.queued_at;
3934                    oldest_pda = Some(pda.clone());
3935                }
3936            }
3937        }
3938
3939        // Remove the oldest update
3940        if let Some(pda) = oldest_pda {
3941            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3942                if !updates.is_empty() {
3943                    updates.remove(0);
3944                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3945
3946                    // Remove the entry if it's now empty
3947                    if updates.is_empty() {
3948                        drop(updates);
3949                        state.pending_updates.remove(&pda);
3950                    }
3951                }
3952            }
3953        }
3954
3955        Ok(())
3956    }
3957
3958    /// Flush and return pending updates for a PDA for external reprocessing
3959    ///
3960    /// Returns the pending updates that were queued for this PDA address.
3961    /// The caller should reprocess these through the VM using process_event().
3962    fn flush_pending_updates(
3963        &mut self,
3964        state_id: u32,
3965        pda_address: &str,
3966    ) -> Result<Vec<PendingAccountUpdate>> {
3967        let state = self
3968            .states
3969            .get_mut(&state_id)
3970            .ok_or("State table not found")?;
3971
3972        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3973            let count = pending_updates.len();
3974            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3975            #[cfg(feature = "otel")]
3976            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3977            Ok(pending_updates)
3978        } else {
3979            Ok(Vec::new())
3980        }
3981    }
3982
3983    /// Try to resolve a primary key via PDA reverse lookup
3984    pub fn try_pda_reverse_lookup(
3985        &mut self,
3986        state_id: u32,
3987        lookup_name: &str,
3988        pda_address: &str,
3989    ) -> Option<String> {
3990        let state = self.states.get_mut(&state_id)?;
3991
3992        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3993            if let Some(value) = lookup.lookup(pda_address) {
3994                self.pda_cache_hits += 1;
3995                return Some(value);
3996            }
3997        }
3998
3999        self.pda_cache_misses += 1;
4000        None
4001    }
4002
4003    // ============================================================================
4004    // Computed Expression Evaluator (Task 5)
4005    // ============================================================================
4006
4007    /// Evaluate a computed expression AST against the current state
4008    /// This is the core runtime evaluator for computed fields from the AST
4009    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4010        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4011    }
4012
4013    /// Evaluate a computed expression with a variable environment (for let bindings)
4014    fn evaluate_computed_expr_with_env(
4015        &self,
4016        expr: &ComputedExpr,
4017        state: &Value,
4018        env: &std::collections::HashMap<String, Value>,
4019    ) -> Result<Value> {
4020        match expr {
4021            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4022
4023            ComputedExpr::Var { name } => env
4024                .get(name)
4025                .cloned()
4026                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4027
4028            ComputedExpr::Let { name, value, body } => {
4029                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4030                let mut new_env = env.clone();
4031                new_env.insert(name.clone(), val);
4032                self.evaluate_computed_expr_with_env(body, state, &new_env)
4033            }
4034
4035            ComputedExpr::If {
4036                condition,
4037                then_branch,
4038                else_branch,
4039            } => {
4040                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4041                if self.value_to_bool(&cond_val) {
4042                    self.evaluate_computed_expr_with_env(then_branch, state, env)
4043                } else {
4044                    self.evaluate_computed_expr_with_env(else_branch, state, env)
4045                }
4046            }
4047
4048            ComputedExpr::None => Ok(Value::Null),
4049
4050            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4051
4052            ComputedExpr::Slice { expr, start, end } => {
4053                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4054                match val {
4055                    Value::Array(arr) => {
4056                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4057                        Ok(Value::Array(slice))
4058                    }
4059                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4060                }
4061            }
4062
4063            ComputedExpr::Index { expr, index } => {
4064                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4065                match val {
4066                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4067                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4068                }
4069            }
4070
4071            ComputedExpr::U64FromLeBytes { bytes } => {
4072                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4073                let byte_vec = self.value_to_bytes(&val)?;
4074                if byte_vec.len() < 8 {
4075                    return Err(format!(
4076                        "u64::from_le_bytes requires 8 bytes, got {}",
4077                        byte_vec.len()
4078                    )
4079                    .into());
4080                }
4081                let arr: [u8; 8] = byte_vec[..8]
4082                    .try_into()
4083                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4084                Ok(json!(u64::from_le_bytes(arr)))
4085            }
4086
4087            ComputedExpr::U64FromBeBytes { bytes } => {
4088                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4089                let byte_vec = self.value_to_bytes(&val)?;
4090                if byte_vec.len() < 8 {
4091                    return Err(format!(
4092                        "u64::from_be_bytes requires 8 bytes, got {}",
4093                        byte_vec.len()
4094                    )
4095                    .into());
4096                }
4097                let arr: [u8; 8] = byte_vec[..8]
4098                    .try_into()
4099                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4100                Ok(json!(u64::from_be_bytes(arr)))
4101            }
4102
4103            ComputedExpr::ByteArray { bytes } => {
4104                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4105            }
4106
4107            ComputedExpr::Closure { param, body } => {
4108                // Closures are stored as-is; they're evaluated when used in map()
4109                // Return a special representation
4110                Ok(json!({
4111                    "__closure": {
4112                        "param": param,
4113                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
4114                    }
4115                }))
4116            }
4117
4118            ComputedExpr::Unary { op, expr } => {
4119                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4120                self.apply_unary_op(op, &val)
4121            }
4122
4123            ComputedExpr::JsonToBytes { expr } => {
4124                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4125                // Convert JSON array of numbers to byte array
4126                let bytes = self.value_to_bytes(&val)?;
4127                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4128            }
4129
4130            ComputedExpr::UnwrapOr { expr, default } => {
4131                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4132                if val.is_null() {
4133                    Ok(default.clone())
4134                } else {
4135                    Ok(val)
4136                }
4137            }
4138
4139            ComputedExpr::Binary { op, left, right } => {
4140                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4141                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4142                self.apply_binary_op(op, &l, &r)
4143            }
4144
4145            ComputedExpr::Cast { expr, to_type } => {
4146                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4147                self.apply_cast(&val, to_type)
4148            }
4149
4150            ComputedExpr::ResolverComputed {
4151                resolver,
4152                method,
4153                args,
4154            } => {
4155                let evaluated_args: Vec<Value> = args
4156                    .iter()
4157                    .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4158                    .collect::<Result<Vec<_>>>()?;
4159                crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4160            }
4161
4162            ComputedExpr::MethodCall { expr, method, args } => {
4163                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4164                // Special handling for map() with closures
4165                if method == "map" && args.len() == 1 {
4166                    if let ComputedExpr::Closure { param, body } = &args[0] {
4167                        // If the value is null, return null (Option::None.map returns None)
4168                        if val.is_null() {
4169                            return Ok(Value::Null);
4170                        }
4171
4172                        if let Value::Array(arr) = &val {
4173                            let results: Result<Vec<Value>> = arr
4174                                .iter()
4175                                .map(|elem| {
4176                                    let mut closure_env = env.clone();
4177                                    closure_env.insert(param.clone(), elem.clone());
4178                                    self.evaluate_computed_expr_with_env(body, state, &closure_env)
4179                                })
4180                                .collect();
4181                            return Ok(Value::Array(results?));
4182                        }
4183
4184                        let mut closure_env = env.clone();
4185                        closure_env.insert(param.clone(), val);
4186                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4187                    }
4188                }
4189                let evaluated_args: Vec<Value> = args
4190                    .iter()
4191                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4192                    .collect::<Result<Vec<_>>>()?;
4193                self.apply_method_call(&val, method, &evaluated_args)
4194            }
4195
4196            ComputedExpr::Literal { value } => Ok(value.clone()),
4197
4198            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4199
4200            ComputedExpr::ContextSlot => Ok(self
4201                .current_context
4202                .as_ref()
4203                .and_then(|ctx| ctx.slot)
4204                .map(|s| json!(s))
4205                .unwrap_or(Value::Null)),
4206
4207            ComputedExpr::ContextTimestamp => Ok(self
4208                .current_context
4209                .as_ref()
4210                .map(|ctx| json!(ctx.timestamp()))
4211                .unwrap_or(Value::Null)),
4212        }
4213    }
4214
4215    /// Convert a JSON value to a byte vector
4216    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4217        match val {
4218            Value::Array(arr) => arr
4219                .iter()
4220                .map(|v| {
4221                    v.as_u64()
4222                        .map(|n| n as u8)
4223                        .ok_or_else(|| "Array element not a valid byte".into())
4224                })
4225                .collect(),
4226            Value::String(s) => {
4227                // Try to decode as hex
4228                if s.starts_with("0x") || s.starts_with("0X") {
4229                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4230                } else {
4231                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4232                }
4233            }
4234            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4235        }
4236    }
4237
4238    /// Apply a unary operation
4239    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4240        use crate::ast::UnaryOp;
4241        match op {
4242            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4243            UnaryOp::ReverseBits => match val.as_u64() {
4244                Some(n) => Ok(json!(n.reverse_bits())),
4245                None => match val.as_i64() {
4246                    Some(n) => Ok(json!((n as u64).reverse_bits())),
4247                    None => Err("reverse_bits requires an integer".into()),
4248                },
4249            },
4250        }
4251    }
4252
4253    /// Get a field value from state by path (e.g., "section.field" or just "field")
4254    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4255        let segments: Vec<&str> = path.split('.').collect();
4256        let mut current = state;
4257
4258        for segment in segments {
4259            match current.get(segment) {
4260                Some(v) => current = v,
4261                None => return Ok(Value::Null),
4262            }
4263        }
4264
4265        Ok(current.clone())
4266    }
4267
4268    /// Apply a binary operation to two values
4269    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4270        match op {
4271            // Arithmetic operations
4272            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4273            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4274            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4275            BinaryOp::Div => {
4276                // Check for division by zero
4277                if let Some(r) = right.as_i64() {
4278                    if r == 0 {
4279                        return Err("Division by zero".into());
4280                    }
4281                }
4282                if let Some(r) = right.as_f64() {
4283                    if r == 0.0 {
4284                        return Err("Division by zero".into());
4285                    }
4286                }
4287                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4288            }
4289            BinaryOp::Mod => {
4290                // Modulo - only for integers
4291                match (left.as_i64(), right.as_i64()) {
4292                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4293                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4294                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4295                        _ => Err("Modulo requires non-zero integer operands".into()),
4296                    },
4297                    _ => Err("Modulo by zero".into()),
4298                }
4299            }
4300
4301            // Comparison operations
4302            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4303            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4304            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4305            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4306            BinaryOp::Eq => Ok(json!(left == right)),
4307            BinaryOp::Ne => Ok(json!(left != right)),
4308
4309            // Logical operations
4310            BinaryOp::And => {
4311                let l_bool = self.value_to_bool(left);
4312                let r_bool = self.value_to_bool(right);
4313                Ok(json!(l_bool && r_bool))
4314            }
4315            BinaryOp::Or => {
4316                let l_bool = self.value_to_bool(left);
4317                let r_bool = self.value_to_bool(right);
4318                Ok(json!(l_bool || r_bool))
4319            }
4320
4321            // Bitwise operations
4322            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4323                (Some(a), Some(b)) => Ok(json!(a ^ b)),
4324                _ => match (left.as_i64(), right.as_i64()) {
4325                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
4326                    _ => Err("XOR requires integer operands".into()),
4327                },
4328            },
4329            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4330                (Some(a), Some(b)) => Ok(json!(a & b)),
4331                _ => match (left.as_i64(), right.as_i64()) {
4332                    (Some(a), Some(b)) => Ok(json!(a & b)),
4333                    _ => Err("BitAnd requires integer operands".into()),
4334                },
4335            },
4336            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4337                (Some(a), Some(b)) => Ok(json!(a | b)),
4338                _ => match (left.as_i64(), right.as_i64()) {
4339                    (Some(a), Some(b)) => Ok(json!(a | b)),
4340                    _ => Err("BitOr requires integer operands".into()),
4341                },
4342            },
4343            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4344                (Some(a), Some(b)) => Ok(json!(a << b)),
4345                _ => match (left.as_i64(), right.as_i64()) {
4346                    (Some(a), Some(b)) => Ok(json!(a << b)),
4347                    _ => Err("Shl requires integer operands".into()),
4348                },
4349            },
4350            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4351                (Some(a), Some(b)) => Ok(json!(a >> b)),
4352                _ => match (left.as_i64(), right.as_i64()) {
4353                    (Some(a), Some(b)) => Ok(json!(a >> b)),
4354                    _ => Err("Shr requires integer operands".into()),
4355                },
4356            },
4357        }
4358    }
4359
4360    /// Helper for numeric operations that can work on integers or floats
4361    fn numeric_op<F1, F2>(
4362        &self,
4363        left: &Value,
4364        right: &Value,
4365        int_op: F1,
4366        float_op: F2,
4367    ) -> Result<Value>
4368    where
4369        F1: Fn(i64, i64) -> i64,
4370        F2: Fn(f64, f64) -> f64,
4371    {
4372        // Try i64 first
4373        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4374            return Ok(json!(int_op(a, b)));
4375        }
4376
4377        // Try u64
4378        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4379            // For u64, we need to be careful with underflow in subtraction
4380            return Ok(json!(int_op(a as i64, b as i64)));
4381        }
4382
4383        // Try f64
4384        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4385            return Ok(json!(float_op(a, b)));
4386        }
4387
4388        // If either is null, return null
4389        if left.is_null() || right.is_null() {
4390            return Ok(Value::Null);
4391        }
4392
4393        Err(format!(
4394            "Cannot perform numeric operation on {:?} and {:?}",
4395            left, right
4396        )
4397        .into())
4398    }
4399
4400    /// Helper for comparison operations
4401    fn comparison_op<F1, F2>(
4402        &self,
4403        left: &Value,
4404        right: &Value,
4405        int_cmp: F1,
4406        float_cmp: F2,
4407    ) -> Result<Value>
4408    where
4409        F1: Fn(i64, i64) -> bool,
4410        F2: Fn(f64, f64) -> bool,
4411    {
4412        // Try i64 first
4413        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4414            return Ok(json!(int_cmp(a, b)));
4415        }
4416
4417        // Try u64
4418        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4419            return Ok(json!(int_cmp(a as i64, b as i64)));
4420        }
4421
4422        // Try f64
4423        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4424            return Ok(json!(float_cmp(a, b)));
4425        }
4426
4427        // If either is null, comparison returns false
4428        if left.is_null() || right.is_null() {
4429            return Ok(json!(false));
4430        }
4431
4432        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4433    }
4434
4435    /// Convert a value to boolean for logical operations
4436    fn value_to_bool(&self, value: &Value) -> bool {
4437        match value {
4438            Value::Null => false,
4439            Value::Bool(b) => *b,
4440            Value::Number(n) => {
4441                if let Some(i) = n.as_i64() {
4442                    i != 0
4443                } else if let Some(f) = n.as_f64() {
4444                    f != 0.0
4445                } else {
4446                    true
4447                }
4448            }
4449            Value::String(s) => !s.is_empty(),
4450            Value::Array(arr) => !arr.is_empty(),
4451            Value::Object(obj) => !obj.is_empty(),
4452        }
4453    }
4454
4455    /// Apply a type cast to a value
4456    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4457        match to_type {
4458            "i8" | "i16" | "i32" | "i64" | "isize" => {
4459                if let Some(n) = value.as_i64() {
4460                    Ok(json!(n))
4461                } else if let Some(n) = value.as_u64() {
4462                    Ok(json!(n as i64))
4463                } else if let Some(n) = value.as_f64() {
4464                    Ok(json!(n as i64))
4465                } else if let Some(s) = value.as_str() {
4466                    s.parse::<i64>()
4467                        .map(|n| json!(n))
4468                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4469                } else {
4470                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4471                }
4472            }
4473            "u8" | "u16" | "u32" | "u64" | "usize" => {
4474                if let Some(n) = value.as_u64() {
4475                    Ok(json!(n))
4476                } else if let Some(n) = value.as_i64() {
4477                    Ok(json!(n as u64))
4478                } else if let Some(n) = value.as_f64() {
4479                    Ok(json!(n as u64))
4480                } else if let Some(s) = value.as_str() {
4481                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4482                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4483                    })
4484                } else {
4485                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4486                }
4487            }
4488            "f32" | "f64" => {
4489                if let Some(n) = value.as_f64() {
4490                    Ok(json!(n))
4491                } else if let Some(n) = value.as_i64() {
4492                    Ok(json!(n as f64))
4493                } else if let Some(n) = value.as_u64() {
4494                    Ok(json!(n as f64))
4495                } else if let Some(s) = value.as_str() {
4496                    s.parse::<f64>()
4497                        .map(|n| json!(n))
4498                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4499                } else {
4500                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4501                }
4502            }
4503            "String" | "string" => Ok(json!(value.to_string())),
4504            "bool" => Ok(json!(self.value_to_bool(value))),
4505            _ => {
4506                // Unknown type, return value as-is
4507                Ok(value.clone())
4508            }
4509        }
4510    }
4511
4512    /// Apply a method call to a value
4513    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4514        match method {
4515            "unwrap_or" => {
4516                if value.is_null() && !args.is_empty() {
4517                    Ok(args[0].clone())
4518                } else {
4519                    Ok(value.clone())
4520                }
4521            }
4522            "unwrap_or_default" => {
4523                if value.is_null() {
4524                    // Return default for common types
4525                    Ok(json!(0))
4526                } else {
4527                    Ok(value.clone())
4528                }
4529            }
4530            "is_some" => Ok(json!(!value.is_null())),
4531            "is_none" => Ok(json!(value.is_null())),
4532            "abs" => {
4533                if let Some(n) = value.as_i64() {
4534                    Ok(json!(n.abs()))
4535                } else if let Some(n) = value.as_f64() {
4536                    Ok(json!(n.abs()))
4537                } else {
4538                    Err(format!("Cannot call abs() on {:?}", value).into())
4539                }
4540            }
4541            "len" => {
4542                if let Some(s) = value.as_str() {
4543                    Ok(json!(s.len()))
4544                } else if let Some(arr) = value.as_array() {
4545                    Ok(json!(arr.len()))
4546                } else if let Some(obj) = value.as_object() {
4547                    Ok(json!(obj.len()))
4548                } else {
4549                    Err(format!("Cannot call len() on {:?}", value).into())
4550                }
4551            }
4552            "to_string" => Ok(json!(value.to_string())),
4553            "min" => {
4554                if args.is_empty() {
4555                    return Err("min() requires an argument".into());
4556                }
4557                let other = &args[0];
4558                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4559                    Ok(json!(a.min(b)))
4560                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4561                    Ok(json!(a.min(b)))
4562                } else {
4563                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4564                }
4565            }
4566            "max" => {
4567                if args.is_empty() {
4568                    return Err("max() requires an argument".into());
4569                }
4570                let other = &args[0];
4571                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4572                    Ok(json!(a.max(b)))
4573                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4574                    Ok(json!(a.max(b)))
4575                } else {
4576                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4577                }
4578            }
4579            "saturating_add" => {
4580                if args.is_empty() {
4581                    return Err("saturating_add() requires an argument".into());
4582                }
4583                let other = &args[0];
4584                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4585                    Ok(json!(a.saturating_add(b)))
4586                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4587                    Ok(json!(a.saturating_add(b)))
4588                } else {
4589                    Err(format!(
4590                        "Cannot call saturating_add() on {:?} and {:?}",
4591                        value, other
4592                    )
4593                    .into())
4594                }
4595            }
4596            "saturating_sub" => {
4597                if args.is_empty() {
4598                    return Err("saturating_sub() requires an argument".into());
4599                }
4600                let other = &args[0];
4601                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4602                    Ok(json!(a.saturating_sub(b)))
4603                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4604                    Ok(json!(a.saturating_sub(b)))
4605                } else {
4606                    Err(format!(
4607                        "Cannot call saturating_sub() on {:?} and {:?}",
4608                        value, other
4609                    )
4610                    .into())
4611                }
4612            }
4613            _ => Err(format!("Unknown method call: {}()", method).into()),
4614        }
4615    }
4616
4617    /// Evaluate all computed fields for an entity and update the state
4618    /// This takes a list of ComputedFieldSpec from the AST and applies them
4619    pub fn evaluate_computed_fields_from_ast(
4620        &self,
4621        state: &mut Value,
4622        computed_field_specs: &[ComputedFieldSpec],
4623    ) -> Result<Vec<String>> {
4624        let mut updated_paths = Vec::new();
4625
4626        crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4627
4628        for spec in computed_field_specs {
4629            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
4630                self.set_field_in_state(state, &spec.target_path, result)?;
4631                updated_paths.push(spec.target_path.clone());
4632            }
4633        }
4634
4635        Ok(updated_paths)
4636    }
4637
4638    /// Set a field value in state by path (e.g., "section.field")
4639    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4640        let segments: Vec<&str> = path.split('.').collect();
4641
4642        if segments.is_empty() {
4643            return Err("Empty path".into());
4644        }
4645
4646        // Navigate to parent, creating intermediate objects as needed
4647        let mut current = state;
4648        for (i, segment) in segments.iter().enumerate() {
4649            if i == segments.len() - 1 {
4650                // Last segment - set the value
4651                if let Some(obj) = current.as_object_mut() {
4652                    obj.insert(segment.to_string(), value);
4653                    return Ok(());
4654                } else {
4655                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
4656                }
4657            } else {
4658                // Intermediate segment - navigate or create
4659                if !current.is_object() {
4660                    *current = json!({});
4661                }
4662                let obj = current.as_object_mut().unwrap();
4663                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4664            }
4665        }
4666
4667        Ok(())
4668    }
4669
4670    /// Create a computed fields evaluator closure from AST specs
4671    /// This returns a function that can be passed to the bytecode builder
4672    pub fn create_evaluator_from_specs(
4673        specs: Vec<ComputedFieldSpec>,
4674    ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
4675        move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
4676            let mut vm = VmContext::new();
4677            vm.current_context = Some(UpdateContext {
4678                slot: context_slot,
4679                timestamp: Some(context_timestamp),
4680                ..Default::default()
4681            });
4682            vm.evaluate_computed_fields_from_ast(state, &specs)?;
4683            Ok(())
4684        }
4685    }
4686}
4687
4688impl Default for VmContext {
4689    fn default() -> Self {
4690        Self::new()
4691    }
4692}
4693
4694// Implement the ReverseLookupUpdater trait for VmContext
4695impl crate::resolvers::ReverseLookupUpdater for VmContext {
4696    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4697        // Use default state_id=0 and default lookup name
4698        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4699            .unwrap_or_else(|e| {
4700                tracing::error!("Failed to update PDA reverse lookup: {}", e);
4701                Vec::new()
4702            })
4703    }
4704
4705    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4706        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
4707        self.flush_pending_updates(0, pda_address)
4708            .unwrap_or_else(|e| {
4709                tracing::error!("Failed to flush pending updates: {}", e);
4710                Vec::new()
4711            })
4712    }
4713}
4714
4715#[cfg(test)]
4716mod tests {
4717    use super::*;
4718    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4719
4720    #[test]
4721    fn test_computed_field_preserves_integer_type() {
4722        let vm = VmContext::new();
4723
4724        let mut state = serde_json::json!({
4725            "trading": {
4726                "total_buy_volume": 20000000000_i64,
4727                "total_sell_volume": 17951316474_i64
4728            }
4729        });
4730
4731        let spec = ComputedFieldSpec {
4732            target_path: "trading.total_volume".to_string(),
4733            result_type: "Option<u64>".to_string(),
4734            expression: ComputedExpr::Binary {
4735                op: BinaryOp::Add,
4736                left: Box::new(ComputedExpr::UnwrapOr {
4737                    expr: Box::new(ComputedExpr::FieldRef {
4738                        path: "trading.total_buy_volume".to_string(),
4739                    }),
4740                    default: serde_json::json!(0),
4741                }),
4742                right: Box::new(ComputedExpr::UnwrapOr {
4743                    expr: Box::new(ComputedExpr::FieldRef {
4744                        path: "trading.total_sell_volume".to_string(),
4745                    }),
4746                    default: serde_json::json!(0),
4747                }),
4748            },
4749        };
4750
4751        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
4752            .unwrap();
4753
4754        let total_volume = state
4755            .get("trading")
4756            .and_then(|t| t.get("total_volume"))
4757            .expect("total_volume should exist");
4758
4759        let serialized = serde_json::to_string(total_volume).unwrap();
4760        assert!(
4761            !serialized.contains('.'),
4762            "Integer should not have decimal point: {}",
4763            serialized
4764        );
4765        assert_eq!(
4766            total_volume.as_i64(),
4767            Some(37951316474),
4768            "Value should be correct sum"
4769        );
4770    }
4771
4772    #[test]
4773    fn test_set_field_sum_preserves_integer_type() {
4774        let mut vm = VmContext::new();
4775        vm.registers[0] = serde_json::json!({});
4776        vm.registers[1] = serde_json::json!(20000000000_i64);
4777        vm.registers[2] = serde_json::json!(17951316474_i64);
4778
4779        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
4780        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
4781
4782        let state = &vm.registers[0];
4783        let buy_vol = state
4784            .get("trading")
4785            .and_then(|t| t.get("total_buy_volume"))
4786            .unwrap();
4787        let sell_vol = state
4788            .get("trading")
4789            .and_then(|t| t.get("total_sell_volume"))
4790            .unwrap();
4791
4792        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
4793        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
4794
4795        assert!(
4796            !buy_serialized.contains('.'),
4797            "Buy volume should not have decimal: {}",
4798            buy_serialized
4799        );
4800        assert!(
4801            !sell_serialized.contains('.'),
4802            "Sell volume should not have decimal: {}",
4803            sell_serialized
4804        );
4805    }
4806
4807    #[test]
4808    fn test_lookup_index_chaining() {
4809        let mut vm = VmContext::new();
4810
4811        let state = vm.states.get_mut(&0).unwrap();
4812
4813        state
4814            .pda_reverse_lookups
4815            .entry("default_pda_lookup".to_string())
4816            .or_insert_with(|| PdaReverseLookup::new(1000))
4817            .insert("pda_123".to_string(), "addr_456".to_string());
4818
4819        state
4820            .lookup_indexes
4821            .entry("round_address_lookup_index".to_string())
4822            .or_insert_with(LookupIndex::new)
4823            .insert(json!("addr_456"), json!(789));
4824
4825        let handler = vec![
4826            OpCode::LoadConstant {
4827                value: json!("pda_123"),
4828                dest: 0,
4829            },
4830            OpCode::LookupIndex {
4831                state_id: 0,
4832                index_name: "round_address_lookup_index".to_string(),
4833                lookup_value: 0,
4834                dest: 1,
4835            },
4836        ];
4837
4838        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4839            .unwrap();
4840
4841        assert_eq!(vm.registers[1], json!(789));
4842    }
4843
4844    #[test]
4845    fn test_lookup_index_no_chain() {
4846        let mut vm = VmContext::new();
4847
4848        let state = vm.states.get_mut(&0).unwrap();
4849        state
4850            .lookup_indexes
4851            .entry("test_index".to_string())
4852            .or_insert_with(LookupIndex::new)
4853            .insert(json!("key_abc"), json!(42));
4854
4855        let handler = vec![
4856            OpCode::LoadConstant {
4857                value: json!("key_abc"),
4858                dest: 0,
4859            },
4860            OpCode::LookupIndex {
4861                state_id: 0,
4862                index_name: "test_index".to_string(),
4863                lookup_value: 0,
4864                dest: 1,
4865            },
4866        ];
4867
4868        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4869            .unwrap();
4870
4871        assert_eq!(vm.registers[1], json!(42));
4872    }
4873
4874    #[test]
4875    fn test_conditional_set_field_with_zero_array() {
4876        let mut vm = VmContext::new();
4877
4878        let event_zeros = json!({
4879            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4880                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4881        });
4882
4883        let event_nonzero = json!({
4884            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
4885                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
4886        });
4887
4888        let zero_32: Value = json!([
4889            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4890            0, 0, 0
4891        ]);
4892
4893        let handler = vec![
4894            OpCode::CreateObject { dest: 2 },
4895            OpCode::LoadEventField {
4896                path: FieldPath::new(&["value"]),
4897                dest: 10,
4898                default: None,
4899            },
4900            OpCode::ConditionalSetField {
4901                object: 2,
4902                path: "captured_value".to_string(),
4903                value: 10,
4904                condition_field: FieldPath::new(&["value"]),
4905                condition_op: ComparisonOp::NotEqual,
4906                condition_value: zero_32,
4907            },
4908        ];
4909
4910        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
4911            .unwrap();
4912        assert!(
4913            vm.registers[2].get("captured_value").is_none(),
4914            "Field should not be set when value is all zeros"
4915        );
4916
4917        vm.reset_registers();
4918        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
4919            .unwrap();
4920        assert!(
4921            vm.registers[2].get("captured_value").is_some(),
4922            "Field should be set when value is non-zero"
4923        );
4924    }
4925
4926    #[test]
4927    fn test_when_instruction_arrives_first() {
4928        let mut vm = VmContext::new();
4929
4930        let signature = "test_sig_123".to_string();
4931
4932        {
4933            let state = vm.states.get(&0).unwrap();
4934            let mut cache = state.recent_tx_instructions.lock().unwrap();
4935            let mut set = HashSet::new();
4936            set.insert("RevealIxState".to_string());
4937            cache.put(signature.clone(), set);
4938        }
4939
4940        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4941
4942        let handler = vec![
4943            OpCode::CreateObject { dest: 2 },
4944            OpCode::LoadConstant {
4945                value: json!("primary_key_value"),
4946                dest: 1,
4947            },
4948            OpCode::LoadConstant {
4949                value: json!("the_revealed_value"),
4950                dest: 10,
4951            },
4952            OpCode::SetFieldWhen {
4953                object: 2,
4954                path: "entropy_value".to_string(),
4955                value: 10,
4956                when_instruction: "RevealIxState".to_string(),
4957                entity_name: "TestEntity".to_string(),
4958                key_reg: 1,
4959                condition_field: None,
4960                condition_op: None,
4961                condition_value: None,
4962            },
4963        ];
4964
4965        vm.execute_handler(
4966            &handler,
4967            &json!({}),
4968            "VarState",
4969            0,
4970            "TestEntity",
4971            None,
4972            None,
4973        )
4974        .unwrap();
4975
4976        assert_eq!(
4977            vm.registers[2].get("entropy_value").unwrap(),
4978            "the_revealed_value",
4979            "Field should be set when instruction was already seen"
4980        );
4981    }
4982
4983    #[test]
4984    fn test_when_account_arrives_first() {
4985        let mut vm = VmContext::new();
4986
4987        let signature = "test_sig_456".to_string();
4988
4989        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4990
4991        let handler = vec![
4992            OpCode::CreateObject { dest: 2 },
4993            OpCode::LoadConstant {
4994                value: json!("pk_123"),
4995                dest: 1,
4996            },
4997            OpCode::LoadConstant {
4998                value: json!("deferred_value"),
4999                dest: 10,
5000            },
5001            OpCode::SetFieldWhen {
5002                object: 2,
5003                path: "entropy_value".to_string(),
5004                value: 10,
5005                when_instruction: "RevealIxState".to_string(),
5006                entity_name: "TestEntity".to_string(),
5007                key_reg: 1,
5008                condition_field: None,
5009                condition_op: None,
5010                condition_value: None,
5011            },
5012        ];
5013
5014        vm.execute_handler(
5015            &handler,
5016            &json!({}),
5017            "VarState",
5018            0,
5019            "TestEntity",
5020            None,
5021            None,
5022        )
5023        .unwrap();
5024
5025        assert!(
5026            vm.registers[2].get("entropy_value").is_none(),
5027            "Field should not be set when instruction hasn't been seen"
5028        );
5029
5030        let state = vm.states.get(&0).unwrap();
5031        let key = (signature.clone(), "RevealIxState".to_string());
5032        assert!(
5033            state.deferred_when_ops.contains_key(&key),
5034            "Operation should be queued"
5035        );
5036
5037        {
5038            let mut cache = state.recent_tx_instructions.lock().unwrap();
5039            let mut set = HashSet::new();
5040            set.insert("RevealIxState".to_string());
5041            cache.put(signature.clone(), set);
5042        }
5043
5044        let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
5045        for op in deferred {
5046            vm.apply_deferred_when_op(0, &op).unwrap();
5047        }
5048
5049        let state = vm.states.get(&0).unwrap();
5050        let entity = state.data.get(&json!("pk_123")).unwrap();
5051        assert_eq!(
5052            entity.get("entropy_value").unwrap(),
5053            "deferred_value",
5054            "Field should be set after instruction arrives"
5055        );
5056    }
5057
5058    #[test]
5059    fn test_when_cleanup_expired() {
5060        let mut vm = VmContext::new();
5061
5062        let state = vm.states.get(&0).unwrap();
5063        let key = ("old_sig".to_string(), "SomeIxState".to_string());
5064        state.deferred_when_ops.insert(
5065            key,
5066            vec![DeferredWhenOperation {
5067                entity_name: "Test".to_string(),
5068                primary_key: json!("pk"),
5069                field_path: "field".to_string(),
5070                field_value: json!("value"),
5071                when_instruction: "SomeIxState".to_string(),
5072                signature: "old_sig".to_string(),
5073                slot: 0,
5074                deferred_at: 0,
5075                emit: true,
5076            }],
5077        );
5078
5079        let removed = vm.cleanup_expired_when_ops(0, 60);
5080
5081        assert_eq!(removed, 1, "Should have removed 1 expired op");
5082        assert!(
5083            vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
5084            "Deferred ops should be empty after cleanup"
5085        );
5086    }
5087}