1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Signs authentication claims into serialized token strings.
pub trait TokenIssuer: Send + Sync {
    /// Signs `claims`, returning the encoded token string.
    ///
    /// # Errors
    /// Returns an error when signing fails (e.g. key/algorithm problems).
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// A mutable database connection that is either a dedicated pool connection
/// or a lock guard over a shared transaction. Both variants deref to
/// [`PgConnection`], so callers can use it wherever a connection is needed.
pub enum ForgeConn<'a> {
    /// Connection checked out from the pool.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// Guard holding the shared transaction's async mutex for `'a`; queries
    /// issued through it run inside that transaction.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81 type Target = PgConnection;
82 fn deref(&self) -> &PgConnection {
83 match self {
84 ForgeConn::Pool(c) => c,
85 ForgeConn::Tx(g) => g,
86 }
87 }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91 fn deref_mut(&mut self) -> &mut PgConnection {
92 match self {
93 ForgeConn::Pool(c) => c,
94 ForgeConn::Tx(g) => g,
95 }
96 }
97}
98
/// A cheaply cloneable query executor wrapping a `PgPool`; its
/// `sqlx::Executor` impl wraps query execution in `db.query` tracing spans.
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_tuple("ForgeDb").finish()
116 }
117}
118
119impl ForgeDb {
120 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122 Self(pool.clone())
123 }
124}
125
/// Classifies a SQL string by its leading keyword, for telemetry labels.
///
/// Matching is ASCII-case-insensitive and inspects only the first six bytes
/// after leading whitespace; anything shorter, or starting with any other
/// keyword, is reported as `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // All four verbs of interest happen to be exactly six bytes long.
    const VERBS: [(&[u8; 6], &str); 4] = [
        (b"select", "SELECT"),
        (b"insert", "INSERT"),
        (b"update", "UPDATE"),
        (b"delete", "DELETE"),
    ];
    let Some(head) = sql.trim_start().as_bytes().get(..6) else {
        return "OTHER";
    };
    for (verb, label) in VERBS {
        if head.eq_ignore_ascii_case(verb) {
            return label;
        }
    }
    "OTHER"
}
136
// `ForgeDb` is a cheap pool handle, so `sqlx::Executor` is implemented for it
// by value: every method delegates to the inner `PgPool`. The future-returning
// query methods are wrapped in a `db.query` tracing span tagged with the
// coarse SQL verb computed by `sql_operation`.
impl sqlx::Executor<'static> for ForgeDb {
    type Database = Postgres;

    // Streaming fetch; delegates directly to the pool.
    // NOTE(review): unlike the methods below, this path is not wrapped in a
    // `db.query` span — confirm whether stream instrumentation was omitted
    // deliberately.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        (&self.0).fetch_many(query)
    }

    // Zero-or-one-row fetch, traced.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Read the SQL verb before `query` is moved into the async block.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(
            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
        )
    }

    // Row-count-returning execution (INSERT/UPDATE/DELETE/DDL), traced.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
    }

    // All-rows fetch, traced.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
    }

    // Exactly-one-row fetch, traced.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
    }

    // Statement preparation/description are plumbing; not traced.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
    }

    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
    }
}
210
/// A query handle that routes through either the pool or a shared
/// transaction, so mutation code can run unchanged in both modes.
pub enum DbConn<'a> {
    /// Pool-backed: each call acquires its own connection.
    Pool(sqlx::PgPool),
    /// Transaction-backed: each call locks the shared transaction.
    /// NOTE(review): the pool reference is not used by the query methods —
    /// presumably it ties the handle's lifetime to the owning context;
    /// confirm.
    Transaction(
        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
        &'a sqlx::PgPool,
    ),
}
235
236impl DbConn<'_> {
237 pub async fn fetch_one<'q, O>(
239 &self,
240 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
241 ) -> sqlx::Result<O>
242 where
243 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
244 {
245 match self {
246 DbConn::Pool(pool) => query.fetch_one(pool).await,
247 DbConn::Transaction(tx, _) => {
248 let mut guard = tx.lock().await;
249 query.fetch_one(&mut **guard).await
250 }
251 }
252 }
253
254 pub async fn fetch_optional<'q, O>(
256 &self,
257 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
258 ) -> sqlx::Result<Option<O>>
259 where
260 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
261 {
262 match self {
263 DbConn::Pool(pool) => query.fetch_optional(pool).await,
264 DbConn::Transaction(tx, _) => {
265 let mut guard = tx.lock().await;
266 query.fetch_optional(&mut **guard).await
267 }
268 }
269 }
270
271 pub async fn fetch_all<'q, O>(
273 &self,
274 query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
275 ) -> sqlx::Result<Vec<O>>
276 where
277 O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
278 {
279 match self {
280 DbConn::Pool(pool) => query.fetch_all(pool).await,
281 DbConn::Transaction(tx, _) => {
282 let mut guard = tx.lock().await;
283 query.fetch_all(&mut **guard).await
284 }
285 }
286 }
287
288 pub async fn execute<'q>(
290 &self,
291 query: sqlx::query::Query<'q, Postgres, sqlx::postgres::PgArguments>,
292 ) -> sqlx::Result<PgQueryResult> {
293 match self {
294 DbConn::Pool(pool) => query.execute(pool).await,
295 DbConn::Transaction(tx, _) => {
296 let mut guard = tx.lock().await;
297 query.execute(&mut **guard).await
298 }
299 }
300 }
301}
302
303impl std::fmt::Debug for DbConn<'_> {
304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305 match self {
306 DbConn::Pool(_) => f.debug_tuple("DbConn::Pool").finish(),
307 DbConn::Transaction(_, _) => f.debug_tuple("DbConn::Transaction").finish(),
308 }
309 }
310}
311
312impl std::fmt::Debug for ForgeConn<'_> {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 match self {
315 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
316 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
317 }
318 }
319}
320
// Executor for a mutable borrow of `ForgeConn`: each method reborrows the
// underlying `PgConnection` (the explicit `&'e mut PgConnection` annotation
// drives the `DerefMut` coercion through either variant) and delegates to it.
// The future-returning query paths are wrapped in a `db.query` tracing span
// tagged with the coarse SQL verb from `sql_operation`.
impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
    type Database = Postgres;

    // Streaming fetch; delegates directly.
    // NOTE(review): this path is not instrumented with a `db.query` span,
    // unlike the methods below — confirm whether that is intentional.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.fetch_many(query)
    }

    // Zero-or-one-row fetch, traced.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Read the SQL verb before `query` is handed to the connection.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_optional(query).instrument(span))
    }

    // Row-count-returning execution, traced.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.execute(query).instrument(span))
    }

    // All-rows fetch, traced.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_all(query).instrument(span))
    }

    // Exactly-one-row fetch, traced.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_one(query).instrument(span))
    }

    // Statement preparation/description are plumbing; not traced.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.prepare_with(sql, parameters)
    }

    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.describe(sql)
    }
}
410
/// A job recorded in the outbox while a transaction is open; it is handed to
/// the real queue only after the transaction commits.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Pre-generated job id, returned to the caller at dispatch time.
    pub id: Uuid,
    /// Registered job type name.
    pub job_type: String,
    /// Serialized job arguments.
    pub args: serde_json::Value,
    /// Extra context payload attached to the job.
    pub context: serde_json::Value,
    /// Identifier of the dispatching principal, when known.
    pub owner_subject: Option<String>,
    /// Queue priority, taken from the job's registration info.
    pub priority: i32,
    /// Maximum retry attempts, taken from the job's registration info.
    pub max_attempts: i32,
    /// Worker capability required to run the job, if any.
    pub worker_capability: Option<String>,
}
423
/// A workflow start recorded in the outbox while a transaction is open,
/// pinned to the workflow version/signature that was active when buffered.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Pre-generated workflow id, returned to the caller at start time.
    pub id: Uuid,
    /// Registered workflow name.
    pub workflow_name: String,
    /// Version that was active when the workflow was buffered.
    pub workflow_version: String,
    /// Signature of that version.
    pub workflow_signature: String,
    /// Serialized workflow input.
    pub input: serde_json::Value,
    /// Identifier of the starting principal, when known.
    pub owner_subject: Option<String>,
}
434
/// Jobs and workflows buffered during a transactional mutation, to be
/// drained by the caller after the transaction commits.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs awaiting enqueue.
    pub jobs: Vec<PendingJob>,
    /// Workflow starts awaiting dispatch.
    pub workflows: Vec<PendingWorkflow>,
}
444
/// Identity and authorization data for the current request.
#[derive(Debug, Clone)]
pub struct AuthContext {
    // UUID of the authenticated user, when the principal has a UUID identity.
    user_id: Option<Uuid>,
    // Role names held by the principal.
    roles: Vec<String>,
    // Raw token claims keyed by claim name.
    claims: HashMap<String, serde_json::Value>,
    // Whether credentials were presented for this request.
    authenticated: bool,
}
457
458impl AuthContext {
459 pub fn unauthenticated() -> Self {
461 Self {
462 user_id: None,
463 roles: Vec::new(),
464 claims: HashMap::new(),
465 authenticated: false,
466 }
467 }
468
469 pub fn authenticated(
471 user_id: Uuid,
472 roles: Vec<String>,
473 claims: HashMap<String, serde_json::Value>,
474 ) -> Self {
475 Self {
476 user_id: Some(user_id),
477 roles,
478 claims,
479 authenticated: true,
480 }
481 }
482
483 pub fn authenticated_without_uuid(
489 roles: Vec<String>,
490 claims: HashMap<String, serde_json::Value>,
491 ) -> Self {
492 Self {
493 user_id: None,
494 roles,
495 claims,
496 authenticated: true,
497 }
498 }
499
500 pub fn is_authenticated(&self) -> bool {
502 self.authenticated
503 }
504
505 pub fn user_id(&self) -> Option<Uuid> {
507 self.user_id
508 }
509
510 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
512 self.user_id
513 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
514 }
515
516 pub fn has_role(&self, role: &str) -> bool {
518 self.roles.iter().any(|r| r == role)
519 }
520
521 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
523 if self.has_role(role) {
524 Ok(())
525 } else {
526 Err(crate::error::ForgeError::Forbidden(format!(
527 "Required role '{}' not present",
528 role
529 )))
530 }
531 }
532
533 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
535 self.claims.get(key)
536 }
537
538 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
540 &self.claims
541 }
542
543 pub fn roles(&self) -> &[String] {
545 &self.roles
546 }
547
548 pub fn subject(&self) -> Option<&str> {
554 self.claims.get("sub").and_then(|v| v.as_str())
555 }
556
557 pub fn require_subject(&self) -> crate::error::Result<&str> {
559 if !self.authenticated {
560 return Err(crate::error::ForgeError::Unauthorized(
561 "Authentication required".to_string(),
562 ));
563 }
564 self.subject().ok_or_else(|| {
565 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
566 })
567 }
568
569 pub fn principal_id(&self) -> Option<String> {
573 self.subject()
574 .map(ToString::to_string)
575 .or_else(|| self.user_id.map(|id| id.to_string()))
576 }
577
578 pub fn is_admin(&self) -> bool {
580 self.roles.iter().any(|r| r == "admin")
581 }
582
583 pub fn tenant_id(&self) -> Option<uuid::Uuid> {
588 self.claims
589 .get("tenant_id")
590 .and_then(|v| v.as_str())
591 .and_then(|s| uuid::Uuid::parse_str(s).ok())
592 }
593}
594
/// Per-request correlation and client metadata.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Unique identifier for this request.
    pub request_id: Uuid,
    /// Distributed-tracing id (generated locally when not supplied).
    pub trace_id: String,
    /// Client IP address, when captured.
    pub client_ip: Option<String>,
    /// Client `User-Agent` header, when captured.
    pub user_agent: Option<String>,
    /// Caller-supplied correlation id, when present.
    pub correlation_id: Option<String>,
    /// Time at which this metadata was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
