1use crate::ast::{ComparisonOp, ResolverCondition, UrlTemplatePart};
2use crate::vm::ScheduledCallback;
3use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap, HashSet};
6
7pub const MAX_RETRIES: u32 = 100;
8
9type DedupKey = (String, String, String);
10
11pub struct SlotScheduler {
12 callbacks: BTreeMap<u64, Vec<ScheduledCallback>>,
13 registered: HashSet<DedupKey>,
14 slot_index: HashMap<DedupKey, u64>,
16}
17
18impl Default for SlotScheduler {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl SlotScheduler {
25 pub fn new() -> Self {
26 Self {
27 callbacks: BTreeMap::new(),
28 registered: HashSet::new(),
29 slot_index: HashMap::new(),
30 }
31 }
32
33 pub fn register(&mut self, target_slot: u64, callback: ScheduledCallback) {
34 let dedup_key = Self::dedup_key(&callback);
35 if let Some(old_slot) = self.slot_index.remove(&dedup_key) {
36 if let Some(cbs) = self.callbacks.get_mut(&old_slot) {
37 cbs.retain(|cb| Self::dedup_key(cb) != dedup_key);
38 if cbs.is_empty() {
39 self.callbacks.remove(&old_slot);
40 }
41 }
42 }
43 self.registered.insert(dedup_key.clone());
44 self.slot_index.insert(dedup_key, target_slot);
45 self.callbacks
46 .entry(target_slot)
47 .or_default()
48 .push(callback);
49 }
50
51 pub fn take_due(&mut self, current_slot: u64) -> Vec<ScheduledCallback> {
52 let future = self.callbacks.split_off(¤t_slot.saturating_add(1));
53 let due = std::mem::replace(&mut self.callbacks, future);
54
55 let mut result = Vec::new();
56 for (_slot, callbacks) in due {
57 for cb in callbacks {
58 let dedup_key = Self::dedup_key(&cb);
59 self.registered.remove(&dedup_key);
60 self.slot_index.remove(&dedup_key);
61 result.push(cb);
62 }
63 }
64 result
65 }
66
67 pub fn re_register(&mut self, callback: ScheduledCallback, next_slot: u64) {
68 self.register(next_slot, callback);
69 }
70
71 pub fn pending_count(&self) -> usize {
72 self.callbacks.values().map(|v| v.len()).sum()
73 }
74
75 fn dedup_key(cb: &ScheduledCallback) -> DedupKey {
76 let resolver_key = serde_json::to_string(&cb.resolver).unwrap_or_default();
77 let condition_key = cb.condition.as_ref()
78 .map(|c| serde_json::to_string(c).unwrap_or_default())
79 .unwrap_or_default();
80 let pk_key = cb.primary_key.to_string();
81 (cb.entity_name.clone(), pk_key, format!("{}:{}", resolver_key, condition_key))
82 }
83}
84
85pub fn evaluate_condition(condition: &ResolverCondition, state: &Value) -> bool {
86 let field_val = get_value_at_path(state, &condition.field_path).unwrap_or(Value::Null);
87 evaluate_comparison(&field_val, &condition.op, &condition.value)
88}
89
90const URL_SEGMENT_SET: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
96
97pub fn build_url_from_template(template: &[UrlTemplatePart], state: &Value) -> Option<String> {
98 let mut url = String::new();
99 for part in template {
100 match part {
101 UrlTemplatePart::Literal(s) => url.push_str(s),
102 UrlTemplatePart::FieldRef(path) => {
103 let val = get_value_at_path(state, path)?;
104 if val.is_null() {
105 return None;
106 }
107 let raw = match val.as_str() {
108 Some(s) => s.to_string(),
109 None => val.to_string().trim_matches('"').to_string(),
110 };
111 let encoded = utf8_percent_encode(&raw, URL_SEGMENT_SET).to_string();
112 url.push_str(&encoded);
113 }
114 }
115 }
116 Some(url)
117}
118
119pub fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
120 let mut current = value;
121 for segment in path.split('.') {
122 current = current.get(segment)?;
123 }
124 Some(current.clone())
125}
126
127fn evaluate_comparison(field_value: &Value, op: &ComparisonOp, condition_value: &Value) -> bool {
130 match op {
131 ComparisonOp::Equal => field_value == condition_value,
132 ComparisonOp::NotEqual => field_value != condition_value,
133 ComparisonOp::GreaterThan => compare_numeric(field_value, condition_value, |a, b| a > b, |a, b| a > b, |a, b| a > b),
134 ComparisonOp::GreaterThanOrEqual => compare_numeric(field_value, condition_value, |a, b| a >= b, |a, b| a >= b, |a, b| a >= b),
135 ComparisonOp::LessThan => compare_numeric(field_value, condition_value, |a, b| a < b, |a, b| a < b, |a, b| a < b),
136 ComparisonOp::LessThanOrEqual => compare_numeric(field_value, condition_value, |a, b| a <= b, |a, b| a <= b, |a, b| a <= b),
137 }
138}
139
140fn compare_numeric(
141 a: &Value,
142 b: &Value,
143 cmp_i64: fn(i64, i64) -> bool,
144 cmp_u64: fn(u64, u64) -> bool,
145 cmp_f64: fn(f64, f64) -> bool,
146) -> bool {
147 match (a.as_i64(), b.as_i64()) {
148 (Some(a), Some(b)) => cmp_i64(a, b),
149 _ => match (a.as_u64(), b.as_u64()) {
150 (Some(a), Some(b)) => cmp_u64(a, b),
151 _ => match (a.as_f64(), b.as_f64()) {
152 (Some(a), Some(b)) => cmp_f64(a, b),
153 _ => false,
154 },
155 },
156 }
157}