Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
54/// Token issuer for signing JWTs.
55///
56/// Implemented by the runtime when HMAC auth is configured.
57/// Available via `ctx.issue_token()` in mutation handlers.
58pub trait TokenIssuer: Send + Sync {
59    /// Sign the given claims into a JWT string.
60    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
61}
62
63/// Connection wrapper that implements sqlx's `Executor` trait with automatic
64/// `db.query` tracing spans.
65///
66/// Obtain via `ctx.conn().await?` in mutation handlers.
67/// Works with compile-time checked macros via `&mut conn`.
68///
69/// ```ignore
70/// let mut conn = ctx.conn().await?;
71/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
72///     .fetch_one(&mut *conn)
73///     .await?
74/// ```
75pub enum ForgeConn<'a> {
76    Pool(sqlx::pool::PoolConnection<Postgres>),
77    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
78}
79
80impl std::ops::Deref for ForgeConn<'_> {
81    type Target = PgConnection;
82    fn deref(&self) -> &PgConnection {
83        match self {
84            ForgeConn::Pool(c) => c,
85            ForgeConn::Tx(g) => g,
86        }
87    }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91    fn deref_mut(&mut self) -> &mut PgConnection {
92        match self {
93            ForgeConn::Pool(c) => c,
94            ForgeConn::Tx(g) => g,
95        }
96    }
97}
98
99/// Pool wrapper that adds `db.query` tracing spans to every database operation.
100///
101/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
102/// so it works as a drop-in replacement for `&PgPool` with compile-time
103/// checked macros (`query!`, `query_as!`).
104///
105/// ```ignore
106/// sqlx::query_as!(User, "SELECT * FROM users")
107///     .fetch_all(ctx.db())
108///     .await?
109/// ```
110#[derive(Clone)]
111pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_tuple("ForgeDb").finish()
116    }
117}
118
119impl ForgeDb {
120    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
121    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122        Self(pool.clone())
123    }
124}
125
/// Classify a SQL statement by its leading keyword for span labelling.
///
/// Leading whitespace is skipped, then the first six bytes are compared
/// case-insensitively (ASCII) against `SELECT`, `INSERT`, `UPDATE` and
/// `DELETE`. Anything else, including statements shorter than six bytes,
/// is reported as `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // Every recognised keyword is exactly six bytes long, so one fixed-size
    // prefix is enough for all comparisons.
    const KEYWORDS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    let Some(head) = sql.trim_start().as_bytes().get(..6) else {
        return "OTHER";
    };

    KEYWORDS
        .iter()
        .copied()
        .find(|keyword| head.eq_ignore_ascii_case(keyword.as_bytes()))
        .unwrap_or("OTHER")
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138    type Database = Postgres;
139
140    fn fetch_many<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144    where
145        E: sqlx::Execute<'q, Postgres> + 'q,
146    {
147        (&self.0).fetch_many(query)
148    }
149
150    fn fetch_optional<'e, 'q: 'e, E>(
151        self,
152        query: E,
153    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154    where
155        E: sqlx::Execute<'q, Postgres> + 'q,
156    {
157        let op = sql_operation(query.sql());
158        let span =
159            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160        Box::pin(
161            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162        )
163    }
164
165    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166    where
167        E: sqlx::Execute<'q, Postgres> + 'q,
168    {
169        let op = sql_operation(query.sql());
170        let span =
171            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173    }
174
175    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176    where
177        E: sqlx::Execute<'q, Postgres> + 'q,
178    {
179        let op = sql_operation(query.sql());
180        let span =
181            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183    }
184
185    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186    where
187        E: sqlx::Execute<'q, Postgres> + 'q,
188    {
189        let op = sql_operation(query.sql());
190        let span =
191            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193    }
194
195    fn prepare_with<'e, 'q: 'e>(
196        self,
197        sql: &'q str,
198        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201    }
202
203    fn describe<'e, 'q: 'e>(
204        self,
205        sql: &'q str,
206    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208    }
209}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        match self {
214            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216        }
217    }
218}
219
220impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
221    type Database = Postgres;
222
223    fn fetch_many<'e, 'q: 'e, E>(
224        self,
225        query: E,
226    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
227    where
228        'c: 'e,
229        E: sqlx::Execute<'q, Postgres> + 'q,
230    {
231        let conn: &'e mut PgConnection = &mut *self;
232        conn.fetch_many(query)
233    }
234
235    fn fetch_optional<'e, 'q: 'e, E>(
236        self,
237        query: E,
238    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
239    where
240        'c: 'e,
241        E: sqlx::Execute<'q, Postgres> + 'q,
242    {
243        let op = sql_operation(query.sql());
244        let span =
245            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
246        let conn: &'e mut PgConnection = &mut *self;
247        Box::pin(conn.fetch_optional(query).instrument(span))
248    }
249
250    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
251    where
252        'c: 'e,
253        E: sqlx::Execute<'q, Postgres> + 'q,
254    {
255        let op = sql_operation(query.sql());
256        let span =
257            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
258        let conn: &'e mut PgConnection = &mut *self;
259        Box::pin(conn.execute(query).instrument(span))
260    }
261
262    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
263    where
264        'c: 'e,
265        E: sqlx::Execute<'q, Postgres> + 'q,
266    {
267        let op = sql_operation(query.sql());
268        let span =
269            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
270        let conn: &'e mut PgConnection = &mut *self;
271        Box::pin(conn.fetch_all(query).instrument(span))
272    }
273
274    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
275    where
276        'c: 'e,
277        E: sqlx::Execute<'q, Postgres> + 'q,
278    {
279        let op = sql_operation(query.sql());
280        let span =
281            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
282        let conn: &'e mut PgConnection = &mut *self;
283        Box::pin(conn.fetch_one(query).instrument(span))
284    }
285
286    fn prepare_with<'e, 'q: 'e>(
287        self,
288        sql: &'q str,
289        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
290    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
291    where
292        'c: 'e,
293    {
294        let conn: &'e mut PgConnection = &mut *self;
295        conn.prepare_with(sql, parameters)
296    }
297
298    fn describe<'e, 'q: 'e>(
299        self,
300        sql: &'q str,
301    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
302    where
303        'c: 'e,
304    {
305        let conn: &'e mut PgConnection = &mut *self;
306        conn.describe(sql)
307    }
308}
309
310#[derive(Debug, Clone)]
311pub struct PendingJob {
312    pub id: Uuid,
313    pub job_type: String,
314    pub args: serde_json::Value,
315    pub context: serde_json::Value,
316    pub owner_subject: Option<String>,
317    pub priority: i32,
318    pub max_attempts: i32,
319    pub worker_capability: Option<String>,
320}
321
322#[derive(Debug, Clone)]
323pub struct PendingWorkflow {
324    pub id: Uuid,
325    pub workflow_name: String,
326    pub input: serde_json::Value,
327    pub owner_subject: Option<String>,
328}
329
330#[derive(Default)]
331pub struct OutboxBuffer {
332    pub jobs: Vec<PendingJob>,
333    pub workflows: Vec<PendingWorkflow>,
334}
335
336/// Authentication context available to all functions.
337#[derive(Debug, Clone)]
338pub struct AuthContext {
339    /// The authenticated user ID (if any).
340    user_id: Option<Uuid>,
341    /// User roles.
342    roles: Vec<String>,
343    /// Custom claims from JWT.
344    claims: HashMap<String, serde_json::Value>,
345    /// Whether the request is authenticated.
346    authenticated: bool,
347}
348
349impl AuthContext {
350    /// Create an unauthenticated context.
351    pub fn unauthenticated() -> Self {
352        Self {
353            user_id: None,
354            roles: Vec::new(),
355            claims: HashMap::new(),
356            authenticated: false,
357        }
358    }
359
360    /// Create an authenticated context with a UUID user ID.
361    pub fn authenticated(
362        user_id: Uuid,
363        roles: Vec<String>,
364        claims: HashMap<String, serde_json::Value>,
365    ) -> Self {
366        Self {
367            user_id: Some(user_id),
368            roles,
369            claims,
370            authenticated: true,
371        }
372    }
373
374    /// Create an authenticated context without requiring a UUID user ID.
375    ///
376    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
377    /// Clerk). The raw subject string is available via `subject()` method
378    /// from the "sub" claim.
379    pub fn authenticated_without_uuid(
380        roles: Vec<String>,
381        claims: HashMap<String, serde_json::Value>,
382    ) -> Self {
383        Self {
384            user_id: None,
385            roles,
386            claims,
387            authenticated: true,
388        }
389    }
390
391    /// Check if the user is authenticated.
392    pub fn is_authenticated(&self) -> bool {
393        self.authenticated
394    }
395
396    /// Get the user ID if authenticated.
397    pub fn user_id(&self) -> Option<Uuid> {
398        self.user_id
399    }
400
401    /// Get the user ID, returning an error if not authenticated.
402    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403        self.user_id
404            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405    }
406
407    /// Check if the user has a specific role.
408    pub fn has_role(&self, role: &str) -> bool {
409        self.roles.iter().any(|r| r == role)
410    }
411
412    /// Require a specific role, returning an error if not present.
413    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414        if self.has_role(role) {
415            Ok(())
416        } else {
417            Err(crate::error::ForgeError::Forbidden(format!(
418                "Required role '{}' not present",
419                role
420            )))
421        }
422    }
423
424    /// Get a custom claim value.
425    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426        self.claims.get(key)
427    }
428
429    /// Get all custom claims.
430    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431        &self.claims
432    }
433
434    /// Get all roles.
435    pub fn roles(&self) -> &[String] {
436        &self.roles
437    }
438
439    /// Get the raw subject claim.
440    ///
441    /// This works with any provider's subject format (UUID, email, custom ID).
442    /// For providers like Firebase or Clerk that don't use UUIDs, use this
443    /// instead of `user_id()`.
444    pub fn subject(&self) -> Option<&str> {
445        self.claims.get("sub").and_then(|v| v.as_str())
446    }
447
448    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
449    pub fn require_subject(&self) -> crate::error::Result<&str> {
450        if !self.authenticated {
451            return Err(crate::error::ForgeError::Unauthorized(
452                "Authentication required".to_string(),
453            ));
454        }
455        self.subject().ok_or_else(|| {
456            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457        })
458    }
459
460    /// Get a stable principal identifier for access control and cache scoping.
461    ///
462    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
463    pub fn principal_id(&self) -> Option<String> {
464        self.subject()
465            .map(ToString::to_string)
466            .or_else(|| self.user_id.map(|id| id.to_string()))
467    }
468
469    /// Check whether this principal should be treated as privileged admin.
470    pub fn is_admin(&self) -> bool {
471        self.roles.iter().any(|r| r == "admin")
472    }
473
474    /// Get the tenant ID from the JWT claims, if present.
475    ///
476    /// Looks for a `tenant_id` claim in the token and attempts to parse it as
477    /// a UUID. Returns `None` if the claim is absent or not a valid UUID.
478    pub fn tenant_id(&self) -> Option<uuid::Uuid> {
479        self.claims
480            .get("tenant_id")
481            .and_then(|v| v.as_str())
482            .and_then(|s| uuid::Uuid::parse_str(s).ok())
483    }
484
485    /// Validate that identity/tenant-scoped arguments in a function call match
486    /// the authenticated principal.
487    ///
488    /// When `enforce_scope` is true (private functions), at least one scope key
489    /// must be present and match. When false, existing scope keys are still
490    /// validated but their absence is tolerated.
491    pub fn check_identity_args(
492        &self,
493        function_name: &str,
494        args: &serde_json::Value,
495        enforce_scope: bool,
496    ) -> crate::error::Result<()> {
497        use crate::error::ForgeError;
498
499        if self.is_admin() {
500            return Ok(());
501        }
502
503        if !self.is_authenticated() && !enforce_scope {
504            return Ok(());
505        }
506
507        let Some(obj) = args.as_object() else {
508            if enforce_scope && self.is_authenticated() {
509                return Err(ForgeError::Forbidden(format!(
510                    "Function '{function_name}' must include identity or tenant scope arguments"
511                )));
512            }
513            return Ok(());
514        };
515
516        let mut principal_values: Vec<String> = Vec::new();
517        if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
518            principal_values.push(user_id);
519        }
520        if let Some(subject) = self.principal_id()
521            && !principal_values.iter().any(|v| v == &subject)
522        {
523            principal_values.push(subject);
524        }
525
526        let mut has_scope_key = false;
527
528        for key in [
529            "user_id",
530            "userId",
531            "owner_id",
532            "ownerId",
533            "owner_subject",
534            "ownerSubject",
535            "subject",
536            "sub",
537            "principal_id",
538            "principalId",
539        ] {
540            let Some(value) = obj.get(key) else {
541                continue;
542            };
543            has_scope_key = true;
544
545            if !self.is_authenticated() {
546                return Err(ForgeError::Unauthorized(format!(
547                    "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
548                )));
549            }
550
551            let serde_json::Value::String(actual) = value else {
552                return Err(ForgeError::InvalidArgument(format!(
553                    "Function '{function_name}' argument '{key}' must be a non-empty string"
554                )));
555            };
556
557            if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
558                return Err(ForgeError::Forbidden(format!(
559                    "Function '{function_name}' argument '{key}' does not match authenticated principal"
560                )));
561            }
562        }
563
564        for key in ["tenant_id", "tenantId"] {
565            let Some(value) = obj.get(key) else {
566                continue;
567            };
568            has_scope_key = true;
569
570            if !self.is_authenticated() {
571                return Err(ForgeError::Unauthorized(format!(
572                    "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
573                )));
574            }
575
576            let expected = self
577                .claim("tenant_id")
578                .and_then(|v| v.as_str())
579                .ok_or_else(|| {
580                    ForgeError::Forbidden(format!(
581                        "Function '{function_name}' argument '{key}' is not allowed for this principal"
582                    ))
583                })?;
584
585            let serde_json::Value::String(actual) = value else {
586                return Err(ForgeError::InvalidArgument(format!(
587                    "Function '{function_name}' argument '{key}' must be a non-empty string"
588                )));
589            };
590
591            if actual.trim().is_empty() || actual != expected {
592                return Err(ForgeError::Forbidden(format!(
593                    "Function '{function_name}' argument '{key}' does not match authenticated tenant"
594                )));
595            }
596        }
597
598        if enforce_scope && self.is_authenticated() && !has_scope_key {
599            return Err(ForgeError::Forbidden(format!(
600                "Function '{function_name}' must include identity or tenant scope arguments"
601            )));
602        }
603
604        Ok(())
605    }
606}
607
608/// Request metadata available to all functions.
609#[derive(Debug, Clone)]
610pub struct RequestMetadata {
611    /// Unique request ID for tracing.
612    pub request_id: Uuid,
613    /// Trace ID for distributed tracing.
614    pub trace_id: String,
615    /// Client IP address.
616    pub client_ip: Option<String>,
617    /// User agent string.
618    pub user_agent: Option<String>,
619    /// Correlation ID linking frontend events to this backend call.
620    pub correlation_id: Option<String>,
621    /// Request timestamp.
622    pub timestamp: chrono::DateTime<chrono::Utc>,
623}
624
625impl RequestMetadata {
626    /// Create new request metadata.
627    pub fn new() -> Self {
628        Self {
629            request_id: Uuid::new_v4(),
630            trace_id: Uuid::new_v4().to_string(),
631            client_ip: None,
632            user_agent: None,
633            correlation_id: None,
634            timestamp: chrono::Utc::now(),
635        }
636    }
637
638    /// Create with a specific trace ID.
639    pub fn with_trace_id(trace_id: String) -> Self {
640        Self {
641            request_id: Uuid::new_v4(),
642            trace_id,
643            client_ip: None,
644            user_agent: None,
645            correlation_id: None,
646            timestamp: chrono::Utc::now(),
647        }
648    }
649}
650
651impl Default for RequestMetadata {
652    fn default() -> Self {
653        Self::new()
654    }
655}
656
657/// Context for query functions (read-only database access).
658pub struct QueryContext {
659    /// Authentication context.
660    pub auth: AuthContext,
661    /// Request metadata.
662    pub request: RequestMetadata,
663    /// Database pool for read operations.
664    db_pool: sqlx::PgPool,
665    /// Environment variable provider.
666    env_provider: Arc<dyn EnvProvider>,
667}
668
669impl QueryContext {
670    /// Create a new query context.
671    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
672        Self {
673            auth,
674            request,
675            db_pool,
676            env_provider: Arc::new(RealEnvProvider::new()),
677        }
678    }
679
680    /// Create a query context with a custom environment provider.
681    pub fn with_env(
682        db_pool: sqlx::PgPool,
683        auth: AuthContext,
684        request: RequestMetadata,
685        env_provider: Arc<dyn EnvProvider>,
686    ) -> Self {
687        Self {
688            auth,
689            request,
690            db_pool,
691            env_provider,
692        }
693    }
694
695    /// Database handle with automatic `db.query` tracing spans.
696    ///
697    /// Works directly with sqlx compile-time checked macros:
698    /// ```ignore
699    /// sqlx::query_as!(User, "SELECT * FROM users")
700    ///     .fetch_all(ctx.db())
701    ///     .await?
702    /// ```
703    pub fn db(&self) -> ForgeDb {
704        ForgeDb(self.db_pool.clone())
705    }
706
707    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
708        self.auth.require_user_id()
709    }
710
711    /// Like `require_user_id()` but for non-UUID auth providers.
712    pub fn require_subject(&self) -> crate::error::Result<&str> {
713        self.auth.require_subject()
714    }
715}
716
717impl EnvAccess for QueryContext {
718    fn env_provider(&self) -> &dyn EnvProvider {
719        self.env_provider.as_ref()
720    }
721}
722
723/// Callback type for looking up job info by name.
724pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
725
/// Token lifetimes resolved from the `[auth]` section of forge.toml.
#[derive(Clone, Debug)]
pub struct AuthTokenTtl {
    /// Lifetime of an access token, in seconds (default 3600).
    pub access_token_secs: i64,
    /// Lifetime of a refresh token, in days (default 30).
    pub refresh_token_days: i64,
}

