Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use sqlx::postgres::{PgArguments, PgQueryResult, PgRow};
39use sqlx::{FromRow, Postgres, Transaction};
40use tokio::sync::Mutex as AsyncMutex;
41use uuid::Uuid;
42
43use tracing::Instrument;
44
45use super::dispatch::{JobDispatch, WorkflowDispatch};
46use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
47use crate::http::CircuitBreakerClient;
48use crate::job::JobInfo;
49
50/// Abstracts over pool and transaction connections so handlers can work with either.
51pub enum DbConn<'a> {
52    Pool(&'a sqlx::PgPool),
53    Transaction(Arc<AsyncMutex<Transaction<'static, Postgres>>>),
54}
55
56impl DbConn<'_> {
57    pub async fn fetch_one<'q, O>(
58        &self,
59        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
60    ) -> sqlx::Result<O>
61    where
62        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
63    {
64        let span = tracing::info_span!(
65            "db.query",
66            db.system = "postgresql",
67            db.operation.name = "fetch_one",
68        );
69        async {
70            match self {
71                DbConn::Pool(pool) => query.fetch_one(*pool).await,
72                DbConn::Transaction(tx) => query.fetch_one(&mut **tx.lock().await).await,
73            }
74        }
75        .instrument(span)
76        .await
77    }
78
79    pub async fn fetch_optional<'q, O>(
80        &self,
81        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
82    ) -> sqlx::Result<Option<O>>
83    where
84        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
85    {
86        let span = tracing::info_span!(
87            "db.query",
88            db.system = "postgresql",
89            db.operation.name = "fetch_optional",
90        );
91        async {
92            match self {
93                DbConn::Pool(pool) => query.fetch_optional(*pool).await,
94                DbConn::Transaction(tx) => query.fetch_optional(&mut **tx.lock().await).await,
95            }
96        }
97        .instrument(span)
98        .await
99    }
100
101    pub async fn fetch_all<'q, O>(
102        &self,
103        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
104    ) -> sqlx::Result<Vec<O>>
105    where
106        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
107    {
108        let span = tracing::info_span!(
109            "db.query",
110            db.system = "postgresql",
111            db.operation.name = "fetch_all",
112        );
113        async {
114            match self {
115                DbConn::Pool(pool) => query.fetch_all(*pool).await,
116                DbConn::Transaction(tx) => query.fetch_all(&mut **tx.lock().await).await,
117            }
118        }
119        .instrument(span)
120        .await
121    }
122
123    pub async fn execute<'q>(
124        &self,
125        query: sqlx::query::Query<'q, Postgres, PgArguments>,
126    ) -> sqlx::Result<PgQueryResult> {
127        let span = tracing::info_span!(
128            "db.query",
129            db.system = "postgresql",
130            db.operation.name = "execute",
131        );
132        async {
133            match self {
134                DbConn::Pool(pool) => query.execute(*pool).await,
135                DbConn::Transaction(tx) => query.execute(&mut **tx.lock().await).await,
136            }
137        }
138        .instrument(span)
139        .await
140    }
141}
142
143#[derive(Debug, Clone)]
144pub struct PendingJob {
145    pub id: Uuid,
146    pub job_type: String,
147    pub args: serde_json::Value,
148    pub context: serde_json::Value,
149    pub owner_subject: Option<String>,
150    pub priority: i32,
151    pub max_attempts: i32,
152    pub worker_capability: Option<String>,
153}
154
155#[derive(Debug, Clone)]
156pub struct PendingWorkflow {
157    pub id: Uuid,
158    pub workflow_name: String,
159    pub input: serde_json::Value,
160    pub owner_subject: Option<String>,
161}
162
163#[derive(Default)]
164pub struct OutboxBuffer {
165    pub jobs: Vec<PendingJob>,
166    pub workflows: Vec<PendingWorkflow>,
167}
168
169/// Authentication context available to all functions.
170#[derive(Debug, Clone)]
171pub struct AuthContext {
172    /// The authenticated user ID (if any).
173    user_id: Option<Uuid>,
174    /// User roles.
175    roles: Vec<String>,
176    /// Custom claims from JWT.
177    claims: HashMap<String, serde_json::Value>,
178    /// Whether the request is authenticated.
179    authenticated: bool,
180}
181
182impl AuthContext {
183    /// Create an unauthenticated context.
184    pub fn unauthenticated() -> Self {
185        Self {
186            user_id: None,
187            roles: Vec::new(),
188            claims: HashMap::new(),
189            authenticated: false,
190        }
191    }
192
193    /// Create an authenticated context with a UUID user ID.
194    pub fn authenticated(
195        user_id: Uuid,
196        roles: Vec<String>,
197        claims: HashMap<String, serde_json::Value>,
198    ) -> Self {
199        Self {
200            user_id: Some(user_id),
201            roles,
202            claims,
203            authenticated: true,
204        }
205    }
206
207    /// Create an authenticated context without requiring a UUID user ID.
208    ///
209    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
210    /// Clerk). The raw subject string is available via `subject()` method
211    /// from the "sub" claim.
212    pub fn authenticated_without_uuid(
213        roles: Vec<String>,
214        claims: HashMap<String, serde_json::Value>,
215    ) -> Self {
216        Self {
217            user_id: None,
218            roles,
219            claims,
220            authenticated: true,
221        }
222    }
223
224    /// Check if the user is authenticated.
225    pub fn is_authenticated(&self) -> bool {
226        self.authenticated
227    }
228
229    /// Get the user ID if authenticated.
230    pub fn user_id(&self) -> Option<Uuid> {
231        self.user_id
232    }
233
234    /// Get the user ID, returning an error if not authenticated.
235    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
236        self.user_id
237            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
238    }
239
240    /// Check if the user has a specific role.
241    pub fn has_role(&self, role: &str) -> bool {
242        self.roles.iter().any(|r| r == role)
243    }
244
245    /// Require a specific role, returning an error if not present.
246    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
247        if self.has_role(role) {
248            Ok(())
249        } else {
250            Err(crate::error::ForgeError::Forbidden(format!(
251                "Required role '{}' not present",
252                role
253            )))
254        }
255    }
256
257    /// Get a custom claim value.
258    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
259        self.claims.get(key)
260    }
261
262    /// Get all custom claims.
263    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
264        &self.claims
265    }
266
267    /// Get all roles.
268    pub fn roles(&self) -> &[String] {
269        &self.roles
270    }
271
272    /// Get the raw subject claim.
273    ///
274    /// This works with any provider's subject format (UUID, email, custom ID).
275    /// For providers like Firebase or Clerk that don't use UUIDs, use this
276    /// instead of `user_id()`.
277    pub fn subject(&self) -> Option<&str> {
278        self.claims.get("sub").and_then(|v| v.as_str())
279    }
280
281    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
282    pub fn require_subject(&self) -> crate::error::Result<&str> {
283        if !self.authenticated {
284            return Err(crate::error::ForgeError::Unauthorized(
285                "Authentication required".to_string(),
286            ));
287        }
288        self.subject().ok_or_else(|| {
289            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
290        })
291    }
292
293    /// Get a stable principal identifier for access control and cache scoping.
294    ///
295    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
296    pub fn principal_id(&self) -> Option<String> {
297        self.subject()
298            .map(ToString::to_string)
299            .or_else(|| self.user_id.map(|id| id.to_string()))
300    }
301
302    /// Check whether this principal should be treated as privileged admin.
303    pub fn is_admin(&self) -> bool {
304        self.roles.iter().any(|r| r == "admin")
305    }
306}
307
308/// Request metadata available to all functions.
309#[derive(Debug, Clone)]
310pub struct RequestMetadata {
311    /// Unique request ID for tracing.
312    pub request_id: Uuid,
313    /// Trace ID for distributed tracing.
314    pub trace_id: String,
315    /// Client IP address.
316    pub client_ip: Option<String>,
317    /// User agent string.
318    pub user_agent: Option<String>,
319    /// Request timestamp.
320    pub timestamp: chrono::DateTime<chrono::Utc>,
321}
322
323impl RequestMetadata {
324    /// Create new request metadata.
325    pub fn new() -> Self {
326        Self {
327            request_id: Uuid::new_v4(),
328            trace_id: Uuid::new_v4().to_string(),
329            client_ip: None,
330            user_agent: None,
331            timestamp: chrono::Utc::now(),
332        }
333    }
334
335    /// Create with a specific trace ID.
336    pub fn with_trace_id(trace_id: String) -> Self {
337        Self {
338            request_id: Uuid::new_v4(),
339            trace_id,
340            client_ip: None,
341            user_agent: None,
342            timestamp: chrono::Utc::now(),
343        }
344    }
345}
346
347impl Default for RequestMetadata {
348    fn default() -> Self {
349        Self::new()
350    }
351}
352
353/// Context for query functions (read-only database access).
354pub struct QueryContext {
355    /// Authentication context.
356    pub auth: AuthContext,
357    /// Request metadata.
358    pub request: RequestMetadata,
359    /// Database pool for read operations.
360    db_pool: sqlx::PgPool,
361    /// Environment variable provider.
362    env_provider: Arc<dyn EnvProvider>,
363}
364
365impl QueryContext {
366    /// Create a new query context.
367    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
368        Self {
369            auth,
370            request,
371            db_pool,
372            env_provider: Arc::new(RealEnvProvider::new()),
373        }
374    }
375
376    /// Create a query context with a custom environment provider.
377    pub fn with_env(
378        db_pool: sqlx::PgPool,
379        auth: AuthContext,
380        request: RequestMetadata,
381        env_provider: Arc<dyn EnvProvider>,
382    ) -> Self {
383        Self {
384            auth,
385            request,
386            db_pool,
387            env_provider,
388        }
389    }
390
391    pub fn db(&self) -> &sqlx::PgPool {
392        &self.db_pool
393    }
394
395    /// Returns a `DbConn` wrapping the pool, allowing shared helper functions
396    /// that accept `DbConn` to work with both query and mutation contexts.
397    pub fn db_conn(&self) -> DbConn<'_> {
398        DbConn::Pool(&self.db_pool)
399    }
400
401    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
402        self.auth.require_user_id()
403    }
404
405    /// Like `require_user_id()` but for non-UUID auth providers.
406    pub fn require_subject(&self) -> crate::error::Result<&str> {
407        self.auth.require_subject()
408    }
409}
410
411impl EnvAccess for QueryContext {
412    fn env_provider(&self) -> &dyn EnvProvider {
413        self.env_provider.as_ref()
414    }
415}
416
417/// Callback type for looking up job info by name.
418pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
419
420/// Context for mutation functions (transactional database access).
421pub struct MutationContext {
422    /// Authentication context.
423    pub auth: AuthContext,
424    /// Request metadata.
425    pub request: RequestMetadata,
426    /// Database pool for transactional operations.
427    db_pool: sqlx::PgPool,
428    /// HTTP client with circuit breaker for external requests.
429    http_client: CircuitBreakerClient,
430    /// Optional job dispatcher for dispatching background jobs.
431    job_dispatch: Option<Arc<dyn JobDispatch>>,
432    /// Optional workflow dispatcher for starting workflows.
433    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
434    /// Environment variable provider.
435    env_provider: Arc<dyn EnvProvider>,
436    /// Transaction handle for transactional mutations.
437    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
438    /// Outbox buffer for jobs/workflows dispatched during transaction.
439    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
440    /// Job info lookup for transactional dispatch.
441    job_info_lookup: Option<JobInfoLookup>,
442}
443
444impl MutationContext {
445    /// Create a new mutation context.
446    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
447        Self {
448            auth,
449            request,
450            db_pool,
451            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
452            job_dispatch: None,
453            workflow_dispatch: None,
454            env_provider: Arc::new(RealEnvProvider::new()),
455            tx: None,
456            outbox: None,
457            job_info_lookup: None,
458        }
459    }
460
461    /// Create a mutation context with dispatch capabilities.
462    pub fn with_dispatch(
463        db_pool: sqlx::PgPool,
464        auth: AuthContext,
465        request: RequestMetadata,
466        http_client: CircuitBreakerClient,
467        job_dispatch: Option<Arc<dyn JobDispatch>>,
468        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
469    ) -> Self {
470        Self {
471            auth,
472            request,
473            db_pool,
474            http_client,
475            job_dispatch,
476            workflow_dispatch,
477            env_provider: Arc::new(RealEnvProvider::new()),
478            tx: None,
479            outbox: None,
480            job_info_lookup: None,
481        }
482    }
483
484    /// Create a mutation context with a custom environment provider.
485    pub fn with_env(
486        db_pool: sqlx::PgPool,
487        auth: AuthContext,
488        request: RequestMetadata,
489        http_client: CircuitBreakerClient,
490        job_dispatch: Option<Arc<dyn JobDispatch>>,
491        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
492        env_provider: Arc<dyn EnvProvider>,
493    ) -> Self {
494        Self {
495            auth,
496            request,
497            db_pool,
498            http_client,
499            job_dispatch,
500            workflow_dispatch,
501            env_provider,
502            tx: None,
503            outbox: None,
504            job_info_lookup: None,
505        }
506    }
507
508    /// Returns handles to transaction and outbox for the caller to commit/flush.
509    #[allow(clippy::type_complexity)]
510    pub fn with_transaction(
511        db_pool: sqlx::PgPool,
512        tx: Transaction<'static, Postgres>,
513        auth: AuthContext,
514        request: RequestMetadata,
515        http_client: CircuitBreakerClient,
516        job_info_lookup: JobInfoLookup,
517    ) -> (
518        Self,
519        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
520        Arc<Mutex<OutboxBuffer>>,
521    ) {
522        let tx_handle = Arc::new(AsyncMutex::new(tx));
523        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
524
525        let ctx = Self {
526            auth,
527            request,
528            db_pool,
529            http_client,
530            job_dispatch: None,
531            workflow_dispatch: None,
532            env_provider: Arc::new(RealEnvProvider::new()),
533            tx: Some(tx_handle.clone()),
534            outbox: Some(outbox.clone()),
535            job_info_lookup: Some(job_info_lookup),
536        };
537
538        (ctx, tx_handle, outbox)
539    }
540
541    pub fn is_transactional(&self) -> bool {
542        self.tx.is_some()
543    }
544
545    pub fn db(&self) -> DbConn<'_> {
546        match &self.tx {
547            Some(tx) => DbConn::Transaction(tx.clone()),
548            None => DbConn::Pool(&self.db_pool),
549        }
550    }
551
552    /// Direct pool access for operations that cannot run inside a transaction.
553    pub fn pool(&self) -> &sqlx::PgPool {
554        &self.db_pool
555    }
556
557    /// Get the HTTP client for external requests.
558    ///
559    /// The client includes circuit breaker protection that tracks failure rates
560    /// per host. After repeated failures, requests fail fast to prevent cascade
561    /// failures when downstream services are unhealthy.
562    pub fn http(&self) -> &reqwest::Client {
563        self.http_client.inner()
564    }
565
566    /// Get the circuit breaker client directly for advanced usage.
567    pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
568        &self.http_client
569    }
570
571    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
572        self.auth.require_user_id()
573    }
574
575    pub fn require_subject(&self) -> crate::error::Result<&str> {
576        self.auth.require_subject()
577    }
578
579    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
580    pub async fn dispatch_job<T: serde::Serialize>(
581        &self,
582        job_type: &str,
583        args: T,
584    ) -> crate::error::Result<Uuid> {
585        let args_json = serde_json::to_value(args)?;
586
587        // Transactional mode: buffer the job for atomic commit
588        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
589            let job_info = job_info_lookup(job_type).ok_or_else(|| {
590                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
591            })?;
592
593            let pending = PendingJob {
594                id: Uuid::new_v4(),
595                job_type: job_type.to_string(),
596                args: args_json,
597                context: serde_json::json!({}),
598                owner_subject: self.auth.principal_id(),
599                priority: job_info.priority.as_i32(),
600                max_attempts: job_info.retry.max_attempts as i32,
601                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
602            };
603
604            let job_id = pending.id;
605            outbox
606                .lock()
607                .expect("outbox lock poisoned")
608                .jobs
609                .push(pending);
610            return Ok(job_id);
611        }
612
613        // Non-transactional mode: dispatch immediately
614        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
615            crate::error::ForgeError::Internal("Job dispatch not available".into())
616        })?;
617        dispatcher
618            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
619            .await
620    }
621
622    /// Dispatch a job with initial context.
623    pub async fn dispatch_job_with_context<T: serde::Serialize>(
624        &self,
625        job_type: &str,
626        args: T,
627        context: serde_json::Value,
628    ) -> crate::error::Result<Uuid> {
629        let args_json = serde_json::to_value(args)?;
630
631        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
632            let job_info = job_info_lookup(job_type).ok_or_else(|| {
633                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
634            })?;
635
636            let pending = PendingJob {
637                id: Uuid::new_v4(),
638                job_type: job_type.to_string(),
639                args: args_json,
640                context,
641                owner_subject: self.auth.principal_id(),
642                priority: job_info.priority.as_i32(),
643                max_attempts: job_info.retry.max_attempts as i32,
644                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
645            };
646
647            let job_id = pending.id;
648            outbox
649                .lock()
650                .expect("outbox lock poisoned")
651                .jobs
652                .push(pending);
653            return Ok(job_id);
654        }
655
656        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
657            crate::error::ForgeError::Internal("Job dispatch not available".into())
658        })?;
659        dispatcher
660            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
661            .await
662    }
663
664    /// Request cancellation for a job.
665    pub async fn cancel_job(
666        &self,
667        job_id: Uuid,
668        reason: Option<String>,
669    ) -> crate::error::Result<bool> {
670        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
671            crate::error::ForgeError::Internal("Job dispatch not available".into())
672        })?;
673        dispatcher.cancel(job_id, reason).await
674    }
675
676    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
677    pub async fn start_workflow<T: serde::Serialize>(
678        &self,
679        workflow_name: &str,
680        input: T,
681    ) -> crate::error::Result<Uuid> {
682        let input_json = serde_json::to_value(input)?;
683
684        // Transactional mode: buffer the workflow for atomic commit
685        if let Some(outbox) = &self.outbox {
686            let pending = PendingWorkflow {
687                id: Uuid::new_v4(),
688                workflow_name: workflow_name.to_string(),
689                input: input_json,
690                owner_subject: self.auth.principal_id(),
691            };
692
693            let workflow_id = pending.id;
694            outbox
695                .lock()
696                .expect("outbox lock poisoned")
697                .workflows
698                .push(pending);
699            return Ok(workflow_id);
700        }
701
702        // Non-transactional mode: start immediately
703        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
704            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
705        })?;
706        dispatcher
707            .start_by_name(workflow_name, input_json, self.auth.principal_id())
708            .await
709    }
710}
711
712impl EnvAccess for MutationContext {
713    fn env_provider(&self) -> &dyn EnvProvider {
714        self.env_provider.as_ref()
715    }
716}
717
718#[cfg(test)]
719#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
720mod tests {
721    use super::*;
722
723    #[test]
724    fn test_auth_context_unauthenticated() {
725        let ctx = AuthContext::unauthenticated();
726        assert!(!ctx.is_authenticated());
727        assert!(ctx.user_id().is_none());
728        assert!(ctx.require_user_id().is_err());
729    }
730
731    #[test]
732    fn test_auth_context_authenticated() {
733        let user_id = Uuid::new_v4();
734        let ctx = AuthContext::authenticated(
735            user_id,
736            vec!["admin".to_string(), "user".to_string()],
737            HashMap::new(),
738        );
739
740        assert!(ctx.is_authenticated());
741        assert_eq!(ctx.user_id(), Some(user_id));
742        assert!(ctx.require_user_id().is_ok());
743        assert!(ctx.has_role("admin"));
744        assert!(ctx.has_role("user"));
745        assert!(!ctx.has_role("superadmin"));
746        assert!(ctx.require_role("admin").is_ok());
747        assert!(ctx.require_role("superadmin").is_err());
748    }
749
750    #[test]
751    fn test_auth_context_with_claims() {
752        let mut claims = HashMap::new();
753        claims.insert("org_id".to_string(), serde_json::json!("org-123"));
754
755        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
756
757        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
758        assert!(ctx.claim("nonexistent").is_none());
759    }
760
761    #[test]
762    fn test_request_metadata() {
763        let meta = RequestMetadata::new();
764        assert!(!meta.trace_id.is_empty());
765        assert!(meta.client_ip.is_none());
766
767        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
768        assert_eq!(meta2.trace_id, "trace-123");
769    }
770}