Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use futures_core::future::BoxFuture;
39use futures_core::stream::BoxStream;
40use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
41use sqlx::{Postgres, Transaction};
42use tokio::sync::Mutex as AsyncMutex;
43use uuid::Uuid;
44
45use tracing::Instrument;
46
47use super::dispatch::{JobDispatch, WorkflowDispatch};
48use crate::auth::Claims;
49use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
50use crate::http::CircuitBreakerClient;
51use crate::job::JobInfo;
52
/// Token issuer for signing JWTs.
///
/// Implemented by the runtime when HMAC auth is configured.
/// Available via `ctx.issue_token()` in mutation handlers.
///
/// The `Send + Sync` bounds allow an issuer to be stored as
/// `Arc<dyn TokenIssuer>`, which is how `MutationContext` holds it.
pub trait TokenIssuer: Send + Sync {
    /// Sign the given claims into a JWT string.
    ///
    /// Returns the crate's `Result`, so a signing failure is reported to the
    /// caller rather than panicking.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
61
/// Connection wrapper that implements sqlx's `Executor` trait with automatic
/// `db.query` tracing spans.
///
/// Obtain via `ctx.conn().await?` in mutation handlers.
/// Works with compile-time checked macros via `&mut conn`.
///
/// `Executor` is implemented for `&mut ForgeConn`, so pass `&mut conn` to get
/// the spans. `&mut *conn` dereferences to the underlying `PgConnection`
/// instead and therefore goes around this wrapper's spans.
///
/// ```ignore
/// let mut conn = ctx.conn().await?;
/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
///     .fetch_one(&mut conn)
///     .await?
/// ```
pub enum ForgeConn<'a> {
    /// A connection acquired from the pool.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// A lock guard over the shared transaction; the lock is held for as long
    /// as this value is alive.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
78
79impl std::ops::Deref for ForgeConn<'_> {
80    type Target = PgConnection;
81    fn deref(&self) -> &PgConnection {
82        match self {
83            ForgeConn::Pool(c) => c,
84            ForgeConn::Tx(g) => g,
85        }
86    }
87}
88
89impl std::ops::DerefMut for ForgeConn<'_> {
90    fn deref_mut(&mut self) -> &mut PgConnection {
91        match self {
92            ForgeConn::Pool(c) => c,
93            ForgeConn::Tx(g) => g,
94        }
95    }
96}
97
/// Pool wrapper that adds `db.query` tracing spans to database operations.
///
/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
/// so it works as a drop-in replacement for `&PgPool` with compile-time
/// checked macros (`query!`, `query_as!`).
///
/// Spans are opened by `fetch_optional`, `execute`, `fetch_all` and
/// `fetch_one`; `fetch_many`, `prepare_with` and `describe` are forwarded to
/// the pool without one.
///
/// ```ignore
/// sqlx::query_as!(User, "SELECT * FROM users")
///     .fetch_all(ctx.db())
///     .await?
/// ```
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
111
112impl std::fmt::Debug for ForgeDb {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_tuple("ForgeDb").finish()
115    }
116}
117
118impl ForgeDb {
119    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
120    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
121        Self(pool.clone())
122    }
123}
124
/// Classify a SQL statement by its leading verb for the `db.operation.name`
/// span field.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively against `SELECT`, `INSERT`, `UPDATE` and `DELETE`.
/// Anything else, including statements shorter than six bytes, is reported
/// as `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // Every recognised verb is exactly six ASCII bytes long.
    const VERBS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    let Some(head) = sql.trim_start().as_bytes().get(..6) else {
        return "OTHER";
    };

    VERBS
        .iter()
        .copied()
        .find(|verb| head.eq_ignore_ascii_case(verb.as_bytes()))
        .unwrap_or("OTHER")
}
135
/// `Executor` implementation that forwards every call to the wrapped pool.
///
/// `fetch_optional`, `execute`, `fetch_all` and `fetch_one` run inside a
/// `db.query` info span carrying `db.system = "postgresql"` and a
/// `db.operation.name` derived from the statement text by `sql_operation`.
/// `fetch_many`, `prepare_with` and `describe` are forwarded without a span.
impl sqlx::Executor<'static> for ForgeDb {
    type Database = Postgres;

    /// Stream results straight from the pool; no span is opened here.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        (&self.0).fetch_many(query)
    }

    /// Fetch at most one row inside a `db.query` span.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The span is built here, before the future is first polled, and
        // attached to the future with `instrument`.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(
            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
        )
    }

    /// Execute a statement inside a `db.query` span.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
    }

    /// Fetch all rows inside a `db.query` span.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
    }

    /// Fetch exactly one row inside a `db.query` span.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
    }

    /// Prepare a statement on the pool; forwarded without a span.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
        // `self` is moved into the future so the pool handle lives as long as the call.
        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
    }

    /// Describe a statement on the pool; forwarded without a span.
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
    }
}
209
210impl std::fmt::Debug for ForgeConn<'_> {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        match self {
213            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
214            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
215        }
216    }
217}
218
/// `Executor` implementation for a mutable borrow of [`ForgeConn`].
///
/// Every method reborrows the wrapper as `&mut PgConnection` (through its
/// `DerefMut` impl) and delegates to that connection. `fetch_optional`,
/// `execute`, `fetch_all` and `fetch_one` additionally run inside a
/// `db.query` info span carrying `db.system = "postgresql"` and a
/// `db.operation.name` derived from the statement text by `sql_operation`.
/// `fetch_many`, `prepare_with` and `describe` are delegated without a span.
impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
    type Database = Postgres;

    /// Stream results from the underlying connection; no span is opened here.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Reborrow as the raw connection for the lifetime of the returned stream.
        let conn: &'e mut PgConnection = &mut *self;
        conn.fetch_many(query)
    }

    /// Fetch at most one row inside a `db.query` span.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The span is built before the future is first polled and attached
        // to it with `instrument`.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_optional(query).instrument(span))
    }

    /// Execute a statement inside a `db.query` span.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.execute(query).instrument(span))
    }

    /// Fetch all rows inside a `db.query` span.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_all(query).instrument(span))
    }

    /// Fetch exactly one row inside a `db.query` span.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_one(query).instrument(span))
    }

    /// Prepare a statement on the underlying connection; no span is opened.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.prepare_with(sql, parameters)
    }

    /// Describe a statement on the underlying connection; no span is opened.
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.describe(sql)
    }
}
308
/// A job buffered by a transactional mutation, waiting to be written out by
/// whoever owns the outbox.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Job ID, generated when the job is buffered and returned to the caller.
    pub id: Uuid,
    /// Registered job type name.
    pub job_type: String,
    /// Job arguments serialized to JSON.
    pub args: serde_json::Value,
    /// Initial job context (an empty JSON object when none is supplied).
    pub context: serde_json::Value,
    /// Principal that dispatched the job, if the request had one.
    pub owner_subject: Option<String>,
    /// Priority taken from the job's registered info.
    pub priority: i32,
    /// Maximum attempts taken from the job's registered retry settings.
    pub max_attempts: i32,
    /// Worker capability taken from the job's registered info, if any.
    pub worker_capability: Option<String>,
}
320
/// A workflow start request held in the [`OutboxBuffer`].
// NOTE(review): the code that fills these fields is outside this view; the
// field notes below follow the names and the parallel `PendingJob` struct.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Identifier of the pending workflow.
    pub id: Uuid,
    /// Name of the workflow to start.
    pub workflow_name: String,
    /// Workflow input as JSON.
    pub input: serde_json::Value,
    /// Owning principal, if any (presumably the dispatching caller, as for jobs).
    pub owner_subject: Option<String>,
}
328
/// Buffer for jobs and workflows dispatched during a transaction.
///
/// `MutationContext::with_transaction` creates an empty buffer and returns a
/// shared handle to it alongside the context.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs buffered so far, in dispatch order.
    pub jobs: Vec<PendingJob>,
    /// Workflows buffered so far.
    pub workflows: Vec<PendingWorkflow>,
}
334
335/// Authentication context available to all functions.
336#[derive(Debug, Clone)]
337pub struct AuthContext {
338    /// The authenticated user ID (if any).
339    user_id: Option<Uuid>,
340    /// User roles.
341    roles: Vec<String>,
342    /// Custom claims from JWT.
343    claims: HashMap<String, serde_json::Value>,
344    /// Whether the request is authenticated.
345    authenticated: bool,
346}
347
348impl AuthContext {
349    /// Create an unauthenticated context.
350    pub fn unauthenticated() -> Self {
351        Self {
352            user_id: None,
353            roles: Vec::new(),
354            claims: HashMap::new(),
355            authenticated: false,
356        }
357    }
358
359    /// Create an authenticated context with a UUID user ID.
360    pub fn authenticated(
361        user_id: Uuid,
362        roles: Vec<String>,
363        claims: HashMap<String, serde_json::Value>,
364    ) -> Self {
365        Self {
366            user_id: Some(user_id),
367            roles,
368            claims,
369            authenticated: true,
370        }
371    }
372
373    /// Create an authenticated context without requiring a UUID user ID.
374    ///
375    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
376    /// Clerk). The raw subject string is available via `subject()` method
377    /// from the "sub" claim.
378    pub fn authenticated_without_uuid(
379        roles: Vec<String>,
380        claims: HashMap<String, serde_json::Value>,
381    ) -> Self {
382        Self {
383            user_id: None,
384            roles,
385            claims,
386            authenticated: true,
387        }
388    }
389
390    /// Check if the user is authenticated.
391    pub fn is_authenticated(&self) -> bool {
392        self.authenticated
393    }
394
395    /// Get the user ID if authenticated.
396    pub fn user_id(&self) -> Option<Uuid> {
397        self.user_id
398    }
399
400    /// Get the user ID, returning an error if not authenticated.
401    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
402        self.user_id
403            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
404    }
405
406    /// Check if the user has a specific role.
407    pub fn has_role(&self, role: &str) -> bool {
408        self.roles.iter().any(|r| r == role)
409    }
410
411    /// Require a specific role, returning an error if not present.
412    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
413        if self.has_role(role) {
414            Ok(())
415        } else {
416            Err(crate::error::ForgeError::Forbidden(format!(
417                "Required role '{}' not present",
418                role
419            )))
420        }
421    }
422
423    /// Get a custom claim value.
424    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
425        self.claims.get(key)
426    }
427
428    /// Get all custom claims.
429    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
430        &self.claims
431    }
432
433    /// Get all roles.
434    pub fn roles(&self) -> &[String] {
435        &self.roles
436    }
437
438    /// Get the raw subject claim.
439    ///
440    /// This works with any provider's subject format (UUID, email, custom ID).
441    /// For providers like Firebase or Clerk that don't use UUIDs, use this
442    /// instead of `user_id()`.
443    pub fn subject(&self) -> Option<&str> {
444        self.claims.get("sub").and_then(|v| v.as_str())
445    }
446
447    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
448    pub fn require_subject(&self) -> crate::error::Result<&str> {
449        if !self.authenticated {
450            return Err(crate::error::ForgeError::Unauthorized(
451                "Authentication required".to_string(),
452            ));
453        }
454        self.subject().ok_or_else(|| {
455            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
456        })
457    }
458
459    /// Get a stable principal identifier for access control and cache scoping.
460    ///
461    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
462    pub fn principal_id(&self) -> Option<String> {
463        self.subject()
464            .map(ToString::to_string)
465            .or_else(|| self.user_id.map(|id| id.to_string()))
466    }
467
468    /// Check whether this principal should be treated as privileged admin.
469    pub fn is_admin(&self) -> bool {
470        self.roles.iter().any(|r| r == "admin")
471    }
472
473    /// Validate that identity/tenant-scoped arguments in a function call match
474    /// the authenticated principal.
475    ///
476    /// When `enforce_scope` is true (private functions), at least one scope key
477    /// must be present and match. When false, existing scope keys are still
478    /// validated but their absence is tolerated.
479    pub fn check_identity_args(
480        &self,
481        function_name: &str,
482        args: &serde_json::Value,
483        enforce_scope: bool,
484    ) -> crate::error::Result<()> {
485        use crate::error::ForgeError;
486
487        if self.is_admin() {
488            return Ok(());
489        }
490
491        if !self.is_authenticated() && !enforce_scope {
492            return Ok(());
493        }
494
495        let Some(obj) = args.as_object() else {
496            if enforce_scope && self.is_authenticated() {
497                return Err(ForgeError::Forbidden(format!(
498                    "Function '{function_name}' must include identity or tenant scope arguments"
499                )));
500            }
501            return Ok(());
502        };
503
504        let mut principal_values: Vec<String> = Vec::new();
505        if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
506            principal_values.push(user_id);
507        }
508        if let Some(subject) = self.principal_id()
509            && !principal_values.iter().any(|v| v == &subject)
510        {
511            principal_values.push(subject);
512        }
513
514        let mut has_scope_key = false;
515
516        for key in [
517            "user_id",
518            "userId",
519            "owner_id",
520            "ownerId",
521            "owner_subject",
522            "ownerSubject",
523            "subject",
524            "sub",
525            "principal_id",
526            "principalId",
527        ] {
528            let Some(value) = obj.get(key) else {
529                continue;
530            };
531            has_scope_key = true;
532
533            if !self.is_authenticated() {
534                return Err(ForgeError::Unauthorized(format!(
535                    "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
536                )));
537            }
538
539            let serde_json::Value::String(actual) = value else {
540                return Err(ForgeError::InvalidArgument(format!(
541                    "Function '{function_name}' argument '{key}' must be a non-empty string"
542                )));
543            };
544
545            if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
546                return Err(ForgeError::Forbidden(format!(
547                    "Function '{function_name}' argument '{key}' does not match authenticated principal"
548                )));
549            }
550        }
551
552        for key in ["tenant_id", "tenantId"] {
553            let Some(value) = obj.get(key) else {
554                continue;
555            };
556            has_scope_key = true;
557
558            if !self.is_authenticated() {
559                return Err(ForgeError::Unauthorized(format!(
560                    "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
561                )));
562            }
563
564            let expected = self
565                .claim("tenant_id")
566                .and_then(|v| v.as_str())
567                .ok_or_else(|| {
568                    ForgeError::Forbidden(format!(
569                        "Function '{function_name}' argument '{key}' is not allowed for this principal"
570                    ))
571                })?;
572
573            let serde_json::Value::String(actual) = value else {
574                return Err(ForgeError::InvalidArgument(format!(
575                    "Function '{function_name}' argument '{key}' must be a non-empty string"
576                )));
577            };
578
579            if actual.trim().is_empty() || actual != expected {
580                return Err(ForgeError::Forbidden(format!(
581                    "Function '{function_name}' argument '{key}' does not match authenticated tenant"
582                )));
583            }
584        }
585
586        if enforce_scope && self.is_authenticated() && !has_scope_key {
587            return Err(ForgeError::Forbidden(format!(
588                "Function '{function_name}' must include identity or tenant scope arguments"
589            )));
590        }
591
592        Ok(())
593    }
594}
595
596/// Request metadata available to all functions.
597#[derive(Debug, Clone)]
598pub struct RequestMetadata {
599    /// Unique request ID for tracing.
600    pub request_id: Uuid,
601    /// Trace ID for distributed tracing.
602    pub trace_id: String,
603    /// Client IP address.
604    pub client_ip: Option<String>,
605    /// User agent string.
606    pub user_agent: Option<String>,
607    /// Request timestamp.
608    pub timestamp: chrono::DateTime<chrono::Utc>,
609}
610
611impl RequestMetadata {
612    /// Create new request metadata.
613    pub fn new() -> Self {
614        Self {
615            request_id: Uuid::new_v4(),
616            trace_id: Uuid::new_v4().to_string(),
617            client_ip: None,
618            user_agent: None,
619            timestamp: chrono::Utc::now(),
620        }
621    }
622
623    /// Create with a specific trace ID.
624    pub fn with_trace_id(trace_id: String) -> Self {
625        Self {
626            request_id: Uuid::new_v4(),
627            trace_id,
628            client_ip: None,
629            user_agent: None,
630            timestamp: chrono::Utc::now(),
631        }
632    }
633}
634
635impl Default for RequestMetadata {
636    fn default() -> Self {
637        Self::new()
638    }
639}
640
641/// Context for query functions (read-only database access).
642pub struct QueryContext {
643    /// Authentication context.
644    pub auth: AuthContext,
645    /// Request metadata.
646    pub request: RequestMetadata,
647    /// Database pool for read operations.
648    db_pool: sqlx::PgPool,
649    /// Environment variable provider.
650    env_provider: Arc<dyn EnvProvider>,
651}
652
653impl QueryContext {
654    /// Create a new query context.
655    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
656        Self {
657            auth,
658            request,
659            db_pool,
660            env_provider: Arc::new(RealEnvProvider::new()),
661        }
662    }
663
664    /// Create a query context with a custom environment provider.
665    pub fn with_env(
666        db_pool: sqlx::PgPool,
667        auth: AuthContext,
668        request: RequestMetadata,
669        env_provider: Arc<dyn EnvProvider>,
670    ) -> Self {
671        Self {
672            auth,
673            request,
674            db_pool,
675            env_provider,
676        }
677    }
678
679    /// Database handle with automatic `db.query` tracing spans.
680    ///
681    /// Works directly with sqlx compile-time checked macros:
682    /// ```ignore
683    /// sqlx::query_as!(User, "SELECT * FROM users")
684    ///     .fetch_all(ctx.db())
685    ///     .await?
686    /// ```
687    pub fn db(&self) -> ForgeDb {
688        ForgeDb(self.db_pool.clone())
689    }
690
691    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
692        self.auth.require_user_id()
693    }
694
695    /// Like `require_user_id()` but for non-UUID auth providers.
696    pub fn require_subject(&self) -> crate::error::Result<&str> {
697        self.auth.require_subject()
698    }
699}
700
701impl EnvAccess for QueryContext {
702    fn env_provider(&self) -> &dyn EnvProvider {
703        self.env_provider.as_ref()
704    }
705}
706
/// Callback type for looking up job info by name.
///
/// Returns `None` when no job is registered under the given name;
/// `MutationContext::dispatch_job` turns that into a `NotFound` error.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
709
/// Context for mutation functions (transactional database access).
///
/// A context is transactional when `tx` is set, which only
/// `with_transaction` does. The other constructors leave `tx`, `outbox` and
/// `job_info_lookup` unset.
pub struct MutationContext {
    /// Authentication context.
    pub auth: AuthContext,
    /// Request metadata.
    pub request: RequestMetadata,
    /// Database pool; used by `conn()` when no transaction is attached and
    /// exposed directly through `pool()`.
    db_pool: sqlx::PgPool,
    /// HTTP client with circuit breaker for external requests.
    http_client: CircuitBreakerClient,
    /// Optional job dispatcher for dispatching background jobs.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Optional workflow dispatcher for starting workflows.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Environment variable provider.
    env_provider: Arc<dyn EnvProvider>,
    /// Transaction handle for transactional mutations, shared with the caller
    /// that created the context.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Outbox buffer for jobs/workflows dispatched during transaction.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Job info lookup for transactional dispatch.
    job_info_lookup: Option<JobInfoLookup>,
    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
    token_issuer: Option<Arc<dyn TokenIssuer>>,
}
735
736impl MutationContext {
737    /// Create a new mutation context.
738    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
739        Self {
740            auth,
741            request,
742            db_pool,
743            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
744            job_dispatch: None,
745            workflow_dispatch: None,
746            env_provider: Arc::new(RealEnvProvider::new()),
747            tx: None,
748            outbox: None,
749            job_info_lookup: None,
750            token_issuer: None,
751        }
752    }
753
754    /// Create a mutation context with dispatch capabilities.
755    pub fn with_dispatch(
756        db_pool: sqlx::PgPool,
757        auth: AuthContext,
758        request: RequestMetadata,
759        http_client: CircuitBreakerClient,
760        job_dispatch: Option<Arc<dyn JobDispatch>>,
761        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
762    ) -> Self {
763        Self {
764            auth,
765            request,
766            db_pool,
767            http_client,
768            job_dispatch,
769            workflow_dispatch,
770            env_provider: Arc::new(RealEnvProvider::new()),
771            tx: None,
772            outbox: None,
773            job_info_lookup: None,
774            token_issuer: None,
775        }
776    }
777
778    /// Create a mutation context with a custom environment provider.
779    pub fn with_env(
780        db_pool: sqlx::PgPool,
781        auth: AuthContext,
782        request: RequestMetadata,
783        http_client: CircuitBreakerClient,
784        job_dispatch: Option<Arc<dyn JobDispatch>>,
785        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
786        env_provider: Arc<dyn EnvProvider>,
787    ) -> Self {
788        Self {
789            auth,
790            request,
791            db_pool,
792            http_client,
793            job_dispatch,
794            workflow_dispatch,
795            env_provider,
796            tx: None,
797            outbox: None,
798            job_info_lookup: None,
799            token_issuer: None,
800        }
801    }
802
803    /// Returns handles to transaction and outbox for the caller to commit/flush.
804    #[allow(clippy::type_complexity)]
805    pub fn with_transaction(
806        db_pool: sqlx::PgPool,
807        tx: Transaction<'static, Postgres>,
808        auth: AuthContext,
809        request: RequestMetadata,
810        http_client: CircuitBreakerClient,
811        job_info_lookup: JobInfoLookup,
812    ) -> (
813        Self,
814        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
815        Arc<Mutex<OutboxBuffer>>,
816    ) {
817        let tx_handle = Arc::new(AsyncMutex::new(tx));
818        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
819
820        let ctx = Self {
821            auth,
822            request,
823            db_pool,
824            http_client,
825            job_dispatch: None,
826            workflow_dispatch: None,
827            env_provider: Arc::new(RealEnvProvider::new()),
828            tx: Some(tx_handle.clone()),
829            outbox: Some(outbox.clone()),
830            job_info_lookup: Some(job_info_lookup),
831            token_issuer: None,
832        };
833
834        (ctx, tx_handle, outbox)
835    }
836
    /// Whether this context has a transaction attached.
    ///
    /// When true, `conn()` hands out a guard over that transaction instead of
    /// a pooled connection.
    pub fn is_transactional(&self) -> bool {
        self.tx.is_some()
    }
