1use crate::ast::{
2 self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3 ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use serde_json::{json, Value};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::num::NonZeroUsize;
12
13#[cfg(feature = "otel")]
14use tracing::instrument;
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn is_account_update(&self) -> bool {
103 self.write_version.is_some() && self.txn_index.is_none()
104 }
105
106 pub fn is_instruction_update(&self) -> bool {
108 self.txn_index.is_some() && self.write_version.is_none()
109 }
110
111 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112 self.metadata.insert(key, value);
113 self
114 }
115
116 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118 self.metadata.get(key)
119 }
120
121 pub fn to_value(&self) -> Value {
123 let mut obj = serde_json::Map::new();
124 if let Some(slot) = self.slot {
125 obj.insert("slot".to_string(), json!(slot));
126 }
127 if let Some(ref sig) = self.signature {
128 obj.insert("signature".to_string(), json!(sig));
129 }
130 obj.insert("timestamp".to_string(), json!(self.timestamp()));
132 for (key, value) in &self.metadata {
133 obj.insert(key.clone(), value.clone());
134 }
135 Value::Object(obj)
136 }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144pub trait ComputedFieldsEvaluator {
147 fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
150const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
152const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
153const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
158
159const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
161const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
162
163const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
164
165const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
166
167const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
170
171const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
172
173const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
174
175const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
176
177fn estimate_json_size(value: &Value) -> usize {
179 match value {
180 Value::Null => 4,
181 Value::Bool(_) => 5,
182 Value::Number(_) => 8,
183 Value::String(s) => s.len() + 2,
184 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
185 Value::Object(obj) => {
186 2 + obj
187 .iter()
188 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
189 .sum::<usize>()
190 }
191 }
192}
193
194#[derive(Debug, Clone)]
195pub struct CompiledPath {
196 pub segments: std::sync::Arc<[String]>,
197}
198
199impl CompiledPath {
200 pub fn new(path: &str) -> Self {
201 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
202 CompiledPath {
203 segments: segments.into(),
204 }
205 }
206
207 fn segments(&self) -> &[String] {
208 &self.segments
209 }
210}
211
212#[derive(Debug, Clone)]
215pub enum FieldChange {
216 Replaced,
218 Appended(Vec<Value>),
220}
221
222#[derive(Debug, Clone, Default)]
225pub struct DirtyTracker {
226 changes: HashMap<String, FieldChange>,
227}
228
229impl DirtyTracker {
230 pub fn new() -> Self {
232 Self {
233 changes: HashMap::new(),
234 }
235 }
236
237 pub fn mark_replaced(&mut self, path: &str) {
239 self.changes.insert(path.to_string(), FieldChange::Replaced);
241 }
242
243 pub fn mark_appended(&mut self, path: &str, value: Value) {
245 match self.changes.get_mut(path) {
246 Some(FieldChange::Appended(values)) => {
247 values.push(value);
249 }
250 Some(FieldChange::Replaced) => {
251 }
254 None => {
255 self.changes
257 .insert(path.to_string(), FieldChange::Appended(vec![value]));
258 }
259 }
260 }
261
262 pub fn is_empty(&self) -> bool {
264 self.changes.is_empty()
265 }
266
267 pub fn len(&self) -> usize {
269 self.changes.len()
270 }
271
272 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
274 self.changes.iter()
275 }
276
277 pub fn dirty_paths(&self) -> HashSet<String> {
279 self.changes.keys().cloned().collect()
280 }
281
282 pub fn into_changes(self) -> HashMap<String, FieldChange> {
284 self.changes
285 }
286
287 pub fn changes(&self) -> &HashMap<String, FieldChange> {
289 &self.changes
290 }
291
292 pub fn appended_paths(&self) -> Vec<String> {
294 self.changes
295 .iter()
296 .filter_map(|(path, change)| match change {
297 FieldChange::Appended(_) => Some(path.clone()),
298 FieldChange::Replaced => None,
299 })
300 .collect()
301 }
302}
303
304pub struct VmContext {
305 registers: Vec<RegisterValue>,
306 states: HashMap<u32, StateTable>,
307 pub instructions_executed: u64,
308 pub cache_hits: u64,
309 path_cache: HashMap<String, CompiledPath>,
310 pub pda_cache_hits: u64,
311 pub pda_cache_misses: u64,
312 pub pending_queue_size: u64,
313 resolver_requests: VecDeque<ResolverRequest>,
314 resolver_pending: HashMap<String, PendingResolverEntry>,
315 resolver_cache: LruCache<String, Value>,
316 pub resolver_cache_hits: u64,
317 pub resolver_cache_misses: u64,
318 current_context: Option<UpdateContext>,
319 warnings: Vec<String>,
320 last_pda_lookup_miss: Option<String>,
321 last_lookup_index_miss: Option<String>,
322 last_pda_registered: Option<String>,
323 last_lookup_index_keys: Vec<String>,
324 scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
325}
326
327#[derive(Debug)]
328pub struct LookupIndex {
329 index: std::sync::Mutex<LruCache<String, Value>>,
330}
331
332impl LookupIndex {
333 pub fn new() -> Self {
334 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
335 }
336
337 pub fn with_capacity(capacity: usize) -> Self {
338 LookupIndex {
339 index: std::sync::Mutex::new(LruCache::new(
340 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
341 )),
342 }
343 }
344
345 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
346 let key = value_to_cache_key(lookup_value);
347 self.index.lock().unwrap().get(&key).cloned()
348 }
349
350 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
351 let key = value_to_cache_key(&lookup_value);
352 self.index.lock().unwrap().put(key, primary_key);
353 }
354
355 pub fn len(&self) -> usize {
356 self.index.lock().unwrap().len()
357 }
358
359 pub fn is_empty(&self) -> bool {
360 self.index.lock().unwrap().is_empty()
361 }
362}
363
364impl Default for LookupIndex {
365 fn default() -> Self {
366 Self::new()
367 }
368}
369
370fn value_to_cache_key(value: &Value) -> String {
371 match value {
372 Value::String(s) => s.clone(),
373 Value::Number(n) => n.to_string(),
374 Value::Bool(b) => b.to_string(),
375 Value::Null => "null".to_string(),
376 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
377 }
378}
379
380fn resolver_type_key(resolver: &ResolverType) -> String {
381 match resolver {
382 ResolverType::Token => "token".to_string(),
383 ResolverType::Url(config) => match &config.url_source {
384 ast::UrlSource::FieldPath(path) => format!("url:{}", path),
385 ast::UrlSource::Template(parts) => {
386 let key: String = parts
387 .iter()
388 .map(|p| match p {
389 ast::UrlTemplatePart::Literal(s) => s.clone(),
390 ast::UrlTemplatePart::FieldRef(f) => format!("{{{}}}", f),
391 })
392 .collect();
393 format!("url:{}", key)
394 }
395 },
396 }
397}
398
399fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
400 format!(
401 "{}:{}",
402 resolver_type_key(resolver),
403 value_to_cache_key(input)
404 )
405}
406
407#[derive(Debug)]
408pub struct TemporalIndex {
409 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
410}
411
412impl Default for TemporalIndex {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
418impl TemporalIndex {
419 pub fn new() -> Self {
420 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
421 }
422
423 pub fn with_capacity(capacity: usize) -> Self {
424 TemporalIndex {
425 index: std::sync::Mutex::new(LruCache::new(
426 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
427 )),
428 }
429 }
430
431 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
432 let key = value_to_cache_key(lookup_value);
433 let mut cache = self.index.lock().unwrap();
434 if let Some(entries) = cache.get(&key) {
435 for i in (0..entries.len()).rev() {
436 if entries[i].1 <= timestamp {
437 return Some(entries[i].0.clone());
438 }
439 }
440 }
441 None
442 }
443
444 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
445 let key = value_to_cache_key(lookup_value);
446 let mut cache = self.index.lock().unwrap();
447 if let Some(entries) = cache.get(&key) {
448 if let Some(last) = entries.last() {
449 return Some(last.0.clone());
450 }
451 }
452 None
453 }
454
455 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
456 let key = value_to_cache_key(&lookup_value);
457 let mut cache = self.index.lock().unwrap();
458
459 let entries = cache.get_or_insert_mut(key, Vec::new);
460 entries.push((primary_key, timestamp));
461 entries.sort_by_key(|(_, ts)| *ts);
462
463 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
464 entries.retain(|(_, ts)| *ts >= cutoff);
465
466 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
467 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
468 entries.drain(0..excess);
469 }
470 }
471
472 pub fn len(&self) -> usize {
473 self.index.lock().unwrap().len()
474 }
475
476 pub fn is_empty(&self) -> bool {
477 self.index.lock().unwrap().is_empty()
478 }
479
480 pub fn total_entries(&self) -> usize {
481 self.index
482 .lock()
483 .unwrap()
484 .iter()
485 .map(|(_, entries)| entries.len())
486 .sum()
487 }
488
489 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
490 let mut cache = self.index.lock().unwrap();
491 let mut total_removed = 0;
492
493 for (_, entries) in cache.iter_mut() {
494 let original_len = entries.len();
495 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
496 total_removed += original_len - entries.len();
497 }
498
499 total_removed
500 }
501}
502
503#[derive(Debug)]
504pub struct PdaReverseLookup {
505 index: LruCache<String, String>,
507}
508
509impl PdaReverseLookup {
510 pub fn new(capacity: usize) -> Self {
511 PdaReverseLookup {
512 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
513 }
514 }
515
516 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
517 self.index.get(pda_address).cloned()
518 }
519
520 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
521 let evicted = if self.index.len() >= self.index.cap().get() {
522 self.index.peek_lru().map(|(k, _)| k.clone())
523 } else {
524 None
525 };
526
527 self.index.put(pda_address, seed_value);
528 evicted
529 }
530
531 pub fn len(&self) -> usize {
532 self.index.len()
533 }
534
535 pub fn is_empty(&self) -> bool {
536 self.index.is_empty()
537 }
538
539 pub fn contains(&self, pda_address: &str) -> bool {
540 self.index.peek(pda_address).is_some()
541 }
542}
543
544#[derive(Debug, Clone)]
546pub struct QueuedAccountUpdate {
547 pub pda_address: String,
548 pub account_type: String,
549 pub account_data: Value,
550 pub slot: u64,
551 pub write_version: u64,
552 pub signature: String,
553}
554
555#[derive(Debug, Clone)]
557pub struct PendingAccountUpdate {
558 pub account_type: String,
559 pub pda_address: String,
560 pub account_data: Value,
561 pub slot: u64,
562 pub write_version: u64,
563 pub signature: String,
564 pub queued_at: i64,
565}
566
567#[derive(Debug, Clone)]
569pub struct QueuedInstructionEvent {
570 pub pda_address: String,
571 pub event_type: String,
572 pub event_data: Value,
573 pub slot: u64,
574 pub signature: String,
575}
576
577#[derive(Debug, Clone)]
579pub struct PendingInstructionEvent {
580 pub event_type: String,
581 pub pda_address: String,
582 pub event_data: Value,
583 pub slot: u64,
584 pub signature: String,
585 pub queued_at: i64,
586}
587
588#[derive(Debug, Clone)]
589pub struct DeferredWhenOperation {
590 pub entity_name: String,
591 pub primary_key: Value,
592 pub field_path: String,
593 pub field_value: Value,
594 pub when_instruction: String,
595 pub signature: String,
596 pub slot: u64,
597 pub deferred_at: i64,
598 pub emit: bool,
599}
600
601#[derive(Debug, Clone)]
602pub struct ResolverRequest {
603 pub cache_key: String,
604 pub resolver: ResolverType,
605 pub input: Value,
606}
607
608#[derive(Debug, Clone)]
609pub struct ResolverTarget {
610 pub state_id: u32,
611 pub entity_name: String,
612 pub primary_key: Value,
613 pub extracts: Vec<ResolverExtractSpec>,
614}
615
616#[derive(Debug, Clone)]
617pub struct PendingResolverEntry {
618 pub resolver: ResolverType,
619 pub input: Value,
620 pub targets: Vec<ResolverTarget>,
621 pub queued_at: i64,
622}
623
624#[derive(Debug, Clone, PartialEq)]
625pub struct ScheduledCallback {
626 pub state_id: u32,
627 pub entity_name: String,
628 pub primary_key: Value,
629 pub resolver: ResolverType,
630 pub url_template: Option<Vec<ast::UrlTemplatePart>>,
631 pub input_value: Option<Value>,
632 pub input_path: Option<String>,
633 pub condition: Option<ast::ResolverCondition>,
634 pub strategy: ResolveStrategy,
635 pub extracts: Vec<ResolverExtractSpec>,
636 pub retry_count: u32,
637}
638
639impl PendingResolverEntry {
640 fn add_target(&mut self, target: ResolverTarget) {
641 if let Some(existing) = self.targets.iter_mut().find(|t| {
642 t.state_id == target.state_id
643 && t.entity_name == target.entity_name
644 && t.primary_key == target.primary_key
645 }) {
646 let mut seen = HashSet::new();
647 for extract in &existing.extracts {
648 seen.insert((extract.target_path.clone(), extract.source_path.clone()));
649 }
650
651 for extract in target.extracts {
652 let key = (extract.target_path.clone(), extract.source_path.clone());
653 if seen.insert(key) {
654 existing.extracts.push(extract);
655 }
656 }
657 } else {
658 self.targets.push(target);
659 }
660 }
661}
662
663#[derive(Debug, Clone)]
664pub struct PendingQueueStats {
665 pub total_updates: usize,
666 pub unique_pdas: usize,
667 pub oldest_age_seconds: i64,
668 pub largest_pda_queue_size: usize,
669 pub estimated_memory_bytes: usize,
670}
671
672#[derive(Debug, Clone, Default)]
673pub struct VmMemoryStats {
674 pub state_table_entity_count: usize,
675 pub state_table_max_entries: usize,
676 pub state_table_at_capacity: bool,
677 pub lookup_index_count: usize,
678 pub lookup_index_total_entries: usize,
679 pub temporal_index_count: usize,
680 pub temporal_index_total_entries: usize,
681 pub pda_reverse_lookup_count: usize,
682 pub pda_reverse_lookup_total_entries: usize,
683 pub version_tracker_entries: usize,
684 pub pending_queue_stats: Option<PendingQueueStats>,
685 pub path_cache_size: usize,
686}
687
688#[derive(Debug, Clone, Default)]
689pub struct CleanupResult {
690 pub pending_updates_removed: usize,
691 pub temporal_entries_removed: usize,
692}
693
694#[derive(Debug, Clone)]
695pub struct CapacityWarning {
696 pub current_entries: usize,
697 pub max_entries: usize,
698 pub entries_over_limit: usize,
699}
700
701#[derive(Debug, Clone)]
702pub struct StateTableConfig {
703 pub max_entries: usize,
704 pub max_array_length: usize,
705}
706
707impl Default for StateTableConfig {
708 fn default() -> Self {
709 Self {
710 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
711 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
712 }
713 }
714}
715
716#[derive(Debug)]
717pub struct VersionTracker {
718 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
719}
720
721impl VersionTracker {
722 pub fn new() -> Self {
723 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
724 }
725
726 pub fn with_capacity(capacity: usize) -> Self {
727 VersionTracker {
728 cache: std::sync::Mutex::new(LruCache::new(
729 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
730 )),
731 }
732 }
733
734 fn make_key(primary_key: &Value, event_type: &str) -> String {
735 format!("{}:{}", primary_key, event_type)
736 }
737
738 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
739 let key = Self::make_key(primary_key, event_type);
740 self.cache.lock().unwrap().get(&key).copied()
741 }
742
743 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
744 let key = Self::make_key(primary_key, event_type);
745 self.cache.lock().unwrap().put(key, (slot, ordering_value));
746 }
747
748 pub fn len(&self) -> usize {
749 self.cache.lock().unwrap().len()
750 }
751
752 pub fn is_empty(&self) -> bool {
753 self.cache.lock().unwrap().is_empty()
754 }
755}
756
757impl Default for VersionTracker {
758 fn default() -> Self {
759 Self::new()
760 }
761}
762
763#[derive(Debug)]
764pub struct StateTable {
765 pub data: DashMap<Value, Value>,
766 access_times: DashMap<Value, i64>,
767 pub lookup_indexes: HashMap<String, LookupIndex>,
768 pub temporal_indexes: HashMap<String, TemporalIndex>,
769 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
770 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
771 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
772 version_tracker: VersionTracker,
773 instruction_dedup_cache: VersionTracker,
774 config: StateTableConfig,
775 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
776 entity_name: String,
777 pub recent_tx_instructions:
778 std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
779 pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
780}
781
782impl StateTable {
783 pub fn is_at_capacity(&self) -> bool {
784 self.data.len() >= self.config.max_entries
785 }
786
787 pub fn entries_over_limit(&self) -> usize {
788 self.data.len().saturating_sub(self.config.max_entries)
789 }
790
791 pub fn max_array_length(&self) -> usize {
792 self.config.max_array_length
793 }
794
795 fn touch(&self, key: &Value) {
796 let now = std::time::SystemTime::now()
797 .duration_since(std::time::UNIX_EPOCH)
798 .unwrap()
799 .as_secs() as i64;
800 self.access_times.insert(key.clone(), now);
801 }
802
803 fn evict_lru(&self, count: usize) -> usize {
804 if count == 0 || self.data.is_empty() {
805 return 0;
806 }
807
808 let mut entries: Vec<(Value, i64)> = self
809 .access_times
810 .iter()
811 .map(|entry| (entry.key().clone(), *entry.value()))
812 .collect();
813
814 entries.sort_by_key(|(_, ts)| *ts);
815
816 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
817
818 let mut evicted = 0;
819 for key in to_evict {
820 self.data.remove(&key);
821 self.access_times.remove(&key);
822 evicted += 1;
823 }
824
825 #[cfg(feature = "otel")]
826 if evicted > 0 {
827 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
828 }
829
830 evicted
831 }
832
833 pub fn insert_with_eviction(&self, key: Value, value: Value) {
834 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
835 #[cfg(feature = "otel")]
836 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
837 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
838 self.evict_lru(to_evict.max(1));
839 }
840 self.data.insert(key.clone(), value);
841 self.touch(&key);
842 }
843
844 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
845 let result = self.data.get(key).map(|v| v.clone());
846 if result.is_some() {
847 self.touch(key);
848 }
849 result
850 }
851
852 pub fn is_fresh_update(
859 &self,
860 primary_key: &Value,
861 event_type: &str,
862 slot: u64,
863 ordering_value: u64,
864 ) -> bool {
865 let dominated = self
866 .version_tracker
867 .get(primary_key, event_type)
868 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
869 .unwrap_or(false);
870
871 if dominated {
872 return false;
873 }
874
875 self.version_tracker
876 .insert(primary_key, event_type, slot, ordering_value);
877 true
878 }
879
880 pub fn is_duplicate_instruction(
888 &self,
889 primary_key: &Value,
890 event_type: &str,
891 slot: u64,
892 txn_index: u64,
893 ) -> bool {
894 let is_duplicate = self
896 .instruction_dedup_cache
897 .get(primary_key, event_type)
898 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
899 .unwrap_or(false);
900
901 if is_duplicate {
902 return true;
903 }
904
905 self.instruction_dedup_cache
907 .insert(primary_key, event_type, slot, txn_index);
908 false
909 }
910}
911
912impl VmContext {
913 pub fn new() -> Self {
914 let mut vm = VmContext {
915 registers: vec![Value::Null; 256],
916 states: HashMap::new(),
917 instructions_executed: 0,
918 cache_hits: 0,
919 path_cache: HashMap::new(),
920 pda_cache_hits: 0,
921 pda_cache_misses: 0,
922 pending_queue_size: 0,
923 resolver_requests: VecDeque::new(),
924 resolver_pending: HashMap::new(),
925 resolver_cache: LruCache::new(
926 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
927 .expect("capacity must be > 0"),
928 ),
929 resolver_cache_hits: 0,
930 resolver_cache_misses: 0,
931 current_context: None,
932 warnings: Vec::new(),
933 last_pda_lookup_miss: None,
934 last_lookup_index_miss: None,
935 last_pda_registered: None,
936 last_lookup_index_keys: Vec::new(),
937 scheduled_callbacks: Vec::new(),
938 };
939 vm.states.insert(
940 0,
941 StateTable {
942 data: DashMap::new(),
943 access_times: DashMap::new(),
944 lookup_indexes: HashMap::new(),
945 temporal_indexes: HashMap::new(),
946 pda_reverse_lookups: HashMap::new(),
947 pending_updates: DashMap::new(),
948 pending_instruction_events: DashMap::new(),
949 version_tracker: VersionTracker::new(),
950 instruction_dedup_cache: VersionTracker::with_capacity(
951 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
952 ),
953 config: StateTableConfig::default(),
954 entity_name: "default".to_string(),
955 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
956 NonZeroUsize::new(1000).unwrap(),
957 )),
958 deferred_when_ops: DashMap::new(),
959 },
960 );
961 vm
962 }
963
964 pub fn new_with_config(state_config: StateTableConfig) -> Self {
965 let mut vm = VmContext {
966 registers: vec![Value::Null; 256],
967 states: HashMap::new(),
968 instructions_executed: 0,
969 cache_hits: 0,
970 path_cache: HashMap::new(),
971 pda_cache_hits: 0,
972 pda_cache_misses: 0,
973 pending_queue_size: 0,
974 resolver_requests: VecDeque::new(),
975 resolver_pending: HashMap::new(),
976 resolver_cache: LruCache::new(
977 NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
978 .expect("capacity must be > 0"),
979 ),
980 resolver_cache_hits: 0,
981 resolver_cache_misses: 0,
982 current_context: None,
983 warnings: Vec::new(),
984 last_pda_lookup_miss: None,
985 last_lookup_index_miss: None,
986 last_pda_registered: None,
987 last_lookup_index_keys: Vec::new(),
988 scheduled_callbacks: Vec::new(),
989 };
990 vm.states.insert(
991 0,
992 StateTable {
993 data: DashMap::new(),
994 access_times: DashMap::new(),
995 lookup_indexes: HashMap::new(),
996 temporal_indexes: HashMap::new(),
997 pda_reverse_lookups: HashMap::new(),
998 pending_updates: DashMap::new(),
999 pending_instruction_events: DashMap::new(),
1000 version_tracker: VersionTracker::new(),
1001 instruction_dedup_cache: VersionTracker::with_capacity(
1002 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1003 ),
1004 config: state_config,
1005 entity_name: "default".to_string(),
1006 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1007 NonZeroUsize::new(1000).unwrap(),
1008 )),
1009 deferred_when_ops: DashMap::new(),
1010 },
1011 );
1012 vm
1013 }
1014
1015 pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1016 self.resolver_requests.drain(..).collect()
1017 }
1018
1019 pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
1020 std::mem::take(&mut self.scheduled_callbacks)
1021 }
1022
1023 pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1024 self.states.get(&state_id)?.get_and_touch(key)
1025 }
1026
1027 pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1028 if requests.is_empty() {
1029 return;
1030 }
1031
1032 self.resolver_requests.extend(requests);
1033 }
1034
1035 pub fn apply_resolver_result(
1036 &mut self,
1037 bytecode: &MultiEntityBytecode,
1038 cache_key: &str,
1039 resolved_value: Value,
1040 ) -> Result<Vec<Mutation>> {
1041 self.resolver_cache
1042 .put(cache_key.to_string(), resolved_value.clone());
1043
1044 let entry = match self.resolver_pending.remove(cache_key) {
1045 Some(entry) => entry,
1046 None => return Ok(Vec::new()),
1047 };
1048
1049 let mut mutations = Vec::new();
1050
1051 for target in entry.targets {
1052 if target.primary_key.is_null() {
1053 continue;
1054 }
1055
1056 let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1057 Some(bc) => bc,
1058 None => continue,
1059 };
1060
1061 let state = match self.states.get(&target.state_id) {
1062 Some(s) => s,
1063 None => continue,
1064 };
1065
1066 let mut entity_state = state
1067 .get_and_touch(&target.primary_key)
1068 .unwrap_or_else(|| json!({}));
1069
1070 let mut dirty_tracker = DirtyTracker::new();
1071 let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1072
1073 Self::apply_resolver_extractions_to_value(
1074 &mut entity_state,
1075 &resolved_value,
1076 &target.extracts,
1077 &mut dirty_tracker,
1078 &should_emit,
1079 )?;
1080
1081 if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1082 let old_values: Vec<_> = entity_bytecode
1083 .computed_paths
1084 .iter()
1085 .map(|path| Self::get_value_at_path(&entity_state, path))
1086 .collect();
1087
1088 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1089 let context_timestamp = self
1090 .current_context
1091 .as_ref()
1092 .map(|c| c.timestamp())
1093 .unwrap_or_else(|| {
1094 std::time::SystemTime::now()
1095 .duration_since(std::time::UNIX_EPOCH)
1096 .unwrap()
1097 .as_secs() as i64
1098 });
1099 let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1100 if eval_result.is_ok() {
1101 for (path, old_value) in
1102 entity_bytecode.computed_paths.iter().zip(old_values.iter())
1103 {
1104 let new_value = Self::get_value_at_path(&entity_state, path);
1105 if new_value != *old_value && should_emit(path) {
1106 dirty_tracker.mark_replaced(path);
1107 }
1108 }
1109 }
1110 }
1111
1112 state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1113
1114 if dirty_tracker.is_empty() {
1115 continue;
1116 }
1117
1118 let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1119
1120 mutations.push(Mutation {
1121 export: target.entity_name.clone(),
1122 key: target.primary_key.clone(),
1123 patch,
1124 append: vec![],
1125 });
1126 }
1127
1128 Ok(mutations)
1129 }
1130
1131 pub fn enqueue_resolver_request(
1132 &mut self,
1133 cache_key: String,
1134 resolver: ResolverType,
1135 input: Value,
1136 target: ResolverTarget,
1137 ) {
1138 if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1139 entry.add_target(target);
1140 return;
1141 }
1142
1143 let queued_at = std::time::SystemTime::now()
1144 .duration_since(std::time::UNIX_EPOCH)
1145 .unwrap()
1146 .as_secs() as i64;
1147
1148 self.resolver_pending.insert(
1149 cache_key.clone(),
1150 PendingResolverEntry {
1151 resolver: resolver.clone(),
1152 input: input.clone(),
1153 targets: vec![target],
1154 queued_at,
1155 },
1156 );
1157
1158 self.resolver_requests.push_back(ResolverRequest {
1159 cache_key,
1160 resolver,
1161 input,
1162 });
1163 }
1164
1165 fn apply_resolver_extractions_to_value<F>(
1166 state: &mut Value,
1167 resolved_value: &Value,
1168 extracts: &[ResolverExtractSpec],
1169 dirty_tracker: &mut DirtyTracker,
1170 should_emit: &F,
1171 ) -> Result<()>
1172 where
1173 F: Fn(&str) -> bool,
1174 {
1175 for extract in extracts {
1176 let resolved = match extract.source_path.as_deref() {
1177 Some(path) => Self::get_value_at_path(resolved_value, path),
1178 None => Some(resolved_value.clone()),
1179 };
1180
1181 let Some(value) = resolved else {
1182 continue;
1183 };
1184
1185 let value = if let Some(transform) = &extract.transform {
1186 Self::apply_transformation(&value, transform)?
1187 } else {
1188 value
1189 };
1190
1191 Self::set_nested_field_value(state, &extract.target_path, value)?;
1192 if should_emit(&extract.target_path) {
1193 dirty_tracker.mark_replaced(&extract.target_path);
1194 }
1195 }
1196
1197 Ok(())
1198 }
1199
1200 fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1201 if tracker.is_empty() {
1202 return Ok(json!({}));
1203 }
1204
1205 let mut partial = serde_json::Map::new();
1206
1207 for (path, change) in tracker.iter() {
1208 let segments: Vec<&str> = path.split('.').collect();
1209
1210 let value_to_insert = match change {
1211 FieldChange::Replaced => {
1212 let mut current = state;
1213 let mut found = true;
1214
1215 for segment in &segments {
1216 match current.get(*segment) {
1217 Some(v) => current = v,
1218 None => {
1219 found = false;
1220 break;
1221 }
1222 }
1223 }
1224
1225 if !found {
1226 continue;
1227 }
1228 current.clone()
1229 }
1230 FieldChange::Appended(values) => Value::Array(values.clone()),
1231 };
1232
1233 let mut target = &mut partial;
1234 for (i, segment) in segments.iter().enumerate() {
1235 if i == segments.len() - 1 {
1236 target.insert(segment.to_string(), value_to_insert.clone());
1237 } else {
1238 target
1239 .entry(segment.to_string())
1240 .or_insert_with(|| json!({}));
1241 target = target
1242 .get_mut(*segment)
1243 .and_then(|v| v.as_object_mut())
1244 .ok_or("Failed to build nested structure")?;
1245 }
1246 }
1247 }
1248
1249 Ok(Value::Object(partial))
1250 }
1251
1252 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
1255 self.states.get_mut(&state_id)
1256 }
1257
1258 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
1260 &mut self.registers
1261 }
1262
1263 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
1265 &self.path_cache
1266 }
1267
1268 pub fn current_context(&self) -> Option<&UpdateContext> {
1270 self.current_context.as_ref()
1271 }
1272
1273 pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
1275 self.current_context = context;
1276 }
1277
1278 fn add_warning(&mut self, msg: String) {
1279 self.warnings.push(msg);
1280 }
1281
1282 pub fn take_warnings(&mut self) -> Vec<String> {
1283 std::mem::take(&mut self.warnings)
1284 }
1285
1286 pub fn has_warnings(&self) -> bool {
1287 !self.warnings.is_empty()
1288 }
1289
1290 pub fn update_state_from_register(
1291 &mut self,
1292 state_id: u32,
1293 key: Value,
1294 register: Register,
1295 ) -> Result<()> {
1296 let state = self.states.get(&state_id).ok_or("State table not found")?;
1297 let value = self.registers[register].clone();
1298 state.insert_with_eviction(key, value);
1299 Ok(())
1300 }
1301
1302 fn reset_registers(&mut self) {
1303 for reg in &mut self.registers {
1304 *reg = Value::Null;
1305 }
1306 }
1307
1308 pub fn extract_partial_state(
1310 &self,
1311 state_reg: Register,
1312 dirty_fields: &HashSet<String>,
1313 ) -> Result<Value> {
1314 let full_state = &self.registers[state_reg];
1315
1316 if dirty_fields.is_empty() {
1317 return Ok(json!({}));
1318 }
1319
1320 let mut partial = serde_json::Map::new();
1321
1322 for path in dirty_fields {
1323 let segments: Vec<&str> = path.split('.').collect();
1324
1325 let mut current = full_state;
1326 let mut found = true;
1327
1328 for segment in &segments {
1329 match current.get(segment) {
1330 Some(v) => current = v,
1331 None => {
1332 found = false;
1333 break;
1334 }
1335 }
1336 }
1337
1338 if !found {
1339 continue;
1340 }
1341
1342 let mut target = &mut partial;
1343 for (i, segment) in segments.iter().enumerate() {
1344 if i == segments.len() - 1 {
1345 target.insert(segment.to_string(), current.clone());
1346 } else {
1347 target
1348 .entry(segment.to_string())
1349 .or_insert_with(|| json!({}));
1350 target = target
1351 .get_mut(*segment)
1352 .and_then(|v| v.as_object_mut())
1353 .ok_or("Failed to build nested structure")?;
1354 }
1355 }
1356 }
1357
1358 Ok(Value::Object(partial))
1359 }
1360
1361 pub fn extract_partial_state_with_tracker(
1365 &self,
1366 state_reg: Register,
1367 tracker: &DirtyTracker,
1368 ) -> Result<Value> {
1369 let full_state = &self.registers[state_reg];
1370
1371 if tracker.is_empty() {
1372 return Ok(json!({}));
1373 }
1374
1375 let mut partial = serde_json::Map::new();
1376
1377 for (path, change) in tracker.iter() {
1378 let segments: Vec<&str> = path.split('.').collect();
1379
1380 let value_to_insert = match change {
1381 FieldChange::Replaced => {
1382 let mut current = full_state;
1383 let mut found = true;
1384
1385 for segment in &segments {
1386 match current.get(*segment) {
1387 Some(v) => current = v,
1388 None => {
1389 found = false;
1390 break;
1391 }
1392 }
1393 }
1394
1395 if !found {
1396 continue;
1397 }
1398 current.clone()
1399 }
1400 FieldChange::Appended(values) => Value::Array(values.clone()),
1401 };
1402
1403 let mut target = &mut partial;
1404 for (i, segment) in segments.iter().enumerate() {
1405 if i == segments.len() - 1 {
1406 target.insert(segment.to_string(), value_to_insert.clone());
1407 } else {
1408 target
1409 .entry(segment.to_string())
1410 .or_insert_with(|| json!({}));
1411 target = target
1412 .get_mut(*segment)
1413 .and_then(|v| v.as_object_mut())
1414 .ok_or("Failed to build nested structure")?;
1415 }
1416 }
1417 }
1418
1419 Ok(Value::Object(partial))
1420 }
1421
1422 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1423 if let Some(compiled) = self.path_cache.get(path) {
1424 self.cache_hits += 1;
1425 #[cfg(feature = "otel")]
1426 crate::vm_metrics::record_path_cache_hit();
1427 return compiled.clone();
1428 }
1429 #[cfg(feature = "otel")]
1430 crate::vm_metrics::record_path_cache_miss();
1431 let compiled = CompiledPath::new(path);
1432 self.path_cache.insert(path.to_string(), compiled.clone());
1433 compiled
1434 }
1435
1436 #[cfg_attr(feature = "otel", instrument(
1438 name = "vm.process_event",
1439 skip(self, bytecode, event_value, log),
1440 level = "info",
1441 fields(
1442 event_type = %event_type,
1443 slot = context.as_ref().and_then(|c| c.slot),
1444 )
1445 ))]
1446 pub fn process_event(
1447 &mut self,
1448 bytecode: &MultiEntityBytecode,
1449 event_value: Value,
1450 event_type: &str,
1451 context: Option<&UpdateContext>,
1452 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1453 ) -> Result<Vec<Mutation>> {
1454 self.current_context = context.cloned();
1455
1456 let mut event_value = event_value;
1457 if let Some(ctx) = context {
1458 if let Some(obj) = event_value.as_object_mut() {
1459 obj.insert("__update_context".to_string(), ctx.to_value());
1460 }
1461 }
1462
1463 let mut all_mutations = Vec::new();
1464
1465 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1466 if let Some(ctx) = context {
1467 if let Some(signature) = ctx.signature.clone() {
1468 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1469 for state_id in state_ids {
1470 if let Some(state) = self.states.get(&state_id) {
1471 {
1472 let mut cache = state.recent_tx_instructions.lock().unwrap();
1473 let entry =
1474 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1475 entry.insert(event_type.to_string());
1476 }
1477
1478 let key = (signature.clone(), event_type.to_string());
1479 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1480 tracing::debug!(
1481 event_type = %event_type,
1482 signature = %signature,
1483 deferred_count = deferred_ops.len(),
1484 "flushing deferred when-ops"
1485 );
1486 for op in deferred_ops {
1487 match self.apply_deferred_when_op(state_id, &op) {
1488 Ok(mutations) => all_mutations.extend(mutations),
1489 Err(e) => tracing::warn!(
1490 "Failed to apply deferred when-op: {}",
1491 e
1492 ),
1493 }
1494 }
1495 }
1496 }
1497 }
1498 }
1499 }
1500 }
1501
1502 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1503 for entity_name in entity_names {
1504 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1505 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1506 if let Some(ref mut log) = log {
1507 log.set("entity", entity_name.clone());
1508 log.inc("handlers", 1);
1509 }
1510
1511 let opcodes_before = self.instructions_executed;
1512 let cache_before = self.cache_hits;
1513 let pda_hits_before = self.pda_cache_hits;
1514 let pda_misses_before = self.pda_cache_misses;
1515
1516 let mutations = self.execute_handler(
1517 handler,
1518 &event_value,
1519 event_type,
1520 entity_bytecode.state_id,
1521 entity_name,
1522 entity_bytecode.computed_fields_evaluator.as_ref(),
1523 Some(&entity_bytecode.non_emitted_fields),
1524 )?;
1525
1526 if let Some(ref mut log) = log {
1527 log.inc(
1528 "opcodes",
1529 (self.instructions_executed - opcodes_before) as i64,
1530 );
1531 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1532 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1533 log.inc(
1534 "pda_misses",
1535 (self.pda_cache_misses - pda_misses_before) as i64,
1536 );
1537 }
1538
1539 if mutations.is_empty() {
1540 let is_tx_event =
1543 event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1544 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1545 if is_tx_event {
1546 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1547 let signature = context
1548 .and_then(|c| c.signature.clone())
1549 .unwrap_or_default();
1550 let _ = self.queue_instruction_event(
1551 entity_bytecode.state_id,
1552 QueuedInstructionEvent {
1553 pda_address: missed_pda,
1554 event_type: event_type.to_string(),
1555 event_data: event_value.clone(),
1556 slot,
1557 signature,
1558 },
1559 );
1560 } else {
1561 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1566 let signature = context
1567 .and_then(|c| c.signature.clone())
1568 .unwrap_or_default();
1569 if let Some(write_version) =
1570 context.and_then(|c| c.write_version)
1571 {
1572 let _ = self.queue_account_update(
1573 entity_bytecode.state_id,
1574 QueuedAccountUpdate {
1575 pda_address: missed_pda,
1576 account_type: event_type.to_string(),
1577 account_data: event_value.clone(),
1578 slot,
1579 write_version,
1580 signature,
1581 },
1582 );
1583 } else {
1584 tracing::warn!(
1585 event_type = %event_type,
1586 "Dropping queued account update: write_version missing from context"
1587 );
1588 }
1589 }
1590 }
1591 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1592 if !is_tx_event {
1593 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1594 let signature = context
1595 .and_then(|c| c.signature.clone())
1596 .unwrap_or_default();
1597 if let Some(write_version) =
1598 context.and_then(|c| c.write_version)
1599 {
1600 let _ = self.queue_account_update(
1601 entity_bytecode.state_id,
1602 QueuedAccountUpdate {
1603 pda_address: missed_lookup,
1604 account_type: event_type.to_string(),
1605 account_data: event_value.clone(),
1606 slot,
1607 write_version,
1608 signature,
1609 },
1610 );
1611 } else {
1612 tracing::trace!(
1613 event_type = %event_type,
1614 "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1615 );
1616 }
1617 }
1618 }
1619 }
1620
1621 all_mutations.extend(mutations);
1622
1623 if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1624 if let Some(ctx) = context {
1625 if let Some(ref signature) = ctx.signature {
1626 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1627 {
1628 {
1629 let mut cache =
1630 state.recent_tx_instructions.lock().unwrap();
1631 let entry = cache
1632 .get_or_insert_mut(signature.clone(), HashSet::new);
1633 entry.insert(event_type.to_string());
1634 }
1635
1636 let key = (signature.clone(), event_type.to_string());
1637 if let Some((_, deferred_ops)) =
1638 state.deferred_when_ops.remove(&key)
1639 {
1640 tracing::debug!(
1641 event_type = %event_type,
1642 signature = %signature,
1643 deferred_count = deferred_ops.len(),
1644 "flushing deferred when-ops"
1645 );
1646 for op in deferred_ops {
1647 match self.apply_deferred_when_op(
1648 entity_bytecode.state_id,
1649 &op,
1650 ) {
1651 Ok(mutations) => {
1652 all_mutations.extend(mutations)
1653 }
1654 Err(e) => {
1655 tracing::warn!(
1656 "Failed to apply deferred when-op: {}",
1657 e
1658 );
1659 }
1660 }
1661 }
1662 }
1663 }
1664 }
1665 }
1666 }
1667
1668 if let Some(registered_pda) = self.take_last_pda_registered() {
1669 let pending_events = self.flush_pending_instruction_events(
1670 entity_bytecode.state_id,
1671 ®istered_pda,
1672 );
1673 for pending in pending_events {
1674 if let Some(pending_handler) =
1675 entity_bytecode.handlers.get(&pending.event_type)
1676 {
1677 if let Ok(reprocessed_mutations) = self.execute_handler(
1678 pending_handler,
1679 &pending.event_data,
1680 &pending.event_type,
1681 entity_bytecode.state_id,
1682 entity_name,
1683 entity_bytecode.computed_fields_evaluator.as_ref(),
1684 Some(&entity_bytecode.non_emitted_fields),
1685 ) {
1686 all_mutations.extend(reprocessed_mutations);
1687 }
1688 }
1689 }
1690 }
1691
1692 let lookup_keys = self.take_last_lookup_index_keys();
1693 if !lookup_keys.is_empty() {
1694 tracing::info!(
1695 keys = ?lookup_keys,
1696 entity = %entity_name,
1697 "vm.process_event: flushing pending updates for lookup_keys"
1698 );
1699 }
1700 for lookup_key in lookup_keys {
1701 if let Ok(pending_updates) =
1702 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1703 {
1704 for pending in pending_updates {
1705 if let Some(pending_handler) =
1706 entity_bytecode.handlers.get(&pending.account_type)
1707 {
1708 self.current_context = Some(UpdateContext::new_account(
1709 pending.slot,
1710 pending.signature.clone(),
1711 pending.write_version,
1712 ));
1713 match self.execute_handler(
1714 pending_handler,
1715 &pending.account_data,
1716 &pending.account_type,
1717 entity_bytecode.state_id,
1718 entity_name,
1719 entity_bytecode.computed_fields_evaluator.as_ref(),
1720 Some(&entity_bytecode.non_emitted_fields),
1721 ) {
1722 Ok(reprocessed) => {
1723 all_mutations.extend(reprocessed);
1724 }
1725 Err(e) => {
1726 tracing::warn!(
1727 error = %e,
1728 account_type = %pending.account_type,
1729 "Flushed event reprocessing failed"
1730 );
1731 }
1732 }
1733 }
1734 }
1735 }
1736 }
1737 } else if let Some(ref mut log) = log {
1738 log.set("skip_reason", "no_handler");
1739 }
1740 } else if let Some(ref mut log) = log {
1741 log.set("skip_reason", "entity_not_found");
1742 }
1743 }
1744 } else if let Some(ref mut log) = log {
1745 log.set("skip_reason", "no_event_routing");
1746 }
1747
1748 if let Some(log) = log {
1749 log.set("mutations", all_mutations.len() as i64);
1750 if let Some(first) = all_mutations.first() {
1751 if let Some(key_str) = first.key.as_str() {
1752 log.set("primary_key", key_str);
1753 } else if let Some(key_num) = first.key.as_u64() {
1754 log.set("primary_key", key_num as i64);
1755 }
1756 }
1757 if let Some(state) = self.states.get(&0) {
1758 log.set("state_table_size", state.data.len() as i64);
1759 }
1760
1761 let warnings = self.take_warnings();
1762 if !warnings.is_empty() {
1763 log.set("warnings", warnings.len() as i64);
1764 log.set(
1765 "warning_messages",
1766 Value::Array(warnings.into_iter().map(Value::String).collect()),
1767 );
1768 log.set_level(crate::canonical_log::LogLevel::Warn);
1769 }
1770 } else {
1771 self.warnings.clear();
1772 }
1773
1774 if self.instructions_executed.is_multiple_of(1000) {
1775 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1776 for state_id in state_ids {
1777 let expired = self.cleanup_expired_when_ops(state_id, 60);
1778 if expired > 0 {
1779 tracing::debug!(
1780 "Cleaned up {} expired deferred when-ops for state {}",
1781 expired,
1782 state_id
1783 );
1784 }
1785 }
1786 }
1787
1788 Ok(all_mutations)
1789 }
1790
1791 pub fn process_any(
1792 &mut self,
1793 bytecode: &MultiEntityBytecode,
1794 any: prost_types::Any,
1795 ) -> Result<Vec<Mutation>> {
1796 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1797 self.process_event(bytecode, event_value, &event_type, None, None)
1798 }
1799
1800 #[cfg_attr(feature = "otel", instrument(
1801 name = "vm.execute_handler",
1802 skip(self, handler, event_value, entity_evaluator),
1803 level = "debug",
1804 fields(
1805 event_type = %event_type,
1806 handler_opcodes = handler.len(),
1807 )
1808 ))]
1809 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1810 fn execute_handler(
1811 &mut self,
1812 handler: &[OpCode],
1813 event_value: &Value,
1814 event_type: &str,
1815 override_state_id: u32,
1816 entity_name: &str,
1817 entity_evaluator: Option<
1818 &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1819 >,
1820 non_emitted_fields: Option<&HashSet<String>>,
1821 ) -> Result<Vec<Mutation>> {
1822 self.reset_registers();
1823 self.last_pda_lookup_miss = None;
1824
1825 let mut pc: usize = 0;
1826 let mut output = Vec::new();
1827 let mut dirty_tracker = DirtyTracker::new();
1828 let should_emit = |path: &str| {
1829 non_emitted_fields
1830 .map(|fields| !fields.contains(path))
1831 .unwrap_or(true)
1832 };
1833
1834 while pc < handler.len() {
1835 match &handler[pc] {
1836 OpCode::LoadEventField {
1837 path,
1838 dest,
1839 default,
1840 } => {
1841 let value = self.load_field(event_value, path, default.as_ref())?;
1842 self.registers[*dest] = value;
1843 pc += 1;
1844 }
1845 OpCode::LoadConstant { value, dest } => {
1846 self.registers[*dest] = value.clone();
1847 pc += 1;
1848 }
1849 OpCode::CopyRegister { source, dest } => {
1850 self.registers[*dest] = self.registers[*source].clone();
1851 pc += 1;
1852 }
1853 OpCode::CopyRegisterIfNull { source, dest } => {
1854 if self.registers[*dest].is_null() {
1855 self.registers[*dest] = self.registers[*source].clone();
1856 }
1857 pc += 1;
1858 }
1859 OpCode::GetEventType { dest } => {
1860 self.registers[*dest] = json!(event_type);
1861 pc += 1;
1862 }
1863 OpCode::CreateObject { dest } => {
1864 self.registers[*dest] = json!({});
1865 pc += 1;
1866 }
1867 OpCode::SetField {
1868 object,
1869 path,
1870 value,
1871 } => {
1872 self.set_field_auto_vivify(*object, path, *value)?;
1873 if should_emit(path) {
1874 dirty_tracker.mark_replaced(path);
1875 }
1876 pc += 1;
1877 }
1878 OpCode::SetFields { object, fields } => {
1879 for (path, value_reg) in fields {
1880 self.set_field_auto_vivify(*object, path, *value_reg)?;
1881 if should_emit(path) {
1882 dirty_tracker.mark_replaced(path);
1883 }
1884 }
1885 pc += 1;
1886 }
1887 OpCode::GetField { object, path, dest } => {
1888 let value = self.get_field(*object, path)?;
1889 self.registers[*dest] = value;
1890 pc += 1;
1891 }
1892 OpCode::ReadOrInitState {
1893 state_id: _,
1894 key,
1895 default,
1896 dest,
1897 } => {
1898 let actual_state_id = override_state_id;
1899 let entity_name_owned = entity_name.to_string();
1900 self.states
1901 .entry(actual_state_id)
1902 .or_insert_with(|| StateTable {
1903 data: DashMap::new(),
1904 access_times: DashMap::new(),
1905 lookup_indexes: HashMap::new(),
1906 temporal_indexes: HashMap::new(),
1907 pda_reverse_lookups: HashMap::new(),
1908 pending_updates: DashMap::new(),
1909 pending_instruction_events: DashMap::new(),
1910 version_tracker: VersionTracker::new(),
1911 instruction_dedup_cache: VersionTracker::with_capacity(
1912 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1913 ),
1914 config: StateTableConfig::default(),
1915 entity_name: entity_name_owned,
1916 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1917 NonZeroUsize::new(1000).unwrap(),
1918 )),
1919 deferred_when_ops: DashMap::new(),
1920 });
1921 let key_value = self.registers[*key].clone();
1922 let warn_null_key = key_value.is_null()
1924 && event_type.ends_with("State")
1925 && !event_type.ends_with("IxState")
1926 && !event_type.ends_with("CpiEvent");
1927
1928 if warn_null_key {
1929 self.add_warning(format!(
1930 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1931 key, event_type
1932 ));
1933 }
1934
1935 let state = self
1936 .states
1937 .get(&actual_state_id)
1938 .ok_or("State table not found")?;
1939
1940 if !key_value.is_null() {
1941 if let Some(ctx) = &self.current_context {
1942 if ctx.is_account_update() {
1944 if let (Some(slot), Some(write_version)) =
1945 (ctx.slot, ctx.write_version)
1946 {
1947 if !state.is_fresh_update(
1948 &key_value,
1949 event_type,
1950 slot,
1951 write_version,
1952 ) {
1953 self.add_warning(format!(
1954 "Stale account update skipped: slot={}, write_version={}",
1955 slot, write_version
1956 ));
1957 return Ok(Vec::new());
1958 }
1959 }
1960 }
1961 else if ctx.is_instruction_update() {
1963 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1964 if state.is_duplicate_instruction(
1965 &key_value, event_type, slot, txn_index,
1966 ) {
1967 self.add_warning(format!(
1968 "Duplicate instruction skipped: slot={}, txn_index={}",
1969 slot, txn_index
1970 ));
1971 return Ok(Vec::new());
1972 }
1973 }
1974 }
1975 }
1976 }
1977 let value = state
1978 .get_and_touch(&key_value)
1979 .unwrap_or_else(|| default.clone());
1980
1981 self.registers[*dest] = value;
1982 pc += 1;
1983 }
1984 OpCode::UpdateState {
1985 state_id: _,
1986 key,
1987 value,
1988 } => {
1989 let actual_state_id = override_state_id;
1990 let state = self
1991 .states
1992 .get(&actual_state_id)
1993 .ok_or("State table not found")?;
1994 let key_value = self.registers[*key].clone();
1995 let value_data = self.registers[*value].clone();
1996
1997 state.insert_with_eviction(key_value, value_data);
1998 pc += 1;
1999 }
2000 OpCode::AppendToArray {
2001 object,
2002 path,
2003 value,
2004 } => {
2005 let appended_value = self.registers[*value].clone();
2006 let max_len = self
2007 .states
2008 .get(&override_state_id)
2009 .map(|s| s.max_array_length())
2010 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2011 self.append_to_array(*object, path, *value, max_len)?;
2012 if should_emit(path) {
2013 dirty_tracker.mark_appended(path, appended_value);
2014 }
2015 pc += 1;
2016 }
2017 OpCode::GetCurrentTimestamp { dest } => {
2018 let timestamp = std::time::SystemTime::now()
2019 .duration_since(std::time::UNIX_EPOCH)
2020 .unwrap()
2021 .as_secs() as i64;
2022 self.registers[*dest] = json!(timestamp);
2023 pc += 1;
2024 }
2025 OpCode::CreateEvent { dest, event_value } => {
2026 let timestamp = std::time::SystemTime::now()
2027 .duration_since(std::time::UNIX_EPOCH)
2028 .unwrap()
2029 .as_secs() as i64;
2030
2031 let mut event_data = self.registers[*event_value].clone();
2033 if let Some(obj) = event_data.as_object_mut() {
2034 obj.remove("__update_context");
2035 }
2036
2037 let mut event = serde_json::Map::new();
2039 event.insert("timestamp".to_string(), json!(timestamp));
2040 event.insert("data".to_string(), event_data);
2041
2042 if let Some(ref ctx) = self.current_context {
2044 if let Some(slot) = ctx.slot {
2045 event.insert("slot".to_string(), json!(slot));
2046 }
2047 if let Some(ref signature) = ctx.signature {
2048 event.insert("signature".to_string(), json!(signature));
2049 }
2050 }
2051
2052 self.registers[*dest] = Value::Object(event);
2053 pc += 1;
2054 }
2055 OpCode::CreateCapture {
2056 dest,
2057 capture_value,
2058 } => {
2059 let timestamp = std::time::SystemTime::now()
2060 .duration_since(std::time::UNIX_EPOCH)
2061 .unwrap()
2062 .as_secs() as i64;
2063
2064 let capture_data = self.registers[*capture_value].clone();
2066
2067 let account_address = event_value
2069 .get("__account_address")
2070 .and_then(|v| v.as_str())
2071 .unwrap_or("")
2072 .to_string();
2073
2074 let mut capture = serde_json::Map::new();
2076 capture.insert("timestamp".to_string(), json!(timestamp));
2077 capture.insert("account_address".to_string(), json!(account_address));
2078 capture.insert("data".to_string(), capture_data);
2079
2080 if let Some(ref ctx) = self.current_context {
2082 if let Some(slot) = ctx.slot {
2083 capture.insert("slot".to_string(), json!(slot));
2084 }
2085 if let Some(ref signature) = ctx.signature {
2086 capture.insert("signature".to_string(), json!(signature));
2087 }
2088 }
2089
2090 self.registers[*dest] = Value::Object(capture);
2091 pc += 1;
2092 }
2093 OpCode::Transform {
2094 source,
2095 dest,
2096 transformation,
2097 } => {
2098 if source == dest {
2099 self.transform_in_place(*source, transformation)?;
2100 } else {
2101 let source_value = &self.registers[*source];
2102 let value = Self::apply_transformation(source_value, transformation)?;
2103 self.registers[*dest] = value;
2104 }
2105 pc += 1;
2106 }
2107 OpCode::EmitMutation {
2108 entity_name,
2109 key,
2110 state,
2111 } => {
2112 let primary_key = self.registers[*key].clone();
2113
2114 if primary_key.is_null() || dirty_tracker.is_empty() {
2115 let reason = if dirty_tracker.is_empty() {
2116 "no_fields_modified"
2117 } else {
2118 "null_primary_key"
2119 };
2120 self.add_warning(format!(
2121 "Skipping mutation for entity '{}': {} (dirty_fields={})",
2122 entity_name,
2123 reason,
2124 dirty_tracker.len()
2125 ));
2126 } else {
2127 let patch =
2128 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2129
2130 let append = dirty_tracker.appended_paths();
2131 let mutation = Mutation {
2132 export: entity_name.clone(),
2133 key: primary_key,
2134 patch,
2135 append,
2136 };
2137 output.push(mutation);
2138 }
2139 pc += 1;
2140 }
2141 OpCode::SetFieldIfNull {
2142 object,
2143 path,
2144 value,
2145 } => {
2146 let was_set = self.set_field_if_null(*object, path, *value)?;
2147 if was_set && should_emit(path) {
2148 dirty_tracker.mark_replaced(path);
2149 }
2150 pc += 1;
2151 }
2152 OpCode::SetFieldMax {
2153 object,
2154 path,
2155 value,
2156 } => {
2157 let was_updated = self.set_field_max(*object, path, *value)?;
2158 if was_updated && should_emit(path) {
2159 dirty_tracker.mark_replaced(path);
2160 }
2161 pc += 1;
2162 }
2163 OpCode::UpdateTemporalIndex {
2164 state_id: _,
2165 index_name,
2166 lookup_value,
2167 primary_key,
2168 timestamp,
2169 } => {
2170 let actual_state_id = override_state_id;
2171 let state = self
2172 .states
2173 .get_mut(&actual_state_id)
2174 .ok_or("State table not found")?;
2175 let index = state
2176 .temporal_indexes
2177 .entry(index_name.clone())
2178 .or_insert_with(TemporalIndex::new);
2179
2180 let lookup_val = self.registers[*lookup_value].clone();
2181 let pk_val = self.registers[*primary_key].clone();
2182 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2183 val
2184 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2185 val as i64
2186 } else {
2187 return Err(format!(
2188 "Timestamp must be a number (i64 or u64), got: {:?}",
2189 self.registers[*timestamp]
2190 )
2191 .into());
2192 };
2193
2194 index.insert(lookup_val, pk_val, ts_val);
2195 pc += 1;
2196 }
2197 OpCode::LookupTemporalIndex {
2198 state_id: _,
2199 index_name,
2200 lookup_value,
2201 timestamp,
2202 dest,
2203 } => {
2204 let actual_state_id = override_state_id;
2205 let state = self
2206 .states
2207 .get(&actual_state_id)
2208 .ok_or("State table not found")?;
2209 let lookup_val = &self.registers[*lookup_value];
2210
2211 let result = if self.registers[*timestamp].is_null() {
2212 if let Some(index) = state.temporal_indexes.get(index_name) {
2213 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2214 } else {
2215 Value::Null
2216 }
2217 } else {
2218 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2219 val
2220 } else if let Some(val) = self.registers[*timestamp].as_u64() {
2221 val as i64
2222 } else {
2223 return Err(format!(
2224 "Timestamp must be a number (i64 or u64), got: {:?}",
2225 self.registers[*timestamp]
2226 )
2227 .into());
2228 };
2229
2230 if let Some(index) = state.temporal_indexes.get(index_name) {
2231 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2232 } else {
2233 Value::Null
2234 }
2235 };
2236
2237 self.registers[*dest] = result;
2238 pc += 1;
2239 }
2240 OpCode::UpdateLookupIndex {
2241 state_id: _,
2242 index_name,
2243 lookup_value,
2244 primary_key,
2245 } => {
2246 let actual_state_id = override_state_id;
2247 let state = self
2248 .states
2249 .get_mut(&actual_state_id)
2250 .ok_or("State table not found")?;
2251 let index = state
2252 .lookup_indexes
2253 .entry(index_name.clone())
2254 .or_insert_with(LookupIndex::new);
2255
2256 let lookup_val = self.registers[*lookup_value].clone();
2257 let pk_val = self.registers[*primary_key].clone();
2258
2259 index.insert(lookup_val.clone(), pk_val);
2260
2261 if let Some(key_str) = lookup_val.as_str() {
2263 self.last_lookup_index_keys.push(key_str.to_string());
2264 }
2265
2266 pc += 1;
2267 }
2268 OpCode::LookupIndex {
2269 state_id: _,
2270 index_name,
2271 lookup_value,
2272 dest,
2273 } => {
2274 let actual_state_id = override_state_id;
2275 let mut current_value = self.registers[*lookup_value].clone();
2276
2277 const MAX_CHAIN_DEPTH: usize = 5;
2278 let mut iterations = 0;
2279
2280 let final_result = if self.states.contains_key(&actual_state_id) {
2281 loop {
2282 iterations += 1;
2283 if iterations > MAX_CHAIN_DEPTH {
2284 break current_value;
2285 }
2286
2287 let resolved = self
2288 .states
2289 .get(&actual_state_id)
2290 .and_then(|state| {
2291 if let Some(index) = state.lookup_indexes.get(index_name) {
2292 if let Some(found) = index.lookup(¤t_value) {
2293 return Some(found);
2294 }
2295 }
2296
2297 for (name, index) in state.lookup_indexes.iter() {
2298 if name == index_name {
2299 continue;
2300 }
2301 if let Some(found) = index.lookup(¤t_value) {
2302 return Some(found);
2303 }
2304 }
2305
2306 None
2307 })
2308 .unwrap_or(Value::Null);
2309
2310 let mut resolved_from_pda = false;
2311 let resolved = if resolved.is_null() {
2312 if let Some(pda_str) = current_value.as_str() {
2313 resolved_from_pda = true;
2314 self.states
2315 .get_mut(&actual_state_id)
2316 .and_then(|state_mut| {
2317 state_mut
2318 .pda_reverse_lookups
2319 .get_mut("default_pda_lookup")
2320 })
2321 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2322 .map(Value::String)
2323 .unwrap_or(Value::Null)
2324 } else {
2325 Value::Null
2326 }
2327 } else {
2328 resolved
2329 };
2330
2331 if resolved.is_null() {
2332 if iterations == 1 {
2333 if let Some(pda_str) = current_value.as_str() {
2334 self.last_pda_lookup_miss = Some(pda_str.to_string());
2335 }
2336 }
2337 break Value::Null;
2338 }
2339
2340 let can_chain =
2341 self.can_resolve_further(&resolved, actual_state_id, index_name);
2342
2343 if !can_chain {
2344 if resolved_from_pda {
2345 if let Some(resolved_str) = resolved.as_str() {
2346 self.last_lookup_index_miss =
2347 Some(resolved_str.to_string());
2348 }
2349 break Value::Null;
2350 }
2351 break resolved;
2352 }
2353
2354 current_value = resolved;
2355 }
2356 } else {
2357 Value::Null
2358 };
2359
2360 self.registers[*dest] = final_result;
2361 pc += 1;
2362 }
2363 OpCode::SetFieldSum {
2364 object,
2365 path,
2366 value,
2367 } => {
2368 let was_updated = self.set_field_sum(*object, path, *value)?;
2369 if was_updated && should_emit(path) {
2370 dirty_tracker.mark_replaced(path);
2371 }
2372 pc += 1;
2373 }
2374 OpCode::SetFieldIncrement { object, path } => {
2375 let was_updated = self.set_field_increment(*object, path)?;
2376 if was_updated && should_emit(path) {
2377 dirty_tracker.mark_replaced(path);
2378 }
2379 pc += 1;
2380 }
2381 OpCode::SetFieldMin {
2382 object,
2383 path,
2384 value,
2385 } => {
2386 let was_updated = self.set_field_min(*object, path, *value)?;
2387 if was_updated && should_emit(path) {
2388 dirty_tracker.mark_replaced(path);
2389 }
2390 pc += 1;
2391 }
2392 OpCode::AddToUniqueSet {
2393 state_id: _,
2394 set_name,
2395 value,
2396 count_object,
2397 count_path,
2398 } => {
2399 let value_to_add = self.registers[*value].clone();
2400
2401 let set_field_path = format!("__unique_set:{}", set_name);
2404
2405 let mut set: HashSet<Value> =
2407 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2408 if !existing.is_null() {
2409 serde_json::from_value(existing).unwrap_or_default()
2410 } else {
2411 HashSet::new()
2412 }
2413 } else {
2414 HashSet::new()
2415 };
2416
2417 let was_new = set.insert(value_to_add);
2419
2420 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2422 self.registers[100] = serde_json::to_value(set_as_vec)?;
2423 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2424
2425 if was_new {
2427 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2428 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2429 if should_emit(count_path) {
2430 dirty_tracker.mark_replaced(count_path);
2431 }
2432 }
2433
2434 pc += 1;
2435 }
2436 OpCode::ConditionalSetField {
2437 object,
2438 path,
2439 value,
2440 condition_field,
2441 condition_op,
2442 condition_value,
2443 } => {
2444 let field_value = self.load_field(event_value, condition_field, None)?;
2445 let condition_met =
2446 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2447
2448 if condition_met {
2449 self.set_field_auto_vivify(*object, path, *value)?;
2450 if should_emit(path) {
2451 dirty_tracker.mark_replaced(path);
2452 }
2453 }
2454 pc += 1;
2455 }
2456 OpCode::SetFieldWhen {
2457 object,
2458 path,
2459 value,
2460 when_instruction,
2461 entity_name,
2462 key_reg,
2463 condition_field,
2464 condition_op,
2465 condition_value,
2466 } => {
2467 let actual_state_id = override_state_id;
2468 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2469 condition_field.as_ref(),
2470 condition_op.as_ref(),
2471 condition_value.as_ref(),
2472 ) {
2473 let field_value = self.load_field(event_value, field, None)?;
2474 self.evaluate_comparison(&field_value, op, cond_value)?
2475 } else {
2476 true
2477 };
2478
2479 if !condition_met {
2480 pc += 1;
2481 continue;
2482 }
2483
2484 let signature = self
2485 .current_context
2486 .as_ref()
2487 .and_then(|c| c.signature.clone())
2488 .unwrap_or_default();
2489
2490 let emit = should_emit(path);
2491
2492 let instruction_seen = if !signature.is_empty() {
2493 if let Some(state) = self.states.get(&actual_state_id) {
2494 let mut cache = state.recent_tx_instructions.lock().unwrap();
2495 cache
2496 .get(&signature)
2497 .map(|set| set.contains(when_instruction))
2498 .unwrap_or(false)
2499 } else {
2500 false
2501 }
2502 } else {
2503 false
2504 };
2505
2506 if instruction_seen {
2507 self.set_field_auto_vivify(*object, path, *value)?;
2508 if emit {
2509 dirty_tracker.mark_replaced(path);
2510 }
2511 } else if !signature.is_empty() {
2512 let deferred = DeferredWhenOperation {
2513 entity_name: entity_name.clone(),
2514 primary_key: self.registers[*key_reg].clone(),
2515 field_path: path.clone(),
2516 field_value: self.registers[*value].clone(),
2517 when_instruction: when_instruction.clone(),
2518 signature: signature.clone(),
2519 slot: self
2520 .current_context
2521 .as_ref()
2522 .and_then(|c| c.slot)
2523 .unwrap_or(0),
2524 deferred_at: std::time::SystemTime::now()
2525 .duration_since(std::time::UNIX_EPOCH)
2526 .unwrap()
2527 .as_secs() as i64,
2528 emit,
2529 };
2530
2531 if let Some(state) = self.states.get(&actual_state_id) {
2532 let key = (signature, when_instruction.clone());
2533 state
2534 .deferred_when_ops
2535 .entry(key)
2536 .or_insert_with(Vec::new)
2537 .push(deferred);
2538 }
2539 }
2540
2541 pc += 1;
2542 }
2543 OpCode::SetFieldUnlessStopped {
2544 object,
2545 path,
2546 value,
2547 stop_field,
2548 stop_instruction,
2549 entity_name,
2550 key_reg: _,
2551 } => {
2552 let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2553 let stopped = stop_value.as_bool().unwrap_or(false);
2554
2555 if stopped {
2556 tracing::debug!(
2557 entity = %entity_name,
2558 field = %path,
2559 stop_field = %stop_field,
2560 stop_instruction = %stop_instruction,
2561 "stop flag set; skipping field update"
2562 );
2563 pc += 1;
2564 continue;
2565 }
2566
2567 self.set_field_auto_vivify(*object, path, *value)?;
2568 if should_emit(path) {
2569 dirty_tracker.mark_replaced(path);
2570 }
2571 pc += 1;
2572 }
2573 OpCode::ConditionalIncrement {
2574 object,
2575 path,
2576 condition_field,
2577 condition_op,
2578 condition_value,
2579 } => {
2580 let field_value = self.load_field(event_value, condition_field, None)?;
2581 let condition_met =
2582 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2583
2584 if condition_met {
2585 let was_updated = self.set_field_increment(*object, path)?;
2586 if was_updated && should_emit(path) {
2587 dirty_tracker.mark_replaced(path);
2588 }
2589 }
2590 pc += 1;
2591 }
2592 OpCode::EvaluateComputedFields {
2593 state,
2594 computed_paths,
2595 } => {
2596 if let Some(evaluator) = entity_evaluator {
2597 let old_values: Vec<_> = computed_paths
2598 .iter()
2599 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2600 .collect();
2601
2602 let state_value = &mut self.registers[*state];
2603 let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2604 let context_timestamp = self
2605 .current_context
2606 .as_ref()
2607 .map(|c| c.timestamp())
2608 .unwrap_or_else(|| {
2609 std::time::SystemTime::now()
2610 .duration_since(std::time::UNIX_EPOCH)
2611 .unwrap()
2612 .as_secs() as i64
2613 });
2614 let eval_result = evaluator(state_value, context_slot, context_timestamp);
2615
2616 if eval_result.is_ok() {
2617 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2618 let new_value =
2619 Self::get_value_at_path(&self.registers[*state], path);
2620
2621 if new_value != *old_value && should_emit(path) {
2622 dirty_tracker.mark_replaced(path);
2623 }
2624 }
2625 }
2626 }
2627 pc += 1;
2628 }
2629 OpCode::QueueResolver {
2630 state_id: _,
2631 entity_name,
2632 resolver,
2633 input_path,
2634 input_value,
2635 url_template,
2636 strategy,
2637 extracts,
2638 condition,
2639 schedule_at,
2640 state,
2641 key,
2642 } => {
2643 let actual_state_id = override_state_id;
2644
2645 if let Some(cond) = condition {
2647 let field_val =
2648 Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2649 .unwrap_or(Value::Null);
2650 if !self.evaluate_comparison(&field_val, &cond.op, &cond.value)? {
2651 pc += 1;
2652 continue;
2653 }
2654 }
2655
2656 if let Some(schedule_path) = schedule_at {
2658 let target_val =
2659 Self::get_value_at_path(&self.registers[*state], schedule_path);
2660
2661 match target_val.and_then(|v| v.as_u64()) {
2662 Some(target_slot) => {
2663 let current_slot = self
2664 .current_context
2665 .as_ref()
2666 .and_then(|ctx| ctx.slot)
2667 .unwrap_or(0);
2668 if current_slot < target_slot {
2669 let key_value = &self.registers[*key];
2670 if !key_value.is_null() {
2671 self.scheduled_callbacks.push((
2672 target_slot,
2673 ScheduledCallback {
2674 state_id: actual_state_id,
2675 entity_name: entity_name.clone(),
2676 primary_key: key_value.clone(),
2677 resolver: resolver.clone(),
2678 url_template: url_template.clone(),
2679 input_value: input_value.clone(),
2680 input_path: input_path.clone(),
2681 condition: condition.clone(),
2682 strategy: strategy.clone(),
2683 extracts: extracts.clone(),
2684 retry_count: 0,
2685 },
2686 ));
2687 }
2688 pc += 1;
2689 continue;
2690 }
2691 }
2693 None => {
2694 tracing::warn!(
2698 schedule_at_path = %schedule_path,
2699 entity = %entity_name,
2700 "schedule_at field path is missing or not a valid u64 in state; \
2701 skipping resolver (state may not be fully populated yet)"
2702 );
2703 pc += 1;
2704 continue;
2705 }
2706 }
2707 }
2708
2709 let resolved_input = if let Some(template) = url_template {
2711 crate::scheduler::build_url_from_template(template, &self.registers[*state])
2712 .map(Value::String)
2713 } else if let Some(value) = input_value {
2714 Some(value.clone())
2715 } else if let Some(path) = input_path.as_ref() {
2716 Self::get_value_at_path(&self.registers[*state], path)
2717 } else {
2718 None
2719 };
2720
2721 if let Some(input) = resolved_input {
2722 let key_value = &self.registers[*key];
2723
2724 if input.is_null() || key_value.is_null() {
2725 tracing::warn!(
2726 entity = %entity_name,
2727 resolver = ?resolver,
2728 input_path = %input_path.as_deref().unwrap_or("<literal>"),
2729 input_is_null = input.is_null(),
2730 key_is_null = key_value.is_null(),
2731 input = ?input,
2732 key = ?key_value,
2733 "Resolver skipped: null input or key"
2734 );
2735 pc += 1;
2736 continue;
2737 }
2738
2739 if matches!(strategy, ResolveStrategy::SetOnce)
2740 && extracts.iter().all(|extract| {
2744 match Self::get_value_at_path(
2745 &self.registers[*state],
2746 &extract.target_path,
2747 ) {
2748 Some(value) => !value.is_null(),
2749 None => false,
2750 }
2751 })
2752 {
2753 pc += 1;
2754 continue;
2755 }
2756
2757 let cache_key = resolver_cache_key(resolver, &input);
2758
2759 let cached_value = self.resolver_cache.get(&cache_key).cloned();
2760 if let Some(cached) = cached_value {
2761 self.resolver_cache_hits += 1;
2762 Self::apply_resolver_extractions_to_value(
2763 &mut self.registers[*state],
2764 &cached,
2765 extracts,
2766 &mut dirty_tracker,
2767 &should_emit,
2768 )?;
2769 } else {
2770 self.resolver_cache_misses += 1;
2771 let target = ResolverTarget {
2772 state_id: actual_state_id,
2773 entity_name: entity_name.clone(),
2774 primary_key: self.registers[*key].clone(),
2775 extracts: extracts.clone(),
2776 };
2777
2778 self.enqueue_resolver_request(
2779 cache_key,
2780 resolver.clone(),
2781 input,
2782 target,
2783 );
2784 }
2785 } else {
2786 tracing::warn!(
2787 entity = %entity_name,
2788 resolver = ?resolver,
2789 input_path = %input_path.as_deref().unwrap_or("<literal>"),
2790 state = ?self.registers[*state],
2791 "Resolver skipped: input path not found in state"
2792 );
2793 }
2794
2795 pc += 1;
2796 }
2797 OpCode::UpdatePdaReverseLookup {
2798 state_id: _,
2799 lookup_name,
2800 pda_address,
2801 primary_key,
2802 } => {
2803 let actual_state_id = override_state_id;
2804 let state = self
2805 .states
2806 .get_mut(&actual_state_id)
2807 .ok_or("State table not found")?;
2808
2809 let pda_val = self.registers[*pda_address].clone();
2810 let pk_val = self.registers[*primary_key].clone();
2811
2812 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2813 let pda_lookup = state
2814 .pda_reverse_lookups
2815 .entry(lookup_name.clone())
2816 .or_insert_with(|| {
2817 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2818 });
2819
2820 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2821 self.last_pda_registered = Some(pda_str.to_string());
2822 } else if !pk_val.is_null() {
2823 if let Some(pk_num) = pk_val.as_u64() {
2824 if let Some(pda_str) = pda_val.as_str() {
2825 let pda_lookup = state
2826 .pda_reverse_lookups
2827 .entry(lookup_name.clone())
2828 .or_insert_with(|| {
2829 PdaReverseLookup::new(
2830 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2831 )
2832 });
2833
2834 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2835 self.last_pda_registered = Some(pda_str.to_string());
2836 }
2837 }
2838 }
2839
2840 pc += 1;
2841 }
2842 }
2843
2844 self.instructions_executed += 1;
2845 }
2846
2847 Ok(output)
2848 }
2849
2850 fn load_field(
2851 &self,
2852 event_value: &Value,
2853 path: &FieldPath,
2854 default: Option<&Value>,
2855 ) -> Result<Value> {
2856 if path.segments.is_empty() {
2857 if let Some(obj) = event_value.as_object() {
2858 let filtered: serde_json::Map<String, Value> = obj
2859 .iter()
2860 .filter(|(k, _)| !k.starts_with("__"))
2861 .map(|(k, v)| (k.clone(), v.clone()))
2862 .collect();
2863 return Ok(Value::Object(filtered));
2864 }
2865 return Ok(event_value.clone());
2866 }
2867
2868 let mut current = event_value;
2869 for segment in path.segments.iter() {
2870 current = match current.get(segment) {
2871 Some(v) => v,
2872 None => {
2873 tracing::trace!(
2874 "load_field: segment={:?} not found in {:?}, returning default",
2875 segment,
2876 current
2877 );
2878 return Ok(default.cloned().unwrap_or(Value::Null));
2879 }
2880 };
2881 }
2882
2883 tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
2884 Ok(current.clone())
2885 }
2886
2887 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2888 let mut current = value;
2889 for segment in path.split('.') {
2890 current = current.get(segment)?;
2891 }
2892 Some(current.clone())
2893 }
2894
2895 fn set_field_auto_vivify(
2896 &mut self,
2897 object_reg: Register,
2898 path: &str,
2899 value_reg: Register,
2900 ) -> Result<()> {
2901 let compiled = self.get_compiled_path(path);
2902 let segments = compiled.segments();
2903 let value = self.registers[value_reg].clone();
2904
2905 if !self.registers[object_reg].is_object() {
2906 self.registers[object_reg] = json!({});
2907 }
2908
2909 let obj = self.registers[object_reg]
2910 .as_object_mut()
2911 .ok_or("Not an object")?;
2912
2913 let mut current = obj;
2914 for (i, segment) in segments.iter().enumerate() {
2915 if i == segments.len() - 1 {
2916 current.insert(segment.to_string(), value);
2917 return Ok(());
2918 } else {
2919 current
2920 .entry(segment.to_string())
2921 .or_insert_with(|| json!({}));
2922 current = current
2923 .get_mut(segment)
2924 .and_then(|v| v.as_object_mut())
2925 .ok_or("Path collision: expected object")?;
2926 }
2927 }
2928
2929 Ok(())
2930 }
2931
2932 fn set_field_if_null(
2933 &mut self,
2934 object_reg: Register,
2935 path: &str,
2936 value_reg: Register,
2937 ) -> Result<bool> {
2938 let compiled = self.get_compiled_path(path);
2939 let segments = compiled.segments();
2940 let value = self.registers[value_reg].clone();
2941
2942 if value.is_null() {
2946 return Ok(false);
2947 }
2948
2949 if !self.registers[object_reg].is_object() {
2950 self.registers[object_reg] = json!({});
2951 }
2952
2953 let obj = self.registers[object_reg]
2954 .as_object_mut()
2955 .ok_or("Not an object")?;
2956
2957 let mut current = obj;
2958 for (i, segment) in segments.iter().enumerate() {
2959 if i == segments.len() - 1 {
2960 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2961 current.insert(segment.to_string(), value);
2962 return Ok(true);
2963 }
2964 return Ok(false);
2965 } else {
2966 current
2967 .entry(segment.to_string())
2968 .or_insert_with(|| json!({}));
2969 current = current
2970 .get_mut(segment)
2971 .and_then(|v| v.as_object_mut())
2972 .ok_or("Path collision: expected object")?;
2973 }
2974 }
2975
2976 Ok(false)
2977 }
2978
2979 fn set_field_max(
2980 &mut self,
2981 object_reg: Register,
2982 path: &str,
2983 value_reg: Register,
2984 ) -> Result<bool> {
2985 let compiled = self.get_compiled_path(path);
2986 let segments = compiled.segments();
2987 let new_value = self.registers[value_reg].clone();
2988
2989 if !self.registers[object_reg].is_object() {
2990 self.registers[object_reg] = json!({});
2991 }
2992
2993 let obj = self.registers[object_reg]
2994 .as_object_mut()
2995 .ok_or("Not an object")?;
2996
2997 let mut current = obj;
2998 for (i, segment) in segments.iter().enumerate() {
2999 if i == segments.len() - 1 {
3000 let should_update = if let Some(current_value) = current.get(segment) {
3001 if current_value.is_null() {
3002 true
3003 } else {
3004 match (current_value.as_i64(), new_value.as_i64()) {
3005 (Some(current_val), Some(new_val)) => new_val > current_val,
3006 (Some(current_val), None) if new_value.as_u64().is_some() => {
3007 new_value.as_u64().unwrap() as i64 > current_val
3008 }
3009 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3010 new_val > current_value.as_u64().unwrap() as i64
3011 }
3012 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3013 (Some(current_val), Some(new_val)) => new_val > current_val,
3014 _ => match (current_value.as_f64(), new_value.as_f64()) {
3015 (Some(current_val), Some(new_val)) => new_val > current_val,
3016 _ => false,
3017 },
3018 },
3019 _ => false,
3020 }
3021 }
3022 } else {
3023 true
3024 };
3025
3026 if should_update {
3027 current.insert(segment.to_string(), new_value);
3028 return Ok(true);
3029 }
3030 return Ok(false);
3031 } else {
3032 current
3033 .entry(segment.to_string())
3034 .or_insert_with(|| json!({}));
3035 current = current
3036 .get_mut(segment)
3037 .and_then(|v| v.as_object_mut())
3038 .ok_or("Path collision: expected object")?;
3039 }
3040 }
3041
3042 Ok(false)
3043 }
3044
3045 fn set_field_sum(
3046 &mut self,
3047 object_reg: Register,
3048 path: &str,
3049 value_reg: Register,
3050 ) -> Result<bool> {
3051 let compiled = self.get_compiled_path(path);
3052 let segments = compiled.segments();
3053 let new_value = &self.registers[value_reg];
3054
3055 tracing::trace!(
3057 "set_field_sum: path={:?}, value={:?}, value_type={}",
3058 path,
3059 new_value,
3060 match new_value {
3061 serde_json::Value::Null => "null",
3062 serde_json::Value::Bool(_) => "bool",
3063 serde_json::Value::Number(_) => "number",
3064 serde_json::Value::String(_) => "string",
3065 serde_json::Value::Array(_) => "array",
3066 serde_json::Value::Object(_) => "object",
3067 }
3068 );
3069 let new_val_num = new_value
3070 .as_i64()
3071 .or_else(|| new_value.as_u64().map(|n| n as i64))
3072 .ok_or("Sum requires numeric value")?;
3073
3074 if !self.registers[object_reg].is_object() {
3075 self.registers[object_reg] = json!({});
3076 }
3077
3078 let obj = self.registers[object_reg]
3079 .as_object_mut()
3080 .ok_or("Not an object")?;
3081
3082 let mut current = obj;
3083 for (i, segment) in segments.iter().enumerate() {
3084 if i == segments.len() - 1 {
3085 let current_val = current
3086 .get(segment)
3087 .and_then(|v| {
3088 if v.is_null() {
3089 None
3090 } else {
3091 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3092 }
3093 })
3094 .unwrap_or(0);
3095
3096 let sum = current_val + new_val_num;
3097 current.insert(segment.to_string(), json!(sum));
3098 return Ok(true);
3099 } else {
3100 current
3101 .entry(segment.to_string())
3102 .or_insert_with(|| json!({}));
3103 current = current
3104 .get_mut(segment)
3105 .and_then(|v| v.as_object_mut())
3106 .ok_or("Path collision: expected object")?;
3107 }
3108 }
3109
3110 Ok(false)
3111 }
3112
3113 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3114 let compiled = self.get_compiled_path(path);
3115 let segments = compiled.segments();
3116
3117 if !self.registers[object_reg].is_object() {
3118 self.registers[object_reg] = json!({});
3119 }
3120
3121 let obj = self.registers[object_reg]
3122 .as_object_mut()
3123 .ok_or("Not an object")?;
3124
3125 let mut current = obj;
3126 for (i, segment) in segments.iter().enumerate() {
3127 if i == segments.len() - 1 {
3128 let current_val = current
3130 .get(segment)
3131 .and_then(|v| {
3132 if v.is_null() {
3133 None
3134 } else {
3135 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3136 }
3137 })
3138 .unwrap_or(0);
3139
3140 let incremented = current_val + 1;
3141 current.insert(segment.to_string(), json!(incremented));
3142 return Ok(true);
3143 } else {
3144 current
3145 .entry(segment.to_string())
3146 .or_insert_with(|| json!({}));
3147 current = current
3148 .get_mut(segment)
3149 .and_then(|v| v.as_object_mut())
3150 .ok_or("Path collision: expected object")?;
3151 }
3152 }
3153
3154 Ok(false)
3155 }
3156
3157 fn set_field_min(
3158 &mut self,
3159 object_reg: Register,
3160 path: &str,
3161 value_reg: Register,
3162 ) -> Result<bool> {
3163 let compiled = self.get_compiled_path(path);
3164 let segments = compiled.segments();
3165 let new_value = self.registers[value_reg].clone();
3166
3167 if !self.registers[object_reg].is_object() {
3168 self.registers[object_reg] = json!({});
3169 }
3170
3171 let obj = self.registers[object_reg]
3172 .as_object_mut()
3173 .ok_or("Not an object")?;
3174
3175 let mut current = obj;
3176 for (i, segment) in segments.iter().enumerate() {
3177 if i == segments.len() - 1 {
3178 let should_update = if let Some(current_value) = current.get(segment) {
3179 if current_value.is_null() {
3180 true
3181 } else {
3182 match (current_value.as_i64(), new_value.as_i64()) {
3183 (Some(current_val), Some(new_val)) => new_val < current_val,
3184 (Some(current_val), None) if new_value.as_u64().is_some() => {
3185 (new_value.as_u64().unwrap() as i64) < current_val
3186 }
3187 (None, Some(new_val)) if current_value.as_u64().is_some() => {
3188 new_val < current_value.as_u64().unwrap() as i64
3189 }
3190 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3191 (Some(current_val), Some(new_val)) => new_val < current_val,
3192 _ => match (current_value.as_f64(), new_value.as_f64()) {
3193 (Some(current_val), Some(new_val)) => new_val < current_val,
3194 _ => false,
3195 },
3196 },
3197 _ => false,
3198 }
3199 }
3200 } else {
3201 true
3202 };
3203
3204 if should_update {
3205 current.insert(segment.to_string(), new_value);
3206 return Ok(true);
3207 }
3208 return Ok(false);
3209 } else {
3210 current
3211 .entry(segment.to_string())
3212 .or_insert_with(|| json!({}));
3213 current = current
3214 .get_mut(segment)
3215 .and_then(|v| v.as_object_mut())
3216 .ok_or("Path collision: expected object")?;
3217 }
3218 }
3219
3220 Ok(false)
3221 }
3222
3223 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3224 let compiled = self.get_compiled_path(path);
3225 let segments = compiled.segments();
3226 let mut current = &self.registers[object_reg];
3227
3228 for segment in segments {
3229 current = current
3230 .get(segment)
3231 .ok_or_else(|| format!("Field not found: {}", segment))?;
3232 }
3233
3234 Ok(current.clone())
3235 }
3236
3237 fn append_to_array(
3238 &mut self,
3239 object_reg: Register,
3240 path: &str,
3241 value_reg: Register,
3242 max_length: usize,
3243 ) -> Result<()> {
3244 let compiled = self.get_compiled_path(path);
3245 let segments = compiled.segments();
3246 let value = self.registers[value_reg].clone();
3247
3248 if !self.registers[object_reg].is_object() {
3249 self.registers[object_reg] = json!({});
3250 }
3251
3252 let obj = self.registers[object_reg]
3253 .as_object_mut()
3254 .ok_or("Not an object")?;
3255
3256 let mut current = obj;
3257 for (i, segment) in segments.iter().enumerate() {
3258 if i == segments.len() - 1 {
3259 current
3260 .entry(segment.to_string())
3261 .or_insert_with(|| json!([]));
3262 let arr = current
3263 .get_mut(segment)
3264 .and_then(|v| v.as_array_mut())
3265 .ok_or("Path is not an array")?;
3266 arr.push(value.clone());
3267
3268 if arr.len() > max_length {
3269 let excess = arr.len() - max_length;
3270 arr.drain(0..excess);
3271 }
3272 } else {
3273 current
3274 .entry(segment.to_string())
3275 .or_insert_with(|| json!({}));
3276 current = current
3277 .get_mut(segment)
3278 .and_then(|v| v.as_object_mut())
3279 .ok_or("Path collision: expected object")?;
3280 }
3281 }
3282
3283 Ok(())
3284 }
3285
3286 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3287 let value = &self.registers[reg];
3288 let transformed = Self::apply_transformation(value, transformation)?;
3289 self.registers[reg] = transformed;
3290 Ok(())
3291 }
3292
3293 fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3294 match transformation {
3295 Transformation::HexEncode => {
3296 if let Some(arr) = value.as_array() {
3297 let bytes: Vec<u8> = arr
3298 .iter()
3299 .filter_map(|v| v.as_u64().map(|n| n as u8))
3300 .collect();
3301 let hex = hex::encode(&bytes);
3302 Ok(json!(hex))
3303 } else if value.is_string() {
3304 Ok(value.clone())
3305 } else {
3306 Err("HexEncode requires an array of numbers".into())
3307 }
3308 }
3309 Transformation::HexDecode => {
3310 if let Some(s) = value.as_str() {
3311 let s = s.strip_prefix("0x").unwrap_or(s);
3312 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3313 Ok(json!(bytes))
3314 } else {
3315 Err("HexDecode requires a string".into())
3316 }
3317 }
3318 Transformation::Base58Encode => {
3319 if let Some(arr) = value.as_array() {
3320 let bytes: Vec<u8> = arr
3321 .iter()
3322 .filter_map(|v| v.as_u64().map(|n| n as u8))
3323 .collect();
3324 let encoded = bs58::encode(&bytes).into_string();
3325 Ok(json!(encoded))
3326 } else if value.is_string() {
3327 Ok(value.clone())
3328 } else {
3329 Err("Base58Encode requires an array of numbers".into())
3330 }
3331 }
3332 Transformation::Base58Decode => {
3333 if let Some(s) = value.as_str() {
3334 let bytes = bs58::decode(s)
3335 .into_vec()
3336 .map_err(|e| format!("Base58 decode error: {}", e))?;
3337 Ok(json!(bytes))
3338 } else {
3339 Err("Base58Decode requires a string".into())
3340 }
3341 }
3342 Transformation::ToString => Ok(json!(value.to_string())),
3343 Transformation::ToNumber => {
3344 if let Some(s) = value.as_str() {
3345 let n = s
3346 .parse::<i64>()
3347 .map_err(|e| format!("Parse error: {}", e))?;
3348 Ok(json!(n))
3349 } else {
3350 Ok(value.clone())
3351 }
3352 }
3353 }
3354 }
3355
3356 fn evaluate_comparison(
3357 &self,
3358 field_value: &Value,
3359 op: &ComparisonOp,
3360 condition_value: &Value,
3361 ) -> Result<bool> {
3362 use ComparisonOp::*;
3363
3364 match op {
3365 Equal => Ok(field_value == condition_value),
3366 NotEqual => Ok(field_value != condition_value),
3367 GreaterThan => {
3368 match (field_value.as_i64(), condition_value.as_i64()) {
3370 (Some(a), Some(b)) => Ok(a > b),
3371 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3372 (Some(a), Some(b)) => Ok(a > b),
3373 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3374 (Some(a), Some(b)) => Ok(a > b),
3375 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3376 },
3377 },
3378 }
3379 }
3380 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3381 (Some(a), Some(b)) => Ok(a >= b),
3382 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3383 (Some(a), Some(b)) => Ok(a >= b),
3384 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3385 (Some(a), Some(b)) => Ok(a >= b),
3386 _ => {
3387 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3388 }
3389 },
3390 },
3391 },
3392 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3393 (Some(a), Some(b)) => Ok(a < b),
3394 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3395 (Some(a), Some(b)) => Ok(a < b),
3396 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3397 (Some(a), Some(b)) => Ok(a < b),
3398 _ => Err("Cannot compare non-numeric values with LessThan".into()),
3399 },
3400 },
3401 },
3402 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3403 (Some(a), Some(b)) => Ok(a <= b),
3404 _ => match (field_value.as_u64(), condition_value.as_u64()) {
3405 (Some(a), Some(b)) => Ok(a <= b),
3406 _ => match (field_value.as_f64(), condition_value.as_f64()) {
3407 (Some(a), Some(b)) => Ok(a <= b),
3408 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3409 },
3410 },
3411 },
3412 }
3413 }
3414
3415 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3416 if let Some(state) = self.states.get(&state_id) {
3417 if let Some(index) = state.lookup_indexes.get(index_name) {
3418 if index.lookup(value).is_some() {
3419 return true;
3420 }
3421 }
3422
3423 for (name, index) in state.lookup_indexes.iter() {
3424 if name == index_name {
3425 continue;
3426 }
3427 if index.lookup(value).is_some() {
3428 return true;
3429 }
3430 }
3431
3432 if let Some(pda_str) = value.as_str() {
3433 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3434 if pda_lookup.contains(pda_str) {
3435 return true;
3436 }
3437 }
3438 }
3439 }
3440
3441 false
3442 }
3443
3444 fn apply_deferred_when_op(
3445 &mut self,
3446 state_id: u32,
3447 op: &DeferredWhenOperation,
3448 ) -> Result<Vec<Mutation>> {
3449 let state = self.states.get(&state_id).ok_or("State not found")?;
3450
3451 if op.primary_key.is_null() {
3452 return Ok(vec![]);
3453 }
3454
3455 let mut entity_state = state
3456 .get_and_touch(&op.primary_key)
3457 .unwrap_or_else(|| json!({}));
3458
3459 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3460
3461 state.insert_with_eviction(op.primary_key.clone(), entity_state);
3462
3463 if !op.emit {
3464 return Ok(vec![]);
3465 }
3466
3467 let mut patch = json!({});
3468 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3469
3470 Ok(vec![Mutation {
3471 export: op.entity_name.clone(),
3472 key: op.primary_key.clone(),
3473 patch,
3474 append: vec![],
3475 }])
3476 }
3477
3478 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3479 let parts: Vec<&str> = path.split('.').collect();
3480 let mut current = obj;
3481
3482 for (i, part) in parts.iter().enumerate() {
3483 if i == parts.len() - 1 {
3484 if let Some(map) = current.as_object_mut() {
3485 map.insert(part.to_string(), value);
3486 return Ok(());
3487 }
3488 return Err("Cannot set field on non-object".into());
3489 }
3490
3491 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3492 if let Some(map) = current.as_object_mut() {
3493 map.insert(part.to_string(), json!({}));
3494 }
3495 }
3496
3497 current = current.get_mut(*part).ok_or("Path navigation failed")?;
3498 }
3499
3500 Ok(())
3501 }
3502
3503 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3504 let now = std::time::SystemTime::now()
3505 .duration_since(std::time::UNIX_EPOCH)
3506 .unwrap()
3507 .as_secs() as i64;
3508
3509 let state = match self.states.get(&state_id) {
3510 Some(s) => s,
3511 None => return 0,
3512 };
3513
3514 let mut removed = 0;
3515 state.deferred_when_ops.retain(|_, ops| {
3516 let before = ops.len();
3517 ops.retain(|op| now - op.deferred_at < max_age_secs);
3518 removed += before - ops.len();
3519 !ops.is_empty()
3520 });
3521
3522 removed
3523 }
3524
3525 #[cfg_attr(feature = "otel", instrument(
3534 name = "vm.update_pda_lookup",
3535 skip(self),
3536 fields(
3537 pda = %pda_address,
3538 seed = %seed_value,
3539 )
3540 ))]
3541 pub fn update_pda_reverse_lookup(
3542 &mut self,
3543 state_id: u32,
3544 lookup_name: &str,
3545 pda_address: String,
3546 seed_value: String,
3547 ) -> Result<Vec<PendingAccountUpdate>> {
3548 let state = self
3549 .states
3550 .get_mut(&state_id)
3551 .ok_or("State table not found")?;
3552
3553 let lookup = state
3554 .pda_reverse_lookups
3555 .entry(lookup_name.to_string())
3556 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3557
3558 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
3559
3560 if let Some(ref evicted) = evicted_pda {
3561 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3562 let count = evicted_updates.len();
3563 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3564 }
3565 }
3566
3567 self.flush_pending_updates(state_id, &pda_address)
3569 }
3570
3571 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3576 let state = match self.states.get_mut(&state_id) {
3577 Some(s) => s,
3578 None => return 0,
3579 };
3580
3581 let now = std::time::SystemTime::now()
3582 .duration_since(std::time::UNIX_EPOCH)
3583 .unwrap()
3584 .as_secs() as i64;
3585
3586 let mut removed_count = 0;
3587
3588 state.pending_updates.retain(|_pda_address, updates| {
3590 let original_len = updates.len();
3591
3592 updates.retain(|update| {
3593 let age = now - update.queued_at;
3594 age <= PENDING_UPDATE_TTL_SECONDS
3595 });
3596
3597 removed_count += original_len - updates.len();
3598
3599 !updates.is_empty()
3601 });
3602
3603 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3605
3606 if removed_count > 0 {
3607 #[cfg(feature = "otel")]
3608 crate::vm_metrics::record_pending_updates_expired(
3609 removed_count as u64,
3610 &state.entity_name,
3611 );
3612 }
3613
3614 removed_count
3615 }
3616
3617 #[cfg_attr(feature = "otel", instrument(
3649 name = "vm.queue_account_update",
3650 skip(self, update),
3651 fields(
3652 pda = %update.pda_address,
3653 account_type = %update.account_type,
3654 slot = update.slot,
3655 )
3656 ))]
3657 pub fn queue_account_update(
3658 &mut self,
3659 state_id: u32,
3660 update: QueuedAccountUpdate,
3661 ) -> Result<()> {
3662 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3663 self.cleanup_expired_pending_updates(state_id);
3664 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3665 self.drop_oldest_pending_update(state_id)?;
3666 }
3667 }
3668
3669 let state = self
3670 .states
3671 .get_mut(&state_id)
3672 .ok_or("State table not found")?;
3673
3674 let pending = PendingAccountUpdate {
3675 account_type: update.account_type,
3676 pda_address: update.pda_address.clone(),
3677 account_data: update.account_data,
3678 slot: update.slot,
3679 write_version: update.write_version,
3680 signature: update.signature,
3681 queued_at: std::time::SystemTime::now()
3682 .duration_since(std::time::UNIX_EPOCH)
3683 .unwrap()
3684 .as_secs() as i64,
3685 };
3686
3687 let pda_address = pending.pda_address.clone();
3688 let slot = pending.slot;
3689
3690 let mut updates = state
3691 .pending_updates
3692 .entry(pda_address.clone())
3693 .or_insert_with(Vec::new);
3694
3695 let original_len = updates.len();
3696 updates.retain(|existing| existing.slot > slot);
3697 let removed_by_dedup = original_len - updates.len();
3698
3699 if removed_by_dedup > 0 {
3700 self.pending_queue_size = self
3701 .pending_queue_size
3702 .saturating_sub(removed_by_dedup as u64);
3703 }
3704
3705 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3706 updates.remove(0);
3707 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3708 }
3709
3710 updates.push(pending);
3711 #[cfg(feature = "otel")]
3712 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3713
3714 Ok(())
3715 }
3716
3717 pub fn queue_instruction_event(
3718 &mut self,
3719 state_id: u32,
3720 event: QueuedInstructionEvent,
3721 ) -> Result<()> {
3722 let state = self
3723 .states
3724 .get_mut(&state_id)
3725 .ok_or("State table not found")?;
3726
3727 let pda_address = event.pda_address.clone();
3728
3729 let pending = PendingInstructionEvent {
3730 event_type: event.event_type,
3731 pda_address: event.pda_address,
3732 event_data: event.event_data,
3733 slot: event.slot,
3734 signature: event.signature,
3735 queued_at: std::time::SystemTime::now()
3736 .duration_since(std::time::UNIX_EPOCH)
3737 .unwrap()
3738 .as_secs() as i64,
3739 };
3740
3741 let mut events = state
3742 .pending_instruction_events
3743 .entry(pda_address)
3744 .or_insert_with(Vec::new);
3745
3746 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3747 events.remove(0);
3748 }
3749
3750 events.push(pending);
3751
3752 Ok(())
3753 }
3754
3755 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3756 self.last_pda_lookup_miss.take()
3757 }
3758
3759 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3760 self.last_lookup_index_miss.take()
3761 }
3762
3763 pub fn take_last_pda_registered(&mut self) -> Option<String> {
3764 self.last_pda_registered.take()
3765 }
3766
3767 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3768 std::mem::take(&mut self.last_lookup_index_keys)
3769 }
3770
3771 pub fn flush_pending_instruction_events(
3772 &mut self,
3773 state_id: u32,
3774 pda_address: &str,
3775 ) -> Vec<PendingInstructionEvent> {
3776 let state = match self.states.get_mut(&state_id) {
3777 Some(s) => s,
3778 None => return Vec::new(),
3779 };
3780
3781 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3782 events
3783 } else {
3784 Vec::new()
3785 }
3786 }
3787
3788 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3790 let state = self.states.get(&state_id)?;
3791
3792 let now = std::time::SystemTime::now()
3793 .duration_since(std::time::UNIX_EPOCH)
3794 .unwrap()
3795 .as_secs() as i64;
3796
3797 let mut total_updates = 0;
3798 let mut oldest_timestamp = now;
3799 let mut largest_pda_queue = 0;
3800 let mut estimated_memory = 0;
3801
3802 for entry in state.pending_updates.iter() {
3803 let (_, updates) = entry.pair();
3804 total_updates += updates.len();
3805 largest_pda_queue = largest_pda_queue.max(updates.len());
3806
3807 for update in updates.iter() {
3808 oldest_timestamp = oldest_timestamp.min(update.queued_at);
3809 estimated_memory += update.account_type.len() +
3811 update.pda_address.len() +
3812 update.signature.len() +
3813 16 + estimate_json_size(&update.account_data);
3815 }
3816 }
3817
3818 Some(PendingQueueStats {
3819 total_updates,
3820 unique_pdas: state.pending_updates.len(),
3821 oldest_age_seconds: now - oldest_timestamp,
3822 largest_pda_queue_size: largest_pda_queue,
3823 estimated_memory_bytes: estimated_memory,
3824 })
3825 }
3826
3827 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3828 let mut stats = VmMemoryStats {
3829 path_cache_size: self.path_cache.len(),
3830 ..Default::default()
3831 };
3832
3833 if let Some(state) = self.states.get(&state_id) {
3834 stats.state_table_entity_count = state.data.len();
3835 stats.state_table_max_entries = state.config.max_entries;
3836 stats.state_table_at_capacity = state.is_at_capacity();
3837
3838 stats.lookup_index_count = state.lookup_indexes.len();
3839 stats.lookup_index_total_entries =
3840 state.lookup_indexes.values().map(|idx| idx.len()).sum();
3841
3842 stats.temporal_index_count = state.temporal_indexes.len();
3843 stats.temporal_index_total_entries = state
3844 .temporal_indexes
3845 .values()
3846 .map(|idx| idx.total_entries())
3847 .sum();
3848
3849 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3850 stats.pda_reverse_lookup_total_entries = state
3851 .pda_reverse_lookups
3852 .values()
3853 .map(|lookup| lookup.len())
3854 .sum();
3855
3856 stats.version_tracker_entries = state.version_tracker.len();
3857
3858 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3859 }
3860
3861 stats
3862 }
3863
3864 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3865 let pending_removed = self.cleanup_expired_pending_updates(state_id);
3866 let temporal_removed = self.cleanup_temporal_indexes(state_id);
3867
3868 #[cfg(feature = "otel")]
3869 if let Some(state) = self.states.get(&state_id) {
3870 crate::vm_metrics::record_cleanup(
3871 pending_removed,
3872 temporal_removed,
3873 &state.entity_name,
3874 );
3875 }
3876
3877 CleanupResult {
3878 pending_updates_removed: pending_removed,
3879 temporal_entries_removed: temporal_removed,
3880 }
3881 }
3882
3883 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3884 let state = match self.states.get_mut(&state_id) {
3885 Some(s) => s,
3886 None => return 0,
3887 };
3888
3889 let now = std::time::SystemTime::now()
3890 .duration_since(std::time::UNIX_EPOCH)
3891 .unwrap()
3892 .as_secs() as i64;
3893
3894 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3895 let mut total_removed = 0;
3896
3897 for (_, index) in state.temporal_indexes.iter_mut() {
3898 total_removed += index.cleanup_expired(cutoff);
3899 }
3900
3901 total_removed
3902 }
3903
3904 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3905 let state = self.states.get(&state_id)?;
3906
3907 if state.is_at_capacity() {
3908 Some(CapacityWarning {
3909 current_entries: state.data.len(),
3910 max_entries: state.config.max_entries,
3911 entries_over_limit: state.entries_over_limit(),
3912 })
3913 } else {
3914 None
3915 }
3916 }
3917
3918 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3920 let state = self
3921 .states
3922 .get_mut(&state_id)
3923 .ok_or("State table not found")?;
3924
3925 let mut oldest_pda: Option<String> = None;
3926 let mut oldest_timestamp = i64::MAX;
3927
3928 for entry in state.pending_updates.iter() {
3930 let (pda, updates) = entry.pair();
3931 if let Some(update) = updates.first() {
3932 if update.queued_at < oldest_timestamp {
3933 oldest_timestamp = update.queued_at;
3934 oldest_pda = Some(pda.clone());
3935 }
3936 }
3937 }
3938
3939 if let Some(pda) = oldest_pda {
3941 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3942 if !updates.is_empty() {
3943 updates.remove(0);
3944 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3945
3946 if updates.is_empty() {
3948 drop(updates);
3949 state.pending_updates.remove(&pda);
3950 }
3951 }
3952 }
3953 }
3954
3955 Ok(())
3956 }
3957
3958 fn flush_pending_updates(
3963 &mut self,
3964 state_id: u32,
3965 pda_address: &str,
3966 ) -> Result<Vec<PendingAccountUpdate>> {
3967 let state = self
3968 .states
3969 .get_mut(&state_id)
3970 .ok_or("State table not found")?;
3971
3972 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3973 let count = pending_updates.len();
3974 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3975 #[cfg(feature = "otel")]
3976 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3977 Ok(pending_updates)
3978 } else {
3979 Ok(Vec::new())
3980 }
3981 }
3982
3983 pub fn try_pda_reverse_lookup(
3985 &mut self,
3986 state_id: u32,
3987 lookup_name: &str,
3988 pda_address: &str,
3989 ) -> Option<String> {
3990 let state = self.states.get_mut(&state_id)?;
3991
3992 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3993 if let Some(value) = lookup.lookup(pda_address) {
3994 self.pda_cache_hits += 1;
3995 return Some(value);
3996 }
3997 }
3998
3999 self.pda_cache_misses += 1;
4000 None
4001 }
4002
4003 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4010 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4011 }
4012
4013 fn evaluate_computed_expr_with_env(
4015 &self,
4016 expr: &ComputedExpr,
4017 state: &Value,
4018 env: &std::collections::HashMap<String, Value>,
4019 ) -> Result<Value> {
4020 match expr {
4021 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4022
4023 ComputedExpr::Var { name } => env
4024 .get(name)
4025 .cloned()
4026 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4027
4028 ComputedExpr::Let { name, value, body } => {
4029 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4030 let mut new_env = env.clone();
4031 new_env.insert(name.clone(), val);
4032 self.evaluate_computed_expr_with_env(body, state, &new_env)
4033 }
4034
4035 ComputedExpr::If {
4036 condition,
4037 then_branch,
4038 else_branch,
4039 } => {
4040 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4041 if self.value_to_bool(&cond_val) {
4042 self.evaluate_computed_expr_with_env(then_branch, state, env)
4043 } else {
4044 self.evaluate_computed_expr_with_env(else_branch, state, env)
4045 }
4046 }
4047
4048 ComputedExpr::None => Ok(Value::Null),
4049
4050 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4051
4052 ComputedExpr::Slice { expr, start, end } => {
4053 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4054 match val {
4055 Value::Array(arr) => {
4056 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4057 Ok(Value::Array(slice))
4058 }
4059 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4060 }
4061 }
4062
4063 ComputedExpr::Index { expr, index } => {
4064 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4065 match val {
4066 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4067 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4068 }
4069 }
4070
4071 ComputedExpr::U64FromLeBytes { bytes } => {
4072 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4073 let byte_vec = self.value_to_bytes(&val)?;
4074 if byte_vec.len() < 8 {
4075 return Err(format!(
4076 "u64::from_le_bytes requires 8 bytes, got {}",
4077 byte_vec.len()
4078 )
4079 .into());
4080 }
4081 let arr: [u8; 8] = byte_vec[..8]
4082 .try_into()
4083 .map_err(|_| "Failed to convert to [u8; 8]")?;
4084 Ok(json!(u64::from_le_bytes(arr)))
4085 }
4086
4087 ComputedExpr::U64FromBeBytes { bytes } => {
4088 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4089 let byte_vec = self.value_to_bytes(&val)?;
4090 if byte_vec.len() < 8 {
4091 return Err(format!(
4092 "u64::from_be_bytes requires 8 bytes, got {}",
4093 byte_vec.len()
4094 )
4095 .into());
4096 }
4097 let arr: [u8; 8] = byte_vec[..8]
4098 .try_into()
4099 .map_err(|_| "Failed to convert to [u8; 8]")?;
4100 Ok(json!(u64::from_be_bytes(arr)))
4101 }
4102
4103 ComputedExpr::ByteArray { bytes } => {
4104 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4105 }
4106
4107 ComputedExpr::Closure { param, body } => {
4108 Ok(json!({
4111 "__closure": {
4112 "param": param,
4113 "body": serde_json::to_value(body).unwrap_or(Value::Null)
4114 }
4115 }))
4116 }
4117
4118 ComputedExpr::Unary { op, expr } => {
4119 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4120 self.apply_unary_op(op, &val)
4121 }
4122
4123 ComputedExpr::JsonToBytes { expr } => {
4124 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4125 let bytes = self.value_to_bytes(&val)?;
4127 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4128 }
4129
4130 ComputedExpr::UnwrapOr { expr, default } => {
4131 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4132 if val.is_null() {
4133 Ok(default.clone())
4134 } else {
4135 Ok(val)
4136 }
4137 }
4138
4139 ComputedExpr::Binary { op, left, right } => {
4140 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4141 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4142 self.apply_binary_op(op, &l, &r)
4143 }
4144
4145 ComputedExpr::Cast { expr, to_type } => {
4146 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4147 self.apply_cast(&val, to_type)
4148 }
4149
4150 ComputedExpr::ResolverComputed {
4151 resolver,
4152 method,
4153 args,
4154 } => {
4155 let evaluated_args: Vec<Value> = args
4156 .iter()
4157 .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4158 .collect::<Result<Vec<_>>>()?;
4159 crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4160 }
4161
4162 ComputedExpr::MethodCall { expr, method, args } => {
4163 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4164 if method == "map" && args.len() == 1 {
4166 if let ComputedExpr::Closure { param, body } = &args[0] {
4167 if val.is_null() {
4169 return Ok(Value::Null);
4170 }
4171
4172 if let Value::Array(arr) = &val {
4173 let results: Result<Vec<Value>> = arr
4174 .iter()
4175 .map(|elem| {
4176 let mut closure_env = env.clone();
4177 closure_env.insert(param.clone(), elem.clone());
4178 self.evaluate_computed_expr_with_env(body, state, &closure_env)
4179 })
4180 .collect();
4181 return Ok(Value::Array(results?));
4182 }
4183
4184 let mut closure_env = env.clone();
4185 closure_env.insert(param.clone(), val);
4186 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4187 }
4188 }
4189 let evaluated_args: Vec<Value> = args
4190 .iter()
4191 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4192 .collect::<Result<Vec<_>>>()?;
4193 self.apply_method_call(&val, method, &evaluated_args)
4194 }
4195
4196 ComputedExpr::Literal { value } => Ok(value.clone()),
4197
4198 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4199
4200 ComputedExpr::ContextSlot => Ok(self
4201 .current_context
4202 .as_ref()
4203 .and_then(|ctx| ctx.slot)
4204 .map(|s| json!(s))
4205 .unwrap_or(Value::Null)),
4206
4207 ComputedExpr::ContextTimestamp => Ok(self
4208 .current_context
4209 .as_ref()
4210 .map(|ctx| json!(ctx.timestamp()))
4211 .unwrap_or(Value::Null)),
4212 }
4213 }
4214
4215 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4217 match val {
4218 Value::Array(arr) => arr
4219 .iter()
4220 .map(|v| {
4221 v.as_u64()
4222 .map(|n| n as u8)
4223 .ok_or_else(|| "Array element not a valid byte".into())
4224 })
4225 .collect(),
4226 Value::String(s) => {
4227 if s.starts_with("0x") || s.starts_with("0X") {
4229 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4230 } else {
4231 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4232 }
4233 }
4234 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4235 }
4236 }
4237
4238 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4240 use crate::ast::UnaryOp;
4241 match op {
4242 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4243 UnaryOp::ReverseBits => match val.as_u64() {
4244 Some(n) => Ok(json!(n.reverse_bits())),
4245 None => match val.as_i64() {
4246 Some(n) => Ok(json!((n as u64).reverse_bits())),
4247 None => Err("reverse_bits requires an integer".into()),
4248 },
4249 },
4250 }
4251 }
4252
4253 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4255 let segments: Vec<&str> = path.split('.').collect();
4256 let mut current = state;
4257
4258 for segment in segments {
4259 match current.get(segment) {
4260 Some(v) => current = v,
4261 None => return Ok(Value::Null),
4262 }
4263 }
4264
4265 Ok(current.clone())
4266 }
4267
4268 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4270 match op {
4271 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4273 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4274 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4275 BinaryOp::Div => {
4276 if let Some(r) = right.as_i64() {
4278 if r == 0 {
4279 return Err("Division by zero".into());
4280 }
4281 }
4282 if let Some(r) = right.as_f64() {
4283 if r == 0.0 {
4284 return Err("Division by zero".into());
4285 }
4286 }
4287 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4288 }
4289 BinaryOp::Mod => {
4290 match (left.as_i64(), right.as_i64()) {
4292 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4293 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4294 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4295 _ => Err("Modulo requires non-zero integer operands".into()),
4296 },
4297 _ => Err("Modulo by zero".into()),
4298 }
4299 }
4300
4301 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4303 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4304 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4305 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4306 BinaryOp::Eq => Ok(json!(left == right)),
4307 BinaryOp::Ne => Ok(json!(left != right)),
4308
4309 BinaryOp::And => {
4311 let l_bool = self.value_to_bool(left);
4312 let r_bool = self.value_to_bool(right);
4313 Ok(json!(l_bool && r_bool))
4314 }
4315 BinaryOp::Or => {
4316 let l_bool = self.value_to_bool(left);
4317 let r_bool = self.value_to_bool(right);
4318 Ok(json!(l_bool || r_bool))
4319 }
4320
4321 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4323 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4324 _ => match (left.as_i64(), right.as_i64()) {
4325 (Some(a), Some(b)) => Ok(json!(a ^ b)),
4326 _ => Err("XOR requires integer operands".into()),
4327 },
4328 },
4329 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4330 (Some(a), Some(b)) => Ok(json!(a & b)),
4331 _ => match (left.as_i64(), right.as_i64()) {
4332 (Some(a), Some(b)) => Ok(json!(a & b)),
4333 _ => Err("BitAnd requires integer operands".into()),
4334 },
4335 },
4336 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4337 (Some(a), Some(b)) => Ok(json!(a | b)),
4338 _ => match (left.as_i64(), right.as_i64()) {
4339 (Some(a), Some(b)) => Ok(json!(a | b)),
4340 _ => Err("BitOr requires integer operands".into()),
4341 },
4342 },
4343 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4344 (Some(a), Some(b)) => Ok(json!(a << b)),
4345 _ => match (left.as_i64(), right.as_i64()) {
4346 (Some(a), Some(b)) => Ok(json!(a << b)),
4347 _ => Err("Shl requires integer operands".into()),
4348 },
4349 },
4350 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4351 (Some(a), Some(b)) => Ok(json!(a >> b)),
4352 _ => match (left.as_i64(), right.as_i64()) {
4353 (Some(a), Some(b)) => Ok(json!(a >> b)),
4354 _ => Err("Shr requires integer operands".into()),
4355 },
4356 },
4357 }
4358 }
4359
4360 fn numeric_op<F1, F2>(
4362 &self,
4363 left: &Value,
4364 right: &Value,
4365 int_op: F1,
4366 float_op: F2,
4367 ) -> Result<Value>
4368 where
4369 F1: Fn(i64, i64) -> i64,
4370 F2: Fn(f64, f64) -> f64,
4371 {
4372 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4374 return Ok(json!(int_op(a, b)));
4375 }
4376
4377 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4379 return Ok(json!(int_op(a as i64, b as i64)));
4381 }
4382
4383 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4385 return Ok(json!(float_op(a, b)));
4386 }
4387
4388 if left.is_null() || right.is_null() {
4390 return Ok(Value::Null);
4391 }
4392
4393 Err(format!(
4394 "Cannot perform numeric operation on {:?} and {:?}",
4395 left, right
4396 )
4397 .into())
4398 }
4399
4400 fn comparison_op<F1, F2>(
4402 &self,
4403 left: &Value,
4404 right: &Value,
4405 int_cmp: F1,
4406 float_cmp: F2,
4407 ) -> Result<Value>
4408 where
4409 F1: Fn(i64, i64) -> bool,
4410 F2: Fn(f64, f64) -> bool,
4411 {
4412 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4414 return Ok(json!(int_cmp(a, b)));
4415 }
4416
4417 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4419 return Ok(json!(int_cmp(a as i64, b as i64)));
4420 }
4421
4422 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4424 return Ok(json!(float_cmp(a, b)));
4425 }
4426
4427 if left.is_null() || right.is_null() {
4429 return Ok(json!(false));
4430 }
4431
4432 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4433 }
4434
4435 fn value_to_bool(&self, value: &Value) -> bool {
4437 match value {
4438 Value::Null => false,
4439 Value::Bool(b) => *b,
4440 Value::Number(n) => {
4441 if let Some(i) = n.as_i64() {
4442 i != 0
4443 } else if let Some(f) = n.as_f64() {
4444 f != 0.0
4445 } else {
4446 true
4447 }
4448 }
4449 Value::String(s) => !s.is_empty(),
4450 Value::Array(arr) => !arr.is_empty(),
4451 Value::Object(obj) => !obj.is_empty(),
4452 }
4453 }
4454
4455 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4457 match to_type {
4458 "i8" | "i16" | "i32" | "i64" | "isize" => {
4459 if let Some(n) = value.as_i64() {
4460 Ok(json!(n))
4461 } else if let Some(n) = value.as_u64() {
4462 Ok(json!(n as i64))
4463 } else if let Some(n) = value.as_f64() {
4464 Ok(json!(n as i64))
4465 } else if let Some(s) = value.as_str() {
4466 s.parse::<i64>()
4467 .map(|n| json!(n))
4468 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4469 } else {
4470 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4471 }
4472 }
4473 "u8" | "u16" | "u32" | "u64" | "usize" => {
4474 if let Some(n) = value.as_u64() {
4475 Ok(json!(n))
4476 } else if let Some(n) = value.as_i64() {
4477 Ok(json!(n as u64))
4478 } else if let Some(n) = value.as_f64() {
4479 Ok(json!(n as u64))
4480 } else if let Some(s) = value.as_str() {
4481 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4482 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4483 })
4484 } else {
4485 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4486 }
4487 }
4488 "f32" | "f64" => {
4489 if let Some(n) = value.as_f64() {
4490 Ok(json!(n))
4491 } else if let Some(n) = value.as_i64() {
4492 Ok(json!(n as f64))
4493 } else if let Some(n) = value.as_u64() {
4494 Ok(json!(n as f64))
4495 } else if let Some(s) = value.as_str() {
4496 s.parse::<f64>()
4497 .map(|n| json!(n))
4498 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4499 } else {
4500 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4501 }
4502 }
4503 "String" | "string" => Ok(json!(value.to_string())),
4504 "bool" => Ok(json!(self.value_to_bool(value))),
4505 _ => {
4506 Ok(value.clone())
4508 }
4509 }
4510 }
4511
4512 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4514 match method {
4515 "unwrap_or" => {
4516 if value.is_null() && !args.is_empty() {
4517 Ok(args[0].clone())
4518 } else {
4519 Ok(value.clone())
4520 }
4521 }
4522 "unwrap_or_default" => {
4523 if value.is_null() {
4524 Ok(json!(0))
4526 } else {
4527 Ok(value.clone())
4528 }
4529 }
4530 "is_some" => Ok(json!(!value.is_null())),
4531 "is_none" => Ok(json!(value.is_null())),
4532 "abs" => {
4533 if let Some(n) = value.as_i64() {
4534 Ok(json!(n.abs()))
4535 } else if let Some(n) = value.as_f64() {
4536 Ok(json!(n.abs()))
4537 } else {
4538 Err(format!("Cannot call abs() on {:?}", value).into())
4539 }
4540 }
4541 "len" => {
4542 if let Some(s) = value.as_str() {
4543 Ok(json!(s.len()))
4544 } else if let Some(arr) = value.as_array() {
4545 Ok(json!(arr.len()))
4546 } else if let Some(obj) = value.as_object() {
4547 Ok(json!(obj.len()))
4548 } else {
4549 Err(format!("Cannot call len() on {:?}", value).into())
4550 }
4551 }
4552 "to_string" => Ok(json!(value.to_string())),
4553 "min" => {
4554 if args.is_empty() {
4555 return Err("min() requires an argument".into());
4556 }
4557 let other = &args[0];
4558 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4559 Ok(json!(a.min(b)))
4560 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4561 Ok(json!(a.min(b)))
4562 } else {
4563 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4564 }
4565 }
4566 "max" => {
4567 if args.is_empty() {
4568 return Err("max() requires an argument".into());
4569 }
4570 let other = &args[0];
4571 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4572 Ok(json!(a.max(b)))
4573 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4574 Ok(json!(a.max(b)))
4575 } else {
4576 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4577 }
4578 }
4579 "saturating_add" => {
4580 if args.is_empty() {
4581 return Err("saturating_add() requires an argument".into());
4582 }
4583 let other = &args[0];
4584 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4585 Ok(json!(a.saturating_add(b)))
4586 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4587 Ok(json!(a.saturating_add(b)))
4588 } else {
4589 Err(format!(
4590 "Cannot call saturating_add() on {:?} and {:?}",
4591 value, other
4592 )
4593 .into())
4594 }
4595 }
4596 "saturating_sub" => {
4597 if args.is_empty() {
4598 return Err("saturating_sub() requires an argument".into());
4599 }
4600 let other = &args[0];
4601 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4602 Ok(json!(a.saturating_sub(b)))
4603 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4604 Ok(json!(a.saturating_sub(b)))
4605 } else {
4606 Err(format!(
4607 "Cannot call saturating_sub() on {:?} and {:?}",
4608 value, other
4609 )
4610 .into())
4611 }
4612 }
4613 _ => Err(format!("Unknown method call: {}()", method).into()),
4614 }
4615 }
4616
4617 pub fn evaluate_computed_fields_from_ast(
4620 &self,
4621 state: &mut Value,
4622 computed_field_specs: &[ComputedFieldSpec],
4623 ) -> Result<Vec<String>> {
4624 let mut updated_paths = Vec::new();
4625
4626 crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4627
4628 for spec in computed_field_specs {
4629 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
4630 self.set_field_in_state(state, &spec.target_path, result)?;
4631 updated_paths.push(spec.target_path.clone());
4632 }
4633 }
4634
4635 Ok(updated_paths)
4636 }
4637
4638 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4640 let segments: Vec<&str> = path.split('.').collect();
4641
4642 if segments.is_empty() {
4643 return Err("Empty path".into());
4644 }
4645
4646 let mut current = state;
4648 for (i, segment) in segments.iter().enumerate() {
4649 if i == segments.len() - 1 {
4650 if let Some(obj) = current.as_object_mut() {
4652 obj.insert(segment.to_string(), value);
4653 return Ok(());
4654 } else {
4655 return Err(format!("Cannot set field '{}' on non-object", segment).into());
4656 }
4657 } else {
4658 if !current.is_object() {
4660 *current = json!({});
4661 }
4662 let obj = current.as_object_mut().unwrap();
4663 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4664 }
4665 }
4666
4667 Ok(())
4668 }
4669
4670 pub fn create_evaluator_from_specs(
4673 specs: Vec<ComputedFieldSpec>,
4674 ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
4675 move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
4676 let mut vm = VmContext::new();
4677 vm.current_context = Some(UpdateContext {
4678 slot: context_slot,
4679 timestamp: Some(context_timestamp),
4680 ..Default::default()
4681 });
4682 vm.evaluate_computed_fields_from_ast(state, &specs)?;
4683 Ok(())
4684 }
4685 }
4686}
4687
4688impl Default for VmContext {
4689 fn default() -> Self {
4690 Self::new()
4691 }
4692}
4693
4694impl crate::resolvers::ReverseLookupUpdater for VmContext {
4696 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4697 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4699 .unwrap_or_else(|e| {
4700 tracing::error!("Failed to update PDA reverse lookup: {}", e);
4701 Vec::new()
4702 })
4703 }
4704
4705 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4706 self.flush_pending_updates(0, pda_address)
4708 .unwrap_or_else(|e| {
4709 tracing::error!("Failed to flush pending updates: {}", e);
4710 Vec::new()
4711 })
4712 }
4713}
4714
4715#[cfg(test)]
4716mod tests {
4717 use super::*;
4718 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4719
4720 #[test]
4721 fn test_computed_field_preserves_integer_type() {
4722 let vm = VmContext::new();
4723
4724 let mut state = serde_json::json!({
4725 "trading": {
4726 "total_buy_volume": 20000000000_i64,
4727 "total_sell_volume": 17951316474_i64
4728 }
4729 });
4730
4731 let spec = ComputedFieldSpec {
4732 target_path: "trading.total_volume".to_string(),
4733 result_type: "Option<u64>".to_string(),
4734 expression: ComputedExpr::Binary {
4735 op: BinaryOp::Add,
4736 left: Box::new(ComputedExpr::UnwrapOr {
4737 expr: Box::new(ComputedExpr::FieldRef {
4738 path: "trading.total_buy_volume".to_string(),
4739 }),
4740 default: serde_json::json!(0),
4741 }),
4742 right: Box::new(ComputedExpr::UnwrapOr {
4743 expr: Box::new(ComputedExpr::FieldRef {
4744 path: "trading.total_sell_volume".to_string(),
4745 }),
4746 default: serde_json::json!(0),
4747 }),
4748 },
4749 };
4750
4751 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
4752 .unwrap();
4753
4754 let total_volume = state
4755 .get("trading")
4756 .and_then(|t| t.get("total_volume"))
4757 .expect("total_volume should exist");
4758
4759 let serialized = serde_json::to_string(total_volume).unwrap();
4760 assert!(
4761 !serialized.contains('.'),
4762 "Integer should not have decimal point: {}",
4763 serialized
4764 );
4765 assert_eq!(
4766 total_volume.as_i64(),
4767 Some(37951316474),
4768 "Value should be correct sum"
4769 );
4770 }
4771
4772 #[test]
4773 fn test_set_field_sum_preserves_integer_type() {
4774 let mut vm = VmContext::new();
4775 vm.registers[0] = serde_json::json!({});
4776 vm.registers[1] = serde_json::json!(20000000000_i64);
4777 vm.registers[2] = serde_json::json!(17951316474_i64);
4778
4779 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
4780 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
4781
4782 let state = &vm.registers[0];
4783 let buy_vol = state
4784 .get("trading")
4785 .and_then(|t| t.get("total_buy_volume"))
4786 .unwrap();
4787 let sell_vol = state
4788 .get("trading")
4789 .and_then(|t| t.get("total_sell_volume"))
4790 .unwrap();
4791
4792 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
4793 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
4794
4795 assert!(
4796 !buy_serialized.contains('.'),
4797 "Buy volume should not have decimal: {}",
4798 buy_serialized
4799 );
4800 assert!(
4801 !sell_serialized.contains('.'),
4802 "Sell volume should not have decimal: {}",
4803 sell_serialized
4804 );
4805 }
4806
4807 #[test]
4808 fn test_lookup_index_chaining() {
4809 let mut vm = VmContext::new();
4810
4811 let state = vm.states.get_mut(&0).unwrap();
4812
4813 state
4814 .pda_reverse_lookups
4815 .entry("default_pda_lookup".to_string())
4816 .or_insert_with(|| PdaReverseLookup::new(1000))
4817 .insert("pda_123".to_string(), "addr_456".to_string());
4818
4819 state
4820 .lookup_indexes
4821 .entry("round_address_lookup_index".to_string())
4822 .or_insert_with(LookupIndex::new)
4823 .insert(json!("addr_456"), json!(789));
4824
4825 let handler = vec![
4826 OpCode::LoadConstant {
4827 value: json!("pda_123"),
4828 dest: 0,
4829 },
4830 OpCode::LookupIndex {
4831 state_id: 0,
4832 index_name: "round_address_lookup_index".to_string(),
4833 lookup_value: 0,
4834 dest: 1,
4835 },
4836 ];
4837
4838 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4839 .unwrap();
4840
4841 assert_eq!(vm.registers[1], json!(789));
4842 }
4843
4844 #[test]
4845 fn test_lookup_index_no_chain() {
4846 let mut vm = VmContext::new();
4847
4848 let state = vm.states.get_mut(&0).unwrap();
4849 state
4850 .lookup_indexes
4851 .entry("test_index".to_string())
4852 .or_insert_with(LookupIndex::new)
4853 .insert(json!("key_abc"), json!(42));
4854
4855 let handler = vec![
4856 OpCode::LoadConstant {
4857 value: json!("key_abc"),
4858 dest: 0,
4859 },
4860 OpCode::LookupIndex {
4861 state_id: 0,
4862 index_name: "test_index".to_string(),
4863 lookup_value: 0,
4864 dest: 1,
4865 },
4866 ];
4867
4868 vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
4869 .unwrap();
4870
4871 assert_eq!(vm.registers[1], json!(42));
4872 }
4873
4874 #[test]
4875 fn test_conditional_set_field_with_zero_array() {
4876 let mut vm = VmContext::new();
4877
4878 let event_zeros = json!({
4879 "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4880 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4881 });
4882
4883 let event_nonzero = json!({
4884 "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
4885 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
4886 });
4887
4888 let zero_32: Value = json!([
4889 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
4890 0, 0, 0
4891 ]);
4892
4893 let handler = vec![
4894 OpCode::CreateObject { dest: 2 },
4895 OpCode::LoadEventField {
4896 path: FieldPath::new(&["value"]),
4897 dest: 10,
4898 default: None,
4899 },
4900 OpCode::ConditionalSetField {
4901 object: 2,
4902 path: "captured_value".to_string(),
4903 value: 10,
4904 condition_field: FieldPath::new(&["value"]),
4905 condition_op: ComparisonOp::NotEqual,
4906 condition_value: zero_32,
4907 },
4908 ];
4909
4910 vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
4911 .unwrap();
4912 assert!(
4913 vm.registers[2].get("captured_value").is_none(),
4914 "Field should not be set when value is all zeros"
4915 );
4916
4917 vm.reset_registers();
4918 vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
4919 .unwrap();
4920 assert!(
4921 vm.registers[2].get("captured_value").is_some(),
4922 "Field should be set when value is non-zero"
4923 );
4924 }
4925
4926 #[test]
4927 fn test_when_instruction_arrives_first() {
4928 let mut vm = VmContext::new();
4929
4930 let signature = "test_sig_123".to_string();
4931
4932 {
4933 let state = vm.states.get(&0).unwrap();
4934 let mut cache = state.recent_tx_instructions.lock().unwrap();
4935 let mut set = HashSet::new();
4936 set.insert("RevealIxState".to_string());
4937 cache.put(signature.clone(), set);
4938 }
4939
4940 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4941
4942 let handler = vec![
4943 OpCode::CreateObject { dest: 2 },
4944 OpCode::LoadConstant {
4945 value: json!("primary_key_value"),
4946 dest: 1,
4947 },
4948 OpCode::LoadConstant {
4949 value: json!("the_revealed_value"),
4950 dest: 10,
4951 },
4952 OpCode::SetFieldWhen {
4953 object: 2,
4954 path: "entropy_value".to_string(),
4955 value: 10,
4956 when_instruction: "RevealIxState".to_string(),
4957 entity_name: "TestEntity".to_string(),
4958 key_reg: 1,
4959 condition_field: None,
4960 condition_op: None,
4961 condition_value: None,
4962 },
4963 ];
4964
4965 vm.execute_handler(
4966 &handler,
4967 &json!({}),
4968 "VarState",
4969 0,
4970 "TestEntity",
4971 None,
4972 None,
4973 )
4974 .unwrap();
4975
4976 assert_eq!(
4977 vm.registers[2].get("entropy_value").unwrap(),
4978 "the_revealed_value",
4979 "Field should be set when instruction was already seen"
4980 );
4981 }
4982
4983 #[test]
4984 fn test_when_account_arrives_first() {
4985 let mut vm = VmContext::new();
4986
4987 let signature = "test_sig_456".to_string();
4988
4989 vm.current_context = Some(UpdateContext::new(100, signature.clone()));
4990
4991 let handler = vec![
4992 OpCode::CreateObject { dest: 2 },
4993 OpCode::LoadConstant {
4994 value: json!("pk_123"),
4995 dest: 1,
4996 },
4997 OpCode::LoadConstant {
4998 value: json!("deferred_value"),
4999 dest: 10,
5000 },
5001 OpCode::SetFieldWhen {
5002 object: 2,
5003 path: "entropy_value".to_string(),
5004 value: 10,
5005 when_instruction: "RevealIxState".to_string(),
5006 entity_name: "TestEntity".to_string(),
5007 key_reg: 1,
5008 condition_field: None,
5009 condition_op: None,
5010 condition_value: None,
5011 },
5012 ];
5013
5014 vm.execute_handler(
5015 &handler,
5016 &json!({}),
5017 "VarState",
5018 0,
5019 "TestEntity",
5020 None,
5021 None,
5022 )
5023 .unwrap();
5024
5025 assert!(
5026 vm.registers[2].get("entropy_value").is_none(),
5027 "Field should not be set when instruction hasn't been seen"
5028 );
5029
5030 let state = vm.states.get(&0).unwrap();
5031 let key = (signature.clone(), "RevealIxState".to_string());
5032 assert!(
5033 state.deferred_when_ops.contains_key(&key),
5034 "Operation should be queued"
5035 );
5036
5037 {
5038 let mut cache = state.recent_tx_instructions.lock().unwrap();
5039 let mut set = HashSet::new();
5040 set.insert("RevealIxState".to_string());
5041 cache.put(signature.clone(), set);
5042 }
5043
5044 let deferred = state.deferred_when_ops.remove(&key).unwrap().1;
5045 for op in deferred {
5046 vm.apply_deferred_when_op(0, &op).unwrap();
5047 }
5048
5049 let state = vm.states.get(&0).unwrap();
5050 let entity = state.data.get(&json!("pk_123")).unwrap();
5051 assert_eq!(
5052 entity.get("entropy_value").unwrap(),
5053 "deferred_value",
5054 "Field should be set after instruction arrives"
5055 );
5056 }
5057
5058 #[test]
5059 fn test_when_cleanup_expired() {
5060 let mut vm = VmContext::new();
5061
5062 let state = vm.states.get(&0).unwrap();
5063 let key = ("old_sig".to_string(), "SomeIxState".to_string());
5064 state.deferred_when_ops.insert(
5065 key,
5066 vec![DeferredWhenOperation {
5067 entity_name: "Test".to_string(),
5068 primary_key: json!("pk"),
5069 field_path: "field".to_string(),
5070 field_value: json!("value"),
5071 when_instruction: "SomeIxState".to_string(),
5072 signature: "old_sig".to_string(),
5073 slot: 0,
5074 deferred_at: 0,
5075 emit: true,
5076 }],
5077 );
5078
5079 let removed = vm.cleanup_expired_when_ops(0, 60);
5080
5081 assert_eq!(removed, 1, "Should have removed 1 expired op");
5082 assert!(
5083 vm.states.get(&0).unwrap().deferred_when_ops.is_empty(),
5084 "Deferred ops should be empty after cleanup"
5085 );
5086 }
5087}