Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Token issuer for signing JWTs.
///
/// Implemented by the runtime when HMAC auth is configured.
/// Available via `ctx.issue_token()` in mutation handlers.
///
/// The `Send + Sync` supertraits let an issuer be shared behind an
/// `Arc<dyn TokenIssuer>`, which is how `MutationContext` stores it.
pub trait TokenIssuer: Send + Sync {
    /// Sign the given claims into a JWT string.
    ///
    /// # Errors
    ///
    /// Returns the crate error type when signing fails; the concrete failure
    /// modes depend on the implementation.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// Connection wrapper that implements sqlx's `Executor` trait with automatic
/// `db.query` tracing spans.
///
/// Obtain via `ctx.conn().await?` in mutation handlers.
/// Works with compile-time checked macros via `&mut conn`.
///
/// Pass `&mut conn` (a `&mut ForgeConn`) to use the traced `Executor`
/// implementation. Writing `&mut *conn` dereferences to the underlying
/// `PgConnection` instead, so the query runs without a `db.query` span.
///
/// ```ignore
/// let mut conn = ctx.conn().await?;
/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
///     .fetch_one(&mut conn)
///     .await?
/// ```
pub enum ForgeConn<'a> {
    /// A connection checked out from the pool (non-transactional mode).
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// A guard over the shared transaction. The async mutex stays locked for
    /// as long as this value is alive.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81    type Target = PgConnection;
82    fn deref(&self) -> &PgConnection {
83        match self {
84            ForgeConn::Pool(c) => c,
85            ForgeConn::Tx(g) => g,
86        }
87    }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91    fn deref_mut(&mut self) -> &mut PgConnection {
92        match self {
93            ForgeConn::Pool(c) => c,
94            ForgeConn::Tx(g) => g,
95        }
96    }
97}
98
/// Pool wrapper that adds `db.query` tracing spans to database operations.
///
/// Spans are created by `execute`, `fetch_one`, `fetch_optional` and
/// `fetch_all`. `fetch_many`, `prepare_with` and `describe` are forwarded to
/// the pool without a span.
///
/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
/// so it works as a drop-in replacement for `&PgPool` with compile-time
/// checked macros (`query!`, `query_as!`).
///
/// ```ignore
/// sqlx::query_as!(User, "SELECT * FROM users")
///     .fetch_all(ctx.db())
///     .await?
/// ```
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_tuple("ForgeDb").finish()
116    }
117}
118
119impl ForgeDb {
120    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
121    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122        Self(pool.clone())
123    }
124}
125
/// Classify a SQL statement by its leading keyword for the
/// `db.operation.name` span field.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively against `SELECT`, `INSERT`, `UPDATE` and `DELETE`.
/// Anything else (including input shorter than six bytes) yields `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    const KEYWORDS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    // Work on bytes so a multi-byte character near the front cannot cause a
    // char-boundary panic when slicing.
    let head = sql.trim_start().as_bytes();

    KEYWORDS
        .iter()
        .copied()
        .find(|keyword| {
            head.get(..keyword.len())
                .map_or(false, |prefix| prefix.eq_ignore_ascii_case(keyword.as_bytes()))
        })
        .unwrap_or("OTHER")
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138    type Database = Postgres;
139
140    fn fetch_many<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144    where
145        E: sqlx::Execute<'q, Postgres> + 'q,
146    {
147        (&self.0).fetch_many(query)
148    }
149
150    fn fetch_optional<'e, 'q: 'e, E>(
151        self,
152        query: E,
153    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154    where
155        E: sqlx::Execute<'q, Postgres> + 'q,
156    {
157        let op = sql_operation(query.sql());
158        let span =
159            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160        Box::pin(
161            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162        )
163    }
164
165    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166    where
167        E: sqlx::Execute<'q, Postgres> + 'q,
168    {
169        let op = sql_operation(query.sql());
170        let span =
171            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173    }
174
175    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176    where
177        E: sqlx::Execute<'q, Postgres> + 'q,
178    {
179        let op = sql_operation(query.sql());
180        let span =
181            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183    }
184
185    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186    where
187        E: sqlx::Execute<'q, Postgres> + 'q,
188    {
189        let op = sql_operation(query.sql());
190        let span =
191            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193    }
194
195    fn prepare_with<'e, 'q: 'e>(
196        self,
197        sql: &'q str,
198        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201    }
202
203    fn describe<'e, 'q: 'e>(
204        self,
205        sql: &'q str,
206    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208    }
209}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        match self {
214            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216        }
217    }
218}
219
220impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
221    type Database = Postgres;
222
223    fn fetch_many<'e, 'q: 'e, E>(
224        self,
225        query: E,
226    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
227    where
228        'c: 'e,
229        E: sqlx::Execute<'q, Postgres> + 'q,
230    {
231        let conn: &'e mut PgConnection = &mut *self;
232        conn.fetch_many(query)
233    }
234
235    fn fetch_optional<'e, 'q: 'e, E>(
236        self,
237        query: E,
238    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
239    where
240        'c: 'e,
241        E: sqlx::Execute<'q, Postgres> + 'q,
242    {
243        let op = sql_operation(query.sql());
244        let span =
245            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
246        let conn: &'e mut PgConnection = &mut *self;
247        Box::pin(conn.fetch_optional(query).instrument(span))
248    }
249
250    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
251    where
252        'c: 'e,
253        E: sqlx::Execute<'q, Postgres> + 'q,
254    {
255        let op = sql_operation(query.sql());
256        let span =
257            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
258        let conn: &'e mut PgConnection = &mut *self;
259        Box::pin(conn.execute(query).instrument(span))
260    }
261
262    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
263    where
264        'c: 'e,
265        E: sqlx::Execute<'q, Postgres> + 'q,
266    {
267        let op = sql_operation(query.sql());
268        let span =
269            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
270        let conn: &'e mut PgConnection = &mut *self;
271        Box::pin(conn.fetch_all(query).instrument(span))
272    }
273
274    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
275    where
276        'c: 'e,
277        E: sqlx::Execute<'q, Postgres> + 'q,
278    {
279        let op = sql_operation(query.sql());
280        let span =
281            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
282        let conn: &'e mut PgConnection = &mut *self;
283        Box::pin(conn.fetch_one(query).instrument(span))
284    }
285
286    fn prepare_with<'e, 'q: 'e>(
287        self,
288        sql: &'q str,
289        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
290    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
291    where
292        'c: 'e,
293    {
294        let conn: &'e mut PgConnection = &mut *self;
295        conn.prepare_with(sql, parameters)
296    }
297
298    fn describe<'e, 'q: 'e>(
299        self,
300        sql: &'q str,
301    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
302    where
303        'c: 'e,
304    {
305        let conn: &'e mut PgConnection = &mut *self;
306        conn.describe(sql)
307    }
308}
309
/// A job buffered by `MutationContext::dispatch_job` while a mutation runs in
/// transactional mode.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Job ID, generated with `Uuid::new_v4()` when the job is buffered.
    pub id: Uuid,
    /// Name of the job type, as passed to `dispatch_job`.
    pub job_type: String,
    /// Job arguments serialized to JSON.
    pub args: serde_json::Value,
    /// Extra JSON context; `dispatch_job` stores an empty object here.
    pub context: serde_json::Value,
    /// Principal that dispatched the job (`AuthContext::principal_id()`), if any.
    pub owner_subject: Option<String>,
    /// Priority taken from the registered job info.
    pub priority: i32,
    /// Retry limit taken from the registered job info.
    pub max_attempts: i32,
    /// NOTE(review): presumably restricts which workers may run the job;
    /// confirm against the code that fills this field.
    pub worker_capability: Option<String>,
}
321
/// A workflow start request held in an [`OutboxBuffer`].
///
/// NOTE(review): presumably populated by the workflow-start path of
/// `MutationContext` in transactional mode, mirroring [`PendingJob`]; the
/// code that builds it is not visible here, so confirm.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Workflow run ID.
    pub id: Uuid,
    /// Name of the workflow to start.
    pub workflow_name: String,
    /// Workflow input as JSON.
    pub input: serde_json::Value,
    /// Principal that started the workflow, if any.
    pub owner_subject: Option<String>,
}
329
/// Jobs and workflows dispatched during a transactional mutation.
///
/// `MutationContext::with_transaction` creates an empty buffer and hands a
/// shared handle back to the caller, which is expected to flush it once the
/// transaction has committed.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Buffered background jobs.
    pub jobs: Vec<PendingJob>,
    /// Buffered workflow starts.
    pub workflows: Vec<PendingWorkflow>,
}
335
/// Authentication context available to all functions.
///
/// `authenticated` and `user_id` are independent: a context built with
/// `authenticated_without_uuid` is authenticated but has no UUID user ID.
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// The authenticated user ID (if any).
    user_id: Option<Uuid>,
    /// User roles.
    roles: Vec<String>,
    /// Custom claims from JWT. The raw subject is read from the `"sub"` key
    /// and the tenant from the `"tenant_id"` key.
    claims: HashMap<String, serde_json::Value>,
    /// Whether the request is authenticated.
    authenticated: bool,
}
348
349impl AuthContext {
350    /// Create an unauthenticated context.
351    pub fn unauthenticated() -> Self {
352        Self {
353            user_id: None,
354            roles: Vec::new(),
355            claims: HashMap::new(),
356            authenticated: false,
357        }
358    }
359
360    /// Create an authenticated context with a UUID user ID.
361    pub fn authenticated(
362        user_id: Uuid,
363        roles: Vec<String>,
364        claims: HashMap<String, serde_json::Value>,
365    ) -> Self {
366        Self {
367            user_id: Some(user_id),
368            roles,
369            claims,
370            authenticated: true,
371        }
372    }
373
374    /// Create an authenticated context without requiring a UUID user ID.
375    ///
376    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
377    /// Clerk). The raw subject string is available via `subject()` method
378    /// from the "sub" claim.
379    pub fn authenticated_without_uuid(
380        roles: Vec<String>,
381        claims: HashMap<String, serde_json::Value>,
382    ) -> Self {
383        Self {
384            user_id: None,
385            roles,
386            claims,
387            authenticated: true,
388        }
389    }
390
391    /// Check if the user is authenticated.
392    pub fn is_authenticated(&self) -> bool {
393        self.authenticated
394    }
395
396    /// Get the user ID if authenticated.
397    pub fn user_id(&self) -> Option<Uuid> {
398        self.user_id
399    }
400
401    /// Get the user ID, returning an error if not authenticated.
402    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403        self.user_id
404            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405    }
406
407    /// Check if the user has a specific role.
408    pub fn has_role(&self, role: &str) -> bool {
409        self.roles.iter().any(|r| r == role)
410    }
411
412    /// Require a specific role, returning an error if not present.
413    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414        if self.has_role(role) {
415            Ok(())
416        } else {
417            Err(crate::error::ForgeError::Forbidden(format!(
418                "Required role '{}' not present",
419                role
420            )))
421        }
422    }
423
424    /// Get a custom claim value.
425    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426        self.claims.get(key)
427    }
428
429    /// Get all custom claims.
430    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431        &self.claims
432    }
433
434    /// Get all roles.
435    pub fn roles(&self) -> &[String] {
436        &self.roles
437    }
438
439    /// Get the raw subject claim.
440    ///
441    /// This works with any provider's subject format (UUID, email, custom ID).
442    /// For providers like Firebase or Clerk that don't use UUIDs, use this
443    /// instead of `user_id()`.
444    pub fn subject(&self) -> Option<&str> {
445        self.claims.get("sub").and_then(|v| v.as_str())
446    }
447
448    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
449    pub fn require_subject(&self) -> crate::error::Result<&str> {
450        if !self.authenticated {
451            return Err(crate::error::ForgeError::Unauthorized(
452                "Authentication required".to_string(),
453            ));
454        }
455        self.subject().ok_or_else(|| {
456            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457        })
458    }
459
460    /// Get a stable principal identifier for access control and cache scoping.
461    ///
462    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
463    pub fn principal_id(&self) -> Option<String> {
464        self.subject()
465            .map(ToString::to_string)
466            .or_else(|| self.user_id.map(|id| id.to_string()))
467    }
468
469    /// Check whether this principal should be treated as privileged admin.
470    pub fn is_admin(&self) -> bool {
471        self.roles.iter().any(|r| r == "admin")
472    }
473
474    /// Get the tenant ID from the JWT claims, if present.
475    ///
476    /// Looks for a `tenant_id` claim in the token and attempts to parse it as
477    /// a UUID. Returns `None` if the claim is absent or not a valid UUID.
478    pub fn tenant_id(&self) -> Option<uuid::Uuid> {
479        self.claims
480            .get("tenant_id")
481            .and_then(|v| v.as_str())
482            .and_then(|s| uuid::Uuid::parse_str(s).ok())
483    }
484}
485
/// Request metadata available to all functions.
///
/// The constructors fill in `request_id`, `trace_id` and `timestamp`; the
/// optional fields start as `None` and are public so callers can set them.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Unique request ID for tracing.
    pub request_id: Uuid,
    /// Trace ID for distributed tracing.
    pub trace_id: String,
    /// Client IP address.
    pub client_ip: Option<String>,
    /// User agent string.
    pub user_agent: Option<String>,
    /// Correlation ID linking frontend events to this backend call.
    pub correlation_id: Option<String>,
    /// Request timestamp (UTC), captured when the metadata is constructed.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
