Skip to main content

robinpath_modules/modules/
retry_mod.rs

1use robinpath::{RobinPath, Value};
2use std::sync::{Arc, Mutex};
3use std::collections::HashMap;
4
/// Mutable state of one named circuit breaker managed by the `retry.*` builtins.
///
/// `Debug` and `Clone` are derived so a breaker can be logged or snapshotted
/// while diagnosing retry behaviour.
#[derive(Debug, Clone)]
struct CircuitBreaker {
    /// Failures recorded since the counter was last cleared.
    failures: u32,
    /// Failure count at which the breaker is switched to `"open"`.
    threshold: u32,
    /// Milliseconds that must pass after `last_failure` before an open
    /// breaker is moved to `"half-open"`.
    reset_timeout: u64,
    /// Current state name: `"closed"`, `"open"` or `"half-open"`.
    state: String,
    /// `now_ms()` timestamp of the most recently recorded failure; 0 until a
    /// failure has been recorded (and again after a reset).
    last_failure: u64,
    /// Successes recorded since the breaker last entered `"half-open"`.
    successes_in_half_open: u32,
    /// Successes required while `"half-open"` to return to `"closed"`.
    half_open_threshold: u32,
}
14
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is 0.
fn now_ms() -> u64 {
    std::time::UNIX_EPOCH
        .elapsed()
        .map(|since_epoch| since_epoch.as_millis() as u64)
        .unwrap_or(0)
}
21
22pub fn register(rp: &mut RobinPath) {
23    let state = Arc::new(Mutex::new(HashMap::<String, CircuitBreaker>::new()));
24
25    // retry.withBackoff attempt initialDelay? factor? maxDelay? jitter? → delay ms
26    rp.register_builtin("retry.withBackoff", |args, _| {
27        let attempt = args.first().map(|v| v.to_number() as u32).unwrap_or(0);
28        let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
29        let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
30        let max_delay = args.get(3).map(|v| v.to_number()).unwrap_or(30000.0);
31        let jitter = args.get(4).map(|v| match v {
32            Value::Bool(b) => *b,
33            Value::String(s) => s != "false",
34            _ => true,
35        }).unwrap_or(true);
36
37        let mut delay = initial_delay * factor.powi(attempt as i32);
38        delay = delay.min(max_delay);
39        if jitter {
40            // Deterministic "jitter" based on attempt number (no random in sync context)
41            let jitter_factor = 0.75 + (attempt as f64 % 4.0) * 0.0625;
42            delay *= jitter_factor;
43        }
44        Ok(Value::Number(delay.round()))
45    });
46
47    // retry.isRetryable statusCode → bool
48    rp.register_builtin("retry.isRetryable", |args, _| {
49        let status = args.first().map(|v| v.to_number() as u16).unwrap_or(0);
50        let retryable = matches!(status, 408 | 429 | 500 | 502 | 503 | 504);
51        Ok(Value::Bool(retryable))
52    });
53
54    // retry.delay ms → true (sync sleep)
55    rp.register_builtin("retry.delay", |args, _| {
56        let ms = args.first().map(|v| v.to_number() as u64).unwrap_or(1000);
57        std::thread::sleep(std::time::Duration::from_millis(ms));
58        Ok(Value::Bool(true))
59    });
60
61    // retry.attempts maxAttempts? initialDelay? factor? → array of {attempt, delay, totalWait}
62    rp.register_builtin("retry.attempts", |args, _| {
63        let max_attempts = args.first().map(|v| v.to_number() as u32).unwrap_or(3);
64        let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
65        let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
66        let mut total_wait = 0.0_f64;
67        let mut result = Vec::new();
68        for i in 0..max_attempts {
69            let d = if i == 0 { 0.0 } else { (initial_delay * factor.powi((i - 1) as i32)).round() };
70            total_wait += d;
71            let mut obj = indexmap::IndexMap::new();
72            obj.insert("attempt".to_string(), Value::Number((i + 1) as f64));
73            obj.insert("delay".to_string(), Value::Number(d));
74            obj.insert("totalWait".to_string(), Value::Number(total_wait));
75            result.push(Value::Object(obj));
76        }
77        Ok(Value::Array(result))
78    });
79
80    // retry.createBreaker name threshold? resetTimeout? → {name, state, threshold, resetTimeout}
81    let s = state.clone();
82    rp.register_builtin("retry.createBreaker", move |args, _| {
83        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
84        let threshold = args.get(1).map(|v| v.to_number() as u32).unwrap_or(5);
85        let reset_timeout = args.get(2).map(|v| v.to_number() as u64).unwrap_or(60000);
86        let breaker = CircuitBreaker {
87            failures: 0,
88            threshold,
89            reset_timeout,
90            state: "closed".to_string(),
91            last_failure: 0,
92            successes_in_half_open: 0,
93            half_open_threshold: 1,
94        };
95        s.lock().unwrap().insert(name.clone(), breaker);
96        let mut obj = indexmap::IndexMap::new();
97        obj.insert("name".to_string(), Value::String(name));
98        obj.insert("state".to_string(), Value::String("closed".to_string()));
99        obj.insert("threshold".to_string(), Value::Number(threshold as f64));
100        obj.insert("resetTimeout".to_string(), Value::Number(reset_timeout as f64));
101        Ok(Value::Object(obj))
102    });
103
104    // retry.breakerState name → {name, state, failures, threshold, resetTimeout}
105    let s = state.clone();
106    rp.register_builtin("retry.breakerState", move |args, _| {
107        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
108        let mut breakers = s.lock().unwrap();
109        if let Some(b) = breakers.get_mut(&name) {
110            let now = now_ms();
111            if b.state == "open" && now - b.last_failure >= b.reset_timeout {
112                b.state = "half-open".to_string();
113                b.successes_in_half_open = 0;
114            }
115            let mut obj = indexmap::IndexMap::new();
116            obj.insert("name".to_string(), Value::String(name));
117            obj.insert("state".to_string(), Value::String(b.state.clone()));
118            obj.insert("failures".to_string(), Value::Number(b.failures as f64));
119            obj.insert("threshold".to_string(), Value::Number(b.threshold as f64));
120            obj.insert("resetTimeout".to_string(), Value::Number(b.reset_timeout as f64));
121            Ok(Value::Object(obj))
122        } else {
123            let mut obj = indexmap::IndexMap::new();
124            obj.insert("name".to_string(), Value::String(name));
125            obj.insert("state".to_string(), Value::String("unknown".to_string()));
126            Ok(Value::Object(obj))
127        }
128    });
129
130    // retry.breakerRecord name success → {name, state, failures}
131    let s = state.clone();
132    rp.register_builtin("retry.breakerRecord", move |args, _| {
133        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
134        let success = args.get(1).map(|v| match v {
135            Value::Bool(b) => *b,
136            Value::String(s) => s != "false" && s != "failure",
137            _ => true,
138        }).unwrap_or(true);
139        let mut breakers = s.lock().unwrap();
140        let b = breakers.get_mut(&name)
141            .ok_or_else(|| format!("Circuit breaker \"{}\" not found. Create it first.", name))?;
142        let now = now_ms();
143        if b.state == "open" && now - b.last_failure >= b.reset_timeout {
144            b.state = "half-open".to_string();
145            b.successes_in_half_open = 0;
146        }
147        if success {
148            if b.state == "half-open" {
149                b.successes_in_half_open += 1;
150                if b.successes_in_half_open >= b.half_open_threshold {
151                    b.state = "closed".to_string();
152                    b.failures = 0;
153                }
154            } else {
155                b.failures = 0;
156            }
157        } else {
158            b.failures += 1;
159            b.last_failure = now;
160            if b.failures >= b.threshold {
161                b.state = "open".to_string();
162            }
163        }
164        let mut obj = indexmap::IndexMap::new();
165        obj.insert("name".to_string(), Value::String(name));
166        obj.insert("state".to_string(), Value::String(b.state.clone()));
167        obj.insert("failures".to_string(), Value::Number(b.failures as f64));
168        Ok(Value::Object(obj))
169    });
170
171    // retry.breakerAllow name → bool
172    let s = state.clone();
173    rp.register_builtin("retry.breakerAllow", move |args, _| {
174        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
175        let mut breakers = s.lock().unwrap();
176        if let Some(b) = breakers.get_mut(&name) {
177            if b.state == "open" {
178                let now = now_ms();
179                if now - b.last_failure >= b.reset_timeout {
180                    b.state = "half-open".to_string();
181                    b.successes_in_half_open = 0;
182                    Ok(Value::Bool(true))
183                } else {
184                    Ok(Value::Bool(false))
185                }
186            } else {
187                Ok(Value::Bool(true))
188            }
189        } else {
190            Ok(Value::Bool(true))
191        }
192    });
193
194    // retry.breakerReset name → bool
195    let s = state.clone();
196    rp.register_builtin("retry.breakerReset", move |args, _| {
197        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
198        let mut breakers = s.lock().unwrap();
199        if let Some(b) = breakers.get_mut(&name) {
200            b.failures = 0;
201            b.state = "closed".to_string();
202            b.last_failure = 0;
203            b.successes_in_half_open = 0;
204            Ok(Value::Bool(true))
205        } else {
206            Ok(Value::Bool(false))
207        }
208    });
209}