611
612impl RequestMetadata {
613 pub fn new() -> Self {
615 Self {
616 request_id: Uuid::new_v4(),
617 trace_id: Uuid::new_v4().to_string(),
618 client_ip: None,
619 user_agent: None,
620 correlation_id: None,
621 timestamp: chrono::Utc::now(),
622 }
623 }
624
625 pub fn with_trace_id(trace_id: String) -> Self {
627 Self {
628 request_id: Uuid::new_v4(),
629 trace_id,
630 client_ip: None,
631 user_agent: None,
632 correlation_id: None,
633 timestamp: chrono::Utc::now(),
634 }
635 }
636}
637
impl Default for RequestMetadata {
    /// Equivalent to [`RequestMetadata::new`]: fresh ids, current timestamp.
    fn default() -> Self {
        Self::new()
    }
}
643
/// Read-side request context: caller identity, request metadata, database
/// pool access, and environment access.
pub struct QueryContext {
    /// Caller identity and roles.
    pub auth: AuthContext,
    /// Request correlation metadata.
    pub request: RequestMetadata,
    // Shared connection pool.
    db_pool: sqlx::PgPool,
    // Environment variable source (injectable via `with_env`).
    env_provider: Arc<dyn EnvProvider>,
}
655
656impl QueryContext {
657 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
659 Self {
660 auth,
661 request,
662 db_pool,
663 env_provider: Arc::new(RealEnvProvider::new()),
664 }
665 }
666
667 pub fn with_env(
669 db_pool: sqlx::PgPool,
670 auth: AuthContext,
671 request: RequestMetadata,
672 env_provider: Arc<dyn EnvProvider>,
673 ) -> Self {
674 Self {
675 auth,
676 request,
677 db_pool,
678 env_provider,
679 }
680 }
681
682 pub fn db(&self) -> ForgeDb {
691 ForgeDb(self.db_pool.clone())
692 }
693
694 pub fn db_conn(&self) -> DbConn<'_> {
708 DbConn::Pool(self.db_pool.clone())
709 }
710
711 pub fn user_id(&self) -> crate::error::Result<Uuid> {
713 self.auth.require_user_id()
714 }
715
716 pub fn tenant_id(&self) -> Option<Uuid> {
718 self.auth.tenant_id()
719 }
720}
721
722impl EnvAccess for QueryContext {
723 fn env_provider(&self) -> &dyn EnvProvider {
724 self.env_provider.as_ref()
725 }
726}
727
/// Callback resolving a job type name to its registered [`JobInfo`], if any.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
730
/// Lifetimes applied to issued authentication tokens.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access-token lifetime in seconds.
    pub access_token_secs: i64,
    /// Refresh-token lifetime in days.
    pub refresh_token_days: i64,
}
739
740impl Default for AuthTokenTtl {
741 fn default() -> Self {
742 Self {
743 access_token_secs: 3600,
744 refresh_token_days: 30,
745 }
746 }
747}
748
/// Write-side request context: everything `QueryContext` offers plus HTTP
/// access, job/workflow dispatch, optional transaction + outbox buffering,
/// and token issuance.
pub struct MutationContext {
    /// Caller identity and roles.
    pub auth: AuthContext,
    /// Request correlation metadata.
    pub request: RequestMetadata,
    // Shared connection pool (used directly when not transactional).
    db_pool: sqlx::PgPool,
    // HTTP client wrapped with circuit breaking.
    http_client: CircuitBreakerClient,
    // Optional per-context timeout applied by `http()`.
    http_timeout: Option<Duration>,
    // Background-job dispatcher, when wired.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    // Workflow dispatcher, when wired.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    // Environment variable source.
    env_provider: Arc<dyn EnvProvider>,
    // Shared transaction, when this context is transactional.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    // Buffer for jobs/workflows dispatched inside the transaction.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    // Job metadata lookup used when buffering jobs in the outbox.
    job_info_lookup: Option<JobInfoLookup>,
    // Signer used by the token-issuing methods, when configured.
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    // Lifetimes applied to issued token pairs.
    token_ttl: AuthTokenTtl,
}
779
780impl MutationContext {
781 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
783 Self {
784 auth,
785 request,
786 db_pool,
787 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
788 http_timeout: None,
789 job_dispatch: None,
790 workflow_dispatch: None,
791 env_provider: Arc::new(RealEnvProvider::new()),
792 tx: None,
793 outbox: None,
794 job_info_lookup: None,
795 token_issuer: None,
796 token_ttl: AuthTokenTtl::default(),
797 }
798 }
799
800 pub fn with_dispatch(
802 db_pool: sqlx::PgPool,
803 auth: AuthContext,
804 request: RequestMetadata,
805 http_client: CircuitBreakerClient,
806 job_dispatch: Option<Arc<dyn JobDispatch>>,
807 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
808 ) -> Self {
809 Self {
810 auth,
811 request,
812 db_pool,
813 http_client,
814 http_timeout: None,
815 job_dispatch,
816 workflow_dispatch,
817 env_provider: Arc::new(RealEnvProvider::new()),
818 tx: None,
819 outbox: None,
820 job_info_lookup: None,
821 token_issuer: None,
822 token_ttl: AuthTokenTtl::default(),
823 }
824 }
825
826 pub fn with_env(
828 db_pool: sqlx::PgPool,
829 auth: AuthContext,
830 request: RequestMetadata,
831 http_client: CircuitBreakerClient,
832 job_dispatch: Option<Arc<dyn JobDispatch>>,
833 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
834 env_provider: Arc<dyn EnvProvider>,
835 ) -> Self {
836 Self {
837 auth,
838 request,
839 db_pool,
840 http_client,
841 http_timeout: None,
842 job_dispatch,
843 workflow_dispatch,
844 env_provider,
845 tx: None,
846 outbox: None,
847 job_info_lookup: None,
848 token_issuer: None,
849 token_ttl: AuthTokenTtl::default(),
850 }
851 }
852
853 #[allow(clippy::type_complexity)]
855 pub fn with_transaction(
856 db_pool: sqlx::PgPool,
857 tx: Transaction<'static, Postgres>,
858 auth: AuthContext,
859 request: RequestMetadata,
860 http_client: CircuitBreakerClient,
861 job_info_lookup: JobInfoLookup,
862 ) -> (
863 Self,
864 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
865 Arc<Mutex<OutboxBuffer>>,
866 ) {
867 let tx_handle = Arc::new(AsyncMutex::new(tx));
868 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
869
870 let ctx = Self {
871 auth,
872 request,
873 db_pool,
874 http_client,
875 http_timeout: None,
876 job_dispatch: None,
877 workflow_dispatch: None,
878 env_provider: Arc::new(RealEnvProvider::new()),
879 tx: Some(tx_handle.clone()),
880 outbox: Some(outbox.clone()),
881 job_info_lookup: Some(job_info_lookup),
882 token_issuer: None,
883 token_ttl: AuthTokenTtl::default(),
884 };
885
886 (ctx, tx_handle, outbox)
887 }
888
889 pub fn is_transactional(&self) -> bool {
890 self.tx.is_some()
891 }
892
893 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
905 match &self.tx {
906 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
907 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
908 }
909 }
910
911 pub fn pool(&self) -> &sqlx::PgPool {
913 &self.db_pool
914 }
915
916 pub fn db(&self) -> DbConn<'_> {
930 match &self.tx {
931 Some(tx) => DbConn::Transaction(tx.clone(), &self.db_pool),
932 None => DbConn::Pool(self.db_pool.clone()),
933 }
934 }
935
936 pub fn db_conn(&self) -> DbConn<'_> {
938 self.db()
939 }
940
941 pub fn http(&self) -> crate::http::HttpClient {
947 self.http_client.with_timeout(self.http_timeout)
948 }
949
950 pub fn raw_http(&self) -> &reqwest::Client {
952 self.http_client.inner()
953 }
954
955 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
957 self.http_timeout = timeout;
958 }
959
960 pub fn user_id(&self) -> crate::error::Result<Uuid> {
962 self.auth.require_user_id()
963 }
964
965 pub fn tenant_id(&self) -> Option<Uuid> {
967 self.auth.tenant_id()
968 }
969
970 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
972 self.token_issuer = Some(issuer);
973 }
974
975 pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
977 self.token_ttl = ttl;
978 }
979
980 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
995 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
996 crate::error::ForgeError::Internal(
997 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
998 .into(),
999 )
1000 })?;
1001 issuer.sign(claims)
1002 }
1003
1004 pub async fn issue_token_pair(
1014 &self,
1015 user_id: Uuid,
1016 roles: &[&str],
1017 ) -> crate::error::Result<crate::auth::TokenPair> {
1018 let issuer = self.token_issuer.clone().ok_or_else(|| {
1019 crate::error::ForgeError::Internal(
1020 "Token issuer not available. Configure [auth] in forge.toml".into(),
1021 )
1022 })?;
1023 let access_ttl = self.token_ttl.access_token_secs;
1024 let refresh_ttl = self.token_ttl.refresh_token_days;
1025 crate::auth::tokens::issue_token_pair(
1026 &self.db_pool,
1027 user_id,
1028 roles,
1029 access_ttl,
1030 refresh_ttl,
1031 move |uid, r, ttl| {
1032 let claims = Claims::builder()
1033 .subject(uid)
1034 .roles(r.iter().map(|s| s.to_string()).collect())
1035 .duration_secs(ttl)
1036 .build()
1037 .map_err(crate::error::ForgeError::Internal)?;
1038 issuer.sign(&claims)
1039 },
1040 )
1041 .await
1042 }
1043
1044 pub async fn rotate_refresh_token(
1049 &self,
1050 old_refresh_token: &str,
1051 ) -> crate::error::Result<crate::auth::TokenPair> {
1052 let issuer = self.token_issuer.clone().ok_or_else(|| {
1053 crate::error::ForgeError::Internal(
1054 "Token issuer not available. Configure [auth] in forge.toml".into(),
1055 )
1056 })?;
1057 let access_ttl = self.token_ttl.access_token_secs;
1058 let refresh_ttl = self.token_ttl.refresh_token_days;
1059 crate::auth::tokens::rotate_refresh_token(
1060 &self.db_pool,
1061 old_refresh_token,
1062 &["user"],
1063 access_ttl,
1064 refresh_ttl,
1065 move |uid, r, ttl| {
1066 let claims = Claims::builder()
1067 .subject(uid)
1068 .roles(r.iter().map(|s| s.to_string()).collect())
1069 .duration_secs(ttl)
1070 .build()
1071 .map_err(crate::error::ForgeError::Internal)?;
1072 issuer.sign(&claims)
1073 },
1074 )
1075 .await
1076 }
1077
1078 pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
1080 crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
1081 }
1082
1083 pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
1085 crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
1086 }
1087
1088 pub async fn dispatch_job<T: serde::Serialize>(
1090 &self,
1091 job_type: &str,
1092 args: T,
1093 ) -> crate::error::Result<Uuid> {
1094 let args_json = serde_json::to_value(args)?;
1095
1096 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1098 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1099 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1100 })?;
1101
1102 let pending = PendingJob {
1103 id: Uuid::new_v4(),
1104 job_type: job_type.to_string(),
1105 args: args_json,
1106 context: serde_json::json!({}),
1107 owner_subject: self.auth.principal_id(),
1108 priority: job_info.priority.as_i32(),
1109 max_attempts: job_info.retry.max_attempts as i32,
1110 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1111 };
1112
1113 let job_id = pending.id;
1114 outbox
1115 .lock()
1116 .expect("outbox lock poisoned")
1117 .jobs
1118 .push(pending);
1119 return Ok(job_id);
1120 }
1121
1122 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1124 crate::error::ForgeError::Internal("Job dispatch not available".into())
1125 })?;
1126 dispatcher
1127 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1128 .await
1129 }
1130
1131 pub async fn dispatch_job_with_context<T: serde::Serialize>(
1133 &self,
1134 job_type: &str,
1135 args: T,
1136 context: serde_json::Value,
1137 ) -> crate::error::Result<Uuid> {
1138 let args_json = serde_json::to_value(args)?;
1139
1140 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
1141 let job_info = job_info_lookup(job_type).ok_or_else(|| {
1142 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
1143 })?;
1144
1145 let pending = PendingJob {
1146 id: Uuid::new_v4(),
1147 job_type: job_type.to_string(),
1148 args: args_json,
1149 context,
1150 owner_subject: self.auth.principal_id(),
1151 priority: job_info.priority.as_i32(),
1152 max_attempts: job_info.retry.max_attempts as i32,
1153 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1154 };
1155
1156 let job_id = pending.id;
1157 outbox
1158 .lock()
1159 .expect("outbox lock poisoned")
1160 .jobs
1161 .push(pending);
1162 return Ok(job_id);
1163 }
1164
1165 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1166 crate::error::ForgeError::Internal("Job dispatch not available".into())
1167 })?;
1168 dispatcher
1169 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1170 .await
1171 }
1172
1173 pub async fn cancel_job(
1175 &self,
1176 job_id: Uuid,
1177 reason: Option<String>,
1178 ) -> crate::error::Result<bool> {
1179 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1180 crate::error::ForgeError::Internal("Job dispatch not available".into())
1181 })?;
1182 dispatcher.cancel(job_id, reason).await
1183 }
1184
1185 pub async fn start_workflow<T: serde::Serialize>(
1187 &self,
1188 workflow_name: &str,
1189 input: T,
1190 ) -> crate::error::Result<Uuid> {
1191 let input_json = serde_json::to_value(input)?;
1192
1193 if let Some(outbox) = &self.outbox {
1195 let info = self
1199 .workflow_dispatch
1200 .as_ref()
1201 .and_then(|d| d.get_info(workflow_name))
1202 .ok_or_else(|| {
1203 crate::error::ForgeError::NotFound(format!(
1204 "No active version of workflow '{}'",
1205 workflow_name
1206 ))
1207 })?;
1208
1209 let pending = PendingWorkflow {
1210 id: Uuid::new_v4(),
1211 workflow_name: workflow_name.to_string(),
1212 workflow_version: info.version.to_string(),
1213 workflow_signature: info.signature.to_string(),
1214 input: input_json,
1215 owner_subject: self.auth.principal_id(),
1216 };
1217
1218 let workflow_id = pending.id;
1219 outbox
1220 .lock()
1221 .expect("outbox lock poisoned")
1222 .workflows
1223 .push(pending);
1224 return Ok(workflow_id);
1225 }
1226
1227 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1229 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1230 })?;
1231 dispatcher
1232 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1233 .await
1234 }
1235}
1236
1237impl EnvAccess for MutationContext {
1238 fn env_provider(&self) -> &dyn EnvProvider {
1239 self.env_provider.as_ref()
1240 }
1241}
1242
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    // An unauthenticated context exposes no user id and rejects `require_*`.
    #[test]
    fn test_auth_context_unauthenticated() {
        let ctx = AuthContext::unauthenticated();
        assert!(!ctx.is_authenticated());
        assert!(ctx.user_id().is_none());
        assert!(ctx.require_user_id().is_err());
    }

    // Role checks match exact strings; unknown roles are rejected.
    #[test]
    fn test_auth_context_authenticated() {
        let user_id = Uuid::new_v4();
        let ctx = AuthContext::authenticated(
            user_id,
            vec!["admin".to_string(), "user".to_string()],
            HashMap::new(),
        );

        assert!(ctx.is_authenticated());
        assert_eq!(ctx.user_id(), Some(user_id));
        assert!(ctx.require_user_id().is_ok());
        assert!(ctx.has_role("admin"));
        assert!(ctx.has_role("user"));
        assert!(!ctx.has_role("superadmin"));
        assert!(ctx.require_role("admin").is_ok());
        assert!(ctx.require_role("superadmin").is_err());
    }

    // Claims are retrievable by key; unknown keys yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let mut claims = HashMap::new();
        claims.insert("org_id".to_string(), serde_json::json!("org-123"));

        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);

        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
        assert!(ctx.claim("nonexistent").is_none());
    }

    // Metadata generates a trace id by default and adopts a supplied one.
    #[test]
    fn test_request_metadata() {
        let meta = RequestMetadata::new();
        assert!(!meta.trace_id.is_empty());
        assert!(meta.client_ip.is_none());

        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(meta2.trace_id, "trace-123");
    }
}
1295}