Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use sqlx::postgres::{PgArguments, PgQueryResult, PgRow};
39use sqlx::{FromRow, Postgres, Transaction};
40use tokio::sync::Mutex as AsyncMutex;
41use uuid::Uuid;
42
43use tracing::Instrument;
44
45use super::dispatch::{JobDispatch, WorkflowDispatch};
46use crate::auth::Claims;
47use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
48use crate::http::CircuitBreakerClient;
49use crate::job::JobInfo;
50
51/// Token issuer for signing JWTs.
52///
53/// Implemented by the runtime when HMAC auth is configured.
54/// Available via `ctx.issue_token()` in mutation handlers.
55pub trait TokenIssuer: Send + Sync {
56    /// Sign the given claims into a JWT string.
57    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
58}
59
60/// Abstracts over pool and transaction connections so handlers can work with either.
61pub enum DbConn<'a> {
62    Pool(&'a sqlx::PgPool),
63    Transaction(Arc<AsyncMutex<Transaction<'static, Postgres>>>),
64}
65
66impl DbConn<'_> {
67    pub async fn fetch_one<'q, O>(
68        &self,
69        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
70    ) -> sqlx::Result<O>
71    where
72        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
73    {
74        let span = tracing::info_span!(
75            "db.query",
76            db.system = "postgresql",
77            db.operation.name = "fetch_one",
78        );
79        async {
80            match self {
81                DbConn::Pool(pool) => query.fetch_one(*pool).await,
82                DbConn::Transaction(tx) => query.fetch_one(&mut **tx.lock().await).await,
83            }
84        }
85        .instrument(span)
86        .await
87    }
88
89    pub async fn fetch_optional<'q, O>(
90        &self,
91        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
92    ) -> sqlx::Result<Option<O>>
93    where
94        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
95    {
96        let span = tracing::info_span!(
97            "db.query",
98            db.system = "postgresql",
99            db.operation.name = "fetch_optional",
100        );
101        async {
102            match self {
103                DbConn::Pool(pool) => query.fetch_optional(*pool).await,
104                DbConn::Transaction(tx) => query.fetch_optional(&mut **tx.lock().await).await,
105            }
106        }
107        .instrument(span)
108        .await
109    }
110
111    pub async fn fetch_all<'q, O>(
112        &self,
113        query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
114    ) -> sqlx::Result<Vec<O>>
115    where
116        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
117    {
118        let span = tracing::info_span!(
119            "db.query",
120            db.system = "postgresql",
121            db.operation.name = "fetch_all",
122        );
123        async {
124            match self {
125                DbConn::Pool(pool) => query.fetch_all(*pool).await,
126                DbConn::Transaction(tx) => query.fetch_all(&mut **tx.lock().await).await,
127            }
128        }
129        .instrument(span)
130        .await
131    }
132
133    pub async fn execute<'q>(
134        &self,
135        query: sqlx::query::Query<'q, Postgres, PgArguments>,
136    ) -> sqlx::Result<PgQueryResult> {
137        let span = tracing::info_span!(
138            "db.query",
139            db.system = "postgresql",
140            db.operation.name = "execute",
141        );
142        async {
143            match self {
144                DbConn::Pool(pool) => query.execute(*pool).await,
145                DbConn::Transaction(tx) => query.execute(&mut **tx.lock().await).await,
146            }
147        }
148        .instrument(span)
149        .await
150    }
151}
152
153#[derive(Debug, Clone)]
154pub struct PendingJob {
155    pub id: Uuid,
156    pub job_type: String,
157    pub args: serde_json::Value,
158    pub context: serde_json::Value,
159    pub owner_subject: Option<String>,
160    pub priority: i32,
161    pub max_attempts: i32,
162    pub worker_capability: Option<String>,
163}
164
165#[derive(Debug, Clone)]
166pub struct PendingWorkflow {
167    pub id: Uuid,
168    pub workflow_name: String,
169    pub input: serde_json::Value,
170    pub owner_subject: Option<String>,
171}
172
173#[derive(Default)]
174pub struct OutboxBuffer {
175    pub jobs: Vec<PendingJob>,
176    pub workflows: Vec<PendingWorkflow>,
177}
178
179/// Authentication context available to all functions.
180#[derive(Debug, Clone)]
181pub struct AuthContext {
182    /// The authenticated user ID (if any).
183    user_id: Option<Uuid>,
184    /// User roles.
185    roles: Vec<String>,
186    /// Custom claims from JWT.
187    claims: HashMap<String, serde_json::Value>,
188    /// Whether the request is authenticated.
189    authenticated: bool,
190}
191
192impl AuthContext {
193    /// Create an unauthenticated context.
194    pub fn unauthenticated() -> Self {
195        Self {
196            user_id: None,
197            roles: Vec::new(),
198            claims: HashMap::new(),
199            authenticated: false,
200        }
201    }
202
203    /// Create an authenticated context with a UUID user ID.
204    pub fn authenticated(
205        user_id: Uuid,
206        roles: Vec<String>,
207        claims: HashMap<String, serde_json::Value>,
208    ) -> Self {
209        Self {
210            user_id: Some(user_id),
211            roles,
212            claims,
213            authenticated: true,
214        }
215    }
216
217    /// Create an authenticated context without requiring a UUID user ID.
218    ///
219    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
220    /// Clerk). The raw subject string is available via `subject()` method
221    /// from the "sub" claim.
222    pub fn authenticated_without_uuid(
223        roles: Vec<String>,
224        claims: HashMap<String, serde_json::Value>,
225    ) -> Self {
226        Self {
227            user_id: None,
228            roles,
229            claims,
230            authenticated: true,
231        }
232    }
233
234    /// Check if the user is authenticated.
235    pub fn is_authenticated(&self) -> bool {
236        self.authenticated
237    }
238
239    /// Get the user ID if authenticated.
240    pub fn user_id(&self) -> Option<Uuid> {
241        self.user_id
242    }
243
244    /// Get the user ID, returning an error if not authenticated.
245    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
246        self.user_id
247            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
248    }
249
250    /// Check if the user has a specific role.
251    pub fn has_role(&self, role: &str) -> bool {
252        self.roles.iter().any(|r| r == role)
253    }
254
255    /// Require a specific role, returning an error if not present.
256    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
257        if self.has_role(role) {
258            Ok(())
259        } else {
260            Err(crate::error::ForgeError::Forbidden(format!(
261                "Required role '{}' not present",
262                role
263            )))
264        }
265    }
266
267    /// Get a custom claim value.
268    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
269        self.claims.get(key)
270    }
271
272    /// Get all custom claims.
273    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
274        &self.claims
275    }
276
277    /// Get all roles.
278    pub fn roles(&self) -> &[String] {
279        &self.roles
280    }
281
282    /// Get the raw subject claim.
283    ///
284    /// This works with any provider's subject format (UUID, email, custom ID).
285    /// For providers like Firebase or Clerk that don't use UUIDs, use this
286    /// instead of `user_id()`.
287    pub fn subject(&self) -> Option<&str> {
288        self.claims.get("sub").and_then(|v| v.as_str())
289    }
290
291    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
292    pub fn require_subject(&self) -> crate::error::Result<&str> {
293        if !self.authenticated {
294            return Err(crate::error::ForgeError::Unauthorized(
295                "Authentication required".to_string(),
296            ));
297        }
298        self.subject().ok_or_else(|| {
299            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
300        })
301    }
302
303    /// Get a stable principal identifier for access control and cache scoping.
304    ///
305    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
306    pub fn principal_id(&self) -> Option<String> {
307        self.subject()
308            .map(ToString::to_string)
309            .or_else(|| self.user_id.map(|id| id.to_string()))
310    }
311
312    /// Check whether this principal should be treated as privileged admin.
313    pub fn is_admin(&self) -> bool {
314        self.roles.iter().any(|r| r == "admin")
315    }
316}
317
318/// Request metadata available to all functions.
319#[derive(Debug, Clone)]
320pub struct RequestMetadata {
321    /// Unique request ID for tracing.
322    pub request_id: Uuid,
323    /// Trace ID for distributed tracing.
324    pub trace_id: String,
325    /// Client IP address.
326    pub client_ip: Option<String>,
327    /// User agent string.
328    pub user_agent: Option<String>,
329    /// Request timestamp.
330    pub timestamp: chrono::DateTime<chrono::Utc>,
331}
332
333impl RequestMetadata {
334    /// Create new request metadata.
335    pub fn new() -> Self {
336        Self {
337            request_id: Uuid::new_v4(),
338            trace_id: Uuid::new_v4().to_string(),
339            client_ip: None,
340            user_agent: None,
341            timestamp: chrono::Utc::now(),
342        }
343    }
344
345    /// Create with a specific trace ID.
346    pub fn with_trace_id(trace_id: String) -> Self {
347        Self {
348            request_id: Uuid::new_v4(),
349            trace_id,
350            client_ip: None,
351            user_agent: None,
352            timestamp: chrono::Utc::now(),
353        }
354    }
355}
356
357impl Default for RequestMetadata {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363/// Context for query functions (read-only database access).
364pub struct QueryContext {
365    /// Authentication context.
366    pub auth: AuthContext,
367    /// Request metadata.
368    pub request: RequestMetadata,
369    /// Database pool for read operations.
370    db_pool: sqlx::PgPool,
371    /// Environment variable provider.
372    env_provider: Arc<dyn EnvProvider>,
373}
374
375impl QueryContext {
376    /// Create a new query context.
377    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
378        Self {
379            auth,
380            request,
381            db_pool,
382            env_provider: Arc::new(RealEnvProvider::new()),
383        }
384    }
385
386    /// Create a query context with a custom environment provider.
387    pub fn with_env(
388        db_pool: sqlx::PgPool,
389        auth: AuthContext,
390        request: RequestMetadata,
391        env_provider: Arc<dyn EnvProvider>,
392    ) -> Self {
393        Self {
394            auth,
395            request,
396            db_pool,
397            env_provider,
398        }
399    }
400
401    pub fn db(&self) -> &sqlx::PgPool {
402        &self.db_pool
403    }
404
405    /// Returns a `DbConn` wrapping the pool, allowing shared helper functions
406    /// that accept `DbConn` to work with both query and mutation contexts.
407    pub fn db_conn(&self) -> DbConn<'_> {
408        DbConn::Pool(&self.db_pool)
409    }
410
411    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
412        self.auth.require_user_id()
413    }
414
415    /// Like `require_user_id()` but for non-UUID auth providers.
416    pub fn require_subject(&self) -> crate::error::Result<&str> {
417        self.auth.require_subject()
418    }
419}
420
421impl EnvAccess for QueryContext {
422    fn env_provider(&self) -> &dyn EnvProvider {
423        self.env_provider.as_ref()
424    }
425}
426
427/// Callback type for looking up job info by name.
428pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
429
430/// Context for mutation functions (transactional database access).
431pub struct MutationContext {
432    /// Authentication context.
433    pub auth: AuthContext,
434    /// Request metadata.
435    pub request: RequestMetadata,
436    /// Database pool for transactional operations.
437    db_pool: sqlx::PgPool,
438    /// HTTP client with circuit breaker for external requests.
439    http_client: CircuitBreakerClient,
440    /// Optional job dispatcher for dispatching background jobs.
441    job_dispatch: Option<Arc<dyn JobDispatch>>,
442    /// Optional workflow dispatcher for starting workflows.
443    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
444    /// Environment variable provider.
445    env_provider: Arc<dyn EnvProvider>,
446    /// Transaction handle for transactional mutations.
447    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
448    /// Outbox buffer for jobs/workflows dispatched during transaction.
449    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
450    /// Job info lookup for transactional dispatch.
451    job_info_lookup: Option<JobInfoLookup>,
452    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
453    token_issuer: Option<Arc<dyn TokenIssuer>>,
454}
455
456impl MutationContext {
457    /// Create a new mutation context.
458    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
459        Self {
460            auth,
461            request,
462            db_pool,
463            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
464            job_dispatch: None,
465            workflow_dispatch: None,
466            env_provider: Arc::new(RealEnvProvider::new()),
467            tx: None,
468            outbox: None,
469            job_info_lookup: None,
470            token_issuer: None,
471        }
472    }
473
474    /// Create a mutation context with dispatch capabilities.
475    pub fn with_dispatch(
476        db_pool: sqlx::PgPool,
477        auth: AuthContext,
478        request: RequestMetadata,
479        http_client: CircuitBreakerClient,
480        job_dispatch: Option<Arc<dyn JobDispatch>>,
481        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
482    ) -> Self {
483        Self {
484            auth,
485            request,
486            db_pool,
487            http_client,
488            job_dispatch,
489            workflow_dispatch,
490            env_provider: Arc::new(RealEnvProvider::new()),
491            tx: None,
492            outbox: None,
493            job_info_lookup: None,
494            token_issuer: None,
495        }
496    }
497
498    /// Create a mutation context with a custom environment provider.
499    pub fn with_env(
500        db_pool: sqlx::PgPool,
501        auth: AuthContext,
502        request: RequestMetadata,
503        http_client: CircuitBreakerClient,
504        job_dispatch: Option<Arc<dyn JobDispatch>>,
505        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
506        env_provider: Arc<dyn EnvProvider>,
507    ) -> Self {
508        Self {
509            auth,
510            request,
511            db_pool,
512            http_client,
513            job_dispatch,
514            workflow_dispatch,
515            env_provider,
516            tx: None,
517            outbox: None,
518            job_info_lookup: None,
519            token_issuer: None,
520        }
521    }
522
523    /// Returns handles to transaction and outbox for the caller to commit/flush.
524    #[allow(clippy::type_complexity)]
525    pub fn with_transaction(
526        db_pool: sqlx::PgPool,
527        tx: Transaction<'static, Postgres>,
528        auth: AuthContext,
529        request: RequestMetadata,
530        http_client: CircuitBreakerClient,
531        job_info_lookup: JobInfoLookup,
532    ) -> (
533        Self,
534        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
535        Arc<Mutex<OutboxBuffer>>,
536    ) {
537        let tx_handle = Arc::new(AsyncMutex::new(tx));
538        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
539
540        let ctx = Self {
541            auth,
542            request,
543            db_pool,
544            http_client,
545            job_dispatch: None,
546            workflow_dispatch: None,
547            env_provider: Arc::new(RealEnvProvider::new()),
548            tx: Some(tx_handle.clone()),
549            outbox: Some(outbox.clone()),
550            job_info_lookup: Some(job_info_lookup),
551            token_issuer: None,
552        };
553
554        (ctx, tx_handle, outbox)
555    }
556
557    pub fn is_transactional(&self) -> bool {
558        self.tx.is_some()
559    }
560
561    pub fn db(&self) -> DbConn<'_> {
562        match &self.tx {
563            Some(tx) => DbConn::Transaction(tx.clone()),
564            None => DbConn::Pool(&self.db_pool),
565        }
566    }
567
568    /// Direct pool access for operations that cannot run inside a transaction.
569    pub fn pool(&self) -> &sqlx::PgPool {
570        &self.db_pool
571    }
572
573    /// Get the HTTP client for external requests.
574    ///
575    /// The client includes circuit breaker protection that tracks failure rates
576    /// per host. After repeated failures, requests fail fast to prevent cascade
577    /// failures when downstream services are unhealthy.
578    pub fn http(&self) -> &reqwest::Client {
579        self.http_client.inner()
580    }
581
582    /// Get the circuit breaker client directly for advanced usage.
583    pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
584        &self.http_client
585    }
586
587    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
588        self.auth.require_user_id()
589    }
590
591    pub fn require_subject(&self) -> crate::error::Result<&str> {
592        self.auth.require_subject()
593    }
594
595    /// Set the token issuer for this context.
596    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
597        self.token_issuer = Some(issuer);
598    }
599
600    /// Issue a signed JWT from the given claims.
601    ///
602    /// Only available when HMAC auth is configured in `forge.toml`.
603    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
604    ///
605    /// ```ignore
606    /// let claims = Claims::builder()
607    ///     .user_id(user.id)
608    ///     .duration_secs(7 * 24 * 3600)
609    ///     .build()
610    ///     .map_err(|e| ForgeError::Internal(e))?;
611    ///
612    /// let token = ctx.issue_token(&claims)?;
613    /// ```
614    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
615        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
616            crate::error::ForgeError::Internal(
617                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
618                    .into(),
619            )
620        })?;
621        issuer.sign(claims)
622    }
623
624    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
625    pub async fn dispatch_job<T: serde::Serialize>(
626        &self,
627        job_type: &str,
628        args: T,
629    ) -> crate::error::Result<Uuid> {
630        let args_json = serde_json::to_value(args)?;
631
632        // Transactional mode: buffer the job for atomic commit
633        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
634            let job_info = job_info_lookup(job_type).ok_or_else(|| {
635                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
636            })?;
637
638            let pending = PendingJob {
639                id: Uuid::new_v4(),
640                job_type: job_type.to_string(),
641                args: args_json,
642                context: serde_json::json!({}),
643                owner_subject: self.auth.principal_id(),
644                priority: job_info.priority.as_i32(),
645                max_attempts: job_info.retry.max_attempts as i32,
646                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
647            };
648
649            let job_id = pending.id;
650            outbox
651                .lock()
652                .expect("outbox lock poisoned")
653                .jobs
654                .push(pending);
655            return Ok(job_id);
656        }
657
658        // Non-transactional mode: dispatch immediately
659        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
660            crate::error::ForgeError::Internal("Job dispatch not available".into())
661        })?;
662        dispatcher
663            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
664            .await
665    }
666
667    /// Dispatch a job with initial context.
668    pub async fn dispatch_job_with_context<T: serde::Serialize>(
669        &self,
670        job_type: &str,
671        args: T,
672        context: serde_json::Value,
673    ) -> crate::error::Result<Uuid> {
674        let args_json = serde_json::to_value(args)?;
675
676        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
677            let job_info = job_info_lookup(job_type).ok_or_else(|| {
678                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
679            })?;
680
681            let pending = PendingJob {
682                id: Uuid::new_v4(),
683                job_type: job_type.to_string(),
684                args: args_json,
685                context,
686                owner_subject: self.auth.principal_id(),
687                priority: job_info.priority.as_i32(),
688                max_attempts: job_info.retry.max_attempts as i32,
689                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
690            };
691
692            let job_id = pending.id;
693            outbox
694                .lock()
695                .expect("outbox lock poisoned")
696                .jobs
697                .push(pending);
698            return Ok(job_id);
699        }
700
701        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
702            crate::error::ForgeError::Internal("Job dispatch not available".into())
703        })?;
704        dispatcher
705            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
706            .await
707    }
708
709    /// Request cancellation for a job.
710    pub async fn cancel_job(
711        &self,
712        job_id: Uuid,
713        reason: Option<String>,
714    ) -> crate::error::Result<bool> {
715        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
716            crate::error::ForgeError::Internal("Job dispatch not available".into())
717        })?;
718        dispatcher.cancel(job_id, reason).await
719    }
720
721    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
722    pub async fn start_workflow<T: serde::Serialize>(
723        &self,
724        workflow_name: &str,
725        input: T,
726    ) -> crate::error::Result<Uuid> {
727        let input_json = serde_json::to_value(input)?;
728
729        // Transactional mode: buffer the workflow for atomic commit
730        if let Some(outbox) = &self.outbox {
731            let pending = PendingWorkflow {
732                id: Uuid::new_v4(),
733                workflow_name: workflow_name.to_string(),
734                input: input_json,
735                owner_subject: self.auth.principal_id(),
736            };
737
738            let workflow_id = pending.id;
739            outbox
740                .lock()
741                .expect("outbox lock poisoned")
742                .workflows
743                .push(pending);
744            return Ok(workflow_id);
745        }
746
747        // Non-transactional mode: start immediately
748        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
749            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
750        })?;
751        dispatcher
752            .start_by_name(workflow_name, input_json, self.auth.principal_id())
753            .await
754    }
755}
756
757impl EnvAccess for MutationContext {
758    fn env_provider(&self) -> &dyn EnvProvider {
759        self.env_provider.as_ref()
760    }
761}
762
763#[cfg(test)]
764#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
765mod tests {
766    use super::*;
767
768    #[test]
769    fn test_auth_context_unauthenticated() {
770        let ctx = AuthContext::unauthenticated();
771        assert!(!ctx.is_authenticated());
772        assert!(ctx.user_id().is_none());
773        assert!(ctx.require_user_id().is_err());
774    }
775
776    #[test]
777    fn test_auth_context_authenticated() {
778        let user_id = Uuid::new_v4();
779        let ctx = AuthContext::authenticated(
780            user_id,
781            vec!["admin".to_string(), "user".to_string()],
782            HashMap::new(),
783        );
784
785        assert!(ctx.is_authenticated());
786        assert_eq!(ctx.user_id(), Some(user_id));
787        assert!(ctx.require_user_id().is_ok());
788        assert!(ctx.has_role("admin"));
789        assert!(ctx.has_role("user"));
790        assert!(!ctx.has_role("superadmin"));
791        assert!(ctx.require_role("admin").is_ok());
792        assert!(ctx.require_role("superadmin").is_err());
793    }
794
795    #[test]
796    fn test_auth_context_with_claims() {
797        let mut claims = HashMap::new();
798        claims.insert("org_id".to_string(), serde_json::json!("org-123"));
799
800        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
801
802        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
803        assert!(ctx.claim("nonexistent").is_none());
804    }
805
806    #[test]
807    fn test_request_metadata() {
808        let meta = RequestMetadata::new();
809        assert!(!meta.trace_id.is_empty());
810        assert!(meta.client_ip.is_none());
811
812        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
813        assert_eq!(meta2.trace_id, "trace-123");
814    }
815}