Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Token issuer for signing JWTs.
///
/// Implemented by the runtime when HMAC auth is configured.
/// Available via `ctx.issue_token()` in mutation handlers.
///
/// The `Send + Sync` supertraits let an issuer be shared across threads;
/// `MutationContext` stores it as `Arc<dyn TokenIssuer>`.
pub trait TokenIssuer: Send + Sync {
    /// Sign the given claims into a JWT string.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports when signing fails.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// Connection wrapper that implements sqlx's `Executor` trait with automatic
/// `db.query` tracing spans.
///
/// Obtain via `ctx.conn().await?` in mutation handlers.
/// Works with compile-time checked macros via `&mut conn`.
///
/// The instrumented `Executor` impl lives on `&mut ForgeConn`, so pass
/// `&mut conn`. Writing `&mut *conn` also compiles, but it dereferences to the
/// underlying `PgConnection` and therefore skips the spans added by this wrapper.
///
/// ```ignore
/// let mut conn = ctx.conn().await?;
/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
///     .fetch_one(&mut conn)
///     .await?
/// ```
pub enum ForgeConn<'a> {
    /// Connection checked out from the pool (non-transactional mutations).
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// Lock guard over the mutation's shared transaction (transactional mutations).
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81    type Target = PgConnection;
82    fn deref(&self) -> &PgConnection {
83        match self {
84            ForgeConn::Pool(c) => c,
85            ForgeConn::Tx(g) => g,
86        }
87    }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91    fn deref_mut(&mut self) -> &mut PgConnection {
92        match self {
93            ForgeConn::Pool(c) => c,
94            ForgeConn::Tx(g) => g,
95        }
96    }
97}
98
/// Pool wrapper that adds `db.query` tracing spans to every database operation.
///
/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
/// so it works as a drop-in replacement for `&PgPool` with compile-time
/// checked macros (`query!`, `query_as!`).
///
/// The wrapper owns a clone of the pool handle, so it can be passed by value
/// to sqlx's fetch/execute methods.
///
/// ```ignore
/// sqlx::query_as!(User, "SELECT * FROM users")
///     .fetch_all(ctx.db())
///     .await?
/// ```
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_tuple("ForgeDb").finish()
116    }
117}
118
119impl ForgeDb {
120    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
121    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122        Self(pool.clone())
123    }
124}
125
/// Classify a SQL string by its leading keyword for the `db.operation.name`
/// span field.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively against `SELECT`, `INSERT`, `UPDATE` and `DELETE`.
/// Anything else (including input shorter than six bytes) yields `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // All recognised keywords are exactly six bytes long.
    const KEYWORDS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    let Some(head) = sql.trim_start().as_bytes().get(..6) else {
        return "OTHER";
    };

    KEYWORDS
        .iter()
        .copied()
        .find(|keyword| head.eq_ignore_ascii_case(keyword.as_bytes()))
        .unwrap_or("OTHER")
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138    type Database = Postgres;
139
140    fn fetch_many<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144    where
145        E: sqlx::Execute<'q, Postgres> + 'q,
146    {
147        (&self.0).fetch_many(query)
148    }
149
150    fn fetch_optional<'e, 'q: 'e, E>(
151        self,
152        query: E,
153    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154    where
155        E: sqlx::Execute<'q, Postgres> + 'q,
156    {
157        let op = sql_operation(query.sql());
158        let span =
159            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160        Box::pin(
161            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162        )
163    }
164
165    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166    where
167        E: sqlx::Execute<'q, Postgres> + 'q,
168    {
169        let op = sql_operation(query.sql());
170        let span =
171            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173    }
174
175    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176    where
177        E: sqlx::Execute<'q, Postgres> + 'q,
178    {
179        let op = sql_operation(query.sql());
180        let span =
181            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183    }
184
185    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186    where
187        E: sqlx::Execute<'q, Postgres> + 'q,
188    {
189        let op = sql_operation(query.sql());
190        let span =
191            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193    }
194
195    fn prepare_with<'e, 'q: 'e>(
196        self,
197        sql: &'q str,
198        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201    }
202
203    fn describe<'e, 'q: 'e>(
204        self,
205        sql: &'q str,
206    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208    }
209}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        match self {
214            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216        }
217    }
218}
219
220impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
221    type Database = Postgres;
222
223    fn fetch_many<'e, 'q: 'e, E>(
224        self,
225        query: E,
226    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
227    where
228        'c: 'e,
229        E: sqlx::Execute<'q, Postgres> + 'q,
230    {
231        let conn: &'e mut PgConnection = &mut *self;
232        conn.fetch_many(query)
233    }
234
235    fn fetch_optional<'e, 'q: 'e, E>(
236        self,
237        query: E,
238    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
239    where
240        'c: 'e,
241        E: sqlx::Execute<'q, Postgres> + 'q,
242    {
243        let op = sql_operation(query.sql());
244        let span =
245            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
246        let conn: &'e mut PgConnection = &mut *self;
247        Box::pin(conn.fetch_optional(query).instrument(span))
248    }
249
250    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
251    where
252        'c: 'e,
253        E: sqlx::Execute<'q, Postgres> + 'q,
254    {
255        let op = sql_operation(query.sql());
256        let span =
257            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
258        let conn: &'e mut PgConnection = &mut *self;
259        Box::pin(conn.execute(query).instrument(span))
260    }
261
262    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
263    where
264        'c: 'e,
265        E: sqlx::Execute<'q, Postgres> + 'q,
266    {
267        let op = sql_operation(query.sql());
268        let span =
269            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
270        let conn: &'e mut PgConnection = &mut *self;
271        Box::pin(conn.fetch_all(query).instrument(span))
272    }
273
274    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
275    where
276        'c: 'e,
277        E: sqlx::Execute<'q, Postgres> + 'q,
278    {
279        let op = sql_operation(query.sql());
280        let span =
281            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
282        let conn: &'e mut PgConnection = &mut *self;
283        Box::pin(conn.fetch_one(query).instrument(span))
284    }
285
286    fn prepare_with<'e, 'q: 'e>(
287        self,
288        sql: &'q str,
289        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
290    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
291    where
292        'c: 'e,
293    {
294        let conn: &'e mut PgConnection = &mut *self;
295        conn.prepare_with(sql, parameters)
296    }
297
298    fn describe<'e, 'q: 'e>(
299        self,
300        sql: &'q str,
301    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
302    where
303        'c: 'e,
304    {
305        let conn: &'e mut PgConnection = &mut *self;
306        conn.describe(sql)
307    }
308}
309
/// A background job recorded for later insertion.
///
/// Instances are collected in [`OutboxBuffer::jobs`].
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Identifier of the pending job.
    pub id: Uuid,
    /// Job type name. NOTE(review): presumably the name resolved through
    /// `JobInfoLookup` — confirm against the dispatch code.
    pub job_type: String,
    /// Job arguments as JSON.
    pub args: serde_json::Value,
    /// Additional JSON context carried with the job; its schema is not
    /// defined in this file.
    pub context: serde_json::Value,
    /// Subject that owns the job, if any. NOTE(review): presumably the
    /// dispatching principal — confirm.
    pub owner_subject: Option<String>,
    /// Priority value; ordering semantics are defined by the job queue, not here.
    pub priority: i32,
    /// Attempt limit. NOTE(review): retry semantics live outside this file.
    pub max_attempts: i32,
    /// Optional worker capability. NOTE(review): meaning inferred from the
    /// name only — confirm.
    pub worker_capability: Option<String>,
}
321
/// A workflow start recorded for later insertion.
///
/// Instances are collected in [`OutboxBuffer::workflows`].
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Identifier of the pending workflow run.
    pub id: Uuid,
    /// Name of the workflow to start.
    pub workflow_name: String,
    /// Workflow input as JSON.
    pub input: serde_json::Value,
    /// Subject that owns the workflow, if any. NOTE(review): presumably the
    /// dispatching principal — confirm.
    pub owner_subject: Option<String>,
}
329
/// Buffer of jobs and workflows dispatched during a transactional mutation.
///
/// Per the module documentation, buffered entries are only inserted after the
/// transaction commits. `Default` yields two empty lists.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs waiting to be inserted.
    pub jobs: Vec<PendingJob>,
    /// Workflow starts waiting to be inserted.
    pub workflows: Vec<PendingWorkflow>,
}
335
/// Authentication context available to all functions.
///
/// `authenticated` and `user_id` are independent: a context built with
/// `authenticated_without_uuid` is authenticated but has no UUID user ID.
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// The authenticated user ID (if any).
    user_id: Option<Uuid>,
    /// User roles.
    roles: Vec<String>,
    /// Custom claims from JWT.
    claims: HashMap<String, serde_json::Value>,
    /// Whether the request is authenticated.
    authenticated: bool,
}
348
349impl AuthContext {
350    /// Create an unauthenticated context.
351    pub fn unauthenticated() -> Self {
352        Self {
353            user_id: None,
354            roles: Vec::new(),
355            claims: HashMap::new(),
356            authenticated: false,
357        }
358    }
359
360    /// Create an authenticated context with a UUID user ID.
361    pub fn authenticated(
362        user_id: Uuid,
363        roles: Vec<String>,
364        claims: HashMap<String, serde_json::Value>,
365    ) -> Self {
366        Self {
367            user_id: Some(user_id),
368            roles,
369            claims,
370            authenticated: true,
371        }
372    }
373
374    /// Create an authenticated context without requiring a UUID user ID.
375    ///
376    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
377    /// Clerk). The raw subject string is available via `subject()` method
378    /// from the "sub" claim.
379    pub fn authenticated_without_uuid(
380        roles: Vec<String>,
381        claims: HashMap<String, serde_json::Value>,
382    ) -> Self {
383        Self {
384            user_id: None,
385            roles,
386            claims,
387            authenticated: true,
388        }
389    }
390
391    /// Check if the user is authenticated.
392    pub fn is_authenticated(&self) -> bool {
393        self.authenticated
394    }
395
396    /// Get the user ID if authenticated.
397    pub fn user_id(&self) -> Option<Uuid> {
398        self.user_id
399    }
400
401    /// Get the user ID, returning an error if not authenticated.
402    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403        self.user_id
404            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405    }
406
407    /// Check if the user has a specific role.
408    pub fn has_role(&self, role: &str) -> bool {
409        self.roles.iter().any(|r| r == role)
410    }
411
412    /// Require a specific role, returning an error if not present.
413    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414        if self.has_role(role) {
415            Ok(())
416        } else {
417            Err(crate::error::ForgeError::Forbidden(format!(
418                "Required role '{}' not present",
419                role
420            )))
421        }
422    }
423
424    /// Get a custom claim value.
425    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426        self.claims.get(key)
427    }
428
429    /// Get all custom claims.
430    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431        &self.claims
432    }
433
434    /// Get all roles.
435    pub fn roles(&self) -> &[String] {
436        &self.roles
437    }
438
439    /// Get the raw subject claim.
440    ///
441    /// This works with any provider's subject format (UUID, email, custom ID).
442    /// For providers like Firebase or Clerk that don't use UUIDs, use this
443    /// instead of `user_id()`.
444    pub fn subject(&self) -> Option<&str> {
445        self.claims.get("sub").and_then(|v| v.as_str())
446    }
447
448    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
449    pub fn require_subject(&self) -> crate::error::Result<&str> {
450        if !self.authenticated {
451            return Err(crate::error::ForgeError::Unauthorized(
452                "Authentication required".to_string(),
453            ));
454        }
455        self.subject().ok_or_else(|| {
456            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457        })
458    }
459
460    /// Get a stable principal identifier for access control and cache scoping.
461    ///
462    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
463    pub fn principal_id(&self) -> Option<String> {
464        self.subject()
465            .map(ToString::to_string)
466            .or_else(|| self.user_id.map(|id| id.to_string()))
467    }
468
469    /// Check whether this principal should be treated as privileged admin.
470    pub fn is_admin(&self) -> bool {
471        self.roles.iter().any(|r| r == "admin")
472    }
473
474    /// Validate that identity/tenant-scoped arguments in a function call match
475    /// the authenticated principal.
476    ///
477    /// When `enforce_scope` is true (private functions), at least one scope key
478    /// must be present and match. When false, existing scope keys are still
479    /// validated but their absence is tolerated.
480    pub fn check_identity_args(
481        &self,
482        function_name: &str,
483        args: &serde_json::Value,
484        enforce_scope: bool,
485    ) -> crate::error::Result<()> {
486        use crate::error::ForgeError;
487
488        if self.is_admin() {
489            return Ok(());
490        }
491
492        if !self.is_authenticated() && !enforce_scope {
493            return Ok(());
494        }
495
496        let Some(obj) = args.as_object() else {
497            if enforce_scope && self.is_authenticated() {
498                return Err(ForgeError::Forbidden(format!(
499                    "Function '{function_name}' must include identity or tenant scope arguments"
500                )));
501            }
502            return Ok(());
503        };
504
505        let mut principal_values: Vec<String> = Vec::new();
506        if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
507            principal_values.push(user_id);
508        }
509        if let Some(subject) = self.principal_id()
510            && !principal_values.iter().any(|v| v == &subject)
511        {
512            principal_values.push(subject);
513        }
514
515        let mut has_scope_key = false;
516
517        for key in [
518            "user_id",
519            "userId",
520            "owner_id",
521            "ownerId",
522            "owner_subject",
523            "ownerSubject",
524            "subject",
525            "sub",
526            "principal_id",
527            "principalId",
528        ] {
529            let Some(value) = obj.get(key) else {
530                continue;
531            };
532            has_scope_key = true;
533
534            if !self.is_authenticated() {
535                return Err(ForgeError::Unauthorized(format!(
536                    "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
537                )));
538            }
539
540            let serde_json::Value::String(actual) = value else {
541                return Err(ForgeError::InvalidArgument(format!(
542                    "Function '{function_name}' argument '{key}' must be a non-empty string"
543                )));
544            };
545
546            if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
547                return Err(ForgeError::Forbidden(format!(
548                    "Function '{function_name}' argument '{key}' does not match authenticated principal"
549                )));
550            }
551        }
552
553        for key in ["tenant_id", "tenantId"] {
554            let Some(value) = obj.get(key) else {
555                continue;
556            };
557            has_scope_key = true;
558
559            if !self.is_authenticated() {
560                return Err(ForgeError::Unauthorized(format!(
561                    "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
562                )));
563            }
564
565            let expected = self
566                .claim("tenant_id")
567                .and_then(|v| v.as_str())
568                .ok_or_else(|| {
569                    ForgeError::Forbidden(format!(
570                        "Function '{function_name}' argument '{key}' is not allowed for this principal"
571                    ))
572                })?;
573
574            let serde_json::Value::String(actual) = value else {
575                return Err(ForgeError::InvalidArgument(format!(
576                    "Function '{function_name}' argument '{key}' must be a non-empty string"
577                )));
578            };
579
580            if actual.trim().is_empty() || actual != expected {
581                return Err(ForgeError::Forbidden(format!(
582                    "Function '{function_name}' argument '{key}' does not match authenticated tenant"
583                )));
584            }
585        }
586
587        if enforce_scope && self.is_authenticated() && !has_scope_key {
588            return Err(ForgeError::Forbidden(format!(
589                "Function '{function_name}' must include identity or tenant scope arguments"
590            )));
591        }
592
593        Ok(())
594    }
595}
596
/// Request metadata available to all functions.
///
/// `new()` and `with_trace_id()` leave `client_ip` and `user_agent` unset;
/// callers fill them in through the public fields.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Unique request ID for tracing.
    pub request_id: Uuid,
    /// Trace ID for distributed tracing.
    pub trace_id: String,
    /// Client IP address.
    pub client_ip: Option<String>,
    /// User agent string.
    pub user_agent: Option<String>,
    /// Request timestamp.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
