1use crate::ast::{BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation};
2use crate::compiler::{MultiEntityBytecode, OpCode};
3use crate::Mutation;
4use dashmap::DashMap;
5use lru::LruCache;
6use serde_json::{json, Value};
7use std::collections::{HashMap, HashSet};
8use std::num::NonZeroUsize;
9
10#[cfg(feature = "otel")]
11use tracing::instrument;
12
13#[derive(Debug, Clone, Default)]
16pub struct UpdateContext {
17 pub slot: Option<u64>,
19 pub signature: Option<String>,
21 pub timestamp: Option<i64>,
24 pub metadata: HashMap<String, Value>,
26}
27
28impl UpdateContext {
29 pub fn new(slot: u64, signature: String) -> Self {
31 Self {
32 slot: Some(slot),
33 signature: Some(signature),
34 timestamp: None,
35 metadata: HashMap::new(),
36 }
37 }
38
39 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
41 Self {
42 slot: Some(slot),
43 signature: Some(signature),
44 timestamp: Some(timestamp),
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn timestamp(&self) -> i64 {
51 self.timestamp.unwrap_or_else(|| {
52 std::time::SystemTime::now()
53 .duration_since(std::time::UNIX_EPOCH)
54 .unwrap()
55 .as_secs() as i64
56 })
57 }
58
59 pub fn empty() -> Self {
61 Self::default()
62 }
63
64 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
66 self.metadata.insert(key, value);
67 self
68 }
69
70 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
72 self.metadata.get(key)
73 }
74
75 pub fn to_value(&self) -> Value {
77 let mut obj = serde_json::Map::new();
78 if let Some(slot) = self.slot {
79 obj.insert("slot".to_string(), json!(slot));
80 }
81 if let Some(ref sig) = self.signature {
82 obj.insert("signature".to_string(), json!(sig));
83 }
84 obj.insert("timestamp".to_string(), json!(self.timestamp()));
86 for (key, value) in &self.metadata {
87 obj.insert(key.clone(), value.clone());
88 }
89 Value::Object(obj)
90 }
91}
92
/// Index of a register in the VM's register file.
pub type Register = usize;
/// Result alias used throughout this module; errors are boxed trait objects.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Value held in a VM register: an arbitrary JSON value.
pub type RegisterValue = Value;
97
/// Hook that evaluates computed fields against an entity's state.
pub trait ComputedFieldsEvaluator {
    /// Runs the evaluation with mutable access to `state`.
    ///
    /// Presumably implementations write the derived fields back into `state`;
    /// no implementation is visible in this part of the file.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
103
104const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
106const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
107const PENDING_UPDATE_TTL_SECONDS: i64 = 300; fn estimate_json_size(value: &Value) -> usize {
111 match value {
112 Value::Null => 4,
113 Value::Bool(_) => 5,
114 Value::Number(_) => 8,
115 Value::String(s) => s.len() + 2,
116 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
117 Value::Object(obj) => {
118 2 + obj
119 .iter()
120 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
121 .sum::<usize>()
122 }
123 }
124}
125
/// A dotted field path pre-split into its segments.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` shares the
/// same allocation instead of copying the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// The path's components, in order, as produced by splitting on `.`.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` and stores the pieces.
    ///
    /// Empty pieces are kept: `""` yields a single empty segment and `"a..b"`
    /// yields `["a", "", "b"]`.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        &self.segments[..]
    }
}
143
/// Execution state of the bytecode VM: registers, per-entity state tables,
/// counters and the context of the update currently being processed.
pub struct VmContext {
    /// Register file; handlers address it by `Register` index.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id.
    states: HashMap<u32, StateTable>,
    /// Number of opcodes executed so far (incremented once per opcode).
    pub instructions_executed: u64,
    /// Number of times a compiled path was served from `path_cache`.
    pub cache_hits: u64,
    /// Cache of dotted paths that have already been split into segments.
    path_cache: HashMap<String, CompiledPath>,
    // NOTE(review): the next three counters are only initialised to 0 in the
    // visible code; presumably they are maintained by the PDA lookup and
    // pending-queue logic elsewhere — confirm.
    pub pda_cache_hits: u64,
    pub pda_cache_misses: u64,
    pub pending_queue_size: u64,
    /// Context of the event currently being processed, if one was supplied.
    current_context: Option<UpdateContext>,
}
156
157#[derive(Debug)]
158pub struct LookupIndex {
159 index: DashMap<Value, Value>,
160}
161
162impl Default for LookupIndex {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168impl LookupIndex {
169 pub fn new() -> Self {
170 LookupIndex {
171 index: DashMap::new(),
172 }
173 }
174
175 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
176 self.index.get(lookup_value).map(|v| v.clone())
177 }
178
179 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
180 self.index.insert(lookup_value, primary_key);
181 }
182
183 pub fn len(&self) -> usize {
184 self.index.len()
185 }
186
187 pub fn is_empty(&self) -> bool {
188 self.index.is_empty()
189 }
190}
191
192#[derive(Debug)]
193pub struct TemporalIndex {
194 index: DashMap<Value, Vec<(Value, i64)>>,
195}
196
197impl Default for TemporalIndex {
198 fn default() -> Self {
199 Self::new()
200 }
201}
202
203impl TemporalIndex {
204 pub fn new() -> Self {
205 TemporalIndex {
206 index: DashMap::new(),
207 }
208 }
209
210 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
211 if let Some(entries) = self.index.get(lookup_value) {
212 let entries_vec = entries.value();
213 for i in (0..entries_vec.len()).rev() {
214 if entries_vec[i].1 <= timestamp {
215 return Some(entries_vec[i].0.clone());
216 }
217 }
218 }
219 None
220 }
221
222 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
223 if let Some(entries) = self.index.get(lookup_value) {
224 let entries_vec = entries.value();
225 if let Some(last) = entries_vec.last() {
226 return Some(last.0.clone());
227 }
228 }
229 None
230 }
231
232 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
233 let lookup_key = lookup_value.clone();
234 self.index
235 .entry(lookup_value)
236 .or_default()
237 .push((primary_key, timestamp));
238
239 if let Some(mut entries) = self.index.get_mut(&lookup_key) {
240 entries.sort_by_key(|(_, ts)| *ts);
241 }
242 }
243}
244
245#[derive(Debug)]
246pub struct PdaReverseLookup {
247 index: LruCache<String, String>,
249}
250
251impl PdaReverseLookup {
252 pub fn new(capacity: usize) -> Self {
253 PdaReverseLookup {
254 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
255 }
256 }
257
258 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
259 self.index.get(pda_address).cloned()
260 }
261
262 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
263 let evicted = if self.index.len() >= self.index.cap().get() {
265 self.index.peek_lru().map(|(k, _)| k.clone())
267 } else {
268 None
269 };
270
271 self.index.put(pda_address, seed_value);
272 evicted
273 }
274}
275
/// An account update held in a pending queue.
///
/// NOTE(review): nothing in the visible part of the file constructs or
/// consumes this type; the field descriptions below follow the field names —
/// confirm against the queueing code.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    /// Name of the account type the update belongs to.
    pub account_type: String,
    /// PDA address the update is queued under.
    pub pda_address: String,
    /// The account payload as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Signature of the update.
    pub signature: String,
    /// When the update was queued (presumably Unix seconds — TODO confirm).
    pub queued_at: i64,
}
285
/// Summary statistics about the pending-update queue.
///
/// NOTE(review): the code that fills this in is not visible here; field
/// meanings are taken from their names.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with at least one queued update.
    pub unique_pdas: usize,
    /// Age of the oldest queued update, in seconds.
    pub oldest_age_seconds: i64,
    /// Length of the longest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory held by queued updates, in bytes.
    pub estimated_memory_bytes: usize,
}
294
/// Storage for one state id: entity data plus the indexes used to resolve
/// keys for it.
#[derive(Debug)]
pub struct StateTable {
    /// Entity state keyed by primary key.
    pub data: DashMap<Value, Value>,
    /// Named lookup indexes (lookup value -> primary key).
    pub lookup_indexes: HashMap<String, LookupIndex>,
    /// Named temporal indexes (lookup value -> timestamped primary keys).
    pub temporal_indexes: HashMap<String, TemporalIndex>,
    /// Named PDA reverse-lookup caches (PDA address -> seed/primary key).
    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
    /// Pending account updates grouped by a string key (presumably the PDA
    /// address — TODO confirm; not populated in the visible code).
    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
}
303
304impl VmContext {
305 pub fn new() -> Self {
306 let mut vm = VmContext {
307 registers: vec![Value::Null; 256],
308 states: HashMap::new(),
309 instructions_executed: 0,
310 cache_hits: 0,
311 path_cache: HashMap::new(),
312 pda_cache_hits: 0,
313 pda_cache_misses: 0,
314 pending_queue_size: 0,
315 current_context: None,
316 };
317 vm.states.insert(
318 0,
319 StateTable {
320 data: DashMap::new(),
321 lookup_indexes: HashMap::new(),
322 temporal_indexes: HashMap::new(),
323 pda_reverse_lookups: HashMap::new(),
324 pending_updates: DashMap::new(),
325 },
326 );
327 vm
328 }
329
    /// Returns mutable access to the state table registered under `state_id`,
    /// or `None` if no such table exists.
    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
        self.states.get_mut(&state_id)
    }
335
    /// Returns mutable access to the register file.
    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
        &mut self.registers
    }
340
    /// Returns the cache of compiled paths, keyed by the original dotted path.
    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
        &self.path_cache
    }
345
    /// Returns the context of the event most recently passed to
    /// `process_event_with_context`, if one was supplied.
    pub fn current_context(&self) -> Option<&UpdateContext> {
        self.current_context.as_ref()
    }
