1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Abstraction over the component that turns [`Claims`] into a signed token string.
///
/// Implementors must be `Send + Sync`; in this file they are stored as
/// `Arc<dyn TokenIssuer>` on `MutationContext`.
pub trait TokenIssuer: Send + Sync {
    /// Signs `claims` and returns the resulting token as a `String`, or an
    /// error if signing fails.
    // NOTE(review): the concrete token format (presumably a JWT) is not visible here — confirm.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// A database connection handle that is either checked out of the pool or
/// borrowed from a mutex-guarded transaction.
///
/// Both variants dereference to [`PgConnection`] (see the `Deref`/`DerefMut`
/// impls in this file).
pub enum ForgeConn<'a> {
    /// A connection acquired from the pool.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// A lock guard over a transaction; the lock stays held for as long as
    /// this value is alive.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81 type Target = PgConnection;
82 fn deref(&self) -> &PgConnection {
83 match self {
84 ForgeConn::Pool(c) => c,
85 ForgeConn::Tx(g) => g,
86 }
87 }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91 fn deref_mut(&mut self) -> &mut PgConnection {
92 match self {
93 ForgeConn::Pool(c) => c,
94 ForgeConn::Tx(g) => g,
95 }
96 }
97}
98
/// Cloneable newtype around a [`sqlx::PgPool`].
///
/// It implements `sqlx::Executor` in this file; most query methods of that
/// impl run inside a `db.query` tracing span.
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_tuple("ForgeDb").finish()
116 }
117}
118
119impl ForgeDb {
120 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122 Self(pool.clone())
123 }
124}
125
/// Classifies a SQL string by its leading verb.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively (ASCII) against `SELECT`, `INSERT`, `UPDATE` and
/// `DELETE`. Anything else — including statements shorter than six bytes or
/// ones that start with another keyword such as `WITH` — yields `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // Every recognised verb is exactly six bytes long, matching the prefix width below.
    const VERBS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    let statement = sql.trim_start().as_bytes();
    let Some(head) = statement.get(..6) else {
        return "OTHER";
    };

    VERBS
        .iter()
        .copied()
        .find(|verb| head.eq_ignore_ascii_case(verb.as_bytes()))
        .unwrap_or("OTHER")
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138 type Database = Postgres;
139
140 fn fetch_many<'e, 'q: 'e, E>(
141 self,
142 query: E,
143 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144 where
145 E: sqlx::Execute<'q, Postgres> + 'q,
146 {
147 (&self.0).fetch_many(query)
148 }
149
150 fn fetch_optional<'e, 'q: 'e, E>(
151 self,
152 query: E,
153 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154 where
155 E: sqlx::Execute<'q, Postgres> + 'q,
156 {
157 let op = sql_operation(query.sql());
158 let span =
159 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160 Box::pin(
161 async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162 )
163 }
164
165 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166 where
167 E: sqlx::Execute<'q, Postgres> + 'q,
168 {
169 let op = sql_operation(query.sql());
170 let span =
171 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172 Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173 }
174
175 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176 where
177 E: sqlx::Execute<'q, Postgres> + 'q,
178 {
179 let op = sql_operation(query.sql());
180 let span =
181 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182 Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183 }
184
185 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186 where
187 E: sqlx::Execute<'q, Postgres> + 'q,
188 {
189 let op = sql_operation(query.sql());
190 let span =
191 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192 Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193 }
194
195 fn prepare_with<'e, 'q: 'e>(
196 self,
197 sql: &'q str,
198 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200 Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201 }
202
203 fn describe<'e, 'q: 'e>(
204 self,
205 sql: &'q str,
206 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207 Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208 }
209}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 match self {
214 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216 }
217 }
218}
219
220impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
221 type Database = Postgres;
222
223 fn fetch_many<'e, 'q: 'e, E>(
224 self,
225 query: E,
226 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
227 where
228 'c: 'e,
229 E: sqlx::Execute<'q, Postgres> + 'q,
230 {
231 let conn: &'e mut PgConnection = &mut *self;
232 conn.fetch_many(query)
233 }
234
235 fn fetch_optional<'e, 'q: 'e, E>(
236 self,
237 query: E,
238 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
239 where
240 'c: 'e,
241 E: sqlx::Execute<'q, Postgres> + 'q,
242 {
243 let op = sql_operation(query.sql());
244 let span =
245 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
246 let conn: &'e mut PgConnection = &mut *self;
247 Box::pin(conn.fetch_optional(query).instrument(span))
248 }
249
250 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
251 where
252 'c: 'e,
253 E: sqlx::Execute<'q, Postgres> + 'q,
254 {
255 let op = sql_operation(query.sql());
256 let span =
257 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
258 let conn: &'e mut PgConnection = &mut *self;
259 Box::pin(conn.execute(query).instrument(span))
260 }
261
262 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
263 where
264 'c: 'e,
265 E: sqlx::Execute<'q, Postgres> + 'q,
266 {
267 let op = sql_operation(query.sql());
268 let span =
269 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
270 let conn: &'e mut PgConnection = &mut *self;
271 Box::pin(conn.fetch_all(query).instrument(span))
272 }
273
274 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
275 where
276 'c: 'e,
277 E: sqlx::Execute<'q, Postgres> + 'q,
278 {
279 let op = sql_operation(query.sql());
280 let span =
281 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
282 let conn: &'e mut PgConnection = &mut *self;
283 Box::pin(conn.fetch_one(query).instrument(span))
284 }
285
286 fn prepare_with<'e, 'q: 'e>(
287 self,
288 sql: &'q str,
289 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
290 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
291 where
292 'c: 'e,
293 {
294 let conn: &'e mut PgConnection = &mut *self;
295 conn.prepare_with(sql, parameters)
296 }
297
298 fn describe<'e, 'q: 'e>(
299 self,
300 sql: &'q str,
301 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
302 where
303 'c: 'e,
304 {
305 let conn: &'e mut PgConnection = &mut *self;
306 conn.describe(sql)
307 }
308}
309
/// A job recorded in an [`OutboxBuffer`] instead of being dispatched right away.
///
/// `MutationContext::dispatch_job*` builds one of these when the context has
/// an outbox and a job-info lookup.
// NOTE(review): who drains the buffer, and when, is not visible in this file — confirm.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Identifier generated when the job is buffered; returned to the caller.
    pub id: Uuid,
    /// Name of the job type, as passed to the dispatch call.
    pub job_type: String,
    /// Job arguments serialized to JSON.
    pub args: serde_json::Value,
    /// Caller-supplied context JSON (an empty object when none was given).
    pub context: serde_json::Value,
    /// Principal that enqueued the job, from `AuthContext::principal_id`.
    pub owner_subject: Option<String>,
    /// Priority taken from the job's `JobInfo`.
    pub priority: i32,
    /// Retry attempt limit taken from the job's `JobInfo`.
    pub max_attempts: i32,
    /// Optional worker capability taken from the job's `JobInfo`.
    pub worker_capability: Option<String>,
}
321
/// A workflow start recorded in an [`OutboxBuffer`] instead of being started
/// right away.
///
/// `MutationContext::start_workflow` builds one of these when the context has
/// an outbox.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Identifier generated when the workflow is buffered; returned to the caller.
    pub id: Uuid,
    /// Name of the workflow, as passed to `start_workflow`.
    pub workflow_name: String,
    /// Workflow input serialized to JSON.
    pub input: serde_json::Value,
    /// Principal that requested the workflow, from `AuthContext::principal_id`.
    pub owner_subject: Option<String>,
}
329
/// In-memory collection of jobs and workflows buffered by a transactional
/// `MutationContext`. Starts out empty via `Default`.
// NOTE(review): presumably flushed to the database together with the transaction — confirm against the caller.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs buffered by `dispatch_job` / `dispatch_job_with_context`.
    pub jobs: Vec<PendingJob>,
    /// Workflows buffered by `start_workflow`.
    pub workflows: Vec<PendingWorkflow>,
}
335
/// Identity information attached to a request: optional user id, roles,
/// raw claims, and whether the caller is authenticated at all.
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// UUID of the user, when one is known. May be `None` even for an
    /// authenticated caller (see `authenticated_without_uuid`).
    user_id: Option<Uuid>,
    /// Role names granted to the caller.
    roles: Vec<String>,
    /// Claims keyed by name; `"sub"` and `"tenant_id"` are read by this type.
    claims: HashMap<String, serde_json::Value>,
    /// `true` when the context was built by one of the authenticated constructors.
    authenticated: bool,
}
348
349impl AuthContext {
350 pub fn unauthenticated() -> Self {
352 Self {
353 user_id: None,
354 roles: Vec::new(),
355 claims: HashMap::new(),
356 authenticated: false,
357 }
358 }
359
360 pub fn authenticated(
362 user_id: Uuid,
363 roles: Vec<String>,
364 claims: HashMap<String, serde_json::Value>,
365 ) -> Self {
366 Self {
367 user_id: Some(user_id),
368 roles,
369 claims,
370 authenticated: true,
371 }
372 }
373
374 pub fn authenticated_without_uuid(
380 roles: Vec<String>,
381 claims: HashMap<String, serde_json::Value>,
382 ) -> Self {
383 Self {
384 user_id: None,
385 roles,
386 claims,
387 authenticated: true,
388 }
389 }
390
391 pub fn is_authenticated(&self) -> bool {
393 self.authenticated
394 }
395
396 pub fn user_id(&self) -> Option<Uuid> {
398 self.user_id
399 }
400
401 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403 self.user_id
404 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405 }
406
407 pub fn has_role(&self, role: &str) -> bool {
409 self.roles.iter().any(|r| r == role)
410 }
411
412 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414 if self.has_role(role) {
415 Ok(())
416 } else {
417 Err(crate::error::ForgeError::Forbidden(format!(
418 "Required role '{}' not present",
419 role
420 )))
421 }
422 }
423
424 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426 self.claims.get(key)
427 }
428
429 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431 &self.claims
432 }
433
434 pub fn roles(&self) -> &[String] {
436 &self.roles
437 }
438
439 pub fn subject(&self) -> Option<&str> {
445 self.claims.get("sub").and_then(|v| v.as_str())
446 }
447
448 pub fn require_subject(&self) -> crate::error::Result<&str> {
450 if !self.authenticated {
451 return Err(crate::error::ForgeError::Unauthorized(
452 "Authentication required".to_string(),
453 ));
454 }
455 self.subject().ok_or_else(|| {
456 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457 })
458 }
459
460 pub fn principal_id(&self) -> Option<String> {
464 self.subject()
465 .map(ToString::to_string)
466 .or_else(|| self.user_id.map(|id| id.to_string()))
467 }
468
469 pub fn is_admin(&self) -> bool {
471 self.roles.iter().any(|r| r == "admin")
472 }
473
474 pub fn tenant_id(&self) -> Option<uuid::Uuid> {
479 self.claims
480 .get("tenant_id")
481 .and_then(|v| v.as_str())
482 .and_then(|s| uuid::Uuid::parse_str(s).ok())
483 }
484
485 pub fn check_identity_args(
492 &self,
493 function_name: &str,
494 args: &serde_json::Value,
495 enforce_scope: bool,
496 ) -> crate::error::Result<()> {
497 use crate::error::ForgeError;
498
499 if self.is_admin() {
500 return Ok(());
501 }
502
503 if !self.is_authenticated() && !enforce_scope {
504 return Ok(());
505 }
506
507 let Some(obj) = args.as_object() else {
508 if enforce_scope && self.is_authenticated() {
509 return Err(ForgeError::Forbidden(format!(
510 "Function '{function_name}' must include identity or tenant scope arguments"
511 )));
512 }
513 return Ok(());
514 };
515
516 let mut principal_values: Vec<String> = Vec::new();
517 if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
518 principal_values.push(user_id);
519 }
520 if let Some(subject) = self.principal_id()
521 && !principal_values.iter().any(|v| v == &subject)
522 {
523 principal_values.push(subject);
524 }
525
526 let mut has_scope_key = false;
527
528 for key in [
529 "user_id",
530 "userId",
531 "owner_id",
532 "ownerId",
533 "owner_subject",
534 "ownerSubject",
535 "subject",
536 "sub",
537 "principal_id",
538 "principalId",
539 ] {
540 let Some(value) = obj.get(key) else {
541 continue;
542 };
543 has_scope_key = true;
544
545 if !self.is_authenticated() {
546 return Err(ForgeError::Unauthorized(format!(
547 "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
548 )));
549 }
550
551 let serde_json::Value::String(actual) = value else {
552 return Err(ForgeError::InvalidArgument(format!(
553 "Function '{function_name}' argument '{key}' must be a non-empty string"
554 )));
555 };
556
557 if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
558 return Err(ForgeError::Forbidden(format!(
559 "Function '{function_name}' argument '{key}' does not match authenticated principal"
560 )));
561 }
562 }
563
564 for key in ["tenant_id", "tenantId"] {
565 let Some(value) = obj.get(key) else {
566 continue;
567 };
568 has_scope_key = true;
569
570 if !self.is_authenticated() {
571 return Err(ForgeError::Unauthorized(format!(
572 "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
573 )));
574 }
575
576 let expected = self
577 .claim("tenant_id")
578 .and_then(|v| v.as_str())
579 .ok_or_else(|| {
580 ForgeError::Forbidden(format!(
581 "Function '{function_name}' argument '{key}' is not allowed for this principal"
582 ))
583 })?;
584
585 let serde_json::Value::String(actual) = value else {
586 return Err(ForgeError::InvalidArgument(format!(
587 "Function '{function_name}' argument '{key}' must be a non-empty string"
588 )));
589 };
590
591 if actual.trim().is_empty() || actual != expected {
592 return Err(ForgeError::Forbidden(format!(
593 "Function '{function_name}' argument '{key}' does not match authenticated tenant"
594 )));
595 }
596 }
597
598 if enforce_scope && self.is_authenticated() && !has_scope_key {
599 return Err(ForgeError::Forbidden(format!(
600 "Function '{function_name}' must include identity or tenant scope arguments"
601 )));
602 }
603
604 Ok(())
605 }
606}
607
/// Per-request bookkeeping: identifiers, optional client details and the
/// time the metadata was created.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Random (v4) identifier generated when the metadata is constructed.
    pub request_id: Uuid,
    /// Trace identifier; a random UUID string unless supplied via `with_trace_id`.
    pub trace_id: String,
    /// Client IP address, if known. The constructors leave this `None`.
    pub client_ip: Option<String>,
    /// Client user agent, if known. The constructors leave this `None`.
    pub user_agent: Option<String>,
    /// Correlation identifier, if any. The constructors leave this `None`.
    pub correlation_id: Option<String>,
    /// UTC time at which the metadata was constructed.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
