fusillade/request/transitions.rs
1//! State transitions for batch requests using the typestate pattern.
2//!
3//! This module implements state transitions for HTTP batch requests using Rust's
4//! type system to enforce valid state transitions at compile time. Each request
5//! state is represented as a distinct type parameter on `Request<State>`.
6//!
7//! # Typestate Pattern
8//!
9//! The typestate pattern leverages Rust's type system to make invalid states
10//! unrepresentable. A `Request<Pending>` can only call methods available for
11//! pending requests, and transitions return different types:
12//!
13//! ```text
//! Request<Pending> ──claim()──> Request<Claimed> ──process()──> Request<Processing>
//!        │                           │                                │
//!        │                           │                                ├──complete()──> Request<Completed>
//!        │                           │                                ├──complete()──> Request<Failed>
//!        └──cancel()──> Canceled     ├──cancel()──> Canceled          └──cancel()────> Request<Canceled>
//!                                    │
//!                                    └──unclaim()─> Request<Pending>
//!
//! Request<Failed> ──can_retry()──> Ok(Request<Pending>)   (if retries remain)
//!                 ──can_retry()──> Err(Request<Failed>)   (if max retries reached or deadline too close)
24//! ```
25//!
26//! # State Lifecycle
27//!
28//! ## 1. Pending → Claimed
29//!
30//! A daemon claims a pending request for processing:
31//! - Records which daemon claimed it
32//! - Sets claimed_at timestamp
33//! - Preserves retry attempt count
34//!
35//! ## 2. Claimed → Processing
36//!
37//! The daemon starts executing the HTTP request:
38//! - Spawns an async task to make the HTTP call
39//! - Creates a channel to receive the result
40//! - Provides an abort handle for cancellation
41//!
42//! ## 3. Processing → Completed or Failed
43//!
44//! The HTTP request completes:
45//! - **Success**: Transitions to `Completed` with response body
46//! - **Failure**: Transitions to `Failed` with error message
47//! - **Retriable**: HTTP succeeded but status code indicates retry (e.g., 429, 500)
48//!
49//! ## 4. Failed → Pending (Retry)
50//!
51//! Failed requests can be retried with exponential backoff:
52//! - Increments retry_attempt counter
53//! - Calculates backoff delay: `backoff_ms * (factor ^ attempt)`
54//! - Sets not_before timestamp to delay retry
//! - Returns `Err` (carrying the still-failed request) if max retries are exceeded
56//!
57//! ## 5. Any State → Canceled
58//!
59//! Requests can be canceled from most states:
60//! - `Pending`: Simply marks as canceled
61//! - `Claimed`: Releases claim and cancels
62//! - `Processing`: Aborts the in-flight HTTP request
63//!
64//! # Retry Configuration
65//!
66//! Exponential backoff and retry limits are configured via [`RetryConfig`]:
67//!
68//! ```rust
69//! # use fusillade::request::transitions::RetryConfig;
70//! let config = RetryConfig {
71//! max_retries: Some(1000),
72//! stop_before_deadline_ms: Some(900_000),
73//! backoff_ms: 1000, // Start with 1 second
74//! backoff_factor: 2, // Double each time (1s, 2s, 4s)
75//! max_backoff_ms: 60000, // Cap at 60 seconds
76//! };
77//! ```
78//!
79//! # Example Workflow
80//!
81//! ```ignore
82//! // Daemon claims a pending request
83//! let pending: Request<Pending> = storage.next_pending().await?;
84//! let claimed = pending.claim(daemon_id, &storage).await?;
85//!
86//! // Start processing
87//! let processing = claimed.process(http_client, timeout_ms, &storage).await?;
88//!
89//! // Wait for completion
//! let result = processing.complete(&storage, |resp| resp.status >= 500, cancellation).await?;
//!
//! match result {
//!     RequestCompletionResult::Completed(completed) => {
//!         println!("Success: {}", completed.state.response_status);
//!     }
//!     RequestCompletionResult::Failed(failed) => {
//!         // Attempt retry with backoff
//!         match failed.can_retry(retry_attempt, config) {
//!             Ok(_pending) => println!("Retrying request..."),
//!             Err(_failed) => println!("Max retries exceeded"),
//!         }
//!     }
//!     RequestCompletionResult::Canceled(_) => println!("Request canceled"),
//! }
103//! ```
104
105use std::sync::Arc;
106
107use metrics::counter;
108use tokio::sync::Mutex;
109
110use crate::{
111 FusilladeError,
112 error::Result,
113 http::{HttpClient, HttpResponse},
114 manager::Storage,
115};
116
117use super::types::{
118 Canceled, Claimed, Completed, DaemonId, Failed, FailureReason, Pending, Processing, Request,
119 RequestCompletionResult,
120};
121
/// Why an in-flight request is being cancelled.
///
/// The variant decides whether the `Canceled` state is written back to
/// storage or whether the HTTP task is merely aborted in memory.
#[derive(Debug, Clone, Copy)]
pub enum CancellationReason {
    /// Explicit cancellation by a user; the Canceled state is persisted.
    User,
    /// The daemon is shutting down; the HTTP call is aborted without
    /// persisting a state change, so the request can be reclaimed later.
    Shutdown,
    /// A racing pair superseded this request; the HTTP call is aborted but
    /// nothing is persisted here — supersede_racing_pair already updated the DB.
    Superseded,
}
132
133impl Request<Pending> {
134 pub async fn claim<S: Storage + ?Sized>(
135 self,
136 daemon_id: DaemonId,
137 storage: &S,
138 ) -> Result<Request<Claimed>> {
139 let request = Request {
140 data: self.data,
141 state: Claimed {
142 daemon_id,
143 claimed_at: chrono::Utc::now(),
144 retry_attempt: self.state.retry_attempt, // Carry over retry attempt
145 batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
146 },
147 };
148 storage.persist(&request).await?;
149 Ok(request)
150 }
151
152 pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
153 let request = Request {
154 data: self.data,
155 state: Canceled {
156 canceled_at: chrono::Utc::now(),
157 },
158 };
159 storage.persist(&request).await?;
160 Ok(request)
161 }
162}
163
164impl Request<Claimed> {
165 pub async fn unclaim<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Pending>> {
166 let request = Request {
167 data: self.data,
168 state: Pending {
169 retry_attempt: self.state.retry_attempt, // Preserve retry attempt
170 not_before: None, // Can be claimed immediately
171 batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
172 },
173 };
174 storage.persist(&request).await?;
175 Ok(request)
176 }
177
178 pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
179 let request = Request {
180 data: self.data,
181 state: Canceled {
182 canceled_at: chrono::Utc::now(),
183 },
184 };
185 storage.persist(&request).await?;
186 Ok(request)
187 }
188
189 pub async fn process<H: HttpClient + 'static, S: Storage>(
190 self,
191 http_client: H,
192 timeout_ms: u64,
193 storage: &S,
194 ) -> Result<Request<Processing>> {
195 let request_data = self.data.clone();
196
197 // Create a channel for the HTTP result
198 let (tx, rx) = tokio::sync::mpsc::channel(1);
199
200 // Spawn the HTTP request as an async task
201 let task_handle = tokio::spawn(async move {
202 let result = http_client
203 .execute(&request_data, &request_data.api_key, timeout_ms)
204 .await;
205 let _ = tx.send(result).await; // Ignore send errors (receiver dropped)
206 });
207
208 let processing_state = Processing {
209 daemon_id: self.state.daemon_id,
210 claimed_at: self.state.claimed_at,
211 started_at: chrono::Utc::now(),
212 retry_attempt: self.state.retry_attempt, // Carry over retry attempt
213 batch_expires_at: self.state.batch_expires_at, // Carry over batch deadline
214 result_rx: Arc::new(Mutex::new(rx)),
215 abort_handle: task_handle.abort_handle(),
216 };
217
218 let request = Request {
219 data: self.data,
220 state: processing_state,
221 };
222
223 // Persist the Processing state so we can cancel it if needed
224 // If persist fails, abort the spawned HTTP task
225 if let Err(e) = storage.persist(&request).await {
226 request.state.abort_handle.abort();
227 return Err(e);
228 }
229
230 Ok(request)
231 }
232}
233
/// Tunables that govern how failed requests are retried.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Hard cap on the number of retry attempts; `None` means no cap.
    pub max_retries: Option<u32>,
    /// Buffer before the batch deadline after which retries stop; `None`
    /// means retries may run right up to the deadline itself.
    pub stop_before_deadline_ms: Option<i64>,
    /// Base backoff delay in milliseconds for the first retry.
    pub backoff_ms: u64,
    /// Multiplier applied per attempt: delay = backoff_ms * factor^attempt.
    pub backoff_factor: u64,
    /// Upper bound on the computed backoff delay, in milliseconds.
    pub max_backoff_ms: u64,
}
243
244impl From<&crate::daemon::DaemonConfig> for RetryConfig {
245 fn from(config: &crate::daemon::DaemonConfig) -> Self {
246 RetryConfig {
247 max_retries: config.max_retries,
248 stop_before_deadline_ms: config.stop_before_deadline_ms,
249 backoff_ms: config.backoff_ms,
250 backoff_factor: config.backoff_factor,
251 max_backoff_ms: config.max_backoff_ms,
252 }
253 }
254}
255
256impl Request<Failed> {
257 /// Attempt to retry this failed request.
258 ///
259 /// If retries are available, transitions the request back to Pending with:
260 /// - Incremented retry_attempt
261 /// - Calculated not_before timestamp for exponential backoff
262 ///
263 /// If no retries remain, returns None and the request stays Failed.
264 ///
265 /// The retry logic considers:
266 /// - max_retries: Hard cap on total retry attempts
267 /// - stop_before_deadline_ms: Deadline-aware retry (stops before batch expiration)
268 pub fn can_retry(
269 self,
270 retry_attempt: u32,
271 config: RetryConfig,
272 ) -> std::result::Result<Request<Pending>, Box<Self>> {
273 // Calculate exponential backoff: backoff_ms * (backoff_factor ^ retry_attempt)
274 let backoff_duration = {
275 let exponential = config
276 .backoff_ms
277 .saturating_mul(config.backoff_factor.saturating_pow(retry_attempt));
278 exponential.min(config.max_backoff_ms)
279 };
280
281 let now = chrono::Utc::now();
282 let not_before = now + chrono::Duration::milliseconds(backoff_duration as i64);
283
284 if let Some(max_retries) = config.max_retries
285 && retry_attempt >= max_retries
286 {
287 counter!(
288 "fusillade_retry_denied_total",
289 "model" => self.data.model.clone(),
290 "reason" => "max_retries"
291 )
292 .increment(1);
293 tracing::debug!(
294 request_id = %self.data.id,
295 retry_attempt,
296 max_retries,
297 "No retries remaining (reached max_retries), request remains failed"
298 );
299 return Err(Box::new(self));
300 }
301
302 // Determine the effective deadline (with or without buffer)
303 let effective_deadline = if let Some(stop_before_deadline_ms) =
304 config.stop_before_deadline_ms
305 {
306 self.state.batch_expires_at - chrono::Duration::milliseconds(stop_before_deadline_ms)
307 } else {
308 // No buffer configured - use the actual deadline
309 self.state.batch_expires_at
310 };
311
312 // Check if the next retry would start before the effective deadline
313 if not_before >= effective_deadline {
314 counter!(
315 "fusillade_retry_denied_total",
316 "model" => self.data.model.clone(),
317 "reason" => "deadline"
318 )
319 .increment(1);
320 let time_until_deadline = self.state.batch_expires_at - now;
321 tracing::warn!(
322 request_id = %self.data.id,
323 retry_attempt,
324 time_until_deadline_seconds = time_until_deadline.num_seconds(),
325 batch_expires_at = %self.state.batch_expires_at,
326 stop_before_deadline_ms = config.stop_before_deadline_ms,
327 "No retries remaining (would exceed batch deadline), request remains failed"
328 );
329 return Err(Box::new(self));
330 }
331
332 let time_until_deadline = effective_deadline - now;
333 tracing::debug!(
334 request_id = %self.data.id,
335 retry_attempt,
336 time_until_deadline_seconds = time_until_deadline.num_seconds(),
337 batch_expires_at = %self.state.batch_expires_at,
338 "Retrying (deadline-aware: time remaining)"
339 );
340
341 tracing::info!(
342 request_id = %self.data.id,
343 retry_attempt = retry_attempt + 1,
344 backoff_ms = backoff_duration,
345 not_before = %not_before,
346 batch_expires_at = %self.state.batch_expires_at,
347 "Retrying failed request with exponential backoff"
348 );
349
350 let request = Request {
351 data: self.data,
352 state: Pending {
353 retry_attempt: retry_attempt + 1,
354 not_before: Some(not_before),
355 batch_expires_at: self.state.batch_expires_at,
356 },
357 };
358
359 Ok(request)
360 }
361}
362
impl Request<Processing> {
    /// Wait for the HTTP request to complete.
    ///
    /// This method awaits the result from the spawned HTTP task and transitions
    /// the request to one of three terminal states: `Completed`, `Failed`, or `Canceled`.
    ///
    /// The `should_retry` predicate determines whether a response should be considered
    /// a failure (and thus eligible for retry) or a success.
    ///
    /// The `cancellation` future allows external cancellation of the request. It should
    /// resolve to a `CancellationReason`:
    /// - `CancellationReason::User`: User-initiated cancellation (persists Canceled state)
    /// - `CancellationReason::Shutdown`: Daemon shutdown (aborts HTTP but doesn't persist)
    /// - `CancellationReason::Superseded`: Request superseded by racing pair (aborts HTTP but doesn't persist)
    ///
    /// Returns:
    /// - `RequestCompletionResult::Completed` if the HTTP request succeeded
    /// - `RequestCompletionResult::Failed` if the HTTP request failed or should be retried
    /// - `RequestCompletionResult::Canceled` if the request was canceled by user
    /// - `Err(FusilladeError::Shutdown)` if the daemon is shutting down or request was superseded
    pub async fn complete<S, F, Fut>(
        self,
        storage: &S,
        should_retry: F,
        cancellation: Fut,
    ) -> Result<RequestCompletionResult>
    where
        S: Storage + ?Sized,
        F: Fn(&HttpResponse) -> bool,
        Fut: std::future::Future<Output = CancellationReason>,
    {
        // Await the result from the channel (lock the mutex to access the receiver)
        // We use an enum to track whether we got a result or cancellation so we can
        // drop the mutex guard before calling self.cancel()
        enum Outcome {
            // The spawned task's result; None means the channel closed without a value.
            Result(Option<std::result::Result<HttpResponse, FusilladeError>>),
            Canceled(CancellationReason),
        }

        let outcome = {
            let mut rx = self.state.result_rx.lock().await;

            tokio::select! {
                // Wait for the HTTP request to finish processing
                result = rx.recv() => Outcome::Result(result),
                // Handle cancellation
                reason = cancellation => Outcome::Canceled(reason),
            }
        }; // mutex guard dropped here, before any cancel/persist below

        // Handle cancellation outside the mutex guard
        let result = match outcome {
            Outcome::Canceled(CancellationReason::User) => {
                // User cancellation: persist Canceled state
                // (self.cancel() will abort the HTTP task)
                let canceled = self.cancel(storage).await?;
                return Ok(RequestCompletionResult::Canceled(canceled));
            }
            Outcome::Canceled(CancellationReason::Shutdown) => {
                // Shutdown: abort HTTP task but don't persist state change
                // Request stays in Processing state and will be reclaimed later
                self.state.abort_handle.abort();
                return Err(FusilladeError::Shutdown);
            }
            Outcome::Canceled(CancellationReason::Superseded) => {
                // Superseded: abort HTTP task but don't persist state change
                // supersede_racing_pair already updated the DB to state='superseded'
                // (Shutdown error is reused here as the "stop handling" signal.)
                self.state.abort_handle.abort();
                return Err(FusilladeError::Shutdown);
            }
            Outcome::Result(result) => result,
        };

        match result {
            Some(Ok(http_response)) => {
                // Check if this is an error response (4xx or 5xx)
                let is_error = http_response.status >= 400;

                // Check if this response should be retried
                if should_retry(&http_response) {
                    // Record retriable HTTP status for observability
                    counter!(
                        "fusillade_http_status_retriable_total",
                        "model" => self.data.model.clone(),
                        "status" => http_response.status.to_string()
                    )
                    .increment(1);

                    // Treat as failure for retry purposes
                    let failed_state = Failed {
                        reason: FailureReason::RetriableHttpStatus {
                            status: http_response.status,
                            body: http_response.body.clone(),
                        },
                        failed_at: chrono::Utc::now(),
                        retry_attempt: self.state.retry_attempt,
                        batch_expires_at: self.state.batch_expires_at,
                    };
                    let request = Request {
                        data: self.data,
                        state: failed_state,
                    };
                    // NOTE(review): unlike the non-retriable branch below, this
                    // Failed state is not persisted here — presumably the caller
                    // drives the retry transition, which persists the next state.
                    // Confirm this is intentional.
                    Ok(RequestCompletionResult::Failed(request))
                } else if is_error {
                    // Non-retriable error (e.g., 4xx client errors)
                    // Mark as failed but don't retry
                    let failed_state = Failed {
                        reason: FailureReason::NonRetriableHttpStatus {
                            status: http_response.status,
                            body: http_response.body.clone(),
                        },
                        failed_at: chrono::Utc::now(),
                        retry_attempt: self.state.retry_attempt,
                        batch_expires_at: self.state.batch_expires_at,
                    };
                    let request = Request {
                        data: self.data,
                        state: failed_state,
                    };
                    // Terminal state: persist before returning.
                    storage.persist(&request).await?;
                    Ok(RequestCompletionResult::Failed(request))
                } else {
                    // HTTP request completed successfully
                    let completed_state = Completed {
                        response_status: http_response.status,
                        response_body: http_response.body,
                        claimed_at: self.state.claimed_at,
                        started_at: self.state.started_at,
                        completed_at: chrono::Utc::now(),
                    };
                    let request = Request {
                        data: self.data,
                        state: completed_state,
                    };
                    // Terminal state: persist before returning.
                    storage.persist(&request).await?;
                    Ok(RequestCompletionResult::Completed(request))
                }
            }
            Some(Err(e)) => {
                // HTTP request failed (network error, timeout, etc.)
                let failed_state = Failed {
                    reason: FailureReason::NetworkError {
                        error: crate::error::error_serialization::serialize_error(&e.into()),
                    },
                    failed_at: chrono::Utc::now(),
                    retry_attempt: self.state.retry_attempt,
                    batch_expires_at: self.state.batch_expires_at,
                };
                let request = Request {
                    data: self.data,
                    state: failed_state,
                };
                // NOTE(review): not persisted here, mirroring the retriable
                // branch — presumably network errors go through the retry
                // flow, which persists the next state. Confirm.
                Ok(RequestCompletionResult::Failed(request))
            }
            None => {
                // Channel closed - task died without sending a result
                let failed_state = Failed {
                    reason: FailureReason::TaskTerminated,
                    failed_at: chrono::Utc::now(),
                    retry_attempt: self.state.retry_attempt,
                    batch_expires_at: self.state.batch_expires_at,
                };
                let request = Request {
                    data: self.data,
                    state: failed_state,
                };
                storage.persist(&request).await?;
                Ok(RequestCompletionResult::Failed(request))
            }
        }
    }

    /// Cancel this in-flight request: abort the spawned HTTP task, then
    /// persist and return the Canceled state.
    pub async fn cancel<S: Storage + ?Sized>(self, storage: &S) -> Result<Request<Canceled>> {
        // Abort the in-flight HTTP request
        self.state.abort_handle.abort();

        let request = Request {
            data: self.data,
            state: Canceled {
                canceled_at: chrono::Utc::now(),
            },
        };
        storage.persist(&request).await?;
        Ok(request)
    }
}