1use crate::ast::{
2 self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3 ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use once_cell::sync::Lazy;
10use serde_json::{json, Value};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::num::NonZeroUsize;
13use std::time::{Duration, Instant};
14
15#[cfg(feature = "otel")]
16use tracing::instrument;
17#[derive(Debug, Clone, Default)]
20pub struct UpdateContext {
21 pub slot: Option<u64>,
23 pub signature: Option<String>,
25 pub timestamp: Option<i64>,
28 pub write_version: Option<u64>,
31 pub txn_index: Option<u64>,
34 pub skip_resolvers: bool,
38 pub metadata: HashMap<String, Value>,
40}
41
42impl UpdateContext {
43 pub fn new(slot: u64, signature: String) -> Self {
45 Self {
46 slot: Some(slot),
47 signature: Some(signature),
48 timestamp: None,
49 write_version: None,
50 txn_index: None,
51 skip_resolvers: false,
52 metadata: HashMap::new(),
53 }
54 }
55
56 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
58 Self {
59 slot: Some(slot),
60 signature: Some(signature),
61 timestamp: Some(timestamp),
62 write_version: None,
63 txn_index: None,
64 skip_resolvers: false,
65 metadata: HashMap::new(),
66 }
67 }
68
69 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
71 Self {
72 slot: Some(slot),
73 signature: Some(signature),
74 timestamp: None,
75 write_version: Some(write_version),
76 txn_index: None,
77 skip_resolvers: false,
78 metadata: HashMap::new(),
79 }
80 }
81
82 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
84 Self {
85 slot: Some(slot),
86 signature: Some(signature),
87 timestamp: None,
88 write_version: None,
89 txn_index: Some(txn_index),
90 skip_resolvers: false,
91 metadata: HashMap::new(),
92 }
93 }
94
95 pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
99 Self {
100 slot: Some(slot),
101 signature: None,
102 timestamp: None,
103 write_version: Some(write_version),
104 txn_index: None,
105 skip_resolvers: true,
106 metadata: HashMap::new(),
107 }
108 }
109
110 pub fn timestamp(&self) -> i64 {
112 self.timestamp.unwrap_or_else(|| {
113 std::time::SystemTime::now()
114 .duration_since(std::time::UNIX_EPOCH)
115 .unwrap()
116 .as_secs() as i64
117 })
118 }
119
120 pub fn empty() -> Self {
122 Self::default()
123 }
124
125 pub fn is_account_update(&self) -> bool {
128 self.write_version.is_some() && self.txn_index.is_none()
129 }
130
131 pub fn is_instruction_update(&self) -> bool {
133 self.txn_index.is_some() && self.write_version.is_none()
134 }
135
136 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
137 self.metadata.insert(key, value);
138 self
139 }
140
141 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
143 self.metadata.get(key)
144 }
145
146 pub fn to_value(&self) -> Value {
148 let mut obj = serde_json::Map::new();
149 if let Some(slot) = self.slot {
150 obj.insert("slot".to_string(), json!(slot));
151 }
152 if let Some(ref sig) = self.signature {
153 obj.insert("signature".to_string(), json!(sig));
154 }
155 obj.insert("timestamp".to_string(), json!(self.timestamp()));
157 for (key, value) in &self.metadata {
158 obj.insert(key.clone(), value.clone());
159 }
160 Value::Object(obj)
161 }
162}
163
164pub type Register = usize;
165pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
166
167pub type RegisterValue = Value;
168
169pub trait ComputedFieldsEvaluator {
172 fn evaluate(&self, state: &mut Value) -> Result<()>;
173}
174
// Limits for the pending-update queues.
// NOTE(review): their use sites are outside this part of the file — confirm semantics there.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300;

/// Window subtracted from the inserted timestamp by `TemporalIndex::insert`
/// to decide which older entries of the same key are dropped.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300;
/// Maximum number of history entries `TemporalIndex::insert` keeps per key.
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

/// Default `StateTableConfig::max_entries`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
/// Default `StateTableConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

/// Capacity used by `LookupIndex::new`.
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

/// Capacity used by `VersionTracker::new`.
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

/// Capacity of the instruction de-duplication tracker created for state tables.
const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;

/// Capacity used by `TemporalIndex::new`.
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// NOTE(review): presumably the default capacity for `PdaReverseLookup`; not used in the visible code.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;

