Skip to main content

fastapi_core/
context.rs

1//! Request context with asupersync integration.
2//!
3//! [`RequestContext`] wraps asupersync's [`Cx`] to provide request-scoped
4//! capabilities for HTTP request handling.
5
6use asupersync::types::CancelReason;
7use asupersync::{Budget, Cx, Outcome, RegionId, TaskId};
8use std::sync::Arc;
9
10use crate::dependency::{CleanupStack, DependencyCache, DependencyOverrides, ResolutionStack};
11
12/// Default maximum body size: 1MB.
13pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;
14
15/// Configuration for request body limits.
16///
17/// This struct holds the body size limit configuration that applies to a request.
18/// It can be configured at the application level (via `AppConfig`) and optionally
19/// overridden on a per-route basis.
20#[derive(Debug, Clone, Copy)]
21pub struct BodyLimitConfig {
22    /// Maximum body size in bytes.
23    max_size: usize,
24}
25
26impl Default for BodyLimitConfig {
27    fn default() -> Self {
28        Self {
29            max_size: DEFAULT_MAX_BODY_SIZE,
30        }
31    }
32}
33
34impl BodyLimitConfig {
35    /// Creates a new body limit config with the specified maximum size.
36    #[must_use]
37    pub fn new(max_size: usize) -> Self {
38        Self { max_size }
39    }
40
41    /// Returns the maximum body size in bytes.
42    #[must_use]
43    pub fn max_size(&self) -> usize {
44        self.max_size
45    }
46}
47
48/// Request context that wraps asupersync's capability context.
49///
50/// `RequestContext` provides access to:
51/// - Request-scoped identity (request ID, trace context)
52/// - Cancellation checkpoints for cancel-safe handlers
53/// - Budget/deadline awareness for timeout enforcement
54/// - Region-scoped spawning for background work
55/// - Body size limit configuration for DoS prevention
56///
57/// # Example
58///
59/// ```ignore
60/// async fn handler(ctx: &RequestContext) -> impl IntoResponse {
61///     // Check for client disconnect
62///     ctx.checkpoint()?;
63///
64///     // Get remaining time budget
65///     let remaining = ctx.remaining_budget();
66///
67///     // Check body size limit
68///     let max_body = ctx.body_limit().max_size();
69///
70///     // Do work...
71///     "Hello, World!"
72/// }
73/// ```
74#[derive(Debug, Clone)]
75pub struct RequestContext {
76    /// The underlying capability context.
77    cx: Cx,
78    /// Unique request identifier for tracing.
79    request_id: u64,
80    /// Request-scoped dependency cache.
81    dependency_cache: Arc<DependencyCache>,
82    /// Dependency overrides (primarily for testing).
83    dependency_overrides: Arc<DependencyOverrides>,
84    /// Stack tracking dependencies currently being resolved (for cycle detection).
85    resolution_stack: Arc<ResolutionStack>,
86    /// Cleanup functions to run after handler completion (LIFO order).
87    cleanup_stack: Arc<CleanupStack>,
88    /// Body size limit configuration for this request.
89    body_limit: BodyLimitConfig,
90}
91
92impl RequestContext {
93    /// Creates a new request context from an asupersync Cx.
94    ///
95    /// This is typically called by the server when accepting a new request,
96    /// creating a new region for the request lifecycle. Uses the default
97    /// body size limit (1MB).
98    #[must_use]
99    pub fn new(cx: Cx, request_id: u64) -> Self {
100        Self {
101            cx,
102            request_id,
103            dependency_cache: Arc::new(DependencyCache::new()),
104            dependency_overrides: Arc::new(DependencyOverrides::new()),
105            resolution_stack: Arc::new(ResolutionStack::new()),
106            cleanup_stack: Arc::new(CleanupStack::new()),
107            body_limit: BodyLimitConfig::default(),
108        }
109    }
110
111    /// Creates a new request context with a custom body size limit.
112    ///
113    /// Use this when the application has configured a specific `max_body_size`
114    /// in `AppConfig`, or when a route has an override.
115    #[must_use]
116    pub fn with_body_limit(cx: Cx, request_id: u64, max_body_size: usize) -> Self {
117        Self {
118            cx,
119            request_id,
120            dependency_cache: Arc::new(DependencyCache::new()),
121            dependency_overrides: Arc::new(DependencyOverrides::new()),
122            resolution_stack: Arc::new(ResolutionStack::new()),
123            cleanup_stack: Arc::new(CleanupStack::new()),
124            body_limit: BodyLimitConfig::new(max_body_size),
125        }
126    }
127
128    /// Creates a new request context with shared dependency overrides.
129    #[must_use]
130    pub fn with_overrides(cx: Cx, request_id: u64, overrides: Arc<DependencyOverrides>) -> Self {
131        Self {
132            cx,
133            request_id,
134            dependency_cache: Arc::new(DependencyCache::new()),
135            dependency_overrides: overrides,
136            resolution_stack: Arc::new(ResolutionStack::new()),
137            cleanup_stack: Arc::new(CleanupStack::new()),
138            body_limit: BodyLimitConfig::default(),
139        }
140    }
141
142    /// Creates a new request context with overrides and a custom body size limit.
143    #[must_use]
144    pub fn with_overrides_and_body_limit(
145        cx: Cx,
146        request_id: u64,
147        overrides: Arc<DependencyOverrides>,
148        max_body_size: usize,
149    ) -> Self {
150        Self {
151            cx,
152            request_id,
153            dependency_cache: Arc::new(DependencyCache::new()),
154            dependency_overrides: overrides,
155            resolution_stack: Arc::new(ResolutionStack::new()),
156            cleanup_stack: Arc::new(CleanupStack::new()),
157            body_limit: BodyLimitConfig::new(max_body_size),
158        }
159    }
160
161    /// Returns the unique request identifier.
162    ///
163    /// Useful for logging and tracing across the request lifecycle.
164    #[must_use]
165    pub fn request_id(&self) -> u64 {
166        self.request_id
167    }
168
169    /// Returns the dependency cache for this request.
170    #[must_use]
171    pub fn dependency_cache(&self) -> &DependencyCache {
172        &self.dependency_cache
173    }
174
175    /// Returns the dependency overrides registry.
176    #[must_use]
177    pub fn dependency_overrides(&self) -> &DependencyOverrides {
178        &self.dependency_overrides
179    }
180
181    /// Returns the resolution stack for cycle detection.
182    #[must_use]
183    pub fn resolution_stack(&self) -> &ResolutionStack {
184        &self.resolution_stack
185    }
186
187    /// Returns the cleanup stack for registering cleanup functions.
188    ///
189    /// Cleanup functions run after the handler completes in LIFO order.
190    #[must_use]
191    pub fn cleanup_stack(&self) -> &CleanupStack {
192        &self.cleanup_stack
193    }
194
195    /// Returns the body limit configuration for this request.
196    ///
197    /// This can be used by body extractors (e.g., `Json<T>`) to enforce
198    /// size limits and prevent DoS attacks.
199    #[must_use]
200    pub fn body_limit(&self) -> &BodyLimitConfig {
201        &self.body_limit
202    }
203
204    /// Returns the maximum body size in bytes for this request.
205    ///
206    /// This is a convenience method equivalent to `ctx.body_limit().max_size()`.
207    #[must_use]
208    pub fn max_body_size(&self) -> usize {
209        self.body_limit.max_size()
210    }
211
212    /// Returns the underlying region ID from asupersync.
213    ///
214    /// The region represents the request's lifecycle scope - all spawned
215    /// tasks belong to this region and will be cleaned up when the
216    /// request completes or is cancelled.
217    #[must_use]
218    pub fn region_id(&self) -> RegionId {
219        self.cx.region_id()
220    }
221
222    /// Returns the current task ID.
223    #[must_use]
224    pub fn task_id(&self) -> TaskId {
225        self.cx.task_id()
226    }
227
228    /// Returns the current budget.
229    ///
230    /// The budget represents the remaining computational resources (time, polls)
231    /// available for this request. When exhausted, the request should be
232    /// cancelled gracefully.
233    #[must_use]
234    pub fn budget(&self) -> Budget {
235        self.cx.budget()
236    }
237
238    /// Checks if cancellation has been requested.
239    ///
240    /// This includes client disconnection, timeout, or explicit cancellation.
241    /// Handlers should check this periodically and exit early if true.
242    #[must_use]
243    pub fn is_cancelled(&self) -> bool {
244        self.cx.is_cancel_requested()
245    }
246
247    /// Cooperative cancellation checkpoint.
248    ///
249    /// Call this at natural suspension points in your handler to allow
250    /// graceful cancellation. Returns `Err` if cancellation is pending.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the request has been cancelled and cancellation
255    /// is not currently masked.
256    ///
257    /// # Example
258    ///
259    /// ```ignore
260    /// async fn process_items(ctx: &RequestContext, items: Vec<Item>) -> Result<(), HttpError> {
261    ///     for item in items {
262    ///         ctx.checkpoint()?;  // Allow cancellation between items
263    ///         process_item(item).await?;
264    ///     }
265    ///     Ok(())
266    /// }
267    /// ```
268    pub fn checkpoint(&self) -> Result<(), CancelledError> {
269        self.cx.checkpoint().map_err(|_| CancelledError)
270    }
271
272    /// Executes a closure with cancellation masked.
273    ///
274    /// While masked, `checkpoint()` will not return an error even if
275    /// cancellation is pending. Use this for critical sections that
276    /// must complete atomically.
277    ///
278    /// # Example
279    ///
280    /// ```ignore
281    /// // Commit transaction - must not be interrupted
282    /// ctx.masked(|| {
283    ///     db.commit().await?;
284    ///     Ok(())
285    /// })
286    /// ```
287    pub fn masked<F, R>(&self, f: F) -> R
288    where
289        F: FnOnce() -> R,
290    {
291        self.cx.masked(f)
292    }
293
294    /// Records a trace event for this request.
295    ///
296    /// Events are associated with the request's trace context and can be
297    /// used for debugging and observability.
298    pub fn trace(&self, message: &str) {
299        self.cx.trace(message);
300    }
301
302    /// Returns a reference to the underlying asupersync Cx.
303    ///
304    /// Use this when you need direct access to asupersync primitives,
305    /// such as spawning tasks or using combinators.
306    #[must_use]
307    pub fn cx(&self) -> &Cx {
308        &self.cx
309    }
310}
311
312/// Error returned when a request has been cancelled.
313///
314/// This is returned by `checkpoint()` when the request should stop
315/// processing. The server will convert this to an appropriate HTTP
316/// response (typically 499 Client Closed Request or 504 Gateway Timeout).
317#[derive(Debug, Clone, Copy)]
318pub struct CancelledError;
319
320impl std::fmt::Display for CancelledError {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        write!(f, "request cancelled")
323    }
324}
325
326impl std::error::Error for CancelledError {}
327
328/// Extension trait for converting HTTP results to asupersync Outcome.
329///
330/// This bridges the HTTP error model with asupersync's 4-valued outcome
331/// (Ok, Err, Cancelled, Panicked).
332pub trait IntoOutcome<T, E> {
333    /// Converts this result into an asupersync Outcome.
334    fn into_outcome(self) -> Outcome<T, E>;
335}
336
337impl<T, E> IntoOutcome<T, E> for Result<T, E> {
338    fn into_outcome(self) -> Outcome<T, E> {
339        match self {
340            Ok(v) => Outcome::Ok(v),
341            Err(e) => Outcome::Err(e),
342        }
343    }
344}
345
346impl<T, E> IntoOutcome<T, E> for Result<T, CancelledError>
347where
348    E: Default,
349{
350    fn into_outcome(self) -> Outcome<T, E> {
351        match self {
352            Ok(v) => Outcome::Ok(v),
353            Err(CancelledError) => Outcome::Cancelled(CancelReason::user("request cancelled")),
354        }
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361
362    #[test]
363    fn cancelled_error_display() {
364        let err = CancelledError;
365        assert_eq!(format!("{err}"), "request cancelled");
366    }
367
368    #[test]
369    fn checkpoint_returns_error_when_cancel_requested() {
370        let cx = Cx::for_testing();
371        let ctx = RequestContext::new(cx, 1);
372        ctx.cx().set_cancel_requested(true);
373        assert!(ctx.checkpoint().is_err());
374    }
375
376    #[test]
377    fn masked_defers_cancellation_at_checkpoint() {
378        let cx = Cx::for_testing();
379        let ctx = RequestContext::new(cx, 1);
380        ctx.cx().set_cancel_requested(true);
381
382        let result = ctx.masked(|| ctx.checkpoint());
383        assert!(result.is_ok());
384        assert!(ctx.checkpoint().is_err());
385    }
386
387    // ========================================================================
388    // Body Limit Tests
389    // ========================================================================
390
391    #[test]
392    fn body_limit_config_default() {
393        let config = BodyLimitConfig::default();
394        assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
395        assert_eq!(config.max_size(), 1024 * 1024); // 1MB
396    }
397
398    #[test]
399    fn body_limit_config_custom() {
400        let config = BodyLimitConfig::new(512 * 1024);
401        assert_eq!(config.max_size(), 512 * 1024); // 512KB
402    }
403
404    #[test]
405    fn request_context_default_body_limit() {
406        let cx = Cx::for_testing();
407        let ctx = RequestContext::new(cx, 1);
408        assert_eq!(ctx.max_body_size(), DEFAULT_MAX_BODY_SIZE);
409        assert_eq!(ctx.body_limit().max_size(), DEFAULT_MAX_BODY_SIZE);
410    }
411
412    #[test]
413    fn request_context_custom_body_limit() {
414        let cx = Cx::for_testing();
415        let ctx = RequestContext::with_body_limit(cx, 1, 2 * 1024 * 1024);
416        assert_eq!(ctx.max_body_size(), 2 * 1024 * 1024); // 2MB
417    }
418
419    #[test]
420    fn request_context_with_overrides_has_default_limit() {
421        let cx = Cx::for_testing();
422        let overrides = Arc::new(DependencyOverrides::new());
423        let ctx = RequestContext::with_overrides(cx, 1, overrides);
424        assert_eq!(ctx.max_body_size(), DEFAULT_MAX_BODY_SIZE);
425    }
426
427    #[test]
428    fn request_context_with_overrides_and_custom_limit() {
429        let cx = Cx::for_testing();
430        let overrides = Arc::new(DependencyOverrides::new());
431        let ctx = RequestContext::with_overrides_and_body_limit(cx, 1, overrides, 4 * 1024 * 1024);
432        assert_eq!(ctx.max_body_size(), 4 * 1024 * 1024); // 4MB
433    }
434
435    // ========================================================================
436    // bd-3st7: Request Isolation Tests
437    // ========================================================================
438
439    #[test]
440    #[allow(clippy::similar_names)]
441    fn request_id_isolation_unique_per_context() {
442        // Test that each RequestContext gets the request_id it was created with (bd-3st7)
443        let cx1 = Cx::for_testing();
444        let cx2 = Cx::for_testing();
445        let cx3 = Cx::for_testing();
446
447        let ctx1 = RequestContext::new(cx1, 100);
448        let ctx2 = RequestContext::new(cx2, 200);
449        let ctx3 = RequestContext::new(cx3, 300);
450
451        // Each context has its own request_id
452        assert_eq!(ctx1.request_id(), 100);
453        assert_eq!(ctx2.request_id(), 200);
454        assert_eq!(ctx3.request_id(), 300);
455
456        // Request IDs don't affect each other
457        assert_ne!(ctx1.request_id(), ctx2.request_id());
458        assert_ne!(ctx2.request_id(), ctx3.request_id());
459    }
460
461    #[test]
462    #[allow(clippy::similar_names)]
463    fn dependency_cache_isolation_per_request() {
464        // Test that each RequestContext has its own dependency cache (bd-3st7)
465        let cx1 = Cx::for_testing();
466        let cx2 = Cx::for_testing();
467
468        let ctx1 = RequestContext::new(cx1, 1);
469        let ctx2 = RequestContext::new(cx2, 2);
470
471        // Cache a value in ctx1's dependency cache
472        ctx1.dependency_cache().insert::<i32>(42);
473
474        // ctx2's cache should NOT have this value
475        let value1 = ctx1.dependency_cache().get::<i32>();
476        let value2 = ctx2.dependency_cache().get::<i32>();
477
478        assert!(value1.is_some(), "ctx1 should have cached value");
479        assert_eq!(value1.unwrap(), 42);
480        assert!(value2.is_none(), "ctx2 should NOT have ctx1's cached value");
481    }
482
483    #[test]
484    #[allow(clippy::similar_names)]
485    fn cleanup_stack_isolation_per_request() {
486        // Test that each RequestContext has its own cleanup stack (bd-3st7)
487        use std::sync::atomic::{AtomicUsize, Ordering};
488
489        let cleanup_counter1 = Arc::new(AtomicUsize::new(0));
490        let cleanup_counter2 = Arc::new(AtomicUsize::new(0));
491
492        let cx1 = Cx::for_testing();
493        let cx2 = Cx::for_testing();
494
495        let ctx1 = RequestContext::new(cx1, 1);
496        let ctx2 = RequestContext::new(cx2, 2);
497
498        // Register cleanup for ctx1
499        {
500            let counter = cleanup_counter1.clone();
501            ctx1.cleanup_stack().push(Box::new(move || {
502                Box::pin(async move {
503                    counter.fetch_add(1, Ordering::SeqCst);
504                })
505            }));
506        }
507
508        // Register cleanup for ctx2
509        {
510            let counter = cleanup_counter2.clone();
511            ctx2.cleanup_stack().push(Box::new(move || {
512                Box::pin(async move {
513                    counter.fetch_add(1, Ordering::SeqCst);
514                })
515            }));
516        }
517
518        // Run ctx1's cleanups
519        futures_executor::block_on(ctx1.cleanup_stack().run_cleanups());
520
521        // Only ctx1's cleanup should have run
522        assert_eq!(
523            cleanup_counter1.load(Ordering::SeqCst),
524            1,
525            "ctx1 cleanup should have run"
526        );
527        assert_eq!(
528            cleanup_counter2.load(Ordering::SeqCst),
529            0,
530            "ctx2 cleanup should NOT have run"
531        );
532
533        // Now run ctx2's cleanups
534        futures_executor::block_on(ctx2.cleanup_stack().run_cleanups());
535        assert_eq!(
536            cleanup_counter2.load(Ordering::SeqCst),
537            1,
538            "ctx2 cleanup should have run"
539        );
540    }
541
542    #[test]
543    #[allow(clippy::similar_names)]
544    fn cx_cancellation_isolation_per_request() {
545        // Test that cancelling one request's Cx doesn't affect others (bd-3st7)
546        let cx1 = Cx::for_testing();
547        let cx2 = Cx::for_testing();
548        let cx3 = Cx::for_testing();
549
550        let ctx1 = RequestContext::new(cx1, 1);
551        let ctx2 = RequestContext::new(cx2, 2);
552        let ctx3 = RequestContext::new(cx3, 3);
553
554        // Initially none are cancelled
555        assert!(ctx1.checkpoint().is_ok(), "ctx1 should not be cancelled");
556        assert!(ctx2.checkpoint().is_ok(), "ctx2 should not be cancelled");
557        assert!(ctx3.checkpoint().is_ok(), "ctx3 should not be cancelled");
558
559        // Cancel ctx2 only
560        ctx2.cx().set_cancel_requested(true);
561
562        // Only ctx2 should be cancelled
563        assert!(
564            ctx1.checkpoint().is_ok(),
565            "ctx1 should still not be cancelled"
566        );
567        assert!(ctx2.checkpoint().is_err(), "ctx2 should be cancelled");
568        assert!(
569            ctx3.checkpoint().is_ok(),
570            "ctx3 should still not be cancelled"
571        );
572    }
573
574    #[test]
575    #[allow(clippy::similar_names)]
576    fn body_limit_isolation_per_request() {
577        // Test that body limits are per-request (bd-3st7)
578        let cx1 = Cx::for_testing();
579        let cx2 = Cx::for_testing();
580
581        // Create contexts with different body limits
582        let ctx1 = RequestContext::with_body_limit(cx1, 1, 1024); // 1KB limit
583        let ctx2 = RequestContext::with_body_limit(cx2, 2, 1024 * 1024); // 1MB limit
584
585        // Each has its own limit
586        assert_eq!(ctx1.max_body_size(), 1024);
587        assert_eq!(ctx2.max_body_size(), 1024 * 1024);
588
589        // They don't affect each other
590        assert_ne!(ctx1.max_body_size(), ctx2.max_body_size());
591    }
592
593    #[test]
594    fn concurrent_requests_fully_isolated() {
595        // Simulate concurrent requests with different values and verify isolation (bd-3st7)
596        use std::thread;
597
598        const NUM_REQUESTS: usize = 100;
599        let results = Arc::new(parking_lot::Mutex::new(Vec::with_capacity(NUM_REQUESTS)));
600
601        let handles: Vec<_> = (0..NUM_REQUESTS)
602            .map(|i| {
603                let results = results.clone();
604                thread::spawn(move || {
605                    let cx = Cx::for_testing();
606                    let request_id = (i + 1) as u64 * 1000; // Unique ID per "request"
607                    let ctx = RequestContext::new(cx, request_id);
608
609                    // Cache a value unique to this request
610                    ctx.dependency_cache().insert::<u64>(request_id);
611
612                    // Verify we can retrieve our own value
613                    let cached = ctx.dependency_cache().get::<u64>();
614                    let retrieved = cached.unwrap_or(0);
615
616                    results.lock().push((request_id, retrieved));
617                })
618            })
619            .collect();
620
621        // Wait for all threads to complete
622        for handle in handles {
623            handle.join().expect("Thread panicked");
624        }
625
626        // Verify each request got exactly its own values
627        let results = results.lock();
628        assert_eq!(results.len(), NUM_REQUESTS);
629
630        for (request_id, retrieved) in results.iter() {
631            assert_eq!(
632                request_id, retrieved,
633                "Request {request_id} should retrieve its own cached value, not another request's"
634            );
635        }
636    }
637
638    #[test]
639    #[allow(clippy::similar_names)]
640    fn resolution_stack_isolation_per_request() {
641        // Test that resolution stacks are per-request for cycle detection (bd-3st7)
642        use crate::dependency::DependencyScope;
643
644        let cx1 = Cx::for_testing();
645        let cx2 = Cx::for_testing();
646
647        let ctx1 = RequestContext::new(cx1, 1);
648        let ctx2 = RequestContext::new(cx2, 2);
649
650        // Push i32 onto ctx1's resolution stack
651        ctx1.resolution_stack()
652            .push::<i32>("i32", DependencyScope::Request);
653
654        // ctx1 should detect the cycle when pushing i32 again
655        let cycle1 = ctx1.resolution_stack().check_cycle::<i32>("i32");
656        assert!(cycle1.is_some(), "ctx1 should detect cycle for i32");
657
658        // ctx2's resolution stack should be independent - no cycle
659        let cycle2 = ctx2.resolution_stack().check_cycle::<i32>("i32");
660        assert!(
661            cycle2.is_none(),
662            "ctx2 should NOT see ctx1's resolution stack"
663        );
664
665        // ctx2 can push i32 independently
666        ctx2.resolution_stack()
667            .push::<i32>("i32", DependencyScope::Request);
668        assert_eq!(ctx2.resolution_stack().depth(), 1);
669
670        // Both stacks are independent
671        assert_eq!(ctx1.resolution_stack().depth(), 1);
672        assert_eq!(ctx2.resolution_stack().depth(), 1);
673
674        // Clean up
675        ctx1.resolution_stack().pop();
676        ctx2.resolution_stack().pop();
677        assert!(ctx1.resolution_stack().is_empty());
678        assert!(ctx2.resolution_stack().is_empty());
679    }
680}