611
612impl RequestMetadata {
613    /// Create new request metadata.
614    pub fn new() -> Self {
615        Self {
616            request_id: Uuid::new_v4(),
617            trace_id: Uuid::new_v4().to_string(),
618            client_ip: None,
619            user_agent: None,
620            timestamp: chrono::Utc::now(),
621        }
622    }
623
624    /// Create with a specific trace ID.
625    pub fn with_trace_id(trace_id: String) -> Self {
626        Self {
627            request_id: Uuid::new_v4(),
628            trace_id,
629            client_ip: None,
630            user_agent: None,
631            timestamp: chrono::Utc::now(),
632        }
633    }
634}
635
636impl Default for RequestMetadata {
637    fn default() -> Self {
638        Self::new()
639    }
640}
641
/// Context for query functions (read-only database access).
///
/// The pool itself is private; handlers reach the database through `db()`,
/// which wraps it in a tracing-aware handle.
pub struct QueryContext {
    /// Authentication context.
    pub auth: AuthContext,
    /// Request metadata.
    pub request: RequestMetadata,
    /// Database pool for read operations.
    db_pool: sqlx::PgPool,
    /// Environment variable provider.
    env_provider: Arc<dyn EnvProvider>,
}
653
654impl QueryContext {
655    /// Create a new query context.
656    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657        Self {
658            auth,
659            request,
660            db_pool,
661            env_provider: Arc::new(RealEnvProvider::new()),
662        }
663    }
664
665    /// Create a query context with a custom environment provider.
666    pub fn with_env(
667        db_pool: sqlx::PgPool,
668        auth: AuthContext,
669        request: RequestMetadata,
670        env_provider: Arc<dyn EnvProvider>,
671    ) -> Self {
672        Self {
673            auth,
674            request,
675            db_pool,
676            env_provider,
677        }
678    }
679
680    /// Database handle with automatic `db.query` tracing spans.
681    ///
682    /// Works directly with sqlx compile-time checked macros:
683    /// ```ignore
684    /// sqlx::query_as!(User, "SELECT * FROM users")
685    ///     .fetch_all(ctx.db())
686    ///     .await?
687    /// ```
688    pub fn db(&self) -> ForgeDb {
689        ForgeDb(self.db_pool.clone())
690    }
691
692    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
693        self.auth.require_user_id()
694    }
695
696    /// Like `require_user_id()` but for non-UUID auth providers.
697    pub fn require_subject(&self) -> crate::error::Result<&str> {
698        self.auth.require_subject()
699    }
700}
701
702impl EnvAccess for QueryContext {
703    fn env_provider(&self) -> &dyn EnvProvider {
704        self.env_provider.as_ref()
705    }
706}
707
/// Callback type for looking up job info by name.
///
/// Takes a job name and returns `None` when no job info is found for it.
/// `Send + Sync` inside an `Arc` so the lookup can be shared across threads.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
710
/// Token TTL configuration resolved from `[auth]` in forge.toml.
///
/// Note the two fields use different units: seconds for access tokens, days
/// for refresh tokens.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access token lifetime in seconds (default 3600).
    pub access_token_secs: i64,
    /// Refresh token lifetime in days (default 30).
    pub refresh_token_days: i64,
}
719
720impl Default for AuthTokenTtl {
721    fn default() -> Self {
722        Self {
723            access_token_secs: 3600,
724            refresh_token_days: 30,
725        }
726    }
727}
728
/// Context for mutation functions (transactional database access).
///
/// `tx`, `outbox` and `job_info_lookup` are only populated by
/// `with_transaction`; the other constructors leave them as `None`.
pub struct MutationContext {
    /// Authentication context.
    pub auth: AuthContext,
    /// Request metadata.
    pub request: RequestMetadata,
    /// Database pool for transactional operations.
    db_pool: sqlx::PgPool,
    /// HTTP client with circuit breaker for external requests.
    http_client: CircuitBreakerClient,
    /// Default timeout for outbound HTTP requests made through the
    /// circuit-breaker client. `None` means unlimited.
    http_timeout: Option<Duration>,
    /// Optional job dispatcher for dispatching background jobs.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Optional workflow dispatcher for starting workflows.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Environment variable provider.
    env_provider: Arc<dyn EnvProvider>,
    /// Transaction handle for transactional mutations.
    /// Guarded by an async mutex and shared with the caller of `with_transaction`.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Outbox buffer for jobs/workflows dispatched during transaction.
    /// Shared with the caller of `with_transaction`.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Job info lookup for transactional dispatch.
    job_info_lookup: Option<JobInfoLookup>,
    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Token TTL config from forge.toml.
    token_ttl: AuthTokenTtl,
}
759
760impl MutationContext {
761    /// Create a new mutation context.
762    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
763        Self {
764            auth,
765            request,
766            db_pool,
767            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
768            http_timeout: None,
769            job_dispatch: None,
770            workflow_dispatch: None,
771            env_provider: Arc::new(RealEnvProvider::new()),
772            tx: None,
773            outbox: None,
774            job_info_lookup: None,
775            token_issuer: None,
776            token_ttl: AuthTokenTtl::default(),
777        }
778    }
779
780    /// Create a mutation context with dispatch capabilities.
781    pub fn with_dispatch(
782        db_pool: sqlx::PgPool,
783        auth: AuthContext,
784        request: RequestMetadata,
785        http_client: CircuitBreakerClient,
786        job_dispatch: Option<Arc<dyn JobDispatch>>,
787        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
788    ) -> Self {
789        Self {
790            auth,
791            request,
792            db_pool,
793            http_client,
794            http_timeout: None,
795            job_dispatch,
796            workflow_dispatch,
797            env_provider: Arc::new(RealEnvProvider::new()),
798            tx: None,
799            outbox: None,
800            job_info_lookup: None,
801            token_issuer: None,
802            token_ttl: AuthTokenTtl::default(),
803        }
804    }
805
806    /// Create a mutation context with a custom environment provider.
807    pub fn with_env(
808        db_pool: sqlx::PgPool,
809        auth: AuthContext,
810        request: RequestMetadata,
811        http_client: CircuitBreakerClient,
812        job_dispatch: Option<Arc<dyn JobDispatch>>,
813        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
814        env_provider: Arc<dyn EnvProvider>,
815    ) -> Self {
816        Self {
817            auth,
818            request,
819            db_pool,
820            http_client,
821            http_timeout: None,
822            job_dispatch,
823            workflow_dispatch,
824            env_provider,
825            tx: None,
826            outbox: None,
827            job_info_lookup: None,
828            token_issuer: None,
829            token_ttl: AuthTokenTtl::default(),
830        }
831    }
832
833    /// Returns handles to transaction and outbox for the caller to commit/flush.
834    #[allow(clippy::type_complexity)]
835    pub fn with_transaction(
836        db_pool: sqlx::PgPool,
837        tx: Transaction<'static, Postgres>,
838        auth: AuthContext,
839        request: RequestMetadata,
840        http_client: CircuitBreakerClient,
841        job_info_lookup: JobInfoLookup,
842    ) -> (
843        Self,
844        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
845        Arc<Mutex<OutboxBuffer>>,
846    ) {
847        let tx_handle = Arc::new(AsyncMutex::new(tx));
848        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
849
850        let ctx = Self {
851            auth,
852            request,
853            db_pool,
854            http_client,
855            http_timeout: None,
856            job_dispatch: None,
857            workflow_dispatch: None,
858            env_provider: Arc::new(RealEnvProvider::new()),
859            tx: Some(tx_handle.clone()),
860            outbox: Some(outbox.clone()),
861            job_info_lookup: Some(job_info_lookup),
862            token_issuer: None,
863            token_ttl: AuthTokenTtl::default(),
864        };
865
866        (ctx, tx_handle, outbox)
867    }
868
    /// Whether this context carries a transaction handle, i.e. was built with
    /// `with_transaction`.
    pub fn is_transactional(&self) -> bool {
        self.tx.is_some()
    }
