1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
54pub trait TokenIssuer: Send + Sync {
59 fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
61}
62
63pub enum ForgeConn<'a> {
76 Pool(sqlx::pool::PoolConnection<Postgres>),
77 Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
78}
79
80impl std::ops::Deref for ForgeConn<'_> {
81 type Target = PgConnection;
82 fn deref(&self) -> &PgConnection {
83 match self {
84 ForgeConn::Pool(c) => c,
85 ForgeConn::Tx(g) => g,
86 }
87 }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91 fn deref_mut(&mut self) -> &mut PgConnection {
92 match self {
93 ForgeConn::Pool(c) => c,
94 ForgeConn::Tx(g) => g,
95 }
96 }
97}
98
99#[derive(Clone)]
111pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_tuple("ForgeDb").finish()
116 }
117}
118
119impl ForgeDb {
120 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122 Self(pool.clone())
123 }
124}
125
126fn sql_operation(sql: &str) -> &'static str {
127 let bytes = sql.trim_start().as_bytes();
128 match bytes.get(..6) {
129 Some(prefix) if prefix.eq_ignore_ascii_case(b"select") => "SELECT",
130 Some(prefix) if prefix.eq_ignore_ascii_case(b"insert") => "INSERT",
131 Some(prefix) if prefix.eq_ignore_ascii_case(b"update") => "UPDATE",
132 Some(prefix) if prefix.eq_ignore_ascii_case(b"delete") => "DELETE",
133 _ => "OTHER",
134 }
135}
136
137impl sqlx::Executor<'static> for ForgeDb {
138 type Database = Postgres;
139
140 fn fetch_many<'e, 'q: 'e, E>(
141 self,
142 query: E,
143 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144 where
145 E: sqlx::Execute<'q, Postgres> + 'q,
146 {
147 (&self.0).fetch_many(query)
148 }
149
150 fn fetch_optional<'e, 'q: 'e, E>(
151 self,
152 query: E,
153 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154 where
155 E: sqlx::Execute<'q, Postgres> + 'q,
156 {
157 let op = sql_operation(query.sql());
158 let span =
159 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160 Box::pin(
161 async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162 )
163 }
164
165 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166 where
167 E: sqlx::Execute<'q, Postgres> + 'q,
168 {
169 let op = sql_operation(query.sql());
170 let span =
171 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172 Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173 }
174
175 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176 where
177 E: sqlx::Execute<'q, Postgres> + 'q,
178 {
179 let op = sql_operation(query.sql());
180 let span =
181 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182 Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183 }
184
185 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186 where
187 E: sqlx::Execute<'q, Postgres> + 'q,
188 {
189 let op = sql_operation(query.sql());
190 let span =
191 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192 Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193 }
194
195 fn prepare_with<'e, 'q: 'e>(
196 self,
197 sql: &'q str,
198 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200 Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201 }
202
203 fn describe<'e, 'q: 'e>(
204 self,
205 sql: &'q str,
206 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207 Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208 }
209}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 match self {
214 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216 }
217 }
218}
219
220impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
221 type Database = Postgres;
222
223 fn fetch_many<'e, 'q: 'e, E>(
224 self,
225 query: E,
226 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
227 where
228 'c: 'e,
229 E: sqlx::Execute<'q, Postgres> + 'q,
230 {
231 let conn: &'e mut PgConnection = &mut *self;
232 conn.fetch_many(query)
233 }
234
235 fn fetch_optional<'e, 'q: 'e, E>(
236 self,
237 query: E,
238 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
239 where
240 'c: 'e,
241 E: sqlx::Execute<'q, Postgres> + 'q,
242 {
243 let op = sql_operation(query.sql());
244 let span =
245 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
246 let conn: &'e mut PgConnection = &mut *self;
247 Box::pin(conn.fetch_optional(query).instrument(span))
248 }
249
250 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
251 where
252 'c: 'e,
253 E: sqlx::Execute<'q, Postgres> + 'q,
254 {
255 let op = sql_operation(query.sql());
256 let span =
257 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
258 let conn: &'e mut PgConnection = &mut *self;
259 Box::pin(conn.execute(query).instrument(span))
260 }
261
262 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
263 where
264 'c: 'e,
265 E: sqlx::Execute<'q, Postgres> + 'q,
266 {
267 let op = sql_operation(query.sql());
268 let span =
269 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
270 let conn: &'e mut PgConnection = &mut *self;
271 Box::pin(conn.fetch_all(query).instrument(span))
272 }
273
274 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
275 where
276 'c: 'e,
277 E: sqlx::Execute<'q, Postgres> + 'q,
278 {
279 let op = sql_operation(query.sql());
280 let span =
281 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
282 let conn: &'e mut PgConnection = &mut *self;
283 Box::pin(conn.fetch_one(query).instrument(span))
284 }
285
286 fn prepare_with<'e, 'q: 'e>(
287 self,
288 sql: &'q str,
289 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
290 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
291 where
292 'c: 'e,
293 {
294 let conn: &'e mut PgConnection = &mut *self;
295 conn.prepare_with(sql, parameters)
296 }
297
298 fn describe<'e, 'q: 'e>(
299 self,
300 sql: &'q str,
301 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
302 where
303 'c: 'e,
304 {
305 let conn: &'e mut PgConnection = &mut *self;
306 conn.describe(sql)
307 }
308}
309
310#[derive(Debug, Clone)]
311pub struct PendingJob {
312 pub id: Uuid,
313 pub job_type: String,
314 pub args: serde_json::Value,
315 pub context: serde_json::Value,
316 pub owner_subject: Option<String>,
317 pub priority: i32,
318 pub max_attempts: i32,
319 pub worker_capability: Option<String>,
320}
321
322#[derive(Debug, Clone)]
323pub struct PendingWorkflow {
324 pub id: Uuid,
325 pub workflow_name: String,
326 pub input: serde_json::Value,
327 pub owner_subject: Option<String>,
328}
329
330#[derive(Default)]
331pub struct OutboxBuffer {
332 pub jobs: Vec<PendingJob>,
333 pub workflows: Vec<PendingWorkflow>,
334}
335
336#[derive(Debug, Clone)]
338pub struct AuthContext {
339 user_id: Option<Uuid>,
341 roles: Vec<String>,
343 claims: HashMap<String, serde_json::Value>,
345 authenticated: bool,
347}
348
349impl AuthContext {
350 pub fn unauthenticated() -> Self {
352 Self {
353 user_id: None,
354 roles: Vec::new(),
355 claims: HashMap::new(),
356 authenticated: false,
357 }
358 }
359
360 pub fn authenticated(
362 user_id: Uuid,
363 roles: Vec<String>,
364 claims: HashMap<String, serde_json::Value>,
365 ) -> Self {
366 Self {
367 user_id: Some(user_id),
368 roles,
369 claims,
370 authenticated: true,
371 }
372 }
373
374 pub fn authenticated_without_uuid(
380 roles: Vec<String>,
381 claims: HashMap<String, serde_json::Value>,
382 ) -> Self {
383 Self {
384 user_id: None,
385 roles,
386 claims,
387 authenticated: true,
388 }
389 }
390
391 pub fn is_authenticated(&self) -> bool {
393 self.authenticated
394 }
395
396 pub fn user_id(&self) -> Option<Uuid> {
398 self.user_id
399 }
400
401 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403 self.user_id
404 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405 }
406
407 pub fn has_role(&self, role: &str) -> bool {
409 self.roles.iter().any(|r| r == role)
410 }
411
412 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414 if self.has_role(role) {
415 Ok(())
416 } else {
417 Err(crate::error::ForgeError::Forbidden(format!(
418 "Required role '{}' not present",
419 role
420 )))
421 }
422 }
423
424 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426 self.claims.get(key)
427 }
428
429 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431 &self.claims
432 }
433
434 pub fn roles(&self) -> &[String] {
436 &self.roles
437 }
438
439 pub fn subject(&self) -> Option<&str> {
445 self.claims.get("sub").and_then(|v| v.as_str())
446 }
447
448 pub fn require_subject(&self) -> crate::error::Result<&str> {
450 if !self.authenticated {
451 return Err(crate::error::ForgeError::Unauthorized(
452 "Authentication required".to_string(),
453 ));
454 }
455 self.subject().ok_or_else(|| {
456 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457 })
458 }
459
460 pub fn principal_id(&self) -> Option<String> {
464 self.subject()
465 .map(ToString::to_string)
466 .or_else(|| self.user_id.map(|id| id.to_string()))
467 }
468
469 pub fn is_admin(&self) -> bool {
471 self.roles.iter().any(|r| r == "admin")
472 }
473
474 pub fn check_identity_args(
481 &self,
482 function_name: &str,
483 args: &serde_json::Value,
484 enforce_scope: bool,
485 ) -> crate::error::Result<()> {
486 use crate::error::ForgeError;
487
488 if self.is_admin() {
489 return Ok(());
490 }
491
492 if !self.is_authenticated() && !enforce_scope {
493 return Ok(());
494 }
495
496 let Some(obj) = args.as_object() else {
497 if enforce_scope && self.is_authenticated() {
498 return Err(ForgeError::Forbidden(format!(
499 "Function '{function_name}' must include identity or tenant scope arguments"
500 )));
501 }
502 return Ok(());
503 };
504
505 let mut principal_values: Vec<String> = Vec::new();
506 if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
507 principal_values.push(user_id);
508 }
509 if let Some(subject) = self.principal_id()
510 && !principal_values.iter().any(|v| v == &subject)
511 {
512 principal_values.push(subject);
513 }
514
515 let mut has_scope_key = false;
516
517 for key in [
518 "user_id",
519 "userId",
520 "owner_id",
521 "ownerId",
522 "owner_subject",
523 "ownerSubject",
524 "subject",
525 "sub",
526 "principal_id",
527 "principalId",
528 ] {
529 let Some(value) = obj.get(key) else {
530 continue;
531 };
532 has_scope_key = true;
533
534 if !self.is_authenticated() {
535 return Err(ForgeError::Unauthorized(format!(
536 "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
537 )));
538 }
539
540 let serde_json::Value::String(actual) = value else {
541 return Err(ForgeError::InvalidArgument(format!(
542 "Function '{function_name}' argument '{key}' must be a non-empty string"
543 )));
544 };
545
546 if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
547 return Err(ForgeError::Forbidden(format!(
548 "Function '{function_name}' argument '{key}' does not match authenticated principal"
549 )));
550 }
551 }
552
553 for key in ["tenant_id", "tenantId"] {
554 let Some(value) = obj.get(key) else {
555 continue;
556 };
557 has_scope_key = true;
558
559 if !self.is_authenticated() {
560 return Err(ForgeError::Unauthorized(format!(
561 "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
562 )));
563 }
564
565 let expected = self
566 .claim("tenant_id")
567 .and_then(|v| v.as_str())
568 .ok_or_else(|| {
569 ForgeError::Forbidden(format!(
570 "Function '{function_name}' argument '{key}' is not allowed for this principal"
571 ))
572 })?;
573
574 let serde_json::Value::String(actual) = value else {
575 return Err(ForgeError::InvalidArgument(format!(
576 "Function '{function_name}' argument '{key}' must be a non-empty string"
577 )));
578 };
579
580 if actual.trim().is_empty() || actual != expected {
581 return Err(ForgeError::Forbidden(format!(
582 "Function '{function_name}' argument '{key}' does not match authenticated tenant"
583 )));
584 }
585 }
586
587 if enforce_scope && self.is_authenticated() && !has_scope_key {
588 return Err(ForgeError::Forbidden(format!(
589 "Function '{function_name}' must include identity or tenant scope arguments"
590 )));
591 }
592
593 Ok(())
594 }
595}
596
597#[derive(Debug, Clone)]
599pub struct RequestMetadata {
600 pub request_id: Uuid,
602 pub trace_id: String,
604 pub client_ip: Option<String>,
606 pub user_agent: Option<String>,
608 pub timestamp: chrono::DateTime<chrono::Utc>,
610}
611
612impl RequestMetadata {
613 pub fn new() -> Self {
615 Self {
616 request_id: Uuid::new_v4(),
617 trace_id: Uuid::new_v4().to_string(),
618 client_ip: None,
619 user_agent: None,
620 timestamp: chrono::Utc::now(),
621 }
622 }
623
624 pub fn with_trace_id(trace_id: String) -> Self {
626 Self {
627 request_id: Uuid::new_v4(),
628 trace_id,
629 client_ip: None,
630 user_agent: None,
631 timestamp: chrono::Utc::now(),
632 }
633 }
634}
635
636impl Default for RequestMetadata {
637 fn default() -> Self {
638 Self::new()
639 }
640}
641
642pub struct QueryContext {
644 pub auth: AuthContext,
646 pub request: RequestMetadata,
648 db_pool: sqlx::PgPool,
650 env_provider: Arc<dyn EnvProvider>,
652}
653
654impl QueryContext {
655 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657 Self {
658 auth,
659 request,
660 db_pool,
661 env_provider: Arc::new(RealEnvProvider::new()),
662 }
663 }
664
665 pub fn with_env(
667 db_pool: sqlx::PgPool,
668 auth: AuthContext,
669 request: RequestMetadata,
670 env_provider: Arc<dyn EnvProvider>,
671 ) -> Self {
672 Self {
673 auth,
674 request,
675 db_pool,
676 env_provider,
677 }
678 }
679
680 pub fn db(&self) -> ForgeDb {
689 ForgeDb(self.db_pool.clone())
690 }
691
692 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
693 self.auth.require_user_id()
694 }
695
696 pub fn require_subject(&self) -> crate::error::Result<&str> {
698 self.auth.require_subject()
699 }
700}
701
702impl EnvAccess for QueryContext {
703 fn env_provider(&self) -> &dyn EnvProvider {
704 self.env_provider.as_ref()
705 }
706}
707
708pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
710
711#[derive(Debug, Clone)]
713pub struct AuthTokenTtl {
714 pub access_token_secs: i64,
716 pub refresh_token_days: i64,
718}
719
720impl Default for AuthTokenTtl {
721 fn default() -> Self {
722 Self {
723 access_token_secs: 3600,
724 refresh_token_days: 30,
725 }
726 }
727}
728
729pub struct MutationContext {
731 pub auth: AuthContext,
733 pub request: RequestMetadata,
735 db_pool: sqlx::PgPool,
737 http_client: CircuitBreakerClient,
739 http_timeout: Option<Duration>,
742 job_dispatch: Option<Arc<dyn JobDispatch>>,
744 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
746 env_provider: Arc<dyn EnvProvider>,
748 tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
750 outbox: Option<Arc<Mutex<OutboxBuffer>>>,
752 job_info_lookup: Option<JobInfoLookup>,
754 token_issuer: Option<Arc<dyn TokenIssuer>>,
756 token_ttl: AuthTokenTtl,
758}
759
760impl MutationContext {
761 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
763 Self {
764 auth,
765 request,
766 db_pool,
767 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
768 http_timeout: None,
769 job_dispatch: None,
770 workflow_dispatch: None,
771 env_provider: Arc::new(RealEnvProvider::new()),
772 tx: None,
773 outbox: None,
774 job_info_lookup: None,
775 token_issuer: None,
776 token_ttl: AuthTokenTtl::default(),
777 }
778 }
779
780 pub fn with_dispatch(
782 db_pool: sqlx::PgPool,
783 auth: AuthContext,
784 request: RequestMetadata,
785 http_client: CircuitBreakerClient,
786 job_dispatch: Option<Arc<dyn JobDispatch>>,
787 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
788 ) -> Self {
789 Self {
790 auth,
791 request,
792 db_pool,
793 http_client,
794 http_timeout: None,
795 job_dispatch,
796 workflow_dispatch,
797 env_provider: Arc::new(RealEnvProvider::new()),
798 tx: None,
799 outbox: None,
800 job_info_lookup: None,
801 token_issuer: None,
802 token_ttl: AuthTokenTtl::default(),
803 }
804 }
805
806 pub fn with_env(
808 db_pool: sqlx::PgPool,
809 auth: AuthContext,
810 request: RequestMetadata,
811 http_client: CircuitBreakerClient,
812 job_dispatch: Option<Arc<dyn JobDispatch>>,
813 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
814 env_provider: Arc<dyn EnvProvider>,
815 ) -> Self {
816 Self {
817 auth,
818 request,
819 db_pool,
820 http_client,
821 http_timeout: None,
822 job_dispatch,
823 workflow_dispatch,
824 env_provider,
825 tx: None,
826 outbox: None,
827 job_info_lookup: None,
828 token_issuer: None,
829 token_ttl: AuthTokenTtl::default(),
830 }
831 }
832
833 #[allow(clippy::type_complexity)]
835 pub fn with_transaction(
836 db_pool: sqlx::PgPool,
837 tx: Transaction<'static, Postgres>,
838 auth: AuthContext,
839 request: RequestMetadata,
840 http_client: CircuitBreakerClient,
841 job_info_lookup: JobInfoLookup,
842 ) -> (
843 Self,
844 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
845 Arc<Mutex<OutboxBuffer>>,
846 ) {
847 let tx_handle = Arc::new(AsyncMutex::new(tx));
848 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
849
850 let ctx = Self {
851 auth,
852 request,
853 db_pool,
854 http_client,
855 http_timeout: None,
856 job_dispatch: None,
857 workflow_dispatch: None,
858 env_provider: Arc::new(RealEnvProvider::new()),
859 tx: Some(tx_handle.clone()),
860 outbox: Some(outbox.clone()),
861 job_info_lookup: Some(job_info_lookup),
862 token_issuer: None,
863 token_ttl: AuthTokenTtl::default(),
864 };
865
866 (ctx, tx_handle, outbox)
867 }
868
869 pub fn is_transactional(&self) -> bool {
870 self.tx.is_some()
871 }
872
873 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
885 match &self.tx {
886 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
887 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
888 }
889 }
890
891 pub fn pool(&self) -> &sqlx::PgPool {
893 &self.db_pool
894 }
895
896 pub fn http(&self) -> crate::http::HttpClient {
902 self.http_client.with_timeout(self.http_timeout)
903 }
904
905 pub fn raw_http(&self) -> &reqwest::Client {
907 self.http_client.inner()
908 }
909
910 pub fn http_with_circuit_breaker(&self) -> crate::http::HttpClient {
912 self.http()
913 }
914
915 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
917 self.http_timeout = timeout;
918 }
919
920 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
921 self.auth.require_user_id()
922 }
923
924 pub fn require_subject(&self) -> crate::error::Result<&str> {
925 self.auth.require_subject()
926 }
927
928 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
930 self.token_issuer = Some(issuer);
931 }
932
933 pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
935 self.token_ttl = ttl;
936 }
937
938 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
953 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
954 crate::error::ForgeError::Internal(
955 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
956 .into(),
957 )
958 })?;
959 issuer.sign(claims)
960 }
961
962 pub async fn issue_token_pair(
972 &self,
973 user_id: Uuid,
974 roles: &[&str],
975 ) -> crate::error::Result<crate::auth::TokenPair> {
976 let issuer = self.token_issuer.clone().ok_or_else(|| {
977 crate::error::ForgeError::Internal(
978 "Token issuer not available. Configure [auth] in forge.toml".into(),
979 )
980 })?;
981 let access_ttl = self.token_ttl.access_token_secs;
982 let refresh_ttl = self.token_ttl.refresh_token_days;
983 crate::auth::tokens::issue_token_pair(
984 &self.db_pool,
985 user_id,
986 roles,
987 access_ttl,
988 refresh_ttl,
989 move |uid, r, ttl| {
990 let claims = Claims::builder()
991 .subject(uid)
992 .roles(r.iter().map(|s| s.to_string()).collect())
993 .duration_secs(ttl)
994 .build()
995 .map_err(crate::error::ForgeError::Internal)?;
996 issuer.sign(&claims)
997 },
998 )
999 .await
1000 }
1001
1002 pub async fn rotate_refresh_token(
1007 &self,
1008 old_refresh_token: &str,
1009 ) -> crate::error::Result<crate::auth::TokenPair> {
1010 let issuer = self.token_issuer.clone().ok_or_else(|| {
1011 crate::error::ForgeError::Internal(
1012 "Token issuer not available. Configure [auth] in forge.toml".into(),
1013 )
1014 })?;
1015 let access_ttl = self.token_ttl.access_token_secs;
1016 let refresh_ttl = self.token_ttl.refresh_token_days;
1017 crate::auth::tokens::rotate_refresh_token(
1018 &self.db_pool,
1019 old_refresh_token,
1020 &["user"],
1021 access_ttl,
1022 refresh_ttl,
1023 move |uid, r, ttl| {
1024 let claims = Claims::builder()
1025 .subject(uid)
1026 .roles(r.iter().map(|s| s.to_string()).collect())
1027 .duration_secs(ttl)
1028 .build()
1029 .map_err(crate::error::ForgeError::Internal)?;
1030 issuer.sign(&claims)
1031 },
1032 )
1033 .await
1034 }
1035
1036 pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1038 crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1039 }
1040
1041 pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1043 crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1044 }
1045
1046 pub async fn dispatch_job<T: serde::Serialize>(
1048 &self,
1049 job_type: &str,
1050 args: T,
1051 ) -> crate::error::Result<Uuid> {
1052 let args_json = serde_json::to_value(args)?;
1053
1054 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1056 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1057 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1058 })?;
1059
1060 let pending = PendingJob {
1061 id: Uuid::new_v4(),
1062 job_type: job_type.to_string(),
1063 args: args_json,
1064 context: serde_json::json!({}),
1065 owner_subject: self.auth.principal_id(),
1066 priority: job_info.priority.as_i32(),
1067 max_attempts: job_info.retry.max_attempts as i32,
1068 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1069 };
1070
1071 let job_id = pending.id;
1072 outbox
1073 .lock()
1074 .expect("outbox lock poisoned")
1075 .jobs
1076 .push(pending);
1077 return Ok(job_id);
1078 }
1079
1080 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1082 crate::error::ForgeError::Internal("Job dispatch not available".into())
1083 })?;
1084 dispatcher
1085 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1086 .await
1087 }
1088
1089 pub async fn dispatch_job_with_context<T: serde::Serialize>(
1091 &self,
1092 job_type: &str,
1093 args: T,
1094 context: serde_json::Value,
1095 ) -> crate::error::Result<Uuid> {
1096 let args_json = serde_json::to_value(args)?;
1097
1098 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1099 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1100 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1101 })?;
1102
1103 let pending = PendingJob {
1104 id: Uuid::new_v4(),
1105 job_type: job_type.to_string(),
1106 args: args_json,
1107 context,
1108 owner_subject: self.auth.principal_id(),
1109 priority: job_info.priority.as_i32(),
1110 max_attempts: job_info.retry.max_attempts as i32,
1111 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1112 };
1113
1114 let job_id = pending.id;
1115 outbox
1116 .lock()
1117 .expect("outbox lock poisoned")
1118 .jobs
1119 .push(pending);
1120 return Ok(job_id);
1121 }
1122
1123 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1124 crate::error::ForgeError::Internal("Job dispatch not available".into())
1125 })?;
1126 dispatcher
1127 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1128 .await
1129 }
1130
1131 pub async fn cancel_job(
1133 &self,
1134 job_id: Uuid,
1135 reason: Option<String>,
1136 ) -> crate::error::Result<bool> {
1137 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1138 crate::error::ForgeError::Internal("Job dispatch not available".into())
1139 })?;
1140 dispatcher.cancel(job_id, reason).await
1141 }
1142
1143 pub async fn start_workflow<T: serde::Serialize>(
1145 &self,
1146 workflow_name: &str,
1147 input: T,
1148 ) -> crate::error::Result<Uuid> {
1149 let input_json = serde_json::to_value(input)?;
1150
1151 if let Some(outbox) = &self.outbox {
1153 let pending = PendingWorkflow {
1154 id: Uuid::new_v4(),
1155 workflow_name: workflow_name.to_string(),
1156 input: input_json,
1157 owner_subject: self.auth.principal_id(),
1158 };
1159
1160 let workflow_id = pending.id;
1161 outbox
1162 .lock()
1163 .expect("outbox lock poisoned")
1164 .workflows
1165 .push(pending);
1166 return Ok(workflow_id);
1167 }
1168
1169 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1171 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1172 })?;
1173 dispatcher
1174 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1175 .await
1176 }
1177}
1178
1179impl EnvAccess for MutationContext {
1180 fn env_provider(&self) -> &dyn EnvProvider {
1181 self.env_provider.as_ref()
1182 }
1183}
1184
1185#[cfg(test)]
1186#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
1187mod tests {
1188 use super::*;
1189
1190 #[test]
1191 fn test_auth_context_unauthenticated() {
1192 let ctx = AuthContext::unauthenticated();
1193 assert!(!ctx.is_authenticated());
1194 assert!(ctx.user_id().is_none());
1195 assert!(ctx.require_user_id().is_err());
1196 }
1197
1198 #[test]
1199 fn test_auth_context_authenticated() {
1200 let user_id = Uuid::new_v4();
1201 let ctx = AuthContext::authenticated(
1202 user_id,
1203 vec!["admin".to_string(), "user".to_string()],
1204 HashMap::new(),
1205 );
1206
1207 assert!(ctx.is_authenticated());
1208 assert_eq!(ctx.user_id(), Some(user_id));
1209 assert!(ctx.require_user_id().is_ok());
1210 assert!(ctx.has_role("admin"));
1211 assert!(ctx.has_role("user"));
1212 assert!(!ctx.has_role("superadmin"));
1213 assert!(ctx.require_role("admin").is_ok());
1214 assert!(ctx.require_role("superadmin").is_err());
1215 }
1216
1217 #[test]
1218 fn test_auth_context_with_claims() {
1219 let mut claims = HashMap::new();
1220 claims.insert("org_id".to_string(), serde_json::json!("org-123"));
1221
1222 let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
1223
1224 assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
1225 assert!(ctx.claim("nonexistent").is_none());
1226 }
1227
1228 #[test]
1229 fn test_request_metadata() {
1230 let meta = RequestMetadata::new();
1231 assert!(!meta.trace_id.is_empty());
1232 assert!(meta.client_ip.is_none());
1233
1234 let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
1235 assert_eq!(meta2.trace_id, "trace-123");
1236 }
1237}