Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
54/// Token issuer for signing JWTs.
55///
56/// Implemented by the runtime when HMAC auth is configured.
57/// Available via `ctx.issue_token()` in mutation handlers.
58pub trait TokenIssuer: Send + Sync {
59    /// Sign the given claims into a JWT string.
60    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
61}
62
63/// Connection wrapper that implements sqlx's `Executor` trait with automatic
64/// `db.query` tracing spans.
65///
66/// Obtain via `ctx.conn().await?` in mutation handlers.
67/// Works with compile-time checked macros via `&mut conn`.
68///
69/// ```ignore
70/// let mut conn = ctx.conn().await?;
71/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
72///     .fetch_one(&mut *conn)
73///     .await?
74/// ```
75pub enum ForgeConn<'a> {
76    Pool(sqlx::pool::PoolConnection<Postgres>),
77    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
78}
79
80impl std::ops::Deref for ForgeConn<'_> {
81    type Target = PgConnection;
82    fn deref(&self) -> &PgConnection {
83        match self {
84            ForgeConn::Pool(c) => c,
85            ForgeConn::Tx(g) => g,
86        }
87    }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91    fn deref_mut(&mut self) -> &mut PgConnection {
92        match self {
93            ForgeConn::Pool(c) => c,
94            ForgeConn::Tx(g) => g,
95        }
96    }
97}
98
99/// Pool wrapper that adds `db.query` tracing spans to every database operation.
100///
101/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
102/// so it works as a drop-in replacement for `&PgPool` with compile-time
103/// checked macros (`query!`, `query_as!`).
104///
105/// ```ignore
106/// sqlx::query_as!(User, "SELECT * FROM users")
107///     .fetch_all(ctx.db())
108///     .await?
109/// ```
110#[derive(Clone)]
111pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_tuple("ForgeDb").finish()
116    }
117}
118
119impl ForgeDb {
120    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
121    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122        Self(pool.clone())
123    }
124}
125
/// Classify a SQL statement by its leading verb for the `db.operation.name`
/// span attribute.
///
/// Leading whitespace and leading SQL comments (`-- ...` line comments and
/// `/* ... */` block comments, which may nest in PostgreSQL) are skipped
/// first, so a statement that starts with a header comment is still
/// classified by its verb.
///
/// Returns `"SELECT"`, `"INSERT"`, `"UPDATE"` or `"DELETE"` (matched
/// case-insensitively), and `"OTHER"` for everything else, including empty
/// input, comment-only input and statements such as `WITH ...`.
fn sql_operation(sql: &str) -> &'static str {
    let bytes = skip_leading_sql_comments(sql).as_bytes();
    // Every recognised verb is six bytes long. Comparing raw bytes means a
    // multi-byte character near the start cannot cause a slicing panic.
    match bytes.get(..6) {
        Some(prefix) if prefix.eq_ignore_ascii_case(b"select") => "SELECT",
        Some(prefix) if prefix.eq_ignore_ascii_case(b"insert") => "INSERT",
        Some(prefix) if prefix.eq_ignore_ascii_case(b"update") => "UPDATE",
        Some(prefix) if prefix.eq_ignore_ascii_case(b"delete") => "DELETE",
        _ => "OTHER",
    }
}