624
625impl RequestMetadata {
626 pub fn new() -> Self {
628 Self {
629 request_id: Uuid::new_v4(),
630 trace_id: Uuid::new_v4().to_string(),
631 client_ip: None,
632 user_agent: None,
633 correlation_id: None,
634 timestamp: chrono::Utc::now(),
635 }
636 }
637
638 pub fn with_trace_id(trace_id: String) -> Self {
640 Self {
641 request_id: Uuid::new_v4(),
642 trace_id,
643 client_ip: None,
644 user_agent: None,
645 correlation_id: None,
646 timestamp: chrono::Utc::now(),
647 }
648 }
649}
650
651impl Default for RequestMetadata {
652 fn default() -> Self {
653 Self::new()
654 }
655}
656
/// Context for a query: the caller's identity, request metadata, a database
/// pool, and an environment provider.
pub struct QueryContext {
    /// Identity of the caller.
    pub auth: AuthContext,
    /// Metadata for the current request.
    pub request: RequestMetadata,
    /// Pool handed out (wrapped in `ForgeDb`) by `db()`.
    db_pool: sqlx::PgPool,
    /// Source of environment values exposed through `EnvAccess`.
    env_provider: Arc<dyn EnvProvider>,
}
668
669impl QueryContext {
670 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
672 Self {
673 auth,
674 request,
675 db_pool,
676 env_provider: Arc::new(RealEnvProvider::new()),
677 }
678 }
679
680 pub fn with_env(
682 db_pool: sqlx::PgPool,
683 auth: AuthContext,
684 request: RequestMetadata,
685 env_provider: Arc<dyn EnvProvider>,
686 ) -> Self {
687 Self {
688 auth,
689 request,
690 db_pool,
691 env_provider,
692 }
693 }
694
695 pub fn db(&self) -> ForgeDb {
704 ForgeDb(self.db_pool.clone())
705 }
706
707 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
708 self.auth.require_user_id()
709 }
710
711 pub fn require_subject(&self) -> crate::error::Result<&str> {
713 self.auth.require_subject()
714 }
715}
716
717impl EnvAccess for QueryContext {
718 fn env_provider(&self) -> &dyn EnvProvider {
719 self.env_provider.as_ref()
720 }
721}
722
/// Shared, thread-safe callback that maps a job type name to its [`JobInfo`],
/// returning `None` when the job type is unknown.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
725
/// Lifetimes applied to tokens issued through `MutationContext`.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access-token lifetime, in seconds.
    pub access_token_secs: i64,
    /// Refresh-token lifetime, in days.
    pub refresh_token_days: i64,
}

