ralph_workflow/reducer/state/
agent_chain.rs1use serde::de::Deserializer;
6use sha2::{Digest, Sha256};
7
8#[derive(Clone, Serialize, Deserialize, Debug)]
15pub struct AgentChainState {
16 pub agents: Vec<String>,
17 pub current_agent_index: usize,
18 pub models_per_agent: Vec<Vec<String>>,
19 pub current_model_index: usize,
20 pub retry_cycle: u32,
21 pub max_cycles: u32,
22 #[serde(default = "default_retry_delay_ms")]
24 pub retry_delay_ms: u64,
25 #[serde(default = "default_backoff_multiplier")]
27 pub backoff_multiplier: f64,
28 #[serde(default = "default_max_backoff_ms")]
30 pub max_backoff_ms: u64,
31 #[serde(default)]
33 pub backoff_pending_ms: Option<u64>,
34 pub current_role: AgentRole,
35 #[serde(
43 default,
44 deserialize_with = "deserialize_rate_limit_continuation_prompt"
45 )]
46 pub rate_limit_continuation_prompt: Option<RateLimitContinuationPrompt>,
47 #[serde(default)]
53 pub last_session_id: Option<String>,
54}
55
56#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
58pub struct RateLimitContinuationPrompt {
59 pub role: AgentRole,
60 pub prompt: String,
61}
62
63#[derive(Deserialize)]
64#[serde(untagged)]
65enum RateLimitContinuationPromptRepr {
66 LegacyString(String),
67 Structured { role: AgentRole, prompt: String },
68}
69
70fn deserialize_rate_limit_continuation_prompt<'de, D>(
71 deserializer: D,
72) -> Result<Option<RateLimitContinuationPrompt>, D::Error>
73where
74 D: Deserializer<'de>,
75{
76 let opt = Option::<RateLimitContinuationPromptRepr>::deserialize(deserializer)?;
77 Ok(opt.map(|repr| match repr {
78 RateLimitContinuationPromptRepr::LegacyString(prompt) => RateLimitContinuationPrompt {
79 role: AgentRole::Developer,
80 prompt,
81 },
82 RateLimitContinuationPromptRepr::Structured { role, prompt } => {
83 RateLimitContinuationPrompt { role, prompt }
84 }
85 }))
86}
87
88const fn default_retry_delay_ms() -> u64 {
89 1000
90}
91
92const fn default_backoff_multiplier() -> f64 {
93 2.0
94}
95
96const fn default_max_backoff_ms() -> u64 {
97 60000
98}
99
100impl AgentChainState {
101 pub fn initial() -> Self {
102 Self {
103 agents: Vec::new(),
104 current_agent_index: 0,
105 models_per_agent: Vec::new(),
106 current_model_index: 0,
107 retry_cycle: 0,
108 max_cycles: 3,
109 retry_delay_ms: default_retry_delay_ms(),
110 backoff_multiplier: default_backoff_multiplier(),
111 max_backoff_ms: default_max_backoff_ms(),
112 backoff_pending_ms: None,
113 current_role: AgentRole::Developer,
114 rate_limit_continuation_prompt: None,
115 last_session_id: None,
116 }
117 }
118
119 pub fn with_agents(
120 mut self,
121 agents: Vec<String>,
122 models_per_agent: Vec<Vec<String>>,
123 role: AgentRole,
124 ) -> Self {
125 self.agents = agents;
126 self.models_per_agent = models_per_agent;
127 self.current_role = role;
128 self
129 }
130
131 pub fn with_max_cycles(mut self, max_cycles: u32) -> Self {
136 self.max_cycles = max_cycles;
137 self
138 }
139
140 pub fn with_backoff_policy(
141 mut self,
142 retry_delay_ms: u64,
143 backoff_multiplier: f64,
144 max_backoff_ms: u64,
145 ) -> Self {
146 self.retry_delay_ms = retry_delay_ms;
147 self.backoff_multiplier = backoff_multiplier;
148 self.max_backoff_ms = max_backoff_ms;
149 self
150 }
151
152 pub fn current_agent(&self) -> Option<&String> {
153 self.agents.get(self.current_agent_index)
154 }
155
156 pub fn consumer_signature_sha256(&self) -> String {
165 let mut pairs: Vec<String> = self
166 .agents
167 .iter()
168 .enumerate()
169 .map(|(idx, agent)| {
170 let models = self
171 .models_per_agent
172 .get(idx)
173 .cloned()
174 .unwrap_or_default()
175 .join(",");
176 format!("{agent}|{models}")
177 })
178 .collect();
179 pairs.sort();
180
181 let mut hasher = Sha256::new();
182 hasher.update(format!("{:?}\n", self.current_role).as_bytes());
183 for pair in pairs {
184 hasher.update(pair.as_bytes());
185 hasher.update(b"\n");
186 }
187 let digest = hasher.finalize();
188 digest.iter().map(|b| format!("{b:02x}")).collect()
189 }
190
191 pub fn current_model(&self) -> Option<&String> {
198 self.models_per_agent
199 .get(self.current_agent_index)
200 .and_then(|models| models.get(self.current_model_index))
201 }
202
203 pub fn is_exhausted(&self) -> bool {
204 self.retry_cycle >= self.max_cycles
205 && self.current_agent_index == 0
206 && self.current_model_index == 0
207 }
208
209 pub fn advance_to_next_model(&self) -> Self {
210 let start_agent_index = self.current_agent_index;
211 let new = self.clone();
212
213 let mut next = match new.models_per_agent.get(new.current_agent_index) {
217 Some(models) if !models.is_empty() => {
218 if new.current_model_index + 1 < models.len() {
219 let mut advanced = new;
220 advanced.current_model_index += 1;
221 advanced
222 } else {
223 new.switch_to_next_agent()
224 }
225 }
226 _ => new.switch_to_next_agent(),
227 };
228
229 if next.current_agent_index != start_agent_index {
230 next.last_session_id = None;
231 }
232
233 next
234 }
235
236 pub fn switch_to_next_agent(&self) -> Self {
237 let mut new = self.clone();
238 if new.current_agent_index + 1 < new.agents.len() {
239 new.current_agent_index += 1;
240 new.current_model_index = 0;
241 new.backoff_pending_ms = None;
242 } else {
243 new.current_agent_index = 0;
244 new.current_model_index = 0;
245 new.retry_cycle += 1;
246 if new.is_exhausted() {
247 new.backoff_pending_ms = None;
248 } else {
249 new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
250 }
251 }
252 new
253 }
254
255 pub fn switch_to_agent_named(&self, to_agent: &str) -> Self {
260 let Some(target_index) = self.agents.iter().position(|a| a == to_agent) else {
261 return self.switch_to_next_agent();
262 };
263
264 let mut new = self.clone();
265 if target_index == new.current_agent_index {
266 new.current_model_index = 0;
267 new.backoff_pending_ms = None;
268 return new;
269 }
270
271 new.current_agent_index = target_index;
272 new.current_model_index = 0;
273
274 if target_index <= self.current_agent_index {
275 new.retry_cycle += 1;
277 if new.is_exhausted() {
278 new.backoff_pending_ms = None;
279 } else {
280 new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
281 }
282 } else {
283 new.backoff_pending_ms = None;
284 }
285
286 new
287 }
288
289 pub fn switch_to_next_agent_with_prompt(&self, prompt: Option<String>) -> Self {
296 let mut next = self.switch_to_next_agent();
297 next.rate_limit_continuation_prompt = prompt.map(|p| RateLimitContinuationPrompt {
299 role: next.current_role,
300 prompt: p,
301 });
302 next
303 }
304
305 pub fn switch_to_next_agent_with_prompt_for_role(
307 &self,
308 role: AgentRole,
309 prompt: Option<String>,
310 ) -> Self {
311 let mut next = self.switch_to_next_agent();
312 next.rate_limit_continuation_prompt =
313 prompt.map(|p| RateLimitContinuationPrompt { role, prompt: p });
314 next
315 }
316
317 pub fn clear_continuation_prompt(&self) -> Self {
322 let mut new = self.clone();
323 new.rate_limit_continuation_prompt = None;
324 new
325 }
326
327 pub fn reset_for_role(&self, role: AgentRole) -> Self {
328 let mut new = self.clone();
329 new.current_role = role;
330 new.current_agent_index = 0;
331 new.current_model_index = 0;
332 new.retry_cycle = 0;
333 new.backoff_pending_ms = None;
334 new.rate_limit_continuation_prompt = None;
335 new.last_session_id = None;
336 new
337 }
338
339 pub fn reset(&self) -> Self {
340 let mut new = self.clone();
341 new.current_agent_index = 0;
342 new.current_model_index = 0;
343 new.backoff_pending_ms = None;
344 new.rate_limit_continuation_prompt = None;
345 new.last_session_id = None;
346 new
347 }
348
349 pub fn with_session_id(&self, session_id: Option<String>) -> Self {
351 let mut new = self.clone();
352 new.last_session_id = session_id;
353 new
354 }
355
356 pub fn clear_session_id(&self) -> Self {
358 let mut new = self.clone();
359 new.last_session_id = None;
360 new
361 }
362
363 pub fn start_retry_cycle(&self) -> Self {
364 let mut new = self.clone();
365 new.current_agent_index = 0;
366 new.current_model_index = 0;
367 new.retry_cycle += 1;
368 if new.is_exhausted() {
369 new.backoff_pending_ms = None;
370 } else {
371 new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
372 }
373 new
374 }
375
376 pub fn clear_backoff_pending(&self) -> Self {
377 let mut new = self.clone();
378 new.backoff_pending_ms = None;
379 new
380 }
381
382 fn calculate_backoff_delay_ms_for_retry_cycle(&self) -> u64 {
383 let cycle_index = self.retry_cycle.saturating_sub(1);
385 calculate_backoff_delay_ms(
386 self.retry_delay_ms,
387 self.backoff_multiplier,
388 self.max_backoff_ms,
389 cycle_index,
390 )
391 }
392}
393
394const IEEE_754_EXP_BIAS: i32 = 1023;
399const IEEE_754_EXP_MASK: u64 = 0x7FF;
400const IEEE_754_MANTISSA_MASK: u64 = 0x000F_FFFF_FFFF_FFFF;
401const IEEE_754_IMPLICIT_ONE: u64 = 1u64 << 52;
402
403fn f64_to_u64_via_bits(value: f64) -> u64 {
404 if !value.is_finite() || value < 0.0 {
405 return 0;
406 }
407 let bits = value.to_bits();
408 let exp_biased = ((bits >> 52) & IEEE_754_EXP_MASK) as i32;
409 let mantissa = bits & IEEE_754_MANTISSA_MASK;
410 if exp_biased == 0 {
411 return 0;
412 }
413 let exp = exp_biased - IEEE_754_EXP_BIAS;
414 if exp < 0 {
415 return 0;
416 }
417 let full_mantissa = mantissa | IEEE_754_IMPLICIT_ONE;
418 let shift = 52i32 - exp;
419 if shift <= 0 {
420 u64::MAX
421 } else if shift < 64 {
422 full_mantissa >> shift
423 } else {
424 0
425 }
426}
427
428fn multiplier_hundredths(backoff_multiplier: f64) -> u64 {
429 const EPSILON: f64 = 0.0001;
430 let m = backoff_multiplier;
431 if (m - 1.0).abs() < EPSILON {
432 return 100;
433 } else if (m - 1.5).abs() < EPSILON {
434 return 150;
435 } else if (m - 2.0).abs() < EPSILON {
436 return 200;
437 } else if (m - 2.5).abs() < EPSILON {
438 return 250;
439 } else if (m - 3.0).abs() < EPSILON {
440 return 300;
441 } else if (m - 4.0).abs() < EPSILON {
442 return 400;
443 } else if (m - 5.0).abs() < EPSILON {
444 return 500;
445 } else if (m - 10.0).abs() < EPSILON {
446 return 1000;
447 }
448
449 let clamped = m.clamp(0.0, 1000.0);
450 let multiplied = clamped * 100.0;
451 let rounded = multiplied.round();
452 f64_to_u64_via_bits(rounded)
453}
454
455fn calculate_backoff_delay_ms(
456 retry_delay_ms: u64,
457 backoff_multiplier: f64,
458 max_backoff_ms: u64,
459 cycle: u32,
460) -> u64 {
461 let mult_hundredths = multiplier_hundredths(backoff_multiplier);
462 let mut delay_hundredths = retry_delay_ms.saturating_mul(100);
463 for _ in 0..cycle {
464 delay_hundredths = delay_hundredths.saturating_mul(mult_hundredths);
465 delay_hundredths = delay_hundredths.saturating_div(100);
466 }
467 delay_hundredths.div_euclid(100).min(max_backoff_ms)
468}