/// Return `sql` with leading whitespace and leading SQL comments removed.
///
/// An unterminated block comment consumes the rest of the input.
fn skip_leading_sql_comments(sql: &str) -> &str {
    let mut rest = sql.trim_start();
    loop {
        if let Some(after) = rest.strip_prefix("--") {
            // A line comment runs up to the next newline or the end of input.
            rest = after
                .split_once('\n')
                .map_or("", |(_, tail)| tail)
                .trim_start();
        } else if let Some(after) = rest.strip_prefix("/*") {
            let body = after.as_bytes();
            let mut depth = 1usize;
            let mut pos = 0usize;
            while depth > 0 && pos < body.len() {
                if body[pos..].starts_with(b"/*") {
                    depth += 1;
                    pos += 2;
                } else if body[pos..].starts_with(b"*/") {
                    depth -= 1;
                    pos += 2;
                } else {
                    pos += 1;
                }
            }
            // The loop stops either right after an ASCII `*/` or at the end
            // of input, so `pos` is always a valid char boundary.
            rest = after[pos..].trim_start();
        } else {
            return rest;
        }
    }
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138    type Database = Postgres;
139
140    fn fetch_many<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144    where
145        E: sqlx::Execute<'q, Postgres> + 'q,
146    {
147        (&self.0).fetch_many(query)
148    }
149
150    fn fetch_optional<'e, 'q: 'e, E>(
151        self,
152        query: E,
153    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154    where
155        E: sqlx::Execute<'q, Postgres> + 'q,
156    {
157        let op = sql_operation(query.sql());
158        let span =
159            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160        Box::pin(
161            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162        )
163    }
164
165    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166    where
167        E: sqlx::Execute<'q, Postgres> + 'q,
168    {
169        let op = sql_operation(query.sql());
170        let span =
171            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173    }
174
175    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176    where
177        E: sqlx::Execute<'q, Postgres> + 'q,
178    {
179        let op = sql_operation(query.sql());
180        let span =
181            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183    }
184
185    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186    where
187        E: sqlx::Execute<'q, Postgres> + 'q,
188    {
189        let op = sql_operation(query.sql());
190        let span =
191            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193    }
194
195    fn prepare_with<'e, 'q: 'e>(
196        self,
197        sql: &'q str,
198        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201    }
202
203    fn describe<'e, 'q: 'e>(
204        self,
205        sql: &'q str,
206    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208    }
209}
210
211/// Abstraction over pool and transaction connections.
212///
213/// Allows shared helper functions to work with any context type.
214/// Obtain via `ctx.db_conn()` on pool-based contexts (queries, jobs, crons,
215/// daemons, webhooks, MCP tools) or via `ctx.db()` on `MutationContext`.
216///
217/// # Example
218///
219/// ```ignore
220/// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> {
221///     db.fetch_all(sqlx::query_as!(Item, "SELECT * FROM items ORDER BY created_at DESC"))
222///         .await
223///         .map_err(Into::into)
224/// }
225/// ```
226pub enum DbConn<'a> {
227    /// Direct pool connection (queries, jobs, crons, daemons, webhooks, MCP).
228    Pool(sqlx::PgPool),
229    /// Transaction handle (transactional mutations).
230    Transaction(
231        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
232        &'a sqlx::PgPool,
233    ),
234}
235
236impl DbConn<'_> {
237    /// Fetch exactly one row.
238    pub async fn fetch_one<'q, O>(
239        &self,
240        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
241    ) -> sqlx::Result<O>
242    where
243        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
244    {
245        match self {
246            DbConn::Pool(pool) => query.fetch_one(pool).await,
247            DbConn::Transaction(tx, _) => {
248                let mut guard = tx.lock().await;
249                query.fetch_one(&mut **guard).await
250            }
251        }
252    }
253
254    /// Fetch zero or one row.
255    pub async fn fetch_optional<'q, O>(
256        &self,
257        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
258    ) -> sqlx::Result<Option<O>>
259    where
260        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
261    {
262        match self {
263            DbConn::Pool(pool) => query.fetch_optional(pool).await,
264            DbConn::Transaction(tx, _) => {
265                let mut guard = tx.lock().await;
266                query.fetch_optional(&mut **guard).await
267            }
268        }
269    }
270
271    /// Fetch all matching rows.
272    pub async fn fetch_all<'q, O>(
273        &self,
274        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
275    ) -> sqlx::Result<Vec<O>>
276    where
277        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
278    {
279        match self {
280            DbConn::Pool(pool) => query.fetch_all(pool).await,
281            DbConn::Transaction(tx, _) => {
282                let mut guard = tx.lock().await;
283                query.fetch_all(&mut **guard).await
284            }
285        }
286    }
287
288    /// Execute a statement (INSERT, UPDATE, DELETE).
289    pub async fn execute<'q>(
290        &self,
291        query: sqlx::query::Query<'q, Postgres, sqlx::postgres::PgArguments>,
292    ) -> sqlx::Result<PgQueryResult> {
293        match self {
294            DbConn::Pool(pool) => query.execute(pool).await,
295            DbConn::Transaction(tx, _) => {
296                let mut guard = tx.lock().await;
297                query.execute(&mut **guard).await
298            }
299        }
300    }
301}
302
303impl std::fmt::Debug for DbConn<'_> {
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        match self {
306            DbConn::Pool(_) => f.debug_tuple("DbConn::Pool").finish(),
307            DbConn::Transaction(_, _) => f.debug_tuple("DbConn::Transaction").finish(),
308        }
309    }
310}
311
312impl std::fmt::Debug for ForgeConn<'_> {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        match self {
315            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
316            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
317        }
318    }
319}
320
321impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
322    type Database = Postgres;
323
324    fn fetch_many<'e, 'q: 'e, E>(
325        self,
326        query: E,
327    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
328    where
329        'c: 'e,
330        E: sqlx::Execute<'q, Postgres> + 'q,
331    {
332        let conn: &'e mut PgConnection = &mut *self;
333        conn.fetch_many(query)
334    }
335
336    fn fetch_optional<'e, 'q: 'e, E>(
337        self,
338        query: E,
339    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
340    where
341        'c: 'e,
342        E: sqlx::Execute<'q, Postgres> + 'q,
343    {
344        let op = sql_operation(query.sql());
345        let span =
346            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
347        let conn: &'e mut PgConnection = &mut *self;
348        Box::pin(conn.fetch_optional(query).instrument(span))
349    }
350
351    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
352    where
353        'c: 'e,
354        E: sqlx::Execute<'q, Postgres> + 'q,
355    {
356        let op = sql_operation(query.sql());
357        let span =
358            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
359        let conn: &'e mut PgConnection = &mut *self;
360        Box::pin(conn.execute(query).instrument(span))
361    }
362
363    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
364    where
365        'c: 'e,
366        E: sqlx::Execute<'q, Postgres> + 'q,
367    {
368        let op = sql_operation(query.sql());
369        let span =
370            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
371        let conn: &'e mut PgConnection = &mut *self;
372        Box::pin(conn.fetch_all(query).instrument(span))
373    }
374
375    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
376    where
377        'c: 'e,
378        E: sqlx::Execute<'q, Postgres> + 'q,
379    {
380        let op = sql_operation(query.sql());
381        let span =
382            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
383        let conn: &'e mut PgConnection = &mut *self;
384        Box::pin(conn.fetch_one(query).instrument(span))
385    }
386
387    fn prepare_with<'e, 'q: 'e>(
388        self,
389        sql: &'q str,
390        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
391    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
392    where
393        'c: 'e,
394    {
395        let conn: &'e mut PgConnection = &mut *self;
396        conn.prepare_with(sql, parameters)
397    }
398
399    fn describe<'e, 'q: 'e>(
400        self,
401        sql: &'q str,
402    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
403    where
404        'c: 'e,
405    {
406        let conn: &'e mut PgConnection = &mut *self;
407        conn.describe(sql)
408    }
409}
410
411/// A job buffered for dispatch after transaction commit.
412#[derive(Debug, Clone)]
413pub struct PendingJob {
414    pub id: Uuid,
415    pub job_type: String,
416    pub args: serde_json::Value,
417    pub context: serde_json::Value,
418    pub owner_subject: Option<String>,
419    pub priority: i32,
420    pub max_attempts: i32,
421    pub worker_capability: Option<String>,
422}
423
424/// A workflow buffered for dispatch after transaction commit.
425#[derive(Debug, Clone)]
426pub struct PendingWorkflow {
427    pub id: Uuid,
428    pub workflow_name: String,
429    pub workflow_version: String,
430    pub workflow_signature: String,
431    pub input: serde_json::Value,
432    pub owner_subject: Option<String>,
433}
434
435/// Buffer for jobs and workflows dispatched during a transactional mutation.
436///
437/// Entries are flushed to the database atomically after the mutation transaction commits.
438/// If the transaction rolls back, buffered dispatches are discarded.
439#[derive(Default)]
440pub struct OutboxBuffer {
441    pub jobs: Vec<PendingJob>,
442    pub workflows: Vec<PendingWorkflow>,
443}
444
445/// Authentication context available to all functions.
446#[derive(Debug, Clone)]
447pub struct AuthContext {
448    /// The authenticated user ID (if any).
449    user_id: Option<Uuid>,
450    /// User roles.
451    roles: Vec<String>,
452    /// Custom claims from JWT.
453    claims: HashMap<String, serde_json::Value>,
454    /// Whether the request is authenticated.
455    authenticated: bool,
456}
457
458impl AuthContext {
459    /// Create an unauthenticated context.
460    pub fn unauthenticated() -> Self {
461        Self {
462            user_id: None,
463            roles: Vec::new(),
464            claims: HashMap::new(),
465            authenticated: false,
466        }
467    }
468
469    /// Create an authenticated context with a UUID user ID.
470    pub fn authenticated(
471        user_id: Uuid,
472        roles: Vec<String>,
473        claims: HashMap<String, serde_json::Value>,
474    ) -> Self {
475        Self {
476            user_id: Some(user_id),
477            roles,
478            claims,
479            authenticated: true,
480        }
481    }
482
483    /// Create an authenticated context without requiring a UUID user ID.
484    ///
485    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
486    /// Clerk). The raw subject string is available via `subject()` method
487    /// from the "sub" claim.
488    pub fn authenticated_without_uuid(
489        roles: Vec<String>,
490        claims: HashMap<String, serde_json::Value>,
491    ) -> Self {
492        Self {
493            user_id: None,
494            roles,
495            claims,
496            authenticated: true,
497        }
498    }
499
500    /// Check if the user is authenticated.
501    pub fn is_authenticated(&self) -> bool {
502        self.authenticated
503    }
504
505    /// Get the user ID if authenticated.
506    pub fn user_id(&self) -> Option<Uuid> {
507        self.user_id
508    }
509
510    /// Get the user ID, returning an error if not authenticated.
511    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
512        self.user_id
513            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
514    }
515
516    /// Check if the user has a specific role.
517    pub fn has_role(&self, role: &str) -> bool {
518        self.roles.iter().any(|r| r == role)
519    }
520
521    /// Require a specific role, returning an error if not present.
522    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
523        if self.has_role(role) {
524            Ok(())
525        } else {
526            Err(crate::error::ForgeError::Forbidden(format!(
527                "Required role '{}' not present",
528                role
529            )))
530        }
531    }
532
533    /// Get a custom claim value.
534    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
535        self.claims.get(key)
536    }
537
538    /// Get all custom claims.
539    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
540        &self.claims
541    }
542
543    /// Get all roles.
544    pub fn roles(&self) -> &[String] {
545        &self.roles
546    }
547
548    /// Get the raw subject claim.
549    ///
550    /// This works with any provider's subject format (UUID, email, custom ID).
551    /// For providers like Firebase or Clerk that don't use UUIDs, use this
552    /// instead of `user_id()`.
553    pub fn subject(&self) -> Option<&str> {
554        self.claims.get("sub").and_then(|v| v.as_str())
555    }
556
557    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
558    pub fn require_subject(&self) -> crate::error::Result<&str> {
559        if !self.authenticated {
560            return Err(crate::error::ForgeError::Unauthorized(
561                "Authentication required".to_string(),
562            ));
563        }
564        self.subject().ok_or_else(|| {
565            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
566        })
567    }
568
569    /// Get a stable principal identifier for access control and cache scoping.
570    ///
571    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
572    pub fn principal_id(&self) -> Option<String> {
573        self.subject()
574            .map(ToString::to_string)
575            .or_else(|| self.user_id.map(|id| id.to_string()))
576    }
577
578    /// Check whether this principal should be treated as privileged admin.
579    pub fn is_admin(&self) -> bool {
580        self.roles.iter().any(|r| r == "admin")
581    }
582
583    /// Get the tenant ID from the JWT claims, if present.
584    ///
585    /// Looks for a `tenant_id` claim in the token and attempts to parse it as
586    /// a UUID. Returns `None` if the claim is absent or not a valid UUID.
587    pub fn tenant_id(&self) -> Option<uuid::Uuid> {
588        self.claims
589            .get("tenant_id")
590            .and_then(|v| v.as_str())
591            .and_then(|s| uuid::Uuid::parse_str(s).ok())
592    }
593}
594
595/// Request metadata available to all functions.
596#[derive(Debug, Clone)]
597pub struct RequestMetadata {
598    /// Unique request ID for tracing.
599    pub request_id: Uuid,
600    /// Trace ID for distributed tracing.
601    pub trace_id: String,
602    /// Client IP address.
603    pub client_ip: Option<String>,
604    /// User agent string.
605    pub user_agent: Option<String>,
606    /// Correlation ID linking frontend events to this backend call.
607    pub correlation_id: Option<String>,
608    /// Request timestamp.
609    pub timestamp: chrono::DateTime<chrono::Utc>,
610}
611
612impl RequestMetadata {
613    /// Create new request metadata.
614    pub fn new() -> Self {
615        Self {
616            request_id: Uuid::new_v4(),
617            trace_id: Uuid::new_v4().to_string(),
618            client_ip: None,
619            user_agent: None,
620            correlation_id: None,
621            timestamp: chrono::Utc::now(),
622        }
623    }
624
625    /// Create with a specific trace ID.
626    pub fn with_trace_id(trace_id: String) -> Self {
627        Self {
628            request_id: Uuid::new_v4(),
629            trace_id,
630            client_ip: None,
631            user_agent: None,
632            correlation_id: None,
633            timestamp: chrono::Utc::now(),
634        }
635    }
636}
637
638impl Default for RequestMetadata {
639    fn default() -> Self {
640        Self::new()
641    }
642}
643
644/// Context for query functions (read-only database access).
645pub struct QueryContext {
646    /// Authentication context.
647    pub auth: AuthContext,
648    /// Request metadata.
649    pub request: RequestMetadata,
650    /// Database pool for read operations.
651    db_pool: sqlx::PgPool,
652    /// Environment variable provider.
653    env_provider: Arc<dyn EnvProvider>,
654}
655
656impl QueryContext {
657    /// Create a new query context.
658    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
659        Self {
660            auth,
661            request,
662            db_pool,
663            env_provider: Arc::new(RealEnvProvider::new()),
664        }
665    }
666
667    /// Create a query context with a custom environment provider.
668    pub fn with_env(
669        db_pool: sqlx::PgPool,
670        auth: AuthContext,
671        request: RequestMetadata,
672        env_provider: Arc<dyn EnvProvider>,
673    ) -> Self {
674        Self {
675            auth,
676            request,
677            db_pool,
678            env_provider,
679        }
680    }
681
682    /// Database handle with automatic `db.query` tracing spans.
683    ///
684    /// Works directly with sqlx compile-time checked macros:
685    /// ```ignore
686    /// sqlx::query_as!(User, "SELECT * FROM users")
687    ///     .fetch_all(ctx.db())
688    ///     .await?
689    /// ```
690    pub fn db(&self) -> ForgeDb {
691        ForgeDb(self.db_pool.clone())
692    }
693
694    /// Get a `DbConn` for use in shared helper functions.
695    ///
696    /// Returns a pool-backed `DbConn` that can be passed to functions
697    /// accepting `DbConn<'_>` for cross-context reuse.
698    ///
699    /// ```ignore
700    /// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> { ... }
701    ///
702    /// #[forge::query]
703    /// pub async fn get_items(ctx: &QueryContext) -> Result<Vec<Item>> {
704    ///     list_items(ctx.db_conn()).await
705    /// }
706    /// ```
707    pub fn db_conn(&self) -> DbConn<'_> {
708        DbConn::Pool(self.db_pool.clone())
709    }
710
711    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
712    pub fn user_id(&self) -> crate::error::Result<Uuid> {
713        self.auth.require_user_id()
714    }
715
716    /// Get the tenant ID from JWT claims, if present.
717    pub fn tenant_id(&self) -> Option<Uuid> {
718        self.auth.tenant_id()
719    }
720}
721
722impl EnvAccess for QueryContext {
723    fn env_provider(&self) -> &dyn EnvProvider {
724        self.env_provider.as_ref()
725    }
726}
727
728/// Callback type for looking up job info by name.
729pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
730
/// Token lifetimes resolved from the `[auth]` section of forge.toml.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Lifetime of an access token, in seconds (default 3600).
    pub access_token_secs: i64,
    /// Lifetime of a refresh token, in days (default 30).
    pub refresh_token_days: i64,
}
739
740impl Default for AuthTokenTtl {
741    fn default() -> Self {
742        Self {
743            access_token_secs: 3600,
744            refresh_token_days: 30,
745        }
746    }
747}
748
749/// Context for mutation functions (transactional database access).
750pub struct MutationContext {
751    /// Authentication context.
752    pub auth: AuthContext,
753    /// Request metadata.
754    pub request: RequestMetadata,
755    /// Database pool for transactional operations.
756    db_pool: sqlx::PgPool,
757    /// HTTP client with circuit breaker for external requests.
758    http_client: CircuitBreakerClient,
759    /// Default timeout for outbound HTTP requests made through the
760    /// circuit-breaker client. `None` means unlimited.
761    http_timeout: Option<Duration>,
762    /// Optional job dispatcher for dispatching background jobs.
763    job_dispatch: Option<Arc<dyn JobDispatch>>,
764    /// Optional workflow dispatcher for starting workflows.
765    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
766    /// Environment variable provider.
767    env_provider: Arc<dyn EnvProvider>,
768    /// Transaction handle for transactional mutations.
769    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
770    /// Outbox buffer for jobs/workflows dispatched during transaction.
771    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
772    /// Job info lookup for transactional dispatch.
773    job_info_lookup: Option<JobInfoLookup>,
774    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
775    token_issuer: Option<Arc<dyn TokenIssuer>>,
776    /// Token TTL config from forge.toml.
777    token_ttl: AuthTokenTtl,
778}
779
780impl MutationContext {
781    /// Create a new mutation context.
782    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
783        Self {
784            auth,
785            request,
786            db_pool,
787            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
788            http_timeout: None,
789            job_dispatch: None,
790            workflow_dispatch: None,
791            env_provider: Arc::new(RealEnvProvider::new()),
792            tx: None,
793            outbox: None,
794            job_info_lookup: None,
795            token_issuer: None,
796            token_ttl: AuthTokenTtl::default(),
797        }
798    }
799
800    /// Create a mutation context with dispatch capabilities.
801    pub fn with_dispatch(
802        db_pool: sqlx::PgPool,
803        auth: AuthContext,
804        request: RequestMetadata,
805        http_client: CircuitBreakerClient,
806        job_dispatch: Option<Arc<dyn JobDispatch>>,
807        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
808    ) -> Self {
809        Self {
810            auth,
811            request,
812            db_pool,
813            http_client,
814            http_timeout: None,
815            job_dispatch,
816            workflow_dispatch,
817            env_provider: Arc::new(RealEnvProvider::new()),
818            tx: None,
819            outbox: None,
820            job_info_lookup: None,
821            token_issuer: None,
822            token_ttl: AuthTokenTtl::default(),
823        }
824    }
825
826    /// Create a mutation context with a custom environment provider.
827    pub fn with_env(
828        db_pool: sqlx::PgPool,
829        auth: AuthContext,
830        request: RequestMetadata,
831        http_client: CircuitBreakerClient,
832        job_dispatch: Option<Arc<dyn JobDispatch>>,
833        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
834        env_provider: Arc<dyn EnvProvider>,
835    ) -> Self {
836        Self {
837            auth,
838            request,
839            db_pool,
840            http_client,
841            http_timeout: None,
842            job_dispatch,
843            workflow_dispatch,
844            env_provider,
845            tx: None,
846            outbox: None,
847            job_info_lookup: None,
848            token_issuer: None,
849            token_ttl: AuthTokenTtl::default(),
850        }
851    }
852
853    /// Returns handles to transaction and outbox for the caller to commit/flush.
854    #[allow(clippy::type_complexity)]
855    pub fn with_transaction(
856        db_pool: sqlx::PgPool,
857        tx: Transaction<'static, Postgres>,
858        auth: AuthContext,
859        request: RequestMetadata,
860        http_client: CircuitBreakerClient,
861        job_info_lookup: JobInfoLookup,
862    ) -> (
863        Self,
864        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
865        Arc<Mutex<OutboxBuffer>>,
866    ) {
867        let tx_handle = Arc::new(AsyncMutex::new(tx));
868        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
869
870        let ctx = Self {
871            auth,
872            request,
873            db_pool,
874            http_client,
875            http_timeout: None,
876            job_dispatch: None,
877            workflow_dispatch: None,
878            env_provider: Arc::new(RealEnvProvider::new()),
879            tx: Some(tx_handle.clone()),
880            outbox: Some(outbox.clone()),
881            job_info_lookup: Some(job_info_lookup),
882            token_issuer: None,
883            token_ttl: AuthTokenTtl::default(),
884        };
885
886        (ctx, tx_handle, outbox)
887    }
888
889    pub fn is_transactional(&self) -> bool {
890        self.tx.is_some()
891    }
892
893    /// Acquire a connection compatible with sqlx compile-time checked macros.
894    ///
895    /// In transactional mode, returns a guard over the active transaction.
896    /// Otherwise acquires a fresh connection from the pool.
897    ///
898    /// ```ignore
899    /// let mut conn = ctx.conn().await?;
900    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
901    ///     .fetch_one(&mut *conn)
902    ///     .await?
903    /// ```
904    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
905        match &self.tx {
906            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
907            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
908        }
909    }
910
911    /// Direct pool access for operations that cannot run inside a transaction.
912    pub fn pool(&self) -> &sqlx::PgPool {
913        &self.db_pool
914    }
915
916    /// Get a `DbConn` for use in shared helper functions.
917    ///
918    /// In transactional mode, returns a transaction-backed `DbConn`.
919    /// Otherwise returns a pool-backed `DbConn`.
920    ///
921    /// ```ignore
922    /// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> { ... }
923    ///
924    /// #[forge::mutation]
925    /// pub async fn items_snapshot(ctx: &MutationContext, input: Input) -> Result<Vec<Item>> {
926    ///     list_items(ctx.db()).await
927    /// }
928    /// ```
929    pub fn db(&self) -> DbConn<'_> {
930        match &self.tx {
931            Some(tx) => DbConn::Transaction(tx.clone(), &self.db_pool),
932            None => DbConn::Pool(self.db_pool.clone()),
933        }
934    }
935
    /// Get a `DbConn` for use in shared helper functions (alias for `db()`).
    ///
    /// Forwards to `db()` unchanged, so the transaction-vs-pool choice is
    /// exactly the one `db()` makes.
    pub fn db_conn(&self) -> DbConn<'_> {
        self.db()
    }
