1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Something that can turn a set of [`Claims`] into a signed token string.
///
/// The `Send + Sync` bound lets an issuer be shared behind an `Arc` across
/// threads.
pub trait TokenIssuer: Send + Sync {
    /// Signs `claims` and returns the resulting token, or an error if signing
    /// fails.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// A Postgres connection that is either checked out of the pool or borrowed
/// from a locked transaction.
///
/// Both variants dereference to [`PgConnection`].
pub enum ForgeConn<'a> {
    /// A connection acquired from the pool.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// A guard over the async mutex that wraps a transaction; the lock is held
    /// for as long as this value is alive.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81 type Target = PgConnection;
82 fn deref(&self) -> &PgConnection {
83 match self {
84 ForgeConn::Pool(c) => c,
85 ForgeConn::Tx(g) => g,
86 }
87 }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91 fn deref_mut(&mut self) -> &mut PgConnection {
92 match self {
93 ForgeConn::Pool(c) => c,
94 ForgeConn::Tx(g) => g,
95 }
96 }
97}
98
/// Cloneable wrapper around a [`sqlx::PgPool`].
///
/// It implements [`sqlx::Executor`] by forwarding to the pool; the
/// `fetch_optional`, `execute`, `fetch_all` and `fetch_one` paths additionally
/// run inside a `db.query` tracing span.
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_tuple("ForgeDb").finish()
116 }
117}
118
119impl ForgeDb {
120 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122 Self(pool.clone())
123 }
124}
125
/// Classifies a SQL string by its leading keyword.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively against `SELECT`, `INSERT`, `UPDATE` and `DELETE`.
/// Anything else, including input shorter than six bytes, yields `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // Every recognised verb is exactly six bytes long, so one slice suffices.
    const VERBS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    sql.trim_start()
        .as_bytes()
        .get(..6)
        .and_then(|head| {
            VERBS
                .iter()
                .copied()
                .find(|verb| head.eq_ignore_ascii_case(verb.as_bytes()))
        })
        .unwrap_or("OTHER")
}
136
137impl sqlx::Executor<'static> for ForgeDb {
138 type Database = Postgres;
139
140 fn fetch_many<'e, 'q: 'e, E>(
141 self,
142 query: E,
143 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
144 where
145 E: sqlx::Execute<'q, Postgres> + 'q,
146 {
147 (&self.0).fetch_many(query)
148 }
149
150 fn fetch_optional<'e, 'q: 'e, E>(
151 self,
152 query: E,
153 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
154 where
155 E: sqlx::Execute<'q, Postgres> + 'q,
156 {
157 let op = sql_operation(query.sql());
158 let span =
159 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
160 Box::pin(
161 async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
162 )
163 }
164
165 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
166 where
167 E: sqlx::Execute<'q, Postgres> + 'q,
168 {
169 let op = sql_operation(query.sql());
170 let span =
171 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
172 Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
173 }
174
175 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
176 where
177 E: sqlx::Execute<'q, Postgres> + 'q,
178 {
179 let op = sql_operation(query.sql());
180 let span =
181 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
182 Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
183 }
184
185 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
186 where
187 E: sqlx::Execute<'q, Postgres> + 'q,
188 {
189 let op = sql_operation(query.sql());
190 let span =
191 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
192 Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
193 }
194
195 fn prepare_with<'e, 'q: 'e>(
196 self,
197 sql: &'q str,
198 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
199 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
200 Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
201 }
202
203 fn describe<'e, 'q: 'e>(
204 self,
205 sql: &'q str,
206 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
207 Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
208 }
209}
210
/// Handle for running queries either against the pool or inside a shared
/// transaction.
pub enum DbConn<'a> {
    /// Queries run directly on an owned pool handle.
    Pool(sqlx::PgPool),
    /// Queries run on the transaction behind the async mutex, which is locked
    /// for the duration of each call. The pool reference is carried along but
    /// is not used by the methods visible in this file.
    Transaction(
        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
        &'a sqlx::PgPool,
    ),
}
235
236impl DbConn<'_> {
237 pub async fn fetch_one<'q, O>(
239 &self,
240 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
241 ) -> sqlx::Result<O>
242 where
243 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
244 {
245 match self {
246 DbConn::Pool(pool) => query.fetch_one(pool).await,
247 DbConn::Transaction(tx, _) => {
248 let mut guard = tx.lock().await;
249 query.fetch_one(&mut **guard).await
250 }
251 }
252 }
253
254 pub async fn fetch_optional<'q, O>(
256 &self,
257 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
258 ) -> sqlx::Result<Option<O>>
259 where
260 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
261 {
262 match self {
263 DbConn::Pool(pool) => query.fetch_optional(pool).await,
264 DbConn::Transaction(tx, _) => {
265 let mut guard = tx.lock().await;
266 query.fetch_optional(&mut **guard).await
267 }
268 }
269 }
270
271 pub async fn fetch_all<'q, O>(
273 &self,
274 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
275 ) -> sqlx::Result<Vec<O>>
276 where
277 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
278 {
279 match self {
280 DbConn::Pool(pool) => query.fetch_all(pool).await,
281 DbConn::Transaction(tx, _) => {
282 let mut guard = tx.lock().await;
283 query.fetch_all(&mut **guard).await
284 }
285 }
286 }
287
288 pub async fn execute<'q>(
290 &self,
291 query: sqlx::query::Query<'q, Postgres, sqlx::postgres::PgArguments>,
292 ) -> sqlx::Result<PgQueryResult> {
293 match self {
294 DbConn::Pool(pool) => query.execute(pool).await,
295 DbConn::Transaction(tx, _) => {
296 let mut guard = tx.lock().await;
297 query.execute(&mut **guard).await
298 }
299 }
300 }
301}
302
303impl std::fmt::Debug for DbConn<'_> {
304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305 match self {
306 DbConn::Pool(_) => f.debug_tuple("DbConn::Pool").finish(),
307 DbConn::Transaction(_, _) => f.debug_tuple("DbConn::Transaction").finish(),
308 }
309 }
310}
311
312impl std::fmt::Debug for ForgeConn<'_> {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 match self {
315 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
316 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
317 }
318 }
319}
320
321impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
322 type Database = Postgres;
323
324 fn fetch_many<'e, 'q: 'e, E>(
325 self,
326 query: E,
327 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
328 where
329 'c: 'e,
330 E: sqlx::Execute<'q, Postgres> + 'q,
331 {
332 let conn: &'e mut PgConnection = &mut *self;
333 conn.fetch_many(query)
334 }
335
336 fn fetch_optional<'e, 'q: 'e, E>(
337 self,
338 query: E,
339 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
340 where
341 'c: 'e,
342 E: sqlx::Execute<'q, Postgres> + 'q,
343 {
344 let op = sql_operation(query.sql());
345 let span =
346 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
347 let conn: &'e mut PgConnection = &mut *self;
348 Box::pin(conn.fetch_optional(query).instrument(span))
349 }
350
351 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
352 where
353 'c: 'e,
354 E: sqlx::Execute<'q, Postgres> + 'q,
355 {
356 let op = sql_operation(query.sql());
357 let span =
358 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
359 let conn: &'e mut PgConnection = &mut *self;
360 Box::pin(conn.execute(query).instrument(span))
361 }
362
363 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
364 where
365 'c: 'e,
366 E: sqlx::Execute<'q, Postgres> + 'q,
367 {
368 let op = sql_operation(query.sql());
369 let span =
370 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
371 let conn: &'e mut PgConnection = &mut *self;
372 Box::pin(conn.fetch_all(query).instrument(span))
373 }
374
375 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
376 where
377 'c: 'e,
378 E: sqlx::Execute<'q, Postgres> + 'q,
379 {
380 let op = sql_operation(query.sql());
381 let span =
382 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
383 let conn: &'e mut PgConnection = &mut *self;
384 Box::pin(conn.fetch_one(query).instrument(span))
385 }
386
387 fn prepare_with<'e, 'q: 'e>(
388 self,
389 sql: &'q str,
390 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
391 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
392 where
393 'c: 'e,
394 {
395 let conn: &'e mut PgConnection = &mut *self;
396 conn.prepare_with(sql, parameters)
397 }
398
399 fn describe<'e, 'q: 'e>(
400 self,
401 sql: &'q str,
402 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
403 where
404 'c: 'e,
405 {
406 let conn: &'e mut PgConnection = &mut *self;
407 conn.describe(sql)
408 }
409}
410
/// A job recorded in the [`OutboxBuffer`] rather than dispatched immediately.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Identifier generated when the job is queued; returned to the caller.
    pub id: Uuid,
    /// Name of the job type that was requested.
    pub job_type: String,
    /// Job arguments serialized to JSON.
    pub args: serde_json::Value,
    /// Extra JSON context attached to the job (an empty object when none was
    /// supplied).
    pub context: serde_json::Value,
    /// Principal id of the caller that queued the job, if any.
    pub owner_subject: Option<String>,
    /// Priority taken from the job's registered info.
    pub priority: i32,
    /// Maximum attempt count taken from the job's retry settings.
    pub max_attempts: i32,
    /// Worker capability taken from the job's registered info, if any.
    pub worker_capability: Option<String>,
}
423
/// A workflow start recorded in the [`OutboxBuffer`] rather than dispatched
/// immediately.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Identifier generated when the workflow is queued; returned to the
    /// caller.
    pub id: Uuid,
    /// Name of the workflow that was requested.
    pub workflow_name: String,
    /// Workflow input serialized to JSON.
    pub input: serde_json::Value,
    /// Principal id of the caller that queued the workflow, if any.
    pub owner_subject: Option<String>,
}
432
/// Jobs and workflows queued by a transactional [`MutationContext`].
///
/// What drains this buffer is not visible in this file.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs queued so far, in insertion order.
    pub jobs: Vec<PendingJob>,
    /// Workflows queued so far, in insertion order.
    pub workflows: Vec<PendingWorkflow>,
}
442
/// Identity information attached to a request.
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// UUID of the user, when one is known. May be `None` even for an
    /// authenticated context (see `authenticated_without_uuid`).
    user_id: Option<Uuid>,
    /// Role names granted to the caller.
    roles: Vec<String>,
    /// Claims keyed by name; `"sub"` and `"tenant_id"` are read by accessors
    /// on this type.
    claims: HashMap<String, serde_json::Value>,
    /// Whether the caller is authenticated.
    authenticated: bool,
}
455
456impl AuthContext {
457 pub fn unauthenticated() -> Self {
459 Self {
460 user_id: None,
461 roles: Vec::new(),
462 claims: HashMap::new(),
463 authenticated: false,
464 }
465 }
466
467 pub fn authenticated(
469 user_id: Uuid,
470 roles: Vec<String>,
471 claims: HashMap<String, serde_json::Value>,
472 ) -> Self {
473 Self {
474 user_id: Some(user_id),
475 roles,
476 claims,
477 authenticated: true,
478 }
479 }
480
481 pub fn authenticated_without_uuid(
487 roles: Vec<String>,
488 claims: HashMap<String, serde_json::Value>,
489 ) -> Self {
490 Self {
491 user_id: None,
492 roles,
493 claims,
494 authenticated: true,
495 }
496 }
497
498 pub fn is_authenticated(&self) -> bool {
500 self.authenticated
501 }
502
503 pub fn user_id(&self) -> Option<Uuid> {
505 self.user_id
506 }
507
508 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
510 self.user_id
511 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
512 }
513
514 pub fn has_role(&self, role: &str) -> bool {
516 self.roles.iter().any(|r| r == role)
517 }
518
519 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
521 if self.has_role(role) {
522 Ok(())
523 } else {
524 Err(crate::error::ForgeError::Forbidden(format!(
525 "Required role '{}' not present",
526 role
527 )))
528 }
529 }
530
531 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
533 self.claims.get(key)
534 }
535
536 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
538 &self.claims
539 }
540
541 pub fn roles(&self) -> &[String] {
543 &self.roles
544 }
545
546 pub fn subject(&self) -> Option<&str> {
552 self.claims.get("sub").and_then(|v| v.as_str())
553 }
554
555 pub fn require_subject(&self) -> crate::error::Result<&str> {
557 if !self.authenticated {
558 return Err(crate::error::ForgeError::Unauthorized(
559 "Authentication required".to_string(),
560 ));
561 }
562 self.subject().ok_or_else(|| {
563 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
564 })
565 }
566
567 pub fn principal_id(&self) -> Option<String> {
571 self.subject()
572 .map(ToString::to_string)
573 .or_else(|| self.user_id.map(|id| id.to_string()))
574 }
575
576 pub fn is_admin(&self) -> bool {
578 self.roles.iter().any(|r| r == "admin")
579 }
580
581 pub fn tenant_id(&self) -> Option<uuid::Uuid> {
586 self.claims
587 .get("tenant_id")
588 .and_then(|v| v.as_str())
589 .and_then(|s| uuid::Uuid::parse_str(s).ok())
590 }
591}
592
/// Per-request bookkeeping data.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Random identifier generated when the metadata is created.
    pub request_id: Uuid,
    /// Trace identifier; either supplied by the caller or a fresh UUID string.
    pub trace_id: String,
    /// Client IP address, if known. The constructors here leave it `None`.
    pub client_ip: Option<String>,
    /// User-agent string, if known. The constructors here leave it `None`.
    pub user_agent: Option<String>,
    /// Correlation identifier, if any. The constructors here leave it `None`.
    pub correlation_id: Option<String>,
    /// UTC time at which the metadata was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
