fusillade/request/
transitions.rs

1//! State transitions for batch requests using the typestate pattern.
2//!
3//! This module implements state transitions for HTTP batch requests using Rust's
4//! type system to enforce valid state transitions at compile time. Each request
5//! state is represented as a distinct type parameter on `Request<State>`.
6//!
7//! # Typestate Pattern
8//!
9//! The typestate pattern leverages Rust's type system to make invalid states
10//! unrepresentable. A `Request<Pending>` can only call methods available for
11//! pending requests, and transitions return different types:
12//!
13//! ```text
14//! Request<Pending> ──claim()──> Request<Claimed> ──process()──> Request<Processing>
15//!       │                             │                               │
16//!       │                             │                               └──complete()──> Request<Completed>
17//!       │                             │                               └──complete()──> Request<Failed>
18//!       └──cancel()──> Request<Canceled>                              └──cancel()────> Request<Canceled>
19//!                              │
20//!                              └──unclaim()─> Request<Pending>
21//!
//! Request<Failed> ──can_retry()──> Ok(Request<Pending>)        (if a retry is allowed)
//!                 ──can_retry()──> Err(Box<Request<Failed>>)   (max retries or deadline reached)
24//! ```
25//!
26//! # State Lifecycle
27//!
28//! ## 1. Pending → Claimed
29//!
30//! A daemon claims a pending request for processing:
31//! - Records which daemon claimed it
32//! - Sets claimed_at timestamp
33//! - Preserves retry attempt count
34//!
35//! ## 2. Claimed → Processing
36//!
37//! The daemon starts executing the HTTP request:
38//! - Spawns an async task to make the HTTP call
39//! - Creates a channel to receive the result
40//! - Provides an abort handle for cancellation
41//!
42//! ## 3. Processing → Completed or Failed
43//!
//! The HTTP request completes (or is canceled):
//! - **Success**: Transitions to `Completed` with response body
//! - **Failure**: Transitions to `Failed` with error message
//! - **Retriable**: HTTP succeeded but the caller's `should_retry` predicate flags the
//!   status code (e.g., 429, 500); the request becomes `Failed` and is eligible for retry
48//!
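//! ## 4. Failed → Pending (Retry)
//!
//! Failed requests can be retried with exponential backoff via `can_retry`:
//! - Increments the retry_attempt counter
//! - Calculates the backoff delay as `backoff_ms * (factor ^ attempt)`, capped at `max_backoff_ms`
//! - Sets the not_before timestamp to delay the retry
//! - Returns `Err` (the request stays `Failed`) if max retries are exhausted or the
//!   retry would not start before the (optionally buffered) batch deadline
//! - Does not persist the new `Pending` state; the caller is responsible for that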
56//!
57//! ## 5. Any State → Canceled
58//!
59//! Requests can be canceled from most states:
60//! - `Pending`: Simply marks as canceled
61//! - `Claimed`: Releases claim and cancels
62//! - `Processing`: Aborts the in-flight HTTP request
63//!
64//! # Retry Configuration
65//!
66//! Exponential backoff and retry limits are configured via [`RetryConfig`]:
67//!
68//! ```rust
69//! # use fusillade::request::transitions::RetryConfig;
70//! let config = RetryConfig {
71//!     max_retries: Some(1000),
72//!     stop_before_deadline_ms: Some(900_000),
73//!     backoff_ms: 1000,         // Start with 1 second
74//!     backoff_factor: 2,        // Double each time (1s, 2s, 4s)
75//!     max_backoff_ms: 60000,    // Cap at 60 seconds
76//! };
77//! ```
78//!
79//! # Example Workflow
80//!
81//! ```ignore
82//! // Daemon claims a pending request
83//! let pending: Request<Pending> = storage.next_pending().await?;
84//! let claimed = pending.claim(daemon_id, &storage).await?;
85//!
86//! // Start processing
87//! let processing = claimed.process(http_client, timeout_ms, &storage).await?;
88//!
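//! // Wait for completion (or for the cancellation future to resolve)
//! let result = processing
//!     .complete(&storage, |resp| resp.status >= 500, cancellation)
//!     .await?;
//!
//! match result {
//!     RequestCompletionResult::Completed(completed) => {
//!         println!("Success: {}", completed.state.response_status)
//!     }
//!     RequestCompletionResult::Failed(failed) => {
//!         // Attempt retry with backoff; `can_retry` does not persist, so the
//!         // caller stores whichever state results.
//!         let retry_attempt = failed.state.retry_attempt;
//!         match failed.can_retry(retry_attempt, config) {
//!             Ok(pending) => {
//!                 storage.persist(&pending).await?;
//!                 println!("Retrying request...");
//!             }
//!             Err(failed) => {
//!                 storage.persist(&*failed).await?;
//!                 println!("No retries remaining");
//!             }
//!         }
//!     }
//!     RequestCompletionResult::Canceled(_) => println!("Canceled"),
//! }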
103//! ```
104
105use std::sync::Arc;
106
107use metrics::counter;
108use tokio::sync::Mutex;
109
110use crate::{
111    FusilladeError,
112    error::Result,
113    http::{HttpClient, HttpResponse},
114    manager::Storage,
115};
116
117use super::types::{
118    Canceled, Claimed, Completed, DaemonId, Failed, FailureReason, Pending, Processing, Request,
119    RequestCompletionResult,
120};
121
/// Reason for cancelling an in-flight request.
///
/// Produced by the `cancellation` future passed to `Request::<Processing>::complete`;
/// the variant decides whether the cancellation is persisted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancellationReason {
    /// User-initiated cancellation (should persist Canceled state).
    User,
    /// Daemon shutdown (abort HTTP but don't persist state change).
    Shutdown,
    /// Request superseded by racing pair (abort HTTP but don't persist -
    /// supersede_racing_pair already updated DB).
    Superseded,
}
132
133impl Request<Pending> {
134    pub async fn claim<S: Storage + ?Sized>(
135        self,
136        daemon_id: DaemonId,
137        storage: &S,
138    ) -> Result<Request<Claimed>> {
139        let request = Request {
140            data: self.data,
141            state: Claimed {
142                daemon_id,
143                claimed_at: chrono::Utc::now(),
144                retry_attempt: self.state.retry_attempt, // Carry over retry attempt
145                batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
146            },
147        };
148        storage.persist(&request).await?;
149        Ok(request)
150    }
151
152    pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
153        let request = Request {
154            data: self.data,
155            state: Canceled {
156                canceled_at: chrono::Utc::now(),
157            },
158        };
159        storage.persist(&request).await?;
160        Ok(request)
161    }
162}
163
164impl Request<Claimed> {
165    pub async fn unclaim<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Pending>> {
166        let request = Request {
167            data: self.data,
168            state: Pending {
169                retry_attempt: self.state.retry_attempt, // Preserve retry attempt
170                not_before: None,                        // Can be claimed immediately
171                batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
172            },
173        };
174        storage.persist(&request).await?;
175        Ok(request)
176    }
177
178    pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
179        let request = Request {
180            data: self.data,
181            state: Canceled {
182                canceled_at: chrono::Utc::now(),
183            },
184        };
185        storage.persist(&request).await?;
186        Ok(request)
187    }
188
189    pub async fn process<H: HttpClient + 'static, S: Storage>(
190        self,
191        http_client: H,
192        timeout_ms: u64,
193        storage: &S,
194    ) -> Result<Request<Processing>> {
195        let request_data = self.data.clone();
196
197        // Create a channel for the HTTP result
198        let (tx, rx) = tokio::sync::mpsc::channel(1);
199
200        // Spawn the HTTP request as an async task
201        let task_handle = tokio::spawn(async move {
202            let result = http_client
203                .execute(&request_data, &request_data.api_key, timeout_ms)
204                .await;
205            let _ = tx.send(result).await; // Ignore send errors (receiver dropped)
206        });
207
208        let processing_state = Processing {
209            daemon_id: self.state.daemon_id,
210            claimed_at: self.state.claimed_at,
211            started_at: chrono::Utc::now(),
212            retry_attempt: self.state.retry_attempt, // Carry over retry attempt
213            batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
214            result_rx: Arc::new(Mutex::new(rx)),
215            abort_handle: task_handle.abort_handle(),
216        };
217
218        let request = Request {
219            data: self.data,
220            state: processing_state,
221        };
222
223        // Persist the Processing state so we can cancel it if needed
224        // If persist fails, abort the spawned HTTP task
225        if let Err(e) = storage.persist(&request).await {
226            request.state.abort_handle.abort();
227            return Err(e);
228        }
229
230        Ok(request)
231    }
232}
233
/// Configuration for retry behavior of failed requests.
///
/// Consumed by `Request::<Failed>::can_retry`. For attempt counter `n`, it waits
/// `min(backoff_ms * backoff_factor^n, max_backoff_ms)` milliseconds before the
/// next attempt, using saturating arithmetic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryConfig {
    /// Retry cap: a request whose attempt counter is `>=` this value is not
    /// retried. `None` disables the cap.
    pub max_retries: Option<u32>,
    /// Safety buffer, in milliseconds, subtracted from the batch deadline. Retries
    /// that would not start before `batch_expires_at - stop_before_deadline_ms` are
    /// denied. `None` means the batch deadline itself is used.
    pub stop_before_deadline_ms: Option<i64>,
    /// Base backoff delay in milliseconds (the delay when the attempt counter is 0).
    pub backoff_ms: u64,
    /// Multiplier applied to the delay for each successive attempt.
    pub backoff_factor: u64,
    /// Upper bound on the computed backoff delay, in milliseconds.
    pub max_backoff_ms: u64,
}
243
244impl From<&crate::daemon::DaemonConfig> for RetryConfig {
245    fn from(config: &crate::daemon::DaemonConfig) -> Self {
246        RetryConfig {
247            max_retries: config.max_retries,
248            stop_before_deadline_ms: config.stop_before_deadline_ms,
249            backoff_ms: config.backoff_ms,
250            backoff_factor: config.backoff_factor,
251            max_backoff_ms: config.max_backoff_ms,
252        }
253    }
254}
255
256impl Request<Failed> {
257    /// Attempt to retry this failed request.
258    ///
259    /// If retries are available, transitions the request back to Pending with:
260    /// - Incremented retry_attempt
261    /// - Calculated not_before timestamp for exponential backoff
262    ///
263    /// If no retries remain, returns None and the request stays Failed.
264    ///
265    /// The retry logic considers:
266    /// - max_retries: Hard cap on total retry attempts
267    /// - stop_before_deadline_ms: Deadline-aware retry (stops before batch expiration)
268    pub fn can_retry(
269        self,
270        retry_attempt: u32,
271        config: RetryConfig,
272    ) -> std::result::Result<Request<Pending>, Box<Self>> {
273        // Calculate exponential backoff: backoff_ms * (backoff_factor ^ retry_attempt)
274        let backoff_duration = {
275            let exponential = config
276                .backoff_ms
277                .saturating_mul(config.backoff_factor.saturating_pow(retry_attempt));
278            exponential.min(config.max_backoff_ms)
279        };
280
281        let now = chrono::Utc::now();
282        let not_before = now + chrono::Duration::milliseconds(backoff_duration as i64);
283
284        if let Some(max_retries) = config.max_retries
285            && retry_attempt >= max_retries
286        {
287            counter!(
288                "fusillade_retry_denied_total",
289                "model" => self.data.model.clone(),
290                "reason" => "max_retries"
291            )
292            .increment(1);
293            tracing::debug!(
294                request_id = %self.data.id,
295                retry_attempt,
296                max_retries,
297                "No retries remaining (reached max_retries), request remains failed"
298            );
299            return Err(Box::new(self));
300        }
301
302        // Determine the effective deadline (with or without buffer)
303        let effective_deadline = if let Some(stop_before_deadline_ms) =
304            config.stop_before_deadline_ms
305        {
306            self.state.batch_expires_at - chrono::Duration::milliseconds(stop_before_deadline_ms)
307        } else {
308            // No buffer configured - use the actual deadline
309            self.state.batch_expires_at
310        };
311
312        // Check if the next retry would start before the effective deadline
313        if not_before >= effective_deadline {
314            counter!(
315                "fusillade_retry_denied_total",
316                "model" => self.data.model.clone(),
317                "reason" => "deadline"
318            )
319            .increment(1);
320            let time_until_deadline = self.state.batch_expires_at - now;
321            tracing::warn!(
322                request_id = %self.data.id,
323                retry_attempt,
324                time_until_deadline_seconds = time_until_deadline.num_seconds(),
325                batch_expires_at = %self.state.batch_expires_at,
326                stop_before_deadline_ms = config.stop_before_deadline_ms,
327                "No retries remaining (would exceed batch deadline), request remains failed"
328            );
329            return Err(Box::new(self));
330        }
331
332        let time_until_deadline = effective_deadline - now;
333        tracing::debug!(
334            request_id = %self.data.id,
335            retry_attempt,
336            time_until_deadline_seconds = time_until_deadline.num_seconds(),
337            batch_expires_at = %self.state.batch_expires_at,
338            "Retrying (deadline-aware: time remaining)"
339        );
340
341        tracing::info!(
342            request_id = %self.data.id,
343            retry_attempt = retry_attempt + 1,
344            backoff_ms = backoff_duration,
345            not_before = %not_before,
346            batch_expires_at = %self.state.batch_expires_at,
347            "Retrying failed request with exponential backoff"
348        );
349
350        let request = Request {
351            data: self.data,
352            state: Pending {
353                retry_attempt: retry_attempt + 1,
354                not_before: Some(not_before),
355                batch_expires_at: self.state.batch_expires_at,
356            },
357        };
358
359        Ok(request)
360    }
361}
362
363impl Request<Processing> {
364    /// Wait for the HTTP request to complete.
365    ///
366    /// This method awaits the result from the spawned HTTP task and transitions
367    /// the request to one of three terminal states: `Completed`, `Failed`, or `Canceled`.
368    ///
369    /// The `should_retry` predicate determines whether a response should be considered
370    /// a failure (and thus eligible for retry) or a success.
371    ///
372    /// The `cancellation` future allows external cancellation of the request. It should
373    /// resolve to a `CancellationReason`:
374    /// - `CancellationReason::User`: User-initiated cancellation (persists Canceled state)
375    /// - `CancellationReason::Shutdown`: Daemon shutdown (aborts HTTP but doesn't persist)
376    /// - `CancellationReason::Superseded`: Request superseded by racing pair (aborts HTTP but doesn't persist)
377    ///
378    /// Returns:
379    /// - `RequestCompletionResult::Completed` if the HTTP request succeeded
380    /// - `RequestCompletionResult::Failed` if the HTTP request failed or should be retried
381    /// - `RequestCompletionResult::Canceled` if the request was canceled by user
382    /// - `Err(FusilladeError::Shutdown)` if the daemon is shutting down or request was superseded
383    pub async fn complete<S, F, Fut>(
384        self,
385        storage: &S,
386        should_retry: F,
387        cancellation: Fut,
388    ) -> Result<RequestCompletionResult>
389    where
390        S: Storage + ?Sized,
391        F: Fn(&HttpResponse) -> bool,
392        Fut: std::future::Future<Output = CancellationReason>,
393    {
394        // Await the result from the channel (lock the mutex to access the receiver)
395        // We use an enum to track whether we got a result or cancellation so we can
396        // drop the mutex guard before calling self.cancel()
397        enum Outcome {
398            Result(Option<std::result::Result<HttpResponse, FusilladeError>>),
399            Canceled(CancellationReason),
400        }
401
402        let outcome = {
403            let mut rx = self.state.result_rx.lock().await;
404
405            tokio::select! {
406                // Wait for the HTTP request to finish processing
407                result = rx.recv() => Outcome::Result(result),
408                // Handle cancellation
409                reason = cancellation => Outcome::Canceled(reason),
410            }
411        };
412
413        // Handle cancellation outside the mutex guard
414        let result = match outcome {
415            Outcome::Canceled(CancellationReason::User) => {
416                // User cancellation: persist Canceled state
417                // (self.cancel() will abort the HTTP task)
418                let canceled = self.cancel(storage).await?;
419                return Ok(RequestCompletionResult::Canceled(canceled));
420            }
421            Outcome::Canceled(CancellationReason::Shutdown) => {
422                // Shutdown: abort HTTP task but don't persist state change
423                // Request stays in Processing state and will be reclaimed later
424                self.state.abort_handle.abort();
425                return Err(FusilladeError::Shutdown);
426            }
427            Outcome::Canceled(CancellationReason::Superseded) => {
428                // Superseded: abort HTTP task but don't persist state change
429                // supersede_racing_pair already updated the DB to state='superseded'
430                self.state.abort_handle.abort();
431                return Err(FusilladeError::Shutdown);
432            }
433            Outcome::Result(result) => result,
434        };
435
436        match result {
437            Some(Ok(http_response)) => {
438                // Check if this is an error response (4xx or 5xx)
439                let is_error = http_response.status >= 400;
440
441                // Check if this response should be retried
442                if should_retry(&http_response) {
443                    // Record retriable HTTP status for observability
444                    counter!(
445                        "fusillade_http_status_retriable_total",
446                        "model" => self.data.model.clone(),
447                        "status" => http_response.status.to_string()
448                    )
449                    .increment(1);
450
451                    // Treat as failure for retry purposes
452                    let failed_state = Failed {
453                        reason: FailureReason::RetriableHttpStatus {
454                            status: http_response.status,
455                            body: http_response.body.clone(),
456                        },
457                        failed_at: chrono::Utc::now(),
458                        retry_attempt: self.state.retry_attempt,
459                        batch_expires_at: self.state.batch_expires_at,
460                    };
461                    let request = Request {
462                        data: self.data,
463                        state: failed_state,
464                    };
465                    Ok(RequestCompletionResult::Failed(request))
466                } else if is_error {
467                    // Non-retriable error (e.g., 4xx client errors)
468                    // Mark as failed but don't retry
469                    let failed_state = Failed {
470                        reason: FailureReason::NonRetriableHttpStatus {
471                            status: http_response.status,
472                            body: http_response.body.clone(),
473                        },
474                        failed_at: chrono::Utc::now(),
475                        retry_attempt: self.state.retry_attempt,
476                        batch_expires_at: self.state.batch_expires_at,
477                    };
478                    let request = Request {
479                        data: self.data,
480                        state: failed_state,
481                    };
482                    storage.persist(&request).await?;
483                    Ok(RequestCompletionResult::Failed(request))
484                } else {
485                    // HTTP request completed successfully
486                    let completed_state = Completed {
487                        response_status: http_response.status,
488                        response_body: http_response.body,
489                        claimed_at: self.state.claimed_at,
490                        started_at: self.state.started_at,
491                        completed_at: chrono::Utc::now(),
492                    };
493                    let request = Request {
494                        data: self.data,
495                        state: completed_state,
496                    };
497                    storage.persist(&request).await?;
498                    Ok(RequestCompletionResult::Completed(request))
499                }
500            }
501            Some(Err(e)) => {
502                // HTTP request failed (network error, timeout, etc.)
503                let failed_state = Failed {
504                    reason: FailureReason::NetworkError {
505                        error: crate::error::error_serialization::serialize_error(&e.into()),
506                    },
507                    failed_at: chrono::Utc::now(),
508                    retry_attempt: self.state.retry_attempt,
509                    batch_expires_at: self.state.batch_expires_at,
510                };
511                let request = Request {
512                    data: self.data,
513                    state: failed_state,
514                };
515                Ok(RequestCompletionResult::Failed(request))
516            }
517            None => {
518                // Channel closed - task died without sending a result
519                let failed_state = Failed {
520                    reason: FailureReason::TaskTerminated,
521                    failed_at: chrono::Utc::now(),
522                    retry_attempt: self.state.retry_attempt,
523                    batch_expires_at: self.state.batch_expires_at,
524                };
525                let request = Request {
526                    data: self.data,
527                    state: failed_state,
528                };
529                storage.persist(&request).await?;
530                Ok(RequestCompletionResult::Failed(request))
531            }
532        }
533    }
534
535    pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
536        // Abort the in-flight HTTP request
537        self.state.abort_handle.abort();
538
539        let request = Request {
540            data: self.data,
541            state: Canceled {
542                canceled_at: chrono::Utc::now(),
543            },
544        };
545        storage.persist(&request).await?;
546        Ok(request)
547    }
548}