1use crate::ast::{
2 self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3 ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use serde_json::{json, Value};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::num::NonZeroUsize;
12
13#[cfg(feature = "otel")]
14use tracing::instrument;
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub skip_resolvers: bool,
36 pub metadata: HashMap<String, Value>,
38}
39
40impl UpdateContext {
41 pub fn new(slot: u64, signature: String) -> Self {
43 Self {
44 slot: Some(slot),
45 signature: Some(signature),
46 timestamp: None,
47 write_version: None,
48 txn_index: None,
49 skip_resolvers: false,
50 metadata: HashMap::new(),
51 }
52 }
53
54 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
56 Self {
57 slot: Some(slot),
58 signature: Some(signature),
59 timestamp: Some(timestamp),
60 write_version: None,
61 txn_index: None,
62 skip_resolvers: false,
63 metadata: HashMap::new(),
64 }
65 }
66
67 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
69 Self {
70 slot: Some(slot),
71 signature: Some(signature),
72 timestamp: None,
73 write_version: Some(write_version),
74 txn_index: None,
75 skip_resolvers: false,
76 metadata: HashMap::new(),
77 }
78 }
79
80 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
82 Self {
83 slot: Some(slot),
84 signature: Some(signature),
85 timestamp: None,
86 write_version: None,
87 txn_index: Some(txn_index),
88 skip_resolvers: false,
89 metadata: HashMap::new(),
90 }
91 }
92
93 pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
97 Self {
98 slot: Some(slot),
99 signature: None,
100 timestamp: None,
101 write_version: Some(write_version),
102 txn_index: None,
103 skip_resolvers: true,
104 metadata: HashMap::new(),
105 }
106 }
107
108 pub fn timestamp(&self) -> i64 {
110 self.timestamp.unwrap_or_else(|| {
111 std::time::SystemTime::now()
112 .duration_since(std::time::UNIX_EPOCH)
113 .unwrap()
114 .as_secs() as i64
115 })
116 }
117
118 pub fn empty() -> Self {
120 Self::default()
121 }
122
123 pub fn is_account_update(&self) -> bool {
126 self.write_version.is_some() && self.txn_index.is_none()
127 }
128
129 pub fn is_instruction_update(&self) -> bool {
131 self.txn_index.is_some() && self.write_version.is_none()
132 }
133
134 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
135 self.metadata.insert(key, value);
136 self
137 }
138
139 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
141 self.metadata.get(key)
142 }
143
144 pub fn to_value(&self) -> Value {
146 let mut obj = serde_json::Map::new();
147 if let Some(slot) = self.slot {
148 obj.insert("slot".to_string(), json!(slot));
149 }
150 if let Some(ref sig) = self.signature {
151 obj.insert("signature".to_string(), json!(sig));
152 }
153 obj.insert("timestamp".to_string(), json!(self.timestamp()));
155 for (key, value) in &self.metadata {
156 obj.insert(key.clone(), value.clone());
157 }
158 Value::Object(obj)
159 }
160}
161
/// Register identifier.
/// NOTE(review): presumably an index into `VmContext::registers` — confirm.
pub type Register = usize;
/// Module-wide result alias whose error is a boxed `std::error::Error` trait object.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Value held in a register; an alias for a JSON [`Value`].
pub type RegisterValue = Value;
166
/// Hook that evaluates computed fields against an entity state.
pub trait ComputedFieldsEvaluator {
    /// Evaluates against `state`, which is passed mutably so the implementation
    /// can update it in place. Returns an error if evaluation fails.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
172
// NOTE(review): the three pending-update limits below are not referenced in the
// visible part of this file; their meaning is inferred from their names — confirm.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300;
/// Age window, relative to the timestamp being inserted, that
/// `TemporalIndex::insert` retains per key.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300;
/// Upper bound on history entries kept per key by `TemporalIndex::insert`.
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

/// Default `StateTableConfig::max_entries`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
/// Default `StateTableConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

/// Capacity used by `LookupIndex::new`.
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

/// Capacity used by `VersionTracker::new`.
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

/// Capacity of the instruction de-duplication tracker created for state tables.
const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;

/// Capacity (number of keys) used by `TemporalIndex::new`.
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// NOTE(review): not referenced in the visible part of this file; presumably the
// default capacity passed to `PdaReverseLookup::new` — confirm.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;

/// Capacity of the resolver result cache created by the `VmContext` constructors.
const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
199
200fn estimate_json_size(value: &Value) -> usize {
202 match value {
203 Value::Null => 4,
204 Value::Bool(_) => 5,
205 Value::Number(_) => 8,
206 Value::String(s) => s.len() + 2,
207 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
208 Value::Object(obj) => {
209 2 + obj
210 .iter()
211 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
212 .sum::<usize>()
213 }
214 }
215}
216
/// A dotted field path pre-split into its segments.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` does not
/// copy the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path segments in order, one per `.`-separated component.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` and stores the pieces.
    ///
    /// Empty components are kept: `""` yields one empty segment and `"a..b"`
    /// yields three segments.
    pub fn new(path: &str) -> Self {
        let segments = path
            .split('.')
            .map(str::to_owned)
            .collect::<std::sync::Arc<[String]>>();
        Self { segments }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        self.segments.as_ref()
    }
}
234
/// How a field changed, as recorded by `DirtyTracker`.
#[derive(Debug, Clone)]
pub enum FieldChange {
    /// The field's value was replaced.
    Replaced,
    /// Values were appended to the field; holds the appended values in the
    /// order they were recorded.
    Appended(Vec<Value>),
}
244
245#[derive(Debug, Clone, Default)]
248pub struct DirtyTracker {
249 changes: HashMap<String, FieldChange>,
250}
251
252impl DirtyTracker {
253 pub fn new() -> Self {
255 Self {
256 changes: HashMap::new(),
257 }
258 }
259
260 pub fn mark_replaced(&mut self, path: &str) {
262 self.changes.insert(path.to_string(), FieldChange::Replaced);
264 }
265
266 pub fn mark_appended(&mut self, path: &str, value: Value) {
268 match self.changes.get_mut(path) {
269 Some(FieldChange::Appended(values)) => {
270 values.push(value);
272 }
273 Some(FieldChange::Replaced) => {
274 }
277 None => {
278 self.changes
280 .insert(path.to_string(), FieldChange::Appended(vec![value]));
281 }
282 }
283 }
284
285 pub fn is_empty(&self) -> bool {
287 self.changes.is_empty()
288 }
289
290 pub fn len(&self) -> usize {
292 self.changes.len()
293 }
294
295 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
297 self.changes.iter()
298 }
299
300 pub fn dirty_paths(&self) -> HashSet<String> {
302 self.changes.keys().cloned().collect()
303 }
304
305 pub fn into_changes(self) -> HashMap<String, FieldChange> {
307 self.changes
308 }
309
310 pub fn changes(&self) -> &HashMap<String, FieldChange> {
312 &self.changes
313 }
314
315 pub fn appended_paths(&self) -> Vec<String> {
317 self.changes
318 .iter()
319 .filter_map(|(path, change)| match change {
320 FieldChange::Appended(_) => Some(path.clone()),
321 FieldChange::Replaced => None,
322 })
323 .collect()
324 }
325}
326
/// Execution context of the VM: registers, per-entity state tables, resolver
/// bookkeeping and counters.
pub struct VmContext {
    /// Register file; the constructors allocate 256 `Null` slots.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id.
    states: HashMap<u32, StateTable>,
    /// NOTE(review): counter; where it is incremented is outside the visible code.
    pub instructions_executed: u64,
    /// NOTE(review): counter; where it is incremented is outside the visible code.
    pub cache_hits: u64,
    /// Compiled paths keyed by their source string.
    path_cache: HashMap<String, CompiledPath>,
    /// NOTE(review): counter; presumably PDA reverse-lookup hits — confirm.
    pub pda_cache_hits: u64,
    /// NOTE(review): counter; presumably PDA reverse-lookup misses — confirm.
    pub pda_cache_misses: u64,
    /// NOTE(review): presumably the current size of the pending-update queue — confirm.
    pub pending_queue_size: u64,
    /// Resolver requests waiting to be handed out by `take_resolver_requests`.
    resolver_requests: VecDeque<ResolverRequest>,
    /// In-flight resolver entries keyed by cache key; removed by `apply_resolver_result`.
    resolver_pending: HashMap<String, PendingResolverEntry>,
    /// Resolved values keyed by cache key; written by `apply_resolver_result`.
    resolver_cache: LruCache<String, Value>,
    /// NOTE(review): counter; updated outside the visible code.
    pub resolver_cache_hits: u64,
    /// NOTE(review): counter; updated outside the visible code.
    pub resolver_cache_misses: u64,
    /// Context of the update being processed, if one has been set.
    current_context: Option<UpdateContext>,
    /// Warning messages accumulated until `take_warnings` drains them.
    warnings: Vec<String>,
    // NOTE(review): the four `last_*` fields are only initialized in the visible
    // code; they look like diagnostics for the most recent lookups — confirm.
    last_pda_lookup_miss: Option<String>,
    last_lookup_index_miss: Option<String>,
    last_pda_registered: Option<String>,
    last_lookup_index_keys: Vec<String>,
    /// Callbacks drained by `take_scheduled_callbacks`.
    /// NOTE(review): the `u64` is presumably the slot or time to fire at — confirm.
    scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
}
349
350#[derive(Debug)]
351pub struct LookupIndex {
352 index: std::sync::Mutex<LruCache<String, Value>>,
353}
354
355impl LookupIndex {
356 pub fn new() -> Self {
357 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
358 }
359
360 pub fn with_capacity(capacity: usize) -> Self {
361 LookupIndex {
362 index: std::sync::Mutex::new(LruCache::new(
363 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
364 )),
365 }
366 }
367
368 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
369 let key = value_to_cache_key(lookup_value);
370 self.index.lock().unwrap().get(&key).cloned()
371 }
372
373 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
374 let key = value_to_cache_key(&lookup_value);
375 self.index.lock().unwrap().put(key, primary_key);
376 }
377
378 pub fn remove(&self, lookup_value: &Value) {
381 let key = value_to_cache_key(lookup_value);
382 self.index.lock().unwrap().pop(&key);
383 }
384
385 pub fn len(&self) -> usize {
386 self.index.lock().unwrap().len()
387 }
388
389 pub fn is_empty(&self) -> bool {
390 self.index.lock().unwrap().is_empty()
391 }
392}
393
394impl Default for LookupIndex {
395 fn default() -> Self {
396 Self::new()
397 }
398}
399
400fn value_to_cache_key(value: &Value) -> String {
401 match value {
402 Value::String(s) => s.clone(),
403 Value::Number(n) => n.to_string(),
404 Value::Bool(b) => b.to_string(),
405 Value::Null => "null".to_string(),
406 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
407 }
408}
409
410fn resolver_type_key(resolver: &ResolverType) -> String {
411 match resolver {
412 ResolverType::Token => "token".to_string(),
413 ResolverType::Url(config) => match &config.url_source {
414 ast::UrlSource::FieldPath(path) => format!("url:{}", path),
415 ast::UrlSource::Template(parts) => {
416 let key: String = parts
417 .iter()
418 .map(|p| match p {
419 ast::UrlTemplatePart::Literal(s) => s.clone(),
420 ast::UrlTemplatePart::FieldRef(f) => format!("{{{}}}", f),
421 })
422 .collect();
423 format!("url:{}", key)
424 }
425 },
426 }
427}
428
429fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
430 format!(
431 "{}:{}",
432 resolver_type_key(resolver),
433 value_to_cache_key(input)
434 )
435}
436
437#[derive(Debug)]
438pub struct TemporalIndex {
439 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
440}
441
442impl Default for TemporalIndex {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl TemporalIndex {
449 pub fn new() -> Self {
450 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
451 }
452
453 pub fn with_capacity(capacity: usize) -> Self {
454 TemporalIndex {
455 index: std::sync::Mutex::new(LruCache::new(
456 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
457 )),
458 }
459 }
460
461 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
462 let key = value_to_cache_key(lookup_value);
463 let mut cache = self.index.lock().unwrap();
464 if let Some(entries) = cache.get(&key) {
465 for i in (0..entries.len()).rev() {
466 if entries[i].1 <= timestamp {
467 return Some(entries[i].0.clone());
468 }
469 }
470 }
471 None
472 }
473
474 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
475 let key = value_to_cache_key(lookup_value);
476 let mut cache = self.index.lock().unwrap();
477 if let Some(entries) = cache.get(&key) {
478 if let Some(last) = entries.last() {
479 return Some(last.0.clone());
480 }
481 }
482 None
483 }
484
485 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
486 let key = value_to_cache_key(&lookup_value);
487 let mut cache = self.index.lock().unwrap();
488
489 let entries = cache.get_or_insert_mut(key, Vec::new);
490 entries.push((primary_key, timestamp));
491 entries.sort_by_key(|(_, ts)| *ts);
492
493 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
494 entries.retain(|(_, ts)| *ts >= cutoff);
495
496 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
497 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
498 entries.drain(0..excess);
499 }
500 }
501
502 pub fn len(&self) -> usize {
503 self.index.lock().unwrap().len()
504 }
505
506 pub fn is_empty(&self) -> bool {
507 self.index.lock().unwrap().is_empty()
508 }
509
510 pub fn total_entries(&self) -> usize {
511 self.index
512 .lock()
513 .unwrap()
514 .iter()
515 .map(|(_, entries)| entries.len())
516 .sum()
517 }
518
519 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
520 let mut cache = self.index.lock().unwrap();
521 let mut total_removed = 0;
522
523 for (_, entries) in cache.iter_mut() {
524 let original_len = entries.len();
525 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
526 total_removed += original_len - entries.len();
527 }
528
529 total_removed
530 }
531}
532
533#[derive(Debug)]
534pub struct PdaReverseLookup {
535 index: LruCache<String, String>,
537}
538
539impl PdaReverseLookup {
540 pub fn new(capacity: usize) -> Self {
541 PdaReverseLookup {
542 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
543 }
544 }
545
546 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
547 self.index.get(pda_address).cloned()
548 }
549
550 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
551 let evicted = if self.index.len() >= self.index.cap().get() {
552 self.index.peek_lru().map(|(k, _)| k.clone())
553 } else {
554 None
555 };
556
557 self.index.put(pda_address, seed_value);
558 evicted
559 }
560
561 pub fn len(&self) -> usize {
562 self.index.len()
563 }
564
565 pub fn is_empty(&self) -> bool {
566 self.index.is_empty()
567 }
568
569 pub fn contains(&self, pda_address: &str) -> bool {
570 self.index.peek(pda_address).is_some()
571 }
572}
573
/// An account update together with the PDA address it belongs to.
/// NOTE(review): presumably the input form for queueing an update whose PDA is
/// not yet resolvable — confirm against the queueing code.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    pub pda_address: String,
    pub account_type: String,
    /// Account contents as JSON.
    pub account_data: Value,
    pub slot: u64,
    pub write_version: u64,
    pub signature: String,
}
584
/// An account update held in a state table (see `StateTable::pending_updates`
/// and `StateTable::last_account_data`).
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    pub account_type: String,
    pub pda_address: String,
    /// Account contents as JSON.
    pub account_data: Value,
    pub slot: u64,
    pub write_version: u64,
    pub signature: String,
    /// NOTE(review): presumably the Unix time in seconds at which the update was queued — confirm.
    pub queued_at: i64,
    /// NOTE(review): presumably marks an update replayed from previously seen data — confirm.
    pub is_stale_reprocess: bool,
}
602
/// An instruction event together with the PDA address it belongs to.
/// NOTE(review): presumably the input form for queueing an instruction event — confirm.
#[derive(Debug, Clone)]
pub struct QueuedInstructionEvent {
    pub pda_address: String,
    pub event_type: String,
    /// Event payload as JSON.
    pub event_data: Value,
    pub slot: u64,
    pub signature: String,
}
612
/// An instruction event held in a state table (see
/// `StateTable::pending_instruction_events`).
#[derive(Debug, Clone)]
pub struct PendingInstructionEvent {
    pub event_type: String,
    pub pda_address: String,
    /// Event payload as JSON.
    pub event_data: Value,
    pub slot: u64,
    pub signature: String,
    /// NOTE(review): presumably the Unix time in seconds at which the event was queued — confirm.
    pub queued_at: i64,
}
623
/// A field write that has been deferred (see `StateTable::deferred_when_ops`).
/// NOTE(review): presumably applied once `when_instruction` is observed for the
/// same transaction — confirm against the code that drains these.
#[derive(Debug, Clone)]
pub struct DeferredWhenOperation {
    pub entity_name: String,
    pub primary_key: Value,
    /// Path of the field to write.
    pub field_path: String,
    /// Value to write at `field_path`.
    pub field_value: Value,
    pub when_instruction: String,
    pub signature: String,
    pub slot: u64,
    /// NOTE(review): presumably the Unix time in seconds at which the write was deferred — confirm.
    pub deferred_at: i64,
    /// NOTE(review): presumably whether applying the write should emit a mutation — confirm.
    pub emit: bool,
}
636
/// A resolver request queued by `VmContext::enqueue_resolver_request` and
/// handed out by `VmContext::take_resolver_requests`.
#[derive(Debug, Clone)]
pub struct ResolverRequest {
    /// Key under which the result is later applied and cached.
    pub cache_key: String,
    /// Which resolver to run.
    pub resolver: ResolverType,
    /// Input value for the resolver.
    pub input: Value,
}
643
/// An entity that is waiting for a resolver result.
#[derive(Debug, Clone)]
pub struct ResolverTarget {
    /// Id of the state table holding the entity.
    pub state_id: u32,
    /// Entity name; used to look up the entity's bytecode when the result is applied.
    pub entity_name: String,
    /// Primary key of the entity; targets with a `null` key are skipped when
    /// the result is applied.
    pub primary_key: Value,
    /// Extraction specs applied to the resolved value for this entity.
    pub extracts: Vec<ResolverExtractSpec>,
}
651
/// Bookkeeping for one in-flight resolver request and all entities waiting on it.
#[derive(Debug, Clone)]
pub struct PendingResolverEntry {
    /// Resolver that was requested.
    pub resolver: ResolverType,
    /// Input the resolver was requested with.
    pub input: Value,
    /// Entities to update once the result arrives.
    pub targets: Vec<ResolverTarget>,
    /// Unix time in seconds at which the request was first enqueued.
    pub queued_at: i64,
}
659
/// A resolver invocation scheduled for later (see
/// `VmContext::take_scheduled_callbacks`).
/// NOTE(review): field semantics below are inferred from names and types; the
/// code that creates and consumes these is outside the visible code — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct ScheduledCallback {
    pub state_id: u32,
    pub entity_name: String,
    pub primary_key: Value,
    pub resolver: ResolverType,
    /// URL template parts, when the resolver URL is built from a template.
    pub url_template: Option<Vec<ast::UrlTemplatePart>>,
    pub input_value: Option<Value>,
    pub input_path: Option<String>,
    pub condition: Option<ast::ResolverCondition>,
    pub strategy: ResolveStrategy,
    pub extracts: Vec<ResolverExtractSpec>,
    pub retry_count: u32,
}
674
675impl PendingResolverEntry {
676 fn add_target(&mut self, target: ResolverTarget) {
677 if let Some(existing) = self.targets.iter_mut().find(|t| {
678 t.state_id == target.state_id
679 && t.entity_name == target.entity_name
680 && t.primary_key == target.primary_key
681 }) {
682 let mut seen = HashSet::new();
683 for extract in &existing.extracts {
684 seen.insert((extract.target_path.clone(), extract.source_path.clone()));
685 }
686
687 for extract in target.extracts {
688 let key = (extract.target_path.clone(), extract.source_path.clone());
689 if seen.insert(key) {
690 existing.extracts.push(extract);
691 }
692 }
693 } else {
694 self.targets.push(target);
695 }
696 }
697}
698
/// Summary statistics about queued pending updates.
/// NOTE(review): computed outside the visible code; field meanings follow their names — confirm.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    pub total_updates: usize,
    pub unique_pdas: usize,
    pub oldest_age_seconds: i64,
    pub largest_pda_queue_size: usize,
    pub estimated_memory_bytes: usize,
}
707
/// Snapshot of the sizes of the VM's in-memory structures.
/// NOTE(review): populated outside the visible code; field meanings follow their names — confirm.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    pub state_table_entity_count: usize,
    pub state_table_max_entries: usize,
    pub state_table_at_capacity: bool,
    pub lookup_index_count: usize,
    pub lookup_index_total_entries: usize,
    pub temporal_index_count: usize,
    pub temporal_index_total_entries: usize,
    pub pda_reverse_lookup_count: usize,
    pub pda_reverse_lookup_total_entries: usize,
    pub version_tracker_entries: usize,
    /// `None` by default.
    pub pending_queue_stats: Option<PendingQueueStats>,
    pub path_cache_size: usize,
}
723
/// Counts of items removed by a cleanup pass.
/// NOTE(review): produced outside the visible code — confirm which pass fills it.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    pub pending_updates_removed: usize,
    pub temporal_entries_removed: usize,
}
729
/// Describes a table that has reached or exceeded its entry limit.
/// NOTE(review): constructed outside the visible code — confirm when it is emitted.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    pub current_entries: usize,
    pub max_entries: usize,
    pub entries_over_limit: usize,
}
736
/// Size limits for a `StateTable`.
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Entry count at which `StateTable::insert_with_eviction` starts evicting.
    pub max_entries: usize,
    /// Exposed through `StateTable::max_array_length`.
    /// NOTE(review): presumably caps the length of array fields — confirm at the use site.
    pub max_array_length: usize,
}