609
610impl RequestMetadata {
611 pub fn new() -> Self {
613 Self {
614 request_id: Uuid::new_v4(),
615 trace_id: Uuid::new_v4().to_string(),
616 client_ip: None,
617 user_agent: None,
618 correlation_id: None,
619 timestamp: chrono::Utc::now(),
620 }
621 }
622
623 pub fn with_trace_id(trace_id: String) -> Self {
625 Self {
626 request_id: Uuid::new_v4(),
627 trace_id,
628 client_ip: None,
629 user_agent: None,
630 correlation_id: None,
631 timestamp: chrono::Utc::now(),
632 }
633 }
634}
635
636impl Default for RequestMetadata {
637 fn default() -> Self {
638 Self::new()
639 }
640}
641
/// Bundle of auth info, request metadata, a Postgres pool and an environment
/// provider.
pub struct QueryContext {
    /// Identity of the caller.
    pub auth: AuthContext,
    /// Metadata about the current request.
    pub request: RequestMetadata,
    /// Pool that `db()` and `db_conn()` hand out clones of.
    db_pool: sqlx::PgPool,
    /// Provider returned through the [`EnvAccess`] implementation.
    env_provider: Arc<dyn EnvProvider>,
}
653
654impl QueryContext {
655 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657 Self {
658 auth,
659 request,
660 db_pool,
661 env_provider: Arc::new(RealEnvProvider::new()),
662 }
663 }
664
665 pub fn with_env(
667 db_pool: sqlx::PgPool,
668 auth: AuthContext,
669 request: RequestMetadata,
670 env_provider: Arc<dyn EnvProvider>,
671 ) -> Self {
672 Self {
673 auth,
674 request,
675 db_pool,
676 env_provider,
677 }
678 }
679
680 pub fn db(&self) -> ForgeDb {
689 ForgeDb(self.db_pool.clone())
690 }
691
692 pub fn db_conn(&self) -> DbConn<'_> {
706 DbConn::Pool(self.db_pool.clone())
707 }
708
709 pub fn user_id(&self) -> crate::error::Result<Uuid> {
711 self.auth.require_user_id()
712 }
713
714 pub fn tenant_id(&self) -> Option<Uuid> {
716 self.auth.tenant_id()
717 }
718}
719
720impl EnvAccess for QueryContext {
721 fn env_provider(&self) -> &dyn EnvProvider {
722 self.env_provider.as_ref()
723 }
724}
725
/// Shared callback that maps a job type name to its [`JobInfo`], returning
/// `None` for an unknown name.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
728
/// Token lifetime settings passed to the token-issuing helpers.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access-token lifetime (seconds, per the field name). Defaults to 3600.
    pub access_token_secs: i64,
    /// Refresh-token lifetime (days, per the field name). Defaults to 30.
    pub refresh_token_days: i64,
}
737
738impl Default for AuthTokenTtl {
739 fn default() -> Self {
740 Self {
741 access_token_secs: 3600,
742 refresh_token_days: 30,
743 }
744 }
745}
746
/// Bundle of everything a mutation can use: auth, request metadata, database
/// access, HTTP, job/workflow dispatch and token issuance.
pub struct MutationContext {
    /// Identity of the caller.
    pub auth: AuthContext,
    /// Metadata about the current request.
    pub request: RequestMetadata,
    /// Pool used when no transaction is attached, and by the token helpers.
    db_pool: sqlx::PgPool,
    /// HTTP client handed out by `http()` and `raw_http()`.
    http_client: CircuitBreakerClient,
    /// Optional timeout passed to `http_client.with_timeout` by `http()`.
    http_timeout: Option<Duration>,
    /// Dispatcher used for jobs when nothing is buffered in an outbox.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Dispatcher used for workflows when nothing is buffered in an outbox.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Provider returned through the [`EnvAccess`] implementation.
    env_provider: Arc<dyn EnvProvider>,
    /// Shared transaction; set only by `with_transaction`.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Buffer for queued jobs and workflows; set only by `with_transaction`.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Job metadata lookup used when queueing into the outbox.
    job_info_lookup: Option<JobInfoLookup>,
    /// Signer used by the token helpers; `None` until `set_token_issuer`.
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Token lifetimes passed to the token helpers.
    token_ttl: AuthTokenTtl,
}
777
778impl MutationContext {
779 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
781 Self {
782 auth,
783 request,
784 db_pool,
785 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
786 http_timeout: None,
787 job_dispatch: None,
788 workflow_dispatch: None,
789 env_provider: Arc::new(RealEnvProvider::new()),
790 tx: None,
791 outbox: None,
792 job_info_lookup: None,
793 token_issuer: None,
794 token_ttl: AuthTokenTtl::default(),
795 }
796 }
797
798 pub fn with_dispatch(
800 db_pool: sqlx::PgPool,
801 auth: AuthContext,
802 request: RequestMetadata,
803 http_client: CircuitBreakerClient,
804 job_dispatch: Option<Arc<dyn JobDispatch>>,
805 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
806 ) -> Self {
807 Self {
808 auth,
809 request,
810 db_pool,
811 http_client,
812 http_timeout: None,
813 job_dispatch,
814 workflow_dispatch,
815 env_provider: Arc::new(RealEnvProvider::new()),
816 tx: None,
817 outbox: None,
818 job_info_lookup: None,
819 token_issuer: None,
820 token_ttl: AuthTokenTtl::default(),
821 }
822 }
823
824 pub fn with_env(
826 db_pool: sqlx::PgPool,
827 auth: AuthContext,
828 request: RequestMetadata,
829 http_client: CircuitBreakerClient,
830 job_dispatch: Option<Arc<dyn JobDispatch>>,
831 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
832 env_provider: Arc<dyn EnvProvider>,
833 ) -> Self {
834 Self {
835 auth,
836 request,
837 db_pool,
838 http_client,
839 http_timeout: None,
840 job_dispatch,
841 workflow_dispatch,
842 env_provider,
843 tx: None,
844 outbox: None,
845 job_info_lookup: None,
846 token_issuer: None,
847 token_ttl: AuthTokenTtl::default(),
848 }
849 }
850
851 #[allow(clippy::type_complexity)]
853 pub fn with_transaction(
854 db_pool: sqlx::PgPool,
855 tx: Transaction<'static, Postgres>,
856 auth: AuthContext,
857 request: RequestMetadata,
858 http_client: CircuitBreakerClient,
859 job_info_lookup: JobInfoLookup,
860 ) -> (
861 Self,
862 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
863 Arc<Mutex<OutboxBuffer>>,
864 ) {
865 let tx_handle = Arc::new(AsyncMutex::new(tx));
866 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
867
868 let ctx = Self {
869 auth,
870 request,
871 db_pool,
872 http_client,
873 http_timeout: None,
874 job_dispatch: None,
875 workflow_dispatch: None,
876 env_provider: Arc::new(RealEnvProvider::new()),
877 tx: Some(tx_handle.clone()),
878 outbox: Some(outbox.clone()),
879 job_info_lookup: Some(job_info_lookup),
880 token_issuer: None,
881 token_ttl: AuthTokenTtl::default(),
882 };
883
884 (ctx, tx_handle, outbox)
885 }
886
    /// Returns `true` when this context has a transaction attached.
    pub fn is_transactional(&self) -> bool {
        self.tx.is_some()
    }
