Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3    ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use serde_json::{json, Value};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::num::NonZeroUsize;
12
13#[cfg(feature = "otel")]
14use tracing::instrument;
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    /// Returns true if this is an account update context (has write_version, no txn_index)
102    pub fn is_account_update(&self) -> bool {
103        self.write_version.is_some() && self.txn_index.is_none()
104    }
105
106    /// Returns true if this is an instruction update context (has txn_index, no write_version)
107    pub fn is_instruction_update(&self) -> bool {
108        self.txn_index.is_some() && self.write_version.is_none()
109    }
110
111    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112        self.metadata.insert(key, value);
113        self
114    }
115
116    /// Get metadata value
117    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118        self.metadata.get(key)
119    }
120
121    /// Convert context to JSON value for injection into event data
122    pub fn to_value(&self) -> Value {
123        let mut obj = serde_json::Map::new();
124        if let Some(slot) = self.slot {
125            obj.insert("slot".to_string(), json!(slot));
126        }
127        if let Some(ref sig) = self.signature {
128            obj.insert("signature".to_string(), json!(sig));
129        }
130        // Always include timestamp (use current time if not set)
131        obj.insert("timestamp".to_string(), json!(self.timestamp()));
132        for (key, value) in &self.metadata {
133            obj.insert(key.clone(), value.clone());
134        }
135        Value::Object(obj)
136    }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144/// Trait for evaluating computed fields
145/// Implement this in your generated spec to enable computed field evaluation
146pub trait ComputedFieldsEvaluator {
147    fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
150// Pending queue configuration
151const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
152const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
153const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
154
155// Temporal index configuration - prevents unbounded history growth
156const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
157const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
158
159// State table configuration - aligned with downstream EntityCache (500 per view)
160const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
161const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
162
163const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
164
165const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
166
167// Smaller cache for instruction deduplication - provides shorter effective TTL
168// since we don't expect instruction duplicates to arrive much later
169const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
170
171const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
172
173const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
174
175const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
176
177/// Estimate the size of a JSON value in bytes
178fn estimate_json_size(value: &Value) -> usize {
179    match value {
180        Value::Null => 4,
181        Value::Bool(_) => 5,
182        Value::Number(_) => 8,
183        Value::String(s) => s.len() + 2,
184        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
185        Value::Object(obj) => {
186            2 + obj
187                .iter()
188                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
189                .sum::<usize>()
190        }
191    }
192}
193
194#[derive(Debug, Clone)]
195pub struct CompiledPath {
196    pub segments: std::sync::Arc<[String]>,
197}
198
199impl CompiledPath {
200    pub fn new(path: &str) -> Self {
201        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
202        CompiledPath {
203            segments: segments.into(),
204        }
205    }
206
207    fn segments(&self) -> &[String] {
208        &self.segments
209    }
210}
211
212/// Represents the type of change made to a field for granular dirty tracking.
213/// This enables emitting only the actual changes rather than entire field values.
214#[derive(Debug, Clone)]
215pub enum FieldChange {
216    /// Field was replaced with a new value (emit the full value from state)
217    Replaced,
218    /// Items were appended to an array field (emit only the new items)
219    Appended(Vec<Value>),
220}
221
222/// Tracks field modifications during handler execution with granular change information.
223/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
224#[derive(Debug, Clone, Default)]
225pub struct DirtyTracker {
226    changes: HashMap<String, FieldChange>,
227}
228
229impl DirtyTracker {
230    /// Create a new empty DirtyTracker
231    pub fn new() -> Self {
232        Self {
233            changes: HashMap::new(),
234        }
235    }
236
237    /// Mark a field as replaced (full value will be emitted)
238    pub fn mark_replaced(&mut self, path: &str) {
239        // If there was an append, it's now superseded by a full replacement
240        self.changes.insert(path.to_string(), FieldChange::Replaced);
241    }
242
243    /// Record an appended value for a field
244    pub fn mark_appended(&mut self, path: &str, value: Value) {
245        match self.changes.get_mut(path) {
246            Some(FieldChange::Appended(values)) => {
247                // Add to existing appended values
248                values.push(value);
249            }
250            Some(FieldChange::Replaced) => {
251                // Field was already replaced, keep it as replaced
252                // (the full value including the append will be emitted)
253            }
254            None => {
255                // First append to this field
256                self.changes
257                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
258            }
259        }
260    }
261
262    /// Check if there are any changes tracked
263    pub fn is_empty(&self) -> bool {
264        self.changes.is_empty()
265    }
266
267    /// Get the number of changed fields
268    pub fn len(&self) -> usize {
269        self.changes.len()
270    }
271
272    /// Iterate over all changes
273    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
274        self.changes.iter()
275    }
276
277    /// Get a set of all dirty field paths (for backward compatibility)
278    pub fn dirty_paths(&self) -> HashSet<String> {
279        self.changes.keys().cloned().collect()
280    }
281
282    /// Consume the tracker and return the changes map
283    pub fn into_changes(self) -> HashMap<String, FieldChange> {
284        self.changes
285    }
286
287    /// Get a reference to the changes map
288    pub fn changes(&self) -> &HashMap<String, FieldChange> {
289        &self.changes
290    }
291
292    /// Get paths that were appended (not replaced)
293    pub fn appended_paths(&self) -> Vec<String> {
294        self.changes
295            .iter()
296            .filter_map(|(path, change)| match change {
297                FieldChange::Appended(_) => Some(path.clone()),
298                FieldChange::Replaced => None,
299            })
300            .collect()
301    }
302}
303
304pub struct VmContext {
305    registers: Vec<RegisterValue>,
306    states: HashMap<u32, StateTable>,
307    pub instructions_executed: u64,
308    pub cache_hits: u64,
309    path_cache: HashMap<String, CompiledPath>,
310    pub pda_cache_hits: u64,
311    pub pda_cache_misses: u64,
312    pub pending_queue_size: u64,
313    resolver_requests: VecDeque<ResolverRequest>,
314    resolver_pending: HashMap<String, PendingResolverEntry>,
315    resolver_cache: LruCache<String, Value>,
316    pub resolver_cache_hits: u64,
317    pub resolver_cache_misses: u64,
318    current_context: Option<UpdateContext>,
319    warnings: Vec<String>,
320    last_pda_lookup_miss: Option<String>,
321    last_lookup_index_miss: Option<String>,
322    last_pda_registered: Option<String>,
323    last_lookup_index_keys: Vec<String>,
324}
325
326#[derive(Debug)]
327pub struct LookupIndex {
328    index: std::sync::Mutex<LruCache<String, Value>>,
329}
330
331impl LookupIndex {
332    pub fn new() -> Self {
333        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
334    }
335
336    pub fn with_capacity(capacity: usize) -> Self {
337        LookupIndex {
338            index: std::sync::Mutex::new(LruCache::new(
339                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
340            )),
341        }
342    }
343
344    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
345        let key = value_to_cache_key(lookup_value);
346        self.index.lock().unwrap().get(&key).cloned()
347    }
348
349    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
350        let key = value_to_cache_key(&lookup_value);
351        self.index.lock().unwrap().put(key, primary_key);
352    }
353
354    pub fn len(&self) -> usize {
355        self.index.lock().unwrap().len()
356    }
357
358    pub fn is_empty(&self) -> bool {
359        self.index.lock().unwrap().is_empty()
360    }
361}
362
363impl Default for LookupIndex {
364    fn default() -> Self {
365        Self::new()
366    }
367}
368
369fn value_to_cache_key(value: &Value) -> String {
370    match value {
371        Value::String(s) => s.clone(),
372        Value::Number(n) => n.to_string(),
373        Value::Bool(b) => b.to_string(),
374        Value::Null => "null".to_string(),
375        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
376    }
377}
378
379fn resolver_type_key(resolver: &ResolverType) -> &'static str {
380    match resolver {
381        ResolverType::Token => "token",
382    }
383}
384
385fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
386    format!(
387        "{}:{}",
388        resolver_type_key(resolver),
389        value_to_cache_key(input)
390    )
391}
392
393#[derive(Debug)]
394pub struct TemporalIndex {
395    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
396}
397
398impl Default for TemporalIndex {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404impl TemporalIndex {
405    pub fn new() -> Self {
406        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
407    }
408
409    pub fn with_capacity(capacity: usize) -> Self {
410        TemporalIndex {
411            index: std::sync::Mutex::new(LruCache::new(
412                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
413            )),
414        }
415    }
416
417    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
418        let key = value_to_cache_key(lookup_value);
419        let mut cache = self.index.lock().unwrap();
420        if let Some(entries) = cache.get(&key) {
421            for i in (0..entries.len()).rev() {
422                if entries[i].1 <= timestamp {
423                    return Some(entries[i].0.clone());
424                }
425            }
426        }
427        None
428    }
429
430    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
431        let key = value_to_cache_key(lookup_value);
432        let mut cache = self.index.lock().unwrap();
433        if let Some(entries) = cache.get(&key) {
434            if let Some(last) = entries.last() {
435                return Some(last.0.clone());
436            }
437        }
438        None
439    }
440
441    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
442        let key = value_to_cache_key(&lookup_value);
443        let mut cache = self.index.lock().unwrap();
444
445        let entries = cache.get_or_insert_mut(key, Vec::new);
446        entries.push((primary_key, timestamp));
447        entries.sort_by_key(|(_, ts)| *ts);
448
449        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
450        entries.retain(|(_, ts)| *ts >= cutoff);
451
452        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
453            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
454            entries.drain(0..excess);
455        }
456    }
457
458    pub fn len(&self) -> usize {
459        self.index.lock().unwrap().len()
460    }
461
462    pub fn is_empty(&self) -> bool {
463        self.index.lock().unwrap().is_empty()
464    }
465
466    pub fn total_entries(&self) -> usize {
467        self.index
468            .lock()
469            .unwrap()
470            .iter()
471            .map(|(_, entries)| entries.len())
472            .sum()
473    }
474
475    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
476        let mut cache = self.index.lock().unwrap();
477        let mut total_removed = 0;
478
479        for (_, entries) in cache.iter_mut() {
480            let original_len = entries.len();
481            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
482            total_removed += original_len - entries.len();
483        }
484
485        total_removed
486    }
487}
488
489#[derive(Debug)]
490pub struct PdaReverseLookup {
491    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
492    index: LruCache<String, String>,
493}
494
495impl PdaReverseLookup {
496    pub fn new(capacity: usize) -> Self {
497        PdaReverseLookup {
498            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
499        }
500    }
501
502    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
503        self.index.get(pda_address).cloned()
504    }
505
506    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
507        let evicted = if self.index.len() >= self.index.cap().get() {
508            self.index.peek_lru().map(|(k, _)| k.clone())
509        } else {
510            None
511        };
512
513        self.index.put(pda_address, seed_value);
514        evicted
515    }
516
517    pub fn len(&self) -> usize {
518        self.index.len()
519    }
520
521    pub fn is_empty(&self) -> bool {
522        self.index.is_empty()
523    }
524
525    pub fn contains(&self, pda_address: &str) -> bool {
526        self.index.peek(pda_address).is_some()
527    }
528}
529
530/// Input for queueing an account update.
531#[derive(Debug, Clone)]
532pub struct QueuedAccountUpdate {
533    pub pda_address: String,
534    pub account_type: String,
535    pub account_data: Value,
536    pub slot: u64,
537    pub write_version: u64,
538    pub signature: String,
539}
540
541/// Internal representation of a pending account update with queue metadata.
542#[derive(Debug, Clone)]
543pub struct PendingAccountUpdate {
544    pub account_type: String,
545    pub pda_address: String,
546    pub account_data: Value,
547    pub slot: u64,
548    pub write_version: u64,
549    pub signature: String,
550    pub queued_at: i64,
551}
552
553/// Input for queueing an instruction event when PDA lookup fails.
554#[derive(Debug, Clone)]
555pub struct QueuedInstructionEvent {
556    pub pda_address: String,
557    pub event_type: String,
558    pub event_data: Value,
559    pub slot: u64,
560    pub signature: String,
561}
562
563/// Internal representation of a pending instruction event with queue metadata.
564#[derive(Debug, Clone)]
565pub struct PendingInstructionEvent {
566    pub event_type: String,
567    pub pda_address: String,
568    pub event_data: Value,
569    pub slot: u64,
570    pub signature: String,
571    pub queued_at: i64,
572}
573
574#[derive(Debug, Clone)]
575pub struct DeferredWhenOperation {
576    pub entity_name: String,
577    pub primary_key: Value,
578    pub field_path: String,
579    pub field_value: Value,
580    pub when_instruction: String,
581    pub signature: String,
582    pub slot: u64,
583    pub deferred_at: i64,
584    pub emit: bool,
585}
586
587#[derive(Debug, Clone)]
588pub struct ResolverRequest {
589    pub cache_key: String,
590    pub resolver: ResolverType,
591    pub input: Value,
592}
593
594#[derive(Debug, Clone)]
595pub struct ResolverTarget {
596    pub state_id: u32,
597    pub entity_name: String,
598    pub primary_key: Value,
599    pub extracts: Vec<ResolverExtractSpec>,
600}
601
602#[derive(Debug, Clone)]
603pub struct PendingResolverEntry {
604    pub resolver: ResolverType,
605    pub input: Value,
606    pub targets: Vec<ResolverTarget>,
607    pub queued_at: i64,
608}
609
610impl PendingResolverEntry {
611    fn add_target(&mut self, target: ResolverTarget) {
612        if let Some(existing) = self.targets.iter_mut().find(|t| {
613            t.state_id == target.state_id
614                && t.entity_name == target.entity_name
615                && t.primary_key == target.primary_key
616        }) {
617            let mut seen = HashSet::new();
618            for extract in &existing.extracts {
619                seen.insert((extract.target_path.clone(), extract.source_path.clone()));
620            }
621
622            for extract in target.extracts {
623                let key = (extract.target_path.clone(), extract.source_path.clone());
624                if seen.insert(key) {
625                    existing.extracts.push(extract);
626                }
627            }
628        } else {
629            self.targets.push(target);
630        }
631    }
632}
633
634#[derive(Debug, Clone)]
635pub struct PendingQueueStats {
636    pub total_updates: usize,
637    pub unique_pdas: usize,
638    pub oldest_age_seconds: i64,
639    pub largest_pda_queue_size: usize,
640    pub estimated_memory_bytes: usize,
641}
642
643#[derive(Debug, Clone, Default)]
644pub struct VmMemoryStats {
645    pub state_table_entity_count: usize,
646    pub state_table_max_entries: usize,
647    pub state_table_at_capacity: bool,
648    pub lookup_index_count: usize,
649    pub lookup_index_total_entries: usize,
650    pub temporal_index_count: usize,
651    pub temporal_index_total_entries: usize,
652    pub pda_reverse_lookup_count: usize,
653    pub pda_reverse_lookup_total_entries: usize,
654    pub version_tracker_entries: usize,
655    pub pending_queue_stats: Option<PendingQueueStats>,
656    pub path_cache_size: usize,
657}
658
659#[derive(Debug, Clone, Default)]
660pub struct CleanupResult {
661    pub pending_updates_removed: usize,
662    pub temporal_entries_removed: usize,
663}
664
665#[derive(Debug, Clone)]
666pub struct CapacityWarning {
667    pub current_entries: usize,
668    pub max_entries: usize,
669    pub entries_over_limit: usize,
670}
671
672#[derive(Debug, Clone)]
673pub struct StateTableConfig {
674    pub max_entries: usize,
675    pub max_array_length: usize,
676}
677
678impl Default for StateTableConfig {
679    fn default() -> Self {
680        Self {
681            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
682            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
683        }
684    }
685}
686
687#[derive(Debug)]
688pub struct VersionTracker {
689    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
690}
691
692impl VersionTracker {
693    pub fn new() -> Self {
694        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
695    }
696
697    pub fn with_capacity(capacity: usize) -> Self {
698        VersionTracker {
699            cache: std::sync::Mutex::new(LruCache::new(
700                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
701            )),
702        }
703    }
704
705    fn make_key(primary_key: &Value, event_type: &str) -> String {
706        format!("{}:{}", primary_key, event_type)
707    }
708
709    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
710        let key = Self::make_key(primary_key, event_type);
711        self.cache.lock().unwrap().get(&key).copied()
712    }
713
714    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
715        let key = Self::make_key(primary_key, event_type);
716        self.cache.lock().unwrap().put(key, (slot, ordering_value));
717    }
718
719    pub fn len(&self) -> usize {
720        self.cache.lock().unwrap().len()
721    }
722
723    pub fn is_empty(&self) -> bool {
724        self.cache.lock().unwrap().is_empty()
725    }
726}
727
728impl Default for VersionTracker {
729    fn default() -> Self {
730        Self::new()
731    }
732}
733
734#[derive(Debug)]
735pub struct StateTable {
736    pub data: DashMap<Value, Value>,
737    access_times: DashMap<Value, i64>,
738    pub lookup_indexes: HashMap<String, LookupIndex>,
739    pub temporal_indexes: HashMap<String, TemporalIndex>,
740    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
741    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
742    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
743    version_tracker: VersionTracker,
744    instruction_dedup_cache: VersionTracker,
745    config: StateTableConfig,
746    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
747    entity_name: String,
748    pub recent_tx_instructions:
749        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
750    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
751}
752
753impl StateTable {
754    pub fn is_at_capacity(&self) -> bool {
755        self.data.len() >= self.config.max_entries
756    }
757
758    pub fn entries_over_limit(&self) -> usize {
759        self.data.len().saturating_sub(self.config.max_entries)
760    }
761
762    pub fn max_array_length(&self) -> usize {
763        self.config.max_array_length
764    }
765
766    fn touch(&self, key: &Value) {
767        let now = std::time::SystemTime::now()
768            .duration_since(std::time::UNIX_EPOCH)
769            .unwrap()
770            .as_secs() as i64;
771        self.access_times.insert(key.clone(), now);
772    }
773
774    fn evict_lru(&self, count: usize) -> usize {
775        if count == 0 || self.data.is_empty() {
776            return 0;
777        }
778
779        let mut entries: Vec<(Value, i64)> = self
780            .access_times
781            .iter()
782            .map(|entry| (entry.key().clone(), *entry.value()))
783            .collect();
784
785        entries.sort_by_key(|(_, ts)| *ts);
786
787        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
788
789        let mut evicted = 0;
790        for key in to_evict {
791            self.data.remove(&key);
792            self.access_times.remove(&key);
793            evicted += 1;
794        }
795
796        #[cfg(feature = "otel")]
797        if evicted > 0 {
798            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
799        }
800
801        evicted
802    }
803
804    pub fn insert_with_eviction(&self, key: Value, value: Value) {
805        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
806            #[cfg(feature = "otel")]
807            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
808            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
809            self.evict_lru(to_evict.max(1));
810        }
811        self.data.insert(key.clone(), value);
812        self.touch(&key);
813    }
814
815    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
816        let result = self.data.get(key).map(|v| v.clone());
817        if result.is_some() {
818            self.touch(key);
819        }
820        result
821    }
822
823    /// Check if an update is fresh and update the version tracker.
824    /// Returns true if the update should be processed (is fresh).
825    /// Returns false if the update is stale and should be skipped.
826    ///
827    /// Comparison is lexicographic on (slot, ordering_value):
828    /// (100, 5) > (100, 3) > (99, 999)
829    pub fn is_fresh_update(
830        &self,
831        primary_key: &Value,
832        event_type: &str,
833        slot: u64,
834        ordering_value: u64,
835    ) -> bool {
836        let dominated = self
837            .version_tracker
838            .get(primary_key, event_type)
839            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
840            .unwrap_or(false);
841
842        if dominated {
843            return false;
844        }
845
846        self.version_tracker
847            .insert(primary_key, event_type, slot, ordering_value);
848        true
849    }
850
851    /// Check if an instruction is a duplicate of one we've seen recently.
852    /// Returns true if this exact instruction has been seen before (is a duplicate).
853    /// Returns false if this is a new instruction that should be processed.
854    ///
855    /// Unlike account updates, instructions don't use recency checks - all
856    /// unique instructions are processed. Only exact duplicates are skipped.
857    /// Uses a smaller cache capacity for shorter effective TTL.
858    pub fn is_duplicate_instruction(
859        &self,
860        primary_key: &Value,
861        event_type: &str,
862        slot: u64,
863        txn_index: u64,
864    ) -> bool {
865        // Check if we've seen this exact instruction before
866        let is_duplicate = self
867            .instruction_dedup_cache
868            .get(primary_key, event_type)
869            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
870            .unwrap_or(false);
871
872        if is_duplicate {
873            return true;
874        }
875
876        // Record this instruction for deduplication
877        self.instruction_dedup_cache
878            .insert(primary_key, event_type, slot, txn_index);
879        false
880    }
881}
882
883impl VmContext {
884    pub fn new() -> Self {
885        let mut vm = VmContext {
886            registers: vec![Value::Null; 256],
887            states: HashMap::new(),
888            instructions_executed: 0,
889            cache_hits: 0,
890            path_cache: HashMap::new(),
891            pda_cache_hits: 0,
892            pda_cache_misses: 0,
893            pending_queue_size: 0,
894            resolver_requests: VecDeque::new(),
895            resolver_pending: HashMap::new(),
896            resolver_cache: LruCache::new(
897                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
898                    .expect("capacity must be > 0"),
899            ),
900            resolver_cache_hits: 0,
901            resolver_cache_misses: 0,
902            current_context: None,
903            warnings: Vec::new(),
904            last_pda_lookup_miss: None,
905            last_lookup_index_miss: None,
906            last_pda_registered: None,
907            last_lookup_index_keys: Vec::new(),
908        };
909        vm.states.insert(
910            0,
911            StateTable {
912                data: DashMap::new(),
913                access_times: DashMap::new(),
914                lookup_indexes: HashMap::new(),
915                temporal_indexes: HashMap::new(),
916                pda_reverse_lookups: HashMap::new(),
917                pending_updates: DashMap::new(),
918                pending_instruction_events: DashMap::new(),
919                version_tracker: VersionTracker::new(),
920                instruction_dedup_cache: VersionTracker::with_capacity(
921                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
922                ),
923                config: StateTableConfig::default(),
924                entity_name: "default".to_string(),
925                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
926                    NonZeroUsize::new(1000).unwrap(),
927                )),
928                deferred_when_ops: DashMap::new(),
929            },
930        );
931        vm
932    }
933
934    pub fn new_with_config(state_config: StateTableConfig) -> Self {
935        let mut vm = VmContext {
936            registers: vec![Value::Null; 256],
937            states: HashMap::new(),
938            instructions_executed: 0,
939            cache_hits: 0,
940            path_cache: HashMap::new(),
941            pda_cache_hits: 0,
942            pda_cache_misses: 0,
943            pending_queue_size: 0,
944            resolver_requests: VecDeque::new(),
945            resolver_pending: HashMap::new(),
946            resolver_cache: LruCache::new(
947                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
948                    .expect("capacity must be > 0"),
949            ),
950            resolver_cache_hits: 0,
951            resolver_cache_misses: 0,
952            current_context: None,
953            warnings: Vec::new(),
954            last_pda_lookup_miss: None,
955            last_lookup_index_miss: None,
956            last_pda_registered: None,
957            last_lookup_index_keys: Vec::new(),
958        };
959        vm.states.insert(
960            0,
961            StateTable {
962                data: DashMap::new(),
963                access_times: DashMap::new(),
964                lookup_indexes: HashMap::new(),
965                temporal_indexes: HashMap::new(),
966                pda_reverse_lookups: HashMap::new(),
967                pending_updates: DashMap::new(),
968                pending_instruction_events: DashMap::new(),
969                version_tracker: VersionTracker::new(),
970                instruction_dedup_cache: VersionTracker::with_capacity(
971                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
972                ),
973                config: state_config,
974                entity_name: "default".to_string(),
975                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
976                    NonZeroUsize::new(1000).unwrap(),
977                )),
978                deferred_when_ops: DashMap::new(),
979            },
980        );
981        vm
982    }
983
984    pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
985        self.resolver_requests.drain(..).collect()
986    }
987
988    pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
989        if requests.is_empty() {
990            return;
991        }
992
993        self.resolver_requests.extend(requests);
994    }
995
996    pub fn apply_resolver_result(
997        &mut self,
998        bytecode: &MultiEntityBytecode,
999        cache_key: &str,
1000        resolved_value: Value,
1001    ) -> Result<Vec<Mutation>> {
1002        self.resolver_cache
1003            .put(cache_key.to_string(), resolved_value.clone());
1004
1005        let entry = match self.resolver_pending.remove(cache_key) {
1006            Some(entry) => entry,
1007            None => return Ok(Vec::new()),
1008        };
1009
1010        let mut mutations = Vec::new();
1011
1012        for target in entry.targets {
1013            if target.primary_key.is_null() {
1014                continue;
1015            }
1016
1017            let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1018                Some(bc) => bc,
1019                None => continue,
1020            };
1021
1022            let state = match self.states.get(&target.state_id) {
1023                Some(s) => s,
1024                None => continue,
1025            };
1026
1027            let mut entity_state = state
1028                .get_and_touch(&target.primary_key)
1029                .unwrap_or_else(|| json!({}));
1030
1031            let mut dirty_tracker = DirtyTracker::new();
1032            let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1033
1034            Self::apply_resolver_extractions_to_value(
1035                &mut entity_state,
1036                &resolved_value,
1037                &target.extracts,
1038                &mut dirty_tracker,
1039                &should_emit,
1040            )?;
1041
1042            if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1043                let old_values: Vec<_> = entity_bytecode
1044                    .computed_paths
1045                    .iter()
1046                    .map(|path| Self::get_value_at_path(&entity_state, path))
1047                    .collect();
1048
1049                let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1050                let context_timestamp = self
1051                    .current_context
1052                    .as_ref()
1053                    .map(|c| c.timestamp())
1054                    .unwrap_or_else(|| {
1055                        std::time::SystemTime::now()
1056                            .duration_since(std::time::UNIX_EPOCH)
1057                            .unwrap()
1058                            .as_secs() as i64
1059                    });
1060                let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1061                if eval_result.is_ok() {
1062                    for (path, old_value) in
1063                        entity_bytecode.computed_paths.iter().zip(old_values.iter())
1064                    {
1065                        let new_value = Self::get_value_at_path(&entity_state, path);
1066                        if new_value != *old_value && should_emit(path) {
1067                            dirty_tracker.mark_replaced(path);
1068                        }
1069                    }
1070                }
1071            }
1072
1073            state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1074
1075            if dirty_tracker.is_empty() {
1076                continue;
1077            }
1078
1079            let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1080
1081            mutations.push(Mutation {
1082                export: target.entity_name.clone(),
1083                key: target.primary_key.clone(),
1084                patch,
1085                append: vec![],
1086            });
1087        }
1088
1089        Ok(mutations)
1090    }
1091
1092    fn enqueue_resolver_request(
1093        &mut self,
1094        cache_key: String,
1095        resolver: ResolverType,
1096        input: Value,
1097        target: ResolverTarget,
1098    ) {
1099        if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1100            entry.add_target(target);
1101            return;
1102        }
1103
1104        let queued_at = std::time::SystemTime::now()
1105            .duration_since(std::time::UNIX_EPOCH)
1106            .unwrap()
1107            .as_secs() as i64;
1108
1109        self.resolver_pending.insert(
1110            cache_key.clone(),
1111            PendingResolverEntry {
1112                resolver: resolver.clone(),
1113                input: input.clone(),
1114                targets: vec![target],
1115                queued_at,
1116            },
1117        );
1118
1119        self.resolver_requests.push_back(ResolverRequest {
1120            cache_key,
1121            resolver,
1122            input,
1123        });
1124    }
1125
1126    fn apply_resolver_extractions_to_value<F>(
1127        state: &mut Value,
1128        resolved_value: &Value,
1129        extracts: &[ResolverExtractSpec],
1130        dirty_tracker: &mut DirtyTracker,
1131        should_emit: &F,
1132    ) -> Result<()>
1133    where
1134        F: Fn(&str) -> bool,
1135    {
1136        for extract in extracts {
1137            let resolved = match extract.source_path.as_deref() {
1138                Some(path) => Self::get_value_at_path(resolved_value, path),
1139                None => Some(resolved_value.clone()),
1140            };
1141
1142            let Some(value) = resolved else {
1143                continue;
1144            };
1145
1146            let value = if let Some(transform) = &extract.transform {
1147                Self::apply_transformation(&value, transform)?
1148            } else {
1149                value
1150            };
1151
1152            Self::set_nested_field_value(state, &extract.target_path, value)?;
1153            if should_emit(&extract.target_path) {
1154                dirty_tracker.mark_replaced(&extract.target_path);
1155            }
1156        }
1157
1158        Ok(())
1159    }
1160
1161    fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1162        if tracker.is_empty() {
1163            return Ok(json!({}));
1164        }
1165
1166        let mut partial = serde_json::Map::new();
1167
1168        for (path, change) in tracker.iter() {
1169            let segments: Vec<&str> = path.split('.').collect();
1170
1171            let value_to_insert = match change {
1172                FieldChange::Replaced => {
1173                    let mut current = state;
1174                    let mut found = true;
1175
1176                    for segment in &segments {
1177                        match current.get(*segment) {
1178                            Some(v) => current = v,
1179                            None => {
1180                                found = false;
1181                                break;
1182                            }
1183                        }
1184                    }
1185
1186                    if !found {
1187                        continue;
1188                    }
1189                    current.clone()
1190                }
1191                FieldChange::Appended(values) => Value::Array(values.clone()),
1192            };
1193
1194            let mut target = &mut partial;
1195            for (i, segment) in segments.iter().enumerate() {
1196                if i == segments.len() - 1 {
1197                    target.insert(segment.to_string(), value_to_insert.clone());
1198                } else {
1199                    target
1200                        .entry(segment.to_string())
1201                        .or_insert_with(|| json!({}));
1202                    target = target
1203                        .get_mut(*segment)
1204                        .and_then(|v| v.as_object_mut())
1205                        .ok_or("Failed to build nested structure")?;
1206                }
1207            }
1208        }
1209
1210        Ok(Value::Object(partial))
1211    }
1212
1213    /// Get a mutable reference to a state table by ID
1214    /// Returns None if the state ID doesn't exist
1215    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1216        self.states.get_mut(&state_id)
1217    }
1218
1219    /// Get public access to registers (for metrics context)
1220    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1221        &mut self.registers
1222    }
1223
1224    /// Get public access to path cache (for metrics context)
1225    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1226        &self.path_cache
1227    }
1228
1229    /// Get the current update context
1230    pub fn current_context(&self) -> Option<&UpdateContext> {
1231        self.current_context.as_ref()
1232    }
1233
1234    /// Set the current update context for computed field evaluation
1235    pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1236        self.current_context = context;
1237    }
1238
1239    fn add_warning(&mut self, msg: String) {
1240        self.warnings.push(msg);
1241    }
1242
1243    pub fn take_warnings(&mut self) -> Vec<String> {
1244        std::mem::take(&mut self.warnings)
1245    }
1246
1247    pub fn has_warnings(&self) -> bool {
1248        !self.warnings.is_empty()
1249    }
1250
1251    pub fn update_state_from_register(
1252        &mut self,
1253        state_id: u32,
1254        key: Value,
1255        register: Register,
1256    ) -> Result<()> {
1257        let state = self.states.get(&state_id).ok_or("State table not found")?;
1258        let value = self.registers[register].clone();
1259        state.insert_with_eviction(key, value);
1260        Ok(())
1261    }
1262
1263    fn reset_registers(&mut self) {
1264        for reg in &mut self.registers {
1265            *reg = Value::Null;
1266        }
1267    }
1268
1269    /// Extract only the dirty fields from state (public for use by instruction hooks)
1270    pub fn extract_partial_state(
1271        &self,
1272        state_reg: Register,
1273        dirty_fields: &HashSet<String>,
1274    ) -> Result<Value> {
1275        let full_state = &self.registers[state_reg];
1276
1277        if dirty_fields.is_empty() {
1278            return Ok(json!({}));
1279        }
1280
1281        let mut partial = serde_json::Map::new();
1282
1283        for path in dirty_fields {
1284            let segments: Vec<&str> = path.split('.').collect();
1285
1286            let mut current = full_state;
1287            let mut found = true;
1288
1289            for segment in &segments {
1290                match current.get(segment) {
1291                    Some(v) => current = v,
1292                    None => {
1293                        found = false;
1294                        break;
1295                    }
1296                }
1297            }
1298
1299            if !found {
1300                continue;
1301            }
1302
1303            let mut target = &mut partial;
1304            for (i, segment) in segments.iter().enumerate() {
1305                if i == segments.len() - 1 {
1306                    target.insert(segment.to_string(), current.clone());
1307                } else {
1308                    target
1309                        .entry(segment.to_string())
1310                        .or_insert_with(|| json!({}));
1311                    target = target
1312                        .get_mut(*segment)
1313                        .and_then(|v| v.as_object_mut())
1314                        .ok_or("Failed to build nested structure")?;
1315                }
1316            }
1317        }
1318
1319        Ok(Value::Object(partial))
1320    }
1321
1322    /// Extract a patch from state based on the DirtyTracker.
1323    /// For Replaced fields: extracts the full value from state.
1324    /// For Appended fields: emits only the appended values as an array.
1325    pub fn extract_partial_state_with_tracker(
1326        &self,
1327        state_reg: Register,
1328        tracker: &DirtyTracker,
1329    ) -> Result<Value> {
1330        let full_state = &self.registers[state_reg];
1331
1332        if tracker.is_empty() {
1333            return Ok(json!({}));
1334        }
1335
1336        let mut partial = serde_json::Map::new();
1337
1338        for (path, change) in tracker.iter() {
1339            let segments: Vec<&str> = path.split('.').collect();
1340
1341            let value_to_insert = match change {
1342                FieldChange::Replaced => {
1343                    let mut current = full_state;
1344                    let mut found = true;
1345
1346                    for segment in &segments {
1347                        match current.get(*segment) {
1348                            Some(v) => current = v,
1349                            None => {
1350                                found = false;
1351                                break;
1352                            }
1353                        }
1354                    }
1355
1356                    if !found {
1357                        continue;
1358                    }
1359                    current.clone()
1360                }
1361                FieldChange::Appended(values) => Value::Array(values.clone()),
1362            };
1363
1364            let mut target = &mut partial;
1365            for (i, segment) in segments.iter().enumerate() {
1366                if i == segments.len() - 1 {
1367                    target.insert(segment.to_string(), value_to_insert.clone());
1368                } else {
1369                    target
1370                        .entry(segment.to_string())
1371                        .or_insert_with(|| json!({}));
1372                    target = target
1373                        .get_mut(*segment)
1374                        .and_then(|v| v.as_object_mut())
1375                        .ok_or("Failed to build nested structure")?;
1376                }
1377            }
1378        }
1379
1380        Ok(Value::Object(partial))
1381    }
1382
1383    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1384        if let Some(compiled) = self.path_cache.get(path) {
1385            self.cache_hits += 1;
1386            #[cfg(feature = "otel")]
1387            crate::vm_metrics::record_path_cache_hit();
1388            return compiled.clone();
1389        }
1390        #[cfg(feature = "otel")]
1391        crate::vm_metrics::record_path_cache_miss();
1392        let compiled = CompiledPath::new(path);
1393        self.path_cache.insert(path.to_string(), compiled.clone());
1394        compiled
1395    }
1396
1397    /// Process an event with optional context metadata
1398    #[cfg_attr(feature = "otel", instrument(
1399        name = "vm.process_event",
1400        skip(self, bytecode, event_value, log),
1401        level = "info",
1402        fields(
1403            event_type = %event_type,
1404            slot = context.as_ref().and_then(|c| c.slot),
1405        )
1406    ))]
1407    pub fn process_event(
1408        &mut self,
1409        bytecode: &MultiEntityBytecode,
1410        event_value: Value,
1411        event_type: &str,
1412        context: Option<&UpdateContext>,
1413        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1414    ) -> Result<Vec<Mutation>> {
1415        self.current_context = context.cloned();
1416
1417        let mut event_value = event_value;
1418        if let Some(ctx) = context {
1419            if let Some(obj) = event_value.as_object_mut() {
1420                obj.insert("__update_context".to_string(), ctx.to_value());
1421            }
1422        }
1423
1424        let mut all_mutations = Vec::new();
1425
1426        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1427            if let Some(ctx) = context {
1428                if let Some(signature) = ctx.signature.clone() {
1429                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1430                    for state_id in state_ids {
1431                        if let Some(state) = self.states.get(&state_id) {
1432                            {
1433                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1434                                let entry =
1435                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1436                                entry.insert(event_type.to_string());
1437                            }
1438
1439                            let key = (signature.clone(), event_type.to_string());
1440                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1441                                tracing::debug!(
1442                                    event_type = %event_type,
1443                                    signature = %signature,
1444                                    deferred_count = deferred_ops.len(),
1445                                    "flushing deferred when-ops"
1446                                );
1447                                for op in deferred_ops {
1448                                    match self.apply_deferred_when_op(state_id, &op) {
1449                                        Ok(mutations) => all_mutations.extend(mutations),
1450                                        Err(e) => tracing::warn!(
1451                                            "Failed to apply deferred when-op: {}",
1452                                            e
1453                                        ),
1454                                    }
1455                                }
1456                            }
1457                        }
1458                    }
1459                }
1460            }
1461        }
1462
1463        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1464            for entity_name in entity_names {
1465                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1466                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1467                        if let Some(ref mut log) = log {
1468                            log.set("entity", entity_name.clone());
1469                            log.inc("handlers", 1);
1470                        }
1471
1472                        let opcodes_before = self.instructions_executed;
1473                        let cache_before = self.cache_hits;
1474                        let pda_hits_before = self.pda_cache_hits;
1475                        let pda_misses_before = self.pda_cache_misses;
1476
1477                        let mutations = self.execute_handler(
1478                            handler,
1479                            &event_value,
1480                            event_type,
1481                            entity_bytecode.state_id,
1482                            entity_name,
1483                            entity_bytecode.computed_fields_evaluator.as_ref(),
1484                            Some(&entity_bytecode.non_emitted_fields),
1485                        )?;
1486
1487                        if let Some(ref mut log) = log {
1488                            log.inc(
1489                                "opcodes",
1490                                (self.instructions_executed - opcodes_before) as i64,
1491                            );
1492                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1493                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1494                            log.inc(
1495                                "pda_misses",
1496                                (self.pda_cache_misses - pda_misses_before) as i64,
1497                            );
1498                        }
1499
1500                        if mutations.is_empty() {
1501                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1502                                if event_type.ends_with("IxState") {
1503                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1504                                    let signature = context
1505                                        .and_then(|c| c.signature.clone())
1506                                        .unwrap_or_default();
1507                                    let _ = self.queue_instruction_event(
1508                                        entity_bytecode.state_id,
1509                                        QueuedInstructionEvent {
1510                                            pda_address: missed_pda,
1511                                            event_type: event_type.to_string(),
1512                                            event_data: event_value.clone(),
1513                                            slot,
1514                                            signature,
1515                                        },
1516                                    );
1517                                }
1518                            }
1519
1520                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1521                                if !event_type.ends_with("IxState") {
1522                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1523                                    let signature = context
1524                                        .and_then(|c| c.signature.clone())
1525                                        .unwrap_or_default();
1526                                    if let Some(write_version) =
1527                                        context.and_then(|c| c.write_version)
1528                                    {
1529                                        let _ = self.queue_account_update(
1530                                            entity_bytecode.state_id,
1531                                            QueuedAccountUpdate {
1532                                                pda_address: missed_lookup,
1533                                                account_type: event_type.to_string(),
1534                                                account_data: event_value.clone(),
1535                                                slot,
1536                                                write_version,
1537                                                signature,
1538                                            },
1539                                        );
1540                                    }
1541                                }
1542                            }
1543                        }
1544
1545                        all_mutations.extend(mutations);
1546
1547                        if event_type.ends_with("IxState") {
1548                            if let Some(ctx) = context {
1549                                if let Some(ref signature) = ctx.signature {
1550                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1551                                    {
1552                                        {
1553                                            let mut cache =
1554                                                state.recent_tx_instructions.lock().unwrap();
1555                                            let entry = cache
1556                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1557                                            entry.insert(event_type.to_string());
1558                                        }
1559
1560                                        let key = (signature.clone(), event_type.to_string());
1561                                        if let Some((_, deferred_ops)) =
1562                                            state.deferred_when_ops.remove(&key)
1563                                        {
1564                                            tracing::debug!(
1565                                                event_type = %event_type,
1566                                                signature = %signature,
1567                                                deferred_count = deferred_ops.len(),
1568                                                "flushing deferred when-ops"
1569                                            );
1570                                            for op in deferred_ops {
1571                                                match self.apply_deferred_when_op(
1572                                                    entity_bytecode.state_id,
1573                                                    &op,
1574                                                ) {
1575                                                    Ok(mutations) => {
1576                                                        all_mutations.extend(mutations)
1577                                                    }
1578                                                    Err(e) => {
1579                                                        tracing::warn!(
1580                                                            "Failed to apply deferred when-op: {}",
1581                                                            e
1582                                                        );
1583                                                    }
1584                                                }
1585                                            }
1586                                        }
1587                                    }
1588                                }
1589                            }
1590                        }
1591
1592                        if let Some(registered_pda) = self.take_last_pda_registered() {
1593                            let pending_events = self.flush_pending_instruction_events(
1594                                entity_bytecode.state_id,
1595                                &registered_pda,
1596                            );
1597                            for pending in pending_events {
1598                                if let Some(pending_handler) =
1599                                    entity_bytecode.handlers.get(&pending.event_type)
1600                                {
1601                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1602                                        pending_handler,
1603                                        &pending.event_data,
1604                                        &pending.event_type,
1605                                        entity_bytecode.state_id,
1606                                        entity_name,
1607                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1608                                        Some(&entity_bytecode.non_emitted_fields),
1609                                    ) {
1610                                        all_mutations.extend(reprocessed_mutations);
1611                                    }
1612                                }
1613                            }
1614                        }
1615
1616                        let lookup_keys = self.take_last_lookup_index_keys();
1617                        for lookup_key in lookup_keys {
1618                            if let Ok(pending_updates) =
1619                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1620                            {
1621                                for pending in pending_updates {
1622                                    if let Some(pending_handler) =
1623                                        entity_bytecode.handlers.get(&pending.account_type)
1624                                    {
1625                                        self.current_context = Some(UpdateContext::new_account(
1626                                            pending.slot,
1627                                            pending.signature.clone(),
1628                                            pending.write_version,
1629                                        ));
1630                                        if let Ok(reprocessed) = self.execute_handler(
1631                                            pending_handler,
1632                                            &pending.account_data,
1633                                            &pending.account_type,
1634                                            entity_bytecode.state_id,
1635                                            entity_name,
1636                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1637                                            Some(&entity_bytecode.non_emitted_fields),
1638                                        ) {
1639                                            all_mutations.extend(reprocessed);
1640                                        }
1641                                    }
1642                                }
1643                            }
1644                        }
1645                    } else if let Some(ref mut log) = log {
1646                        log.set("skip_reason", "no_handler");
1647                    }
1648                } else if let Some(ref mut log) = log {
1649                    log.set("skip_reason", "entity_not_found");
1650                }
1651            }
1652        } else if let Some(ref mut log) = log {
1653            log.set("skip_reason", "no_event_routing");
1654        }
1655
1656        if let Some(log) = log {
1657            log.set("mutations", all_mutations.len() as i64);
1658            if let Some(first) = all_mutations.first() {
1659                if let Some(key_str) = first.key.as_str() {
1660                    log.set("primary_key", key_str);
1661                } else if let Some(key_num) = first.key.as_u64() {
1662                    log.set("primary_key", key_num as i64);
1663                }
1664            }
1665            if let Some(state) = self.states.get(&0) {
1666                log.set("state_table_size", state.data.len() as i64);
1667            }
1668
1669            let warnings = self.take_warnings();
1670            if !warnings.is_empty() {
1671                log.set("warnings", warnings.len() as i64);
1672                log.set(
1673                    "warning_messages",
1674                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1675                );
1676                log.set_level(crate::canonical_log::LogLevel::Warn);
1677            }
1678        } else {
1679            self.warnings.clear();
1680        }
1681
1682        if self.instructions_executed.is_multiple_of(1000) {
1683            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1684            for state_id in state_ids {
1685                let expired = self.cleanup_expired_when_ops(state_id, 60);
1686                if expired > 0 {
1687                    tracing::debug!(
1688                        "Cleaned up {} expired deferred when-ops for state {}",
1689                        expired,
1690                        state_id
1691                    );
1692                }
1693            }
1694        }
1695
1696        Ok(all_mutations)
1697    }
1698
1699    pub fn process_any(
1700        &mut self,
1701        bytecode: &MultiEntityBytecode,
1702        any: prost_types::Any,
1703    ) -> Result<Vec<Mutation>> {
1704        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1705        self.process_event(bytecode, event_value, &event_type, None, None)
1706    }
1707
1708    #[cfg_attr(feature = "otel", instrument(
1709        name = "vm.execute_handler",
1710        skip(self, handler, event_value, entity_evaluator),
1711        level = "debug",
1712        fields(
1713            event_type = %event_type,
1714            handler_opcodes = handler.len(),
1715        )
1716    ))]
1717    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1718    fn execute_handler(
1719        &mut self,
1720        handler: &[OpCode],
1721        event_value: &Value,
1722        event_type: &str,
1723        override_state_id: u32,
1724        entity_name: &str,
1725        entity_evaluator: Option<
1726            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1727        >,
1728        non_emitted_fields: Option<&HashSet<String>>,
1729    ) -> Result<Vec<Mutation>> {
1730        self.reset_registers();
1731        self.last_pda_lookup_miss = None;
1732
1733        let mut pc: usize = 0;
1734        let mut output = Vec::new();
1735        let mut dirty_tracker = DirtyTracker::new();
1736        let should_emit = |path: &str| {
1737            non_emitted_fields
1738                .map(|fields| !fields.contains(path))
1739                .unwrap_or(true)
1740        };
1741
1742        while pc < handler.len() {
1743            match &handler[pc] {
1744                OpCode::LoadEventField {
1745                    path,
1746                    dest,
1747                    default,
1748                } => {
1749                    let value = self.load_field(event_value, path, default.as_ref())?;
1750                    self.registers[*dest] = value;
1751                    pc += 1;
1752                }
1753                OpCode::LoadConstant { value, dest } => {
1754                    self.registers[*dest] = value.clone();
1755                    pc += 1;
1756                }
1757                OpCode::CopyRegister { source, dest } => {
1758                    self.registers[*dest] = self.registers[*source].clone();
1759                    pc += 1;
1760                }
1761                OpCode::CopyRegisterIfNull { source, dest } => {
1762                    if self.registers[*dest].is_null() {
1763                        self.registers[*dest] = self.registers[*source].clone();
1764                    }
1765                    pc += 1;
1766                }
1767                OpCode::GetEventType { dest } => {
1768                    self.registers[*dest] = json!(event_type);
1769                    pc += 1;
1770                }
1771                OpCode::CreateObject { dest } => {
1772                    self.registers[*dest] = json!({});
1773                    pc += 1;
1774                }
1775                OpCode::SetField {
1776                    object,
1777                    path,
1778                    value,
1779                } => {
1780                    self.set_field_auto_vivify(*object, path, *value)?;
1781                    if should_emit(path) {
1782                        dirty_tracker.mark_replaced(path);
1783                    }
1784                    pc += 1;
1785                }
1786                OpCode::SetFields { object, fields } => {
1787                    for (path, value_reg) in fields {
1788                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1789                        if should_emit(path) {
1790                            dirty_tracker.mark_replaced(path);
1791                        }
1792                    }
1793                    pc += 1;
1794                }
1795                OpCode::GetField { object, path, dest } => {
1796                    let value = self.get_field(*object, path)?;
1797                    self.registers[*dest] = value;
1798                    pc += 1;
1799                }
1800                OpCode::ReadOrInitState {
1801                    state_id: _,
1802                    key,
1803                    default,
1804                    dest,
1805                } => {
1806                    let actual_state_id = override_state_id;
1807                    let entity_name_owned = entity_name.to_string();
1808                    self.states
1809                        .entry(actual_state_id)
1810                        .or_insert_with(|| StateTable {
1811                            data: DashMap::new(),
1812                            access_times: DashMap::new(),
1813                            lookup_indexes: HashMap::new(),
1814                            temporal_indexes: HashMap::new(),
1815                            pda_reverse_lookups: HashMap::new(),
1816                            pending_updates: DashMap::new(),
1817                            pending_instruction_events: DashMap::new(),
1818                            version_tracker: VersionTracker::new(),
1819                            instruction_dedup_cache: VersionTracker::with_capacity(
1820                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1821                            ),
1822                            config: StateTableConfig::default(),
1823                            entity_name: entity_name_owned,
1824                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1825                                NonZeroUsize::new(1000).unwrap(),
1826                            )),
1827                            deferred_when_ops: DashMap::new(),
1828                        });
1829                    let key_value = self.registers[*key].clone();
1830                    let warn_null_key = key_value.is_null()
1831                        && event_type.ends_with("State")
1832                        && !event_type.ends_with("IxState");
1833
1834                    if warn_null_key {
1835                        self.add_warning(format!(
1836                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1837                            key, event_type
1838                        ));
1839                    }
1840
1841                    let state = self
1842                        .states
1843                        .get(&actual_state_id)
1844                        .ok_or("State table not found")?;
1845
1846                    if !key_value.is_null() {
1847                        if let Some(ctx) = &self.current_context {
1848                            // Account updates: use recency check to discard stale updates
1849                            if ctx.is_account_update() {
1850                                if let (Some(slot), Some(write_version)) =
1851                                    (ctx.slot, ctx.write_version)
1852                                {
1853                                    if !state.is_fresh_update(
1854                                        &key_value,
1855                                        event_type,
1856                                        slot,
1857                                        write_version,
1858                                    ) {
1859                                        self.add_warning(format!(
1860                                            "Stale account update skipped: slot={}, write_version={}",
1861                                            slot, write_version
1862                                        ));
1863                                        return Ok(Vec::new());
1864                                    }
1865                                }
1866                            }
1867                            // Instruction updates: process all, but skip exact duplicates
1868                            else if ctx.is_instruction_update() {
1869                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1870                                    if state.is_duplicate_instruction(
1871                                        &key_value, event_type, slot, txn_index,
1872                                    ) {
1873                                        self.add_warning(format!(
1874                                            "Duplicate instruction skipped: slot={}, txn_index={}",
1875                                            slot, txn_index
1876                                        ));
1877                                        return Ok(Vec::new());
1878                                    }
1879                                }
1880                            }
1881                        }
1882                    }
1883                    let value = state
1884                        .get_and_touch(&key_value)
1885                        .unwrap_or_else(|| default.clone());
1886
1887                    self.registers[*dest] = value;
1888                    pc += 1;
1889                }
1890                OpCode::UpdateState {
1891                    state_id: _,
1892                    key,
1893                    value,
1894                } => {
1895                    let actual_state_id = override_state_id;
1896                    let state = self
1897                        .states
1898                        .get(&actual_state_id)
1899                        .ok_or("State table not found")?;
1900                    let key_value = self.registers[*key].clone();
1901                    let value_data = self.registers[*value].clone();
1902
1903                    state.insert_with_eviction(key_value, value_data);
1904                    pc += 1;
1905                }
1906                OpCode::AppendToArray {
1907                    object,
1908                    path,
1909                    value,
1910                } => {
1911                    let appended_value = self.registers[*value].clone();
1912                    let max_len = self
1913                        .states
1914                        .get(&override_state_id)
1915                        .map(|s| s.max_array_length())
1916                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1917                    self.append_to_array(*object, path, *value, max_len)?;
1918                    if should_emit(path) {
1919                        dirty_tracker.mark_appended(path, appended_value);
1920                    }
1921                    pc += 1;
1922                }
1923                OpCode::GetCurrentTimestamp { dest } => {
1924                    let timestamp = std::time::SystemTime::now()
1925                        .duration_since(std::time::UNIX_EPOCH)
1926                        .unwrap()
1927                        .as_secs() as i64;
1928                    self.registers[*dest] = json!(timestamp);
1929                    pc += 1;
1930                }
1931                OpCode::CreateEvent { dest, event_value } => {
1932                    let timestamp = std::time::SystemTime::now()
1933                        .duration_since(std::time::UNIX_EPOCH)
1934                        .unwrap()
1935                        .as_secs() as i64;
1936
1937                    // Filter out __update_context from the event data
1938                    let mut event_data = self.registers[*event_value].clone();
1939                    if let Some(obj) = event_data.as_object_mut() {
1940                        obj.remove("__update_context");
1941                    }
1942
1943                    // Create event with timestamp, data, and optional slot/signature from context
1944                    let mut event = serde_json::Map::new();
1945                    event.insert("timestamp".to_string(), json!(timestamp));
1946                    event.insert("data".to_string(), event_data);
1947
1948                    // Add slot and signature if available from current context
1949                    if let Some(ref ctx) = self.current_context {
1950                        if let Some(slot) = ctx.slot {
1951                            event.insert("slot".to_string(), json!(slot));
1952                        }
1953                        if let Some(ref signature) = ctx.signature {
1954                            event.insert("signature".to_string(), json!(signature));
1955                        }
1956                    }
1957
1958                    self.registers[*dest] = Value::Object(event);
1959                    pc += 1;
1960                }
1961                OpCode::CreateCapture {
1962                    dest,
1963                    capture_value,
1964                } => {
1965                    let timestamp = std::time::SystemTime::now()
1966                        .duration_since(std::time::UNIX_EPOCH)
1967                        .unwrap()
1968                        .as_secs() as i64;
1969
1970                    // Get the capture data (already filtered by load_field)
1971                    let capture_data = self.registers[*capture_value].clone();
1972
1973                    // Extract account_address from the original event if available
1974                    let account_address = event_value
1975                        .get("__account_address")
1976                        .and_then(|v| v.as_str())
1977                        .unwrap_or("")
1978                        .to_string();
1979
1980                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1981                    let mut capture = serde_json::Map::new();
1982                    capture.insert("timestamp".to_string(), json!(timestamp));
1983                    capture.insert("account_address".to_string(), json!(account_address));
1984                    capture.insert("data".to_string(), capture_data);
1985
1986                    // Add slot and signature if available from current context
1987                    if let Some(ref ctx) = self.current_context {
1988                        if let Some(slot) = ctx.slot {
1989                            capture.insert("slot".to_string(), json!(slot));
1990                        }
1991                        if let Some(ref signature) = ctx.signature {
1992                            capture.insert("signature".to_string(), json!(signature));
1993                        }
1994                    }
1995
1996                    self.registers[*dest] = Value::Object(capture);
1997                    pc += 1;
1998                }
1999                OpCode::Transform {
2000                    source,
2001                    dest,
2002                    transformation,
2003                } => {
2004                    if source == dest {
2005                        self.transform_in_place(*source, transformation)?;
2006                    } else {
2007                        let source_value = &self.registers[*source];
2008                        let value = Self::apply_transformation(source_value, transformation)?;
2009                        self.registers[*dest] = value;
2010                    }
2011                    pc += 1;
2012                }
2013                OpCode::EmitMutation {
2014                    entity_name,
2015                    key,
2016                    state,
2017                } => {
2018                    let primary_key = self.registers[*key].clone();
2019
2020                    if primary_key.is_null() || dirty_tracker.is_empty() {
2021                        let reason = if dirty_tracker.is_empty() {
2022                            "no_fields_modified"
2023                        } else {
2024                            "null_primary_key"
2025                        };
2026                        self.add_warning(format!(
2027                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
2028                            entity_name,
2029                            reason,
2030                            dirty_tracker.len()
2031                        ));
2032                    } else {
2033                        let patch =
2034                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2035
2036                        let append = dirty_tracker.appended_paths();
2037                        let mutation = Mutation {
2038                            export: entity_name.clone(),
2039                            key: primary_key,
2040                            patch,
2041                            append,
2042                        };
2043                        output.push(mutation);
2044                    }
2045                    pc += 1;
2046                }
2047                OpCode::SetFieldIfNull {
2048                    object,
2049                    path,
2050                    value,
2051                } => {
2052                    let was_set = self.set_field_if_null(*object, path, *value)?;
2053                    if was_set && should_emit(path) {
2054                        dirty_tracker.mark_replaced(path);
2055                    }
2056                    pc += 1;
2057                }
2058                OpCode::SetFieldMax {
2059                    object,
2060                    path,
2061                    value,
2062                } => {
2063                    let was_updated = self.set_field_max(*object, path, *value)?;
2064                    if was_updated && should_emit(path) {
2065                        dirty_tracker.mark_replaced(path);
2066                    }
2067                    pc += 1;
2068                }
2069                OpCode::UpdateTemporalIndex {
2070                    state_id: _,
2071                    index_name,
2072                    lookup_value,
2073                    primary_key,
2074                    timestamp,
2075                } => {
2076                    let actual_state_id = override_state_id;
2077                    let state = self
2078                        .states
2079                        .get_mut(&actual_state_id)
2080                        .ok_or("State table not found")?;
2081                    let index = state
2082                        .temporal_indexes
2083                        .entry(index_name.clone())
2084                        .or_insert_with(TemporalIndex::new);
2085
2086                    let lookup_val = self.registers[*lookup_value].clone();
2087                    let pk_val = self.registers[*primary_key].clone();
2088                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2089                        val
2090                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
2091                        val as i64
2092                    } else {
2093                        return Err(format!(
2094                            "Timestamp must be a number (i64 or u64), got: {:?}",
2095                            self.registers[*timestamp]
2096                        )
2097                        .into());
2098                    };
2099
2100                    index.insert(lookup_val, pk_val, ts_val);
2101                    pc += 1;
2102                }
2103                OpCode::LookupTemporalIndex {
2104                    state_id: _,
2105                    index_name,
2106                    lookup_value,
2107                    timestamp,
2108                    dest,
2109                } => {
2110                    let actual_state_id = override_state_id;
2111                    let state = self
2112                        .states
2113                        .get(&actual_state_id)
2114                        .ok_or("State table not found")?;
2115                    let lookup_val = &self.registers[*lookup_value];
2116
2117                    let result = if self.registers[*timestamp].is_null() {
2118                        if let Some(index) = state.temporal_indexes.get(index_name) {
2119                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2120                        } else {
2121                            Value::Null
2122                        }
2123                    } else {
2124                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2125                            val
2126                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
2127                            val as i64
2128                        } else {
2129                            return Err(format!(
2130                                "Timestamp must be a number (i64 or u64), got: {:?}",
2131                                self.registers[*timestamp]
2132                            )
2133                            .into());
2134                        };
2135
2136                        if let Some(index) = state.temporal_indexes.get(index_name) {
2137                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2138                        } else {
2139                            Value::Null
2140                        }
2141                    };
2142
2143                    self.registers[*dest] = result;
2144                    pc += 1;
2145                }
2146                OpCode::UpdateLookupIndex {
2147                    state_id: _,
2148                    index_name,
2149                    lookup_value,
2150                    primary_key,
2151                } => {
2152                    let actual_state_id = override_state_id;
2153                    let state = self
2154                        .states
2155                        .get_mut(&actual_state_id)
2156                        .ok_or("State table not found")?;
2157                    let index = state
2158                        .lookup_indexes
2159                        .entry(index_name.clone())
2160                        .or_insert_with(LookupIndex::new);
2161
2162                    let lookup_val = self.registers[*lookup_value].clone();
2163                    let pk_val = self.registers[*primary_key].clone();
2164
2165                    index.insert(lookup_val.clone(), pk_val);
2166
2167                    // Track lookup keys so process_event can flush queued account updates
2168                    if let Some(key_str) = lookup_val.as_str() {
2169                        self.last_lookup_index_keys.push(key_str.to_string());
2170                    }
2171
2172                    pc += 1;
2173                }
2174                OpCode::LookupIndex {
2175                    state_id: _,
2176                    index_name,
2177                    lookup_value,
2178                    dest,
2179                } => {
2180                    let actual_state_id = override_state_id;
2181                    let mut current_value = self.registers[*lookup_value].clone();
2182
2183                    const MAX_CHAIN_DEPTH: usize = 5;
2184                    let mut iterations = 0;
2185
2186                    let final_result = if self.states.contains_key(&actual_state_id) {
2187                        loop {
2188                            iterations += 1;
2189                            if iterations > MAX_CHAIN_DEPTH {
2190                                break current_value;
2191                            }
2192
2193                            let resolved = self
2194                                .states
2195                                .get(&actual_state_id)
2196                                .and_then(|state| {
2197                                    if let Some(index) = state.lookup_indexes.get(index_name) {
2198                                        if let Some(found) = index.lookup(&current_value) {
2199                                            return Some(found);
2200                                        }
2201                                    }
2202
2203                                    for (name, index) in state.lookup_indexes.iter() {
2204                                        if name == index_name {
2205                                            continue;
2206                                        }
2207                                        if let Some(found) = index.lookup(&current_value) {
2208                                            return Some(found);
2209                                        }
2210                                    }
2211
2212                                    None
2213                                })
2214                                .unwrap_or(Value::Null);
2215
2216                            let mut resolved_from_pda = false;
2217                            let resolved = if resolved.is_null() {
2218                                if let Some(pda_str) = current_value.as_str() {
2219                                    resolved_from_pda = true;
2220                                    self.states
2221                                        .get_mut(&actual_state_id)
2222                                        .and_then(|state_mut| {
2223                                            state_mut
2224                                                .pda_reverse_lookups
2225                                                .get_mut("default_pda_lookup")
2226                                        })
2227                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2228                                        .map(Value::String)
2229                                        .unwrap_or(Value::Null)
2230                                } else {
2231                                    Value::Null
2232                                }
2233                            } else {
2234                                resolved
2235                            };
2236
2237                            if resolved.is_null() {
2238                                if iterations == 1 {
2239                                    if let Some(pda_str) = current_value.as_str() {
2240                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
2241                                    }
2242                                }
2243                                break Value::Null;
2244                            }
2245
2246                            let can_chain =
2247                                self.can_resolve_further(&resolved, actual_state_id, index_name);
2248
2249                            if !can_chain {
2250                                if resolved_from_pda {
2251                                    if let Some(resolved_str) = resolved.as_str() {
2252                                        self.last_lookup_index_miss =
2253                                            Some(resolved_str.to_string());
2254                                    }
2255                                    break Value::Null;
2256                                }
2257                                break resolved;
2258                            }
2259
2260                            current_value = resolved;
2261                        }
2262                    } else {
2263                        Value::Null
2264                    };
2265
2266                    self.registers[*dest] = final_result;
2267                    pc += 1;
2268                }
2269                OpCode::SetFieldSum {
2270                    object,
2271                    path,
2272                    value,
2273                } => {
2274                    let was_updated = self.set_field_sum(*object, path, *value)?;
2275                    if was_updated && should_emit(path) {
2276                        dirty_tracker.mark_replaced(path);
2277                    }
2278                    pc += 1;
2279                }
2280                OpCode::SetFieldIncrement { object, path } => {
2281                    let was_updated = self.set_field_increment(*object, path)?;
2282                    if was_updated && should_emit(path) {
2283                        dirty_tracker.mark_replaced(path);
2284                    }
2285                    pc += 1;
2286                }
2287                OpCode::SetFieldMin {
2288                    object,
2289                    path,
2290                    value,
2291                } => {
2292                    let was_updated = self.set_field_min(*object, path, *value)?;
2293                    if was_updated && should_emit(path) {
2294                        dirty_tracker.mark_replaced(path);
2295                    }
2296                    pc += 1;
2297                }
2298                OpCode::AddToUniqueSet {
2299                    state_id: _,
2300                    set_name,
2301                    value,
2302                    count_object,
2303                    count_path,
2304                } => {
2305                    let value_to_add = self.registers[*value].clone();
2306
2307                    // Store the unique set within the entity object, not in the state table
2308                    // This ensures each entity instance has its own unique set
2309                    let set_field_path = format!("__unique_set:{}", set_name);
2310
2311                    // Get or create the unique set from the entity object
2312                    let mut set: HashSet<Value> =
2313                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2314                            if !existing.is_null() {
2315                                serde_json::from_value(existing).unwrap_or_default()
2316                            } else {
2317                                HashSet::new()
2318                            }
2319                        } else {
2320                            HashSet::new()
2321                        };
2322
2323                    // Add value to set
2324                    let was_new = set.insert(value_to_add);
2325
2326                    // Store updated set back in the entity object
2327                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2328                    self.registers[100] = serde_json::to_value(set_as_vec)?;
2329                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2330
2331                    // Update the count field in the object
2332                    if was_new {
2333                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2334                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2335                        if should_emit(count_path) {
2336                            dirty_tracker.mark_replaced(count_path);
2337                        }
2338                    }
2339
2340                    pc += 1;
2341                }
2342                OpCode::ConditionalSetField {
2343                    object,
2344                    path,
2345                    value,
2346                    condition_field,
2347                    condition_op,
2348                    condition_value,
2349                } => {
2350                    let field_value = self.load_field(event_value, condition_field, None)?;
2351                    let condition_met =
2352                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2353
2354                    if condition_met {
2355                        self.set_field_auto_vivify(*object, path, *value)?;
2356                        if should_emit(path) {
2357                            dirty_tracker.mark_replaced(path);
2358                        }
2359                    }
2360                    pc += 1;
2361                }
2362                OpCode::SetFieldWhen {
2363                    object,
2364                    path,
2365                    value,
2366                    when_instruction,
2367                    entity_name,
2368                    key_reg,
2369                    condition_field,
2370                    condition_op,
2371                    condition_value,
2372                } => {
2373                    let actual_state_id = override_state_id;
2374                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2375                        condition_field.as_ref(),
2376                        condition_op.as_ref(),
2377                        condition_value.as_ref(),
2378                    ) {
2379                        let field_value = self.load_field(event_value, field, None)?;
2380                        self.evaluate_comparison(&field_value, op, cond_value)?
2381                    } else {
2382                        true
2383                    };
2384
2385                    if !condition_met {
2386                        pc += 1;
2387                        continue;
2388                    }
2389
2390                    let signature = self
2391                        .current_context
2392                        .as_ref()
2393                        .and_then(|c| c.signature.clone())
2394                        .unwrap_or_default();
2395
2396                    let emit = should_emit(path);
2397
2398                    let instruction_seen = if !signature.is_empty() {
2399                        if let Some(state) = self.states.get(&actual_state_id) {
2400                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2401                            cache
2402                                .get(&signature)
2403                                .map(|set| set.contains(when_instruction))
2404                                .unwrap_or(false)
2405                        } else {
2406                            false
2407                        }
2408                    } else {
2409                        false
2410                    };
2411
2412                    if instruction_seen {
2413                        self.set_field_auto_vivify(*object, path, *value)?;
2414                        if emit {
2415                            dirty_tracker.mark_replaced(path);
2416                        }
2417                    } else if !signature.is_empty() {
2418                        let deferred = DeferredWhenOperation {
2419                            entity_name: entity_name.clone(),
2420                            primary_key: self.registers[*key_reg].clone(),
2421                            field_path: path.clone(),
2422                            field_value: self.registers[*value].clone(),
2423                            when_instruction: when_instruction.clone(),
2424                            signature: signature.clone(),
2425                            slot: self
2426                                .current_context
2427                                .as_ref()
2428                                .and_then(|c| c.slot)
2429                                .unwrap_or(0),
2430                            deferred_at: std::time::SystemTime::now()
2431                                .duration_since(std::time::UNIX_EPOCH)
2432                                .unwrap()
2433                                .as_secs() as i64,
2434                            emit,
2435                        };
2436
2437                        if let Some(state) = self.states.get(&actual_state_id) {
2438                            let key = (signature, when_instruction.clone());
2439                            state
2440                                .deferred_when_ops
2441                                .entry(key)
2442                                .or_insert_with(Vec::new)
2443                                .push(deferred);
2444                        }
2445                    }
2446
2447                    pc += 1;
2448                }
2449                OpCode::SetFieldUnlessStopped {
2450                    object,
2451                    path,
2452                    value,
2453                    stop_field,
2454                    stop_instruction,
2455                    entity_name,
2456                    key_reg: _,
2457                } => {
2458                    let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2459                    let stopped = stop_value.as_bool().unwrap_or(false);
2460
2461                    if stopped {
2462                        tracing::debug!(
2463                            entity = %entity_name,
2464                            field = %path,
2465                            stop_field = %stop_field,
2466                            stop_instruction = %stop_instruction,
2467                            "stop flag set; skipping field update"
2468                        );
2469                        pc += 1;
2470                        continue;
2471                    }
2472
2473                    self.set_field_auto_vivify(*object, path, *value)?;
2474                    if should_emit(path) {
2475                        dirty_tracker.mark_replaced(path);
2476                    }
2477                    pc += 1;
2478                }
2479                OpCode::ConditionalIncrement {
2480                    object,
2481                    path,
2482                    condition_field,
2483                    condition_op,
2484                    condition_value,
2485                } => {
2486                    let field_value = self.load_field(event_value, condition_field, None)?;
2487                    let condition_met =
2488                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2489
2490                    if condition_met {
2491                        let was_updated = self.set_field_increment(*object, path)?;
2492                        if was_updated && should_emit(path) {
2493                            dirty_tracker.mark_replaced(path);
2494                        }
2495                    }
2496                    pc += 1;
2497                }
2498                OpCode::EvaluateComputedFields {
2499                    state,
2500                    computed_paths,
2501                } => {
2502                    if let Some(evaluator) = entity_evaluator {
2503                        let old_values: Vec<_> = computed_paths
2504                            .iter()
2505                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2506                            .collect();
2507
2508                        let state_value = &mut self.registers[*state];
2509                        let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2510                        let context_timestamp = self
2511                            .current_context
2512                            .as_ref()
2513                            .map(|c| c.timestamp())
2514                            .unwrap_or_else(|| {
2515                                std::time::SystemTime::now()
2516                                    .duration_since(std::time::UNIX_EPOCH)
2517                                    .unwrap()
2518                                    .as_secs() as i64
2519                            });
2520                        let eval_result = evaluator(state_value, context_slot, context_timestamp);
2521
2522                        if eval_result.is_ok() {
2523                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2524                                let new_value =
2525                                    Self::get_value_at_path(&self.registers[*state], path);
2526
2527                                if new_value != *old_value && should_emit(path) {
2528                                    dirty_tracker.mark_replaced(path);
2529                                }
2530                            }
2531                        }
2532                    }
2533                    pc += 1;
2534                }
2535                OpCode::QueueResolver {
2536                    state_id: _,
2537                    entity_name,
2538                    resolver,
2539                    input_path,
2540                    input_value,
2541                    strategy,
2542                    extracts,
2543                    state,
2544                    key,
2545                } => {
2546                    let actual_state_id = override_state_id;
2547                    let resolved_input = if let Some(value) = input_value {
2548                        Some(value.clone())
2549                    } else if let Some(path) = input_path.as_ref() {
2550                        Self::get_value_at_path(&self.registers[*state], path)
2551                    } else {
2552                        None
2553                    };
2554
2555                    if let Some(input) = resolved_input {
2556                        let key_value = &self.registers[*key];
2557
2558                        if input.is_null() || key_value.is_null() {
2559                            tracing::warn!(
2560                                entity = %entity_name,
2561                                resolver = ?resolver,
2562                                input_path = %input_path.as_deref().unwrap_or("<literal>"),
2563                                input_is_null = input.is_null(),
2564                                key_is_null = key_value.is_null(),
2565                                input = ?input,
2566                                key = ?key_value,
2567                                "Resolver skipped: null input or key"
2568                            );
2569                            pc += 1;
2570                            continue;
2571                        }
2572
2573                        if matches!(strategy, ResolveStrategy::SetOnce)
2574                            && extracts.iter().all(|extract| {
2575                                match Self::get_value_at_path(
2576                                    &self.registers[*state],
2577                                    &extract.target_path,
2578                                ) {
2579                                    Some(value) => !value.is_null(),
2580                                    None => false,
2581                                }
2582                            })
2583                        {
2584                            pc += 1;
2585                            continue;
2586                        }
2587
2588                        let cache_key = resolver_cache_key(resolver, &input);
2589
2590                        let cached_value = self.resolver_cache.get(&cache_key).cloned();
2591                        if let Some(cached) = cached_value {
2592                            self.resolver_cache_hits += 1;
2593                            Self::apply_resolver_extractions_to_value(
2594                                &mut self.registers[*state],
2595                                &cached,
2596                                extracts,
2597                                &mut dirty_tracker,
2598                                &should_emit,
2599                            )?;
2600                        } else {
2601                            self.resolver_cache_misses += 1;
2602                            let target = ResolverTarget {
2603                                state_id: actual_state_id,
2604                                entity_name: entity_name.clone(),
2605                                primary_key: self.registers[*key].clone(),
2606                                extracts: extracts.clone(),
2607                            };
2608
2609                            self.enqueue_resolver_request(
2610                                cache_key,
2611                                resolver.clone(),
2612                                input,
2613                                target,
2614                            );
2615                        }
2616                    } else {
2617                        tracing::warn!(
2618                            entity = %entity_name,
2619                            resolver = ?resolver,
2620                            input_path = %input_path.as_deref().unwrap_or("<literal>"),
2621                            state = ?self.registers[*state],
2622                            "Resolver skipped: input path not found in state"
2623                        );
2624                    }
2625
2626                    pc += 1;
2627                }
2628                OpCode::UpdatePdaReverseLookup {
2629                    state_id: _,
2630                    lookup_name,
2631                    pda_address,
2632                    primary_key,
2633                } => {
2634                    let actual_state_id = override_state_id;
2635                    let state = self
2636                        .states
2637                        .get_mut(&actual_state_id)
2638                        .ok_or("State table not found")?;
2639
2640                    let pda_val = self.registers[*pda_address].clone();
2641                    let pk_val = self.registers[*primary_key].clone();
2642
2643                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2644                        let pda_lookup = state
2645                            .pda_reverse_lookups
2646                            .entry(lookup_name.clone())
2647                            .or_insert_with(|| {
2648                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2649                            });
2650
2651                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2652                        self.last_pda_registered = Some(pda_str.to_string());
2653                    } else if !pk_val.is_null() {
2654                        if let Some(pk_num) = pk_val.as_u64() {
2655                            if let Some(pda_str) = pda_val.as_str() {
2656                                let pda_lookup = state
2657                                    .pda_reverse_lookups
2658                                    .entry(lookup_name.clone())
2659                                    .or_insert_with(|| {
2660                                        PdaReverseLookup::new(
2661                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2662                                        )
2663                                    });
2664
2665                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2666                                self.last_pda_registered = Some(pda_str.to_string());
2667                            }
2668                        }
2669                    }
2670
2671                    pc += 1;
2672                }
2673            }
2674
2675            self.instructions_executed += 1;
2676        }
2677
2678        Ok(output)
2679    }
2680
2681    fn load_field(
2682        &self,
2683        event_value: &Value,
2684        path: &FieldPath,
2685        default: Option<&Value>,
2686    ) -> Result<Value> {
2687        if path.segments.is_empty() {
2688            if let Some(obj) = event_value.as_object() {
2689                let filtered: serde_json::Map<String, Value> = obj
2690                    .iter()
2691                    .filter(|(k, _)| !k.starts_with("__"))
2692                    .map(|(k, v)| (k.clone(), v.clone()))
2693                    .collect();
2694                return Ok(Value::Object(filtered));
2695            }
2696            return Ok(event_value.clone());
2697        }
2698
2699        let mut current = event_value;
2700        for segment in path.segments.iter() {
2701            current = match current.get(segment) {
2702                Some(v) => v,
2703                None => return Ok(default.cloned().unwrap_or(Value::Null)),
2704            };
2705        }
2706
2707        Ok(current.clone())
2708    }
2709
2710    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2711        let mut current = value;
2712        for segment in path.split('.') {
2713            current = current.get(segment)?;
2714        }
2715        Some(current.clone())
2716    }
2717
2718    fn set_field_auto_vivify(
2719        &mut self,
2720        object_reg: Register,
2721        path: &str,
2722        value_reg: Register,
2723    ) -> Result<()> {
2724        let compiled = self.get_compiled_path(path);
2725        let segments = compiled.segments();
2726        let value = self.registers[value_reg].clone();
2727
2728        if !self.registers[object_reg].is_object() {
2729            self.registers[object_reg] = json!({});
2730        }
2731
2732        let obj = self.registers[object_reg]
2733            .as_object_mut()
2734            .ok_or("Not an object")?;
2735
2736        let mut current = obj;
2737        for (i, segment) in segments.iter().enumerate() {
2738            if i == segments.len() - 1 {
2739                current.insert(segment.to_string(), value);
2740                return Ok(());
2741            } else {
2742                current
2743                    .entry(segment.to_string())
2744                    .or_insert_with(|| json!({}));
2745                current = current
2746                    .get_mut(segment)
2747                    .and_then(|v| v.as_object_mut())
2748                    .ok_or("Path collision: expected object")?;
2749            }
2750        }
2751
2752        Ok(())
2753    }
2754
2755    fn set_field_if_null(
2756        &mut self,
2757        object_reg: Register,
2758        path: &str,
2759        value_reg: Register,
2760    ) -> Result<bool> {
2761        let compiled = self.get_compiled_path(path);
2762        let segments = compiled.segments();
2763        let value = self.registers[value_reg].clone();
2764
2765        // SetOnce should only set meaningful values. A null source typically means
2766        // the field doesn't exist in this event type (e.g., instruction events don't
2767        // have account data). Skip to preserve any existing value.
2768        if value.is_null() {
2769            return Ok(false);
2770        }
2771
2772        if !self.registers[object_reg].is_object() {
2773            self.registers[object_reg] = json!({});
2774        }
2775
2776        let obj = self.registers[object_reg]
2777            .as_object_mut()
2778            .ok_or("Not an object")?;
2779
2780        let mut current = obj;
2781        for (i, segment) in segments.iter().enumerate() {
2782            if i == segments.len() - 1 {
2783                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2784                    current.insert(segment.to_string(), value);
2785                    return Ok(true);
2786                }
2787                return Ok(false);
2788            } else {
2789                current
2790                    .entry(segment.to_string())
2791                    .or_insert_with(|| json!({}));
2792                current = current
2793                    .get_mut(segment)
2794                    .and_then(|v| v.as_object_mut())
2795                    .ok_or("Path collision: expected object")?;
2796            }
2797        }
2798
2799        Ok(false)
2800    }
2801
2802    fn set_field_max(
2803        &mut self,
2804        object_reg: Register,
2805        path: &str,
2806        value_reg: Register,
2807    ) -> Result<bool> {
2808        let compiled = self.get_compiled_path(path);
2809        let segments = compiled.segments();
2810        let new_value = self.registers[value_reg].clone();
2811
2812        if !self.registers[object_reg].is_object() {
2813            self.registers[object_reg] = json!({});
2814        }
2815
2816        let obj = self.registers[object_reg]
2817            .as_object_mut()
2818            .ok_or("Not an object")?;
2819
2820        let mut current = obj;
2821        for (i, segment) in segments.iter().enumerate() {
2822            if i == segments.len() - 1 {
2823                let should_update = if let Some(current_value) = current.get(segment) {
2824                    if current_value.is_null() {
2825                        true
2826                    } else {
2827                        match (current_value.as_i64(), new_value.as_i64()) {
2828                            (Some(current_val), Some(new_val)) => new_val > current_val,
2829                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2830                                new_value.as_u64().unwrap() as i64 > current_val
2831                            }
2832                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2833                                new_val > current_value.as_u64().unwrap() as i64
2834                            }
2835                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2836                                (Some(current_val), Some(new_val)) => new_val > current_val,
2837                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2838                                    (Some(current_val), Some(new_val)) => new_val > current_val,
2839                                    _ => false,
2840                                },
2841                            },
2842                            _ => false,
2843                        }
2844                    }
2845                } else {
2846                    true
2847                };
2848
2849                if should_update {
2850                    current.insert(segment.to_string(), new_value);
2851                    return Ok(true);
2852                }
2853                return Ok(false);
2854            } else {
2855                current
2856                    .entry(segment.to_string())
2857                    .or_insert_with(|| json!({}));
2858                current = current
2859                    .get_mut(segment)
2860                    .and_then(|v| v.as_object_mut())
2861                    .ok_or("Path collision: expected object")?;
2862            }
2863        }
2864
2865        Ok(false)
2866    }
2867
2868    fn set_field_sum(
2869        &mut self,
2870        object_reg: Register,
2871        path: &str,
2872        value_reg: Register,
2873    ) -> Result<bool> {
2874        let compiled = self.get_compiled_path(path);
2875        let segments = compiled.segments();
2876        let new_value = &self.registers[value_reg];
2877
2878        // Extract numeric value before borrowing object_reg mutably
2879        let new_val_num = new_value
2880            .as_i64()
2881            .or_else(|| new_value.as_u64().map(|n| n as i64))
2882            .ok_or("Sum requires numeric value")?;
2883
2884        if !self.registers[object_reg].is_object() {
2885            self.registers[object_reg] = json!({});
2886        }
2887
2888        let obj = self.registers[object_reg]
2889            .as_object_mut()
2890            .ok_or("Not an object")?;
2891
2892        let mut current = obj;
2893        for (i, segment) in segments.iter().enumerate() {
2894            if i == segments.len() - 1 {
2895                let current_val = current
2896                    .get(segment)
2897                    .and_then(|v| {
2898                        if v.is_null() {
2899                            None
2900                        } else {
2901                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2902                        }
2903                    })
2904                    .unwrap_or(0);
2905
2906                let sum = current_val + new_val_num;
2907                current.insert(segment.to_string(), json!(sum));
2908                return Ok(true);
2909            } else {
2910                current
2911                    .entry(segment.to_string())
2912                    .or_insert_with(|| json!({}));
2913                current = current
2914                    .get_mut(segment)
2915                    .and_then(|v| v.as_object_mut())
2916                    .ok_or("Path collision: expected object")?;
2917            }
2918        }
2919
2920        Ok(false)
2921    }
2922
2923    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2924        let compiled = self.get_compiled_path(path);
2925        let segments = compiled.segments();
2926
2927        if !self.registers[object_reg].is_object() {
2928            self.registers[object_reg] = json!({});
2929        }
2930
2931        let obj = self.registers[object_reg]
2932            .as_object_mut()
2933            .ok_or("Not an object")?;
2934
2935        let mut current = obj;
2936        for (i, segment) in segments.iter().enumerate() {
2937            if i == segments.len() - 1 {
2938                // Get current value (default to 0 if null/missing)
2939                let current_val = current
2940                    .get(segment)
2941                    .and_then(|v| {
2942                        if v.is_null() {
2943                            None
2944                        } else {
2945                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2946                        }
2947                    })
2948                    .unwrap_or(0);
2949
2950                let incremented = current_val + 1;
2951                current.insert(segment.to_string(), json!(incremented));
2952                return Ok(true);
2953            } else {
2954                current
2955                    .entry(segment.to_string())
2956                    .or_insert_with(|| json!({}));
2957                current = current
2958                    .get_mut(segment)
2959                    .and_then(|v| v.as_object_mut())
2960                    .ok_or("Path collision: expected object")?;
2961            }
2962        }
2963
2964        Ok(false)
2965    }
2966
2967    fn set_field_min(
2968        &mut self,
2969        object_reg: Register,
2970        path: &str,
2971        value_reg: Register,
2972    ) -> Result<bool> {
2973        let compiled = self.get_compiled_path(path);
2974        let segments = compiled.segments();
2975        let new_value = self.registers[value_reg].clone();
2976
2977        if !self.registers[object_reg].is_object() {
2978            self.registers[object_reg] = json!({});
2979        }
2980
2981        let obj = self.registers[object_reg]
2982            .as_object_mut()
2983            .ok_or("Not an object")?;
2984
2985        let mut current = obj;
2986        for (i, segment) in segments.iter().enumerate() {
2987            if i == segments.len() - 1 {
2988                let should_update = if let Some(current_value) = current.get(segment) {
2989                    if current_value.is_null() {
2990                        true
2991                    } else {
2992                        match (current_value.as_i64(), new_value.as_i64()) {
2993                            (Some(current_val), Some(new_val)) => new_val < current_val,
2994                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2995                                (new_value.as_u64().unwrap() as i64) < current_val
2996                            }
2997                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2998                                new_val < current_value.as_u64().unwrap() as i64
2999                            }
3000                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3001                                (Some(current_val), Some(new_val)) => new_val < current_val,
3002                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3003                                    (Some(current_val), Some(new_val)) => new_val < current_val,
3004                                    _ => false,
3005                                },
3006                            },
3007                            _ => false,
3008                        }
3009                    }
3010                } else {
3011                    true
3012                };
3013
3014                if should_update {
3015                    current.insert(segment.to_string(), new_value);
3016                    return Ok(true);
3017                }
3018                return Ok(false);
3019            } else {
3020                current
3021                    .entry(segment.to_string())
3022                    .or_insert_with(|| json!({}));
3023                current = current
3024                    .get_mut(segment)
3025                    .and_then(|v| v.as_object_mut())
3026                    .ok_or("Path collision: expected object")?;
3027            }
3028        }
3029
3030        Ok(false)
3031    }
3032
3033    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3034        let compiled = self.get_compiled_path(path);
3035        let segments = compiled.segments();
3036        let mut current = &self.registers[object_reg];
3037
3038        for segment in segments {
3039            current = current
3040                .get(segment)
3041                .ok_or_else(|| format!("Field not found: {}", segment))?;
3042        }
3043
3044        Ok(current.clone())
3045    }
3046
3047    fn append_to_array(
3048        &mut self,
3049        object_reg: Register,
3050        path: &str,
3051        value_reg: Register,
3052        max_length: usize,
3053    ) -> Result<()> {
3054        let compiled = self.get_compiled_path(path);
3055        let segments = compiled.segments();
3056        let value = self.registers[value_reg].clone();
3057
3058        if !self.registers[object_reg].is_object() {
3059            self.registers[object_reg] = json!({});
3060        }
3061
3062        let obj = self.registers[object_reg]
3063            .as_object_mut()
3064            .ok_or("Not an object")?;
3065
3066        let mut current = obj;
3067        for (i, segment) in segments.iter().enumerate() {
3068            if i == segments.len() - 1 {
3069                current
3070                    .entry(segment.to_string())
3071                    .or_insert_with(|| json!([]));
3072                let arr = current
3073                    .get_mut(segment)
3074                    .and_then(|v| v.as_array_mut())
3075                    .ok_or("Path is not an array")?;
3076                arr.push(value.clone());
3077
3078                if arr.len() > max_length {
3079                    let excess = arr.len() - max_length;
3080                    arr.drain(0..excess);
3081                }
3082            } else {
3083                current
3084                    .entry(segment.to_string())
3085                    .or_insert_with(|| json!({}));
3086                current = current
3087                    .get_mut(segment)
3088                    .and_then(|v| v.as_object_mut())
3089                    .ok_or("Path collision: expected object")?;
3090            }
3091        }
3092
3093        Ok(())
3094    }
3095
3096    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3097        let value = &self.registers[reg];
3098        let transformed = Self::apply_transformation(value, transformation)?;
3099        self.registers[reg] = transformed;
3100        Ok(())
3101    }
3102
3103    fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3104        match transformation {
3105            Transformation::HexEncode => {
3106                if let Some(arr) = value.as_array() {
3107                    let bytes: Vec<u8> = arr
3108                        .iter()
3109                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3110                        .collect();
3111                    let hex = hex::encode(&bytes);
3112                    Ok(json!(hex))
3113                } else if value.is_string() {
3114                    Ok(value.clone())
3115                } else {
3116                    Err("HexEncode requires an array of numbers".into())
3117                }
3118            }
3119            Transformation::HexDecode => {
3120                if let Some(s) = value.as_str() {
3121                    let s = s.strip_prefix("0x").unwrap_or(s);
3122                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3123                    Ok(json!(bytes))
3124                } else {
3125                    Err("HexDecode requires a string".into())
3126                }
3127            }
3128            Transformation::Base58Encode => {
3129                if let Some(arr) = value.as_array() {
3130                    let bytes: Vec<u8> = arr
3131                        .iter()
3132                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3133                        .collect();
3134                    let encoded = bs58::encode(&bytes).into_string();
3135                    Ok(json!(encoded))
3136                } else if value.is_string() {
3137                    Ok(value.clone())
3138                } else {
3139                    Err("Base58Encode requires an array of numbers".into())
3140                }
3141            }
3142            Transformation::Base58Decode => {
3143                if let Some(s) = value.as_str() {
3144                    let bytes = bs58::decode(s)
3145                        .into_vec()
3146                        .map_err(|e| format!("Base58 decode error: {}", e))?;
3147                    Ok(json!(bytes))
3148                } else {
3149                    Err("Base58Decode requires a string".into())
3150                }
3151            }
3152            Transformation::ToString => Ok(json!(value.to_string())),
3153            Transformation::ToNumber => {
3154                if let Some(s) = value.as_str() {
3155                    let n = s
3156                        .parse::<i64>()
3157                        .map_err(|e| format!("Parse error: {}", e))?;
3158                    Ok(json!(n))
3159                } else {
3160                    Ok(value.clone())
3161                }
3162            }
3163        }
3164    }
3165
3166    fn evaluate_comparison(
3167        &self,
3168        field_value: &Value,
3169        op: &ComparisonOp,
3170        condition_value: &Value,
3171    ) -> Result<bool> {
3172        use ComparisonOp::*;
3173
3174        match op {
3175            Equal => Ok(field_value == condition_value),
3176            NotEqual => Ok(field_value != condition_value),
3177            GreaterThan => {
3178                // Try to compare as numbers
3179                match (field_value.as_i64(), condition_value.as_i64()) {
3180                    (Some(a), Some(b)) => Ok(a > b),
3181                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
3182                        (Some(a), Some(b)) => Ok(a > b),
3183                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
3184                            (Some(a), Some(b)) => Ok(a > b),
3185                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3186                        },
3187                    },
3188                }
3189            }
3190            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3191                (Some(a), Some(b)) => Ok(a >= b),
3192                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3193                    (Some(a), Some(b)) => Ok(a >= b),
3194                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3195                        (Some(a), Some(b)) => Ok(a >= b),
3196                        _ => {
3197                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3198                        }
3199                    },
3200                },
3201            },
3202            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3203                (Some(a), Some(b)) => Ok(a < b),
3204                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3205                    (Some(a), Some(b)) => Ok(a < b),
3206                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3207                        (Some(a), Some(b)) => Ok(a < b),
3208                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
3209                    },
3210                },
3211            },
3212            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3213                (Some(a), Some(b)) => Ok(a <= b),
3214                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3215                    (Some(a), Some(b)) => Ok(a <= b),
3216                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3217                        (Some(a), Some(b)) => Ok(a <= b),
3218                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3219                    },
3220                },
3221            },
3222        }
3223    }
3224
3225    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3226        if let Some(state) = self.states.get(&state_id) {
3227            if let Some(index) = state.lookup_indexes.get(index_name) {
3228                if index.lookup(value).is_some() {
3229                    return true;
3230                }
3231            }
3232
3233            for (name, index) in state.lookup_indexes.iter() {
3234                if name == index_name {
3235                    continue;
3236                }
3237                if index.lookup(value).is_some() {
3238                    return true;
3239                }
3240            }
3241
3242            if let Some(pda_str) = value.as_str() {
3243                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3244                    if pda_lookup.contains(pda_str) {
3245                        return true;
3246                    }
3247                }
3248            }
3249        }
3250
3251        false
3252    }
3253
3254    fn apply_deferred_when_op(
3255        &mut self,
3256        state_id: u32,
3257        op: &DeferredWhenOperation,
3258    ) -> Result<Vec<Mutation>> {
3259        let state = self.states.get(&state_id).ok_or("State not found")?;
3260
3261        if op.primary_key.is_null() {
3262            return Ok(vec![]);
3263        }
3264
3265        let mut entity_state = state
3266            .get_and_touch(&op.primary_key)
3267            .unwrap_or_else(|| json!({}));
3268
3269        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3270
3271        state.insert_with_eviction(op.primary_key.clone(), entity_state);
3272
3273        if !op.emit {
3274            return Ok(vec![]);
3275        }
3276
3277        let mut patch = json!({});
3278        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3279
3280        Ok(vec![Mutation {
3281            export: op.entity_name.clone(),
3282            key: op.primary_key.clone(),
3283            patch,
3284            append: vec![],
3285        }])
3286    }
3287
3288    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3289        let parts: Vec<&str> = path.split('.').collect();
3290        let mut current = obj;
3291
3292        for (i, part) in parts.iter().enumerate() {
3293            if i == parts.len() - 1 {
3294                if let Some(map) = current.as_object_mut() {
3295                    map.insert(part.to_string(), value);
3296                    return Ok(());
3297                }
3298                return Err("Cannot set field on non-object".into());
3299            }
3300
3301            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3302                if let Some(map) = current.as_object_mut() {
3303                    map.insert(part.to_string(), json!({}));
3304                }
3305            }
3306
3307            current = current.get_mut(*part).ok_or("Path navigation failed")?;
3308        }
3309
3310        Ok(())
3311    }
3312
3313    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3314        let now = std::time::SystemTime::now()
3315            .duration_since(std::time::UNIX_EPOCH)
3316            .unwrap()
3317            .as_secs() as i64;
3318
3319        let state = match self.states.get(&state_id) {
3320            Some(s) => s,
3321            None => return 0,
3322        };
3323
3324        let mut removed = 0;
3325        state.deferred_when_ops.retain(|_, ops| {
3326            let before = ops.len();
3327            ops.retain(|op| now - op.deferred_at < max_age_secs);
3328            removed += before - ops.len();
3329            !ops.is_empty()
3330        });
3331
3332        removed
3333    }
3334
3335    /// Update a PDA reverse lookup and return pending updates for reprocessing.
3336    /// Returns any pending account updates that were queued for this PDA.
3337    /// ```ignore
3338    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3339    /// for update in pending {
3340    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
3341    /// }
3342    /// ```
3343    #[cfg_attr(feature = "otel", instrument(
3344        name = "vm.update_pda_lookup",
3345        skip(self),
3346        fields(
3347            pda = %pda_address,
3348            seed = %seed_value,
3349        )
3350    ))]
3351    pub fn update_pda_reverse_lookup(
3352        &mut self,
3353        state_id: u32,
3354        lookup_name: &str,
3355        pda_address: String,
3356        seed_value: String,
3357    ) -> Result<Vec<PendingAccountUpdate>> {
3358        let state = self
3359            .states
3360            .get_mut(&state_id)
3361            .ok_or("State table not found")?;
3362
3363        let lookup = state
3364            .pda_reverse_lookups
3365            .entry(lookup_name.to_string())
3366            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3367
3368        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
3369
3370        if let Some(ref evicted) = evicted_pda {
3371            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3372                let count = evicted_updates.len();
3373                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3374            }
3375        }
3376
3377        // Flush and return pending updates for this PDA
3378        self.flush_pending_updates(state_id, &pda_address)
3379    }
3380
3381    /// Clean up expired pending updates that are older than the TTL
3382    ///
3383    /// Returns the number of updates that were removed.
3384    /// This should be called periodically to prevent memory leaks from orphaned updates.
3385    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3386        let state = match self.states.get_mut(&state_id) {
3387            Some(s) => s,
3388            None => return 0,
3389        };
3390
3391        let now = std::time::SystemTime::now()
3392            .duration_since(std::time::UNIX_EPOCH)
3393            .unwrap()
3394            .as_secs() as i64;
3395
3396        let mut removed_count = 0;
3397
3398        // Iterate through all pending updates and remove expired ones
3399        state.pending_updates.retain(|_pda_address, updates| {
3400            let original_len = updates.len();
3401
3402            updates.retain(|update| {
3403                let age = now - update.queued_at;
3404                age <= PENDING_UPDATE_TTL_SECONDS
3405            });
3406
3407            removed_count += original_len - updates.len();
3408
3409            // Remove the entry entirely if no updates remain
3410            !updates.is_empty()
3411        });
3412
3413        // Update the global counter
3414        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3415
3416        if removed_count > 0 {
3417            #[cfg(feature = "otel")]
3418            crate::vm_metrics::record_pending_updates_expired(
3419                removed_count as u64,
3420                &state.entity_name,
3421            );
3422        }
3423
3424        removed_count
3425    }
3426
3427    /// Queue an account update for later processing when PDA reverse lookup is not yet available
3428    ///
3429    /// # Workflow
3430    ///
3431    /// This implements a deferred processing pattern for account updates when the PDA reverse
3432    /// lookup needed to resolve the primary key is not yet available:
3433    ///
3434    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
3435    ///    is not available, call `queue_account_update()` to queue it for later.
3436    ///
3437    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
3438    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
3439    ///
3440    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
3441    ///    ```ignore
3442    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3443    ///    for update in pending {
3444    ///        let mutations = vm.process_event(
3445    ///            &bytecode, update.account_data, &update.account_type, None, None
3446    ///        )?;
3447    ///    }
3448    ///    ```
3449    ///
3450    /// # Arguments
3451    ///
3452    /// * `state_id` - The state table ID
3453    /// * `pda_address` - The PDA address that needs reverse lookup
3454    /// * `account_type` - The event type name for reprocessing
3455    /// * `account_data` - The account data (event value) for reprocessing
3456    /// * `slot` - The slot number when this update occurred
3457    /// * `signature` - The transaction signature
3458    #[cfg_attr(feature = "otel", instrument(
3459        name = "vm.queue_account_update",
3460        skip(self, update),
3461        fields(
3462            pda = %update.pda_address,
3463            account_type = %update.account_type,
3464            slot = update.slot,
3465        )
3466    ))]
3467    pub fn queue_account_update(
3468        &mut self,
3469        state_id: u32,
3470        update: QueuedAccountUpdate,
3471    ) -> Result<()> {
3472        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3473            self.cleanup_expired_pending_updates(state_id);
3474            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3475                self.drop_oldest_pending_update(state_id)?;
3476            }
3477        }
3478
3479        let state = self
3480            .states
3481            .get_mut(&state_id)
3482            .ok_or("State table not found")?;
3483
3484        let pending = PendingAccountUpdate {
3485            account_type: update.account_type,
3486            pda_address: update.pda_address.clone(),
3487            account_data: update.account_data,
3488            slot: update.slot,
3489            write_version: update.write_version,
3490            signature: update.signature,
3491            queued_at: std::time::SystemTime::now()
3492                .duration_since(std::time::UNIX_EPOCH)
3493                .unwrap()
3494                .as_secs() as i64,
3495        };
3496
3497        let pda_address = pending.pda_address.clone();
3498        let slot = pending.slot;
3499
3500        let mut updates = state
3501            .pending_updates
3502            .entry(pda_address.clone())
3503            .or_insert_with(Vec::new);
3504
3505        let original_len = updates.len();
3506        updates.retain(|existing| existing.slot > slot);
3507        let removed_by_dedup = original_len - updates.len();
3508
3509        if removed_by_dedup > 0 {
3510            self.pending_queue_size = self
3511                .pending_queue_size
3512                .saturating_sub(removed_by_dedup as u64);
3513        }
3514
3515        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3516            updates.remove(0);
3517            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3518        }
3519
3520        updates.push(pending);
3521        #[cfg(feature = "otel")]
3522        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3523
3524        Ok(())
3525    }
3526
3527    pub fn queue_instruction_event(
3528        &mut self,
3529        state_id: u32,
3530        event: QueuedInstructionEvent,
3531    ) -> Result<()> {
3532        let state = self
3533            .states
3534            .get_mut(&state_id)
3535            .ok_or("State table not found")?;
3536
3537        let pda_address = event.pda_address.clone();
3538
3539        let pending = PendingInstructionEvent {
3540            event_type: event.event_type,
3541            pda_address: event.pda_address,
3542            event_data: event.event_data,
3543            slot: event.slot,
3544            signature: event.signature,
3545            queued_at: std::time::SystemTime::now()
3546                .duration_since(std::time::UNIX_EPOCH)
3547                .unwrap()
3548                .as_secs() as i64,
3549        };
3550
3551        let mut events = state
3552            .pending_instruction_events
3553            .entry(pda_address)
3554            .or_insert_with(Vec::new);
3555
3556        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3557            events.remove(0);
3558        }
3559
3560        events.push(pending);
3561
3562        Ok(())
3563    }
3564
3565    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3566        self.last_pda_lookup_miss.take()
3567    }
3568
3569    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3570        self.last_lookup_index_miss.take()
3571    }
3572
3573    pub fn take_last_pda_registered(&mut self) -> Option<String> {
3574        self.last_pda_registered.take()
3575    }
3576
3577    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3578        std::mem::take(&mut self.last_lookup_index_keys)
3579    }
3580
3581    pub fn flush_pending_instruction_events(
3582        &mut self,
3583        state_id: u32,
3584        pda_address: &str,
3585    ) -> Vec<PendingInstructionEvent> {
3586        let state = match self.states.get_mut(&state_id) {
3587            Some(s) => s,
3588            None => return Vec::new(),
3589        };
3590
3591        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3592            events
3593        } else {
3594            Vec::new()
3595        }
3596    }
3597
3598    /// Get statistics about the pending queue for monitoring
3599    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3600        let state = self.states.get(&state_id)?;
3601
3602        let now = std::time::SystemTime::now()
3603            .duration_since(std::time::UNIX_EPOCH)
3604            .unwrap()
3605            .as_secs() as i64;
3606
3607        let mut total_updates = 0;
3608        let mut oldest_timestamp = now;
3609        let mut largest_pda_queue = 0;
3610        let mut estimated_memory = 0;
3611
3612        for entry in state.pending_updates.iter() {
3613            let (_, updates) = entry.pair();
3614            total_updates += updates.len();
3615            largest_pda_queue = largest_pda_queue.max(updates.len());
3616
3617            for update in updates.iter() {
3618                oldest_timestamp = oldest_timestamp.min(update.queued_at);
3619                // Rough memory estimate
3620                estimated_memory += update.account_type.len() +
3621                                   update.pda_address.len() +
3622                                   update.signature.len() +
3623                                   16 + // slot + queued_at
3624                                   estimate_json_size(&update.account_data);
3625            }
3626        }
3627
3628        Some(PendingQueueStats {
3629            total_updates,
3630            unique_pdas: state.pending_updates.len(),
3631            oldest_age_seconds: now - oldest_timestamp,
3632            largest_pda_queue_size: largest_pda_queue,
3633            estimated_memory_bytes: estimated_memory,
3634        })
3635    }
3636
3637    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3638        let mut stats = VmMemoryStats {
3639            path_cache_size: self.path_cache.len(),
3640            ..Default::default()
3641        };
3642
3643        if let Some(state) = self.states.get(&state_id) {
3644            stats.state_table_entity_count = state.data.len();
3645            stats.state_table_max_entries = state.config.max_entries;
3646            stats.state_table_at_capacity = state.is_at_capacity();
3647
3648            stats.lookup_index_count = state.lookup_indexes.len();
3649            stats.lookup_index_total_entries =
3650                state.lookup_indexes.values().map(|idx| idx.len()).sum();
3651
3652            stats.temporal_index_count = state.temporal_indexes.len();
3653            stats.temporal_index_total_entries = state
3654                .temporal_indexes
3655                .values()
3656                .map(|idx| idx.total_entries())
3657                .sum();
3658
3659            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3660            stats.pda_reverse_lookup_total_entries = state
3661                .pda_reverse_lookups
3662                .values()
3663                .map(|lookup| lookup.len())
3664                .sum();
3665
3666            stats.version_tracker_entries = state.version_tracker.len();
3667
3668            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3669        }
3670
3671        stats
3672    }
3673
3674    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3675        let pending_removed = self.cleanup_expired_pending_updates(state_id);
3676        let temporal_removed = self.cleanup_temporal_indexes(state_id);
3677
3678        #[cfg(feature = "otel")]
3679        if let Some(state) = self.states.get(&state_id) {
3680            crate::vm_metrics::record_cleanup(
3681                pending_removed,
3682                temporal_removed,
3683                &state.entity_name,
3684            );
3685        }
3686
3687        CleanupResult {
3688            pending_updates_removed: pending_removed,
3689            temporal_entries_removed: temporal_removed,
3690        }
3691    }
3692
3693    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3694        let state = match self.states.get_mut(&state_id) {
3695            Some(s) => s,
3696            None => return 0,
3697        };
3698
3699        let now = std::time::SystemTime::now()
3700            .duration_since(std::time::UNIX_EPOCH)
3701            .unwrap()
3702            .as_secs() as i64;
3703
3704        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3705        let mut total_removed = 0;
3706
3707        for (_, index) in state.temporal_indexes.iter_mut() {
3708            total_removed += index.cleanup_expired(cutoff);
3709        }
3710
3711        total_removed
3712    }
3713
3714    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3715        let state = self.states.get(&state_id)?;
3716
3717        if state.is_at_capacity() {
3718            Some(CapacityWarning {
3719                current_entries: state.data.len(),
3720                max_entries: state.config.max_entries,
3721                entries_over_limit: state.entries_over_limit(),
3722            })
3723        } else {
3724            None
3725        }
3726    }
3727
3728    /// Drop the oldest pending update across all PDAs
3729    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3730        let state = self
3731            .states
3732            .get_mut(&state_id)
3733            .ok_or("State table not found")?;
3734
3735        let mut oldest_pda: Option<String> = None;
3736        let mut oldest_timestamp = i64::MAX;
3737
3738        // Find the PDA with the oldest update
3739        for entry in state.pending_updates.iter() {
3740            let (pda, updates) = entry.pair();
3741            if let Some(update) = updates.first() {
3742                if update.queued_at < oldest_timestamp {
3743                    oldest_timestamp = update.queued_at;
3744                    oldest_pda = Some(pda.clone());
3745                }
3746            }
3747        }
3748
3749        // Remove the oldest update
3750        if let Some(pda) = oldest_pda {
3751            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3752                if !updates.is_empty() {
3753                    updates.remove(0);
3754                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3755
3756                    // Remove the entry if it's now empty
3757                    if updates.is_empty() {
3758                        drop(updates);
3759                        state.pending_updates.remove(&pda);
3760                    }
3761                }
3762            }
3763        }
3764
3765        Ok(())
3766    }
3767
3768    /// Flush and return pending updates for a PDA for external reprocessing
3769    ///
3770    /// Returns the pending updates that were queued for this PDA address.
3771    /// The caller should reprocess these through the VM using process_event().
3772    fn flush_pending_updates(
3773        &mut self,
3774        state_id: u32,
3775        pda_address: &str,
3776    ) -> Result<Vec<PendingAccountUpdate>> {
3777        let state = self
3778            .states
3779            .get_mut(&state_id)
3780            .ok_or("State table not found")?;
3781
3782        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3783            let count = pending_updates.len();
3784            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3785            #[cfg(feature = "otel")]
3786            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3787            Ok(pending_updates)
3788        } else {
3789            Ok(Vec::new())
3790        }
3791    }
3792
3793    /// Try to resolve a primary key via PDA reverse lookup
3794    pub fn try_pda_reverse_lookup(
3795        &mut self,
3796        state_id: u32,
3797        lookup_name: &str,
3798        pda_address: &str,
3799    ) -> Option<String> {
3800        let state = self.states.get_mut(&state_id)?;
3801
3802        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3803            if let Some(value) = lookup.lookup(pda_address) {
3804                self.pda_cache_hits += 1;
3805                return Some(value);
3806            }
3807        }
3808
3809        self.pda_cache_misses += 1;
3810        None
3811    }
3812
3813    // ============================================================================
3814    // Computed Expression Evaluator (Task 5)
3815    // ============================================================================
3816
3817    /// Evaluate a computed expression AST against the current state
3818    /// This is the core runtime evaluator for computed fields from the AST
3819    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
3820        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
3821    }
3822
3823    /// Evaluate a computed expression with a variable environment (for let bindings)
3824    fn evaluate_computed_expr_with_env(
3825        &self,
3826        expr: &ComputedExpr,
3827        state: &Value,
3828        env: &std::collections::HashMap<String, Value>,
3829    ) -> Result<Value> {
3830        match expr {
3831            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
3832
3833            ComputedExpr::Var { name } => env
3834                .get(name)
3835                .cloned()
3836                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
3837
3838            ComputedExpr::Let { name, value, body } => {
3839                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
3840                let mut new_env = env.clone();
3841                new_env.insert(name.clone(), val);
3842                self.evaluate_computed_expr_with_env(body, state, &new_env)
3843            }
3844
3845            ComputedExpr::If {
3846                condition,
3847                then_branch,
3848                else_branch,
3849            } => {
3850                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
3851                if self.value_to_bool(&cond_val) {
3852                    self.evaluate_computed_expr_with_env(then_branch, state, env)
3853                } else {
3854                    self.evaluate_computed_expr_with_env(else_branch, state, env)
3855                }
3856            }
3857
3858            ComputedExpr::None => Ok(Value::Null),
3859
3860            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
3861
3862            ComputedExpr::Slice { expr, start, end } => {
3863                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3864                match val {
3865                    Value::Array(arr) => {
3866                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3867                        Ok(Value::Array(slice))
3868                    }
3869                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3870                }
3871            }
3872
3873            ComputedExpr::Index { expr, index } => {
3874                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3875                match val {
3876                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3877                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3878                }
3879            }
3880
3881            ComputedExpr::U64FromLeBytes { bytes } => {
3882                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3883                let byte_vec = self.value_to_bytes(&val)?;
3884                if byte_vec.len() < 8 {
3885                    return Err(format!(
3886                        "u64::from_le_bytes requires 8 bytes, got {}",
3887                        byte_vec.len()
3888                    )
3889                    .into());
3890                }
3891                let arr: [u8; 8] = byte_vec[..8]
3892                    .try_into()
3893                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3894                Ok(json!(u64::from_le_bytes(arr)))
3895            }
3896
3897            ComputedExpr::U64FromBeBytes { bytes } => {
3898                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3899                let byte_vec = self.value_to_bytes(&val)?;
3900                if byte_vec.len() < 8 {
3901                    return Err(format!(
3902                        "u64::from_be_bytes requires 8 bytes, got {}",
3903                        byte_vec.len()
3904                    )
3905                    .into());
3906                }
3907                let arr: [u8; 8] = byte_vec[..8]
3908                    .try_into()
3909                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3910                Ok(json!(u64::from_be_bytes(arr)))
3911            }
3912
3913            ComputedExpr::ByteArray { bytes } => {
3914                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3915            }
3916
3917            ComputedExpr::Closure { param, body } => {
3918                // Closures are stored as-is; they're evaluated when used in map()
3919                // Return a special representation
3920                Ok(json!({
3921                    "__closure": {
3922                        "param": param,
3923                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
3924                    }
3925                }))
3926            }
3927
3928            ComputedExpr::Unary { op, expr } => {
3929                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3930                self.apply_unary_op(op, &val)
3931            }
3932
3933            ComputedExpr::JsonToBytes { expr } => {
3934                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3935                // Convert JSON array of numbers to byte array
3936                let bytes = self.value_to_bytes(&val)?;
3937                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3938            }
3939
3940            ComputedExpr::UnwrapOr { expr, default } => {
3941                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3942                if val.is_null() {
3943                    Ok(default.clone())
3944                } else {
3945                    Ok(val)
3946                }
3947            }
3948
3949            ComputedExpr::Binary { op, left, right } => {
3950                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3951                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3952                self.apply_binary_op(op, &l, &r)
3953            }
3954
3955            ComputedExpr::Cast { expr, to_type } => {
3956                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3957                self.apply_cast(&val, to_type)
3958            }
3959
3960            ComputedExpr::ResolverComputed {
3961                resolver,
3962                method,
3963                args,
3964            } => {
3965                let evaluated_args: Vec<Value> = args
3966                    .iter()
3967                    .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
3968                    .collect::<Result<Vec<_>>>()?;
3969                crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
3970            }
3971
3972            ComputedExpr::MethodCall { expr, method, args } => {
3973                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3974                // Special handling for map() with closures
3975                if method == "map" && args.len() == 1 {
3976                    if let ComputedExpr::Closure { param, body } = &args[0] {
3977                        // If the value is null, return null (Option::None.map returns None)
3978                        if val.is_null() {
3979                            return Ok(Value::Null);
3980                        }
3981
3982                        if let Value::Array(arr) = &val {
3983                            let results: Result<Vec<Value>> = arr
3984                                .iter()
3985                                .map(|elem| {
3986                                    let mut closure_env = env.clone();
3987                                    closure_env.insert(param.clone(), elem.clone());
3988                                    self.evaluate_computed_expr_with_env(body, state, &closure_env)
3989                                })
3990                                .collect();
3991                            return Ok(Value::Array(results?));
3992                        }
3993
3994                        let mut closure_env = env.clone();
3995                        closure_env.insert(param.clone(), val);
3996                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3997                    }
3998                }
3999                let evaluated_args: Vec<Value> = args
4000                    .iter()
4001                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4002                    .collect::<Result<Vec<_>>>()?;
4003                self.apply_method_call(&val, method, &evaluated_args)
4004            }
4005
4006            ComputedExpr::Literal { value } => Ok(value.clone()),
4007
4008            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4009
4010            ComputedExpr::ContextSlot => Ok(self
4011                .current_context
4012                .as_ref()
4013                .and_then(|ctx| ctx.slot)
4014                .map(|s| json!(s))
4015                .unwrap_or(Value::Null)),
4016
4017            ComputedExpr::ContextTimestamp => Ok(self
4018                .current_context
4019                .as_ref()
4020                .map(|ctx| json!(ctx.timestamp()))
4021                .unwrap_or(Value::Null)),
4022        }
4023    }
4024
4025    /// Convert a JSON value to a byte vector
4026    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4027        match val {
4028            Value::Array(arr) => arr
4029                .iter()
4030                .map(|v| {
4031                    v.as_u64()
4032                        .map(|n| n as u8)
4033                        .ok_or_else(|| "Array element not a valid byte".into())
4034                })
4035                .collect(),
4036            Value::String(s) => {
4037                // Try to decode as hex
4038                if s.starts_with("0x") || s.starts_with("0X") {
4039                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4040                } else {
4041                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4042                }
4043            }
4044            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4045        }
4046    }
4047
4048    /// Apply a unary operation
4049    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4050        use crate::ast::UnaryOp;
4051        match op {
4052            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4053            UnaryOp::ReverseBits => match val.as_u64() {
4054                Some(n) => Ok(json!(n.reverse_bits())),
4055                None => match val.as_i64() {
4056                    Some(n) => Ok(json!((n as u64).reverse_bits())),
4057                    None => Err("reverse_bits requires an integer".into()),
4058                },
4059            },
4060        }
4061    }
4062
4063    /// Get a field value from state by path (e.g., "section.field" or just "field")
4064    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4065        let segments: Vec<&str> = path.split('.').collect();
4066        let mut current = state;
4067
4068        for segment in segments {
4069            match current.get(segment) {
4070                Some(v) => current = v,
4071                None => return Ok(Value::Null),
4072            }
4073        }
4074
4075        Ok(current.clone())
4076    }
4077
4078    /// Apply a binary operation to two values
4079    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4080        match op {
4081            // Arithmetic operations
4082            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4083            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4084            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4085            BinaryOp::Div => {
4086                // Check for division by zero
4087                if let Some(r) = right.as_i64() {
4088                    if r == 0 {
4089                        return Err("Division by zero".into());
4090                    }
4091                }
4092                if let Some(r) = right.as_f64() {
4093                    if r == 0.0 {
4094                        return Err("Division by zero".into());
4095                    }
4096                }
4097                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4098            }
4099            BinaryOp::Mod => {
4100                // Modulo - only for integers
4101                match (left.as_i64(), right.as_i64()) {
4102                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4103                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4104                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4105                        _ => Err("Modulo requires non-zero integer operands".into()),
4106                    },
4107                    _ => Err("Modulo by zero".into()),
4108                }
4109            }
4110
4111            // Comparison operations
4112            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4113            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4114            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4115            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4116            BinaryOp::Eq => Ok(json!(left == right)),
4117            BinaryOp::Ne => Ok(json!(left != right)),
4118
4119            // Logical operations
4120            BinaryOp::And => {
4121                let l_bool = self.value_to_bool(left);
4122                let r_bool = self.value_to_bool(right);
4123                Ok(json!(l_bool && r_bool))
4124            }
4125            BinaryOp::Or => {
4126                let l_bool = self.value_to_bool(left);
4127                let r_bool = self.value_to_bool(right);
4128                Ok(json!(l_bool || r_bool))
4129            }
4130
4131            // Bitwise operations
4132            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4133                (Some(a), Some(b)) => Ok(json!(a ^ b)),
4134                _ => match (left.as_i64(), right.as_i64()) {
4135                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
4136                    _ => Err("XOR requires integer operands".into()),
4137                },
4138            },
4139            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4140                (Some(a), Some(b)) => Ok(json!(a & b)),
4141                _ => match (left.as_i64(), right.as_i64()) {
4142                    (Some(a), Some(b)) => Ok(json!(a & b)),
4143                    _ => Err("BitAnd requires integer operands".into()),
4144                },
4145            },
4146            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4147                (Some(a), Some(b)) => Ok(json!(a | b)),
4148                _ => match (left.as_i64(), right.as_i64()) {
4149                    (Some(a), Some(b)) => Ok(json!(a | b)),
4150                    _ => Err("BitOr requires integer operands".into()),
4151                },
4152            },
4153            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4154                (Some(a), Some(b)) => Ok(json!(a << b)),
4155                _ => match (left.as_i64(), right.as_i64()) {
4156                    (Some(a), Some(b)) => Ok(json!(a << b)),
4157                    _ => Err("Shl requires integer operands".into()),
4158                },
4159            },
4160            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4161                (Some(a), Some(b)) => Ok(json!(a >> b)),
4162                _ => match (left.as_i64(), right.as_i64()) {
4163                    (Some(a), Some(b)) => Ok(json!(a >> b)),
4164                    _ => Err("Shr requires integer operands".into()),
4165                },
4166            },
4167        }
4168    }
4169
4170    /// Helper for numeric operations that can work on integers or floats
4171    fn numeric_op<F1, F2>(
4172        &self,
4173        left: &Value,
4174        right: &Value,
4175        int_op: F1,
4176        float_op: F2,
4177    ) -> Result<Value>
4178    where
4179        F1: Fn(i64, i64) -> i64,
4180        F2: Fn(f64, f64) -> f64,
4181    {
4182        // Try i64 first
4183        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4184            return Ok(json!(int_op(a, b)));
4185        }
4186
4187        // Try u64
4188        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4189            // For u64, we need to be careful with underflow in subtraction
4190            return Ok(json!(int_op(a as i64, b as i64)));
4191        }
4192
4193        // Try f64
4194        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4195            return Ok(json!(float_op(a, b)));
4196        }
4197
4198        // If either is null, return null
4199        if left.is_null() || right.is_null() {
4200            return Ok(Value::Null);
4201        }
4202
4203        Err(format!(
4204            "Cannot perform numeric operation on {:?} and {:?}",
4205            left, right
4206        )
4207        .into())
4208    }
4209
4210    /// Helper for comparison operations
4211    fn comparison_op<F1, F2>(
4212        &self,
4213        left: &Value,
4214        right: &Value,
4215        int_cmp: F1,
4216        float_cmp: F2,
4217    ) -> Result<Value>
4218    where
4219        F1: Fn(i64, i64) -> bool,
4220        F2: Fn(f64, f64) -> bool,
4221    {
4222        // Try i64 first
4223        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4224            return Ok(json!(int_cmp(a, b)));
4225        }
4226
4227        // Try u64
4228        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4229            return Ok(json!(int_cmp(a as i64, b as i64)));
4230        }
4231
4232        // Try f64
4233        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4234            return Ok(json!(float_cmp(a, b)));
4235        }
4236
4237        // If either is null, comparison returns false
4238        if left.is_null() || right.is_null() {
4239            return Ok(json!(false));
4240        }
4241
4242        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4243    }
4244
4245    /// Convert a value to boolean for logical operations
4246    fn value_to_bool(&self, value: &Value) -> bool {
4247        match value {
4248            Value::Null => false,
4249            Value::Bool(b) => *b,
4250            Value::Number(n) => {
4251                if let Some(i) = n.as_i64() {
4252                    i != 0
4253                } else if let Some(f) = n.as_f64() {
4254                    f != 0.0
4255                } else {
4256                    true
4257                }
4258            }
4259            Value::String(s) => !s.is_empty(),
4260            Value::Array(arr) => !arr.is_empty(),
4261            Value::Object(obj) => !obj.is_empty(),
4262        }
4263    }
4264
4265    /// Apply a type cast to a value
4266    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4267        match to_type {
4268            "i8" | "i16" | "i32" | "i64" | "isize" => {
4269                if let Some(n) = value.as_i64() {
4270                    Ok(json!(n))
4271                } else if let Some(n) = value.as_u64() {
4272                    Ok(json!(n as i64))
4273                } else if let Some(n) = value.as_f64() {
4274                    Ok(json!(n as i64))
4275                } else if let Some(s) = value.as_str() {
4276                    s.parse::<i64>()
4277                        .map(|n| json!(n))
4278                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4279                } else {
4280                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4281                }
4282            }
4283            "u8" | "u16" | "u32" | "u64" | "usize" => {
4284                if let Some(n) = value.as_u64() {
4285                    Ok(json!(n))
4286                } else if let Some(n) = value.as_i64() {
4287                    Ok(json!(n as u64))
4288                } else if let Some(n) = value.as_f64() {
4289                    Ok(json!(n as u64))
4290                } else if let Some(s) = value.as_str() {
4291                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4292                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4293                    })
4294                } else {
4295                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4296                }
4297            }
4298            "f32" | "f64" => {
4299                if let Some(n) = value.as_f64() {
4300                    Ok(json!(n))
4301                } else if let Some(n) = value.as_i64() {
4302                    Ok(json!(n as f64))
4303                } else if let Some(n) = value.as_u64() {
4304                    Ok(json!(n as f64))
4305                } else if let Some(s) = value.as_str() {
4306                    s.parse::<f64>()
4307                        .map(|n| json!(n))
4308                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4309                } else {
4310                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4311                }
4312            }
4313            "String" | "string" => Ok(json!(value.to_string())),
4314            "bool" => Ok(json!(self.value_to_bool(value))),
4315            _ => {
4316                // Unknown type, return value as-is
4317                Ok(value.clone())
4318            }
4319        }
4320    }
4321
4322    /// Apply a method call to a value
4323    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4324        match method {
4325            "unwrap_or" => {
4326                if value.is_null() && !args.is_empty() {
4327                    Ok(args[0].clone())
4328                } else {
4329                    Ok(value.clone())
4330                }
4331            }
4332            "unwrap_or_default" => {
4333                if value.is_null() {
4334                    // Return default for common types
4335                    Ok(json!(0))
4336                } else {
4337                    Ok(value.clone())
4338                }
4339            }
4340            "is_some" => Ok(json!(!value.is_null())),
4341            "is_none" => Ok(json!(value.is_null())),
4342            "abs" => {
4343                if let Some(n) = value.as_i64() {
4344                    Ok(json!(n.abs()))
4345                } else if let Some(n) = value.as_f64() {
4346                    Ok(json!(n.abs()))
4347                } else {
4348                    Err(format!("Cannot call abs() on {:?}", value).into())
4349                }
4350            }
4351            "len" => {
4352                if let Some(s) = value.as_str() {
4353                    Ok(json!(s.len()))
4354                } else if let Some(arr) = value.as_array() {
4355                    Ok(json!(arr.len()))
4356                } else if let Some(obj) = value.as_object() {
4357                    Ok(json!(obj.len()))
4358                } else {
4359                    Err(format!("Cannot call len() on {:?}", value).into())
4360                }
4361            }
4362            "to_string" => Ok(json!(value.to_string())),
4363            "min" => {
4364                if args.is_empty() {
4365                    return Err("min() requires an argument".into());
4366                }
4367                let other = &args[0];
4368                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4369                    Ok(json!(a.min(b)))
4370                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4371                    Ok(json!(a.min(b)))
4372                } else {
4373                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4374                }
4375            }
4376            "max" => {
4377                if args.is_empty() {
4378                    return Err("max() requires an argument".into());
4379                }
4380                let other = &args[0];
4381                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4382                    Ok(json!(a.max(b)))
4383                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4384                    Ok(json!(a.max(b)))
4385                } else {
4386                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4387                }
4388            }
4389            "saturating_add" => {
4390                if args.is_empty() {
4391                    return Err("saturating_add() requires an argument".into());
4392                }
4393                let other = &args[0];
4394                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4395                    Ok(json!(a.saturating_add(b)))
4396                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4397                    Ok(json!(a.saturating_add(b)))
4398                } else {
4399                    Err(format!(
4400                        "Cannot call saturating_add() on {:?} and {:?}",
4401                        value, other
4402                    )
4403                    .into())
4404                }
4405            }
4406            "saturating_sub" => {
4407                if args.is_empty() {
4408                    return Err("saturating_sub() requires an argument".into());
4409                }
4410                let other = &args[0];
4411                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4412                    Ok(json!(a.saturating_sub(b)))
4413                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4414                    Ok(json!(a.saturating_sub(b)))
4415                } else {
4416                    Err(format!(
4417                        "Cannot call saturating_sub() on {:?} and {:?}",
4418                        value, other
4419                    )
4420                    .into())
4421                }
4422            }
4423            _ => Err(format!("Unknown method call: {}()", method).into()),
4424        }
4425    }
4426
4427    /// Evaluate all computed fields for an entity and update the state
4428    /// This takes a list of ComputedFieldSpec from the AST and applies them
4429    pub fn evaluate_computed_fields_from_ast(
4430        &self,
4431        state: &mut Value,
4432        computed_field_specs: &[ComputedFieldSpec],
4433    ) -> Result<Vec<String>> {
4434        let mut updated_paths = Vec::new();
4435
4436        crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4437
4438        for spec in computed_field_specs {
4439            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
4440                self.set_field_in_state(state, &spec.target_path, result)?;
4441                updated_paths.push(spec.target_path.clone());
4442            }
4443        }
4444
4445        Ok(updated_paths)
4446    }
4447
4448    /// Set a field value in state by path (e.g., "section.field")
4449    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4450        let segments: Vec<&str> = path.split('.').collect();
4451
4452        if segments.is_empty() {
4453            return Err("Empty path".into());
4454        }
4455
4456        // Navigate to parent, creating intermediate objects as needed
4457        let mut current = state;
4458        for (i, segment) in segments.iter().enumerate() {
4459            if i == segments.len() - 1 {
4460                // Last segment - set the value
4461                if let Some(obj) = current.as_object_mut() {
4462                    obj.insert(segment.to_string(), value);
4463                    return Ok(());
4464                } else {
4465                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
4466                }
4467            } else {
4468                // Intermediate segment - navigate or create
4469                if !current.is_object() {
4470                    *current = json!({});
4471                }
4472                let obj = current.as_object_mut().unwrap();
4473                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4474            }
4475        }
4476
4477        Ok(())
4478    }
4479
4480    /// Create a computed fields evaluator closure from AST specs
4481    /// This returns a function that can be passed to the bytecode builder
4482    pub fn create_evaluator_from_specs(
4483        specs: Vec<ComputedFieldSpec>,
4484    ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
4485        move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
4486            let mut vm = VmContext::new();
4487            vm.current_context = Some(UpdateContext {
4488                slot: context_slot,
4489                timestamp: Some(context_timestamp),
4490                ..Default::default()
4491            });
4492            vm.evaluate_computed_fields_from_ast(state, &specs)?;
4493            Ok(())
4494        }
4495    }
4496}
4497
4498impl Default for VmContext {
4499    fn default() -> Self {
4500        Self::new()
4501    }
4502}
4503
4504// Implement the ReverseLookupUpdater trait for VmContext
4505impl crate::resolvers::ReverseLookupUpdater for VmContext {
4506    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4507        // Use default state_id=0 and default lookup name
4508        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4509            .unwrap_or_else(|e| {
4510                tracing::error!("Failed to update PDA reverse lookup: {}", e);
4511                Vec::new()
4512            })
4513    }
4514
4515    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4516        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
4517        self.flush_pending_updates(0, pda_address)
4518            .unwrap_or_else(|e| {
4519                tracing::error!("Failed to flush pending updates: {}", e);
4520                Vec::new()
4521            })
4522    }
4523}
4524
4525#[cfg(test)]
4526mod tests {
4527    use super::*;
4528    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4529
4530    #[test]
4531    fn test_computed_field_preserves_integer_type() {
4532        let vm = VmContext::new();
4533
4534        let mut state = serde_json::json!({
4535            "trading": {
4536                "total_buy_volume": 20000000000_i64,
4537                "total_sell_volume": 17951316474_i64
4538            }
4539        });
4540
4541        let spec = ComputedFieldSpec {
4542            target_path: "trading.total_volume".to_string(),
4543            result_type: "Option<u64>".to_string(),
4544            expression: ComputedExpr::Binary {
4545                op: BinaryOp::Add,
4546                left: Box::new(ComputedExpr::UnwrapOr {
4547                    expr: Box::new(ComputedExpr::FieldRef {
4548                        path: "trading.total_buy_volume".to_string(),
4549                    }),
4550                    default: serde_json::json!(0),
4551                }),
4552                right: Box::new(ComputedExpr::UnwrapOr {
4553                    expr: Box::new(ComputedExpr::FieldRef {
4554                        path: "trading.total_sell_volume".to_string(),
4555                    }),
4556                    default: serde_json::json!(0),
4557                }),
4558            },
4559        };
4560
4561        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
4562            .unwrap();
4563
4564        let total_volume = state
4565            .get("trading")
4566            .and_then(|t| t.get("total_volume"))
4567            .expect("total_volume should exist");
4568
4569        let serialized = serde_json::to_string(total_volume).unwrap();
4570        assert!(
4571            !serialized.contains('.'),
4572            "Integer should not have decimal point: {}",
4573            serialized
4574        );
4575        assert_eq!(
4576            total_volume.as_i64(),
4577            Some(37951316474),
4578            "Value should be correct sum"
4579        );
4580    }
4581
4582    #[test]
4583    fn test_set_field_sum_preserves_integer_type() {
4584        let mut vm = VmContext::new();
4585        vm.registers[0] = serde_json::json!({});
4586        vm.registers[1] = serde_json::json!(20000000000_i64);
4587        vm.registers[2] = serde_json::json!(17951316474_i64);
4588
4589        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
4590        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
4591
4592        let state = &vm.registers[0];
4593        let buy_vol = state
4594            .get("trading")
4595            .and_then(|t| t.get("total_buy_volume"))
4596            .unwrap();
4597        let sell_vol = state
4598            .get("trading")
4599            .and_then(|t| t.get("total_sell_volume"))
4600            .unwrap();
4601
4602        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
4603        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
4604
4605        assert!(
4606            !buy_serialized.contains('.'),
4607            "Buy volume should not have decimal: {}",
4608            buy_serialized
4609        );
4610        assert!(
4611            !sell_serialized.contains('.'),
4612            "Sell volume should not have decimal: {}",
4613            sell_serialized
4614        );
4615    }
4616
4617    #[test]
4618    fn test_lookup_index_chaining() {
4619        let mut vm = VmContext::new();
4620
4621        let state = vm.states.get_mut(&0).unwrap();
4622
4623        state
4624            .pda_reverse_lookups
4625            .entry("default_pda_lookup".to_string())
4626            .or_insert_with(|| PdaReverseLookup::new(1000))
4627            .insert("pda_123".to_string(), "addr_456".to_string());
4628
4629        state
4630            .lookup_indexes
4631            .entry("round_address_lookup_index".to_string())
4632            .or_insert_with(LookupIndex::new)
4633            .insert(json!("addr_456"), json!(789));
4634
4635        let handler = vec![
4636            OpCode::LoadConstant {
4637                value: json!("pda_123"),
4638                dest: 0,
4639            },
4640            OpCode::LookupIndex {
4641                state_id: 0,
4642                index_name: "round_address_lookup_index".to_string(),
4643                lookup_value: 0,
4644                dest: 1,
4645            },
4646        ];
4647
4648        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4649            .unwrap();
4650
4651        assert_eq!(vm.registers[1], json!(789));
4652    }
4653
4654    #[test]
4655    fn test_lookup_index_no_chain() {
4656        let mut vm = VmContext::new();
4657
4658        let state = vm.states.get_mut(&0).unwrap();
4659        state
4660            .lookup_indexes
4661            .entry("test_index".to_string())
4662            .or_insert_with(LookupIndex::new)
4663            .insert(json!("key_abc"), json!(42));
4664
4665        let handler = vec![
4666            OpCode::LoadConstant {
4667                value: json!("key_abc"),
4668                dest: 0,
4669            },
4670            OpCode::LookupIndex {
4671                state_id: 0,
4672                index_name: "test_index".to_string(),
4673                lookup_value: 0,
4674                dest: 1,
4675            },
4676        ];
4677
4678        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4679            .unwrap();
4680
4681        assert_eq!(vm.registers[1], json!(42));
4682    }
4683
4684    #[test]
4685    fn test_conditional_set_field_with_zero_array() {
4686        let mut vm = VmContext::new();
4687
4688        let event_zeros = json!({
4689            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4690                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4691        });
4692
4693        let event_nonzero = json!({
4694            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
4695                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
4696        });
4697
4698        let zero_32: Value = json!([
4699            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4700            0, 0, 0
4701        ]);
4702
4703        let handler = vec![
4704            OpCode::CreateObject { dest: 2 },
4705            OpCode::LoadEventField {
4706                path: FieldPath::new(&["value"]),
4707                dest: 10,
4708                default: None,
4709            },
4710            OpCode::ConditionalSetField {
4711                object: 2,
4712                path: "captured_value".to_string(),
4713                value: 10,
4714                condition_field: FieldPath::new(&["value"]),
4715                condition_op: ComparisonOp::NotEqual,
4716                condition_value: zero_32,
4717            },
4718        ];
4719
4720        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
4721            .unwrap();
4722        assert!(
4723            vm.registers[2].get("captured_value").is_none(),
4724            "Field should not be set when value is all zeros"
4725        );
4726
4727        vm.reset_registers();
4728        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
4729            .unwrap();
4730        assert!(
4731            vm.registers[2].get("captured_value").is_some(),
4732            "Field should be set when value is non-zero"
4733        );
4734    }
4735
4736    #[test]
4737    fn test_when_instruction_arrives_first() {
4738        let mut vm = VmContext::new();
4739
4740        let signature = "test_sig_123".to_string();
4741
4742        {
4743            let state = vm.states.get(&0).unwrap();
4744            let mut cache = state.recent_tx_instructions.lock().unwrap();
4745            let mut set = HashSet::new();
4746            set.insert("RevealIxState".to_string());
4747            cache.put(signature.clone(), set);
4748        }
4749
4750        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4751
4752        let handler = vec![
4753            OpCode::CreateObject { dest: 2 },
4754            OpCode::LoadConstant {
4755                value: json!("primary_key_value"),
4756                dest: 1,
4757            },
4758            OpCode::LoadConstant {
4759                value: json!("the_revealed_value"),
4760                dest: 10,
4761            },
4762            OpCode::SetFieldWhen {
4763                object: 2,
4764                path: "entropy_value".to_string(),
4765                value: 10,
4766                when_instruction: "RevealIxState".to_string(),
4767                entity_name: "TestEntity".to_string(),
4768                key_reg: 1,
4769                condition_field: None,
4770                condition_op: None,
4771                condition_value: None,
4772            },
4773        ];
4774
4775        vm.execute_handler(
4776            &handler,
4777            &json!({}),
4778            "VarState",
4779            0,
4780            "TestEntity",
4781            None,
4782            None,
4783        )
4784        .unwrap();
4785
4786        assert_eq!(
4787            vm.registers[2].get("entropy_value").unwrap(),
4788            "the_revealed_value",
4789            "Field should be set when instruction was already seen"
4790        );
4791    }
4792
4793    #[test]
4794    fn test_when_account_arrives_first() {
4795        let mut vm = VmContext::new();
4796
4797        let signature = "test_sig_456".to_string();
4798
4799        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4800
4801        let handler = vec![
4802            OpCode::CreateObject { dest: 2 },
4803            OpCode::LoadConstant {
4804                value: json!("pk_123"),
4805                dest: 1,
4806            },
4807            OpCode::LoadConstant {
4808                value: json!("deferred_value"),
4809                dest: 10,
4810            },
4811            OpCode::SetFieldWhen {
4812                object: 2,
4813                path: "entropy_value".to_string(),
4814                value: 10,
4815                when_instruction: "RevealIxState".to_string(),
4816                entity_name: "TestEntity".to_string(),
4817                key_reg: 1,
4818                condition_field: None,
4819                condition_op: None,
4820                condition_value: None,
4821            },
4822        ];
4823
4824        vm.execute_handler(
4825            &handler,
4826            &json!({}),
4827            "VarState",
4828            0,
4829            "TestEntity",
4830            None,
4831            None,
4832        )
4833        .unwrap();
4834
4835        assert!(
4836            vm.registers[2].get("entropy_value").is_none(),
4837            "Field should not be set when instruction hasn't been seen"
4838        );
4839
4840        let state = vm.states.get(&0).unwrap();
4841        let key = (signature.clone(), "RevealIxState".to_string());
4842        assert!(
4843            state.deferred_when_ops.contains_key(&key),
4844            "Operation should be queued"
4845        );
4846
4847        {
4848            let mut cache = state.recent_tx_instructions.lock().unwrap();
4849            let mut set = HashSet::new();
4850            set.insert("RevealIxState".to_string());
4851            cache.put(signature.clone(), set);
4852        }
4853
4854        let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
4855        for op in deferred {
4856            vm.apply_deferred_when_op(0, &op).unwrap();
4857        }
4858
4859        let state = vm.states.get(&0).unwrap();
4860        let entity = state.data.get(&json!("pk_123")).unwrap();
4861        assert_eq!(
4862            entity.get("entropy_value").unwrap(),
4863            "deferred_value",
4864            "Field should be set after instruction arrives"
4865        );
4866    }
4867
4868    #[test]
4869    fn test_when_cleanup_expired() {
4870        let mut vm = VmContext::new();
4871
4872        let state = vm.states.get(&0).unwrap();
4873        let key = ("old_sig".to_string(), "SomeIxState".to_string());
4874        state.deferred_when_ops.insert(
4875            key,
4876            vec![DeferredWhenOperation {
4877                entity_name: "Test".to_string(),
4878                primary_key: json!("pk"),
4879                field_path: "field".to_string(),
4880                field_value: json!("value"),
4881                when_instruction: "SomeIxState".to_string(),
4882                signature: "old_sig".to_string(),
4883                slot: 0,
4884                deferred_at: 0,
4885                emit: true,
4886            }],
4887        );
4888
4889        let removed = vm.cleanup_expired_when_ops(0, 60);
4890
4891        assert_eq!(removed, 1, "Should have removed 1 expired op");
4892        assert!(
4893            vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
4894            "Deferred ops should be empty after cleanup"
4895        );
4896    }
4897}