872
873    /// Acquire a connection compatible with sqlx compile-time checked macros.
874    ///
875    /// In transactional mode, returns a guard over the active transaction.
876    /// Otherwise acquires a fresh connection from the pool.
877    ///
878    /// ```ignore
879    /// let mut conn = ctx.conn().await?;
880    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
881    ///     .fetch_one(&mut *conn)
882    ///     .await?
883    /// ```
884    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
885        match &self.tx {
886            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
887            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
888        }
889    }
890
    /// Direct pool access for operations that cannot run inside a transaction.
    ///
    /// Queries issued on this pool are not part of the context's transaction,
    /// even in transactional mode.
    pub fn pool(&self) -> &sqlx::PgPool {
        &self.db_pool
    }
895
896    /// Get the HTTP client for external requests.
897    ///
898    /// Requests go through the circuit breaker automatically. When the handler
899    /// declared an explicit `timeout`, that timeout is also applied to outbound
900    /// HTTP requests unless the request overrides it.
901    pub fn http(&self) -> crate::http::HttpClient {
902        self.http_client.with_timeout(self.http_timeout)
903    }
904
    /// Get the raw reqwest client, bypassing circuit breaker execution.
    ///
    /// The context's default HTTP timeout is not applied on this path; it is
    /// only attached by `http()`.
    pub fn raw_http(&self) -> &reqwest::Client {
        self.http_client.inner()
    }
