Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3    ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use once_cell::sync::Lazy;
10use serde_json::{json, Value};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::num::NonZeroUsize;
13use std::time::{Duration, Instant};
14
15#[cfg(feature = "otel")]
16use tracing::instrument;
17/// Context metadata for blockchain updates (accounts and instructions)
18/// This structure is designed to be extended over time with additional metadata
19#[derive(Debug, Clone, Default)]
20pub struct UpdateContext {
21    /// Blockchain slot number
22    pub slot: Option<u64>,
23    /// Transaction signature
24    pub signature: Option<String>,
25    /// Unix timestamp (seconds since epoch)
26    /// If not provided, will default to current system time when accessed
27    pub timestamp: Option<i64>,
28    /// Write version for account updates (monotonically increasing per account within a slot)
29    /// Used for staleness detection to reject out-of-order updates
30    pub write_version: Option<u64>,
31    /// Transaction index for instruction updates (orders transactions within a slot)
32    /// Used for staleness detection to reject out-of-order updates
33    pub txn_index: Option<u64>,
34    /// When true, QueueResolver opcodes are skipped during handler execution.
35    /// Set for reprocessed cached data from PDA mapping changes to prevent
36    /// stale data from triggering resolvers or locking in wrong values via SetOnce.
37    pub skip_resolvers: bool,
38    /// Additional custom metadata that can be added without breaking changes
39    pub metadata: HashMap<String, Value>,
40}
41
42impl UpdateContext {
43    /// Create a new UpdateContext with slot and signature
44    pub fn new(slot: u64, signature: String) -> Self {
45        Self {
46            slot: Some(slot),
47            signature: Some(signature),
48            timestamp: None,
49            write_version: None,
50            txn_index: None,
51            skip_resolvers: false,
52            metadata: HashMap::new(),
53        }
54    }
55
56    /// Create a new UpdateContext with slot, signature, and timestamp
57    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
58        Self {
59            slot: Some(slot),
60            signature: Some(signature),
61            timestamp: Some(timestamp),
62            write_version: None,
63            txn_index: None,
64            skip_resolvers: false,
65            metadata: HashMap::new(),
66        }
67    }
68
69    /// Create context for account updates with write_version for staleness detection
70    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
71        Self {
72            slot: Some(slot),
73            signature: Some(signature),
74            timestamp: None,
75            write_version: Some(write_version),
76            txn_index: None,
77            skip_resolvers: false,
78            metadata: HashMap::new(),
79        }
80    }
81
82    /// Create context for instruction updates with txn_index for staleness detection
83    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
84        Self {
85            slot: Some(slot),
86            signature: Some(signature),
87            timestamp: None,
88            write_version: None,
89            txn_index: Some(txn_index),
90            skip_resolvers: false,
91            metadata: HashMap::new(),
92        }
93    }
94
95    /// Create context for reprocessed cached account data from PDA mapping changes.
96    /// Uses empty signature to prevent `when` guards from matching stale instructions,
97    /// and sets skip_resolvers to prevent stale scheduling/SetOnce lock-in.
98    pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
99        Self {
100            slot: Some(slot),
101            signature: None,
102            timestamp: None,
103            write_version: Some(write_version),
104            txn_index: None,
105            skip_resolvers: true,
106            metadata: HashMap::new(),
107        }
108    }
109
110    /// Get the timestamp, falling back to current system time if not set
111    pub fn timestamp(&self) -> i64 {
112        self.timestamp.unwrap_or_else(|| {
113            std::time::SystemTime::now()
114                .duration_since(std::time::UNIX_EPOCH)
115                .unwrap()
116                .as_secs() as i64
117        })
118    }
119
120    /// Create an empty context (for testing or when context is not available)
121    pub fn empty() -> Self {
122        Self::default()
123    }
124
125    /// Add custom metadata
126    /// Returns true if this is an account update context (has write_version, no txn_index)
127    pub fn is_account_update(&self) -> bool {
128        self.write_version.is_some() && self.txn_index.is_none()
129    }
130
131    /// Returns true if this is an instruction update context (has txn_index, no write_version)
132    pub fn is_instruction_update(&self) -> bool {
133        self.txn_index.is_some() && self.write_version.is_none()
134    }
135
136    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
137        self.metadata.insert(key, value);
138        self
139    }
140
141    /// Get metadata value
142    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
143        self.metadata.get(key)
144    }
145
146    /// Convert context to JSON value for injection into event data
147    pub fn to_value(&self) -> Value {
148        let mut obj = serde_json::Map::new();
149        if let Some(slot) = self.slot {
150            obj.insert("slot".to_string(), json!(slot));
151        }
152        if let Some(ref sig) = self.signature {
153            obj.insert("signature".to_string(), json!(sig));
154        }
155        // Always include timestamp (use current time if not set)
156        obj.insert("timestamp".to_string(), json!(self.timestamp()));
157        for (key, value) in &self.metadata {
158            obj.insert(key.clone(), value.clone());
159        }
160        Value::Object(obj)
161    }
162}
163
164pub type Register = usize;
165pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
166
167pub type RegisterValue = Value;
168
169/// Trait for evaluating computed fields
170/// Implement this in your generated spec to enable computed field evaluation
171pub trait ComputedFieldsEvaluator {
172    fn evaluate(&self, state: &mut Value) -> Result<()>;
173}
174
175// Pending queue configuration
176const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
177const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
178const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
179
180// Temporal index configuration - prevents unbounded history growth
181const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
182const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
183
184// State table configuration - aligned with downstream EntityCache (500 per view)
185const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
186const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
187
188const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
189
190const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
191
192// Smaller cache for instruction deduplication - provides shorter effective TTL
193// since we don't expect instruction duplicates to arrive much later
194const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
195
196const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
197
198const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
199
200const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 20_000;
201const DEFAULT_RESOLVER_CACHE_TTL_SECS: u64 = 3600; // 1 hour
202
203static RESOLVER_CACHE_CAPACITY: Lazy<NonZeroUsize> = Lazy::new(|| {
204    NonZeroUsize::new(
205        std::env::var("HYPERSTACK_RESOLVER_CACHE_CAPACITY")
206            .ok()
207            .and_then(|value| value.parse::<usize>().ok())
208            .filter(|value| *value > 0)
209            .unwrap_or(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES),
210    )
211    .expect("resolver cache capacity must be > 0")
212});
213
214static RESOLVER_CACHE_TTL: Lazy<Duration> = Lazy::new(|| {
215    let ttl_secs = match std::env::var("HYPERSTACK_RESOLVER_CACHE_TTL_SECS") {
216        Ok(value) => match value.parse::<u64>() {
217            Ok(0) => {
218                tracing::warn!(
219                    default_ttl_secs = DEFAULT_RESOLVER_CACHE_TTL_SECS,
220                    "HYPERSTACK_RESOLVER_CACHE_TTL_SECS=0 is not supported; using default"
221                );
222                DEFAULT_RESOLVER_CACHE_TTL_SECS
223            }
224            Ok(value) => value,
225            Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
226        },
227        Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
228    };
229
230    Duration::from_secs(ttl_secs)
231});
232
233#[derive(Debug, Clone)]
234struct ResolverCacheEntry {
235    value: Value,
236    cached_at: Instant,
237}
238
239fn resolver_cache_capacity() -> NonZeroUsize {
240    *RESOLVER_CACHE_CAPACITY
241}
242
243fn resolver_cache_ttl() -> Duration {
244    *RESOLVER_CACHE_TTL
245}
246
247/// Estimate the size of a JSON value in bytes
248fn estimate_json_size(value: &Value) -> usize {
249    match value {
250        Value::Null => 4,
251        Value::Bool(_) => 5,
252        Value::Number(_) => 8,
253        Value::String(s) => s.len() + 2,
254        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
255        Value::Object(obj) => {
256            2 + obj
257                .iter()
258                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
259                .sum::<usize>()
260        }
261    }
262}
263
264#[derive(Debug, Clone)]
265pub struct CompiledPath {
266    pub segments: std::sync::Arc<[String]>,
267}
268
269impl CompiledPath {
270    pub fn new(path: &str) -> Self {
271        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
272        CompiledPath {
273            segments: segments.into(),
274        }
275    }
276
277    fn segments(&self) -> &[String] {
278        &self.segments
279    }
280}
281
282/// Represents the type of change made to a field for granular dirty tracking.
283/// This enables emitting only the actual changes rather than entire field values.
284#[derive(Debug, Clone)]
285pub enum FieldChange {
286    /// Field was replaced with a new value (emit the full value from state)
287    Replaced,
288    /// Items were appended to an array field (emit only the new items)
289    Appended(Vec<Value>),
290}
291
292/// Tracks field modifications during handler execution with granular change information.
293/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
294#[derive(Debug, Clone, Default)]
295pub struct DirtyTracker {
296    changes: HashMap<String, FieldChange>,
297}
298
299impl DirtyTracker {
300    /// Create a new empty DirtyTracker
301    pub fn new() -> Self {
302        Self {
303            changes: HashMap::new(),
304        }
305    }
306
307    /// Mark a field as replaced (full value will be emitted)
308    pub fn mark_replaced(&mut self, path: &str) {
309        // If there was an append, it's now superseded by a full replacement
310        self.changes.insert(path.to_string(), FieldChange::Replaced);
311    }
312
313    /// Record an appended value for a field
314    pub fn mark_appended(&mut self, path: &str, value: Value) {
315        match self.changes.get_mut(path) {
316            Some(FieldChange::Appended(values)) => {
317                // Add to existing appended values
318                values.push(value);
319            }
320            Some(FieldChange::Replaced) => {
321                // Field was already replaced, keep it as replaced
322                // (the full value including the append will be emitted)
323            }
324            None => {
325                // First append to this field
326                self.changes
327                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
328            }
329        }
330    }
331
332    /// Check if there are any changes tracked
333    pub fn is_empty(&self) -> bool {
334        self.changes.is_empty()
335    }
336
337    /// Get the number of changed fields
338    pub fn len(&self) -> usize {
339        self.changes.len()
340    }
341
342    /// Iterate over all changes
343    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
344        self.changes.iter()
345    }
346
347    /// Get a set of all dirty field paths (for backward compatibility)
348    pub fn dirty_paths(&self) -> HashSet<String> {
349        self.changes.keys().cloned().collect()
350    }
351
352    /// Consume the tracker and return the changes map
353    pub fn into_changes(self) -> HashMap<String, FieldChange> {
354        self.changes
355    }
356
357    /// Get a reference to the changes map
358    pub fn changes(&self) -> &HashMap<String, FieldChange> {
359        &self.changes
360    }
361
362    /// Get paths that were appended (not replaced)
363    pub fn appended_paths(&self) -> Vec<String> {
364        self.changes
365            .iter()
366            .filter_map(|(path, change)| match change {
367                FieldChange::Appended(_) => Some(path.clone()),
368                FieldChange::Replaced => None,
369            })
370            .collect()
371    }
372}
373
374pub struct VmContext {
375    registers: Vec<RegisterValue>,
376    states: HashMap<u32, StateTable>,
377    pub instructions_executed: u64,
378    pub cache_hits: u64,
379    path_cache: HashMap<String, CompiledPath>,
380    pub pda_cache_hits: u64,
381    pub pda_cache_misses: u64,
382    pub pending_queue_size: u64,
383    resolver_requests: VecDeque<ResolverRequest>,
384    resolver_pending: HashMap<String, PendingResolverEntry>,
385    resolver_cache: LruCache<String, ResolverCacheEntry>,
386    pub resolver_cache_hits: u64,
387    pub resolver_cache_misses: u64,
388    current_context: Option<UpdateContext>,
389    warnings: Vec<String>,
390    last_pda_lookup_miss: Option<String>,
391    last_lookup_index_miss: Option<String>,
392    last_pda_registered: Option<String>,
393    last_lookup_index_keys: Vec<String>,
394    scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
395}
396
397#[derive(Debug)]
398pub struct LookupIndex {
399    index: std::sync::Mutex<LruCache<String, Value>>,
400}
401
402impl LookupIndex {
403    pub fn new() -> Self {
404        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
405    }
406
407    pub fn with_capacity(capacity: usize) -> Self {
408        LookupIndex {
409            index: std::sync::Mutex::new(LruCache::new(
410                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
411            )),
412        }
413    }
414
415    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
416        let key = value_to_cache_key(lookup_value);
417        self.index.lock().unwrap().get(&key).cloned()
418    }
419
420    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
421        let key = value_to_cache_key(&lookup_value);
422        self.index.lock().unwrap().put(key, primary_key);
423    }
424
425    /// Remove an entry by lookup value.  Used when a PDA mapping changes to
426    /// clear stale entries that would otherwise shadow the updated PDA mapping.
427    pub fn remove(&self, lookup_value: &Value) {
428        let key = value_to_cache_key(lookup_value);
429        self.index.lock().unwrap().pop(&key);
430    }
431
432    pub fn len(&self) -> usize {
433        self.index.lock().unwrap().len()
434    }
435
436    pub fn is_empty(&self) -> bool {
437        self.index.lock().unwrap().is_empty()
438    }
439}
440
441impl Default for LookupIndex {
442    fn default() -> Self {
443        Self::new()
444    }
445}
446
447fn value_to_cache_key(value: &Value) -> String {
448    match value {
449        Value::String(s) => s.clone(),
450        Value::Number(n) => n.to_string(),
451        Value::Bool(b) => b.to_string(),
452        Value::Null => "null".to_string(),
453        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
454    }
455}
456
457pub(crate) fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
458    match resolver {
459        ResolverType::Token => format!("token:{}", value_to_cache_key(input)),
460        ResolverType::Url(config) => {
461            let method = match config.method {
462                ast::HttpMethod::Get => "get",
463                ast::HttpMethod::Post => "post",
464            };
465            format!("url:{}:{}", method, value_to_cache_key(input))
466        }
467    }
468}
469
470#[derive(Debug)]
471pub struct TemporalIndex {
472    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
473}
474
475impl Default for TemporalIndex {
476    fn default() -> Self {
477        Self::new()
478    }
479}
480
481impl TemporalIndex {
482    pub fn new() -> Self {
483        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
484    }
485
486    pub fn with_capacity(capacity: usize) -> Self {
487        TemporalIndex {
488            index: std::sync::Mutex::new(LruCache::new(
489                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
490            )),
491        }
492    }
493
494    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
495        let key = value_to_cache_key(lookup_value);
496        let mut cache = self.index.lock().unwrap();
497        if let Some(entries) = cache.get(&key) {
498            for i in (0..entries.len()).rev() {
499                if entries[i].1 <= timestamp {
500                    return Some(entries[i].0.clone());
501                }
502            }
503        }
504        None
505    }
506
507    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
508        let key = value_to_cache_key(lookup_value);
509        let mut cache = self.index.lock().unwrap();
510        if let Some(entries) = cache.get(&key) {
511            if let Some(last) = entries.last() {
512                return Some(last.0.clone());
513            }
514        }
515        None
516    }
517
518    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
519        let key = value_to_cache_key(&lookup_value);
520        let mut cache = self.index.lock().unwrap();
521
522        let entries = cache.get_or_insert_mut(key, Vec::new);
523        entries.push((primary_key, timestamp));
524        entries.sort_by_key(|(_, ts)| *ts);
525
526        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
527        entries.retain(|(_, ts)| *ts >= cutoff);
528
529        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
530            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
531            entries.drain(0..excess);
532        }
533    }
534
535    pub fn len(&self) -> usize {
536        self.index.lock().unwrap().len()
537    }
538
539    pub fn is_empty(&self) -> bool {
540        self.index.lock().unwrap().is_empty()
541    }
542
543    pub fn total_entries(&self) -> usize {
544        self.index
545            .lock()
546            .unwrap()
547            .iter()
548            .map(|(_, entries)| entries.len())
549            .sum()
550    }
551
552    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
553        let mut cache = self.index.lock().unwrap();
554        let mut total_removed = 0;
555
556        for (_, entries) in cache.iter_mut() {
557            let original_len = entries.len();
558            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
559            total_removed += original_len - entries.len();
560        }
561
562        total_removed
563    }
564}
565
566#[derive(Debug)]
567pub struct PdaReverseLookup {
568    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
569    index: LruCache<String, String>,
570}
571
572impl PdaReverseLookup {
573    pub fn new(capacity: usize) -> Self {
574        PdaReverseLookup {
575            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
576        }
577    }
578
579    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
580        self.index.get(pda_address).cloned()
581    }
582
583    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
584        let evicted = if self.index.len() >= self.index.cap().get() {
585            self.index.peek_lru().map(|(k, _)| k.clone())
586        } else {
587            None
588        };
589
590        self.index.put(pda_address, seed_value);
591        evicted
592    }
593
594    pub fn len(&self) -> usize {
595        self.index.len()
596    }
597
598    pub fn is_empty(&self) -> bool {
599        self.index.is_empty()
600    }
601
602    pub fn contains(&self, pda_address: &str) -> bool {
603        self.index.peek(pda_address).is_some()
604    }
605}
606
607/// Input for queueing an account update.
608#[derive(Debug, Clone)]
609pub struct QueuedAccountUpdate {
610    pub pda_address: String,
611    pub account_type: String,
612    pub account_data: Value,
613    pub slot: u64,
614    pub write_version: u64,
615    pub signature: String,
616}
617
618/// Internal representation of a pending account update with queue metadata.
619#[derive(Debug, Clone)]
620pub struct PendingAccountUpdate {
621    pub account_type: String,
622    pub pda_address: String,
623    pub account_data: Value,
624    pub slot: u64,
625    pub write_version: u64,
626    pub signature: String,
627    pub queued_at: i64,
628    /// When true, this update was pulled from the `last_account_data` cache
629    /// during a PDA mapping change. It carries stale data from a previous
630    /// entity mapping and should use `UpdateContext::new_reprocessed` to
631    /// prevent `when` guards from matching stale instruction signatures
632    /// and to skip resolver scheduling.
633    pub is_stale_reprocess: bool,
634}
635
636/// Input for queueing an instruction event when PDA lookup fails.
637#[derive(Debug, Clone)]
638pub struct QueuedInstructionEvent {
639    pub pda_address: String,
640    pub event_type: String,
641    pub event_data: Value,
642    pub slot: u64,
643    pub signature: String,
644}
645
646/// Internal representation of a pending instruction event with queue metadata.
647#[derive(Debug, Clone)]
648pub struct PendingInstructionEvent {
649    pub event_type: String,
650    pub pda_address: String,
651    pub event_data: Value,
652    pub slot: u64,
653    pub signature: String,
654    pub queued_at: i64,
655}
656
657#[derive(Debug, Clone)]
658pub struct DeferredWhenOperation {
659    pub entity_name: String,
660    pub primary_key: Value,
661    pub field_path: String,
662    pub field_value: Value,
663    pub when_instruction: String,
664    pub signature: String,
665    pub slot: u64,
666    pub deferred_at: i64,
667    pub emit: bool,
668}
669
670#[derive(Debug, Clone)]
671pub struct ResolverRequest {
672    pub cache_key: String,
673    pub resolver: ResolverType,
674    pub input: Value,
675}
676
677#[derive(Debug, Clone)]
678pub struct ResolverTarget {
679    pub state_id: u32,
680    pub entity_name: String,
681    pub primary_key: Value,
682    pub extracts: Vec<ResolverExtractSpec>,
683}
684
685#[derive(Debug, Clone)]
686pub struct PendingResolverEntry {
687    pub resolver: ResolverType,
688    pub input: Value,
689    pub targets: Vec<ResolverTarget>,
690    pub queued_at: i64,
691}
692
693#[derive(Debug, Clone, PartialEq)]
694pub struct ScheduledCallback {
695    pub state_id: u32,
696    pub entity_name: String,
697    pub primary_key: Value,
698    pub resolver: ResolverType,
699    pub url_template: Option<Vec<ast::UrlTemplatePart>>,
700    pub input_value: Option<Value>,
701    pub input_path: Option<String>,
702    pub condition: Option<ast::ResolverCondition>,
703    pub strategy: ResolveStrategy,
704    pub extracts: Vec<ResolverExtractSpec>,
705    pub retry_count: u32,
706}
707
708impl PendingResolverEntry {
709    fn add_target(&mut self, target: ResolverTarget) {
710        if let Some(existing) = self.targets.iter_mut().find(|t| {
711            t.state_id == target.state_id
712                && t.entity_name == target.entity_name
713                && t.primary_key == target.primary_key
714        }) {
715            let mut seen = HashSet::new();
716            for extract in &existing.extracts {
717                seen.insert((extract.target_path.clone(), extract.source_path.clone()));
718            }
719
720            for extract in target.extracts {
721                let key = (extract.target_path.clone(), extract.source_path.clone());
722                if seen.insert(key) {
723                    existing.extracts.push(extract);
724                }
725            }
726        } else {
727            self.targets.push(target);
728        }
729    }
730}
731
732#[derive(Debug, Clone)]
733pub struct PendingQueueStats {
734    pub total_updates: usize,
735    pub unique_pdas: usize,
736    pub oldest_age_seconds: i64,
737    pub largest_pda_queue_size: usize,
738    pub estimated_memory_bytes: usize,
739}
740
741#[derive(Debug, Clone, Default)]
742pub struct VmMemoryStats {
743    pub state_table_entity_count: usize,
744    pub state_table_max_entries: usize,
745    pub state_table_at_capacity: bool,
746    pub lookup_index_count: usize,
747    pub lookup_index_total_entries: usize,
748    pub temporal_index_count: usize,
749    pub temporal_index_total_entries: usize,
750    pub pda_reverse_lookup_count: usize,
751    pub pda_reverse_lookup_total_entries: usize,
752    pub version_tracker_entries: usize,
753    pub pending_queue_stats: Option<PendingQueueStats>,
754    pub path_cache_size: usize,
755}
756
757#[derive(Debug, Clone, Default)]
758pub struct CleanupResult {
759    pub pending_updates_removed: usize,
760    pub temporal_entries_removed: usize,
761}
762
763#[derive(Debug, Clone)]
764pub struct CapacityWarning {
765    pub current_entries: usize,
766    pub max_entries: usize,
767    pub entries_over_limit: usize,
768}
769
770#[derive(Debug, Clone)]
771pub struct StateTableConfig {
772    pub max_entries: usize,
773    pub max_array_length: usize,
774}
775
776impl Default for StateTableConfig {
777    fn default() -> Self {
778        Self {
779            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
780            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
781        }
782    }
783}
784
785#[derive(Debug)]
786pub struct VersionTracker {
787    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
788}
789
790impl VersionTracker {
791    pub fn new() -> Self {
792        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
793    }
794
795    pub fn with_capacity(capacity: usize) -> Self {
796        VersionTracker {
797            cache: std::sync::Mutex::new(LruCache::new(
798                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
799            )),
800        }
801    }
802
803    fn make_key(primary_key: &Value, event_type: &str) -> String {
804        format!("{}:{}", primary_key, event_type)
805    }
806
807    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
808        let key = Self::make_key(primary_key, event_type);
809        self.cache.lock().unwrap().get(&key).copied()
810    }
811
812    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
813        let key = Self::make_key(primary_key, event_type);
814        self.cache.lock().unwrap().put(key, (slot, ordering_value));
815    }
816
817    pub fn len(&self) -> usize {
818        self.cache.lock().unwrap().len()
819    }
820
821    pub fn is_empty(&self) -> bool {
822        self.cache.lock().unwrap().is_empty()
823    }
824}
825
826impl Default for VersionTracker {
827    fn default() -> Self {
828        Self::new()
829    }
830}
831
832#[derive(Debug)]
833pub struct StateTable {
834    pub data: DashMap<Value, Value>,
835    access_times: DashMap<Value, i64>,
836    pub lookup_indexes: HashMap<String, LookupIndex>,
837    pub temporal_indexes: HashMap<String, TemporalIndex>,
838    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
839    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
840    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
841    /// Cache of the most recent account data per PDA address.  When a PDA
842    /// mapping changes (same PDA, different seed) the cached data is returned
843    /// for reprocessing so cross-account Lookup handlers resolve to the new key.
844    pub last_account_data: DashMap<String, PendingAccountUpdate>,
845    version_tracker: VersionTracker,
846    instruction_dedup_cache: VersionTracker,
847    config: StateTableConfig,
848    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
849    entity_name: String,
850    pub recent_tx_instructions:
851        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
852    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
853}
854
855impl StateTable {
856    pub fn is_at_capacity(&self) -> bool {
857        self.data.len() >= self.config.max_entries
858    }
859
860    pub fn entries_over_limit(&self) -> usize {
861        self.data.len().saturating_sub(self.config.max_entries)
862    }
863
864    pub fn max_array_length(&self) -> usize {
865        self.config.max_array_length
866    }
867
868    fn touch(&self, key: &Value) {
869        let now = std::time::SystemTime::now()
870            .duration_since(std::time::UNIX_EPOCH)
871            .unwrap()
872            .as_secs() as i64;
873        self.access_times.insert(key.clone(), now);
874    }
875
876    fn evict_lru(&self, count: usize) -> usize {
877        if count == 0 || self.data.is_empty() {
878            return 0;
879        }
880
881        let mut entries: Vec<(Value, i64)> = self
882            .access_times
883            .iter()
884            .map(|entry| (entry.key().clone(), *entry.value()))
885            .collect();
886
887        entries.sort_by_key(|(_, ts)| *ts);
888
889        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
890
891        let mut evicted = 0;
892        for key in to_evict {
893            self.data.remove(&key);
894            self.access_times.remove(&key);
895            evicted += 1;
896        }
897
898        #[cfg(feature = "otel")]
899        if evicted > 0 {
900            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
901        }
902
903        evicted
904    }
905
906    pub fn insert_with_eviction(&self, key: Value, value: Value) {
907        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
908            #[cfg(feature = "otel")]
909            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
910            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
911            self.evict_lru(to_evict.max(1));
912        }
913        self.data.insert(key.clone(), value);
914        self.touch(&key);
915    }
916
917    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
918        let result = self.data.get(key).map(|v| v.clone());
919        if result.is_some() {
920            self.touch(key);
921        }
922        result
923    }
924
925    /// Check if an update is fresh and update the version tracker.
926    /// Returns true if the update should be processed (is fresh).
927    /// Returns false if the update is stale and should be skipped.
928    ///
929    /// Comparison is lexicographic on (slot, ordering_value):
930    /// (100, 5) > (100, 3) > (99, 999)
931    pub fn is_fresh_update(
932        &self,
933        primary_key: &Value,
934        event_type: &str,
935        slot: u64,
936        ordering_value: u64,
937    ) -> bool {
938        let dominated = self
939            .version_tracker
940            .get(primary_key, event_type)
941            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
942            .unwrap_or(false);
943
944        if dominated {
945            return false;
946        }
947
948        self.version_tracker
949            .insert(primary_key, event_type, slot, ordering_value);
950        true
951    }
952
953    /// Check if an instruction is a duplicate of one we've seen recently.
954    /// Returns true if this exact instruction has been seen before (is a duplicate).
955    /// Returns false if this is a new instruction that should be processed.
956    ///
957    /// Unlike account updates, instructions don't use recency checks - all
958    /// unique instructions are processed. Only exact duplicates are skipped.
959    /// Uses a smaller cache capacity for shorter effective TTL.
960    pub fn is_duplicate_instruction(
961        &self,
962        primary_key: &Value,
963        event_type: &str,
964        slot: u64,
965        txn_index: u64,
966    ) -> bool {
967        // Check if we've seen this exact instruction before
968        let is_duplicate = self
969            .instruction_dedup_cache
970            .get(primary_key, event_type)
971            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
972            .unwrap_or(false);
973
974        if is_duplicate {
975            return true;
976        }
977
978        // Record this instruction for deduplication
979        self.instruction_dedup_cache
980            .insert(primary_key, event_type, slot, txn_index);
981        false
982    }
983}
984
985impl VmContext {
986    pub fn new() -> Self {
987        let mut vm = VmContext {
988            registers: vec![Value::Null; 256],
989            states: HashMap::new(),
990            instructions_executed: 0,
991            cache_hits: 0,
992            path_cache: HashMap::new(),
993            pda_cache_hits: 0,
994            pda_cache_misses: 0,
995            pending_queue_size: 0,
996            resolver_requests: VecDeque::new(),
997            resolver_pending: HashMap::new(),
998            resolver_cache: LruCache::new(resolver_cache_capacity()),
999            resolver_cache_hits: 0,
1000            resolver_cache_misses: 0,
1001            current_context: None,
1002            warnings: Vec::new(),
1003            last_pda_lookup_miss: None,
1004            last_lookup_index_miss: None,
1005            last_pda_registered: None,
1006            last_lookup_index_keys: Vec::new(),
1007            scheduled_callbacks: Vec::new(),
1008        };
1009        vm.states.insert(
1010            0,
1011            StateTable {
1012                data: DashMap::new(),
1013                access_times: DashMap::new(),
1014                lookup_indexes: HashMap::new(),
1015                temporal_indexes: HashMap::new(),
1016                pda_reverse_lookups: HashMap::new(),
1017                pending_updates: DashMap::new(),
1018                pending_instruction_events: DashMap::new(),
1019                last_account_data: DashMap::new(),
1020                version_tracker: VersionTracker::new(),
1021                instruction_dedup_cache: VersionTracker::with_capacity(
1022                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1023                ),
1024                config: StateTableConfig::default(),
1025                entity_name: String::new(),
1026                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1027                    NonZeroUsize::new(1000).unwrap(),
1028                )),
1029                deferred_when_ops: DashMap::new(),
1030            },
1031        );
1032
1033        vm
1034    }
1035
1036    /// Create a new VmContext specifically for multi-entity operation.
1037    pub fn new_multi_entity() -> Self {
1038        VmContext {
1039            registers: vec![Value::Null; 256],
1040            states: HashMap::new(),
1041            instructions_executed: 0,
1042            cache_hits: 0,
1043            path_cache: HashMap::new(),
1044            pda_cache_hits: 0,
1045            pda_cache_misses: 0,
1046            pending_queue_size: 0,
1047            resolver_requests: VecDeque::new(),
1048            resolver_pending: HashMap::new(),
1049            resolver_cache: LruCache::new(resolver_cache_capacity()),
1050            resolver_cache_hits: 0,
1051            resolver_cache_misses: 0,
1052            current_context: None,
1053            warnings: Vec::new(),
1054            last_pda_lookup_miss: None,
1055            last_lookup_index_miss: None,
1056            last_pda_registered: None,
1057            last_lookup_index_keys: Vec::new(),
1058            scheduled_callbacks: Vec::new(),
1059        }
1060    }
1061
1062    pub fn new_with_config(state_config: StateTableConfig) -> Self {
1063        let mut vm = VmContext {
1064            registers: vec![Value::Null; 256],
1065            states: HashMap::new(),
1066            instructions_executed: 0,
1067            cache_hits: 0,
1068            path_cache: HashMap::new(),
1069            pda_cache_hits: 0,
1070            pda_cache_misses: 0,
1071            pending_queue_size: 0,
1072            resolver_requests: VecDeque::new(),
1073            resolver_pending: HashMap::new(),
1074            resolver_cache: LruCache::new(resolver_cache_capacity()),
1075            resolver_cache_hits: 0,
1076            resolver_cache_misses: 0,
1077            current_context: None,
1078            warnings: Vec::new(),
1079            last_pda_lookup_miss: None,
1080            last_lookup_index_miss: None,
1081            last_pda_registered: None,
1082            last_lookup_index_keys: Vec::new(),
1083            scheduled_callbacks: Vec::new(),
1084        };
1085        vm.states.insert(
1086            0,
1087            StateTable {
1088                data: DashMap::new(),
1089                access_times: DashMap::new(),
1090                lookup_indexes: HashMap::new(),
1091                temporal_indexes: HashMap::new(),
1092                pda_reverse_lookups: HashMap::new(),
1093                pending_updates: DashMap::new(),
1094                pending_instruction_events: DashMap::new(),
1095                last_account_data: DashMap::new(),
1096                version_tracker: VersionTracker::new(),
1097                instruction_dedup_cache: VersionTracker::with_capacity(
1098                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1099                ),
1100                config: state_config,
1101                entity_name: "default".to_string(),
1102                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1103                    NonZeroUsize::new(1000).unwrap(),
1104                )),
1105                deferred_when_ops: DashMap::new(),
1106            },
1107        );
1108        vm
1109    }
1110
1111    pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1112        self.resolver_requests.drain(..).collect()
1113    }
1114
1115    pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1116        std::mem::take(&mut self.scheduled_callbacks)
1117    }
1118
1119    pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1120        self.states.get(&state_id)?.get_and_touch(key)
1121    }
1122
1123    pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1124        if requests.is_empty() {
1125            return;
1126        }
1127
1128        self.resolver_requests.extend(requests);
1129    }
1130
1131    pub(crate) fn get_cached_resolver_value(&mut self, cache_key: &str) -> Option<Value> {
1132        let cached = self.resolver_cache.get(cache_key).cloned();
1133
1134        match cached {
1135            Some(entry) if entry.cached_at.elapsed() <= resolver_cache_ttl() => {
1136                self.resolver_cache_hits += 1;
1137                Some(entry.value)
1138            }
1139            Some(_) => {
1140                self.resolver_cache.pop(cache_key);
1141                self.resolver_cache_misses += 1;
1142                None
1143            }
1144            None => {
1145                self.resolver_cache_misses += 1;
1146                None
1147            }
1148        }
1149    }
1150
1151    fn cache_resolver_value(
1152        &mut self,
1153        resolver: &ResolverType,
1154        input: &Value,
1155        resolved_value: &Value,
1156    ) {
1157        self.resolver_cache.put(
1158            resolver_cache_key(resolver, input),
1159            ResolverCacheEntry {
1160                value: resolved_value.clone(),
1161                cached_at: Instant::now(),
1162            },
1163        );
1164    }
1165
1166    pub fn apply_resolver_result(
1167        &mut self,
1168        bytecode: &MultiEntityBytecode,
1169        cache_key: &str,
1170        resolved_value: Value,
1171    ) -> Result<Vec<Mutation>> {
1172        let entry = match self.resolver_pending.remove(cache_key) {
1173            Some(entry) => entry,
1174            None => return Ok(Vec::new()),
1175        };
1176
1177        self.cache_resolver_value(&entry.resolver, &entry.input, &resolved_value);
1178
1179        let mut mutations = Vec::new();
1180
1181        for target in entry.targets {
1182            if target.primary_key.is_null() {
1183                continue;
1184            }
1185
1186            let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1187                Some(bc) => bc,
1188                None => continue,
1189            };
1190
1191            let state = match self.states.get(&target.state_id) {
1192                Some(s) => s,
1193                None => continue,
1194            };
1195
1196            let mut entity_state = state
1197                .get_and_touch(&target.primary_key)
1198                .unwrap_or_else(|| json!({}));
1199
1200            let mut dirty_tracker = DirtyTracker::new();
1201            let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1202
1203            Self::apply_resolver_extractions_to_value(
1204                &mut entity_state,
1205                &resolved_value,
1206                &target.extracts,
1207                &mut dirty_tracker,
1208                &should_emit,
1209            )?;
1210
1211            if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1212                let old_values: Vec<_> = entity_bytecode
1213                    .computed_paths
1214                    .iter()
1215                    .map(|path| Self::get_value_at_path(&entity_state, path))
1216                    .collect();
1217
1218                let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1219                let context_timestamp = self
1220                    .current_context
1221                    .as_ref()
1222                    .map(|c| c.timestamp())
1223                    .unwrap_or_else(|| {
1224                        std::time::SystemTime::now()
1225                            .duration_since(std::time::UNIX_EPOCH)
1226                            .unwrap()
1227                            .as_secs() as i64
1228                    });
1229                let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1230
1231                if eval_result.is_ok() {
1232                    let mut changed_fields = Vec::new();
1233                    for (path, old_value) in
1234                        entity_bytecode.computed_paths.iter().zip(old_values.iter())
1235                    {
1236                        let new_value = Self::get_value_at_path(&entity_state, path);
1237                        let changed = new_value != *old_value;
1238                        let will_emit = should_emit(path);
1239
1240                        if changed && will_emit {
1241                            dirty_tracker.mark_replaced(path);
1242                            changed_fields.push(path.clone());
1243                        }
1244                    }
1245                }
1246            }
1247
1248            state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1249
1250            if dirty_tracker.is_empty() {
1251                continue;
1252            }
1253
1254            let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1255
1256            mutations.push(Mutation {
1257                export: target.entity_name.clone(),
1258                key: target.primary_key.clone(),
1259                patch,
1260                append: vec![],
1261            });
1262        }
1263
1264        Ok(mutations)
1265    }
1266
1267    pub fn enqueue_resolver_request(
1268        &mut self,
1269        _cache_key: String,
1270        resolver: ResolverType,
1271        input: Value,
1272        target: ResolverTarget,
1273    ) {
1274        let cache_key = resolver_cache_key(&resolver, &input);
1275
1276        if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1277            entry.add_target(target);
1278            return;
1279        }
1280
1281        let queued_at = std::time::SystemTime::now()
1282            .duration_since(std::time::UNIX_EPOCH)
1283            .unwrap()
1284            .as_secs() as i64;
1285
1286        self.resolver_pending.insert(
1287            cache_key.clone(),
1288            PendingResolverEntry {
1289                resolver: resolver.clone(),
1290                input: input.clone(),
1291                targets: vec![target],
1292                queued_at,
1293            },
1294        );
1295
1296        self.resolver_requests.push_back(ResolverRequest {
1297            cache_key,
1298            resolver,
1299            input,
1300        });
1301    }
1302
1303    fn apply_resolver_extractions_to_value<F>(
1304        state: &mut Value,
1305        resolved_value: &Value,
1306        extracts: &[ResolverExtractSpec],
1307        dirty_tracker: &mut DirtyTracker,
1308        should_emit: &F,
1309    ) -> Result<()>
1310    where
1311        F: Fn(&str) -> bool,
1312    {
1313        for extract in extracts {
1314            let resolved = match extract.source_path.as_deref() {
1315                Some(path) => Self::get_value_at_path(resolved_value, path),
1316                None => Some(resolved_value.clone()),
1317            };
1318
1319            let Some(value) = resolved else {
1320                continue;
1321            };
1322
1323            let value = if let Some(transform) = &extract.transform {
1324                Self::apply_transformation(&value, transform)?
1325            } else {
1326                value
1327            };
1328
1329            Self::set_nested_field_value(state, &extract.target_path, value)?;
1330            if should_emit(&extract.target_path) {
1331                dirty_tracker.mark_replaced(&extract.target_path);
1332            }
1333        }
1334
1335        Ok(())
1336    }
1337
1338    fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1339        if tracker.is_empty() {
1340            return Ok(json!({}));
1341        }
1342
1343        let mut partial = serde_json::Map::new();
1344
1345        for (path, change) in tracker.iter() {
1346            let segments: Vec<&str> = path.split('.').collect();
1347
1348            let value_to_insert = match change {
1349                FieldChange::Replaced => {
1350                    let mut current = state;
1351                    let mut found = true;
1352
1353                    for segment in &segments {
1354                        match current.get(*segment) {
1355                            Some(v) => current = v,
1356                            None => {
1357                                found = false;
1358                                break;
1359                            }
1360                        }
1361                    }
1362
1363                    if !found {
1364                        continue;
1365                    }
1366                    current.clone()
1367                }
1368                FieldChange::Appended(values) => Value::Array(values.clone()),
1369            };
1370
1371            let mut target = &mut partial;
1372            for (i, segment) in segments.iter().enumerate() {
1373                if i == segments.len() - 1 {
1374                    target.insert(segment.to_string(), value_to_insert.clone());
1375                } else {
1376                    target
1377                        .entry(segment.to_string())
1378                        .or_insert_with(|| json!({}));
1379                    target = target
1380                        .get_mut(*segment)
1381                        .and_then(|v| v.as_object_mut())
1382                        .ok_or("Failed to build nested structure")?;
1383                }
1384            }
1385        }
1386
1387        Ok(Value::Object(partial))
1388    }
1389
1390    /// Get a mutable reference to a state table by ID
1391    /// Returns None if the state ID doesn't exist
1392    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1393        self.states.get_mut(&state_id)
1394    }
1395
1396    /// Get public access to registers (for metrics context)
1397    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1398        &mut self.registers
1399    }
1400
1401    /// Get public access to path cache (for metrics context)
1402    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1403        &self.path_cache
1404    }
1405
1406    /// Get the current update context
1407    pub fn current_context(&self) -> Option<&UpdateContext> {
1408        self.current_context.as_ref()
1409    }
1410
1411    /// Set the current update context for computed field evaluation
1412    pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1413        self.current_context = context;
1414    }
1415
1416    fn add_warning(&mut self, msg: String) {
1417        self.warnings.push(msg);
1418    }
1419
1420    pub fn take_warnings(&mut self) -> Vec<String> {
1421        std::mem::take(&mut self.warnings)
1422    }
1423
1424    pub fn has_warnings(&self) -> bool {
1425        !self.warnings.is_empty()
1426    }
1427
1428    pub fn update_state_from_register(
1429        &mut self,
1430        state_id: u32,
1431        key: Value,
1432        register: Register,
1433    ) -> Result<()> {
1434        let state = self.states.get(&state_id).ok_or("State table not found")?;
1435        let value = self.registers[register].clone();
1436        state.insert_with_eviction(key, value);
1437        Ok(())
1438    }
1439
1440    fn reset_registers(&mut self) {
1441        for reg in &mut self.registers {
1442            *reg = Value::Null;
1443        }
1444    }
1445
1446    /// Extract only the dirty fields from state (public for use by instruction hooks)
1447    pub fn extract_partial_state(
1448        &self,
1449        state_reg: Register,
1450        dirty_fields: &HashSet<String>,
1451    ) -> Result<Value> {
1452        let full_state = &self.registers[state_reg];
1453
1454        if dirty_fields.is_empty() {
1455            return Ok(json!({}));
1456        }
1457
1458        let mut partial = serde_json::Map::new();
1459
1460        for path in dirty_fields {
1461            let segments: Vec<&str> = path.split('.').collect();
1462
1463            let mut current = full_state;
1464            let mut found = true;
1465
1466            for segment in &segments {
1467                match current.get(segment) {
1468                    Some(v) => current = v,
1469                    None => {
1470                        found = false;
1471                        break;
1472                    }
1473                }
1474            }
1475
1476            if !found {
1477                continue;
1478            }
1479
1480            let mut target = &mut partial;
1481            for (i, segment) in segments.iter().enumerate() {
1482                if i == segments.len() - 1 {
1483                    target.insert(segment.to_string(), current.clone());
1484                } else {
1485                    target
1486                        .entry(segment.to_string())
1487                        .or_insert_with(|| json!({}));
1488                    target = target
1489                        .get_mut(*segment)
1490                        .and_then(|v| v.as_object_mut())
1491                        .ok_or("Failed to build nested structure")?;
1492                }
1493            }
1494        }
1495
1496        Ok(Value::Object(partial))
1497    }
1498
1499    /// Extract a patch from state based on the DirtyTracker.
1500    /// For Replaced fields: extracts the full value from state.
1501    /// For Appended fields: emits only the appended values as an array.
1502    pub fn extract_partial_state_with_tracker(
1503        &self,
1504        state_reg: Register,
1505        tracker: &DirtyTracker,
1506    ) -> Result<Value> {
1507        let full_state = &self.registers[state_reg];
1508
1509        if tracker.is_empty() {
1510            return Ok(json!({}));
1511        }
1512
1513        let mut partial = serde_json::Map::new();
1514
1515        for (path, change) in tracker.iter() {
1516            let segments: Vec<&str> = path.split('.').collect();
1517
1518            let value_to_insert = match change {
1519                FieldChange::Replaced => {
1520                    let mut current = full_state;
1521                    let mut found = true;
1522
1523                    for segment in &segments {
1524                        match current.get(*segment) {
1525                            Some(v) => current = v,
1526                            None => {
1527                                found = false;
1528                                break;
1529                            }
1530                        }
1531                    }
1532
1533                    if !found {
1534                        continue;
1535                    }
1536                    current.clone()
1537                }
1538                FieldChange::Appended(values) => Value::Array(values.clone()),
1539            };
1540
1541            let mut target = &mut partial;
1542            for (i, segment) in segments.iter().enumerate() {
1543                if i == segments.len() - 1 {
1544                    target.insert(segment.to_string(), value_to_insert.clone());
1545                } else {
1546                    target
1547                        .entry(segment.to_string())
1548                        .or_insert_with(|| json!({}));
1549                    target = target
1550                        .get_mut(*segment)
1551                        .and_then(|v| v.as_object_mut())
1552                        .ok_or("Failed to build nested structure")?;
1553                }
1554            }
1555        }
1556
1557        Ok(Value::Object(partial))
1558    }
1559
1560    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1561        if let Some(compiled) = self.path_cache.get(path) {
1562            self.cache_hits += 1;
1563            #[cfg(feature = "otel")]
1564            crate::vm_metrics::record_path_cache_hit();
1565            return compiled.clone();
1566        }
1567        #[cfg(feature = "otel")]
1568        crate::vm_metrics::record_path_cache_miss();
1569        let compiled = CompiledPath::new(path);
1570        self.path_cache.insert(path.to_string(), compiled.clone());
1571        compiled
1572    }
1573
1574    /// Process an event with optional context metadata
1575    #[cfg_attr(feature = "otel", instrument(
1576        name = "vm.process_event",
1577        skip(self, bytecode, event_value, log),
1578        level = "info",
1579        fields(
1580            event_type = %event_type,
1581            slot = context.as_ref().and_then(|c| c.slot),
1582        )
1583    ))]
1584    pub fn process_event(
1585        &mut self,
1586        bytecode: &MultiEntityBytecode,
1587        event_value: Value,
1588        event_type: &str,
1589        context: Option<&UpdateContext>,
1590        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1591    ) -> Result<Vec<Mutation>> {
1592        self.current_context = context.cloned();
1593
1594        let mut event_value = event_value;
1595        if let Some(ctx) = context {
1596            if let Some(obj) = event_value.as_object_mut() {
1597                obj.insert("__update_context".to_string(), ctx.to_value());
1598            }
1599        }
1600
1601        let mut all_mutations = Vec::new();
1602
1603        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1604            if let Some(ctx) = context {
1605                if let Some(signature) = ctx.signature.clone() {
1606                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1607                    for state_id in state_ids {
1608                        if let Some(state) = self.states.get(&state_id) {
1609                            {
1610                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1611                                let entry =
1612                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1613                                entry.insert(event_type.to_string());
1614                            }
1615
1616                            let key = (signature.clone(), event_type.to_string());
1617                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1618                                tracing::debug!(
1619                                    event_type = %event_type,
1620                                    signature = %signature,
1621                                    deferred_count = deferred_ops.len(),
1622                                    "flushing deferred when-ops"
1623                                );
1624                                for op in deferred_ops {
1625                                    // Look up the entity bytecode to get the computed fields evaluator
1626                                    let (evaluator, computed_paths) = bytecode
1627                                        .entities
1628                                        .get(&op.entity_name)
1629                                        .map(|eb| {
1630                                            (
1631                                                eb.computed_fields_evaluator.as_ref(),
1632                                                eb.computed_paths.as_slice(),
1633                                            )
1634                                        })
1635                                        .unwrap_or((None, &[]));
1636
1637                                    match self.apply_deferred_when_op(
1638                                        state_id,
1639                                        &op,
1640                                        evaluator,
1641                                        Some(computed_paths),
1642                                    ) {
1643                                        Ok(mutations) => all_mutations.extend(mutations),
1644                                        Err(e) => tracing::warn!(
1645                                            "Failed to apply deferred when-op: {}",
1646                                            e
1647                                        ),
1648                                    }
1649                                }
1650                            }
1651                        }
1652                    }
1653                }
1654            }
1655        }
1656
1657        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1658            for entity_name in entity_names {
1659                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1660                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1661                        if let Some(ref mut log) = log {
1662                            log.set("entity", entity_name.clone());
1663                            log.inc("handlers", 1);
1664                        }
1665
1666                        let opcodes_before = self.instructions_executed;
1667                        let cache_before = self.cache_hits;
1668                        let pda_hits_before = self.pda_cache_hits;
1669                        let pda_misses_before = self.pda_cache_misses;
1670
1671                        let mutations = self.execute_handler(
1672                            handler,
1673                            &event_value,
1674                            event_type,
1675                            entity_bytecode.state_id,
1676                            entity_name,
1677                            entity_bytecode.computed_fields_evaluator.as_ref(),
1678                            Some(&entity_bytecode.non_emitted_fields),
1679                        )?;
1680
1681                        if let Some(ref mut log) = log {
1682                            log.inc(
1683                                "opcodes",
1684                                (self.instructions_executed - opcodes_before) as i64,
1685                            );
1686                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1687                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1688                            log.inc(
1689                                "pda_misses",
1690                                (self.pda_cache_misses - pda_misses_before) as i64,
1691                            );
1692                        }
1693
1694                        if mutations.is_empty() {
1695                            // CPI events (suffix "CpiEvent") are transaction-scoped like instructions
1696                            // (suffix "IxState") and should be queued the same way when PDA lookup fails.
1697                            let is_tx_event =
1698                                event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1699                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1700                                if is_tx_event {
1701                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1702                                    let signature = context
1703                                        .and_then(|c| c.signature.clone())
1704                                        .unwrap_or_default();
1705                                    let _ = self.queue_instruction_event(
1706                                        entity_bytecode.state_id,
1707                                        QueuedInstructionEvent {
1708                                            pda_address: missed_pda,
1709                                            event_type: event_type.to_string(),
1710                                            event_data: event_value.clone(),
1711                                            slot,
1712                                            signature,
1713                                        },
1714                                    );
1715                                } else {
1716                                    // Queue account updates (e.g. BondingCurve) when PDA
1717                                    // reverse lookup fails. These will be flushed when an
1718                                    // instruction registers the PDA mapping via
1719                                    // UpdateLookupIndex.
1720                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1721                                    let signature = context
1722                                        .and_then(|c| c.signature.clone())
1723                                        .unwrap_or_default();
1724                                    if let Some(write_version) =
1725                                        context.and_then(|c| c.write_version)
1726                                    {
1727                                        let _ = self.queue_account_update(
1728                                            entity_bytecode.state_id,
1729                                            QueuedAccountUpdate {
1730                                                pda_address: missed_pda,
1731                                                account_type: event_type.to_string(),
1732                                                account_data: event_value.clone(),
1733                                                slot,
1734                                                write_version,
1735                                                signature,
1736                                            },
1737                                        );
1738                                    } else {
1739                                        tracing::warn!(
1740                                            event_type = %event_type,
1741                                            "Dropping queued account update: write_version missing from context"
1742                                        );
1743                                    }
1744                                }
1745                            }
1746                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1747                                if !is_tx_event {
1748                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1749                                    let signature = context
1750                                        .and_then(|c| c.signature.clone())
1751                                        .unwrap_or_default();
1752                                    if let Some(write_version) =
1753                                        context.and_then(|c| c.write_version)
1754                                    {
1755                                        let _ = self.queue_account_update(
1756                                            entity_bytecode.state_id,
1757                                            QueuedAccountUpdate {
1758                                                pda_address: missed_lookup,
1759                                                account_type: event_type.to_string(),
1760                                                account_data: event_value.clone(),
1761                                                slot,
1762                                                write_version,
1763                                                signature,
1764                                            },
1765                                        );
1766                                    } else {
1767                                        tracing::trace!(
1768                                            event_type = %event_type,
1769                                            "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1770                                        );
1771                                    }
1772                                }
1773                            }
1774                        }
1775
1776                        all_mutations.extend(mutations);
1777
1778                        if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1779                            if let Some(ctx) = context {
1780                                if let Some(ref signature) = ctx.signature {
1781                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1782                                    {
1783                                        {
1784                                            let mut cache =
1785                                                state.recent_tx_instructions.lock().unwrap();
1786                                            let entry = cache
1787                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1788                                            entry.insert(event_type.to_string());
1789                                        }
1790
1791                                        let key = (signature.clone(), event_type.to_string());
1792                                        if let Some((_, deferred_ops)) =
1793                                            state.deferred_when_ops.remove(&key)
1794                                        {
1795                                            tracing::debug!(
1796                                                event_type = %event_type,
1797                                                signature = %signature,
1798                                                deferred_count = deferred_ops.len(),
1799                                                "flushing deferred when-ops"
1800                                            );
1801                                            for op in deferred_ops {
1802                                                match self.apply_deferred_when_op(
1803                                                    entity_bytecode.state_id,
1804                                                    &op,
1805                                                    entity_bytecode
1806                                                        .computed_fields_evaluator
1807                                                        .as_ref(),
1808                                                    Some(&entity_bytecode.computed_paths),
1809                                                ) {
1810                                                    Ok(mutations) => {
1811                                                        all_mutations.extend(mutations)
1812                                                    }
1813                                                    Err(e) => {
1814                                                        tracing::warn!(
1815                                                            "Failed to apply deferred when-op: {}",
1816                                                            e
1817                                                        );
1818                                                    }
1819                                                }
1820                                            }
1821                                        }
1822                                    }
1823                                }
1824                            }
1825                        }
1826
1827                        if let Some(registered_pda) = self.take_last_pda_registered() {
1828                            let pending_events = self.flush_pending_instruction_events(
1829                                entity_bytecode.state_id,
1830                                &registered_pda,
1831                            );
1832                            for pending in pending_events {
1833                                if let Some(pending_handler) =
1834                                    entity_bytecode.handlers.get(&pending.event_type)
1835                                {
1836                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1837                                        pending_handler,
1838                                        &pending.event_data,
1839                                        &pending.event_type,
1840                                        entity_bytecode.state_id,
1841                                        entity_name,
1842                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1843                                        Some(&entity_bytecode.non_emitted_fields),
1844                                    ) {
1845                                        all_mutations.extend(reprocessed_mutations);
1846                                    }
1847                                }
1848                            }
1849                        }
1850
1851                        let lookup_keys = self.take_last_lookup_index_keys();
1852                        if !lookup_keys.is_empty() {
1853                            tracing::info!(
1854                                keys = ?lookup_keys,
1855                                entity = %entity_name,
1856                                "vm.process_event: flushing pending updates for lookup_keys"
1857                            );
1858                        }
1859                        for lookup_key in lookup_keys {
1860                            if let Ok(pending_updates) =
1861                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1862                            {
1863                                for pending in pending_updates {
1864                                    if let Some(pending_handler) =
1865                                        entity_bytecode.handlers.get(&pending.account_type)
1866                                    {
1867                                        self.current_context = Some(UpdateContext::new_account(
1868                                            pending.slot,
1869                                            pending.signature.clone(),
1870                                            pending.write_version,
1871                                        ));
1872                                        match self.execute_handler(
1873                                            pending_handler,
1874                                            &pending.account_data,
1875                                            &pending.account_type,
1876                                            entity_bytecode.state_id,
1877                                            entity_name,
1878                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1879                                            Some(&entity_bytecode.non_emitted_fields),
1880                                        ) {
1881                                            Ok(reprocessed) => {
1882                                                all_mutations.extend(reprocessed);
1883                                            }
1884                                            Err(e) => {
1885                                                tracing::warn!(
1886                                                    error = %e,
1887                                                    account_type = %pending.account_type,
1888                                                    "Flushed event reprocessing failed"
1889                                                );
1890                                            }
1891                                        }
1892                                    }
1893                                }
1894                            }
1895                        }
1896                    } else if let Some(ref mut log) = log {
1897                        log.set("skip_reason", "no_handler");
1898                    }
1899                } else if let Some(ref mut log) = log {
1900                    log.set("skip_reason", "entity_not_found");
1901                }
1902            }
1903        } else if let Some(ref mut log) = log {
1904            log.set("skip_reason", "no_event_routing");
1905        }
1906
1907        if let Some(log) = log {
1908            log.set("mutations", all_mutations.len() as i64);
1909            if let Some(first) = all_mutations.first() {
1910                if let Some(key_str) = first.key.as_str() {
1911                    log.set("primary_key", key_str);
1912                } else if let Some(key_num) = first.key.as_u64() {
1913                    log.set("primary_key", key_num as i64);
1914                }
1915            }
1916            if let Some(state) = self.states.get(&0) {
1917                log.set("state_table_size", state.data.len() as i64);
1918            }
1919
1920            let warnings = self.take_warnings();
1921            if !warnings.is_empty() {
1922                log.set("warnings", warnings.len() as i64);
1923                log.set(
1924                    "warning_messages",
1925                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1926                );
1927                log.set_level(crate::canonical_log::LogLevel::Warn);
1928            }
1929        } else {
1930            self.warnings.clear();
1931        }
1932
1933        if self.instructions_executed.is_multiple_of(1000) {
1934            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1935            for state_id in state_ids {
1936                let expired = self.cleanup_expired_when_ops(state_id, 60);
1937                if expired > 0 {
1938                    tracing::debug!(
1939                        "Cleaned up {} expired deferred when-ops for state {}",
1940                        expired,
1941                        state_id
1942                    );
1943                }
1944            }
1945        }
1946
1947        Ok(all_mutations)
1948    }
1949
1950    pub fn process_any(
1951        &mut self,
1952        bytecode: &MultiEntityBytecode,
1953        any: prost_types::Any,
1954    ) -> Result<Vec<Mutation>> {
1955        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1956        self.process_event(bytecode, event_value, &event_type, None, None)
1957    }
1958
1959    #[cfg_attr(feature = "otel", instrument(
1960        name = "vm.execute_handler",
1961        skip(self, handler, event_value, entity_evaluator),
1962        level = "debug",
1963        fields(
1964            event_type = %event_type,
1965            handler_opcodes = handler.len(),
1966        )
1967    ))]
1968    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1969    fn execute_handler(
1970        &mut self,
1971        handler: &[OpCode],
1972        event_value: &Value,
1973        event_type: &str,
1974        override_state_id: u32,
1975        entity_name: &str,
1976        entity_evaluator: Option<
1977            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1978        >,
1979        non_emitted_fields: Option<&HashSet<String>>,
1980    ) -> Result<Vec<Mutation>> {
1981        self.reset_registers();
1982        self.last_pda_lookup_miss = None;
1983
1984        let mut pc: usize = 0;
1985        let mut output = Vec::new();
1986        let mut dirty_tracker = DirtyTracker::new();
1987        let should_emit = |path: &str| {
1988            non_emitted_fields
1989                .map(|fields| !fields.contains(path))
1990                .unwrap_or(true)
1991        };
1992
1993        while pc < handler.len() {
1994            match &handler[pc] {
1995                OpCode::LoadEventField {
1996                    path,
1997                    dest,
1998                    default,
1999                } => {
2000                    let value = self.load_field(event_value, path, default.as_ref())?;
2001                    self.registers[*dest] = value;
2002                    pc += 1;
2003                }
2004                OpCode::LoadConstant { value, dest } => {
2005                    self.registers[*dest] = value.clone();
2006                    pc += 1;
2007                }
2008                OpCode::CopyRegister { source, dest } => {
2009                    self.registers[*dest] = self.registers[*source].clone();
2010                    pc += 1;
2011                }
2012                OpCode::CopyRegisterIfNull { source, dest } => {
2013                    if self.registers[*dest].is_null() {
2014                        self.registers[*dest] = self.registers[*source].clone();
2015                    }
2016                    pc += 1;
2017                }
2018                OpCode::GetEventType { dest } => {
2019                    self.registers[*dest] = json!(event_type);
2020                    pc += 1;
2021                }
2022                OpCode::CreateObject { dest } => {
2023                    self.registers[*dest] = json!({});
2024                    pc += 1;
2025                }
2026                OpCode::SetField {
2027                    object,
2028                    path,
2029                    value,
2030                } => {
2031                    self.set_field_auto_vivify(*object, path, *value)?;
2032                    if should_emit(path) {
2033                        dirty_tracker.mark_replaced(path);
2034                    }
2035                    pc += 1;
2036                }
2037                OpCode::SetFields { object, fields } => {
2038                    for (path, value_reg) in fields {
2039                        self.set_field_auto_vivify(*object, path, *value_reg)?;
2040                        if should_emit(path) {
2041                            dirty_tracker.mark_replaced(path);
2042                        }
2043                    }
2044                    pc += 1;
2045                }
2046                OpCode::GetField { object, path, dest } => {
2047                    let value = self.get_field(*object, path)?;
2048                    self.registers[*dest] = value;
2049                    pc += 1;
2050                }
2051                OpCode::AbortIfNullKey {
2052                    key,
2053                    is_account_event,
2054                } => {
2055                    let key_value = &self.registers[*key];
2056                    if key_value.is_null() && *is_account_event {
2057                        tracing::debug!(
2058                            event_type = %event_type,
2059                            "AbortIfNullKey: key is null for account state event, \
2060                             returning empty mutations for queueing"
2061                        );
2062                        return Ok(Vec::new());
2063                    }
2064
2065                    pc += 1;
2066                }
2067                OpCode::ReadOrInitState {
2068                    state_id: _,
2069                    key,
2070                    default,
2071                    dest,
2072                } => {
2073                    let actual_state_id = override_state_id;
2074                    let entity_name_owned = entity_name.to_string();
2075                    self.states
2076                        .entry(actual_state_id)
2077                        .or_insert_with(|| StateTable {
2078                            data: DashMap::new(),
2079                            access_times: DashMap::new(),
2080                            lookup_indexes: HashMap::new(),
2081                            temporal_indexes: HashMap::new(),
2082                            pda_reverse_lookups: HashMap::new(),
2083                            pending_updates: DashMap::new(),
2084                            pending_instruction_events: DashMap::new(),
2085                            last_account_data: DashMap::new(),
2086                            version_tracker: VersionTracker::new(),
2087                            instruction_dedup_cache: VersionTracker::with_capacity(
2088                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2089                            ),
2090                            config: StateTableConfig::default(),
2091                            entity_name: entity_name_owned,
2092                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2093                                NonZeroUsize::new(1000).unwrap(),
2094                            )),
2095                            deferred_when_ops: DashMap::new(),
2096                        });
2097                    let key_value = self.registers[*key].clone();
2098                    // Warn if key is null for account state events (not instruction events or CPI events)
2099                    let warn_null_key = key_value.is_null()
2100                        && event_type.ends_with("State")
2101                        && !event_type.ends_with("IxState")
2102                        && !event_type.ends_with("CpiEvent");
2103
2104                    if warn_null_key {
2105                        self.add_warning(format!(
2106                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2107                            key, event_type
2108                        ));
2109                    }
2110
2111                    let state = self
2112                        .states
2113                        .get(&actual_state_id)
2114                        .ok_or("State table not found")?;
2115
2116                    if !key_value.is_null() {
2117                        if let Some(ctx) = &self.current_context {
2118                            // Account updates: use recency check to discard stale updates
2119                            // IMPORTANT: Use account address (PDA) as the key, not entity key,
2120                            // because multiple accounts can update the same entity and each
2121                            // has its own independent write_version
2122                            if ctx.is_account_update() {
2123                                if let (Some(slot), Some(write_version)) =
2124                                    (ctx.slot, ctx.write_version)
2125                                {
2126                                    // Get account address from event_value for proper tracking
2127                                    let account_address = event_value
2128                                        .get("__account_address")
2129                                        .cloned()
2130                                        .unwrap_or_else(|| key_value.clone());
2131
2132                                    if !state.is_fresh_update(
2133                                        &account_address,
2134                                        event_type,
2135                                        slot,
2136                                        write_version,
2137                                    ) {
2138                                        self.add_warning(format!(
2139                                            "Stale account update skipped: slot={}, write_version={}, account={}",
2140                                            slot, write_version, account_address
2141                                        ));
2142                                        return Ok(Vec::new());
2143                                    }
2144                                }
2145                            }
2146                            // Instruction updates: process all, but skip exact duplicates
2147                            else if ctx.is_instruction_update() {
2148                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2149                                    if state.is_duplicate_instruction(
2150                                        &key_value, event_type, slot, txn_index,
2151                                    ) {
2152                                        self.add_warning(format!(
2153                                            "Duplicate instruction skipped: slot={}, txn_index={}",
2154                                            slot, txn_index
2155                                        ));
2156                                        return Ok(Vec::new());
2157                                    }
2158                                }
2159                            }
2160                        }
2161                    }
2162                    let value = state
2163                        .get_and_touch(&key_value)
2164                        .unwrap_or_else(|| default.clone());
2165
2166                    self.registers[*dest] = value;
2167                    pc += 1;
2168                }
2169                OpCode::UpdateState {
2170                    state_id: _,
2171                    key,
2172                    value,
2173                } => {
2174                    let actual_state_id = override_state_id;
2175                    let state = self
2176                        .states
2177                        .get(&actual_state_id)
2178                        .ok_or("State table not found")?;
2179                    let key_value = self.registers[*key].clone();
2180                    let value_data = self.registers[*value].clone();
2181
2182                    state.insert_with_eviction(key_value, value_data);
2183                    pc += 1;
2184                }
2185                OpCode::AppendToArray {
2186                    object,
2187                    path,
2188                    value,
2189                } => {
2190                    let appended_value = self.registers[*value].clone();
2191                    let max_len = self
2192                        .states
2193                        .get(&override_state_id)
2194                        .map(|s| s.max_array_length())
2195                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2196                    self.append_to_array(*object, path, *value, max_len)?;
2197                    if should_emit(path) {
2198                        dirty_tracker.mark_appended(path, appended_value);
2199                    }
2200                    pc += 1;
2201                }
2202                OpCode::GetCurrentTimestamp { dest } => {
2203                    let timestamp = std::time::SystemTime::now()
2204                        .duration_since(std::time::UNIX_EPOCH)
2205                        .unwrap()
2206                        .as_secs() as i64;
2207                    self.registers[*dest] = json!(timestamp);
2208                    pc += 1;
2209                }
2210                OpCode::CreateEvent { dest, event_value } => {
2211                    let timestamp = std::time::SystemTime::now()
2212                        .duration_since(std::time::UNIX_EPOCH)
2213                        .unwrap()
2214                        .as_secs() as i64;
2215
2216                    // Filter out __update_context from the event data
2217                    let mut event_data = self.registers[*event_value].clone();
2218                    if let Some(obj) = event_data.as_object_mut() {
2219                        obj.remove("__update_context");
2220                    }
2221
2222                    // Create event with timestamp, data, and optional slot/signature from context
2223                    let mut event = serde_json::Map::new();
2224                    event.insert("timestamp".to_string(), json!(timestamp));
2225                    event.insert("data".to_string(), event_data);
2226
2227                    // Add slot and signature if available from current context
2228                    if let Some(ref ctx) = self.current_context {
2229                        if let Some(slot) = ctx.slot {
2230                            event.insert("slot".to_string(), json!(slot));
2231                        }
2232                        if let Some(ref signature) = ctx.signature {
2233                            event.insert("signature".to_string(), json!(signature));
2234                        }
2235                    }
2236
2237                    self.registers[*dest] = Value::Object(event);
2238                    pc += 1;
2239                }
2240                OpCode::CreateCapture {
2241                    dest,
2242                    capture_value,
2243                } => {
2244                    let timestamp = std::time::SystemTime::now()
2245                        .duration_since(std::time::UNIX_EPOCH)
2246                        .unwrap()
2247                        .as_secs() as i64;
2248
2249                    // Get the capture data (already filtered by load_field)
2250                    let capture_data = self.registers[*capture_value].clone();
2251
2252                    // Extract account_address from the original event if available
2253                    let account_address = event_value
2254                        .get("__account_address")
2255                        .and_then(|v| v.as_str())
2256                        .unwrap_or("")
2257                        .to_string();
2258
2259                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
2260                    let mut capture = serde_json::Map::new();
2261                    capture.insert("timestamp".to_string(), json!(timestamp));
2262                    capture.insert("account_address".to_string(), json!(account_address));
2263                    capture.insert("data".to_string(), capture_data);
2264
2265                    // Add slot and signature if available from current context
2266                    if let Some(ref ctx) = self.current_context {
2267                        if let Some(slot) = ctx.slot {
2268                            capture.insert("slot".to_string(), json!(slot));
2269                        }
2270                        if let Some(ref signature) = ctx.signature {
2271                            capture.insert("signature".to_string(), json!(signature));
2272                        }
2273                    }
2274
2275                    self.registers[*dest] = Value::Object(capture);
2276                    pc += 1;
2277                }
2278                OpCode::Transform {
2279                    source,
2280                    dest,
2281                    transformation,
2282                } => {
2283                    if source == dest {
2284                        self.transform_in_place(*source, transformation)?;
2285                    } else {
2286                        let source_value = &self.registers[*source];
2287                        let value = Self::apply_transformation(source_value, transformation)?;
2288                        self.registers[*dest] = value;
2289                    }
2290                    pc += 1;
2291                }
2292                OpCode::EmitMutation {
2293                    entity_name,
2294                    key,
2295                    state,
2296                } => {
2297                    let primary_key = self.registers[*key].clone();
2298
2299                    if primary_key.is_null() || dirty_tracker.is_empty() {
2300                        let reason = if dirty_tracker.is_empty() {
2301                            "no_fields_modified"
2302                        } else {
2303                            "null_primary_key"
2304                        };
2305                        self.add_warning(format!(
2306                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
2307                            entity_name,
2308                            reason,
2309                            dirty_tracker.len()
2310                        ));
2311                    } else {
2312                        let patch =
2313                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2314
2315                        let append = dirty_tracker.appended_paths();
2316                        let mutation = Mutation {
2317                            export: entity_name.clone(),
2318                            key: primary_key,
2319                            patch,
2320                            append,
2321                        };
2322                        output.push(mutation);
2323                    }
2324                    pc += 1;
2325                }
2326                OpCode::SetFieldIfNull {
2327                    object,
2328                    path,
2329                    value,
2330                } => {
2331                    let was_set = self.set_field_if_null(*object, path, *value)?;
2332                    if was_set && should_emit(path) {
2333                        dirty_tracker.mark_replaced(path);
2334                    }
2335                    pc += 1;
2336                }
2337                OpCode::SetFieldMax {
2338                    object,
2339                    path,
2340                    value,
2341                } => {
2342                    let was_updated = self.set_field_max(*object, path, *value)?;
2343                    if was_updated && should_emit(path) {
2344                        dirty_tracker.mark_replaced(path);
2345                    }
2346                    pc += 1;
2347                }
2348                OpCode::UpdateTemporalIndex {
2349                    state_id: _,
2350                    index_name,
2351                    lookup_value,
2352                    primary_key,
2353                    timestamp,
2354                } => {
2355                    let actual_state_id = override_state_id;
2356                    let state = self
2357                        .states
2358                        .get_mut(&actual_state_id)
2359                        .ok_or("State table not found")?;
2360                    let index = state
2361                        .temporal_indexes
2362                        .entry(index_name.clone())
2363                        .or_insert_with(TemporalIndex::new);
2364
2365                    let lookup_val = self.registers[*lookup_value].clone();
2366                    let pk_val = self.registers[*primary_key].clone();
2367                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2368                        val
2369                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
2370                        val as i64
2371                    } else {
2372                        return Err(format!(
2373                            "Timestamp must be a number (i64 or u64), got: {:?}",
2374                            self.registers[*timestamp]
2375                        )
2376                        .into());
2377                    };
2378
2379                    index.insert(lookup_val, pk_val, ts_val);
2380                    pc += 1;
2381                }
2382                OpCode::LookupTemporalIndex {
2383                    state_id: _,
2384                    index_name,
2385                    lookup_value,
2386                    timestamp,
2387                    dest,
2388                } => {
2389                    let actual_state_id = override_state_id;
2390                    let state = self
2391                        .states
2392                        .get(&actual_state_id)
2393                        .ok_or("State table not found")?;
2394                    let lookup_val = &self.registers[*lookup_value];
2395
2396                    let result = if self.registers[*timestamp].is_null() {
2397                        if let Some(index) = state.temporal_indexes.get(index_name) {
2398                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2399                        } else {
2400                            Value::Null
2401                        }
2402                    } else {
2403                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2404                            val
2405                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
2406                            val as i64
2407                        } else {
2408                            return Err(format!(
2409                                "Timestamp must be a number (i64 or u64), got: {:?}",
2410                                self.registers[*timestamp]
2411                            )
2412                            .into());
2413                        };
2414
2415                        if let Some(index) = state.temporal_indexes.get(index_name) {
2416                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2417                        } else {
2418                            Value::Null
2419                        }
2420                    };
2421
2422                    self.registers[*dest] = result;
2423                    pc += 1;
2424                }
2425                OpCode::UpdateLookupIndex {
2426                    state_id: _,
2427                    index_name,
2428                    lookup_value,
2429                    primary_key,
2430                } => {
2431                    let actual_state_id = override_state_id;
2432                    let state = self
2433                        .states
2434                        .get_mut(&actual_state_id)
2435                        .ok_or("State table not found")?;
2436                    let index = state
2437                        .lookup_indexes
2438                        .entry(index_name.clone())
2439                        .or_insert_with(LookupIndex::new);
2440
2441                    let lookup_val = self.registers[*lookup_value].clone();
2442                    let pk_val = self.registers[*primary_key].clone();
2443
2444                    index.insert(lookup_val.clone(), pk_val);
2445
2446                    // Track lookup keys so process_event can flush queued account updates
2447                    if let Some(key_str) = lookup_val.as_str() {
2448                        self.last_lookup_index_keys.push(key_str.to_string());
2449                    }
2450
2451                    pc += 1;
2452                }
2453                OpCode::LookupIndex {
2454                    state_id: _,
2455                    index_name,
2456                    lookup_value,
2457                    dest,
2458                } => {
2459                    let actual_state_id = override_state_id;
2460                    let mut current_value = self.registers[*lookup_value].clone();
2461
2462                    const MAX_CHAIN_DEPTH: usize = 5;
2463                    let mut iterations = 0;
2464
2465                    let final_result = if self.states.contains_key(&actual_state_id) {
2466                        loop {
2467                            iterations += 1;
2468                            if iterations > MAX_CHAIN_DEPTH {
2469                                break current_value;
2470                            }
2471
2472                            let resolved = self
2473                                .states
2474                                .get(&actual_state_id)
2475                                .and_then(|state| {
2476                                    if let Some(index) = state.lookup_indexes.get(index_name) {
2477                                        if let Some(found) = index.lookup(&current_value) {
2478                                            return Some(found);
2479                                        }
2480                                    }
2481
2482                                    for (name, index) in state.lookup_indexes.iter() {
2483                                        if name == index_name {
2484                                            continue;
2485                                        }
2486                                        if let Some(found) = index.lookup(&current_value) {
2487                                            return Some(found);
2488                                        }
2489                                    }
2490
2491                                    None
2492                                })
2493                                .unwrap_or(Value::Null);
2494
2495                            let mut resolved_from_pda = false;
2496                            let resolved = if resolved.is_null() {
2497                                if let Some(pda_str) = current_value.as_str() {
2498                                    resolved_from_pda = true;
2499                                    self.states
2500                                        .get_mut(&actual_state_id)
2501                                        .and_then(|state_mut| {
2502                                            state_mut
2503                                                .pda_reverse_lookups
2504                                                .get_mut("default_pda_lookup")
2505                                        })
2506                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2507                                        .map(Value::String)
2508                                        .unwrap_or(Value::Null)
2509                                } else {
2510                                    Value::Null
2511                                }
2512                            } else {
2513                                resolved
2514                            };
2515
2516                            if resolved.is_null() {
2517                                if iterations == 1 {
2518                                    if let Some(pda_str) = current_value.as_str() {
2519                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
2520                                    }
2521                                }
2522                                break Value::Null;
2523                            }
2524
2525                            let can_chain =
2526                                self.can_resolve_further(&resolved, actual_state_id, index_name);
2527
2528                            if !can_chain {
2529                                if resolved_from_pda {
2530                                    if let Some(resolved_str) = resolved.as_str() {
2531                                        self.last_lookup_index_miss =
2532                                            Some(resolved_str.to_string());
2533                                    }
2534                                    break Value::Null;
2535                                }
2536                                break resolved;
2537                            }
2538
2539                            current_value = resolved;
2540                        }
2541                    } else {
2542                        Value::Null
2543                    };
2544
2545                    self.registers[*dest] = final_result;
2546                    pc += 1;
2547                }
2548                OpCode::SetFieldSum {
2549                    object,
2550                    path,
2551                    value,
2552                } => {
2553                    let was_updated = self.set_field_sum(*object, path, *value)?;
2554                    if was_updated && should_emit(path) {
2555                        dirty_tracker.mark_replaced(path);
2556                    }
2557                    pc += 1;
2558                }
2559                OpCode::SetFieldIncrement { object, path } => {
2560                    let was_updated = self.set_field_increment(*object, path)?;
2561                    if was_updated && should_emit(path) {
2562                        dirty_tracker.mark_replaced(path);
2563                    }
2564                    pc += 1;
2565                }
2566                OpCode::SetFieldMin {
2567                    object,
2568                    path,
2569                    value,
2570                } => {
2571                    let was_updated = self.set_field_min(*object, path, *value)?;
2572                    if was_updated && should_emit(path) {
2573                        dirty_tracker.mark_replaced(path);
2574                    }
2575                    pc += 1;
2576                }
2577                OpCode::AddToUniqueSet {
2578                    state_id: _,
2579                    set_name,
2580                    value,
2581                    count_object,
2582                    count_path,
2583                } => {
2584                    let value_to_add = self.registers[*value].clone();
2585
2586                    // Store the unique set within the entity object, not in the state table
2587                    // This ensures each entity instance has its own unique set
2588                    let set_field_path = format!("__unique_set:{}", set_name);
2589
2590                    // Get or create the unique set from the entity object
2591                    let mut set: HashSet<Value> =
2592                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2593                            if !existing.is_null() {
2594                                serde_json::from_value(existing).unwrap_or_default()
2595                            } else {
2596                                HashSet::new()
2597                            }
2598                        } else {
2599                            HashSet::new()
2600                        };
2601
2602                    // Add value to set
2603                    let was_new = set.insert(value_to_add);
2604
2605                    // Store updated set back in the entity object
2606                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2607                    self.registers[100] = serde_json::to_value(set_as_vec)?;
2608                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2609
2610                    // Update the count field in the object
2611                    if was_new {
2612                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2613                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2614                        if should_emit(count_path) {
2615                            dirty_tracker.mark_replaced(count_path);
2616                        }
2617                    }
2618
2619                    pc += 1;
2620                }
2621                OpCode::ConditionalSetField {
2622                    object,
2623                    path,
2624                    value,
2625                    condition_field,
2626                    condition_op,
2627                    condition_value,
2628                } => {
2629                    let field_value = self.load_field(event_value, condition_field, None)?;
2630                    let condition_met =
2631                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2632
2633                    if condition_met {
2634                        self.set_field_auto_vivify(*object, path, *value)?;
2635                        if should_emit(path) {
2636                            dirty_tracker.mark_replaced(path);
2637                        }
2638                    }
2639                    pc += 1;
2640                }
2641                OpCode::SetFieldWhen {
2642                    object,
2643                    path,
2644                    value,
2645                    when_instruction,
2646                    entity_name,
2647                    key_reg,
2648                    condition_field,
2649                    condition_op,
2650                    condition_value,
2651                } => {
2652                    let actual_state_id = override_state_id;
2653                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2654                        condition_field.as_ref(),
2655                        condition_op.as_ref(),
2656                        condition_value.as_ref(),
2657                    ) {
2658                        let field_value = self.load_field(event_value, field, None)?;
2659                        self.evaluate_comparison(&field_value, op, cond_value)?
2660                    } else {
2661                        true
2662                    };
2663
2664                    if !condition_met {
2665                        pc += 1;
2666                        continue;
2667                    }
2668
2669                    let signature = self
2670                        .current_context
2671                        .as_ref()
2672                        .and_then(|c| c.signature.clone())
2673                        .unwrap_or_default();
2674
2675                    let emit = should_emit(path);
2676
2677                    let instruction_seen = if !signature.is_empty() {
2678                        if let Some(state) = self.states.get(&actual_state_id) {
2679                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2680                            cache
2681                                .get(&signature)
2682                                .map(|set| set.contains(when_instruction))
2683                                .unwrap_or(false)
2684                        } else {
2685                            false
2686                        }
2687                    } else {
2688                        false
2689                    };
2690
2691                    if instruction_seen {
2692                        self.set_field_auto_vivify(*object, path, *value)?;
2693                        if emit {
2694                            dirty_tracker.mark_replaced(path);
2695                        }
2696                    } else if !signature.is_empty() {
2697                        let deferred = DeferredWhenOperation {
2698                            entity_name: entity_name.clone(),
2699                            primary_key: self.registers[*key_reg].clone(),
2700                            field_path: path.clone(),
2701                            field_value: self.registers[*value].clone(),
2702                            when_instruction: when_instruction.clone(),
2703                            signature: signature.clone(),
2704                            slot: self
2705                                .current_context
2706                                .as_ref()
2707                                .and_then(|c| c.slot)
2708                                .unwrap_or(0),
2709                            deferred_at: std::time::SystemTime::now()
2710                                .duration_since(std::time::UNIX_EPOCH)
2711                                .unwrap()
2712                                .as_secs() as i64,
2713                            emit,
2714                        };
2715
2716                        if let Some(state) = self.states.get(&actual_state_id) {
2717                            let key = (signature, when_instruction.clone());
2718                            state
2719                                .deferred_when_ops
2720                                .entry(key)
2721                                .or_insert_with(Vec::new)
2722                                .push(deferred);
2723                        }
2724                    }
2725
2726                    pc += 1;
2727                }
2728                OpCode::SetFieldUnlessStopped {
2729                    object,
2730                    path,
2731                    value,
2732                    stop_field,
2733                    stop_instruction,
2734                    entity_name,
2735                    key_reg: _,
2736                } => {
2737                    let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2738                    let stopped = stop_value.as_bool().unwrap_or(false);
2739
2740                    if stopped {
2741                        tracing::debug!(
2742                            entity = %entity_name,
2743                            field = %path,
2744                            stop_field = %stop_field,
2745                            stop_instruction = %stop_instruction,
2746                            "stop flag set; skipping field update"
2747                        );
2748                        pc += 1;
2749                        continue;
2750                    }
2751
2752                    self.set_field_auto_vivify(*object, path, *value)?;
2753                    if should_emit(path) {
2754                        dirty_tracker.mark_replaced(path);
2755                    }
2756                    pc += 1;
2757                }
2758                OpCode::ConditionalIncrement {
2759                    object,
2760                    path,
2761                    condition_field,
2762                    condition_op,
2763                    condition_value,
2764                } => {
2765                    let field_value = self.load_field(event_value, condition_field, None)?;
2766                    let condition_met =
2767                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2768
2769                    if condition_met {
2770                        let was_updated = self.set_field_increment(*object, path)?;
2771                        if was_updated && should_emit(path) {
2772                            dirty_tracker.mark_replaced(path);
2773                        }
2774                    }
2775                    pc += 1;
2776                }
2777                OpCode::EvaluateComputedFields {
2778                    state,
2779                    computed_paths,
2780                } => {
2781                    if let Some(evaluator) = entity_evaluator {
2782                        let old_values: Vec<_> = computed_paths
2783                            .iter()
2784                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2785                            .collect();
2786
2787                        let state_value = &mut self.registers[*state];
2788                        let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2789                        let context_timestamp = self
2790                            .current_context
2791                            .as_ref()
2792                            .map(|c| c.timestamp())
2793                            .unwrap_or_else(|| {
2794                                std::time::SystemTime::now()
2795                                    .duration_since(std::time::UNIX_EPOCH)
2796                                    .unwrap()
2797                                    .as_secs() as i64
2798                            });
2799                        let eval_result = evaluator(state_value, context_slot, context_timestamp);
2800
2801                        if eval_result.is_ok() {
2802                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2803                                let new_value =
2804                                    Self::get_value_at_path(&self.registers[*state], path);
2805
2806                                if new_value != *old_value && should_emit(path) {
2807                                    dirty_tracker.mark_replaced(path);
2808                                }
2809                            }
2810                        }
2811                    }
2812                    pc += 1;
2813                }
2814                OpCode::QueueResolver {
2815                    state_id: _,
2816                    entity_name,
2817                    resolver,
2818                    input_path,
2819                    input_value,
2820                    url_template,
2821                    strategy,
2822                    extracts,
2823                    condition,
2824                    schedule_at,
2825                    state,
2826                    key,
2827                } => {
2828                    let actual_state_id = override_state_id;
2829
2830                    // Skip resolvers for reprocessed cached data from PDA mapping changes.
2831                    // Stale data can carry wrong field values (e.g. old entropy_value) and
2832                    // wrong schedule_at slots that would lock in incorrect results via SetOnce.
2833                    if self
2834                        .current_context
2835                        .as_ref()
2836                        .map(|c| c.skip_resolvers)
2837                        .unwrap_or(false)
2838                    {
2839                        pc += 1;
2840                        continue;
2841                    }
2842
2843                    // Evaluate condition if present
2844                    if let Some(cond) = condition {
2845                        let field_val =
2846                            Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2847                                .unwrap_or(Value::Null);
2848                        let condition_met =
2849                            self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
2850
2851                        if !condition_met {
2852                            pc += 1;
2853                            continue;
2854                        }
2855                    }
2856
2857                    // Check schedule_at: defer if target slot is in the future
2858                    if let Some(schedule_path) = schedule_at {
2859                        let target_val =
2860                            Self::get_value_at_path(&self.registers[*state], schedule_path);
2861
2862                        match target_val.and_then(|v| v.as_u64()) {
2863                            Some(target_slot) => {
2864                                let current_slot = self
2865                                    .current_context
2866                                    .as_ref()
2867                                    .and_then(|ctx| ctx.slot)
2868                                    .unwrap_or(0);
2869                                if current_slot < target_slot {
2870                                    let key_value = &self.registers[*key];
2871                                    if !key_value.is_null() {
2872                                        self.scheduled_callbacks.push((
2873                                            target_slot,
2874                                            ScheduledCallback {
2875                                                state_id: actual_state_id,
2876                                                entity_name: entity_name.clone(),
2877                                                primary_key: key_value.clone(),
2878                                                resolver: resolver.clone(),
2879                                                url_template: url_template.clone(),
2880                                                input_value: input_value.clone(),
2881                                                input_path: input_path.clone(),
2882                                                condition: condition.clone(),
2883                                                strategy: strategy.clone(),
2884                                                extracts: extracts.clone(),
2885                                                retry_count: 0,
2886                                            },
2887                                        ));
2888                                    }
2889                                    pc += 1;
2890                                    continue;
2891                                }
2892                                // current_slot >= target_slot: fall through to immediate resolution
2893                            }
2894                            None => {
2895                                // schedule_at path is missing or value is not a u64 —
2896                                // skip resolver entirely rather than executing immediately,
2897                                // since the state likely hasn't been fully populated yet.
2898                                pc += 1;
2899                                continue;
2900                            }
2901                        }
2902                    }
2903
2904                    // Build resolver input from template, literal value, or field path
2905                    let resolved_input = if let Some(template) = url_template {
2906                        crate::scheduler::build_url_from_template(template, &self.registers[*state])
2907                            .map(Value::String)
2908                    } else if let Some(value) = input_value {
2909                        Some(value.clone())
2910                    } else if let Some(path) = input_path.as_ref() {
2911                        Self::get_value_at_path(&self.registers[*state], path)
2912                    } else {
2913                        None
2914                    };
2915
2916                    if let Some(input) = resolved_input {
2917                        let key_value = &self.registers[*key];
2918
2919                        if input.is_null() || key_value.is_null() {
2920                            pc += 1;
2921                            continue;
2922                        }
2923
2924                        if matches!(strategy, ResolveStrategy::SetOnce)
2925                            // all(): fire resolver if any target is still null.
2926                            // Already-set fields are protected from overwrite by
2927                            // set_value_at_path's SetOnce null-source guard.
2928                            && extracts.iter().all(|extract| {
2929                                match Self::get_value_at_path(
2930                                    &self.registers[*state],
2931                                    &extract.target_path,
2932                                ) {
2933                                    Some(value) => !value.is_null(),
2934                                    None => false,
2935                                }
2936                            })
2937                        {
2938                            pc += 1;
2939                            continue;
2940                        }
2941
2942                        let cache_key = resolver_cache_key(resolver, &input);
2943
2944                        if let Some(cached) = self.get_cached_resolver_value(&cache_key) {
2945                            Self::apply_resolver_extractions_to_value(
2946                                &mut self.registers[*state],
2947                                &cached,
2948                                extracts,
2949                                &mut dirty_tracker,
2950                                &should_emit,
2951                            )?;
2952                        } else {
2953                            let target = ResolverTarget {
2954                                state_id: actual_state_id,
2955                                entity_name: entity_name.clone(),
2956                                primary_key: self.registers[*key].clone(),
2957                                extracts: extracts.clone(),
2958                            };
2959
2960                            self.enqueue_resolver_request(
2961                                cache_key,
2962                                resolver.clone(),
2963                                input,
2964                                target,
2965                            );
2966                        }
2967                    }
2968
2969                    pc += 1;
2970                }
2971                OpCode::UpdatePdaReverseLookup {
2972                    state_id: _,
2973                    lookup_name,
2974                    pda_address,
2975                    primary_key,
2976                } => {
2977                    let actual_state_id = override_state_id;
2978                    let state = self
2979                        .states
2980                        .get_mut(&actual_state_id)
2981                        .ok_or("State table not found")?;
2982
2983                    let pda_val = self.registers[*pda_address].clone();
2984                    let pk_val = self.registers[*primary_key].clone();
2985
2986                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2987                        let pda_lookup = state
2988                            .pda_reverse_lookups
2989                            .entry(lookup_name.clone())
2990                            .or_insert_with(|| {
2991                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2992                            });
2993
2994                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2995                        self.last_pda_registered = Some(pda_str.to_string());
2996                    } else if !pk_val.is_null() {
2997                        if let Some(pk_num) = pk_val.as_u64() {
2998                            if let Some(pda_str) = pda_val.as_str() {
2999                                let pda_lookup = state
3000                                    .pda_reverse_lookups
3001                                    .entry(lookup_name.clone())
3002                                    .or_insert_with(|| {
3003                                        PdaReverseLookup::new(
3004                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
3005                                        )
3006                                    });
3007
3008                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
3009                                self.last_pda_registered = Some(pda_str.to_string());
3010                            }
3011                        }
3012                    }
3013
3014                    pc += 1;
3015                }
3016            }
3017
3018            self.instructions_executed += 1;
3019        }
3020
3021        Ok(output)
3022    }
3023
3024    fn load_field(
3025        &self,
3026        event_value: &Value,
3027        path: &FieldPath,
3028        default: Option<&Value>,
3029    ) -> Result<Value> {
3030        if path.segments.is_empty() {
3031            if let Some(obj) = event_value.as_object() {
3032                let filtered: serde_json::Map<String, Value> = obj
3033                    .iter()
3034                    .filter(|(k, _)| !k.starts_with("__"))
3035                    .map(|(k, v)| (k.clone(), v.clone()))
3036                    .collect();
3037                return Ok(Value::Object(filtered));
3038            }
3039            return Ok(event_value.clone());
3040        }
3041
3042        let mut current = event_value;
3043        for segment in path.segments.iter() {
3044            current = match current.get(segment) {
3045                Some(v) => v,
3046                None => {
3047                    tracing::trace!(
3048                        "load_field: segment={:?} not found in {:?}, returning default",
3049                        segment,
3050                        current
3051                    );
3052                    return Ok(default.cloned().unwrap_or(Value::Null));
3053                }
3054            };
3055        }
3056
3057        tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3058        Ok(current.clone())
3059    }
3060
3061    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3062        let mut current = value;
3063        for segment in path.split('.') {
3064            current = current.get(segment)?;
3065        }
3066        Some(current.clone())
3067    }
3068
3069    fn set_field_auto_vivify(
3070        &mut self,
3071        object_reg: Register,
3072        path: &str,
3073        value_reg: Register,
3074    ) -> Result<()> {
3075        let compiled = self.get_compiled_path(path);
3076        let segments = compiled.segments();
3077        let value = self.registers[value_reg].clone();
3078
3079        if !self.registers[object_reg].is_object() {
3080            self.registers[object_reg] = json!({});
3081        }
3082
3083        let obj = self.registers[object_reg]
3084            .as_object_mut()
3085            .ok_or("Not an object")?;
3086
3087        let mut current = obj;
3088        for (i, segment) in segments.iter().enumerate() {
3089            if i == segments.len() - 1 {
3090                current.insert(segment.to_string(), value);
3091                return Ok(());
3092            } else {
3093                current
3094                    .entry(segment.to_string())
3095                    .or_insert_with(|| json!({}));
3096                current = current
3097                    .get_mut(segment)
3098                    .and_then(|v| v.as_object_mut())
3099                    .ok_or("Path collision: expected object")?;
3100            }
3101        }
3102
3103        Ok(())
3104    }
3105
3106    fn set_field_if_null(
3107        &mut self,
3108        object_reg: Register,
3109        path: &str,
3110        value_reg: Register,
3111    ) -> Result<bool> {
3112        let compiled = self.get_compiled_path(path);
3113        let segments = compiled.segments();
3114        let value = self.registers[value_reg].clone();
3115
3116        // SetOnce should only set meaningful values. A null source typically means
3117        // the field doesn't exist in this event type (e.g., instruction events don't
3118        // have account data). Skip to preserve any existing value.
3119        if value.is_null() {
3120            return Ok(false);
3121        }
3122
3123        if !self.registers[object_reg].is_object() {
3124            self.registers[object_reg] = json!({});
3125        }
3126
3127        let obj = self.registers[object_reg]
3128            .as_object_mut()
3129            .ok_or("Not an object")?;
3130
3131        let mut current = obj;
3132        for (i, segment) in segments.iter().enumerate() {
3133            if i == segments.len() - 1 {
3134                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3135                    current.insert(segment.to_string(), value);
3136                    return Ok(true);
3137                }
3138                return Ok(false);
3139            } else {
3140                current
3141                    .entry(segment.to_string())
3142                    .or_insert_with(|| json!({}));
3143                current = current
3144                    .get_mut(segment)
3145                    .and_then(|v| v.as_object_mut())
3146                    .ok_or("Path collision: expected object")?;
3147            }
3148        }
3149
3150        Ok(false)
3151    }
3152
3153    fn set_field_max(
3154        &mut self,
3155        object_reg: Register,
3156        path: &str,
3157        value_reg: Register,
3158    ) -> Result<bool> {
3159        let compiled = self.get_compiled_path(path);
3160        let segments = compiled.segments();
3161        let new_value = self.registers[value_reg].clone();
3162
3163        if !self.registers[object_reg].is_object() {
3164            self.registers[object_reg] = json!({});
3165        }
3166
3167        let obj = self.registers[object_reg]
3168            .as_object_mut()
3169            .ok_or("Not an object")?;
3170
3171        let mut current = obj;
3172        for (i, segment) in segments.iter().enumerate() {
3173            if i == segments.len() - 1 {
3174                let should_update = if let Some(current_value) = current.get(segment) {
3175                    if current_value.is_null() {
3176                        true
3177                    } else {
3178                        match (current_value.as_i64(), new_value.as_i64()) {
3179                            (Some(current_val), Some(new_val)) => new_val > current_val,
3180                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3181                                new_value.as_u64().unwrap() as i64 > current_val
3182                            }
3183                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3184                                new_val > current_value.as_u64().unwrap() as i64
3185                            }
3186                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3187                                (Some(current_val), Some(new_val)) => new_val > current_val,
3188                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3189                                    (Some(current_val), Some(new_val)) => new_val > current_val,
3190                                    _ => false,
3191                                },
3192                            },
3193                            _ => false,
3194                        }
3195                    }
3196                } else {
3197                    true
3198                };
3199
3200                if should_update {
3201                    current.insert(segment.to_string(), new_value);
3202                    return Ok(true);
3203                }
3204                return Ok(false);
3205            } else {
3206                current
3207                    .entry(segment.to_string())
3208                    .or_insert_with(|| json!({}));
3209                current = current
3210                    .get_mut(segment)
3211                    .and_then(|v| v.as_object_mut())
3212                    .ok_or("Path collision: expected object")?;
3213            }
3214        }
3215
3216        Ok(false)
3217    }
3218
3219    fn set_field_sum(
3220        &mut self,
3221        object_reg: Register,
3222        path: &str,
3223        value_reg: Register,
3224    ) -> Result<bool> {
3225        let compiled = self.get_compiled_path(path);
3226        let segments = compiled.segments();
3227        let new_value = &self.registers[value_reg];
3228
3229        // Extract numeric value before borrowing object_reg mutably
3230        tracing::trace!(
3231            "set_field_sum: path={:?}, value={:?}, value_type={}",
3232            path,
3233            new_value,
3234            match new_value {
3235                serde_json::Value::Null => "null",
3236                serde_json::Value::Bool(_) => "bool",
3237                serde_json::Value::Number(_) => "number",
3238                serde_json::Value::String(_) => "string",
3239                serde_json::Value::Array(_) => "array",
3240                serde_json::Value::Object(_) => "object",
3241            }
3242        );
3243        let new_val_num = new_value
3244            .as_i64()
3245            .or_else(|| new_value.as_u64().map(|n| n as i64))
3246            .ok_or("Sum requires numeric value")?;
3247
3248        if !self.registers[object_reg].is_object() {
3249            self.registers[object_reg] = json!({});
3250        }
3251
3252        let obj = self.registers[object_reg]
3253            .as_object_mut()
3254            .ok_or("Not an object")?;
3255
3256        let mut current = obj;
3257        for (i, segment) in segments.iter().enumerate() {
3258            if i == segments.len() - 1 {
3259                let current_val = current
3260                    .get(segment)
3261                    .and_then(|v| {
3262                        if v.is_null() {
3263                            None
3264                        } else {
3265                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3266                        }
3267                    })
3268                    .unwrap_or(0);
3269
3270                let sum = current_val + new_val_num;
3271                current.insert(segment.to_string(), json!(sum));
3272                return Ok(true);
3273            } else {
3274                current
3275                    .entry(segment.to_string())
3276                    .or_insert_with(|| json!({}));
3277                current = current
3278                    .get_mut(segment)
3279                    .and_then(|v| v.as_object_mut())
3280                    .ok_or("Path collision: expected object")?;
3281            }
3282        }
3283
3284        Ok(false)
3285    }
3286
3287    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3288        let compiled = self.get_compiled_path(path);
3289        let segments = compiled.segments();
3290
3291        if !self.registers[object_reg].is_object() {
3292            self.registers[object_reg] = json!({});
3293        }
3294
3295        let obj = self.registers[object_reg]
3296            .as_object_mut()
3297            .ok_or("Not an object")?;
3298
3299        let mut current = obj;
3300        for (i, segment) in segments.iter().enumerate() {
3301            if i == segments.len() - 1 {
3302                // Get current value (default to 0 if null/missing)
3303                let current_val = current
3304                    .get(segment)
3305                    .and_then(|v| {
3306                        if v.is_null() {
3307                            None
3308                        } else {
3309                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3310                        }
3311                    })
3312                    .unwrap_or(0);
3313
3314                let incremented = current_val + 1;
3315                current.insert(segment.to_string(), json!(incremented));
3316                return Ok(true);
3317            } else {
3318                current
3319                    .entry(segment.to_string())
3320                    .or_insert_with(|| json!({}));
3321                current = current
3322                    .get_mut(segment)
3323                    .and_then(|v| v.as_object_mut())
3324                    .ok_or("Path collision: expected object")?;
3325            }
3326        }
3327
3328        Ok(false)
3329    }
3330
3331    fn set_field_min(
3332        &mut self,
3333        object_reg: Register,
3334        path: &str,
3335        value_reg: Register,
3336    ) -> Result<bool> {
3337        let compiled = self.get_compiled_path(path);
3338        let segments = compiled.segments();
3339        let new_value = self.registers[value_reg].clone();
3340
3341        if !self.registers[object_reg].is_object() {
3342            self.registers[object_reg] = json!({});
3343        }
3344
3345        let obj = self.registers[object_reg]
3346            .as_object_mut()
3347            .ok_or("Not an object")?;
3348
3349        let mut current = obj;
3350        for (i, segment) in segments.iter().enumerate() {
3351            if i == segments.len() - 1 {
3352                let should_update = if let Some(current_value) = current.get(segment) {
3353                    if current_value.is_null() {
3354                        true
3355                    } else {
3356                        match (current_value.as_i64(), new_value.as_i64()) {
3357                            (Some(current_val), Some(new_val)) => new_val < current_val,
3358                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3359                                (new_value.as_u64().unwrap() as i64) < current_val
3360                            }
3361                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3362                                new_val < current_value.as_u64().unwrap() as i64
3363                            }
3364                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3365                                (Some(current_val), Some(new_val)) => new_val < current_val,
3366                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3367                                    (Some(current_val), Some(new_val)) => new_val < current_val,
3368                                    _ => false,
3369                                },
3370                            },
3371                            _ => false,
3372                        }
3373                    }
3374                } else {
3375                    true
3376                };
3377
3378                if should_update {
3379                    current.insert(segment.to_string(), new_value);
3380                    return Ok(true);
3381                }
3382                return Ok(false);
3383            } else {
3384                current
3385                    .entry(segment.to_string())
3386                    .or_insert_with(|| json!({}));
3387                current = current
3388                    .get_mut(segment)
3389                    .and_then(|v| v.as_object_mut())
3390                    .ok_or("Path collision: expected object")?;
3391            }
3392        }
3393
3394        Ok(false)
3395    }
3396
3397    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3398        let compiled = self.get_compiled_path(path);
3399        let segments = compiled.segments();
3400        let mut current = &self.registers[object_reg];
3401
3402        for segment in segments {
3403            current = current
3404                .get(segment)
3405                .ok_or_else(|| format!("Field not found: {}", segment))?;
3406        }
3407
3408        Ok(current.clone())
3409    }
3410
3411    fn append_to_array(
3412        &mut self,
3413        object_reg: Register,
3414        path: &str,
3415        value_reg: Register,
3416        max_length: usize,
3417    ) -> Result<()> {
3418        let compiled = self.get_compiled_path(path);
3419        let segments = compiled.segments();
3420        let value = self.registers[value_reg].clone();
3421
3422        if !self.registers[object_reg].is_object() {
3423            self.registers[object_reg] = json!({});
3424        }
3425
3426        let obj = self.registers[object_reg]
3427            .as_object_mut()
3428            .ok_or("Not an object")?;
3429
3430        let mut current = obj;
3431        for (i, segment) in segments.iter().enumerate() {
3432            if i == segments.len() - 1 {
3433                current
3434                    .entry(segment.to_string())
3435                    .or_insert_with(|| json!([]));
3436                let arr = current
3437                    .get_mut(segment)
3438                    .and_then(|v| v.as_array_mut())
3439                    .ok_or("Path is not an array")?;
3440                arr.push(value.clone());
3441
3442                if arr.len() > max_length {
3443                    let excess = arr.len() - max_length;
3444                    arr.drain(0..excess);
3445                }
3446            } else {
3447                current
3448                    .entry(segment.to_string())
3449                    .or_insert_with(|| json!({}));
3450                current = current
3451                    .get_mut(segment)
3452                    .and_then(|v| v.as_object_mut())
3453                    .ok_or("Path collision: expected object")?;
3454            }
3455        }
3456
3457        Ok(())
3458    }
3459
3460    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3461        let value = &self.registers[reg];
3462        let transformed = Self::apply_transformation(value, transformation)?;
3463        self.registers[reg] = transformed;
3464        Ok(())
3465    }
3466
3467    fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3468        match transformation {
3469            Transformation::HexEncode => {
3470                if let Some(arr) = value.as_array() {
3471                    let bytes: Vec<u8> = arr
3472                        .iter()
3473                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3474                        .collect();
3475                    let hex = hex::encode(&bytes);
3476                    Ok(json!(hex))
3477                } else if value.is_string() {
3478                    Ok(value.clone())
3479                } else {
3480                    Err("HexEncode requires an array of numbers".into())
3481                }
3482            }
3483            Transformation::HexDecode => {
3484                if let Some(s) = value.as_str() {
3485                    let s = s.strip_prefix("0x").unwrap_or(s);
3486                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3487                    Ok(json!(bytes))
3488                } else {
3489                    Err("HexDecode requires a string".into())
3490                }
3491            }
3492            Transformation::Base58Encode => {
3493                if let Some(arr) = value.as_array() {
3494                    let bytes: Vec<u8> = arr
3495                        .iter()
3496                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3497                        .collect();
3498                    let encoded = bs58::encode(&bytes).into_string();
3499                    Ok(json!(encoded))
3500                } else if value.is_string() {
3501                    Ok(value.clone())
3502                } else {
3503                    Err("Base58Encode requires an array of numbers".into())
3504                }
3505            }
3506            Transformation::Base58Decode => {
3507                if let Some(s) = value.as_str() {
3508                    let bytes = bs58::decode(s)
3509                        .into_vec()
3510                        .map_err(|e| format!("Base58 decode error: {}", e))?;
3511                    Ok(json!(bytes))
3512                } else {
3513                    Err("Base58Decode requires a string".into())
3514                }
3515            }
3516            Transformation::ToString => Ok(json!(value.to_string())),
3517            Transformation::ToNumber => {
3518                if let Some(s) = value.as_str() {
3519                    let n = s
3520                        .parse::<i64>()
3521                        .map_err(|e| format!("Parse error: {}", e))?;
3522                    Ok(json!(n))
3523                } else {
3524                    Ok(value.clone())
3525                }
3526            }
3527        }
3528    }
3529
3530    fn evaluate_comparison(
3531        &self,
3532        field_value: &Value,
3533        op: &ComparisonOp,
3534        condition_value: &Value,
3535    ) -> Result<bool> {
3536        use ComparisonOp::*;
3537
3538        match op {
3539            Equal => Ok(field_value == condition_value),
3540            NotEqual => Ok(field_value != condition_value),
3541            GreaterThan => {
3542                // Try to compare as numbers
3543                match (field_value.as_i64(), condition_value.as_i64()) {
3544                    (Some(a), Some(b)) => Ok(a > b),
3545                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
3546                        (Some(a), Some(b)) => Ok(a > b),
3547                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
3548                            (Some(a), Some(b)) => Ok(a > b),
3549                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3550                        },
3551                    },
3552                }
3553            }
3554            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3555                (Some(a), Some(b)) => Ok(a >= b),
3556                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3557                    (Some(a), Some(b)) => Ok(a >= b),
3558                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3559                        (Some(a), Some(b)) => Ok(a >= b),
3560                        _ => {
3561                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3562                        }
3563                    },
3564                },
3565            },
3566            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3567                (Some(a), Some(b)) => Ok(a < b),
3568                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3569                    (Some(a), Some(b)) => Ok(a < b),
3570                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3571                        (Some(a), Some(b)) => Ok(a < b),
3572                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
3573                    },
3574                },
3575            },
3576            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3577                (Some(a), Some(b)) => Ok(a <= b),
3578                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3579                    (Some(a), Some(b)) => Ok(a <= b),
3580                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3581                        (Some(a), Some(b)) => Ok(a <= b),
3582                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3583                    },
3584                },
3585            },
3586        }
3587    }
3588
3589    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3590        if let Some(state) = self.states.get(&state_id) {
3591            if let Some(index) = state.lookup_indexes.get(index_name) {
3592                if index.lookup(value).is_some() {
3593                    return true;
3594                }
3595            }
3596
3597            for (name, index) in state.lookup_indexes.iter() {
3598                if name == index_name {
3599                    continue;
3600                }
3601                if index.lookup(value).is_some() {
3602                    return true;
3603                }
3604            }
3605
3606            if let Some(pda_str) = value.as_str() {
3607                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3608                    if pda_lookup.contains(pda_str) {
3609                        return true;
3610                    }
3611                }
3612            }
3613        }
3614
3615        false
3616    }
3617
3618    #[allow(clippy::type_complexity)]
3619    fn apply_deferred_when_op(
3620        &mut self,
3621        state_id: u32,
3622        op: &DeferredWhenOperation,
3623        entity_evaluator: Option<
3624            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3625        >,
3626        computed_paths: Option<&[String]>,
3627    ) -> Result<Vec<Mutation>> {
3628        let state = self.states.get(&state_id).ok_or("State not found")?;
3629
3630        if op.primary_key.is_null() {
3631            return Ok(vec![]);
3632        }
3633
3634        let mut entity_state = state
3635            .get_and_touch(&op.primary_key)
3636            .unwrap_or_else(|| json!({}));
3637
3638        // Track old values of computed fields before setting the new value
3639        let old_computed_values: Vec<_> = computed_paths
3640            .map(|paths| {
3641                paths
3642                    .iter()
3643                    .map(|path| Self::get_value_at_path(&entity_state, path))
3644                    .collect()
3645            })
3646            .unwrap_or_default();
3647
3648        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3649
3650        // Re-evaluate computed fields if an evaluator is provided
3651        if let Some(evaluator) = entity_evaluator {
3652            let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3653            let context_timestamp = self
3654                .current_context
3655                .as_ref()
3656                .map(|c| c.timestamp())
3657                .unwrap_or_else(|| {
3658                    std::time::SystemTime::now()
3659                        .duration_since(std::time::UNIX_EPOCH)
3660                        .unwrap()
3661                        .as_secs() as i64
3662                });
3663
3664            tracing::debug!(
3665                entity_name = %op.entity_name,
3666                primary_key = %op.primary_key,
3667                field_path = %op.field_path,
3668                "Re-evaluating computed fields after deferred when-op"
3669            );
3670
3671            if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
3672                tracing::warn!(
3673                    entity_name = %op.entity_name,
3674                    primary_key = %op.primary_key,
3675                    error = %e,
3676                    "Failed to evaluate computed fields after deferred when-op"
3677                );
3678            }
3679        }
3680
3681        state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
3682
3683        if !op.emit {
3684            return Ok(vec![]);
3685        }
3686
3687        let mut patch = json!({});
3688        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3689
3690        // Add computed field changes to the patch
3691        if let Some(paths) = computed_paths {
3692            tracing::debug!(
3693                entity_name = %op.entity_name,
3694                primary_key = %op.primary_key,
3695                computed_paths_count = paths.len(),
3696                "Checking computed fields for changes after deferred when-op"
3697            );
3698            for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
3699                let new_value = Self::get_value_at_path(&entity_state, path);
3700                tracing::debug!(
3701                    entity_name = %op.entity_name,
3702                    primary_key = %op.primary_key,
3703                    field_path = %path,
3704                    old_value = ?old_value,
3705                    new_value = ?new_value,
3706                    "Comparing computed field values"
3707                );
3708                if let Some(ref new_val) = new_value {
3709                    if Some(new_val) != old_value.as_ref() {
3710                        Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
3711                        tracing::info!(
3712                            entity_name = %op.entity_name,
3713                            primary_key = %op.primary_key,
3714                            field_path = %path,
3715                            "Computed field changed after deferred when-op, including in mutation"
3716                        );
3717                    }
3718                }
3719            }
3720        }
3721
3722        Ok(vec![Mutation {
3723            export: op.entity_name.clone(),
3724            key: op.primary_key.clone(),
3725            patch,
3726            append: vec![],
3727        }])
3728    }
3729
3730    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3731        let parts: Vec<&str> = path.split('.').collect();
3732        let mut current = obj;
3733
3734        for (i, part) in parts.iter().enumerate() {
3735            if i == parts.len() - 1 {
3736                if let Some(map) = current.as_object_mut() {
3737                    map.insert(part.to_string(), value);
3738                    return Ok(());
3739                }
3740                return Err("Cannot set field on non-object".into());
3741            }
3742
3743            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3744                if let Some(map) = current.as_object_mut() {
3745                    map.insert(part.to_string(), json!({}));
3746                }
3747            }
3748
3749            current = current.get_mut(*part).ok_or("Path navigation failed")?;
3750        }
3751
3752        Ok(())
3753    }
3754
3755    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3756        let now = std::time::SystemTime::now()
3757            .duration_since(std::time::UNIX_EPOCH)
3758            .unwrap()
3759            .as_secs() as i64;
3760
3761        let state = match self.states.get(&state_id) {
3762            Some(s) => s,
3763            None => return 0,
3764        };
3765
3766        let mut removed = 0;
3767        state.deferred_when_ops.retain(|_, ops| {
3768            let before = ops.len();
3769            ops.retain(|op| now - op.deferred_at < max_age_secs);
3770            removed += before - ops.len();
3771            !ops.is_empty()
3772        });
3773
3774        removed
3775    }
3776
3777    /// Update a PDA reverse lookup and return pending updates for reprocessing.
3778    /// Returns any pending account updates that were queued for this PDA.
3779    /// ```ignore
3780    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3781    /// for update in pending {
3782    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
3783    /// }
3784    /// ```
3785    #[cfg_attr(feature = "otel", instrument(
3786        name = "vm.update_pda_lookup",
3787        skip(self),
3788        fields(
3789            pda = %pda_address,
3790            seed = %seed_value,
3791        )
3792    ))]
3793    pub fn update_pda_reverse_lookup(
3794        &mut self,
3795        state_id: u32,
3796        lookup_name: &str,
3797        pda_address: String,
3798        seed_value: String,
3799    ) -> Result<Vec<PendingAccountUpdate>> {
3800        let state = self
3801            .states
3802            .get_mut(&state_id)
3803            .ok_or("State table not found")?;
3804
3805        let lookup = state
3806            .pda_reverse_lookups
3807            .entry(lookup_name.to_string())
3808            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3809
3810        // Detect if the PDA mapping is CHANGING (same PDA, different seed).
3811        // This happens at round boundaries when e.g. entropyVar is remapped
3812        // from old_round to new_round by a Reset instruction.
3813        let old_seed = lookup.index.peek(&pda_address).cloned();
3814        let mapping_changed = old_seed
3815            .as_ref()
3816            .map(|old| old != &seed_value)
3817            .unwrap_or(false);
3818
3819        if !mapping_changed && old_seed.is_none() {
3820            tracing::info!(
3821                pda = %pda_address,
3822                seed = %seed_value,
3823                "[PDA] First-time PDA reverse lookup established"
3824            );
3825        } else if !mapping_changed {
3826            tracing::debug!(
3827                pda = %pda_address,
3828                seed = %seed_value,
3829                "[PDA] PDA reverse lookup re-registered (same mapping)"
3830            );
3831        }
3832
3833        let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());
3834
3835        if let Some(ref evicted) = evicted_pda {
3836            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3837                let count = evicted_updates.len();
3838                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3839            }
3840        }
3841
3842        // Flush pending updates from QueueUntil for this PDA
3843        let mut pending = self.flush_pending_updates(state_id, &pda_address)?;
3844
3845        // When the mapping changed, the last account update for this PDA was
3846        // processed with the OLD seed (wrong key).  We need to:
3847        // 1. Remove stale lookup-index entries that map this PDA address to the
3848        //    old primary key — otherwise LookupIndex resolves the stale entry
3849        //    before PDA reverse lookup is even tried.
3850        // 2. Pull the cached account data and return it for reprocessing with
3851        //    the new mapping.
3852        if mapping_changed {
3853            if let Some(state) = self.states.get(&state_id) {
3854                // Clear stale lookup-index entries for this PDA address
3855                for index in state.lookup_indexes.values() {
3856                    index.remove(&Value::String(pda_address.clone()));
3857                }
3858
3859                if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
3860                    tracing::info!(
3861                        pda = %pda_address,
3862                        old_seed = ?old_seed,
3863                        new_seed = %seed_value,
3864                        account_type = %cached.account_type,
3865                        "PDA mapping changed — clearing stale indexes and reprocessing cached data"
3866                    );
3867                    cached.is_stale_reprocess = true;
3868                    pending.push(cached);
3869                }
3870            }
3871        }
3872
3873        Ok(pending)
3874    }
3875
3876    /// Clean up expired pending updates that are older than the TTL
3877    ///
3878    /// Returns the number of updates that were removed.
3879    /// This should be called periodically to prevent memory leaks from orphaned updates.
3880    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3881        let state = match self.states.get_mut(&state_id) {
3882            Some(s) => s,
3883            None => return 0,
3884        };
3885
3886        let now = std::time::SystemTime::now()
3887            .duration_since(std::time::UNIX_EPOCH)
3888            .unwrap()
3889            .as_secs() as i64;
3890
3891        let mut removed_count = 0;
3892
3893        // Iterate through all pending updates and remove expired ones
3894        state.pending_updates.retain(|_pda_address, updates| {
3895            let original_len = updates.len();
3896
3897            updates.retain(|update| {
3898                let age = now - update.queued_at;
3899                age <= PENDING_UPDATE_TTL_SECONDS
3900            });
3901
3902            removed_count += original_len - updates.len();
3903
3904            // Remove the entry entirely if no updates remain
3905            !updates.is_empty()
3906        });
3907
3908        // Update the global counter
3909        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3910
3911        if removed_count > 0 {
3912            #[cfg(feature = "otel")]
3913            crate::vm_metrics::record_pending_updates_expired(
3914                removed_count as u64,
3915                &state.entity_name,
3916            );
3917        }
3918
3919        removed_count
3920    }
3921
3922    /// Queue an account update for later processing when PDA reverse lookup is not yet available
3923    ///
3924    /// # Workflow
3925    ///
3926    /// This implements a deferred processing pattern for account updates when the PDA reverse
3927    /// lookup needed to resolve the primary key is not yet available:
3928    ///
3929    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
3930    ///    is not available, call `queue_account_update()` to queue it for later.
3931    ///
3932    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
3933    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
3934    ///
3935    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
3936    ///    ```ignore
3937    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3938    ///    for update in pending {
3939    ///        let mutations = vm.process_event(
3940    ///            &bytecode, update.account_data, &update.account_type, None, None
3941    ///        )?;
3942    ///    }
3943    ///    ```
3944    ///
3945    /// # Arguments
3946    ///
3947    /// * `state_id` - The state table ID
3948    /// * `pda_address` - The PDA address that needs reverse lookup
3949    /// * `account_type` - The event type name for reprocessing
3950    /// * `account_data` - The account data (event value) for reprocessing
3951    /// * `slot` - The slot number when this update occurred
3952    /// * `signature` - The transaction signature
3953    #[cfg_attr(feature = "otel", instrument(
3954        name = "vm.queue_account_update",
3955        skip(self, update),
3956        fields(
3957            pda = %update.pda_address,
3958            account_type = %update.account_type,
3959            slot = update.slot,
3960        )
3961    ))]
3962    pub fn queue_account_update(
3963        &mut self,
3964        state_id: u32,
3965        update: QueuedAccountUpdate,
3966    ) -> Result<()> {
3967        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3968            self.cleanup_expired_pending_updates(state_id);
3969            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3970                self.drop_oldest_pending_update(state_id)?;
3971            }
3972        }
3973
3974        let state = self
3975            .states
3976            .get_mut(&state_id)
3977            .ok_or("State table not found")?;
3978
3979        let pending = PendingAccountUpdate {
3980            account_type: update.account_type,
3981            pda_address: update.pda_address.clone(),
3982            account_data: update.account_data,
3983            slot: update.slot,
3984            write_version: update.write_version,
3985            signature: update.signature,
3986            queued_at: std::time::SystemTime::now()
3987                .duration_since(std::time::UNIX_EPOCH)
3988                .unwrap()
3989                .as_secs() as i64,
3990            is_stale_reprocess: false,
3991        };
3992
3993        let pda_address = pending.pda_address.clone();
3994        let slot = pending.slot;
3995
3996        let mut updates = state
3997            .pending_updates
3998            .entry(pda_address.clone())
3999            .or_insert_with(Vec::new);
4000
4001        let original_len = updates.len();
4002        updates.retain(|existing| existing.slot > slot);
4003        let removed_by_dedup = original_len - updates.len();
4004
4005        if removed_by_dedup > 0 {
4006            self.pending_queue_size = self
4007                .pending_queue_size
4008                .saturating_sub(removed_by_dedup as u64);
4009        }
4010
4011        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
4012            updates.remove(0);
4013            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4014        }
4015
4016        updates.push(pending);
4017        #[cfg(feature = "otel")]
4018        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
4019
4020        Ok(())
4021    }
4022
4023    pub fn queue_instruction_event(
4024        &mut self,
4025        state_id: u32,
4026        event: QueuedInstructionEvent,
4027    ) -> Result<()> {
4028        let state = self
4029            .states
4030            .get_mut(&state_id)
4031            .ok_or("State table not found")?;
4032
4033        let pda_address = event.pda_address.clone();
4034
4035        let pending = PendingInstructionEvent {
4036            event_type: event.event_type,
4037            pda_address: event.pda_address,
4038            event_data: event.event_data,
4039            slot: event.slot,
4040            signature: event.signature,
4041            queued_at: std::time::SystemTime::now()
4042                .duration_since(std::time::UNIX_EPOCH)
4043                .unwrap()
4044                .as_secs() as i64,
4045        };
4046
4047        let mut events = state
4048            .pending_instruction_events
4049            .entry(pda_address)
4050            .or_insert_with(Vec::new);
4051
4052        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
4053            events.remove(0);
4054        }
4055
4056        events.push(pending);
4057
4058        Ok(())
4059    }
4060
4061    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4062        self.last_pda_lookup_miss.take()
4063    }
4064
4065    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4066        self.last_lookup_index_miss.take()
4067    }
4068
4069    pub fn take_last_pda_registered(&mut self) -> Option<String> {
4070        self.last_pda_registered.take()
4071    }
4072
4073    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4074        std::mem::take(&mut self.last_lookup_index_keys)
4075    }
4076
4077    pub fn flush_pending_instruction_events(
4078        &mut self,
4079        state_id: u32,
4080        pda_address: &str,
4081    ) -> Vec<PendingInstructionEvent> {
4082        let state = match self.states.get_mut(&state_id) {
4083            Some(s) => s,
4084            None => return Vec::new(),
4085        };
4086
4087        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4088            events
4089        } else {
4090            Vec::new()
4091        }
4092    }
4093
4094    /// Get statistics about the pending queue for monitoring
4095    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4096        let state = self.states.get(&state_id)?;
4097
4098        let now = std::time::SystemTime::now()
4099            .duration_since(std::time::UNIX_EPOCH)
4100            .unwrap()
4101            .as_secs() as i64;
4102
4103        let mut total_updates = 0;
4104        let mut oldest_timestamp = now;
4105        let mut largest_pda_queue = 0;
4106        let mut estimated_memory = 0;
4107
4108        for entry in state.pending_updates.iter() {
4109            let (_, updates) = entry.pair();
4110            total_updates += updates.len();
4111            largest_pda_queue = largest_pda_queue.max(updates.len());
4112
4113            for update in updates.iter() {
4114                oldest_timestamp = oldest_timestamp.min(update.queued_at);
4115                // Rough memory estimate
4116                estimated_memory += update.account_type.len() +
4117                                   update.pda_address.len() +
4118                                   update.signature.len() +
4119                                   16 + // slot + queued_at
4120                                   estimate_json_size(&update.account_data);
4121            }
4122        }
4123
4124        Some(PendingQueueStats {
4125            total_updates,
4126            unique_pdas: state.pending_updates.len(),
4127            oldest_age_seconds: now - oldest_timestamp,
4128            largest_pda_queue_size: largest_pda_queue,
4129            estimated_memory_bytes: estimated_memory,
4130        })
4131    }
4132
4133    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4134        let mut stats = VmMemoryStats {
4135            path_cache_size: self.path_cache.len(),
4136            ..Default::default()
4137        };
4138
4139        if let Some(state) = self.states.get(&state_id) {
4140            stats.state_table_entity_count = state.data.len();
4141            stats.state_table_max_entries = state.config.max_entries;
4142            stats.state_table_at_capacity = state.is_at_capacity();
4143
4144            stats.lookup_index_count = state.lookup_indexes.len();
4145            stats.lookup_index_total_entries =
4146                state.lookup_indexes.values().map(|idx| idx.len()).sum();
4147
4148            stats.temporal_index_count = state.temporal_indexes.len();
4149            stats.temporal_index_total_entries = state
4150                .temporal_indexes
4151                .values()
4152                .map(|idx| idx.total_entries())
4153                .sum();
4154
4155            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4156            stats.pda_reverse_lookup_total_entries = state
4157                .pda_reverse_lookups
4158                .values()
4159                .map(|lookup| lookup.len())
4160                .sum();
4161
4162            stats.version_tracker_entries = state.version_tracker.len();
4163
4164            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4165        }
4166
4167        stats
4168    }
4169
4170    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4171        let pending_removed = self.cleanup_expired_pending_updates(state_id);
4172        let temporal_removed = self.cleanup_temporal_indexes(state_id);
4173
4174        #[cfg(feature = "otel")]
4175        if let Some(state) = self.states.get(&state_id) {
4176            crate::vm_metrics::record_cleanup(
4177                pending_removed,
4178                temporal_removed,
4179                &state.entity_name,
4180            );
4181        }
4182
4183        CleanupResult {
4184            pending_updates_removed: pending_removed,
4185            temporal_entries_removed: temporal_removed,
4186        }
4187    }
4188
4189    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4190        let state = match self.states.get_mut(&state_id) {
4191            Some(s) => s,
4192            None => return 0,
4193        };
4194
4195        let now = std::time::SystemTime::now()
4196            .duration_since(std::time::UNIX_EPOCH)
4197            .unwrap()
4198            .as_secs() as i64;
4199
4200        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4201        let mut total_removed = 0;
4202
4203        for (_, index) in state.temporal_indexes.iter_mut() {
4204            total_removed += index.cleanup_expired(cutoff);
4205        }
4206
4207        total_removed
4208    }
4209
4210    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4211        let state = self.states.get(&state_id)?;
4212
4213        if state.is_at_capacity() {
4214            Some(CapacityWarning {
4215                current_entries: state.data.len(),
4216                max_entries: state.config.max_entries,
4217                entries_over_limit: state.entries_over_limit(),
4218            })
4219        } else {
4220            None
4221        }
4222    }
4223
4224    /// Drop the oldest pending update across all PDAs
4225    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4226        let state = self
4227            .states
4228            .get_mut(&state_id)
4229            .ok_or("State table not found")?;
4230
4231        let mut oldest_pda: Option<String> = None;
4232        let mut oldest_timestamp = i64::MAX;
4233
4234        // Find the PDA with the oldest update
4235        for entry in state.pending_updates.iter() {
4236            let (pda, updates) = entry.pair();
4237            if let Some(update) = updates.first() {
4238                if update.queued_at < oldest_timestamp {
4239                    oldest_timestamp = update.queued_at;
4240                    oldest_pda = Some(pda.clone());
4241                }
4242            }
4243        }
4244
4245        // Remove the oldest update
4246        if let Some(pda) = oldest_pda {
4247            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4248                if !updates.is_empty() {
4249                    updates.remove(0);
4250                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4251
4252                    // Remove the entry if it's now empty
4253                    if updates.is_empty() {
4254                        drop(updates);
4255                        state.pending_updates.remove(&pda);
4256                    }
4257                }
4258            }
4259        }
4260
4261        Ok(())
4262    }
4263
4264    /// Flush and return pending updates for a PDA for external reprocessing
4265    ///
4266    /// Returns the pending updates that were queued for this PDA address.
4267    /// The caller should reprocess these through the VM using process_event().
4268    fn flush_pending_updates(
4269        &mut self,
4270        state_id: u32,
4271        pda_address: &str,
4272    ) -> Result<Vec<PendingAccountUpdate>> {
4273        let state = self
4274            .states
4275            .get_mut(&state_id)
4276            .ok_or("State table not found")?;
4277
4278        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4279            let count = pending_updates.len();
4280            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4281            #[cfg(feature = "otel")]
4282            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4283            Ok(pending_updates)
4284        } else {
4285            Ok(Vec::new())
4286        }
4287    }
4288
4289    /// Cache the most recent account data for a PDA address.
4290    /// Called by the vixen runtime after a Lookup-handler account update is
4291    /// successfully processed.  When a PDA mapping later changes (e.g. at a
4292    /// round boundary), the cached data is returned for reprocessing with the
4293    /// new mapping.
4294    pub fn cache_last_account_data(
4295        &mut self,
4296        state_id: u32,
4297        pda_address: &str,
4298        update: PendingAccountUpdate,
4299    ) {
4300        if let Some(state) = self.states.get(&state_id) {
4301            state
4302                .last_account_data
4303                .insert(pda_address.to_string(), update);
4304        }
4305    }
4306
4307    /// Try to resolve a primary key via PDA reverse lookup
4308    pub fn try_pda_reverse_lookup(
4309        &mut self,
4310        state_id: u32,
4311        lookup_name: &str,
4312        pda_address: &str,
4313    ) -> Option<String> {
4314        let state = self.states.get_mut(&state_id)?;
4315
4316        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4317            if let Some(value) = lookup.lookup(pda_address) {
4318                self.pda_cache_hits += 1;
4319                return Some(value);
4320            }
4321        }
4322
4323        self.pda_cache_misses += 1;
4324        None
4325    }
4326
4327    /// Try to resolve a value through lookup indexes.
4328    /// This attempts to resolve the value using any lookup index in the state.
4329    pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4330        let state = self.states.get(&state_id)?;
4331
4332        for index in state.lookup_indexes.values() {
4333            if let Some(resolved) = index.lookup(value) {
4334                return Some(resolved);
4335            }
4336        }
4337
4338        None
4339    }
4340
4341    /// Try to resolve a primary key via chained PDA + lookup index resolution.
4342    /// First tries PDA reverse lookup, then tries to resolve the result through lookup indexes.
4343    /// This is useful when the PDA maps to an intermediate value (e.g., round address)
4344    /// that needs to be resolved to the actual primary key (e.g., round_id).
4345    pub fn try_chained_pda_lookup(
4346        &mut self,
4347        state_id: u32,
4348        lookup_name: &str,
4349        pda_address: &str,
4350    ) -> Option<String> {
4351        // First, try PDA reverse lookup
4352        let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4353
4354        // Try to resolve the PDA result through lookup indexes
4355        // (e.g., round address -> round_id)
4356        let pda_value = Value::String(pda_result.clone());
4357        if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4358            resolved.as_str().map(|s| s.to_string())
4359        } else {
4360            // Return the PDA result if we can't resolve further
4361            pda_value.as_str().map(|s| s.to_string())
4362        }
4363    }
4364
4365    // ============================================================================
4366    // Computed Expression Evaluator (Task 5)
4367    // ============================================================================
4368
4369    /// Evaluate a computed expression AST against the current state
4370    /// This is the core runtime evaluator for computed fields from the AST
4371    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4372        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4373    }
4374
4375    /// Evaluate a computed expression with a variable environment (for let bindings)
4376    fn evaluate_computed_expr_with_env(
4377        &self,
4378        expr: &ComputedExpr,
4379        state: &Value,
4380        env: &std::collections::HashMap<String, Value>,
4381    ) -> Result<Value> {
4382        match expr {
4383            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4384
4385            ComputedExpr::Var { name } => env
4386                .get(name)
4387                .cloned()
4388                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4389
4390            ComputedExpr::Let { name, value, body } => {
4391                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4392                let mut new_env = env.clone();
4393                new_env.insert(name.clone(), val);
4394                self.evaluate_computed_expr_with_env(body, state, &new_env)
4395            }
4396
4397            ComputedExpr::If {
4398                condition,
4399                then_branch,
4400                else_branch,
4401            } => {
4402                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4403                if self.value_to_bool(&cond_val) {
4404                    self.evaluate_computed_expr_with_env(then_branch, state, env)
4405                } else {
4406                    self.evaluate_computed_expr_with_env(else_branch, state, env)
4407                }
4408            }
4409
4410            ComputedExpr::None => Ok(Value::Null),
4411
4412            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4413
4414            ComputedExpr::Slice { expr, start, end } => {
4415                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4416                match val {
4417                    Value::Array(arr) => {
4418                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4419                        Ok(Value::Array(slice))
4420                    }
4421                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4422                }
4423            }
4424
4425            ComputedExpr::Index { expr, index } => {
4426                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4427                match val {
4428                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4429                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4430                }
4431            }
4432
4433            ComputedExpr::U64FromLeBytes { bytes } => {
4434                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4435                let byte_vec = self.value_to_bytes(&val)?;
4436                if byte_vec.len() < 8 {
4437                    return Err(format!(
4438                        "u64::from_le_bytes requires 8 bytes, got {}",
4439                        byte_vec.len()
4440                    )
4441                    .into());
4442                }
4443                let arr: [u8; 8] = byte_vec[..8]
4444                    .try_into()
4445                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4446                Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
4447                    arr,
4448                ))))
4449            }
4450
4451            ComputedExpr::U64FromBeBytes { bytes } => {
4452                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4453                let byte_vec = self.value_to_bytes(&val)?;
4454                if byte_vec.len() < 8 {
4455                    return Err(format!(
4456                        "u64::from_be_bytes requires 8 bytes, got {}",
4457                        byte_vec.len()
4458                    )
4459                    .into());
4460                }
4461                let arr: [u8; 8] = byte_vec[..8]
4462                    .try_into()
4463                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4464                Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
4465                    arr,
4466                ))))
4467            }
4468
4469            ComputedExpr::ByteArray { bytes } => {
4470                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4471            }
4472
4473            ComputedExpr::Closure { param, body } => {
4474                // Closures are stored as-is; they're evaluated when used in map()
4475                // Return a special representation
4476                Ok(json!({
4477                    "__closure": {
4478                        "param": param,
4479                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
4480                    }
4481                }))
4482            }
4483
4484            ComputedExpr::Unary { op, expr } => {
4485                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4486                self.apply_unary_op(op, &val)
4487            }
4488
4489            ComputedExpr::JsonToBytes { expr } => {
4490                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4491                // Convert JSON array of numbers to byte array
4492                let bytes = self.value_to_bytes(&val)?;
4493                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4494            }
4495
4496            ComputedExpr::UnwrapOr { expr, default } => {
4497                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4498                if val.is_null() {
4499                    Ok(default.clone())
4500                } else {
4501                    Ok(val)
4502                }
4503            }
4504
4505            ComputedExpr::Binary { op, left, right } => {
4506                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4507                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4508                self.apply_binary_op(op, &l, &r)
4509            }
4510
4511            ComputedExpr::Cast { expr, to_type } => {
4512                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4513                self.apply_cast(&val, to_type)
4514            }
4515
4516            ComputedExpr::ResolverComputed {
4517                resolver,
4518                method,
4519                args,
4520            } => {
4521                let evaluated_args: Vec<Value> = args
4522                    .iter()
4523                    .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4524                    .collect::<Result<Vec<_>>>()?;
4525                crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4526            }
4527
4528            ComputedExpr::MethodCall { expr, method, args } => {
4529                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4530                // Special handling for map() with closures
4531                if method == "map" && args.len() == 1 {
4532                    if let ComputedExpr::Closure { param, body } = &args[0] {
4533                        // If the value is null, return null (Option::None.map returns None)
4534                        if val.is_null() {
4535                            return Ok(Value::Null);
4536                        }
4537
4538                        if let Value::Array(arr) = &val {
4539                            let results: Result<Vec<Value>> = arr
4540                                .iter()
4541                                .map(|elem| {
4542                                    let mut closure_env = env.clone();
4543                                    closure_env.insert(param.clone(), elem.clone());
4544                                    self.evaluate_computed_expr_with_env(body, state, &closure_env)
4545                                })
4546                                .collect();
4547                            return Ok(Value::Array(results?));
4548                        }
4549
4550                        let mut closure_env = env.clone();
4551                        closure_env.insert(param.clone(), val);
4552                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4553                    }
4554                }
4555                let evaluated_args: Vec<Value> = args
4556                    .iter()
4557                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4558                    .collect::<Result<Vec<_>>>()?;
4559                self.apply_method_call(&val, method, &evaluated_args)
4560            }
4561
4562            ComputedExpr::Literal { value } => Ok(value.clone()),
4563
4564            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4565
4566            ComputedExpr::ContextSlot => Ok(self
4567                .current_context
4568                .as_ref()
4569                .and_then(|ctx| ctx.slot)
4570                .map(|s| json!(s))
4571                .unwrap_or(Value::Null)),
4572
4573            ComputedExpr::ContextTimestamp => Ok(self
4574                .current_context
4575                .as_ref()
4576                .map(|ctx| json!(ctx.timestamp()))
4577                .unwrap_or(Value::Null)),
4578
4579            ComputedExpr::Keccak256 { expr } => {
4580                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4581                let bytes = self.value_to_bytes(&val)?;
4582                use sha3::{Digest, Keccak256};
4583                let hash = Keccak256::digest(&bytes);
4584                Ok(Value::Array(
4585                    hash.to_vec().iter().map(|b| json!(*b)).collect(),
4586                ))
4587            }
4588        }
4589    }
4590
4591    /// Convert a JSON value to a byte vector
4592    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4593        match val {
4594            Value::Array(arr) => arr
4595                .iter()
4596                .map(|v| {
4597                    v.as_u64()
4598                        .map(|n| n as u8)
4599                        .ok_or_else(|| "Array element not a valid byte".into())
4600                })
4601                .collect(),
4602            Value::String(s) => {
4603                // Try to decode as hex
4604                if s.starts_with("0x") || s.starts_with("0X") {
4605                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4606                } else {
4607                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4608                }
4609            }
4610            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4611        }
4612    }
4613
4614    /// Apply a unary operation
4615    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4616        use crate::ast::UnaryOp;
4617        match op {
4618            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4619            UnaryOp::ReverseBits => match val.as_u64() {
4620                Some(n) => Ok(json!(n.reverse_bits())),
4621                None => match val.as_i64() {
4622                    Some(n) => Ok(json!((n as u64).reverse_bits())),
4623                    None => Err("reverse_bits requires an integer".into()),
4624                },
4625            },
4626        }
4627    }
4628
4629    /// Get a field value from state by path (e.g., "section.field" or just "field")
4630    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4631        let segments: Vec<&str> = path.split('.').collect();
4632        let mut current = state;
4633
4634        for segment in segments {
4635            match current.get(segment) {
4636                Some(v) => current = v,
4637                None => return Ok(Value::Null),
4638            }
4639        }
4640
4641        Ok(current.clone())
4642    }
4643
4644    /// Apply a binary operation to two values
4645    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4646        match op {
4647            // Arithmetic operations
4648            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4649            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4650            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4651            BinaryOp::Div => {
4652                // Check for division by zero
4653                if let Some(r) = right.as_i64() {
4654                    if r == 0 {
4655                        return Err("Division by zero".into());
4656                    }
4657                }
4658                if let Some(r) = right.as_f64() {
4659                    if r == 0.0 {
4660                        return Err("Division by zero".into());
4661                    }
4662                }
4663                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4664            }
4665            BinaryOp::Mod => {
4666                // Modulo - only for integers
4667                match (left.as_i64(), right.as_i64()) {
4668                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4669                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4670                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4671                        _ => Err("Modulo requires non-zero integer operands".into()),
4672                    },
4673                    _ => Err("Modulo by zero".into()),
4674                }
4675            }
4676
4677            // Comparison operations
4678            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4679            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4680            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4681            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4682            BinaryOp::Eq => Ok(json!(left == right)),
4683            BinaryOp::Ne => Ok(json!(left != right)),
4684
4685            // Logical operations
4686            BinaryOp::And => {
4687                let l_bool = self.value_to_bool(left);
4688                let r_bool = self.value_to_bool(right);
4689                Ok(json!(l_bool && r_bool))
4690            }
4691            BinaryOp::Or => {
4692                let l_bool = self.value_to_bool(left);
4693                let r_bool = self.value_to_bool(right);
4694                Ok(json!(l_bool || r_bool))
4695            }
4696
4697            // Bitwise operations
4698            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4699                (Some(a), Some(b)) => Ok(json!(a ^ b)),
4700                _ => match (left.as_i64(), right.as_i64()) {
4701                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
4702                    _ => Err("XOR requires integer operands".into()),
4703                },
4704            },
4705            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4706                (Some(a), Some(b)) => Ok(json!(a & b)),
4707                _ => match (left.as_i64(), right.as_i64()) {
4708                    (Some(a), Some(b)) => Ok(json!(a & b)),
4709                    _ => Err("BitAnd requires integer operands".into()),
4710                },
4711            },
4712            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4713                (Some(a), Some(b)) => Ok(json!(a | b)),
4714                _ => match (left.as_i64(), right.as_i64()) {
4715                    (Some(a), Some(b)) => Ok(json!(a | b)),
4716                    _ => Err("BitOr requires integer operands".into()),
4717                },
4718            },
4719            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4720                (Some(a), Some(b)) => Ok(json!(a << b)),
4721                _ => match (left.as_i64(), right.as_i64()) {
4722                    (Some(a), Some(b)) => Ok(json!(a << b)),
4723                    _ => Err("Shl requires integer operands".into()),
4724                },
4725            },
4726            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4727                (Some(a), Some(b)) => Ok(json!(a >> b)),
4728                _ => match (left.as_i64(), right.as_i64()) {
4729                    (Some(a), Some(b)) => Ok(json!(a >> b)),
4730                    _ => Err("Shr requires integer operands".into()),
4731                },
4732            },
4733        }
4734    }
4735
4736    /// Helper for numeric operations that can work on integers or floats
4737    fn numeric_op<F1, F2>(
4738        &self,
4739        left: &Value,
4740        right: &Value,
4741        int_op: F1,
4742        float_op: F2,
4743    ) -> Result<Value>
4744    where
4745        F1: Fn(i64, i64) -> i64,
4746        F2: Fn(f64, f64) -> f64,
4747    {
4748        // Try i64 first
4749        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4750            return Ok(json!(int_op(a, b)));
4751        }
4752
4753        // Try u64
4754        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4755            // For u64, we need to be careful with underflow in subtraction
4756            return Ok(json!(int_op(a as i64, b as i64)));
4757        }
4758
4759        // Try f64
4760        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4761            return Ok(json!(float_op(a, b)));
4762        }
4763
4764        // If either is null, return null
4765        if left.is_null() || right.is_null() {
4766            return Ok(Value::Null);
4767        }
4768
4769        Err(format!(
4770            "Cannot perform numeric operation on {:?} and {:?}",
4771            left, right
4772        )
4773        .into())
4774    }
4775
4776    /// Helper for comparison operations
4777    fn comparison_op<F1, F2>(
4778        &self,
4779        left: &Value,
4780        right: &Value,
4781        int_cmp: F1,
4782        float_cmp: F2,
4783    ) -> Result<Value>
4784    where
4785        F1: Fn(i64, i64) -> bool,
4786        F2: Fn(f64, f64) -> bool,
4787    {
4788        // Try i64 first
4789        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4790            return Ok(json!(int_cmp(a, b)));
4791        }
4792
4793        // Try u64
4794        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4795            return Ok(json!(int_cmp(a as i64, b as i64)));
4796        }
4797
4798        // Try f64
4799        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4800            return Ok(json!(float_cmp(a, b)));
4801        }
4802
4803        // If either is null, comparison returns false
4804        if left.is_null() || right.is_null() {
4805            return Ok(json!(false));
4806        }
4807
4808        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4809    }
4810
4811    /// Convert a value to boolean for logical operations
4812    fn value_to_bool(&self, value: &Value) -> bool {
4813        match value {
4814            Value::Null => false,
4815            Value::Bool(b) => *b,
4816            Value::Number(n) => {
4817                if let Some(i) = n.as_i64() {
4818                    i != 0
4819                } else if let Some(f) = n.as_f64() {
4820                    f != 0.0
4821                } else {
4822                    true
4823                }
4824            }
4825            Value::String(s) => !s.is_empty(),
4826            Value::Array(arr) => !arr.is_empty(),
4827            Value::Object(obj) => !obj.is_empty(),
4828        }
4829    }
4830
4831    /// Apply a type cast to a value
4832    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4833        match to_type {
4834            "i8" | "i16" | "i32" | "i64" | "isize" => {
4835                if let Some(n) = value.as_i64() {
4836                    Ok(json!(n))
4837                } else if let Some(n) = value.as_u64() {
4838                    Ok(json!(n as i64))
4839                } else if let Some(n) = value.as_f64() {
4840                    Ok(json!(n as i64))
4841                } else if let Some(s) = value.as_str() {
4842                    s.parse::<i64>()
4843                        .map(|n| json!(n))
4844                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4845                } else {
4846                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4847                }
4848            }
4849            "u8" | "u16" | "u32" | "u64" | "usize" => {
4850                if let Some(n) = value.as_u64() {
4851                    Ok(json!(n))
4852                } else if let Some(n) = value.as_i64() {
4853                    Ok(json!(n as u64))
4854                } else if let Some(n) = value.as_f64() {
4855                    Ok(json!(n as u64))
4856                } else if let Some(s) = value.as_str() {
4857                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4858                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4859                    })
4860                } else {
4861                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4862                }
4863            }
4864            "f32" | "f64" => {
4865                if let Some(n) = value.as_f64() {
4866                    Ok(json!(n))
4867                } else if let Some(n) = value.as_i64() {
4868                    Ok(json!(n as f64))
4869                } else if let Some(n) = value.as_u64() {
4870                    Ok(json!(n as f64))
4871                } else if let Some(s) = value.as_str() {
4872                    s.parse::<f64>()
4873                        .map(|n| json!(n))
4874                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4875                } else {
4876                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4877                }
4878            }
4879            "String" | "string" => Ok(json!(value.to_string())),
4880            "bool" => Ok(json!(self.value_to_bool(value))),
4881            _ => {
4882                // Unknown type, return value as-is
4883                Ok(value.clone())
4884            }
4885        }
4886    }
4887
4888    /// Apply a method call to a value
4889    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4890        match method {
4891            "unwrap_or" => {
4892                if value.is_null() && !args.is_empty() {
4893                    Ok(args[0].clone())
4894                } else {
4895                    Ok(value.clone())
4896                }
4897            }
4898            "unwrap_or_default" => {
4899                if value.is_null() {
4900                    // Return default for common types
4901                    Ok(json!(0))
4902                } else {
4903                    Ok(value.clone())
4904                }
4905            }
4906            "is_some" => Ok(json!(!value.is_null())),
4907            "is_none" => Ok(json!(value.is_null())),
4908            "abs" => {
4909                if let Some(n) = value.as_i64() {
4910                    Ok(json!(n.abs()))
4911                } else if let Some(n) = value.as_f64() {
4912                    Ok(json!(n.abs()))
4913                } else {
4914                    Err(format!("Cannot call abs() on {:?}", value).into())
4915                }
4916            }
4917            "len" => {
4918                if let Some(s) = value.as_str() {
4919                    Ok(json!(s.len()))
4920                } else if let Some(arr) = value.as_array() {
4921                    Ok(json!(arr.len()))
4922                } else if let Some(obj) = value.as_object() {
4923                    Ok(json!(obj.len()))
4924                } else {
4925                    Err(format!("Cannot call len() on {:?}", value).into())
4926                }
4927            }
4928            "to_string" => Ok(json!(value.to_string())),
4929            "min" => {
4930                if args.is_empty() {
4931                    return Err("min() requires an argument".into());
4932                }
4933                let other = &args[0];
4934                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4935                    Ok(json!(a.min(b)))
4936                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4937                    Ok(json!(a.min(b)))
4938                } else {
4939                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4940                }
4941            }
4942            "max" => {
4943                if args.is_empty() {
4944                    return Err("max() requires an argument".into());
4945                }
4946                let other = &args[0];
4947                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4948                    Ok(json!(a.max(b)))
4949                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4950                    Ok(json!(a.max(b)))
4951                } else {
4952                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4953                }
4954            }
4955            "saturating_add" => {
4956                if args.is_empty() {
4957                    return Err("saturating_add() requires an argument".into());
4958                }
4959                let other = &args[0];
4960                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4961                    Ok(json!(a.saturating_add(b)))
4962                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4963                    Ok(json!(a.saturating_add(b)))
4964                } else {
4965                    Err(format!(
4966                        "Cannot call saturating_add() on {:?} and {:?}",
4967                        value, other
4968                    )
4969                    .into())
4970                }
4971            }
4972            "saturating_sub" => {
4973                if args.is_empty() {
4974                    return Err("saturating_sub() requires an argument".into());
4975                }
4976                let other = &args[0];
4977                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4978                    Ok(json!(a.saturating_sub(b)))
4979                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4980                    Ok(json!(a.saturating_sub(b)))
4981                } else {
4982                    Err(format!(
4983                        "Cannot call saturating_sub() on {:?} and {:?}",
4984                        value, other
4985                    )
4986                    .into())
4987                }
4988            }
4989            _ => Err(format!("Unknown method call: {}()", method).into()),
4990        }
4991    }
4992
4993    /// Evaluate all computed fields for an entity and update the state
4994    /// This takes a list of ComputedFieldSpec from the AST and applies them
4995    pub fn evaluate_computed_fields_from_ast(
4996        &self,
4997        state: &mut Value,
4998        computed_field_specs: &[ComputedFieldSpec],
4999    ) -> Result<Vec<String>> {
5000        let mut updated_paths = Vec::new();
5001
5002        crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
5003
5004        for spec in computed_field_specs {
5005            match self.evaluate_computed_expr(&spec.expression, state) {
5006                Ok(result) => {
5007                    self.set_field_in_state(state, &spec.target_path, result)?;
5008                    updated_paths.push(spec.target_path.clone());
5009                }
5010                Err(e) => {
5011                    tracing::warn!(
5012                        target_path = %spec.target_path,
5013                        error = %e,
5014                        "Failed to evaluate computed field"
5015                    );
5016                }
5017            }
5018        }
5019
5020        Ok(updated_paths)
5021    }
5022
5023    /// Set a field value in state by path (e.g., "section.field")
5024    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
5025        let segments: Vec<&str> = path.split('.').collect();
5026
5027        if segments.is_empty() {
5028            return Err("Empty path".into());
5029        }
5030
5031        // Navigate to parent, creating intermediate objects as needed
5032        let mut current = state;
5033        for (i, segment) in segments.iter().enumerate() {
5034            if i == segments.len() - 1 {
5035                // Last segment - set the value
5036                if let Some(obj) = current.as_object_mut() {
5037                    obj.insert(segment.to_string(), value);
5038                    return Ok(());
5039                } else {
5040                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
5041                }
5042            } else {
5043                // Intermediate segment - navigate or create
5044                if !current.is_object() {
5045                    *current = json!({});
5046                }
5047                let obj = current.as_object_mut().unwrap();
5048                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
5049            }
5050        }
5051
5052        Ok(())
5053    }
5054
5055    /// Create a computed fields evaluator closure from AST specs
5056    /// This returns a function that can be passed to the bytecode builder
5057    pub fn create_evaluator_from_specs(
5058        specs: Vec<ComputedFieldSpec>,
5059    ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5060        move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5061            let mut vm = VmContext::new();
5062            vm.current_context = Some(UpdateContext {
5063                slot: context_slot,
5064                timestamp: Some(context_timestamp),
5065                ..Default::default()
5066            });
5067            vm.evaluate_computed_fields_from_ast(state, &specs)?;
5068            Ok(())
5069        }
5070    }
5071}
5072
5073impl Default for VmContext {
5074    fn default() -> Self {
5075        Self::new()
5076    }
5077}
5078
5079// Implement the ReverseLookupUpdater trait for VmContext
5080impl crate::resolvers::ReverseLookupUpdater for VmContext {
5081    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5082        // Use default state_id=0 and default lookup name
5083        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5084            .unwrap_or_else(|e| {
5085                tracing::error!("Failed to update PDA reverse lookup: {}", e);
5086                Vec::new()
5087            })
5088    }
5089
5090    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5091        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
5092        self.flush_pending_updates(0, pda_address)
5093            .unwrap_or_else(|e| {
5094                tracing::error!("Failed to flush pending updates: {}", e);
5095                Vec::new()
5096            })
5097    }
5098}
5099
5100#[cfg(test)]
5101mod tests {
5102    use super::*;
5103    use crate::ast::{
5104        BinaryOp, ComputedExpr, ComputedFieldSpec, HttpMethod, UrlResolverConfig, UrlSource,
5105    };
5106
5107    #[test]
5108    fn test_url_resolver_cache_key_uses_method_and_resolved_url() {
5109        let field_path_resolver = ResolverType::Url(UrlResolverConfig {
5110            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5111            method: HttpMethod::Get,
5112            extract_path: None,
5113        });
5114        let template_resolver = ResolverType::Url(UrlResolverConfig {
5115            url_source: UrlSource::Template(vec![ast::UrlTemplatePart::Literal(
5116                "https://example.com/metadata".to_string(),
5117            )]),
5118            method: HttpMethod::Get,
5119            extract_path: Some("data".to_string()),
5120        });
5121        let input = json!("https://cdn.example.com/token.json");
5122
5123        assert_eq!(
5124            resolver_cache_key(&field_path_resolver, &input),
5125            resolver_cache_key(&template_resolver, &input)
5126        );
5127    }
5128
5129    #[test]
5130    fn test_url_resolver_cache_key_distinguishes_http_method() {
5131        let get_resolver = ResolverType::Url(UrlResolverConfig {
5132            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5133            method: HttpMethod::Get,
5134            extract_path: None,
5135        });
5136        let post_resolver = ResolverType::Url(UrlResolverConfig {
5137            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5138            method: HttpMethod::Post,
5139            extract_path: None,
5140        });
5141        let input = json!("https://api.example.com/round");
5142
5143        assert_ne!(
5144            resolver_cache_key(&get_resolver, &input),
5145            resolver_cache_key(&post_resolver, &input)
5146        );
5147    }
5148
5149    #[test]
5150    fn test_expired_resolver_cache_entry_is_dropped() {
5151        let mut vm = VmContext::new();
5152        let resolver = ResolverType::Url(UrlResolverConfig {
5153            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5154            method: HttpMethod::Get,
5155            extract_path: None,
5156        });
5157        let input = json!("https://cdn.example.com/token.json");
5158        let cache_key = resolver_cache_key(&resolver, &input);
5159
5160        vm.resolver_cache.put(
5161            cache_key.clone(),
5162            ResolverCacheEntry {
5163                value: json!({ "name": "Token" }),
5164                cached_at: Instant::now() - resolver_cache_ttl() - Duration::from_secs(1),
5165            },
5166        );
5167
5168        assert!(vm.get_cached_resolver_value(&cache_key).is_none());
5169        assert!(vm.resolver_cache.get(&cache_key).is_none());
5170    }
5171
5172    #[test]
5173    fn test_computed_field_preserves_integer_type() {
5174        let vm = VmContext::new();
5175
5176        let mut state = serde_json::json!({
5177            "trading": {
5178                "total_buy_volume": 20000000000_i64,
5179                "total_sell_volume": 17951316474_i64
5180            }
5181        });
5182
5183        let spec = ComputedFieldSpec {
5184            target_path: "trading.total_volume".to_string(),
5185            result_type: "Option<u64>".to_string(),
5186            expression: ComputedExpr::Binary {
5187                op: BinaryOp::Add,
5188                left: Box::new(ComputedExpr::UnwrapOr {
5189                    expr: Box::new(ComputedExpr::FieldRef {
5190                        path: "trading.total_buy_volume".to_string(),
5191                    }),
5192                    default: serde_json::json!(0),
5193                }),
5194                right: Box::new(ComputedExpr::UnwrapOr {
5195                    expr: Box::new(ComputedExpr::FieldRef {
5196                        path: "trading.total_sell_volume".to_string(),
5197                    }),
5198                    default: serde_json::json!(0),
5199                }),
5200            },
5201        };
5202
5203        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
5204            .unwrap();
5205
5206        let total_volume = state
5207            .get("trading")
5208            .and_then(|t| t.get("total_volume"))
5209            .expect("total_volume should exist");
5210
5211        let serialized = serde_json::to_string(total_volume).unwrap();
5212        assert!(
5213            !serialized.contains('.'),
5214            "Integer should not have decimal point: {}",
5215            serialized
5216        );
5217        assert_eq!(
5218            total_volume.as_i64(),
5219            Some(37951316474),
5220            "Value should be correct sum"
5221        );
5222    }
5223
5224    #[test]
5225    fn test_set_field_sum_preserves_integer_type() {
5226        let mut vm = VmContext::new();
5227        vm.registers[0] = serde_json::json!({});
5228        vm.registers[1] = serde_json::json!(20000000000_i64);
5229        vm.registers[2] = serde_json::json!(17951316474_i64);
5230
5231        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
5232        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
5233
5234        let state = &vm.registers[0];
5235        let buy_vol = state
5236            .get("trading")
5237            .and_then(|t| t.get("total_buy_volume"))
5238            .unwrap();
5239        let sell_vol = state
5240            .get("trading")
5241            .and_then(|t| t.get("total_sell_volume"))
5242            .unwrap();
5243
5244        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
5245        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
5246
5247        assert!(
5248            !buy_serialized.contains('.'),
5249            "Buy volume should not have decimal: {}",
5250            buy_serialized
5251        );
5252        assert!(
5253            !sell_serialized.contains('.'),
5254            "Sell volume should not have decimal: {}",
5255            sell_serialized
5256        );
5257    }
5258
5259    #[test]
5260    fn test_lookup_index_chaining() {
5261        let mut vm = VmContext::new();
5262
5263        let state = vm.states.get_mut(&0).unwrap();
5264
5265        state
5266            .pda_reverse_lookups
5267            .entry("default_pda_lookup".to_string())
5268            .or_insert_with(|| PdaReverseLookup::new(1000))
5269            .insert("pda_123".to_string(), "addr_456".to_string());
5270
5271        state
5272            .lookup_indexes
5273            .entry("round_address_lookup_index".to_string())
5274            .or_insert_with(LookupIndex::new)
5275            .insert(json!("addr_456"), json!(789));
5276
5277        let handler = vec![
5278            OpCode::LoadConstant {
5279                value: json!("pda_123"),
5280                dest: 0,
5281            },
5282            OpCode::LookupIndex {
5283                state_id: 0,
5284                index_name: "round_address_lookup_index".to_string(),
5285                lookup_value: 0,
5286                dest: 1,
5287            },
5288        ];
5289
5290        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5291            .unwrap();
5292
5293        assert_eq!(vm.registers[1], json!(789));
5294    }
5295
5296    #[test]
5297    fn test_lookup_index_no_chain() {
5298        let mut vm = VmContext::new();
5299
5300        let state = vm.states.get_mut(&0).unwrap();
5301        state
5302            .lookup_indexes
5303            .entry("test_index".to_string())
5304            .or_insert_with(LookupIndex::new)
5305            .insert(json!("key_abc"), json!(42));
5306
5307        let handler = vec![
5308            OpCode::LoadConstant {
5309                value: json!("key_abc"),
5310                dest: 0,
5311            },
5312            OpCode::LookupIndex {
5313                state_id: 0,
5314                index_name: "test_index".to_string(),
5315                lookup_value: 0,
5316                dest: 1,
5317            },
5318        ];
5319
5320        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5321            .unwrap();
5322
5323        assert_eq!(vm.registers[1], json!(42));
5324    }
5325
5326    #[test]
5327    fn test_conditional_set_field_with_zero_array() {
5328        let mut vm = VmContext::new();
5329
5330        let event_zeros = json!({
5331            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5332                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5333        });
5334
5335        let event_nonzero = json!({
5336            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
5337                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
5338        });
5339
5340        let zero_32: Value = json!([
5341            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5342            0, 0, 0
5343        ]);
5344
5345        let handler = vec![
5346            OpCode::CreateObject { dest: 2 },
5347            OpCode::LoadEventField {
5348                path: FieldPath::new(&["value"]),
5349                dest: 10,
5350                default: None,
5351            },
5352            OpCode::ConditionalSetField {
5353                object: 2,
5354                path: "captured_value".to_string(),
5355                value: 10,
5356                condition_field: FieldPath::new(&["value"]),
5357                condition_op: ComparisonOp::NotEqual,
5358                condition_value: zero_32,
5359            },
5360        ];
5361
5362        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
5363            .unwrap();
5364        assert!(
5365            vm.registers[2].get("captured_value").is_none(),
5366            "Field should not be set when value is all zeros"
5367        );
5368
5369        vm.reset_registers();
5370        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
5371            .unwrap();
5372        assert!(
5373            vm.registers[2].get("captured_value").is_some(),
5374            "Field should be set when value is non-zero"
5375        );
5376    }
5377
5378    #[test]
5379    fn test_when_instruction_arrives_first() {
5380        let mut vm = VmContext::new();
5381
5382        let signature = "test_sig_123".to_string();
5383
5384        {
5385            let state = vm.states.get(&0).unwrap();
5386            let mut cache = state.recent_tx_instructions.lock().unwrap();
5387            let mut set = HashSet::new();
5388            set.insert("RevealIxState".to_string());
5389            cache.put(signature.clone(), set);
5390        }
5391
5392        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5393
5394        let handler = vec![
5395            OpCode::CreateObject { dest: 2 },
5396            OpCode::LoadConstant {
5397                value: json!("primary_key_value"),
5398                dest: 1,
5399            },
5400            OpCode::LoadConstant {
5401                value: json!("the_revealed_value"),
5402                dest: 10,
5403            },
5404            OpCode::SetFieldWhen {
5405                object: 2,
5406                path: "entropy_value".to_string(),
5407                value: 10,
5408                when_instruction: "RevealIxState".to_string(),
5409                entity_name: "TestEntity".to_string(),
5410                key_reg: 1,
5411                condition_field: None,
5412                condition_op: None,
5413                condition_value: None,
5414            },
5415        ];
5416
5417        vm.execute_handler(
5418            &handler,
5419            &json!({}),
5420            "VarState",
5421            0,
5422            "TestEntity",
5423            None,
5424            None,
5425        )
5426        .unwrap();
5427
5428        assert_eq!(
5429            vm.registers[2].get("entropy_value").unwrap(),
5430            "the_revealed_value",
5431            "Field should be set when instruction was already seen"
5432        );
5433    }
5434
5435    #[test]
5436    fn test_when_account_arrives_first() {
5437        let mut vm = VmContext::new();
5438
5439        let signature = "test_sig_456".to_string();
5440
5441        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5442
5443        let handler = vec![
5444            OpCode::CreateObject { dest: 2 },
5445            OpCode::LoadConstant {
5446                value: json!("pk_123"),
5447                dest: 1,
5448            },
5449            OpCode::LoadConstant {
5450                value: json!("deferred_value"),
5451                dest: 10,
5452            },
5453            OpCode::SetFieldWhen {
5454                object: 2,
5455                path: "entropy_value".to_string(),
5456                value: 10,
5457                when_instruction: "RevealIxState".to_string(),
5458                entity_name: "TestEntity".to_string(),
5459                key_reg: 1,
5460                condition_field: None,
5461                condition_op: None,
5462                condition_value: None,
5463            },
5464        ];
5465
5466        vm.execute_handler(
5467            &handler,
5468            &json!({}),
5469            "VarState",
5470            0,
5471            "TestEntity",
5472            None,
5473            None,
5474        )
5475        .unwrap();
5476
5477        assert!(
5478            vm.registers[2].get("entropy_value").is_none(),
5479            "Field should not be set when instruction hasn't been seen"
5480        );
5481
5482        let state = vm.states.get(&0).unwrap();
5483        let key = (signature.clone(), "RevealIxState".to_string());
5484        assert!(
5485            state.deferred_when_ops.contains_key(&key),
5486            "Operation should be queued"
5487        );
5488
5489        {
5490            let mut cache = state.recent_tx_instructions.lock().unwrap();
5491            let mut set = HashSet::new();
5492            set.insert("RevealIxState".to_string());
5493            cache.put(signature.clone(), set);
5494        }
5495
5496        let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
5497        for op in deferred {
5498            vm.apply_deferred_when_op(0, &op, None, None).unwrap();
5499        }
5500
5501        let state = vm.states.get(&0).unwrap();
5502        let entity = state.data.get(&json!("pk_123")).unwrap();
5503        assert_eq!(
5504            entity.get("entropy_value").unwrap(),
5505            "deferred_value",
5506            "Field should be set after instruction arrives"
5507        );
5508    }
5509
5510    #[test]
5511    fn test_when_cleanup_expired() {
5512        let mut vm = VmContext::new();
5513
5514        let state = vm.states.get(&0).unwrap();
5515        let key = ("old_sig".to_string(), "SomeIxState".to_string());
5516        state.deferred_when_ops.insert(
5517            key,
5518            vec![DeferredWhenOperation {
5519                entity_name: "Test".to_string(),
5520                primary_key: json!("pk"),
5521                field_path: "field".to_string(),
5522                field_value: json!("value"),
5523                when_instruction: "SomeIxState".to_string(),
5524                signature: "old_sig".to_string(),
5525                slot: 0,
5526                deferred_at: 0,
5527                emit: true,
5528            }],
5529        );
5530
5531        let removed = vm.cleanup_expired_when_ops(0, 60);
5532
5533        assert_eq!(removed, 1, "Should have removed 1 expired op");
5534        assert!(
5535            vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
5536            "Deferred ops should be empty after cleanup"
5537        );
5538    }
5539
5540    #[test]
5541    fn test_deferred_when_op_recomputes_dependent_fields() {
5542        use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
5543
5544        let mut vm = VmContext::new();
5545
5546        // Create computed field specs similar to the ore stack:
5547        // pre_reveal_rng depends on base_value
5548        // pre_reveal_winning_square depends on pre_reveal_rng
5549        let computed_specs = vec![
5550            ComputedFieldSpec {
5551                target_path: "results.pre_reveal_rng".to_string(),
5552                result_type: "Option<u64>".to_string(),
5553                expression: ComputedExpr::FieldRef {
5554                    path: "entropy.base_value".to_string(),
5555                },
5556            },
5557            ComputedFieldSpec {
5558                target_path: "results.pre_reveal_winning_square".to_string(),
5559                result_type: "Option<u64>".to_string(),
5560                expression: ComputedExpr::MethodCall {
5561                    expr: Box::new(ComputedExpr::FieldRef {
5562                        path: "results.pre_reveal_rng".to_string(),
5563                    }),
5564                    method: "map".to_string(),
5565                    args: vec![ComputedExpr::Closure {
5566                        param: "r".to_string(),
5567                        body: Box::new(ComputedExpr::Binary {
5568                            op: BinaryOp::Mod,
5569                            left: Box::new(ComputedExpr::Var {
5570                                name: "r".to_string(),
5571                            }),
5572                            right: Box::new(ComputedExpr::Literal {
5573                                value: serde_json::json!(25),
5574                            }),
5575                        }),
5576                    }],
5577                },
5578            },
5579        ];
5580
5581        let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
5582            Box::new(VmContext::create_evaluator_from_specs(computed_specs));
5583
5584        // Test that when we set entropy.base_value via deferred when-op,
5585        // both computed fields are updated
5586        let primary_key = json!("test_pk");
5587        let op = DeferredWhenOperation {
5588            entity_name: "TestEntity".to_string(),
5589            primary_key: primary_key.clone(),
5590            field_path: "entropy.base_value".to_string(),
5591            field_value: json!(100),
5592            when_instruction: "TestIxState".to_string(),
5593            signature: "test_sig".to_string(),
5594            slot: 100,
5595            deferred_at: 0,
5596            emit: true,
5597        };
5598
5599        // Store the entity in state first
5600        let initial_state = json!({
5601            "results": {}
5602        });
5603        vm.states
5604            .get(&0)
5605            .unwrap()
5606            .insert_with_eviction(primary_key.clone(), initial_state);
5607
5608        // Apply the deferred when-op with the evaluator
5609        let mutations = vm
5610            .apply_deferred_when_op(
5611                0,
5612                &op,
5613                Some(&evaluator),
5614                Some(&[
5615                    "results.pre_reveal_rng".to_string(),
5616                    "results.pre_reveal_winning_square".to_string(),
5617                ]),
5618            )
5619            .unwrap();
5620
5621        // Check the entity state
5622        let state = vm.states.get(&0).unwrap();
5623        let entity = state.data.get(&primary_key).unwrap();
5624
5625        println!(
5626            "Entity state: {}",
5627            serde_json::to_string_pretty(&*entity).unwrap()
5628        );
5629
5630        // Verify base value was set
5631        assert_eq!(
5632            entity.get("entropy").and_then(|e| e.get("base_value")),
5633            Some(&json!(100)),
5634            "Base value should be set"
5635        );
5636
5637        // Verify computed fields were calculated
5638        let pre_reveal_rng = entity
5639            .get("results")
5640            .and_then(|r| r.get("pre_reveal_rng"))
5641            .cloned();
5642        let pre_reveal_winning_square = entity
5643            .get("results")
5644            .and_then(|r| r.get("pre_reveal_winning_square"))
5645            .cloned();
5646
5647        assert_eq!(
5648            pre_reveal_rng,
5649            Some(json!(100)),
5650            "pre_reveal_rng should be computed"
5651        );
5652        assert_eq!(
5653            pre_reveal_winning_square,
5654            Some(json!(0)),
5655            "pre_reveal_winning_square should be 100 % 25 = 0"
5656        );
5657
5658        // Verify mutations include computed fields
5659        assert!(!mutations.is_empty(), "Should have mutations");
5660        let mutation = &mutations[0];
5661        let patch = &mutation.patch;
5662
5663        assert!(
5664            patch
5665                .get("results")
5666                .and_then(|r| r.get("pre_reveal_rng"))
5667                .is_some(),
5668            "Mutation should include pre_reveal_rng"
5669        );
5670        assert!(
5671            patch
5672                .get("results")
5673                .and_then(|r| r.get("pre_reveal_winning_square"))
5674                .is_some(),
5675            "Mutation should include pre_reveal_winning_square"
5676        );
5677    }
5678}