impl Default for AuthTokenTtl {
    /// One-hour access tokens and thirty-day refresh tokens.
    fn default() -> Self {
        const ACCESS_TOKEN_SECS: i64 = 60 * 60;
        const REFRESH_TOKEN_DAYS: i64 = 30;

        AuthTokenTtl {
            access_token_secs: ACCESS_TOKEN_SECS,
            refresh_token_days: REFRESH_TOKEN_DAYS,
        }
    }
}
743
/// Context for a mutation: identity, request metadata, database access
/// (pooled or transactional), HTTP client, job/workflow dispatch, and token
/// issuing.
pub struct MutationContext {
    /// Identity of the caller.
    pub auth: AuthContext,
    /// Metadata for the current request.
    pub request: RequestMetadata,
    /// Pool used by `conn()` when no transaction is attached, by `pool()`,
    /// and by the refresh-token operations.
    db_pool: sqlx::PgPool,
    /// HTTP client wrapped in a circuit breaker.
    http_client: CircuitBreakerClient,
    /// Optional timeout passed to `http_client.with_timeout` by `http()`.
    http_timeout: Option<Duration>,
    /// Direct job dispatcher; used when jobs are not buffered in the outbox.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Direct workflow dispatcher; used when workflows are not buffered in the outbox.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Source of environment values exposed through `EnvAccess`.
    env_provider: Arc<dyn EnvProvider>,
    /// Shared transaction; when set, `conn()` locks it instead of using the pool.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Buffer that collects jobs and workflows instead of dispatching them directly.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Resolves job metadata when buffering jobs into the outbox.
    job_info_lookup: Option<JobInfoLookup>,
    /// Signer used by the token-issuing methods; `None` until configured.
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Lifetimes used when issuing access / refresh tokens.
    token_ttl: AuthTokenTtl,
}
774
775impl MutationContext {
776 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
778 Self {
779 auth,
780 request,
781 db_pool,
782 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
783 http_timeout: None,
784 job_dispatch: None,
785 workflow_dispatch: None,
786 env_provider: Arc::new(RealEnvProvider::new()),
787 tx: None,
788 outbox: None,
789 job_info_lookup: None,
790 token_issuer: None,
791 token_ttl: AuthTokenTtl::default(),
792 }
793 }
794
795 pub fn with_dispatch(
797 db_pool: sqlx::PgPool,
798 auth: AuthContext,
799 request: RequestMetadata,
800 http_client: CircuitBreakerClient,
801 job_dispatch: Option<Arc<dyn JobDispatch>>,
802 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
803 ) -> Self {
804 Self {
805 auth,
806 request,
807 db_pool,
808 http_client,
809 http_timeout: None,
810 job_dispatch,
811 workflow_dispatch,
812 env_provider: Arc::new(RealEnvProvider::new()),
813 tx: None,
814 outbox: None,
815 job_info_lookup: None,
816 token_issuer: None,
817 token_ttl: AuthTokenTtl::default(),
818 }
819 }
820
821 pub fn with_env(
823 db_pool: sqlx::PgPool,
824 auth: AuthContext,
825 request: RequestMetadata,
826 http_client: CircuitBreakerClient,
827 job_dispatch: Option<Arc<dyn JobDispatch>>,
828 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
829 env_provider: Arc<dyn EnvProvider>,
830 ) -> Self {
831 Self {
832 auth,
833 request,
834 db_pool,
835 http_client,
836 http_timeout: None,
837 job_dispatch,
838 workflow_dispatch,
839 env_provider,
840 tx: None,
841 outbox: None,
842 job_info_lookup: None,
843 token_issuer: None,
844 token_ttl: AuthTokenTtl::default(),
845 }
846 }
847
848 #[allow(clippy::type_complexity)]
850 pub fn with_transaction(
851 db_pool: sqlx::PgPool,
852 tx: Transaction<'static, Postgres>,
853 auth: AuthContext,
854 request: RequestMetadata,
855 http_client: CircuitBreakerClient,
856 job_info_lookup: JobInfoLookup,
857 ) -> (
858 Self,
859 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
860 Arc<Mutex<OutboxBuffer>>,
861 ) {
862 let tx_handle = Arc::new(AsyncMutex::new(tx));
863 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
864
865 let ctx = Self {
866 auth,
867 request,
868 db_pool,
869 http_client,
870 http_timeout: None,
871 job_dispatch: None,
872 workflow_dispatch: None,
873 env_provider: Arc::new(RealEnvProvider::new()),
874 tx: Some(tx_handle.clone()),
875 outbox: Some(outbox.clone()),
876 job_info_lookup: Some(job_info_lookup),
877 token_issuer: None,
878 token_ttl: AuthTokenTtl::default(),
879 };
880
881 (ctx, tx_handle, outbox)
882 }
883
884 pub fn is_transactional(&self) -> bool {
885 self.tx.is_some()
886 }
887
888 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
900 match &self.tx {
901 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
902 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
903 }
904 }
905
906 pub fn pool(&self) -> &sqlx::PgPool {
908 &self.db_pool
909 }
910
911 pub fn http(&self) -> crate::http::HttpClient {
917 self.http_client.with_timeout(self.http_timeout)
918 }
919
920 pub fn raw_http(&self) -> &reqwest::Client {
922 self.http_client.inner()
923 }
924
925 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
927 self.http_timeout = timeout;
928 }
929
930 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
931 self.auth.require_user_id()
932 }
933
934 pub fn require_subject(&self) -> crate::error::Result<&str> {
935 self.auth.require_subject()
936 }
937
938 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
940 self.token_issuer = Some(issuer);
941 }
942
943 pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
945 self.token_ttl = ttl;
946 }
947
948 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
963 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
964 crate::error::ForgeError::Internal(
965 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
966 .into(),
967 )
968 })?;
969 issuer.sign(claims)
970 }
971
972 pub async fn issue_token_pair(
982 &self,
983 user_id: Uuid,
984 roles: &[&str],
985 ) -> crate::error::Result<crate::auth::TokenPair> {
986 let issuer = self.token_issuer.clone().ok_or_else(|| {
987 crate::error::ForgeError::Internal(
988 "Token issuer not available. Configure [auth] in forge.toml".into(),
989 )
990 })?;
991 let access_ttl = self.token_ttl.access_token_secs;
992 let refresh_ttl = self.token_ttl.refresh_token_days;
993 crate::auth::tokens::issue_token_pair(
994 &self.db_pool,
995 user_id,
996 roles,
997 access_ttl,
998 refresh_ttl,
999 move |uid, r, ttl| {
1000 let claims = Claims::builder()
1001 .subject(uid)
1002 .roles(r.iter().map(|s| s.to_string()).collect())
1003 .duration_secs(ttl)
1004 .build()
1005 .map_err(crate::error::ForgeError::Internal)?;
1006 issuer.sign(&claims)
1007 },
1008 )
1009 .await
1010 }
1011
1012 pub async fn rotate_refresh_token(
1017 &self,
1018 old_refresh_token: &str,
1019 ) -> crate::error::Result<crate::auth::TokenPair> {
1020 let issuer = self.token_issuer.clone().ok_or_else(|| {
1021 crate::error::ForgeError::Internal(
1022 "Token issuer not available. Configure [auth] in forge.toml".into(),
1023 )
1024 })?;
1025 let access_ttl = self.token_ttl.access_token_secs;
1026 let refresh_ttl = self.token_ttl.refresh_token_days;
1027 crate::auth::tokens::rotate_refresh_token(
1028 &self.db_pool,
1029 old_refresh_token,
1030 &["user"],
1031 access_ttl,
1032 refresh_ttl,
1033 move |uid, r, ttl| {
1034 let claims = Claims::builder()
1035 .subject(uid)
1036 .roles(r.iter().map(|s| s.to_string()).collect())
1037 .duration_secs(ttl)
1038 .build()
1039 .map_err(crate::error::ForgeError::Internal)?;
1040 issuer.sign(&claims)
1041 },
1042 )
1043 .await
1044 }
1045
1046 pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1048 crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1049 }
1050
1051 pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1053 crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1054 }
1055
1056 pub async fn dispatch_job<T: serde::Serialize>(
1058 &self,
1059 job_type: &str,
1060 args: T,
1061 ) -> crate::error::Result<Uuid> {
1062 let args_json = serde_json::to_value(args)?;
1063
1064 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1066 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1067 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1068 })?;
1069
1070 let pending = PendingJob {
1071 id: Uuid::new_v4(),
1072 job_type: job_type.to_string(),
1073 args: args_json,
1074 context: serde_json::json!({}),
1075 owner_subject: self.auth.principal_id(),
1076 priority: job_info.priority.as_i32(),
1077 max_attempts: job_info.retry.max_attempts as i32,
1078 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1079 };
1080
1081 let job_id = pending.id;
1082 outbox
1083 .lock()
1084 .expect("outbox lock poisoned")
1085 .jobs
1086 .push(pending);
1087 return Ok(job_id);
1088 }
1089
1090 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1092 crate::error::ForgeError::Internal("Job dispatch not available".into())
1093 })?;
1094 dispatcher
1095 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1096 .await
1097 }
1098
1099 pub async fn dispatch_job_with_context<T: serde::Serialize>(
1101 &self,
1102 job_type: &str,
1103 args: T,
1104 context: serde_json::Value,
1105 ) -> crate::error::Result<Uuid> {
1106 let args_json = serde_json::to_value(args)?;
1107
1108 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1109 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1110 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1111 })?;
1112
1113 let pending = PendingJob {
1114 id: Uuid::new_v4(),
1115 job_type: job_type.to_string(),
1116 args: args_json,
1117 context,
1118 owner_subject: self.auth.principal_id(),
1119 priority: job_info.priority.as_i32(),
1120 max_attempts: job_info.retry.max_attempts as i32,
1121 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1122 };
1123
1124 let job_id = pending.id;
1125 outbox
1126 .lock()
1127 .expect("outbox lock poisoned")
1128 .jobs
1129 .push(pending);
1130 return Ok(job_id);
1131 }
1132
1133 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1134 crate::error::ForgeError::Internal("Job dispatch not available".into())
1135 })?;
1136 dispatcher
1137 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1138 .await
1139 }
1140
1141 pub async fn cancel_job(
1143 &self,
1144 job_id: Uuid,
1145 reason: Option<String>,
1146 ) -> crate::error::Result<bool> {
1147 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1148 crate::error::ForgeError::Internal("Job dispatch not available".into())
1149 })?;
1150 dispatcher.cancel(job_id, reason).await
1151 }
1152
1153 pub async fn start_workflow<T: serde::Serialize>(
1155 &self,
1156 workflow_name: &str,
1157 input: T,
1158 ) -> crate::error::Result<Uuid> {
1159 let input_json = serde_json::to_value(input)?;
1160
1161 if let Some(outbox) = &self.outbox {
1163 let pending = PendingWorkflow {
1164 id: Uuid::new_v4(),
1165 workflow_name: workflow_name.to_string(),
1166 input: input_json,
1167 owner_subject: self.auth.principal_id(),
1168 };
1169
1170 let workflow_id = pending.id;
1171 outbox
1172 .lock()
1173 .expect("outbox lock poisoned")
1174 .workflows
1175 .push(pending);
1176 return Ok(workflow_id);
1177 }
1178
1179 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1181 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1182 })?;
1183 dispatcher
1184 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1185 .await
1186 }
1187}
1188
1189impl EnvAccess for MutationContext {
1190 fn env_provider(&self) -> &dyn EnvProvider {
1191 self.env_provider.as_ref()
1192 }
1193}
1194
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An anonymous context has no identity and rejects `require_user_id`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert_eq!(anonymous.user_id(), None);
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context exposes its user id and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted: Vec<String> = ["admin", "user"].iter().map(|r| r.to_string()).collect();
        let ctx = AuthContext::authenticated(expected_id, granted, HashMap::new());

        assert!(ctx.is_authenticated());
        assert_eq!(ctx.user_id(), Some(expected_id));
        assert!(ctx.require_user_id().is_ok());

        for role in ["admin", "user"] {
            assert!(ctx.has_role(role));
        }
        assert!(!ctx.has_role("superadmin"));

        assert!(ctx.require_role("admin").is_ok());
        assert!(ctx.require_role("superadmin").is_err());
    }

    /// Claims passed at construction are retrievable by name.
    #[test]
    fn test_auth_context_with_claims() {
        let claims = HashMap::from([("org_id".to_string(), serde_json::json!("org-123"))]);
        let ctx = AuthContext::authenticated(Uuid::new_v4(), Vec::new(), claims);

        let expected = serde_json::json!("org-123");
        assert_eq!(ctx.claim("org_id"), Some(&expected));
        assert_eq!(ctx.claim("nonexistent"), None);
    }

    /// `new` generates a trace id; `with_trace_id` keeps the one supplied.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let supplied = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(supplied.trace_id, "trace-123");
    }
}