909
910    /// Get the circuit-breaker-backed HTTP client explicitly.
911    pub fn http_with_circuit_breaker(&self) -> crate::http::HttpClient {
912        self.http()
913    }
914
    /// Set the default outbound HTTP request timeout for this context.
    ///
    /// `None` clears the default, which the field documentation describes as
    /// unlimited. Affects clients obtained from `http()` after this call.
    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
        self.http_timeout = timeout;
    }
919
    /// Shortcut for `self.auth.require_user_id()`: the UUID user ID, or an
    /// error when the auth context has none.
    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
923
    /// Shortcut for `self.auth.require_subject()`: the raw subject string, for
    /// auth providers that do not use UUID user IDs.
    pub fn require_subject(&self) -> crate::error::Result<&str> {
        self.auth.require_subject()
    }
927
928    /// Set the token issuer for this context.
929    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
930        self.token_issuer = Some(issuer);
931    }
932
933    /// Set the token TTL configuration (from forge.toml `[auth]`).
934    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
935        self.token_ttl = ttl;
936    }
937
938    /// Issue a signed JWT from the given claims.
939    ///
940    /// Only available when HMAC auth is configured in `forge.toml`.
941    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
942    ///
943    /// ```ignore
944    /// let claims = Claims::builder()
945    ///     .user_id(user.id)
946    ///     .duration_secs(7 * 24 * 3600)
947    ///     .build()
948    ///     .map_err(|e| ForgeError::Internal(e))?;
949    ///
950    /// let token = ctx.issue_token(&claims)?;
951    /// ```
952    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
953        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
954            crate::error::ForgeError::Internal(
955                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
956                    .into(),
957            )
958        })?;
959        issuer.sign(claims)
960    }
961
962    /// Issue an access + refresh token pair for the given user.
963    ///
964    /// Stores the refresh token hash in `forge_refresh_tokens` and returns
965    /// both tokens. Use `rotate_refresh_token()` to exchange a refresh token
966    /// for a new pair, and `revoke_refresh_token()` to invalidate one.
967    ///
968    /// TTLs come from `[auth]` in forge.toml:
969    /// - `access_token_ttl` (default "1h")
970    /// - `refresh_token_ttl` (default "30d")
971    pub async fn issue_token_pair(
972        &self,
973        user_id: Uuid,
974        roles: &[&str],
975    ) -> crate::error::Result<crate::auth::TokenPair> {
976        let issuer = self.token_issuer.clone().ok_or_else(|| {
977            crate::error::ForgeError::Internal(
978                "Token issuer not available. Configure [auth] in forge.toml".into(),
979            )
980        })?;
981        let access_ttl = self.token_ttl.access_token_secs;
982        let refresh_ttl = self.token_ttl.refresh_token_days;
983        crate::auth::tokens::issue_token_pair(
984            &self.db_pool,
985            user_id,
986            roles,
987            access_ttl,
988            refresh_ttl,
989            move |uid, r, ttl| {
990                let claims = Claims::builder()
991                    .subject(uid)
992                    .roles(r.iter().map(|s| s.to_string()).collect())
993                    .duration_secs(ttl)
994                    .build()
995                    .map_err(crate::error::ForgeError::Internal)?;
996                issuer.sign(&claims)
997            },
998        )
999        .await
1000    }
1001
1002    /// Rotate a refresh token: validate the old one, issue a new pair.
1003    ///
1004    /// The old token is atomically deleted and a new access + refresh pair
1005    /// is returned. Fails if the token is invalid or expired.
1006    pub async fn rotate_refresh_token(
1007        &self,
1008        old_refresh_token: &str,
1009    ) -> crate::error::Result<crate::auth::TokenPair> {
1010        let issuer = self.token_issuer.clone().ok_or_else(|| {
1011            crate::error::ForgeError::Internal(
1012                "Token issuer not available. Configure [auth] in forge.toml".into(),
1013            )
1014        })?;
1015        let access_ttl = self.token_ttl.access_token_secs;
1016        let refresh_ttl = self.token_ttl.refresh_token_days;
1017        crate::auth::tokens::rotate_refresh_token(
1018            &self.db_pool,
1019            old_refresh_token,
1020            &["user"],
1021            access_ttl,
1022            refresh_ttl,
1023            move |uid, r, ttl| {
1024                let claims = Claims::builder()
1025                    .subject(uid)
1026                    .roles(r.iter().map(|s| s.to_string()).collect())
1027                    .duration_secs(ttl)
1028                    .build()
1029                    .map_err(crate::error::ForgeError::Internal)?;
1030                issuer.sign(&claims)
1031            },
1032        )
1033        .await
1034    }
1035
1036    /// Revoke a specific refresh token (e.g., on logout).
1037    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1038        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1039    }
1040
1041    /// Revoke all refresh tokens for a user (e.g., on password change or account deletion).
1042    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1043        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1044    }
1045
1046    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
1047    pub async fn dispatch_job<T: serde::Serialize>(
1048        &self,
1049        job_type: &str,
1050        args: T,
1051    ) -> crate::error::Result<Uuid> {
1052        let args_json = serde_json::to_value(args)?;
1053
1054        // Transactional mode: buffer the job for atomic commit
1055        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1056            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1057                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1058            })?;
1059
1060            let pending = PendingJob {
1061                id: Uuid::new_v4(),
1062                job_type: job_type.to_string(),
1063                args: args_json,
1064                context: serde_json::json!({}),
1065                owner_subject: self.auth.principal_id(),
1066                priority: job_info.priority.as_i32(),
1067                max_attempts: job_info.retry.max_attempts as i32,
1068                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1069            };
1070
1071            let job_id = pending.id;
1072            outbox
1073                .lock()
1074                .expect("outbox lock poisoned")
1075                .jobs
1076                .push(pending);
1077            return Ok(job_id);
1078        }
1079
1080        // Non-transactional mode: dispatch immediately
1081        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1082            crate::error::ForgeError::Internal("Job dispatch not available".into())
1083        })?;
1084        dispatcher
1085            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1086            .await
1087    }
1088
1089    /// Dispatch a job with initial context.
1090    pub async fn dispatch_job_with_context<T: serde::Serialize>(
1091        &self,
1092        job_type: &str,
1093        args: T,
1094        context: serde_json::Value,
1095    ) -> crate::error::Result<Uuid> {
1096        let args_json = serde_json::to_value(args)?;
1097
1098        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1099            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1100                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1101            })?;
1102
1103            let pending = PendingJob {
1104                id: Uuid::new_v4(),
1105                job_type: job_type.to_string(),
1106                args: args_json,
1107                context,
1108                owner_subject: self.auth.principal_id(),
1109                priority: job_info.priority.as_i32(),
1110                max_attempts: job_info.retry.max_attempts as i32,
1111                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1112            };
1113
1114            let job_id = pending.id;
1115            outbox
1116                .lock()
1117                .expect("outbox lock poisoned")
1118                .jobs
1119                .push(pending);
1120            return Ok(job_id);
1121        }
1122
1123        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1124            crate::error::ForgeError::Internal("Job dispatch not available".into())
1125        })?;
1126        dispatcher
1127            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1128            .await
1129    }
1130
1131    /// Request cancellation for a job.
1132    pub async fn cancel_job(
1133        &self,
1134        job_id: Uuid,
1135        reason: Option<String>,
1136    ) -> crate::error::Result<bool> {
1137        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1138            crate::error::ForgeError::Internal("Job dispatch not available".into())
1139        })?;
1140        dispatcher.cancel(job_id, reason).await
1141    }
1142
1143    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1144    pub async fn start_workflow<T: serde::Serialize>(
1145        &self,
1146        workflow_name: &str,
1147        input: T,
1148    ) -> crate::error::Result<Uuid> {
1149        let input_json = serde_json::to_value(input)?;
1150
1151        // Transactional mode: buffer the workflow for atomic commit
1152        if let Some(outbox) = &self.outbox {
1153            let pending = PendingWorkflow {
1154                id: Uuid::new_v4(),
1155                workflow_name: workflow_name.to_string(),
1156                input: input_json,
1157                owner_subject: self.auth.principal_id(),
1158            };
1159
1160            let workflow_id = pending.id;
1161            outbox
1162                .lock()
1163                .expect("outbox lock poisoned")
1164                .workflows
1165                .push(pending);
1166            return Ok(workflow_id);
1167        }
1168
1169        // Non-transactional mode: start immediately
1170        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1171            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1172        })?;
1173        dispatcher
1174            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1175            .await
1176    }
1177}
1178
1179impl EnvAccess for MutationContext {
1180    fn env_provider(&self) -> &dyn EnvProvider {
1181        self.env_provider.as_ref()
1182    }
1183}
1184
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context exposes no user and rejects `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert!(anonymous.user_id().is_none());
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context reports its user and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted_roles = vec!["admin".to_string(), "user".to_string()];
        let auth = AuthContext::authenticated(expected_id, granted_roles, HashMap::new());

        // Identity
        assert!(auth.is_authenticated());
        assert_eq!(auth.user_id(), Some(expected_id));
        assert!(auth.require_user_id().is_ok());

        // Roles that were granted
        for role in ["admin", "user"] {
            assert!(auth.has_role(role));
        }

        // Role that was never granted
        assert!(!auth.has_role("superadmin"));

        // `require_role` mirrors `has_role`
        assert!(auth.require_role("admin").is_ok());
        assert!(auth.require_role("superadmin").is_err());
    }

    /// Custom claims can be read back by key; unknown keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let custom_claims: HashMap<_, _> =
            [("org_id".to_string(), serde_json::json!("org-123"))]
                .into_iter()
                .collect();

        let auth = AuthContext::authenticated(Uuid::new_v4(), vec![], custom_claims);

        assert_eq!(auth.claim("org_id"), Some(&serde_json::json!("org-123")));
        assert!(auth.claim("nonexistent").is_none());
    }

    /// Fresh metadata gets a non-empty trace ID and no client IP; an
    /// explicit trace ID is stored verbatim.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let explicit = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(explicit.trace_id, "trace-123");
    }
}