1use crate::ast::{
2 self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3 ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::debugger::{VmDebugEvent, VmDebugger, VmLookupHop};
7use crate::Mutation;
8use dashmap::DashMap;
9use lru::LruCache;
10use once_cell::sync::Lazy;
11use serde_json::{json, Value};
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::num::NonZeroUsize;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17#[cfg(feature = "otel")]
18use tracing::instrument;
19#[derive(Debug, Clone, Default)]
22pub struct UpdateContext {
23 pub slot: Option<u64>,
25 pub signature: Option<String>,
27 pub timestamp: Option<i64>,
30 pub write_version: Option<u64>,
33 pub txn_index: Option<u64>,
36 pub skip_resolvers: bool,
40 pub metadata: HashMap<String, Value>,
42}
43
44impl UpdateContext {
45 pub fn new(slot: u64, signature: String) -> Self {
47 Self {
48 slot: Some(slot),
49 signature: Some(signature),
50 timestamp: None,
51 write_version: None,
52 txn_index: None,
53 skip_resolvers: false,
54 metadata: HashMap::new(),
55 }
56 }
57
58 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
60 Self {
61 slot: Some(slot),
62 signature: Some(signature),
63 timestamp: Some(timestamp),
64 write_version: None,
65 txn_index: None,
66 skip_resolvers: false,
67 metadata: HashMap::new(),
68 }
69 }
70
71 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
73 Self {
74 slot: Some(slot),
75 signature: Some(signature),
76 timestamp: None,
77 write_version: Some(write_version),
78 txn_index: None,
79 skip_resolvers: false,
80 metadata: HashMap::new(),
81 }
82 }
83
84 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
86 Self {
87 slot: Some(slot),
88 signature: Some(signature),
89 timestamp: None,
90 write_version: None,
91 txn_index: Some(txn_index),
92 skip_resolvers: false,
93 metadata: HashMap::new(),
94 }
95 }
96
97 pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
101 Self {
102 slot: Some(slot),
103 signature: None,
104 timestamp: None,
105 write_version: Some(write_version),
106 txn_index: None,
107 skip_resolvers: true,
108 metadata: HashMap::new(),
109 }
110 }
111
112 pub fn timestamp(&self) -> i64 {
114 self.timestamp.unwrap_or_else(|| {
115 std::time::SystemTime::now()
116 .duration_since(std::time::UNIX_EPOCH)
117 .unwrap()
118 .as_secs() as i64
119 })
120 }
121
122 pub fn empty() -> Self {
124 Self::default()
125 }
126
127 pub fn is_account_update(&self) -> bool {
130 self.write_version.is_some() && self.txn_index.is_none()
131 }
132
133 pub fn is_instruction_update(&self) -> bool {
135 self.txn_index.is_some() && self.write_version.is_none()
136 }
137
138 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
139 self.metadata.insert(key, value);
140 self
141 }
142
143 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
145 self.metadata.get(key)
146 }
147
148 pub fn to_value(&self) -> Value {
150 let mut obj = serde_json::Map::new();
151 if let Some(slot) = self.slot {
152 obj.insert("slot".to_string(), json!(slot));
153 }
154 if let Some(ref sig) = self.signature {
155 obj.insert("signature".to_string(), json!(sig));
156 }
157 obj.insert("timestamp".to_string(), json!(self.timestamp()));
159 for (key, value) in &self.metadata {
160 obj.insert(key.clone(), value.clone());
161 }
162 Value::Object(obj)
163 }
164}
165
166pub type Register = usize;
167pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
168
169pub type RegisterValue = Value;
170
171pub trait ComputedFieldsEvaluator {
174 fn evaluate(&self, state: &mut Value) -> Result<()>;
175}
176
177const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
179const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
180const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
185
186const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
188const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
189
190const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
191
192const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
193
194const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
197
198const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
199
200const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
201
202const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 20_000;
203const DEFAULT_RESOLVER_CACHE_TTL_SECS: u64 = 3600; static RESOLVER_CACHE_CAPACITY: Lazy<NonZeroUsize> = Lazy::new(|| {
206 NonZeroUsize::new(
207 std::env::var("ARETE_RESOLVER_CACHE_CAPACITY")
208 .ok()
209 .and_then(|value| value.parse::<usize>().ok())
210 .filter(|value| *value > 0)
211 .unwrap_or(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES),
212 )
213 .expect("resolver cache capacity must be > 0")
214});
215
216static RESOLVER_CACHE_TTL: Lazy<Duration> = Lazy::new(|| {
217 let ttl_secs = match std::env::var("ARETE_RESOLVER_CACHE_TTL_SECS") {
218 Ok(value) => match value.parse::<u64>() {
219 Ok(0) => {
220 tracing::warn!(
221 default_ttl_secs = DEFAULT_RESOLVER_CACHE_TTL_SECS,
222 "ARETE_RESOLVER_CACHE_TTL_SECS=0 is not supported; using default"
223 );
224 DEFAULT_RESOLVER_CACHE_TTL_SECS
225 }
226 Ok(value) => value,
227 Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
228 },
229 Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
230 };
231
232 Duration::from_secs(ttl_secs)
233});
234
235#[derive(Debug, Clone)]
236struct ResolverCacheEntry {
237 value: Value,
238 cached_at: Instant,
239}
240
241fn resolver_cache_capacity() -> NonZeroUsize {
242 *RESOLVER_CACHE_CAPACITY
243}
244
245fn resolver_cache_ttl() -> Duration {
246 *RESOLVER_CACHE_TTL
247}
248
249fn estimate_json_size(value: &Value) -> usize {
251 match value {
252 Value::Null => 4,
253 Value::Bool(_) => 5,
254 Value::Number(_) => 8,
255 Value::String(s) => s.len() + 2,
256 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
257 Value::Object(obj) => {
258 2 + obj
259 .iter()
260 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
261 .sum::<usize>()
262 }
263 }
264}
265
266#[derive(Debug, Clone)]
267pub struct CompiledPath {
268 pub segments: std::sync::Arc<[String]>,
269}
270
271impl CompiledPath {
272 pub fn new(path: &str) -> Self {
273 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
274 CompiledPath {
275 segments: segments.into(),
276 }
277 }
278
279 fn segments(&self) -> &[String] {
280 &self.segments
281 }
282}
283
284#[derive(Debug, Clone)]
287pub enum FieldChange {
288 Replaced,
290 Appended(Vec<Value>),
292}
293
294#[derive(Debug, Clone, Default)]
297pub struct DirtyTracker {
298 changes: HashMap<String, FieldChange>,
299}
300
301impl DirtyTracker {
302 pub fn new() -> Self {
304 Self {
305 changes: HashMap::new(),
306 }
307 }
308
309 pub fn mark_replaced(&mut self, path: &str) {
311 self.changes.insert(path.to_string(), FieldChange::Replaced);
313 }
314
315 pub fn mark_appended(&mut self, path: &str, value: Value) {
317 match self.changes.get_mut(path) {
318 Some(FieldChange::Appended(values)) => {
319 values.push(value);
321 }
322 Some(FieldChange::Replaced) => {
323 }
326 None => {
327 self.changes
329 .insert(path.to_string(), FieldChange::Appended(vec![value]));
330 }
331 }
332 }
333
334 pub fn is_empty(&self) -> bool {
336 self.changes.is_empty()
337 }
338
339 pub fn len(&self) -> usize {
341 self.changes.len()
342 }
343
344 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
346 self.changes.iter()
347 }
348
349 pub fn dirty_paths(&self) -> HashSet<String> {
351 self.changes.keys().cloned().collect()
352 }
353
354 pub fn into_changes(self) -> HashMap<String, FieldChange> {
356 self.changes
357 }
358
359 pub fn changes(&self) -> &HashMap<String, FieldChange> {
361 &self.changes
362 }
363
364 pub fn appended_paths(&self) -> Vec<String> {
366 self.changes
367 .iter()
368 .filter_map(|(path, change)| match change {
369 FieldChange::Appended(_) => Some(path.clone()),
370 FieldChange::Replaced => None,
371 })
372 .collect()
373 }
374}
375
376pub struct VmContext {
377 registers: Vec<RegisterValue>,
378 states: HashMap<u32, StateTable>,
379 debugger: Option<Arc<dyn VmDebugger>>,
380 pub instructions_executed: u64,
381 pub cache_hits: u64,
382 path_cache: HashMap<String, CompiledPath>,
383 pub pda_cache_hits: u64,
384 pub pda_cache_misses: u64,
385 pub pending_queue_size: u64,
386 resolver_requests: VecDeque<ResolverRequest>,
387 resolver_pending: HashMap<String, PendingResolverEntry>,
388 resolver_cache: LruCache<String, ResolverCacheEntry>,
389 pub resolver_cache_hits: u64,
390 pub resolver_cache_misses: u64,
391 current_context: Option<UpdateContext>,
392 warnings: Vec<String>,
393 last_pda_lookup_miss: Option<String>,
394 last_lookup_index_miss: Option<String>,
395 last_pda_registered: Option<String>,
396 last_lookup_index_keys: Vec<String>,
397 pending_pda_reprocess_updates: Vec<PendingAccountUpdate>,
398 scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
399}
400
401#[derive(Debug)]
402pub struct LookupIndex {
403 index: std::sync::Mutex<LruCache<String, Value>>,
404}
405
406impl LookupIndex {
407 pub fn new() -> Self {
408 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
409 }
410
411 pub fn with_capacity(capacity: usize) -> Self {
412 LookupIndex {
413 index: std::sync::Mutex::new(LruCache::new(
414 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
415 )),
416 }
417 }
418
419 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
420 let key = value_to_cache_key(lookup_value);
421 self.index.lock().unwrap().get(&key).cloned()
422 }
423
424 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
425 let key = value_to_cache_key(&lookup_value);
426 self.index.lock().unwrap().put(key, primary_key);
427 }
428
429 pub fn remove(&self, lookup_value: &Value) {
432 let key = value_to_cache_key(lookup_value);
433 self.index.lock().unwrap().pop(&key);
434 }
435
436 pub fn len(&self) -> usize {
437 self.index.lock().unwrap().len()
438 }
439
440 pub fn is_empty(&self) -> bool {
441 self.index.lock().unwrap().is_empty()
442 }
443}
444
445impl Default for LookupIndex {
446 fn default() -> Self {
447 Self::new()
448 }
449}
450
451fn value_to_cache_key(value: &Value) -> String {
452 match value {
453 Value::String(s) => s.clone(),
454 Value::Number(n) => n.to_string(),
455 Value::Bool(b) => b.to_string(),
456 Value::Null => "null".to_string(),
457 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
458 }
459}
460
461pub(crate) fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
462 match resolver {
463 ResolverType::Token => format!("token:{}", value_to_cache_key(input)),
464 ResolverType::Url(config) => {
465 let method = match config.method {
466 ast::HttpMethod::Get => "get",
467 ast::HttpMethod::Post => "post",
468 };
469 format!("url:{}:{}", method, value_to_cache_key(input))
470 }
471 }
472}
473
474#[derive(Debug)]
475pub struct TemporalIndex {
476 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
477}
478
479impl Default for TemporalIndex {
480 fn default() -> Self {
481 Self::new()
482 }
483}
484
485impl TemporalIndex {
486 pub fn new() -> Self {
487 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
488 }
489
490 pub fn with_capacity(capacity: usize) -> Self {
491 TemporalIndex {
492 index: std::sync::Mutex::new(LruCache::new(
493 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
494 )),
495 }
496 }
497
498 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
499 let key = value_to_cache_key(lookup_value);
500 let mut cache = self.index.lock().unwrap();
501 if let Some(entries) = cache.get(&key) {
502 for i in (0..entries.len()).rev() {
503 if entries[i].1 <= timestamp {
504 return Some(entries[i].0.clone());
505 }
506 }
507 }
508 None
509 }
510
511 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
512 let key = value_to_cache_key(lookup_value);
513 let mut cache = self.index.lock().unwrap();
514 if let Some(entries) = cache.get(&key) {
515 if let Some(last) = entries.last() {
516 return Some(last.0.clone());
517 }
518 }
519 None
520 }
521
522 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
523 let key = value_to_cache_key(&lookup_value);
524 let mut cache = self.index.lock().unwrap();
525
526 let entries = cache.get_or_insert_mut(key, Vec::new);
527 entries.push((primary_key, timestamp));
528 entries.sort_by_key(|(_, ts)| *ts);
529
530 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
531 entries.retain(|(_, ts)| *ts >= cutoff);
532
533 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
534 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
535 entries.drain(0..excess);
536 }
537 }
538
539 pub fn len(&self) -> usize {
540 self.index.lock().unwrap().len()
541 }
542
543 pub fn is_empty(&self) -> bool {
544 self.index.lock().unwrap().is_empty()
545 }
546
547 pub fn total_entries(&self) -> usize {
548 self.index
549 .lock()
550 .unwrap()
551 .iter()
552 .map(|(_, entries)| entries.len())
553 .sum()
554 }
555
556 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
557 let mut cache = self.index.lock().unwrap();
558 let mut total_removed = 0;
559
560 for (_, entries) in cache.iter_mut() {
561 let original_len = entries.len();
562 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
563 total_removed += original_len - entries.len();
564 }
565
566 total_removed
567 }
568}
569
570#[derive(Debug)]
571pub struct PdaReverseLookup {
572 index: LruCache<String, String>,
574}
575
576impl PdaReverseLookup {
577 pub fn new(capacity: usize) -> Self {
578 PdaReverseLookup {
579 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
580 }
581 }
582
583 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
584 self.index.get(pda_address).cloned()
585 }
586
587 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
588 let evicted = if self.index.len() >= self.index.cap().get() {
589 self.index.peek_lru().map(|(k, _)| k.clone())
590 } else {
591 None
592 };
593
594 self.index.put(pda_address, seed_value);
595 evicted
596 }
597
598 pub fn len(&self) -> usize {
599 self.index.len()
600 }
601
602 pub fn is_empty(&self) -> bool {
603 self.index.is_empty()
604 }
605
606 pub fn contains(&self, pda_address: &str) -> bool {
607 self.index.peek(pda_address).is_some()
608 }
609}
610
611#[derive(Debug, Clone)]
613pub struct QueuedAccountUpdate {
614 pub pda_address: String,
615 pub account_type: String,
616 pub account_data: Value,
617 pub slot: u64,
618 pub write_version: u64,
619 pub signature: String,
620}
621
622#[derive(Debug, Clone)]
624pub struct PendingAccountUpdate {
625 pub account_type: String,
626 pub pda_address: String,
627 pub account_data: Value,
628 pub slot: u64,
629 pub write_version: u64,
630 pub signature: String,
631 pub queued_at: i64,
632 pub is_stale_reprocess: bool,
638}
639
640#[derive(Debug, Clone)]
642pub struct QueuedInstructionEvent {
643 pub pda_address: String,
644 pub event_type: String,
645 pub event_data: Value,
646 pub slot: u64,
647 pub signature: String,
648}
649
650#[derive(Debug, Clone)]
652pub struct PendingInstructionEvent {
653 pub event_type: String,
654 pub pda_address: String,
655 pub event_data: Value,
656 pub slot: u64,
657 pub signature: String,
658 pub queued_at: i64,
659}
660
661#[derive(Debug, Clone)]
662pub struct DeferredWhenOperation {
663 pub entity_name: String,
664 pub primary_key: Value,
665 pub field_path: String,
666 pub field_value: Value,
667 pub when_instruction: String,
668 pub signature: String,
669 pub slot: u64,
670 pub deferred_at: i64,
671 pub emit: bool,
672}
673
674#[derive(Debug, Clone)]
675pub struct ResolverRequest {
676 pub cache_key: String,
677 pub resolver: ResolverType,
678 pub input: Value,
679}
680
681#[derive(Debug, Clone)]
682pub struct ResolverTarget {
683 pub state_id: u32,
684 pub entity_name: String,
685 pub primary_key: Value,
686 pub extracts: Vec<ResolverExtractSpec>,
687}
688
689#[derive(Debug, Clone)]
690pub struct PendingResolverEntry {
691 pub resolver: ResolverType,
692 pub input: Value,
693 pub targets: Vec<ResolverTarget>,
694 pub queued_at: i64,
695}
696
697#[derive(Debug, Clone, PartialEq)]
698pub struct ScheduledCallback {
699 pub state_id: u32,
700 pub entity_name: String,
701 pub primary_key: Value,
702 pub resolver: ResolverType,
703 pub url_template: Option<Vec<ast::UrlTemplatePart>>,
704 pub input_value: Option<Value>,
705 pub input_path: Option<String>,
706 pub condition: Option<ast::ResolverCondition>,
707 pub strategy: ResolveStrategy,
708 pub extracts: Vec<ResolverExtractSpec>,
709 pub retry_count: u32,
710}
711
712impl PendingResolverEntry {
713 fn add_target(&mut self, target: ResolverTarget) {
714 if let Some(existing) = self.targets.iter_mut().find(|t| {
715 t.state_id == target.state_id
716 && t.entity_name == target.entity_name
717 && t.primary_key == target.primary_key
718 }) {
719 let mut seen = HashSet::new();
720 for extract in &existing.extracts {
721 seen.insert((extract.target_path.clone(), extract.source_path.clone()));
722 }
723
724 for extract in target.extracts {
725 let key = (extract.target_path.clone(), extract.source_path.clone());
726 if seen.insert(key) {
727 existing.extracts.push(extract);
728 }
729 }
730 } else {
731 self.targets.push(target);
732 }
733 }
734}
735
736#[derive(Debug, Clone)]
737pub struct PendingQueueStats {
738 pub total_updates: usize,
739 pub unique_pdas: usize,
740 pub oldest_age_seconds: i64,
741 pub largest_pda_queue_size: usize,
742 pub estimated_memory_bytes: usize,
743}
744
745#[derive(Debug, Clone, Default)]
746pub struct VmMemoryStats {
747 pub state_table_entity_count: usize,
748 pub state_table_max_entries: usize,
749 pub state_table_at_capacity: bool,
750 pub lookup_index_count: usize,
751 pub lookup_index_total_entries: usize,
752 pub temporal_index_count: usize,
753 pub temporal_index_total_entries: usize,
754 pub pda_reverse_lookup_count: usize,
755 pub pda_reverse_lookup_total_entries: usize,
756 pub version_tracker_entries: usize,
757 pub pending_queue_stats: Option<PendingQueueStats>,
758 pub path_cache_size: usize,
759}
760
761#[derive(Debug, Clone, Default)]
762pub struct CleanupResult {
763 pub pending_updates_removed: usize,
764 pub temporal_entries_removed: usize,
765}
766
767#[derive(Debug, Clone)]
768pub struct CapacityWarning {
769 pub current_entries: usize,
770 pub max_entries: usize,
771 pub entries_over_limit: usize,
772}
773
774#[derive(Debug, Clone)]
775pub struct StateTableConfig {
776 pub max_entries: usize,
777 pub max_array_length: usize,
778}
779
780impl Default for StateTableConfig {
781 fn default() -> Self {
782 Self {
783 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
784 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
785 }
786 }
787}
788
789#[derive(Debug)]
790pub struct VersionTracker {
791 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
792}
793
794impl VersionTracker {
795 pub fn new() -> Self {
796 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
797 }
798
799 pub fn with_capacity(capacity: usize) -> Self {
800 VersionTracker {
801 cache: std::sync::Mutex::new(LruCache::new(
802 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
803 )),
804 }
805 }
806
807 fn make_key(primary_key: &Value, event_type: &str) -> String {
808 format!("{}:{}", primary_key, event_type)
809 }
810
811 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
812 let key = Self::make_key(primary_key, event_type);
813 self.cache.lock().unwrap().get(&key).copied()
814 }
815
816 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
817 let key = Self::make_key(primary_key, event_type);
818 self.cache.lock().unwrap().put(key, (slot, ordering_value));
819 }
820
821 pub fn len(&self) -> usize {
822 self.cache.lock().unwrap().len()
823 }
824
825 pub fn is_empty(&self) -> bool {
826 self.cache.lock().unwrap().is_empty()
827 }
828}
829
830impl Default for VersionTracker {
831 fn default() -> Self {
832 Self::new()
833 }
834}
835
836#[derive(Debug)]
837pub struct StateTable {
838 pub data: DashMap<Value, Value>,
839 access_times: DashMap<Value, i64>,
840 pub lookup_indexes: HashMap<String, LookupIndex>,
841 pub temporal_indexes: HashMap<String, TemporalIndex>,
842 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
843 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
844 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
845 pub last_account_data: DashMap<String, PendingAccountUpdate>,
849 version_tracker: VersionTracker,
850 instruction_dedup_cache: VersionTracker,
851 config: StateTableConfig,
852 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
853 entity_name: String,
854 pub recent_tx_instructions:
855 std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
856 pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
857}
858
859impl StateTable {
860 pub fn is_at_capacity(&self) -> bool {
861 self.data.len() >= self.config.max_entries
862 }
863
864 pub fn entries_over_limit(&self) -> usize {
865 self.data.len().saturating_sub(self.config.max_entries)
866 }
867
868 pub fn max_array_length(&self) -> usize {
869 self.config.max_array_length
870 }
871
872 fn touch(&self, key: &Value) {
873 let now = std::time::SystemTime::now()
874 .duration_since(std::time::UNIX_EPOCH)
875 .unwrap()
876 .as_secs() as i64;
877 self.access_times.insert(key.clone(), now);
878 }
879
880 fn evict_lru(&self, count: usize) -> usize {
881 if count == 0 || self.data.is_empty() {
882 return 0;
883 }
884
885 let mut entries: Vec<(Value, i64)> = self
886 .access_times
887 .iter()
888 .map(|entry| (entry.key().clone(), *entry.value()))
889 .collect();
890
891 entries.sort_by_key(|(_, ts)| *ts);
892
893 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
894
895 let mut evicted = 0;
896 for key in to_evict {
897 self.data.remove(&key);
898 self.access_times.remove(&key);
899 evicted += 1;
900 }
901
902 #[cfg(feature = "otel")]
903 if evicted > 0 {
904 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
905 }
906
907 evicted
908 }
909
910 pub fn insert_with_eviction(&self, key: Value, value: Value) {
911 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
912 #[cfg(feature = "otel")]
913 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
914 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
915 self.evict_lru(to_evict.max(1));
916 }
917 self.data.insert(key.clone(), value);
918 self.touch(&key);
919 }
920
921 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
922 let result = self.data.get(key).map(|v| v.clone());
923 if result.is_some() {
924 self.touch(key);
925 }
926 result
927 }
928
929 pub fn is_fresh_update(
936 &self,
937 primary_key: &Value,
938 event_type: &str,
939 slot: u64,
940 ordering_value: u64,
941 ) -> bool {
942 let dominated = self
943 .version_tracker
944 .get(primary_key, event_type)
945 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
946 .unwrap_or(false);
947
948 if dominated {
949 return false;
950 }
951
952 self.version_tracker
953 .insert(primary_key, event_type, slot, ordering_value);
954 true
955 }
956
957 pub fn is_duplicate_instruction(
965 &self,
966 primary_key: &Value,
967 event_type: &str,
968 slot: u64,
969 txn_index: u64,
970 ) -> bool {
971 let is_duplicate = self
973 .instruction_dedup_cache
974 .get(primary_key, event_type)
975 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
976 .unwrap_or(false);
977
978 if is_duplicate {
979 return true;
980 }
981
982 self.instruction_dedup_cache
984 .insert(primary_key, event_type, slot, txn_index);
985 false
986 }
987}
988
989impl VmContext {
990 pub fn new() -> Self {
991 let mut vm = VmContext {
992 registers: vec![Value::Null; 256],
993 states: HashMap::new(),
994 debugger: None,
995 instructions_executed: 0,
996 cache_hits: 0,
997 path_cache: HashMap::new(),
998 pda_cache_hits: 0,
999 pda_cache_misses: 0,
1000 pending_queue_size: 0,
1001 resolver_requests: VecDeque::new(),
1002 resolver_pending: HashMap::new(),
1003 resolver_cache: LruCache::new(resolver_cache_capacity()),
1004 resolver_cache_hits: 0,
1005 resolver_cache_misses: 0,
1006 current_context: None,
1007 warnings: Vec::new(),
1008 last_pda_lookup_miss: None,
1009 last_lookup_index_miss: None,
1010 last_pda_registered: None,
1011 last_lookup_index_keys: Vec::new(),
1012 pending_pda_reprocess_updates: Vec::new(),
1013 scheduled_callbacks: Vec::new(),
1014 };
1015 vm.states.insert(
1016 0,
1017 StateTable {
1018 data: DashMap::new(),
1019 access_times: DashMap::new(),
1020 lookup_indexes: HashMap::new(),
1021 temporal_indexes: HashMap::new(),
1022 pda_reverse_lookups: HashMap::new(),
1023 pending_updates: DashMap::new(),
1024 pending_instruction_events: DashMap::new(),
1025 last_account_data: DashMap::new(),
1026 version_tracker: VersionTracker::new(),
1027 instruction_dedup_cache: VersionTracker::with_capacity(
1028 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1029 ),
1030 config: StateTableConfig::default(),
1031 entity_name: String::new(),
1032 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1033 NonZeroUsize::new(1000).unwrap(),
1034 )),
1035 deferred_when_ops: DashMap::new(),
1036 },
1037 );
1038
1039 vm
1040 }
1041
1042 pub fn set_debugger(&mut self, debugger: Arc<dyn VmDebugger>) {
1043 self.debugger = Some(debugger);
1044 }
1045
1046 pub fn clear_debugger(&mut self) {
1047 self.debugger = None;
1048 }
1049
1050 fn emit_debug<F>(&self, builder: F)
1051 where
1052 F: FnOnce() -> VmDebugEvent,
1053 {
1054 if let Some(debugger) = &self.debugger {
1055 debugger.record(builder());
1056 }
1057 }
1058
1059 fn is_debug_enabled(&self) -> bool {
1060 self.debugger.is_some()
1061 }
1062
1063 fn take_pending_pda_reprocess_updates(&mut self) -> Vec<PendingAccountUpdate> {
1064 std::mem::take(&mut self.pending_pda_reprocess_updates)
1065 }
1066
1067 pub fn new_multi_entity() -> Self {
1069 VmContext {
1070 registers: vec![Value::Null; 256],
1071 states: HashMap::new(),
1072 debugger: None,
1073 instructions_executed: 0,
1074 cache_hits: 0,
1075 path_cache: HashMap::new(),
1076 pda_cache_hits: 0,
1077 pda_cache_misses: 0,
1078 pending_queue_size: 0,
1079 resolver_requests: VecDeque::new(),
1080 resolver_pending: HashMap::new(),
1081 resolver_cache: LruCache::new(resolver_cache_capacity()),
1082 resolver_cache_hits: 0,
1083 resolver_cache_misses: 0,
1084 current_context: None,
1085 warnings: Vec::new(),
1086 last_pda_lookup_miss: None,
1087 last_lookup_index_miss: None,
1088 last_pda_registered: None,
1089 last_lookup_index_keys: Vec::new(),
1090 pending_pda_reprocess_updates: Vec::new(),
1091 scheduled_callbacks: Vec::new(),
1092 }
1093 }
1094
1095 pub fn new_with_config(state_config: StateTableConfig) -> Self {
1096 let mut vm = VmContext {
1097 registers: vec![Value::Null; 256],
1098 states: HashMap::new(),
1099 debugger: None,
1100 instructions_executed: 0,
1101 cache_hits: 0,
1102 path_cache: HashMap::new(),
1103 pda_cache_hits: 0,
1104 pda_cache_misses: 0,
1105 pending_queue_size: 0,
1106 resolver_requests: VecDeque::new(),
1107 resolver_pending: HashMap::new(),
1108 resolver_cache: LruCache::new(resolver_cache_capacity()),
1109 resolver_cache_hits: 0,
1110 resolver_cache_misses: 0,
1111 current_context: None,
1112 warnings: Vec::new(),
1113 last_pda_lookup_miss: None,
1114 last_lookup_index_miss: None,
1115 last_pda_registered: None,
1116 last_lookup_index_keys: Vec::new(),
1117 pending_pda_reprocess_updates: Vec::new(),
1118 scheduled_callbacks: Vec::new(),
1119 };
1120 vm.states.insert(
1121 0,
1122 StateTable {
1123 data: DashMap::new(),
1124 access_times: DashMap::new(),
1125 lookup_indexes: HashMap::new(),
1126 temporal_indexes: HashMap::new(),
1127 pda_reverse_lookups: HashMap::new(),
1128 pending_updates: DashMap::new(),
1129 pending_instruction_events: DashMap::new(),
1130 last_account_data: DashMap::new(),
1131 version_tracker: VersionTracker::new(),
1132 instruction_dedup_cache: VersionTracker::with_capacity(
1133 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1134 ),
1135 config: state_config,
1136 entity_name: "default".to_string(),
1137 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1138 NonZeroUsize::new(1000).unwrap(),
1139 )),
1140 deferred_when_ops: DashMap::new(),
1141 },
1142 );
1143 vm
1144 }
1145
1146 pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1147 self.resolver_requests.drain(..).collect()
1148 }
1149
1150 pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1151 std::mem::take(&mut self.scheduled_callbacks)
1152 }
1153
1154 pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1155 self.states.get(&state_id)?.get_and_touch(key)
1156 }
1157
1158 pub fn snapshot_state_table(&self, state_id: u32) -> Vec<(Value, Value)> {
1159 self.states
1160 .get(&state_id)
1161 .map(|state| {
1162 state
1163 .data
1164 .iter()
1165 .map(|entry| (entry.key().clone(), entry.value().clone()))
1166 .collect()
1167 })
1168 .unwrap_or_default()
1169 }
1170
1171 pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1172 if requests.is_empty() {
1173 return;
1174 }
1175
1176 self.resolver_requests.extend(requests);
1177 }
1178
1179 pub(crate) fn get_cached_resolver_value(&mut self, cache_key: &str) -> Option<Value> {
1180 let cached = self.resolver_cache.get(cache_key).cloned();
1181
1182 match cached {
1183 Some(entry) if entry.cached_at.elapsed() <= resolver_cache_ttl() => {
1184 self.resolver_cache_hits += 1;
1185 Some(entry.value)
1186 }
1187 Some(_) => {
1188 self.resolver_cache.pop(cache_key);
1189 self.resolver_cache_misses += 1;
1190 None
1191 }
1192 None => {
1193 self.resolver_cache_misses += 1;
1194 None
1195 }
1196 }
1197 }
1198
1199 fn cache_resolver_value(
1200 &mut self,
1201 resolver: &ResolverType,
1202 input: &Value,
1203 resolved_value: &Value,
1204 ) {
1205 self.resolver_cache.put(
1206 resolver_cache_key(resolver, input),
1207 ResolverCacheEntry {
1208 value: resolved_value.clone(),
1209 cached_at: Instant::now(),
1210 },
1211 );
1212 }
1213
1214 pub fn apply_resolver_result(
1215 &mut self,
1216 bytecode: &MultiEntityBytecode,
1217 cache_key: &str,
1218 resolved_value: Value,
1219 ) -> Result<Vec<Mutation>> {
1220 let entry = match self.resolver_pending.remove(cache_key) {
1221 Some(entry) => entry,
1222 None => return Ok(Vec::new()),
1223 };
1224
1225 self.cache_resolver_value(&entry.resolver, &entry.input, &resolved_value);
1226
1227 let mut mutations = Vec::new();
1228
1229 for target in entry.targets {
1230 if target.primary_key.is_null() {
1231 continue;
1232 }
1233
1234 let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1235 Some(bc) => bc,
1236 None => continue,
1237 };
1238
1239 let state = match self.states.get(&target.state_id) {
1240 Some(s) => s,
1241 None => continue,
1242 };
1243
1244 let mut entity_state = state
1245 .get_and_touch(&target.primary_key)
1246 .unwrap_or_else(|| json!({}));
1247
1248 let mut dirty_tracker = DirtyTracker::new();
1249 let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1250
1251 Self::apply_resolver_extractions_to_value(
1252 &mut entity_state,
1253 &resolved_value,
1254 &target.extracts,
1255 &mut dirty_tracker,
1256 &should_emit,
1257 )?;
1258
1259 if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1260 let old_values: Vec<_> = entity_bytecode
1261 .computed_paths
1262 .iter()
1263 .map(|path| Self::get_value_at_path(&entity_state, path))
1264 .collect();
1265
1266 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1267 let context_timestamp = self
1268 .current_context
1269 .as_ref()
1270 .map(|c| c.timestamp())
1271 .unwrap_or_else(|| {
1272 std::time::SystemTime::now()
1273 .duration_since(std::time::UNIX_EPOCH)
1274 .unwrap()
1275 .as_secs() as i64
1276 });
1277 let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1278
1279 if eval_result.is_ok() {
1280 let mut changed_fields = Vec::new();
1281 for (path, old_value) in
1282 entity_bytecode.computed_paths.iter().zip(old_values.iter())
1283 {
1284 let new_value = Self::get_value_at_path(&entity_state, path);
1285 let changed = new_value != *old_value;
1286 let will_emit = should_emit(path);
1287
1288 if changed && will_emit {
1289 dirty_tracker.mark_replaced(path);
1290 changed_fields.push(path.clone());
1291 }
1292 }
1293 }
1294 }
1295
1296 state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1297
1298 if dirty_tracker.is_empty() {
1299 continue;
1300 }
1301
1302 let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1303
1304 mutations.push(Mutation {
1305 export: target.entity_name.clone(),
1306 key: target.primary_key.clone(),
1307 patch,
1308 append: vec![],
1309 });
1310 }
1311
1312 Ok(mutations)
1313 }
1314
1315 pub fn enqueue_resolver_request(
1316 &mut self,
1317 _cache_key: String,
1318 resolver: ResolverType,
1319 input: Value,
1320 target: ResolverTarget,
1321 ) {
1322 let cache_key = resolver_cache_key(&resolver, &input);
1323
1324 if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1325 entry.add_target(target);
1326 return;
1327 }
1328
1329 let queued_at = std::time::SystemTime::now()
1330 .duration_since(std::time::UNIX_EPOCH)
1331 .unwrap()
1332 .as_secs() as i64;
1333
1334 self.resolver_pending.insert(
1335 cache_key.clone(),
1336 PendingResolverEntry {
1337 resolver: resolver.clone(),
1338 input: input.clone(),
1339 targets: vec![target],
1340 queued_at,
1341 },
1342 );
1343
1344 self.resolver_requests.push_back(ResolverRequest {
1345 cache_key,
1346 resolver,
1347 input,
1348 });
1349 }
1350
1351 fn apply_resolver_extractions_to_value<F>(
1352 state: &mut Value,
1353 resolved_value: &Value,
1354 extracts: &[ResolverExtractSpec],
1355 dirty_tracker: &mut DirtyTracker,
1356 should_emit: &F,
1357 ) -> Result<()>
1358 where
1359 F: Fn(&str) -> bool,
1360 {
1361 for extract in extracts {
1362 let resolved = match extract.source_path.as_deref() {
1363 Some(path) => Self::get_value_at_path(resolved_value, path),
1364 None => Some(resolved_value.clone()),
1365 };
1366
1367 let Some(value) = resolved else {
1368 continue;
1369 };
1370
1371 let value = if let Some(transform) = &extract.transform {
1372 Self::apply_transformation(&value, transform)?
1373 } else {
1374 value
1375 };
1376
1377 Self::set_nested_field_value(state, &extract.target_path, value)?;
1378 if should_emit(&extract.target_path) {
1379 dirty_tracker.mark_replaced(&extract.target_path);
1380 }
1381 }
1382
1383 Ok(())
1384 }
1385
1386 fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1387 if tracker.is_empty() {
1388 return Ok(json!({}));
1389 }
1390
1391 let mut partial = serde_json::Map::new();
1392
1393 for (path, change) in tracker.iter() {
1394 let segments: Vec<&str> = path.split('.').collect();
1395
1396 let value_to_insert = match change {
1397 FieldChange::Replaced => {
1398 let mut current = state;
1399 let mut found = true;
1400
1401 for segment in &segments {
1402 match current.get(*segment) {
1403 Some(v) => current = v,
1404 None => {
1405 found = false;
1406 break;
1407 }
1408 }
1409 }
1410
1411 if !found {
1412 continue;
1413 }
1414 current.clone()
1415 }
1416 FieldChange::Appended(values) => Value::Array(values.clone()),
1417 };
1418
1419 let mut target = &mut partial;
1420 for (i, segment) in segments.iter().enumerate() {
1421 if i == segments.len() - 1 {
1422 target.insert(segment.to_string(), value_to_insert.clone());
1423 } else {
1424 target
1425 .entry(segment.to_string())
1426 .or_insert_with(|| json!({}));
1427 target = target
1428 .get_mut(*segment)
1429 .and_then(|v| v.as_object_mut())
1430 .ok_or("Failed to build nested structure")?;
1431 }
1432 }
1433 }
1434
1435 Ok(Value::Object(partial))
1436 }
1437
1438 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1441 self.states.get_mut(&state_id)
1442 }
1443
1444 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1446 &mut self.registers
1447 }
1448
1449 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1451 &self.path_cache
1452 }
1453
1454 pub fn current_context(&self) -> Option<&UpdateContext> {
1456 self.current_context.as_ref()
1457 }
1458
1459 pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1461 self.current_context = context;
1462 }
1463
1464 fn add_warning(&mut self, msg: String) {
1465 self.warnings.push(msg);
1466 }
1467
1468 pub fn take_warnings(&mut self) -> Vec<String> {
1469 std::mem::take(&mut self.warnings)
1470 }
1471
1472 pub fn has_warnings(&self) -> bool {
1473 !self.warnings.is_empty()
1474 }
1475
1476 pub fn update_state_from_register(
1477 &mut self,
1478 state_id: u32,
1479 key: Value,
1480 register: Register,
1481 ) -> Result<()> {
1482 let state = self.states.get(&state_id).ok_or("State table not found")?;
1483 let value = self.registers[register].clone();
1484 state.insert_with_eviction(key, value);
1485 Ok(())
1486 }
1487
1488 fn reset_registers(&mut self) {
1489 for reg in &mut self.registers {
1490 *reg = Value::Null;
1491 }
1492 }
1493
1494 pub fn extract_partial_state(
1496 &self,
1497 state_reg: Register,
1498 dirty_fields: &HashSet<String>,
1499 ) -> Result<Value> {
1500 let full_state = &self.registers[state_reg];
1501
1502 if dirty_fields.is_empty() {
1503 return Ok(json!({}));
1504 }
1505
1506 let mut partial = serde_json::Map::new();
1507
1508 for path in dirty_fields {
1509 let segments: Vec<&str> = path.split('.').collect();
1510
1511 let mut current = full_state;
1512 let mut found = true;
1513
1514 for segment in &segments {
1515 match current.get(segment) {
1516 Some(v) => current = v,
1517 None => {
1518 found = false;
1519 break;
1520 }
1521 }
1522 }
1523
1524 if !found {
1525 continue;
1526 }
1527
1528 let mut target = &mut partial;
1529 for (i, segment) in segments.iter().enumerate() {
1530 if i == segments.len() - 1 {
1531 target.insert(segment.to_string(), current.clone());
1532 } else {
1533 target
1534 .entry(segment.to_string())
1535 .or_insert_with(|| json!({}));
1536 target = target
1537 .get_mut(*segment)
1538 .and_then(|v| v.as_object_mut())
1539 .ok_or("Failed to build nested structure")?;
1540 }
1541 }
1542 }
1543
1544 Ok(Value::Object(partial))
1545 }
1546
1547 pub fn extract_partial_state_with_tracker(
1551 &self,
1552 state_reg: Register,
1553 tracker: &DirtyTracker,
1554 ) -> Result<Value> {
1555 let full_state = &self.registers[state_reg];
1556
1557 if tracker.is_empty() {
1558 return Ok(json!({}));
1559 }
1560
1561 let mut partial = serde_json::Map::new();
1562
1563 for (path, change) in tracker.iter() {
1564 let segments: Vec<&str> = path.split('.').collect();
1565
1566 let value_to_insert = match change {
1567 FieldChange::Replaced => {
1568 let mut current = full_state;
1569 let mut found = true;
1570
1571 for segment in &segments {
1572 match current.get(*segment) {
1573 Some(v) => current = v,
1574 None => {
1575 found = false;
1576 break;
1577 }
1578 }
1579 }
1580
1581 if !found {
1582 continue;
1583 }
1584 current.clone()
1585 }
1586 FieldChange::Appended(values) => Value::Array(values.clone()),
1587 };
1588
1589 let mut target = &mut partial;
1590 for (i, segment) in segments.iter().enumerate() {
1591 if i == segments.len() - 1 {
1592 target.insert(segment.to_string(), value_to_insert.clone());
1593 } else {
1594 target
1595 .entry(segment.to_string())
1596 .or_insert_with(|| json!({}));
1597 target = target
1598 .get_mut(*segment)
1599 .and_then(|v| v.as_object_mut())
1600 .ok_or("Failed to build nested structure")?;
1601 }
1602 }
1603 }
1604
1605 Ok(Value::Object(partial))
1606 }
1607
1608 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1609 if let Some(compiled) = self.path_cache.get(path) {
1610 self.cache_hits += 1;
1611 #[cfg(feature = "otel")]
1612 crate::vm_metrics::record_path_cache_hit();
1613 return compiled.clone();
1614 }
1615 #[cfg(feature = "otel")]
1616 crate::vm_metrics::record_path_cache_miss();
1617 let compiled = CompiledPath::new(path);
1618 self.path_cache.insert(path.to_string(), compiled.clone());
1619 compiled
1620 }
1621
1622 #[cfg_attr(feature = "otel", instrument(
1624 name = "vm.process_event",
1625 skip(self, bytecode, event_value, log),
1626 level = "info",
1627 fields(
1628 event_type = %event_type,
1629 slot = context.as_ref().and_then(|c| c.slot),
1630 )
1631 ))]
1632 pub fn process_event(
1633 &mut self,
1634 bytecode: &MultiEntityBytecode,
1635 event_value: Value,
1636 event_type: &str,
1637 context: Option<&UpdateContext>,
1638 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1639 ) -> Result<Vec<Mutation>> {
1640 self.current_context = context.cloned();
1641 self.emit_debug(|| VmDebugEvent::ProcessEventStart {
1642 event_type: event_type.to_string(),
1643 context: context.map(|ctx| ctx.to_value()),
1644 });
1645
1646 let mut event_value = event_value;
1647 if let Some(ctx) = context {
1648 if let Some(obj) = event_value.as_object_mut() {
1649 obj.insert("__update_context".to_string(), ctx.to_value());
1650 }
1651 }
1652
1653 let mut all_mutations = Vec::new();
1654
1655 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1656 if let Some(ctx) = context {
1657 if let Some(signature) = ctx.signature.clone() {
1658 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1659 for state_id in state_ids {
1660 if let Some(state) = self.states.get(&state_id) {
1661 {
1662 let mut cache = state.recent_tx_instructions.lock().unwrap();
1663 let entry =
1664 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1665 entry.insert(event_type.to_string());
1666 }
1667
1668 let key = (signature.clone(), event_type.to_string());
1669 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1670 tracing::debug!(
1671 event_type = %event_type,
1672 signature = %signature,
1673 deferred_count = deferred_ops.len(),
1674 "flushing deferred when-ops"
1675 );
1676 for op in deferred_ops {
1677 let (evaluator, computed_paths) = bytecode
1679 .entities
1680 .get(&op.entity_name)
1681 .map(|eb| {
1682 (
1683 eb.computed_fields_evaluator.as_ref(),
1684 eb.computed_paths.as_slice(),
1685 )
1686 })
1687 .unwrap_or((None, &[]));
1688
1689 match self.apply_deferred_when_op(
1690 state_id,
1691 &op,
1692 evaluator,
1693 Some(computed_paths),
1694 ) {
1695 Ok(mutations) => all_mutations.extend(mutations),
1696 Err(e) => tracing::warn!(
1697 "Failed to apply deferred when-op: {}",
1698 e
1699 ),
1700 }
1701 }
1702 }
1703 }
1704 }
1705 }
1706 }
1707 }
1708
1709 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1710 for entity_name in entity_names {
1711 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1712 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1713 self.emit_debug(|| VmDebugEvent::HandlerStart {
1714 entity_name: entity_name.clone(),
1715 event_type: event_type.to_string(),
1716 state_id: entity_bytecode.state_id,
1717 });
1718
1719 if let Some(ref mut log) = log {
1720 log.set("entity", entity_name.clone());
1721 log.inc("handlers", 1);
1722 }
1723
1724 let opcodes_before = self.instructions_executed;
1725 let cache_before = self.cache_hits;
1726 let pda_hits_before = self.pda_cache_hits;
1727 let pda_misses_before = self.pda_cache_misses;
1728
1729 let mutations = self.execute_handler(
1730 handler,
1731 &event_value,
1732 event_type,
1733 entity_bytecode.state_id,
1734 entity_name,
1735 entity_bytecode.computed_fields_evaluator.as_ref(),
1736 Some(&entity_bytecode.non_emitted_fields),
1737 )?;
1738 let direct_mutation_count = mutations.len();
1739
1740 if let Some(ref mut log) = log {
1741 log.inc(
1742 "opcodes",
1743 (self.instructions_executed - opcodes_before) as i64,
1744 );
1745 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1746 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1747 log.inc(
1748 "pda_misses",
1749 (self.pda_cache_misses - pda_misses_before) as i64,
1750 );
1751 }
1752
1753 if mutations.is_empty() {
1754 let is_tx_event =
1757 event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1758 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1759 if is_tx_event {
1760 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1761 let signature = context
1762 .and_then(|c| c.signature.clone())
1763 .unwrap_or_default();
1764 let _ = self.queue_instruction_event(
1765 entity_bytecode.state_id,
1766 QueuedInstructionEvent {
1767 pda_address: missed_pda.clone(),
1768 event_type: event_type.to_string(),
1769 event_data: event_value.clone(),
1770 slot,
1771 signature,
1772 },
1773 );
1774 self.emit_debug(|| VmDebugEvent::QueueAction {
1775 entity_name: entity_name.clone(),
1776 event_type: event_type.to_string(),
1777 queue_kind: "instruction_pda_lookup_miss".to_string(),
1778 lookup_value: missed_pda,
1779 });
1780 } else {
1781 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1786 let signature = context
1787 .and_then(|c| c.signature.clone())
1788 .unwrap_or_default();
1789 if let Some(write_version) =
1790 context.and_then(|c| c.write_version)
1791 {
1792 let _ = self.queue_account_update(
1793 entity_bytecode.state_id,
1794 QueuedAccountUpdate {
1795 pda_address: missed_pda.clone(),
1796 account_type: event_type.to_string(),
1797 account_data: event_value.clone(),
1798 slot,
1799 write_version,
1800 signature,
1801 },
1802 );
1803 self.emit_debug(|| VmDebugEvent::QueueAction {
1804 entity_name: entity_name.clone(),
1805 event_type: event_type.to_string(),
1806 queue_kind: "account_pda_lookup_miss".to_string(),
1807 lookup_value: missed_pda,
1808 });
1809 } else {
1810 tracing::warn!(
1811 event_type = %event_type,
1812 "Dropping queued account update: write_version missing from context"
1813 );
1814 }
1815 }
1816 }
1817 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1818 if !is_tx_event {
1819 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1820 let signature = context
1821 .and_then(|c| c.signature.clone())
1822 .unwrap_or_default();
1823 if let Some(write_version) =
1824 context.and_then(|c| c.write_version)
1825 {
1826 let _ = self.queue_account_update(
1827 entity_bytecode.state_id,
1828 QueuedAccountUpdate {
1829 pda_address: missed_lookup.clone(),
1830 account_type: event_type.to_string(),
1831 account_data: event_value.clone(),
1832 slot,
1833 write_version,
1834 signature,
1835 },
1836 );
1837 self.emit_debug(|| VmDebugEvent::QueueAction {
1838 entity_name: entity_name.clone(),
1839 event_type: event_type.to_string(),
1840 queue_kind: "account_lookup_index_miss".to_string(),
1841 lookup_value: missed_lookup,
1842 });
1843 } else {
1844 tracing::trace!(
1845 event_type = %event_type,
1846 "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1847 );
1848 }
1849 }
1850 }
1851 }
1852
1853 all_mutations.extend(mutations);
1854
1855 if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1856 if let Some(ctx) = context {
1857 if let Some(ref signature) = ctx.signature {
1858 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1859 {
1860 {
1861 let mut cache =
1862 state.recent_tx_instructions.lock().unwrap();
1863 let entry = cache
1864 .get_or_insert_mut(signature.clone(), HashSet::new);
1865 entry.insert(event_type.to_string());
1866 }
1867
1868 let key = (signature.clone(), event_type.to_string());
1869 if let Some((_, deferred_ops)) =
1870 state.deferred_when_ops.remove(&key)
1871 {
1872 tracing::debug!(
1873 event_type = %event_type,
1874 signature = %signature,
1875 deferred_count = deferred_ops.len(),
1876 "flushing deferred when-ops"
1877 );
1878 for op in deferred_ops {
1879 match self.apply_deferred_when_op(
1880 entity_bytecode.state_id,
1881 &op,
1882 entity_bytecode
1883 .computed_fields_evaluator
1884 .as_ref(),
1885 Some(&entity_bytecode.computed_paths),
1886 ) {
1887 Ok(mutations) => {
1888 all_mutations.extend(mutations)
1889 }
1890 Err(e) => {
1891 tracing::warn!(
1892 "Failed to apply deferred when-op: {}",
1893 e
1894 );
1895 }
1896 }
1897 }
1898 }
1899 }
1900 }
1901 }
1902 }
1903
1904 if let Some(registered_pda) = self.take_last_pda_registered() {
1905 let pending_events = self.flush_pending_instruction_events(
1906 entity_bytecode.state_id,
1907 ®istered_pda,
1908 );
1909 let pending_count = pending_events.len();
1910 if pending_count > 0 {
1911 self.emit_debug(|| VmDebugEvent::FlushAction {
1912 entity_name: entity_name.clone(),
1913 event_type: event_type.to_string(),
1914 flush_kind: "pending_instruction_events".to_string(),
1915 trigger: registered_pda.clone(),
1916 count: pending_count,
1917 });
1918 }
1919 for pending in pending_events {
1920 if let Some(pending_handler) =
1921 entity_bytecode.handlers.get(&pending.event_type)
1922 {
1923 if let Ok(reprocessed_mutations) = self.execute_handler(
1924 pending_handler,
1925 &pending.event_data,
1926 &pending.event_type,
1927 entity_bytecode.state_id,
1928 entity_name,
1929 entity_bytecode.computed_fields_evaluator.as_ref(),
1930 Some(&entity_bytecode.non_emitted_fields),
1931 ) {
1932 all_mutations.extend(reprocessed_mutations);
1933 }
1934 }
1935 }
1936 }
1937
1938 for pending in self.take_pending_pda_reprocess_updates() {
1939 if let Some(pending_handler) =
1940 entity_bytecode.handlers.get(&pending.account_type)
1941 {
1942 self.current_context = Some(if pending.is_stale_reprocess {
1943 UpdateContext::new_reprocessed(
1944 pending.slot,
1945 pending.write_version,
1946 )
1947 } else {
1948 UpdateContext::new_account(
1949 pending.slot,
1950 pending.signature.clone(),
1951 pending.write_version,
1952 )
1953 });
1954 match self.execute_handler(
1955 pending_handler,
1956 &pending.account_data,
1957 &pending.account_type,
1958 entity_bytecode.state_id,
1959 entity_name,
1960 entity_bytecode.computed_fields_evaluator.as_ref(),
1961 Some(&entity_bytecode.non_emitted_fields),
1962 ) {
1963 Ok(reprocessed) => {
1964 all_mutations.extend(reprocessed);
1965 }
1966 Err(e) => {
1967 tracing::warn!(
1968 error = %e,
1969 account_type = %pending.account_type,
1970 "PDA remap reprocessing failed"
1971 );
1972 }
1973 }
1974 }
1975 }
1976
1977 let lookup_keys = self.take_last_lookup_index_keys();
1978 if !lookup_keys.is_empty() {
1979 tracing::info!(
1980 keys = ?lookup_keys,
1981 entity = %entity_name,
1982 "vm.process_event: flushing pending updates for lookup_keys"
1983 );
1984 }
1985 for lookup_key in lookup_keys {
1986 if let Ok(pending_updates) =
1987 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1988 {
1989 let pending_count = pending_updates.len();
1990 if pending_count > 0 {
1991 self.emit_debug(|| VmDebugEvent::FlushAction {
1992 entity_name: entity_name.clone(),
1993 event_type: event_type.to_string(),
1994 flush_kind: "pending_account_updates".to_string(),
1995 trigger: lookup_key.clone(),
1996 count: pending_count,
1997 });
1998 }
1999 for pending in pending_updates {
2000 if let Some(pending_handler) =
2001 entity_bytecode.handlers.get(&pending.account_type)
2002 {
2003 self.current_context = Some(UpdateContext::new_account(
2004 pending.slot,
2005 pending.signature.clone(),
2006 pending.write_version,
2007 ));
2008 match self.execute_handler(
2009 pending_handler,
2010 &pending.account_data,
2011 &pending.account_type,
2012 entity_bytecode.state_id,
2013 entity_name,
2014 entity_bytecode.computed_fields_evaluator.as_ref(),
2015 Some(&entity_bytecode.non_emitted_fields),
2016 ) {
2017 Ok(reprocessed) => {
2018 all_mutations.extend(reprocessed);
2019 }
2020 Err(e) => {
2021 tracing::warn!(
2022 error = %e,
2023 account_type = %pending.account_type,
2024 "Flushed event reprocessing failed"
2025 );
2026 }
2027 }
2028 }
2029 }
2030 }
2031 }
2032
2033 self.emit_debug(|| VmDebugEvent::HandlerEnd {
2034 entity_name: entity_name.clone(),
2035 event_type: event_type.to_string(),
2036 mutations: direct_mutation_count,
2037 });
2038 } else if let Some(ref mut log) = log {
2039 log.set("skip_reason", "no_handler");
2040 }
2041 } else if let Some(ref mut log) = log {
2042 log.set("skip_reason", "entity_not_found");
2043 }
2044 }
2045 } else if let Some(ref mut log) = log {
2046 log.set("skip_reason", "no_event_routing");
2047 }
2048
2049 let debug_warnings = self.warnings.clone();
2050
2051 if let Some(log) = log {
2052 log.set("mutations", all_mutations.len() as i64);
2053 if let Some(first) = all_mutations.first() {
2054 if let Some(key_str) = first.key.as_str() {
2055 log.set("primary_key", key_str);
2056 } else if let Some(key_num) = first.key.as_u64() {
2057 log.set("primary_key", key_num as i64);
2058 }
2059 }
2060 if let Some(state) = self.states.get(&0) {
2061 log.set("state_table_size", state.data.len() as i64);
2062 }
2063
2064 let warnings = self.take_warnings();
2065 if !warnings.is_empty() {
2066 log.set("warnings", warnings.len() as i64);
2067 log.set(
2068 "warning_messages",
2069 Value::Array(warnings.into_iter().map(Value::String).collect()),
2070 );
2071 log.set_level(crate::canonical_log::LogLevel::Warn);
2072 }
2073 } else {
2074 self.warnings.clear();
2075 }
2076
2077 self.emit_debug(|| VmDebugEvent::ProcessEventEnd {
2078 event_type: event_type.to_string(),
2079 mutations: all_mutations.len(),
2080 warnings: debug_warnings,
2081 });
2082
2083 if self.instructions_executed.is_multiple_of(1000) {
2084 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
2085 for state_id in state_ids {
2086 let expired = self.cleanup_expired_when_ops(state_id, 60);
2087 if expired > 0 {
2088 tracing::debug!(
2089 "Cleaned up {} expired deferred when-ops for state {}",
2090 expired,
2091 state_id
2092 );
2093 }
2094 }
2095 }
2096
2097 Ok(all_mutations)
2098 }
2099
2100 pub fn process_any(
2101 &mut self,
2102 bytecode: &MultiEntityBytecode,
2103 any: prost_types::Any,
2104 ) -> Result<Vec<Mutation>> {
2105 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
2106 self.process_event(bytecode, event_value, &event_type, None, None)
2107 }
2108
2109 #[cfg_attr(feature = "otel", instrument(
2110 name = "vm.execute_handler",
2111 skip(self, handler, event_value, entity_evaluator),
2112 level = "debug",
2113 fields(
2114 event_type = %event_type,
2115 handler_opcodes = handler.len(),
2116 )
2117 ))]
2118 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
2119 fn execute_handler(
2120 &mut self,
2121 handler: &[OpCode],
2122 event_value: &Value,
2123 event_type: &str,
2124 override_state_id: u32,
2125 entity_name: &str,
2126 entity_evaluator: Option<
2127 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
2128 >,
2129 non_emitted_fields: Option<&HashSet<String>>,
2130 ) -> Result<Vec<Mutation>> {
2131 self.reset_registers();
2132 self.last_pda_lookup_miss = None;
2133
2134 let mut pc: usize = 0;
2135 let mut output = Vec::new();
2136 let mut dirty_tracker = DirtyTracker::new();
2137 let should_emit = |path: &str| {
2138 non_emitted_fields
2139 .map(|fields| !fields.contains(path))
2140 .unwrap_or(true)
2141 };
2142
2143 while pc < handler.len() {
2144 match &handler[pc] {
2145 OpCode::LoadEventField {
2146 path,
2147 dest,
2148 default,
2149 } => {
2150 let value = self.load_field(event_value, path, default.as_ref())?;
2151 if self.is_debug_enabled() {
2152 self.emit_debug(|| VmDebugEvent::LoadEventField {
2153 entity_name: entity_name.to_string(),
2154 event_type: event_type.to_string(),
2155 path: path.segments.clone(),
2156 value: value.clone(),
2157 });
2158 }
2159 self.registers[*dest] = value;
2160 pc += 1;
2161 }
2162 OpCode::LoadConstant { value, dest } => {
2163 self.registers[*dest] = value.clone();
2164 pc += 1;
2165 }
2166 OpCode::CopyRegister { source, dest } => {
2167 self.registers[*dest] = self.registers[*source].clone();
2168 pc += 1;
2169 }
2170 OpCode::CopyRegisterIfNull { source, dest } => {
2171 if self.registers[*dest].is_null() {
2172 self.registers[*dest] = self.registers[*source].clone();
2173 }
2174 pc += 1;
2175 }
2176 OpCode::GetEventType { dest } => {
2177 self.registers[*dest] = json!(event_type);
2178 pc += 1;
2179 }
2180 OpCode::CreateObject { dest } => {
2181 self.registers[*dest] = json!({});
2182 pc += 1;
2183 }
2184 OpCode::SetField {
2185 object,
2186 path,
2187 value,
2188 } => {
2189 let old_value = self
2190 .is_debug_enabled()
2191 .then(|| Self::get_value_at_path(&self.registers[*object], path))
2192 .flatten();
2193 self.set_field_auto_vivify(*object, path, *value)?;
2194 if should_emit(path) {
2195 dirty_tracker.mark_replaced(path);
2196 }
2197 if self.is_debug_enabled() {
2198 self.emit_debug(|| VmDebugEvent::FieldWrite {
2199 entity_name: entity_name.to_string(),
2200 event_type: event_type.to_string(),
2201 op: "set_field".to_string(),
2202 path: path.clone(),
2203 old_value,
2204 new_value: Self::get_value_at_path(&self.registers[*object], path),
2205 applied: true,
2206 reason: None,
2207 });
2208 }
2209 pc += 1;
2210 }
2211 OpCode::SetFields { object, fields } => {
2212 for (path, value_reg) in fields {
2213 let old_value = self
2214 .is_debug_enabled()
2215 .then(|| Self::get_value_at_path(&self.registers[*object], path))
2216 .flatten();
2217 self.set_field_auto_vivify(*object, path, *value_reg)?;
2218 if should_emit(path) {
2219 dirty_tracker.mark_replaced(path);
2220 }
2221 if self.is_debug_enabled() {
2222 self.emit_debug(|| VmDebugEvent::FieldWrite {
2223 entity_name: entity_name.to_string(),
2224 event_type: event_type.to_string(),
2225 op: "set_fields".to_string(),
2226 path: path.clone(),
2227 old_value,
2228 new_value: Self::get_value_at_path(&self.registers[*object], path),
2229 applied: true,
2230 reason: None,
2231 });
2232 }
2233 }
2234 pc += 1;
2235 }
2236 OpCode::GetField { object, path, dest } => {
2237 let value = self.get_field(*object, path)?;
2238 self.registers[*dest] = value;
2239 pc += 1;
2240 }
2241 OpCode::AbortIfNullKey {
2242 key,
2243 is_account_event,
2244 } => {
2245 let key_value = &self.registers[*key];
2246 if key_value.is_null() && *is_account_event {
2247 self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2248 entity_name: entity_name.to_string(),
2249 event_type: event_type.to_string(),
2250 key: key_value.clone(),
2251 existing_state: None,
2252 loaded_state: Value::Null,
2253 skipped_reason: Some("null_primary_key".to_string()),
2254 });
2255 tracing::debug!(
2256 event_type = %event_type,
2257 "AbortIfNullKey: key is null for account state event, \
2258 returning empty mutations for queueing"
2259 );
2260 return Ok(Vec::new());
2261 }
2262
2263 pc += 1;
2264 }
2265 OpCode::ReadOrInitState {
2266 state_id: _,
2267 key,
2268 default,
2269 dest,
2270 } => {
2271 let actual_state_id = override_state_id;
2272 let entity_name_owned = entity_name.to_string();
2273 self.states
2274 .entry(actual_state_id)
2275 .or_insert_with(|| StateTable {
2276 data: DashMap::new(),
2277 access_times: DashMap::new(),
2278 lookup_indexes: HashMap::new(),
2279 temporal_indexes: HashMap::new(),
2280 pda_reverse_lookups: HashMap::new(),
2281 pending_updates: DashMap::new(),
2282 pending_instruction_events: DashMap::new(),
2283 last_account_data: DashMap::new(),
2284 version_tracker: VersionTracker::new(),
2285 instruction_dedup_cache: VersionTracker::with_capacity(
2286 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2287 ),
2288 config: StateTableConfig::default(),
2289 entity_name: entity_name_owned,
2290 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2291 NonZeroUsize::new(1000).unwrap(),
2292 )),
2293 deferred_when_ops: DashMap::new(),
2294 });
2295 let key_value = self.registers[*key].clone();
2296 let warn_null_key = key_value.is_null()
2298 && event_type.ends_with("State")
2299 && !event_type.ends_with("IxState")
2300 && !event_type.ends_with("CpiEvent");
2301
2302 if warn_null_key {
2303 self.add_warning(format!(
2304 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2305 key, event_type
2306 ));
2307 }
2308
2309 let state = self
2310 .states
2311 .get(&actual_state_id)
2312 .ok_or("State table not found")?;
2313
2314 if !key_value.is_null() {
2315 if let Some(ctx) = &self.current_context {
2316 if ctx.is_account_update() {
2321 if let (Some(slot), Some(write_version)) =
2322 (ctx.slot, ctx.write_version)
2323 {
2324 let account_address = event_value
2326 .get("__account_address")
2327 .cloned()
2328 .unwrap_or_else(|| key_value.clone());
2329
2330 if !state.is_fresh_update(
2331 &account_address,
2332 event_type,
2333 slot,
2334 write_version,
2335 ) {
2336 self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2337 entity_name: entity_name.to_string(),
2338 event_type: event_type.to_string(),
2339 key: key_value.clone(),
2340 existing_state: None,
2341 loaded_state: Value::Null,
2342 skipped_reason: Some(
2343 "stale_account_update".to_string(),
2344 ),
2345 });
2346 self.add_warning(format!(
2347 "Stale account update skipped: slot={}, write_version={}, account={}",
2348 slot, write_version, account_address
2349 ));
2350 return Ok(Vec::new());
2351 }
2352 }
2353 }
2354 else if ctx.is_instruction_update() {
2356 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2357 if state.is_duplicate_instruction(
2358 &key_value, event_type, slot, txn_index,
2359 ) {
2360 self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2361 entity_name: entity_name.to_string(),
2362 event_type: event_type.to_string(),
2363 key: key_value.clone(),
2364 existing_state: None,
2365 loaded_state: Value::Null,
2366 skipped_reason: Some(
2367 "duplicate_instruction".to_string(),
2368 ),
2369 });
2370 self.add_warning(format!(
2371 "Duplicate instruction skipped: slot={}, txn_index={}",
2372 slot, txn_index
2373 ));
2374 return Ok(Vec::new());
2375 }
2376 }
2377 }
2378 }
2379 }
2380 let existing_state = state.get_and_touch(&key_value);
2381 let value = existing_state.clone().unwrap_or_else(|| default.clone());
2382
2383 self.emit_debug(|| VmDebugEvent::ReadOrInitState {
2384 entity_name: entity_name.to_string(),
2385 event_type: event_type.to_string(),
2386 key: key_value,
2387 existing_state,
2388 loaded_state: value.clone(),
2389 skipped_reason: None,
2390 });
2391
2392 self.registers[*dest] = value;
2393 pc += 1;
2394 }
2395 OpCode::UpdateState {
2396 state_id: _,
2397 key,
2398 value,
2399 } => {
2400 let actual_state_id = override_state_id;
2401 let state = self
2402 .states
2403 .get(&actual_state_id)
2404 .ok_or("State table not found")?;
2405 let key_value = self.registers[*key].clone();
2406 let value_data = self.registers[*value].clone();
2407
2408 state.insert_with_eviction(key_value, value_data);
2409 pc += 1;
2410 }
2411 OpCode::AppendToArray {
2412 object,
2413 path,
2414 value,
2415 } => {
2416 let appended_value = self.registers[*value].clone();
2417 let max_len = self
2418 .states
2419 .get(&override_state_id)
2420 .map(|s| s.max_array_length())
2421 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2422 self.append_to_array(*object, path, *value, max_len)?;
2423 if should_emit(path) {
2424 dirty_tracker.mark_appended(path, appended_value);
2425 }
2426 pc += 1;
2427 }
2428 OpCode::GetCurrentTimestamp { dest } => {
2429 let timestamp = std::time::SystemTime::now()
2430 .duration_since(std::time::UNIX_EPOCH)
2431 .unwrap()
2432 .as_secs() as i64;
2433 self.registers[*dest] = json!(timestamp);
2434 pc += 1;
2435 }
2436 OpCode::CreateEvent { dest, event_value } => {
2437 let timestamp = std::time::SystemTime::now()
2438 .duration_since(std::time::UNIX_EPOCH)
2439 .unwrap()
2440 .as_secs() as i64;
2441
2442 let mut event_data = self.registers[*event_value].clone();
2444 if let Some(obj) = event_data.as_object_mut() {
2445 obj.remove("__update_context");
2446 }
2447
2448 let mut event = serde_json::Map::new();
2450 event.insert("timestamp".to_string(), json!(timestamp));
2451 event.insert("data".to_string(), event_data);
2452
2453 if let Some(ref ctx) = self.current_context {
2455 if let Some(slot) = ctx.slot {
2456 event.insert("slot".to_string(), json!(slot));
2457 }
2458 if let Some(ref signature) = ctx.signature {
2459 event.insert("signature".to_string(), json!(signature));
2460 }
2461 }
2462
2463 self.registers[*dest] = Value::Object(event);
2464 pc += 1;
2465 }
2466 OpCode::CreateCapture {
2467 dest,
2468 capture_value,
2469 } => {
2470 let timestamp = std::time::SystemTime::now()
2471 .duration_since(std::time::UNIX_EPOCH)
2472 .unwrap()
2473 .as_secs() as i64;
2474
2475 let capture_data = self.registers[*capture_value].clone();
2477
2478 let account_address = event_value
2480 .get("__account_address")
2481 .and_then(|v| v.as_str())
2482 .unwrap_or("")
2483 .to_string();
2484
2485 let mut capture = serde_json::Map::new();
2487 capture.insert("timestamp".to_string(), json!(timestamp));
2488 capture.insert("account_address".to_string(), json!(account_address));
2489 capture.insert("data".to_string(), capture_data);
2490
2491 if let Some(ref ctx) = self.current_context {
2493 if let Some(slot) = ctx.slot {
2494 capture.insert("slot".to_string(), json!(slot));
2495 }
2496 if let Some(ref signature) = ctx.signature {
2497 capture.insert("signature".to_string(), json!(signature));
2498 }
2499 }
2500
2501 self.registers[*dest] = Value::Object(capture);
2502 pc += 1;
2503 }
2504 OpCode::Transform {
2505 source,
2506 dest,
2507 transformation,
2508 } => {
2509 if source == dest {
2510 self.transform_in_place(*source, transformation)?;
2511 } else {
2512 let source_value = &self.registers[*source];
2513 let value = Self::apply_transformation(source_value, transformation)?;
2514 self.registers[*dest] = value;
2515 }
2516 pc += 1;
2517 }
2518 OpCode::EmitMutation {
2519 entity_name,
2520 key,
2521 state,
2522 } => {
2523 let primary_key = self.registers[*key].clone();
2524 let dirty_fields: Vec<String> =
2525 dirty_tracker.dirty_paths().into_iter().collect();
2526
2527 if primary_key.is_null() || dirty_tracker.is_empty() {
2528 let reason = if dirty_tracker.is_empty() {
2529 "no_fields_modified"
2530 } else {
2531 "null_primary_key"
2532 };
2533 self.emit_debug(|| VmDebugEvent::EmitMutation {
2534 entity_name: entity_name.clone(),
2535 event_type: event_type.to_string(),
2536 key: primary_key.clone(),
2537 emitted: false,
2538 reason: Some(reason.to_string()),
2539 patch: None,
2540 dirty_fields,
2541 });
2542 self.add_warning(format!(
2543 "Skipping mutation for entity '{}': {} (dirty_fields={})",
2544 entity_name,
2545 reason,
2546 dirty_tracker.len()
2547 ));
2548 } else {
2549 let patch =
2550 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2551
2552 let append = dirty_tracker.appended_paths();
2553 let mutation = Mutation {
2554 export: entity_name.clone(),
2555 key: primary_key,
2556 patch,
2557 append,
2558 };
2559 self.emit_debug(|| VmDebugEvent::EmitMutation {
2560 entity_name: entity_name.clone(),
2561 event_type: event_type.to_string(),
2562 key: mutation.key.clone(),
2563 emitted: true,
2564 reason: None,
2565 patch: Some(mutation.patch.clone()),
2566 dirty_fields,
2567 });
2568 output.push(mutation);
2569 }
2570 pc += 1;
2571 }
2572 OpCode::SetFieldIfNull {
2573 object,
2574 path,
2575 value,
2576 } => {
2577 let old_value = self
2578 .is_debug_enabled()
2579 .then(|| Self::get_value_at_path(&self.registers[*object], path))
2580 .flatten();
2581 let was_set = self.set_field_if_null(*object, path, *value)?;
2582 if was_set && should_emit(path) {
2583 dirty_tracker.mark_replaced(path);
2584 }
2585 if self.is_debug_enabled() {
2586 self.emit_debug(|| VmDebugEvent::FieldWrite {
2587 entity_name: entity_name.to_string(),
2588 event_type: event_type.to_string(),
2589 op: "set_field_if_null".to_string(),
2590 path: path.clone(),
2591 old_value,
2592 new_value: Self::get_value_at_path(&self.registers[*object], path),
2593 applied: was_set,
2594 reason: (!was_set).then_some("already_set".to_string()),
2595 });
2596 }
2597 pc += 1;
2598 }
2599 OpCode::SetFieldMax {
2600 object,
2601 path,
2602 value,
2603 } => {
2604 let was_updated = self.set_field_max(*object, path, *value)?;
2605 if was_updated && should_emit(path) {
2606 dirty_tracker.mark_replaced(path);
2607 }
2608 pc += 1;
2609 }
2610 OpCode::UpdateTemporalIndex {
2611 state_id: _,
2612 index_name,
2613 lookup_value,
2614 primary_key,
2615 timestamp,
2616 } => {
2617 let actual_state_id = override_state_id;
2618 let state = self
2619 .states
2620 .get_mut(&actual_state_id)
2621 .ok_or("State table not found")?;
2622 let index = state
2623 .temporal_indexes
2624 .entry(index_name.clone())
2625 .or_insert_with(TemporalIndex::new);
2626
2627 let lookup_val = self.registers[*lookup_value].clone();
2628 let pk_val = self.registers[*primary_key].clone();
2629 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2630 val
2631 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2632 val as i64
2633 } else {
2634 return Err(format!(
2635 "Timestamp must be a number (i64 or u64), got: {:?}",
2636 self.registers[*timestamp]
2637 )
2638 .into());
2639 };
2640
2641 index.insert(lookup_val, pk_val, ts_val);
2642 pc += 1;
2643 }
2644 OpCode::LookupTemporalIndex {
2645 state_id: _,
2646 index_name,
2647 lookup_value,
2648 timestamp,
2649 dest,
2650 } => {
2651 let actual_state_id = override_state_id;
2652 let state = self
2653 .states
2654 .get(&actual_state_id)
2655 .ok_or("State table not found")?;
2656 let lookup_val = &self.registers[*lookup_value];
2657
2658 let result = if self.registers[*timestamp].is_null() {
2659 if let Some(index) = state.temporal_indexes.get(index_name) {
2660 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2661 } else {
2662 Value::Null
2663 }
2664 } else {
2665 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2666 val
2667 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2668 val as i64
2669 } else {
2670 return Err(format!(
2671 "Timestamp must be a number (i64 or u64), got: {:?}",
2672 self.registers[*timestamp]
2673 )
2674 .into());
2675 };
2676
2677 if let Some(index) = state.temporal_indexes.get(index_name) {
2678 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2679 } else {
2680 Value::Null
2681 }
2682 };
2683
2684 self.registers[*dest] = result;
2685 pc += 1;
2686 }
2687 OpCode::UpdateLookupIndex {
2688 state_id: _,
2689 index_name,
2690 lookup_value,
2691 primary_key,
2692 } => {
2693 let actual_state_id = override_state_id;
2694 let state = self
2695 .states
2696 .get_mut(&actual_state_id)
2697 .ok_or("State table not found")?;
2698 let index = state
2699 .lookup_indexes
2700 .entry(index_name.clone())
2701 .or_insert_with(LookupIndex::new);
2702
2703 let lookup_val = self.registers[*lookup_value].clone();
2704 let pk_val = self.registers[*primary_key].clone();
2705
2706 index.insert(lookup_val.clone(), pk_val);
2707
2708 if let Some(key_str) = lookup_val.as_str() {
2710 self.last_lookup_index_keys.push(key_str.to_string());
2711 }
2712
2713 pc += 1;
2714 }
2715 OpCode::LookupIndex {
2716 state_id: _,
2717 index_name,
2718 lookup_value,
2719 dest,
2720 } => {
2721 let actual_state_id = override_state_id;
2722 let mut current_value = self.registers[*lookup_value].clone();
2723 let original_lookup_value = current_value.clone();
2724 let mut hops = if self.is_debug_enabled() {
2725 Some(Vec::<VmLookupHop>::new())
2726 } else {
2727 None
2728 };
2729 let mut miss_kind: Option<String> = None;
2730
2731 const MAX_CHAIN_DEPTH: usize = 5;
2732 let mut iterations = 0;
2733
2734 let final_result = if self.states.contains_key(&actual_state_id) {
2735 loop {
2736 iterations += 1;
2737 if iterations > MAX_CHAIN_DEPTH {
2738 break current_value;
2739 }
2740
2741 let resolved = self
2742 .states
2743 .get(&actual_state_id)
2744 .and_then(|state| {
2745 if let Some(index) = state.lookup_indexes.get(index_name) {
2746 if let Some(found) = index.lookup(¤t_value) {
2747 return Some(found);
2748 }
2749 }
2750
2751 for (name, index) in state.lookup_indexes.iter() {
2752 if name == index_name {
2753 continue;
2754 }
2755 if let Some(found) = index.lookup(¤t_value) {
2756 return Some(found);
2757 }
2758 }
2759
2760 None
2761 })
2762 .unwrap_or(Value::Null);
2763
2764 let lookup_result = resolved.clone();
2765 if let Some(hops) = hops.as_mut() {
2766 hops.push(VmLookupHop {
2767 source: "lookup_index".to_string(),
2768 input: current_value.clone(),
2769 result: lookup_result.clone(),
2770 chained: false,
2771 });
2772 }
2773
2774 let mut resolved_from_pda = false;
2775 let resolved = if resolved.is_null() {
2776 if let Some(pda_str) = current_value.as_str() {
2777 resolved_from_pda = true;
2778 let pda_result = self
2779 .states
2780 .get_mut(&actual_state_id)
2781 .and_then(|state_mut| {
2782 state_mut
2783 .pda_reverse_lookups
2784 .get_mut("default_pda_lookup")
2785 })
2786 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2787 .map(Value::String)
2788 .unwrap_or(Value::Null);
2789 if let Some(hops) = hops.as_mut() {
2790 hops.push(VmLookupHop {
2791 source: "pda_reverse_lookup".to_string(),
2792 input: current_value.clone(),
2793 result: pda_result.clone(),
2794 chained: false,
2795 });
2796 }
2797 pda_result
2798 } else {
2799 Value::Null
2800 }
2801 } else {
2802 resolved
2803 };
2804
2805 if resolved.is_null() {
2806 if iterations == 1 {
2807 if let Some(pda_str) = current_value.as_str() {
2808 self.last_pda_lookup_miss = Some(pda_str.to_string());
2809 miss_kind = Some("pda_lookup_miss".to_string());
2810 } else {
2811 miss_kind = Some("lookup_index_miss".to_string());
2812 }
2813 }
2814 break Value::Null;
2815 }
2816
2817 let can_chain =
2818 self.can_resolve_further(&resolved, actual_state_id, index_name);
2819
2820 if !can_chain {
2821 if resolved_from_pda {
2822 if let Some(resolved_str) = resolved.as_str() {
2823 self.last_lookup_index_miss =
2824 Some(resolved_str.to_string());
2825 }
2826 miss_kind = Some("lookup_chain_incomplete".to_string());
2827 break Value::Null;
2828 }
2829 break resolved;
2830 }
2831
2832 if let Some(hops) = hops.as_mut() {
2833 if let Some(last) = hops.last_mut() {
2834 last.chained = true;
2835 }
2836 }
2837
2838 current_value = resolved;
2839 }
2840 } else {
2841 miss_kind = Some("state_not_initialized".to_string());
2842 Value::Null
2843 };
2844
2845 self.emit_debug(|| VmDebugEvent::LookupIndex {
2846 entity_name: entity_name.to_string(),
2847 event_type: event_type.to_string(),
2848 index_name: index_name.clone(),
2849 lookup_value: original_lookup_value,
2850 hops: hops.unwrap_or_default(),
2851 final_result: final_result.clone(),
2852 miss_kind,
2853 });
2854
2855 self.registers[*dest] = final_result;
2856 pc += 1;
2857 }
2858 OpCode::SetFieldSum {
2859 object,
2860 path,
2861 value,
2862 } => {
2863 let was_updated = self.set_field_sum(*object, path, *value)?;
2864 if was_updated && should_emit(path) {
2865 dirty_tracker.mark_replaced(path);
2866 }
2867 pc += 1;
2868 }
2869 OpCode::SetFieldIncrement { object, path } => {
2870 let was_updated = self.set_field_increment(*object, path)?;
2871 if was_updated && should_emit(path) {
2872 dirty_tracker.mark_replaced(path);
2873 }
2874 pc += 1;
2875 }
2876 OpCode::SetFieldMin {
2877 object,
2878 path,
2879 value,
2880 } => {
2881 let was_updated = self.set_field_min(*object, path, *value)?;
2882 if was_updated && should_emit(path) {
2883 dirty_tracker.mark_replaced(path);
2884 }
2885 pc += 1;
2886 }
2887 OpCode::AddToUniqueSet {
2888 state_id: _,
2889 set_name,
2890 value,
2891 count_object,
2892 count_path,
2893 } => {
2894 let value_to_add = self.registers[*value].clone();
2895
2896 let set_field_path = format!("__unique_set:{}", set_name);
2899
2900 let mut set: HashSet<Value> =
2902 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2903 if !existing.is_null() {
2904 serde_json::from_value(existing).unwrap_or_default()
2905 } else {
2906 HashSet::new()
2907 }
2908 } else {
2909 HashSet::new()
2910 };
2911
2912 let was_new = set.insert(value_to_add);
2914
2915 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2917 self.registers[100] = serde_json::to_value(set_as_vec)?;
2918 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2919
2920 if was_new {
2922 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2923 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2924 if should_emit(count_path) {
2925 dirty_tracker.mark_replaced(count_path);
2926 }
2927 }
2928
2929 pc += 1;
2930 }
2931 OpCode::ConditionalSetField {
2932 object,
2933 path,
2934 value,
2935 condition_field,
2936 condition_op,
2937 condition_value,
2938 } => {
2939 let field_value = self.load_field(event_value, condition_field, None)?;
2940 let condition_met =
2941 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2942
2943 if condition_met {
2944 self.set_field_auto_vivify(*object, path, *value)?;
2945 if should_emit(path) {
2946 dirty_tracker.mark_replaced(path);
2947 }
2948 }
2949 pc += 1;
2950 }
2951 OpCode::SetFieldWhen {
2952 object,
2953 path,
2954 value,
2955 when_instruction,
2956 entity_name,
2957 key_reg,
2958 condition_field,
2959 condition_op,
2960 condition_value,
2961 } => {
2962 let actual_state_id = override_state_id;
2963 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2964 condition_field.as_ref(),
2965 condition_op.as_ref(),
2966 condition_value.as_ref(),
2967 ) {
2968 let field_value = self.load_field(event_value, field, None)?;
2969 self.evaluate_comparison(&field_value, op, cond_value)?
2970 } else {
2971 true
2972 };
2973
2974 if !condition_met {
2975 pc += 1;
2976 continue;
2977 }
2978
2979 let signature = self
2980 .current_context
2981 .as_ref()
2982 .and_then(|c| c.signature.clone())
2983 .unwrap_or_default();
2984
2985 let emit = should_emit(path);
2986
2987 let instruction_seen = if !signature.is_empty() {
2988 if let Some(state) = self.states.get(&actual_state_id) {
2989 let mut cache = state.recent_tx_instructions.lock().unwrap();
2990 cache
2991 .get(&signature)
2992 .map(|set| set.contains(when_instruction))
2993 .unwrap_or(false)
2994 } else {
2995 false
2996 }
2997 } else {
2998 false
2999 };
3000
3001 if instruction_seen {
3002 self.set_field_auto_vivify(*object, path, *value)?;
3003 if emit {
3004 dirty_tracker.mark_replaced(path);
3005 }
3006 } else if !signature.is_empty() {
3007 let deferred = DeferredWhenOperation {
3008 entity_name: entity_name.clone(),
3009 primary_key: self.registers[*key_reg].clone(),
3010 field_path: path.clone(),
3011 field_value: self.registers[*value].clone(),
3012 when_instruction: when_instruction.clone(),
3013 signature: signature.clone(),
3014 slot: self
3015 .current_context
3016 .as_ref()
3017 .and_then(|c| c.slot)
3018 .unwrap_or(0),
3019 deferred_at: std::time::SystemTime::now()
3020 .duration_since(std::time::UNIX_EPOCH)
3021 .unwrap()
3022 .as_secs() as i64,
3023 emit,
3024 };
3025
3026 if let Some(state) = self.states.get(&actual_state_id) {
3027 let key = (signature, when_instruction.clone());
3028 state
3029 .deferred_when_ops
3030 .entry(key)
3031 .or_insert_with(Vec::new)
3032 .push(deferred);
3033 }
3034 }
3035
3036 pc += 1;
3037 }
3038 OpCode::SetFieldUnlessStopped {
3039 object,
3040 path,
3041 value,
3042 stop_field,
3043 stop_instruction,
3044 entity_name,
3045 key_reg: _,
3046 } => {
3047 let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
3048 let stopped = stop_value.as_bool().unwrap_or(false);
3049 let old_value = self
3050 .is_debug_enabled()
3051 .then(|| Self::get_value_at_path(&self.registers[*object], path))
3052 .flatten();
3053
3054 if stopped {
3055 self.emit_debug(|| VmDebugEvent::FieldWrite {
3056 entity_name: entity_name.clone(),
3057 event_type: event_type.to_string(),
3058 op: "set_field_unless_stopped".to_string(),
3059 path: path.clone(),
3060 old_value,
3061 new_value: Self::get_value_at_path(&self.registers[*object], path),
3062 applied: false,
3063 reason: Some(format!("stop_flag:{}", stop_instruction)),
3064 });
3065 tracing::debug!(
3066 entity = %entity_name,
3067 field = %path,
3068 stop_field = %stop_field,
3069 stop_instruction = %stop_instruction,
3070 "stop flag set; skipping field update"
3071 );
3072 pc += 1;
3073 continue;
3074 }
3075
3076 self.set_field_auto_vivify(*object, path, *value)?;
3077 if should_emit(path) {
3078 dirty_tracker.mark_replaced(path);
3079 }
3080 if self.is_debug_enabled() {
3081 self.emit_debug(|| VmDebugEvent::FieldWrite {
3082 entity_name: entity_name.clone(),
3083 event_type: event_type.to_string(),
3084 op: "set_field_unless_stopped".to_string(),
3085 path: path.clone(),
3086 old_value,
3087 new_value: Self::get_value_at_path(&self.registers[*object], path),
3088 applied: true,
3089 reason: None,
3090 });
3091 }
3092 pc += 1;
3093 }
3094 OpCode::ConditionalIncrement {
3095 object,
3096 path,
3097 condition_field,
3098 condition_op,
3099 condition_value,
3100 } => {
3101 let field_value = self.load_field(event_value, condition_field, None)?;
3102 let condition_met =
3103 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
3104
3105 if condition_met {
3106 let was_updated = self.set_field_increment(*object, path)?;
3107 if was_updated && should_emit(path) {
3108 dirty_tracker.mark_replaced(path);
3109 }
3110 }
3111 pc += 1;
3112 }
3113 OpCode::EvaluateComputedFields {
3114 state,
3115 computed_paths,
3116 } => {
3117 if let Some(evaluator) = entity_evaluator {
3118 let old_values: Vec<_> = computed_paths
3119 .iter()
3120 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
3121 .collect();
3122
3123 let state_value = &mut self.registers[*state];
3124 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3125 let context_timestamp = self
3126 .current_context
3127 .as_ref()
3128 .map(|c| c.timestamp())
3129 .unwrap_or_else(|| {
3130 std::time::SystemTime::now()
3131 .duration_since(std::time::UNIX_EPOCH)
3132 .unwrap()
3133 .as_secs() as i64
3134 });
3135 let eval_result = evaluator(state_value, context_slot, context_timestamp);
3136
3137 if eval_result.is_ok() {
3138 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
3139 let new_value =
3140 Self::get_value_at_path(&self.registers[*state], path);
3141
3142 if new_value != *old_value && should_emit(path) {
3143 dirty_tracker.mark_replaced(path);
3144 }
3145 }
3146 }
3147 }
3148 pc += 1;
3149 }
3150 OpCode::QueueResolver {
3151 state_id: _,
3152 entity_name,
3153 resolver,
3154 input_path,
3155 input_value,
3156 url_template,
3157 strategy,
3158 extracts,
3159 condition,
3160 schedule_at,
3161 state,
3162 key,
3163 } => {
3164 let actual_state_id = override_state_id;
3165
3166 if self
3170 .current_context
3171 .as_ref()
3172 .map(|c| c.skip_resolvers)
3173 .unwrap_or(false)
3174 {
3175 pc += 1;
3176 continue;
3177 }
3178
3179 if let Some(cond) = condition {
3181 let field_val =
3182 Self::get_value_at_path(&self.registers[*state], &cond.field_path)
3183 .unwrap_or(Value::Null);
3184 let condition_met =
3185 self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
3186
3187 if !condition_met {
3188 pc += 1;
3189 continue;
3190 }
3191 }
3192
3193 if let Some(schedule_path) = schedule_at {
3195 let target_val =
3196 Self::get_value_at_path(&self.registers[*state], schedule_path);
3197
3198 match target_val.and_then(|v| v.as_u64()) {
3199 Some(target_slot) => {
3200 let current_slot = self
3201 .current_context
3202 .as_ref()
3203 .and_then(|ctx| ctx.slot)
3204 .unwrap_or(0);
3205 if current_slot < target_slot {
3206 let key_value = &self.registers[*key];
3207 if !key_value.is_null() {
3208 self.scheduled_callbacks.push((
3209 target_slot,
3210 ScheduledCallback {
3211 state_id: actual_state_id,
3212 entity_name: entity_name.clone(),
3213 primary_key: key_value.clone(),
3214 resolver: resolver.clone(),
3215 url_template: url_template.clone(),
3216 input_value: input_value.clone(),
3217 input_path: input_path.clone(),
3218 condition: condition.clone(),
3219 strategy: strategy.clone(),
3220 extracts: extracts.clone(),
3221 retry_count: 0,
3222 },
3223 ));
3224 }
3225 pc += 1;
3226 continue;
3227 }
3228 }
3230 None => {
3231 pc += 1;
3235 continue;
3236 }
3237 }
3238 }
3239
3240 let resolved_input = if let Some(template) = url_template {
3242 crate::scheduler::build_url_from_template(template, &self.registers[*state])
3243 .map(Value::String)
3244 } else if let Some(value) = input_value {
3245 Some(value.clone())
3246 } else if let Some(path) = input_path.as_ref() {
3247 Self::get_value_at_path(&self.registers[*state], path)
3248 } else {
3249 None
3250 };
3251
3252 if let Some(input) = resolved_input {
3253 let key_value = &self.registers[*key];
3254
3255 if input.is_null() || key_value.is_null() {
3256 pc += 1;
3257 continue;
3258 }
3259
3260 if matches!(strategy, ResolveStrategy::SetOnce)
3261 && extracts.iter().all(|extract| {
3265 match Self::get_value_at_path(
3266 &self.registers[*state],
3267 &extract.target_path,
3268 ) {
3269 Some(value) => !value.is_null(),
3270 None => false,
3271 }
3272 })
3273 {
3274 pc += 1;
3275 continue;
3276 }
3277
3278 let cache_key = resolver_cache_key(resolver, &input);
3279
3280 if let Some(cached) = self.get_cached_resolver_value(&cache_key) {
3281 Self::apply_resolver_extractions_to_value(
3282 &mut self.registers[*state],
3283 &cached,
3284 extracts,
3285 &mut dirty_tracker,
3286 &should_emit,
3287 )?;
3288 } else {
3289 let target = ResolverTarget {
3290 state_id: actual_state_id,
3291 entity_name: entity_name.clone(),
3292 primary_key: self.registers[*key].clone(),
3293 extracts: extracts.clone(),
3294 };
3295
3296 self.enqueue_resolver_request(
3297 cache_key,
3298 resolver.clone(),
3299 input,
3300 target,
3301 );
3302 }
3303 }
3304
3305 pc += 1;
3306 }
3307 OpCode::UpdatePdaReverseLookup {
3308 state_id: _,
3309 lookup_name,
3310 pda_address,
3311 primary_key,
3312 } => {
3313 let actual_state_id = override_state_id;
3314 let pda_val = self.registers[*pda_address].clone();
3315 let pk_val = self.registers[*primary_key].clone();
3316
3317 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
3318 let pending = self.update_pda_reverse_lookup(
3319 actual_state_id,
3320 lookup_name,
3321 pda_str.to_string(),
3322 pk_str.to_string(),
3323 )?;
3324 self.pending_pda_reprocess_updates.extend(pending);
3325 self.last_pda_registered = Some(pda_str.to_string());
3326 self.emit_debug(|| VmDebugEvent::PdaReverseLookupUpdate {
3327 entity_name: entity_name.to_string(),
3328 event_type: event_type.to_string(),
3329 lookup_name: lookup_name.clone(),
3330 pda_address: pda_str.to_string(),
3331 primary_key: pk_val.clone(),
3332 });
3333 } else if !pk_val.is_null() {
3334 if let Some(pk_num) = pk_val.as_u64() {
3335 if let Some(pda_str) = pda_val.as_str() {
3336 let pending = self.update_pda_reverse_lookup(
3337 actual_state_id,
3338 lookup_name,
3339 pda_str.to_string(),
3340 pk_num.to_string(),
3341 )?;
3342 self.pending_pda_reprocess_updates.extend(pending);
3343 self.last_pda_registered = Some(pda_str.to_string());
3344 self.emit_debug(|| VmDebugEvent::PdaReverseLookupUpdate {
3345 entity_name: entity_name.to_string(),
3346 event_type: event_type.to_string(),
3347 lookup_name: lookup_name.clone(),
3348 pda_address: pda_str.to_string(),
3349 primary_key: pk_val.clone(),
3350 });
3351 }
3352 }
3353 }
3354
3355 pc += 1;
3356 }
3357 }
3358
3359 self.instructions_executed += 1;
3360 }
3361
3362 Ok(output)
3363 }
3364
3365 fn load_field(
3366 &self,
3367 event_value: &Value,
3368 path: &FieldPath,
3369 default: Option<&Value>,
3370 ) -> Result<Value> {
3371 if path.segments.is_empty() {
3372 if let Some(obj) = event_value.as_object() {
3373 let filtered: serde_json::Map<String, Value> = obj
3374 .iter()
3375 .filter(|(k, _)| !k.starts_with("__"))
3376 .map(|(k, v)| (k.clone(), v.clone()))
3377 .collect();
3378 return Ok(Value::Object(filtered));
3379 }
3380 return Ok(event_value.clone());
3381 }
3382
3383 let mut current = event_value;
3384 for segment in path.segments.iter() {
3385 current = match current.get(segment) {
3386 Some(v) => v,
3387 None => {
3388 tracing::trace!(
3389 "load_field: segment={:?} not found in {:?}, returning default",
3390 segment,
3391 current
3392 );
3393 return Ok(default.cloned().unwrap_or(Value::Null));
3394 }
3395 };
3396 }
3397
3398 tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3399 Ok(current.clone())
3400 }
3401
3402 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3403 let mut current = value;
3404 for segment in path.split('.') {
3405 current = current.get(segment)?;
3406 }
3407 Some(current.clone())
3408 }
3409
3410 fn set_field_auto_vivify(
3411 &mut self,
3412 object_reg: Register,
3413 path: &str,
3414 value_reg: Register,
3415 ) -> Result<()> {
3416 let compiled = self.get_compiled_path(path);
3417 let segments = compiled.segments();
3418 let value = self.registers[value_reg].clone();
3419
3420 if !self.registers[object_reg].is_object() {
3421 self.registers[object_reg] = json!({});
3422 }
3423
3424 let obj = self.registers[object_reg]
3425 .as_object_mut()
3426 .ok_or("Not an object")?;
3427
3428 let mut current = obj;
3429 for (i, segment) in segments.iter().enumerate() {
3430 if i == segments.len() - 1 {
3431 current.insert(segment.to_string(), value);
3432 return Ok(());
3433 } else {
3434 current
3435 .entry(segment.to_string())
3436 .or_insert_with(|| json!({}));
3437 current = current
3438 .get_mut(segment)
3439 .and_then(|v| v.as_object_mut())
3440 .ok_or("Path collision: expected object")?;
3441 }
3442 }
3443
3444 Ok(())
3445 }
3446
3447 fn set_field_if_null(
3448 &mut self,
3449 object_reg: Register,
3450 path: &str,
3451 value_reg: Register,
3452 ) -> Result<bool> {
3453 let compiled = self.get_compiled_path(path);
3454 let segments = compiled.segments();
3455 let value = self.registers[value_reg].clone();
3456
3457 if value.is_null() {
3461 return Ok(false);
3462 }
3463
3464 if !self.registers[object_reg].is_object() {
3465 self.registers[object_reg] = json!({});
3466 }
3467
3468 let obj = self.registers[object_reg]
3469 .as_object_mut()
3470 .ok_or("Not an object")?;
3471
3472 let mut current = obj;
3473 for (i, segment) in segments.iter().enumerate() {
3474 if i == segments.len() - 1 {
3475 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3476 current.insert(segment.to_string(), value);
3477 return Ok(true);
3478 }
3479 return Ok(false);
3480 } else {
3481 current
3482 .entry(segment.to_string())
3483 .or_insert_with(|| json!({}));
3484 current = current
3485 .get_mut(segment)
3486 .and_then(|v| v.as_object_mut())
3487 .ok_or("Path collision: expected object")?;
3488 }
3489 }
3490
3491 Ok(false)
3492 }
3493
3494 fn set_field_max(
3495 &mut self,
3496 object_reg: Register,
3497 path: &str,
3498 value_reg: Register,
3499 ) -> Result<bool> {
3500 let compiled = self.get_compiled_path(path);
3501 let segments = compiled.segments();
3502 let new_value = self.registers[value_reg].clone();
3503
3504 if !self.registers[object_reg].is_object() {
3505 self.registers[object_reg] = json!({});
3506 }
3507
3508 let obj = self.registers[object_reg]
3509 .as_object_mut()
3510 .ok_or("Not an object")?;
3511
3512 let mut current = obj;
3513 for (i, segment) in segments.iter().enumerate() {
3514 if i == segments.len() - 1 {
3515 let should_update = if let Some(current_value) = current.get(segment) {
3516 if current_value.is_null() {
3517 true
3518 } else {
3519 match (current_value.as_i64(), new_value.as_i64()) {
3520 (Some(current_val), Some(new_val)) => new_val > current_val,
3521 (Some(current_val), None) if new_value.as_u64().is_some() => {
3522 new_value.as_u64().unwrap() as i64 > current_val
3523 }
3524 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3525 new_val > current_value.as_u64().unwrap() as i64
3526 }
3527 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3528 (Some(current_val), Some(new_val)) => new_val > current_val,
3529 _ => match (current_value.as_f64(), new_value.as_f64()) {
3530 (Some(current_val), Some(new_val)) => new_val > current_val,
3531 _ => false,
3532 },
3533 },
3534 _ => false,
3535 }
3536 }
3537 } else {
3538 true
3539 };
3540
3541 if should_update {
3542 current.insert(segment.to_string(), new_value);
3543 return Ok(true);
3544 }
3545 return Ok(false);
3546 } else {
3547 current
3548 .entry(segment.to_string())
3549 .or_insert_with(|| json!({}));
3550 current = current
3551 .get_mut(segment)
3552 .and_then(|v| v.as_object_mut())
3553 .ok_or("Path collision: expected object")?;
3554 }
3555 }
3556
3557 Ok(false)
3558 }
3559
3560 fn set_field_sum(
3561 &mut self,
3562 object_reg: Register,
3563 path: &str,
3564 value_reg: Register,
3565 ) -> Result<bool> {
3566 let compiled = self.get_compiled_path(path);
3567 let segments = compiled.segments();
3568 let new_value = &self.registers[value_reg];
3569
3570 tracing::trace!(
3572 "set_field_sum: path={:?}, value={:?}, value_type={}",
3573 path,
3574 new_value,
3575 match new_value {
3576 serde_json::Value::Null => "null",
3577 serde_json::Value::Bool(_) => "bool",
3578 serde_json::Value::Number(_) => "number",
3579 serde_json::Value::String(_) => "string",
3580 serde_json::Value::Array(_) => "array",
3581 serde_json::Value::Object(_) => "object",
3582 }
3583 );
3584 let new_val_num = new_value
3585 .as_i64()
3586 .or_else(|| new_value.as_u64().map(|n| n as i64))
3587 .ok_or("Sum requires numeric value")?;
3588
3589 if !self.registers[object_reg].is_object() {
3590 self.registers[object_reg] = json!({});
3591 }
3592
3593 let obj = self.registers[object_reg]
3594 .as_object_mut()
3595 .ok_or("Not an object")?;
3596
3597 let mut current = obj;
3598 for (i, segment) in segments.iter().enumerate() {
3599 if i == segments.len() - 1 {
3600 let current_val = current
3601 .get(segment)
3602 .and_then(|v| {
3603 if v.is_null() {
3604 None
3605 } else {
3606 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3607 }
3608 })
3609 .unwrap_or(0);
3610
3611 let sum = current_val + new_val_num;
3612 current.insert(segment.to_string(), json!(sum));
3613 return Ok(true);
3614 } else {
3615 current
3616 .entry(segment.to_string())
3617 .or_insert_with(|| json!({}));
3618 current = current
3619 .get_mut(segment)
3620 .and_then(|v| v.as_object_mut())
3621 .ok_or("Path collision: expected object")?;
3622 }
3623 }
3624
3625 Ok(false)
3626 }
3627
3628 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3629 let compiled = self.get_compiled_path(path);
3630 let segments = compiled.segments();
3631
3632 if !self.registers[object_reg].is_object() {
3633 self.registers[object_reg] = json!({});
3634 }
3635
3636 let obj = self.registers[object_reg]
3637 .as_object_mut()
3638 .ok_or("Not an object")?;
3639
3640 let mut current = obj;
3641 for (i, segment) in segments.iter().enumerate() {
3642 if i == segments.len() - 1 {
3643 let current_val = current
3645 .get(segment)
3646 .and_then(|v| {
3647 if v.is_null() {
3648 None
3649 } else {
3650 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3651 }
3652 })
3653 .unwrap_or(0);
3654
3655 let incremented = current_val + 1;
3656 current.insert(segment.to_string(), json!(incremented));
3657 return Ok(true);
3658 } else {
3659 current
3660 .entry(segment.to_string())
3661 .or_insert_with(|| json!({}));
3662 current = current
3663 .get_mut(segment)
3664 .and_then(|v| v.as_object_mut())
3665 .ok_or("Path collision: expected object")?;
3666 }
3667 }
3668
3669 Ok(false)
3670 }
3671
3672 fn set_field_min(
3673 &mut self,
3674 object_reg: Register,
3675 path: &str,
3676 value_reg: Register,
3677 ) -> Result<bool> {
3678 let compiled = self.get_compiled_path(path);
3679 let segments = compiled.segments();
3680 let new_value = self.registers[value_reg].clone();
3681
3682 if !self.registers[object_reg].is_object() {
3683 self.registers[object_reg] = json!({});
3684 }
3685
3686 let obj = self.registers[object_reg]
3687 .as_object_mut()
3688 .ok_or("Not an object")?;
3689
3690 let mut current = obj;
3691 for (i, segment) in segments.iter().enumerate() {
3692 if i == segments.len() - 1 {
3693 let should_update = if let Some(current_value) = current.get(segment) {
3694 if current_value.is_null() {
3695 true
3696 } else {
3697 match (current_value.as_i64(), new_value.as_i64()) {
3698 (Some(current_val), Some(new_val)) => new_val < current_val,
3699 (Some(current_val), None) if new_value.as_u64().is_some() => {
3700 (new_value.as_u64().unwrap() as i64) < current_val
3701 }
3702 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3703 new_val < current_value.as_u64().unwrap() as i64
3704 }
3705 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3706 (Some(current_val), Some(new_val)) => new_val < current_val,
3707 _ => match (current_value.as_f64(), new_value.as_f64()) {
3708 (Some(current_val), Some(new_val)) => new_val < current_val,
3709 _ => false,
3710 },
3711 },
3712 _ => false,
3713 }
3714 }
3715 } else {
3716 true
3717 };
3718
3719 if should_update {
3720 current.insert(segment.to_string(), new_value);
3721 return Ok(true);
3722 }
3723 return Ok(false);
3724 } else {
3725 current
3726 .entry(segment.to_string())
3727 .or_insert_with(|| json!({}));
3728 current = current
3729 .get_mut(segment)
3730 .and_then(|v| v.as_object_mut())
3731 .ok_or("Path collision: expected object")?;
3732 }
3733 }
3734
3735 Ok(false)
3736 }
3737
3738 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3739 let compiled = self.get_compiled_path(path);
3740 let segments = compiled.segments();
3741 let mut current = &self.registers[object_reg];
3742
3743 for segment in segments {
3744 current = current
3745 .get(segment)
3746 .ok_or_else(|| format!("Field not found: {}", segment))?;
3747 }
3748
3749 Ok(current.clone())
3750 }
3751
3752 fn append_to_array(
3753 &mut self,
3754 object_reg: Register,
3755 path: &str,
3756 value_reg: Register,
3757 max_length: usize,
3758 ) -> Result<()> {
3759 let compiled = self.get_compiled_path(path);
3760 let segments = compiled.segments();
3761 let value = self.registers[value_reg].clone();
3762
3763 if !self.registers[object_reg].is_object() {
3764 self.registers[object_reg] = json!({});
3765 }
3766
3767 let obj = self.registers[object_reg]
3768 .as_object_mut()
3769 .ok_or("Not an object")?;
3770
3771 let mut current = obj;
3772 for (i, segment) in segments.iter().enumerate() {
3773 if i == segments.len() - 1 {
3774 current
3775 .entry(segment.to_string())
3776 .or_insert_with(|| json!([]));
3777 let arr = current
3778 .get_mut(segment)
3779 .and_then(|v| v.as_array_mut())
3780 .ok_or("Path is not an array")?;
3781 arr.push(value.clone());
3782
3783 if arr.len() > max_length {
3784 let excess = arr.len() - max_length;
3785 arr.drain(0..excess);
3786 }
3787 } else {
3788 current
3789 .entry(segment.to_string())
3790 .or_insert_with(|| json!({}));
3791 current = current
3792 .get_mut(segment)
3793 .and_then(|v| v.as_object_mut())
3794 .ok_or("Path collision: expected object")?;
3795 }
3796 }
3797
3798 Ok(())
3799 }
3800
3801 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3802 let value = &self.registers[reg];
3803 let transformed = Self::apply_transformation(value, transformation)?;
3804 self.registers[reg] = transformed;
3805 Ok(())
3806 }
3807
3808 fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3809 match transformation {
3810 Transformation::HexEncode => {
3811 if let Some(arr) = value.as_array() {
3812 let bytes: Vec<u8> = arr
3813 .iter()
3814 .filter_map(|v| v.as_u64().map(|n| n as u8))
3815 .collect();
3816 let hex = hex::encode(&bytes);
3817 Ok(json!(hex))
3818 } else if value.is_string() {
3819 Ok(value.clone())
3820 } else {
3821 Err("HexEncode requires an array of numbers".into())
3822 }
3823 }
3824 Transformation::HexDecode => {
3825 if let Some(s) = value.as_str() {
3826 let s = s.strip_prefix("0x").unwrap_or(s);
3827 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3828 Ok(json!(bytes))
3829 } else {
3830 Err("HexDecode requires a string".into())
3831 }
3832 }
3833 Transformation::Base58Encode => {
3834 if let Some(arr) = value.as_array() {
3835 let bytes: Vec<u8> = arr
3836 .iter()
3837 .filter_map(|v| v.as_u64().map(|n| n as u8))
3838 .collect();
3839 let encoded = bs58::encode(&bytes).into_string();
3840 Ok(json!(encoded))
3841 } else if value.is_string() {
3842 Ok(value.clone())
3843 } else {
3844 Err("Base58Encode requires an array of numbers".into())
3845 }
3846 }
3847 Transformation::Base58Decode => {
3848 if let Some(s) = value.as_str() {
3849 let bytes = bs58::decode(s)
3850 .into_vec()
3851 .map_err(|e| format!("Base58 decode error: {}", e))?;
3852 Ok(json!(bytes))
3853 } else {
3854 Err("Base58Decode requires a string".into())
3855 }
3856 }
3857 Transformation::ToString => Ok(json!(value.to_string())),
3858 Transformation::ToNumber => {
3859 if let Some(s) = value.as_str() {
3860 let n = s
3861 .parse::<i64>()
3862 .map_err(|e| format!("Parse error: {}", e))?;
3863 Ok(json!(n))
3864 } else {
3865 Ok(value.clone())
3866 }
3867 }
3868 }
3869 }
3870
3871 fn evaluate_comparison(
3872 &self,
3873 field_value: &Value,
3874 op: &ComparisonOp,
3875 condition_value: &Value,
3876 ) -> Result<bool> {
3877 use ComparisonOp::*;
3878
3879 match op {
3880 Equal => Ok(field_value == condition_value),
3881 NotEqual => Ok(field_value != condition_value),
3882 GreaterThan => {
3883 match (field_value.as_i64(), condition_value.as_i64()) {
3885 (Some(a), Some(b)) => Ok(a > b),
3886 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3887 (Some(a), Some(b)) => Ok(a > b),
3888 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3889 (Some(a), Some(b)) => Ok(a > b),
3890 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3891 },
3892 },
3893 }
3894 }
3895 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3896 (Some(a), Some(b)) => Ok(a >= b),
3897 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3898 (Some(a), Some(b)) => Ok(a >= b),
3899 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3900 (Some(a), Some(b)) => Ok(a >= b),
3901 _ => {
3902 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3903 }
3904 },
3905 },
3906 },
3907 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3908 (Some(a), Some(b)) => Ok(a < b),
3909 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3910 (Some(a), Some(b)) => Ok(a < b),
3911 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3912 (Some(a), Some(b)) => Ok(a < b),
3913 _ => Err("Cannot compare non-numeric values with LessThan".into()),
3914 },
3915 },
3916 },
3917 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3918 (Some(a), Some(b)) => Ok(a <= b),
3919 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3920 (Some(a), Some(b)) => Ok(a <= b),
3921 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3922 (Some(a), Some(b)) => Ok(a <= b),
3923 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3924 },
3925 },
3926 },
3927 }
3928 }
3929
3930 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3931 if let Some(state) = self.states.get(&state_id) {
3932 if let Some(index) = state.lookup_indexes.get(index_name) {
3933 if index.lookup(value).is_some() {
3934 return true;
3935 }
3936 }
3937
3938 for (name, index) in state.lookup_indexes.iter() {
3939 if name == index_name {
3940 continue;
3941 }
3942 if index.lookup(value).is_some() {
3943 return true;
3944 }
3945 }
3946
3947 if let Some(pda_str) = value.as_str() {
3948 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3949 if pda_lookup.contains(pda_str) {
3950 return true;
3951 }
3952 }
3953 }
3954 }
3955
3956 false
3957 }
3958
3959 #[allow(clippy::type_complexity)]
3960 fn apply_deferred_when_op(
3961 &mut self,
3962 state_id: u32,
3963 op: &DeferredWhenOperation,
3964 entity_evaluator: Option<
3965 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3966 >,
3967 computed_paths: Option<&[String]>,
3968 ) -> Result<Vec<Mutation>> {
3969 let state = self.states.get(&state_id).ok_or("State not found")?;
3970
3971 if op.primary_key.is_null() {
3972 return Ok(vec![]);
3973 }
3974
3975 let mut entity_state = state
3976 .get_and_touch(&op.primary_key)
3977 .unwrap_or_else(|| json!({}));
3978
3979 let old_computed_values: Vec<_> = computed_paths
3981 .map(|paths| {
3982 paths
3983 .iter()
3984 .map(|path| Self::get_value_at_path(&entity_state, path))
3985 .collect()
3986 })
3987 .unwrap_or_default();
3988
3989 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3990
3991 if let Some(evaluator) = entity_evaluator {
3993 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3994 let context_timestamp = self
3995 .current_context
3996 .as_ref()
3997 .map(|c| c.timestamp())
3998 .unwrap_or_else(|| {
3999 std::time::SystemTime::now()
4000 .duration_since(std::time::UNIX_EPOCH)
4001 .unwrap()
4002 .as_secs() as i64
4003 });
4004
4005 tracing::debug!(
4006 entity_name = %op.entity_name,
4007 primary_key = %op.primary_key,
4008 field_path = %op.field_path,
4009 "Re-evaluating computed fields after deferred when-op"
4010 );
4011
4012 if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
4013 tracing::warn!(
4014 entity_name = %op.entity_name,
4015 primary_key = %op.primary_key,
4016 error = %e,
4017 "Failed to evaluate computed fields after deferred when-op"
4018 );
4019 }
4020 }
4021
4022 state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
4023
4024 if !op.emit {
4025 return Ok(vec![]);
4026 }
4027
4028 let mut patch = json!({});
4029 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
4030
4031 if let Some(paths) = computed_paths {
4033 tracing::debug!(
4034 entity_name = %op.entity_name,
4035 primary_key = %op.primary_key,
4036 computed_paths_count = paths.len(),
4037 "Checking computed fields for changes after deferred when-op"
4038 );
4039 for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
4040 let new_value = Self::get_value_at_path(&entity_state, path);
4041 tracing::debug!(
4042 entity_name = %op.entity_name,
4043 primary_key = %op.primary_key,
4044 field_path = %path,
4045 old_value = ?old_value,
4046 new_value = ?new_value,
4047 "Comparing computed field values"
4048 );
4049 if let Some(ref new_val) = new_value {
4050 if Some(new_val) != old_value.as_ref() {
4051 Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
4052 tracing::info!(
4053 entity_name = %op.entity_name,
4054 primary_key = %op.primary_key,
4055 field_path = %path,
4056 "Computed field changed after deferred when-op, including in mutation"
4057 );
4058 }
4059 }
4060 }
4061 }
4062
4063 Ok(vec![Mutation {
4064 export: op.entity_name.clone(),
4065 key: op.primary_key.clone(),
4066 patch,
4067 append: vec![],
4068 }])
4069 }
4070
4071 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
4072 let parts: Vec<&str> = path.split('.').collect();
4073 let mut current = obj;
4074
4075 for (i, part) in parts.iter().enumerate() {
4076 if i == parts.len() - 1 {
4077 if let Some(map) = current.as_object_mut() {
4078 map.insert(part.to_string(), value);
4079 return Ok(());
4080 }
4081 return Err("Cannot set field on non-object".into());
4082 }
4083
4084 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
4085 if let Some(map) = current.as_object_mut() {
4086 map.insert(part.to_string(), json!({}));
4087 }
4088 }
4089
4090 current = current.get_mut(*part).ok_or("Path navigation failed")?;
4091 }
4092
4093 Ok(())
4094 }
4095
4096 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
4097 let now = std::time::SystemTime::now()
4098 .duration_since(std::time::UNIX_EPOCH)
4099 .unwrap()
4100 .as_secs() as i64;
4101
4102 let state = match self.states.get(&state_id) {
4103 Some(s) => s,
4104 None => return 0,
4105 };
4106
4107 let mut removed = 0;
4108 state.deferred_when_ops.retain(|_, ops| {
4109 let before = ops.len();
4110 ops.retain(|op| now - op.deferred_at < max_age_secs);
4111 removed += before - ops.len();
4112 !ops.is_empty()
4113 });
4114
4115 removed
4116 }
4117
4118 #[cfg_attr(feature = "otel", instrument(
4127 name = "vm.update_pda_lookup",
4128 skip(self),
4129 fields(
4130 pda = %pda_address,
4131 seed = %seed_value,
4132 )
4133 ))]
4134 pub fn update_pda_reverse_lookup(
4135 &mut self,
4136 state_id: u32,
4137 lookup_name: &str,
4138 pda_address: String,
4139 seed_value: String,
4140 ) -> Result<Vec<PendingAccountUpdate>> {
4141 let state = self
4142 .states
4143 .get_mut(&state_id)
4144 .ok_or("State table not found")?;
4145
4146 let lookup = state
4147 .pda_reverse_lookups
4148 .entry(lookup_name.to_string())
4149 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
4150
4151 let old_seed = lookup.index.peek(&pda_address).cloned();
4155 let mapping_changed = old_seed
4156 .as_ref()
4157 .map(|old| old != &seed_value)
4158 .unwrap_or(false);
4159
4160 if !mapping_changed && old_seed.is_none() {
4161 tracing::info!(
4162 pda = %pda_address,
4163 seed = %seed_value,
4164 "[PDA] First-time PDA reverse lookup established"
4165 );
4166 } else if !mapping_changed {
4167 tracing::debug!(
4168 pda = %pda_address,
4169 seed = %seed_value,
4170 "[PDA] PDA reverse lookup re-registered (same mapping)"
4171 );
4172 }
4173
4174 let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());
4175
4176 if let Some(ref evicted) = evicted_pda {
4177 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
4178 let count = evicted_updates.len();
4179 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4180 }
4181 }
4182
4183 let mut pending = self.flush_pending_updates(state_id, &pda_address)?;
4185
4186 if mapping_changed {
4194 if let Some(state) = self.states.get(&state_id) {
4195 for index in state.lookup_indexes.values() {
4197 index.remove(&Value::String(pda_address.clone()));
4198 }
4199
4200 if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
4201 tracing::info!(
4202 pda = %pda_address,
4203 old_seed = ?old_seed,
4204 new_seed = %seed_value,
4205 account_type = %cached.account_type,
4206 "PDA mapping changed — clearing stale indexes and reprocessing cached data"
4207 );
4208 cached.is_stale_reprocess = true;
4209 pending.push(cached);
4210 }
4211 }
4212 }
4213
4214 Ok(pending)
4215 }
4216
4217 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
4222 let state = match self.states.get_mut(&state_id) {
4223 Some(s) => s,
4224 None => return 0,
4225 };
4226
4227 let now = std::time::SystemTime::now()
4228 .duration_since(std::time::UNIX_EPOCH)
4229 .unwrap()
4230 .as_secs() as i64;
4231
4232 let mut removed_count = 0;
4233
4234 state.pending_updates.retain(|_pda_address, updates| {
4236 let original_len = updates.len();
4237
4238 updates.retain(|update| {
4239 let age = now - update.queued_at;
4240 age <= PENDING_UPDATE_TTL_SECONDS
4241 });
4242
4243 removed_count += original_len - updates.len();
4244
4245 !updates.is_empty()
4247 });
4248
4249 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
4251
4252 if removed_count > 0 {
4253 #[cfg(feature = "otel")]
4254 crate::vm_metrics::record_pending_updates_expired(
4255 removed_count as u64,
4256 &state.entity_name,
4257 );
4258 }
4259
4260 removed_count
4261 }
4262
4263 #[cfg_attr(feature = "otel", instrument(
4295 name = "vm.queue_account_update",
4296 skip(self, update),
4297 fields(
4298 pda = %update.pda_address,
4299 account_type = %update.account_type,
4300 slot = update.slot,
4301 )
4302 ))]
4303 pub fn queue_account_update(
4304 &mut self,
4305 state_id: u32,
4306 update: QueuedAccountUpdate,
4307 ) -> Result<()> {
4308 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
4309 self.cleanup_expired_pending_updates(state_id);
4310 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
4311 self.drop_oldest_pending_update(state_id)?;
4312 }
4313 }
4314
4315 let state = self
4316 .states
4317 .get_mut(&state_id)
4318 .ok_or("State table not found")?;
4319
4320 let pending = PendingAccountUpdate {
4321 account_type: update.account_type,
4322 pda_address: update.pda_address.clone(),
4323 account_data: update.account_data,
4324 slot: update.slot,
4325 write_version: update.write_version,
4326 signature: update.signature,
4327 queued_at: std::time::SystemTime::now()
4328 .duration_since(std::time::UNIX_EPOCH)
4329 .unwrap()
4330 .as_secs() as i64,
4331 is_stale_reprocess: false,
4332 };
4333
4334 let pda_address = pending.pda_address.clone();
4335 let slot = pending.slot;
4336
4337 let mut updates = state
4338 .pending_updates
4339 .entry(pda_address.clone())
4340 .or_insert_with(Vec::new);
4341
4342 let original_len = updates.len();
4343 updates.retain(|existing| existing.slot > slot);
4344 let removed_by_dedup = original_len - updates.len();
4345
4346 if removed_by_dedup > 0 {
4347 self.pending_queue_size = self
4348 .pending_queue_size
4349 .saturating_sub(removed_by_dedup as u64);
4350 }
4351
4352 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
4353 updates.remove(0);
4354 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4355 }
4356
4357 updates.push(pending);
4358 #[cfg(feature = "otel")]
4359 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
4360
4361 Ok(())
4362 }
4363
4364 pub fn queue_instruction_event(
4365 &mut self,
4366 state_id: u32,
4367 event: QueuedInstructionEvent,
4368 ) -> Result<()> {
4369 let state = self
4370 .states
4371 .get_mut(&state_id)
4372 .ok_or("State table not found")?;
4373
4374 let pda_address = event.pda_address.clone();
4375
4376 let pending = PendingInstructionEvent {
4377 event_type: event.event_type,
4378 pda_address: event.pda_address,
4379 event_data: event.event_data,
4380 slot: event.slot,
4381 signature: event.signature,
4382 queued_at: std::time::SystemTime::now()
4383 .duration_since(std::time::UNIX_EPOCH)
4384 .unwrap()
4385 .as_secs() as i64,
4386 };
4387
4388 let mut events = state
4389 .pending_instruction_events
4390 .entry(pda_address)
4391 .or_insert_with(Vec::new);
4392
4393 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
4394 events.remove(0);
4395 }
4396
4397 events.push(pending);
4398
4399 Ok(())
4400 }
4401
4402 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4403 self.last_pda_lookup_miss.take()
4404 }
4405
4406 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4407 self.last_lookup_index_miss.take()
4408 }
4409
4410 pub fn take_last_pda_registered(&mut self) -> Option<String> {
4411 self.last_pda_registered.take()
4412 }
4413
4414 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4415 std::mem::take(&mut self.last_lookup_index_keys)
4416 }
4417
4418 pub fn flush_pending_instruction_events(
4419 &mut self,
4420 state_id: u32,
4421 pda_address: &str,
4422 ) -> Vec<PendingInstructionEvent> {
4423 let state = match self.states.get_mut(&state_id) {
4424 Some(s) => s,
4425 None => return Vec::new(),
4426 };
4427
4428 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4429 events
4430 } else {
4431 Vec::new()
4432 }
4433 }
4434
4435 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4437 let state = self.states.get(&state_id)?;
4438
4439 let now = std::time::SystemTime::now()
4440 .duration_since(std::time::UNIX_EPOCH)
4441 .unwrap()
4442 .as_secs() as i64;
4443
4444 let mut total_updates = 0;
4445 let mut oldest_timestamp = now;
4446 let mut largest_pda_queue = 0;
4447 let mut estimated_memory = 0;
4448
4449 for entry in state.pending_updates.iter() {
4450 let (_, updates) = entry.pair();
4451 total_updates += updates.len();
4452 largest_pda_queue = largest_pda_queue.max(updates.len());
4453
4454 for update in updates.iter() {
4455 oldest_timestamp = oldest_timestamp.min(update.queued_at);
4456 estimated_memory += update.account_type.len() +
4458 update.pda_address.len() +
4459 update.signature.len() +
4460 16 + estimate_json_size(&update.account_data);
4462 }
4463 }
4464
4465 Some(PendingQueueStats {
4466 total_updates,
4467 unique_pdas: state.pending_updates.len(),
4468 oldest_age_seconds: now - oldest_timestamp,
4469 largest_pda_queue_size: largest_pda_queue,
4470 estimated_memory_bytes: estimated_memory,
4471 })
4472 }
4473
4474 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4475 let mut stats = VmMemoryStats {
4476 path_cache_size: self.path_cache.len(),
4477 ..Default::default()
4478 };
4479
4480 if let Some(state) = self.states.get(&state_id) {
4481 stats.state_table_entity_count = state.data.len();
4482 stats.state_table_max_entries = state.config.max_entries;
4483 stats.state_table_at_capacity = state.is_at_capacity();
4484
4485 stats.lookup_index_count = state.lookup_indexes.len();
4486 stats.lookup_index_total_entries =
4487 state.lookup_indexes.values().map(|idx| idx.len()).sum();
4488
4489 stats.temporal_index_count = state.temporal_indexes.len();
4490 stats.temporal_index_total_entries = state
4491 .temporal_indexes
4492 .values()
4493 .map(|idx| idx.total_entries())
4494 .sum();
4495
4496 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4497 stats.pda_reverse_lookup_total_entries = state
4498 .pda_reverse_lookups
4499 .values()
4500 .map(|lookup| lookup.len())
4501 .sum();
4502
4503 stats.version_tracker_entries = state.version_tracker.len();
4504
4505 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4506 }
4507
4508 stats
4509 }
4510
4511 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4512 let pending_removed = self.cleanup_expired_pending_updates(state_id);
4513 let temporal_removed = self.cleanup_temporal_indexes(state_id);
4514
4515 #[cfg(feature = "otel")]
4516 if let Some(state) = self.states.get(&state_id) {
4517 crate::vm_metrics::record_cleanup(
4518 pending_removed,
4519 temporal_removed,
4520 &state.entity_name,
4521 );
4522 }
4523
4524 CleanupResult {
4525 pending_updates_removed: pending_removed,
4526 temporal_entries_removed: temporal_removed,
4527 }
4528 }
4529
4530 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4531 let state = match self.states.get_mut(&state_id) {
4532 Some(s) => s,
4533 None => return 0,
4534 };
4535
4536 let now = std::time::SystemTime::now()
4537 .duration_since(std::time::UNIX_EPOCH)
4538 .unwrap()
4539 .as_secs() as i64;
4540
4541 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4542 let mut total_removed = 0;
4543
4544 for (_, index) in state.temporal_indexes.iter_mut() {
4545 total_removed += index.cleanup_expired(cutoff);
4546 }
4547
4548 total_removed
4549 }
4550
4551 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4552 let state = self.states.get(&state_id)?;
4553
4554 if state.is_at_capacity() {
4555 Some(CapacityWarning {
4556 current_entries: state.data.len(),
4557 max_entries: state.config.max_entries,
4558 entries_over_limit: state.entries_over_limit(),
4559 })
4560 } else {
4561 None
4562 }
4563 }
4564
4565 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4567 let state = self
4568 .states
4569 .get_mut(&state_id)
4570 .ok_or("State table not found")?;
4571
4572 let mut oldest_pda: Option<String> = None;
4573 let mut oldest_timestamp = i64::MAX;
4574
4575 for entry in state.pending_updates.iter() {
4577 let (pda, updates) = entry.pair();
4578 if let Some(update) = updates.first() {
4579 if update.queued_at < oldest_timestamp {
4580 oldest_timestamp = update.queued_at;
4581 oldest_pda = Some(pda.clone());
4582 }
4583 }
4584 }
4585
4586 if let Some(pda) = oldest_pda {
4588 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4589 if !updates.is_empty() {
4590 updates.remove(0);
4591 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4592
4593 if updates.is_empty() {
4595 drop(updates);
4596 state.pending_updates.remove(&pda);
4597 }
4598 }
4599 }
4600 }
4601
4602 Ok(())
4603 }
4604
4605 fn flush_pending_updates(
4610 &mut self,
4611 state_id: u32,
4612 pda_address: &str,
4613 ) -> Result<Vec<PendingAccountUpdate>> {
4614 let state = self
4615 .states
4616 .get_mut(&state_id)
4617 .ok_or("State table not found")?;
4618
4619 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4620 let count = pending_updates.len();
4621 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4622 #[cfg(feature = "otel")]
4623 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4624 Ok(pending_updates)
4625 } else {
4626 Ok(Vec::new())
4627 }
4628 }
4629
4630 pub fn cache_last_account_data(
4636 &mut self,
4637 state_id: u32,
4638 pda_address: &str,
4639 update: PendingAccountUpdate,
4640 ) {
4641 if let Some(state) = self.states.get(&state_id) {
4642 state
4643 .last_account_data
4644 .insert(pda_address.to_string(), update);
4645 }
4646 }
4647
4648 pub fn try_pda_reverse_lookup(
4650 &mut self,
4651 state_id: u32,
4652 lookup_name: &str,
4653 pda_address: &str,
4654 ) -> Option<String> {
4655 let state = self.states.get_mut(&state_id)?;
4656
4657 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4658 if let Some(value) = lookup.lookup(pda_address) {
4659 self.pda_cache_hits += 1;
4660 return Some(value);
4661 }
4662 }
4663
4664 self.pda_cache_misses += 1;
4665 None
4666 }
4667
4668 pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4671 let state = self.states.get(&state_id)?;
4672
4673 for index in state.lookup_indexes.values() {
4674 if let Some(resolved) = index.lookup(value) {
4675 return Some(resolved);
4676 }
4677 }
4678
4679 None
4680 }
4681
4682 pub fn try_chained_pda_lookup(
4687 &mut self,
4688 state_id: u32,
4689 lookup_name: &str,
4690 pda_address: &str,
4691 ) -> Option<String> {
4692 let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4694
4695 let pda_value = Value::String(pda_result.clone());
4698 if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4699 resolved.as_str().map(|s| s.to_string())
4700 } else {
4701 pda_value.as_str().map(|s| s.to_string())
4703 }
4704 }
4705
4706 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4713 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4714 }
4715
4716 fn evaluate_computed_expr_with_env(
4718 &self,
4719 expr: &ComputedExpr,
4720 state: &Value,
4721 env: &std::collections::HashMap<String, Value>,
4722 ) -> Result<Value> {
4723 match expr {
4724 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4725
4726 ComputedExpr::Var { name } => env
4727 .get(name)
4728 .cloned()
4729 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4730
4731 ComputedExpr::Let { name, value, body } => {
4732 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4733 let mut new_env = env.clone();
4734 new_env.insert(name.clone(), val);
4735 self.evaluate_computed_expr_with_env(body, state, &new_env)
4736 }
4737
4738 ComputedExpr::If {
4739 condition,
4740 then_branch,
4741 else_branch,
4742 } => {
4743 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4744 if self.value_to_bool(&cond_val) {
4745 self.evaluate_computed_expr_with_env(then_branch, state, env)
4746 } else {
4747 self.evaluate_computed_expr_with_env(else_branch, state, env)
4748 }
4749 }
4750
4751 ComputedExpr::None => Ok(Value::Null),
4752
4753 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4754
4755 ComputedExpr::Slice { expr, start, end } => {
4756 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4757 match val {
4758 Value::Array(arr) => {
4759 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4760 Ok(Value::Array(slice))
4761 }
4762 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4763 }
4764 }
4765
4766 ComputedExpr::Index { expr, index } => {
4767 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4768 match val {
4769 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4770 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4771 }
4772 }
4773
4774 ComputedExpr::U64FromLeBytes { bytes } => {
4775 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4776 let byte_vec = self.value_to_bytes(&val)?;
4777 if byte_vec.len() < 8 {
4778 return Err(format!(
4779 "u64::from_le_bytes requires 8 bytes, got {}",
4780 byte_vec.len()
4781 )
4782 .into());
4783 }
4784 let arr: [u8; 8] = byte_vec[..8]
4785 .try_into()
4786 .map_err(|_| "Failed to convert to [u8; 8]")?;
4787 Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
4788 arr,
4789 ))))
4790 }
4791
4792 ComputedExpr::U64FromBeBytes { bytes } => {
4793 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4794 let byte_vec = self.value_to_bytes(&val)?;
4795 if byte_vec.len() < 8 {
4796 return Err(format!(
4797 "u64::from_be_bytes requires 8 bytes, got {}",
4798 byte_vec.len()
4799 )
4800 .into());
4801 }
4802 let arr: [u8; 8] = byte_vec[..8]
4803 .try_into()
4804 .map_err(|_| "Failed to convert to [u8; 8]")?;
4805 Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
4806 arr,
4807 ))))
4808 }
4809
4810 ComputedExpr::ByteArray { bytes } => {
4811 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4812 }
4813
4814 ComputedExpr::Closure { param, body } => {
4815 Ok(json!({
4818 "__closure": {
4819 "param": param,
4820 "body": serde_json::to_value(body).unwrap_or(Value::Null)
4821 }
4822 }))
4823 }
4824
4825 ComputedExpr::Unary { op, expr } => {
4826 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4827 self.apply_unary_op(op, &val)
4828 }
4829
4830 ComputedExpr::JsonToBytes { expr } => {
4831 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4832 let bytes = self.value_to_bytes(&val)?;
4834 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4835 }
4836
4837 ComputedExpr::UnwrapOr { expr, default } => {
4838 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4839 if val.is_null() {
4840 Ok(default.clone())
4841 } else {
4842 Ok(val)
4843 }
4844 }
4845
4846 ComputedExpr::Binary { op, left, right } => {
4847 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4848 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4849 self.apply_binary_op(op, &l, &r)
4850 }
4851
4852 ComputedExpr::Cast { expr, to_type } => {
4853 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4854 self.apply_cast(&val, to_type)
4855 }
4856
4857 ComputedExpr::ResolverComputed {
4858 resolver,
4859 method,
4860 args,
4861 } => {
4862 let evaluated_args: Vec<Value> = args
4863 .iter()
4864 .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4865 .collect::<Result<Vec<_>>>()?;
4866 crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4867 }
4868
4869 ComputedExpr::MethodCall { expr, method, args } => {
4870 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4871 if method == "map" && args.len() == 1 {
4873 if let ComputedExpr::Closure { param, body } = &args[0] {
4874 if val.is_null() {
4876 return Ok(Value::Null);
4877 }
4878
4879 if let Value::Array(arr) = &val {
4880 let results: Result<Vec<Value>> = arr
4881 .iter()
4882 .map(|elem| {
4883 let mut closure_env = env.clone();
4884 closure_env.insert(param.clone(), elem.clone());
4885 self.evaluate_computed_expr_with_env(body, state, &closure_env)
4886 })
4887 .collect();
4888 return Ok(Value::Array(results?));
4889 }
4890
4891 let mut closure_env = env.clone();
4892 closure_env.insert(param.clone(), val);
4893 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4894 }
4895 }
4896 let evaluated_args: Vec<Value> = args
4897 .iter()
4898 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4899 .collect::<Result<Vec<_>>>()?;
4900 self.apply_method_call(&val, method, &evaluated_args)
4901 }
4902
4903 ComputedExpr::Literal { value } => Ok(value.clone()),
4904
4905 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4906
4907 ComputedExpr::ContextSlot => Ok(self
4908 .current_context
4909 .as_ref()
4910 .and_then(|ctx| ctx.slot)
4911 .map(|s| json!(s))
4912 .unwrap_or(Value::Null)),
4913
4914 ComputedExpr::ContextTimestamp => Ok(self
4915 .current_context
4916 .as_ref()
4917 .map(|ctx| json!(ctx.timestamp()))
4918 .unwrap_or(Value::Null)),
4919
4920 ComputedExpr::Keccak256 { expr } => {
4921 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4922 let bytes = self.value_to_bytes(&val)?;
4923 use sha3::{Digest, Keccak256};
4924 let hash = Keccak256::digest(&bytes);
4925 Ok(Value::Array(
4926 hash.to_vec().iter().map(|b| json!(*b)).collect(),
4927 ))
4928 }
4929 }
4930 }
4931
4932 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4934 match val {
4935 Value::Array(arr) => arr
4936 .iter()
4937 .map(|v| {
4938 v.as_u64()
4939 .map(|n| n as u8)
4940 .ok_or_else(|| "Array element not a valid byte".into())
4941 })
4942 .collect(),
4943 Value::String(s) => {
4944 if s.starts_with("0x") || s.starts_with("0X") {
4946 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4947 } else {
4948 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4949 }
4950 }
4951 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4952 }
4953 }
4954
4955 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4957 use crate::ast::UnaryOp;
4958 match op {
4959 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4960 UnaryOp::ReverseBits => match val.as_u64() {
4961 Some(n) => Ok(json!(n.reverse_bits())),
4962 None => match val.as_i64() {
4963 Some(n) => Ok(json!((n as u64).reverse_bits())),
4964 None => Err("reverse_bits requires an integer".into()),
4965 },
4966 },
4967 }
4968 }
4969
4970 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4972 let segments: Vec<&str> = path.split('.').collect();
4973 let mut current = state;
4974
4975 for segment in segments {
4976 match current.get(segment) {
4977 Some(v) => current = v,
4978 None => return Ok(Value::Null),
4979 }
4980 }
4981
4982 Ok(current.clone())
4983 }
4984
4985 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4987 match op {
4988 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4990 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4991 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4992 BinaryOp::Div => {
4993 if let Some(r) = right.as_i64() {
4995 if r == 0 {
4996 return Err("Division by zero".into());
4997 }
4998 }
4999 if let Some(r) = right.as_f64() {
5000 if r == 0.0 {
5001 return Err("Division by zero".into());
5002 }
5003 }
5004 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
5005 }
5006 BinaryOp::Mod => {
5007 match (left.as_i64(), right.as_i64()) {
5009 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
5010 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
5011 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
5012 _ => Err("Modulo requires non-zero integer operands".into()),
5013 },
5014 _ => Err("Modulo by zero".into()),
5015 }
5016 }
5017
5018 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
5020 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
5021 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
5022 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
5023 BinaryOp::Eq => Ok(json!(left == right)),
5024 BinaryOp::Ne => Ok(json!(left != right)),
5025
5026 BinaryOp::And => {
5028 let l_bool = self.value_to_bool(left);
5029 let r_bool = self.value_to_bool(right);
5030 Ok(json!(l_bool && r_bool))
5031 }
5032 BinaryOp::Or => {
5033 let l_bool = self.value_to_bool(left);
5034 let r_bool = self.value_to_bool(right);
5035 Ok(json!(l_bool || r_bool))
5036 }
5037
5038 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
5040 (Some(a), Some(b)) => Ok(json!(a ^ b)),
5041 _ => match (left.as_i64(), right.as_i64()) {
5042 (Some(a), Some(b)) => Ok(json!(a ^ b)),
5043 _ => Err("XOR requires integer operands".into()),
5044 },
5045 },
5046 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
5047 (Some(a), Some(b)) => Ok(json!(a & b)),
5048 _ => match (left.as_i64(), right.as_i64()) {
5049 (Some(a), Some(b)) => Ok(json!(a & b)),
5050 _ => Err("BitAnd requires integer operands".into()),
5051 },
5052 },
5053 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
5054 (Some(a), Some(b)) => Ok(json!(a | b)),
5055 _ => match (left.as_i64(), right.as_i64()) {
5056 (Some(a), Some(b)) => Ok(json!(a | b)),
5057 _ => Err("BitOr requires integer operands".into()),
5058 },
5059 },
5060 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
5061 (Some(a), Some(b)) => Ok(json!(a << b)),
5062 _ => match (left.as_i64(), right.as_i64()) {
5063 (Some(a), Some(b)) => Ok(json!(a << b)),
5064 _ => Err("Shl requires integer operands".into()),
5065 },
5066 },
5067 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
5068 (Some(a), Some(b)) => Ok(json!(a >> b)),
5069 _ => match (left.as_i64(), right.as_i64()) {
5070 (Some(a), Some(b)) => Ok(json!(a >> b)),
5071 _ => Err("Shr requires integer operands".into()),
5072 },
5073 },
5074 }
5075 }
5076
5077 fn numeric_op<F1, F2>(
5079 &self,
5080 left: &Value,
5081 right: &Value,
5082 int_op: F1,
5083 float_op: F2,
5084 ) -> Result<Value>
5085 where
5086 F1: Fn(i64, i64) -> i64,
5087 F2: Fn(f64, f64) -> f64,
5088 {
5089 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
5091 return Ok(json!(int_op(a, b)));
5092 }
5093
5094 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
5096 return Ok(json!(int_op(a as i64, b as i64)));
5098 }
5099
5100 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
5102 return Ok(json!(float_op(a, b)));
5103 }
5104
5105 if left.is_null() || right.is_null() {
5107 return Ok(Value::Null);
5108 }
5109
5110 Err(format!(
5111 "Cannot perform numeric operation on {:?} and {:?}",
5112 left, right
5113 )
5114 .into())
5115 }
5116
5117 fn comparison_op<F1, F2>(
5119 &self,
5120 left: &Value,
5121 right: &Value,
5122 int_cmp: F1,
5123 float_cmp: F2,
5124 ) -> Result<Value>
5125 where
5126 F1: Fn(i64, i64) -> bool,
5127 F2: Fn(f64, f64) -> bool,
5128 {
5129 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
5131 return Ok(json!(int_cmp(a, b)));
5132 }
5133
5134 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
5136 return Ok(json!(int_cmp(a as i64, b as i64)));
5137 }
5138
5139 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
5141 return Ok(json!(float_cmp(a, b)));
5142 }
5143
5144 if left.is_null() || right.is_null() {
5146 return Ok(json!(false));
5147 }
5148
5149 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
5150 }
5151
5152 fn value_to_bool(&self, value: &Value) -> bool {
5154 match value {
5155 Value::Null => false,
5156 Value::Bool(b) => *b,
5157 Value::Number(n) => {
5158 if let Some(i) = n.as_i64() {
5159 i != 0
5160 } else if let Some(f) = n.as_f64() {
5161 f != 0.0
5162 } else {
5163 true
5164 }
5165 }
5166 Value::String(s) => !s.is_empty(),
5167 Value::Array(arr) => !arr.is_empty(),
5168 Value::Object(obj) => !obj.is_empty(),
5169 }
5170 }
5171
5172 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
5174 match to_type {
5175 "i8" | "i16" | "i32" | "i64" | "isize" => {
5176 if let Some(n) = value.as_i64() {
5177 Ok(json!(n))
5178 } else if let Some(n) = value.as_u64() {
5179 Ok(json!(n as i64))
5180 } else if let Some(n) = value.as_f64() {
5181 Ok(json!(n as i64))
5182 } else if let Some(s) = value.as_str() {
5183 s.parse::<i64>()
5184 .map(|n| json!(n))
5185 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
5186 } else {
5187 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5188 }
5189 }
5190 "u8" | "u16" | "u32" | "u64" | "usize" => {
5191 if let Some(n) = value.as_u64() {
5192 Ok(json!(n))
5193 } else if let Some(n) = value.as_i64() {
5194 Ok(json!(n as u64))
5195 } else if let Some(n) = value.as_f64() {
5196 Ok(json!(n as u64))
5197 } else if let Some(s) = value.as_str() {
5198 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
5199 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
5200 })
5201 } else {
5202 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5203 }
5204 }
5205 "f32" | "f64" => {
5206 if let Some(n) = value.as_f64() {
5207 Ok(json!(n))
5208 } else if let Some(n) = value.as_i64() {
5209 Ok(json!(n as f64))
5210 } else if let Some(n) = value.as_u64() {
5211 Ok(json!(n as f64))
5212 } else if let Some(s) = value.as_str() {
5213 s.parse::<f64>()
5214 .map(|n| json!(n))
5215 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
5216 } else {
5217 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
5218 }
5219 }
5220 "String" | "string" => Ok(json!(value.to_string())),
5221 "bool" => Ok(json!(self.value_to_bool(value))),
5222 _ => {
5223 Ok(value.clone())
5225 }
5226 }
5227 }
5228
5229 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
5231 match method {
5232 "unwrap_or" => {
5233 if value.is_null() && !args.is_empty() {
5234 Ok(args[0].clone())
5235 } else {
5236 Ok(value.clone())
5237 }
5238 }
5239 "unwrap_or_default" => {
5240 if value.is_null() {
5241 Ok(json!(0))
5243 } else {
5244 Ok(value.clone())
5245 }
5246 }
5247 "is_some" => Ok(json!(!value.is_null())),
5248 "is_none" => Ok(json!(value.is_null())),
5249 "abs" => {
5250 if let Some(n) = value.as_i64() {
5251 Ok(json!(n.abs()))
5252 } else if let Some(n) = value.as_f64() {
5253 Ok(json!(n.abs()))
5254 } else {
5255 Err(format!("Cannot call abs() on {:?}", value).into())
5256 }
5257 }
5258 "len" => {
5259 if let Some(s) = value.as_str() {
5260 Ok(json!(s.len()))
5261 } else if let Some(arr) = value.as_array() {
5262 Ok(json!(arr.len()))
5263 } else if let Some(obj) = value.as_object() {
5264 Ok(json!(obj.len()))
5265 } else {
5266 Err(format!("Cannot call len() on {:?}", value).into())
5267 }
5268 }
5269 "to_string" => Ok(json!(value.to_string())),
5270 "min" => {
5271 if args.is_empty() {
5272 return Err("min() requires an argument".into());
5273 }
5274 let other = &args[0];
5275 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5276 Ok(json!(a.min(b)))
5277 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
5278 Ok(json!(a.min(b)))
5279 } else {
5280 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
5281 }
5282 }
5283 "max" => {
5284 if args.is_empty() {
5285 return Err("max() requires an argument".into());
5286 }
5287 let other = &args[0];
5288 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5289 Ok(json!(a.max(b)))
5290 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
5291 Ok(json!(a.max(b)))
5292 } else {
5293 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
5294 }
5295 }
5296 "saturating_add" => {
5297 if args.is_empty() {
5298 return Err("saturating_add() requires an argument".into());
5299 }
5300 let other = &args[0];
5301 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5302 Ok(json!(a.saturating_add(b)))
5303 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
5304 Ok(json!(a.saturating_add(b)))
5305 } else {
5306 Err(format!(
5307 "Cannot call saturating_add() on {:?} and {:?}",
5308 value, other
5309 )
5310 .into())
5311 }
5312 }
5313 "saturating_sub" => {
5314 if args.is_empty() {
5315 return Err("saturating_sub() requires an argument".into());
5316 }
5317 let other = &args[0];
5318 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
5319 Ok(json!(a.saturating_sub(b)))
5320 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
5321 Ok(json!(a.saturating_sub(b)))
5322 } else {
5323 Err(format!(
5324 "Cannot call saturating_sub() on {:?} and {:?}",
5325 value, other
5326 )
5327 .into())
5328 }
5329 }
5330 _ => Err(format!("Unknown method call: {}()", method).into()),
5331 }
5332 }
5333
5334 pub fn evaluate_computed_fields_from_ast(
5337 &self,
5338 state: &mut Value,
5339 computed_field_specs: &[ComputedFieldSpec],
5340 ) -> Result<Vec<String>> {
5341 let mut updated_paths = Vec::new();
5342
5343 crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
5344
5345 for spec in computed_field_specs {
5346 match self.evaluate_computed_expr(&spec.expression, state) {
5347 Ok(result) => {
5348 self.set_field_in_state(state, &spec.target_path, result)?;
5349 updated_paths.push(spec.target_path.clone());
5350 }
5351 Err(e) => {
5352 tracing::warn!(
5353 target_path = %spec.target_path,
5354 error = %e,
5355 "Failed to evaluate computed field"
5356 );
5357 }
5358 }
5359 }
5360
5361 Ok(updated_paths)
5362 }
5363
5364 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
5366 let segments: Vec<&str> = path.split('.').collect();
5367
5368 if segments.is_empty() {
5369 return Err("Empty path".into());
5370 }
5371
5372 let mut current = state;
5374 for (i, segment) in segments.iter().enumerate() {
5375 if i == segments.len() - 1 {
5376 if let Some(obj) = current.as_object_mut() {
5378 obj.insert(segment.to_string(), value);
5379 return Ok(());
5380 } else {
5381 return Err(format!("Cannot set field '{}' on non-object", segment).into());
5382 }
5383 } else {
5384 if !current.is_object() {
5386 *current = json!({});
5387 }
5388 let obj = current.as_object_mut().unwrap();
5389 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
5390 }
5391 }
5392
5393 Ok(())
5394 }
5395
5396 pub fn create_evaluator_from_specs(
5399 specs: Vec<ComputedFieldSpec>,
5400 ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5401 move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5402 let mut vm = VmContext::new();
5403 vm.current_context = Some(UpdateContext {
5404 slot: context_slot,
5405 timestamp: Some(context_timestamp),
5406 ..Default::default()
5407 });
5408 vm.evaluate_computed_fields_from_ast(state, &specs)?;
5409 Ok(())
5410 }
5411 }
5412}
5413
5414impl Default for VmContext {
5415 fn default() -> Self {
5416 Self::new()
5417 }
5418}
5419
5420impl crate::resolvers::ReverseLookupUpdater for VmContext {
5422 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5423 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5425 .unwrap_or_else(|e| {
5426 tracing::error!("Failed to update PDA reverse lookup: {}", e);
5427 Vec::new()
5428 })
5429 }
5430
5431 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5432 self.flush_pending_updates(0, pda_address)
5434 .unwrap_or_else(|e| {
5435 tracing::error!("Failed to flush pending updates: {}", e);
5436 Vec::new()
5437 })
5438 }
5439}
5440
5441#[cfg(test)]
5442mod tests {
5443 use super::*;
5444 use crate::ast::{
5445 BinaryOp, ComputedExpr, ComputedFieldSpec, HttpMethod, UrlResolverConfig, UrlSource,
5446 };
5447
5448 #[test]
5449 fn test_url_resolver_cache_key_uses_method_and_resolved_url() {
5450 let field_path_resolver = ResolverType::Url(UrlResolverConfig {
5451 url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5452 method: HttpMethod::Get,
5453 extract_path: None,
5454 });
5455 let template_resolver = ResolverType::Url(UrlResolverConfig {
5456 url_source: UrlSource::Template(vec![ast::UrlTemplatePart::Literal(
5457 "https://example.com/metadata".to_string(),
5458 )]),
5459 method: HttpMethod::Get,
5460 extract_path: Some("data".to_string()),
5461 });
5462 let input = json!("https://cdn.example.com/token.json");
5463
5464 assert_eq!(
5465 resolver_cache_key(&field_path_resolver, &input),
5466 resolver_cache_key(&template_resolver, &input)
5467 );
5468 }
5469
5470 #[test]
5471 fn test_url_resolver_cache_key_distinguishes_http_method() {
5472 let get_resolver = ResolverType::Url(UrlResolverConfig {
5473 url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5474 method: HttpMethod::Get,
5475 extract_path: None,
5476 });
5477 let post_resolver = ResolverType::Url(UrlResolverConfig {
5478 url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5479 method: HttpMethod::Post,
5480 extract_path: None,
5481 });
5482 let input = json!("https://api.example.com/round");
5483
5484 assert_ne!(
5485 resolver_cache_key(&get_resolver, &input),
5486 resolver_cache_key(&post_resolver, &input)
5487 );
5488 }
5489
5490 #[test]
5491 fn test_expired_resolver_cache_entry_is_dropped() {
5492 let mut vm = VmContext::new();
5493 let resolver = ResolverType::Url(UrlResolverConfig {
5494 url_source: UrlSource::FieldPath("metadata_uri".to_string()),
5495 method: HttpMethod::Get,
5496 extract_path: None,
5497 });
5498 let input = json!("https://cdn.example.com/token.json");
5499 let cache_key = resolver_cache_key(&resolver, &input);
5500
5501 vm.resolver_cache.put(
5502 cache_key.clone(),
5503 ResolverCacheEntry {
5504 value: json!({ "name": "Token" }),
5505 cached_at: Instant::now() - resolver_cache_ttl() - Duration::from_secs(1),
5506 },
5507 );
5508
5509 assert!(vm.get_cached_resolver_value(&cache_key).is_none());
5510 assert!(vm.resolver_cache.get(&cache_key).is_none());
5511 }
5512
5513 #[test]
5514 fn test_computed_field_preserves_integer_type() {
5515 let vm = VmContext::new();
5516
5517 let mut state = serde_json::json!({
5518 "trading": {
5519 "total_buy_volume": 20000000000_i64,
5520 "total_sell_volume": 17951316474_i64
5521 }
5522 });
5523
5524 let spec = ComputedFieldSpec {
5525 target_path: "trading.total_volume".to_string(),
5526 result_type: "Option<u64>".to_string(),
5527 expression: ComputedExpr::Binary {
5528 op: BinaryOp::Add,
5529 left: Box::new(ComputedExpr::UnwrapOr {
5530 expr: Box::new(ComputedExpr::FieldRef {
5531 path: "trading.total_buy_volume".to_string(),
5532 }),
5533 default: serde_json::json!(0),
5534 }),
5535 right: Box::new(ComputedExpr::UnwrapOr {
5536 expr: Box::new(ComputedExpr::FieldRef {
5537 path: "trading.total_sell_volume".to_string(),
5538 }),
5539 default: serde_json::json!(0),
5540 }),
5541 },
5542 };
5543
5544 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
5545 .unwrap();
5546
5547 let total_volume = state
5548 .get("trading")
5549 .and_then(|t| t.get("total_volume"))
5550 .expect("total_volume should exist");
5551
5552 let serialized = serde_json::to_string(total_volume).unwrap();
5553 assert!(
5554 !serialized.contains('.'),
5555 "Integer should not have decimal point: {}",
5556 serialized
5557 );
5558 assert_eq!(
5559 total_volume.as_i64(),
5560 Some(37951316474),
5561 "Value should be correct sum"
5562 );
5563 }
5564
5565 #[test]
5566 fn test_set_field_sum_preserves_integer_type() {
5567 let mut vm = VmContext::new();
5568 vm.registers[0] = serde_json::json!({});
5569 vm.registers[1] = serde_json::json!(20000000000_i64);
5570 vm.registers[2] = serde_json::json!(17951316474_i64);
5571
5572 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
5573 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
5574
5575 let state = &vm.registers[0];
5576 let buy_vol = state
5577 .get("trading")
5578 .and_then(|t| t.get("total_buy_volume"))
5579 .unwrap();
5580 let sell_vol = state
5581 .get("trading")
5582 .and_then(|t| t.get("total_sell_volume"))
5583 .unwrap();
5584
5585 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
5586 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
5587
5588 assert!(
5589 !buy_serialized.contains('.'),
5590 "Buy volume should not have decimal: {}",
5591 buy_serialized
5592 );
5593 assert!(
5594 !sell_serialized.contains('.'),
5595 "Sell volume should not have decimal: {}",
5596 sell_serialized
5597 );
5598 }
5599
5600 #[test]
5601 fn test_lookup_index_chaining() {
5602 let mut vm = VmContext::new();
5603
5604 let state = vm.states.get_mut(&0).unwrap();
5605
5606 state
5607 .pda_reverse_lookups
5608 .entry("default_pda_lookup".to_string())
5609 .or_insert_with(|| PdaReverseLookup::new(1000))
5610 .insert("pda_123".to_string(), "addr_456".to_string());
5611
5612 state
5613 .lookup_indexes
5614 .entry("round_address_lookup_index".to_string())
5615 .or_insert_with(LookupIndex::new)
5616 .insert(json!("addr_456"), json!(789));
5617
5618 let handler = vec![
5619 OpCode::LoadConstant {
5620 value: json!("pda_123"),
5621 dest: 0,
5622 },
5623 OpCode::LookupIndex {
5624 state_id: 0,
5625 index_name: "round_address_lookup_index".to_string(),
5626 lookup_value: 0,
5627 dest: 1,
5628 },
5629 ];
5630
5631 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5632 .unwrap();
5633
5634 assert_eq!(vm.registers[1], json!(789));
5635 }
5636
5637 #[test]
5638 fn test_lookup_index_no_chain() {
5639 let mut vm = VmContext::new();
5640
5641 let state = vm.states.get_mut(&0).unwrap();
5642 state
5643 .lookup_indexes
5644 .entry("test_index".to_string())
5645 .or_insert_with(LookupIndex::new)
5646 .insert(json!("key_abc"), json!(42));
5647
5648 let handler = vec![
5649 OpCode::LoadConstant {
5650 value: json!("key_abc"),
5651 dest: 0,
5652 },
5653 OpCode::LookupIndex {
5654 state_id: 0,
5655 index_name: "test_index".to_string(),
5656 lookup_value: 0,
5657 dest: 1,
5658 },
5659 ];
5660
5661 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5662 .unwrap();
5663
5664 assert_eq!(vm.registers[1], json!(42));
5665 }
5666
5667 #[test]
5668 fn test_conditional_set_field_with_zero_array() {
5669 let mut vm = VmContext::new();
5670
5671 let event_zeros = json!({
5672 "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5673 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5674 });
5675
5676 let event_nonzero = json!({
5677 "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
5678 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
5679 });
5680
5681 let zero_32: Value = json!([
5682 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5683 0, 0, 0
5684 ]);
5685
5686 let handler = vec![
5687 OpCode::CreateObject { dest: 2 },
5688 OpCode::LoadEventField {
5689 path: FieldPath::new(&["value"]),
5690 dest: 10,
5691 default: None,
5692 },
5693 OpCode::ConditionalSetField {
5694 object: 2,
5695 path: "captured_value".to_string(),
5696 value: 10,
5697 condition_field: FieldPath::new(&["value"]),
5698 condition_op: ComparisonOp::NotEqual,
5699 condition_value: zero_32,
5700 },
5701 ];
5702
5703 vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
5704 .unwrap();
5705 assert!(
5706 vm.registers[2].get("captured_value").is_none(),
5707 "Field should not be set when value is all zeros"
5708 );
5709
5710 vm.reset_registers();
5711 vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
5712 .unwrap();
5713 assert!(
5714 vm.registers[2].get("captured_value").is_some(),
5715 "Field should be set when value is non-zero"
5716 );
5717 }
5718
5719 #[test]
5720 fn test_when_instruction_arrives_first() {
5721 let mut vm = VmContext::new();
5722
5723 let signature = "test_sig_123".to_string();
5724
5725 {
5726 let state = vm.states.get(&0).unwrap();
5727 let mut cache = state.recent_tx_instructions.lock().unwrap();
5728 let mut set = HashSet::new();
5729 set.insert("RevealIxState".to_string());
5730 cache.put(signature.clone(), set);
5731 }
5732
5733 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5734
5735 let handler = vec![
5736 OpCode::CreateObject { dest: 2 },
5737 OpCode::LoadConstant {
5738 value: json!("primary_key_value"),
5739 dest: 1,
5740 },
5741 OpCode::LoadConstant {
5742 value: json!("the_revealed_value"),
5743 dest: 10,
5744 },
5745 OpCode::SetFieldWhen {
5746 object: 2,
5747 path: "entropy_value".to_string(),
5748 value: 10,
5749 when_instruction: "RevealIxState".to_string(),
5750 entity_name: "TestEntity".to_string(),
5751 key_reg: 1,
5752 condition_field: None,
5753 condition_op: None,
5754 condition_value: None,
5755 },
5756 ];
5757
5758 vm.execute_handler(
5759 &handler,
5760 &json!({}),
5761 "VarState",
5762 0,
5763 "TestEntity",
5764 None,
5765 None,
5766 )
5767 .unwrap();
5768
5769 assert_eq!(
5770 vm.registers[2].get("entropy_value").unwrap(),
5771 "the_revealed_value",
5772 "Field should be set when instruction was already seen"
5773 );
5774 }
5775
5776 #[test]
5777 fn test_when_account_arrives_first() {
5778 let mut vm = VmContext::new();
5779
5780 let signature = "test_sig_456".to_string();
5781
5782 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5783
5784 let handler = vec![
5785 OpCode::CreateObject { dest: 2 },
5786 OpCode::LoadConstant {
5787 value: json!("pk_123"),
5788 dest: 1,
5789 },
5790 OpCode::LoadConstant {
5791 value: json!("deferred_value"),
5792 dest: 10,
5793 },
5794 OpCode::SetFieldWhen {
5795 object: 2,
5796 path: "entropy_value".to_string(),
5797 value: 10,
5798 when_instruction: "RevealIxState".to_string(),
5799 entity_name: "TestEntity".to_string(),
5800 key_reg: 1,
5801 condition_field: None,
5802 condition_op: None,
5803 condition_value: None,
5804 },
5805 ];
5806
5807 vm.execute_handler(
5808 &handler,
5809 &json!({}),
5810 "VarState",
5811 0,
5812 "TestEntity",
5813 None,
5814 None,
5815 )
5816 .unwrap();
5817
5818 assert!(
5819 vm.registers[2].get("entropy_value").is_none(),
5820 "Field should not be set when instruction hasn't been seen"
5821 );
5822
5823 let state = vm.states.get(&0).unwrap();
5824 let key = (signature.clone(), "RevealIxState".to_string());
5825 assert!(
5826 state.deferred_when_ops.contains_key(&key),
5827 "Operation should be queued"
5828 );
5829
5830 {
5831 let mut cache = state.recent_tx_instructions.lock().unwrap();
5832 let mut set = HashSet::new();
5833 set.insert("RevealIxState".to_string());
5834 cache.put(signature.clone(), set);
5835 }
5836
5837 let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
5838 for op in deferred {
5839 vm.apply_deferred_when_op(0, &op, None, None).unwrap();
5840 }
5841
5842 let state = vm.states.get(&0).unwrap();
5843 let entity = state.data.get(&json!("pk_123")).unwrap();
5844 assert_eq!(
5845 entity.get("entropy_value").unwrap(),
5846 "deferred_value",
5847 "Field should be set after instruction arrives"
5848 );
5849 }
5850
5851 #[test]
5852 fn test_when_cleanup_expired() {
5853 let mut vm = VmContext::new();
5854
5855 let state = vm.states.get(&0).unwrap();
5856 let key = ("old_sig".to_string(), "SomeIxState".to_string());
5857 state.deferred_when_ops.insert(
5858 key,
5859 vec![DeferredWhenOperation {
5860 entity_name: "Test".to_string(),
5861 primary_key: json!("pk"),
5862 field_path: "field".to_string(),
5863 field_value: json!("value"),
5864 when_instruction: "SomeIxState".to_string(),
5865 signature: "old_sig".to_string(),
5866 slot: 0,
5867 deferred_at: 0,
5868 emit: true,
5869 }],
5870 );
5871
5872 let removed = vm.cleanup_expired_when_ops(0, 60);
5873
5874 assert_eq!(removed, 1, "Should have removed 1 expired op");
5875 assert!(
5876 vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
5877 "Deferred ops should be empty after cleanup"
5878 );
5879 }
5880
5881 #[test]
5882 fn test_deferred_when_op_recomputes_dependent_fields() {
5883 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
5884
5885 let mut vm = VmContext::new();
5886
5887 let computed_specs = vec![
5891 ComputedFieldSpec {
5892 target_path: "results.pre_reveal_rng".to_string(),
5893 result_type: "Option<u64>".to_string(),
5894 expression: ComputedExpr::FieldRef {
5895 path: "entropy.base_value".to_string(),
5896 },
5897 },
5898 ComputedFieldSpec {
5899 target_path: "results.pre_reveal_winning_square".to_string(),
5900 result_type: "Option<u64>".to_string(),
5901 expression: ComputedExpr::MethodCall {
5902 expr: Box::new(ComputedExpr::FieldRef {
5903 path: "results.pre_reveal_rng".to_string(),
5904 }),
5905 method: "map".to_string(),
5906 args: vec![ComputedExpr::Closure {
5907 param: "r".to_string(),
5908 body: Box::new(ComputedExpr::Binary {
5909 op: BinaryOp::Mod,
5910 left: Box::new(ComputedExpr::Var {
5911 name: "r".to_string(),
5912 }),
5913 right: Box::new(ComputedExpr::Literal {
5914 value: serde_json::json!(25),
5915 }),
5916 }),
5917 }],
5918 },
5919 },
5920 ];
5921
5922 let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
5923 Box::new(VmContext::create_evaluator_from_specs(computed_specs));
5924
5925 let primary_key = json!("test_pk");
5928 let op = DeferredWhenOperation {
5929 entity_name: "TestEntity".to_string(),
5930 primary_key: primary_key.clone(),
5931 field_path: "entropy.base_value".to_string(),
5932 field_value: json!(100),
5933 when_instruction: "TestIxState".to_string(),
5934 signature: "test_sig".to_string(),
5935 slot: 100,
5936 deferred_at: 0,
5937 emit: true,
5938 };
5939
5940 let initial_state = json!({
5942 "results": {}
5943 });
5944 vm.states
5945 .get(&0)
5946 .unwrap()
5947 .insert_with_eviction(primary_key.clone(), initial_state);
5948
5949 let mutations = vm
5951 .apply_deferred_when_op(
5952 0,
5953 &op,
5954 Some(&evaluator),
5955 Some(&[
5956 "results.pre_reveal_rng".to_string(),
5957 "results.pre_reveal_winning_square".to_string(),
5958 ]),
5959 )
5960 .unwrap();
5961
5962 let state = vm.states.get(&0).unwrap();
5964 let entity = state.data.get(&primary_key).unwrap();
5965
5966 println!(
5967 "Entity state: {}",
5968 serde_json::to_string_pretty(&*entity).unwrap()
5969 );
5970
5971 assert_eq!(
5973 entity.get("entropy").and_then(|e| e.get("base_value")),
5974 Some(&json!(100)),
5975 "Base value should be set"
5976 );
5977
5978 let pre_reveal_rng = entity
5980 .get("results")
5981 .and_then(|r| r.get("pre_reveal_rng"))
5982 .cloned();
5983 let pre_reveal_winning_square = entity
5984 .get("results")
5985 .and_then(|r| r.get("pre_reveal_winning_square"))
5986 .cloned();
5987
5988 assert_eq!(
5989 pre_reveal_rng,
5990 Some(json!(100)),
5991 "pre_reveal_rng should be computed"
5992 );
5993 assert_eq!(
5994 pre_reveal_winning_square,
5995 Some(json!(0)),
5996 "pre_reveal_winning_square should be 100 % 25 = 0"
5997 );
5998
5999 assert!(!mutations.is_empty(), "Should have mutations");
6001 let mutation = &mutations[0];
6002 let patch = &mutation.patch;
6003
6004 assert!(
6005 patch
6006 .get("results")
6007 .and_then(|r| r.get("pre_reveal_rng"))
6008 .is_some(),
6009 "Mutation should include pre_reveal_rng"
6010 );
6011 assert!(
6012 patch
6013 .get("results")
6014 .and_then(|r| r.get("pre_reveal_winning_square"))
6015 .is_some(),
6016 "Mutation should include pre_reveal_winning_square"
6017 );
6018 }
6019}