Skip to main content

forge_core/function/
context.rs

1//! Execution contexts for queries and mutations.
2//!
3//! Every function receives a context providing access to:
4//!
5//! - Database connection (pool or transaction)
6//! - Authentication state (user ID, roles, claims)
7//! - Request metadata (request ID, trace ID, client IP)
8//! - Environment variables
9//! - Job/workflow dispatch (mutations only)
10//!
11//! # QueryContext vs MutationContext
12//!
13//! | Feature | QueryContext | MutationContext |
14//! |---------|--------------|-----------------|
15//! | Database | Pool (read-only) | Transaction or pool |
16//! | Dispatch jobs | No | Yes |
17//! | Start workflows | No | Yes |
18//! | HTTP client | No | Yes (circuit breaker) |
19//!
20//! # Transactional Mutations
21//!
22//! When `transactional = true` (default), mutations run in a transaction.
23//! Jobs and workflows dispatched during the mutation are buffered and only
24//! inserted after the transaction commits successfully.
25//!
26//! ```text
27//! BEGIN
28//!   ├── ctx.db().execute(...)
29//!   ├── ctx.dispatch_job("send_email", ...)  // buffered
30//!   └── return Ok(result)
31//! COMMIT
32//!   └── INSERT INTO forge_jobs (buffered jobs)
33//! ```
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
54/// Token issuer for signing JWTs.
55///
56/// Implemented by the runtime when HMAC auth is configured.
57/// Available via `ctx.issue_token()` in mutation handlers.
58pub trait TokenIssuer: Send + Sync {
59    /// Sign the given claims into a JWT string.
60    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
61}
62
63/// Connection wrapper that implements sqlx's `Executor` trait with automatic
64/// `db.query` tracing spans.
65///
66/// Obtain via `ctx.conn().await?` in mutation handlers.
67/// Works with compile-time checked macros via `&mut conn`.
68///
69/// ```ignore
70/// let mut conn = ctx.conn().await?;
71/// sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
72///     .fetch_one(&mut *conn)
73///     .await?
74/// ```
75pub enum ForgeConn<'a> {
76    Pool(sqlx::pool::PoolConnection<Postgres>),
77    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
78}
79
80impl std::ops::Deref for ForgeConn<'_> {
81    type Target = PgConnection;
82    fn deref(&self) -> &PgConnection {
83        match self {
84            ForgeConn::Pool(c) => c,
85            ForgeConn::Tx(g) => g,
86        }
87    }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91    fn deref_mut(&mut self) -> &mut PgConnection {
92        match self {
93            ForgeConn::Pool(c) => c,
94            ForgeConn::Tx(g) => g,
95        }
96    }
97}
98
99/// Pool wrapper that adds `db.query` tracing spans to every database operation.
100///
101/// Returned by [`QueryContext::db()`]. Implements sqlx's [`sqlx::Executor`] trait,
102/// so it works as a drop-in replacement for `&PgPool` with compile-time
103/// checked macros (`query!`, `query_as!`).
104///
105/// ```ignore
106/// sqlx::query_as!(User, "SELECT * FROM users")
107///     .fetch_all(ctx.db())
108///     .await?
109/// ```
110#[derive(Clone)]
111pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_tuple("ForgeDb").finish()
116    }
117}
118
119impl ForgeDb {
120    /// Create a `ForgeDb` from a pool reference. Clones the Arc-backed pool handle.
121    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122        Self(pool.clone())
123    }
124}
125
126fn sql_operation(sql: &str) -> &'static str {
127    let bytes = sql.trim_start().as_bytes();
128    match bytes.get(..6) {
129        Some(prefix) if prefix.eq_ignore_ascii_case(b"select") => "SELECT",
130        Some(prefix) if prefix.eq_ignore_ascii_case(b"insert") => "INSERT",
131        Some(prefix) if prefix.eq_ignore_ascii_case(b"update") => "UPDATE",
132        Some(prefix) if prefix.eq_ignore_ascii_case(b"delete") => "DELETE",
133        _ => "OTHER",
134    }
135}
136
137impl sqlx::Executor<'static> for ForgeDb {
138    type Database = Postgres;
139
140    fn fetch_many<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144    where
145        E: sqlx::Execute<'q, Postgres> + 'q,
146    {
147        (&self.0).fetch_many(query)
148    }
149
150    fn fetch_optional<'e, 'q: 'e, E>(
151        self,
152        query: E,
153    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154    where
155        E: sqlx::Execute<'q, Postgres> + 'q,
156    {
157        let op = sql_operation(query.sql());
158        let span =
159            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160        Box::pin(
161            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162        )
163    }
164
165    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166    where
167        E: sqlx::Execute<'q, Postgres> + 'q,
168    {
169        let op = sql_operation(query.sql());
170        let span =
171            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173    }
174
175    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176    where
177        E: sqlx::Execute<'q, Postgres> + 'q,
178    {
179        let op = sql_operation(query.sql());
180        let span =
181            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183    }
184
185    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186    where
187        E: sqlx::Execute<'q, Postgres> + 'q,
188    {
189        let op = sql_operation(query.sql());
190        let span =
191            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193    }
194
195    fn prepare_with<'e, 'q: 'e>(
196        self,
197        sql: &'q str,
198        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201    }
202
203    fn describe<'e, 'q: 'e>(
204        self,
205        sql: &'q str,
206    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208    }
209}
210
211/// Abstraction over pool and transaction connections.
212///
213/// Allows shared helper functions to work with any context type.
214/// Obtain via `ctx.db_conn()` on pool-based contexts (queries, jobs, crons,
215/// daemons, webhooks, MCP tools) or via `ctx.db()` on `MutationContext`.
216///
217/// # Example
218///
219/// ```ignore
220/// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> {
221///     db.fetch_all(sqlx::query_as!(Item, "SELECT * FROM items ORDER BY created_at DESC"))
222///         .await
223///         .map_err(Into::into)
224/// }
225/// ```
226pub enum DbConn<'a> {
227    /// Direct pool connection (queries, jobs, crons, daemons, webhooks, MCP).
228    Pool(sqlx::PgPool),
229    /// Transaction handle (transactional mutations).
230    Transaction(
231        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
232        &'a sqlx::PgPool,
233    ),
234}
235
236impl DbConn<'_> {
237    /// Fetch exactly one row.
238    pub async fn fetch_one<'q, O>(
239        &self,
240        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
241    ) -> sqlx::Result<O>
242    where
243        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
244    {
245        match self {
246            DbConn::Pool(pool) => query.fetch_one(pool).await,
247            DbConn::Transaction(tx, _) => {
248                let mut guard = tx.lock().await;
249                query.fetch_one(&mut **guard).await
250            }
251        }
252    }
253
254    /// Fetch zero or one row.
255    pub async fn fetch_optional<'q, O>(
256        &self,
257        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
258    ) -> sqlx::Result<Option<O>>
259    where
260        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
261    {
262        match self {
263            DbConn::Pool(pool) => query.fetch_optional(pool).await,
264            DbConn::Transaction(tx, _) => {
265                let mut guard = tx.lock().await;
266                query.fetch_optional(&mut **guard).await
267            }
268        }
269    }
270
271    /// Fetch all matching rows.
272    pub async fn fetch_all<'q, O>(
273        &self,
274        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
275    ) -> sqlx::Result<Vec<O>>
276    where
277        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
278    {
279        match self {
280            DbConn::Pool(pool) => query.fetch_all(pool).await,
281            DbConn::Transaction(tx, _) => {
282                let mut guard = tx.lock().await;
283                query.fetch_all(&mut **guard).await
284            }
285        }
286    }
287
288    /// Execute a statement (INSERT, UPDATE, DELETE).
289    pub async fn execute<'q>(
290        &self,
291        query: sqlx::query::Query<'q, Postgres, sqlx::postgres::PgArguments>,
292    ) -> sqlx::Result<PgQueryResult> {
293        match self {
294            DbConn::Pool(pool) => query.execute(pool).await,
295            DbConn::Transaction(tx, _) => {
296                let mut guard = tx.lock().await;
297                query.execute(&mut **guard).await
298            }
299        }
300    }
301}
302
303impl std::fmt::Debug for DbConn<'_> {
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        match self {
306            DbConn::Pool(_) => f.debug_tuple("DbConn::Pool").finish(),
307            DbConn::Transaction(_, _) => f.debug_tuple("DbConn::Transaction").finish(),
308        }
309    }
310}
311
312impl std::fmt::Debug for ForgeConn<'_> {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        match self {
315            ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
316            ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
317        }
318    }
319}
320
321impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
322    type Database = Postgres;
323
324    fn fetch_many<'e, 'q: 'e, E>(
325        self,
326        query: E,
327    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
328    where
329        'c: 'e,
330        E: sqlx::Execute<'q, Postgres> + 'q,
331    {
332        let conn: &'e mut PgConnection = &mut *self;
333        conn.fetch_many(query)
334    }
335
336    fn fetch_optional<'e, 'q: 'e, E>(
337        self,
338        query: E,
339    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
340    where
341        'c: 'e,
342        E: sqlx::Execute<'q, Postgres> + 'q,
343    {
344        let op = sql_operation(query.sql());
345        let span =
346            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
347        let conn: &'e mut PgConnection = &mut *self;
348        Box::pin(conn.fetch_optional(query).instrument(span))
349    }
350
351    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
352    where
353        'c: 'e,
354        E: sqlx::Execute<'q, Postgres> + 'q,
355    {
356        let op = sql_operation(query.sql());
357        let span =
358            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
359        let conn: &'e mut PgConnection = &mut *self;
360        Box::pin(conn.execute(query).instrument(span))
361    }
362
363    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
364    where
365        'c: 'e,
366        E: sqlx::Execute<'q, Postgres> + 'q,
367    {
368        let op = sql_operation(query.sql());
369        let span =
370            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
371        let conn: &'e mut PgConnection = &mut *self;
372        Box::pin(conn.fetch_all(query).instrument(span))
373    }
374
375    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
376    where
377        'c: 'e,
378        E: sqlx::Execute<'q, Postgres> + 'q,
379    {
380        let op = sql_operation(query.sql());
381        let span =
382            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
383        let conn: &'e mut PgConnection = &mut *self;
384        Box::pin(conn.fetch_one(query).instrument(span))
385    }
386
387    fn prepare_with<'e, 'q: 'e>(
388        self,
389        sql: &'q str,
390        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
391    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
392    where
393        'c: 'e,
394    {
395        let conn: &'e mut PgConnection = &mut *self;
396        conn.prepare_with(sql, parameters)
397    }
398
399    fn describe<'e, 'q: 'e>(
400        self,
401        sql: &'q str,
402    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
403    where
404        'c: 'e,
405    {
406        let conn: &'e mut PgConnection = &mut *self;
407        conn.describe(sql)
408    }
409}
410
411/// A job buffered for dispatch after transaction commit.
412#[derive(Debug, Clone)]
413pub struct PendingJob {
414    pub id: Uuid,
415    pub job_type: String,
416    pub args: serde_json::Value,
417    pub context: serde_json::Value,
418    pub owner_subject: Option<String>,
419    pub priority: i32,
420    pub max_attempts: i32,
421    pub worker_capability: Option<String>,
422}
423
424/// A workflow buffered for dispatch after transaction commit.
425#[derive(Debug, Clone)]
426pub struct PendingWorkflow {
427    pub id: Uuid,
428    pub workflow_name: String,
429    pub input: serde_json::Value,
430    pub owner_subject: Option<String>,
431}
432
433/// Buffer for jobs and workflows dispatched during a transactional mutation.
434///
435/// Entries are flushed to the database atomically after the mutation transaction commits.
436/// If the transaction rolls back, buffered dispatches are discarded.
437#[derive(Default)]
438pub struct OutboxBuffer {
439    pub jobs: Vec<PendingJob>,
440    pub workflows: Vec<PendingWorkflow>,
441}
442
443/// Authentication context available to all functions.
444#[derive(Debug, Clone)]
445pub struct AuthContext {
446    /// The authenticated user ID (if any).
447    user_id: Option<Uuid>,
448    /// User roles.
449    roles: Vec<String>,
450    /// Custom claims from JWT.
451    claims: HashMap<String, serde_json::Value>,
452    /// Whether the request is authenticated.
453    authenticated: bool,
454}
455
456impl AuthContext {
457    /// Create an unauthenticated context.
458    pub fn unauthenticated() -> Self {
459        Self {
460            user_id: None,
461            roles: Vec::new(),
462            claims: HashMap::new(),
463            authenticated: false,
464        }
465    }
466
467    /// Create an authenticated context with a UUID user ID.
468    pub fn authenticated(
469        user_id: Uuid,
470        roles: Vec<String>,
471        claims: HashMap<String, serde_json::Value>,
472    ) -> Self {
473        Self {
474            user_id: Some(user_id),
475            roles,
476            claims,
477            authenticated: true,
478        }
479    }
480
481    /// Create an authenticated context without requiring a UUID user ID.
482    ///
483    /// Use this for auth providers that don't use UUID subjects (e.g., Firebase,
484    /// Clerk). The raw subject string is available via `subject()` method
485    /// from the "sub" claim.
486    pub fn authenticated_without_uuid(
487        roles: Vec<String>,
488        claims: HashMap<String, serde_json::Value>,
489    ) -> Self {
490        Self {
491            user_id: None,
492            roles,
493            claims,
494            authenticated: true,
495        }
496    }
497
498    /// Check if the user is authenticated.
499    pub fn is_authenticated(&self) -> bool {
500        self.authenticated
501    }
502
503    /// Get the user ID if authenticated.
504    pub fn user_id(&self) -> Option<Uuid> {
505        self.user_id
506    }
507
508    /// Get the user ID, returning an error if not authenticated.
509    pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
510        self.user_id
511            .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
512    }
513
514    /// Check if the user has a specific role.
515    pub fn has_role(&self, role: &str) -> bool {
516        self.roles.iter().any(|r| r == role)
517    }
518
519    /// Require a specific role, returning an error if not present.
520    pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
521        if self.has_role(role) {
522            Ok(())
523        } else {
524            Err(crate::error::ForgeError::Forbidden(format!(
525                "Required role '{}' not present",
526                role
527            )))
528        }
529    }
530
531    /// Get a custom claim value.
532    pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
533        self.claims.get(key)
534    }
535
536    /// Get all custom claims.
537    pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
538        &self.claims
539    }
540
541    /// Get all roles.
542    pub fn roles(&self) -> &[String] {
543        &self.roles
544    }
545
546    /// Get the raw subject claim.
547    ///
548    /// This works with any provider's subject format (UUID, email, custom ID).
549    /// For providers like Firebase or Clerk that don't use UUIDs, use this
550    /// instead of `user_id()`.
551    pub fn subject(&self) -> Option<&str> {
552        self.claims.get("sub").and_then(|v| v.as_str())
553    }
554
555    /// Like `require_user_id()` but returns the raw subject string for non-UUID providers.
556    pub fn require_subject(&self) -> crate::error::Result<&str> {
557        if !self.authenticated {
558            return Err(crate::error::ForgeError::Unauthorized(
559                "Authentication required".to_string(),
560            ));
561        }
562        self.subject().ok_or_else(|| {
563            crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
564        })
565    }
566
567    /// Get a stable principal identifier for access control and cache scoping.
568    ///
569    /// Prefers the raw JWT `sub` claim and falls back to UUID user_id.
570    pub fn principal_id(&self) -> Option<String> {
571        self.subject()
572            .map(ToString::to_string)
573            .or_else(|| self.user_id.map(|id| id.to_string()))
574    }
575
576    /// Check whether this principal should be treated as privileged admin.
577    pub fn is_admin(&self) -> bool {
578        self.roles.iter().any(|r| r == "admin")
579    }
580
581    /// Get the tenant ID from the JWT claims, if present.
582    ///
583    /// Looks for a `tenant_id` claim in the token and attempts to parse it as
584    /// a UUID. Returns `None` if the claim is absent or not a valid UUID.
585    pub fn tenant_id(&self) -> Option<uuid::Uuid> {
586        self.claims
587            .get("tenant_id")
588            .and_then(|v| v.as_str())
589            .and_then(|s| uuid::Uuid::parse_str(s).ok())
590    }
591}
592
593/// Request metadata available to all functions.
594#[derive(Debug, Clone)]
595pub struct RequestMetadata {
596    /// Unique request ID for tracing.
597    pub request_id: Uuid,
598    /// Trace ID for distributed tracing.
599    pub trace_id: String,
600    /// Client IP address.
601    pub client_ip: Option<String>,
602    /// User agent string.
603    pub user_agent: Option<String>,
604    /// Correlation ID linking frontend events to this backend call.
605    pub correlation_id: Option<String>,
606    /// Request timestamp.
607    pub timestamp: chrono::DateTime<chrono::Utc>,
608}
609
610impl RequestMetadata {
611    /// Create new request metadata.
612    pub fn new() -> Self {
613        Self {
614            request_id: Uuid::new_v4(),
615            trace_id: Uuid::new_v4().to_string(),
616            client_ip: None,
617            user_agent: None,
618            correlation_id: None,
619            timestamp: chrono::Utc::now(),
620        }
621    }
622
623    /// Create with a specific trace ID.
624    pub fn with_trace_id(trace_id: String) -> Self {
625        Self {
626            request_id: Uuid::new_v4(),
627            trace_id,
628            client_ip: None,
629            user_agent: None,
630            correlation_id: None,
631            timestamp: chrono::Utc::now(),
632        }
633    }
634}
635
636impl Default for RequestMetadata {
637    fn default() -> Self {
638        Self::new()
639    }
640}
641
642/// Context for query functions (read-only database access).
643pub struct QueryContext {
644    /// Authentication context.
645    pub auth: AuthContext,
646    /// Request metadata.
647    pub request: RequestMetadata,
648    /// Database pool for read operations.
649    db_pool: sqlx::PgPool,
650    /// Environment variable provider.
651    env_provider: Arc<dyn EnvProvider>,
652}
653
654impl QueryContext {
655    /// Create a new query context.
656    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657        Self {
658            auth,
659            request,
660            db_pool,
661            env_provider: Arc::new(RealEnvProvider::new()),
662        }
663    }
664
665    /// Create a query context with a custom environment provider.
666    pub fn with_env(
667        db_pool: sqlx::PgPool,
668        auth: AuthContext,
669        request: RequestMetadata,
670        env_provider: Arc<dyn EnvProvider>,
671    ) -> Self {
672        Self {
673            auth,
674            request,
675            db_pool,
676            env_provider,
677        }
678    }
679
680    /// Database handle with automatic `db.query` tracing spans.
681    ///
682    /// Works directly with sqlx compile-time checked macros:
683    /// ```ignore
684    /// sqlx::query_as!(User, "SELECT * FROM users")
685    ///     .fetch_all(ctx.db())
686    ///     .await?
687    /// ```
688    pub fn db(&self) -> ForgeDb {
689        ForgeDb(self.db_pool.clone())
690    }
691
692    /// Get a `DbConn` for use in shared helper functions.
693    ///
694    /// Returns a pool-backed `DbConn` that can be passed to functions
695    /// accepting `DbConn<'_>` for cross-context reuse.
696    ///
697    /// ```ignore
698    /// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> { ... }
699    ///
700    /// #[forge::query]
701    /// pub async fn get_items(ctx: &QueryContext) -> Result<Vec<Item>> {
702    ///     list_items(ctx.db_conn()).await
703    /// }
704    /// ```
705    pub fn db_conn(&self) -> DbConn<'_> {
706        DbConn::Pool(self.db_pool.clone())
707    }
708
709    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
710    pub fn user_id(&self) -> crate::error::Result<Uuid> {
711        self.auth.require_user_id()
712    }
713
714    /// Get the tenant ID from JWT claims, if present.
715    pub fn tenant_id(&self) -> Option<Uuid> {
716        self.auth.tenant_id()
717    }
718}
719
720impl EnvAccess for QueryContext {
721    fn env_provider(&self) -> &dyn EnvProvider {
722        self.env_provider.as_ref()
723    }
724}
725
726/// Callback type for looking up job info by name.
727pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
728
729/// Token TTL configuration resolved from `[auth]` in forge.toml.
730#[derive(Debug, Clone)]
731pub struct AuthTokenTtl {
732    /// Access token lifetime in seconds (default 3600).
733    pub access_token_secs: i64,
734    /// Refresh token lifetime in days (default 30).
735    pub refresh_token_days: i64,
736}
737
738impl Default for AuthTokenTtl {
739    fn default() -> Self {
740        Self {
741            access_token_secs: 3600,
742            refresh_token_days: 30,
743        }
744    }
745}
746
747/// Context for mutation functions (transactional database access).
748pub struct MutationContext {
749    /// Authentication context.
750    pub auth: AuthContext,
751    /// Request metadata.
752    pub request: RequestMetadata,
753    /// Database pool for transactional operations.
754    db_pool: sqlx::PgPool,
755    /// HTTP client with circuit breaker for external requests.
756    http_client: CircuitBreakerClient,
757    /// Default timeout for outbound HTTP requests made through the
758    /// circuit-breaker client. `None` means unlimited.
759    http_timeout: Option<Duration>,
760    /// Optional job dispatcher for dispatching background jobs.
761    job_dispatch: Option<Arc<dyn JobDispatch>>,
762    /// Optional workflow dispatcher for starting workflows.
763    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
764    /// Environment variable provider.
765    env_provider: Arc<dyn EnvProvider>,
766    /// Transaction handle for transactional mutations.
767    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
768    /// Outbox buffer for jobs/workflows dispatched during transaction.
769    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
770    /// Job info lookup for transactional dispatch.
771    job_info_lookup: Option<JobInfoLookup>,
772    /// Optional token issuer for signing JWTs (available when HMAC auth is configured).
773    token_issuer: Option<Arc<dyn TokenIssuer>>,
774    /// Token TTL config from forge.toml.
775    token_ttl: AuthTokenTtl,
776}
777
778impl MutationContext {
779    /// Create a new mutation context.
780    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
781        Self {
782            auth,
783            request,
784            db_pool,
785            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
786            http_timeout: None,
787            job_dispatch: None,
788            workflow_dispatch: None,
789            env_provider: Arc::new(RealEnvProvider::new()),
790            tx: None,
791            outbox: None,
792            job_info_lookup: None,
793            token_issuer: None,
794            token_ttl: AuthTokenTtl::default(),
795        }
796    }
797
798    /// Create a mutation context with dispatch capabilities.
799    pub fn with_dispatch(
800        db_pool: sqlx::PgPool,
801        auth: AuthContext,
802        request: RequestMetadata,
803        http_client: CircuitBreakerClient,
804        job_dispatch: Option<Arc<dyn JobDispatch>>,
805        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
806    ) -> Self {
807        Self {
808            auth,
809            request,
810            db_pool,
811            http_client,
812            http_timeout: None,
813            job_dispatch,
814            workflow_dispatch,
815            env_provider: Arc::new(RealEnvProvider::new()),
816            tx: None,
817            outbox: None,
818            job_info_lookup: None,
819            token_issuer: None,
820            token_ttl: AuthTokenTtl::default(),
821        }
822    }
823
824    /// Create a mutation context with a custom environment provider.
825    pub fn with_env(
826        db_pool: sqlx::PgPool,
827        auth: AuthContext,
828        request: RequestMetadata,
829        http_client: CircuitBreakerClient,
830        job_dispatch: Option<Arc<dyn JobDispatch>>,
831        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
832        env_provider: Arc<dyn EnvProvider>,
833    ) -> Self {
834        Self {
835            auth,
836            request,
837            db_pool,
838            http_client,
839            http_timeout: None,
840            job_dispatch,
841            workflow_dispatch,
842            env_provider,
843            tx: None,
844            outbox: None,
845            job_info_lookup: None,
846            token_issuer: None,
847            token_ttl: AuthTokenTtl::default(),
848        }
849    }
850
851    /// Returns handles to transaction and outbox for the caller to commit/flush.
852    #[allow(clippy::type_complexity)]
853    pub fn with_transaction(
854        db_pool: sqlx::PgPool,
855        tx: Transaction<'static, Postgres>,
856        auth: AuthContext,
857        request: RequestMetadata,
858        http_client: CircuitBreakerClient,
859        job_info_lookup: JobInfoLookup,
860    ) -> (
861        Self,
862        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
863        Arc<Mutex<OutboxBuffer>>,
864    ) {
865        let tx_handle = Arc::new(AsyncMutex::new(tx));
866        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
867
868        let ctx = Self {
869            auth,
870            request,
871            db_pool,
872            http_client,
873            http_timeout: None,
874            job_dispatch: None,
875            workflow_dispatch: None,
876            env_provider: Arc::new(RealEnvProvider::new()),
877            tx: Some(tx_handle.clone()),
878            outbox: Some(outbox.clone()),
879            job_info_lookup: Some(job_info_lookup),
880            token_issuer: None,
881            token_ttl: AuthTokenTtl::default(),
882        };
883
884        (ctx, tx_handle, outbox)
885    }
886
887    pub fn is_transactional(&self) -> bool {
888        self.tx.is_some()
889    }
890
891    /// Acquire a connection compatible with sqlx compile-time checked macros.
892    ///
893    /// In transactional mode, returns a guard over the active transaction.
894    /// Otherwise acquires a fresh connection from the pool.
895    ///
896    /// ```ignore
897    /// let mut conn = ctx.conn().await?;
898    /// sqlx::query_as!(User, "INSERT INTO users ... RETURNING *", ...)
899    ///     .fetch_one(&mut *conn)
900    ///     .await?
901    /// ```
902    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
903        match &self.tx {
904            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
905            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
906        }
907    }
908
909    /// Direct pool access for operations that cannot run inside a transaction.
910    pub fn pool(&self) -> &sqlx::PgPool {
911        &self.db_pool
912    }
913
914    /// Get a `DbConn` for use in shared helper functions.
915    ///
916    /// In transactional mode, returns a transaction-backed `DbConn`.
917    /// Otherwise returns a pool-backed `DbConn`.
918    ///
919    /// ```ignore
920    /// pub async fn list_items(db: DbConn<'_>) -> Result<Vec<Item>> { ... }
921    ///
922    /// #[forge::mutation]
923    /// pub async fn items_snapshot(ctx: &MutationContext, input: Input) -> Result<Vec<Item>> {
924    ///     list_items(ctx.db()).await
925    /// }
926    /// ```
927    pub fn db(&self) -> DbConn<'_> {
928        match &self.tx {
929            Some(tx) => DbConn::Transaction(tx.clone(), &self.db_pool),
930            None => DbConn::Pool(self.db_pool.clone()),
931        }
932    }
933
934    /// Get a `DbConn` for use in shared helper functions (alias for `db()`).
935    pub fn db_conn(&self) -> DbConn<'_> {
936        self.db()
937    }
938
939    /// Get the HTTP client for external requests.
940    ///
941    /// Requests go through the circuit breaker automatically. When the handler
942    /// declared an explicit `timeout`, that timeout is also applied to outbound
943    /// HTTP requests unless the request overrides it.
944    pub fn http(&self) -> crate::http::HttpClient {
945        self.http_client.with_timeout(self.http_timeout)
946    }
947
948    /// Get the raw reqwest client, bypassing circuit breaker execution.
949    pub fn raw_http(&self) -> &reqwest::Client {
950        self.http_client.inner()
951    }
952
953    /// Set the default outbound HTTP request timeout for this context.
954    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
955        self.http_timeout = timeout;
956    }
957
958    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
959    pub fn user_id(&self) -> crate::error::Result<Uuid> {
960        self.auth.require_user_id()
961    }
962
963    /// Get the tenant ID from JWT claims, if present.
964    pub fn tenant_id(&self) -> Option<Uuid> {
965        self.auth.tenant_id()
966    }
967
968    /// Set the token issuer for this context.
969    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
970        self.token_issuer = Some(issuer);
971    }
972
973    /// Set the token TTL configuration (from forge.toml `[auth]`).
974    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
975        self.token_ttl = ttl;
976    }
977
978    /// Issue a signed JWT from the given claims.
979    ///
980    /// Only available when HMAC auth is configured in `forge.toml`.
981    /// Returns an error if auth is not configured or uses an external provider (RSA/JWKS).
982    ///
983    /// ```ignore
984    /// let claims = Claims::builder()
985    ///     .user_id(user.id)
986    ///     .duration_secs(7 * 24 * 3600)
987    ///     .build()
988    ///     .map_err(|e| ForgeError::Internal(e))?;
989    ///
990    /// let token = ctx.issue_token(&claims)?;
991    /// ```
992    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
993        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
994            crate::error::ForgeError::Internal(
995                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
996                    .into(),
997            )
998        })?;
999        issuer.sign(claims)
1000    }
1001
1002    /// Issue an access + refresh token pair for the given user.
1003    ///
1004    /// Stores the refresh token hash in `forge_refresh_tokens` and returns
1005    /// both tokens. Use `rotate_refresh_token()` to exchange a refresh token
1006    /// for a new pair, and `revoke_refresh_token()` to invalidate one.
1007    ///
1008    /// TTLs come from `[auth]` in forge.toml:
1009    /// - `access_token_ttl` (default "1h")
1010    /// - `refresh_token_ttl` (default "30d")
1011    pub async fn issue_token_pair(
1012        &self,
1013        user_id: Uuid,
1014        roles: &[&str],
1015    ) -> crate::error::Result<crate::auth::TokenPair> {
1016        let issuer = self.token_issuer.clone().ok_or_else(|| {
1017            crate::error::ForgeError::Internal(
1018                "Token issuer not available. Configure [auth] in forge.toml".into(),
1019            )
1020        })?;
1021        let access_ttl = self.token_ttl.access_token_secs;
1022        let refresh_ttl = self.token_ttl.refresh_token_days;
1023        crate::auth::tokens::issue_token_pair(
1024            &self.db_pool,
1025            user_id,
1026            roles,
1027            access_ttl,
1028            refresh_ttl,
1029            move |uid, r, ttl| {
1030                let claims = Claims::builder()
1031                    .subject(uid)
1032                    .roles(r.iter().map(|s| s.to_string()).collect())
1033                    .duration_secs(ttl)
1034                    .build()
1035                    .map_err(crate::error::ForgeError::Internal)?;
1036                issuer.sign(&claims)
1037            },
1038        )
1039        .await
1040    }
1041
1042    /// Rotate a refresh token: validate the old one, issue a new pair.
1043    ///
1044    /// The old token is atomically deleted and a new access + refresh pair
1045    /// is returned. Fails if the token is invalid or expired.
1046    pub async fn rotate_refresh_token(
1047        &self,
1048        old_refresh_token: &str,
1049    ) -> crate::error::Result<crate::auth::TokenPair> {
1050        let issuer = self.token_issuer.clone().ok_or_else(|| {
1051            crate::error::ForgeError::Internal(
1052                "Token issuer not available. Configure [auth] in forge.toml".into(),
1053            )
1054        })?;
1055        let access_ttl = self.token_ttl.access_token_secs;
1056        let refresh_ttl = self.token_ttl.refresh_token_days;
1057        crate::auth::tokens::rotate_refresh_token(
1058            &self.db_pool,
1059            old_refresh_token,
1060            &["user"],
1061            access_ttl,
1062            refresh_ttl,
1063            move |uid, r, ttl| {
1064                let claims = Claims::builder()
1065                    .subject(uid)
1066                    .roles(r.iter().map(|s| s.to_string()).collect())
1067                    .duration_secs(ttl)
1068                    .build()
1069                    .map_err(crate::error::ForgeError::Internal)?;
1070                issuer.sign(&claims)
1071            },
1072        )
1073        .await
1074    }
1075
1076    /// Revoke a specific refresh token (e.g., on logout).
1077    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1078        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1079    }
1080
1081    /// Revoke all refresh tokens for a user (e.g., on password change or account deletion).
1082    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1083        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1084    }
1085
1086    /// In transactional mode, buffers for atomic commit; otherwise dispatches immediately.
1087    pub async fn dispatch_job<T: serde::Serialize>(
1088        &self,
1089        job_type: &str,
1090        args: T,
1091    ) -> crate::error::Result<Uuid> {
1092        let args_json = serde_json::to_value(args)?;
1093
1094        // Transactional mode: buffer the job for atomic commit
1095        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1096            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1097                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1098            })?;
1099
1100            let pending = PendingJob {
1101                id: Uuid::new_v4(),
1102                job_type: job_type.to_string(),
1103                args: args_json,
1104                context: serde_json::json!({}),
1105                owner_subject: self.auth.principal_id(),
1106                priority: job_info.priority.as_i32(),
1107                max_attempts: job_info.retry.max_attempts as i32,
1108                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1109            };
1110
1111            let job_id = pending.id;
1112            outbox
1113                .lock()
1114                .expect("outbox lock poisoned")
1115                .jobs
1116                .push(pending);
1117            return Ok(job_id);
1118        }
1119
1120        // Non-transactional mode: dispatch immediately
1121        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1122            crate::error::ForgeError::Internal("Job dispatch not available".into())
1123        })?;
1124        dispatcher
1125            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1126            .await
1127    }
1128
1129    /// Dispatch a job with initial context.
1130    pub async fn dispatch_job_with_context<T: serde::Serialize>(
1131        &self,
1132        job_type: &str,
1133        args: T,
1134        context: serde_json::Value,
1135    ) -> crate::error::Result<Uuid> {
1136        let args_json = serde_json::to_value(args)?;
1137
1138        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1139            let job_info = job_info_lookup(job_type).ok_or_else(|| {
1140                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1141            })?;
1142
1143            let pending = PendingJob {
1144                id: Uuid::new_v4(),
1145                job_type: job_type.to_string(),
1146                args: args_json,
1147                context,
1148                owner_subject: self.auth.principal_id(),
1149                priority: job_info.priority.as_i32(),
1150                max_attempts: job_info.retry.max_attempts as i32,
1151                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1152            };
1153
1154            let job_id = pending.id;
1155            outbox
1156                .lock()
1157                .expect("outbox lock poisoned")
1158                .jobs
1159                .push(pending);
1160            return Ok(job_id);
1161        }
1162
1163        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1164            crate::error::ForgeError::Internal("Job dispatch not available".into())
1165        })?;
1166        dispatcher
1167            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1168            .await
1169    }
1170
1171    /// Request cancellation for a job.
1172    pub async fn cancel_job(
1173        &self,
1174        job_id: Uuid,
1175        reason: Option<String>,
1176    ) -> crate::error::Result<bool> {
1177        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1178            crate::error::ForgeError::Internal("Job dispatch not available".into())
1179        })?;
1180        dispatcher.cancel(job_id, reason).await
1181    }
1182
1183    /// In transactional mode, buffers for atomic commit; otherwise starts immediately.
1184    pub async fn start_workflow<T: serde::Serialize>(
1185        &self,
1186        workflow_name: &str,
1187        input: T,
1188    ) -> crate::error::Result<Uuid> {
1189        let input_json = serde_json::to_value(input)?;
1190
1191        // Transactional mode: buffer the workflow for atomic commit
1192        if let Some(outbox) = &self.outbox {
1193            let pending = PendingWorkflow {
1194                id: Uuid::new_v4(),
1195                workflow_name: workflow_name.to_string(),
1196                input: input_json,
1197                owner_subject: self.auth.principal_id(),
1198            };
1199
1200            let workflow_id = pending.id;
1201            outbox
1202                .lock()
1203                .expect("outbox lock poisoned")
1204                .workflows
1205                .push(pending);
1206            return Ok(workflow_id);
1207        }
1208
1209        // Non-transactional mode: start immediately
1210        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1211            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1212        })?;
1213        dispatcher
1214            .start_by_name(workflow_name, input_json, self.auth.principal_id())
1215            .await
1216    }
1217}
1218
1219impl EnvAccess for MutationContext {
1220    fn env_provider(&self) -> &dyn EnvProvider {
1221        self.env_provider.as_ref()
1222    }
1223}
1224
1225#[cfg(test)]
1226#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
1227mod tests {
1228    use super::*;
1229
1230    #[test]
1231    fn test_auth_context_unauthenticated() {
1232        let ctx = AuthContext::unauthenticated();
1233        assert!(!ctx.is_authenticated());
1234        assert!(ctx.user_id().is_none());
1235        assert!(ctx.require_user_id().is_err());
1236    }
1237
1238    #[test]
1239    fn test_auth_context_authenticated() {
1240        let user_id = Uuid::new_v4();
1241        let ctx = AuthContext::authenticated(
1242            user_id,
1243            vec!["admin".to_string(), "user".to_string()],
1244            HashMap::new(),
1245        );
1246
1247        assert!(ctx.is_authenticated());
1248        assert_eq!(ctx.user_id(), Some(user_id));
1249        assert!(ctx.require_user_id().is_ok());
1250        assert!(ctx.has_role("admin"));
1251        assert!(ctx.has_role("user"));
1252        assert!(!ctx.has_role("superadmin"));
1253        assert!(ctx.require_role("admin").is_ok());
1254        assert!(ctx.require_role("superadmin").is_err());
1255    }
1256
1257    #[test]
1258    fn test_auth_context_with_claims() {
1259        let mut claims = HashMap::new();
1260        claims.insert("org_id".to_string(), serde_json::json!("org-123"));
1261
1262        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
1263
1264        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
1265        assert!(ctx.claim("nonexistent").is_none());
1266    }
1267
1268    #[test]
1269    fn test_request_metadata() {
1270        let meta = RequestMetadata::new();
1271        assert!(!meta.trace_id.is_empty());
1272        assert!(meta.client_ip.is_none());
1273
1274        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
1275        assert_eq!(meta2.trace_id, "trace-123");
1276    }
1277}