940
    /// Get the HTTP client for external requests.
    ///
    /// Requests go through the circuit breaker automatically. When the handler
    /// declared an explicit `timeout`, that timeout is also applied to outbound
    /// HTTP requests unless the request overrides it.
    ///
    /// Each call builds the returned client by passing the context's current
    /// `http_timeout` (which may be `None`) to `with_timeout` on the stored
    /// HTTP client.
    pub fn http(&self) -> crate::http::HttpClient {
        self.http_client.with_timeout(self.http_timeout)
    }
949
    /// Get the raw reqwest client, bypassing circuit breaker execution.
    ///
    /// Returns whatever `inner()` exposes on the stored HTTP client. The
    /// context's `http_timeout` is not applied on this path.
    pub fn raw_http(&self) -> &reqwest::Client {
        self.http_client.inner()
    }
954
    /// Set the default outbound HTTP request timeout for this context.
    ///
    /// Stores `timeout` in `http_timeout`, replacing any previous value.
    /// `http()` reads this field on every call, so the change affects clients
    /// obtained after this point.
    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
        self.http_timeout = timeout;
    }
959
    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
    ///
    /// Delegates to `require_user_id()` on the context's auth state and
    /// returns its result unchanged.
    // NOTE(review): the 401 mapping is stated by the original doc; the error
    // variant produced by `require_user_id()` is not visible in this block.
    pub fn user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