890
891 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
903 match &self.tx {
904 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
905 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
906 }
907 }
908
    /// Borrows the underlying pool, regardless of whether a transaction is
    /// attached.
    pub fn pool(&self) -> &sqlx::PgPool {
        &self.db_pool
    }
913
914 pub fn db(&self) -> DbConn<'_> {
928 match &self.tx {
929 Some(tx) => DbConn::Transaction(tx.clone(), &self.db_pool),
930 None => DbConn::Pool(self.db_pool.clone()),
931 }
932 }
933
    /// Same as `db()`.
    pub fn db_conn(&self) -> DbConn<'_> {
        self.db()
    }
938
    /// Returns an HTTP client derived from the stored client with the
    /// currently configured timeout (if any) applied.
    pub fn http(&self) -> crate::http::HttpClient {
        self.http_client.with_timeout(self.http_timeout)
    }
947
    /// Borrows the `reqwest::Client` inside the stored HTTP client.
    pub fn raw_http(&self) -> &reqwest::Client {
        self.http_client.inner()
    }
952
    /// Sets or clears the timeout that `http()` applies.
    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
        self.http_timeout = timeout;
    }
957
    /// Returns the caller's user id, or an error when none is set.
    pub fn user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
962
    /// Returns the tenant id parsed from the caller's claims, if any.
    pub fn tenant_id(&self) -> Option<Uuid> {
        self.auth.tenant_id()
    }
