1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3 ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use serde_json::{json, Value};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::num::NonZeroUsize;
12
13#[cfg(feature = "otel")]
14use tracing::instrument;
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn is_account_update(&self) -> bool {
103 self.write_version.is_some() && self.txn_index.is_none()
104 }
105
106 pub fn is_instruction_update(&self) -> bool {
108 self.txn_index.is_some() && self.write_version.is_none()
109 }
110
111 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112 self.metadata.insert(key, value);
113 self
114 }
115
116 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118 self.metadata.get(key)
119 }
120
121 pub fn to_value(&self) -> Value {
123 let mut obj = serde_json::Map::new();
124 if let Some(slot) = self.slot {
125 obj.insert("slot".to_string(), json!(slot));
126 }
127 if let Some(ref sig) = self.signature {
128 obj.insert("signature".to_string(), json!(sig));
129 }
130 obj.insert("timestamp".to_string(), json!(self.timestamp()));
132 for (key, value) in &self.metadata {
133 obj.insert(key.clone(), value.clone());
134 }
135 Value::Object(obj)
136 }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144pub trait ComputedFieldsEvaluator {
147 fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
150const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
152const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
153const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
158
159const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
161const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
162
163const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
164
165const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
166
167const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
170
171const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
172
173const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
174
175const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
176
177fn estimate_json_size(value: &Value) -> usize {
179 match value {
180 Value::Null => 4,
181 Value::Bool(_) => 5,
182 Value::Number(_) => 8,
183 Value::String(s) => s.len() + 2,
184 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
185 Value::Object(obj) => {
186 2 + obj
187 .iter()
188 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
189 .sum::<usize>()
190 }
191 }
192}
193
194#[derive(Debug, Clone)]
195pub struct CompiledPath {
196 pub segments: std::sync::Arc<[String]>,
197}
198
199impl CompiledPath {
200 pub fn new(path: &str) -> Self {
201 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
202 CompiledPath {
203 segments: segments.into(),
204 }
205 }
206
207 fn segments(&self) -> &[String] {
208 &self.segments
209 }
210}
211
212#[derive(Debug, Clone)]
215pub enum FieldChange {
216 Replaced,
218 Appended(Vec<Value>),
220}
221
222#[derive(Debug, Clone, Default)]
225pub struct DirtyTracker {
226 changes: HashMap<String, FieldChange>,
227}
228
229impl DirtyTracker {
230 pub fn new() -> Self {
232 Self {
233 changes: HashMap::new(),
234 }
235 }
236
237 pub fn mark_replaced(&mut self, path: &str) {
239 self.changes.insert(path.to_string(), FieldChange::Replaced);
241 }
242
243 pub fn mark_appended(&mut self, path: &str, value: Value) {
245 match self.changes.get_mut(path) {
246 Some(FieldChange::Appended(values)) => {
247 values.push(value);
249 }
250 Some(FieldChange::Replaced) => {
251 }
254 None => {
255 self.changes
257 .insert(path.to_string(), FieldChange::Appended(vec![value]));
258 }
259 }
260 }
261
262 pub fn is_empty(&self) -> bool {
264 self.changes.is_empty()
265 }
266
267 pub fn len(&self) -> usize {
269 self.changes.len()
270 }
271
272 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
274 self.changes.iter()
275 }
276
277 pub fn dirty_paths(&self) -> HashSet<String> {
279 self.changes.keys().cloned().collect()
280 }
281
282 pub fn into_changes(self) -> HashMap<String, FieldChange> {
284 self.changes
285 }
286
287 pub fn changes(&self) -> &HashMap<String, FieldChange> {
289 &self.changes
290 }
291
292 pub fn appended_paths(&self) -> Vec<String> {
294 self.changes
295 .iter()
296 .filter_map(|(path, change)| match change {
297 FieldChange::Appended(_) => Some(path.clone()),
298 FieldChange::Replaced => None,
299 })
300 .collect()
301 }
302}
303
304pub struct VmContext {
305 registers: Vec<RegisterValue>,
306 states: HashMap<u32, StateTable>,
307 pub instructions_executed: u64,
308 pub cache_hits: u64,
309 path_cache: HashMap<String, CompiledPath>,
310 pub pda_cache_hits: u64,
311 pub pda_cache_misses: u64,
312 pub pending_queue_size: u64,
313 resolver_requests: VecDeque<ResolverRequest>,
314 resolver_pending: HashMap<String, PendingResolverEntry>,
315 resolver_cache: LruCache<String, Value>,
316 pub resolver_cache_hits: u64,
317 pub resolver_cache_misses: u64,
318 current_context: Option<UpdateContext>,
319 warnings: Vec<String>,
320 last_pda_lookup_miss: Option<String>,
321 last_lookup_index_miss: Option<String>,
322 last_pda_registered: Option<String>,
323 last_lookup_index_keys: Vec<String>,
324}
325
326#[derive(Debug)]
327pub struct LookupIndex {
328 index: std::sync::Mutex<LruCache<String, Value>>,
329}
330
331impl LookupIndex {
332 pub fn new() -> Self {
333 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
334 }
335
336 pub fn with_capacity(capacity: usize) -> Self {
337 LookupIndex {
338 index: std::sync::Mutex::new(LruCache::new(
339 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
340 )),
341 }
342 }
343
344 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
345 let key = value_to_cache_key(lookup_value);
346 self.index.lock().unwrap().get(&key).cloned()
347 }
348
349 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
350 let key = value_to_cache_key(&lookup_value);
351 self.index.lock().unwrap().put(key, primary_key);
352 }
353
354 pub fn len(&self) -> usize {
355 self.index.lock().unwrap().len()
356 }
357
358 pub fn is_empty(&self) -> bool {
359 self.index.lock().unwrap().is_empty()
360 }
361}
362
363impl Default for LookupIndex {
364 fn default() -> Self {
365 Self::new()
366 }
367}
368
369fn value_to_cache_key(value: &Value) -> String {
370 match value {
371 Value::String(s) => s.clone(),
372 Value::Number(n) => n.to_string(),
373 Value::Bool(b) => b.to_string(),
374 Value::Null => "null".to_string(),
375 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
376 }
377}
378
379fn resolver_type_key(resolver: &ResolverType) -> &'static str {
380 match resolver {
381 ResolverType::Token => "token",
382 }
383}
384
385fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
386 format!(
387 "{}:{}",
388 resolver_type_key(resolver),
389 value_to_cache_key(input)
390 )
391}
392
393#[derive(Debug)]
394pub struct TemporalIndex {
395 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
396}
397
398impl Default for TemporalIndex {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404impl TemporalIndex {
405 pub fn new() -> Self {
406 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
407 }
408
409 pub fn with_capacity(capacity: usize) -> Self {
410 TemporalIndex {
411 index: std::sync::Mutex::new(LruCache::new(
412 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
413 )),
414 }
415 }
416
417 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
418 let key = value_to_cache_key(lookup_value);
419 let mut cache = self.index.lock().unwrap();
420 if let Some(entries) = cache.get(&key) {
421 for i in (0..entries.len()).rev() {
422 if entries[i].1 <= timestamp {
423 return Some(entries[i].0.clone());
424 }
425 }
426 }
427 None
428 }
429
430 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
431 let key = value_to_cache_key(lookup_value);
432 let mut cache = self.index.lock().unwrap();
433 if let Some(entries) = cache.get(&key) {
434 if let Some(last) = entries.last() {
435 return Some(last.0.clone());
436 }
437 }
438 None
439 }
440
441 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
442 let key = value_to_cache_key(&lookup_value);
443 let mut cache = self.index.lock().unwrap();
444
445 let entries = cache.get_or_insert_mut(key, Vec::new);
446 entries.push((primary_key, timestamp));
447 entries.sort_by_key(|(_, ts)| *ts);
448
449 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
450 entries.retain(|(_, ts)| *ts >= cutoff);
451
452 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
453 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
454 entries.drain(0..excess);
455 }
456 }
457
458 pub fn len(&self) -> usize {
459 self.index.lock().unwrap().len()
460 }
461
462 pub fn is_empty(&self) -> bool {
463 self.index.lock().unwrap().is_empty()
464 }
465
466 pub fn total_entries(&self) -> usize {
467 self.index
468 .lock()
469 .unwrap()
470 .iter()
471 .map(|(_, entries)| entries.len())
472 .sum()
473 }
474
475 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
476 let mut cache = self.index.lock().unwrap();
477 let mut total_removed = 0;
478
479 for (_, entries) in cache.iter_mut() {
480 let original_len = entries.len();
481 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
482 total_removed += original_len - entries.len();
483 }
484
485 total_removed
486 }
487}
488
489#[derive(Debug)]
490pub struct PdaReverseLookup {
491 index: LruCache<String, String>,
493}
494
495impl PdaReverseLookup {
496 pub fn new(capacity: usize) -> Self {
497 PdaReverseLookup {
498 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
499 }
500 }
501
502 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
503 self.index.get(pda_address).cloned()
504 }
505
506 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
507 let evicted = if self.index.len() >= self.index.cap().get() {
508 self.index.peek_lru().map(|(k, _)| k.clone())
509 } else {
510 None
511 };
512
513 self.index.put(pda_address, seed_value);
514 evicted
515 }
516
517 pub fn len(&self) -> usize {
518 self.index.len()
519 }
520
521 pub fn is_empty(&self) -> bool {
522 self.index.is_empty()
523 }
524
525 pub fn contains(&self, pda_address: &str) -> bool {
526 self.index.peek(pda_address).is_some()
527 }
528}
529
530#[derive(Debug, Clone)]
532pub struct QueuedAccountUpdate {
533 pub pda_address: String,
534 pub account_type: String,
535 pub account_data: Value,
536 pub slot: u64,
537 pub write_version: u64,
538 pub signature: String,
539}
540
541#[derive(Debug, Clone)]
543pub struct PendingAccountUpdate {
544 pub account_type: String,
545 pub pda_address: String,
546 pub account_data: Value,
547 pub slot: u64,
548 pub write_version: u64,
549 pub signature: String,
550 pub queued_at: i64,
551}
552
553#[derive(Debug, Clone)]
555pub struct QueuedInstructionEvent {
556 pub pda_address: String,
557 pub event_type: String,
558 pub event_data: Value,
559 pub slot: u64,
560 pub signature: String,
561}
562
563#[derive(Debug, Clone)]
565pub struct PendingInstructionEvent {
566 pub event_type: String,
567 pub pda_address: String,
568 pub event_data: Value,
569 pub slot: u64,
570 pub signature: String,
571 pub queued_at: i64,
572}
573
574#[derive(Debug, Clone)]
575pub struct DeferredWhenOperation {
576 pub entity_name: String,
577 pub primary_key: Value,
578 pub field_path: String,
579 pub field_value: Value,
580 pub when_instruction: String,
581 pub signature: String,
582 pub slot: u64,
583 pub deferred_at: i64,
584 pub emit: bool,
585}
586
587#[derive(Debug, Clone)]
588pub struct ResolverRequest {
589 pub cache_key: String,
590 pub resolver: ResolverType,
591 pub input: Value,
592}
593
594#[derive(Debug, Clone)]
595pub struct ResolverTarget {
596 pub state_id: u32,
597 pub entity_name: String,
598 pub primary_key: Value,
599 pub extracts: Vec<ResolverExtractSpec>,
600}
601
602#[derive(Debug, Clone)]
603pub struct PendingResolverEntry {
604 pub resolver: ResolverType,
605 pub input: Value,
606 pub targets: Vec<ResolverTarget>,
607 pub queued_at: i64,
608}
609
610impl PendingResolverEntry {
611 fn add_target(&mut self, target: ResolverTarget) {
612 if let Some(existing) = self.targets.iter_mut().find(|t| {
613 t.state_id == target.state_id
614 && t.entity_name == target.entity_name
615 && t.primary_key == target.primary_key
616 }) {
617 let mut seen = HashSet::new();
618 for extract in &existing.extracts {
619 seen.insert((extract.target_path.clone(), extract.source_path.clone()));
620 }
621
622 for extract in target.extracts {
623 let key = (extract.target_path.clone(), extract.source_path.clone());
624 if seen.insert(key) {
625 existing.extracts.push(extract);
626 }
627 }
628 } else {
629 self.targets.push(target);
630 }
631 }
632}
633
634#[derive(Debug, Clone)]
635pub struct PendingQueueStats {
636 pub total_updates: usize,
637 pub unique_pdas: usize,
638 pub oldest_age_seconds: i64,
639 pub largest_pda_queue_size: usize,
640 pub estimated_memory_bytes: usize,
641}
642
643#[derive(Debug, Clone, Default)]
644pub struct VmMemoryStats {
645 pub state_table_entity_count: usize,
646 pub state_table_max_entries: usize,
647 pub state_table_at_capacity: bool,
648 pub lookup_index_count: usize,
649 pub lookup_index_total_entries: usize,
650 pub temporal_index_count: usize,
651 pub temporal_index_total_entries: usize,
652 pub pda_reverse_lookup_count: usize,
653 pub pda_reverse_lookup_total_entries: usize,
654 pub version_tracker_entries: usize,
655 pub pending_queue_stats: Option<PendingQueueStats>,
656 pub path_cache_size: usize,
657}
658
659#[derive(Debug, Clone, Default)]
660pub struct CleanupResult {
661 pub pending_updates_removed: usize,
662 pub temporal_entries_removed: usize,
663}
664
665#[derive(Debug, Clone)]
666pub struct CapacityWarning {
667 pub current_entries: usize,
668 pub max_entries: usize,
669 pub entries_over_limit: usize,
670}
671
672#[derive(Debug, Clone)]
673pub struct StateTableConfig {
674 pub max_entries: usize,
675 pub max_array_length: usize,
676}
677
678impl Default for StateTableConfig {
679 fn default() -> Self {
680 Self {
681 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
682 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
683 }
684 }
685}
686
687#[derive(Debug)]
688pub struct VersionTracker {
689 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
690}
691
692impl VersionTracker {
693 pub fn new() -> Self {
694 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
695 }
696
697 pub fn with_capacity(capacity: usize) -> Self {
698 VersionTracker {
699 cache: std::sync::Mutex::new(LruCache::new(
700 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
701 )),
702 }
703 }
704
705 fn make_key(primary_key: &Value, event_type: &str) -> String {
706 format!("{}:{}", primary_key, event_type)
707 }
708
709 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
710 let key = Self::make_key(primary_key, event_type);
711 self.cache.lock().unwrap().get(&key).copied()
712 }
713
714 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
715 let key = Self::make_key(primary_key, event_type);
716 self.cache.lock().unwrap().put(key, (slot, ordering_value));
717 }
718
719 pub fn len(&self) -> usize {
720 self.cache.lock().unwrap().len()
721 }
722
723 pub fn is_empty(&self) -> bool {
724 self.cache.lock().unwrap().is_empty()
725 }
726}
727
728impl Default for VersionTracker {
729 fn default() -> Self {
730 Self::new()
731 }
732}
733
734#[derive(Debug)]
735pub struct StateTable {
736 pub data: DashMap<Value, Value>,
737 access_times: DashMap<Value, i64>,
738 pub lookup_indexes: HashMap<String, LookupIndex>,
739 pub temporal_indexes: HashMap<String, TemporalIndex>,
740 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
741 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
742 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
743 version_tracker: VersionTracker,
744 instruction_dedup_cache: VersionTracker,
745 config: StateTableConfig,
746 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
747 entity_name: String,
748 pub recent_tx_instructions:
749 std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
750 pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
751}
752
753impl StateTable {
754 pub fn is_at_capacity(&self) -> bool {
755 self.data.len() >= self.config.max_entries
756 }
757
758 pub fn entries_over_limit(&self) -> usize {
759 self.data.len().saturating_sub(self.config.max_entries)
760 }
761
762 pub fn max_array_length(&self) -> usize {
763 self.config.max_array_length
764 }
765
766 fn touch(&self, key: &Value) {
767 let now = std::time::SystemTime::now()
768 .duration_since(std::time::UNIX_EPOCH)
769 .unwrap()
770 .as_secs() as i64;
771 self.access_times.insert(key.clone(), now);
772 }
773
774 fn evict_lru(&self, count: usize) -> usize {
775 if count == 0 || self.data.is_empty() {
776 return 0;
777 }
778
779 let mut entries: Vec<(Value, i64)> = self
780 .access_times
781 .iter()
782 .map(|entry| (entry.key().clone(), *entry.value()))
783 .collect();
784
785 entries.sort_by_key(|(_, ts)| *ts);
786
787 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
788
789 let mut evicted = 0;
790 for key in to_evict {
791 self.data.remove(&key);
792 self.access_times.remove(&key);
793 evicted += 1;
794 }
795
796 #[cfg(feature = "otel")]
797 if evicted > 0 {
798 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
799 }
800
801 evicted
802 }
803
804 pub fn insert_with_eviction(&self, key: Value, value: Value) {
805 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
806 #[cfg(feature = "otel")]
807 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
808 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
809 self.evict_lru(to_evict.max(1));
810 }
811 self.data.insert(key.clone(), value);
812 self.touch(&key);
813 }
814
815 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
816 let result = self.data.get(key).map(|v| v.clone());
817 if result.is_some() {
818 self.touch(key);
819 }
820 result
821 }
822
823 pub fn is_fresh_update(
830 &self,
831 primary_key: &Value,
832 event_type: &str,
833 slot: u64,
834 ordering_value: u64,
835 ) -> bool {
836 let dominated = self
837 .version_tracker
838 .get(primary_key, event_type)
839 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
840 .unwrap_or(false);
841
842 if dominated {
843 return false;
844 }
845
846 self.version_tracker
847 .insert(primary_key, event_type, slot, ordering_value);
848 true
849 }
850
851 pub fn is_duplicate_instruction(
859 &self,
860 primary_key: &Value,
861 event_type: &str,
862 slot: u64,
863 txn_index: u64,
864 ) -> bool {
865 let is_duplicate = self
867 .instruction_dedup_cache
868 .get(primary_key, event_type)
869 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
870 .unwrap_or(false);
871
872 if is_duplicate {
873 return true;
874 }
875
876 self.instruction_dedup_cache
878 .insert(primary_key, event_type, slot, txn_index);
879 false
880 }
881}
882
883impl VmContext {
884 pub fn new() -> Self {
885 let mut vm = VmContext {
886 registers: vec![Value::Null; 256],
887 states: HashMap::new(),
888 instructions_executed: 0,
889 cache_hits: 0,
890 path_cache: HashMap::new(),
891 pda_cache_hits: 0,
892 pda_cache_misses: 0,
893 pending_queue_size: 0,
894 resolver_requests: VecDeque::new(),
895 resolver_pending: HashMap::new(),
896 resolver_cache: LruCache::new(
897 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
898 .expect("capacity must be > 0"),
899 ),
900 resolver_cache_hits: 0,
901 resolver_cache_misses: 0,
902 current_context: None,
903 warnings: Vec::new(),
904 last_pda_lookup_miss: None,
905 last_lookup_index_miss: None,
906 last_pda_registered: None,
907 last_lookup_index_keys: Vec::new(),
908 };
909 vm.states.insert(
910 0,
911 StateTable {
912 data: DashMap::new(),
913 access_times: DashMap::new(),
914 lookup_indexes: HashMap::new(),
915 temporal_indexes: HashMap::new(),
916 pda_reverse_lookups: HashMap::new(),
917 pending_updates: DashMap::new(),
918 pending_instruction_events: DashMap::new(),
919 version_tracker: VersionTracker::new(),
920 instruction_dedup_cache: VersionTracker::with_capacity(
921 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
922 ),
923 config: StateTableConfig::default(),
924 entity_name: "default".to_string(),
925 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
926 NonZeroUsize::new(1000).unwrap(),
927 )),
928 deferred_when_ops: DashMap::new(),
929 },
930 );
931 vm
932 }
933
934 pub fn new_with_config(state_config: StateTableConfig) -> Self {
935 let mut vm = VmContext {
936 registers: vec![Value::Null; 256],
937 states: HashMap::new(),
938 instructions_executed: 0,
939 cache_hits: 0,
940 path_cache: HashMap::new(),
941 pda_cache_hits: 0,
942 pda_cache_misses: 0,
943 pending_queue_size: 0,
944 resolver_requests: VecDeque::new(),
945 resolver_pending: HashMap::new(),
946 resolver_cache: LruCache::new(
947 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
948 .expect("capacity must be > 0"),
949 ),
950 resolver_cache_hits: 0,
951 resolver_cache_misses: 0,
952 current_context: None,
953 warnings: Vec::new(),
954 last_pda_lookup_miss: None,
955 last_lookup_index_miss: None,
956 last_pda_registered: None,
957 last_lookup_index_keys: Vec::new(),
958 };
959 vm.states.insert(
960 0,
961 StateTable {
962 data: DashMap::new(),
963 access_times: DashMap::new(),
964 lookup_indexes: HashMap::new(),
965 temporal_indexes: HashMap::new(),
966 pda_reverse_lookups: HashMap::new(),
967 pending_updates: DashMap::new(),
968 pending_instruction_events: DashMap::new(),
969 version_tracker: VersionTracker::new(),
970 instruction_dedup_cache: VersionTracker::with_capacity(
971 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
972 ),
973 config: state_config,
974 entity_name: "default".to_string(),
975 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
976 NonZeroUsize::new(1000).unwrap(),
977 )),
978 deferred_when_ops: DashMap::new(),
979 },
980 );
981 vm
982 }
983
984 pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
985 self.resolver_requests.drain(..).collect()
986 }
987
988 pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
989 if requests.is_empty() {
990 return;
991 }
992
993 self.resolver_requests.extend(requests);
994 }
995
996 pub fn apply_resolver_result(
997 &mut self,
998 bytecode: &MultiEntityBytecode,
999 cache_key: &str,
1000 resolved_value: Value,
1001 ) -> Result<Vec<Mutation>> {
1002 self.resolver_cache
1003 .put(cache_key.to_string(), resolved_value.clone());
1004
1005 let entry = match self.resolver_pending.remove(cache_key) {
1006 Some(entry) => entry,
1007 None => return Ok(Vec::new()),
1008 };
1009
1010 let mut mutations = Vec::new();
1011
1012 for target in entry.targets {
1013 if target.primary_key.is_null() {
1014 continue;
1015 }
1016
1017 let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1018 Some(bc) => bc,
1019 None => continue,
1020 };
1021
1022 let state = match self.states.get(&target.state_id) {
1023 Some(s) => s,
1024 None => continue,
1025 };
1026
1027 let mut entity_state = state
1028 .get_and_touch(&target.primary_key)
1029 .unwrap_or_else(|| json!({}));
1030
1031 let mut dirty_tracker = DirtyTracker::new();
1032 let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1033
1034 Self::apply_resolver_extractions_to_value(
1035 &mut entity_state,
1036 &resolved_value,
1037 &target.extracts,
1038 &mut dirty_tracker,
1039 &should_emit,
1040 )?;
1041
1042 if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1043 let old_values: Vec<_> = entity_bytecode
1044 .computed_paths
1045 .iter()
1046 .map(|path| Self::get_value_at_path(&entity_state, path))
1047 .collect();
1048
1049 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1050 let context_timestamp = self
1051 .current_context
1052 .as_ref()
1053 .map(|c| c.timestamp())
1054 .unwrap_or_else(|| {
1055 std::time::SystemTime::now()
1056 .duration_since(std::time::UNIX_EPOCH)
1057 .unwrap()
1058 .as_secs() as i64
1059 });
1060 let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1061 if eval_result.is_ok() {
1062 for (path, old_value) in
1063 entity_bytecode.computed_paths.iter().zip(old_values.iter())
1064 {
1065 let new_value = Self::get_value_at_path(&entity_state, path);
1066 if new_value != *old_value && should_emit(path) {
1067 dirty_tracker.mark_replaced(path);
1068 }
1069 }
1070 }
1071 }
1072
1073 state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1074
1075 if dirty_tracker.is_empty() {
1076 continue;
1077 }
1078
1079 let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1080
1081 mutations.push(Mutation {
1082 export: target.entity_name.clone(),
1083 key: target.primary_key.clone(),
1084 patch,
1085 append: vec![],
1086 });
1087 }
1088
1089 Ok(mutations)
1090 }
1091
1092 fn enqueue_resolver_request(
1093 &mut self,
1094 cache_key: String,
1095 resolver: ResolverType,
1096 input: Value,
1097 target: ResolverTarget,
1098 ) {
1099 if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1100 entry.add_target(target);
1101 return;
1102 }
1103
1104 let queued_at = std::time::SystemTime::now()
1105 .duration_since(std::time::UNIX_EPOCH)
1106 .unwrap()
1107 .as_secs() as i64;
1108
1109 self.resolver_pending.insert(
1110 cache_key.clone(),
1111 PendingResolverEntry {
1112 resolver: resolver.clone(),
1113 input: input.clone(),
1114 targets: vec![target],
1115 queued_at,
1116 },
1117 );
1118
1119 self.resolver_requests.push_back(ResolverRequest {
1120 cache_key,
1121 resolver,
1122 input,
1123 });
1124 }
1125
1126 fn apply_resolver_extractions_to_value<F>(
1127 state: &mut Value,
1128 resolved_value: &Value,
1129 extracts: &[ResolverExtractSpec],
1130 dirty_tracker: &mut DirtyTracker,
1131 should_emit: &F,
1132 ) -> Result<()>
1133 where
1134 F: Fn(&str) -> bool,
1135 {
1136 for extract in extracts {
1137 let resolved = match extract.source_path.as_deref() {
1138 Some(path) => Self::get_value_at_path(resolved_value, path),
1139 None => Some(resolved_value.clone()),
1140 };
1141
1142 let Some(value) = resolved else {
1143 continue;
1144 };
1145
1146 let value = if let Some(transform) = &extract.transform {
1147 Self::apply_transformation(&value, transform)?
1148 } else {
1149 value
1150 };
1151
1152 Self::set_nested_field_value(state, &extract.target_path, value)?;
1153 if should_emit(&extract.target_path) {
1154 dirty_tracker.mark_replaced(&extract.target_path);
1155 }
1156 }
1157
1158 Ok(())
1159 }
1160
1161 fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1162 if tracker.is_empty() {
1163 return Ok(json!({}));
1164 }
1165
1166 let mut partial = serde_json::Map::new();
1167
1168 for (path, change) in tracker.iter() {
1169 let segments: Vec<&str> = path.split('.').collect();
1170
1171 let value_to_insert = match change {
1172 FieldChange::Replaced => {
1173 let mut current = state;
1174 let mut found = true;
1175
1176 for segment in &segments {
1177 match current.get(*segment) {
1178 Some(v) => current = v,
1179 None => {
1180 found = false;
1181 break;
1182 }
1183 }
1184 }
1185
1186 if !found {
1187 continue;
1188 }
1189 current.clone()
1190 }
1191 FieldChange::Appended(values) => Value::Array(values.clone()),
1192 };
1193
1194 let mut target = &mut partial;
1195 for (i, segment) in segments.iter().enumerate() {
1196 if i == segments.len() - 1 {
1197 target.insert(segment.to_string(), value_to_insert.clone());
1198 } else {
1199 target
1200 .entry(segment.to_string())
1201 .or_insert_with(|| json!({}));
1202 target = target
1203 .get_mut(*segment)
1204 .and_then(|v| v.as_object_mut())
1205 .ok_or("Failed to build nested structure")?;
1206 }
1207 }
1208 }
1209
1210 Ok(Value::Object(partial))
1211 }
1212
1213 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1216 self.states.get_mut(&state_id)
1217 }
1218
1219 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1221 &mut self.registers
1222 }
1223
1224 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1226 &self.path_cache
1227 }
1228
1229 pub fn current_context(&self) -> Option<&UpdateContext> {
1231 self.current_context.as_ref()
1232 }
1233
1234 pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1236 self.current_context = context;
1237 }
1238
1239 fn add_warning(&mut self, msg: String) {
1240 self.warnings.push(msg);
1241 }
1242
1243 pub fn take_warnings(&mut self) -> Vec<String> {
1244 std::mem::take(&mut self.warnings)
1245 }
1246
1247 pub fn has_warnings(&self) -> bool {
1248 !self.warnings.is_empty()
1249 }
1250
1251 pub fn update_state_from_register(
1252 &mut self,
1253 state_id: u32,
1254 key: Value,
1255 register: Register,
1256 ) -> Result<()> {
1257 let state = self.states.get(&state_id).ok_or("State table not found")?;
1258 let value = self.registers[register].clone();
1259 state.insert_with_eviction(key, value);
1260 Ok(())
1261 }
1262
1263 fn reset_registers(&mut self) {
1264 for reg in &mut self.registers {
1265 *reg = Value::Null;
1266 }
1267 }
1268
1269 pub fn extract_partial_state(
1271 &self,
1272 state_reg: Register,
1273 dirty_fields: &HashSet<String>,
1274 ) -> Result<Value> {
1275 let full_state = &self.registers[state_reg];
1276
1277 if dirty_fields.is_empty() {
1278 return Ok(json!({}));
1279 }
1280
1281 let mut partial = serde_json::Map::new();
1282
1283 for path in dirty_fields {
1284 let segments: Vec<&str> = path.split('.').collect();
1285
1286 let mut current = full_state;
1287 let mut found = true;
1288
1289 for segment in &segments {
1290 match current.get(segment) {
1291 Some(v) => current = v,
1292 None => {
1293 found = false;
1294 break;
1295 }
1296 }
1297 }
1298
1299 if !found {
1300 continue;
1301 }
1302
1303 let mut target = &mut partial;
1304 for (i, segment) in segments.iter().enumerate() {
1305 if i == segments.len() - 1 {
1306 target.insert(segment.to_string(), current.clone());
1307 } else {
1308 target
1309 .entry(segment.to_string())
1310 .or_insert_with(|| json!({}));
1311 target = target
1312 .get_mut(*segment)
1313 .and_then(|v| v.as_object_mut())
1314 .ok_or("Failed to build nested structure")?;
1315 }
1316 }
1317 }
1318
1319 Ok(Value::Object(partial))
1320 }
1321
1322 pub fn extract_partial_state_with_tracker(
1326 &self,
1327 state_reg: Register,
1328 tracker: &DirtyTracker,
1329 ) -> Result<Value> {
1330 let full_state = &self.registers[state_reg];
1331
1332 if tracker.is_empty() {
1333 return Ok(json!({}));
1334 }
1335
1336 let mut partial = serde_json::Map::new();
1337
1338 for (path, change) in tracker.iter() {
1339 let segments: Vec<&str> = path.split('.').collect();
1340
1341 let value_to_insert = match change {
1342 FieldChange::Replaced => {
1343 let mut current = full_state;
1344 let mut found = true;
1345
1346 for segment in &segments {
1347 match current.get(*segment) {
1348 Some(v) => current = v,
1349 None => {
1350 found = false;
1351 break;
1352 }
1353 }
1354 }
1355
1356 if !found {
1357 continue;
1358 }
1359 current.clone()
1360 }
1361 FieldChange::Appended(values) => Value::Array(values.clone()),
1362 };
1363
1364 let mut target = &mut partial;
1365 for (i, segment) in segments.iter().enumerate() {
1366 if i == segments.len() - 1 {
1367 target.insert(segment.to_string(), value_to_insert.clone());
1368 } else {
1369 target
1370 .entry(segment.to_string())
1371 .or_insert_with(|| json!({}));
1372 target = target
1373 .get_mut(*segment)
1374 .and_then(|v| v.as_object_mut())
1375 .ok_or("Failed to build nested structure")?;
1376 }
1377 }
1378 }
1379
1380 Ok(Value::Object(partial))
1381 }
1382
1383 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1384 if let Some(compiled) = self.path_cache.get(path) {
1385 self.cache_hits += 1;
1386 #[cfg(feature = "otel")]
1387 crate::vm_metrics::record_path_cache_hit();
1388 return compiled.clone();
1389 }
1390 #[cfg(feature = "otel")]
1391 crate::vm_metrics::record_path_cache_miss();
1392 let compiled = CompiledPath::new(path);
1393 self.path_cache.insert(path.to_string(), compiled.clone());
1394 compiled
1395 }
1396
1397 #[cfg_attr(feature = "otel", instrument(
1399 name = "vm.process_event",
1400 skip(self, bytecode, event_value, log),
1401 level = "info",
1402 fields(
1403 event_type = %event_type,
1404 slot = context.as_ref().and_then(|c| c.slot),
1405 )
1406 ))]
1407 pub fn process_event(
1408 &mut self,
1409 bytecode: &MultiEntityBytecode,
1410 event_value: Value,
1411 event_type: &str,
1412 context: Option<&UpdateContext>,
1413 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1414 ) -> Result<Vec<Mutation>> {
1415 self.current_context = context.cloned();
1416
1417 let mut event_value = event_value;
1418 if let Some(ctx) = context {
1419 if let Some(obj) = event_value.as_object_mut() {
1420 obj.insert("__update_context".to_string(), ctx.to_value());
1421 }
1422 }
1423
1424 let mut all_mutations = Vec::new();
1425
1426 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1427 if let Some(ctx) = context {
1428 if let Some(signature) = ctx.signature.clone() {
1429 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1430 for state_id in state_ids {
1431 if let Some(state) = self.states.get(&state_id) {
1432 {
1433 let mut cache = state.recent_tx_instructions.lock().unwrap();
1434 let entry =
1435 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1436 entry.insert(event_type.to_string());
1437 }
1438
1439 let key = (signature.clone(), event_type.to_string());
1440 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1441 tracing::debug!(
1442 event_type = %event_type,
1443 signature = %signature,
1444 deferred_count = deferred_ops.len(),
1445 "flushing deferred when-ops"
1446 );
1447 for op in deferred_ops {
1448 match self.apply_deferred_when_op(state_id, &op) {
1449 Ok(mutations) => all_mutations.extend(mutations),
1450 Err(e) => tracing::warn!(
1451 "Failed to apply deferred when-op: {}",
1452 e
1453 ),
1454 }
1455 }
1456 }
1457 }
1458 }
1459 }
1460 }
1461 }
1462
1463 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1464 for entity_name in entity_names {
1465 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1466 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1467 if let Some(ref mut log) = log {
1468 log.set("entity", entity_name.clone());
1469 log.inc("handlers", 1);
1470 }
1471
1472 let opcodes_before = self.instructions_executed;
1473 let cache_before = self.cache_hits;
1474 let pda_hits_before = self.pda_cache_hits;
1475 let pda_misses_before = self.pda_cache_misses;
1476
1477 let mutations = self.execute_handler(
1478 handler,
1479 &event_value,
1480 event_type,
1481 entity_bytecode.state_id,
1482 entity_name,
1483 entity_bytecode.computed_fields_evaluator.as_ref(),
1484 Some(&entity_bytecode.non_emitted_fields),
1485 )?;
1486
1487 if let Some(ref mut log) = log {
1488 log.inc(
1489 "opcodes",
1490 (self.instructions_executed - opcodes_before) as i64,
1491 );
1492 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1493 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1494 log.inc(
1495 "pda_misses",
1496 (self.pda_cache_misses - pda_misses_before) as i64,
1497 );
1498 }
1499
1500 if mutations.is_empty() {
1501 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1502 if event_type.ends_with("IxState") {
1503 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1504 let signature = context
1505 .and_then(|c| c.signature.clone())
1506 .unwrap_or_default();
1507 let _ = self.queue_instruction_event(
1508 entity_bytecode.state_id,
1509 QueuedInstructionEvent {
1510 pda_address: missed_pda,
1511 event_type: event_type.to_string(),
1512 event_data: event_value.clone(),
1513 slot,
1514 signature,
1515 },
1516 );
1517 }
1518 }
1519
1520 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1521 if !event_type.ends_with("IxState") {
1522 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1523 let signature = context
1524 .and_then(|c| c.signature.clone())
1525 .unwrap_or_default();
1526 if let Some(write_version) =
1527 context.and_then(|c| c.write_version)
1528 {
1529 let _ = self.queue_account_update(
1530 entity_bytecode.state_id,
1531 QueuedAccountUpdate {
1532 pda_address: missed_lookup,
1533 account_type: event_type.to_string(),
1534 account_data: event_value.clone(),
1535 slot,
1536 write_version,
1537 signature,
1538 },
1539 );
1540 }
1541 }
1542 }
1543 }
1544
1545 all_mutations.extend(mutations);
1546
1547 if event_type.ends_with("IxState") {
1548 if let Some(ctx) = context {
1549 if let Some(ref signature) = ctx.signature {
1550 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1551 {
1552 {
1553 let mut cache =
1554 state.recent_tx_instructions.lock().unwrap();
1555 let entry = cache
1556 .get_or_insert_mut(signature.clone(), HashSet::new);
1557 entry.insert(event_type.to_string());
1558 }
1559
1560 let key = (signature.clone(), event_type.to_string());
1561 if let Some((_, deferred_ops)) =
1562 state.deferred_when_ops.remove(&key)
1563 {
1564 tracing::debug!(
1565 event_type = %event_type,
1566 signature = %signature,
1567 deferred_count = deferred_ops.len(),
1568 "flushing deferred when-ops"
1569 );
1570 for op in deferred_ops {
1571 match self.apply_deferred_when_op(
1572 entity_bytecode.state_id,
1573 &op,
1574 ) {
1575 Ok(mutations) => {
1576 all_mutations.extend(mutations)
1577 }
1578 Err(e) => {
1579 tracing::warn!(
1580 "Failed to apply deferred when-op: {}",
1581 e
1582 );
1583 }
1584 }
1585 }
1586 }
1587 }
1588 }
1589 }
1590 }
1591
1592 if let Some(registered_pda) = self.take_last_pda_registered() {
1593 let pending_events = self.flush_pending_instruction_events(
1594 entity_bytecode.state_id,
1595 ®istered_pda,
1596 );
1597 for pending in pending_events {
1598 if let Some(pending_handler) =
1599 entity_bytecode.handlers.get(&pending.event_type)
1600 {
1601 if let Ok(reprocessed_mutations) = self.execute_handler(
1602 pending_handler,
1603 &pending.event_data,
1604 &pending.event_type,
1605 entity_bytecode.state_id,
1606 entity_name,
1607 entity_bytecode.computed_fields_evaluator.as_ref(),
1608 Some(&entity_bytecode.non_emitted_fields),
1609 ) {
1610 all_mutations.extend(reprocessed_mutations);
1611 }
1612 }
1613 }
1614 }
1615
1616 let lookup_keys = self.take_last_lookup_index_keys();
1617 for lookup_key in lookup_keys {
1618 if let Ok(pending_updates) =
1619 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1620 {
1621 for pending in pending_updates {
1622 if let Some(pending_handler) =
1623 entity_bytecode.handlers.get(&pending.account_type)
1624 {
1625 self.current_context = Some(UpdateContext::new_account(
1626 pending.slot,
1627 pending.signature.clone(),
1628 pending.write_version,
1629 ));
1630 if let Ok(reprocessed) = self.execute_handler(
1631 pending_handler,
1632 &pending.account_data,
1633 &pending.account_type,
1634 entity_bytecode.state_id,
1635 entity_name,
1636 entity_bytecode.computed_fields_evaluator.as_ref(),
1637 Some(&entity_bytecode.non_emitted_fields),
1638 ) {
1639 all_mutations.extend(reprocessed);
1640 }
1641 }
1642 }
1643 }
1644 }
1645 } else if let Some(ref mut log) = log {
1646 log.set("skip_reason", "no_handler");
1647 }
1648 } else if let Some(ref mut log) = log {
1649 log.set("skip_reason", "entity_not_found");
1650 }
1651 }
1652 } else if let Some(ref mut log) = log {
1653 log.set("skip_reason", "no_event_routing");
1654 }
1655
1656 if let Some(log) = log {
1657 log.set("mutations", all_mutations.len() as i64);
1658 if let Some(first) = all_mutations.first() {
1659 if let Some(key_str) = first.key.as_str() {
1660 log.set("primary_key", key_str);
1661 } else if let Some(key_num) = first.key.as_u64() {
1662 log.set("primary_key", key_num as i64);
1663 }
1664 }
1665 if let Some(state) = self.states.get(&0) {
1666 log.set("state_table_size", state.data.len() as i64);
1667 }
1668
1669 let warnings = self.take_warnings();
1670 if !warnings.is_empty() {
1671 log.set("warnings", warnings.len() as i64);
1672 log.set(
1673 "warning_messages",
1674 Value::Array(warnings.into_iter().map(Value::String).collect()),
1675 );
1676 log.set_level(crate::canonical_log::LogLevel::Warn);
1677 }
1678 } else {
1679 self.warnings.clear();
1680 }
1681
1682 if self.instructions_executed.is_multiple_of(1000) {
1683 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1684 for state_id in state_ids {
1685 let expired = self.cleanup_expired_when_ops(state_id, 60);
1686 if expired > 0 {
1687 tracing::debug!(
1688 "Cleaned up {} expired deferred when-ops for state {}",
1689 expired,
1690 state_id
1691 );
1692 }
1693 }
1694 }
1695
1696 Ok(all_mutations)
1697 }
1698
1699 pub fn process_any(
1700 &mut self,
1701 bytecode: &MultiEntityBytecode,
1702 any: prost_types::Any,
1703 ) -> Result<Vec<Mutation>> {
1704 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1705 self.process_event(bytecode, event_value, &event_type, None, None)
1706 }
1707
1708 #[cfg_attr(feature = "otel", instrument(
1709 name = "vm.execute_handler",
1710 skip(self, handler, event_value, entity_evaluator),
1711 level = "debug",
1712 fields(
1713 event_type = %event_type,
1714 handler_opcodes = handler.len(),
1715 )
1716 ))]
1717 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1718 fn execute_handler(
1719 &mut self,
1720 handler: &[OpCode],
1721 event_value: &Value,
1722 event_type: &str,
1723 override_state_id: u32,
1724 entity_name: &str,
1725 entity_evaluator: Option<
1726 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1727 >,
1728 non_emitted_fields: Option<&HashSet<String>>,
1729 ) -> Result<Vec<Mutation>> {
1730 self.reset_registers();
1731 self.last_pda_lookup_miss = None;
1732
1733 let mut pc: usize = 0;
1734 let mut output = Vec::new();
1735 let mut dirty_tracker = DirtyTracker::new();
1736 let should_emit = |path: &str| {
1737 non_emitted_fields
1738 .map(|fields| !fields.contains(path))
1739 .unwrap_or(true)
1740 };
1741
1742 while pc < handler.len() {
1743 match &handler[pc] {
1744 OpCode::LoadEventField {
1745 path,
1746 dest,
1747 default,
1748 } => {
1749 let value = self.load_field(event_value, path, default.as_ref())?;
1750 self.registers[*dest] = value;
1751 pc += 1;
1752 }
1753 OpCode::LoadConstant { value, dest } => {
1754 self.registers[*dest] = value.clone();
1755 pc += 1;
1756 }
1757 OpCode::CopyRegister { source, dest } => {
1758 self.registers[*dest] = self.registers[*source].clone();
1759 pc += 1;
1760 }
1761 OpCode::CopyRegisterIfNull { source, dest } => {
1762 if self.registers[*dest].is_null() {
1763 self.registers[*dest] = self.registers[*source].clone();
1764 }
1765 pc += 1;
1766 }
1767 OpCode::GetEventType { dest } => {
1768 self.registers[*dest] = json!(event_type);
1769 pc += 1;
1770 }
1771 OpCode::CreateObject { dest } => {
1772 self.registers[*dest] = json!({});
1773 pc += 1;
1774 }
1775 OpCode::SetField {
1776 object,
1777 path,
1778 value,
1779 } => {
1780 self.set_field_auto_vivify(*object, path, *value)?;
1781 if should_emit(path) {
1782 dirty_tracker.mark_replaced(path);
1783 }
1784 pc += 1;
1785 }
1786 OpCode::SetFields { object, fields } => {
1787 for (path, value_reg) in fields {
1788 self.set_field_auto_vivify(*object, path, *value_reg)?;
1789 if should_emit(path) {
1790 dirty_tracker.mark_replaced(path);
1791 }
1792 }
1793 pc += 1;
1794 }
1795 OpCode::GetField { object, path, dest } => {
1796 let value = self.get_field(*object, path)?;
1797 self.registers[*dest] = value;
1798 pc += 1;
1799 }
1800 OpCode::ReadOrInitState {
1801 state_id: _,
1802 key,
1803 default,
1804 dest,
1805 } => {
1806 let actual_state_id = override_state_id;
1807 let entity_name_owned = entity_name.to_string();
1808 self.states
1809 .entry(actual_state_id)
1810 .or_insert_with(|| StateTable {
1811 data: DashMap::new(),
1812 access_times: DashMap::new(),
1813 lookup_indexes: HashMap::new(),
1814 temporal_indexes: HashMap::new(),
1815 pda_reverse_lookups: HashMap::new(),
1816 pending_updates: DashMap::new(),
1817 pending_instruction_events: DashMap::new(),
1818 version_tracker: VersionTracker::new(),
1819 instruction_dedup_cache: VersionTracker::with_capacity(
1820 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1821 ),
1822 config: StateTableConfig::default(),
1823 entity_name: entity_name_owned,
1824 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1825 NonZeroUsize::new(1000).unwrap(),
1826 )),
1827 deferred_when_ops: DashMap::new(),
1828 });
1829 let key_value = self.registers[*key].clone();
1830 let warn_null_key = key_value.is_null()
1831 && event_type.ends_with("State")
1832 && !event_type.ends_with("IxState");
1833
1834 if warn_null_key {
1835 self.add_warning(format!(
1836 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1837 key, event_type
1838 ));
1839 }
1840
1841 let state = self
1842 .states
1843 .get(&actual_state_id)
1844 .ok_or("State table not found")?;
1845
1846 if !key_value.is_null() {
1847 if let Some(ctx) = &self.current_context {
1848 if ctx.is_account_update() {
1850 if let (Some(slot), Some(write_version)) =
1851 (ctx.slot, ctx.write_version)
1852 {
1853 if !state.is_fresh_update(
1854 &key_value,
1855 event_type,
1856 slot,
1857 write_version,
1858 ) {
1859 self.add_warning(format!(
1860 "Stale account update skipped: slot={}, write_version={}",
1861 slot, write_version
1862 ));
1863 return Ok(Vec::new());
1864 }
1865 }
1866 }
1867 else if ctx.is_instruction_update() {
1869 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1870 if state.is_duplicate_instruction(
1871 &key_value, event_type, slot, txn_index,
1872 ) {
1873 self.add_warning(format!(
1874 "Duplicate instruction skipped: slot={}, txn_index={}",
1875 slot, txn_index
1876 ));
1877 return Ok(Vec::new());
1878 }
1879 }
1880 }
1881 }
1882 }
1883 let value = state
1884 .get_and_touch(&key_value)
1885 .unwrap_or_else(|| default.clone());
1886
1887 self.registers[*dest] = value;
1888 pc += 1;
1889 }
1890 OpCode::UpdateState {
1891 state_id: _,
1892 key,
1893 value,
1894 } => {
1895 let actual_state_id = override_state_id;
1896 let state = self
1897 .states
1898 .get(&actual_state_id)
1899 .ok_or("State table not found")?;
1900 let key_value = self.registers[*key].clone();
1901 let value_data = self.registers[*value].clone();
1902
1903 state.insert_with_eviction(key_value, value_data);
1904 pc += 1;
1905 }
1906 OpCode::AppendToArray {
1907 object,
1908 path,
1909 value,
1910 } => {
1911 let appended_value = self.registers[*value].clone();
1912 let max_len = self
1913 .states
1914 .get(&override_state_id)
1915 .map(|s| s.max_array_length())
1916 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1917 self.append_to_array(*object, path, *value, max_len)?;
1918 if should_emit(path) {
1919 dirty_tracker.mark_appended(path, appended_value);
1920 }
1921 pc += 1;
1922 }
1923 OpCode::GetCurrentTimestamp { dest } => {
1924 let timestamp = std::time::SystemTime::now()
1925 .duration_since(std::time::UNIX_EPOCH)
1926 .unwrap()
1927 .as_secs() as i64;
1928 self.registers[*dest] = json!(timestamp);
1929 pc += 1;
1930 }
1931 OpCode::CreateEvent { dest, event_value } => {
1932 let timestamp = std::time::SystemTime::now()
1933 .duration_since(std::time::UNIX_EPOCH)
1934 .unwrap()
1935 .as_secs() as i64;
1936
1937 let mut event_data = self.registers[*event_value].clone();
1939 if let Some(obj) = event_data.as_object_mut() {
1940 obj.remove("__update_context");
1941 }
1942
1943 let mut event = serde_json::Map::new();
1945 event.insert("timestamp".to_string(), json!(timestamp));
1946 event.insert("data".to_string(), event_data);
1947
1948 if let Some(ref ctx) = self.current_context {
1950 if let Some(slot) = ctx.slot {
1951 event.insert("slot".to_string(), json!(slot));
1952 }
1953 if let Some(ref signature) = ctx.signature {
1954 event.insert("signature".to_string(), json!(signature));
1955 }
1956 }
1957
1958 self.registers[*dest] = Value::Object(event);
1959 pc += 1;
1960 }
1961 OpCode::CreateCapture {
1962 dest,
1963 capture_value,
1964 } => {
1965 let timestamp = std::time::SystemTime::now()
1966 .duration_since(std::time::UNIX_EPOCH)
1967 .unwrap()
1968 .as_secs() as i64;
1969
1970 let capture_data = self.registers[*capture_value].clone();
1972
1973 let account_address = event_value
1975 .get("__account_address")
1976 .and_then(|v| v.as_str())
1977 .unwrap_or("")
1978 .to_string();
1979
1980 let mut capture = serde_json::Map::new();
1982 capture.insert("timestamp".to_string(), json!(timestamp));
1983 capture.insert("account_address".to_string(), json!(account_address));
1984 capture.insert("data".to_string(), capture_data);
1985
1986 if let Some(ref ctx) = self.current_context {
1988 if let Some(slot) = ctx.slot {
1989 capture.insert("slot".to_string(), json!(slot));
1990 }
1991 if let Some(ref signature) = ctx.signature {
1992 capture.insert("signature".to_string(), json!(signature));
1993 }
1994 }
1995
1996 self.registers[*dest] = Value::Object(capture);
1997 pc += 1;
1998 }
1999 OpCode::Transform {
2000 source,
2001 dest,
2002 transformation,
2003 } => {
2004 if source == dest {
2005 self.transform_in_place(*source, transformation)?;
2006 } else {
2007 let source_value = &self.registers[*source];
2008 let value = Self::apply_transformation(source_value, transformation)?;
2009 self.registers[*dest] = value;
2010 }
2011 pc += 1;
2012 }
2013 OpCode::EmitMutation {
2014 entity_name,
2015 key,
2016 state,
2017 } => {
2018 let primary_key = self.registers[*key].clone();
2019
2020 if primary_key.is_null() || dirty_tracker.is_empty() {
2021 let reason = if dirty_tracker.is_empty() {
2022 "no_fields_modified"
2023 } else {
2024 "null_primary_key"
2025 };
2026 self.add_warning(format!(
2027 "Skipping mutation for entity '{}': {} (dirty_fields={})",
2028 entity_name,
2029 reason,
2030 dirty_tracker.len()
2031 ));
2032 } else {
2033 let patch =
2034 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2035
2036 let append = dirty_tracker.appended_paths();
2037 let mutation = Mutation {
2038 export: entity_name.clone(),
2039 key: primary_key,
2040 patch,
2041 append,
2042 };
2043 output.push(mutation);
2044 }
2045 pc += 1;
2046 }
2047 OpCode::SetFieldIfNull {
2048 object,
2049 path,
2050 value,
2051 } => {
2052 let was_set = self.set_field_if_null(*object, path, *value)?;
2053 if was_set && should_emit(path) {
2054 dirty_tracker.mark_replaced(path);
2055 }
2056 pc += 1;
2057 }
2058 OpCode::SetFieldMax {
2059 object,
2060 path,
2061 value,
2062 } => {
2063 let was_updated = self.set_field_max(*object, path, *value)?;
2064 if was_updated && should_emit(path) {
2065 dirty_tracker.mark_replaced(path);
2066 }
2067 pc += 1;
2068 }
2069 OpCode::UpdateTemporalIndex {
2070 state_id: _,
2071 index_name,
2072 lookup_value,
2073 primary_key,
2074 timestamp,
2075 } => {
2076 let actual_state_id = override_state_id;
2077 let state = self
2078 .states
2079 .get_mut(&actual_state_id)
2080 .ok_or("State table not found")?;
2081 let index = state
2082 .temporal_indexes
2083 .entry(index_name.clone())
2084 .or_insert_with(TemporalIndex::new);
2085
2086 let lookup_val = self.registers[*lookup_value].clone();
2087 let pk_val = self.registers[*primary_key].clone();
2088 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2089 val
2090 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2091 val as i64
2092 } else {
2093 return Err(format!(
2094 "Timestamp must be a number (i64 or u64), got: {:?}",
2095 self.registers[*timestamp]
2096 )
2097 .into());
2098 };
2099
2100 index.insert(lookup_val, pk_val, ts_val);
2101 pc += 1;
2102 }
2103 OpCode::LookupTemporalIndex {
2104 state_id: _,
2105 index_name,
2106 lookup_value,
2107 timestamp,
2108 dest,
2109 } => {
2110 let actual_state_id = override_state_id;
2111 let state = self
2112 .states
2113 .get(&actual_state_id)
2114 .ok_or("State table not found")?;
2115 let lookup_val = &self.registers[*lookup_value];
2116
2117 let result = if self.registers[*timestamp].is_null() {
2118 if let Some(index) = state.temporal_indexes.get(index_name) {
2119 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2120 } else {
2121 Value::Null
2122 }
2123 } else {
2124 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2125 val
2126 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2127 val as i64
2128 } else {
2129 return Err(format!(
2130 "Timestamp must be a number (i64 or u64), got: {:?}",
2131 self.registers[*timestamp]
2132 )
2133 .into());
2134 };
2135
2136 if let Some(index) = state.temporal_indexes.get(index_name) {
2137 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2138 } else {
2139 Value::Null
2140 }
2141 };
2142
2143 self.registers[*dest] = result;
2144 pc += 1;
2145 }
2146 OpCode::UpdateLookupIndex {
2147 state_id: _,
2148 index_name,
2149 lookup_value,
2150 primary_key,
2151 } => {
2152 let actual_state_id = override_state_id;
2153 let state = self
2154 .states
2155 .get_mut(&actual_state_id)
2156 .ok_or("State table not found")?;
2157 let index = state
2158 .lookup_indexes
2159 .entry(index_name.clone())
2160 .or_insert_with(LookupIndex::new);
2161
2162 let lookup_val = self.registers[*lookup_value].clone();
2163 let pk_val = self.registers[*primary_key].clone();
2164
2165 index.insert(lookup_val.clone(), pk_val);
2166
2167 if let Some(key_str) = lookup_val.as_str() {
2169 self.last_lookup_index_keys.push(key_str.to_string());
2170 }
2171
2172 pc += 1;
2173 }
2174 OpCode::LookupIndex {
2175 state_id: _,
2176 index_name,
2177 lookup_value,
2178 dest,
2179 } => {
2180 let actual_state_id = override_state_id;
2181 let mut current_value = self.registers[*lookup_value].clone();
2182
2183 const MAX_CHAIN_DEPTH: usize = 5;
2184 let mut iterations = 0;
2185
2186 let final_result = if self.states.contains_key(&actual_state_id) {
2187 loop {
2188 iterations += 1;
2189 if iterations > MAX_CHAIN_DEPTH {
2190 break current_value;
2191 }
2192
2193 let resolved = self
2194 .states
2195 .get(&actual_state_id)
2196 .and_then(|state| {
2197 if let Some(index) = state.lookup_indexes.get(index_name) {
2198 if let Some(found) = index.lookup(¤t_value) {
2199 return Some(found);
2200 }
2201 }
2202
2203 for (name, index) in state.lookup_indexes.iter() {
2204 if name == index_name {
2205 continue;
2206 }
2207 if let Some(found) = index.lookup(¤t_value) {
2208 return Some(found);
2209 }
2210 }
2211
2212 None
2213 })
2214 .unwrap_or(Value::Null);
2215
2216 let mut resolved_from_pda = false;
2217 let resolved = if resolved.is_null() {
2218 if let Some(pda_str) = current_value.as_str() {
2219 resolved_from_pda = true;
2220 self.states
2221 .get_mut(&actual_state_id)
2222 .and_then(|state_mut| {
2223 state_mut
2224 .pda_reverse_lookups
2225 .get_mut("default_pda_lookup")
2226 })
2227 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2228 .map(Value::String)
2229 .unwrap_or(Value::Null)
2230 } else {
2231 Value::Null
2232 }
2233 } else {
2234 resolved
2235 };
2236
2237 if resolved.is_null() {
2238 if iterations == 1 {
2239 if let Some(pda_str) = current_value.as_str() {
2240 self.last_pda_lookup_miss = Some(pda_str.to_string());
2241 }
2242 }
2243 break Value::Null;
2244 }
2245
2246 let can_chain =
2247 self.can_resolve_further(&resolved, actual_state_id, index_name);
2248
2249 if !can_chain {
2250 if resolved_from_pda {
2251 if let Some(resolved_str) = resolved.as_str() {
2252 self.last_lookup_index_miss =
2253 Some(resolved_str.to_string());
2254 }
2255 break Value::Null;
2256 }
2257 break resolved;
2258 }
2259
2260 current_value = resolved;
2261 }
2262 } else {
2263 Value::Null
2264 };
2265
2266 self.registers[*dest] = final_result;
2267 pc += 1;
2268 }
2269 OpCode::SetFieldSum {
2270 object,
2271 path,
2272 value,
2273 } => {
2274 let was_updated = self.set_field_sum(*object, path, *value)?;
2275 if was_updated && should_emit(path) {
2276 dirty_tracker.mark_replaced(path);
2277 }
2278 pc += 1;
2279 }
2280 OpCode::SetFieldIncrement { object, path } => {
2281 let was_updated = self.set_field_increment(*object, path)?;
2282 if was_updated && should_emit(path) {
2283 dirty_tracker.mark_replaced(path);
2284 }
2285 pc += 1;
2286 }
2287 OpCode::SetFieldMin {
2288 object,
2289 path,
2290 value,
2291 } => {
2292 let was_updated = self.set_field_min(*object, path, *value)?;
2293 if was_updated && should_emit(path) {
2294 dirty_tracker.mark_replaced(path);
2295 }
2296 pc += 1;
2297 }
2298 OpCode::AddToUniqueSet {
2299 state_id: _,
2300 set_name,
2301 value,
2302 count_object,
2303 count_path,
2304 } => {
2305 let value_to_add = self.registers[*value].clone();
2306
2307 let set_field_path = format!("__unique_set:{}", set_name);
2310
2311 let mut set: HashSet<Value> =
2313 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2314 if !existing.is_null() {
2315 serde_json::from_value(existing).unwrap_or_default()
2316 } else {
2317 HashSet::new()
2318 }
2319 } else {
2320 HashSet::new()
2321 };
2322
2323 let was_new = set.insert(value_to_add);
2325
2326 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2328 self.registers[100] = serde_json::to_value(set_as_vec)?;
2329 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2330
2331 if was_new {
2333 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2334 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2335 if should_emit(count_path) {
2336 dirty_tracker.mark_replaced(count_path);
2337 }
2338 }
2339
2340 pc += 1;
2341 }
2342 OpCode::ConditionalSetField {
2343 object,
2344 path,
2345 value,
2346 condition_field,
2347 condition_op,
2348 condition_value,
2349 } => {
2350 let field_value = self.load_field(event_value, condition_field, None)?;
2351 let condition_met =
2352 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2353
2354 if condition_met {
2355 self.set_field_auto_vivify(*object, path, *value)?;
2356 if should_emit(path) {
2357 dirty_tracker.mark_replaced(path);
2358 }
2359 }
2360 pc += 1;
2361 }
2362 OpCode::SetFieldWhen {
2363 object,
2364 path,
2365 value,
2366 when_instruction,
2367 entity_name,
2368 key_reg,
2369 condition_field,
2370 condition_op,
2371 condition_value,
2372 } => {
2373 let actual_state_id = override_state_id;
2374 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2375 condition_field.as_ref(),
2376 condition_op.as_ref(),
2377 condition_value.as_ref(),
2378 ) {
2379 let field_value = self.load_field(event_value, field, None)?;
2380 self.evaluate_comparison(&field_value, op, cond_value)?
2381 } else {
2382 true
2383 };
2384
2385 if !condition_met {
2386 pc += 1;
2387 continue;
2388 }
2389
2390 let signature = self
2391 .current_context
2392 .as_ref()
2393 .and_then(|c| c.signature.clone())
2394 .unwrap_or_default();
2395
2396 let emit = should_emit(path);
2397
2398 let instruction_seen = if !signature.is_empty() {
2399 if let Some(state) = self.states.get(&actual_state_id) {
2400 let mut cache = state.recent_tx_instructions.lock().unwrap();
2401 cache
2402 .get(&signature)
2403 .map(|set| set.contains(when_instruction))
2404 .unwrap_or(false)
2405 } else {
2406 false
2407 }
2408 } else {
2409 false
2410 };
2411
2412 if instruction_seen {
2413 self.set_field_auto_vivify(*object, path, *value)?;
2414 if emit {
2415 dirty_tracker.mark_replaced(path);
2416 }
2417 } else if !signature.is_empty() {
2418 let deferred = DeferredWhenOperation {
2419 entity_name: entity_name.clone(),
2420 primary_key: self.registers[*key_reg].clone(),
2421 field_path: path.clone(),
2422 field_value: self.registers[*value].clone(),
2423 when_instruction: when_instruction.clone(),
2424 signature: signature.clone(),
2425 slot: self
2426 .current_context
2427 .as_ref()
2428 .and_then(|c| c.slot)
2429 .unwrap_or(0),
2430 deferred_at: std::time::SystemTime::now()
2431 .duration_since(std::time::UNIX_EPOCH)
2432 .unwrap()
2433 .as_secs() as i64,
2434 emit,
2435 };
2436
2437 if let Some(state) = self.states.get(&actual_state_id) {
2438 let key = (signature, when_instruction.clone());
2439 state
2440 .deferred_when_ops
2441 .entry(key)
2442 .or_insert_with(Vec::new)
2443 .push(deferred);
2444 }
2445 }
2446
2447 pc += 1;
2448 }
2449 OpCode::SetFieldUnlessStopped {
2450 object,
2451 path,
2452 value,
2453 stop_field,
2454 stop_instruction,
2455 entity_name,
2456 key_reg: _,
2457 } => {
2458 let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2459 let stopped = stop_value.as_bool().unwrap_or(false);
2460
2461 if stopped {
2462 tracing::debug!(
2463 entity = %entity_name,
2464 field = %path,
2465 stop_field = %stop_field,
2466 stop_instruction = %stop_instruction,
2467 "stop flag set; skipping field update"
2468 );
2469 pc += 1;
2470 continue;
2471 }
2472
2473 self.set_field_auto_vivify(*object, path, *value)?;
2474 if should_emit(path) {
2475 dirty_tracker.mark_replaced(path);
2476 }
2477 pc += 1;
2478 }
2479 OpCode::ConditionalIncrement {
2480 object,
2481 path,
2482 condition_field,
2483 condition_op,
2484 condition_value,
2485 } => {
2486 let field_value = self.load_field(event_value, condition_field, None)?;
2487 let condition_met =
2488 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2489
2490 if condition_met {
2491 let was_updated = self.set_field_increment(*object, path)?;
2492 if was_updated && should_emit(path) {
2493 dirty_tracker.mark_replaced(path);
2494 }
2495 }
2496 pc += 1;
2497 }
2498 OpCode::EvaluateComputedFields {
2499 state,
2500 computed_paths,
2501 } => {
2502 if let Some(evaluator) = entity_evaluator {
2503 let old_values: Vec<_> = computed_paths
2504 .iter()
2505 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2506 .collect();
2507
2508 let state_value = &mut self.registers[*state];
2509 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2510 let context_timestamp = self
2511 .current_context
2512 .as_ref()
2513 .map(|c| c.timestamp())
2514 .unwrap_or_else(|| {
2515 std::time::SystemTime::now()
2516 .duration_since(std::time::UNIX_EPOCH)
2517 .unwrap()
2518 .as_secs() as i64
2519 });
2520 let eval_result = evaluator(state_value, context_slot, context_timestamp);
2521
2522 if eval_result.is_ok() {
2523 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2524 let new_value =
2525 Self::get_value_at_path(&self.registers[*state], path);
2526
2527 if new_value != *old_value && should_emit(path) {
2528 dirty_tracker.mark_replaced(path);
2529 }
2530 }
2531 }
2532 }
2533 pc += 1;
2534 }
2535 OpCode::QueueResolver {
2536 state_id: _,
2537 entity_name,
2538 resolver,
2539 input_path,
2540 input_value,
2541 strategy,
2542 extracts,
2543 state,
2544 key,
2545 } => {
2546 let actual_state_id = override_state_id;
2547 let resolved_input = if let Some(value) = input_value {
2548 Some(value.clone())
2549 } else if let Some(path) = input_path.as_ref() {
2550 Self::get_value_at_path(&self.registers[*state], path)
2551 } else {
2552 None
2553 };
2554
2555 if let Some(input) = resolved_input {
2556 let key_value = &self.registers[*key];
2557
2558 if input.is_null() || key_value.is_null() {
2559 tracing::warn!(
2560 entity = %entity_name,
2561 resolver = ?resolver,
2562 input_path = %input_path.as_deref().unwrap_or("<literal>"),
2563 input_is_null = input.is_null(),
2564 key_is_null = key_value.is_null(),
2565 input = ?input,
2566 key = ?key_value,
2567 "Resolver skipped: null input or key"
2568 );
2569 pc += 1;
2570 continue;
2571 }
2572
2573 if matches!(strategy, ResolveStrategy::SetOnce)
2574 && extracts.iter().all(|extract| {
2575 match Self::get_value_at_path(
2576 &self.registers[*state],
2577 &extract.target_path,
2578 ) {
2579 Some(value) => !value.is_null(),
2580 None => false,
2581 }
2582 })
2583 {
2584 pc += 1;
2585 continue;
2586 }
2587
2588 let cache_key = resolver_cache_key(resolver, &input);
2589
2590 let cached_value = self.resolver_cache.get(&cache_key).cloned();
2591 if let Some(cached) = cached_value {
2592 self.resolver_cache_hits += 1;
2593 Self::apply_resolver_extractions_to_value(
2594 &mut self.registers[*state],
2595 &cached,
2596 extracts,
2597 &mut dirty_tracker,
2598 &should_emit,
2599 )?;
2600 } else {
2601 self.resolver_cache_misses += 1;
2602 let target = ResolverTarget {
2603 state_id: actual_state_id,
2604 entity_name: entity_name.clone(),
2605 primary_key: self.registers[*key].clone(),
2606 extracts: extracts.clone(),
2607 };
2608
2609 self.enqueue_resolver_request(
2610 cache_key,
2611 resolver.clone(),
2612 input,
2613 target,
2614 );
2615 }
2616 } else {
2617 tracing::warn!(
2618 entity = %entity_name,
2619 resolver = ?resolver,
2620 input_path = %input_path.as_deref().unwrap_or("<literal>"),
2621 state = ?self.registers[*state],
2622 "Resolver skipped: input path not found in state"
2623 );
2624 }
2625
2626 pc += 1;
2627 }
2628 OpCode::UpdatePdaReverseLookup {
2629 state_id: _,
2630 lookup_name,
2631 pda_address,
2632 primary_key,
2633 } => {
2634 let actual_state_id = override_state_id;
2635 let state = self
2636 .states
2637 .get_mut(&actual_state_id)
2638 .ok_or("State table not found")?;
2639
2640 let pda_val = self.registers[*pda_address].clone();
2641 let pk_val = self.registers[*primary_key].clone();
2642
2643 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2644 let pda_lookup = state
2645 .pda_reverse_lookups
2646 .entry(lookup_name.clone())
2647 .or_insert_with(|| {
2648 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2649 });
2650
2651 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2652 self.last_pda_registered = Some(pda_str.to_string());
2653 } else if !pk_val.is_null() {
2654 if let Some(pk_num) = pk_val.as_u64() {
2655 if let Some(pda_str) = pda_val.as_str() {
2656 let pda_lookup = state
2657 .pda_reverse_lookups
2658 .entry(lookup_name.clone())
2659 .or_insert_with(|| {
2660 PdaReverseLookup::new(
2661 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2662 )
2663 });
2664
2665 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2666 self.last_pda_registered = Some(pda_str.to_string());
2667 }
2668 }
2669 }
2670
2671 pc += 1;
2672 }
2673 }
2674
2675 self.instructions_executed += 1;
2676 }
2677
2678 Ok(output)
2679 }
2680
2681 fn load_field(
2682 &self,
2683 event_value: &Value,
2684 path: &FieldPath,
2685 default: Option<&Value>,
2686 ) -> Result<Value> {
2687 if path.segments.is_empty() {
2688 if let Some(obj) = event_value.as_object() {
2689 let filtered: serde_json::Map<String, Value> = obj
2690 .iter()
2691 .filter(|(k, _)| !k.starts_with("__"))
2692 .map(|(k, v)| (k.clone(), v.clone()))
2693 .collect();
2694 return Ok(Value::Object(filtered));
2695 }
2696 return Ok(event_value.clone());
2697 }
2698
2699 let mut current = event_value;
2700 for segment in path.segments.iter() {
2701 current = match current.get(segment) {
2702 Some(v) => v,
2703 None => return Ok(default.cloned().unwrap_or(Value::Null)),
2704 };
2705 }
2706
2707 Ok(current.clone())
2708 }
2709
2710 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2711 let mut current = value;
2712 for segment in path.split('.') {
2713 current = current.get(segment)?;
2714 }
2715 Some(current.clone())
2716 }
2717
2718 fn set_field_auto_vivify(
2719 &mut self,
2720 object_reg: Register,
2721 path: &str,
2722 value_reg: Register,
2723 ) -> Result<()> {
2724 let compiled = self.get_compiled_path(path);
2725 let segments = compiled.segments();
2726 let value = self.registers[value_reg].clone();
2727
2728 if !self.registers[object_reg].is_object() {
2729 self.registers[object_reg] = json!({});
2730 }
2731
2732 let obj = self.registers[object_reg]
2733 .as_object_mut()
2734 .ok_or("Not an object")?;
2735
2736 let mut current = obj;
2737 for (i, segment) in segments.iter().enumerate() {
2738 if i == segments.len() - 1 {
2739 current.insert(segment.to_string(), value);
2740 return Ok(());
2741 } else {
2742 current
2743 .entry(segment.to_string())
2744 .or_insert_with(|| json!({}));
2745 current = current
2746 .get_mut(segment)
2747 .and_then(|v| v.as_object_mut())
2748 .ok_or("Path collision: expected object")?;
2749 }
2750 }
2751
2752 Ok(())
2753 }
2754
2755 fn set_field_if_null(
2756 &mut self,
2757 object_reg: Register,
2758 path: &str,
2759 value_reg: Register,
2760 ) -> Result<bool> {
2761 let compiled = self.get_compiled_path(path);
2762 let segments = compiled.segments();
2763 let value = self.registers[value_reg].clone();
2764
2765 if value.is_null() {
2769 return Ok(false);
2770 }
2771
2772 if !self.registers[object_reg].is_object() {
2773 self.registers[object_reg] = json!({});
2774 }
2775
2776 let obj = self.registers[object_reg]
2777 .as_object_mut()
2778 .ok_or("Not an object")?;
2779
2780 let mut current = obj;
2781 for (i, segment) in segments.iter().enumerate() {
2782 if i == segments.len() - 1 {
2783 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2784 current.insert(segment.to_string(), value);
2785 return Ok(true);
2786 }
2787 return Ok(false);
2788 } else {
2789 current
2790 .entry(segment.to_string())
2791 .or_insert_with(|| json!({}));
2792 current = current
2793 .get_mut(segment)
2794 .and_then(|v| v.as_object_mut())
2795 .ok_or("Path collision: expected object")?;
2796 }
2797 }
2798
2799 Ok(false)
2800 }
2801
2802 fn set_field_max(
2803 &mut self,
2804 object_reg: Register,
2805 path: &str,
2806 value_reg: Register,
2807 ) -> Result<bool> {
2808 let compiled = self.get_compiled_path(path);
2809 let segments = compiled.segments();
2810 let new_value = self.registers[value_reg].clone();
2811
2812 if !self.registers[object_reg].is_object() {
2813 self.registers[object_reg] = json!({});
2814 }
2815
2816 let obj = self.registers[object_reg]
2817 .as_object_mut()
2818 .ok_or("Not an object")?;
2819
2820 let mut current = obj;
2821 for (i, segment) in segments.iter().enumerate() {
2822 if i == segments.len() - 1 {
2823 let should_update = if let Some(current_value) = current.get(segment) {
2824 if current_value.is_null() {
2825 true
2826 } else {
2827 match (current_value.as_i64(), new_value.as_i64()) {
2828 (Some(current_val), Some(new_val)) => new_val > current_val,
2829 (Some(current_val), None) if new_value.as_u64().is_some() => {
2830 new_value.as_u64().unwrap() as i64 > current_val
2831 }
2832 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2833 new_val > current_value.as_u64().unwrap() as i64
2834 }
2835 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2836 (Some(current_val), Some(new_val)) => new_val > current_val,
2837 _ => match (current_value.as_f64(), new_value.as_f64()) {
2838 (Some(current_val), Some(new_val)) => new_val > current_val,
2839 _ => false,
2840 },
2841 },
2842 _ => false,
2843 }
2844 }
2845 } else {
2846 true
2847 };
2848
2849 if should_update {
2850 current.insert(segment.to_string(), new_value);
2851 return Ok(true);
2852 }
2853 return Ok(false);
2854 } else {
2855 current
2856 .entry(segment.to_string())
2857 .or_insert_with(|| json!({}));
2858 current = current
2859 .get_mut(segment)
2860 .and_then(|v| v.as_object_mut())
2861 .ok_or("Path collision: expected object")?;
2862 }
2863 }
2864
2865 Ok(false)
2866 }
2867
2868 fn set_field_sum(
2869 &mut self,
2870 object_reg: Register,
2871 path: &str,
2872 value_reg: Register,
2873 ) -> Result<bool> {
2874 let compiled = self.get_compiled_path(path);
2875 let segments = compiled.segments();
2876 let new_value = &self.registers[value_reg];
2877
2878 let new_val_num = new_value
2880 .as_i64()
2881 .or_else(|| new_value.as_u64().map(|n| n as i64))
2882 .ok_or("Sum requires numeric value")?;
2883
2884 if !self.registers[object_reg].is_object() {
2885 self.registers[object_reg] = json!({});
2886 }
2887
2888 let obj = self.registers[object_reg]
2889 .as_object_mut()
2890 .ok_or("Not an object")?;
2891
2892 let mut current = obj;
2893 for (i, segment) in segments.iter().enumerate() {
2894 if i == segments.len() - 1 {
2895 let current_val = current
2896 .get(segment)
2897 .and_then(|v| {
2898 if v.is_null() {
2899 None
2900 } else {
2901 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2902 }
2903 })
2904 .unwrap_or(0);
2905
2906 let sum = current_val + new_val_num;
2907 current.insert(segment.to_string(), json!(sum));
2908 return Ok(true);
2909 } else {
2910 current
2911 .entry(segment.to_string())
2912 .or_insert_with(|| json!({}));
2913 current = current
2914 .get_mut(segment)
2915 .and_then(|v| v.as_object_mut())
2916 .ok_or("Path collision: expected object")?;
2917 }
2918 }
2919
2920 Ok(false)
2921 }
2922
2923 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2924 let compiled = self.get_compiled_path(path);
2925 let segments = compiled.segments();
2926
2927 if !self.registers[object_reg].is_object() {
2928 self.registers[object_reg] = json!({});
2929 }
2930
2931 let obj = self.registers[object_reg]
2932 .as_object_mut()
2933 .ok_or("Not an object")?;
2934
2935 let mut current = obj;
2936 for (i, segment) in segments.iter().enumerate() {
2937 if i == segments.len() - 1 {
2938 let current_val = current
2940 .get(segment)
2941 .and_then(|v| {
2942 if v.is_null() {
2943 None
2944 } else {
2945 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2946 }
2947 })
2948 .unwrap_or(0);
2949
2950 let incremented = current_val + 1;
2951 current.insert(segment.to_string(), json!(incremented));
2952 return Ok(true);
2953 } else {
2954 current
2955 .entry(segment.to_string())
2956 .or_insert_with(|| json!({}));
2957 current = current
2958 .get_mut(segment)
2959 .and_then(|v| v.as_object_mut())
2960 .ok_or("Path collision: expected object")?;
2961 }
2962 }
2963
2964 Ok(false)
2965 }
2966
2967 fn set_field_min(
2968 &mut self,
2969 object_reg: Register,
2970 path: &str,
2971 value_reg: Register,
2972 ) -> Result<bool> {
2973 let compiled = self.get_compiled_path(path);
2974 let segments = compiled.segments();
2975 let new_value = self.registers[value_reg].clone();
2976
2977 if !self.registers[object_reg].is_object() {
2978 self.registers[object_reg] = json!({});
2979 }
2980
2981 let obj = self.registers[object_reg]
2982 .as_object_mut()
2983 .ok_or("Not an object")?;
2984
2985 let mut current = obj;
2986 for (i, segment) in segments.iter().enumerate() {
2987 if i == segments.len() - 1 {
2988 let should_update = if let Some(current_value) = current.get(segment) {
2989 if current_value.is_null() {
2990 true
2991 } else {
2992 match (current_value.as_i64(), new_value.as_i64()) {
2993 (Some(current_val), Some(new_val)) => new_val < current_val,
2994 (Some(current_val), None) if new_value.as_u64().is_some() => {
2995 (new_value.as_u64().unwrap() as i64) < current_val
2996 }
2997 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2998 new_val < current_value.as_u64().unwrap() as i64
2999 }
3000 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3001 (Some(current_val), Some(new_val)) => new_val < current_val,
3002 _ => match (current_value.as_f64(), new_value.as_f64()) {
3003 (Some(current_val), Some(new_val)) => new_val < current_val,
3004 _ => false,
3005 },
3006 },
3007 _ => false,
3008 }
3009 }
3010 } else {
3011 true
3012 };
3013
3014 if should_update {
3015 current.insert(segment.to_string(), new_value);
3016 return Ok(true);
3017 }
3018 return Ok(false);
3019 } else {
3020 current
3021 .entry(segment.to_string())
3022 .or_insert_with(|| json!({}));
3023 current = current
3024 .get_mut(segment)
3025 .and_then(|v| v.as_object_mut())
3026 .ok_or("Path collision: expected object")?;
3027 }
3028 }
3029
3030 Ok(false)
3031 }
3032
3033 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3034 let compiled = self.get_compiled_path(path);
3035 let segments = compiled.segments();
3036 let mut current = &self.registers[object_reg];
3037
3038 for segment in segments {
3039 current = current
3040 .get(segment)
3041 .ok_or_else(|| format!("Field not found: {}", segment))?;
3042 }
3043
3044 Ok(current.clone())
3045 }
3046
3047 fn append_to_array(
3048 &mut self,
3049 object_reg: Register,
3050 path: &str,
3051 value_reg: Register,
3052 max_length: usize,
3053 ) -> Result<()> {
3054 let compiled = self.get_compiled_path(path);
3055 let segments = compiled.segments();
3056 let value = self.registers[value_reg].clone();
3057
3058 if !self.registers[object_reg].is_object() {
3059 self.registers[object_reg] = json!({});
3060 }
3061
3062 let obj = self.registers[object_reg]
3063 .as_object_mut()
3064 .ok_or("Not an object")?;
3065
3066 let mut current = obj;
3067 for (i, segment) in segments.iter().enumerate() {
3068 if i == segments.len() - 1 {
3069 current
3070 .entry(segment.to_string())
3071 .or_insert_with(|| json!([]));
3072 let arr = current
3073 .get_mut(segment)
3074 .and_then(|v| v.as_array_mut())
3075 .ok_or("Path is not an array")?;
3076 arr.push(value.clone());
3077
3078 if arr.len() > max_length {
3079 let excess = arr.len() - max_length;
3080 arr.drain(0..excess);
3081 }
3082 } else {
3083 current
3084 .entry(segment.to_string())
3085 .or_insert_with(|| json!({}));
3086 current = current
3087 .get_mut(segment)
3088 .and_then(|v| v.as_object_mut())
3089 .ok_or("Path collision: expected object")?;
3090 }
3091 }
3092
3093 Ok(())
3094 }
3095
3096 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3097 let value = &self.registers[reg];
3098 let transformed = Self::apply_transformation(value, transformation)?;
3099 self.registers[reg] = transformed;
3100 Ok(())
3101 }
3102
3103 fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3104 match transformation {
3105 Transformation::HexEncode => {
3106 if let Some(arr) = value.as_array() {
3107 let bytes: Vec<u8> = arr
3108 .iter()
3109 .filter_map(|v| v.as_u64().map(|n| n as u8))
3110 .collect();
3111 let hex = hex::encode(&bytes);
3112 Ok(json!(hex))
3113 } else if value.is_string() {
3114 Ok(value.clone())
3115 } else {
3116 Err("HexEncode requires an array of numbers".into())
3117 }
3118 }
3119 Transformation::HexDecode => {
3120 if let Some(s) = value.as_str() {
3121 let s = s.strip_prefix("0x").unwrap_or(s);
3122 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3123 Ok(json!(bytes))
3124 } else {
3125 Err("HexDecode requires a string".into())
3126 }
3127 }
3128 Transformation::Base58Encode => {
3129 if let Some(arr) = value.as_array() {
3130 let bytes: Vec<u8> = arr
3131 .iter()
3132 .filter_map(|v| v.as_u64().map(|n| n as u8))
3133 .collect();
3134 let encoded = bs58::encode(&bytes).into_string();
3135 Ok(json!(encoded))
3136 } else if value.is_string() {
3137 Ok(value.clone())
3138 } else {
3139 Err("Base58Encode requires an array of numbers".into())
3140 }
3141 }
3142 Transformation::Base58Decode => {
3143 if let Some(s) = value.as_str() {
3144 let bytes = bs58::decode(s)
3145 .into_vec()
3146 .map_err(|e| format!("Base58 decode error: {}", e))?;
3147 Ok(json!(bytes))
3148 } else {
3149 Err("Base58Decode requires a string".into())
3150 }
3151 }
3152 Transformation::ToString => Ok(json!(value.to_string())),
3153 Transformation::ToNumber => {
3154 if let Some(s) = value.as_str() {
3155 let n = s
3156 .parse::<i64>()
3157 .map_err(|e| format!("Parse error: {}", e))?;
3158 Ok(json!(n))
3159 } else {
3160 Ok(value.clone())
3161 }
3162 }
3163 }
3164 }
3165
3166 fn evaluate_comparison(
3167 &self,
3168 field_value: &Value,
3169 op: &ComparisonOp,
3170 condition_value: &Value,
3171 ) -> Result<bool> {
3172 use ComparisonOp::*;
3173
3174 match op {
3175 Equal => Ok(field_value == condition_value),
3176 NotEqual => Ok(field_value != condition_value),
3177 GreaterThan => {
3178 match (field_value.as_i64(), condition_value.as_i64()) {
3180 (Some(a), Some(b)) => Ok(a > b),
3181 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3182 (Some(a), Some(b)) => Ok(a > b),
3183 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3184 (Some(a), Some(b)) => Ok(a > b),
3185 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3186 },
3187 },
3188 }
3189 }
3190 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3191 (Some(a), Some(b)) => Ok(a >= b),
3192 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3193 (Some(a), Some(b)) => Ok(a >= b),
3194 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3195 (Some(a), Some(b)) => Ok(a >= b),
3196 _ => {
3197 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3198 }
3199 },
3200 },
3201 },
3202 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3203 (Some(a), Some(b)) => Ok(a < b),
3204 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3205 (Some(a), Some(b)) => Ok(a < b),
3206 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3207 (Some(a), Some(b)) => Ok(a < b),
3208 _ => Err("Cannot compare non-numeric values with LessThan".into()),
3209 },
3210 },
3211 },
3212 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3213 (Some(a), Some(b)) => Ok(a <= b),
3214 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3215 (Some(a), Some(b)) => Ok(a <= b),
3216 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3217 (Some(a), Some(b)) => Ok(a <= b),
3218 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3219 },
3220 },
3221 },
3222 }
3223 }
3224
3225 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3226 if let Some(state) = self.states.get(&state_id) {
3227 if let Some(index) = state.lookup_indexes.get(index_name) {
3228 if index.lookup(value).is_some() {
3229 return true;
3230 }
3231 }
3232
3233 for (name, index) in state.lookup_indexes.iter() {
3234 if name == index_name {
3235 continue;
3236 }
3237 if index.lookup(value).is_some() {
3238 return true;
3239 }
3240 }
3241
3242 if let Some(pda_str) = value.as_str() {
3243 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3244 if pda_lookup.contains(pda_str) {
3245 return true;
3246 }
3247 }
3248 }
3249 }
3250
3251 false
3252 }
3253
3254 fn apply_deferred_when_op(
3255 &mut self,
3256 state_id: u32,
3257 op: &DeferredWhenOperation,
3258 ) -> Result<Vec<Mutation>> {
3259 let state = self.states.get(&state_id).ok_or("State not found")?;
3260
3261 if op.primary_key.is_null() {
3262 return Ok(vec![]);
3263 }
3264
3265 let mut entity_state = state
3266 .get_and_touch(&op.primary_key)
3267 .unwrap_or_else(|| json!({}));
3268
3269 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3270
3271 state.insert_with_eviction(op.primary_key.clone(), entity_state);
3272
3273 if !op.emit {
3274 return Ok(vec![]);
3275 }
3276
3277 let mut patch = json!({});
3278 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3279
3280 Ok(vec![Mutation {
3281 export: op.entity_name.clone(),
3282 key: op.primary_key.clone(),
3283 patch,
3284 append: vec![],
3285 }])
3286 }
3287
3288 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3289 let parts: Vec<&str> = path.split('.').collect();
3290 let mut current = obj;
3291
3292 for (i, part) in parts.iter().enumerate() {
3293 if i == parts.len() - 1 {
3294 if let Some(map) = current.as_object_mut() {
3295 map.insert(part.to_string(), value);
3296 return Ok(());
3297 }
3298 return Err("Cannot set field on non-object".into());
3299 }
3300
3301 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3302 if let Some(map) = current.as_object_mut() {
3303 map.insert(part.to_string(), json!({}));
3304 }
3305 }
3306
3307 current = current.get_mut(*part).ok_or("Path navigation failed")?;
3308 }
3309
3310 Ok(())
3311 }
3312
3313 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3314 let now = std::time::SystemTime::now()
3315 .duration_since(std::time::UNIX_EPOCH)
3316 .unwrap()
3317 .as_secs() as i64;
3318
3319 let state = match self.states.get(&state_id) {
3320 Some(s) => s,
3321 None => return 0,
3322 };
3323
3324 let mut removed = 0;
3325 state.deferred_when_ops.retain(|_, ops| {
3326 let before = ops.len();
3327 ops.retain(|op| now - op.deferred_at < max_age_secs);
3328 removed += before - ops.len();
3329 !ops.is_empty()
3330 });
3331
3332 removed
3333 }
3334
3335 #[cfg_attr(feature = "otel", instrument(
3344 name = "vm.update_pda_lookup",
3345 skip(self),
3346 fields(
3347 pda = %pda_address,
3348 seed = %seed_value,
3349 )
3350 ))]
3351 pub fn update_pda_reverse_lookup(
3352 &mut self,
3353 state_id: u32,
3354 lookup_name: &str,
3355 pda_address: String,
3356 seed_value: String,
3357 ) -> Result<Vec<PendingAccountUpdate>> {
3358 let state = self
3359 .states
3360 .get_mut(&state_id)
3361 .ok_or("State table not found")?;
3362
3363 let lookup = state
3364 .pda_reverse_lookups
3365 .entry(lookup_name.to_string())
3366 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3367
3368 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
3369
3370 if let Some(ref evicted) = evicted_pda {
3371 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3372 let count = evicted_updates.len();
3373 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3374 }
3375 }
3376
3377 self.flush_pending_updates(state_id, &pda_address)
3379 }
3380
3381 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3386 let state = match self.states.get_mut(&state_id) {
3387 Some(s) => s,
3388 None => return 0,
3389 };
3390
3391 let now = std::time::SystemTime::now()
3392 .duration_since(std::time::UNIX_EPOCH)
3393 .unwrap()
3394 .as_secs() as i64;
3395
3396 let mut removed_count = 0;
3397
3398 state.pending_updates.retain(|_pda_address, updates| {
3400 let original_len = updates.len();
3401
3402 updates.retain(|update| {
3403 let age = now - update.queued_at;
3404 age <= PENDING_UPDATE_TTL_SECONDS
3405 });
3406
3407 removed_count += original_len - updates.len();
3408
3409 !updates.is_empty()
3411 });
3412
3413 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3415
3416 if removed_count > 0 {
3417 #[cfg(feature = "otel")]
3418 crate::vm_metrics::record_pending_updates_expired(
3419 removed_count as u64,
3420 &state.entity_name,
3421 );
3422 }
3423
3424 removed_count
3425 }
3426
3427 #[cfg_attr(feature = "otel", instrument(
3459 name = "vm.queue_account_update",
3460 skip(self, update),
3461 fields(
3462 pda = %update.pda_address,
3463 account_type = %update.account_type,
3464 slot = update.slot,
3465 )
3466 ))]
3467 pub fn queue_account_update(
3468 &mut self,
3469 state_id: u32,
3470 update: QueuedAccountUpdate,
3471 ) -> Result<()> {
3472 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3473 self.cleanup_expired_pending_updates(state_id);
3474 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3475 self.drop_oldest_pending_update(state_id)?;
3476 }
3477 }
3478
3479 let state = self
3480 .states
3481 .get_mut(&state_id)
3482 .ok_or("State table not found")?;
3483
3484 let pending = PendingAccountUpdate {
3485 account_type: update.account_type,
3486 pda_address: update.pda_address.clone(),
3487 account_data: update.account_data,
3488 slot: update.slot,
3489 write_version: update.write_version,
3490 signature: update.signature,
3491 queued_at: std::time::SystemTime::now()
3492 .duration_since(std::time::UNIX_EPOCH)
3493 .unwrap()
3494 .as_secs() as i64,
3495 };
3496
3497 let pda_address = pending.pda_address.clone();
3498 let slot = pending.slot;
3499
3500 let mut updates = state
3501 .pending_updates
3502 .entry(pda_address.clone())
3503 .or_insert_with(Vec::new);
3504
3505 let original_len = updates.len();
3506 updates.retain(|existing| existing.slot > slot);
3507 let removed_by_dedup = original_len - updates.len();
3508
3509 if removed_by_dedup > 0 {
3510 self.pending_queue_size = self
3511 .pending_queue_size
3512 .saturating_sub(removed_by_dedup as u64);
3513 }
3514
3515 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3516 updates.remove(0);
3517 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3518 }
3519
3520 updates.push(pending);
3521 #[cfg(feature = "otel")]
3522 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3523
3524 Ok(())
3525 }
3526
3527 pub fn queue_instruction_event(
3528 &mut self,
3529 state_id: u32,
3530 event: QueuedInstructionEvent,
3531 ) -> Result<()> {
3532 let state = self
3533 .states
3534 .get_mut(&state_id)
3535 .ok_or("State table not found")?;
3536
3537 let pda_address = event.pda_address.clone();
3538
3539 let pending = PendingInstructionEvent {
3540 event_type: event.event_type,
3541 pda_address: event.pda_address,
3542 event_data: event.event_data,
3543 slot: event.slot,
3544 signature: event.signature,
3545 queued_at: std::time::SystemTime::now()
3546 .duration_since(std::time::UNIX_EPOCH)
3547 .unwrap()
3548 .as_secs() as i64,
3549 };
3550
3551 let mut events = state
3552 .pending_instruction_events
3553 .entry(pda_address)
3554 .or_insert_with(Vec::new);
3555
3556 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3557 events.remove(0);
3558 }
3559
3560 events.push(pending);
3561
3562 Ok(())
3563 }
3564
3565 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3566 self.last_pda_lookup_miss.take()
3567 }
3568
3569 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3570 self.last_lookup_index_miss.take()
3571 }
3572
3573 pub fn take_last_pda_registered(&mut self) -> Option<String> {
3574 self.last_pda_registered.take()
3575 }
3576
3577 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3578 std::mem::take(&mut self.last_lookup_index_keys)
3579 }
3580
3581 pub fn flush_pending_instruction_events(
3582 &mut self,
3583 state_id: u32,
3584 pda_address: &str,
3585 ) -> Vec<PendingInstructionEvent> {
3586 let state = match self.states.get_mut(&state_id) {
3587 Some(s) => s,
3588 None => return Vec::new(),
3589 };
3590
3591 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3592 events
3593 } else {
3594 Vec::new()
3595 }
3596 }
3597
3598 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3600 let state = self.states.get(&state_id)?;
3601
3602 let now = std::time::SystemTime::now()
3603 .duration_since(std::time::UNIX_EPOCH)
3604 .unwrap()
3605 .as_secs() as i64;
3606
3607 let mut total_updates = 0;
3608 let mut oldest_timestamp = now;
3609 let mut largest_pda_queue = 0;
3610 let mut estimated_memory = 0;
3611
3612 for entry in state.pending_updates.iter() {
3613 let (_, updates) = entry.pair();
3614 total_updates += updates.len();
3615 largest_pda_queue = largest_pda_queue.max(updates.len());
3616
3617 for update in updates.iter() {
3618 oldest_timestamp = oldest_timestamp.min(update.queued_at);
3619 estimated_memory += update.account_type.len() +
3621 update.pda_address.len() +
3622 update.signature.len() +
3623 16 + estimate_json_size(&update.account_data);
3625 }
3626 }
3627
3628 Some(PendingQueueStats {
3629 total_updates,
3630 unique_pdas: state.pending_updates.len(),
3631 oldest_age_seconds: now - oldest_timestamp,
3632 largest_pda_queue_size: largest_pda_queue,
3633 estimated_memory_bytes: estimated_memory,
3634 })
3635 }
3636
3637 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3638 let mut stats = VmMemoryStats {
3639 path_cache_size: self.path_cache.len(),
3640 ..Default::default()
3641 };
3642
3643 if let Some(state) = self.states.get(&state_id) {
3644 stats.state_table_entity_count = state.data.len();
3645 stats.state_table_max_entries = state.config.max_entries;
3646 stats.state_table_at_capacity = state.is_at_capacity();
3647
3648 stats.lookup_index_count = state.lookup_indexes.len();
3649 stats.lookup_index_total_entries =
3650 state.lookup_indexes.values().map(|idx| idx.len()).sum();
3651
3652 stats.temporal_index_count = state.temporal_indexes.len();
3653 stats.temporal_index_total_entries = state
3654 .temporal_indexes
3655 .values()
3656 .map(|idx| idx.total_entries())
3657 .sum();
3658
3659 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3660 stats.pda_reverse_lookup_total_entries = state
3661 .pda_reverse_lookups
3662 .values()
3663 .map(|lookup| lookup.len())
3664 .sum();
3665
3666 stats.version_tracker_entries = state.version_tracker.len();
3667
3668 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3669 }
3670
3671 stats
3672 }
3673
3674 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3675 let pending_removed = self.cleanup_expired_pending_updates(state_id);
3676 let temporal_removed = self.cleanup_temporal_indexes(state_id);
3677
3678 #[cfg(feature = "otel")]
3679 if let Some(state) = self.states.get(&state_id) {
3680 crate::vm_metrics::record_cleanup(
3681 pending_removed,
3682 temporal_removed,
3683 &state.entity_name,
3684 );
3685 }
3686
3687 CleanupResult {
3688 pending_updates_removed: pending_removed,
3689 temporal_entries_removed: temporal_removed,
3690 }
3691 }
3692
3693 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3694 let state = match self.states.get_mut(&state_id) {
3695 Some(s) => s,
3696 None => return 0,
3697 };
3698
3699 let now = std::time::SystemTime::now()
3700 .duration_since(std::time::UNIX_EPOCH)
3701 .unwrap()
3702 .as_secs() as i64;
3703
3704 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3705 let mut total_removed = 0;
3706
3707 for (_, index) in state.temporal_indexes.iter_mut() {
3708 total_removed += index.cleanup_expired(cutoff);
3709 }
3710
3711 total_removed
3712 }
3713
3714 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3715 let state = self.states.get(&state_id)?;
3716
3717 if state.is_at_capacity() {
3718 Some(CapacityWarning {
3719 current_entries: state.data.len(),
3720 max_entries: state.config.max_entries,
3721 entries_over_limit: state.entries_over_limit(),
3722 })
3723 } else {
3724 None
3725 }
3726 }
3727
3728 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3730 let state = self
3731 .states
3732 .get_mut(&state_id)
3733 .ok_or("State table not found")?;
3734
3735 let mut oldest_pda: Option<String> = None;
3736 let mut oldest_timestamp = i64::MAX;
3737
3738 for entry in state.pending_updates.iter() {
3740 let (pda, updates) = entry.pair();
3741 if let Some(update) = updates.first() {
3742 if update.queued_at < oldest_timestamp {
3743 oldest_timestamp = update.queued_at;
3744 oldest_pda = Some(pda.clone());
3745 }
3746 }
3747 }
3748
3749 if let Some(pda) = oldest_pda {
3751 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3752 if !updates.is_empty() {
3753 updates.remove(0);
3754 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3755
3756 if updates.is_empty() {
3758 drop(updates);
3759 state.pending_updates.remove(&pda);
3760 }
3761 }
3762 }
3763 }
3764
3765 Ok(())
3766 }
3767
3768 fn flush_pending_updates(
3773 &mut self,
3774 state_id: u32,
3775 pda_address: &str,
3776 ) -> Result<Vec<PendingAccountUpdate>> {
3777 let state = self
3778 .states
3779 .get_mut(&state_id)
3780 .ok_or("State table not found")?;
3781
3782 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3783 let count = pending_updates.len();
3784 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3785 #[cfg(feature = "otel")]
3786 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3787 Ok(pending_updates)
3788 } else {
3789 Ok(Vec::new())
3790 }
3791 }
3792
3793 pub fn try_pda_reverse_lookup(
3795 &mut self,
3796 state_id: u32,
3797 lookup_name: &str,
3798 pda_address: &str,
3799 ) -> Option<String> {
3800 let state = self.states.get_mut(&state_id)?;
3801
3802 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3803 if let Some(value) = lookup.lookup(pda_address) {
3804 self.pda_cache_hits += 1;
3805 return Some(value);
3806 }
3807 }
3808
3809 self.pda_cache_misses += 1;
3810 None
3811 }
3812
3813 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
3820 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
3821 }
3822
3823 fn evaluate_computed_expr_with_env(
3825 &self,
3826 expr: &ComputedExpr,
3827 state: &Value,
3828 env: &std::collections::HashMap<String, Value>,
3829 ) -> Result<Value> {
3830 match expr {
3831 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
3832
3833 ComputedExpr::Var { name } => env
3834 .get(name)
3835 .cloned()
3836 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
3837
3838 ComputedExpr::Let { name, value, body } => {
3839 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
3840 let mut new_env = env.clone();
3841 new_env.insert(name.clone(), val);
3842 self.evaluate_computed_expr_with_env(body, state, &new_env)
3843 }
3844
3845 ComputedExpr::If {
3846 condition,
3847 then_branch,
3848 else_branch,
3849 } => {
3850 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
3851 if self.value_to_bool(&cond_val) {
3852 self.evaluate_computed_expr_with_env(then_branch, state, env)
3853 } else {
3854 self.evaluate_computed_expr_with_env(else_branch, state, env)
3855 }
3856 }
3857
3858 ComputedExpr::None => Ok(Value::Null),
3859
3860 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
3861
3862 ComputedExpr::Slice { expr, start, end } => {
3863 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3864 match val {
3865 Value::Array(arr) => {
3866 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3867 Ok(Value::Array(slice))
3868 }
3869 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3870 }
3871 }
3872
3873 ComputedExpr::Index { expr, index } => {
3874 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3875 match val {
3876 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3877 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3878 }
3879 }
3880
3881 ComputedExpr::U64FromLeBytes { bytes } => {
3882 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3883 let byte_vec = self.value_to_bytes(&val)?;
3884 if byte_vec.len() < 8 {
3885 return Err(format!(
3886 "u64::from_le_bytes requires 8 bytes, got {}",
3887 byte_vec.len()
3888 )
3889 .into());
3890 }
3891 let arr: [u8; 8] = byte_vec[..8]
3892 .try_into()
3893 .map_err(|_| "Failed to convert to [u8; 8]")?;
3894 Ok(json!(u64::from_le_bytes(arr)))
3895 }
3896
3897 ComputedExpr::U64FromBeBytes { bytes } => {
3898 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3899 let byte_vec = self.value_to_bytes(&val)?;
3900 if byte_vec.len() < 8 {
3901 return Err(format!(
3902 "u64::from_be_bytes requires 8 bytes, got {}",
3903 byte_vec.len()
3904 )
3905 .into());
3906 }
3907 let arr: [u8; 8] = byte_vec[..8]
3908 .try_into()
3909 .map_err(|_| "Failed to convert to [u8; 8]")?;
3910 Ok(json!(u64::from_be_bytes(arr)))
3911 }
3912
3913 ComputedExpr::ByteArray { bytes } => {
3914 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3915 }
3916
3917 ComputedExpr::Closure { param, body } => {
3918 Ok(json!({
3921 "__closure": {
3922 "param": param,
3923 "body": serde_json::to_value(body).unwrap_or(Value::Null)
3924 }
3925 }))
3926 }
3927
3928 ComputedExpr::Unary { op, expr } => {
3929 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3930 self.apply_unary_op(op, &val)
3931 }
3932
3933 ComputedExpr::JsonToBytes { expr } => {
3934 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3935 let bytes = self.value_to_bytes(&val)?;
3937 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3938 }
3939
3940 ComputedExpr::UnwrapOr { expr, default } => {
3941 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3942 if val.is_null() {
3943 Ok(default.clone())
3944 } else {
3945 Ok(val)
3946 }
3947 }
3948
3949 ComputedExpr::Binary { op, left, right } => {
3950 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3951 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3952 self.apply_binary_op(op, &l, &r)
3953 }
3954
3955 ComputedExpr::Cast { expr, to_type } => {
3956 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3957 self.apply_cast(&val, to_type)
3958 }
3959
3960 ComputedExpr::ResolverComputed {
3961 resolver,
3962 method,
3963 args,
3964 } => {
3965 let evaluated_args: Vec<Value> = args
3966 .iter()
3967 .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
3968 .collect::<Result<Vec<_>>>()?;
3969 crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
3970 }
3971
3972 ComputedExpr::MethodCall { expr, method, args } => {
3973 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3974 if method == "map" && args.len() == 1 {
3976 if let ComputedExpr::Closure { param, body } = &args[0] {
3977 if val.is_null() {
3979 return Ok(Value::Null);
3980 }
3981
3982 if let Value::Array(arr) = &val {
3983 let results: Result<Vec<Value>> = arr
3984 .iter()
3985 .map(|elem| {
3986 let mut closure_env = env.clone();
3987 closure_env.insert(param.clone(), elem.clone());
3988 self.evaluate_computed_expr_with_env(body, state, &closure_env)
3989 })
3990 .collect();
3991 return Ok(Value::Array(results?));
3992 }
3993
3994 let mut closure_env = env.clone();
3995 closure_env.insert(param.clone(), val);
3996 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3997 }
3998 }
3999 let evaluated_args: Vec<Value> = args
4000 .iter()
4001 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4002 .collect::<Result<Vec<_>>>()?;
4003 self.apply_method_call(&val, method, &evaluated_args)
4004 }
4005
4006 ComputedExpr::Literal { value } => Ok(value.clone()),
4007
4008 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4009
4010 ComputedExpr::ContextSlot => Ok(self
4011 .current_context
4012 .as_ref()
4013 .and_then(|ctx| ctx.slot)
4014 .map(|s| json!(s))
4015 .unwrap_or(Value::Null)),
4016
4017 ComputedExpr::ContextTimestamp => Ok(self
4018 .current_context
4019 .as_ref()
4020 .map(|ctx| json!(ctx.timestamp()))
4021 .unwrap_or(Value::Null)),
4022 }
4023 }
4024
4025 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4027 match val {
4028 Value::Array(arr) => arr
4029 .iter()
4030 .map(|v| {
4031 v.as_u64()
4032 .map(|n| n as u8)
4033 .ok_or_else(|| "Array element not a valid byte".into())
4034 })
4035 .collect(),
4036 Value::String(s) => {
4037 if s.starts_with("0x") || s.starts_with("0X") {
4039 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4040 } else {
4041 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4042 }
4043 }
4044 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4045 }
4046 }
4047
4048 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4050 use crate::ast::UnaryOp;
4051 match op {
4052 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4053 UnaryOp::ReverseBits => match val.as_u64() {
4054 Some(n) => Ok(json!(n.reverse_bits())),
4055 None => match val.as_i64() {
4056 Some(n) => Ok(json!((n as u64).reverse_bits())),
4057 None => Err("reverse_bits requires an integer".into()),
4058 },
4059 },
4060 }
4061 }
4062
4063 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4065 let segments: Vec<&str> = path.split('.').collect();
4066 let mut current = state;
4067
4068 for segment in segments {
4069 match current.get(segment) {
4070 Some(v) => current = v,
4071 None => return Ok(Value::Null),
4072 }
4073 }
4074
4075 Ok(current.clone())
4076 }
4077
4078 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4080 match op {
4081 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4083 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4084 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4085 BinaryOp::Div => {
4086 if let Some(r) = right.as_i64() {
4088 if r == 0 {
4089 return Err("Division by zero".into());
4090 }
4091 }
4092 if let Some(r) = right.as_f64() {
4093 if r == 0.0 {
4094 return Err("Division by zero".into());
4095 }
4096 }
4097 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4098 }
4099 BinaryOp::Mod => {
4100 match (left.as_i64(), right.as_i64()) {
4102 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4103 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4104 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4105 _ => Err("Modulo requires non-zero integer operands".into()),
4106 },
4107 _ => Err("Modulo by zero".into()),
4108 }
4109 }
4110
4111 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4113 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4114 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4115 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4116 BinaryOp::Eq => Ok(json!(left == right)),
4117 BinaryOp::Ne => Ok(json!(left != right)),
4118
4119 BinaryOp::And => {
4121 let l_bool = self.value_to_bool(left);
4122 let r_bool = self.value_to_bool(right);
4123 Ok(json!(l_bool && r_bool))
4124 }
4125 BinaryOp::Or => {
4126 let l_bool = self.value_to_bool(left);
4127 let r_bool = self.value_to_bool(right);
4128 Ok(json!(l_bool || r_bool))
4129 }
4130
4131 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4133 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4134 _ => match (left.as_i64(), right.as_i64()) {
4135 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4136 _ => Err("XOR requires integer operands".into()),
4137 },
4138 },
4139 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4140 (Some(a), Some(b)) => Ok(json!(a & b)),
4141 _ => match (left.as_i64(), right.as_i64()) {
4142 (Some(a), Some(b)) => Ok(json!(a & b)),
4143 _ => Err("BitAnd requires integer operands".into()),
4144 },
4145 },
4146 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4147 (Some(a), Some(b)) => Ok(json!(a | b)),
4148 _ => match (left.as_i64(), right.as_i64()) {
4149 (Some(a), Some(b)) => Ok(json!(a | b)),
4150 _ => Err("BitOr requires integer operands".into()),
4151 },
4152 },
4153 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4154 (Some(a), Some(b)) => Ok(json!(a << b)),
4155 _ => match (left.as_i64(), right.as_i64()) {
4156 (Some(a), Some(b)) => Ok(json!(a << b)),
4157 _ => Err("Shl requires integer operands".into()),
4158 },
4159 },
4160 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4161 (Some(a), Some(b)) => Ok(json!(a >> b)),
4162 _ => match (left.as_i64(), right.as_i64()) {
4163 (Some(a), Some(b)) => Ok(json!(a >> b)),
4164 _ => Err("Shr requires integer operands".into()),
4165 },
4166 },
4167 }
4168 }
4169
4170 fn numeric_op<F1, F2>(
4172 &self,
4173 left: &Value,
4174 right: &Value,
4175 int_op: F1,
4176 float_op: F2,
4177 ) -> Result<Value>
4178 where
4179 F1: Fn(i64, i64) -> i64,
4180 F2: Fn(f64, f64) -> f64,
4181 {
4182 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4184 return Ok(json!(int_op(a, b)));
4185 }
4186
4187 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4189 return Ok(json!(int_op(a as i64, b as i64)));
4191 }
4192
4193 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4195 return Ok(json!(float_op(a, b)));
4196 }
4197
4198 if left.is_null() || right.is_null() {
4200 return Ok(Value::Null);
4201 }
4202
4203 Err(format!(
4204 "Cannot perform numeric operation on {:?} and {:?}",
4205 left, right
4206 )
4207 .into())
4208 }
4209
4210 fn comparison_op<F1, F2>(
4212 &self,
4213 left: &Value,
4214 right: &Value,
4215 int_cmp: F1,
4216 float_cmp: F2,
4217 ) -> Result<Value>
4218 where
4219 F1: Fn(i64, i64) -> bool,
4220 F2: Fn(f64, f64) -> bool,
4221 {
4222 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4224 return Ok(json!(int_cmp(a, b)));
4225 }
4226
4227 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4229 return Ok(json!(int_cmp(a as i64, b as i64)));
4230 }
4231
4232 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4234 return Ok(json!(float_cmp(a, b)));
4235 }
4236
4237 if left.is_null() || right.is_null() {
4239 return Ok(json!(false));
4240 }
4241
4242 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4243 }
4244
4245 fn value_to_bool(&self, value: &Value) -> bool {
4247 match value {
4248 Value::Null => false,
4249 Value::Bool(b) => *b,
4250 Value::Number(n) => {
4251 if let Some(i) = n.as_i64() {
4252 i != 0
4253 } else if let Some(f) = n.as_f64() {
4254 f != 0.0
4255 } else {
4256 true
4257 }
4258 }
4259 Value::String(s) => !s.is_empty(),
4260 Value::Array(arr) => !arr.is_empty(),
4261 Value::Object(obj) => !obj.is_empty(),
4262 }
4263 }
4264
4265 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4267 match to_type {
4268 "i8" | "i16" | "i32" | "i64" | "isize" => {
4269 if let Some(n) = value.as_i64() {
4270 Ok(json!(n))
4271 } else if let Some(n) = value.as_u64() {
4272 Ok(json!(n as i64))
4273 } else if let Some(n) = value.as_f64() {
4274 Ok(json!(n as i64))
4275 } else if let Some(s) = value.as_str() {
4276 s.parse::<i64>()
4277 .map(|n| json!(n))
4278 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4279 } else {
4280 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4281 }
4282 }
4283 "u8" | "u16" | "u32" | "u64" | "usize" => {
4284 if let Some(n) = value.as_u64() {
4285 Ok(json!(n))
4286 } else if let Some(n) = value.as_i64() {
4287 Ok(json!(n as u64))
4288 } else if let Some(n) = value.as_f64() {
4289 Ok(json!(n as u64))
4290 } else if let Some(s) = value.as_str() {
4291 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4292 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4293 })
4294 } else {
4295 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4296 }
4297 }
4298 "f32" | "f64" => {
4299 if let Some(n) = value.as_f64() {
4300 Ok(json!(n))
4301 } else if let Some(n) = value.as_i64() {
4302 Ok(json!(n as f64))
4303 } else if let Some(n) = value.as_u64() {
4304 Ok(json!(n as f64))
4305 } else if let Some(s) = value.as_str() {
4306 s.parse::<f64>()
4307 .map(|n| json!(n))
4308 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4309 } else {
4310 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4311 }
4312 }
4313 "String" | "string" => Ok(json!(value.to_string())),
4314 "bool" => Ok(json!(self.value_to_bool(value))),
4315 _ => {
4316 Ok(value.clone())
4318 }
4319 }
4320 }
4321
4322 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4324 match method {
4325 "unwrap_or" => {
4326 if value.is_null() && !args.is_empty() {
4327 Ok(args[0].clone())
4328 } else {
4329 Ok(value.clone())
4330 }
4331 }
4332 "unwrap_or_default" => {
4333 if value.is_null() {
4334 Ok(json!(0))
4336 } else {
4337 Ok(value.clone())
4338 }
4339 }
4340 "is_some" => Ok(json!(!value.is_null())),
4341 "is_none" => Ok(json!(value.is_null())),
4342 "abs" => {
4343 if let Some(n) = value.as_i64() {
4344 Ok(json!(n.abs()))
4345 } else if let Some(n) = value.as_f64() {
4346 Ok(json!(n.abs()))
4347 } else {
4348 Err(format!("Cannot call abs() on {:?}", value).into())
4349 }
4350 }
4351 "len" => {
4352 if let Some(s) = value.as_str() {
4353 Ok(json!(s.len()))
4354 } else if let Some(arr) = value.as_array() {
4355 Ok(json!(arr.len()))
4356 } else if let Some(obj) = value.as_object() {
4357 Ok(json!(obj.len()))
4358 } else {
4359 Err(format!("Cannot call len() on {:?}", value).into())
4360 }
4361 }
4362 "to_string" => Ok(json!(value.to_string())),
4363 "min" => {
4364 if args.is_empty() {
4365 return Err("min() requires an argument".into());
4366 }
4367 let other = &args[0];
4368 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4369 Ok(json!(a.min(b)))
4370 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4371 Ok(json!(a.min(b)))
4372 } else {
4373 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4374 }
4375 }
4376 "max" => {
4377 if args.is_empty() {
4378 return Err("max() requires an argument".into());
4379 }
4380 let other = &args[0];
4381 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4382 Ok(json!(a.max(b)))
4383 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4384 Ok(json!(a.max(b)))
4385 } else {
4386 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4387 }
4388 }
4389 "saturating_add" => {
4390 if args.is_empty() {
4391 return Err("saturating_add() requires an argument".into());
4392 }
4393 let other = &args[0];
4394 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4395 Ok(json!(a.saturating_add(b)))
4396 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4397 Ok(json!(a.saturating_add(b)))
4398 } else {
4399 Err(format!(
4400 "Cannot call saturating_add() on {:?} and {:?}",
4401 value, other
4402 )
4403 .into())
4404 }
4405 }
4406 "saturating_sub" => {
4407 if args.is_empty() {
4408 return Err("saturating_sub() requires an argument".into());
4409 }
4410 let other = &args[0];
4411 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4412 Ok(json!(a.saturating_sub(b)))
4413 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4414 Ok(json!(a.saturating_sub(b)))
4415 } else {
4416 Err(format!(
4417 "Cannot call saturating_sub() on {:?} and {:?}",
4418 value, other
4419 )
4420 .into())
4421 }
4422 }
4423 _ => Err(format!("Unknown method call: {}()", method).into()),
4424 }
4425 }
4426
4427 pub fn evaluate_computed_fields_from_ast(
4430 &self,
4431 state: &mut Value,
4432 computed_field_specs: &[ComputedFieldSpec],
4433 ) -> Result<Vec<String>> {
4434 let mut updated_paths = Vec::new();
4435
4436 crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4437
4438 for spec in computed_field_specs {
4439 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
4440 self.set_field_in_state(state, &spec.target_path, result)?;
4441 updated_paths.push(spec.target_path.clone());
4442 }
4443 }
4444
4445 Ok(updated_paths)
4446 }
4447
4448 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4450 let segments: Vec<&str> = path.split('.').collect();
4451
4452 if segments.is_empty() {
4453 return Err("Empty path".into());
4454 }
4455
4456 let mut current = state;
4458 for (i, segment) in segments.iter().enumerate() {
4459 if i == segments.len() - 1 {
4460 if let Some(obj) = current.as_object_mut() {
4462 obj.insert(segment.to_string(), value);
4463 return Ok(());
4464 } else {
4465 return Err(format!("Cannot set field '{}' on non-object", segment).into());
4466 }
4467 } else {
4468 if !current.is_object() {
4470 *current = json!({});
4471 }
4472 let obj = current.as_object_mut().unwrap();
4473 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4474 }
4475 }
4476
4477 Ok(())
4478 }
4479
4480 pub fn create_evaluator_from_specs(
4483 specs: Vec<ComputedFieldSpec>,
4484 ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
4485 move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
4486 let mut vm = VmContext::new();
4487 vm.current_context = Some(UpdateContext {
4488 slot: context_slot,
4489 timestamp: Some(context_timestamp),
4490 ..Default::default()
4491 });
4492 vm.evaluate_computed_fields_from_ast(state, &specs)?;
4493 Ok(())
4494 }
4495 }
4496}
4497
4498impl Default for VmContext {
4499 fn default() -> Self {
4500 Self::new()
4501 }
4502}
4503
4504impl crate::resolvers::ReverseLookupUpdater for VmContext {
4506 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4507 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4509 .unwrap_or_else(|e| {
4510 tracing::error!("Failed to update PDA reverse lookup: {}", e);
4511 Vec::new()
4512 })
4513 }
4514
4515 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4516 self.flush_pending_updates(0, pda_address)
4518 .unwrap_or_else(|e| {
4519 tracing::error!("Failed to flush pending updates: {}", e);
4520 Vec::new()
4521 })
4522 }
4523}
4524
4525#[cfg(test)]
4526mod tests {
4527 use super::*;
4528 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4529
4530 #[test]
4531 fn test_computed_field_preserves_integer_type() {
4532 let vm = VmContext::new();
4533
4534 let mut state = serde_json::json!({
4535 "trading": {
4536 "total_buy_volume": 20000000000_i64,
4537 "total_sell_volume": 17951316474_i64
4538 }
4539 });
4540
4541 let spec = ComputedFieldSpec {
4542 target_path: "trading.total_volume".to_string(),
4543 result_type: "Option<u64>".to_string(),
4544 expression: ComputedExpr::Binary {
4545 op: BinaryOp::Add,
4546 left: Box::new(ComputedExpr::UnwrapOr {
4547 expr: Box::new(ComputedExpr::FieldRef {
4548 path: "trading.total_buy_volume".to_string(),
4549 }),
4550 default: serde_json::json!(0),
4551 }),
4552 right: Box::new(ComputedExpr::UnwrapOr {
4553 expr: Box::new(ComputedExpr::FieldRef {
4554 path: "trading.total_sell_volume".to_string(),
4555 }),
4556 default: serde_json::json!(0),
4557 }),
4558 },
4559 };
4560
4561 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
4562 .unwrap();
4563
4564 let total_volume = state
4565 .get("trading")
4566 .and_then(|t| t.get("total_volume"))
4567 .expect("total_volume should exist");
4568
4569 let serialized = serde_json::to_string(total_volume).unwrap();
4570 assert!(
4571 !serialized.contains('.'),
4572 "Integer should not have decimal point: {}",
4573 serialized
4574 );
4575 assert_eq!(
4576 total_volume.as_i64(),
4577 Some(37951316474),
4578 "Value should be correct sum"
4579 );
4580 }
4581
4582 #[test]
4583 fn test_set_field_sum_preserves_integer_type() {
4584 let mut vm = VmContext::new();
4585 vm.registers[0] = serde_json::json!({});
4586 vm.registers[1] = serde_json::json!(20000000000_i64);
4587 vm.registers[2] = serde_json::json!(17951316474_i64);
4588
4589 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
4590 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
4591
4592 let state = &vm.registers[0];
4593 let buy_vol = state
4594 .get("trading")
4595 .and_then(|t| t.get("total_buy_volume"))
4596 .unwrap();
4597 let sell_vol = state
4598 .get("trading")
4599 .and_then(|t| t.get("total_sell_volume"))
4600 .unwrap();
4601
4602 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
4603 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
4604
4605 assert!(
4606 !buy_serialized.contains('.'),
4607 "Buy volume should not have decimal: {}",
4608 buy_serialized
4609 );
4610 assert!(
4611 !sell_serialized.contains('.'),
4612 "Sell volume should not have decimal: {}",
4613 sell_serialized
4614 );
4615 }
4616
4617 #[test]
4618 fn test_lookup_index_chaining() {
4619 let mut vm = VmContext::new();
4620
4621 let state = vm.states.get_mut(&0).unwrap();
4622
4623 state
4624 .pda_reverse_lookups
4625 .entry("default_pda_lookup".to_string())
4626 .or_insert_with(|| PdaReverseLookup::new(1000))
4627 .insert("pda_123".to_string(), "addr_456".to_string());
4628
4629 state
4630 .lookup_indexes
4631 .entry("round_address_lookup_index".to_string())
4632 .or_insert_with(LookupIndex::new)
4633 .insert(json!("addr_456"), json!(789));
4634
4635 let handler = vec![
4636 OpCode::LoadConstant {
4637 value: json!("pda_123"),
4638 dest: 0,
4639 },
4640 OpCode::LookupIndex {
4641 state_id: 0,
4642 index_name: "round_address_lookup_index".to_string(),
4643 lookup_value: 0,
4644 dest: 1,
4645 },
4646 ];
4647
4648 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4649 .unwrap();
4650
4651 assert_eq!(vm.registers[1], json!(789));
4652 }
4653
4654 #[test]
4655 fn test_lookup_index_no_chain() {
4656 let mut vm = VmContext::new();
4657
4658 let state = vm.states.get_mut(&0).unwrap();
4659 state
4660 .lookup_indexes
4661 .entry("test_index".to_string())
4662 .or_insert_with(LookupIndex::new)
4663 .insert(json!("key_abc"), json!(42));
4664
4665 let handler = vec![
4666 OpCode::LoadConstant {
4667 value: json!("key_abc"),
4668 dest: 0,
4669 },
4670 OpCode::LookupIndex {
4671 state_id: 0,
4672 index_name: "test_index".to_string(),
4673 lookup_value: 0,
4674 dest: 1,
4675 },
4676 ];
4677
4678 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4679 .unwrap();
4680
4681 assert_eq!(vm.registers[1], json!(42));
4682 }
4683
4684 #[test]
4685 fn test_conditional_set_field_with_zero_array() {
4686 let mut vm = VmContext::new();
4687
4688 let event_zeros = json!({
4689 "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4690 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4691 });
4692
4693 let event_nonzero = json!({
4694 "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
4695 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
4696 });
4697
4698 let zero_32: Value = json!([
4699 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4700 0, 0, 0
4701 ]);
4702
4703 let handler = vec![
4704 OpCode::CreateObject { dest: 2 },
4705 OpCode::LoadEventField {
4706 path: FieldPath::new(&["value"]),
4707 dest: 10,
4708 default: None,
4709 },
4710 OpCode::ConditionalSetField {
4711 object: 2,
4712 path: "captured_value".to_string(),
4713 value: 10,
4714 condition_field: FieldPath::new(&["value"]),
4715 condition_op: ComparisonOp::NotEqual,
4716 condition_value: zero_32,
4717 },
4718 ];
4719
4720 vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
4721 .unwrap();
4722 assert!(
4723 vm.registers[2].get("captured_value").is_none(),
4724 "Field should not be set when value is all zeros"
4725 );
4726
4727 vm.reset_registers();
4728 vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
4729 .unwrap();
4730 assert!(
4731 vm.registers[2].get("captured_value").is_some(),
4732 "Field should be set when value is non-zero"
4733 );
4734 }
4735
4736 #[test]
4737 fn test_when_instruction_arrives_first() {
4738 let mut vm = VmContext::new();
4739
4740 let signature = "test_sig_123".to_string();
4741
4742 {
4743 let state = vm.states.get(&0).unwrap();
4744 let mut cache = state.recent_tx_instructions.lock().unwrap();
4745 let mut set = HashSet::new();
4746 set.insert("RevealIxState".to_string());
4747 cache.put(signature.clone(), set);
4748 }
4749
4750 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4751
4752 let handler = vec![
4753 OpCode::CreateObject { dest: 2 },
4754 OpCode::LoadConstant {
4755 value: json!("primary_key_value"),
4756 dest: 1,
4757 },
4758 OpCode::LoadConstant {
4759 value: json!("the_revealed_value"),
4760 dest: 10,
4761 },
4762 OpCode::SetFieldWhen {
4763 object: 2,
4764 path: "entropy_value".to_string(),
4765 value: 10,
4766 when_instruction: "RevealIxState".to_string(),
4767 entity_name: "TestEntity".to_string(),
4768 key_reg: 1,
4769 condition_field: None,
4770 condition_op: None,
4771 condition_value: None,
4772 },
4773 ];
4774
4775 vm.execute_handler(
4776 &handler,
4777 &json!({}),
4778 "VarState",
4779 0,
4780 "TestEntity",
4781 None,
4782 None,
4783 )
4784 .unwrap();
4785
4786 assert_eq!(
4787 vm.registers[2].get("entropy_value").unwrap(),
4788 "the_revealed_value",
4789 "Field should be set when instruction was already seen"
4790 );
4791 }
4792
4793 #[test]
4794 fn test_when_account_arrives_first() {
4795 let mut vm = VmContext::new();
4796
4797 let signature = "test_sig_456".to_string();
4798
4799 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4800
4801 let handler = vec![
4802 OpCode::CreateObject { dest: 2 },
4803 OpCode::LoadConstant {
4804 value: json!("pk_123"),
4805 dest: 1,
4806 },
4807 OpCode::LoadConstant {
4808 value: json!("deferred_value"),
4809 dest: 10,
4810 },
4811 OpCode::SetFieldWhen {
4812 object: 2,
4813 path: "entropy_value".to_string(),
4814 value: 10,
4815 when_instruction: "RevealIxState".to_string(),
4816 entity_name: "TestEntity".to_string(),
4817 key_reg: 1,
4818 condition_field: None,
4819 condition_op: None,
4820 condition_value: None,
4821 },
4822 ];
4823
4824 vm.execute_handler(
4825 &handler,
4826 &json!({}),
4827 "VarState",
4828 0,
4829 "TestEntity",
4830 None,
4831 None,
4832 )
4833 .unwrap();
4834
4835 assert!(
4836 vm.registers[2].get("entropy_value").is_none(),
4837 "Field should not be set when instruction hasn't been seen"
4838 );
4839
4840 let state = vm.states.get(&0).unwrap();
4841 let key = (signature.clone(), "RevealIxState".to_string());
4842 assert!(
4843 state.deferred_when_ops.contains_key(&key),
4844 "Operation should be queued"
4845 );
4846
4847 {
4848 let mut cache = state.recent_tx_instructions.lock().unwrap();
4849 let mut set = HashSet::new();
4850 set.insert("RevealIxState".to_string());
4851 cache.put(signature.clone(), set);
4852 }
4853
4854 let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
4855 for op in deferred {
4856 vm.apply_deferred_when_op(0, &op).unwrap();
4857 }
4858
4859 let state = vm.states.get(&0).unwrap();
4860 let entity = state.data.get(&json!("pk_123")).unwrap();
4861 assert_eq!(
4862 entity.get("entropy_value").unwrap(),
4863 "deferred_value",
4864 "Field should be set after instruction arrives"
4865 );
4866 }
4867
4868 #[test]
4869 fn test_when_cleanup_expired() {
4870 let mut vm = VmContext::new();
4871
4872 let state = vm.states.get(&0).unwrap();
4873 let key = ("old_sig".to_string(), "SomeIxState".to_string());
4874 state.deferred_when_ops.insert(
4875 key,
4876 vec![DeferredWhenOperation {
4877 entity_name: "Test".to_string(),
4878 primary_key: json!("pk"),
4879 field_path: "field".to_string(),
4880 field_value: json!("value"),
4881 when_instruction: "SomeIxState".to_string(),
4882 signature: "old_sig".to_string(),
4883 slot: 0,
4884 deferred_at: 0,
4885 emit: true,
4886 }],
4887 );
4888
4889 let removed = vm.cleanup_expired_when_ops(0, 60);
4890
4891 assert_eq!(removed, 1, "Should have removed 1 expired op");
4892 assert!(
4893 vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
4894 "Deferred ops should be empty after cleanup"
4895 );
4896 }
4897}