impl Default for StateTableConfig {
    /// Uses `DEFAULT_MAX_STATE_TABLE_ENTRIES` and `DEFAULT_MAX_ARRAY_LENGTH`.
    fn default() -> Self {
        Self {
            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}
751
752#[derive(Debug)]
753pub struct VersionTracker {
754 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
755}
756
757impl VersionTracker {
758 pub fn new() -> Self {
759 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
760 }
761
762 pub fn with_capacity(capacity: usize) -> Self {
763 VersionTracker {
764 cache: std::sync::Mutex::new(LruCache::new(
765 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
766 )),
767 }
768 }
769
770 fn make_key(primary_key: &Value, event_type: &str) -> String {
771 format!("{}:{}", primary_key, event_type)
772 }
773
774 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
775 let key = Self::make_key(primary_key, event_type);
776 self.cache.lock().unwrap().get(&key).copied()
777 }
778
779 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
780 let key = Self::make_key(primary_key, event_type);
781 self.cache.lock().unwrap().put(key, (slot, ordering_value));
782 }
783
784 pub fn len(&self) -> usize {
785 self.cache.lock().unwrap().len()
786 }
787
788 pub fn is_empty(&self) -> bool {
789 self.cache.lock().unwrap().is_empty()
790 }
791}
792
793impl Default for VersionTracker {
794 fn default() -> Self {
795 Self::new()
796 }
797}
798
799#[derive(Debug)]
800pub struct StateTable {
801 pub data: DashMap<Value, Value>,
802 access_times: DashMap<Value, i64>,
803 pub lookup_indexes: HashMap<String, LookupIndex>,
804 pub temporal_indexes: HashMap<String, TemporalIndex>,
805 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
806 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
807 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
808 pub last_account_data: DashMap<String, PendingAccountUpdate>,
812 version_tracker: VersionTracker,
813 instruction_dedup_cache: VersionTracker,
814 config: StateTableConfig,
815 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
816 entity_name: String,
817 pub recent_tx_instructions:
818 std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
819 pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
820}
821
822impl StateTable {
823 pub fn is_at_capacity(&self) -> bool {
824 self.data.len() >= self.config.max_entries
825 }
826
827 pub fn entries_over_limit(&self) -> usize {
828 self.data.len().saturating_sub(self.config.max_entries)
829 }
830
831 pub fn max_array_length(&self) -> usize {
832 self.config.max_array_length
833 }
834
835 fn touch(&self, key: &Value) {
836 let now = std::time::SystemTime::now()
837 .duration_since(std::time::UNIX_EPOCH)
838 .unwrap()
839 .as_secs() as i64;
840 self.access_times.insert(key.clone(), now);
841 }
842
843 fn evict_lru(&self, count: usize) -> usize {
844 if count == 0 || self.data.is_empty() {
845 return 0;
846 }
847
848 let mut entries: Vec<(Value, i64)> = self
849 .access_times
850 .iter()
851 .map(|entry| (entry.key().clone(), *entry.value()))
852 .collect();
853
854 entries.sort_by_key(|(_, ts)| *ts);
855
856 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
857
858 let mut evicted = 0;
859 for key in to_evict {
860 self.data.remove(&key);
861 self.access_times.remove(&key);
862 evicted += 1;
863 }
864
865 #[cfg(feature = "otel")]
866 if evicted > 0 {
867 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
868 }
869
870 evicted
871 }
872
873 pub fn insert_with_eviction(&self, key: Value, value: Value) {
874 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
875 #[cfg(feature = "otel")]
876 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
877 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
878 self.evict_lru(to_evict.max(1));
879 }
880 self.data.insert(key.clone(), value);
881 self.touch(&key);
882 }
883
884 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
885 let result = self.data.get(key).map(|v| v.clone());
886 if result.is_some() {
887 self.touch(key);
888 }
889 result
890 }
891
892 pub fn is_fresh_update(
899 &self,
900 primary_key: &Value,
901 event_type: &str,
902 slot: u64,
903 ordering_value: u64,
904 ) -> bool {
905 let dominated = self
906 .version_tracker
907 .get(primary_key, event_type)
908 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
909 .unwrap_or(false);
910
911 if dominated {
912 return false;
913 }
914
915 self.version_tracker
916 .insert(primary_key, event_type, slot, ordering_value);
917 true
918 }
919
920 pub fn is_duplicate_instruction(
928 &self,
929 primary_key: &Value,
930 event_type: &str,
931 slot: u64,
932 txn_index: u64,
933 ) -> bool {
934 let is_duplicate = self
936 .instruction_dedup_cache
937 .get(primary_key, event_type)
938 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
939 .unwrap_or(false);
940
941 if is_duplicate {
942 return true;
943 }
944
945 self.instruction_dedup_cache
947 .insert(primary_key, event_type, slot, txn_index);
948 false
949 }
950}
951
952impl VmContext {
953 pub fn new() -> Self {
954 let mut vm = VmContext {
955 registers: vec![Value::Null; 256],
956 states: HashMap::new(),
957 instructions_executed: 0,
958 cache_hits: 0,
959 path_cache: HashMap::new(),
960 pda_cache_hits: 0,
961 pda_cache_misses: 0,
962 pending_queue_size: 0,
963 resolver_requests: VecDeque::new(),
964 resolver_pending: HashMap::new(),
965 resolver_cache: LruCache::new(
966 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
967 .expect("capacity must be > 0"),
968 ),
969 resolver_cache_hits: 0,
970 resolver_cache_misses: 0,
971 current_context: None,
972 warnings: Vec::new(),
973 last_pda_lookup_miss: None,
974 last_lookup_index_miss: None,
975 last_pda_registered: None,
976 last_lookup_index_keys: Vec::new(),
977 scheduled_callbacks: Vec::new(),
978 };
979 vm.states.insert(
980 0,
981 StateTable {
982 data: DashMap::new(),
983 access_times: DashMap::new(),
984 lookup_indexes: HashMap::new(),
985 temporal_indexes: HashMap::new(),
986 pda_reverse_lookups: HashMap::new(),
987 pending_updates: DashMap::new(),
988 pending_instruction_events: DashMap::new(),
989 last_account_data: DashMap::new(),
990 version_tracker: VersionTracker::new(),
991 instruction_dedup_cache: VersionTracker::with_capacity(
992 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
993 ),
994 config: StateTableConfig::default(),
995 entity_name: String::new(),
996 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
997 NonZeroUsize::new(1000).unwrap(),
998 )),
999 deferred_when_ops: DashMap::new(),
1000 },
1001 );
1002
1003 vm
1004 }
1005
1006 pub fn new_multi_entity() -> Self {
1008 VmContext {
1009 registers: vec![Value::Null; 256],
1010 states: HashMap::new(),
1011 instructions_executed: 0,
1012 cache_hits: 0,
1013 path_cache: HashMap::new(),
1014 pda_cache_hits: 0,
1015 pda_cache_misses: 0,
1016 pending_queue_size: 0,
1017 resolver_requests: VecDeque::new(),
1018 resolver_pending: HashMap::new(),
1019 resolver_cache: LruCache::new(
1020 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
1021 .expect("capacity must be > 0"),
1022 ),
1023 resolver_cache_hits: 0,
1024 resolver_cache_misses: 0,
1025 current_context: None,
1026 warnings: Vec::new(),
1027 last_pda_lookup_miss: None,
1028 last_lookup_index_miss: None,
1029 last_pda_registered: None,
1030 last_lookup_index_keys: Vec::new(),
1031 scheduled_callbacks: Vec::new(),
1032 }
1033 }
1034
1035 pub fn new_with_config(state_config: StateTableConfig) -> Self {
1036 let mut vm = VmContext {
1037 registers: vec![Value::Null; 256],
1038 states: HashMap::new(),
1039 instructions_executed: 0,
1040 cache_hits: 0,
1041 path_cache: HashMap::new(),
1042 pda_cache_hits: 0,
1043 pda_cache_misses: 0,
1044 pending_queue_size: 0,
1045 resolver_requests: VecDeque::new(),
1046 resolver_pending: HashMap::new(),
1047 resolver_cache: LruCache::new(
1048 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
1049 .expect("capacity must be > 0"),
1050 ),
1051 resolver_cache_hits: 0,
1052 resolver_cache_misses: 0,
1053 current_context: None,
1054 warnings: Vec::new(),
1055 last_pda_lookup_miss: None,
1056 last_lookup_index_miss: None,
1057 last_pda_registered: None,
1058 last_lookup_index_keys: Vec::new(),
1059 scheduled_callbacks: Vec::new(),
1060 };
1061 vm.states.insert(
1062 0,
1063 StateTable {
1064 data: DashMap::new(),
1065 access_times: DashMap::new(),
1066 lookup_indexes: HashMap::new(),
1067 temporal_indexes: HashMap::new(),
1068 pda_reverse_lookups: HashMap::new(),
1069 pending_updates: DashMap::new(),
1070 pending_instruction_events: DashMap::new(),
1071 last_account_data: DashMap::new(),
1072 version_tracker: VersionTracker::new(),
1073 instruction_dedup_cache: VersionTracker::with_capacity(
1074 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1075 ),
1076 config: state_config,
1077 entity_name: "default".to_string(),
1078 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1079 NonZeroUsize::new(1000).unwrap(),
1080 )),
1081 deferred_when_ops: DashMap::new(),
1082 },
1083 );
1084 vm
1085 }
1086
1087 pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1088 self.resolver_requests.drain(..).collect()
1089 }
1090
/// Removes and returns every scheduled callback, leaving the internal list empty.
pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
    std::mem::take(&mut self.scheduled_callbacks)
}
1094
1095 pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1096 self.states.get(&state_id)?.get_and_touch(key)
1097 }
1098
1099 pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1100 if requests.is_empty() {
1101 return;
1102 }
1103
1104 self.resolver_requests.extend(requests);
1105 }
1106
1107 pub fn apply_resolver_result(
1108 &mut self,
1109 bytecode: &MultiEntityBytecode,
1110 cache_key: &str,
1111 resolved_value: Value,
1112 ) -> Result<Vec<Mutation>> {
1113 self.resolver_cache
1114 .put(cache_key.to_string(), resolved_value.clone());
1115
1116 let entry = match self.resolver_pending.remove(cache_key) {
1117 Some(entry) => entry,
1118 None => return Ok(Vec::new()),
1119 };
1120
1121 let mut mutations = Vec::new();
1122
1123 for target in entry.targets {
1124 if target.primary_key.is_null() {
1125 continue;
1126 }
1127
1128 let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1129 Some(bc) => bc,
1130 None => continue,
1131 };
1132
1133 let state = match self.states.get(&target.state_id) {
1134 Some(s) => s,
1135 None => continue,
1136 };
1137
1138 let mut entity_state = state
1139 .get_and_touch(&target.primary_key)
1140 .unwrap_or_else(|| json!({}));
1141
1142 let mut dirty_tracker = DirtyTracker::new();
1143 let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1144
1145 Self::apply_resolver_extractions_to_value(
1146 &mut entity_state,
1147 &resolved_value,
1148 &target.extracts,
1149 &mut dirty_tracker,
1150 &should_emit,
1151 )?;
1152
1153 if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1154 let old_values: Vec<_> = entity_bytecode
1155 .computed_paths
1156 .iter()
1157 .map(|path| Self::get_value_at_path(&entity_state, path))
1158 .collect();
1159
1160 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1161 let context_timestamp = self
1162 .current_context
1163 .as_ref()
1164 .map(|c| c.timestamp())
1165 .unwrap_or_else(|| {
1166 std::time::SystemTime::now()
1167 .duration_since(std::time::UNIX_EPOCH)
1168 .unwrap()
1169 .as_secs() as i64
1170 });
1171 let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1172
1173 if eval_result.is_ok() {
1174 let mut changed_fields = Vec::new();
1175 for (path, old_value) in
1176 entity_bytecode.computed_paths.iter().zip(old_values.iter())
1177 {
1178 let new_value = Self::get_value_at_path(&entity_state, path);
1179 let changed = new_value != *old_value;
1180 let will_emit = should_emit(path);
1181
1182 if changed && will_emit {
1183 dirty_tracker.mark_replaced(path);
1184 changed_fields.push(path.clone());
1185 }
1186 }
1187 }
1188 }
1189
1190 state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1191
1192 if dirty_tracker.is_empty() {
1193 continue;
1194 }
1195
1196 let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1197
1198 mutations.push(Mutation {
1199 export: target.entity_name.clone(),
1200 key: target.primary_key.clone(),
1201 patch,
1202 append: vec![],
1203 });
1204 }
1205
1206 Ok(mutations)
1207 }
1208
1209 pub fn enqueue_resolver_request(
1210 &mut self,
1211 cache_key: String,
1212 resolver: ResolverType,
1213 input: Value,
1214 target: ResolverTarget,
1215 ) {
1216 if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1217 entry.add_target(target);
1218 return;
1219 }
1220
1221 let queued_at = std::time::SystemTime::now()
1222 .duration_since(std::time::UNIX_EPOCH)
1223 .unwrap()
1224 .as_secs() as i64;
1225
1226 self.resolver_pending.insert(
1227 cache_key.clone(),
1228 PendingResolverEntry {
1229 resolver: resolver.clone(),
1230 input: input.clone(),
1231 targets: vec![target],
1232 queued_at,
1233 },
1234 );
1235
1236 self.resolver_requests.push_back(ResolverRequest {
1237 cache_key,
1238 resolver,
1239 input,
1240 });
1241 }
1242
1243 fn apply_resolver_extractions_to_value<F>(
1244 state: &mut Value,
1245 resolved_value: &Value,
1246 extracts: &[ResolverExtractSpec],
1247 dirty_tracker: &mut DirtyTracker,
1248 should_emit: &F,
1249 ) -> Result<()>
1250 where
1251 F: Fn(&str) -> bool,
1252 {
1253 for extract in extracts {
1254 let resolved = match extract.source_path.as_deref() {
1255 Some(path) => Self::get_value_at_path(resolved_value, path),
1256 None => Some(resolved_value.clone()),
1257 };
1258
1259 let Some(value) = resolved else {
1260 continue;
1261 };
1262
1263 let value = if let Some(transform) = &extract.transform {
1264 Self::apply_transformation(&value, transform)?
1265 } else {
1266 value
1267 };
1268
1269 Self::set_nested_field_value(state, &extract.target_path, value)?;
1270 if should_emit(&extract.target_path) {
1271 dirty_tracker.mark_replaced(&extract.target_path);
1272 }
1273 }
1274
1275 Ok(())
1276 }
1277
1278 fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1279 if tracker.is_empty() {
1280 return Ok(json!({}));
1281 }
1282
1283 let mut partial = serde_json::Map::new();
1284
1285 for (path, change) in tracker.iter() {
1286 let segments: Vec<&str> = path.split('.').collect();
1287
1288 let value_to_insert = match change {
1289 FieldChange::Replaced => {
1290 let mut current = state;
1291 let mut found = true;
1292
1293 for segment in &segments {
1294 match current.get(*segment) {
1295 Some(v) => current = v,
1296 None => {
1297 found = false;
1298 break;
1299 }
1300 }
1301 }
1302
1303 if !found {
1304 continue;
1305 }
1306 current.clone()
1307 }
1308 FieldChange::Appended(values) => Value::Array(values.clone()),
1309 };
1310
1311 let mut target = &mut partial;
1312 for (i, segment) in segments.iter().enumerate() {
1313 if i == segments.len() - 1 {
1314 target.insert(segment.to_string(), value_to_insert.clone());
1315 } else {
1316 target
1317 .entry(segment.to_string())
1318 .or_insert_with(|| json!({}));
1319 target = target
1320 .get_mut(*segment)
1321 .and_then(|v| v.as_object_mut())
1322 .ok_or("Failed to build nested structure")?;
1323 }
1324 }
1325 }
1326
1327 Ok(Value::Object(partial))
1328 }
1329
1330 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1333 self.states.get_mut(&state_id)
1334 }
1335
1336 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1338 &mut self.registers
1339 }
1340
1341 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1343 &self.path_cache
1344 }
1345
1346 pub fn current_context(&self) -> Option<&UpdateContext> {
1348 self.current_context.as_ref()
1349 }
1350
1351 pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1353 self.current_context = context;
1354 }
1355
1356 fn add_warning(&mut self, msg: String) {
1357 self.warnings.push(msg);
1358 }
1359
1360 pub fn take_warnings(&mut self) -> Vec<String> {
1361 std::mem::take(&mut self.warnings)
1362 }
1363
1364 pub fn has_warnings(&self) -> bool {
1365 !self.warnings.is_empty()
1366 }
1367
1368 pub fn update_state_from_register(
1369 &mut self,
1370 state_id: u32,
1371 key: Value,
1372 register: Register,
1373 ) -> Result<()> {
1374 let state = self.states.get(&state_id).ok_or("State table not found")?;
1375 let value = self.registers[register].clone();
1376 state.insert_with_eviction(key, value);
1377 Ok(())
1378 }
1379
1380 fn reset_registers(&mut self) {
1381 for reg in &mut self.registers {
1382 *reg = Value::Null;
1383 }
1384 }
1385
1386 pub fn extract_partial_state(
1388 &self,
1389 state_reg: Register,
1390 dirty_fields: &HashSet<String>,
1391 ) -> Result<Value> {
1392 let full_state = &self.registers[state_reg];
1393
1394 if dirty_fields.is_empty() {
1395 return Ok(json!({}));
1396 }
1397
1398 let mut partial = serde_json::Map::new();
1399
1400 for path in dirty_fields {
1401 let segments: Vec<&str> = path.split('.').collect();
1402
1403 let mut current = full_state;
1404 let mut found = true;
1405
1406 for segment in &segments {
1407 match current.get(segment) {
1408 Some(v) => current = v,
1409 None => {
1410 found = false;
1411 break;
1412 }
1413 }
1414 }
1415
1416 if !found {
1417 continue;
1418 }
1419
1420 let mut target = &mut partial;
1421 for (i, segment) in segments.iter().enumerate() {
1422 if i == segments.len() - 1 {
1423 target.insert(segment.to_string(), current.clone());
1424 } else {
1425 target
1426 .entry(segment.to_string())
1427 .or_insert_with(|| json!({}));
1428 target = target
1429 .get_mut(*segment)
1430 .and_then(|v| v.as_object_mut())
1431 .ok_or("Failed to build nested structure")?;
1432 }
1433 }
1434 }
1435
1436 Ok(Value::Object(partial))
1437 }
1438
1439 pub fn extract_partial_state_with_tracker(
1443 &self,
1444 state_reg: Register,
1445 tracker: &DirtyTracker,
1446 ) -> Result<Value> {
1447 let full_state = &self.registers[state_reg];
1448
1449 if tracker.is_empty() {
1450 return Ok(json!({}));
1451 }
1452
1453 let mut partial = serde_json::Map::new();
1454
1455 for (path, change) in tracker.iter() {
1456 let segments: Vec<&str> = path.split('.').collect();
1457
1458 let value_to_insert = match change {
1459 FieldChange::Replaced => {
1460 let mut current = full_state;
1461 let mut found = true;
1462
1463 for segment in &segments {
1464 match current.get(*segment) {
1465 Some(v) => current = v,
1466 None => {
1467 found = false;
1468 break;
1469 }
1470 }
1471 }
1472
1473 if !found {
1474 continue;
1475 }
1476 current.clone()
1477 }
1478 FieldChange::Appended(values) => Value::Array(values.clone()),
1479 };
1480
1481 let mut target = &mut partial;
1482 for (i, segment) in segments.iter().enumerate() {
1483 if i == segments.len() - 1 {
1484 target.insert(segment.to_string(), value_to_insert.clone());
1485 } else {
1486 target
1487 .entry(segment.to_string())
1488 .or_insert_with(|| json!({}));
1489 target = target
1490 .get_mut(*segment)
1491 .and_then(|v| v.as_object_mut())
1492 .ok_or("Failed to build nested structure")?;
1493 }
1494 }
1495 }
1496
1497 Ok(Value::Object(partial))
1498 }
1499
1500 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1501 if let Some(compiled) = self.path_cache.get(path) {
1502 self.cache_hits += 1;
1503 #[cfg(feature = "otel")]
1504 crate::vm_metrics::record_path_cache_hit();
1505 return compiled.clone();
1506 }
1507 #[cfg(feature = "otel")]
1508 crate::vm_metrics::record_path_cache_miss();
1509 let compiled = CompiledPath::new(path);
1510 self.path_cache.insert(path.to_string(), compiled.clone());
1511 compiled
1512 }
1513
1514 #[cfg_attr(feature = "otel", instrument(
1516 name = "vm.process_event",
1517 skip(self, bytecode, event_value, log),
1518 level = "info",
1519 fields(
1520 event_type = %event_type,
1521 slot = context.as_ref().and_then(|c| c.slot),
1522 )
1523 ))]
1524 pub fn process_event(
1525 &mut self,
1526 bytecode: &MultiEntityBytecode,
1527 event_value: Value,
1528 event_type: &str,
1529 context: Option<&UpdateContext>,
1530 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1531 ) -> Result<Vec<Mutation>> {
1532 self.current_context = context.cloned();
1533
1534 let mut event_value = event_value;
1535 if let Some(ctx) = context {
1536 if let Some(obj) = event_value.as_object_mut() {
1537 obj.insert("__update_context".to_string(), ctx.to_value());
1538 }
1539 }
1540
1541 let mut all_mutations = Vec::new();
1542
1543 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1544 if let Some(ctx) = context {
1545 if let Some(signature) = ctx.signature.clone() {
1546 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1547 for state_id in state_ids {
1548 if let Some(state) = self.states.get(&state_id) {
1549 {
1550 let mut cache = state.recent_tx_instructions.lock().unwrap();
1551 let entry =
1552 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1553 entry.insert(event_type.to_string());
1554 }
1555
1556 let key = (signature.clone(), event_type.to_string());
1557 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1558 tracing::debug!(
1559 event_type = %event_type,
1560 signature = %signature,
1561 deferred_count = deferred_ops.len(),
1562 "flushing deferred when-ops"
1563 );
1564 for op in deferred_ops {
1565 let (evaluator, computed_paths) = bytecode
1567 .entities
1568 .get(&op.entity_name)
1569 .map(|eb| {
1570 (
1571 eb.computed_fields_evaluator.as_ref(),
1572 eb.computed_paths.as_slice(),
1573 )
1574 })
1575 .unwrap_or((None, &[]));
1576
1577 match self.apply_deferred_when_op(
1578 state_id,
1579 &op,
1580 evaluator,
1581 Some(computed_paths),
1582 ) {
1583 Ok(mutations) => all_mutations.extend(mutations),
1584 Err(e) => tracing::warn!(
1585 "Failed to apply deferred when-op: {}",
1586 e
1587 ),
1588 }
1589 }
1590 }
1591 }
1592 }
1593 }
1594 }
1595 }
1596
1597 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1598 for entity_name in entity_names {
1599 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1600 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1601 if let Some(ref mut log) = log {
1602 log.set("entity", entity_name.clone());
1603 log.inc("handlers", 1);
1604 }
1605
1606 let opcodes_before = self.instructions_executed;
1607 let cache_before = self.cache_hits;
1608 let pda_hits_before = self.pda_cache_hits;
1609 let pda_misses_before = self.pda_cache_misses;
1610
1611 let mutations = self.execute_handler(
1612 handler,
1613 &event_value,
1614 event_type,
1615 entity_bytecode.state_id,
1616 entity_name,
1617 entity_bytecode.computed_fields_evaluator.as_ref(),
1618 Some(&entity_bytecode.non_emitted_fields),
1619 )?;
1620
1621 if let Some(ref mut log) = log {
1622 log.inc(
1623 "opcodes",
1624 (self.instructions_executed - opcodes_before) as i64,
1625 );
1626 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1627 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1628 log.inc(
1629 "pda_misses",
1630 (self.pda_cache_misses - pda_misses_before) as i64,
1631 );
1632 }
1633
1634 if mutations.is_empty() {
1635 let is_tx_event =
1638 event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1639 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1640 if is_tx_event {
1641 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1642 let signature = context
1643 .and_then(|c| c.signature.clone())
1644 .unwrap_or_default();
1645 let _ = self.queue_instruction_event(
1646 entity_bytecode.state_id,
1647 QueuedInstructionEvent {
1648 pda_address: missed_pda,
1649 event_type: event_type.to_string(),
1650 event_data: event_value.clone(),
1651 slot,
1652 signature,
1653 },
1654 );
1655 } else {
1656 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1661 let signature = context
1662 .and_then(|c| c.signature.clone())
1663 .unwrap_or_default();
1664 if let Some(write_version) =
1665 context.and_then(|c| c.write_version)
1666 {
1667 let _ = self.queue_account_update(
1668 entity_bytecode.state_id,
1669 QueuedAccountUpdate {
1670 pda_address: missed_pda,
1671 account_type: event_type.to_string(),
1672 account_data: event_value.clone(),
1673 slot,
1674 write_version,
1675 signature,
1676 },
1677 );
1678 } else {
1679 tracing::warn!(
1680 event_type = %event_type,
1681 "Dropping queued account update: write_version missing from context"
1682 );
1683 }
1684 }
1685 }
1686 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1687 if !is_tx_event {
1688 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1689 let signature = context
1690 .and_then(|c| c.signature.clone())
1691 .unwrap_or_default();
1692 if let Some(write_version) =
1693 context.and_then(|c| c.write_version)
1694 {
1695 let _ = self.queue_account_update(
1696 entity_bytecode.state_id,
1697 QueuedAccountUpdate {
1698 pda_address: missed_lookup,
1699 account_type: event_type.to_string(),
1700 account_data: event_value.clone(),
1701 slot,
1702 write_version,
1703 signature,
1704 },
1705 );
1706 } else {
1707 tracing::trace!(
1708 event_type = %event_type,
1709 "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1710 );
1711 }
1712 }
1713 }
1714 }
1715
1716 all_mutations.extend(mutations);
1717
1718 if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1719 if let Some(ctx) = context {
1720 if let Some(ref signature) = ctx.signature {
1721 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1722 {
1723 {
1724 let mut cache =
1725 state.recent_tx_instructions.lock().unwrap();
1726 let entry = cache
1727 .get_or_insert_mut(signature.clone(), HashSet::new);
1728 entry.insert(event_type.to_string());
1729 }
1730
1731 let key = (signature.clone(), event_type.to_string());
1732 if let Some((_, deferred_ops)) =
1733 state.deferred_when_ops.remove(&key)
1734 {
1735 tracing::debug!(
1736 event_type = %event_type,
1737 signature = %signature,
1738 deferred_count = deferred_ops.len(),
1739 "flushing deferred when-ops"
1740 );
1741 for op in deferred_ops {
1742 match self.apply_deferred_when_op(
1743 entity_bytecode.state_id,
1744 &op,
1745 entity_bytecode
1746 .computed_fields_evaluator
1747 .as_ref(),
1748 Some(&entity_bytecode.computed_paths),
1749 ) {
1750 Ok(mutations) => {
1751 all_mutations.extend(mutations)
1752 }
1753 Err(e) => {
1754 tracing::warn!(
1755 "Failed to apply deferred when-op: {}",
1756 e
1757 );
1758 }
1759 }
1760 }
1761 }
1762 }
1763 }
1764 }
1765 }
1766
1767 if let Some(registered_pda) = self.take_last_pda_registered() {
1768 let pending_events = self.flush_pending_instruction_events(
1769 entity_bytecode.state_id,
1770 ®istered_pda,
1771 );
1772 for pending in pending_events {
1773 if let Some(pending_handler) =
1774 entity_bytecode.handlers.get(&pending.event_type)
1775 {
1776 if let Ok(reprocessed_mutations) = self.execute_handler(
1777 pending_handler,
1778 &pending.event_data,
1779 &pending.event_type,
1780 entity_bytecode.state_id,
1781 entity_name,
1782 entity_bytecode.computed_fields_evaluator.as_ref(),
1783 Some(&entity_bytecode.non_emitted_fields),
1784 ) {
1785 all_mutations.extend(reprocessed_mutations);
1786 }
1787 }
1788 }
1789 }
1790
1791 let lookup_keys = self.take_last_lookup_index_keys();
1792 if !lookup_keys.is_empty() {
1793 tracing::info!(
1794 keys = ?lookup_keys,
1795 entity = %entity_name,
1796 "vm.process_event: flushing pending updates for lookup_keys"
1797 );
1798 }
1799 for lookup_key in lookup_keys {
1800 if let Ok(pending_updates) =
1801 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1802 {
1803 for pending in pending_updates {
1804 if let Some(pending_handler) =
1805 entity_bytecode.handlers.get(&pending.account_type)
1806 {
1807 self.current_context = Some(UpdateContext::new_account(
1808 pending.slot,
1809 pending.signature.clone(),
1810 pending.write_version,
1811 ));
1812 match self.execute_handler(
1813 pending_handler,
1814 &pending.account_data,
1815 &pending.account_type,
1816 entity_bytecode.state_id,
1817 entity_name,
1818 entity_bytecode.computed_fields_evaluator.as_ref(),
1819 Some(&entity_bytecode.non_emitted_fields),
1820 ) {
1821 Ok(reprocessed) => {
1822 all_mutations.extend(reprocessed);
1823 }
1824 Err(e) => {
1825 tracing::warn!(
1826 error = %e,
1827 account_type = %pending.account_type,
1828 "Flushed event reprocessing failed"
1829 );
1830 }
1831 }
1832 }
1833 }
1834 }
1835 }
1836 } else if let Some(ref mut log) = log {
1837 log.set("skip_reason", "no_handler");
1838 }
1839 } else if let Some(ref mut log) = log {
1840 log.set("skip_reason", "entity_not_found");
1841 }
1842 }
1843 } else if let Some(ref mut log) = log {
1844 log.set("skip_reason", "no_event_routing");
1845 }
1846
1847 if let Some(log) = log {
1848 log.set("mutations", all_mutations.len() as i64);
1849 if let Some(first) = all_mutations.first() {
1850 if let Some(key_str) = first.key.as_str() {
1851 log.set("primary_key", key_str);
1852 } else if let Some(key_num) = first.key.as_u64() {
1853 log.set("primary_key", key_num as i64);
1854 }
1855 }
1856 if let Some(state) = self.states.get(&0) {
1857 log.set("state_table_size", state.data.len() as i64);
1858 }
1859
1860 let warnings = self.take_warnings();
1861 if !warnings.is_empty() {
1862 log.set("warnings", warnings.len() as i64);
1863 log.set(
1864 "warning_messages",
1865 Value::Array(warnings.into_iter().map(Value::String).collect()),
1866 );
1867 log.set_level(crate::canonical_log::LogLevel::Warn);
1868 }
1869 } else {
1870 self.warnings.clear();
1871 }
1872
1873 if self.instructions_executed.is_multiple_of(1000) {
1874 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1875 for state_id in state_ids {
1876 let expired = self.cleanup_expired_when_ops(state_id, 60);
1877 if expired > 0 {
1878 tracing::debug!(
1879 "Cleaned up {} expired deferred when-ops for state {}",
1880 expired,
1881 state_id
1882 );
1883 }
1884 }
1885 }
1886
1887 Ok(all_mutations)
1888 }
1889
1890 pub fn process_any(
1891 &mut self,
1892 bytecode: &MultiEntityBytecode,
1893 any: prost_types::Any,
1894 ) -> Result<Vec<Mutation>> {
1895 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1896 self.process_event(bytecode, event_value, &event_type, None, None)
1897 }
1898
1899 #[cfg_attr(feature = "otel", instrument(
1900 name = "vm.execute_handler",
1901 skip(self, handler, event_value, entity_evaluator),
1902 level = "debug",
1903 fields(
1904 event_type = %event_type,
1905 handler_opcodes = handler.len(),
1906 )
1907 ))]
1908 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1909 fn execute_handler(
1910 &mut self,
1911 handler: &[OpCode],
1912 event_value: &Value,
1913 event_type: &str,
1914 override_state_id: u32,
1915 entity_name: &str,
1916 entity_evaluator: Option<
1917 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1918 >,
1919 non_emitted_fields: Option<&HashSet<String>>,
1920 ) -> Result<Vec<Mutation>> {
1921 self.reset_registers();
1922 self.last_pda_lookup_miss = None;
1923
1924 let mut pc: usize = 0;
1925 let mut output = Vec::new();
1926 let mut dirty_tracker = DirtyTracker::new();
1927 let should_emit = |path: &str| {
1928 non_emitted_fields
1929 .map(|fields| !fields.contains(path))
1930 .unwrap_or(true)
1931 };
1932
1933 while pc < handler.len() {
1934 match &handler[pc] {
1935 OpCode::LoadEventField {
1936 path,
1937 dest,
1938 default,
1939 } => {
1940 let value = self.load_field(event_value, path, default.as_ref())?;
1941 self.registers[*dest] = value;
1942 pc += 1;
1943 }
1944 OpCode::LoadConstant { value, dest } => {
1945 self.registers[*dest] = value.clone();
1946 pc += 1;
1947 }
1948 OpCode::CopyRegister { source, dest } => {
1949 self.registers[*dest] = self.registers[*source].clone();
1950 pc += 1;
1951 }
1952 OpCode::CopyRegisterIfNull { source, dest } => {
1953 if self.registers[*dest].is_null() {
1954 self.registers[*dest] = self.registers[*source].clone();
1955 }
1956 pc += 1;
1957 }
1958 OpCode::GetEventType { dest } => {
1959 self.registers[*dest] = json!(event_type);
1960 pc += 1;
1961 }
1962 OpCode::CreateObject { dest } => {
1963 self.registers[*dest] = json!({});
1964 pc += 1;
1965 }
1966 OpCode::SetField {
1967 object,
1968 path,
1969 value,
1970 } => {
1971 self.set_field_auto_vivify(*object, path, *value)?;
1972 if should_emit(path) {
1973 dirty_tracker.mark_replaced(path);
1974 }
1975 pc += 1;
1976 }
1977 OpCode::SetFields { object, fields } => {
1978 for (path, value_reg) in fields {
1979 self.set_field_auto_vivify(*object, path, *value_reg)?;
1980 if should_emit(path) {
1981 dirty_tracker.mark_replaced(path);
1982 }
1983 }
1984 pc += 1;
1985 }
1986 OpCode::GetField { object, path, dest } => {
1987 let value = self.get_field(*object, path)?;
1988 self.registers[*dest] = value;
1989 pc += 1;
1990 }
1991 OpCode::AbortIfNullKey {
1992 key,
1993 is_account_event,
1994 } => {
1995 let key_value = &self.registers[*key];
1996 if key_value.is_null() && *is_account_event {
1997 tracing::debug!(
1998 event_type = %event_type,
1999 "AbortIfNullKey: key is null for account state event, \
2000 returning empty mutations for queueing"
2001 );
2002 return Ok(Vec::new());
2003 }
2004
2005 pc += 1;
2006 }
2007 OpCode::ReadOrInitState {
2008 state_id: _,
2009 key,
2010 default,
2011 dest,
2012 } => {
2013 let actual_state_id = override_state_id;
2014 let entity_name_owned = entity_name.to_string();
2015 self.states
2016 .entry(actual_state_id)
2017 .or_insert_with(|| StateTable {
2018 data: DashMap::new(),
2019 access_times: DashMap::new(),
2020 lookup_indexes: HashMap::new(),
2021 temporal_indexes: HashMap::new(),
2022 pda_reverse_lookups: HashMap::new(),
2023 pending_updates: DashMap::new(),
2024 pending_instruction_events: DashMap::new(),
2025 last_account_data: DashMap::new(),
2026 version_tracker: VersionTracker::new(),
2027 instruction_dedup_cache: VersionTracker::with_capacity(
2028 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2029 ),
2030 config: StateTableConfig::default(),
2031 entity_name: entity_name_owned,
2032 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2033 NonZeroUsize::new(1000).unwrap(),
2034 )),
2035 deferred_when_ops: DashMap::new(),
2036 });
2037 let key_value = self.registers[*key].clone();
2038 let warn_null_key = key_value.is_null()
2040 && event_type.ends_with("State")
2041 && !event_type.ends_with("IxState")
2042 && !event_type.ends_with("CpiEvent");
2043
2044 if warn_null_key {
2045 self.add_warning(format!(
2046 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2047 key, event_type
2048 ));
2049 }
2050
2051 let state = self
2052 .states
2053 .get(&actual_state_id)
2054 .ok_or("State table not found")?;
2055
2056 if !key_value.is_null() {
2057 if let Some(ctx) = &self.current_context {
2058 if ctx.is_account_update() {
2063 if let (Some(slot), Some(write_version)) =
2064 (ctx.slot, ctx.write_version)
2065 {
2066 let account_address = event_value
2068 .get("__account_address")
2069 .cloned()
2070 .unwrap_or_else(|| key_value.clone());
2071
2072 if !state.is_fresh_update(
2073 &account_address,
2074 event_type,
2075 slot,
2076 write_version,
2077 ) {
2078 self.add_warning(format!(
2079 "Stale account update skipped: slot={}, write_version={}, account={}",
2080 slot, write_version, account_address
2081 ));
2082 return Ok(Vec::new());
2083 }
2084 }
2085 }
2086 else if ctx.is_instruction_update() {
2088 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2089 if state.is_duplicate_instruction(
2090 &key_value, event_type, slot, txn_index,
2091 ) {
2092 self.add_warning(format!(
2093 "Duplicate instruction skipped: slot={}, txn_index={}",
2094 slot, txn_index
2095 ));
2096 return Ok(Vec::new());
2097 }
2098 }
2099 }
2100 }
2101 }
2102 let value = state
2103 .get_and_touch(&key_value)
2104 .unwrap_or_else(|| default.clone());
2105
2106 self.registers[*dest] = value;
2107 pc += 1;
2108 }
2109 OpCode::UpdateState {
2110 state_id: _,
2111 key,
2112 value,
2113 } => {
2114 let actual_state_id = override_state_id;
2115 let state = self
2116 .states
2117 .get(&actual_state_id)
2118 .ok_or("State table not found")?;
2119 let key_value = self.registers[*key].clone();
2120 let value_data = self.registers[*value].clone();
2121
2122 state.insert_with_eviction(key_value, value_data);
2123 pc += 1;
2124 }
2125 OpCode::AppendToArray {
2126 object,
2127 path,
2128 value,
2129 } => {
2130 let appended_value = self.registers[*value].clone();
2131 let max_len = self
2132 .states
2133 .get(&override_state_id)
2134 .map(|s| s.max_array_length())
2135 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2136 self.append_to_array(*object, path, *value, max_len)?;
2137 if should_emit(path) {
2138 dirty_tracker.mark_appended(path, appended_value);
2139 }
2140 pc += 1;
2141 }
2142 OpCode::GetCurrentTimestamp { dest } => {
2143 let timestamp = std::time::SystemTime::now()
2144 .duration_since(std::time::UNIX_EPOCH)
2145 .unwrap()
2146 .as_secs() as i64;
2147 self.registers[*dest] = json!(timestamp);
2148 pc += 1;
2149 }
2150 OpCode::CreateEvent { dest, event_value } => {
2151 let timestamp = std::time::SystemTime::now()
2152 .duration_since(std::time::UNIX_EPOCH)
2153 .unwrap()
2154 .as_secs() as i64;
2155
2156 let mut event_data = self.registers[*event_value].clone();
2158 if let Some(obj) = event_data.as_object_mut() {
2159 obj.remove("__update_context");
2160 }
2161
2162 let mut event = serde_json::Map::new();
2164 event.insert("timestamp".to_string(), json!(timestamp));
2165 event.insert("data".to_string(), event_data);
2166
2167 if let Some(ref ctx) = self.current_context {
2169 if let Some(slot) = ctx.slot {
2170 event.insert("slot".to_string(), json!(slot));
2171 }
2172 if let Some(ref signature) = ctx.signature {
2173 event.insert("signature".to_string(), json!(signature));
2174 }
2175 }
2176
2177 self.registers[*dest] = Value::Object(event);
2178 pc += 1;
2179 }
2180 OpCode::CreateCapture {
2181 dest,
2182 capture_value,
2183 } => {
2184 let timestamp = std::time::SystemTime::now()
2185 .duration_since(std::time::UNIX_EPOCH)
2186 .unwrap()
2187 .as_secs() as i64;
2188
2189 let capture_data = self.registers[*capture_value].clone();
2191
2192 let account_address = event_value
2194 .get("__account_address")
2195 .and_then(|v| v.as_str())
2196 .unwrap_or("")
2197 .to_string();
2198
2199 let mut capture = serde_json::Map::new();
2201 capture.insert("timestamp".to_string(), json!(timestamp));
2202 capture.insert("account_address".to_string(), json!(account_address));
2203 capture.insert("data".to_string(), capture_data);
2204
2205 if let Some(ref ctx) = self.current_context {
2207 if let Some(slot) = ctx.slot {
2208 capture.insert("slot".to_string(), json!(slot));
2209 }
2210 if let Some(ref signature) = ctx.signature {
2211 capture.insert("signature".to_string(), json!(signature));
2212 }
2213 }
2214
2215 self.registers[*dest] = Value::Object(capture);
2216 pc += 1;
2217 }
2218 OpCode::Transform {
2219 source,
2220 dest,
2221 transformation,
2222 } => {
2223 if source == dest {
2224 self.transform_in_place(*source, transformation)?;
2225 } else {
2226 let source_value = &self.registers[*source];
2227 let value = Self::apply_transformation(source_value, transformation)?;
2228 self.registers[*dest] = value;
2229 }
2230 pc += 1;
2231 }
2232 OpCode::EmitMutation {
2233 entity_name,
2234 key,
2235 state,
2236 } => {
2237 let primary_key = self.registers[*key].clone();
2238
2239 if primary_key.is_null() || dirty_tracker.is_empty() {
2240 let reason = if dirty_tracker.is_empty() {
2241 "no_fields_modified"
2242 } else {
2243 "null_primary_key"
2244 };
2245 self.add_warning(format!(
2246 "Skipping mutation for entity '{}': {} (dirty_fields={})",
2247 entity_name,
2248 reason,
2249 dirty_tracker.len()
2250 ));
2251 } else {
2252 let patch =
2253 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2254
2255 let append = dirty_tracker.appended_paths();
2256 let mutation = Mutation {
2257 export: entity_name.clone(),
2258 key: primary_key,
2259 patch,
2260 append,
2261 };
2262 output.push(mutation);
2263 }
2264 pc += 1;
2265 }
2266 OpCode::SetFieldIfNull {
2267 object,
2268 path,
2269 value,
2270 } => {
2271 let was_set = self.set_field_if_null(*object, path, *value)?;
2272 if was_set && should_emit(path) {
2273 dirty_tracker.mark_replaced(path);
2274 }
2275 pc += 1;
2276 }
2277 OpCode::SetFieldMax {
2278 object,
2279 path,
2280 value,
2281 } => {
2282 let was_updated = self.set_field_max(*object, path, *value)?;
2283 if was_updated && should_emit(path) {
2284 dirty_tracker.mark_replaced(path);
2285 }
2286 pc += 1;
2287 }
2288 OpCode::UpdateTemporalIndex {
2289 state_id: _,
2290 index_name,
2291 lookup_value,
2292 primary_key,
2293 timestamp,
2294 } => {
2295 let actual_state_id = override_state_id;
2296 let state = self
2297 .states
2298 .get_mut(&actual_state_id)
2299 .ok_or("State table not found")?;
2300 let index = state
2301 .temporal_indexes
2302 .entry(index_name.clone())
2303 .or_insert_with(TemporalIndex::new);
2304
2305 let lookup_val = self.registers[*lookup_value].clone();
2306 let pk_val = self.registers[*primary_key].clone();
2307 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2308 val
2309 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2310 val as i64
2311 } else {
2312 return Err(format!(
2313 "Timestamp must be a number (i64 or u64), got: {:?}",
2314 self.registers[*timestamp]
2315 )
2316 .into());
2317 };
2318
2319 index.insert(lookup_val, pk_val, ts_val);
2320 pc += 1;
2321 }
2322 OpCode::LookupTemporalIndex {
2323 state_id: _,
2324 index_name,
2325 lookup_value,
2326 timestamp,
2327 dest,
2328 } => {
2329 let actual_state_id = override_state_id;
2330 let state = self
2331 .states
2332 .get(&actual_state_id)
2333 .ok_or("State table not found")?;
2334 let lookup_val = &self.registers[*lookup_value];
2335
2336 let result = if self.registers[*timestamp].is_null() {
2337 if let Some(index) = state.temporal_indexes.get(index_name) {
2338 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2339 } else {
2340 Value::Null
2341 }
2342 } else {
2343 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2344 val
2345 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2346 val as i64
2347 } else {
2348 return Err(format!(
2349 "Timestamp must be a number (i64 or u64), got: {:?}",
2350 self.registers[*timestamp]
2351 )
2352 .into());
2353 };
2354
2355 if let Some(index) = state.temporal_indexes.get(index_name) {
2356 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2357 } else {
2358 Value::Null
2359 }
2360 };
2361
2362 self.registers[*dest] = result;
2363 pc += 1;
2364 }
2365 OpCode::UpdateLookupIndex {
2366 state_id: _,
2367 index_name,
2368 lookup_value,
2369 primary_key,
2370 } => {
2371 let actual_state_id = override_state_id;
2372 let state = self
2373 .states
2374 .get_mut(&actual_state_id)
2375 .ok_or("State table not found")?;
2376 let index = state
2377 .lookup_indexes
2378 .entry(index_name.clone())
2379 .or_insert_with(LookupIndex::new);
2380
2381 let lookup_val = self.registers[*lookup_value].clone();
2382 let pk_val = self.registers[*primary_key].clone();
2383
2384 index.insert(lookup_val.clone(), pk_val);
2385
2386 if let Some(key_str) = lookup_val.as_str() {
2388 self.last_lookup_index_keys.push(key_str.to_string());
2389 }
2390
2391 pc += 1;
2392 }
2393 OpCode::LookupIndex {
2394 state_id: _,
2395 index_name,
2396 lookup_value,
2397 dest,
2398 } => {
2399 let actual_state_id = override_state_id;
2400 let mut current_value = self.registers[*lookup_value].clone();
2401
2402 const MAX_CHAIN_DEPTH: usize = 5;
2403 let mut iterations = 0;
2404
2405 let final_result = if self.states.contains_key(&actual_state_id) {
2406 loop {
2407 iterations += 1;
2408 if iterations > MAX_CHAIN_DEPTH {
2409 break current_value;
2410 }
2411
2412 let resolved = self
2413 .states
2414 .get(&actual_state_id)
2415 .and_then(|state| {
2416 if let Some(index) = state.lookup_indexes.get(index_name) {
2417 if let Some(found) = index.lookup(¤t_value) {
2418 return Some(found);
2419 }
2420 }
2421
2422 for (name, index) in state.lookup_indexes.iter() {
2423 if name == index_name {
2424 continue;
2425 }
2426 if let Some(found) = index.lookup(¤t_value) {
2427 return Some(found);
2428 }
2429 }
2430
2431 None
2432 })
2433 .unwrap_or(Value::Null);
2434
2435 let mut resolved_from_pda = false;
2436 let resolved = if resolved.is_null() {
2437 if let Some(pda_str) = current_value.as_str() {
2438 resolved_from_pda = true;
2439 self.states
2440 .get_mut(&actual_state_id)
2441 .and_then(|state_mut| {
2442 state_mut
2443 .pda_reverse_lookups
2444 .get_mut("default_pda_lookup")
2445 })
2446 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2447 .map(Value::String)
2448 .unwrap_or(Value::Null)
2449 } else {
2450 Value::Null
2451 }
2452 } else {
2453 resolved
2454 };
2455
2456 if resolved.is_null() {
2457 if iterations == 1 {
2458 if let Some(pda_str) = current_value.as_str() {
2459 self.last_pda_lookup_miss = Some(pda_str.to_string());
2460 }
2461 }
2462 break Value::Null;
2463 }
2464
2465 let can_chain =
2466 self.can_resolve_further(&resolved, actual_state_id, index_name);
2467
2468 if !can_chain {
2469 if resolved_from_pda {
2470 if let Some(resolved_str) = resolved.as_str() {
2471 self.last_lookup_index_miss =
2472 Some(resolved_str.to_string());
2473 }
2474 break Value::Null;
2475 }
2476 break resolved;
2477 }
2478
2479 current_value = resolved;
2480 }
2481 } else {
2482 Value::Null
2483 };
2484
2485 self.registers[*dest] = final_result;
2486 pc += 1;
2487 }
2488 OpCode::SetFieldSum {
2489 object,
2490 path,
2491 value,
2492 } => {
2493 let was_updated = self.set_field_sum(*object, path, *value)?;
2494 if was_updated && should_emit(path) {
2495 dirty_tracker.mark_replaced(path);
2496 }
2497 pc += 1;
2498 }
2499 OpCode::SetFieldIncrement { object, path } => {
2500 let was_updated = self.set_field_increment(*object, path)?;
2501 if was_updated && should_emit(path) {
2502 dirty_tracker.mark_replaced(path);
2503 }
2504 pc += 1;
2505 }
2506 OpCode::SetFieldMin {
2507 object,
2508 path,
2509 value,
2510 } => {
2511 let was_updated = self.set_field_min(*object, path, *value)?;
2512 if was_updated && should_emit(path) {
2513 dirty_tracker.mark_replaced(path);
2514 }
2515 pc += 1;
2516 }
2517 OpCode::AddToUniqueSet {
2518 state_id: _,
2519 set_name,
2520 value,
2521 count_object,
2522 count_path,
2523 } => {
2524 let value_to_add = self.registers[*value].clone();
2525
2526 let set_field_path = format!("__unique_set:{}", set_name);
2529
2530 let mut set: HashSet<Value> =
2532 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2533 if !existing.is_null() {
2534 serde_json::from_value(existing).unwrap_or_default()
2535 } else {
2536 HashSet::new()
2537 }
2538 } else {
2539 HashSet::new()
2540 };
2541
2542 let was_new = set.insert(value_to_add);
2544
2545 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2547 self.registers[100] = serde_json::to_value(set_as_vec)?;
2548 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2549
2550 if was_new {
2552 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2553 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2554 if should_emit(count_path) {
2555 dirty_tracker.mark_replaced(count_path);
2556 }
2557 }
2558
2559 pc += 1;
2560 }
2561 OpCode::ConditionalSetField {
2562 object,
2563 path,
2564 value,
2565 condition_field,
2566 condition_op,
2567 condition_value,
2568 } => {
2569 let field_value = self.load_field(event_value, condition_field, None)?;
2570 let condition_met =
2571 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2572
2573 if condition_met {
2574 self.set_field_auto_vivify(*object, path, *value)?;
2575 if should_emit(path) {
2576 dirty_tracker.mark_replaced(path);
2577 }
2578 }
2579 pc += 1;
2580 }
2581 OpCode::SetFieldWhen {
2582 object,
2583 path,
2584 value,
2585 when_instruction,
2586 entity_name,
2587 key_reg,
2588 condition_field,
2589 condition_op,
2590 condition_value,
2591 } => {
2592 let actual_state_id = override_state_id;
2593 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2594 condition_field.as_ref(),
2595 condition_op.as_ref(),
2596 condition_value.as_ref(),
2597 ) {
2598 let field_value = self.load_field(event_value, field, None)?;
2599 self.evaluate_comparison(&field_value, op, cond_value)?
2600 } else {
2601 true
2602 };
2603
2604 if !condition_met {
2605 pc += 1;
2606 continue;
2607 }
2608
2609 let signature = self
2610 .current_context
2611 .as_ref()
2612 .and_then(|c| c.signature.clone())
2613 .unwrap_or_default();
2614
2615 let emit = should_emit(path);
2616
2617 let instruction_seen = if !signature.is_empty() {
2618 if let Some(state) = self.states.get(&actual_state_id) {
2619 let mut cache = state.recent_tx_instructions.lock().unwrap();
2620 cache
2621 .get(&signature)
2622 .map(|set| set.contains(when_instruction))
2623 .unwrap_or(false)
2624 } else {
2625 false
2626 }
2627 } else {
2628 false
2629 };
2630
2631 if instruction_seen {
2632 self.set_field_auto_vivify(*object, path, *value)?;
2633 if emit {
2634 dirty_tracker.mark_replaced(path);
2635 }
2636 } else if !signature.is_empty() {
2637 let deferred = DeferredWhenOperation {
2638 entity_name: entity_name.clone(),
2639 primary_key: self.registers[*key_reg].clone(),
2640 field_path: path.clone(),
2641 field_value: self.registers[*value].clone(),
2642 when_instruction: when_instruction.clone(),
2643 signature: signature.clone(),
2644 slot: self
2645 .current_context
2646 .as_ref()
2647 .and_then(|c| c.slot)
2648 .unwrap_or(0),
2649 deferred_at: std::time::SystemTime::now()
2650 .duration_since(std::time::UNIX_EPOCH)
2651 .unwrap()
2652 .as_secs() as i64,
2653 emit,
2654 };
2655
2656 if let Some(state) = self.states.get(&actual_state_id) {
2657 let key = (signature, when_instruction.clone());
2658 state
2659 .deferred_when_ops
2660 .entry(key)
2661 .or_insert_with(Vec::new)
2662 .push(deferred);
2663 }
2664 }
2665
2666 pc += 1;
2667 }
2668 OpCode::SetFieldUnlessStopped {
2669 object,
2670 path,
2671 value,
2672 stop_field,
2673 stop_instruction,
2674 entity_name,
2675 key_reg: _,
2676 } => {
2677 let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2678 let stopped = stop_value.as_bool().unwrap_or(false);
2679
2680 if stopped {
2681 tracing::debug!(
2682 entity = %entity_name,
2683 field = %path,
2684 stop_field = %stop_field,
2685 stop_instruction = %stop_instruction,
2686 "stop flag set; skipping field update"
2687 );
2688 pc += 1;
2689 continue;
2690 }
2691
2692 self.set_field_auto_vivify(*object, path, *value)?;
2693 if should_emit(path) {
2694 dirty_tracker.mark_replaced(path);
2695 }
2696 pc += 1;
2697 }
2698 OpCode::ConditionalIncrement {
2699 object,
2700 path,
2701 condition_field,
2702 condition_op,
2703 condition_value,
2704 } => {
2705 let field_value = self.load_field(event_value, condition_field, None)?;
2706 let condition_met =
2707 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2708
2709 if condition_met {
2710 let was_updated = self.set_field_increment(*object, path)?;
2711 if was_updated && should_emit(path) {
2712 dirty_tracker.mark_replaced(path);
2713 }
2714 }
2715 pc += 1;
2716 }
2717 OpCode::EvaluateComputedFields {
2718 state,
2719 computed_paths,
2720 } => {
2721 if let Some(evaluator) = entity_evaluator {
2722 let old_values: Vec<_> = computed_paths
2723 .iter()
2724 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2725 .collect();
2726
2727 let state_value = &mut self.registers[*state];
2728 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2729 let context_timestamp = self
2730 .current_context
2731 .as_ref()
2732 .map(|c| c.timestamp())
2733 .unwrap_or_else(|| {
2734 std::time::SystemTime::now()
2735 .duration_since(std::time::UNIX_EPOCH)
2736 .unwrap()
2737 .as_secs() as i64
2738 });
2739 let eval_result = evaluator(state_value, context_slot, context_timestamp);
2740
2741 if eval_result.is_ok() {
2742 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2743 let new_value =
2744 Self::get_value_at_path(&self.registers[*state], path);
2745
2746 if new_value != *old_value && should_emit(path) {
2747 dirty_tracker.mark_replaced(path);
2748 }
2749 }
2750 }
2751 }
2752 pc += 1;
2753 }
2754 OpCode::QueueResolver {
2755 state_id: _,
2756 entity_name,
2757 resolver,
2758 input_path,
2759 input_value,
2760 url_template,
2761 strategy,
2762 extracts,
2763 condition,
2764 schedule_at,
2765 state,
2766 key,
2767 } => {
2768 let actual_state_id = override_state_id;
2769
2770 if self
2774 .current_context
2775 .as_ref()
2776 .map(|c| c.skip_resolvers)
2777 .unwrap_or(false)
2778 {
2779 pc += 1;
2780 continue;
2781 }
2782
2783 if let Some(cond) = condition {
2785 let field_val =
2786 Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2787 .unwrap_or(Value::Null);
2788 let condition_met =
2789 self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
2790
2791 if !condition_met {
2792 pc += 1;
2793 continue;
2794 }
2795 }
2796
2797 if let Some(schedule_path) = schedule_at {
2799 let target_val =
2800 Self::get_value_at_path(&self.registers[*state], schedule_path);
2801
2802 match target_val.and_then(|v| v.as_u64()) {
2803 Some(target_slot) => {
2804 let current_slot = self
2805 .current_context
2806 .as_ref()
2807 .and_then(|ctx| ctx.slot)
2808 .unwrap_or(0);
2809 if current_slot < target_slot {
2810 let key_value = &self.registers[*key];
2811 if !key_value.is_null() {
2812 self.scheduled_callbacks.push((
2813 target_slot,
2814 ScheduledCallback {
2815 state_id: actual_state_id,
2816 entity_name: entity_name.clone(),
2817 primary_key: key_value.clone(),
2818 resolver: resolver.clone(),
2819 url_template: url_template.clone(),
2820 input_value: input_value.clone(),
2821 input_path: input_path.clone(),
2822 condition: condition.clone(),
2823 strategy: strategy.clone(),
2824 extracts: extracts.clone(),
2825 retry_count: 0,
2826 },
2827 ));
2828 }
2829 pc += 1;
2830 continue;
2831 }
2832 }
2834 None => {
2835 pc += 1;
2839 continue;
2840 }
2841 }
2842 }
2843
2844 let resolved_input = if let Some(template) = url_template {
2846 crate::scheduler::build_url_from_template(template, &self.registers[*state])
2847 .map(Value::String)
2848 } else if let Some(value) = input_value {
2849 Some(value.clone())
2850 } else if let Some(path) = input_path.as_ref() {
2851 Self::get_value_at_path(&self.registers[*state], path)
2852 } else {
2853 None
2854 };
2855
2856 if let Some(input) = resolved_input {
2857 let key_value = &self.registers[*key];
2858
2859 if input.is_null() || key_value.is_null() {
2860 pc += 1;
2861 continue;
2862 }
2863
2864 if matches!(strategy, ResolveStrategy::SetOnce)
2865 && extracts.iter().all(|extract| {
2869 match Self::get_value_at_path(
2870 &self.registers[*state],
2871 &extract.target_path,
2872 ) {
2873 Some(value) => !value.is_null(),
2874 None => false,
2875 }
2876 })
2877 {
2878 pc += 1;
2879 continue;
2880 }
2881
2882 let cache_key = resolver_cache_key(resolver, &input);
2883
2884 let cached_value = self.resolver_cache.get(&cache_key).cloned();
2885 if let Some(cached) = cached_value {
2886 self.resolver_cache_hits += 1;
2887 Self::apply_resolver_extractions_to_value(
2888 &mut self.registers[*state],
2889 &cached,
2890 extracts,
2891 &mut dirty_tracker,
2892 &should_emit,
2893 )?;
2894 } else {
2895 self.resolver_cache_misses += 1;
2896 let target = ResolverTarget {
2897 state_id: actual_state_id,
2898 entity_name: entity_name.clone(),
2899 primary_key: self.registers[*key].clone(),
2900 extracts: extracts.clone(),
2901 };
2902
2903 self.enqueue_resolver_request(
2904 cache_key,
2905 resolver.clone(),
2906 input,
2907 target,
2908 );
2909 }
2910 }
2911
2912 pc += 1;
2913 }
2914 OpCode::UpdatePdaReverseLookup {
2915 state_id: _,
2916 lookup_name,
2917 pda_address,
2918 primary_key,
2919 } => {
2920 let actual_state_id = override_state_id;
2921 let state = self
2922 .states
2923 .get_mut(&actual_state_id)
2924 .ok_or("State table not found")?;
2925
2926 let pda_val = self.registers[*pda_address].clone();
2927 let pk_val = self.registers[*primary_key].clone();
2928
2929 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2930 let pda_lookup = state
2931 .pda_reverse_lookups
2932 .entry(lookup_name.clone())
2933 .or_insert_with(|| {
2934 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2935 });
2936
2937 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2938 self.last_pda_registered = Some(pda_str.to_string());
2939 } else if !pk_val.is_null() {
2940 if let Some(pk_num) = pk_val.as_u64() {
2941 if let Some(pda_str) = pda_val.as_str() {
2942 let pda_lookup = state
2943 .pda_reverse_lookups
2944 .entry(lookup_name.clone())
2945 .or_insert_with(|| {
2946 PdaReverseLookup::new(
2947 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2948 )
2949 });
2950
2951 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2952 self.last_pda_registered = Some(pda_str.to_string());
2953 }
2954 }
2955 }
2956
2957 pc += 1;
2958 }
2959 }
2960
2961 self.instructions_executed += 1;
2962 }
2963
2964 Ok(output)
2965 }
2966
2967 fn load_field(
2968 &self,
2969 event_value: &Value,
2970 path: &FieldPath,
2971 default: Option<&Value>,
2972 ) -> Result<Value> {
2973 if path.segments.is_empty() {
2974 if let Some(obj) = event_value.as_object() {
2975 let filtered: serde_json::Map<String, Value> = obj
2976 .iter()
2977 .filter(|(k, _)| !k.starts_with("__"))
2978 .map(|(k, v)| (k.clone(), v.clone()))
2979 .collect();
2980 return Ok(Value::Object(filtered));
2981 }
2982 return Ok(event_value.clone());
2983 }
2984
2985 let mut current = event_value;
2986 for segment in path.segments.iter() {
2987 current = match current.get(segment) {
2988 Some(v) => v,
2989 None => {
2990 tracing::trace!(
2991 "load_field: segment={:?} not found in {:?}, returning default",
2992 segment,
2993 current
2994 );
2995 return Ok(default.cloned().unwrap_or(Value::Null));
2996 }
2997 };
2998 }
2999
3000 tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3001 Ok(current.clone())
3002 }
3003
3004 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3005 let mut current = value;
3006 for segment in path.split('.') {
3007 current = current.get(segment)?;
3008 }
3009 Some(current.clone())
3010 }
3011
3012 fn set_field_auto_vivify(
3013 &mut self,
3014 object_reg: Register,
3015 path: &str,
3016 value_reg: Register,
3017 ) -> Result<()> {
3018 let compiled = self.get_compiled_path(path);
3019 let segments = compiled.segments();
3020 let value = self.registers[value_reg].clone();
3021
3022 if !self.registers[object_reg].is_object() {
3023 self.registers[object_reg] = json!({});
3024 }
3025
3026 let obj = self.registers[object_reg]
3027 .as_object_mut()
3028 .ok_or("Not an object")?;
3029
3030 let mut current = obj;
3031 for (i, segment) in segments.iter().enumerate() {
3032 if i == segments.len() - 1 {
3033 current.insert(segment.to_string(), value);
3034 return Ok(());
3035 } else {
3036 current
3037 .entry(segment.to_string())
3038 .or_insert_with(|| json!({}));
3039 current = current
3040 .get_mut(segment)
3041 .and_then(|v| v.as_object_mut())
3042 .ok_or("Path collision: expected object")?;
3043 }
3044 }
3045
3046 Ok(())
3047 }
3048
3049 fn set_field_if_null(
3050 &mut self,
3051 object_reg: Register,
3052 path: &str,
3053 value_reg: Register,
3054 ) -> Result<bool> {
3055 let compiled = self.get_compiled_path(path);
3056 let segments = compiled.segments();
3057 let value = self.registers[value_reg].clone();
3058
3059 if value.is_null() {
3063 return Ok(false);
3064 }
3065
3066 if !self.registers[object_reg].is_object() {
3067 self.registers[object_reg] = json!({});
3068 }
3069
3070 let obj = self.registers[object_reg]
3071 .as_object_mut()
3072 .ok_or("Not an object")?;
3073
3074 let mut current = obj;
3075 for (i, segment) in segments.iter().enumerate() {
3076 if i == segments.len() - 1 {
3077 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3078 current.insert(segment.to_string(), value);
3079 return Ok(true);
3080 }
3081 return Ok(false);
3082 } else {
3083 current
3084 .entry(segment.to_string())
3085 .or_insert_with(|| json!({}));
3086 current = current
3087 .get_mut(segment)
3088 .and_then(|v| v.as_object_mut())
3089 .ok_or("Path collision: expected object")?;
3090 }
3091 }
3092
3093 Ok(false)
3094 }
3095
3096 fn set_field_max(
3097 &mut self,
3098 object_reg: Register,
3099 path: &str,
3100 value_reg: Register,
3101 ) -> Result<bool> {
3102 let compiled = self.get_compiled_path(path);
3103 let segments = compiled.segments();
3104 let new_value = self.registers[value_reg].clone();
3105
3106 if !self.registers[object_reg].is_object() {
3107 self.registers[object_reg] = json!({});
3108 }
3109
3110 let obj = self.registers[object_reg]
3111 .as_object_mut()
3112 .ok_or("Not an object")?;
3113
3114 let mut current = obj;
3115 for (i, segment) in segments.iter().enumerate() {
3116 if i == segments.len() - 1 {
3117 let should_update = if let Some(current_value) = current.get(segment) {
3118 if current_value.is_null() {
3119 true
3120 } else {
3121 match (current_value.as_i64(), new_value.as_i64()) {
3122 (Some(current_val), Some(new_val)) => new_val > current_val,
3123 (Some(current_val), None) if new_value.as_u64().is_some() => {
3124 new_value.as_u64().unwrap() as i64 > current_val
3125 }
3126 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3127 new_val > current_value.as_u64().unwrap() as i64
3128 }
3129 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3130 (Some(current_val), Some(new_val)) => new_val > current_val,
3131 _ => match (current_value.as_f64(), new_value.as_f64()) {
3132 (Some(current_val), Some(new_val)) => new_val > current_val,
3133 _ => false,
3134 },
3135 },
3136 _ => false,
3137 }
3138 }
3139 } else {
3140 true
3141 };
3142
3143 if should_update {
3144 current.insert(segment.to_string(), new_value);
3145 return Ok(true);
3146 }
3147 return Ok(false);
3148 } else {
3149 current
3150 .entry(segment.to_string())
3151 .or_insert_with(|| json!({}));
3152 current = current
3153 .get_mut(segment)
3154 .and_then(|v| v.as_object_mut())
3155 .ok_or("Path collision: expected object")?;
3156 }
3157 }
3158
3159 Ok(false)
3160 }
3161
3162 fn set_field_sum(
3163 &mut self,
3164 object_reg: Register,
3165 path: &str,
3166 value_reg: Register,
3167 ) -> Result<bool> {
3168 let compiled = self.get_compiled_path(path);
3169 let segments = compiled.segments();
3170 let new_value = &self.registers[value_reg];
3171
3172 tracing::trace!(
3174 "set_field_sum: path={:?}, value={:?}, value_type={}",
3175 path,
3176 new_value,
3177 match new_value {
3178 serde_json::Value::Null => "null",
3179 serde_json::Value::Bool(_) => "bool",
3180 serde_json::Value::Number(_) => "number",
3181 serde_json::Value::String(_) => "string",
3182 serde_json::Value::Array(_) => "array",
3183 serde_json::Value::Object(_) => "object",
3184 }
3185 );
3186 let new_val_num = new_value
3187 .as_i64()
3188 .or_else(|| new_value.as_u64().map(|n| n as i64))
3189 .ok_or("Sum requires numeric value")?;
3190
3191 if !self.registers[object_reg].is_object() {
3192 self.registers[object_reg] = json!({});
3193 }
3194
3195 let obj = self.registers[object_reg]
3196 .as_object_mut()
3197 .ok_or("Not an object")?;
3198
3199 let mut current = obj;
3200 for (i, segment) in segments.iter().enumerate() {
3201 if i == segments.len() - 1 {
3202 let current_val = current
3203 .get(segment)
3204 .and_then(|v| {
3205 if v.is_null() {
3206 None
3207 } else {
3208 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3209 }
3210 })
3211 .unwrap_or(0);
3212
3213 let sum = current_val + new_val_num;
3214 current.insert(segment.to_string(), json!(sum));
3215 return Ok(true);
3216 } else {
3217 current
3218 .entry(segment.to_string())
3219 .or_insert_with(|| json!({}));
3220 current = current
3221 .get_mut(segment)
3222 .and_then(|v| v.as_object_mut())
3223 .ok_or("Path collision: expected object")?;
3224 }
3225 }
3226
3227 Ok(false)
3228 }
3229
3230 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3231 let compiled = self.get_compiled_path(path);
3232 let segments = compiled.segments();
3233
3234 if !self.registers[object_reg].is_object() {
3235 self.registers[object_reg] = json!({});
3236 }
3237
3238 let obj = self.registers[object_reg]
3239 .as_object_mut()
3240 .ok_or("Not an object")?;
3241
3242 let mut current = obj;
3243 for (i, segment) in segments.iter().enumerate() {
3244 if i == segments.len() - 1 {
3245 let current_val = current
3247 .get(segment)
3248 .and_then(|v| {
3249 if v.is_null() {
3250 None
3251 } else {
3252 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3253 }
3254 })
3255 .unwrap_or(0);
3256
3257 let incremented = current_val + 1;
3258 current.insert(segment.to_string(), json!(incremented));
3259 return Ok(true);
3260 } else {
3261 current
3262 .entry(segment.to_string())
3263 .or_insert_with(|| json!({}));
3264 current = current
3265 .get_mut(segment)
3266 .and_then(|v| v.as_object_mut())
3267 .ok_or("Path collision: expected object")?;
3268 }
3269 }
3270
3271 Ok(false)
3272 }
3273
3274 fn set_field_min(
3275 &mut self,
3276 object_reg: Register,
3277 path: &str,
3278 value_reg: Register,
3279 ) -> Result<bool> {
3280 let compiled = self.get_compiled_path(path);
3281 let segments = compiled.segments();
3282 let new_value = self.registers[value_reg].clone();
3283
3284 if !self.registers[object_reg].is_object() {
3285 self.registers[object_reg] = json!({});
3286 }
3287
3288 let obj = self.registers[object_reg]
3289 .as_object_mut()
3290 .ok_or("Not an object")?;
3291
3292 let mut current = obj;
3293 for (i, segment) in segments.iter().enumerate() {
3294 if i == segments.len() - 1 {
3295 let should_update = if let Some(current_value) = current.get(segment) {
3296 if current_value.is_null() {
3297 true
3298 } else {
3299 match (current_value.as_i64(), new_value.as_i64()) {
3300 (Some(current_val), Some(new_val)) => new_val < current_val,
3301 (Some(current_val), None) if new_value.as_u64().is_some() => {
3302 (new_value.as_u64().unwrap() as i64) < current_val
3303 }
3304 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3305 new_val < current_value.as_u64().unwrap() as i64
3306 }
3307 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3308 (Some(current_val), Some(new_val)) => new_val < current_val,
3309 _ => match (current_value.as_f64(), new_value.as_f64()) {
3310 (Some(current_val), Some(new_val)) => new_val < current_val,
3311 _ => false,
3312 },
3313 },
3314 _ => false,
3315 }
3316 }
3317 } else {
3318 true
3319 };
3320
3321 if should_update {
3322 current.insert(segment.to_string(), new_value);
3323 return Ok(true);
3324 }
3325 return Ok(false);
3326 } else {
3327 current
3328 .entry(segment.to_string())
3329 .or_insert_with(|| json!({}));
3330 current = current
3331 .get_mut(segment)
3332 .and_then(|v| v.as_object_mut())
3333 .ok_or("Path collision: expected object")?;
3334 }
3335 }
3336
3337 Ok(false)
3338 }
3339
3340 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3341 let compiled = self.get_compiled_path(path);
3342 let segments = compiled.segments();
3343 let mut current = &self.registers[object_reg];
3344
3345 for segment in segments {
3346 current = current
3347 .get(segment)
3348 .ok_or_else(|| format!("Field not found: {}", segment))?;
3349 }
3350
3351 Ok(current.clone())
3352 }
3353
3354 fn append_to_array(
3355 &mut self,
3356 object_reg: Register,
3357 path: &str,
3358 value_reg: Register,
3359 max_length: usize,
3360 ) -> Result<()> {
3361 let compiled = self.get_compiled_path(path);
3362 let segments = compiled.segments();
3363 let value = self.registers[value_reg].clone();
3364
3365 if !self.registers[object_reg].is_object() {
3366 self.registers[object_reg] = json!({});
3367 }
3368
3369 let obj = self.registers[object_reg]
3370 .as_object_mut()
3371 .ok_or("Not an object")?;
3372
3373 let mut current = obj;
3374 for (i, segment) in segments.iter().enumerate() {
3375 if i == segments.len() - 1 {
3376 current
3377 .entry(segment.to_string())
3378 .or_insert_with(|| json!([]));
3379 let arr = current
3380 .get_mut(segment)
3381 .and_then(|v| v.as_array_mut())
3382 .ok_or("Path is not an array")?;
3383 arr.push(value.clone());
3384
3385 if arr.len() > max_length {
3386 let excess = arr.len() - max_length;
3387 arr.drain(0..excess);
3388 }
3389 } else {
3390 current
3391 .entry(segment.to_string())
3392 .or_insert_with(|| json!({}));
3393 current = current
3394 .get_mut(segment)
3395 .and_then(|v| v.as_object_mut())
3396 .ok_or("Path collision: expected object")?;
3397 }
3398 }
3399
3400 Ok(())
3401 }
3402
3403 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3404 let value = &self.registers[reg];
3405 let transformed = Self::apply_transformation(value, transformation)?;
3406 self.registers[reg] = transformed;
3407 Ok(())
3408 }
3409
3410 fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3411 match transformation {
3412 Transformation::HexEncode => {
3413 if let Some(arr) = value.as_array() {
3414 let bytes: Vec<u8> = arr
3415 .iter()
3416 .filter_map(|v| v.as_u64().map(|n| n as u8))
3417 .collect();
3418 let hex = hex::encode(&bytes);
3419 Ok(json!(hex))
3420 } else if value.is_string() {
3421 Ok(value.clone())
3422 } else {
3423 Err("HexEncode requires an array of numbers".into())
3424 }
3425 }
3426 Transformation::HexDecode => {
3427 if let Some(s) = value.as_str() {
3428 let s = s.strip_prefix("0x").unwrap_or(s);
3429 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3430 Ok(json!(bytes))
3431 } else {
3432 Err("HexDecode requires a string".into())
3433 }
3434 }
3435 Transformation::Base58Encode => {
3436 if let Some(arr) = value.as_array() {
3437 let bytes: Vec<u8> = arr
3438 .iter()
3439 .filter_map(|v| v.as_u64().map(|n| n as u8))
3440 .collect();
3441 let encoded = bs58::encode(&bytes).into_string();
3442 Ok(json!(encoded))
3443 } else if value.is_string() {
3444 Ok(value.clone())
3445 } else {
3446 Err("Base58Encode requires an array of numbers".into())
3447 }
3448 }
3449 Transformation::Base58Decode => {
3450 if let Some(s) = value.as_str() {
3451 let bytes = bs58::decode(s)
3452 .into_vec()
3453 .map_err(|e| format!("Base58 decode error: {}", e))?;
3454 Ok(json!(bytes))
3455 } else {
3456 Err("Base58Decode requires a string".into())
3457 }
3458 }
3459 Transformation::ToString => Ok(json!(value.to_string())),
3460 Transformation::ToNumber => {
3461 if let Some(s) = value.as_str() {
3462 let n = s
3463 .parse::<i64>()
3464 .map_err(|e| format!("Parse error: {}", e))?;
3465 Ok(json!(n))
3466 } else {
3467 Ok(value.clone())
3468 }
3469 }
3470 }
3471 }
3472
3473 fn evaluate_comparison(
3474 &self,
3475 field_value: &Value,
3476 op: &ComparisonOp,
3477 condition_value: &Value,
3478 ) -> Result<bool> {
3479 use ComparisonOp::*;
3480
3481 match op {
3482 Equal => Ok(field_value == condition_value),
3483 NotEqual => Ok(field_value != condition_value),
3484 GreaterThan => {
3485 match (field_value.as_i64(), condition_value.as_i64()) {
3487 (Some(a), Some(b)) => Ok(a > b),
3488 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3489 (Some(a), Some(b)) => Ok(a > b),
3490 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3491 (Some(a), Some(b)) => Ok(a > b),
3492 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3493 },
3494 },
3495 }
3496 }
3497 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3498 (Some(a), Some(b)) => Ok(a >= b),
3499 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3500 (Some(a), Some(b)) => Ok(a >= b),
3501 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3502 (Some(a), Some(b)) => Ok(a >= b),
3503 _ => {
3504 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3505 }
3506 },
3507 },
3508 },
3509 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3510 (Some(a), Some(b)) => Ok(a < b),
3511 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3512 (Some(a), Some(b)) => Ok(a < b),
3513 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3514 (Some(a), Some(b)) => Ok(a < b),
3515 _ => Err("Cannot compare non-numeric values with LessThan".into()),
3516 },
3517 },
3518 },
3519 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3520 (Some(a), Some(b)) => Ok(a <= b),
3521 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3522 (Some(a), Some(b)) => Ok(a <= b),
3523 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3524 (Some(a), Some(b)) => Ok(a <= b),
3525 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3526 },
3527 },
3528 },
3529 }
3530 }
3531
3532 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3533 if let Some(state) = self.states.get(&state_id) {
3534 if let Some(index) = state.lookup_indexes.get(index_name) {
3535 if index.lookup(value).is_some() {
3536 return true;
3537 }
3538 }
3539
3540 for (name, index) in state.lookup_indexes.iter() {
3541 if name == index_name {
3542 continue;
3543 }
3544 if index.lookup(value).is_some() {
3545 return true;
3546 }
3547 }
3548
3549 if let Some(pda_str) = value.as_str() {
3550 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3551 if pda_lookup.contains(pda_str) {
3552 return true;
3553 }
3554 }
3555 }
3556 }
3557
3558 false
3559 }
3560
3561 #[allow(clippy::type_complexity)]
3562 fn apply_deferred_when_op(
3563 &mut self,
3564 state_id: u32,
3565 op: &DeferredWhenOperation,
3566 entity_evaluator: Option<
3567 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3568 >,
3569 computed_paths: Option<&[String]>,
3570 ) -> Result<Vec<Mutation>> {
3571 let state = self.states.get(&state_id).ok_or("State not found")?;
3572
3573 if op.primary_key.is_null() {
3574 return Ok(vec![]);
3575 }
3576
3577 let mut entity_state = state
3578 .get_and_touch(&op.primary_key)
3579 .unwrap_or_else(|| json!({}));
3580
3581 let old_computed_values: Vec<_> = computed_paths
3583 .map(|paths| {
3584 paths
3585 .iter()
3586 .map(|path| Self::get_value_at_path(&entity_state, path))
3587 .collect()
3588 })
3589 .unwrap_or_default();
3590
3591 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3592
3593 if let Some(evaluator) = entity_evaluator {
3595 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3596 let context_timestamp = self
3597 .current_context
3598 .as_ref()
3599 .map(|c| c.timestamp())
3600 .unwrap_or_else(|| {
3601 std::time::SystemTime::now()
3602 .duration_since(std::time::UNIX_EPOCH)
3603 .unwrap()
3604 .as_secs() as i64
3605 });
3606
3607 tracing::debug!(
3608 entity_name = %op.entity_name,
3609 primary_key = %op.primary_key,
3610 field_path = %op.field_path,
3611 "Re-evaluating computed fields after deferred when-op"
3612 );
3613
3614 if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
3615 tracing::warn!(
3616 entity_name = %op.entity_name,
3617 primary_key = %op.primary_key,
3618 error = %e,
3619 "Failed to evaluate computed fields after deferred when-op"
3620 );
3621 }
3622 }
3623
3624 state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
3625
3626 if !op.emit {
3627 return Ok(vec![]);
3628 }
3629
3630 let mut patch = json!({});
3631 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3632
3633 if let Some(paths) = computed_paths {
3635 tracing::debug!(
3636 entity_name = %op.entity_name,
3637 primary_key = %op.primary_key,
3638 computed_paths_count = paths.len(),
3639 "Checking computed fields for changes after deferred when-op"
3640 );
3641 for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
3642 let new_value = Self::get_value_at_path(&entity_state, path);
3643 tracing::debug!(
3644 entity_name = %op.entity_name,
3645 primary_key = %op.primary_key,
3646 field_path = %path,
3647 old_value = ?old_value,
3648 new_value = ?new_value,
3649 "Comparing computed field values"
3650 );
3651 if let Some(ref new_val) = new_value {
3652 if Some(new_val) != old_value.as_ref() {
3653 Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
3654 tracing::info!(
3655 entity_name = %op.entity_name,
3656 primary_key = %op.primary_key,
3657 field_path = %path,
3658 "Computed field changed after deferred when-op, including in mutation"
3659 );
3660 }
3661 }
3662 }
3663 }
3664
3665 Ok(vec![Mutation {
3666 export: op.entity_name.clone(),
3667 key: op.primary_key.clone(),
3668 patch,
3669 append: vec![],
3670 }])
3671 }
3672
3673 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3674 let parts: Vec<&str> = path.split('.').collect();
3675 let mut current = obj;
3676
3677 for (i, part) in parts.iter().enumerate() {
3678 if i == parts.len() - 1 {
3679 if let Some(map) = current.as_object_mut() {
3680 map.insert(part.to_string(), value);
3681 return Ok(());
3682 }
3683 return Err("Cannot set field on non-object".into());
3684 }
3685
3686 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3687 if let Some(map) = current.as_object_mut() {
3688 map.insert(part.to_string(), json!({}));
3689 }
3690 }
3691
3692 current = current.get_mut(*part).ok_or("Path navigation failed")?;
3693 }
3694
3695 Ok(())
3696 }
3697
3698 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3699 let now = std::time::SystemTime::now()
3700 .duration_since(std::time::UNIX_EPOCH)
3701 .unwrap()
3702 .as_secs() as i64;
3703
3704 let state = match self.states.get(&state_id) {
3705 Some(s) => s,
3706 None => return 0,
3707 };
3708
3709 let mut removed = 0;
3710 state.deferred_when_ops.retain(|_, ops| {
3711 let before = ops.len();
3712 ops.retain(|op| now - op.deferred_at < max_age_secs);
3713 removed += before - ops.len();
3714 !ops.is_empty()
3715 });
3716
3717 removed
3718 }
3719
/// Registers (or re-registers) the mapping `pda_address -> seed_value` in the
/// reverse lookup named `lookup_name` of state table `state_id`, and returns
/// the account updates that should now be processed for that PDA.
///
/// The returned list contains the updates that were queued under
/// `pda_address`. When the PDA was previously mapped to a *different* seed,
/// the cached last account data for that PDA (if any) is appended, marked
/// with `is_stale_reprocess = true`.
///
/// # Errors
/// Returns an error when the state table does not exist.
#[cfg_attr(feature = "otel", instrument(
    name = "vm.update_pda_lookup",
    skip(self),
    fields(
        pda = %pda_address,
        seed = %seed_value,
    )
))]
pub fn update_pda_reverse_lookup(
    &mut self,
    state_id: u32,
    lookup_name: &str,
    pda_address: String,
    seed_value: String,
) -> Result<Vec<PendingAccountUpdate>> {
    let state = self
        .states
        .get_mut(&state_id)
        .ok_or("State table not found")?;

    // Create the named lookup on first use.
    let lookup = state
        .pda_reverse_lookups
        .entry(lookup_name.to_string())
        .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));

    // Read the previous seed via `peek` before inserting the new mapping.
    let old_seed = lookup.index.peek(&pda_address).cloned();
    // True only when a previous mapping existed AND pointed at another seed.
    let mapping_changed = old_seed
        .as_ref()
        .map(|old| old != &seed_value)
        .unwrap_or(false);

    if !mapping_changed && old_seed.is_none() {
        tracing::info!(
            pda = %pda_address,
            seed = %seed_value,
            "[PDA] First-time PDA reverse lookup established"
        );
    } else if !mapping_changed {
        tracing::debug!(
            pda = %pda_address,
            seed = %seed_value,
            "[PDA] PDA reverse lookup re-registered (same mapping)"
        );
    }

    // `insert` may hand back a PDA; it is treated as evicted below.
    let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());

    // Updates queued for an evicted PDA are discarded; keep the global
    // counter in sync with what was thrown away.
    if let Some(ref evicted) = evicted_pda {
        if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
            let count = evicted_updates.len();
            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
        }
    }

    // Hand back everything that was queued under this PDA.
    let mut pending = self.flush_pending_updates(state_id, &pda_address)?;

    if mapping_changed {
        if let Some(state) = self.states.get(&state_id) {
            // Remove the PDA key from every lookup index of this state table.
            for index in state.lookup_indexes.values() {
                index.remove(&Value::String(pda_address.clone()));
            }

            // Re-emit the last cached account data so it is processed again
            // under the new mapping.
            if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
                tracing::info!(
                    pda = %pda_address,
                    old_seed = ?old_seed,
                    new_seed = %seed_value,
                    account_type = %cached.account_type,
                    "PDA mapping changed — clearing stale indexes and reprocessing cached data"
                );
                cached.is_stale_reprocess = true;
                pending.push(cached);
            }
        }
    }

    Ok(pending)
}
3818
3819 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3824 let state = match self.states.get_mut(&state_id) {
3825 Some(s) => s,
3826 None => return 0,
3827 };
3828
3829 let now = std::time::SystemTime::now()
3830 .duration_since(std::time::UNIX_EPOCH)
3831 .unwrap()
3832 .as_secs() as i64;
3833
3834 let mut removed_count = 0;
3835
3836 state.pending_updates.retain(|_pda_address, updates| {
3838 let original_len = updates.len();
3839
3840 updates.retain(|update| {
3841 let age = now - update.queued_at;
3842 age <= PENDING_UPDATE_TTL_SECONDS
3843 });
3844
3845 removed_count += original_len - updates.len();
3846
3847 !updates.is_empty()
3849 });
3850
3851 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3853
3854 if removed_count > 0 {
3855 #[cfg(feature = "otel")]
3856 crate::vm_metrics::record_pending_updates_expired(
3857 removed_count as u64,
3858 &state.entity_name,
3859 );
3860 }
3861
3862 removed_count
3863 }
3864
3865 #[cfg_attr(feature = "otel", instrument(
3897 name = "vm.queue_account_update",
3898 skip(self, update),
3899 fields(
3900 pda = %update.pda_address,
3901 account_type = %update.account_type,
3902 slot = update.slot,
3903 )
3904 ))]
3905 pub fn queue_account_update(
3906 &mut self,
3907 state_id: u32,
3908 update: QueuedAccountUpdate,
3909 ) -> Result<()> {
3910 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3911 self.cleanup_expired_pending_updates(state_id);
3912 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3913 self.drop_oldest_pending_update(state_id)?;
3914 }
3915 }
3916
3917 let state = self
3918 .states
3919 .get_mut(&state_id)
3920 .ok_or("State table not found")?;
3921
3922 let pending = PendingAccountUpdate {
3923 account_type: update.account_type,
3924 pda_address: update.pda_address.clone(),
3925 account_data: update.account_data,
3926 slot: update.slot,
3927 write_version: update.write_version,
3928 signature: update.signature,
3929 queued_at: std::time::SystemTime::now()
3930 .duration_since(std::time::UNIX_EPOCH)
3931 .unwrap()
3932 .as_secs() as i64,
3933 is_stale_reprocess: false,
3934 };
3935
3936 let pda_address = pending.pda_address.clone();
3937 let slot = pending.slot;
3938
3939 let mut updates = state
3940 .pending_updates
3941 .entry(pda_address.clone())
3942 .or_insert_with(Vec::new);
3943
3944 let original_len = updates.len();
3945 updates.retain(|existing| existing.slot > slot);
3946 let removed_by_dedup = original_len - updates.len();
3947
3948 if removed_by_dedup > 0 {
3949 self.pending_queue_size = self
3950 .pending_queue_size
3951 .saturating_sub(removed_by_dedup as u64);
3952 }
3953
3954 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3955 updates.remove(0);
3956 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3957 }
3958
3959 updates.push(pending);
3960 #[cfg(feature = "otel")]
3961 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3962
3963 Ok(())
3964 }
3965
3966 pub fn queue_instruction_event(
3967 &mut self,
3968 state_id: u32,
3969 event: QueuedInstructionEvent,
3970 ) -> Result<()> {
3971 let state = self
3972 .states
3973 .get_mut(&state_id)
3974 .ok_or("State table not found")?;
3975
3976 let pda_address = event.pda_address.clone();
3977
3978 let pending = PendingInstructionEvent {
3979 event_type: event.event_type,
3980 pda_address: event.pda_address,
3981 event_data: event.event_data,
3982 slot: event.slot,
3983 signature: event.signature,
3984 queued_at: std::time::SystemTime::now()
3985 .duration_since(std::time::UNIX_EPOCH)
3986 .unwrap()
3987 .as_secs() as i64,
3988 };
3989
3990 let mut events = state
3991 .pending_instruction_events
3992 .entry(pda_address)
3993 .or_insert_with(Vec::new);
3994
3995 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3996 events.remove(0);
3997 }
3998
3999 events.push(pending);
4000
4001 Ok(())
4002 }
4003
4004 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4005 self.last_pda_lookup_miss.take()
4006 }
4007
4008 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4009 self.last_lookup_index_miss.take()
4010 }
4011
4012 pub fn take_last_pda_registered(&mut self) -> Option<String> {
4013 self.last_pda_registered.take()
4014 }
4015
4016 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4017 std::mem::take(&mut self.last_lookup_index_keys)
4018 }
4019
4020 pub fn flush_pending_instruction_events(
4021 &mut self,
4022 state_id: u32,
4023 pda_address: &str,
4024 ) -> Vec<PendingInstructionEvent> {
4025 let state = match self.states.get_mut(&state_id) {
4026 Some(s) => s,
4027 None => return Vec::new(),
4028 };
4029
4030 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4031 events
4032 } else {
4033 Vec::new()
4034 }
4035 }
4036
4037 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4039 let state = self.states.get(&state_id)?;
4040
4041 let now = std::time::SystemTime::now()
4042 .duration_since(std::time::UNIX_EPOCH)
4043 .unwrap()
4044 .as_secs() as i64;
4045
4046 let mut total_updates = 0;
4047 let mut oldest_timestamp = now;
4048 let mut largest_pda_queue = 0;
4049 let mut estimated_memory = 0;
4050
4051 for entry in state.pending_updates.iter() {
4052 let (_, updates) = entry.pair();
4053 total_updates += updates.len();
4054 largest_pda_queue = largest_pda_queue.max(updates.len());
4055
4056 for update in updates.iter() {
4057 oldest_timestamp = oldest_timestamp.min(update.queued_at);
4058 estimated_memory += update.account_type.len() +
4060 update.pda_address.len() +
4061 update.signature.len() +
4062 16 + estimate_json_size(&update.account_data);
4064 }
4065 }
4066
4067 Some(PendingQueueStats {
4068 total_updates,
4069 unique_pdas: state.pending_updates.len(),
4070 oldest_age_seconds: now - oldest_timestamp,
4071 largest_pda_queue_size: largest_pda_queue,
4072 estimated_memory_bytes: estimated_memory,
4073 })
4074 }
4075
4076 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4077 let mut stats = VmMemoryStats {
4078 path_cache_size: self.path_cache.len(),
4079 ..Default::default()
4080 };
4081
4082 if let Some(state) = self.states.get(&state_id) {
4083 stats.state_table_entity_count = state.data.len();
4084 stats.state_table_max_entries = state.config.max_entries;
4085 stats.state_table_at_capacity = state.is_at_capacity();
4086
4087 stats.lookup_index_count = state.lookup_indexes.len();
4088 stats.lookup_index_total_entries =
4089 state.lookup_indexes.values().map(|idx| idx.len()).sum();
4090
4091 stats.temporal_index_count = state.temporal_indexes.len();
4092 stats.temporal_index_total_entries = state
4093 .temporal_indexes
4094 .values()
4095 .map(|idx| idx.total_entries())
4096 .sum();
4097
4098 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4099 stats.pda_reverse_lookup_total_entries = state
4100 .pda_reverse_lookups
4101 .values()
4102 .map(|lookup| lookup.len())
4103 .sum();
4104
4105 stats.version_tracker_entries = state.version_tracker.len();
4106
4107 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4108 }
4109
4110 stats
4111 }
4112
4113 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4114 let pending_removed = self.cleanup_expired_pending_updates(state_id);
4115 let temporal_removed = self.cleanup_temporal_indexes(state_id);
4116
4117 #[cfg(feature = "otel")]
4118 if let Some(state) = self.states.get(&state_id) {
4119 crate::vm_metrics::record_cleanup(
4120 pending_removed,
4121 temporal_removed,
4122 &state.entity_name,
4123 );
4124 }
4125
4126 CleanupResult {
4127 pending_updates_removed: pending_removed,
4128 temporal_entries_removed: temporal_removed,
4129 }
4130 }
4131
4132 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4133 let state = match self.states.get_mut(&state_id) {
4134 Some(s) => s,
4135 None => return 0,
4136 };
4137
4138 let now = std::time::SystemTime::now()
4139 .duration_since(std::time::UNIX_EPOCH)
4140 .unwrap()
4141 .as_secs() as i64;
4142
4143 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4144 let mut total_removed = 0;
4145
4146 for (_, index) in state.temporal_indexes.iter_mut() {
4147 total_removed += index.cleanup_expired(cutoff);
4148 }
4149
4150 total_removed
4151 }
4152
4153 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4154 let state = self.states.get(&state_id)?;
4155
4156 if state.is_at_capacity() {
4157 Some(CapacityWarning {
4158 current_entries: state.data.len(),
4159 max_entries: state.config.max_entries,
4160 entries_over_limit: state.entries_over_limit(),
4161 })
4162 } else {
4163 None
4164 }
4165 }
4166
4167 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4169 let state = self
4170 .states
4171 .get_mut(&state_id)
4172 .ok_or("State table not found")?;
4173
4174 let mut oldest_pda: Option<String> = None;
4175 let mut oldest_timestamp = i64::MAX;
4176
4177 for entry in state.pending_updates.iter() {
4179 let (pda, updates) = entry.pair();
4180 if let Some(update) = updates.first() {
4181 if update.queued_at < oldest_timestamp {
4182 oldest_timestamp = update.queued_at;
4183 oldest_pda = Some(pda.clone());
4184 }
4185 }
4186 }
4187
4188 if let Some(pda) = oldest_pda {
4190 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4191 if !updates.is_empty() {
4192 updates.remove(0);
4193 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4194
4195 if updates.is_empty() {
4197 drop(updates);
4198 state.pending_updates.remove(&pda);
4199 }
4200 }
4201 }
4202 }
4203
4204 Ok(())
4205 }
4206
4207 fn flush_pending_updates(
4212 &mut self,
4213 state_id: u32,
4214 pda_address: &str,
4215 ) -> Result<Vec<PendingAccountUpdate>> {
4216 let state = self
4217 .states
4218 .get_mut(&state_id)
4219 .ok_or("State table not found")?;
4220
4221 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4222 let count = pending_updates.len();
4223 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4224 #[cfg(feature = "otel")]
4225 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4226 Ok(pending_updates)
4227 } else {
4228 Ok(Vec::new())
4229 }
4230 }
4231
4232 pub fn cache_last_account_data(
4238 &mut self,
4239 state_id: u32,
4240 pda_address: &str,
4241 update: PendingAccountUpdate,
4242 ) {
4243 if let Some(state) = self.states.get(&state_id) {
4244 state
4245 .last_account_data
4246 .insert(pda_address.to_string(), update);
4247 }
4248 }
4249
4250 pub fn try_pda_reverse_lookup(
4252 &mut self,
4253 state_id: u32,
4254 lookup_name: &str,
4255 pda_address: &str,
4256 ) -> Option<String> {
4257 let state = self.states.get_mut(&state_id)?;
4258
4259 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4260 if let Some(value) = lookup.lookup(pda_address) {
4261 self.pda_cache_hits += 1;
4262 return Some(value);
4263 }
4264 }
4265
4266 self.pda_cache_misses += 1;
4267 None
4268 }
4269
4270 pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4273 let state = self.states.get(&state_id)?;
4274
4275 for index in state.lookup_indexes.values() {
4276 if let Some(resolved) = index.lookup(value) {
4277 return Some(resolved);
4278 }
4279 }
4280
4281 None
4282 }
4283
4284 pub fn try_chained_pda_lookup(
4289 &mut self,
4290 state_id: u32,
4291 lookup_name: &str,
4292 pda_address: &str,
4293 ) -> Option<String> {
4294 let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4296
4297 let pda_value = Value::String(pda_result.clone());
4300 if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4301 resolved.as_str().map(|s| s.to_string())
4302 } else {
4303 pda_value.as_str().map(|s| s.to_string())
4305 }
4306 }
4307
4308 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4315 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4316 }
4317
/// Recursively evaluates `expr` to a JSON value.
///
/// * `state` — the JSON document that `FieldRef` paths are read from.
/// * `env`   — variable bindings visible to `Var`; extended (on a clone) by
///   `Let` and by closures passed to `map`.
///
/// # Errors
/// Returns an error for an undefined variable, for slicing/indexing a
/// non-array, for byte conversions that fail or are too short, and for any
/// error raised by the operator, cast, resolver or method helpers.
fn evaluate_computed_expr_with_env(
    &self,
    expr: &ComputedExpr,
    state: &Value,
    env: &std::collections::HashMap<String, Value>,
) -> Result<Value> {
    match expr {
        // Read a dotted path out of `state`.
        ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),

        // Variable lookup; an unbound name is an error.
        ComputedExpr::Var { name } => env
            .get(name)
            .cloned()
            .ok_or_else(|| format!("Undefined variable: {}", name).into()),

        // `let name = value in body`: evaluate `value`, bind it in a copy of
        // the environment, then evaluate `body` with that copy.
        ComputedExpr::Let { name, value, body } => {
            let val = self.evaluate_computed_expr_with_env(value, state, env)?;
            let mut new_env = env.clone();
            new_env.insert(name.clone(), val);
            self.evaluate_computed_expr_with_env(body, state, &new_env)
        }

        // Only the selected branch is evaluated.
        ComputedExpr::If {
            condition,
            then_branch,
            else_branch,
        } => {
            let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
            if self.value_to_bool(&cond_val) {
                self.evaluate_computed_expr_with_env(then_branch, state, env)
            } else {
                self.evaluate_computed_expr_with_env(else_branch, state, env)
            }
        }

        // `None` is represented as JSON null; `Some(x)` is just `x`.
        ComputedExpr::None => Ok(Value::Null),

        ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),

        // An out-of-range slice yields an empty array rather than an error.
        ComputedExpr::Slice { expr, start, end } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            match val {
                Value::Array(arr) => {
                    let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
                    Ok(Value::Array(slice))
                }
                _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
            }
        }

        // An out-of-range index yields null rather than an error.
        ComputedExpr::Index { expr, index } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            match val {
                Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
                _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
            }
        }

        // Decode the first 8 bytes as a little-endian u64; extra bytes are ignored.
        ComputedExpr::U64FromLeBytes { bytes } => {
            let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
            let byte_vec = self.value_to_bytes(&val)?;
            if byte_vec.len() < 8 {
                return Err(format!(
                    "u64::from_le_bytes requires 8 bytes, got {}",
                    byte_vec.len()
                )
                .into());
            }
            let arr: [u8; 8] = byte_vec[..8]
                .try_into()
                .map_err(|_| "Failed to convert to [u8; 8]")?;
            Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
                arr,
            ))))
        }

        // Decode the first 8 bytes as a big-endian u64; extra bytes are ignored.
        ComputedExpr::U64FromBeBytes { bytes } => {
            let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
            let byte_vec = self.value_to_bytes(&val)?;
            if byte_vec.len() < 8 {
                return Err(format!(
                    "u64::from_be_bytes requires 8 bytes, got {}",
                    byte_vec.len()
                )
                .into());
            }
            let arr: [u8; 8] = byte_vec[..8]
                .try_into()
                .map_err(|_| "Failed to convert to [u8; 8]")?;
            Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
                arr,
            ))))
        }

        // A byte-array literal becomes a JSON array of numbers.
        ComputedExpr::ByteArray { bytes } => {
            Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
        }

        // A closure is not executed here; it is encoded as a JSON object
        // under the `__closure` key (body serialised, null if that fails).
        ComputedExpr::Closure { param, body } => {
            Ok(json!({
                "__closure": {
                    "param": param,
                    "body": serde_json::to_value(body).unwrap_or(Value::Null)
                }
            }))
        }

        ComputedExpr::Unary { op, expr } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            self.apply_unary_op(op, &val)
        }

        // Normalise a byte-like value into a JSON array of numbers.
        ComputedExpr::JsonToBytes { expr } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            let bytes = self.value_to_bytes(&val)?;
            Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
        }

        // Substitute `default` only when the expression evaluates to null.
        ComputedExpr::UnwrapOr { expr, default } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            if val.is_null() {
                Ok(default.clone())
            } else {
                Ok(val)
            }
        }

        // Both operands are always evaluated (no short-circuiting).
        ComputedExpr::Binary { op, left, right } => {
            let l = self.evaluate_computed_expr_with_env(left, state, env)?;
            let r = self.evaluate_computed_expr_with_env(right, state, env)?;
            self.apply_binary_op(op, &l, &r)
        }

        ComputedExpr::Cast { expr, to_type } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            self.apply_cast(&val, to_type)
        }

        // Evaluate every argument, then delegate to the resolver module.
        ComputedExpr::ResolverComputed {
            resolver,
            method,
            args,
        } => {
            let evaluated_args: Vec<Value> = args
                .iter()
                .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
                .collect::<Result<Vec<_>>>()?;
            crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
        }

        ComputedExpr::MethodCall { expr, method, args } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            // `map` with a single closure argument is handled here so the
            // closure body can be evaluated with its parameter bound.
            if method == "map" && args.len() == 1 {
                if let ComputedExpr::Closure { param, body } = &args[0] {
                    // Mapping over null stays null.
                    if val.is_null() {
                        return Ok(Value::Null);
                    }

                    // Arrays are mapped element by element.
                    if let Value::Array(arr) = &val {
                        let results: Result<Vec<Value>> = arr
                            .iter()
                            .map(|elem| {
                                let mut closure_env = env.clone();
                                closure_env.insert(param.clone(), elem.clone());
                                self.evaluate_computed_expr_with_env(body, state, &closure_env)
                            })
                            .collect();
                        return Ok(Value::Array(results?));
                    }

                    // Any other value is passed to the closure as a single item.
                    let mut closure_env = env.clone();
                    closure_env.insert(param.clone(), val);
                    return self.evaluate_computed_expr_with_env(body, state, &closure_env);
                }
            }
            // Every other method: evaluate the arguments and dispatch by name.
            let evaluated_args: Vec<Value> = args
                .iter()
                .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
                .collect::<Result<Vec<_>>>()?;
            self.apply_method_call(&val, method, &evaluated_args)
        }

        ComputedExpr::Literal { value } => Ok(value.clone()),

        ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),

        // Slot of the current update context; null when there is no context
        // or the context carries no slot.
        ComputedExpr::ContextSlot => Ok(self
            .current_context
            .as_ref()
            .and_then(|ctx| ctx.slot)
            .map(|s| json!(s))
            .unwrap_or(Value::Null)),

        // Result of `timestamp()` on the current context; null without a context.
        ComputedExpr::ContextTimestamp => Ok(self
            .current_context
            .as_ref()
            .map(|ctx| json!(ctx.timestamp()))
            .unwrap_or(Value::Null)),

        // Keccak-256 digest of the byte-like value, as a JSON array of numbers.
        ComputedExpr::Keccak256 { expr } => {
            let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
            let bytes = self.value_to_bytes(&val)?;
            use sha3::{Digest, Keccak256};
            let hash = Keccak256::digest(&bytes);
            Ok(Value::Array(
                hash.to_vec().iter().map(|b| json!(*b)).collect(),
            ))
        }
    }
}
4533
4534 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4536 match val {
4537 Value::Array(arr) => arr
4538 .iter()
4539 .map(|v| {
4540 v.as_u64()
4541 .map(|n| n as u8)
4542 .ok_or_else(|| "Array element not a valid byte".into())
4543 })
4544 .collect(),
4545 Value::String(s) => {
4546 if s.starts_with("0x") || s.starts_with("0X") {
4548 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4549 } else {
4550 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4551 }
4552 }
4553 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4554 }
4555 }
4556
4557 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4559 use crate::ast::UnaryOp;
4560 match op {
4561 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4562 UnaryOp::ReverseBits => match val.as_u64() {
4563 Some(n) => Ok(json!(n.reverse_bits())),
4564 None => match val.as_i64() {
4565 Some(n) => Ok(json!((n as u64).reverse_bits())),
4566 None => Err("reverse_bits requires an integer".into()),
4567 },
4568 },
4569 }
4570 }
4571
4572 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4574 let segments: Vec<&str> = path.split('.').collect();
4575 let mut current = state;
4576
4577 for segment in segments {
4578 match current.get(segment) {
4579 Some(v) => current = v,
4580 None => return Ok(Value::Null),
4581 }
4582 }
4583
4584 Ok(current.clone())
4585 }
4586
4587 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4589 match op {
4590 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4592 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4593 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4594 BinaryOp::Div => {
4595 if let Some(r) = right.as_i64() {
4597 if r == 0 {
4598 return Err("Division by zero".into());
4599 }
4600 }
4601 if let Some(r) = right.as_f64() {
4602 if r == 0.0 {
4603 return Err("Division by zero".into());
4604 }
4605 }
4606 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4607 }
4608 BinaryOp::Mod => {
4609 match (left.as_i64(), right.as_i64()) {
4611 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4612 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4613 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4614 _ => Err("Modulo requires non-zero integer operands".into()),
4615 },
4616 _ => Err("Modulo by zero".into()),
4617 }
4618 }
4619
4620 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4622 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4623 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4624 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4625 BinaryOp::Eq => Ok(json!(left == right)),
4626 BinaryOp::Ne => Ok(json!(left != right)),
4627
4628 BinaryOp::And => {
4630 let l_bool = self.value_to_bool(left);
4631 let r_bool = self.value_to_bool(right);
4632 Ok(json!(l_bool && r_bool))
4633 }
4634 BinaryOp::Or => {
4635 let l_bool = self.value_to_bool(left);
4636 let r_bool = self.value_to_bool(right);
4637 Ok(json!(l_bool || r_bool))
4638 }
4639
4640 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4642 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4643 _ => match (left.as_i64(), right.as_i64()) {
4644 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4645 _ => Err("XOR requires integer operands".into()),
4646 },
4647 },
4648 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4649 (Some(a), Some(b)) => Ok(json!(a & b)),
4650 _ => match (left.as_i64(), right.as_i64()) {
4651 (Some(a), Some(b)) => Ok(json!(a & b)),
4652 _ => Err("BitAnd requires integer operands".into()),
4653 },
4654 },
4655 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4656 (Some(a), Some(b)) => Ok(json!(a | b)),
4657 _ => match (left.as_i64(), right.as_i64()) {
4658 (Some(a), Some(b)) => Ok(json!(a | b)),
4659 _ => Err("BitOr requires integer operands".into()),
4660 },
4661 },
4662 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4663 (Some(a), Some(b)) => Ok(json!(a << b)),
4664 _ => match (left.as_i64(), right.as_i64()) {
4665 (Some(a), Some(b)) => Ok(json!(a << b)),
4666 _ => Err("Shl requires integer operands".into()),
4667 },
4668 },
4669 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4670 (Some(a), Some(b)) => Ok(json!(a >> b)),
4671 _ => match (left.as_i64(), right.as_i64()) {
4672 (Some(a), Some(b)) => Ok(json!(a >> b)),
4673 _ => Err("Shr requires integer operands".into()),
4674 },
4675 },
4676 }
4677 }
4678
4679 fn numeric_op<F1, F2>(
4681 &self,
4682 left: &Value,
4683 right: &Value,
4684 int_op: F1,
4685 float_op: F2,
4686 ) -> Result<Value>
4687 where
4688 F1: Fn(i64, i64) -> i64,
4689 F2: Fn(f64, f64) -> f64,
4690 {
4691 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4693 return Ok(json!(int_op(a, b)));
4694 }
4695
4696 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4698 return Ok(json!(int_op(a as i64, b as i64)));
4700 }
4701
4702 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4704 return Ok(json!(float_op(a, b)));
4705 }
4706
4707 if left.is_null() || right.is_null() {
4709 return Ok(Value::Null);
4710 }
4711
4712 Err(format!(
4713 "Cannot perform numeric operation on {:?} and {:?}",
4714 left, right
4715 )
4716 .into())
4717 }
4718
4719 fn comparison_op<F1, F2>(
4721 &self,
4722 left: &Value,
4723 right: &Value,
4724 int_cmp: F1,
4725 float_cmp: F2,
4726 ) -> Result<Value>
4727 where
4728 F1: Fn(i64, i64) -> bool,
4729 F2: Fn(f64, f64) -> bool,
4730 {
4731 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4733 return Ok(json!(int_cmp(a, b)));
4734 }
4735
4736 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4738 return Ok(json!(int_cmp(a as i64, b as i64)));
4739 }
4740
4741 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4743 return Ok(json!(float_cmp(a, b)));
4744 }
4745
4746 if left.is_null() || right.is_null() {
4748 return Ok(json!(false));
4749 }
4750
4751 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4752 }
4753
4754 fn value_to_bool(&self, value: &Value) -> bool {
4756 match value {
4757 Value::Null => false,
4758 Value::Bool(b) => *b,
4759 Value::Number(n) => {
4760 if let Some(i) = n.as_i64() {
4761 i != 0
4762 } else if let Some(f) = n.as_f64() {
4763 f != 0.0
4764 } else {
4765 true
4766 }
4767 }
4768 Value::String(s) => !s.is_empty(),
4769 Value::Array(arr) => !arr.is_empty(),
4770 Value::Object(obj) => !obj.is_empty(),
4771 }
4772 }
4773
4774 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4776 match to_type {
4777 "i8" | "i16" | "i32" | "i64" | "isize" => {
4778 if let Some(n) = value.as_i64() {
4779 Ok(json!(n))
4780 } else if let Some(n) = value.as_u64() {
4781 Ok(json!(n as i64))
4782 } else if let Some(n) = value.as_f64() {
4783 Ok(json!(n as i64))
4784 } else if let Some(s) = value.as_str() {
4785 s.parse::<i64>()
4786 .map(|n| json!(n))
4787 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4788 } else {
4789 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4790 }
4791 }
4792 "u8" | "u16" | "u32" | "u64" | "usize" => {
4793 if let Some(n) = value.as_u64() {
4794 Ok(json!(n))
4795 } else if let Some(n) = value.as_i64() {
4796 Ok(json!(n as u64))
4797 } else if let Some(n) = value.as_f64() {
4798 Ok(json!(n as u64))
4799 } else if let Some(s) = value.as_str() {
4800 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4801 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4802 })
4803 } else {
4804 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4805 }
4806 }
4807 "f32" | "f64" => {
4808 if let Some(n) = value.as_f64() {
4809 Ok(json!(n))
4810 } else if let Some(n) = value.as_i64() {
4811 Ok(json!(n as f64))
4812 } else if let Some(n) = value.as_u64() {
4813 Ok(json!(n as f64))
4814 } else if let Some(s) = value.as_str() {
4815 s.parse::<f64>()
4816 .map(|n| json!(n))
4817 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4818 } else {
4819 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4820 }
4821 }
4822 "String" | "string" => Ok(json!(value.to_string())),
4823 "bool" => Ok(json!(self.value_to_bool(value))),
4824 _ => {
4825 Ok(value.clone())
4827 }
4828 }
4829 }
4830
4831 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4833 match method {
4834 "unwrap_or" => {
4835 if value.is_null() && !args.is_empty() {
4836 Ok(args[0].clone())
4837 } else {
4838 Ok(value.clone())
4839 }
4840 }
4841 "unwrap_or_default" => {
4842 if value.is_null() {
4843 Ok(json!(0))
4845 } else {
4846 Ok(value.clone())
4847 }
4848 }
4849 "is_some" => Ok(json!(!value.is_null())),
4850 "is_none" => Ok(json!(value.is_null())),
4851 "abs" => {
4852 if let Some(n) = value.as_i64() {
4853 Ok(json!(n.abs()))
4854 } else if let Some(n) = value.as_f64() {
4855 Ok(json!(n.abs()))
4856 } else {
4857 Err(format!("Cannot call abs() on {:?}", value).into())
4858 }
4859 }
4860 "len" => {
4861 if let Some(s) = value.as_str() {
4862 Ok(json!(s.len()))
4863 } else if let Some(arr) = value.as_array() {
4864 Ok(json!(arr.len()))
4865 } else if let Some(obj) = value.as_object() {
4866 Ok(json!(obj.len()))
4867 } else {
4868 Err(format!("Cannot call len() on {:?}", value).into())
4869 }
4870 }
4871 "to_string" => Ok(json!(value.to_string())),
4872 "min" => {
4873 if args.is_empty() {
4874 return Err("min() requires an argument".into());
4875 }
4876 let other = &args[0];
4877 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4878 Ok(json!(a.min(b)))
4879 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4880 Ok(json!(a.min(b)))
4881 } else {
4882 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4883 }
4884 }
4885 "max" => {
4886 if args.is_empty() {
4887 return Err("max() requires an argument".into());
4888 }
4889 let other = &args[0];
4890 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4891 Ok(json!(a.max(b)))
4892 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4893 Ok(json!(a.max(b)))
4894 } else {
4895 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4896 }
4897 }
4898 "saturating_add" => {
4899 if args.is_empty() {
4900 return Err("saturating_add() requires an argument".into());
4901 }
4902 let other = &args[0];
4903 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4904 Ok(json!(a.saturating_add(b)))
4905 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4906 Ok(json!(a.saturating_add(b)))
4907 } else {
4908 Err(format!(
4909 "Cannot call saturating_add() on {:?} and {:?}",
4910 value, other
4911 )
4912 .into())
4913 }
4914 }
4915 "saturating_sub" => {
4916 if args.is_empty() {
4917 return Err("saturating_sub() requires an argument".into());
4918 }
4919 let other = &args[0];
4920 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4921 Ok(json!(a.saturating_sub(b)))
4922 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4923 Ok(json!(a.saturating_sub(b)))
4924 } else {
4925 Err(format!(
4926 "Cannot call saturating_sub() on {:?} and {:?}",
4927 value, other
4928 )
4929 .into())
4930 }
4931 }
4932 _ => Err(format!("Unknown method call: {}()", method).into()),
4933 }
4934 }
4935
4936 pub fn evaluate_computed_fields_from_ast(
4939 &self,
4940 state: &mut Value,
4941 computed_field_specs: &[ComputedFieldSpec],
4942 ) -> Result<Vec<String>> {
4943 let mut updated_paths = Vec::new();
4944
4945 crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4946
4947 for spec in computed_field_specs {
4948 match self.evaluate_computed_expr(&spec.expression, state) {
4949 Ok(result) => {
4950 self.set_field_in_state(state, &spec.target_path, result)?;
4951 updated_paths.push(spec.target_path.clone());
4952 }
4953 Err(e) => {
4954 tracing::warn!(
4955 target_path = %spec.target_path,
4956 error = %e,
4957 "Failed to evaluate computed field"
4958 );
4959 }
4960 }
4961 }
4962
4963 Ok(updated_paths)
4964 }
4965
4966 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4968 let segments: Vec<&str> = path.split('.').collect();
4969
4970 if segments.is_empty() {
4971 return Err("Empty path".into());
4972 }
4973
4974 let mut current = state;
4976 for (i, segment) in segments.iter().enumerate() {
4977 if i == segments.len() - 1 {
4978 if let Some(obj) = current.as_object_mut() {
4980 obj.insert(segment.to_string(), value);
4981 return Ok(());
4982 } else {
4983 return Err(format!("Cannot set field '{}' on non-object", segment).into());
4984 }
4985 } else {
4986 if !current.is_object() {
4988 *current = json!({});
4989 }
4990 let obj = current.as_object_mut().unwrap();
4991 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4992 }
4993 }
4994
4995 Ok(())
4996 }
4997
4998 pub fn create_evaluator_from_specs(
5001 specs: Vec<ComputedFieldSpec>,
5002 ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5003 move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5004 let mut vm = VmContext::new();
5005 vm.current_context = Some(UpdateContext {
5006 slot: context_slot,
5007 timestamp: Some(context_timestamp),
5008 ..Default::default()
5009 });
5010 vm.evaluate_computed_fields_from_ast(state, &specs)?;
5011 Ok(())
5012 }
5013 }
5014}
5015
5016impl Default for VmContext {
5017 fn default() -> Self {
5018 Self::new()
5019 }
5020}
5021
5022impl crate::resolvers::ReverseLookupUpdater for VmContext {
5024 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5025 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5027 .unwrap_or_else(|e| {
5028 tracing::error!("Failed to update PDA reverse lookup: {}", e);
5029 Vec::new()
5030 })
5031 }
5032
5033 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5034 self.flush_pending_updates(0, pda_address)
5036 .unwrap_or_else(|e| {
5037 tracing::error!("Failed to flush pending updates: {}", e);
5038 Vec::new()
5039 })
5040 }
5041}
5042
5043#[cfg(test)]
5044mod tests {
5045 use super::*;
5046 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
5047
/// Adding two large integer fields through a computed expression must yield
/// an integer JSON number (no decimal point) holding the exact sum.
#[test]
fn test_computed_field_preserves_integer_type() {
    // Builds the operand `<path>.unwrap_or(0)`.
    let field_or_zero = |path: &str| {
        Box::new(ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: json!(0),
        })
    };

    let sum_spec = ComputedFieldSpec {
        target_path: "trading.total_volume".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: field_or_zero("trading.total_buy_volume"),
            right: field_or_zero("trading.total_sell_volume"),
        },
    };

    let mut state = json!({
        "trading": {
            "total_buy_volume": 20000000000_i64,
            "total_sell_volume": 17951316474_i64
        }
    });

    let vm = VmContext::new();
    vm.evaluate_computed_fields_from_ast(&mut state, &[sum_spec])
        .unwrap();

    let trading = state.get("trading");
    let total_volume = trading
        .and_then(|t| t.get("total_volume"))
        .expect("total_volume should exist");

    let serialized = serde_json::to_string(total_volume).unwrap();
    assert!(
        !serialized.contains('.'),
        "Integer should not have decimal point: {}",
        serialized
    );
    assert_eq!(
        total_volume.as_i64(),
        Some(37951316474),
        "Value should be correct sum"
    );
}
5099
/// `set_field_sum` on large integer registers must leave integer JSON
/// numbers (no decimal point) in the target object.
#[test]
fn test_set_field_sum_preserves_integer_type() {
    let mut vm = VmContext::new();
    vm.registers[0] = json!({});
    vm.registers[1] = json!(20000000000_i64);
    vm.registers[2] = json!(17951316474_i64);

    vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
    vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

    // Resolve both fields before asserting on either of them.
    let trading = vm.registers[0].get("trading");
    let buy_vol = trading.and_then(|t| t.get("total_buy_volume")).unwrap();
    let sell_vol = trading.and_then(|t| t.get("total_sell_volume")).unwrap();

    let buy_serialized = serde_json::to_string(buy_vol).unwrap();
    let sell_serialized = serde_json::to_string(sell_vol).unwrap();

    assert!(
        !buy_serialized.contains('.'),
        "Buy volume should not have decimal: {}",
        buy_serialized
    );
    assert!(
        !sell_serialized.contains('.'),
        "Sell volume should not have decimal: {}",
        sell_serialized
    );
}
5134
/// A `LookupIndex` on a PDA address that is present only in the PDA reverse
/// lookup must still resolve to the value indexed under the mapped address.
#[test]
fn test_lookup_index_chaining() {
    let pda = "pda_123";
    let mapped_address = "addr_456";
    let index_name = "round_address_lookup_index";

    let mut vm = VmContext::new();
    {
        let table = vm.states.get_mut(&0).unwrap();

        // pda_123 -> addr_456 in the reverse lookup ...
        table
            .pda_reverse_lookups
            .entry("default_pda_lookup".to_string())
            .or_insert_with(|| PdaReverseLookup::new(1000))
            .insert(pda.to_string(), mapped_address.to_string());

        // ... and addr_456 -> 789 in the lookup index.
        table
            .lookup_indexes
            .entry(index_name.to_string())
            .or_insert_with(LookupIndex::new)
            .insert(json!(mapped_address), json!(789));
    }

    let handler = vec![
        OpCode::LoadConstant {
            value: json!(pda),
            dest: 0,
        },
        OpCode::LookupIndex {
            state_id: 0,
            index_name: index_name.to_string(),
            lookup_value: 0,
            dest: 1,
        },
    ];

    vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(789));
}
5171
/// A `LookupIndex` on a key that is stored directly in the index returns the
/// indexed value without involving any PDA reverse lookup.
#[test]
fn test_lookup_index_no_chain() {
    let index_name = "test_index";
    let lookup_key = "key_abc";

    let mut vm = VmContext::new();
    {
        let table = vm.states.get_mut(&0).unwrap();
        table
            .lookup_indexes
            .entry(index_name.to_string())
            .or_insert_with(LookupIndex::new)
            .insert(json!(lookup_key), json!(42));
    }

    let handler = vec![
        OpCode::LoadConstant {
            value: json!(lookup_key),
            dest: 0,
        },
        OpCode::LookupIndex {
            state_id: 0,
            index_name: index_name.to_string(),
            lookup_value: 0,
            dest: 1,
        },
    ];

    vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(42));
}
5201
/// `ConditionalSetField` with a `NotEqual` comparison against a 32-element
/// zero array must skip the write for an all-zero event value and perform it
/// for a non-zero one.
#[test]
fn test_conditional_set_field_with_zero_array() {
    let all_zero: Vec<i32> = vec![0; 32];
    let ascending: Vec<i32> = (1..=32).collect();

    let event_zeros = json!({ "value": all_zero });
    let event_nonzero = json!({ "value": ascending });
    let zero_32: Value = json!(all_zero);

    let handler = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadEventField {
            path: FieldPath::new(&["value"]),
            dest: 10,
            default: None,
        },
        OpCode::ConditionalSetField {
            object: 2,
            path: "captured_value".to_string(),
            value: 10,
            condition_field: FieldPath::new(&["value"]),
            condition_op: ComparisonOp::NotEqual,
            condition_value: zero_32,
        },
    ];

    let mut vm = VmContext::new();

    // All-zero payload: the condition is false, nothing is captured.
    vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
        .unwrap();
    assert!(
        vm.registers[2].get("captured_value").is_none(),
        "Field should not be set when value is all zeros"
    );

    // Non-zero payload: the condition holds and the field is written.
    vm.reset_registers();
    vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
        .unwrap();
    assert!(
        vm.registers[2].get("captured_value").is_some(),
        "Field should be set when value is non-zero"
    );
}
5253
/// When the gating instruction is already recorded for the current
/// transaction signature, `SetFieldWhen` writes the field immediately.
#[test]
fn test_when_instruction_arrives_first() {
    let signature = "test_sig_123".to_string();
    let mut vm = VmContext::new();

    // Pretend the instruction was seen earlier in the same transaction.
    {
        let table = vm.states.get(&0).unwrap();
        let mut seen = table.recent_tx_instructions.lock().unwrap();
        seen.put(
            signature.clone(),
            HashSet::from(["RevealIxState".to_string()]),
        );
    }

    vm.current_context = Some(UpdateContext::new(100, signature.clone()));

    let handler = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("primary_key_value"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("the_revealed_value"),
            dest: 10,
        },
        OpCode::SetFieldWhen {
            object: 2,
            path: "entropy_value".to_string(),
            value: 10,
            when_instruction: "RevealIxState".to_string(),
            entity_name: "TestEntity".to_string(),
            key_reg: 1,
            condition_field: None,
            condition_op: None,
            condition_value: None,
        },
    ];

    vm.execute_handler(
        &handler,
        &json!({}),
        "VarState",
        0,
        "TestEntity",
        None,
        None,
    )
    .unwrap();

    let written = vm.registers[2].get("entropy_value").unwrap();
    assert_eq!(
        written,
        "the_revealed_value",
        "Field should be set when instruction was already seen"
    );
}
5310
/// When the account update arrives before the gating instruction,
/// `SetFieldWhen` must queue the write; applying the queued operation
/// afterwards must land the value on the stored entity.
#[test]
fn test_when_account_arrives_first() {
    let signature = "test_sig_456".to_string();
    let pending_key = (signature.clone(), "RevealIxState".to_string());

    let mut vm = VmContext::new();
    vm.current_context = Some(UpdateContext::new(100, signature.clone()));

    let handler = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("pk_123"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("deferred_value"),
            dest: 10,
        },
        OpCode::SetFieldWhen {
            object: 2,
            path: "entropy_value".to_string(),
            value: 10,
            when_instruction: "RevealIxState".to_string(),
            entity_name: "TestEntity".to_string(),
            key_reg: 1,
            condition_field: None,
            condition_op: None,
            condition_value: None,
        },
    ];

    vm.execute_handler(
        &handler,
        &json!({}),
        "VarState",
        0,
        "TestEntity",
        None,
        None,
    )
    .unwrap();

    // Phase 1: nothing written yet, but the operation is parked.
    assert!(
        vm.registers[2].get("entropy_value").is_none(),
        "Field should not be set when instruction hasn't been seen"
    );

    let table = vm.states.get(&0).unwrap();
    assert!(
        table.deferred_when_ops.contains_key(&pending_key),
        "Operation should be queued"
    );

    // Phase 2: the instruction shows up for the same signature.
    {
        let mut seen = table.recent_tx_instructions.lock().unwrap();
        seen.put(
            signature.clone(),
            HashSet::from(["RevealIxState".to_string()]),
        );
    }

    let (_, queued_ops) = table.deferred_when_ops.remove(&pending_key).unwrap();
    for queued in queued_ops {
        vm.apply_deferred_when_op(0, &queued, None, None).unwrap();
    }

    // Phase 3: the deferred value is now on the stored entity.
    let table = vm.states.get(&0).unwrap();
    let entity = table.data.get(&json!("pk_123")).unwrap();
    assert_eq!(
        entity.get("entropy_value").unwrap(),
        "deferred_value",
        "Field should be set after instruction arrives"
    );
}
5385
/// A deferred operation queued with `deferred_at: 0` must be removed by
/// `cleanup_expired_when_ops(0, 60)`, leaving the queue empty.
#[test]
fn test_when_cleanup_expired() {
    let stale_op = DeferredWhenOperation {
        entity_name: "Test".to_string(),
        primary_key: json!("pk"),
        field_path: "field".to_string(),
        field_value: json!("value"),
        when_instruction: "SomeIxState".to_string(),
        signature: "old_sig".to_string(),
        slot: 0,
        deferred_at: 0,
        emit: true,
    };

    let mut vm = VmContext::new();
    {
        let table = vm.states.get(&0).unwrap();
        table.deferred_when_ops.insert(
            ("old_sig".to_string(), "SomeIxState".to_string()),
            vec![stale_op],
        );
    }

    let removed = vm.cleanup_expired_when_ops(0, 60);

    assert_eq!(removed, 1, "Should have removed 1 expired op");

    let remaining_is_empty = vm.states.get(&0).unwrap().deferred_when_ops.is_empty();
    assert!(
        remaining_is_empty,
        "Deferred ops should be empty after cleanup"
    );
}
5415
/// Applying a deferred operation with an evaluator attached must write the
/// deferred field, recompute the computed fields that depend on it, and
/// include those computed fields in the emitted mutation's patch.
#[test]
fn test_deferred_when_op_recomputes_dependent_fields() {
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    // results.pre_reveal_rng = entropy.base_value
    let rng_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_rng".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::FieldRef {
            path: "entropy.base_value".to_string(),
        },
    };

    // results.pre_reveal_winning_square = results.pre_reveal_rng.map(|r| r % 25)
    let modulo_closure = ComputedExpr::Closure {
        param: "r".to_string(),
        body: Box::new(ComputedExpr::Binary {
            op: BinaryOp::Mod,
            left: Box::new(ComputedExpr::Var {
                name: "r".to_string(),
            }),
            right: Box::new(ComputedExpr::Literal { value: json!(25) }),
        }),
    };
    let winning_square_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_winning_square".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::MethodCall {
            expr: Box::new(ComputedExpr::FieldRef {
                path: "results.pre_reveal_rng".to_string(),
            }),
            method: "map".to_string(),
            args: vec![modulo_closure],
        },
    };

    let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
        Box::new(VmContext::create_evaluator_from_specs(vec![
            rng_spec,
            winning_square_spec,
        ]));

    let primary_key = json!("test_pk");
    let op = DeferredWhenOperation {
        entity_name: "TestEntity".to_string(),
        primary_key: primary_key.clone(),
        field_path: "entropy.base_value".to_string(),
        field_value: json!(100),
        when_instruction: "TestIxState".to_string(),
        signature: "test_sig".to_string(),
        slot: 100,
        deferred_at: 0,
        emit: true,
    };

    // Seed the entity with an empty `results` object.
    let mut vm = VmContext::new();
    vm.states
        .get(&0)
        .unwrap()
        .insert_with_eviction(primary_key.clone(), json!({ "results": {} }));

    let dependent_paths: &[String] = &[
        "results.pre_reveal_rng".to_string(),
        "results.pre_reveal_winning_square".to_string(),
    ];
    let mutations = vm
        .apply_deferred_when_op(0, &op, Some(&evaluator), Some(dependent_paths))
        .unwrap();

    let state = vm.states.get(&0).unwrap();
    let entity = state.data.get(&primary_key).unwrap();

    println!(
        "Entity state: {}",
        serde_json::to_string_pretty(&*entity).unwrap()
    );

    // Reads `<section>.<field>` from the stored entity.
    let entity_field = |section: &str, field: &str| {
        entity
            .get(section)
            .and_then(|inner| inner.get(field))
            .cloned()
    };

    assert_eq!(
        entity_field("entropy", "base_value"),
        Some(json!(100)),
        "Base value should be set"
    );

    let pre_reveal_rng = entity_field("results", "pre_reveal_rng");
    let pre_reveal_winning_square = entity_field("results", "pre_reveal_winning_square");

    assert_eq!(
        pre_reveal_rng,
        Some(json!(100)),
        "pre_reveal_rng should be computed"
    );
    assert_eq!(
        pre_reveal_winning_square,
        Some(json!(0)),
        "pre_reveal_winning_square should be 100 % 25 = 0"
    );

    assert!(!mutations.is_empty(), "Should have mutations");
    let patch = &mutations[0].patch;
    let patch_has_result = |field: &str| {
        patch
            .get("results")
            .and_then(|results| results.get(field))
            .is_some()
    };

    assert!(
        patch_has_result("pre_reveal_rng"),
        "Mutation should include pre_reveal_rng"
    );
    assert!(
        patch_has_result("pre_reveal_winning_square"),
        "Mutation should include pre_reveal_winning_square"
    );
}
5554}