967
    /// Installs the issuer used by the token helpers, replacing any previous
    /// one.
    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
        self.token_issuer = Some(issuer);
    }
972
    /// Replaces the token lifetime settings used by the token helpers.
    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
        self.token_ttl = ttl;
    }
977
978 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
993 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
994 crate::error::ForgeError::Internal(
995 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
996 .into(),
997 )
998 })?;
999 issuer.sign(claims)
1000 }
1001
1002 pub async fn issue_token_pair(
1012 &self,
1013 user_id: Uuid,
1014 roles: &[&str],
1015 ) -> crate::error::Result<crate::auth::TokenPair> {
1016 let issuer = self.token_issuer.clone().ok_or_else(|| {
1017 crate::error::ForgeError::Internal(
1018 "Token issuer not available. Configure [auth] in forge.toml".into(),
1019 )
1020 })?;
1021 let access_ttl = self.token_ttl.access_token_secs;
1022 let refresh_ttl = self.token_ttl.refresh_token_days;
1023 crate::auth::tokens::issue_token_pair(
1024 &self.db_pool,
1025 user_id,
1026 roles,
1027 access_ttl,
1028 refresh_ttl,
1029 move |uid, r, ttl| {
1030 let claims = Claims::builder()
1031 .subject(uid)
1032 .roles(r.iter().map(|s| s.to_string()).collect())
1033 .duration_secs(ttl)
1034 .build()
1035 .map_err(crate::error::ForgeError::Internal)?;
1036 issuer.sign(&claims)
1037 },
1038 )
1039 .await
1040 }
1041
1042 pub async fn rotate_refresh_token(
1047 &self,
1048 old_refresh_token: &str,
1049 ) -> crate::error::Result<crate::auth::TokenPair> {
1050 let issuer = self.token_issuer.clone().ok_or_else(|| {
1051 crate::error::ForgeError::Internal(
1052 "Token issuer not available. Configure [auth] in forge.toml".into(),
1053 )
1054 })?;
1055 let access_ttl = self.token_ttl.access_token_secs;
1056 let refresh_ttl = self.token_ttl.refresh_token_days;
1057 crate::auth::tokens::rotate_refresh_token(
1058 &self.db_pool,
1059 old_refresh_token,
1060 &["user"],
1061 access_ttl,
1062 refresh_ttl,
1063 move |uid, r, ttl| {
1064 let claims = Claims::builder()
1065 .subject(uid)
1066 .roles(r.iter().map(|s| s.to_string()).collect())
1067 .duration_secs(ttl)
1068 .build()
1069 .map_err(crate::error::ForgeError::Internal)?;
1070 issuer.sign(&claims)
1071 },
1072 )
1073 .await
1074 }
1075
    /// Revokes a single refresh token by delegating to
    /// `crate::auth::tokens::revoke_refresh_token` with the pool.
    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
    }