840
841    /// Acquire a connection compatible with sqlx compile-time checked macros.
842    ///
843    /// In transactional mode, returns a guard over the active transaction.
844    /// Otherwise acquires a fresh connection from the pool.
845    ///
846    /// ```ignore
847    /// let mut conn = ctx.conn().await?;
848    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
849    ///     .fetch_one(&mut *conn)
850    ///     .await?
851    /// ```
852    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
853        match &self.tx {
854            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
855            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
856        }
857    }
858
    /// Direct pool access for operations that cannot run inside a transaction.
    ///
    /// Returns the pool itself rather than the attached transaction, so work
    /// done through it is not part of that transaction.
    pub fn pool(&self) -> &sqlx::PgPool {
        &self.db_pool
    }
863
    /// Get the HTTP client for external requests.
    ///
    /// The client includes circuit breaker protection that tracks failure rates
    /// per host. After repeated failures, requests fail fast to prevent cascade
    /// failures when downstream services are unhealthy.
    ///
    // NOTE(review): this returns `CircuitBreakerClient::inner()`, i.e. the
    // wrapped `reqwest::Client`. Requests sent directly through it presumably
    // bypass the breaker described above. Confirm against
    // `CircuitBreakerClient`, and point callers at
    // `http_with_circuit_breaker()` if so.
    pub fn http(&self) -> &reqwest::Client {
        self.http_client.inner()
    }