350
351 pub fn update_state_from_register(
354 &mut self,
355 state_id: u32,
356 key: Value,
357 register: Register,
358 ) -> Result<()> {
359 let state = self.states.get(&state_id).ok_or("State table not found")?;
360 let value = self.registers[register].clone();
361 state.data.insert(key, value);
362 Ok(())
363 }
364
365 fn reset_registers(&mut self) {
366 for reg in &mut self.registers {
367 *reg = Value::Null;
368 }
369 }
370
371 pub fn extract_partial_state(
373 &self,
374 state_reg: Register,
375 dirty_fields: &HashSet<String>,
376 ) -> Result<Value> {
377 let full_state = &self.registers[state_reg];
378
379 if dirty_fields.is_empty() {
380 return Ok(json!({}));
381 }
382
383 let mut partial = serde_json::Map::new();
384
385 for path in dirty_fields {
386 let segments: Vec<&str> = path.split('.').collect();
387
388 let mut current = full_state;
389 let mut found = true;
390
391 for segment in &segments {
392 match current.get(segment) {
393 Some(v) => current = v,
394 None => {
395 found = false;
396 break;
397 }
398 }
399 }
400
401 if !found {
402 continue;
403 }
404
405 let mut target = &mut partial;
406 for (i, segment) in segments.iter().enumerate() {
407 if i == segments.len() - 1 {
408 target.insert(segment.to_string(), current.clone());
409 } else {
410 target
411 .entry(segment.to_string())
412 .or_insert_with(|| json!({}));
413 target = target
414 .get_mut(*segment)
415 .and_then(|v| v.as_object_mut())
416 .ok_or("Failed to build nested structure")?;
417 }
418 }
419 }
420
421 Ok(Value::Object(partial))
422 }
423
424 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
425 if let Some(compiled) = self.path_cache.get(path) {
426 self.cache_hits += 1;
427 return compiled.clone();
428 }
429 let compiled = CompiledPath::new(path);
430 self.path_cache.insert(path.to_string(), compiled.clone());
431 compiled
432 }
433
434 #[cfg_attr(feature = "otel", instrument(
436 name = "vm.process_event",
437 skip(self, bytecode, event_value, context),
438 fields(
439 event_type = %event_type,
440 slot = context.as_ref().and_then(|c| c.slot),
441 )
442 ))]
443 pub fn process_event_with_context(
444 &mut self,
445 bytecode: &MultiEntityBytecode,
446 mut event_value: Value,
447 event_type: &str,
448 context: Option<&UpdateContext>,
449 ) -> Result<Vec<Mutation>> {
450 self.current_context = context.cloned();
452
453 if let Some(ctx) = context {
455 if let Some(obj) = event_value.as_object_mut() {
456 obj.insert("__update_context".to_string(), ctx.to_value());
457 }
458 }
459
460 let mut all_mutations = Vec::new();
461
462 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
463 tracing::debug!(
464 "🔀 Event type '{}' routes to {} entity/entities",
465 event_type,
466 entity_names.len()
467 );
468
469 for entity_name in entity_names {
470 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
471 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
472 tracing::debug!(
473 " ▶️ Executing handler for entity '{}', event '{}'",
474 entity_name,
475 event_type
476 );
477 tracing::debug!(" Handler has {} opcodes", handler.len());
478
479 let mutations = self.execute_handler(
480 handler,
481 event_value.clone(),
482 event_type,
483 entity_bytecode.state_id,
484 entity_bytecode.computed_fields_evaluator.as_ref(),
485 )?;
486
487 tracing::debug!(" Handler produced {} mutation(s)", mutations.len());
488 all_mutations.extend(mutations);
489 } else {
490 tracing::debug!(
491 " ⊘ No handler found for entity '{}', event '{}'",
492 entity_name,
493 event_type
494 );
495 }
496 } else {
497 tracing::debug!(" ⊘ Entity '{}' not found in bytecode", entity_name);
498 }
499 }
500 } else {
501 tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
502 }
503
504 Ok(all_mutations)
505 }
506
    /// Processes an event without an update context.
    ///
    /// Equivalent to calling `process_event_with_context` with `None`, which
    /// also clears any previously stored current context.
    pub fn process_event(
        &mut self,
        bytecode: &MultiEntityBytecode,
        event_value: Value,
        event_type: &str,
    ) -> Result<Vec<Mutation>> {
        self.process_event_with_context(bytecode, event_value, event_type, None)
    }
516
    /// Decodes a protobuf `Any` through the bytecode's proto router into an
    /// event value and event type, then processes it without a context.
    ///
    /// # Errors
    ///
    /// Returns the router's error if decoding fails, or any error raised
    /// while processing the decoded event.
    pub fn process_any(
        &mut self,
        bytecode: &MultiEntityBytecode,
        any: prost_types::Any,
    ) -> Result<Vec<Mutation>> {
        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
        self.process_event(bytecode, event_value, &event_type)
    }
