robinpath_modules/modules/
retry_mod.rs1use robinpath::{RobinPath, Value};
2use std::sync::{Arc, Mutex};
3use std::collections::HashMap;
4
/// Mutable bookkeeping for one named circuit breaker managed by the `retry.*` builtins.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CircuitBreaker {
    /// Failures counted so far; cleared when a success is recorded outside the
    /// half-open state, when the breaker closes, or on an explicit reset.
    failures: u32,
    /// The breaker switches to `"open"` once `failures` reaches this value.
    threshold: u32,
    /// Milliseconds that must pass after `last_failure` before an open breaker
    /// is moved to `"half-open"`.
    reset_timeout: u64,
    /// Current state name: `"closed"`, `"open"` or `"half-open"`.
    state: String,
    /// Timestamp of the most recent recorded failure, in milliseconds as
    /// returned by `now_ms()`; 0 when no failure has been recorded.
    last_failure: u64,
    /// Successes recorded since the breaker last entered the half-open state.
    successes_in_half_open: u32,
    /// Number of half-open successes required to close the breaker again.
    half_open_threshold: u32,
}
14
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Falls back to 0 when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` rather than silently truncating the 128-bit
/// millisecond count.
fn now_ms() -> u64 {
    let millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // `as_millis` yields a u128; clamp first so the narrowing cast cannot wrap.
    millis.min(u128::from(u64::MAX)) as u64
}
21
22pub fn register(rp: &mut RobinPath) {
23 let state = Arc::new(Mutex::new(HashMap::<String, CircuitBreaker>::new()));
24
25 rp.register_builtin("retry.withBackoff", |args, _| {
27 let attempt = args.first().map(|v| v.to_number() as u32).unwrap_or(0);
28 let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
29 let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
30 let max_delay = args.get(3).map(|v| v.to_number()).unwrap_or(30000.0);
31 let jitter = args.get(4).map(|v| match v {
32 Value::Bool(b) => *b,
33 Value::String(s) => s != "false",
34 _ => true,
35 }).unwrap_or(true);
36
37 let mut delay = initial_delay * factor.powi(attempt as i32);
38 delay = delay.min(max_delay);
39 if jitter {
40 let jitter_factor = 0.75 + (attempt as f64 % 4.0) * 0.0625;
42 delay *= jitter_factor;
43 }
44 Ok(Value::Number(delay.round()))
45 });
46
47 rp.register_builtin("retry.isRetryable", |args, _| {
49 let status = args.first().map(|v| v.to_number() as u16).unwrap_or(0);
50 let retryable = matches!(status, 408 | 429 | 500 | 502 | 503 | 504);
51 Ok(Value::Bool(retryable))
52 });
53
54 rp.register_builtin("retry.delay", |args, _| {
56 let ms = args.first().map(|v| v.to_number() as u64).unwrap_or(1000);
57 std::thread::sleep(std::time::Duration::from_millis(ms));
58 Ok(Value::Bool(true))
59 });
60
61 rp.register_builtin("retry.attempts", |args, _| {
63 let max_attempts = args.first().map(|v| v.to_number() as u32).unwrap_or(3);
64 let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
65 let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
66 let mut total_wait = 0.0_f64;
67 let mut result = Vec::new();
68 for i in 0..max_attempts {
69 let d = if i == 0 { 0.0 } else { (initial_delay * factor.powi((i - 1) as i32)).round() };
70 total_wait += d;
71 let mut obj = indexmap::IndexMap::new();
72 obj.insert("attempt".to_string(), Value::Number((i + 1) as f64));
73 obj.insert("delay".to_string(), Value::Number(d));
74 obj.insert("totalWait".to_string(), Value::Number(total_wait));
75 result.push(Value::Object(obj));
76 }
77 Ok(Value::Array(result))
78 });
79
80 let s = state.clone();
82 rp.register_builtin("retry.createBreaker", move |args, _| {
83 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
84 let threshold = args.get(1).map(|v| v.to_number() as u32).unwrap_or(5);
85 let reset_timeout = args.get(2).map(|v| v.to_number() as u64).unwrap_or(60000);
86 let breaker = CircuitBreaker {
87 failures: 0,
88 threshold,
89 reset_timeout,
90 state: "closed".to_string(),
91 last_failure: 0,
92 successes_in_half_open: 0,
93 half_open_threshold: 1,
94 };
95 s.lock().unwrap().insert(name.clone(), breaker);
96 let mut obj = indexmap::IndexMap::new();
97 obj.insert("name".to_string(), Value::String(name));
98 obj.insert("state".to_string(), Value::String("closed".to_string()));
99 obj.insert("threshold".to_string(), Value::Number(threshold as f64));
100 obj.insert("resetTimeout".to_string(), Value::Number(reset_timeout as f64));
101 Ok(Value::Object(obj))
102 });
103
104 let s = state.clone();
106 rp.register_builtin("retry.breakerState", move |args, _| {
107 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
108 let mut breakers = s.lock().unwrap();
109 if let Some(b) = breakers.get_mut(&name) {
110 let now = now_ms();
111 if b.state == "open" && now - b.last_failure >= b.reset_timeout {
112 b.state = "half-open".to_string();
113 b.successes_in_half_open = 0;
114 }
115 let mut obj = indexmap::IndexMap::new();
116 obj.insert("name".to_string(), Value::String(name));
117 obj.insert("state".to_string(), Value::String(b.state.clone()));
118 obj.insert("failures".to_string(), Value::Number(b.failures as f64));
119 obj.insert("threshold".to_string(), Value::Number(b.threshold as f64));
120 obj.insert("resetTimeout".to_string(), Value::Number(b.reset_timeout as f64));
121 Ok(Value::Object(obj))
122 } else {
123 let mut obj = indexmap::IndexMap::new();
124 obj.insert("name".to_string(), Value::String(name));
125 obj.insert("state".to_string(), Value::String("unknown".to_string()));
126 Ok(Value::Object(obj))
127 }
128 });
129
130 let s = state.clone();
132 rp.register_builtin("retry.breakerRecord", move |args, _| {
133 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
134 let success = args.get(1).map(|v| match v {
135 Value::Bool(b) => *b,
136 Value::String(s) => s != "false" && s != "failure",
137 _ => true,
138 }).unwrap_or(true);
139 let mut breakers = s.lock().unwrap();
140 let b = breakers.get_mut(&name)
141 .ok_or_else(|| format!("Circuit breaker \"{}\" not found. Create it first.", name))?;
142 let now = now_ms();
143 if b.state == "open" && now - b.last_failure >= b.reset_timeout {
144 b.state = "half-open".to_string();
145 b.successes_in_half_open = 0;
146 }
147 if success {
148 if b.state == "half-open" {
149 b.successes_in_half_open += 1;
150 if b.successes_in_half_open >= b.half_open_threshold {
151 b.state = "closed".to_string();
152 b.failures = 0;
153 }
154 } else {
155 b.failures = 0;
156 }
157 } else {
158 b.failures += 1;
159 b.last_failure = now;
160 if b.failures >= b.threshold {
161 b.state = "open".to_string();
162 }
163 }
164 let mut obj = indexmap::IndexMap::new();
165 obj.insert("name".to_string(), Value::String(name));
166 obj.insert("state".to_string(), Value::String(b.state.clone()));
167 obj.insert("failures".to_string(), Value::Number(b.failures as f64));
168 Ok(Value::Object(obj))
169 });
170
171 let s = state.clone();
173 rp.register_builtin("retry.breakerAllow", move |args, _| {
174 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
175 let mut breakers = s.lock().unwrap();
176 if let Some(b) = breakers.get_mut(&name) {
177 if b.state == "open" {
178 let now = now_ms();
179 if now - b.last_failure >= b.reset_timeout {
180 b.state = "half-open".to_string();
181 b.successes_in_half_open = 0;
182 Ok(Value::Bool(true))
183 } else {
184 Ok(Value::Bool(false))
185 }
186 } else {
187 Ok(Value::Bool(true))
188 }
189 } else {
190 Ok(Value::Bool(true))
191 }
192 });
193
194 let s = state.clone();
196 rp.register_builtin("retry.breakerReset", move |args, _| {
197 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
198 let mut breakers = s.lock().unwrap();
199 if let Some(b) = breakers.get_mut(&name) {
200 b.failures = 0;
201 b.state = "closed".to_string();
202 b.last_failure = 0;
203 b.successes_in_half_open = 0;
204 Ok(Value::Bool(true))
205 } else {
206 Ok(Value::Bool(false))
207 }
208 });
209}