964
    /// Get the tenant ID from JWT claims, if present.
    ///
    /// Delegates to `tenant_id()` on the context's auth state; `None` is
    /// returned as-is when that lookup yields nothing.
    pub fn tenant_id(&self) -> Option<Uuid> {
        self.auth.tenant_id()
    }
969
    /// Set the token issuer for this context.
    ///
    /// Replaces any previously stored issuer. `issue_token`,
    /// `issue_token_pair` and `rotate_refresh_token` return an error while no
    /// issuer is set.
    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
        self.token_issuer = Some(issuer);
    }
974
    /// Set the token TTL configuration (from forge.toml `[auth]`).
    ///
    /// Overwrites the stored `token_ttl`. Its `access_token_secs` and
    /// `refresh_token_days` values are what `issue_token_pair` and
    /// `rotate_refresh_token` pass on when minting tokens.
    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
        self.token_ttl = ttl;
    }
979
980    /// Issue a signed JWT from the given claims.
981    ///
982    /// Only available when HMAC auth is configured in `forge.toml`.
983    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
984    ///
985    /// ```ignore
986    /// let claims = Claims::builder()
987    ///     .user_id(user.id)
988    ///     .duration_secs(7 * 24 * 3600)
989    ///     .build()
990    ///     .map_err(|e| ForgeError::Internal(e))?;
991    ///
992    /// let token = ctx.issue_token(&claims)?;
993    /// ```
994    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
995        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
996            crate::error::ForgeError::Internal(
997                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
998                    .into(),
999            )
1000        })?;
1001        issuer.sign(claims)
1002    }
1003
1004    /// Issue an access + refresh token pair for the given user.
1005    ///
1006    /// Stores the refresh token hash in `forge_refresh_tokens` and returns
1007    /// both tokens. Use `rotate_refresh_token()` to exchange a refresh token
1008    /// for a new pair, and `revoke_refresh_token()` to invalidate one.
1009    ///
1010    /// TTLs come from `[auth]` in forge.toml:
1011    /// - `access_token_ttl` (default "1h")
1012    /// - `refresh_token_ttl` (default "30d")
1013    pub async fn issue_token_pair(
1014        &self,
1015        user_id: Uuid,
1016        roles: &[&str],
1017    ) -> crate::error::Result<crate::auth::TokenPair> {
1018        let issuer = self.token_issuer.clone().ok_or_else(|| {
1019            crate::error::ForgeError::Internal(
1020                "Token issuer not available. Configure [auth] in forge.toml".into(),
1021            )
1022        })?;
1023        let access_ttl = self.token_ttl.access_token_secs;
1024        let refresh_ttl = self.token_ttl.refresh_token_days;
1025        crate::auth::tokens::issue_token_pair(
1026            &self.db_pool,
1027            user_id,
1028            roles,
1029            access_ttl,
1030            refresh_ttl,
1031            move |uid, r, ttl| {
1032                let claims = Claims::builder()
1033                    .subject(uid)
1034                    .roles(r.iter().map(|s| s.to_string()).collect())
1035                    .duration_secs(ttl)
1036                    .build()
1037                    .map_err(crate::error::ForgeError::Internal)?;
1038                issuer.sign(&claims)
1039            },
1040        )
1041        .await
1042    }
1043
1044    /// Rotate a refresh token: validate the old one, issue a new pair.
1045    ///
1046    /// The old token is atomically deleted and a new access + refresh pair
1047    /// is returned. Fails if the token is invalid or expired.
1048    pub async fn rotate_refresh_token(
1049        &self,
1050        old_refresh_token: &str,
1051    ) -> crate::error::Result<crate::auth::TokenPair> {
1052        let issuer = self.token_issuer.clone().ok_or_else(|| {
1053            crate::error::ForgeError::Internal(
1054                "Token issuer not available. Configure [auth] in forge.toml".into(),
1055            )
1056        })?;
1057        let access_ttl = self.token_ttl.access_token_secs;
1058        let refresh_ttl = self.token_ttl.refresh_token_days;
1059        crate::auth::tokens::rotate_refresh_token(
1060            &self.db_pool,
1061            old_refresh_token,
1062            &["user"],
1063            access_ttl,
1064            refresh_ttl,
1065            move |uid, r, ttl| {
1066                let claims = Claims::builder()
1067                    .subject(uid)
1068                    .roles(r.iter().map(|s| s.to_string()).collect())
1069                    .duration_secs(ttl)
1070                    .build()
1071                    .map_err(crate::error::ForgeError::Internal)?;
1072                issuer.sign(&claims)
1073            },
1074        )
1075        .await
1076    }
1077
    /// Revoke a specific refresh token (e.g., on logout).
    ///
    /// Delegates to `crate::auth::tokens::revoke_refresh_token` using the
    /// context's pool (not the `tx` handle) and returns its result unchanged.
    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
    }