525
526 #[cfg_attr(feature = "otel", instrument(
527 name = "vm.execute_handler",
528 skip(self, handler, event_value, entity_evaluator),
529 level = "debug",
530 fields(
531 event_type = %event_type,
532 handler_opcodes = handler.len(),
533 )
534 ))]
535 #[allow(clippy::type_complexity)]
536 fn execute_handler(
537 &mut self,
538 handler: &[OpCode],
539 event_value: Value,
540 event_type: &str,
541 override_state_id: u32,
542 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
543 ) -> Result<Vec<Mutation>> {
544 self.reset_registers();
545
546 tracing::trace!(
547 "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
548 event_type,
549 handler.len(),
550 event_value
551 );
552
553 let mut pc: usize = 0;
554 let mut output = Vec::new();
555 let mut dirty_fields: HashSet<String> = HashSet::new();
556
557 while pc < handler.len() {
558 match &handler[pc] {
559 OpCode::LoadEventField {
560 path,
561 dest,
562 default,
563 } => {
564 let value = self.load_field(&event_value, path, default.as_ref())?;
565 tracing::trace!(
566 "LoadEventField: path={:?}, dest={}, value={:?}",
567 path.segments,
568 dest,
569 value
570 );
571 if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts" {
573 tracing::warn!(
574 "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
575 path.segments, dest
576 );
577 }
578 self.registers[*dest] = value;
579 pc += 1;
580 }
581 OpCode::LoadConstant { value, dest } => {
582 self.registers[*dest] = value.clone();
583 pc += 1;
584 }
585 OpCode::CopyRegister { source, dest } => {
586 let value = self.registers[*source].clone();
587 tracing::trace!(
588 "CopyRegister: source={}, dest={}, value={:?}",
589 source,
590 dest,
591 if value.is_null() { "NULL".to_string() } else { format!("{}", value) }
592 );
593 self.registers[*dest] = value;
594 pc += 1;
595 }
596 OpCode::CopyRegisterIfNull { source, dest } => {
597 if self.registers[*dest].is_null() {
598 self.registers[*dest] = self.registers[*source].clone();
599 tracing::trace!(
600 "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
601 source,
602 dest,
603 self.registers[*source]
604 );
605 } else {
606 tracing::trace!(
607 "CopyRegisterIfNull: dest reg {} not null, skipping copy",
608 dest
609 );
610 }
611 pc += 1;
612 }
613 OpCode::GetEventType { dest } => {
614 self.registers[*dest] = json!(event_type);
615 pc += 1;
616 }
617 OpCode::CreateObject { dest } => {
618 self.registers[*dest] = json!({});
619 pc += 1;
620 }
621 OpCode::SetField {
622 object,
623 path,
624 value,
625 } => {
626 let val = self.registers[*value].clone();
627 tracing::trace!("SetField: path={}, value={:?}", path, val);
628
629 self.set_field_auto_vivify(*object, path, *value)?;
630 dirty_fields.insert(path.to_string());
631 pc += 1;
632 }
633 OpCode::SetFields { object, fields } => {
634 for (path, value_reg) in fields {
635 self.set_field_auto_vivify(*object, path, *value_reg)?;
636 dirty_fields.insert(path.to_string());
637 }
638 pc += 1;
639 }
640 OpCode::GetField { object, path, dest } => {
641 let value = self.get_field(*object, path)?;
642 self.registers[*dest] = value;
643 pc += 1;
644 }
645 OpCode::ReadOrInitState {
646 state_id: _,
647 key,
648 default,
649 dest,
650 } => {
651 let actual_state_id = override_state_id;
652 self.states
653 .entry(actual_state_id)
654 .or_insert_with(|| StateTable {
655 data: DashMap::new(),
656 lookup_indexes: HashMap::new(),
657 temporal_indexes: HashMap::new(),
658 pda_reverse_lookups: HashMap::new(),
659 pending_updates: DashMap::new(),
660 });
661 let state = self
662 .states
663 .get(&actual_state_id)
664 .ok_or("State table not found")?;
665 let key_value = &self.registers[*key];
666 if key_value.is_null() {
667 if event_type.ends_with("State") && !event_type.ends_with("IxState") {
670 tracing::warn!(
671 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
672 key,
673 event_type
674 );
675 } else {
676 tracing::debug!(
677 "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
678 key,
679 event_type
680 );
681 }
682 }
683 let value = state
684 .data
685 .get(key_value)
686 .map(|v| v.clone())
687 .unwrap_or_else(|| default.clone());
688
689 self.registers[*dest] = value;
690 pc += 1;
691 }
692 OpCode::UpdateState {
693 state_id: _,
694 key,
695 value,
696 } => {
697 let actual_state_id = override_state_id;
698 let state = self
699 .states
700 .get(&actual_state_id)
701 .ok_or("State table not found")?;
702 let key_value = self.registers[*key].clone();
703 let value_data = self.registers[*value].clone();
704
705 state.data.insert(key_value, value_data);
706 pc += 1;
707 }
708 OpCode::AppendToArray {
709 object,
710 path,
711 value,
712 } => {
713 tracing::trace!("AppendToArray: path={}, value={:?}", path, self.registers[*value]);
714 self.append_to_array(*object, path, *value)?;
715 dirty_fields.insert(path.to_string());
716 pc += 1;
717 }
718 OpCode::GetCurrentTimestamp { dest } => {
719 let timestamp = std::time::SystemTime::now()
720 .duration_since(std::time::UNIX_EPOCH)
721 .unwrap()
722 .as_secs() as i64;
723 self.registers[*dest] = json!(timestamp);
724 pc += 1;
725 }
726 OpCode::CreateEvent { dest, event_value } => {
727 tracing::trace!("CreateEvent: event_value_reg={}, event_value={:?}", event_value, self.registers[*event_value]);
728 let timestamp = std::time::SystemTime::now()
729 .duration_since(std::time::UNIX_EPOCH)
730 .unwrap()
731 .as_secs() as i64;
732
733 let mut event_data = self.registers[*event_value].clone();
735 if let Some(obj) = event_data.as_object_mut() {
736 obj.remove("__update_context");
737 }
738
739 let mut event = serde_json::Map::new();
741 event.insert("timestamp".to_string(), json!(timestamp));
742 event.insert("data".to_string(), event_data);
743
744 if let Some(ref ctx) = self.current_context {
746 if let Some(slot) = ctx.slot {
747 event.insert("slot".to_string(), json!(slot));
748 }
749 if let Some(ref signature) = ctx.signature {
750 event.insert("signature".to_string(), json!(signature));
751 }
752 }
753
754 self.registers[*dest] = Value::Object(event);
755 pc += 1;
756 }
757 OpCode::CreateCapture {
758 dest,
759 capture_value,
760 } => {
761 let timestamp = std::time::SystemTime::now()
762 .duration_since(std::time::UNIX_EPOCH)
763 .unwrap()
764 .as_secs() as i64;
765
766 let capture_data = self.registers[*capture_value].clone();
768
769 let account_address = event_value
771 .get("__account_address")
772 .and_then(|v| v.as_str())
773 .unwrap_or("")
774 .to_string();
775
776 let mut capture = serde_json::Map::new();
778 capture.insert("timestamp".to_string(), json!(timestamp));
779 capture.insert("account_address".to_string(), json!(account_address));
780 capture.insert("data".to_string(), capture_data);
781
782 if let Some(ref ctx) = self.current_context {
784 if let Some(slot) = ctx.slot {
785 capture.insert("slot".to_string(), json!(slot));
786 }
787 if let Some(ref signature) = ctx.signature {
788 capture.insert("signature".to_string(), json!(signature));
789 }
790 }
791
792 self.registers[*dest] = Value::Object(capture);
793 pc += 1;
794 }
795 OpCode::Transform {
796 source,
797 dest,
798 transformation,
799 } => {
800 if source == dest {
801 let result = self.transform_in_place(*source, transformation);
802 if let Err(ref e) = result {
803 tracing::error!(
804 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
805 transformation, pc, e, source, &self.registers[*source]
806 );
807 }
808 result?;
809 } else {
810 let source_value = &self.registers[*source];
811 let result = self.apply_transformation(source_value, transformation);
812 if let Err(ref e) = result {
813 tracing::error!(
814 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
815 transformation, pc, e, source, source_value
816 );
817 }
818 let value = result?;
819 self.registers[*dest] = value;
820 }
821 pc += 1;
822 }
823 OpCode::EmitMutation {
824 entity_name,
825 key,
826 state,
827 } => {
828 let primary_key = self.registers[*key].clone();
829
830 if primary_key.is_null() || dirty_fields.is_empty() {
831 tracing::warn!(
833 " ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
834 entity_name,
835 if primary_key.is_null() { "NULL" } else { "present" },
836 dirty_fields.len()
837 );
838 if dirty_fields.is_empty() {
839 tracing::warn!(" Reason: No fields were modified by this handler");
840 }
841 if primary_key.is_null() {
842 tracing::warn!(" Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
843 tracing::warn!(" Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
845 self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
846 self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
847 self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
848 self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
849 );
850 }
851 } else {
852 let patch = self.extract_partial_state(*state, &dirty_fields)?;
853 tracing::debug!(
854 " Patch structure: {}",
855 serde_json::to_string_pretty(&patch).unwrap_or_default()
856 );
857 let mutation = Mutation {
858 export: entity_name.clone(),
859 key: primary_key,
860 patch,
861 };
862 output.push(mutation);
863 }
864 pc += 1;
865 }
866 OpCode::SetFieldIfNull {
867 object,
868 path,
869 value,
870 } => {
871 let val = self.registers[*value].clone();
872 let was_set = self.set_field_if_null(*object, path, *value)?;
873 tracing::trace!(
874 "SetFieldIfNull: path={}, value={:?}, was_set={}",
875 path,
876 val,
877 was_set
878 );
879 if was_set {
880 dirty_fields.insert(path.to_string());
881 }
882 pc += 1;
883 }
884 OpCode::SetFieldMax {
885 object,
886 path,
887 value,
888 } => {
889 let was_updated = self.set_field_max(*object, path, *value)?;
890 if was_updated {
891 dirty_fields.insert(path.to_string());
892 }
893 pc += 1;
894 }
895 OpCode::UpdateTemporalIndex {
896 state_id: _,
897 index_name,
898 lookup_value,
899 primary_key,
900 timestamp,
901 } => {
902 let actual_state_id = override_state_id;
903 let state = self
904 .states
905 .get_mut(&actual_state_id)
906 .ok_or("State table not found")?;
907 let index = state
908 .temporal_indexes
909 .entry(index_name.clone())
910 .or_insert_with(TemporalIndex::new);
911
912 let lookup_val = self.registers[*lookup_value].clone();
913 let pk_val = self.registers[*primary_key].clone();
914 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
915 val
916 } else if let Some(val) = self.registers[*timestamp].as_u64() {
917 val as i64
918 } else {
919 return Err(format!(
920 "Timestamp must be a number (i64 or u64), got: {:?}",
921 self.registers[*timestamp]
922 )
923 .into());
924 };
925
926 index.insert(lookup_val, pk_val, ts_val);
927 pc += 1;
928 }
929 OpCode::LookupTemporalIndex {
930 state_id: _,
931 index_name,
932 lookup_value,
933 timestamp,
934 dest,
935 } => {
936 let actual_state_id = override_state_id;
937 let state = self
938 .states
939 .get(&actual_state_id)
940 .ok_or("State table not found")?;
941 let lookup_val = &self.registers[*lookup_value];
942
943 let result = if self.registers[*timestamp].is_null() {
944 if let Some(index) = state.temporal_indexes.get(index_name) {
945 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
946 } else {
947 Value::Null
948 }
949 } else {
950 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
951 val
952 } else if let Some(val) = self.registers[*timestamp].as_u64() {
953 val as i64
954 } else {
955 return Err(format!(
956 "Timestamp must be a number (i64 or u64), got: {:?}",
957 self.registers[*timestamp]
958 )
959 .into());
960 };
961
962 if let Some(index) = state.temporal_indexes.get(index_name) {
963 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
964 } else {
965 Value::Null
966 }
967 };
968
969 self.registers[*dest] = result;
970 pc += 1;
971 }
972 OpCode::UpdateLookupIndex {
973 state_id: _,
974 index_name,
975 lookup_value,
976 primary_key,
977 } => {
978 let actual_state_id = override_state_id;
979 let state = self
980 .states
981 .get_mut(&actual_state_id)
982 .ok_or("State table not found")?;
983 let index = state
984 .lookup_indexes
985 .entry(index_name.clone())
986 .or_insert_with(LookupIndex::new);
987
988 let lookup_val = self.registers[*lookup_value].clone();
989 let pk_val = self.registers[*primary_key].clone();
990
991 tracing::debug!(
992 "📝 UpdateLookupIndex: {} -> {} (index: {})",
993 lookup_val, pk_val, index_name
994 );
995
996 index.insert(lookup_val, pk_val);
997 pc += 1;
998 }
999 OpCode::LookupIndex {
1000 state_id: _,
1001 index_name,
1002 lookup_value,
1003 dest,
1004 } => {
1005 let actual_state_id = override_state_id;
1006 let lookup_val = self.registers[*lookup_value].clone();
1007
1008 let result = {
1009 let state = self
1010 .states
1011 .get(&actual_state_id)
1012 .ok_or("State table not found")?;
1013
1014 if let Some(index) = state.lookup_indexes.get(index_name) {
1015 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1016 tracing::debug!(
1017 "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1018 lookup_val, found, index_name, index.len()
1019 );
1020 found
1021 } else {
1022 Value::Null
1023 }
1024 };
1025
1026 let final_result = if result.is_null() {
1029 if let Some(pda_str) = lookup_val.as_str() {
1030 let state = self
1031 .states
1032 .get_mut(&actual_state_id)
1033 .ok_or("State table not found")?;
1034
1035 if let Some(pda_lookup) = state.pda_reverse_lookups.get_mut("default_pda_lookup") {
1036 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1037 tracing::info!(
1038 "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1039 &pda_str[..pda_str.len().min(8)], resolved
1040 );
1041 Value::String(resolved)
1042 } else {
1043 tracing::debug!(
1044 "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1045 &pda_str[..pda_str.len().min(8)]
1046 );
1047 Value::Null
1048 }
1049 } else {
1050 tracing::debug!(
1051 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1052 lookup_val, index_name
1053 );
1054 Value::Null
1055 }
1056 } else {
1057 tracing::debug!(
1058 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1059 lookup_val, index_name
1060 );
1061 Value::Null
1062 }
1063 } else {
1064 result
1065 };
1066
1067 self.registers[*dest] = final_result;
1068 pc += 1;
1069 }
1070 OpCode::SetFieldSum {
1071 object,
1072 path,
1073 value,
1074 } => {
1075 let was_updated = self.set_field_sum(*object, path, *value)?;
1076 if was_updated {
1077 dirty_fields.insert(path.to_string());
1078 }
1079 pc += 1;
1080 }
1081 OpCode::SetFieldIncrement { object, path } => {
1082 let was_updated = self.set_field_increment(*object, path)?;
1083 if was_updated {
1084 dirty_fields.insert(path.to_string());
1085 }
1086 pc += 1;
1087 }
1088 OpCode::SetFieldMin {
1089 object,
1090 path,
1091 value,
1092 } => {
1093 let was_updated = self.set_field_min(*object, path, *value)?;
1094 if was_updated {
1095 dirty_fields.insert(path.to_string());
1096 }
1097 pc += 1;
1098 }
1099 OpCode::AddToUniqueSet {
1100 state_id: _,
1101 set_name,
1102 value,
1103 count_object,
1104 count_path,
1105 } => {
1106 let value_to_add = self.registers[*value].clone();
1107
1108 let set_field_path = format!("__unique_set:{}", set_name);
1111
1112 let mut set: HashSet<Value> =
1114 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1115 if !existing.is_null() {
1116 serde_json::from_value(existing).unwrap_or_default()
1117 } else {
1118 HashSet::new()
1119 }
1120 } else {
1121 HashSet::new()
1122 };
1123
1124 let was_new = set.insert(value_to_add);
1126
1127 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1129 self.registers[100] = serde_json::to_value(set_as_vec)?;
1130 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1131
1132 if was_new {
1134 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1135 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1136 dirty_fields.insert(count_path.to_string());
1137 }
1138
1139 pc += 1;
1140 }
1141 OpCode::ConditionalSetField {
1142 object,
1143 path,
1144 value,
1145 condition_field,
1146 condition_op,
1147 condition_value,
1148 } => {
1149 let field_value = self.load_field(&event_value, condition_field, None)?;
1151
1152 let condition_met =
1154 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1155
1156 if condition_met {
1157 let val = self.registers[*value].clone();
1158 tracing::trace!(
1159 "ConditionalSetField: condition met, setting {}={:?}",
1160 path,
1161 val
1162 );
1163 self.set_field_auto_vivify(*object, path, *value)?;
1164 dirty_fields.insert(path.to_string());
1165 } else {
1166 tracing::trace!(
1167 "ConditionalSetField: condition not met, skipping {}",
1168 path
1169 );
1170 }
1171 pc += 1;
1172 }
1173 OpCode::ConditionalIncrement {
1174 object,
1175 path,
1176 condition_field,
1177 condition_op,
1178 condition_value,
1179 } => {
1180 let field_value = self.load_field(&event_value, condition_field, None)?;
1182
1183 let condition_met =
1185 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1186
1187 if condition_met {
1188 tracing::trace!(
1189 "ConditionalIncrement: condition met, incrementing {}",
1190 path
1191 );
1192 let was_updated = self.set_field_increment(*object, path)?;
1193 if was_updated {
1194 dirty_fields.insert(path.to_string());
1195 }
1196 } else {
1197 tracing::trace!(
1198 "ConditionalIncrement: condition not met, skipping {}",
1199 path
1200 );
1201 }
1202 pc += 1;
1203 }
1204 OpCode::EvaluateComputedFields {
1205 state,
1206 computed_paths,
1207 } => {
1208 if let Some(evaluator) = entity_evaluator {
1210 let state_value = &mut self.registers[*state];
1211 match evaluator(state_value) {
1212 Ok(()) => {
1213 for path in computed_paths {
1215 dirty_fields.insert(path.clone());
1216 }
1217 }
1218 Err(_e) => {
1219 }
1221 }
1222 }
1223 pc += 1;
1224 }
1225 OpCode::UpdatePdaReverseLookup {
1226 state_id: _,
1227 lookup_name,
1228 pda_address,
1229 primary_key,
1230 } => {
1231 let actual_state_id = override_state_id;
1232 let state = self
1233 .states
1234 .get_mut(&actual_state_id)
1235 .ok_or("State table not found")?;
1236
1237 let pda_val = self.registers[*pda_address].clone();
1238 let pk_val = self.registers[*primary_key].clone();
1239
1240 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1242 let pda_lookup = state
1244 .pda_reverse_lookups
1245 .entry(lookup_name.clone())
1246 .or_insert_with(|| PdaReverseLookup::new(10000));
1247
1248 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1249
1250 tracing::debug!(
1251 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1252 pda_str, pk_str, lookup_name
1253 );
1254 } else if !pk_val.is_null() {
1255 if let Some(pk_num) = pk_val.as_u64() {
1257 if let Some(pda_str) = pda_val.as_str() {
1258 let pda_lookup = state
1259 .pda_reverse_lookups
1260 .entry(lookup_name.clone())
1261 .or_insert_with(|| PdaReverseLookup::new(10000));
1262
1263 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1264
1265 tracing::debug!(
1266 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1267 pda_str, pk_num, lookup_name
1268 );
1269 }
1270 }
1271 }
1272
1273 pc += 1;
1274 }
1275 }
1276
1277 self.instructions_executed += 1;
1278 }
1279
1280 Ok(output)
1281 }
1282
1283 fn load_field(
1284 &self,
1285 event_value: &Value,
1286 path: &FieldPath,
1287 default: Option<&Value>,
1288 ) -> Result<Value> {
1289 if path.segments.iter().any(|s| s.is_empty()) {
1291 tracing::warn!(
1292 "load_field: path contains empty segment! path={:?}",
1293 path.segments
1294 );
1295 }
1296
1297 if path.segments.is_empty() {
1298 if let Some(obj) = event_value.as_object() {
1302 let filtered: serde_json::Map<String, Value> = obj
1303 .iter()
1304 .filter(|(k, _)| !k.starts_with("__"))
1305 .map(|(k, v)| (k.clone(), v.clone()))
1306 .collect();
1307 return Ok(Value::Object(filtered));
1308 }
1309 return Ok(event_value.clone());
1310 }
1311
1312 let mut current = event_value;
1313 for (i, segment) in path.segments.iter().enumerate() {
1314 current = match current.get(segment) {
1315 Some(v) => v,
1316 None => {
1317 if path.segments.len() == 2 && path.segments[0] == "accounts" {
1319 tracing::warn!(
1320 "load_field: could not find segment '{}' at index {} in path {:?}",
1321 segment,
1322 i,
1323 path.segments,
1324 );
1325 tracing::warn!(
1326 " event has top-level keys: {:?}",
1327 event_value.as_object().map(|o| o.keys().collect::<Vec<_>>())
1328 );
1329 if let Some(accounts) = event_value.get("accounts") {
1331 tracing::warn!(
1332 " 'accounts' exists with keys: {:?}",
1333 accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1334 );
1335 } else {
1336 tracing::warn!(" 'accounts' key does NOT exist in event_value");
1337 }
1338 }
1339 return Ok(default.cloned().unwrap_or(Value::Null));
1340 }
1341 };
1342 }
1343
1344 Ok(current.clone())
1345 }
1346
1347 fn set_field_auto_vivify(
1348 &mut self,
1349 object_reg: Register,
1350 path: &str,
1351 value_reg: Register,
1352 ) -> Result<()> {
1353 let compiled = self.get_compiled_path(path);
1354 let segments = compiled.segments();
1355 let value = self.registers[value_reg].clone();
1356
1357 if !self.registers[object_reg].is_object() {
1358 self.registers[object_reg] = json!({});
1359 }
1360
1361 let obj = self.registers[object_reg]
1362 .as_object_mut()
1363 .ok_or("Not an object")?;
1364
1365 let mut current = obj;
1366 for (i, segment) in segments.iter().enumerate() {
1367 if i == segments.len() - 1 {
1368 current.insert(segment.to_string(), value.clone());
1369 } else {
1370 current
1371 .entry(segment.to_string())
1372 .or_insert_with(|| json!({}));
1373 current = current
1374 .get_mut(segment)
1375 .and_then(|v| v.as_object_mut())
1376 .ok_or("Path collision: expected object")?;
1377 }
1378 }
1379
1380 Ok(())
1381 }
1382
1383 fn set_field_if_null(
1384 &mut self,
1385 object_reg: Register,
1386 path: &str,
1387 value_reg: Register,
1388 ) -> Result<bool> {
1389 let compiled = self.get_compiled_path(path);
1390 let segments = compiled.segments();
1391 let value = self.registers[value_reg].clone();
1392
1393 if !self.registers[object_reg].is_object() {
1394 self.registers[object_reg] = json!({});
1395 }
1396
1397 let obj = self.registers[object_reg]
1398 .as_object_mut()
1399 .ok_or("Not an object")?;
1400
1401 let mut current = obj;
1402 let mut was_set = false;
1403 for (i, segment) in segments.iter().enumerate() {
1404 if i == segments.len() - 1 {
1405 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1406 current.insert(segment.to_string(), value.clone());
1407 was_set = true;
1408 }
1409 } else {
1410 current
1411 .entry(segment.to_string())
1412 .or_insert_with(|| json!({}));
1413 current = current
1414 .get_mut(segment)
1415 .and_then(|v| v.as_object_mut())
1416 .ok_or("Path collision: expected object")?;
1417 }
1418 }
1419
1420 Ok(was_set)
1421 }
1422
1423 fn set_field_max(
1424 &mut self,
1425 object_reg: Register,
1426 path: &str,
1427 value_reg: Register,
1428 ) -> Result<bool> {
1429 let compiled = self.get_compiled_path(path);
1430 let segments = compiled.segments();
1431 let new_value = self.registers[value_reg].clone();
1432
1433 if !self.registers[object_reg].is_object() {
1434 self.registers[object_reg] = json!({});
1435 }
1436
1437 let obj = self.registers[object_reg]
1438 .as_object_mut()
1439 .ok_or("Not an object")?;
1440
1441 let mut current = obj;
1442 let mut was_updated = false;
1443 for (i, segment) in segments.iter().enumerate() {
1444 if i == segments.len() - 1 {
1445 let should_update = if let Some(current_value) = current.get(segment) {
1446 if current_value.is_null() {
1447 true
1448 } else {
1449 match (current_value.as_i64(), new_value.as_i64()) {
1450 (Some(current_val), Some(new_val)) => new_val > current_val,
1451 (Some(current_val), None) if new_value.as_u64().is_some() => {
1452 new_value.as_u64().unwrap() as i64 > current_val
1453 }
1454 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1455 new_val > current_value.as_u64().unwrap() as i64
1456 }
1457 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1458 (Some(current_val), Some(new_val)) => new_val > current_val,
1459 _ => match (current_value.as_f64(), new_value.as_f64()) {
1460 (Some(current_val), Some(new_val)) => new_val > current_val,
1461 _ => false,
1462 },
1463 },
1464 _ => false,
1465 }
1466 }
1467 } else {
1468 true
1469 };
1470
1471 if should_update {
1472 current.insert(segment.to_string(), new_value.clone());
1473 was_updated = true;
1474 }
1475 } else {
1476 current
1477 .entry(segment.to_string())
1478 .or_insert_with(|| json!({}));
1479 current = current
1480 .get_mut(segment)
1481 .and_then(|v| v.as_object_mut())
1482 .ok_or("Path collision: expected object")?;
1483 }
1484 }
1485
1486 Ok(was_updated)
1487 }
1488
1489 fn set_field_sum(
1490 &mut self,
1491 object_reg: Register,
1492 path: &str,
1493 value_reg: Register,
1494 ) -> Result<bool> {
1495 let compiled = self.get_compiled_path(path);
1496 let segments = compiled.segments();
1497 let new_value = self.registers[value_reg].clone();
1498
1499 if !self.registers[object_reg].is_object() {
1500 self.registers[object_reg] = json!({});
1501 }
1502
1503 let obj = self.registers[object_reg]
1504 .as_object_mut()
1505 .ok_or("Not an object")?;
1506
1507 let mut current = obj;
1508 for (i, segment) in segments.iter().enumerate() {
1509 if i == segments.len() - 1 {
1510 let current_val = current
1512 .get(segment)
1513 .and_then(|v| {
1514 if v.is_null() {
1515 None
1516 } else {
1517 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1518 }
1519 })
1520 .unwrap_or(0);
1521
1522 let new_val_num = new_value
1524 .as_i64()
1525 .or_else(|| new_value.as_u64().map(|n| n as i64))
1526 .ok_or("Sum requires numeric value")?;
1527
1528 let sum = current_val + new_val_num;
1529 current.insert(segment.to_string(), json!(sum));
1530 return Ok(true);
1531 } else {
1532 current
1533 .entry(segment.to_string())
1534 .or_insert_with(|| json!({}));
1535 current = current
1536 .get_mut(segment)
1537 .and_then(|v| v.as_object_mut())
1538 .ok_or("Path collision: expected object")?;
1539 }
1540 }
1541
1542 Ok(false)
1543 }
1544
1545 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1546 let compiled = self.get_compiled_path(path);
1547 let segments = compiled.segments();
1548
1549 if !self.registers[object_reg].is_object() {
1550 self.registers[object_reg] = json!({});
1551 }
1552
1553 let obj = self.registers[object_reg]
1554 .as_object_mut()
1555 .ok_or("Not an object")?;
1556
1557 let mut current = obj;
1558 for (i, segment) in segments.iter().enumerate() {
1559 if i == segments.len() - 1 {
1560 let current_val = current
1562 .get(segment)
1563 .and_then(|v| {
1564 if v.is_null() {
1565 None
1566 } else {
1567 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1568 }
1569 })
1570 .unwrap_or(0);
1571
1572 let incremented = current_val + 1;
1573 current.insert(segment.to_string(), json!(incremented));
1574 return Ok(true);
1575 } else {
1576 current
1577 .entry(segment.to_string())
1578 .or_insert_with(|| json!({}));
1579 current = current
1580 .get_mut(segment)
1581 .and_then(|v| v.as_object_mut())
1582 .ok_or("Path collision: expected object")?;
1583 }
1584 }
1585
1586 Ok(false)
1587 }
1588
1589 fn set_field_min(
1590 &mut self,
1591 object_reg: Register,
1592 path: &str,
1593 value_reg: Register,
1594 ) -> Result<bool> {
1595 let compiled = self.get_compiled_path(path);
1596 let segments = compiled.segments();
1597 let new_value = self.registers[value_reg].clone();
1598
1599 if !self.registers[object_reg].is_object() {
1600 self.registers[object_reg] = json!({});
1601 }
1602
1603 let obj = self.registers[object_reg]
1604 .as_object_mut()
1605 .ok_or("Not an object")?;
1606
1607 let mut current = obj;
1608 let mut was_updated = false;
1609 for (i, segment) in segments.iter().enumerate() {
1610 if i == segments.len() - 1 {
1611 let should_update = if let Some(current_value) = current.get(segment) {
1612 if current_value.is_null() {
1613 true
1614 } else {
1615 match (current_value.as_i64(), new_value.as_i64()) {
1616 (Some(current_val), Some(new_val)) => new_val < current_val,
1617 (Some(current_val), None) if new_value.as_u64().is_some() => {
1618 (new_value.as_u64().unwrap() as i64) < current_val
1619 }
1620 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1621 new_val < current_value.as_u64().unwrap() as i64
1622 }
1623 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1624 (Some(current_val), Some(new_val)) => new_val < current_val,
1625 _ => match (current_value.as_f64(), new_value.as_f64()) {
1626 (Some(current_val), Some(new_val)) => new_val < current_val,
1627 _ => false,
1628 },
1629 },
1630 _ => false,
1631 }
1632 }
1633 } else {
1634 true
1635 };
1636
1637 if should_update {
1638 current.insert(segment.to_string(), new_value.clone());
1639 was_updated = true;
1640 }
1641 } else {
1642 current
1643 .entry(segment.to_string())
1644 .or_insert_with(|| json!({}));
1645 current = current
1646 .get_mut(segment)
1647 .and_then(|v| v.as_object_mut())
1648 .ok_or("Path collision: expected object")?;
1649 }
1650 }
1651
1652 Ok(was_updated)
1653 }
1654
1655 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1656 let compiled = self.get_compiled_path(path);
1657 let segments = compiled.segments();
1658 let mut current = &self.registers[object_reg];
1659
1660 for segment in segments {
1661 current = current
1662 .get(segment)
1663 .ok_or_else(|| format!("Field not found: {}", segment))?;
1664 }
1665
1666 Ok(current.clone())
1667 }
1668
1669 fn append_to_array(
1670 &mut self,
1671 object_reg: Register,
1672 path: &str,
1673 value_reg: Register,
1674 ) -> Result<()> {
1675 let compiled = self.get_compiled_path(path);
1676 let segments = compiled.segments();
1677 let value = self.registers[value_reg].clone();
1678
1679 if !self.registers[object_reg].is_object() {
1680 self.registers[object_reg] = json!({});
1681 }
1682
1683 let obj = self.registers[object_reg]
1684 .as_object_mut()
1685 .ok_or("Not an object")?;
1686
1687 let mut current = obj;
1688 for (i, segment) in segments.iter().enumerate() {
1689 if i == segments.len() - 1 {
1690 current
1691 .entry(segment.to_string())
1692 .or_insert_with(|| json!([]));
1693 let arr = current
1694 .get_mut(segment)
1695 .and_then(|v| v.as_array_mut())
1696 .ok_or("Path is not an array")?;
1697 arr.push(value.clone());
1698 } else {
1699 current
1700 .entry(segment.to_string())
1701 .or_insert_with(|| json!({}));
1702 current = current
1703 .get_mut(segment)
1704 .and_then(|v| v.as_object_mut())
1705 .ok_or("Path collision: expected object")?;
1706 }
1707 }
1708
1709 Ok(())
1710 }
1711
1712 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
1713 let value = &self.registers[reg];
1714 let transformed = self.apply_transformation(value, transformation)?;
1715 self.registers[reg] = transformed;
1716 Ok(())
1717 }
1718
1719 fn apply_transformation(
1720 &self,
1721 value: &Value,
1722 transformation: &Transformation,
1723 ) -> Result<Value> {
1724 match transformation {
1725 Transformation::HexEncode => {
1726 if let Some(arr) = value.as_array() {
1727 let bytes: Vec<u8> = arr
1728 .iter()
1729 .filter_map(|v| v.as_u64().map(|n| n as u8))
1730 .collect();
1731 let hex = hex::encode(&bytes);
1732 Ok(json!(hex))
1733 } else {
1734 Err("HexEncode requires an array of numbers".into())
1735 }
1736 }
1737 Transformation::HexDecode => {
1738 if let Some(s) = value.as_str() {
1739 let s = s.strip_prefix("0x").unwrap_or(s);
1740 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
1741 Ok(json!(bytes))
1742 } else {
1743 Err("HexDecode requires a string".into())
1744 }
1745 }
1746 Transformation::Base58Encode => {
1747 if let Some(arr) = value.as_array() {
1748 let bytes: Vec<u8> = arr
1749 .iter()
1750 .filter_map(|v| v.as_u64().map(|n| n as u8))
1751 .collect();
1752 let encoded = bs58::encode(&bytes).into_string();
1753 Ok(json!(encoded))
1754 } else if value.is_string() {
1755 tracing::debug!("Base58Encode: value is already a string, passing through: {:?}", value);
1757 Ok(value.clone())
1758 } else {
1759 tracing::error!("Base58Encode failed: value type is {:?}, value: {:?}",
1760 if value.is_null() { "null" }
1761 else if value.is_boolean() { "boolean" }
1762 else if value.is_number() { "number" }
1763 else if value.is_object() { "object" }
1764 else { "unknown" },
1765 value
1766 );
1767 Err("Base58Encode requires an array of numbers".into())
1768 }
1769 }
1770 Transformation::Base58Decode => {
1771 if let Some(s) = value.as_str() {
1772 let bytes = bs58::decode(s).into_vec().map_err(|e| format!("Base58 decode error: {}", e))?;
1773 Ok(json!(bytes))
1774 } else {
1775 Err("Base58Decode requires a string".into())
1776 }
1777 }
1778 Transformation::ToString => Ok(json!(value.to_string())),
1779 Transformation::ToNumber => {
1780 if let Some(s) = value.as_str() {
1781 let n = s
1782 .parse::<i64>()
1783 .map_err(|e| format!("Parse error: {}", e))?;
1784 Ok(json!(n))
1785 } else {
1786 Ok(value.clone())
1787 }
1788 }
1789 }
1790 }
1791
1792 fn evaluate_comparison(
1793 &self,
1794 field_value: &Value,
1795 op: &ComparisonOp,
1796 condition_value: &Value,
1797 ) -> Result<bool> {
1798 use ComparisonOp::*;
1799
1800 match op {
1801 Equal => Ok(field_value == condition_value),
1802 NotEqual => Ok(field_value != condition_value),
1803 GreaterThan => {
1804 match (field_value.as_i64(), condition_value.as_i64()) {
1806 (Some(a), Some(b)) => Ok(a > b),
1807 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1808 (Some(a), Some(b)) => Ok(a > b),
1809 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1810 (Some(a), Some(b)) => Ok(a > b),
1811 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
1812 },
1813 },
1814 }
1815 }
1816 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1817 (Some(a), Some(b)) => Ok(a >= b),
1818 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1819 (Some(a), Some(b)) => Ok(a >= b),
1820 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1821 (Some(a), Some(b)) => Ok(a >= b),
1822 _ => {
1823 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
1824 }
1825 },
1826 },
1827 },
1828 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
1829 (Some(a), Some(b)) => Ok(a < b),
1830 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1831 (Some(a), Some(b)) => Ok(a < b),
1832 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1833 (Some(a), Some(b)) => Ok(a < b),
1834 _ => Err("Cannot compare non-numeric values with LessThan".into()),
1835 },
1836 },
1837 },
1838 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1839 (Some(a), Some(b)) => Ok(a <= b),
1840 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1841 (Some(a), Some(b)) => Ok(a <= b),
1842 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1843 (Some(a), Some(b)) => Ok(a <= b),
1844 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
1845 },
1846 },
1847 },
1848 }
1849 }
1850
1851 #[cfg_attr(feature = "otel", instrument(
1865 name = "vm.update_pda_lookup",
1866 skip(self),
1867 fields(
1868 pda = %pda_address,
1869 seed = %seed_value,
1870 )
1871 ))]
1872 pub fn update_pda_reverse_lookup(
1873 &mut self,
1874 state_id: u32,
1875 lookup_name: &str,
1876 pda_address: String,
1877 seed_value: String,
1878 ) -> Result<Vec<PendingAccountUpdate>> {
1879 let state = self
1880 .states
1881 .get_mut(&state_id)
1882 .ok_or("State table not found")?;
1883
1884 let lookup = state
1885 .pda_reverse_lookups
1886 .entry(lookup_name.to_string())
1887 .or_insert_with(|| PdaReverseLookup::new(10000));
1888
1889 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
1891
1892 if let Some(ref evicted) = evicted_pda {
1894 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
1895 let count = evicted_updates.len();
1896 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
1897 tracing::info!(
1898 "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
1899 count,
1900 evicted
1901 );
1902 }
1903 }
1904
1905 self.flush_pending_updates(state_id, &pda_address)
1907 }
1908
1909 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
1914 let state = match self.states.get_mut(&state_id) {
1915 Some(s) => s,
1916 None => return 0,
1917 };
1918
1919 let now = std::time::SystemTime::now()
1920 .duration_since(std::time::UNIX_EPOCH)
1921 .unwrap()
1922 .as_secs() as i64;
1923
1924 let mut removed_count = 0;
1925
1926 state.pending_updates.retain(|_pda_address, updates| {
1928 let original_len = updates.len();
1929
1930 updates.retain(|update| {
1931 let age = now - update.queued_at;
1932 age <= PENDING_UPDATE_TTL_SECONDS
1933 });
1934
1935 removed_count += original_len - updates.len();
1936
1937 !updates.is_empty()
1939 });
1940
1941 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
1943
1944 if removed_count > 0 {
1945 tracing::info!(
1946 "Cleaned up {} expired pending updates (TTL: {}s)",
1947 removed_count,
1948 PENDING_UPDATE_TTL_SECONDS
1949 );
1950 }
1951
1952 removed_count
1953 }
1954
1955 #[cfg_attr(feature = "otel", instrument(
1990 name = "vm.queue_account_update",
1991 skip(self, account_data),
1992 fields(
1993 pda = %pda_address,
1994 account_type = %account_type,
1995 slot = slot,
1996 )
1997 ))]
1998 pub fn queue_account_update(
1999 &mut self,
2000 state_id: u32,
2001 pda_address: String,
2002 account_type: String,
2003 account_data: Value,
2004 slot: u64,
2005 signature: String,
2006 ) -> Result<()> {
2007 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2009 tracing::warn!(
2010 "Pending queue size limit reached ({}), cleaning up expired updates",
2011 MAX_PENDING_UPDATES_TOTAL
2012 );
2013 let removed = self.cleanup_expired_pending_updates(state_id);
2014
2015 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2017 tracing::warn!(
2018 "Still at limit after cleanup (removed {}), will drop oldest update",
2019 removed
2020 );
2021 self.drop_oldest_pending_update(state_id)?;
2022 }
2023 }
2024
2025 let state = self
2026 .states
2027 .get_mut(&state_id)
2028 .ok_or("State table not found")?;
2029
2030 let pending = PendingAccountUpdate {
2031 account_type,
2032 pda_address: pda_address.clone(),
2033 account_data,
2034 slot,
2035 signature,
2036 queued_at: std::time::SystemTime::now()
2037 .duration_since(std::time::UNIX_EPOCH)
2038 .unwrap()
2039 .as_secs() as i64,
2040 };
2041
2042 {
2044 let mut updates = state
2045 .pending_updates
2046 .entry(pda_address.clone())
2047 .or_insert_with(Vec::new);
2048
2049 let original_len = updates.len();
2052 updates.retain(|existing| {
2053 existing.slot > slot
2055 });
2056 let removed_by_dedup = original_len - updates.len();
2057
2058 if removed_by_dedup > 0 {
2059 self.pending_queue_size = self
2060 .pending_queue_size
2061 .saturating_sub(removed_by_dedup as u64);
2062 tracing::debug!(
2063 "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2064 removed_by_dedup,
2065 pda_address,
2066 slot
2067 );
2068 }
2069
2070 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2072 tracing::warn!(
2073 "Per-PDA limit reached for {} ({} updates), dropping oldest",
2074 pda_address,
2075 updates.len()
2076 );
2077 updates.remove(0);
2078 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2079 }
2080
2081 updates.push(pending);
2082 self.pending_queue_size += 1;
2083 }
2084
2085 Ok(())
2086 }
2087
2088 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2090 let state = self.states.get(&state_id)?;
2091
2092 let now = std::time::SystemTime::now()
2093 .duration_since(std::time::UNIX_EPOCH)
2094 .unwrap()
2095 .as_secs() as i64;
2096
2097 let mut total_updates = 0;
2098 let mut oldest_timestamp = now;
2099 let mut largest_pda_queue = 0;
2100 let mut estimated_memory = 0;
2101
2102 for entry in state.pending_updates.iter() {
2103 let (_, updates) = entry.pair();
2104 total_updates += updates.len();
2105 largest_pda_queue = largest_pda_queue.max(updates.len());
2106
2107 for update in updates.iter() {
2108 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2109 estimated_memory += update.account_type.len() +
2111 update.pda_address.len() +
2112 update.signature.len() +
2113 16 + estimate_json_size(&update.account_data);
2115 }
2116 }
2117
2118 Some(PendingQueueStats {
2119 total_updates,
2120 unique_pdas: state.pending_updates.len(),
2121 oldest_age_seconds: now - oldest_timestamp,
2122 largest_pda_queue_size: largest_pda_queue,
2123 estimated_memory_bytes: estimated_memory,
2124 })
2125 }
2126
2127 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2129 let state = self
2130 .states
2131 .get_mut(&state_id)
2132 .ok_or("State table not found")?;
2133
2134 let mut oldest_pda: Option<String> = None;
2135 let mut oldest_timestamp = i64::MAX;
2136
2137 for entry in state.pending_updates.iter() {
2139 let (pda, updates) = entry.pair();
2140 if let Some(update) = updates.first() {
2141 if update.queued_at < oldest_timestamp {
2142 oldest_timestamp = update.queued_at;
2143 oldest_pda = Some(pda.clone());
2144 }
2145 }
2146 }
2147
2148 if let Some(pda) = oldest_pda {
2150 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2151 if !updates.is_empty() {
2152 updates.remove(0);
2153 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2154
2155 if updates.is_empty() {
2157 drop(updates);
2158 state.pending_updates.remove(&pda);
2159 }
2160 }
2161 }
2162 }
2163
2164 Ok(())
2165 }
2166
2167 fn flush_pending_updates(
2172 &mut self,
2173 state_id: u32,
2174 pda_address: &str,
2175 ) -> Result<Vec<PendingAccountUpdate>> {
2176 let state = self
2177 .states
2178 .get_mut(&state_id)
2179 .ok_or("State table not found")?;
2180
2181 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2182 let count = pending_updates.len();
2183 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2184 Ok(pending_updates)
2185 } else {
2186 Ok(Vec::new())
2187 }
2188 }
2189
2190 pub fn try_pda_reverse_lookup(
2192 &mut self,
2193 state_id: u32,
2194 lookup_name: &str,
2195 pda_address: &str,
2196 ) -> Option<String> {
2197 let state = self.states.get_mut(&state_id)?;
2198
2199 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2200 if let Some(value) = lookup.lookup(pda_address) {
2201 self.pda_cache_hits += 1;
2202 return Some(value);
2203 }
2204 }
2205
2206 self.pda_cache_misses += 1;
2207 None
2208 }
2209
2210 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2217 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2218 }
2219
2220 fn evaluate_computed_expr_with_env(
2222 &self,
2223 expr: &ComputedExpr,
2224 state: &Value,
2225 env: &std::collections::HashMap<String, Value>,
2226 ) -> Result<Value> {
2227 match expr {
2228 ComputedExpr::FieldRef { path } => {
2229 self.get_field_from_state(state, path)
2230 }
2231
2232 ComputedExpr::Var { name } => {
2233 env.get(name)
2234 .cloned()
2235 .ok_or_else(|| format!("Undefined variable: {}", name).into())
2236 }
2237
2238 ComputedExpr::Let { name, value, body } => {
2239 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2240 let mut new_env = env.clone();
2241 new_env.insert(name.clone(), val);
2242 self.evaluate_computed_expr_with_env(body, state, &new_env)
2243 }
2244
2245 ComputedExpr::If { condition, then_branch, else_branch } => {
2246 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2247 if self.value_to_bool(&cond_val) {
2248 self.evaluate_computed_expr_with_env(then_branch, state, env)
2249 } else {
2250 self.evaluate_computed_expr_with_env(else_branch, state, env)
2251 }
2252 }
2253
2254 ComputedExpr::None => Ok(Value::Null),
2255
2256 ComputedExpr::Some { value } => {
2257 self.evaluate_computed_expr_with_env(value, state, env)
2258 }
2259
2260 ComputedExpr::Slice { expr, start, end } => {
2261 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2262 match val {
2263 Value::Array(arr) => {
2264 let slice: Vec<Value> = arr.get(*start..*end)
2265 .unwrap_or(&[])
2266 .to_vec();
2267 Ok(Value::Array(slice))
2268 }
2269 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2270 }
2271 }
2272
2273 ComputedExpr::Index { expr, index } => {
2274 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2275 match val {
2276 Value::Array(arr) => {
2277 Ok(arr.get(*index).cloned().unwrap_or(Value::Null))
2278 }
2279 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2280 }
2281 }
2282
2283 ComputedExpr::U64FromLeBytes { bytes } => {
2284 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2285 let byte_vec = self.value_to_bytes(&val)?;
2286 if byte_vec.len() < 8 {
2287 return Err(format!("u64::from_le_bytes requires 8 bytes, got {}", byte_vec.len()).into());
2288 }
2289 let arr: [u8; 8] = byte_vec[..8].try_into()
2290 .map_err(|_| "Failed to convert to [u8; 8]")?;
2291 Ok(json!(u64::from_le_bytes(arr)))
2292 }
2293
2294 ComputedExpr::U64FromBeBytes { bytes } => {
2295 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2296 let byte_vec = self.value_to_bytes(&val)?;
2297 if byte_vec.len() < 8 {
2298 return Err(format!("u64::from_be_bytes requires 8 bytes, got {}", byte_vec.len()).into());
2299 }
2300 let arr: [u8; 8] = byte_vec[..8].try_into()
2301 .map_err(|_| "Failed to convert to [u8; 8]")?;
2302 Ok(json!(u64::from_be_bytes(arr)))
2303 }
2304
2305 ComputedExpr::ByteArray { bytes } => {
2306 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2307 }
2308
2309 ComputedExpr::Closure { param, body } => {
2310 Ok(json!({
2313 "__closure": {
2314 "param": param,
2315 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2316 }
2317 }))
2318 }
2319
2320 ComputedExpr::Unary { op, expr } => {
2321 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2322 self.apply_unary_op(op, &val)
2323 }
2324
2325 ComputedExpr::JsonToBytes { expr } => {
2326 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2327 let bytes = self.value_to_bytes(&val)?;
2329 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2330 }
2331
2332 ComputedExpr::UnwrapOr { expr, default } => {
2333 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2334 if val.is_null() {
2335 Ok(default.clone())
2336 } else {
2337 Ok(val)
2338 }
2339 }
2340
2341 ComputedExpr::Binary { op, left, right } => {
2342 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2343 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2344 self.apply_binary_op(op, &l, &r)
2345 }
2346
2347 ComputedExpr::Cast { expr, to_type } => {
2348 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2349 self.apply_cast(&val, to_type)
2350 }
2351
2352 ComputedExpr::MethodCall { expr, method, args } => {
2353 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2354 if method == "map" && args.len() == 1 {
2356 if let ComputedExpr::Closure { param, body } = &args[0] {
2357 if val.is_null() {
2359 return Ok(Value::Null);
2360 }
2361 let mut closure_env = env.clone();
2363 closure_env.insert(param.clone(), val);
2364 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2365 }
2366 }
2367 let evaluated_args: Vec<Value> = args
2368 .iter()
2369 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2370 .collect::<Result<Vec<_>>>()?;
2371 self.apply_method_call(&val, method, &evaluated_args)
2372 }
2373
2374 ComputedExpr::Literal { value } => {
2375 Ok(value.clone())
2376 }
2377
2378 ComputedExpr::Paren { expr } => {
2379 self.evaluate_computed_expr_with_env(expr, state, env)
2380 }
2381 }
2382 }
2383
2384 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2386 match val {
2387 Value::Array(arr) => {
2388 arr.iter()
2389 .map(|v| v.as_u64().map(|n| n as u8).ok_or_else(|| "Array element not a valid byte".into()))
2390 .collect()
2391 }
2392 Value::String(s) => {
2393 if s.starts_with("0x") || s.starts_with("0X") {
2395 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2396 } else {
2397 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2398 }
2399 }
2400 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2401 }
2402 }
2403
2404 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2406 use crate::ast::UnaryOp;
2407 match op {
2408 UnaryOp::Not => {
2409 Ok(json!(!self.value_to_bool(val)))
2410 }
2411 UnaryOp::ReverseBits => {
2412 match val.as_u64() {
2413 Some(n) => Ok(json!(n.reverse_bits())),
2414 None => match val.as_i64() {
2415 Some(n) => Ok(json!((n as u64).reverse_bits())),
2416 None => Err("reverse_bits requires an integer".into()),
2417 }
2418 }
2419 }
2420 }
2421 }
2422
2423 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2425 let segments: Vec<&str> = path.split('.').collect();
2426 let mut current = state;
2427
2428 for segment in segments {
2429 match current.get(segment) {
2430 Some(v) => current = v,
2431 None => return Ok(Value::Null),
2432 }
2433 }
2434
2435 Ok(current.clone())
2436 }
2437
2438 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2440 match op {
2441 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2443 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2444 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2445 BinaryOp::Div => {
2446 if let Some(r) = right.as_i64() {
2448 if r == 0 {
2449 return Err("Division by zero".into());
2450 }
2451 }
2452 if let Some(r) = right.as_f64() {
2453 if r == 0.0 {
2454 return Err("Division by zero".into());
2455 }
2456 }
2457 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2458 }
2459 BinaryOp::Mod => {
2460 match (left.as_i64(), right.as_i64()) {
2462 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2463 (None, _) | (_, None) => {
2464 match (left.as_u64(), right.as_u64()) {
2465 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2466 _ => Err("Modulo requires non-zero integer operands".into()),
2467 }
2468 }
2469 _ => Err("Modulo by zero".into()),
2470 }
2471 }
2472
2473 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2475 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2476 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2477 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2478 BinaryOp::Eq => Ok(json!(left == right)),
2479 BinaryOp::Ne => Ok(json!(left != right)),
2480
2481 BinaryOp::And => {
2483 let l_bool = self.value_to_bool(left);
2484 let r_bool = self.value_to_bool(right);
2485 Ok(json!(l_bool && r_bool))
2486 }
2487 BinaryOp::Or => {
2488 let l_bool = self.value_to_bool(left);
2489 let r_bool = self.value_to_bool(right);
2490 Ok(json!(l_bool || r_bool))
2491 }
2492
2493 BinaryOp::Xor => {
2495 match (left.as_u64(), right.as_u64()) {
2496 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2497 _ => match (left.as_i64(), right.as_i64()) {
2498 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2499 _ => Err("XOR requires integer operands".into()),
2500 }
2501 }
2502 }
2503 BinaryOp::BitAnd => {
2504 match (left.as_u64(), right.as_u64()) {
2505 (Some(a), Some(b)) => Ok(json!(a & b)),
2506 _ => match (left.as_i64(), right.as_i64()) {
2507 (Some(a), Some(b)) => Ok(json!(a & b)),
2508 _ => Err("BitAnd requires integer operands".into()),
2509 }
2510 }
2511 }
2512 BinaryOp::BitOr => {
2513 match (left.as_u64(), right.as_u64()) {
2514 (Some(a), Some(b)) => Ok(json!(a | b)),
2515 _ => match (left.as_i64(), right.as_i64()) {
2516 (Some(a), Some(b)) => Ok(json!(a | b)),
2517 _ => Err("BitOr requires integer operands".into()),
2518 }
2519 }
2520 }
2521 BinaryOp::Shl => {
2522 match (left.as_u64(), right.as_u64()) {
2523 (Some(a), Some(b)) => Ok(json!(a << b)),
2524 _ => match (left.as_i64(), right.as_i64()) {
2525 (Some(a), Some(b)) => Ok(json!(a << b)),
2526 _ => Err("Shl requires integer operands".into()),
2527 }
2528 }
2529 }
2530 BinaryOp::Shr => {
2531 match (left.as_u64(), right.as_u64()) {
2532 (Some(a), Some(b)) => Ok(json!(a >> b)),
2533 _ => match (left.as_i64(), right.as_i64()) {
2534 (Some(a), Some(b)) => Ok(json!(a >> b)),
2535 _ => Err("Shr requires integer operands".into()),
2536 }
2537 }
2538 }
2539 }
2540 }
2541
2542 fn numeric_op<F1, F2>(
2544 &self,
2545 left: &Value,
2546 right: &Value,
2547 int_op: F1,
2548 float_op: F2,
2549 ) -> Result<Value>
2550 where
2551 F1: Fn(i64, i64) -> i64,
2552 F2: Fn(f64, f64) -> f64,
2553 {
2554 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2556 return Ok(json!(int_op(a, b)));
2557 }
2558
2559 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2561 return Ok(json!(int_op(a as i64, b as i64)));
2563 }
2564
2565 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2567 return Ok(json!(float_op(a, b)));
2568 }
2569
2570 if left.is_null() || right.is_null() {
2572 return Ok(Value::Null);
2573 }
2574
2575 Err(format!(
2576 "Cannot perform numeric operation on {:?} and {:?}",
2577 left, right
2578 )
2579 .into())
2580 }
2581
2582 fn comparison_op<F1, F2>(
2584 &self,
2585 left: &Value,
2586 right: &Value,
2587 int_cmp: F1,
2588 float_cmp: F2,
2589 ) -> Result<Value>
2590 where
2591 F1: Fn(i64, i64) -> bool,
2592 F2: Fn(f64, f64) -> bool,
2593 {
2594 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2596 return Ok(json!(int_cmp(a, b)));
2597 }
2598
2599 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2601 return Ok(json!(int_cmp(a as i64, b as i64)));
2602 }
2603
2604 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2606 return Ok(json!(float_cmp(a, b)));
2607 }
2608
2609 if left.is_null() || right.is_null() {
2611 return Ok(json!(false));
2612 }
2613
2614 Err(format!(
2615 "Cannot compare {:?} and {:?}",
2616 left, right
2617 )
2618 .into())
2619 }
2620
2621 fn value_to_bool(&self, value: &Value) -> bool {
2623 match value {
2624 Value::Null => false,
2625 Value::Bool(b) => *b,
2626 Value::Number(n) => {
2627 if let Some(i) = n.as_i64() {
2628 i != 0
2629 } else if let Some(f) = n.as_f64() {
2630 f != 0.0
2631 } else {
2632 true
2633 }
2634 }
2635 Value::String(s) => !s.is_empty(),
2636 Value::Array(arr) => !arr.is_empty(),
2637 Value::Object(obj) => !obj.is_empty(),
2638 }
2639 }
2640
2641 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
2643 match to_type {
2644 "i8" | "i16" | "i32" | "i64" | "isize" => {
2645 if let Some(n) = value.as_i64() {
2646 Ok(json!(n))
2647 } else if let Some(n) = value.as_u64() {
2648 Ok(json!(n as i64))
2649 } else if let Some(n) = value.as_f64() {
2650 Ok(json!(n as i64))
2651 } else if let Some(s) = value.as_str() {
2652 s.parse::<i64>()
2653 .map(|n| json!(n))
2654 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
2655 } else {
2656 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2657 }
2658 }
2659 "u8" | "u16" | "u32" | "u64" | "usize" => {
2660 if let Some(n) = value.as_u64() {
2661 Ok(json!(n))
2662 } else if let Some(n) = value.as_i64() {
2663 Ok(json!(n as u64))
2664 } else if let Some(n) = value.as_f64() {
2665 Ok(json!(n as u64))
2666 } else if let Some(s) = value.as_str() {
2667 s.parse::<u64>()
2668 .map(|n| json!(n))
2669 .map_err(|e| format!("Cannot parse '{}' as unsigned integer: {}", s, e).into())
2670 } else {
2671 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2672 }
2673 }
2674 "f32" | "f64" => {
2675 if let Some(n) = value.as_f64() {
2676 Ok(json!(n))
2677 } else if let Some(n) = value.as_i64() {
2678 Ok(json!(n as f64))
2679 } else if let Some(n) = value.as_u64() {
2680 Ok(json!(n as f64))
2681 } else if let Some(s) = value.as_str() {
2682 s.parse::<f64>()
2683 .map(|n| json!(n))
2684 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
2685 } else {
2686 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2687 }
2688 }
2689 "String" | "string" => {
2690 Ok(json!(value.to_string()))
2691 }
2692 "bool" => {
2693 Ok(json!(self.value_to_bool(value)))
2694 }
2695 _ => {
2696 Ok(value.clone())
2698 }
2699 }
2700 }
2701
2702 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
2704 match method {
2705 "unwrap_or" => {
2706 if value.is_null() && !args.is_empty() {
2707 Ok(args[0].clone())
2708 } else {
2709 Ok(value.clone())
2710 }
2711 }
2712 "unwrap_or_default" => {
2713 if value.is_null() {
2714 Ok(json!(0))
2716 } else {
2717 Ok(value.clone())
2718 }
2719 }
2720 "is_some" => {
2721 Ok(json!(!value.is_null()))
2722 }
2723 "is_none" => {
2724 Ok(json!(value.is_null()))
2725 }
2726 "abs" => {
2727 if let Some(n) = value.as_i64() {
2728 Ok(json!(n.abs()))
2729 } else if let Some(n) = value.as_f64() {
2730 Ok(json!(n.abs()))
2731 } else {
2732 Err(format!("Cannot call abs() on {:?}", value).into())
2733 }
2734 }
2735 "len" => {
2736 if let Some(s) = value.as_str() {
2737 Ok(json!(s.len()))
2738 } else if let Some(arr) = value.as_array() {
2739 Ok(json!(arr.len()))
2740 } else if let Some(obj) = value.as_object() {
2741 Ok(json!(obj.len()))
2742 } else {
2743 Err(format!("Cannot call len() on {:?}", value).into())
2744 }
2745 }
2746 "to_string" => {
2747 Ok(json!(value.to_string()))
2748 }
2749 "min" => {
2750 if args.is_empty() {
2751 return Err("min() requires an argument".into());
2752 }
2753 let other = &args[0];
2754 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2755 Ok(json!(a.min(b)))
2756 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2757 Ok(json!(a.min(b)))
2758 } else {
2759 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
2760 }
2761 }
2762 "max" => {
2763 if args.is_empty() {
2764 return Err("max() requires an argument".into());
2765 }
2766 let other = &args[0];
2767 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2768 Ok(json!(a.max(b)))
2769 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2770 Ok(json!(a.max(b)))
2771 } else {
2772 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
2773 }
2774 }
2775 "saturating_add" => {
2776 if args.is_empty() {
2777 return Err("saturating_add() requires an argument".into());
2778 }
2779 let other = &args[0];
2780 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2781 Ok(json!(a.saturating_add(b)))
2782 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2783 Ok(json!(a.saturating_add(b)))
2784 } else {
2785 Err(format!("Cannot call saturating_add() on {:?} and {:?}", value, other).into())
2786 }
2787 }
2788 "saturating_sub" => {
2789 if args.is_empty() {
2790 return Err("saturating_sub() requires an argument".into());
2791 }
2792 let other = &args[0];
2793 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2794 Ok(json!(a.saturating_sub(b)))
2795 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2796 Ok(json!(a.saturating_sub(b)))
2797 } else {
2798 Err(format!("Cannot call saturating_sub() on {:?} and {:?}", value, other).into())
2799 }
2800 }
2801 _ => {
2802 tracing::warn!("Unknown method call: {}() on {:?}", method, value);
2804 Ok(value.clone())
2805 }
2806 }
2807 }
2808
2809 pub fn evaluate_computed_fields_from_ast(
2812 &self,
2813 state: &mut Value,
2814 computed_field_specs: &[ComputedFieldSpec],
2815 ) -> Result<Vec<String>> {
2816 let mut updated_paths = Vec::new();
2817
2818 for spec in computed_field_specs {
2819 match self.evaluate_computed_expr(&spec.expression, state) {
2820 Ok(result) => {
2821 self.set_field_in_state(state, &spec.target_path, result)?;
2823 updated_paths.push(spec.target_path.clone());
2824 }
2825 Err(e) => {
2826 tracing::warn!(
2828 "Failed to evaluate computed field '{}': {}",
2829 spec.target_path,
2830 e
2831 );
2832 }
2833 }
2834 }
2835
2836 Ok(updated_paths)
2837 }
2838
2839 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
2841 let segments: Vec<&str> = path.split('.').collect();
2842
2843 if segments.is_empty() {
2844 return Err("Empty path".into());
2845 }
2846
2847 let mut current = state;
2849 for (i, segment) in segments.iter().enumerate() {
2850 if i == segments.len() - 1 {
2851 if let Some(obj) = current.as_object_mut() {
2853 obj.insert(segment.to_string(), value);
2854 return Ok(());
2855 } else {
2856 return Err(format!("Cannot set field '{}' on non-object", segment).into());
2857 }
2858 } else {
2859 if !current.is_object() {
2861 *current = json!({});
2862 }
2863 let obj = current.as_object_mut().unwrap();
2864 current = obj
2865 .entry(segment.to_string())
2866 .or_insert_with(|| json!({}));
2867 }
2868 }
2869
2870 Ok(())
2871 }
2872
2873 pub fn create_evaluator_from_specs(
2876 specs: Vec<ComputedFieldSpec>,
2877 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
2878 move |state: &mut Value| {
2879 let vm = VmContext::new();
2882 vm.evaluate_computed_fields_from_ast(state, &specs)?;
2883 Ok(())
2884 }
2885 }
2886}
2887
2888impl Default for VmContext {
2889 fn default() -> Self {
2890 Self::new()
2891 }
2892}
2893
2894impl crate::resolvers::ReverseLookupUpdater for VmContext {
2896 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
2897 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
2899 .unwrap_or_else(|e| {
2900 tracing::error!("Failed to update PDA reverse lookup: {}", e);
2901 Vec::new()
2902 })
2903 }
2904
2905 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
2906 self.flush_pending_updates(0, pda_address)
2908 .unwrap_or_else(|e| {
2909 tracing::error!("Failed to flush pending updates: {}", e);
2910 Vec::new()
2911 })
2912 }
2913}