impl Default for AuthTokenTtl {
    /// One-hour access tokens and thirty-day refresh tokens.
    fn default() -> Self {
        const ONE_HOUR_SECS: i64 = 3600;
        const THIRTY_DAYS: i64 = 30;

        Self {
            refresh_token_days: THIRTY_DAYS,
            access_token_secs: ONE_HOUR_SECS,
        }
    }
}
743
744/// Context for mutation functions (transactional database access).
745pub struct MutationContext {
746    /// Authentication context.
747    pub auth: AuthContext,
748    /// Request metadata.
749    pub request: RequestMetadata,
750    /// Database pool for transactional operations.
751    db_pool: sqlx::PgPool,
752    /// HTTP client with circuit breaker for external requests.
753    http_client: CircuitBreakerClient,
754    /// Default timeout for outbound HTTP requests made through the
755    /// circuit-breaker client. `None` means unlimited.
756    http_timeout: Option<Duration>,
757    /// Optional job dispatcher for dispatching background jobs.
758    job_dispatch: Option<Arc<dyn JobDispatch>>,
759    /// Optional workflow dispatcher for starting workflows.
760    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
761    /// Environment variable provider.
762    env_provider: Arc<dyn EnvProvider>,
763    /// Transaction handle for transactional mutations.
764    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
765    /// Outbox buffer for jobs/workflows dispatched during transaction.
766    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
767    /// Job info lookup for transactional dispatch.
768    job_info_lookup: Option<JobInfoLookup>,
769    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
770    token_issuer: Option<Arc<dyn TokenIssuer>>,
771    /// Token TTL config from forge.toml.
772    token_ttl: AuthTokenTtl,
773}
774
775impl MutationContext {
776    /// Create a new mutation context.
777    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
778        Self {
779            auth,
780            request,
781            db_pool,
782            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
783            http_timeout: None,
784            job_dispatch: None,
785            workflow_dispatch: None,
786            env_provider: Arc::new(RealEnvProvider::new()),
787            tx: None,
788            outbox: None,
789            job_info_lookup: None,
790            token_issuer: None,
791            token_ttl: AuthTokenTtl::default(),
792        }
793    }
794
795    /// Create a mutation context with dispatch capabilities.
796    pub fn with_dispatch(
797        db_pool: sqlx::PgPool,
798        auth: AuthContext,
799        request: RequestMetadata,
800        http_client: CircuitBreakerClient,
801        job_dispatch: Option<Arc<dyn JobDispatch>>,
802        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
803    ) -> Self {
804        Self {
805            auth,
806            request,
807            db_pool,
808            http_client,
809            http_timeout: None,
810            job_dispatch,
811            workflow_dispatch,
812            env_provider: Arc::new(RealEnvProvider::new()),
813            tx: None,
814            outbox: None,
815            job_info_lookup: None,
816            token_issuer: None,
817            token_ttl: AuthTokenTtl::default(),
818        }
819    }
820
821    /// Create a mutation context with a custom environment provider.
822    pub fn with_env(
823        db_pool: sqlx::PgPool,
824        auth: AuthContext,
825        request: RequestMetadata,
826        http_client: CircuitBreakerClient,
827        job_dispatch: Option<Arc<dyn JobDispatch>>,
828        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
829        env_provider: Arc<dyn EnvProvider>,
830    ) -> Self {
831        Self {
832            auth,
833            request,
834            db_pool,
835            http_client,
836            http_timeout: None,
837            job_dispatch,
838            workflow_dispatch,
839            env_provider,
840            tx: None,
841            outbox: None,
842            job_info_lookup: None,
843            token_issuer: None,
844            token_ttl: AuthTokenTtl::default(),
845        }
846    }
847
848    /// Returns handles to transaction and outbox for the caller to commit/flush.
849    #[allow(clippy::type_complexity)]
850    pub fn with_transaction(
851        db_pool: sqlx::PgPool,
852        tx: Transaction<'static, Postgres>,
853        auth: AuthContext,
854        request: RequestMetadata,
855        http_client: CircuitBreakerClient,
856        job_info_lookup: JobInfoLookup,
857    ) -> (
858        Self,
859        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
860        Arc<Mutex<OutboxBuffer>>,
861    ) {
862        let tx_handle = Arc::new(AsyncMutex::new(tx));
863        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
864
865        let ctx = Self {
866            auth,
867            request,
868            db_pool,
869            http_client,
870            http_timeout: None,
871            job_dispatch: None,
872            workflow_dispatch: None,
873            env_provider: Arc::new(RealEnvProvider::new()),
874            tx: Some(tx_handle.clone()),
875            outbox: Some(outbox.clone()),
876            job_info_lookup: Some(job_info_lookup),
877            token_issuer: None,
878            token_ttl: AuthTokenTtl::default(),
879        };
880
881        (ctx, tx_handle, outbox)
882    }
883
884    pub fn is_transactional(&self) -> bool {
885        self.tx.is_some()
886    }
887
888    /// Acquire a connection compatible with sqlx compile-time checked macros.
889    ///
890    /// In transactional mode, returns a guard over the active transaction.
891    /// Otherwise acquires a fresh connection from the pool.
892    ///
893    /// ```ignore
894    /// let mut conn = ctx.conn().await?;
895    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
896    ///     .fetch_one(&mut *conn)
897    ///     .await?
898    /// ```
899    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
900        match &self.tx {
901            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
902            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
903        }
904    }
905
906    /// Direct pool access for operations that cannot run inside a transaction.
907    pub fn pool(&self) -> &sqlx::PgPool {
908        &self.db_pool
909    }
910
911    /// Get the HTTP client for external requests.
912    ///
913    /// Requests go through the circuit breaker automatically. When the handler
914    /// declared an explicit `timeout`, that timeout is also applied to outbound
915    /// HTTP requests unless the request overrides it.
916    pub fn http(&self) -> crate::http::HttpClient {
917        self.http_client.with_timeout(self.http_timeout)
918    }
919
920    /// Get the raw reqwest client, bypassing circuit breaker execution.
921    pub fn raw_http(&self) -> &reqwest::Client {
922        self.http_client.inner()
923    }
924
925    /// Set the default outbound HTTP request timeout for this context.
926    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
927        self.http_timeout = timeout;
928    }
929
930    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
931        self.auth.require_user_id()
932    }
933
934    pub fn require_subject(&self) -> crate::error::Result<&str> {
935        self.auth.require_subject()
936    }
937
938    /// Set the token issuer for this context.
939    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
940        self.token_issuer = Some(issuer);
941    }
942
943    /// Set the token TTL configuration (from forge.toml `[auth]`).
944    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
945        self.token_ttl = ttl;
946    }
947
948    /// Issue a signed JWT from the given claims.
949    ///
950    /// Only available when HMAC auth is configured in `forge.toml`.
951    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
952    ///
953    /// ```ignore
954    /// let claims = Claims::builder()
955    ///     .user_id(user.id)
956    ///     .duration_secs(7 * 24 * 3600)
957    ///     .build()
958    ///     .map_err(|e| ForgeError::Internal(e))?;
959    ///
960    /// let token = ctx.issue_token(&claims)?;
961    /// ```
962    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
963        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
964            crate::error::ForgeError::Internal(
965                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
966                    .into(),
967            )
968        })?;
969        issuer.sign(claims)
970    }
971
972    /// Issue an access + refresh token pair for the given user.
973    ///
974    /// Stores the refresh token hash in `forge_refresh_tokens` and returns
975    /// both tokens. Use `rotate_refresh_token()` to exchange a refresh token
976    /// for a new pair, and `revoke_refresh_token()` to invalidate one.
977    ///
978    /// TTLs come from `[auth]` in forge.toml:
979    /// - `access_token_ttl` (default "1h")
980    /// - `refresh_token_ttl` (default "30d")
981    pub async fn issue_token_pair(
982        &self,
983        user_id: Uuid,
984        roles: &[&str],
985    ) -> crate::error::Result<crate::auth::TokenPair> {
986        let issuer = self.token_issuer.clone().ok_or_else(|| {
987            crate::error::ForgeError::Internal(
988                "Token issuer not available. Configure [auth] in forge.toml".into(),
989            )
990        })?;
991        let access_ttl = self.token_ttl.access_token_secs;
992        let refresh_ttl = self.token_ttl.refresh_token_days;
993        crate::auth::tokens::issue_token_pair(
994            &self.db_pool,
995            user_id,
996            roles,
997            access_ttl,
998            refresh_ttl,
999            move |uid, r, ttl| {
1000                let claims = Claims::builder()
1001                    .subject(uid)
1002                    .roles(r.iter().map(|s| s.to_string()).collect())
1003                    .duration_secs(ttl)
1004                    .build()
1005                    .map_err(crate::error::ForgeError::Internal)?;
1006                issuer.sign(&claims)
1007            },
1008        )
1009        .await
1010    }
1011
1012    /// Rotate a refresh token: validate the old one, issue a new pair.
1013    ///
1014    /// The old token is atomically deleted and a new access + refresh pair
1015    /// is returned. Fails if the token is invalid or expired.
1016    pub async fn rotate_refresh_token(
1017        &self,
1018        old_refresh_token: &str,
1019    ) -> crate::error::Result<crate::auth::TokenPair> {
1020        let issuer = self.token_issuer.clone().ok_or_else(|| {
1021            crate::error::ForgeError::Internal(
1022                "Token issuer not available. Configure [auth] in forge.toml".into(),
1023            )
1024        })?;
1025        let access_ttl = self.token_ttl.access_token_secs;
1026        let refresh_ttl = self.token_ttl.refresh_token_days;
1027        crate::auth::tokens::rotate_refresh_token(
1028            &self.db_pool,
1029            old_refresh_token,
1030            &["user"],
1031            access_ttl,
1032            refresh_ttl,
1033            move |uid, r, ttl| {
1034                let claims = Claims::builder()
1035                    .subject(uid)
1036                    .roles(r.iter().map(|s| s.to_string()).collect())
1037                    .duration_secs(ttl)
1038                    .build()
1039                    .map_err(crate::error::ForgeError::Internal)?;
1040                issuer.sign(&claims)
1041            },
1042        )
1043        .await
1044    }
1045
1046    /// Revoke a specific refresh token (e.g., on logout).
1047    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1048        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1049    }
1050
1051    /// Revoke all refresh tokens for a user (e.g., on password change or account deletion).
1052    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1053        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1054    }
1055
1056    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
1057    pub async fn dispatch_job<T: serde::Serialize>(
1058        &self,
1059        job_type: &str,
1060        args: T,
1061    ) -> crate::error::Result<Uuid> {
1062        let args_json = serde_json::to_value(args)?;
1063
1064        // Transactional mode: buffer the job for atomic commit
1065        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1066            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1067                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1068            })?;
1069
1070            let pending = PendingJob {
1071                id: Uuid::new_v4(),
1072                job_type: job_type.to_string(),
1073                args: args_json,
1074                context: serde_json::json!({}),
1075                owner_subject: self.auth.principal_id(),
1076                priority: job_info.priority.as_i32(),
1077                max_attempts: job_info.retry.max_attempts as i32,
1078                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1079            };
1080
1081            let job_id = pending.id;
1082            outbox
1083                .lock()
1084                .expect("outbox lock poisoned")
1085                .jobs
1086                .push(pending);
1087            return Ok(job_id);
1088        }
1089
1090        // Non-transactional mode: dispatch immediately
1091        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1092            crate::error::ForgeError::Internal("Job dispatch not available".into())
1093        })?;
1094        dispatcher
1095            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1096            .await
1097    }
1098
1099    /// Dispatch a job with initial context.
1100    pub async fn dispatch_job_with_context<T: serde::Serialize>(
1101        &self,
1102        job_type: &str,
1103        args: T,
1104        context: serde_json::Value,
1105    ) -> crate::error::Result<Uuid> {
1106        let args_json = serde_json::to_value(args)?;
1107
1108        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1109            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1110                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1111            })?;
1112
1113            let pending = PendingJob {
1114                id: Uuid::new_v4(),
1115                job_type: job_type.to_string(),
1116                args: args_json,
1117                context,
1118                owner_subject: self.auth.principal_id(),
1119                priority: job_info.priority.as_i32(),
1120                max_attempts: job_info.retry.max_attempts as i32,
1121                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1122            };
1123
1124            let job_id = pending.id;
1125            outbox
1126                .lock()
1127                .expect("outbox lock poisoned")
1128                .jobs
1129                .push(pending);
1130            return Ok(job_id);
1131        }
1132
1133        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1134            crate::error::ForgeError::Internal("Job dispatch not available".into())
1135        })?;
1136        dispatcher
1137            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1138            .await
1139    }
1140
1141    /// Request cancellation for a job.
1142    pub async fn cancel_job(
1143        &self,
1144        job_id: Uuid,
1145        reason: Option<String>,
1146    ) -> crate::error::Result<bool> {
1147        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1148            crate::error::ForgeError::Internal("Job dispatch not available".into())
1149        })?;
1150        dispatcher.cancel(job_id, reason).await
1151    }
1152
1153    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1154    pub async fn start_workflow<T: serde::Serialize>(
1155        &self,
1156        workflow_name: &str,
1157        input: T,
1158    ) -> crate::error::Result<Uuid> {
1159        let input_json = serde_json::to_value(input)?;
1160
1161        // Transactional mode: buffer the workflow for atomic commit
1162        if let Some(outbox) = &self.outbox {
1163            let pending = PendingWorkflow {
1164                id: Uuid::new_v4(),
1165                workflow_name: workflow_name.to_string(),
1166                input: input_json,
1167                owner_subject: self.auth.principal_id(),
1168            };
1169
1170            let workflow_id = pending.id;
1171            outbox
1172                .lock()
1173                .expect("outbox lock poisoned")
1174                .workflows
1175                .push(pending);
1176            return Ok(workflow_id);
1177        }
1178
1179        // Non-transactional mode: start immediately
1180        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1181            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1182        })?;
1183        dispatcher
1184            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1185            .await
1186    }
1187}
1188
1189impl EnvAccess for MutationContext {
1190    fn env_provider(&self) -> &dyn EnvProvider {
1191        self.env_provider.as_ref()
1192    }
1193}
1194
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context exposes no user and rejects `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert!(anonymous.user_id().is_none());
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context reports its user ID and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted_roles = vec![String::from("admin"), String::from("user")];
        let auth = AuthContext::authenticated(expected_id, granted_roles, HashMap::new());

        assert!(auth.is_authenticated());
        assert_eq!(auth.user_id(), Some(expected_id));
        assert!(auth.require_user_id().is_ok());

        // Granted roles are visible, others are not.
        assert!(auth.has_role("admin"));
        assert!(auth.has_role("user"));
        assert!(!auth.has_role("superadmin"));

        assert!(auth.require_role("admin").is_ok());
        assert!(auth.require_role("superadmin").is_err());
    }

    /// Custom claims are retrievable by key; unknown keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let org_claim = serde_json::json!("org-123");
        let mut custom_claims = HashMap::new();
        custom_claims.insert("org_id".to_string(), org_claim.clone());

        let auth = AuthContext::authenticated(Uuid::new_v4(), vec![], custom_claims);

        assert_eq!(auth.claim("org_id"), Some(&org_claim));
        assert!(auth.claim("nonexistent").is_none());
    }

    /// Fresh metadata gets a non-empty trace ID and no client IP; an explicit
    /// trace ID is stored verbatim.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let explicit = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(explicit.trace_id, "trace-123");
    }
}