1082
    /// Revoke all refresh tokens for a user (e.g., on password change or account deletion).
    ///
    /// Delegates to `crate::auth::tokens::revoke_all_refresh_tokens` using the
    /// context's pool (not the `tx` handle) and returns its result unchanged.
    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
    }
1087
1088    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
1089    pub async fn dispatch_job<T: serde::Serialize>(
1090        &self,
1091        job_type: &str,
1092        args: T,
1093    ) -> crate::error::Result<Uuid> {
1094        let args_json = serde_json::to_value(args)?;
1095
1096        // Transactional mode: buffer the job for atomic commit
1097        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1098            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1099                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1100            })?;
1101
1102            let pending = PendingJob {
1103                id: Uuid::new_v4(),
1104                job_type: job_type.to_string(),
1105                args: args_json,
1106                context: serde_json::json!({}),
1107                owner_subject: self.auth.principal_id(),
1108                priority: job_info.priority.as_i32(),
1109                max_attempts: job_info.retry.max_attempts as i32,
1110                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1111            };
1112
1113            let job_id = pending.id;
1114            outbox
1115                .lock()
1116                .expect("outbox lock poisoned")
1117                .jobs
1118                .push(pending);
1119            return Ok(job_id);
1120        }
1121
1122        // Non-transactional mode: dispatch immediately
1123        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1124            crate::error::ForgeError::Internal("Job dispatch not available".into())
1125        })?;
1126        dispatcher
1127            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1128            .await
1129    }
1130
1131    /// Dispatch a job with initial context.
1132    pub async fn dispatch_job_with_context<T: serde::Serialize>(
1133        &self,
1134        job_type: &str,
1135        args: T,
1136        context: serde_json::Value,
1137    ) -> crate::error::Result<Uuid> {
1138        let args_json = serde_json::to_value(args)?;
1139
1140        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1141            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1142                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1143            })?;
1144
1145            let pending = PendingJob {
1146                id: Uuid::new_v4(),
1147                job_type: job_type.to_string(),
1148                args: args_json,
1149                context,
1150                owner_subject: self.auth.principal_id(),
1151                priority: job_info.priority.as_i32(),
1152                max_attempts: job_info.retry.max_attempts as i32,
1153                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1154            };
1155
1156            let job_id = pending.id;
1157            outbox
1158                .lock()
1159                .expect("outbox lock poisoned")
1160                .jobs
1161                .push(pending);
1162            return Ok(job_id);
1163        }
1164
1165        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1166            crate::error::ForgeError::Internal("Job dispatch not available".into())
1167        })?;
1168        dispatcher
1169            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1170            .await
1171    }
1172
1173    /// Request cancellation for a job.
1174    pub async fn cancel_job(
1175        &self,
1176        job_id: Uuid,
1177        reason: Option<String>,
1178    ) -> crate::error::Result<bool> {
1179        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1180            crate::error::ForgeError::Internal("Job dispatch not available".into())
1181        })?;
1182        dispatcher.cancel(job_id, reason).await
1183    }
1184
1185    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1186    pub async fn start_workflow<T: serde::Serialize>(
1187        &self,
1188        workflow_name: &str,
1189        input: T,
1190    ) -> crate::error::Result<Uuid> {
1191        let input_json = serde_json::to_value(input)?;
1192
1193        // Transactional mode: buffer the workflow for atomic commit
1194        if let Some(outbox) = &self.outbox {
1195            // Resolve version and signature eagerly so the INSERT has them ready.
1196            // The executor would do this anyway on flush, but doing it here surfaces
1197            // "no active version" errors at call time rather than after commit.
1198            let info = self
1199                .workflow_dispatch
1200                .as_ref()
1201                .and_then(|d| d.get_info(workflow_name))
1202                .ok_or_else(|| {
1203                    crate::error::ForgeError::NotFound(format!(
1204                        "No active version of workflow '{}'",
1205                        workflow_name
1206                    ))
1207                })?;
1208
1209            let pending = PendingWorkflow {
1210                id: Uuid::new_v4(),
1211                workflow_name: workflow_name.to_string(),
1212                workflow_version: info.version.to_string(),
1213                workflow_signature: info.signature.to_string(),
1214                input: input_json,
1215                owner_subject: self.auth.principal_id(),
1216            };
1217
1218            let workflow_id = pending.id;
1219            outbox
1220                .lock()
1221                .expect("outbox lock poisoned")
1222                .workflows
1223                .push(pending);
1224            return Ok(workflow_id);
1225        }
1226
1227        // Non-transactional mode: start immediately
1228        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1229            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1230        })?;
1231        dispatcher
1232            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1233            .await
1234    }
1235}
1236
1237impl EnvAccess for MutationContext {
1238    fn env_provider(&self) -> &dyn EnvProvider {
1239        self.env_provider.as_ref()
1240    }
1241}
1242
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context exposes no user and rejects `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert!(anonymous.user_id().is_none());
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context reports its user and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted_roles = vec!["admin".to_string(), "user".to_string()];
        let auth = AuthContext::authenticated(expected_id, granted_roles, HashMap::new());

        // Identity
        assert!(auth.is_authenticated());
        assert_eq!(auth.user_id(), Some(expected_id));
        assert!(auth.require_user_id().is_ok());

        // Role membership
        assert!(auth.has_role("admin"));
        assert!(auth.has_role("user"));
        assert!(!auth.has_role("superadmin"));

        // Role enforcement
        assert!(auth.require_role("admin").is_ok());
        assert!(auth.require_role("superadmin").is_err());
    }

    /// Custom claims are retrievable by key; unknown keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let org_claim = serde_json::json!("org-123");
        let custom_claims = HashMap::from([("org_id".to_string(), org_claim.clone())]);

        let auth = AuthContext::authenticated(Uuid::new_v4(), vec![], custom_claims);

        assert_eq!(auth.claim("org_id"), Some(&org_claim));
        assert!(auth.claim("nonexistent").is_none());
    }

    /// Fresh metadata gets a non-empty trace ID; an explicit one is kept verbatim.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let explicit = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(explicit.trace_id, "trace-123");
    }
}
1295}