
ralph_workflow/reducer/state/agent_chain.rs

// Agent fallback chain state.
//
// Contains AgentChainState and backoff computation helpers.

use serde::de::Deserializer;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

use super::AgentRole; // NOTE: import path assumed; AgentRole is defined elsewhere in this crate.

/// Agent fallback chain state (explicit, not loop indices).
///
/// Tracks position in the multi-level fallback chain:
/// - Agent level (primary → fallback1 → fallback2)
/// - Model level (within each agent, try different models)
/// - Retry cycle (exhaust all agents, start over with exponential backoff)
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct AgentChainState {
    pub agents: Vec<String>,
    pub current_agent_index: usize,
    pub models_per_agent: Vec<Vec<String>>,
    pub current_model_index: usize,
    pub retry_cycle: u32,
    pub max_cycles: u32,
    /// Base delay between retry cycles in milliseconds.
    #[serde(default = "default_retry_delay_ms")]
    pub retry_delay_ms: u64,
    /// Multiplier for exponential backoff.
    #[serde(default = "default_backoff_multiplier")]
    pub backoff_multiplier: f64,
    /// Maximum backoff delay in milliseconds.
    #[serde(default = "default_max_backoff_ms")]
    pub max_backoff_ms: u64,
    /// Pending backoff delay (milliseconds) that must elapse before the chain continues.
    #[serde(default)]
    pub backoff_pending_ms: Option<u64>,
    pub current_role: AgentRole,
    /// Prompt context preserved from a rate-limited agent for continuation.
    ///
    /// When an agent hits 429, we save the prompt here so the next agent can
    /// continue the SAME role/task instead of starting from scratch.
    ///
    /// IMPORTANT: This must be role-scoped to prevent cross-task contamination
    /// (e.g., a developer continuation prompt overriding an analysis prompt).
    #[serde(
        default,
        deserialize_with = "deserialize_rate_limit_continuation_prompt"
    )]
    pub rate_limit_continuation_prompt: Option<RateLimitContinuationPrompt>,
    /// Session ID from the last agent response.
    ///
    /// Used for XSD retry to continue with the same session when possible.
    /// Agents that support sessions (e.g., Claude Code) emit session IDs
    /// that can be passed back for continuation.
    #[serde(default)]
    pub last_session_id: Option<String>,
}

/// Role-scoped continuation prompt captured from a rate limit (429).
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct RateLimitContinuationPrompt {
    pub role: AgentRole,
    pub prompt: String,
}

#[derive(Deserialize)]
#[serde(untagged)]
enum RateLimitContinuationPromptRepr {
    LegacyString(String),
    Structured { role: AgentRole, prompt: String },
}

fn deserialize_rate_limit_continuation_prompt<'de, D>(
    deserializer: D,
) -> Result<Option<RateLimitContinuationPrompt>, D::Error>
where
    D: Deserializer<'de>,
{
    let opt = Option::<RateLimitContinuationPromptRepr>::deserialize(deserializer)?;
    Ok(opt.map(|repr| match repr {
        RateLimitContinuationPromptRepr::LegacyString(prompt) => RateLimitContinuationPrompt {
            role: AgentRole::Developer,
            prompt,
        },
        RateLimitContinuationPromptRepr::Structured { role, prompt } => {
            RateLimitContinuationPrompt { role, prompt }
        }
    }))
}
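
// For reference, both historical on-disk shapes deserialize through the helper
// above. Illustrative JSON (the exact encoding of `role` depends on how
// AgentRole derives Serialize/Deserialize, which lives outside this file):
//
//   "finish the failing test"                              // legacy bare string -> role defaults to Developer
//   { "role": "Developer", "prompt": "finish the test" }   // structured, role-scoped form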

const fn default_retry_delay_ms() -> u64 {
    1000
}

const fn default_backoff_multiplier() -> f64 {
    2.0
}

const fn default_max_backoff_ms() -> u64 {
    60000
}

impl AgentChainState {
    pub fn initial() -> Self {
        Self {
            agents: Vec::new(),
            current_agent_index: 0,
            models_per_agent: Vec::new(),
            current_model_index: 0,
            retry_cycle: 0,
            max_cycles: 3,
            retry_delay_ms: default_retry_delay_ms(),
            backoff_multiplier: default_backoff_multiplier(),
            max_backoff_ms: default_max_backoff_ms(),
            backoff_pending_ms: None,
            current_role: AgentRole::Developer,
            rate_limit_continuation_prompt: None,
            last_session_id: None,
        }
    }

    pub fn with_agents(
        mut self,
        agents: Vec<String>,
        models_per_agent: Vec<Vec<String>>,
        role: AgentRole,
    ) -> Self {
        self.agents = agents;
        self.models_per_agent = models_per_agent;
        self.current_role = role;
        self
    }

    /// Builder method to set the maximum number of retry cycles.
    ///
    /// A retry cycle begins when every agent has been exhausted and the chain
    /// starts over with exponential backoff.
    pub fn with_max_cycles(mut self, max_cycles: u32) -> Self {
        self.max_cycles = max_cycles;
        self
    }

    pub fn with_backoff_policy(
        mut self,
        retry_delay_ms: u64,
        backoff_multiplier: f64,
        max_backoff_ms: u64,
    ) -> Self {
        self.retry_delay_ms = retry_delay_ms;
        self.backoff_multiplier = backoff_multiplier;
        self.max_backoff_ms = max_backoff_ms;
        self
    }
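
    // Typical construction, shown here as an illustrative sketch (agent and
    // model names are placeholders, not values used anywhere in this crate):
    //
    //     let chain = AgentChainState::initial()
    //         .with_agents(
    //             vec!["primary".into(), "fallback".into()],
    //             vec![vec!["model-a".into()], vec!["model-b".into()]],
    //             AgentRole::Developer,
    //         )
    //         .with_max_cycles(3)
    //         .with_backoff_policy(1000, 2.0, 60_000);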

    pub fn current_agent(&self) -> Option<&String> {
        self.agents.get(self.current_agent_index)
    }

    /// Stable signature of the current consumer set (agents + configured models + role).
    ///
    /// This is used to dedupe oversize materialization decisions across reducer retries.
    /// The signature is stable under:
    /// - switching the current agent/model index
    /// - retry cycles
    ///
    /// It changes only when the configured consumer set changes.
    pub fn consumer_signature_sha256(&self) -> String {
        let mut pairs: Vec<String> = self
            .agents
            .iter()
            .enumerate()
            .map(|(idx, agent)| {
                let models = self
                    .models_per_agent
                    .get(idx)
                    .cloned()
                    .unwrap_or_default()
                    .join(",");
                format!("{agent}|{models}")
            })
            .collect();
        pairs.sort();

        let mut hasher = Sha256::new();
        hasher.update(format!("{:?}\n", self.current_role).as_bytes());
        for pair in pairs {
            hasher.update(pair.as_bytes());
            hasher.update(b"\n");
        }
        let digest = hasher.finalize();
        digest.iter().map(|b| format!("{b:02x}")).collect()
    }

    /// Get the currently selected model for the current agent.
    ///
    /// Returns `None` if:
    /// - No models are configured
    /// - The current agent index is out of bounds
    /// - The current model index is out of bounds
    pub fn current_model(&self) -> Option<&String> {
        self.models_per_agent
            .get(self.current_agent_index)
            .and_then(|models| models.get(self.current_model_index))
    }

    pub fn is_exhausted(&self) -> bool {
        self.retry_cycle >= self.max_cycles
            && self.current_agent_index == 0
            && self.current_model_index == 0
    }

    pub fn advance_to_next_model(&self) -> Self {
        let start_agent_index = self.current_agent_index;
        let new = self.clone();

        // When models are configured, we try each model for the current agent once.
        // If the models list is exhausted, advance to the next agent/retry cycle
        // instead of looping models indefinitely.
        let mut next = match new.models_per_agent.get(new.current_agent_index) {
            Some(models) if !models.is_empty() => {
                if new.current_model_index + 1 < models.len() {
                    let mut advanced = new;
                    advanced.current_model_index += 1;
                    advanced
                } else {
                    new.switch_to_next_agent()
                }
            }
            _ => new.switch_to_next_agent(),
        };

        if next.current_agent_index != start_agent_index {
            next.last_session_id = None;
        }

        next
    }
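
    // Illustrative walk of how advance_to_next_model() and switch_to_next_agent()
    // move through a chain of two agents with two and one configured models
    // (positions shown as agent_index/model_index):
    //
    //   0/0 -> 0/1             next model, same agent (session id kept)
    //   0/1 -> 1/0             models exhausted: next agent, session id cleared
    //   1/0 -> 0/0, cycle + 1  last agent exhausted: wrap around, schedule backoff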

    pub fn switch_to_next_agent(&self) -> Self {
        let mut new = self.clone();
        if new.current_agent_index + 1 < new.agents.len() {
            new.current_agent_index += 1;
            new.current_model_index = 0;
            new.backoff_pending_ms = None;
        } else {
            new.current_agent_index = 0;
            new.current_model_index = 0;
            new.retry_cycle += 1;
            if new.is_exhausted() {
                new.backoff_pending_ms = None;
            } else {
                new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
            }
        }
        new
    }

    /// Switch to a specific agent by name.
    ///
    /// If `to_agent` is unknown, falls back to `switch_to_next_agent()` to keep the
    /// reducer deterministic.
    pub fn switch_to_agent_named(&self, to_agent: &str) -> Self {
        let Some(target_index) = self.agents.iter().position(|a| a == to_agent) else {
            return self.switch_to_next_agent();
        };

        let mut new = self.clone();
        if target_index == new.current_agent_index {
            new.current_model_index = 0;
            new.backoff_pending_ms = None;
            return new;
        }

        new.current_agent_index = target_index;
        new.current_model_index = 0;

        if target_index <= self.current_agent_index {
            // Treat switching to an earlier agent as starting a new retry cycle.
            new.retry_cycle += 1;
            if new.is_exhausted() {
                new.backoff_pending_ms = None;
            } else {
                new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
            }
        } else {
            new.backoff_pending_ms = None;
        }

        new
    }

    /// Switch to next agent after rate limit, preserving prompt for continuation.
    ///
    /// This is used when an agent hits a 429 rate limit error. Instead of
    /// retrying with the same agent (which would likely hit rate limits again),
    /// we switch to the next agent and preserve the prompt so the new agent
    /// can continue the same work.
    pub fn switch_to_next_agent_with_prompt(&self, prompt: Option<String>) -> Self {
        let mut next = self.switch_to_next_agent();
        // Back-compat: older callers don't pass a role, so scope the prompt to the current role.
        next.rate_limit_continuation_prompt = prompt.map(|p| RateLimitContinuationPrompt {
            role: next.current_role,
            prompt: p,
        });
        next
    }

    /// Switch to next agent after rate limit, preserving prompt for continuation (role-scoped).
    pub fn switch_to_next_agent_with_prompt_for_role(
        &self,
        role: AgentRole,
        prompt: Option<String>,
    ) -> Self {
        let mut next = self.switch_to_next_agent();
        next.rate_limit_continuation_prompt =
            prompt.map(|p| RateLimitContinuationPrompt { role, prompt: p });
        next
    }

    /// Clear continuation prompt after successful execution.
    ///
    /// Called when an agent successfully completes its task, clearing any
    /// saved prompt context from previous rate-limited agents.
    pub fn clear_continuation_prompt(&self) -> Self {
        let mut new = self.clone();
        new.rate_limit_continuation_prompt = None;
        new
    }

    pub fn reset_for_role(&self, role: AgentRole) -> Self {
        let mut new = self.clone();
        new.current_role = role;
        new.current_agent_index = 0;
        new.current_model_index = 0;
        new.retry_cycle = 0;
        new.backoff_pending_ms = None;
        new.rate_limit_continuation_prompt = None;
        new.last_session_id = None;
        new
    }

    pub fn reset(&self) -> Self {
        let mut new = self.clone();
        new.current_agent_index = 0;
        new.current_model_index = 0;
        new.backoff_pending_ms = None;
        new.rate_limit_continuation_prompt = None;
        new.last_session_id = None;
        new
    }

    /// Store session ID from agent response for potential reuse.
    pub fn with_session_id(&self, session_id: Option<String>) -> Self {
        let mut new = self.clone();
        new.last_session_id = session_id;
        new
    }

    /// Clear session ID (e.g., when switching agents or starting new work).
    pub fn clear_session_id(&self) -> Self {
        let mut new = self.clone();
        new.last_session_id = None;
        new
    }

    pub fn start_retry_cycle(&self) -> Self {
        let mut new = self.clone();
        new.current_agent_index = 0;
        new.current_model_index = 0;
        new.retry_cycle += 1;
        if new.is_exhausted() {
            new.backoff_pending_ms = None;
        } else {
            new.backoff_pending_ms = Some(new.calculate_backoff_delay_ms_for_retry_cycle());
        }
        new
    }

    pub fn clear_backoff_pending(&self) -> Self {
        let mut new = self.clone();
        new.backoff_pending_ms = None;
        new
    }

    fn calculate_backoff_delay_ms_for_retry_cycle(&self) -> u64 {
        // The first retry cycle should use the base delay.
        let cycle_index = self.retry_cycle.saturating_sub(1);
        calculate_backoff_delay_ms(
            self.retry_delay_ms,
            self.backoff_multiplier,
            self.max_backoff_ms,
            cycle_index,
        )
    }
}

// Backoff computation helpers.
// These mirror the semantics in `crate::agents::fallback::FallbackConfig::calculate_backoff`
// but live in reducer state so orchestration can derive BackoffWait effects purely.

const IEEE_754_EXP_BIAS: i32 = 1023;
const IEEE_754_EXP_MASK: u64 = 0x7FF;
const IEEE_754_MANTISSA_MASK: u64 = 0x000F_FFFF_FFFF_FFFF;
const IEEE_754_IMPLICIT_ONE: u64 = 1u64 << 52;

fn f64_to_u64_via_bits(value: f64) -> u64 {
    if !value.is_finite() || value < 0.0 {
        return 0;
    }
    let bits = value.to_bits();
    let exp_biased = ((bits >> 52) & IEEE_754_EXP_MASK) as i32;
    let mantissa = bits & IEEE_754_MANTISSA_MASK;
    if exp_biased == 0 {
        return 0;
    }
    let exp = exp_biased - IEEE_754_EXP_BIAS;
    if exp < 0 {
        return 0;
    }
    let full_mantissa = mantissa | IEEE_754_IMPLICIT_ONE;
    let shift = 52i32 - exp;
    if shift <= 0 {
        u64::MAX
    } else if shift < 64 {
        full_mantissa >> shift
    } else {
        0
    }
}
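
// Worked example for f64_to_u64_via_bits: 125.0 = 1.953125 * 2^6, so exp = 6,
// shift = 52 - 6 = 46, and full_mantissa = 125 * 2^46; shifting right by 46
// recovers 125. Values >= 2^52 saturate to u64::MAX; NaN, infinities, negative
// values, and subnormals all map to 0.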

fn multiplier_hundredths(backoff_multiplier: f64) -> u64 {
    const EPSILON: f64 = 0.0001;
    let m = backoff_multiplier;
    if (m - 1.0).abs() < EPSILON {
        return 100;
    } else if (m - 1.5).abs() < EPSILON {
        return 150;
    } else if (m - 2.0).abs() < EPSILON {
        return 200;
    } else if (m - 2.5).abs() < EPSILON {
        return 250;
    } else if (m - 3.0).abs() < EPSILON {
        return 300;
    } else if (m - 4.0).abs() < EPSILON {
        return 400;
    } else if (m - 5.0).abs() < EPSILON {
        return 500;
    } else if (m - 10.0).abs() < EPSILON {
        return 1000;
    }

    let clamped = m.clamp(0.0, 1000.0);
    let multiplied = clamped * 100.0;
    let rounded = multiplied.round();
    f64_to_u64_via_bits(rounded)
}

fn calculate_backoff_delay_ms(
    retry_delay_ms: u64,
    backoff_multiplier: f64,
    max_backoff_ms: u64,
    cycle: u32,
) -> u64 {
    let mult_hundredths = multiplier_hundredths(backoff_multiplier);
    let mut delay_hundredths = retry_delay_ms.saturating_mul(100);
    for _ in 0..cycle {
        delay_hundredths = delay_hundredths.saturating_mul(mult_hundredths);
        delay_hundredths = delay_hundredths.saturating_div(100);
    }
    delay_hundredths.div_euclid(100).min(max_backoff_ms)
}
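
// The tests below are a sketch of unit coverage for the behavior documented in
// this file. They use only items defined here plus AgentRole from the parent
// module; agent and model names are placeholders.
#[cfg(test)]
mod tests {
    use super::*;

    fn two_agent_chain() -> AgentChainState {
        AgentChainState::initial().with_agents(
            vec!["primary".to_string(), "fallback".to_string()],
            vec![
                vec!["model-a".to_string(), "model-b".to_string()],
                vec!["model-c".to_string()],
            ],
            AgentRole::Developer,
        )
    }

    #[test]
    fn backoff_doubles_per_cycle_and_caps_at_max() {
        // 1000ms base with a 2.0 multiplier: 1000, 2000, 4000, ... capped at 60_000.
        assert_eq!(calculate_backoff_delay_ms(1000, 2.0, 60_000, 0), 1000);
        assert_eq!(calculate_backoff_delay_ms(1000, 2.0, 60_000, 1), 2000);
        assert_eq!(calculate_backoff_delay_ms(1000, 2.0, 60_000, 2), 4000);
        assert_eq!(calculate_backoff_delay_ms(1000, 2.0, 60_000, 10), 60_000);
    }

    #[test]
    fn advancing_models_switches_agent_and_clears_session() {
        let chain = two_agent_chain().with_session_id(Some("session-1".to_string()));

        // Second model of the first agent: same agent, session id preserved.
        let second_model = chain.advance_to_next_model();
        assert_eq!(second_model.current_agent_index, 0);
        assert_eq!(second_model.current_model_index, 1);
        assert_eq!(second_model.last_session_id, Some("session-1".to_string()));

        // Models exhausted: move to the next agent and drop the session id.
        let next_agent = second_model.advance_to_next_model();
        assert_eq!(next_agent.current_agent_index, 1);
        assert_eq!(next_agent.current_model_index, 0);
        assert_eq!(next_agent.last_session_id, None);
    }

    #[test]
    fn wrapping_past_the_last_agent_starts_a_retry_cycle_with_backoff() {
        let wrapped = two_agent_chain().switch_to_next_agent().switch_to_next_agent();
        assert_eq!(wrapped.current_agent_index, 0);
        assert_eq!(wrapped.retry_cycle, 1);
        // The first retry cycle waits the base delay.
        assert_eq!(wrapped.backoff_pending_ms, Some(1000));
    }

    #[test]
    fn consumer_signature_ignores_indices_and_retry_cycles() {
        let chain = two_agent_chain();
        let sig = chain.consumer_signature_sha256();
        assert_eq!(chain.switch_to_next_agent().consumer_signature_sha256(), sig);
        assert_eq!(chain.start_retry_cycle().consumer_signature_sha256(), sig);
    }
}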