502
503impl RequestMetadata {
504    /// Create new request metadata.
505    pub fn new() -> Self {
506        Self {
507            request_id: Uuid::new_v4(),
508            trace_id: Uuid::new_v4().to_string(),
509            client_ip: None,
510            user_agent: None,
511            correlation_id: None,
512            timestamp: chrono::Utc::now(),
513        }
514    }
515
516    /// Create with a specific trace ID.
517    pub fn with_trace_id(trace_id: String) -> Self {
518        Self {
519            request_id: Uuid::new_v4(),
520            trace_id,
521            client_ip: None,
522            user_agent: None,
523            correlation_id: None,
524            timestamp: chrono::Utc::now(),
525        }
526    }
527}
528
529impl Default for RequestMetadata {
530    fn default() -> Self {
531        Self::new()
532    }
533}
534
/// Context for query functions (read-only database access).
///
/// Gives handlers the caller's auth state, request metadata, a traced
/// database handle (`db()`), and environment access via `EnvAccess`.
pub struct QueryContext {
    /// Authentication context.
    pub auth: AuthContext,
    /// Request metadata.
    pub request: RequestMetadata,
    /// Database pool for read operations.
    db_pool: sqlx::PgPool,
    /// Environment variable provider (`RealEnvProvider` unless the context is
    /// built with `with_env`).
    env_provider: Arc<dyn EnvProvider>,
}
546
547impl QueryContext {
548    /// Create a new query context.
549    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
550        Self {
551            auth,
552            request,
553            db_pool,
554            env_provider: Arc::new(RealEnvProvider::new()),
555        }
556    }
557
558    /// Create a query context with a custom environment provider.
559    pub fn with_env(
560        db_pool: sqlx::PgPool,
561        auth: AuthContext,
562        request: RequestMetadata,
563        env_provider: Arc<dyn EnvProvider>,
564    ) -> Self {
565        Self {
566            auth,
567            request,
568            db_pool,
569            env_provider,
570        }
571    }
572
573    /// Database handle with automatic `db.query` tracing spans.
574    ///
575    /// Works directly with sqlx compile-time checked macros:
576    /// ```ignore
577    /// sqlx::query_as!(User, "SELECT * FROM users")
578    ///     .fetch_all(ctx.db())
579    ///     .await?
580    /// ```
581    pub fn db(&self) -> ForgeDb {
582        ForgeDb(self.db_pool.clone())
583    }
584
585    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
586    pub fn user_id(&self) -> crate::error::Result<Uuid> {
587        self.auth.require_user_id()
588    }
589
590    /// Get the tenant ID from JWT claims, if present.
591    pub fn tenant_id(&self) -> Option<Uuid> {
592        self.auth.tenant_id()
593    }
594}
595
596impl EnvAccess for QueryContext {
597    fn env_provider(&self) -> &dyn EnvProvider {
598        self.env_provider.as_ref()
599    }
600}
601
/// Callback type for looking up job info by name.
///
/// Returns `None` when no job is registered under the given name. Shared via
/// `Arc`, and `Send + Sync` so it can be called from any task.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
604
/// Token TTL configuration resolved from `[auth]` in forge.toml.
///
/// Note that the two fields use different units (seconds vs. days).
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access token lifetime in seconds (default 3600).
    pub access_token_secs: i64,
    /// Refresh token lifetime in days (default 30).
    pub refresh_token_days: i64,
}
613
614impl Default for AuthTokenTtl {
615    fn default() -> Self {
616        Self {
617            access_token_secs: 3600,
618            refresh_token_days: 30,
619        }
620    }
621}
622
/// Context for mutation functions (transactional database access).
///
/// `tx`, `outbox` and `job_info_lookup` are populated together by
/// `with_transaction`; the other constructors leave all three as `None`.
pub struct MutationContext {
    /// Authentication context.
    pub auth: AuthContext,
    /// Request metadata.
    pub request: RequestMetadata,
    /// Database pool for transactional operations.
    db_pool: sqlx::PgPool,
    /// HTTP client with circuit breaker for external requests.
    http_client: CircuitBreakerClient,
    /// Default timeout for outbound HTTP requests made through the
    /// circuit-breaker client. `None` means unlimited.
    http_timeout: Option<Duration>,
    /// Optional job dispatcher for dispatching background jobs.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Optional workflow dispatcher for starting workflows.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Environment variable provider.
    env_provider: Arc<dyn EnvProvider>,
    /// Transaction handle for transactional mutations. Behind an async mutex
    /// so a connection guard can be held across `.await` points.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Outbox buffer for jobs/workflows dispatched during transaction.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Job info lookup for transactional dispatch.
    job_info_lookup: Option<JobInfoLookup>,
    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Token TTL config from forge.toml.
    token_ttl: AuthTokenTtl,
}
653
654impl MutationContext {
655    /// Create a new mutation context.
656    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657        Self {
658            auth,
659            request,
660            db_pool,
661            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
662            http_timeout: None,
663            job_dispatch: None,
664            workflow_dispatch: None,
665            env_provider: Arc::new(RealEnvProvider::new()),
666            tx: None,
667            outbox: None,
668            job_info_lookup: None,
669            token_issuer: None,
670            token_ttl: AuthTokenTtl::default(),
671        }
672    }
673
674    /// Create a mutation context with dispatch capabilities.
675    pub fn with_dispatch(
676        db_pool: sqlx::PgPool,
677        auth: AuthContext,
678        request: RequestMetadata,
679        http_client: CircuitBreakerClient,
680        job_dispatch: Option<Arc<dyn JobDispatch>>,
681        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
682    ) -> Self {
683        Self {
684            auth,
685            request,
686            db_pool,
687            http_client,
688            http_timeout: None,
689            job_dispatch,
690            workflow_dispatch,
691            env_provider: Arc::new(RealEnvProvider::new()),
692            tx: None,
693            outbox: None,
694            job_info_lookup: None,
695            token_issuer: None,
696            token_ttl: AuthTokenTtl::default(),
697        }
698    }
699
700    /// Create a mutation context with a custom environment provider.
701    pub fn with_env(
702        db_pool: sqlx::PgPool,
703        auth: AuthContext,
704        request: RequestMetadata,
705        http_client: CircuitBreakerClient,
706        job_dispatch: Option<Arc<dyn JobDispatch>>,
707        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
708        env_provider: Arc<dyn EnvProvider>,
709    ) -> Self {
710        Self {
711            auth,
712            request,
713            db_pool,
714            http_client,
715            http_timeout: None,
716            job_dispatch,
717            workflow_dispatch,
718            env_provider,
719            tx: None,
720            outbox: None,
721            job_info_lookup: None,
722            token_issuer: None,
723            token_ttl: AuthTokenTtl::default(),
724        }
725    }
726
727    /// Returns handles to transaction and outbox for the caller to commit/flush.
728    #[allow(clippy::type_complexity)]
729    pub fn with_transaction(
730        db_pool: sqlx::PgPool,
731        tx: Transaction<'static, Postgres>,
732        auth: AuthContext,
733        request: RequestMetadata,
734        http_client: CircuitBreakerClient,
735        job_info_lookup: JobInfoLookup,
736    ) -> (
737        Self,
738        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
739        Arc<Mutex<OutboxBuffer>>,
740    ) {
741        let tx_handle = Arc::new(AsyncMutex::new(tx));
742        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
743
744        let ctx = Self {
745            auth,
746            request,
747            db_pool,
748            http_client,
749            http_timeout: None,
750            job_dispatch: None,
751            workflow_dispatch: None,
752            env_provider: Arc::new(RealEnvProvider::new()),
753            tx: Some(tx_handle.clone()),
754            outbox: Some(outbox.clone()),
755            job_info_lookup: Some(job_info_lookup),
756            token_issuer: None,
757            token_ttl: AuthTokenTtl::default(),
758        };
759
760        (ctx, tx_handle, outbox)
761    }
762
763    pub fn is_transactional(&self) -> bool {
764        self.tx.is_some()
765    }
766
767    /// Acquire a connection compatible with sqlx compile-time checked macros.
768    ///
769    /// In transactional mode, returns a guard over the active transaction.
770    /// Otherwise acquires a fresh connection from the pool.
771    ///
772    /// ```ignore
773    /// let mut conn = ctx.conn().await?;
774    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
775    ///     .fetch_one(&mut *conn)
776    ///     .await?
777    /// ```
778    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
779        match &self.tx {
780            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
781            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
782        }
783    }
784
785    /// Direct pool access for operations that cannot run inside a transaction.
786    pub fn pool(&self) -> &sqlx::PgPool {
787        &self.db_pool
788    }
789
790    /// Get the HTTP client for external requests.
791    ///
792    /// Requests go through the circuit breaker automatically. When the handler
793    /// declared an explicit `timeout`, that timeout is also applied to outbound
794    /// HTTP requests unless the request overrides it.
795    pub fn http(&self) -> crate::http::HttpClient {
796        self.http_client.with_timeout(self.http_timeout)
797    }
798
799    /// Get the raw reqwest client, bypassing circuit breaker execution.
800    pub fn raw_http(&self) -> &reqwest::Client {
801        self.http_client.inner()
802    }
803
804    /// Set the default outbound HTTP request timeout for this context.
805    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
806        self.http_timeout = timeout;
807    }
808
809    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
810    pub fn user_id(&self) -> crate::error::Result<Uuid> {
811        self.auth.require_user_id()
812    }
813
814    /// Get the tenant ID from JWT claims, if present.
815    pub fn tenant_id(&self) -> Option<Uuid> {
816        self.auth.tenant_id()
817    }
818
819    /// Set the token issuer for this context.
820    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
821        self.token_issuer = Some(issuer);
822    }
823
824    /// Set the token TTL configuration (from forge.toml `[auth]`).
825    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
826        self.token_ttl = ttl;
827    }
828
829    /// Issue a signed JWT from the given claims.
830    ///
831    /// Only available when HMAC auth is configured in `forge.toml`.
832    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
833    ///
834    /// ```ignore
835    /// let claims = Claims::builder()
836    ///     .user_id(user.id)
837    ///     .duration_secs(7 * 24 * 3600)
838    ///     .build()
839    ///     .map_err(|e| ForgeError::Internal(e))?;
840    ///
841    /// let token = ctx.issue_token(&claims)?;
842    /// ```
843    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
844        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
845            crate::error::ForgeError::Internal(
846                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
847                    .into(),
848            )
849        })?;
850        issuer.sign(claims)
851    }
852
853    /// Issue an access + refresh token pair for the given user.
854    ///
855    /// Stores the refresh token hash in `forge_refresh_tokens` and returns
856    /// both tokens. Use `rotate_refresh_token()` to exchange a refresh token
857    /// for a new pair, and `revoke_refresh_token()` to invalidate one.
858    ///
859    /// TTLs come from `[auth]` in forge.toml:
860    /// - `access_token_ttl` (default "1h")
861    /// - `refresh_token_ttl` (default "30d")
862    pub async fn issue_token_pair(
863        &self,
864        user_id: Uuid,
865        roles: &[&str],
866    ) -> crate::error::Result<crate::auth::TokenPair> {
867        let issuer = self.token_issuer.clone().ok_or_else(|| {
868            crate::error::ForgeError::Internal(
869                "Token issuer not available. Configure [auth] in forge.toml".into(),
870            )
871        })?;
872        let access_ttl = self.token_ttl.access_token_secs;
873        let refresh_ttl = self.token_ttl.refresh_token_days;
874        crate::auth::tokens::issue_token_pair(
875            &self.db_pool,
876            user_id,
877            roles,
878            access_ttl,
879            refresh_ttl,
880            move |uid, r, ttl| {
881                let claims = Claims::builder()
882                    .subject(uid)
883                    .roles(r.iter().map(|s| s.to_string()).collect())
884                    .duration_secs(ttl)
885                    .build()
886                    .map_err(crate::error::ForgeError::Internal)?;
887                issuer.sign(&claims)
888            },
889        )
890        .await
891    }
892
893    /// Rotate a refresh token: validate the old one, issue a new pair.
894    ///
895    /// The old token is atomically deleted and a new access + refresh pair
896    /// is returned. Fails if the token is invalid or expired.
897    pub async fn rotate_refresh_token(
898        &self,
899        old_refresh_token: &str,
900    ) -> crate::error::Result<crate::auth::TokenPair> {
901        let issuer = self.token_issuer.clone().ok_or_else(|| {
902            crate::error::ForgeError::Internal(
903                "Token issuer not available. Configure [auth] in forge.toml".into(),
904            )
905        })?;
906        let access_ttl = self.token_ttl.access_token_secs;
907        let refresh_ttl = self.token_ttl.refresh_token_days;
908        crate::auth::tokens::rotate_refresh_token(
909            &self.db_pool,
910            old_refresh_token,
911            &["user"],
912            access_ttl,
913            refresh_ttl,
914            move |uid, r, ttl| {
915                let claims = Claims::builder()
916                    .subject(uid)
917                    .roles(r.iter().map(|s| s.to_string()).collect())
918                    .duration_secs(ttl)
919                    .build()
920                    .map_err(crate::error::ForgeError::Internal)?;
921                issuer.sign(&claims)
922            },
923        )
924        .await
925    }
926
927    /// Revoke a specific refresh token (e.g., on logout).
928    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
929        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
930    }
931
932    /// Revoke all refresh tokens for a user (e.g., on password change or account deletion).
933    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
934        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
935    }
936
937    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
938    pub async fn dispatch_job<T: serde::Serialize>(
939        &self,
940        job_type: &str,
941        args: T,
942    ) -> crate::error::Result<Uuid> {
943        let args_json = serde_json::to_value(args)?;
944
945        // Transactional mode: buffer the job for atomic commit
946        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
947            let job_info = job_info_lookup(job_type).ok_or_else(|| {
948                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
949            })?;
950
951            let pending = PendingJob {
952                id: Uuid::new_v4(),
953                job_type: job_type.to_string(),
954                args: args_json,
955                context: serde_json::json!({}),
956                owner_subject: self.auth.principal_id(),
957                priority: job_info.priority.as_i32(),
958                max_attempts: job_info.retry.max_attempts as i32,
959                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
960            };
961
962            let job_id = pending.id;
963            outbox
964                .lock()
965                .expect("outbox lock poisoned")
966                .jobs
967                .push(pending);
968            return Ok(job_id);
969        }
970
971        // Non-transactional mode: dispatch immediately
972        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
973            crate::error::ForgeError::Internal("Job dispatch not available".into())
974        })?;
975        dispatcher
976            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
977            .await
978    }
979
980    /// Dispatch a job with initial context.
981    pub async fn dispatch_job_with_context<T: serde::Serialize>(
982        &self,
983        job_type: &str,
984        args: T,
985        context: serde_json::Value,
986    ) -> crate::error::Result<Uuid> {
987        let args_json = serde_json::to_value(args)?;
988
989        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
990            let job_info = job_info_lookup(job_type).ok_or_else(|| {
991                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
992            })?;
993
994            let pending = PendingJob {
995                id: Uuid::new_v4(),
996                job_type: job_type.to_string(),
997                args: args_json,
998                context,
999                owner_subject: self.auth.principal_id(),
1000                priority: job_info.priority.as_i32(),
1001                max_attempts: job_info.retry.max_attempts as i32,
1002                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1003            };
1004
1005            let job_id = pending.id;
1006            outbox
1007                .lock()
1008                .expect("outbox lock poisoned")
1009                .jobs
1010                .push(pending);
1011            return Ok(job_id);
1012        }
1013
1014        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1015            crate::error::ForgeError::Internal("Job dispatch not available".into())
1016        })?;
1017        dispatcher
1018            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1019            .await
1020    }
1021
1022    /// Request cancellation for a job.
1023    pub async fn cancel_job(
1024        &self,
1025        job_id: Uuid,
1026        reason: Option<String>,
1027    ) -> crate::error::Result<bool> {
1028        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1029            crate::error::ForgeError::Internal("Job dispatch not available".into())
1030        })?;
1031        dispatcher.cancel(job_id, reason).await
1032    }
1033
1034    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1035    pub async fn start_workflow<T: serde::Serialize>(
1036        &self,
1037        workflow_name: &str,
1038        input: T,
1039    ) -> crate::error::Result<Uuid> {
1040        let input_json = serde_json::to_value(input)?;
1041
1042        // Transactional mode: buffer the workflow for atomic commit
1043        if let Some(outbox) = &self.outbox {
1044            let pending = PendingWorkflow {
1045                id: Uuid::new_v4(),
1046                workflow_name: workflow_name.to_string(),
1047                input: input_json,
1048                owner_subject: self.auth.principal_id(),
1049            };
1050
1051            let workflow_id = pending.id;
1052            outbox
1053                .lock()
1054                .expect("outbox lock poisoned")
1055                .workflows
1056                .push(pending);
1057            return Ok(workflow_id);
1058        }
1059
1060        // Non-transactional mode: start immediately
1061        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1062            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1063        })?;
1064        dispatcher
1065            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1066            .await
1067    }
1068}
1069
1070impl EnvAccess for MutationContext {
1071    fn env_provider(&self) -> &dyn EnvProvider {
1072        self.env_provider.as_ref()
1073    }
1074}
1075
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context reports no user and rejects `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert_eq!(anonymous.user_id(), None);
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context exposes its user id and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted_roles = vec![String::from("admin"), String::from("user")];
        let ctx = AuthContext::authenticated(expected_id, granted_roles, HashMap::new());

        // Identity checks.
        assert!(ctx.is_authenticated());
        assert_eq!(ctx.user_id(), Some(expected_id));
        assert!(ctx.require_user_id().is_ok());

        // Role membership.
        assert!(ctx.has_role("admin"));
        assert!(ctx.has_role("user"));
        assert!(!ctx.has_role("superadmin"));

        // Role enforcement.
        assert!(ctx.require_role("admin").is_ok());
        assert!(ctx.require_role("superadmin").is_err());
    }

    /// Custom claims are retrievable by key; unknown keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let org_claim = serde_json::json!("org-123");
        let claims = HashMap::from([("org_id".to_string(), org_claim.clone())]);

        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);

        assert_eq!(ctx.claim("org_id"), Some(&org_claim));
        assert!(ctx.claim("nonexistent").is_none());
    }

    /// Fresh metadata gets a non-empty trace id and no client IP; an explicit
    /// trace id is stored verbatim.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let explicit = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(explicit.trace_id, "trace-123");
    }
}