/// Resolver cache capacity used when the environment does not override it.
const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 20_000;
201const DEFAULT_RESOLVER_CACHE_TTL_SECS: u64 = 3600; static RESOLVER_CACHE_CAPACITY: Lazy<NonZeroUsize> = Lazy::new(|| {
204 NonZeroUsize::new(
205 std::env::var("HYPERSTACK_RESOLVER_CACHE_CAPACITY")
206 .ok()
207 .and_then(|value| value.parse::<usize>().ok())
208 .filter(|value| *value > 0)
209 .unwrap_or(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES),
210 )
211 .expect("resolver cache capacity must be > 0")
212});
213
214static RESOLVER_CACHE_TTL: Lazy<Duration> = Lazy::new(|| {
215 let ttl_secs = match std::env::var("HYPERSTACK_RESOLVER_CACHE_TTL_SECS") {
216 Ok(value) => match value.parse::<u64>() {
217 Ok(0) => {
218 tracing::warn!(
219 default_ttl_secs = DEFAULT_RESOLVER_CACHE_TTL_SECS,
220 "HYPERSTACK_RESOLVER_CACHE_TTL_SECS=0 is not supported; using default"
221 );
222 DEFAULT_RESOLVER_CACHE_TTL_SECS
223 }
224 Ok(value) => value,
225 Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
226 },
227 Err(_) => DEFAULT_RESOLVER_CACHE_TTL_SECS,
228 };
229
230 Duration::from_secs(ttl_secs)
231});
232
233#[derive(Debug, Clone)]
234struct ResolverCacheEntry {
235 value: Value,
236 cached_at: Instant,
237}
238
239fn resolver_cache_capacity() -> NonZeroUsize {
240 *RESOLVER_CACHE_CAPACITY
241}
242
243fn resolver_cache_ttl() -> Duration {
244 *RESOLVER_CACHE_TTL
245}
246
247fn estimate_json_size(value: &Value) -> usize {
249 match value {
250 Value::Null => 4,
251 Value::Bool(_) => 5,
252 Value::Number(_) => 8,
253 Value::String(s) => s.len() + 2,
254 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
255 Value::Object(obj) => {
256 2 + obj
257 .iter()
258 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
259 .sum::<usize>()
260 }
261 }
262}
263
/// A dotted field path split into its segments once, up front.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` does not
/// copy the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// The path components, in order.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` and stores the pieces.
    ///
    /// An empty input yields a single empty segment, and consecutive dots
    /// yield empty segments between them.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        self.segments.as_ref()
    }
}
281
282#[derive(Debug, Clone)]
285pub enum FieldChange {
286 Replaced,
288 Appended(Vec<Value>),
290}
291
292#[derive(Debug, Clone, Default)]
295pub struct DirtyTracker {
296 changes: HashMap<String, FieldChange>,
297}
298
299impl DirtyTracker {
300 pub fn new() -> Self {
302 Self {
303 changes: HashMap::new(),
304 }
305 }
306
307 pub fn mark_replaced(&mut self, path: &str) {
309 self.changes.insert(path.to_string(), FieldChange::Replaced);
311 }
312
313 pub fn mark_appended(&mut self, path: &str, value: Value) {
315 match self.changes.get_mut(path) {
316 Some(FieldChange::Appended(values)) => {
317 values.push(value);
319 }
320 Some(FieldChange::Replaced) => {
321 }
324 None => {
325 self.changes
327 .insert(path.to_string(), FieldChange::Appended(vec![value]));
328 }
329 }
330 }
331
332 pub fn is_empty(&self) -> bool {
334 self.changes.is_empty()
335 }
336
337 pub fn len(&self) -> usize {
339 self.changes.len()
340 }
341
342 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
344 self.changes.iter()
345 }
346
347 pub fn dirty_paths(&self) -> HashSet<String> {
349 self.changes.keys().cloned().collect()
350 }
351
352 pub fn into_changes(self) -> HashMap<String, FieldChange> {
354 self.changes
355 }
356
357 pub fn changes(&self) -> &HashMap<String, FieldChange> {
359 &self.changes
360 }
361
362 pub fn appended_paths(&self) -> Vec<String> {
364 self.changes
365 .iter()
366 .filter_map(|(path, change)| match change {
367 FieldChange::Appended(_) => Some(path.clone()),
368 FieldChange::Replaced => None,
369 })
370 .collect()
371 }
372}
373
374pub struct VmContext {
375 registers: Vec<RegisterValue>,
376 states: HashMap<u32, StateTable>,
377 pub instructions_executed: u64,
378 pub cache_hits: u64,
379 path_cache: HashMap<String, CompiledPath>,
380 pub pda_cache_hits: u64,
381 pub pda_cache_misses: u64,
382 pub pending_queue_size: u64,
383 resolver_requests: VecDeque<ResolverRequest>,
384 resolver_pending: HashMap<String, PendingResolverEntry>,
385 resolver_cache: LruCache<String, ResolverCacheEntry>,
386 pub resolver_cache_hits: u64,
387 pub resolver_cache_misses: u64,
388 current_context: Option<UpdateContext>,
389 warnings: Vec<String>,
390 last_pda_lookup_miss: Option<String>,
391 last_lookup_index_miss: Option<String>,
392 last_pda_registered: Option<String>,
393 last_lookup_index_keys: Vec<String>,
394 scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
395}
396
397#[derive(Debug)]
398pub struct LookupIndex {
399 index: std::sync::Mutex<LruCache<String, Value>>,
400}
401
402impl LookupIndex {
403 pub fn new() -> Self {
404 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
405 }
406
407 pub fn with_capacity(capacity: usize) -> Self {
408 LookupIndex {
409 index: std::sync::Mutex::new(LruCache::new(
410 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
411 )),
412 }
413 }
414
415 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
416 let key = value_to_cache_key(lookup_value);
417 self.index.lock().unwrap().get(&key).cloned()
418 }
419
420 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
421 let key = value_to_cache_key(&lookup_value);
422 self.index.lock().unwrap().put(key, primary_key);
423 }
424
425 pub fn remove(&self, lookup_value: &Value) {
428 let key = value_to_cache_key(lookup_value);
429 self.index.lock().unwrap().pop(&key);
430 }
431
432 pub fn len(&self) -> usize {
433 self.index.lock().unwrap().len()
434 }
435
436 pub fn is_empty(&self) -> bool {
437 self.index.lock().unwrap().is_empty()
438 }
439}
440
441impl Default for LookupIndex {
442 fn default() -> Self {
443 Self::new()
444 }
445}
446
447fn value_to_cache_key(value: &Value) -> String {
448 match value {
449 Value::String(s) => s.clone(),
450 Value::Number(n) => n.to_string(),
451 Value::Bool(b) => b.to_string(),
452 Value::Null => "null".to_string(),
453 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
454 }
455}
456
457pub(crate) fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
458 match resolver {
459 ResolverType::Token => format!("token:{}", value_to_cache_key(input)),
460 ResolverType::Url(config) => {
461 let method = match config.method {
462 ast::HttpMethod::Get => "get",
463 ast::HttpMethod::Post => "post",
464 };
465 format!("url:{}:{}", method, value_to_cache_key(input))
466 }
467 }
468}
469
470#[derive(Debug)]
471pub struct TemporalIndex {
472 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
473}
474
475impl Default for TemporalIndex {
476 fn default() -> Self {
477 Self::new()
478 }
479}
480
481impl TemporalIndex {
482 pub fn new() -> Self {
483 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
484 }
485
486 pub fn with_capacity(capacity: usize) -> Self {
487 TemporalIndex {
488 index: std::sync::Mutex::new(LruCache::new(
489 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
490 )),
491 }
492 }
493
494 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
495 let key = value_to_cache_key(lookup_value);
496 let mut cache = self.index.lock().unwrap();
497 if let Some(entries) = cache.get(&key) {
498 for i in (0..entries.len()).rev() {
499 if entries[i].1 <= timestamp {
500 return Some(entries[i].0.clone());
501 }
502 }
503 }
504 None
505 }
506
507 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
508 let key = value_to_cache_key(lookup_value);
509 let mut cache = self.index.lock().unwrap();
510 if let Some(entries) = cache.get(&key) {
511 if let Some(last) = entries.last() {
512 return Some(last.0.clone());
513 }
514 }
515 None
516 }
517
518 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
519 let key = value_to_cache_key(&lookup_value);
520 let mut cache = self.index.lock().unwrap();
521
522 let entries = cache.get_or_insert_mut(key, Vec::new);
523 entries.push((primary_key, timestamp));
524 entries.sort_by_key(|(_, ts)| *ts);
525
526 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
527 entries.retain(|(_, ts)| *ts >= cutoff);
528
529 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
530 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
531 entries.drain(0..excess);
532 }
533 }
534
535 pub fn len(&self) -> usize {
536 self.index.lock().unwrap().len()
537 }
538
539 pub fn is_empty(&self) -> bool {
540 self.index.lock().unwrap().is_empty()
541 }
542
543 pub fn total_entries(&self) -> usize {
544 self.index
545 .lock()
546 .unwrap()
547 .iter()
548 .map(|(_, entries)| entries.len())
549 .sum()
550 }
551
552 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
553 let mut cache = self.index.lock().unwrap();
554 let mut total_removed = 0;
555
556 for (_, entries) in cache.iter_mut() {
557 let original_len = entries.len();
558 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
559 total_removed += original_len - entries.len();
560 }
561
562 total_removed
563 }
564}
565
566#[derive(Debug)]
567pub struct PdaReverseLookup {
568 index: LruCache<String, String>,
570}
571
572impl PdaReverseLookup {
573 pub fn new(capacity: usize) -> Self {
574 PdaReverseLookup {
575 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
576 }
577 }
578
579 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
580 self.index.get(pda_address).cloned()
581 }
582
583 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
584 let evicted = if self.index.len() >= self.index.cap().get() {
585 self.index.peek_lru().map(|(k, _)| k.clone())
586 } else {
587 None
588 };
589
590 self.index.put(pda_address, seed_value);
591 evicted
592 }
593
594 pub fn len(&self) -> usize {
595 self.index.len()
596 }
597
598 pub fn is_empty(&self) -> bool {
599 self.index.is_empty()
600 }
601
602 pub fn contains(&self, pda_address: &str) -> bool {
603 self.index.peek(pda_address).is_some()
604 }
605}
606
607#[derive(Debug, Clone)]
609pub struct QueuedAccountUpdate {
610 pub pda_address: String,
611 pub account_type: String,
612 pub account_data: Value,
613 pub slot: u64,
614 pub write_version: u64,
615 pub signature: String,
616}
617
618#[derive(Debug, Clone)]
620pub struct PendingAccountUpdate {
621 pub account_type: String,
622 pub pda_address: String,
623 pub account_data: Value,
624 pub slot: u64,
625 pub write_version: u64,
626 pub signature: String,
627 pub queued_at: i64,
628 pub is_stale_reprocess: bool,
634}
635
636#[derive(Debug, Clone)]
638pub struct QueuedInstructionEvent {
639 pub pda_address: String,
640 pub event_type: String,
641 pub event_data: Value,
642 pub slot: u64,
643 pub signature: String,
644}
645
646#[derive(Debug, Clone)]
648pub struct PendingInstructionEvent {
649 pub event_type: String,
650 pub pda_address: String,
651 pub event_data: Value,
652 pub slot: u64,
653 pub signature: String,
654 pub queued_at: i64,
655}
656
657#[derive(Debug, Clone)]
658pub struct DeferredWhenOperation {
659 pub entity_name: String,
660 pub primary_key: Value,
661 pub field_path: String,
662 pub field_value: Value,
663 pub when_instruction: String,
664 pub signature: String,
665 pub slot: u64,
666 pub deferred_at: i64,
667 pub emit: bool,
668}
669
670#[derive(Debug, Clone)]
671pub struct ResolverRequest {
672 pub cache_key: String,
673 pub resolver: ResolverType,
674 pub input: Value,
675}
676
677#[derive(Debug, Clone)]
678pub struct ResolverTarget {
679 pub state_id: u32,
680 pub entity_name: String,
681 pub primary_key: Value,
682 pub extracts: Vec<ResolverExtractSpec>,
683}
684
685#[derive(Debug, Clone)]
686pub struct PendingResolverEntry {
687 pub resolver: ResolverType,
688 pub input: Value,
689 pub targets: Vec<ResolverTarget>,
690 pub queued_at: i64,
691}
692
693#[derive(Debug, Clone, PartialEq)]
694pub struct ScheduledCallback {
695 pub state_id: u32,
696 pub entity_name: String,
697 pub primary_key: Value,
698 pub resolver: ResolverType,
699 pub url_template: Option<Vec<ast::UrlTemplatePart>>,
700 pub input_value: Option<Value>,
701 pub input_path: Option<String>,
702 pub condition: Option<ast::ResolverCondition>,
703 pub strategy: ResolveStrategy,
704 pub extracts: Vec<ResolverExtractSpec>,
705 pub retry_count: u32,
706}
707
708impl PendingResolverEntry {
709 fn add_target(&mut self, target: ResolverTarget) {
710 if let Some(existing) = self.targets.iter_mut().find(|t| {
711 t.state_id == target.state_id
712 && t.entity_name == target.entity_name
713 && t.primary_key == target.primary_key
714 }) {
715 let mut seen = HashSet::new();
716 for extract in &existing.extracts {
717 seen.insert((extract.target_path.clone(), extract.source_path.clone()));
718 }
719
720 for extract in target.extracts {
721 let key = (extract.target_path.clone(), extract.source_path.clone());
722 if seen.insert(key) {
723 existing.extracts.push(extract);
724 }
725 }
726 } else {
727 self.targets.push(target);
728 }
729 }
730}
731
/// Snapshot of pending-queue statistics.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with queued updates.
    pub unique_pdas: usize,
    /// Age, in seconds, of the oldest queued update.
    pub oldest_age_seconds: i64,
    /// Size of the largest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory held by the queue, in bytes.
    pub estimated_memory_bytes: usize,
}
740
741#[derive(Debug, Clone, Default)]
742pub struct VmMemoryStats {
743 pub state_table_entity_count: usize,
744 pub state_table_max_entries: usize,
745 pub state_table_at_capacity: bool,
746 pub lookup_index_count: usize,
747 pub lookup_index_total_entries: usize,
748 pub temporal_index_count: usize,
749 pub temporal_index_total_entries: usize,
750 pub pda_reverse_lookup_count: usize,
751 pub pda_reverse_lookup_total_entries: usize,
752 pub version_tracker_entries: usize,
753 pub pending_queue_stats: Option<PendingQueueStats>,
754 pub path_cache_size: usize,
755}
756
/// Counts of items removed by a cleanup pass.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal index entries removed.
    pub temporal_entries_removed: usize,
}
762
/// Describes a table that has grown past its configured limit.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// How many entries exceed the maximum.
    pub entries_over_limit: usize,
}
769
/// Size limits applied to a state table.
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Entry count at which inserting a new key triggers eviction.
    pub max_entries: usize,
    /// Maximum array length, as reported by `StateTable::max_array_length`.
    pub max_array_length: usize,
}
775
776impl Default for StateTableConfig {
777 fn default() -> Self {
778 Self {
779 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
780 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
781 }
782 }
783}
784
785#[derive(Debug)]
786pub struct VersionTracker {
787 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
788}
789
790impl VersionTracker {
791 pub fn new() -> Self {
792 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
793 }
794
795 pub fn with_capacity(capacity: usize) -> Self {
796 VersionTracker {
797 cache: std::sync::Mutex::new(LruCache::new(
798 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
799 )),
800 }
801 }
802
803 fn make_key(primary_key: &Value, event_type: &str) -> String {
804 format!("{}:{}", primary_key, event_type)
805 }
806
807 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
808 let key = Self::make_key(primary_key, event_type);
809 self.cache.lock().unwrap().get(&key).copied()
810 }
811
812 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
813 let key = Self::make_key(primary_key, event_type);
814 self.cache.lock().unwrap().put(key, (slot, ordering_value));
815 }
816
817 pub fn len(&self) -> usize {
818 self.cache.lock().unwrap().len()
819 }
820
821 pub fn is_empty(&self) -> bool {
822 self.cache.lock().unwrap().is_empty()
823 }
824}
825
826impl Default for VersionTracker {
827 fn default() -> Self {
828 Self::new()
829 }
830}
831
832#[derive(Debug)]
833pub struct StateTable {
834 pub data: DashMap<Value, Value>,
835 access_times: DashMap<Value, i64>,
836 pub lookup_indexes: HashMap<String, LookupIndex>,
837 pub temporal_indexes: HashMap<String, TemporalIndex>,
838 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
839 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
840 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
841 pub last_account_data: DashMap<String, PendingAccountUpdate>,
845 version_tracker: VersionTracker,
846 instruction_dedup_cache: VersionTracker,
847 config: StateTableConfig,
848 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
849 entity_name: String,
850 pub recent_tx_instructions:
851 std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
852 pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
853}
854
855impl StateTable {
856 pub fn is_at_capacity(&self) -> bool {
857 self.data.len() >= self.config.max_entries
858 }
859
860 pub fn entries_over_limit(&self) -> usize {
861 self.data.len().saturating_sub(self.config.max_entries)
862 }
863
864 pub fn max_array_length(&self) -> usize {
865 self.config.max_array_length
866 }
867
868 fn touch(&self, key: &Value) {
869 let now = std::time::SystemTime::now()
870 .duration_since(std::time::UNIX_EPOCH)
871 .unwrap()
872 .as_secs() as i64;
873 self.access_times.insert(key.clone(), now);
874 }
875
876 fn evict_lru(&self, count: usize) -> usize {
877 if count == 0 || self.data.is_empty() {
878 return 0;
879 }
880
881 let mut entries: Vec<(Value, i64)> = self
882 .access_times
883 .iter()
884 .map(|entry| (entry.key().clone(), *entry.value()))
885 .collect();
886
887 entries.sort_by_key(|(_, ts)| *ts);
888
889 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
890
891 let mut evicted = 0;
892 for key in to_evict {
893 self.data.remove(&key);
894 self.access_times.remove(&key);
895 evicted += 1;
896 }
897
898 #[cfg(feature = "otel")]
899 if evicted > 0 {
900 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
901 }
902
903 evicted
904 }
905
906 pub fn insert_with_eviction(&self, key: Value, value: Value) {
907 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
908 #[cfg(feature = "otel")]
909 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
910 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
911 self.evict_lru(to_evict.max(1));
912 }
913 self.data.insert(key.clone(), value);
914 self.touch(&key);
915 }
916
917 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
918 let result = self.data.get(key).map(|v| v.clone());
919 if result.is_some() {
920 self.touch(key);
921 }
922 result
923 }
924
925 pub fn is_fresh_update(
932 &self,
933 primary_key: &Value,
934 event_type: &str,
935 slot: u64,
936 ordering_value: u64,
937 ) -> bool {
938 let dominated = self
939 .version_tracker
940 .get(primary_key, event_type)
941 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
942 .unwrap_or(false);
943
944 if dominated {
945 return false;
946 }
947
948 self.version_tracker
949 .insert(primary_key, event_type, slot, ordering_value);
950 true
951 }
952
953 pub fn is_duplicate_instruction(
961 &self,
962 primary_key: &Value,
963 event_type: &str,
964 slot: u64,
965 txn_index: u64,
966 ) -> bool {
967 let is_duplicate = self
969 .instruction_dedup_cache
970 .get(primary_key, event_type)
971 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
972 .unwrap_or(false);
973
974 if is_duplicate {
975 return true;
976 }
977
978 self.instruction_dedup_cache
980 .insert(primary_key, event_type, slot, txn_index);
981 false
982 }
983}
984
985impl VmContext {
986 pub fn new() -> Self {
987 let mut vm = VmContext {
988 registers: vec![Value::Null; 256],
989 states: HashMap::new(),
990 instructions_executed: 0,
991 cache_hits: 0,
992 path_cache: HashMap::new(),
993 pda_cache_hits: 0,
994 pda_cache_misses: 0,
995 pending_queue_size: 0,
996 resolver_requests: VecDeque::new(),
997 resolver_pending: HashMap::new(),
998 resolver_cache: LruCache::new(resolver_cache_capacity()),
999 resolver_cache_hits: 0,
1000 resolver_cache_misses: 0,
1001 current_context: None,
1002 warnings: Vec::new(),
1003 last_pda_lookup_miss: None,
1004 last_lookup_index_miss: None,
1005 last_pda_registered: None,
1006 last_lookup_index_keys: Vec::new(),
1007 scheduled_callbacks: Vec::new(),
1008 };
1009 vm.states.insert(
1010 0,
1011 StateTable {
1012 data: DashMap::new(),
1013 access_times: DashMap::new(),
1014 lookup_indexes: HashMap::new(),
1015 temporal_indexes: HashMap::new(),
1016 pda_reverse_lookups: HashMap::new(),
1017 pending_updates: DashMap::new(),
1018 pending_instruction_events: DashMap::new(),
1019 last_account_data: DashMap::new(),
1020 version_tracker: VersionTracker::new(),
1021 instruction_dedup_cache: VersionTracker::with_capacity(
1022 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1023 ),
1024 config: StateTableConfig::default(),
1025 entity_name: String::new(),
1026 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1027 NonZeroUsize::new(1000).unwrap(),
1028 )),
1029 deferred_when_ops: DashMap::new(),
1030 },
1031 );
1032
1033 vm
1034 }
1035
1036 pub fn new_multi_entity() -> Self {
1038 VmContext {
1039 registers: vec![Value::Null; 256],
1040 states: HashMap::new(),
1041 instructions_executed: 0,
1042 cache_hits: 0,
1043 path_cache: HashMap::new(),
1044 pda_cache_hits: 0,
1045 pda_cache_misses: 0,
1046 pending_queue_size: 0,
1047 resolver_requests: VecDeque::new(),
1048 resolver_pending: HashMap::new(),
1049 resolver_cache: LruCache::new(resolver_cache_capacity()),
1050 resolver_cache_hits: 0,
1051 resolver_cache_misses: 0,
1052 current_context: None,
1053 warnings: Vec::new(),
1054 last_pda_lookup_miss: None,
1055 last_lookup_index_miss: None,
1056 last_pda_registered: None,
1057 last_lookup_index_keys: Vec::new(),
1058 scheduled_callbacks: Vec::new(),
1059 }
1060 }
1061
1062 pub fn new_with_config(state_config: StateTableConfig) -> Self {
1063 let mut vm = VmContext {
1064 registers: vec![Value::Null; 256],
1065 states: HashMap::new(),
1066 instructions_executed: 0,
1067 cache_hits: 0,
1068 path_cache: HashMap::new(),
1069 pda_cache_hits: 0,
1070 pda_cache_misses: 0,
1071 pending_queue_size: 0,
1072 resolver_requests: VecDeque::new(),
1073 resolver_pending: HashMap::new(),
1074 resolver_cache: LruCache::new(resolver_cache_capacity()),
1075 resolver_cache_hits: 0,
1076 resolver_cache_misses: 0,
1077 current_context: None,
1078 warnings: Vec::new(),
1079 last_pda_lookup_miss: None,
1080 last_lookup_index_miss: None,
1081 last_pda_registered: None,
1082 last_lookup_index_keys: Vec::new(),
1083 scheduled_callbacks: Vec::new(),
1084 };
1085 vm.states.insert(
1086 0,
1087 StateTable {
1088 data: DashMap::new(),
1089 access_times: DashMap::new(),
1090 lookup_indexes: HashMap::new(),
1091 temporal_indexes: HashMap::new(),
1092 pda_reverse_lookups: HashMap::new(),
1093 pending_updates: DashMap::new(),
1094 pending_instruction_events: DashMap::new(),
1095 last_account_data: DashMap::new(),
1096 version_tracker: VersionTracker::new(),
1097 instruction_dedup_cache: VersionTracker::with_capacity(
1098 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1099 ),
1100 config: state_config,
1101 entity_name: "default".to_string(),
1102 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1103 NonZeroUsize::new(1000).unwrap(),
1104 )),
1105 deferred_when_ops: DashMap::new(),
1106 },
1107 );
1108 vm
1109 }
1110
1111 pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1112 self.resolver_requests.drain(..).collect()
1113 }
1114
1115 pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1116 std::mem::take(&mut self.scheduled_callbacks)
1117 }
1118
1119 pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1120 self.states.get(&state_id)?.get_and_touch(key)
1121 }
1122
1123 pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1124 if requests.is_empty() {
1125 return;
1126 }
1127
1128 self.resolver_requests.extend(requests);
1129 }
1130
1131 pub(crate) fn get_cached_resolver_value(&mut self, cache_key: &str) -> Option<Value> {
1132 let cached = self.resolver_cache.get(cache_key).cloned();
1133
1134 match cached {
1135 Some(entry) if entry.cached_at.elapsed() <= resolver_cache_ttl() => {
1136 self.resolver_cache_hits += 1;
1137 Some(entry.value)
1138 }
1139 Some(_) => {
1140 self.resolver_cache.pop(cache_key);
1141 self.resolver_cache_misses += 1;
1142 None
1143 }
1144 None => {
1145 self.resolver_cache_misses += 1;
1146 None
1147 }
1148 }
1149 }
1150
1151 fn cache_resolver_value(
1152 &mut self,
1153 resolver: &ResolverType,
1154 input: &Value,
1155 resolved_value: &Value,
1156 ) {
1157 self.resolver_cache.put(
1158 resolver_cache_key(resolver, input),
1159 ResolverCacheEntry {
1160 value: resolved_value.clone(),
1161 cached_at: Instant::now(),
1162 },
1163 );
1164 }
1165
1166 pub fn apply_resolver_result(
1167 &mut self,
1168 bytecode: &MultiEntityBytecode,
1169 cache_key: &str,
1170 resolved_value: Value,
1171 ) -> Result<Vec<Mutation>> {
1172 let entry = match self.resolver_pending.remove(cache_key) {
1173 Some(entry) => entry,
1174 None => return Ok(Vec::new()),
1175 };
1176
1177 self.cache_resolver_value(&entry.resolver, &entry.input, &resolved_value);
1178
1179 let mut mutations = Vec::new();
1180
1181 for target in entry.targets {
1182 if target.primary_key.is_null() {
1183 continue;
1184 }
1185
1186 let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1187 Some(bc) => bc,
1188 None => continue,
1189 };
1190
1191 let state = match self.states.get(&target.state_id) {
1192 Some(s) => s,
1193 None => continue,
1194 };
1195
1196 let mut entity_state = state
1197 .get_and_touch(&target.primary_key)
1198 .unwrap_or_else(|| json!({}));
1199
1200 let mut dirty_tracker = DirtyTracker::new();
1201 let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1202
1203 Self::apply_resolver_extractions_to_value(
1204 &mut entity_state,
1205 &resolved_value,
1206 &target.extracts,
1207 &mut dirty_tracker,
1208 &should_emit,
1209 )?;
1210
1211 if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1212 let old_values: Vec<_> = entity_bytecode
1213 .computed_paths
1214 .iter()
1215 .map(|path| Self::get_value_at_path(&entity_state, path))
1216 .collect();
1217
1218 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1219 let context_timestamp = self
1220 .current_context
1221 .as_ref()
1222 .map(|c| c.timestamp())
1223 .unwrap_or_else(|| {
1224 std::time::SystemTime::now()
1225 .duration_since(std::time::UNIX_EPOCH)
1226 .unwrap()
1227 .as_secs() as i64
1228 });
1229 let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1230
1231 if eval_result.is_ok() {
1232 let mut changed_fields = Vec::new();
1233 for (path, old_value) in
1234 entity_bytecode.computed_paths.iter().zip(old_values.iter())
1235 {
1236 let new_value = Self::get_value_at_path(&entity_state, path);
1237 let changed = new_value != *old_value;
1238 let will_emit = should_emit(path);
1239
1240 if changed && will_emit {
1241 dirty_tracker.mark_replaced(path);
1242 changed_fields.push(path.clone());
1243 }
1244 }
1245 }
1246 }
1247
1248 state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1249
1250 if dirty_tracker.is_empty() {
1251 continue;
1252 }
1253
1254 let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1255
1256 mutations.push(Mutation {
1257 export: target.entity_name.clone(),
1258 key: target.primary_key.clone(),
1259 patch,
1260 append: vec![],
1261 });
1262 }
1263
1264 Ok(mutations)
1265 }
1266
1267 pub fn enqueue_resolver_request(
1268 &mut self,
1269 _cache_key: String,
1270 resolver: ResolverType,
1271 input: Value,
1272 target: ResolverTarget,
1273 ) {
1274 let cache_key = resolver_cache_key(&resolver, &input);
1275
1276 if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1277 entry.add_target(target);
1278 return;
1279 }
1280
1281 let queued_at = std::time::SystemTime::now()
1282 .duration_since(std::time::UNIX_EPOCH)
1283 .unwrap()
1284 .as_secs() as i64;
1285
1286 self.resolver_pending.insert(
1287 cache_key.clone(),
1288 PendingResolverEntry {
1289 resolver: resolver.clone(),
1290 input: input.clone(),
1291 targets: vec![target],
1292 queued_at,
1293 },
1294 );
1295
1296 self.resolver_requests.push_back(ResolverRequest {
1297 cache_key,
1298 resolver,
1299 input,
1300 });
1301 }
1302
1303 fn apply_resolver_extractions_to_value<F>(
1304 state: &mut Value,
1305 resolved_value: &Value,
1306 extracts: &[ResolverExtractSpec],
1307 dirty_tracker: &mut DirtyTracker,
1308 should_emit: &F,
1309 ) -> Result<()>
1310 where
1311 F: Fn(&str) -> bool,
1312 {
1313 for extract in extracts {
1314 let resolved = match extract.source_path.as_deref() {
1315 Some(path) => Self::get_value_at_path(resolved_value, path),
1316 None => Some(resolved_value.clone()),
1317 };
1318
1319 let Some(value) = resolved else {
1320 continue;
1321 };
1322
1323 let value = if let Some(transform) = &extract.transform {
1324 Self::apply_transformation(&value, transform)?
1325 } else {
1326 value
1327 };
1328
1329 Self::set_nested_field_value(state, &extract.target_path, value)?;
1330 if should_emit(&extract.target_path) {
1331 dirty_tracker.mark_replaced(&extract.target_path);
1332 }
1333 }
1334
1335 Ok(())
1336 }
1337
1338 fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1339 if tracker.is_empty() {
1340 return Ok(json!({}));
1341 }
1342
1343 let mut partial = serde_json::Map::new();
1344
1345 for (path, change) in tracker.iter() {
1346 let segments: Vec<&str> = path.split('.').collect();
1347
1348 let value_to_insert = match change {
1349 FieldChange::Replaced => {
1350 let mut current = state;
1351 let mut found = true;
1352
1353 for segment in &segments {
1354 match current.get(*segment) {
1355 Some(v) => current = v,
1356 None => {
1357 found = false;
1358 break;
1359 }
1360 }
1361 }
1362
1363 if !found {
1364 continue;
1365 }
1366 current.clone()
1367 }
1368 FieldChange::Appended(values) => Value::Array(values.clone()),
1369 };
1370
1371 let mut target = &mut partial;
1372 for (i, segment) in segments.iter().enumerate() {
1373 if i == segments.len() - 1 {
1374 target.insert(segment.to_string(), value_to_insert.clone());
1375 } else {
1376 target
1377 .entry(segment.to_string())
1378 .or_insert_with(|| json!({}));
1379 target = target
1380 .get_mut(*segment)
1381 .and_then(|v| v.as_object_mut())
1382 .ok_or("Failed to build nested structure")?;
1383 }
1384 }
1385 }
1386
1387 Ok(Value::Object(partial))
1388 }
1389
/// Returns a mutable reference to the state table stored under `state_id`,
/// or `None` when no table exists for that id.
pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
    self.states.get_mut(&state_id)
}
1395
/// Returns mutable access to the VM's register vector.
pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
    &mut self.registers
}
1400
/// Returns the cache of compiled paths, keyed by the path string.
pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
    &self.path_cache
}
1405
/// Returns the update context currently stored on the VM, if any.
pub fn current_context(&self) -> Option<&UpdateContext> {
    self.current_context.as_ref()
}
1410
/// Replaces the update context stored on the VM; pass `None` to clear it.
pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
    self.current_context = context;
}
1415
/// Appends `msg` to the list of accumulated warnings.
fn add_warning(&mut self, msg: String) {
    self.warnings.push(msg);
}
1419
/// Returns all accumulated warnings and leaves the VM's warning list empty.
pub fn take_warnings(&mut self) -> Vec<String> {
    std::mem::take(&mut self.warnings)
}
1423
/// Returns `true` when at least one warning has been accumulated.
pub fn has_warnings(&self) -> bool {
    !self.warnings.is_empty()
}
1427
1428 pub fn update_state_from_register(
1429 &mut self,
1430 state_id: u32,
1431 key: Value,
1432 register: Register,
1433 ) -> Result<()> {
1434 let state = self.states.get(&state_id).ok_or("State table not found")?;
1435 let value = self.registers[register].clone();
1436 state.insert_with_eviction(key, value);
1437 Ok(())
1438 }
1439
1440 fn reset_registers(&mut self) {
1441 for reg in &mut self.registers {
1442 *reg = Value::Null;
1443 }
1444 }
1445
1446 pub fn extract_partial_state(
1448 &self,
1449 state_reg: Register,
1450 dirty_fields: &HashSet<String>,
1451 ) -> Result<Value> {
1452 let full_state = &self.registers[state_reg];
1453
1454 if dirty_fields.is_empty() {
1455 return Ok(json!({}));
1456 }
1457
1458 let mut partial = serde_json::Map::new();
1459
1460 for path in dirty_fields {
1461 let segments: Vec<&str> = path.split('.').collect();
1462
1463 let mut current = full_state;
1464 let mut found = true;
1465
1466 for segment in &segments {
1467 match current.get(segment) {
1468 Some(v) => current = v,
1469 None => {
1470 found = false;
1471 break;
1472 }
1473 }
1474 }
1475
1476 if !found {
1477 continue;
1478 }
1479
1480 let mut target = &mut partial;
1481 for (i, segment) in segments.iter().enumerate() {
1482 if i == segments.len() - 1 {
1483 target.insert(segment.to_string(), current.clone());
1484 } else {
1485 target
1486 .entry(segment.to_string())
1487 .or_insert_with(|| json!({}));
1488 target = target
1489 .get_mut(*segment)
1490 .and_then(|v| v.as_object_mut())
1491 .ok_or("Failed to build nested structure")?;
1492 }
1493 }
1494 }
1495
1496 Ok(Value::Object(partial))
1497 }
1498
1499 pub fn extract_partial_state_with_tracker(
1503 &self,
1504 state_reg: Register,
1505 tracker: &DirtyTracker,
1506 ) -> Result<Value> {
1507 let full_state = &self.registers[state_reg];
1508
1509 if tracker.is_empty() {
1510 return Ok(json!({}));
1511 }
1512
1513 let mut partial = serde_json::Map::new();
1514
1515 for (path, change) in tracker.iter() {
1516 let segments: Vec<&str> = path.split('.').collect();
1517
1518 let value_to_insert = match change {
1519 FieldChange::Replaced => {
1520 let mut current = full_state;
1521 let mut found = true;
1522
1523 for segment in &segments {
1524 match current.get(*segment) {
1525 Some(v) => current = v,
1526 None => {
1527 found = false;
1528 break;
1529 }
1530 }
1531 }
1532
1533 if !found {
1534 continue;
1535 }
1536 current.clone()
1537 }
1538 FieldChange::Appended(values) => Value::Array(values.clone()),
1539 };
1540
1541 let mut target = &mut partial;
1542 for (i, segment) in segments.iter().enumerate() {
1543 if i == segments.len() - 1 {
1544 target.insert(segment.to_string(), value_to_insert.clone());
1545 } else {
1546 target
1547 .entry(segment.to_string())
1548 .or_insert_with(|| json!({}));
1549 target = target
1550 .get_mut(*segment)
1551 .and_then(|v| v.as_object_mut())
1552 .ok_or("Failed to build nested structure")?;
1553 }
1554 }
1555 }
1556
1557 Ok(Value::Object(partial))
1558 }
1559
1560 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1561 if let Some(compiled) = self.path_cache.get(path) {
1562 self.cache_hits += 1;
1563 #[cfg(feature = "otel")]
1564 crate::vm_metrics::record_path_cache_hit();
1565 return compiled.clone();
1566 }
1567 #[cfg(feature = "otel")]
1568 crate::vm_metrics::record_path_cache_miss();
1569 let compiled = CompiledPath::new(path);
1570 self.path_cache.insert(path.to_string(), compiled.clone());
1571 compiled
1572 }
1573
1574 #[cfg_attr(feature = "otel", instrument(
1576 name = "vm.process_event",
1577 skip(self, bytecode, event_value, log),
1578 level = "info",
1579 fields(
1580 event_type = %event_type,
1581 slot = context.as_ref().and_then(|c| c.slot),
1582 )
1583 ))]
1584 pub fn process_event(
1585 &mut self,
1586 bytecode: &MultiEntityBytecode,
1587 event_value: Value,
1588 event_type: &str,
1589 context: Option<&UpdateContext>,
1590 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1591 ) -> Result<Vec<Mutation>> {
1592 self.current_context = context.cloned();
1593
1594 let mut event_value = event_value;
1595 if let Some(ctx) = context {
1596 if let Some(obj) = event_value.as_object_mut() {
1597 obj.insert("__update_context".to_string(), ctx.to_value());
1598 }
1599 }
1600
1601 let mut all_mutations = Vec::new();
1602
1603 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1604 if let Some(ctx) = context {
1605 if let Some(signature) = ctx.signature.clone() {
1606 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1607 for state_id in state_ids {
1608 if let Some(state) = self.states.get(&state_id) {
1609 {
1610 let mut cache = state.recent_tx_instructions.lock().unwrap();
1611 let entry =
1612 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1613 entry.insert(event_type.to_string());
1614 }
1615
1616 let key = (signature.clone(), event_type.to_string());
1617 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1618 tracing::debug!(
1619 event_type = %event_type,
1620 signature = %signature,
1621 deferred_count = deferred_ops.len(),
1622 "flushing deferred when-ops"
1623 );
1624 for op in deferred_ops {
1625 let (evaluator, computed_paths) = bytecode
1627 .entities
1628 .get(&op.entity_name)
1629 .map(|eb| {
1630 (
1631 eb.computed_fields_evaluator.as_ref(),
1632 eb.computed_paths.as_slice(),
1633 )
1634 })
1635 .unwrap_or((None, &[]));
1636
1637 match self.apply_deferred_when_op(
1638 state_id,
1639 &op,
1640 evaluator,
1641 Some(computed_paths),
1642 ) {
1643 Ok(mutations) => all_mutations.extend(mutations),
1644 Err(e) => tracing::warn!(
1645 "Failed to apply deferred when-op: {}",
1646 e
1647 ),
1648 }
1649 }
1650 }
1651 }
1652 }
1653 }
1654 }
1655 }
1656
1657 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1658 for entity_name in entity_names {
1659 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1660 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1661 if let Some(ref mut log) = log {
1662 log.set("entity", entity_name.clone());
1663 log.inc("handlers", 1);
1664 }
1665
1666 let opcodes_before = self.instructions_executed;
1667 let cache_before = self.cache_hits;
1668 let pda_hits_before = self.pda_cache_hits;
1669 let pda_misses_before = self.pda_cache_misses;
1670
1671 let mutations = self.execute_handler(
1672 handler,
1673 &event_value,
1674 event_type,
1675 entity_bytecode.state_id,
1676 entity_name,
1677 entity_bytecode.computed_fields_evaluator.as_ref(),
1678 Some(&entity_bytecode.non_emitted_fields),
1679 )?;
1680
1681 if let Some(ref mut log) = log {
1682 log.inc(
1683 "opcodes",
1684 (self.instructions_executed - opcodes_before) as i64,
1685 );
1686 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1687 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1688 log.inc(
1689 "pda_misses",
1690 (self.pda_cache_misses - pda_misses_before) as i64,
1691 );
1692 }
1693
1694 if mutations.is_empty() {
1695 let is_tx_event =
1698 event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1699 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1700 if is_tx_event {
1701 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1702 let signature = context
1703 .and_then(|c| c.signature.clone())
1704 .unwrap_or_default();
1705 let _ = self.queue_instruction_event(
1706 entity_bytecode.state_id,
1707 QueuedInstructionEvent {
1708 pda_address: missed_pda,
1709 event_type: event_type.to_string(),
1710 event_data: event_value.clone(),
1711 slot,
1712 signature,
1713 },
1714 );
1715 } else {
1716 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1721 let signature = context
1722 .and_then(|c| c.signature.clone())
1723 .unwrap_or_default();
1724 if let Some(write_version) =
1725 context.and_then(|c| c.write_version)
1726 {
1727 let _ = self.queue_account_update(
1728 entity_bytecode.state_id,
1729 QueuedAccountUpdate {
1730 pda_address: missed_pda,
1731 account_type: event_type.to_string(),
1732 account_data: event_value.clone(),
1733 slot,
1734 write_version,
1735 signature,
1736 },
1737 );
1738 } else {
1739 tracing::warn!(
1740 event_type = %event_type,
1741 "Dropping queued account update: write_version missing from context"
1742 );
1743 }
1744 }
1745 }
1746 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1747 if !is_tx_event {
1748 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1749 let signature = context
1750 .and_then(|c| c.signature.clone())
1751 .unwrap_or_default();
1752 if let Some(write_version) =
1753 context.and_then(|c| c.write_version)
1754 {
1755 let _ = self.queue_account_update(
1756 entity_bytecode.state_id,
1757 QueuedAccountUpdate {
1758 pda_address: missed_lookup,
1759 account_type: event_type.to_string(),
1760 account_data: event_value.clone(),
1761 slot,
1762 write_version,
1763 signature,
1764 },
1765 );
1766 } else {
1767 tracing::trace!(
1768 event_type = %event_type,
1769 "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1770 );
1771 }
1772 }
1773 }
1774 }
1775
1776 all_mutations.extend(mutations);
1777
1778 if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1779 if let Some(ctx) = context {
1780 if let Some(ref signature) = ctx.signature {
1781 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1782 {
1783 {
1784 let mut cache =
1785 state.recent_tx_instructions.lock().unwrap();
1786 let entry = cache
1787 .get_or_insert_mut(signature.clone(), HashSet::new);
1788 entry.insert(event_type.to_string());
1789 }
1790
1791 let key = (signature.clone(), event_type.to_string());
1792 if let Some((_, deferred_ops)) =
1793 state.deferred_when_ops.remove(&key)
1794 {
1795 tracing::debug!(
1796 event_type = %event_type,
1797 signature = %signature,
1798 deferred_count = deferred_ops.len(),
1799 "flushing deferred when-ops"
1800 );
1801 for op in deferred_ops {
1802 match self.apply_deferred_when_op(
1803 entity_bytecode.state_id,
1804 &op,
1805 entity_bytecode
1806 .computed_fields_evaluator
1807 .as_ref(),
1808 Some(&entity_bytecode.computed_paths),
1809 ) {
1810 Ok(mutations) => {
1811 all_mutations.extend(mutations)
1812 }
1813 Err(e) => {
1814 tracing::warn!(
1815 "Failed to apply deferred when-op: {}",
1816 e
1817 );
1818 }
1819 }
1820 }
1821 }
1822 }
1823 }
1824 }
1825 }
1826
1827 if let Some(registered_pda) = self.take_last_pda_registered() {
1828 let pending_events = self.flush_pending_instruction_events(
1829 entity_bytecode.state_id,
1830 ®istered_pda,
1831 );
1832 for pending in pending_events {
1833 if let Some(pending_handler) =
1834 entity_bytecode.handlers.get(&pending.event_type)
1835 {
1836 if let Ok(reprocessed_mutations) = self.execute_handler(
1837 pending_handler,
1838 &pending.event_data,
1839 &pending.event_type,
1840 entity_bytecode.state_id,
1841 entity_name,
1842 entity_bytecode.computed_fields_evaluator.as_ref(),
1843 Some(&entity_bytecode.non_emitted_fields),
1844 ) {
1845 all_mutations.extend(reprocessed_mutations);
1846 }
1847 }
1848 }
1849 }
1850
1851 let lookup_keys = self.take_last_lookup_index_keys();
1852 if !lookup_keys.is_empty() {
1853 tracing::info!(
1854 keys = ?lookup_keys,
1855 entity = %entity_name,
1856 "vm.process_event: flushing pending updates for lookup_keys"
1857 );
1858 }
1859 for lookup_key in lookup_keys {
1860 if let Ok(pending_updates) =
1861 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1862 {
1863 for pending in pending_updates {
1864 if let Some(pending_handler) =
1865 entity_bytecode.handlers.get(&pending.account_type)
1866 {
1867 self.current_context = Some(UpdateContext::new_account(
1868 pending.slot,
1869 pending.signature.clone(),
1870 pending.write_version,
1871 ));
1872 match self.execute_handler(
1873 pending_handler,
1874 &pending.account_data,
1875 &pending.account_type,
1876 entity_bytecode.state_id,
1877 entity_name,
1878 entity_bytecode.computed_fields_evaluator.as_ref(),
1879 Some(&entity_bytecode.non_emitted_fields),
1880 ) {
1881 Ok(reprocessed) => {
1882 all_mutations.extend(reprocessed);
1883 }
1884 Err(e) => {
1885 tracing::warn!(
1886 error = %e,
1887 account_type = %pending.account_type,
1888 "Flushed event reprocessing failed"
1889 );
1890 }
1891 }
1892 }
1893 }
1894 }
1895 }
1896 } else if let Some(ref mut log) = log {
1897 log.set("skip_reason", "no_handler");
1898 }
1899 } else if let Some(ref mut log) = log {
1900 log.set("skip_reason", "entity_not_found");
1901 }
1902 }
1903 } else if let Some(ref mut log) = log {
1904 log.set("skip_reason", "no_event_routing");
1905 }
1906
1907 if let Some(log) = log {
1908 log.set("mutations", all_mutations.len() as i64);
1909 if let Some(first) = all_mutations.first() {
1910 if let Some(key_str) = first.key.as_str() {
1911 log.set("primary_key", key_str);
1912 } else if let Some(key_num) = first.key.as_u64() {
1913 log.set("primary_key", key_num as i64);
1914 }
1915 }
1916 if let Some(state) = self.states.get(&0) {
1917 log.set("state_table_size", state.data.len() as i64);
1918 }
1919
1920 let warnings = self.take_warnings();
1921 if !warnings.is_empty() {
1922 log.set("warnings", warnings.len() as i64);
1923 log.set(
1924 "warning_messages",
1925 Value::Array(warnings.into_iter().map(Value::String).collect()),
1926 );
1927 log.set_level(crate::canonical_log::LogLevel::Warn);
1928 }
1929 } else {
1930 self.warnings.clear();
1931 }
1932
1933 if self.instructions_executed.is_multiple_of(1000) {
1934 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1935 for state_id in state_ids {
1936 let expired = self.cleanup_expired_when_ops(state_id, 60);
1937 if expired > 0 {
1938 tracing::debug!(
1939 "Cleaned up {} expired deferred when-ops for state {}",
1940 expired,
1941 state_id
1942 );
1943 }
1944 }
1945 }
1946
1947 Ok(all_mutations)
1948 }
1949
1950 pub fn process_any(
1951 &mut self,
1952 bytecode: &MultiEntityBytecode,
1953 any: prost_types::Any,
1954 ) -> Result<Vec<Mutation>> {
1955 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1956 self.process_event(bytecode, event_value, &event_type, None, None)
1957 }
1958
1959 #[cfg_attr(feature = "otel", instrument(
1960 name = "vm.execute_handler",
1961 skip(self, handler, event_value, entity_evaluator),
1962 level = "debug",
1963 fields(
1964 event_type = %event_type,
1965 handler_opcodes = handler.len(),
1966 )
1967 ))]
1968 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1969 fn execute_handler(
1970 &mut self,
1971 handler: &[OpCode],
1972 event_value: &Value,
1973 event_type: &str,
1974 override_state_id: u32,
1975 entity_name: &str,
1976 entity_evaluator: Option<
1977 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1978 >,
1979 non_emitted_fields: Option<&HashSet<String>>,
1980 ) -> Result<Vec<Mutation>> {
1981 self.reset_registers();
1982 self.last_pda_lookup_miss = None;
1983
1984 let mut pc: usize = 0;
1985 let mut output = Vec::new();
1986 let mut dirty_tracker = DirtyTracker::new();
1987 let should_emit = |path: &str| {
1988 non_emitted_fields
1989 .map(|fields| !fields.contains(path))
1990 .unwrap_or(true)
1991 };
1992
1993 while pc < handler.len() {
1994 match &handler[pc] {
1995 OpCode::LoadEventField {
1996 path,
1997 dest,
1998 default,
1999 } => {
2000 let value = self.load_field(event_value, path, default.as_ref())?;
2001 self.registers[*dest] = value;
2002 pc += 1;
2003 }
2004 OpCode::LoadConstant { value, dest } => {
2005 self.registers[*dest] = value.clone();
2006 pc += 1;
2007 }
2008 OpCode::CopyRegister { source, dest } => {
2009 self.registers[*dest] = self.registers[*source].clone();
2010 pc += 1;
2011 }
2012 OpCode::CopyRegisterIfNull { source, dest } => {
2013 if self.registers[*dest].is_null() {
2014 self.registers[*dest] = self.registers[*source].clone();
2015 }
2016 pc += 1;
2017 }
2018 OpCode::GetEventType { dest } => {
2019 self.registers[*dest] = json!(event_type);
2020 pc += 1;
2021 }
2022 OpCode::CreateObject { dest } => {
2023 self.registers[*dest] = json!({});
2024 pc += 1;
2025 }
2026 OpCode::SetField {
2027 object,
2028 path,
2029 value,
2030 } => {
2031 self.set_field_auto_vivify(*object, path, *value)?;
2032 if should_emit(path) {
2033 dirty_tracker.mark_replaced(path);
2034 }
2035 pc += 1;
2036 }
2037 OpCode::SetFields { object, fields } => {
2038 for (path, value_reg) in fields {
2039 self.set_field_auto_vivify(*object, path, *value_reg)?;
2040 if should_emit(path) {
2041 dirty_tracker.mark_replaced(path);
2042 }
2043 }
2044 pc += 1;
2045 }
2046 OpCode::GetField { object, path, dest } => {
2047 let value = self.get_field(*object, path)?;
2048 self.registers[*dest] = value;
2049 pc += 1;
2050 }
2051 OpCode::AbortIfNullKey {
2052 key,
2053 is_account_event,
2054 } => {
2055 let key_value = &self.registers[*key];
2056 if key_value.is_null() && *is_account_event {
2057 tracing::debug!(
2058 event_type = %event_type,
2059 "AbortIfNullKey: key is null for account state event, \
2060 returning empty mutations for queueing"
2061 );
2062 return Ok(Vec::new());
2063 }
2064
2065 pc += 1;
2066 }
2067 OpCode::ReadOrInitState {
2068 state_id: _,
2069 key,
2070 default,
2071 dest,
2072 } => {
2073 let actual_state_id = override_state_id;
2074 let entity_name_owned = entity_name.to_string();
2075 self.states
2076 .entry(actual_state_id)
2077 .or_insert_with(|| StateTable {
2078 data: DashMap::new(),
2079 access_times: DashMap::new(),
2080 lookup_indexes: HashMap::new(),
2081 temporal_indexes: HashMap::new(),
2082 pda_reverse_lookups: HashMap::new(),
2083 pending_updates: DashMap::new(),
2084 pending_instruction_events: DashMap::new(),
2085 last_account_data: DashMap::new(),
2086 version_tracker: VersionTracker::new(),
2087 instruction_dedup_cache: VersionTracker::with_capacity(
2088 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2089 ),
2090 config: StateTableConfig::default(),
2091 entity_name: entity_name_owned,
2092 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2093 NonZeroUsize::new(1000).unwrap(),
2094 )),
2095 deferred_when_ops: DashMap::new(),
2096 });
2097 let key_value = self.registers[*key].clone();
2098 let warn_null_key = key_value.is_null()
2100 && event_type.ends_with("State")
2101 && !event_type.ends_with("IxState")
2102 && !event_type.ends_with("CpiEvent");
2103
2104 if warn_null_key {
2105 self.add_warning(format!(
2106 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2107 key, event_type
2108 ));
2109 }
2110
2111 let state = self
2112 .states
2113 .get(&actual_state_id)
2114 .ok_or("State table not found")?;
2115
2116 if !key_value.is_null() {
2117 if let Some(ctx) = &self.current_context {
2118 if ctx.is_account_update() {
2123 if let (Some(slot), Some(write_version)) =
2124 (ctx.slot, ctx.write_version)
2125 {
2126 let account_address = event_value
2128 .get("__account_address")
2129 .cloned()
2130 .unwrap_or_else(|| key_value.clone());
2131
2132 if !state.is_fresh_update(
2133 &account_address,
2134 event_type,
2135 slot,
2136 write_version,
2137 ) {
2138 self.add_warning(format!(
2139 "Stale account update skipped: slot={}, write_version={}, account={}",
2140 slot, write_version, account_address
2141 ));
2142 return Ok(Vec::new());
2143 }
2144 }
2145 }
2146 else if ctx.is_instruction_update() {
2148 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2149 if state.is_duplicate_instruction(
2150 &key_value, event_type, slot, txn_index,
2151 ) {
2152 self.add_warning(format!(
2153 "Duplicate instruction skipped: slot={}, txn_index={}",
2154 slot, txn_index
2155 ));
2156 return Ok(Vec::new());
2157 }
2158 }
2159 }
2160 }
2161 }
2162 let value = state
2163 .get_and_touch(&key_value)
2164 .unwrap_or_else(|| default.clone());
2165
2166 self.registers[*dest] = value;
2167 pc += 1;
2168 }
2169 OpCode::UpdateState {
2170 state_id: _,
2171 key,
2172 value,
2173 } => {
2174 let actual_state_id = override_state_id;
2175 let state = self
2176 .states
2177 .get(&actual_state_id)
2178 .ok_or("State table not found")?;
2179 let key_value = self.registers[*key].clone();
2180 let value_data = self.registers[*value].clone();
2181
2182 state.insert_with_eviction(key_value, value_data);
2183 pc += 1;
2184 }
2185 OpCode::AppendToArray {
2186 object,
2187 path,
2188 value,
2189 } => {
2190 let appended_value = self.registers[*value].clone();
2191 let max_len = self
2192 .states
2193 .get(&override_state_id)
2194 .map(|s| s.max_array_length())
2195 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2196 self.append_to_array(*object, path, *value, max_len)?;
2197 if should_emit(path) {
2198 dirty_tracker.mark_appended(path, appended_value);
2199 }
2200 pc += 1;
2201 }
2202 OpCode::GetCurrentTimestamp { dest } => {
2203 let timestamp = std::time::SystemTime::now()
2204 .duration_since(std::time::UNIX_EPOCH)
2205 .unwrap()
2206 .as_secs() as i64;
2207 self.registers[*dest] = json!(timestamp);
2208 pc += 1;
2209 }
2210 OpCode::CreateEvent { dest, event_value } => {
2211 let timestamp = std::time::SystemTime::now()
2212 .duration_since(std::time::UNIX_EPOCH)
2213 .unwrap()
2214 .as_secs() as i64;
2215
2216 let mut event_data = self.registers[*event_value].clone();
2218 if let Some(obj) = event_data.as_object_mut() {
2219 obj.remove("__update_context");
2220 }
2221
2222 let mut event = serde_json::Map::new();
2224 event.insert("timestamp".to_string(), json!(timestamp));
2225 event.insert("data".to_string(), event_data);
2226
2227 if let Some(ref ctx) = self.current_context {
2229 if let Some(slot) = ctx.slot {
2230 event.insert("slot".to_string(), json!(slot));
2231 }
2232 if let Some(ref signature) = ctx.signature {
2233 event.insert("signature".to_string(), json!(signature));
2234 }
2235 }
2236
2237 self.registers[*dest] = Value::Object(event);
2238 pc += 1;
2239 }
2240 OpCode::CreateCapture {
2241 dest,
2242 capture_value,
2243 } => {
2244 let timestamp = std::time::SystemTime::now()
2245 .duration_since(std::time::UNIX_EPOCH)
2246 .unwrap()
2247 .as_secs() as i64;
2248
2249 let capture_data = self.registers[*capture_value].clone();
2251
2252 let account_address = event_value
2254 .get("__account_address")
2255 .and_then(|v| v.as_str())
2256 .unwrap_or("")
2257 .to_string();
2258
2259 let mut capture = serde_json::Map::new();
2261 capture.insert("timestamp".to_string(), json!(timestamp));
2262 capture.insert("account_address".to_string(), json!(account_address));
2263 capture.insert("data".to_string(), capture_data);
2264
2265 if let Some(ref ctx) = self.current_context {
2267 if let Some(slot) = ctx.slot {
2268 capture.insert("slot".to_string(), json!(slot));
2269 }
2270 if let Some(ref signature) = ctx.signature {
2271 capture.insert("signature".to_string(), json!(signature));
2272 }
2273 }
2274
2275 self.registers[*dest] = Value::Object(capture);
2276 pc += 1;
2277 }
2278 OpCode::Transform {
2279 source,
2280 dest,
2281 transformation,
2282 } => {
2283 if source == dest {
2284 self.transform_in_place(*source, transformation)?;
2285 } else {
2286 let source_value = &self.registers[*source];
2287 let value = Self::apply_transformation(source_value, transformation)?;
2288 self.registers[*dest] = value;
2289 }
2290 pc += 1;
2291 }
2292 OpCode::EmitMutation {
2293 entity_name,
2294 key,
2295 state,
2296 } => {
2297 let primary_key = self.registers[*key].clone();
2298
2299 if primary_key.is_null() || dirty_tracker.is_empty() {
2300 let reason = if dirty_tracker.is_empty() {
2301 "no_fields_modified"
2302 } else {
2303 "null_primary_key"
2304 };
2305 self.add_warning(format!(
2306 "Skipping mutation for entity '{}': {} (dirty_fields={})",
2307 entity_name,
2308 reason,
2309 dirty_tracker.len()
2310 ));
2311 } else {
2312 let patch =
2313 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2314
2315 let append = dirty_tracker.appended_paths();
2316 let mutation = Mutation {
2317 export: entity_name.clone(),
2318 key: primary_key,
2319 patch,
2320 append,
2321 };
2322 output.push(mutation);
2323 }
2324 pc += 1;
2325 }
2326 OpCode::SetFieldIfNull {
2327 object,
2328 path,
2329 value,
2330 } => {
2331 let was_set = self.set_field_if_null(*object, path, *value)?;
2332 if was_set && should_emit(path) {
2333 dirty_tracker.mark_replaced(path);
2334 }
2335 pc += 1;
2336 }
2337 OpCode::SetFieldMax {
2338 object,
2339 path,
2340 value,
2341 } => {
2342 let was_updated = self.set_field_max(*object, path, *value)?;
2343 if was_updated && should_emit(path) {
2344 dirty_tracker.mark_replaced(path);
2345 }
2346 pc += 1;
2347 }
2348 OpCode::UpdateTemporalIndex {
2349 state_id: _,
2350 index_name,
2351 lookup_value,
2352 primary_key,
2353 timestamp,
2354 } => {
2355 let actual_state_id = override_state_id;
2356 let state = self
2357 .states
2358 .get_mut(&actual_state_id)
2359 .ok_or("State table not found")?;
2360 let index = state
2361 .temporal_indexes
2362 .entry(index_name.clone())
2363 .or_insert_with(TemporalIndex::new);
2364
2365 let lookup_val = self.registers[*lookup_value].clone();
2366 let pk_val = self.registers[*primary_key].clone();
2367 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2368 val
2369 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2370 val as i64
2371 } else {
2372 return Err(format!(
2373 "Timestamp must be a number (i64 or u64), got: {:?}",
2374 self.registers[*timestamp]
2375 )
2376 .into());
2377 };
2378
2379 index.insert(lookup_val, pk_val, ts_val);
2380 pc += 1;
2381 }
2382 OpCode::LookupTemporalIndex {
2383 state_id: _,
2384 index_name,
2385 lookup_value,
2386 timestamp,
2387 dest,
2388 } => {
2389 let actual_state_id = override_state_id;
2390 let state = self
2391 .states
2392 .get(&actual_state_id)
2393 .ok_or("State table not found")?;
2394 let lookup_val = &self.registers[*lookup_value];
2395
2396 let result = if self.registers[*timestamp].is_null() {
2397 if let Some(index) = state.temporal_indexes.get(index_name) {
2398 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2399 } else {
2400 Value::Null
2401 }
2402 } else {
2403 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2404 val
2405 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2406 val as i64
2407 } else {
2408 return Err(format!(
2409 "Timestamp must be a number (i64 or u64), got: {:?}",
2410 self.registers[*timestamp]
2411 )
2412 .into());
2413 };
2414
2415 if let Some(index) = state.temporal_indexes.get(index_name) {
2416 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2417 } else {
2418 Value::Null
2419 }
2420 };
2421
2422 self.registers[*dest] = result;
2423 pc += 1;
2424 }
2425 OpCode::UpdateLookupIndex {
2426 state_id: _,
2427 index_name,
2428 lookup_value,
2429 primary_key,
2430 } => {
2431 let actual_state_id = override_state_id;
2432 let state = self
2433 .states
2434 .get_mut(&actual_state_id)
2435 .ok_or("State table not found")?;
2436 let index = state
2437 .lookup_indexes
2438 .entry(index_name.clone())
2439 .or_insert_with(LookupIndex::new);
2440
2441 let lookup_val = self.registers[*lookup_value].clone();
2442 let pk_val = self.registers[*primary_key].clone();
2443
2444 index.insert(lookup_val.clone(), pk_val);
2445
2446 if let Some(key_str) = lookup_val.as_str() {
2448 self.last_lookup_index_keys.push(key_str.to_string());
2449 }
2450
2451 pc += 1;
2452 }
2453 OpCode::LookupIndex {
2454 state_id: _,
2455 index_name,
2456 lookup_value,
2457 dest,
2458 } => {
2459 let actual_state_id = override_state_id;
2460 let mut current_value = self.registers[*lookup_value].clone();
2461
2462 const MAX_CHAIN_DEPTH: usize = 5;
2463 let mut iterations = 0;
2464
2465 let final_result = if self.states.contains_key(&actual_state_id) {
2466 loop {
2467 iterations += 1;
2468 if iterations > MAX_CHAIN_DEPTH {
2469 break current_value;
2470 }
2471
2472 let resolved = self
2473 .states
2474 .get(&actual_state_id)
2475 .and_then(|state| {
2476 if let Some(index) = state.lookup_indexes.get(index_name) {
2477 if let Some(found) = index.lookup(¤t_value) {
2478 return Some(found);
2479 }
2480 }
2481
2482 for (name, index) in state.lookup_indexes.iter() {
2483 if name == index_name {
2484 continue;
2485 }
2486 if let Some(found) = index.lookup(¤t_value) {
2487 return Some(found);
2488 }
2489 }
2490
2491 None
2492 })
2493 .unwrap_or(Value::Null);
2494
2495 let mut resolved_from_pda = false;
2496 let resolved = if resolved.is_null() {
2497 if let Some(pda_str) = current_value.as_str() {
2498 resolved_from_pda = true;
2499 self.states
2500 .get_mut(&actual_state_id)
2501 .and_then(|state_mut| {
2502 state_mut
2503 .pda_reverse_lookups
2504 .get_mut("default_pda_lookup")
2505 })
2506 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2507 .map(Value::String)
2508 .unwrap_or(Value::Null)
2509 } else {
2510 Value::Null
2511 }
2512 } else {
2513 resolved
2514 };
2515
2516 if resolved.is_null() {
2517 if iterations == 1 {
2518 if let Some(pda_str) = current_value.as_str() {
2519 self.last_pda_lookup_miss = Some(pda_str.to_string());
2520 }
2521 }
2522 break Value::Null;
2523 }
2524
2525 let can_chain =
2526 self.can_resolve_further(&resolved, actual_state_id, index_name);
2527
2528 if !can_chain {
2529 if resolved_from_pda {
2530 if let Some(resolved_str) = resolved.as_str() {
2531 self.last_lookup_index_miss =
2532 Some(resolved_str.to_string());
2533 }
2534 break Value::Null;
2535 }
2536 break resolved;
2537 }
2538
2539 current_value = resolved;
2540 }
2541 } else {
2542 Value::Null
2543 };
2544
2545 self.registers[*dest] = final_result;
2546 pc += 1;
2547 }
2548 OpCode::SetFieldSum {
2549 object,
2550 path,
2551 value,
2552 } => {
2553 let was_updated = self.set_field_sum(*object, path, *value)?;
2554 if was_updated && should_emit(path) {
2555 dirty_tracker.mark_replaced(path);
2556 }
2557 pc += 1;
2558 }
2559 OpCode::SetFieldIncrement { object, path } => {
2560 let was_updated = self.set_field_increment(*object, path)?;
2561 if was_updated && should_emit(path) {
2562 dirty_tracker.mark_replaced(path);
2563 }
2564 pc += 1;
2565 }
2566 OpCode::SetFieldMin {
2567 object,
2568 path,
2569 value,
2570 } => {
2571 let was_updated = self.set_field_min(*object, path, *value)?;
2572 if was_updated && should_emit(path) {
2573 dirty_tracker.mark_replaced(path);
2574 }
2575 pc += 1;
2576 }
2577 OpCode::AddToUniqueSet {
2578 state_id: _,
2579 set_name,
2580 value,
2581 count_object,
2582 count_path,
2583 } => {
2584 let value_to_add = self.registers[*value].clone();
2585
2586 let set_field_path = format!("__unique_set:{}", set_name);
2589
2590 let mut set: HashSet<Value> =
2592 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2593 if !existing.is_null() {
2594 serde_json::from_value(existing).unwrap_or_default()
2595 } else {
2596 HashSet::new()
2597 }
2598 } else {
2599 HashSet::new()
2600 };
2601
2602 let was_new = set.insert(value_to_add);
2604
2605 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2607 self.registers[100] = serde_json::to_value(set_as_vec)?;
2608 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2609
2610 if was_new {
2612 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2613 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2614 if should_emit(count_path) {
2615 dirty_tracker.mark_replaced(count_path);
2616 }
2617 }
2618
2619 pc += 1;
2620 }
2621 OpCode::ConditionalSetField {
2622 object,
2623 path,
2624 value,
2625 condition_field,
2626 condition_op,
2627 condition_value,
2628 } => {
2629 let field_value = self.load_field(event_value, condition_field, None)?;
2630 let condition_met =
2631 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2632
2633 if condition_met {
2634 self.set_field_auto_vivify(*object, path, *value)?;
2635 if should_emit(path) {
2636 dirty_tracker.mark_replaced(path);
2637 }
2638 }
2639 pc += 1;
2640 }
2641 OpCode::SetFieldWhen {
2642 object,
2643 path,
2644 value,
2645 when_instruction,
2646 entity_name,
2647 key_reg,
2648 condition_field,
2649 condition_op,
2650 condition_value,
2651 } => {
2652 let actual_state_id = override_state_id;
2653 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2654 condition_field.as_ref(),
2655 condition_op.as_ref(),
2656 condition_value.as_ref(),
2657 ) {
2658 let field_value = self.load_field(event_value, field, None)?;
2659 self.evaluate_comparison(&field_value, op, cond_value)?
2660 } else {
2661 true
2662 };
2663
2664 if !condition_met {
2665 pc += 1;
2666 continue;
2667 }
2668
2669 let signature = self
2670 .current_context
2671 .as_ref()
2672 .and_then(|c| c.signature.clone())
2673 .unwrap_or_default();
2674
2675 let emit = should_emit(path);
2676
2677 let instruction_seen = if !signature.is_empty() {
2678 if let Some(state) = self.states.get(&actual_state_id) {
2679 let mut cache = state.recent_tx_instructions.lock().unwrap();
2680 cache
2681 .get(&signature)
2682 .map(|set| set.contains(when_instruction))
2683 .unwrap_or(false)
2684 } else {
2685 false
2686 }
2687 } else {
2688 false
2689 };
2690
2691 if instruction_seen {
2692 self.set_field_auto_vivify(*object, path, *value)?;
2693 if emit {
2694 dirty_tracker.mark_replaced(path);
2695 }
2696 } else if !signature.is_empty() {
2697 let deferred = DeferredWhenOperation {
2698 entity_name: entity_name.clone(),
2699 primary_key: self.registers[*key_reg].clone(),
2700 field_path: path.clone(),
2701 field_value: self.registers[*value].clone(),
2702 when_instruction: when_instruction.clone(),
2703 signature: signature.clone(),
2704 slot: self
2705 .current_context
2706 .as_ref()
2707 .and_then(|c| c.slot)
2708 .unwrap_or(0),
2709 deferred_at: std::time::SystemTime::now()
2710 .duration_since(std::time::UNIX_EPOCH)
2711 .unwrap()
2712 .as_secs() as i64,
2713 emit,
2714 };
2715
2716 if let Some(state) = self.states.get(&actual_state_id) {
2717 let key = (signature, when_instruction.clone());
2718 state
2719 .deferred_when_ops
2720 .entry(key)
2721 .or_insert_with(Vec::new)
2722 .push(deferred);
2723 }
2724 }
2725
2726 pc += 1;
2727 }
2728 OpCode::SetFieldUnlessStopped {
2729 object,
2730 path,
2731 value,
2732 stop_field,
2733 stop_instruction,
2734 entity_name,
2735 key_reg: _,
2736 } => {
2737 let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2738 let stopped = stop_value.as_bool().unwrap_or(false);
2739
2740 if stopped {
2741 tracing::debug!(
2742 entity = %entity_name,
2743 field = %path,
2744 stop_field = %stop_field,
2745 stop_instruction = %stop_instruction,
2746 "stop flag set; skipping field update"
2747 );
2748 pc += 1;
2749 continue;
2750 }
2751
2752 self.set_field_auto_vivify(*object, path, *value)?;
2753 if should_emit(path) {
2754 dirty_tracker.mark_replaced(path);
2755 }
2756 pc += 1;
2757 }
2758 OpCode::ConditionalIncrement {
2759 object,
2760 path,
2761 condition_field,
2762 condition_op,
2763 condition_value,
2764 } => {
2765 let field_value = self.load_field(event_value, condition_field, None)?;
2766 let condition_met =
2767 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2768
2769 if condition_met {
2770 let was_updated = self.set_field_increment(*object, path)?;
2771 if was_updated && should_emit(path) {
2772 dirty_tracker.mark_replaced(path);
2773 }
2774 }
2775 pc += 1;
2776 }
2777 OpCode::EvaluateComputedFields {
2778 state,
2779 computed_paths,
2780 } => {
2781 if let Some(evaluator) = entity_evaluator {
2782 let old_values: Vec<_> = computed_paths
2783 .iter()
2784 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2785 .collect();
2786
2787 let state_value = &mut self.registers[*state];
2788 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2789 let context_timestamp = self
2790 .current_context
2791 .as_ref()
2792 .map(|c| c.timestamp())
2793 .unwrap_or_else(|| {
2794 std::time::SystemTime::now()
2795 .duration_since(std::time::UNIX_EPOCH)
2796 .unwrap()
2797 .as_secs() as i64
2798 });
2799 let eval_result = evaluator(state_value, context_slot, context_timestamp);
2800
2801 if eval_result.is_ok() {
2802 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2803 let new_value =
2804 Self::get_value_at_path(&self.registers[*state], path);
2805
2806 if new_value != *old_value && should_emit(path) {
2807 dirty_tracker.mark_replaced(path);
2808 }
2809 }
2810 }
2811 }
2812 pc += 1;
2813 }
2814 OpCode::QueueResolver {
2815 state_id: _,
2816 entity_name,
2817 resolver,
2818 input_path,
2819 input_value,
2820 url_template,
2821 strategy,
2822 extracts,
2823 condition,
2824 schedule_at,
2825 state,
2826 key,
2827 } => {
2828 let actual_state_id = override_state_id;
2829
2830 if self
2834 .current_context
2835 .as_ref()
2836 .map(|c| c.skip_resolvers)
2837 .unwrap_or(false)
2838 {
2839 pc += 1;
2840 continue;
2841 }
2842
2843 if let Some(cond) = condition {
2845 let field_val =
2846 Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2847 .unwrap_or(Value::Null);
2848 let condition_met =
2849 self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
2850
2851 if !condition_met {
2852 pc += 1;
2853 continue;
2854 }
2855 }
2856
2857 if let Some(schedule_path) = schedule_at {
2859 let target_val =
2860 Self::get_value_at_path(&self.registers[*state], schedule_path);
2861
2862 match target_val.and_then(|v| v.as_u64()) {
2863 Some(target_slot) => {
2864 let current_slot = self
2865 .current_context
2866 .as_ref()
2867 .and_then(|ctx| ctx.slot)
2868 .unwrap_or(0);
2869 if current_slot < target_slot {
2870 let key_value = &self.registers[*key];
2871 if !key_value.is_null() {
2872 self.scheduled_callbacks.push((
2873 target_slot,
2874 ScheduledCallback {
2875 state_id: actual_state_id,
2876 entity_name: entity_name.clone(),
2877 primary_key: key_value.clone(),
2878 resolver: resolver.clone(),
2879 url_template: url_template.clone(),
2880 input_value: input_value.clone(),
2881 input_path: input_path.clone(),
2882 condition: condition.clone(),
2883 strategy: strategy.clone(),
2884 extracts: extracts.clone(),
2885 retry_count: 0,
2886 },
2887 ));
2888 }
2889 pc += 1;
2890 continue;
2891 }
2892 }
2894 None => {
2895 pc += 1;
2899 continue;
2900 }
2901 }
2902 }
2903
2904 let resolved_input = if let Some(template) = url_template {
2906 crate::scheduler::build_url_from_template(template, &self.registers[*state])
2907 .map(Value::String)
2908 } else if let Some(value) = input_value {
2909 Some(value.clone())
2910 } else if let Some(path) = input_path.as_ref() {
2911 Self::get_value_at_path(&self.registers[*state], path)
2912 } else {
2913 None
2914 };
2915
2916 if let Some(input) = resolved_input {
2917 let key_value = &self.registers[*key];
2918
2919 if input.is_null() || key_value.is_null() {
2920 pc += 1;
2921 continue;
2922 }
2923
2924 if matches!(strategy, ResolveStrategy::SetOnce)
2925 && extracts.iter().all(|extract| {
2929 match Self::get_value_at_path(
2930 &self.registers[*state],
2931 &extract.target_path,
2932 ) {
2933 Some(value) => !value.is_null(),
2934 None => false,
2935 }
2936 })
2937 {
2938 pc += 1;
2939 continue;
2940 }
2941
2942 let cache_key = resolver_cache_key(resolver, &input);
2943
2944 if let Some(cached) = self.get_cached_resolver_value(&cache_key) {
2945 Self::apply_resolver_extractions_to_value(
2946 &mut self.registers[*state],
2947 &cached,
2948 extracts,
2949 &mut dirty_tracker,
2950 &should_emit,
2951 )?;
2952 } else {
2953 let target = ResolverTarget {
2954 state_id: actual_state_id,
2955 entity_name: entity_name.clone(),
2956 primary_key: self.registers[*key].clone(),
2957 extracts: extracts.clone(),
2958 };
2959
2960 self.enqueue_resolver_request(
2961 cache_key,
2962 resolver.clone(),
2963 input,
2964 target,
2965 );
2966 }
2967 }
2968
2969 pc += 1;
2970 }
2971 OpCode::UpdatePdaReverseLookup {
2972 state_id: _,
2973 lookup_name,
2974 pda_address,
2975 primary_key,
2976 } => {
2977 let actual_state_id = override_state_id;
2978 let state = self
2979 .states
2980 .get_mut(&actual_state_id)
2981 .ok_or("State table not found")?;
2982
2983 let pda_val = self.registers[*pda_address].clone();
2984 let pk_val = self.registers[*primary_key].clone();
2985
2986 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2987 let pda_lookup = state
2988 .pda_reverse_lookups
2989 .entry(lookup_name.clone())
2990 .or_insert_with(|| {
2991 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2992 });
2993
2994 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2995 self.last_pda_registered = Some(pda_str.to_string());
2996 } else if !pk_val.is_null() {
2997 if let Some(pk_num) = pk_val.as_u64() {
2998 if let Some(pda_str) = pda_val.as_str() {
2999 let pda_lookup = state
3000 .pda_reverse_lookups
3001 .entry(lookup_name.clone())
3002 .or_insert_with(|| {
3003 PdaReverseLookup::new(
3004 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
3005 )
3006 });
3007
3008 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
3009 self.last_pda_registered = Some(pda_str.to_string());
3010 }
3011 }
3012 }
3013
3014 pc += 1;
3015 }
3016 }
3017
3018 self.instructions_executed += 1;
3019 }
3020
3021 Ok(output)
3022 }
3023
3024 fn load_field(
3025 &self,
3026 event_value: &Value,
3027 path: &FieldPath,
3028 default: Option<&Value>,
3029 ) -> Result<Value> {
3030 if path.segments.is_empty() {
3031 if let Some(obj) = event_value.as_object() {
3032 let filtered: serde_json::Map<String, Value> = obj
3033 .iter()
3034 .filter(|(k, _)| !k.starts_with("__"))
3035 .map(|(k, v)| (k.clone(), v.clone()))
3036 .collect();
3037 return Ok(Value::Object(filtered));
3038 }
3039 return Ok(event_value.clone());
3040 }
3041
3042 let mut current = event_value;
3043 for segment in path.segments.iter() {
3044 current = match current.get(segment) {
3045 Some(v) => v,
3046 None => {
3047 tracing::trace!(
3048 "load_field: segment={:?} not found in {:?}, returning default",
3049 segment,
3050 current
3051 );
3052 return Ok(default.cloned().unwrap_or(Value::Null));
3053 }
3054 };
3055 }
3056
3057 tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3058 Ok(current.clone())
3059 }
3060
3061 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3062 let mut current = value;
3063 for segment in path.split('.') {
3064 current = current.get(segment)?;
3065 }
3066 Some(current.clone())
3067 }
3068
3069 fn set_field_auto_vivify(
3070 &mut self,
3071 object_reg: Register,
3072 path: &str,
3073 value_reg: Register,
3074 ) -> Result<()> {
3075 let compiled = self.get_compiled_path(path);
3076 let segments = compiled.segments();
3077 let value = self.registers[value_reg].clone();
3078
3079 if !self.registers[object_reg].is_object() {
3080 self.registers[object_reg] = json!({});
3081 }
3082
3083 let obj = self.registers[object_reg]
3084 .as_object_mut()
3085 .ok_or("Not an object")?;
3086
3087 let mut current = obj;
3088 for (i, segment) in segments.iter().enumerate() {
3089 if i == segments.len() - 1 {
3090 current.insert(segment.to_string(), value);
3091 return Ok(());
3092 } else {
3093 current
3094 .entry(segment.to_string())
3095 .or_insert_with(|| json!({}));
3096 current = current
3097 .get_mut(segment)
3098 .and_then(|v| v.as_object_mut())
3099 .ok_or("Path collision: expected object")?;
3100 }
3101 }
3102
3103 Ok(())
3104 }
3105
3106 fn set_field_if_null(
3107 &mut self,
3108 object_reg: Register,
3109 path: &str,
3110 value_reg: Register,
3111 ) -> Result<bool> {
3112 let compiled = self.get_compiled_path(path);
3113 let segments = compiled.segments();
3114 let value = self.registers[value_reg].clone();
3115
3116 if value.is_null() {
3120 return Ok(false);
3121 }
3122
3123 if !self.registers[object_reg].is_object() {
3124 self.registers[object_reg] = json!({});
3125 }
3126
3127 let obj = self.registers[object_reg]
3128 .as_object_mut()
3129 .ok_or("Not an object")?;
3130
3131 let mut current = obj;
3132 for (i, segment) in segments.iter().enumerate() {
3133 if i == segments.len() - 1 {
3134 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3135 current.insert(segment.to_string(), value);
3136 return Ok(true);
3137 }
3138 return Ok(false);
3139 } else {
3140 current
3141 .entry(segment.to_string())
3142 .or_insert_with(|| json!({}));
3143 current = current
3144 .get_mut(segment)
3145 .and_then(|v| v.as_object_mut())
3146 .ok_or("Path collision: expected object")?;
3147 }
3148 }
3149
3150 Ok(false)
3151 }
3152
3153 fn set_field_max(
3154 &mut self,
3155 object_reg: Register,
3156 path: &str,
3157 value_reg: Register,
3158 ) -> Result<bool> {
3159 let compiled = self.get_compiled_path(path);
3160 let segments = compiled.segments();
3161 let new_value = self.registers[value_reg].clone();
3162
3163 if !self.registers[object_reg].is_object() {
3164 self.registers[object_reg] = json!({});
3165 }
3166
3167 let obj = self.registers[object_reg]
3168 .as_object_mut()
3169 .ok_or("Not an object")?;
3170
3171 let mut current = obj;
3172 for (i, segment) in segments.iter().enumerate() {
3173 if i == segments.len() - 1 {
3174 let should_update = if let Some(current_value) = current.get(segment) {
3175 if current_value.is_null() {
3176 true
3177 } else {
3178 match (current_value.as_i64(), new_value.as_i64()) {
3179 (Some(current_val), Some(new_val)) => new_val > current_val,
3180 (Some(current_val), None) if new_value.as_u64().is_some() => {
3181 new_value.as_u64().unwrap() as i64 > current_val
3182 }
3183 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3184 new_val > current_value.as_u64().unwrap() as i64
3185 }
3186 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3187 (Some(current_val), Some(new_val)) => new_val > current_val,
3188 _ => match (current_value.as_f64(), new_value.as_f64()) {
3189 (Some(current_val), Some(new_val)) => new_val > current_val,
3190 _ => false,
3191 },
3192 },
3193 _ => false,
3194 }
3195 }
3196 } else {
3197 true
3198 };
3199
3200 if should_update {
3201 current.insert(segment.to_string(), new_value);
3202 return Ok(true);
3203 }
3204 return Ok(false);
3205 } else {
3206 current
3207 .entry(segment.to_string())
3208 .or_insert_with(|| json!({}));
3209 current = current
3210 .get_mut(segment)
3211 .and_then(|v| v.as_object_mut())
3212 .ok_or("Path collision: expected object")?;
3213 }
3214 }
3215
3216 Ok(false)
3217 }
3218
3219 fn set_field_sum(
3220 &mut self,
3221 object_reg: Register,
3222 path: &str,
3223 value_reg: Register,
3224 ) -> Result<bool> {
3225 let compiled = self.get_compiled_path(path);
3226 let segments = compiled.segments();
3227 let new_value = &self.registers[value_reg];
3228
3229 tracing::trace!(
3231 "set_field_sum: path={:?}, value={:?}, value_type={}",
3232 path,
3233 new_value,
3234 match new_value {
3235 serde_json::Value::Null => "null",
3236 serde_json::Value::Bool(_) => "bool",
3237 serde_json::Value::Number(_) => "number",
3238 serde_json::Value::String(_) => "string",
3239 serde_json::Value::Array(_) => "array",
3240 serde_json::Value::Object(_) => "object",
3241 }
3242 );
3243 let new_val_num = new_value
3244 .as_i64()
3245 .or_else(|| new_value.as_u64().map(|n| n as i64))
3246 .ok_or("Sum requires numeric value")?;
3247
3248 if !self.registers[object_reg].is_object() {
3249 self.registers[object_reg] = json!({});
3250 }
3251
3252 let obj = self.registers[object_reg]
3253 .as_object_mut()
3254 .ok_or("Not an object")?;
3255
3256 let mut current = obj;
3257 for (i, segment) in segments.iter().enumerate() {
3258 if i == segments.len() - 1 {
3259 let current_val = current
3260 .get(segment)
3261 .and_then(|v| {
3262 if v.is_null() {
3263 None
3264 } else {
3265 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3266 }
3267 })
3268 .unwrap_or(0);
3269
3270 let sum = current_val + new_val_num;
3271 current.insert(segment.to_string(), json!(sum));
3272 return Ok(true);
3273 } else {
3274 current
3275 .entry(segment.to_string())
3276 .or_insert_with(|| json!({}));
3277 current = current
3278 .get_mut(segment)
3279 .and_then(|v| v.as_object_mut())
3280 .ok_or("Path collision: expected object")?;
3281 }
3282 }
3283
3284 Ok(false)
3285 }
3286
3287 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3288 let compiled = self.get_compiled_path(path);
3289 let segments = compiled.segments();
3290
3291 if !self.registers[object_reg].is_object() {
3292 self.registers[object_reg] = json!({});
3293 }
3294
3295 let obj = self.registers[object_reg]
3296 .as_object_mut()
3297 .ok_or("Not an object")?;
3298
3299 let mut current = obj;
3300 for (i, segment) in segments.iter().enumerate() {
3301 if i == segments.len() - 1 {
3302 let current_val = current
3304 .get(segment)
3305 .and_then(|v| {
3306 if v.is_null() {
3307 None
3308 } else {
3309 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3310 }
3311 })
3312 .unwrap_or(0);
3313
3314 let incremented = current_val + 1;
3315 current.insert(segment.to_string(), json!(incremented));
3316 return Ok(true);
3317 } else {
3318 current
3319 .entry(segment.to_string())
3320 .or_insert_with(|| json!({}));
3321 current = current
3322 .get_mut(segment)
3323 .and_then(|v| v.as_object_mut())
3324 .ok_or("Path collision: expected object")?;
3325 }
3326 }
3327
3328 Ok(false)
3329 }
3330
3331 fn set_field_min(
3332 &mut self,
3333 object_reg: Register,
3334 path: &str,
3335 value_reg: Register,
3336 ) -> Result<bool> {
3337 let compiled = self.get_compiled_path(path);
3338 let segments = compiled.segments();
3339 let new_value = self.registers[value_reg].clone();
3340
3341 if !self.registers[object_reg].is_object() {
3342 self.registers[object_reg] = json!({});
3343 }
3344
3345 let obj = self.registers[object_reg]
3346 .as_object_mut()
3347 .ok_or("Not an object")?;
3348
3349 let mut current = obj;
3350 for (i, segment) in segments.iter().enumerate() {
3351 if i == segments.len() - 1 {
3352 let should_update = if let Some(current_value) = current.get(segment) {
3353 if current_value.is_null() {
3354 true
3355 } else {
3356 match (current_value.as_i64(), new_value.as_i64()) {
3357 (Some(current_val), Some(new_val)) => new_val < current_val,
3358 (Some(current_val), None) if new_value.as_u64().is_some() => {
3359 (new_value.as_u64().unwrap() as i64) < current_val
3360 }
3361 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3362 new_val < current_value.as_u64().unwrap() as i64
3363 }
3364 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3365 (Some(current_val), Some(new_val)) => new_val < current_val,
3366 _ => match (current_value.as_f64(), new_value.as_f64()) {
3367 (Some(current_val), Some(new_val)) => new_val < current_val,
3368 _ => false,
3369 },
3370 },
3371 _ => false,
3372 }
3373 }
3374 } else {
3375 true
3376 };
3377
3378 if should_update {
3379 current.insert(segment.to_string(), new_value);
3380 return Ok(true);
3381 }
3382 return Ok(false);
3383 } else {
3384 current
3385 .entry(segment.to_string())
3386 .or_insert_with(|| json!({}));
3387 current = current
3388 .get_mut(segment)
3389 .and_then(|v| v.as_object_mut())
3390 .ok_or("Path collision: expected object")?;
3391 }
3392 }
3393
3394 Ok(false)
3395 }
3396
3397 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3398 let compiled = self.get_compiled_path(path);
3399 let segments = compiled.segments();
3400 let mut current = &self.registers[object_reg];
3401
3402 for segment in segments {
3403 current = current
3404 .get(segment)
3405 .ok_or_else(|| format!("Field not found: {}", segment))?;
3406 }
3407
3408 Ok(current.clone())
3409 }
3410
3411 fn append_to_array(
3412 &mut self,
3413 object_reg: Register,
3414 path: &str,
3415 value_reg: Register,
3416 max_length: usize,
3417 ) -> Result<()> {
3418 let compiled = self.get_compiled_path(path);
3419 let segments = compiled.segments();
3420 let value = self.registers[value_reg].clone();
3421
3422 if !self.registers[object_reg].is_object() {
3423 self.registers[object_reg] = json!({});
3424 }
3425
3426 let obj = self.registers[object_reg]
3427 .as_object_mut()
3428 .ok_or("Not an object")?;
3429
3430 let mut current = obj;
3431 for (i, segment) in segments.iter().enumerate() {
3432 if i == segments.len() - 1 {
3433 current
3434 .entry(segment.to_string())
3435 .or_insert_with(|| json!([]));
3436 let arr = current
3437 .get_mut(segment)
3438 .and_then(|v| v.as_array_mut())
3439 .ok_or("Path is not an array")?;
3440 arr.push(value.clone());
3441
3442 if arr.len() > max_length {
3443 let excess = arr.len() - max_length;
3444 arr.drain(0..excess);
3445 }
3446 } else {
3447 current
3448 .entry(segment.to_string())
3449 .or_insert_with(|| json!({}));
3450 current = current
3451 .get_mut(segment)
3452 .and_then(|v| v.as_object_mut())
3453 .ok_or("Path collision: expected object")?;
3454 }
3455 }
3456
3457 Ok(())
3458 }
3459
3460 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3461 let value = &self.registers[reg];
3462 let transformed = Self::apply_transformation(value, transformation)?;
3463 self.registers[reg] = transformed;
3464 Ok(())
3465 }
3466
3467 fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3468 match transformation {
3469 Transformation::HexEncode => {
3470 if let Some(arr) = value.as_array() {
3471 let bytes: Vec<u8> = arr
3472 .iter()
3473 .filter_map(|v| v.as_u64().map(|n| n as u8))
3474 .collect();
3475 let hex = hex::encode(&bytes);
3476 Ok(json!(hex))
3477 } else if value.is_string() {
3478 Ok(value.clone())
3479 } else {
3480 Err("HexEncode requires an array of numbers".into())
3481 }
3482 }
3483 Transformation::HexDecode => {
3484 if let Some(s) = value.as_str() {
3485 let s = s.strip_prefix("0x").unwrap_or(s);
3486 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3487 Ok(json!(bytes))
3488 } else {
3489 Err("HexDecode requires a string".into())
3490 }
3491 }
3492 Transformation::Base58Encode => {
3493 if let Some(arr) = value.as_array() {
3494 let bytes: Vec<u8> = arr
3495 .iter()
3496 .filter_map(|v| v.as_u64().map(|n| n as u8))
3497 .collect();
3498 let encoded = bs58::encode(&bytes).into_string();
3499 Ok(json!(encoded))
3500 } else if value.is_string() {
3501 Ok(value.clone())
3502 } else {
3503 Err("Base58Encode requires an array of numbers".into())
3504 }
3505 }
3506 Transformation::Base58Decode => {
3507 if let Some(s) = value.as_str() {
3508 let bytes = bs58::decode(s)
3509 .into_vec()
3510 .map_err(|e| format!("Base58 decode error: {}", e))?;
3511 Ok(json!(bytes))
3512 } else {
3513 Err("Base58Decode requires a string".into())
3514 }
3515 }
3516 Transformation::ToString => Ok(json!(value.to_string())),
3517 Transformation::ToNumber => {
3518 if let Some(s) = value.as_str() {
3519 let n = s
3520 .parse::<i64>()
3521 .map_err(|e| format!("Parse error: {}", e))?;
3522 Ok(json!(n))
3523 } else {
3524 Ok(value.clone())
3525 }
3526 }
3527 }
3528 }
3529
3530 fn evaluate_comparison(
3531 &self,
3532 field_value: &Value,
3533 op: &ComparisonOp,
3534 condition_value: &Value,
3535 ) -> Result<bool> {
3536 use ComparisonOp::*;
3537
3538 match op {
3539 Equal => Ok(field_value == condition_value),
3540 NotEqual => Ok(field_value != condition_value),
3541 GreaterThan => {
3542 match (field_value.as_i64(), condition_value.as_i64()) {
3544 (Some(a), Some(b)) => Ok(a > b),
3545 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3546 (Some(a), Some(b)) => Ok(a > b),
3547 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3548 (Some(a), Some(b)) => Ok(a > b),
3549 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3550 },
3551 },
3552 }
3553 }
3554 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3555 (Some(a), Some(b)) => Ok(a >= b),
3556 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3557 (Some(a), Some(b)) => Ok(a >= b),
3558 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3559 (Some(a), Some(b)) => Ok(a >= b),
3560 _ => {
3561 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3562 }
3563 },
3564 },
3565 },
3566 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3567 (Some(a), Some(b)) => Ok(a < b),
3568 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3569 (Some(a), Some(b)) => Ok(a < b),
3570 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3571 (Some(a), Some(b)) => Ok(a < b),
3572 _ => Err("Cannot compare non-numeric values with LessThan".into()),
3573 },
3574 },
3575 },
3576 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3577 (Some(a), Some(b)) => Ok(a <= b),
3578 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3579 (Some(a), Some(b)) => Ok(a <= b),
3580 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3581 (Some(a), Some(b)) => Ok(a <= b),
3582 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3583 },
3584 },
3585 },
3586 }
3587 }
3588
3589 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3590 if let Some(state) = self.states.get(&state_id) {
3591 if let Some(index) = state.lookup_indexes.get(index_name) {
3592 if index.lookup(value).is_some() {
3593 return true;
3594 }
3595 }
3596
3597 for (name, index) in state.lookup_indexes.iter() {
3598 if name == index_name {
3599 continue;
3600 }
3601 if index.lookup(value).is_some() {
3602 return true;
3603 }
3604 }
3605
3606 if let Some(pda_str) = value.as_str() {
3607 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3608 if pda_lookup.contains(pda_str) {
3609 return true;
3610 }
3611 }
3612 }
3613 }
3614
3615 false
3616 }
3617
3618 #[allow(clippy::type_complexity)]
3619 fn apply_deferred_when_op(
3620 &mut self,
3621 state_id: u32,
3622 op: &DeferredWhenOperation,
3623 entity_evaluator: Option<
3624 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3625 >,
3626 computed_paths: Option<&[String]>,
3627 ) -> Result<Vec<Mutation>> {
3628 let state = self.states.get(&state_id).ok_or("State not found")?;
3629
3630 if op.primary_key.is_null() {
3631 return Ok(vec![]);
3632 }
3633
3634 let mut entity_state = state
3635 .get_and_touch(&op.primary_key)
3636 .unwrap_or_else(|| json!({}));
3637
3638 let old_computed_values: Vec<_> = computed_paths
3640 .map(|paths| {
3641 paths
3642 .iter()
3643 .map(|path| Self::get_value_at_path(&entity_state, path))
3644 .collect()
3645 })
3646 .unwrap_or_default();
3647
3648 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3649
3650 if let Some(evaluator) = entity_evaluator {
3652 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3653 let context_timestamp = self
3654 .current_context
3655 .as_ref()
3656 .map(|c| c.timestamp())
3657 .unwrap_or_else(|| {
3658 std::time::SystemTime::now()
3659 .duration_since(std::time::UNIX_EPOCH)
3660 .unwrap()
3661 .as_secs() as i64
3662 });
3663
3664 tracing::debug!(
3665 entity_name = %op.entity_name,
3666 primary_key = %op.primary_key,
3667 field_path = %op.field_path,
3668 "Re-evaluating computed fields after deferred when-op"
3669 );
3670
3671 if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
3672 tracing::warn!(
3673 entity_name = %op.entity_name,
3674 primary_key = %op.primary_key,
3675 error = %e,
3676 "Failed to evaluate computed fields after deferred when-op"
3677 );
3678 }
3679 }
3680
3681 state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
3682
3683 if !op.emit {
3684 return Ok(vec![]);
3685 }
3686
3687 let mut patch = json!({});
3688 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3689
3690 if let Some(paths) = computed_paths {
3692 tracing::debug!(
3693 entity_name = %op.entity_name,
3694 primary_key = %op.primary_key,
3695 computed_paths_count = paths.len(),
3696 "Checking computed fields for changes after deferred when-op"
3697 );
3698 for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
3699 let new_value = Self::get_value_at_path(&entity_state, path);
3700 tracing::debug!(
3701 entity_name = %op.entity_name,
3702 primary_key = %op.primary_key,
3703 field_path = %path,
3704 old_value = ?old_value,
3705 new_value = ?new_value,
3706 "Comparing computed field values"
3707 );
3708 if let Some(ref new_val) = new_value {
3709 if Some(new_val) != old_value.as_ref() {
3710 Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
3711 tracing::info!(
3712 entity_name = %op.entity_name,
3713 primary_key = %op.primary_key,
3714 field_path = %path,
3715 "Computed field changed after deferred when-op, including in mutation"
3716 );
3717 }
3718 }
3719 }
3720 }
3721
3722 Ok(vec![Mutation {
3723 export: op.entity_name.clone(),
3724 key: op.primary_key.clone(),
3725 patch,
3726 append: vec![],
3727 }])
3728 }
3729
3730 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3731 let parts: Vec<&str> = path.split('.').collect();
3732 let mut current = obj;
3733
3734 for (i, part) in parts.iter().enumerate() {
3735 if i == parts.len() - 1 {
3736 if let Some(map) = current.as_object_mut() {
3737 map.insert(part.to_string(), value);
3738 return Ok(());
3739 }
3740 return Err("Cannot set field on non-object".into());
3741 }
3742
3743 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3744 if let Some(map) = current.as_object_mut() {
3745 map.insert(part.to_string(), json!({}));
3746 }
3747 }
3748
3749 current = current.get_mut(*part).ok_or("Path navigation failed")?;
3750 }
3751
3752 Ok(())
3753 }
3754
3755 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3756 let now = std::time::SystemTime::now()
3757 .duration_since(std::time::UNIX_EPOCH)
3758 .unwrap()
3759 .as_secs() as i64;
3760
3761 let state = match self.states.get(&state_id) {
3762 Some(s) => s,
3763 None => return 0,
3764 };
3765
3766 let mut removed = 0;
3767 state.deferred_when_ops.retain(|_, ops| {
3768 let before = ops.len();
3769 ops.retain(|op| now - op.deferred_at < max_age_secs);
3770 removed += before - ops.len();
3771 !ops.is_empty()
3772 });
3773
3774 removed
3775 }
3776
3777 #[cfg_attr(feature = "otel", instrument(
3786 name = "vm.update_pda_lookup",
3787 skip(self),
3788 fields(
3789 pda = %pda_address,
3790 seed = %seed_value,
3791 )
3792 ))]
3793 pub fn update_pda_reverse_lookup(
3794 &mut self,
3795 state_id: u32,
3796 lookup_name: &str,
3797 pda_address: String,
3798 seed_value: String,
3799 ) -> Result<Vec<PendingAccountUpdate>> {
3800 let state = self
3801 .states
3802 .get_mut(&state_id)
3803 .ok_or("State table not found")?;
3804
3805 let lookup = state
3806 .pda_reverse_lookups
3807 .entry(lookup_name.to_string())
3808 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3809
3810 let old_seed = lookup.index.peek(&pda_address).cloned();
3814 let mapping_changed = old_seed
3815 .as_ref()
3816 .map(|old| old != &seed_value)
3817 .unwrap_or(false);
3818
3819 if !mapping_changed && old_seed.is_none() {
3820 tracing::info!(
3821 pda = %pda_address,
3822 seed = %seed_value,
3823 "[PDA] First-time PDA reverse lookup established"
3824 );
3825 } else if !mapping_changed {
3826 tracing::debug!(
3827 pda = %pda_address,
3828 seed = %seed_value,
3829 "[PDA] PDA reverse lookup re-registered (same mapping)"
3830 );
3831 }
3832
3833 let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());
3834
3835 if let Some(ref evicted) = evicted_pda {
3836 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3837 let count = evicted_updates.len();
3838 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3839 }
3840 }
3841
3842 let mut pending = self.flush_pending_updates(state_id, &pda_address)?;
3844
3845 if mapping_changed {
3853 if let Some(state) = self.states.get(&state_id) {
3854 for index in state.lookup_indexes.values() {
3856 index.remove(&Value::String(pda_address.clone()));
3857 }
3858
3859 if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
3860 tracing::info!(
3861 pda = %pda_address,
3862 old_seed = ?old_seed,
3863 new_seed = %seed_value,
3864 account_type = %cached.account_type,
3865 "PDA mapping changed — clearing stale indexes and reprocessing cached data"
3866 );
3867 cached.is_stale_reprocess = true;
3868 pending.push(cached);
3869 }
3870 }
3871 }
3872
3873 Ok(pending)
3874 }
3875
3876 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3881 let state = match self.states.get_mut(&state_id) {
3882 Some(s) => s,
3883 None => return 0,
3884 };
3885
3886 let now = std::time::SystemTime::now()
3887 .duration_since(std::time::UNIX_EPOCH)
3888 .unwrap()
3889 .as_secs() as i64;
3890
3891 let mut removed_count = 0;
3892
3893 state.pending_updates.retain(|_pda_address, updates| {
3895 let original_len = updates.len();
3896
3897 updates.retain(|update| {
3898 let age = now - update.queued_at;
3899 age <= PENDING_UPDATE_TTL_SECONDS
3900 });
3901
3902 removed_count += original_len - updates.len();
3903
3904 !updates.is_empty()
3906 });
3907
3908 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3910
3911 if removed_count > 0 {
3912 #[cfg(feature = "otel")]
3913 crate::vm_metrics::record_pending_updates_expired(
3914 removed_count as u64,
3915 &state.entity_name,
3916 );
3917 }
3918
3919 removed_count
3920 }
3921
3922 #[cfg_attr(feature = "otel", instrument(
3954 name = "vm.queue_account_update",
3955 skip(self, update),
3956 fields(
3957 pda = %update.pda_address,
3958 account_type = %update.account_type,
3959 slot = update.slot,
3960 )
3961 ))]
3962 pub fn queue_account_update(
3963 &mut self,
3964 state_id: u32,
3965 update: QueuedAccountUpdate,
3966 ) -> Result<()> {
3967 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3968 self.cleanup_expired_pending_updates(state_id);
3969 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3970 self.drop_oldest_pending_update(state_id)?;
3971 }
3972 }
3973
3974 let state = self
3975 .states
3976 .get_mut(&state_id)
3977 .ok_or("State table not found")?;
3978
3979 let pending = PendingAccountUpdate {
3980 account_type: update.account_type,
3981 pda_address: update.pda_address.clone(),
3982 account_data: update.account_data,
3983 slot: update.slot,
3984 write_version: update.write_version,
3985 signature: update.signature,
3986 queued_at: std::time::SystemTime::now()
3987 .duration_since(std::time::UNIX_EPOCH)
3988 .unwrap()
3989 .as_secs() as i64,
3990 is_stale_reprocess: false,
3991 };
3992
3993 let pda_address = pending.pda_address.clone();
3994 let slot = pending.slot;
3995
3996 let mut updates = state
3997 .pending_updates
3998 .entry(pda_address.clone())
3999 .or_insert_with(Vec::new);
4000
4001 let original_len = updates.len();
4002 updates.retain(|existing| existing.slot > slot);
4003 let removed_by_dedup = original_len - updates.len();
4004
4005 if removed_by_dedup > 0 {
4006 self.pending_queue_size = self
4007 .pending_queue_size
4008 .saturating_sub(removed_by_dedup as u64);
4009 }
4010
4011 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
4012 updates.remove(0);
4013 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4014 }
4015
4016 updates.push(pending);
4017 #[cfg(feature = "otel")]
4018 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
4019
4020 Ok(())
4021 }
4022
4023 pub fn queue_instruction_event(
4024 &mut self,
4025 state_id: u32,
4026 event: QueuedInstructionEvent,
4027 ) -> Result<()> {
4028 let state = self
4029 .states
4030 .get_mut(&state_id)
4031 .ok_or("State table not found")?;
4032
4033 let pda_address = event.pda_address.clone();
4034
4035 let pending = PendingInstructionEvent {
4036 event_type: event.event_type,
4037 pda_address: event.pda_address,
4038 event_data: event.event_data,
4039 slot: event.slot,
4040 signature: event.signature,
4041 queued_at: std::time::SystemTime::now()
4042 .duration_since(std::time::UNIX_EPOCH)
4043 .unwrap()
4044 .as_secs() as i64,
4045 };
4046
4047 let mut events = state
4048 .pending_instruction_events
4049 .entry(pda_address)
4050 .or_insert_with(Vec::new);
4051
4052 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
4053 events.remove(0);
4054 }
4055
4056 events.push(pending);
4057
4058 Ok(())
4059 }
4060
4061 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4062 self.last_pda_lookup_miss.take()
4063 }
4064
4065 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4066 self.last_lookup_index_miss.take()
4067 }
4068
4069 pub fn take_last_pda_registered(&mut self) -> Option<String> {
4070 self.last_pda_registered.take()
4071 }
4072
4073 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4074 std::mem::take(&mut self.last_lookup_index_keys)
4075 }
4076
4077 pub fn flush_pending_instruction_events(
4078 &mut self,
4079 state_id: u32,
4080 pda_address: &str,
4081 ) -> Vec<PendingInstructionEvent> {
4082 let state = match self.states.get_mut(&state_id) {
4083 Some(s) => s,
4084 None => return Vec::new(),
4085 };
4086
4087 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4088 events
4089 } else {
4090 Vec::new()
4091 }
4092 }
4093
4094 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4096 let state = self.states.get(&state_id)?;
4097
4098 let now = std::time::SystemTime::now()
4099 .duration_since(std::time::UNIX_EPOCH)
4100 .unwrap()
4101 .as_secs() as i64;
4102
4103 let mut total_updates = 0;
4104 let mut oldest_timestamp = now;
4105 let mut largest_pda_queue = 0;
4106 let mut estimated_memory = 0;
4107
4108 for entry in state.pending_updates.iter() {
4109 let (_, updates) = entry.pair();
4110 total_updates += updates.len();
4111 largest_pda_queue = largest_pda_queue.max(updates.len());
4112
4113 for update in updates.iter() {
4114 oldest_timestamp = oldest_timestamp.min(update.queued_at);
4115 estimated_memory += update.account_type.len() +
4117 update.pda_address.len() +
4118 update.signature.len() +
4119 16 + estimate_json_size(&update.account_data);
4121 }
4122 }
4123
4124 Some(PendingQueueStats {
4125 total_updates,
4126 unique_pdas: state.pending_updates.len(),
4127 oldest_age_seconds: now - oldest_timestamp,
4128 largest_pda_queue_size: largest_pda_queue,
4129 estimated_memory_bytes: estimated_memory,
4130 })
4131 }
4132
4133 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4134 let mut stats = VmMemoryStats {
4135 path_cache_size: self.path_cache.len(),
4136 ..Default::default()
4137 };
4138
4139 if let Some(state) = self.states.get(&state_id) {
4140 stats.state_table_entity_count = state.data.len();
4141 stats.state_table_max_entries = state.config.max_entries;
4142 stats.state_table_at_capacity = state.is_at_capacity();
4143
4144 stats.lookup_index_count = state.lookup_indexes.len();
4145 stats.lookup_index_total_entries =
4146 state.lookup_indexes.values().map(|idx| idx.len()).sum();
4147
4148 stats.temporal_index_count = state.temporal_indexes.len();
4149 stats.temporal_index_total_entries = state
4150 .temporal_indexes
4151 .values()
4152 .map(|idx| idx.total_entries())
4153 .sum();
4154
4155 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4156 stats.pda_reverse_lookup_total_entries = state
4157 .pda_reverse_lookups
4158 .values()
4159 .map(|lookup| lookup.len())
4160 .sum();
4161
4162 stats.version_tracker_entries = state.version_tracker.len();
4163
4164 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4165 }
4166
4167 stats
4168 }
4169
4170 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4171 let pending_removed = self.cleanup_expired_pending_updates(state_id);
4172 let temporal_removed = self.cleanup_temporal_indexes(state_id);
4173
4174 #[cfg(feature = "otel")]
4175 if let Some(state) = self.states.get(&state_id) {
4176 crate::vm_metrics::record_cleanup(
4177 pending_removed,
4178 temporal_removed,
4179 &state.entity_name,
4180 );
4181 }
4182
4183 CleanupResult {
4184 pending_updates_removed: pending_removed,
4185 temporal_entries_removed: temporal_removed,
4186 }
4187 }
4188
4189 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4190 let state = match self.states.get_mut(&state_id) {
4191 Some(s) => s,
4192 None => return 0,
4193 };
4194
4195 let now = std::time::SystemTime::now()
4196 .duration_since(std::time::UNIX_EPOCH)
4197 .unwrap()
4198 .as_secs() as i64;
4199
4200 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4201 let mut total_removed = 0;
4202
4203 for (_, index) in state.temporal_indexes.iter_mut() {
4204 total_removed += index.cleanup_expired(cutoff);
4205 }
4206
4207 total_removed
4208 }
4209
4210 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4211 let state = self.states.get(&state_id)?;
4212
4213 if state.is_at_capacity() {
4214 Some(CapacityWarning {
4215 current_entries: state.data.len(),
4216 max_entries: state.config.max_entries,
4217 entries_over_limit: state.entries_over_limit(),
4218 })
4219 } else {
4220 None
4221 }
4222 }
4223
4224 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4226 let state = self
4227 .states
4228 .get_mut(&state_id)
4229 .ok_or("State table not found")?;
4230
4231 let mut oldest_pda: Option<String> = None;
4232 let mut oldest_timestamp = i64::MAX;
4233
4234 for entry in state.pending_updates.iter() {
4236 let (pda, updates) = entry.pair();
4237 if let Some(update) = updates.first() {
4238 if update.queued_at < oldest_timestamp {
4239 oldest_timestamp = update.queued_at;
4240 oldest_pda = Some(pda.clone());
4241 }
4242 }
4243 }
4244
4245 if let Some(pda) = oldest_pda {
4247 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4248 if !updates.is_empty() {
4249 updates.remove(0);
4250 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4251
4252 if updates.is_empty() {
4254 drop(updates);
4255 state.pending_updates.remove(&pda);
4256 }
4257 }
4258 }
4259 }
4260
4261 Ok(())
4262 }
4263
4264 fn flush_pending_updates(
4269 &mut self,
4270 state_id: u32,
4271 pda_address: &str,
4272 ) -> Result<Vec<PendingAccountUpdate>> {
4273 let state = self
4274 .states
4275 .get_mut(&state_id)
4276 .ok_or("State table not found")?;
4277
4278 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4279 let count = pending_updates.len();
4280 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4281 #[cfg(feature = "otel")]
4282 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4283 Ok(pending_updates)
4284 } else {
4285 Ok(Vec::new())
4286 }
4287 }
4288
4289 pub fn cache_last_account_data(
4295 &mut self,
4296 state_id: u32,
4297 pda_address: &str,
4298 update: PendingAccountUpdate,
4299 ) {
4300 if let Some(state) = self.states.get(&state_id) {
4301 state
4302 .last_account_data
4303 .insert(pda_address.to_string(), update);
4304 }
4305 }
4306
4307 pub fn try_pda_reverse_lookup(
4309 &mut self,
4310 state_id: u32,
4311 lookup_name: &str,
4312 pda_address: &str,
4313 ) -> Option<String> {
4314 let state = self.states.get_mut(&state_id)?;
4315
4316 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4317 if let Some(value) = lookup.lookup(pda_address) {
4318 self.pda_cache_hits += 1;
4319 return Some(value);
4320 }
4321 }
4322
4323 self.pda_cache_misses += 1;
4324 None
4325 }
4326
4327 pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4330 let state = self.states.get(&state_id)?;
4331
4332 for index in state.lookup_indexes.values() {
4333 if let Some(resolved) = index.lookup(value) {
4334 return Some(resolved);
4335 }
4336 }
4337
4338 None
4339 }
4340
4341 pub fn try_chained_pda_lookup(
4346 &mut self,
4347 state_id: u32,
4348 lookup_name: &str,
4349 pda_address: &str,
4350 ) -> Option<String> {
4351 let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4353
4354 let pda_value = Value::String(pda_result.clone());
4357 if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4358 resolved.as_str().map(|s| s.to_string())
4359 } else {
4360 pda_value.as_str().map(|s| s.to_string())
4362 }
4363 }
4364
4365 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4372 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4373 }
4374
4375 fn evaluate_computed_expr_with_env(
4377 &self,
4378 expr: &ComputedExpr,
4379 state: &Value,
4380 env: &std::collections::HashMap<String, Value>,
4381 ) -> Result<Value> {
4382 match expr {
4383 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4384
4385 ComputedExpr::Var { name } => env
4386 .get(name)
4387 .cloned()
4388 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4389
4390 ComputedExpr::Let { name, value, body } => {
4391 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4392 let mut new_env = env.clone();
4393 new_env.insert(name.clone(), val);
4394 self.evaluate_computed_expr_with_env(body, state, &new_env)
4395 }
4396
4397 ComputedExpr::If {
4398 condition,
4399 then_branch,
4400 else_branch,
4401 } => {
4402 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4403 if self.value_to_bool(&cond_val) {
4404 self.evaluate_computed_expr_with_env(then_branch, state, env)
4405 } else {
4406 self.evaluate_computed_expr_with_env(else_branch, state, env)
4407 }
4408 }
4409
4410 ComputedExpr::None => Ok(Value::Null),
4411
4412 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4413
4414 ComputedExpr::Slice { expr, start, end } => {
4415 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4416 match val {
4417 Value::Array(arr) => {
4418 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4419 Ok(Value::Array(slice))
4420 }
4421 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4422 }
4423 }
4424
4425 ComputedExpr::Index { expr, index } => {
4426 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4427 match val {
4428 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4429 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4430 }
4431 }
4432
4433 ComputedExpr::U64FromLeBytes { bytes } => {
4434 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4435 let byte_vec = self.value_to_bytes(&val)?;
4436 if byte_vec.len() < 8 {
4437 return Err(format!(
4438 "u64::from_le_bytes requires 8 bytes, got {}",
4439 byte_vec.len()
4440 )
4441 .into());
4442 }
4443 let arr: [u8; 8] = byte_vec[..8]
4444 .try_into()
4445 .map_err(|_| "Failed to convert to [u8; 8]")?;
4446 Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
4447 arr,
4448 ))))
4449 }
4450
4451 ComputedExpr::U64FromBeBytes { bytes } => {
4452 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4453 let byte_vec = self.value_to_bytes(&val)?;
4454 if byte_vec.len() < 8 {
4455 return Err(format!(
4456 "u64::from_be_bytes requires 8 bytes, got {}",
4457 byte_vec.len()
4458 )
4459 .into());
4460 }
4461 let arr: [u8; 8] = byte_vec[..8]
4462 .try_into()
4463 .map_err(|_| "Failed to convert to [u8; 8]")?;
4464 Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
4465 arr,
4466 ))))
4467 }
4468
4469 ComputedExpr::ByteArray { bytes } => {
4470 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4471 }
4472
4473 ComputedExpr::Closure { param, body } => {
4474 Ok(json!({
4477 "__closure": {
4478 "param": param,
4479 "body": serde_json::to_value(body).unwrap_or(Value::Null)
4480 }
4481 }))
4482 }
4483
4484 ComputedExpr::Unary { op, expr } => {
4485 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4486 self.apply_unary_op(op, &val)
4487 }
4488
4489 ComputedExpr::JsonToBytes { expr } => {
4490 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4491 let bytes = self.value_to_bytes(&val)?;
4493 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4494 }
4495
4496 ComputedExpr::UnwrapOr { expr, default } => {
4497 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4498 if val.is_null() {
4499 Ok(default.clone())
4500 } else {
4501 Ok(val)
4502 }
4503 }
4504
4505 ComputedExpr::Binary { op, left, right } => {
4506 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4507 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4508 self.apply_binary_op(op, &l, &r)
4509 }
4510
4511 ComputedExpr::Cast { expr, to_type } => {
4512 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4513 self.apply_cast(&val, to_type)
4514 }
4515
4516 ComputedExpr::ResolverComputed {
4517 resolver,
4518 method,
4519 args,
4520 } => {
4521 let evaluated_args: Vec<Value> = args
4522 .iter()
4523 .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4524 .collect::<Result<Vec<_>>>()?;
4525 crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4526 }
4527
4528 ComputedExpr::MethodCall { expr, method, args } => {
4529 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4530 if method == "map" && args.len() == 1 {
4532 if let ComputedExpr::Closure { param, body } = &args[0] {
4533 if val.is_null() {
4535 return Ok(Value::Null);
4536 }
4537
4538 if let Value::Array(arr) = &val {
4539 let results: Result<Vec<Value>> = arr
4540 .iter()
4541 .map(|elem| {
4542 let mut closure_env = env.clone();
4543 closure_env.insert(param.clone(), elem.clone());
4544 self.evaluate_computed_expr_with_env(body, state, &closure_env)
4545 })
4546 .collect();
4547 return Ok(Value::Array(results?));
4548 }
4549
4550 let mut closure_env = env.clone();
4551 closure_env.insert(param.clone(), val);
4552 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4553 }
4554 }
4555 let evaluated_args: Vec<Value> = args
4556 .iter()
4557 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4558 .collect::<Result<Vec<_>>>()?;
4559 self.apply_method_call(&val, method, &evaluated_args)
4560 }
4561
4562 ComputedExpr::Literal { value } => Ok(value.clone()),
4563
4564 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4565
4566 ComputedExpr::ContextSlot => Ok(self
4567 .current_context
4568 .as_ref()
4569 .and_then(|ctx| ctx.slot)
4570 .map(|s| json!(s))
4571 .unwrap_or(Value::Null)),
4572
4573 ComputedExpr::ContextTimestamp => Ok(self
4574 .current_context
4575 .as_ref()
4576 .map(|ctx| json!(ctx.timestamp()))
4577 .unwrap_or(Value::Null)),
4578
4579 ComputedExpr::Keccak256 { expr } => {
4580 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4581 let bytes = self.value_to_bytes(&val)?;
4582 use sha3::{Digest, Keccak256};
4583 let hash = Keccak256::digest(&bytes);
4584 Ok(Value::Array(
4585 hash.to_vec().iter().map(|b| json!(*b)).collect(),
4586 ))
4587 }
4588 }
4589 }
4590
4591 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4593 match val {
4594 Value::Array(arr) => arr
4595 .iter()
4596 .map(|v| {
4597 v.as_u64()
4598 .map(|n| n as u8)
4599 .ok_or_else(|| "Array element not a valid byte".into())
4600 })
4601 .collect(),
4602 Value::String(s) => {
4603 if s.starts_with("0x") || s.starts_with("0X") {
4605 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4606 } else {
4607 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4608 }
4609 }
4610 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4611 }
4612 }
4613
4614 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4616 use crate::ast::UnaryOp;
4617 match op {
4618 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4619 UnaryOp::ReverseBits => match val.as_u64() {
4620 Some(n) => Ok(json!(n.reverse_bits())),
4621 None => match val.as_i64() {
4622 Some(n) => Ok(json!((n as u64).reverse_bits())),
4623 None => Err("reverse_bits requires an integer".into()),
4624 },
4625 },
4626 }
4627 }
4628
4629 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4631 let segments: Vec<&str> = path.split('.').collect();
4632 let mut current = state;
4633
4634 for segment in segments {
4635 match current.get(segment) {
4636 Some(v) => current = v,
4637 None => return Ok(Value::Null),
4638 }
4639 }
4640
4641 Ok(current.clone())
4642 }
4643
4644 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4646 match op {
4647 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4649 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4650 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4651 BinaryOp::Div => {
4652 if let Some(r) = right.as_i64() {
4654 if r == 0 {
4655 return Err("Division by zero".into());
4656 }
4657 }
4658 if let Some(r) = right.as_f64() {
4659 if r == 0.0 {
4660 return Err("Division by zero".into());
4661 }
4662 }
4663 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4664 }
4665 BinaryOp::Mod => {
4666 match (left.as_i64(), right.as_i64()) {
4668 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4669 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4670 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4671 _ => Err("Modulo requires non-zero integer operands".into()),
4672 },
4673 _ => Err("Modulo by zero".into()),
4674 }
4675 }
4676
4677 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4679 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4680 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4681 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4682 BinaryOp::Eq => Ok(json!(left == right)),
4683 BinaryOp::Ne => Ok(json!(left != right)),
4684
4685 BinaryOp::And => {
4687 let l_bool = self.value_to_bool(left);
4688 let r_bool = self.value_to_bool(right);
4689 Ok(json!(l_bool && r_bool))
4690 }
4691 BinaryOp::Or => {
4692 let l_bool = self.value_to_bool(left);
4693 let r_bool = self.value_to_bool(right);
4694 Ok(json!(l_bool || r_bool))
4695 }
4696
4697 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4699 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4700 _ => match (left.as_i64(), right.as_i64()) {
4701 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4702 _ => Err("XOR requires integer operands".into()),
4703 },
4704 },
4705 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4706 (Some(a), Some(b)) => Ok(json!(a & b)),
4707 _ => match (left.as_i64(), right.as_i64()) {
4708 (Some(a), Some(b)) => Ok(json!(a & b)),
4709 _ => Err("BitAnd requires integer operands".into()),
4710 },
4711 },
4712 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4713 (Some(a), Some(b)) => Ok(json!(a | b)),
4714 _ => match (left.as_i64(), right.as_i64()) {
4715 (Some(a), Some(b)) => Ok(json!(a | b)),
4716 _ => Err("BitOr requires integer operands".into()),
4717 },
4718 },
4719 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4720 (Some(a), Some(b)) => Ok(json!(a << b)),
4721 _ => match (left.as_i64(), right.as_i64()) {
4722 (Some(a), Some(b)) => Ok(json!(a << b)),
4723 _ => Err("Shl requires integer operands".into()),
4724 },
4725 },
4726 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4727 (Some(a), Some(b)) => Ok(json!(a >> b)),
4728 _ => match (left.as_i64(), right.as_i64()) {
4729 (Some(a), Some(b)) => Ok(json!(a >> b)),
4730 _ => Err("Shr requires integer operands".into()),
4731 },
4732 },
4733 }
4734 }
4735
4736 fn numeric_op<F1, F2>(
4738 &self,
4739 left: &Value,
4740 right: &Value,
4741 int_op: F1,
4742 float_op: F2,
4743 ) -> Result<Value>
4744 where
4745 F1: Fn(i64, i64) -> i64,
4746 F2: Fn(f64, f64) -> f64,
4747 {
4748 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4750 return Ok(json!(int_op(a, b)));
4751 }
4752
4753 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4755 return Ok(json!(int_op(a as i64, b as i64)));
4757 }
4758
4759 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4761 return Ok(json!(float_op(a, b)));
4762 }
4763
4764 if left.is_null() || right.is_null() {
4766 return Ok(Value::Null);
4767 }
4768
4769 Err(format!(
4770 "Cannot perform numeric operation on {:?} and {:?}",
4771 left, right
4772 )
4773 .into())
4774 }
4775
4776 fn comparison_op<F1, F2>(
4778 &self,
4779 left: &Value,
4780 right: &Value,
4781 int_cmp: F1,
4782 float_cmp: F2,
4783 ) -> Result<Value>
4784 where
4785 F1: Fn(i64, i64) -> bool,
4786 F2: Fn(f64, f64) -> bool,
4787 {
4788 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4790 return Ok(json!(int_cmp(a, b)));
4791 }
4792
4793 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4795 return Ok(json!(int_cmp(a as i64, b as i64)));
4796 }
4797
4798 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4800 return Ok(json!(float_cmp(a, b)));
4801 }
4802
4803 if left.is_null() || right.is_null() {
4805 return Ok(json!(false));
4806 }
4807
4808 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4809 }
4810
4811 fn value_to_bool(&self, value: &Value) -> bool {
4813 match value {
4814 Value::Null => false,
4815 Value::Bool(b) => *b,
4816 Value::Number(n) => {
4817 if let Some(i) = n.as_i64() {
4818 i != 0
4819 } else if let Some(f) = n.as_f64() {
4820 f != 0.0
4821 } else {
4822 true
4823 }
4824 }
4825 Value::String(s) => !s.is_empty(),
4826 Value::Array(arr) => !arr.is_empty(),
4827 Value::Object(obj) => !obj.is_empty(),
4828 }
4829 }
4830
4831 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4833 match to_type {
4834 "i8" | "i16" | "i32" | "i64" | "isize" => {
4835 if let Some(n) = value.as_i64() {
4836 Ok(json!(n))
4837 } else if let Some(n) = value.as_u64() {
4838 Ok(json!(n as i64))
4839 } else if let Some(n) = value.as_f64() {
4840 Ok(json!(n as i64))
4841 } else if let Some(s) = value.as_str() {
4842 s.parse::<i64>()
4843 .map(|n| json!(n))
4844 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4845 } else {
4846 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4847 }
4848 }
4849 "u8" | "u16" | "u32" | "u64" | "usize" => {
4850 if let Some(n) = value.as_u64() {
4851 Ok(json!(n))
4852 } else if let Some(n) = value.as_i64() {
4853 Ok(json!(n as u64))
4854 } else if let Some(n) = value.as_f64() {
4855 Ok(json!(n as u64))
4856 } else if let Some(s) = value.as_str() {
4857 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4858 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4859 })
4860 } else {
4861 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4862 }
4863 }
4864 "f32" | "f64" => {
4865 if let Some(n) = value.as_f64() {
4866 Ok(json!(n))
4867 } else if let Some(n) = value.as_i64() {
4868 Ok(json!(n as f64))
4869 } else if let Some(n) = value.as_u64() {
4870 Ok(json!(n as f64))
4871 } else if let Some(s) = value.as_str() {
4872 s.parse::<f64>()
4873 .map(|n| json!(n))
4874 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4875 } else {
4876 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4877 }
4878 }
4879 "String" | "string" => Ok(json!(value.to_string())),
4880 "bool" => Ok(json!(self.value_to_bool(value))),
4881 _ => {
4882 Ok(value.clone())
4884 }
4885 }
4886 }
4887
4888 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4890 match method {
4891 "unwrap_or" => {
4892 if value.is_null() && !args.is_empty() {
4893 Ok(args[0].clone())
4894 } else {
4895 Ok(value.clone())
4896 }
4897 }
4898 "unwrap_or_default" => {
4899 if value.is_null() {
4900 Ok(json!(0))
4902 } else {
4903 Ok(value.clone())
4904 }
4905 }
4906 "is_some" => Ok(json!(!value.is_null())),
4907 "is_none" => Ok(json!(value.is_null())),
4908 "abs" => {
4909 if let Some(n) = value.as_i64() {
4910 Ok(json!(n.abs()))
4911 } else if let Some(n) = value.as_f64() {
4912 Ok(json!(n.abs()))
4913 } else {
4914 Err(format!("Cannot call abs() on {:?}", value).into())
4915 }
4916 }
4917 "len" => {
4918 if let Some(s) = value.as_str() {
4919 Ok(json!(s.len()))
4920 } else if let Some(arr) = value.as_array() {
4921 Ok(json!(arr.len()))
4922 } else if let Some(obj) = value.as_object() {
4923 Ok(json!(obj.len()))
4924 } else {
4925 Err(format!("Cannot call len() on {:?}", value).into())
4926 }
4927 }
4928 "to_string" => Ok(json!(value.to_string())),
4929 "min" => {
4930 if args.is_empty() {
4931 return Err("min() requires an argument".into());
4932 }
4933 let other = &args[0];
4934 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4935 Ok(json!(a.min(b)))
4936 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4937 Ok(json!(a.min(b)))
4938 } else {
4939 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4940 }
4941 }
4942 "max" => {
4943 if args.is_empty() {
4944 return Err("max() requires an argument".into());
4945 }
4946 let other = &args[0];
4947 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4948 Ok(json!(a.max(b)))
4949 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4950 Ok(json!(a.max(b)))
4951 } else {
4952 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4953 }
4954 }
4955 "saturating_add" => {
4956 if args.is_empty() {
4957 return Err("saturating_add() requires an argument".into());
4958 }
4959 let other = &args[0];
4960 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4961 Ok(json!(a.saturating_add(b)))
4962 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4963 Ok(json!(a.saturating_add(b)))
4964 } else {
4965 Err(format!(
4966 "Cannot call saturating_add() on {:?} and {:?}",
4967 value, other
4968 )
4969 .into())
4970 }
4971 }
4972 "saturating_sub" => {
4973 if args.is_empty() {
4974 return Err("saturating_sub() requires an argument".into());
4975 }
4976 let other = &args[0];
4977 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4978 Ok(json!(a.saturating_sub(b)))
4979 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4980 Ok(json!(a.saturating_sub(b)))
4981 } else {
4982 Err(format!(
4983 "Cannot call saturating_sub() on {:?} and {:?}",
4984 value, other
4985 )
4986 .into())
4987 }
4988 }
4989 _ => Err(format!("Unknown method call: {}()", method).into()),
4990 }
4991 }
4992
4993 pub fn evaluate_computed_fields_from_ast(
4996 &self,
4997 state: &mut Value,
4998 computed_field_specs: &[ComputedFieldSpec],
4999 ) -> Result<Vec<String>> {
5000 let mut updated_paths = Vec::new();
5001
5002 crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
5003
5004 for spec in computed_field_specs {
5005 match self.evaluate_computed_expr(&spec.expression, state) {
5006 Ok(result) => {
5007 self.set_field_in_state(state, &spec.target_path, result)?;
5008 updated_paths.push(spec.target_path.clone());
5009 }
5010 Err(e) => {
5011 tracing::warn!(
5012 target_path = %spec.target_path,
5013 error = %e,
5014 "Failed to evaluate computed field"
5015 );
5016 }
5017 }
5018 }
5019
5020 Ok(updated_paths)
5021 }
5022
5023 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
5025 let segments: Vec<&str> = path.split('.').collect();
5026
5027 if segments.is_empty() {
5028 return Err("Empty path".into());
5029 }
5030
5031 let mut current = state;
5033 for (i, segment) in segments.iter().enumerate() {
5034 if i == segments.len() - 1 {
5035 if let Some(obj) = current.as_object_mut() {
5037 obj.insert(segment.to_string(), value);
5038 return Ok(());
5039 } else {
5040 return Err(format!("Cannot set field '{}' on non-object", segment).into());
5041 }
5042 } else {
5043 if !current.is_object() {
5045 *current = json!({});
5046 }
5047 let obj = current.as_object_mut().unwrap();
5048 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
5049 }
5050 }
5051
5052 Ok(())
5053 }
5054
5055 pub fn create_evaluator_from_specs(
5058 specs: Vec<ComputedFieldSpec>,
5059 ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5060 move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5061 let mut vm = VmContext::new();
5062 vm.current_context = Some(UpdateContext {
5063 slot: context_slot,
5064 timestamp: Some(context_timestamp),
5065 ..Default::default()
5066 });
5067 vm.evaluate_computed_fields_from_ast(state, &specs)?;
5068 Ok(())
5069 }
5070 }
5071}
5072
5073impl Default for VmContext {
5074 fn default() -> Self {
5075 Self::new()
5076 }
5077}
5078
5079impl crate::resolvers::ReverseLookupUpdater for VmContext {
5081 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5082 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5084 .unwrap_or_else(|e| {
5085 tracing::error!("Failed to update PDA reverse lookup: {}", e);
5086 Vec::new()
5087 })
5088 }
5089
5090 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5091 self.flush_pending_updates(0, pda_address)
5093 .unwrap_or_else(|e| {
5094 tracing::error!("Failed to flush pending updates: {}", e);
5095 Vec::new()
5096 })
5097 }
5098}
5099
5100#[cfg(test)]
5101mod tests {
5102 use super::*;
5103 use crate::ast::{
5104 BinaryOp, ComputedExpr, ComputedFieldSpec, HttpMethod, UrlResolverConfig, UrlSource,
5105 };
5106
/// Two URL resolvers that differ in URL source and extract path, but share
/// the HTTP method and are given the same input value, must produce the same
/// cache key.
#[test]
fn test_url_resolver_cache_key_uses_method_and_resolved_url() {
    let resolved_url = json!("https://cdn.example.com/token.json");

    let from_field_path = ResolverType::Url(UrlResolverConfig {
        url_source: UrlSource::FieldPath("metadata_uri".to_string()),
        method: HttpMethod::Get,
        extract_path: None,
    });
    let from_template = ResolverType::Url(UrlResolverConfig {
        url_source: UrlSource::Template(vec![ast::UrlTemplatePart::Literal(
            "https://example.com/metadata".to_string(),
        )]),
        method: HttpMethod::Get,
        extract_path: Some("data".to_string()),
    });

    let field_path_key = resolver_cache_key(&from_field_path, &resolved_url);
    let template_key = resolver_cache_key(&from_template, &resolved_url);

    assert_eq!(field_path_key, template_key);
}
5128
/// Resolvers that are identical except for the HTTP method must not share a
/// cache key for the same input value.
#[test]
fn test_url_resolver_cache_key_distinguishes_http_method() {
    // Same URL source and extract path; only the method varies.
    let resolver_with = |method: HttpMethod| {
        ResolverType::Url(UrlResolverConfig {
            url_source: UrlSource::FieldPath("metadata_uri".to_string()),
            method,
            extract_path: None,
        })
    };

    let via_get = resolver_with(HttpMethod::Get);
    let via_post = resolver_with(HttpMethod::Post);
    let resolved_url = json!("https://api.example.com/round");

    let get_key = resolver_cache_key(&via_get, &resolved_url);
    let post_key = resolver_cache_key(&via_post, &resolved_url);

    assert_ne!(get_key, post_key);
}
5148
/// A cache entry stamped one second older than the resolver cache TTL must be
/// reported as a miss by `get_cached_resolver_value` and must no longer be
/// present in the underlying cache afterwards.
#[test]
fn test_expired_resolver_cache_entry_is_dropped() {
    let mut vm = VmContext::new();

    let url_resolver = ResolverType::Url(UrlResolverConfig {
        url_source: UrlSource::FieldPath("metadata_uri".to_string()),
        method: HttpMethod::Get,
        extract_path: None,
    });
    let resolved_url = json!("https://cdn.example.com/token.json");
    let key = resolver_cache_key(&url_resolver, &resolved_url);

    // Back-date the entry to just past the TTL.
    let stale_entry = ResolverCacheEntry {
        value: json!({ "name": "Token" }),
        cached_at: Instant::now() - resolver_cache_ttl() - Duration::from_secs(1),
    };
    vm.resolver_cache.put(key.clone(), stale_entry);

    assert!(vm.get_cached_resolver_value(&key).is_none());
    assert!(vm.resolver_cache.get(&key).is_none());
}
5171
/// Adding two integer fields through a computed-field expression must yield
/// an integer: the stored sum serializes without a decimal point and equals
/// the exact total.
#[test]
fn test_computed_field_preserves_integer_type() {
    let vm = VmContext::new();

    let mut entity_state = json!({
        "trading": {
            "total_buy_volume": 20000000000_i64,
            "total_sell_volume": 17951316474_i64
        }
    });

    // `<field>.unwrap_or(0)` expressed as an AST node.
    let field_or_zero = |path: &str| ComputedExpr::UnwrapOr {
        expr: Box::new(ComputedExpr::FieldRef {
            path: path.to_string(),
        }),
        default: json!(0),
    };

    let total_volume_spec = ComputedFieldSpec {
        target_path: "trading.total_volume".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: Box::new(field_or_zero("trading.total_buy_volume")),
            right: Box::new(field_or_zero("trading.total_sell_volume")),
        },
    };

    vm.evaluate_computed_fields_from_ast(&mut entity_state, &[total_volume_spec])
        .unwrap();

    let trading = entity_state.get("trading");
    let total_volume = trading
        .and_then(|section| section.get("total_volume"))
        .expect("total_volume should exist");

    let rendered = serde_json::to_string(total_volume).unwrap();
    assert!(
        !rendered.contains('.'),
        "Integer should not have decimal point: {}",
        rendered
    );
    assert_eq!(
        total_volume.as_i64(),
        Some(37951316474),
        "Value should be correct sum"
    );
}
5223
/// Values written by `set_field_sum` from integer registers must serialize
/// without a decimal point.
#[test]
fn test_set_field_sum_preserves_integer_type() {
    let mut vm = VmContext::new();
    vm.registers[0] = json!({});
    vm.registers[1] = json!(20000000000_i64);
    vm.registers[2] = json!(17951316474_i64);

    vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
    vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

    // Looks up `trading.<field>` in the object register.
    let object = &vm.registers[0];
    let trading_field = |field: &str| {
        object
            .get("trading")
            .and_then(|section| section.get(field))
            .unwrap()
    };

    let buy_volume = trading_field("total_buy_volume");
    let sell_volume = trading_field("total_sell_volume");

    let buy_rendered = serde_json::to_string(buy_volume).unwrap();
    let sell_rendered = serde_json::to_string(sell_volume).unwrap();

    assert!(
        !buy_rendered.contains('.'),
        "Buy volume should not have decimal: {}",
        buy_rendered
    );
    assert!(
        !sell_rendered.contains('.'),
        "Sell volume should not have decimal: {}",
        sell_rendered
    );
}
5258
/// With a PDA reverse-lookup entry `pda_123 -> addr_456` and a lookup-index
/// entry `addr_456 -> 789`, a `LookupIndex` opcode fed `pda_123` is expected
/// to resolve to `789`.
#[test]
fn test_lookup_index_chaining() {
    let mut vm = VmContext::new();

    let table = vm.states.get_mut(&0).unwrap();

    table
        .pda_reverse_lookups
        .entry("default_pda_lookup".to_string())
        .or_insert_with(|| PdaReverseLookup::new(1000))
        .insert("pda_123".to_string(), "addr_456".to_string());

    table
        .lookup_indexes
        .entry("round_address_lookup_index".to_string())
        .or_insert_with(LookupIndex::new)
        .insert(json!("addr_456"), json!(789));

    let load_pda = OpCode::LoadConstant {
        value: json!("pda_123"),
        dest: 0,
    };
    let lookup = OpCode::LookupIndex {
        state_id: 0,
        index_name: "round_address_lookup_index".to_string(),
        lookup_value: 0,
        dest: 1,
    };
    let program = vec![load_pda, lookup];

    vm.execute_handler(&program, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(789));
}
5295
/// A `LookupIndex` opcode whose key is stored directly in the index returns
/// the mapped value; no PDA reverse-lookup entry is set up in this test.
#[test]
fn test_lookup_index_no_chain() {
    let mut vm = VmContext::new();

    let table = vm.states.get_mut(&0).unwrap();
    table
        .lookup_indexes
        .entry("test_index".to_string())
        .or_insert_with(LookupIndex::new)
        .insert(json!("key_abc"), json!(42));

    let load_key = OpCode::LoadConstant {
        value: json!("key_abc"),
        dest: 0,
    };
    let lookup = OpCode::LookupIndex {
        state_id: 0,
        index_name: "test_index".to_string(),
        lookup_value: 0,
        dest: 1,
    };
    let program = vec![load_key, lookup];

    vm.execute_handler(&program, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(42));
}
5325
/// `ConditionalSetField` with a `NotEqual` condition against a 32-element
/// zero array must skip the write when the event value is all zeros and
/// perform it when the event value is non-zero.
#[test]
fn test_conditional_set_field_with_zero_array() {
    let mut vm = VmContext::new();

    // 32 zeros, and the sequence 1..=32, as JSON number arrays.
    let all_zero: Vec<i32> = vec![0; 32];
    let counting: Vec<i32> = (1..=32).collect();

    let zero_event = json!({ "value": all_zero.clone() });
    let nonzero_event = json!({ "value": counting });
    let zero_32: Value = json!(all_zero);

    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadEventField {
            path: FieldPath::new(&["value"]),
            dest: 10,
            default: None,
        },
        OpCode::ConditionalSetField {
            object: 2,
            path: "captured_value".to_string(),
            value: 10,
            condition_field: FieldPath::new(&["value"]),
            condition_op: ComparisonOp::NotEqual,
            condition_value: zero_32,
        },
    ];

    vm.execute_handler(&program, &zero_event, "test", 0, "Test", None, None)
        .unwrap();
    let captured_for_zeros = vm.registers[2].get("captured_value");
    assert!(
        captured_for_zeros.is_none(),
        "Field should not be set when value is all zeros"
    );

    vm.reset_registers();
    vm.execute_handler(&program, &nonzero_event, "test", 0, "Test", None, None)
        .unwrap();
    let captured_for_nonzero = vm.registers[2].get("captured_value");
    assert!(
        captured_for_nonzero.is_some(),
        "Field should be set when value is non-zero"
    );
}
5377
/// When the awaited instruction is already recorded for the current
/// transaction signature, `SetFieldWhen` must write the field immediately.
#[test]
fn test_when_instruction_arrives_first() {
    let mut vm = VmContext::new();

    let tx_signature = "test_sig_123".to_string();

    // Pre-record the instruction for this signature. The state reference and
    // the lock guard are temporaries released at the end of the statement.
    let seen_instructions: HashSet<String> =
        std::iter::once("RevealIxState".to_string()).collect();
    vm.states
        .get(&0)
        .unwrap()
        .recent_tx_instructions
        .lock()
        .unwrap()
        .put(tx_signature.clone(), seen_instructions);

    vm.current_context = Some(UpdateContext::new(100, tx_signature.clone()));

    let set_when_revealed = OpCode::SetFieldWhen {
        object: 2,
        path: "entropy_value".to_string(),
        value: 10,
        when_instruction: "RevealIxState".to_string(),
        entity_name: "TestEntity".to_string(),
        key_reg: 1,
        condition_field: None,
        condition_op: None,
        condition_value: None,
    };
    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("primary_key_value"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("the_revealed_value"),
            dest: 10,
        },
        set_when_revealed,
    ];

    vm.execute_handler(
        &program,
        &json!({}),
        "VarState",
        0,
        "TestEntity",
        None,
        None,
    )
    .unwrap();

    assert_eq!(
        vm.registers[2].get("entropy_value").unwrap(),
        "the_revealed_value",
        "Field should be set when instruction was already seen"
    );
}
5434
/// When the awaited instruction has not been seen yet, `SetFieldWhen` must
/// leave the field unset and queue a deferred operation keyed by
/// `(signature, instruction)`. Applying the queued operation afterwards must
/// write the field into the stored entity.
#[test]
fn test_when_account_arrives_first() {
    let mut vm = VmContext::new();

    let tx_signature = "test_sig_456".to_string();
    vm.current_context = Some(UpdateContext::new(100, tx_signature.clone()));

    let set_when_revealed = OpCode::SetFieldWhen {
        object: 2,
        path: "entropy_value".to_string(),
        value: 10,
        when_instruction: "RevealIxState".to_string(),
        entity_name: "TestEntity".to_string(),
        key_reg: 1,
        condition_field: None,
        condition_op: None,
        condition_value: None,
    };
    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("pk_123"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("deferred_value"),
            dest: 10,
        },
        set_when_revealed,
    ];

    vm.execute_handler(
        &program,
        &json!({}),
        "VarState",
        0,
        "TestEntity",
        None,
        None,
    )
    .unwrap();

    assert!(
        vm.registers[2].get("entropy_value").is_none(),
        "Field should not be set when instruction hasn't been seen"
    );

    let table = vm.states.get(&0).unwrap();
    let pending_key = (tx_signature.clone(), "RevealIxState".to_string());
    assert!(
        table.deferred_when_ops.contains_key(&pending_key),
        "Operation should be queued"
    );

    // Record the instruction as seen; the lock guard is released at the end
    // of this scope.
    {
        let mut seen_instructions = HashSet::new();
        seen_instructions.insert("RevealIxState".to_string());
        let mut recent = table.recent_tx_instructions.lock().unwrap();
        recent.put(tx_signature.clone(), seen_instructions);
    }

    let (_, queued_ops) = table.deferred_when_ops.remove(&pending_key).unwrap();
    for queued in queued_ops {
        vm.apply_deferred_when_op(0, &queued, None, None).unwrap();
    }

    let table = vm.states.get(&0).unwrap();
    let stored_entity = table.data.get(&json!("pk_123")).unwrap();
    assert_eq!(
        stored_entity.get("entropy_value").unwrap(),
        "deferred_value",
        "Field should be set after instruction arrives"
    );
}
5509
/// A deferred operation with `deferred_at: 0` must be removed by
/// `cleanup_expired_when_ops(0, 60)`, leaving the deferred-op map empty.
#[test]
fn test_when_cleanup_expired() {
    let mut vm = VmContext::new();

    let stale_op = DeferredWhenOperation {
        entity_name: "Test".to_string(),
        primary_key: json!("pk"),
        field_path: "field".to_string(),
        field_value: json!("value"),
        when_instruction: "SomeIxState".to_string(),
        signature: "old_sig".to_string(),
        slot: 0,
        deferred_at: 0,
        emit: true,
    };
    let pending_key = ("old_sig".to_string(), "SomeIxState".to_string());

    let table = vm.states.get(&0).unwrap();
    table.deferred_when_ops.insert(pending_key, vec![stale_op]);

    let removed_count = vm.cleanup_expired_when_ops(0, 60);

    assert_eq!(removed_count, 1, "Should have removed 1 expired op");

    let remaining = &vm.states.get(&0).unwrap().deferred_when_ops;
    assert!(
        remaining.is_empty(),
        "Deferred ops should be empty after cleanup"
    );
}
5539
/// Applying a deferred operation together with a computed-field evaluator
/// must write the deferred field, recompute the fields derived from it, and
/// include those derived fields in the first returned mutation's patch.
#[test]
fn test_deferred_when_op_recomputes_dependent_fields() {
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    let mut vm = VmContext::new();

    // results.pre_reveal_rng = entropy.base_value
    let rng_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_rng".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::FieldRef {
            path: "entropy.base_value".to_string(),
        },
    };

    // results.pre_reveal_winning_square = results.pre_reveal_rng.map(|r| r % 25)
    let modulo_closure = ComputedExpr::Closure {
        param: "r".to_string(),
        body: Box::new(ComputedExpr::Binary {
            op: BinaryOp::Mod,
            left: Box::new(ComputedExpr::Var {
                name: "r".to_string(),
            }),
            right: Box::new(ComputedExpr::Literal { value: json!(25) }),
        }),
    };
    let winning_square_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_winning_square".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::MethodCall {
            expr: Box::new(ComputedExpr::FieldRef {
                path: "results.pre_reveal_rng".to_string(),
            }),
            method: "map".to_string(),
            args: vec![modulo_closure],
        },
    };

    let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
        Box::new(VmContext::create_evaluator_from_specs(vec![
            rng_spec,
            winning_square_spec,
        ]));

    let primary_key = json!("test_pk");
    let deferred_op = DeferredWhenOperation {
        entity_name: "TestEntity".to_string(),
        primary_key: primary_key.clone(),
        field_path: "entropy.base_value".to_string(),
        field_value: json!(100),
        when_instruction: "TestIxState".to_string(),
        signature: "test_sig".to_string(),
        slot: 100,
        deferred_at: 0,
        emit: true,
    };

    // Seed the entity with an empty `results` object.
    let seed_state = json!({
        "results": {}
    });
    vm.states
        .get(&0)
        .unwrap()
        .insert_with_eviction(primary_key.clone(), seed_state);

    let mutations = vm
        .apply_deferred_when_op(
            0,
            &deferred_op,
            Some(&evaluator),
            Some(&[
                "results.pre_reveal_rng".to_string(),
                "results.pre_reveal_winning_square".to_string(),
            ]),
        )
        .unwrap();

    let table = vm.states.get(&0).unwrap();
    let entity = table.data.get(&primary_key).unwrap();

    println!(
        "Entity state: {}",
        serde_json::to_string_pretty(&*entity).unwrap()
    );

    let base_value = entity
        .get("entropy")
        .and_then(|section| section.get("base_value"));
    assert_eq!(base_value, Some(&json!(100)), "Base value should be set");

    // Clones `results.<name>` out of the stored entity, if present.
    let result_field = |name: &str| {
        entity
            .get("results")
            .and_then(|section| section.get(name))
            .cloned()
    };
    let pre_reveal_rng = result_field("pre_reveal_rng");
    let pre_reveal_winning_square = result_field("pre_reveal_winning_square");

    assert_eq!(
        pre_reveal_rng,
        Some(json!(100)),
        "pre_reveal_rng should be computed"
    );
    assert_eq!(
        pre_reveal_winning_square,
        Some(json!(0)),
        "pre_reveal_winning_square should be 100 % 25 = 0"
    );

    assert!(!mutations.is_empty(), "Should have mutations");
    let patch = &mutations[0].patch;

    let patched_rng = patch
        .get("results")
        .and_then(|section| section.get("pre_reveal_rng"));
    assert!(
        patched_rng.is_some(),
        "Mutation should include pre_reveal_rng"
    );

    let patched_square = patch
        .get("results")
        .and_then(|section| section.get("pre_reveal_winning_square"));
    assert!(
        patched_square.is_some(),
        "Mutation should include pre_reveal_winning_square"
    );
}
5678}