872
    /// Get the circuit breaker client directly for advanced usage.
    ///
    /// This is the wrapper itself, as opposed to `http()`, which returns the
    /// wrapper's inner `reqwest::Client`.
    pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
        &self.http_client
    }
877
    /// Get the caller's UUID user ID, or an `Unauthorized` error if none is
    /// present. Shortcut for `self.auth.require_user_id()`.
    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
881
    /// Like `require_user_id()` but for non-UUID auth providers: returns the
    /// raw subject string. Shortcut for `self.auth.require_subject()`.
    pub fn require_subject(&self) -> crate::error::Result<&str> {
        self.auth.require_subject()
    }
885
    /// Set the token issuer for this context.
    ///
    /// Replaces any previously set issuer. Until one is set, `issue_token()`
    /// returns an error.
    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
        self.token_issuer = Some(issuer);
    }
890
891    /// Issue a signed JWT from the given claims.
892    ///
893    /// Only available when HMAC auth is configured in `forge.toml`.
894    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
895    ///
896    /// ```ignore
897    /// let claims = Claims::builder()
898    ///     .user_id(user.id)
899    ///     .duration_secs(7 * 24 * 3600)
900    ///     .build()
901    ///     .map_err(|e| ForgeError::Internal(e))?;
902    ///
903    /// let token = ctx.issue_token(&claims)?;
904    /// ```
905    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
906        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
907            crate::error::ForgeError::Internal(
908                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
909                    .into(),
910            )
911        })?;
912        issuer.sign(claims)
913    }
914
915    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
916    pub async fn dispatch_job<T: serde::Serialize>(
917        &self,
918        job_type: &str,
919        args: T,
920    ) -> crate::error::Result<Uuid> {
921        let args_json = serde_json::to_value(args)?;
922
923        // Transactional mode: buffer the job for atomic commit
924        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
925            let job_info = job_info_lookup(job_type).ok_or_else(|| {
926                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
927            })?;
928
929            let pending = PendingJob {
930                id: Uuid::new_v4(),
931                job_type: job_type.to_string(),
932                args: args_json,
933                context: serde_json::json!({}),
934                owner_subject: self.auth.principal_id(),
935                priority: job_info.priority.as_i32(),
936                max_attempts: job_info.retry.max_attempts as i32,
937                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
938            };
939
940            let job_id = pending.id;
941            outbox
942                .lock()
943                .expect("outbox lock poisoned")
944                .jobs
945                .push(pending);
946            return Ok(job_id);
947        }
948
949        // Non-transactional mode: dispatch immediately
950        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
951            crate::error::ForgeError::Internal("Job dispatch not available".into())
952        })?;
953        dispatcher
954            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
955            .await
956    }
957
958    /// Dispatch a job with initial context.
959    pub async fn dispatch_job_with_context<T: serde::Serialize>(
960        &self,
961        job_type: &str,
962        args: T,
963        context: serde_json::Value,
964    ) -> crate::error::Result<Uuid> {
965        let args_json = serde_json::to_value(args)?;
966
967        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
968            let job_info = job_info_lookup(job_type).ok_or_else(|| {
969                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
970            })?;
971
972            let pending = PendingJob {
973                id: Uuid::new_v4(),
974                job_type: job_type.to_string(),
975                args: args_json,
976                context,
977                owner_subject: self.auth.principal_id(),
978                priority: job_info.priority.as_i32(),
979                max_attempts: job_info.retry.max_attempts as i32,
980                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
981            };
982
983            let job_id = pending.id;
984            outbox
985                .lock()
986                .expect("outbox lock poisoned")
987                .jobs
988                .push(pending);
989            return Ok(job_id);
990        }
991
992        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
993            crate::error::ForgeError::Internal("Job dispatch not available".into())
994        })?;
995        dispatcher
996            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
997            .await
998    }
999
1000    /// Request cancellation for a job.
1001    pub async fn cancel_job(
1002        &self,
1003        job_id: Uuid,
1004        reason: Option<String>,
1005    ) -> crate::error::Result<bool> {
1006        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1007            crate::error::ForgeError::Internal("Job dispatch not available".into())
1008        })?;
1009        dispatcher.cancel(job_id, reason).await
1010    }
1011
1012    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1013    pub async fn start_workflow<T: serde::Serialize>(
1014        &self,
1015        workflow_name: &str,
1016        input: T,
1017    ) -> crate::error::Result<Uuid> {
1018        let input_json = serde_json::to_value(input)?;
1019
1020        // Transactional mode: buffer the workflow for atomic commit
1021        if let Some(outbox) = &self.outbox {
1022            let pending = PendingWorkflow {
1023                id: Uuid::new_v4(),
1024                workflow_name: workflow_name.to_string(),
1025                input: input_json,
1026                owner_subject: self.auth.principal_id(),
1027            };
1028
1029            let workflow_id = pending.id;
1030            outbox
1031                .lock()
1032                .expect("outbox lock poisoned")
1033                .workflows
1034                .push(pending);
1035            return Ok(workflow_id);
1036        }
1037
1038        // Non-transactional mode: start immediately
1039        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1040            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1041        })?;
1042        dispatcher
1043            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1044            .await
1045    }
1046}
1047
1048impl EnvAccess for MutationContext {
1049    fn env_provider(&self) -> &dyn EnvProvider {
1050        self.env_provider.as_ref()
1051    }
1052}
1053
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context reports no user and rejects
    /// `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert!(anonymous.user_id().is_none());
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context exposes its user ID and answers role checks
    /// for the roles it was built with.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let roles = vec!["admin".to_string(), "user".to_string()];
        let auth = AuthContext::authenticated(expected_id, roles, HashMap::new());

        // Identity.
        assert!(auth.is_authenticated());
        assert_eq!(auth.user_id(), Some(expected_id));
        assert!(auth.require_user_id().is_ok());

        // Role membership.
        assert!(auth.has_role("admin"));
        assert!(auth.has_role("user"));
        assert!(!auth.has_role("superadmin"));

        // Role enforcement.
        assert!(auth.require_role("admin").is_ok());
        assert!(auth.require_role("superadmin").is_err());
    }

    /// Custom claims passed at construction can be read back by key; unknown
    /// keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let org_claim = serde_json::json!("org-123");
        let claims: HashMap<_, _> =
            std::iter::once(("org_id".to_string(), org_claim.clone())).collect();

        let auth = AuthContext::authenticated(Uuid::new_v4(), Vec::new(), claims);

        assert_eq!(auth.claim("org_id"), Some(&org_claim));
        assert!(auth.claim("nonexistent").is_none());
    }

    /// Fresh metadata has a non-empty trace ID and no client IP; an explicit
    /// trace ID is stored verbatim.
    #[test]
    fn test_request_metadata() {
        let fresh = RequestMetadata::new();
        assert!(!fresh.trace_id.is_empty());
        assert!(fresh.client_ip.is_none());

        let traced = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(traced.trace_id, "trace-123");
    }
}