Skip to main content

hyperstack_interpreter/
scheduler.rs

1use crate::ast::{ComparisonOp, ResolverCondition, UrlTemplatePart};
2use crate::vm::ScheduledCallback;
3use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap, HashSet};
6
/// Retry ceiling exported alongside the scheduler.
///
/// NOTE(review): nothing in this file's visible code reads this constant;
/// presumably the caller that re-registers callbacks enforces it — confirm.
pub const MAX_RETRIES: u32 = 100;

/// Identity of a scheduled callback for de-duplication purposes:
/// `(entity_name, primary_key, "<resolver json>:<condition json>")`.
type DedupKey = (String, String, String);
10
11pub struct SlotScheduler {
12    callbacks: BTreeMap<u64, Vec<ScheduledCallback>>,
13    registered: HashSet<DedupKey>,
14    /// Reverse index: dedup_key → slot, for O(1) targeted removal in register().
15    slot_index: HashMap<DedupKey, u64>,
16}
17
18impl Default for SlotScheduler {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl SlotScheduler {
25    pub fn new() -> Self {
26        Self {
27            callbacks: BTreeMap::new(),
28            registered: HashSet::new(),
29            slot_index: HashMap::new(),
30        }
31    }
32
33    pub fn register(&mut self, target_slot: u64, callback: ScheduledCallback) {
34        let dedup_key = Self::dedup_key(&callback);
35        if let Some(old_slot) = self.slot_index.remove(&dedup_key) {
36            if let Some(cbs) = self.callbacks.get_mut(&old_slot) {
37                cbs.retain(|cb| Self::dedup_key(cb) != dedup_key);
38                if cbs.is_empty() {
39                    self.callbacks.remove(&old_slot);
40                }
41            }
42        }
43        self.registered.insert(dedup_key.clone());
44        self.slot_index.insert(dedup_key, target_slot);
45        self.callbacks
46            .entry(target_slot)
47            .or_default()
48            .push(callback);
49    }
50
51    pub fn take_due(&mut self, current_slot: u64) -> Vec<ScheduledCallback> {
52        let future = self.callbacks.split_off(&current_slot.saturating_add(1));
53        let due = std::mem::replace(&mut self.callbacks, future);
54
55        let mut result = Vec::new();
56        for (_slot, callbacks) in due {
57            for cb in callbacks {
58                let dedup_key = Self::dedup_key(&cb);
59                self.registered.remove(&dedup_key);
60                self.slot_index.remove(&dedup_key);
61                result.push(cb);
62            }
63        }
64        result
65    }
66
67    pub fn re_register(&mut self, callback: ScheduledCallback, next_slot: u64) {
68        self.register(next_slot, callback);
69    }
70
71    pub fn pending_count(&self) -> usize {
72        self.callbacks.values().map(|v| v.len()).sum()
73    }
74
75    fn dedup_key(cb: &ScheduledCallback) -> DedupKey {
76        let resolver_key = serde_json::to_string(&cb.resolver).unwrap_or_default();
77        let condition_key = cb.condition.as_ref()
78            .map(|c| serde_json::to_string(c).unwrap_or_default())
79            .unwrap_or_default();
80        let pk_key = cb.primary_key.to_string();
81        (cb.entity_name.clone(), pk_key, format!("{}:{}", resolver_key, condition_key))
82    }
83}
84
85pub fn evaluate_condition(condition: &ResolverCondition, state: &Value) -> bool {
86    let field_val = get_value_at_path(state, &condition.field_path).unwrap_or(Value::Null);
87    evaluate_comparison(&field_val, &condition.op, &condition.value)
88}
89
90/// NON_ALPHANUMERIC minus RFC 3986 unreserved chars (`-`, `.`, `_`, `~`) that are safe in URLs.
91/// NOTE: This uses path-segment encoding for all field references, including those in query
92/// parameter positions. Query strings permit additional chars (`!`, `$`, `'`, `(`, `)`, `+`, etc.)
93/// that will be over-encoded. This is safe for the current numeric/base58 use-cases but may need
94/// a path-vs-query split if general-purpose URL templates are needed.
95const URL_SEGMENT_SET: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
96
97pub fn build_url_from_template(template: &[UrlTemplatePart], state: &Value) -> Option<String> {
98    let mut url = String::new();
99    for part in template {
100        match part {
101            UrlTemplatePart::Literal(s) => url.push_str(s),
102            UrlTemplatePart::FieldRef(path) => {
103                let val = get_value_at_path(state, path)?;
104                if val.is_null() {
105                    return None;
106                }
107                let raw = match val.as_str() {
108                    Some(s) => s.to_string(),
109                    None => val.to_string().trim_matches('"').to_string(),
110                };
111                let encoded = utf8_percent_encode(&raw, URL_SEGMENT_SET).to_string();
112                url.push_str(&encoded);
113            }
114        }
115    }
116    Some(url)
117}
118
119pub fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
120    let mut current = value;
121    for segment in path.split('.') {
122        current = current.get(segment)?;
123    }
124    Some(current.clone())
125}
126
127/// Mirrors the VM's i64 → u64 → f64 comparison cascade to avoid divergent
128/// condition evaluation between registration time and execution time.
129fn evaluate_comparison(field_value: &Value, op: &ComparisonOp, condition_value: &Value) -> bool {
130    match op {
131        ComparisonOp::Equal => field_value == condition_value,
132        ComparisonOp::NotEqual => field_value != condition_value,
133        ComparisonOp::GreaterThan => compare_numeric(field_value, condition_value, |a, b| a > b, |a, b| a > b, |a, b| a > b),
134        ComparisonOp::GreaterThanOrEqual => compare_numeric(field_value, condition_value, |a, b| a >= b, |a, b| a >= b, |a, b| a >= b),
135        ComparisonOp::LessThan => compare_numeric(field_value, condition_value, |a, b| a < b, |a, b| a < b, |a, b| a < b),
136        ComparisonOp::LessThanOrEqual => compare_numeric(field_value, condition_value, |a, b| a <= b, |a, b| a <= b, |a, b| a <= b),
137    }
138}
139
140fn compare_numeric(
141    a: &Value,
142    b: &Value,
143    cmp_i64: fn(i64, i64) -> bool,
144    cmp_u64: fn(u64, u64) -> bool,
145    cmp_f64: fn(f64, f64) -> bool,
146) -> bool {
147    match (a.as_i64(), b.as_i64()) {
148        (Some(a), Some(b)) => cmp_i64(a, b),
149        _ => match (a.as_u64(), b.as_u64()) {
150            (Some(a), Some(b)) => cmp_u64(a, b),
151            _ => match (a.as_f64(), b.as_f64()) {
152                (Some(a), Some(b)) => cmp_f64(a, b),
153                _ => false,
154            },
155        },
156    }
157}