robinpath_modules/modules/
retry_mod.rs1use robinpath::{RobinPath, Value};
2use std::sync::{LazyLock, Mutex};
3use std::collections::HashMap;
4
/// Bookkeeping for one named circuit breaker kept in the module-wide registry.
///
/// The first three fields are configuration; the remaining fields are updated
/// by the `retry.breaker*` builtins as outcomes are recorded.
struct CircuitBreaker {
    // --- configuration ---
    /// Failure count at which the breaker switches to `"open"`
    /// (checked as `failures >= threshold`).
    threshold: u32,
    /// Milliseconds that must pass after `last_failure` before an open
    /// breaker is moved to `"half-open"`.
    reset_timeout: u64,
    /// Successes that must be recorded while half-open before the breaker
    /// closes again.
    half_open_threshold: u32,

    // --- runtime state ---
    /// Current state name: `"closed"`, `"open"` or `"half-open"`.
    state: String,
    /// Failures recorded since the counter was last cleared.
    failures: u32,
    /// Timestamp of the most recent recorded failure, in milliseconds since
    /// the Unix epoch; `0` until a failure has been recorded.
    last_failure: u64,
    /// Successes recorded since the breaker last entered `"half-open"`.
    successes_in_half_open: u32,
}
14
15static BREAKERS: LazyLock<Mutex<HashMap<String, CircuitBreaker>>> =
16 LazyLock::new(|| Mutex::new(HashMap::new()));
17
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970: report the epoch itself.
        Err(_) => 0,
    }
}
24
25pub fn register(rp: &mut RobinPath) {
26 rp.register_builtin("retry.withBackoff", |args, _| {
28 let attempt = args.first().map(|v| v.to_number() as u32).unwrap_or(0);
29 let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
30 let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
31 let max_delay = args.get(3).map(|v| v.to_number()).unwrap_or(30000.0);
32 let jitter = args.get(4).map(|v| match v {
33 Value::Bool(b) => *b,
34 Value::String(s) => s != "false",
35 _ => true,
36 }).unwrap_or(true);
37
38 let mut delay = initial_delay * factor.powi(attempt as i32);
39 delay = delay.min(max_delay);
40 if jitter {
41 let jitter_factor = 0.75 + (attempt as f64 % 4.0) * 0.0625;
43 delay *= jitter_factor;
44 }
45 Ok(Value::Number(delay.round()))
46 });
47
48 rp.register_builtin("retry.isRetryable", |args, _| {
50 let status = args.first().map(|v| v.to_number() as u16).unwrap_or(0);
51 let retryable = matches!(status, 408 | 429 | 500 | 502 | 503 | 504);
52 Ok(Value::Bool(retryable))
53 });
54
55 rp.register_builtin("retry.delay", |args, _| {
57 let ms = args.first().map(|v| v.to_number() as u64).unwrap_or(1000);
58 std::thread::sleep(std::time::Duration::from_millis(ms));
59 Ok(Value::Bool(true))
60 });
61
62 rp.register_builtin("retry.attempts", |args, _| {
64 let max_attempts = args.first().map(|v| v.to_number() as u32).unwrap_or(3);
65 let initial_delay = args.get(1).map(|v| v.to_number()).unwrap_or(1000.0);
66 let factor = args.get(2).map(|v| v.to_number()).unwrap_or(2.0);
67 let mut total_wait = 0.0_f64;
68 let mut result = Vec::new();
69 for i in 0..max_attempts {
70 let d = if i == 0 { 0.0 } else { (initial_delay * factor.powi((i - 1) as i32)).round() };
71 total_wait += d;
72 let mut obj = indexmap::IndexMap::new();
73 obj.insert("attempt".to_string(), Value::Number((i + 1) as f64));
74 obj.insert("delay".to_string(), Value::Number(d));
75 obj.insert("totalWait".to_string(), Value::Number(total_wait));
76 result.push(Value::Object(obj));
77 }
78 Ok(Value::Array(result))
79 });
80
81 rp.register_builtin("retry.createBreaker", |args, _| {
83 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
84 let threshold = args.get(1).map(|v| v.to_number() as u32).unwrap_or(5);
85 let reset_timeout = args.get(2).map(|v| v.to_number() as u64).unwrap_or(60000);
86 let breaker = CircuitBreaker {
87 failures: 0,
88 threshold,
89 reset_timeout,
90 state: "closed".to_string(),
91 last_failure: 0,
92 successes_in_half_open: 0,
93 half_open_threshold: 1,
94 };
95 BREAKERS.lock().unwrap().insert(name.clone(), breaker);
96 let mut obj = indexmap::IndexMap::new();
97 obj.insert("name".to_string(), Value::String(name));
98 obj.insert("state".to_string(), Value::String("closed".to_string()));
99 obj.insert("threshold".to_string(), Value::Number(threshold as f64));
100 obj.insert("resetTimeout".to_string(), Value::Number(reset_timeout as f64));
101 Ok(Value::Object(obj))
102 });
103
104 rp.register_builtin("retry.breakerState", |args, _| {
106 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
107 let mut breakers = BREAKERS.lock().unwrap();
108 if let Some(b) = breakers.get_mut(&name) {
109 let now = now_ms();
110 if b.state == "open" && now - b.last_failure >= b.reset_timeout {
111 b.state = "half-open".to_string();
112 b.successes_in_half_open = 0;
113 }
114 let mut obj = indexmap::IndexMap::new();
115 obj.insert("name".to_string(), Value::String(name));
116 obj.insert("state".to_string(), Value::String(b.state.clone()));
117 obj.insert("failures".to_string(), Value::Number(b.failures as f64));
118 obj.insert("threshold".to_string(), Value::Number(b.threshold as f64));
119 obj.insert("resetTimeout".to_string(), Value::Number(b.reset_timeout as f64));
120 Ok(Value::Object(obj))
121 } else {
122 let mut obj = indexmap::IndexMap::new();
123 obj.insert("name".to_string(), Value::String(name));
124 obj.insert("state".to_string(), Value::String("unknown".to_string()));
125 Ok(Value::Object(obj))
126 }
127 });
128
129 rp.register_builtin("retry.breakerRecord", |args, _| {
131 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
132 let success = args.get(1).map(|v| match v {
133 Value::Bool(b) => *b,
134 Value::String(s) => s != "false" && s != "failure",
135 _ => true,
136 }).unwrap_or(true);
137 let mut breakers = BREAKERS.lock().unwrap();
138 let b = breakers.get_mut(&name)
139 .ok_or_else(|| format!("Circuit breaker \"{}\" not found. Create it first.", name))?;
140 let now = now_ms();
141 if b.state == "open" && now - b.last_failure >= b.reset_timeout {
142 b.state = "half-open".to_string();
143 b.successes_in_half_open = 0;
144 }
145 if success {
146 if b.state == "half-open" {
147 b.successes_in_half_open += 1;
148 if b.successes_in_half_open >= b.half_open_threshold {
149 b.state = "closed".to_string();
150 b.failures = 0;
151 }
152 } else {
153 b.failures = 0;
154 }
155 } else {
156 b.failures += 1;
157 b.last_failure = now;
158 if b.failures >= b.threshold {
159 b.state = "open".to_string();
160 }
161 }
162 let mut obj = indexmap::IndexMap::new();
163 obj.insert("name".to_string(), Value::String(name));
164 obj.insert("state".to_string(), Value::String(b.state.clone()));
165 obj.insert("failures".to_string(), Value::Number(b.failures as f64));
166 Ok(Value::Object(obj))
167 });
168
169 rp.register_builtin("retry.breakerAllow", |args, _| {
171 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
172 let mut breakers = BREAKERS.lock().unwrap();
173 if let Some(b) = breakers.get_mut(&name) {
174 if b.state == "open" {
175 let now = now_ms();
176 if now - b.last_failure >= b.reset_timeout {
177 b.state = "half-open".to_string();
178 b.successes_in_half_open = 0;
179 Ok(Value::Bool(true))
180 } else {
181 Ok(Value::Bool(false))
182 }
183 } else {
184 Ok(Value::Bool(true))
185 }
186 } else {
187 Ok(Value::Bool(true))
188 }
189 });
190
191 rp.register_builtin("retry.breakerReset", |args, _| {
193 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
194 let mut breakers = BREAKERS.lock().unwrap();
195 if let Some(b) = breakers.get_mut(&name) {
196 b.failures = 0;
197 b.state = "closed".to_string();
198 b.last_failure = 0;
199 b.successes_in_half_open = 0;
200 Ok(Value::Bool(true))
201 } else {
202 Ok(Value::Bool(false))
203 }
204 });
205}