1080
    /// Revokes every refresh token of `user_id` by delegating to
    /// `crate::auth::tokens::revoke_all_refresh_tokens` with the pool.
    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
    }
1085
1086 pub async fn dispatch_job<T: serde::Serialize>(
1088 &self,
1089 job_type: &str,
1090 args: T,
1091 ) -> crate::error::Result<Uuid> {
1092 let args_json = serde_json::to_value(args)?;
1093
1094 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1096 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1097 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1098 })?;
1099
1100 let pending = PendingJob {
1101 id: Uuid::new_v4(),
1102 job_type: job_type.to_string(),
1103 args: args_json,
1104 context: serde_json::json!({}),
1105 owner_subject: self.auth.principal_id(),
1106 priority: job_info.priority.as_i32(),
1107 max_attempts: job_info.retry.max_attempts as i32,
1108 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1109 };
1110
1111 let job_id = pending.id;
1112 outbox
1113 .lock()
1114 .expect("outbox lock poisoned")
1115 .jobs
1116 .push(pending);
1117 return Ok(job_id);
1118 }
1119
1120 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1122 crate::error::ForgeError::Internal("Job dispatch not available".into())
1123 })?;
1124 dispatcher
1125 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1126 .await
1127 }
1128
1129 pub async fn dispatch_job_with_context<T: serde::Serialize>(
1131 &self,
1132 job_type: &str,
1133 args: T,
1134 context: serde_json::Value,
1135 ) -> crate::error::Result<Uuid> {
1136 let args_json = serde_json::to_value(args)?;
1137
1138 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1139 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1140 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1141 })?;
1142
1143 let pending = PendingJob {
1144 id: Uuid::new_v4(),
1145 job_type: job_type.to_string(),
1146 args: args_json,
1147 context,
1148 owner_subject: self.auth.principal_id(),
1149 priority: job_info.priority.as_i32(),
1150 max_attempts: job_info.retry.max_attempts as i32,
1151 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1152 };
1153
1154 let job_id = pending.id;
1155 outbox
1156 .lock()
1157 .expect("outbox lock poisoned")
1158 .jobs
1159 .push(pending);
1160 return Ok(job_id);
1161 }
1162
1163 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1164 crate::error::ForgeError::Internal("Job dispatch not available".into())
1165 })?;
1166 dispatcher
1167 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1168 .await
1169 }
1170
1171 pub async fn cancel_job(
1173 &self,
1174 job_id: Uuid,
1175 reason: Option<String>,
1176 ) -> crate::error::Result<bool> {
1177 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1178 crate::error::ForgeError::Internal("Job dispatch not available".into())
1179 })?;
1180 dispatcher.cancel(job_id, reason).await
1181 }
1182
1183 pub async fn start_workflow<T: serde::Serialize>(
1185 &self,
1186 workflow_name: &str,
1187 input: T,
1188 ) -> crate::error::Result<Uuid> {
1189 let input_json = serde_json::to_value(input)?;
1190
1191 if let Some(outbox) = &self.outbox {
1193 let pending = PendingWorkflow {
1194 id: Uuid::new_v4(),
1195 workflow_name: workflow_name.to_string(),
1196 input: input_json,
1197 owner_subject: self.auth.principal_id(),
1198 };
1199
1200 let workflow_id = pending.id;
1201 outbox
1202 .lock()
1203 .expect("outbox lock poisoned")
1204 .workflows
1205 .push(pending);
1206 return Ok(workflow_id);
1207 }
1208
1209 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1211 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1212 })?;
1213 dispatcher
1214 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1215 .await
1216 }
1217}
1218
1219impl EnvAccess for MutationContext {
1220 fn env_provider(&self) -> &dyn EnvProvider {
1221 self.env_provider.as_ref()
1222 }
1223}
1224
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context has no user id and refuses to produce one.
    #[test]
    fn test_auth_context_unauthenticated() {
        let anonymous = AuthContext::unauthenticated();

        assert!(!anonymous.is_authenticated());
        assert!(anonymous.user_id().is_none());
        assert!(anonymous.require_user_id().is_err());
    }

    /// An authenticated context exposes its user id and answers role checks.
    #[test]
    fn test_auth_context_authenticated() {
        let expected_id = Uuid::new_v4();
        let granted = vec!["admin".to_string(), "user".to_string()];
        let auth = AuthContext::authenticated(expected_id, granted, HashMap::new());

        assert!(auth.is_authenticated());
        assert_eq!(auth.user_id(), Some(expected_id));
        assert!(auth.require_user_id().is_ok());

        assert!(auth.has_role("admin"));
        assert!(auth.has_role("user"));
        assert!(!auth.has_role("superadmin"));

        assert!(auth.require_role("admin").is_ok());
        assert!(auth.require_role("superadmin").is_err());
    }

    /// Claims passed at construction are retrievable by key.
    #[test]
    fn test_auth_context_with_claims() {
        let claims = HashMap::from([("org_id".to_string(), serde_json::json!("org-123"))]);
        let auth = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);

        assert_eq!(auth.claim("org_id"), Some(&serde_json::json!("org-123")));
        assert!(auth.claim("nonexistent").is_none());
    }

    /// Fresh metadata gets a trace id; an explicit trace id is kept verbatim.
    #[test]
    fn test_request_metadata() {
        let generated = RequestMetadata::new();
        assert!(!generated.trace_id.is_empty());
        assert!(generated.client_ip.is_none());

        let explicit = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(explicit.trace_id, "trace-123");
    }
}