1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use futures_core::future::BoxFuture;
40use futures_core::stream::BoxStream;
41use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
42use sqlx::{Postgres, Transaction};
43use tokio::sync::Mutex as AsyncMutex;
44use uuid::Uuid;
45
46use tracing::Instrument;
47
48use super::dispatch::{JobDispatch, WorkflowDispatch};
49use crate::auth::Claims;
50use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
51use crate::http::CircuitBreakerClient;
52use crate::job::JobInfo;
53
/// Abstraction over the component that signs authentication tokens.
///
/// [`MutationContext`] stores an optional `Arc<dyn TokenIssuer>` and calls
/// [`TokenIssuer::sign`] from its token-issuing helpers. The `Send + Sync`
/// supertraits allow a single issuer to be shared across threads.
pub trait TokenIssuer: Send + Sync {
    /// Signs `claims` and returns the resulting token as a string.
    ///
    /// # Errors
    ///
    /// Returns an error when the implementation cannot produce a token.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
62
/// A database connection that is either checked out of the pool or borrowed
/// from a transaction guarded by an async mutex.
///
/// Both variants dereference to [`PgConnection`], so callers can use the
/// value without caring which one they were handed.
pub enum ForgeConn<'a> {
    /// A connection acquired from the pool.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// A Tokio mutex guard over a transaction; the lock stays held for as
    /// long as this value is alive.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
79
80impl std::ops::Deref for ForgeConn<'_> {
81 type Target = PgConnection;
82 fn deref(&self) -> &PgConnection {
83 match self {
84 ForgeConn::Pool(c) => c,
85 ForgeConn::Tx(g) => g,
86 }
87 }
88}
89
90impl std::ops::DerefMut for ForgeConn<'_> {
91 fn deref_mut(&mut self) -> &mut PgConnection {
92 match self {
93 ForgeConn::Pool(c) => c,
94 ForgeConn::Tx(g) => g,
95 }
96 }
97}
98
/// Cloneable handle around a [`sqlx::PgPool`].
///
/// The handle implements [`sqlx::Executor`] by forwarding to the wrapped
/// pool, so it can be passed by value wherever an executor is expected.
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
112
113impl std::fmt::Debug for ForgeDb {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_tuple("ForgeDb").finish()
116 }
117}
118
119impl ForgeDb {
120 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
122 Self(pool.clone())
123 }
124}
125
/// Classifies a SQL string by its leading keyword.
///
/// Leading whitespace is skipped and the first six bytes are compared
/// case-insensitively (ASCII) against `SELECT`, `INSERT`, `UPDATE` and
/// `DELETE`. Anything else, including input shorter than six bytes,
/// yields `"OTHER"`.
fn sql_operation(sql: &str) -> &'static str {
    // Every recognised keyword is exactly six bytes long, so a single
    // six-byte prefix is enough to test all of them.
    const KEYWORDS: [&str; 4] = ["SELECT", "INSERT", "UPDATE", "DELETE"];

    sql.trim_start()
        .as_bytes()
        .get(..6)
        .and_then(|head| {
            KEYWORDS
                .iter()
                .copied()
                .find(|keyword| head.eq_ignore_ascii_case(keyword.as_bytes()))
        })
        .unwrap_or("OTHER")
}
136
/// Executor implementation that forwards every call to the wrapped pool.
///
/// `fetch_optional`, `execute`, `fetch_all` and `fetch_one` run inside a
/// `db.query` tracing span carrying `db.system = "postgresql"` and a
/// `db.operation.name` derived from the SQL text by `sql_operation`.
/// `fetch_many`, `prepare_with` and `describe` are forwarded without a span.
impl sqlx::Executor<'static> for ForgeDb {
    type Database = Postgres;

    /// Forwards to the pool's `fetch_many`; no tracing span is attached.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        (&self.0).fetch_many(query)
    }

    /// Runs the query on the pool and returns at most one row, inside a
    /// `db.query` span.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is moved into the future.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(
            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
        )
    }

    /// Executes the query on the pool inside a `db.query` span.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is moved into the future.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
    }

    /// Runs the query on the pool and collects every row, inside a
    /// `db.query` span.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is moved into the future.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
    }

    /// Runs the query on the pool and returns a single row, inside a
    /// `db.query` span.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is moved into the future.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
    }

    /// Forwards statement preparation to the pool; no tracing span is attached.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
    }

    /// Forwards statement description to the pool; no tracing span is attached.
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
    }
}
210
211impl std::fmt::Debug for ForgeConn<'_> {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 match self {
214 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
215 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
216 }
217 }
218}
219
/// Executor implementation for a mutable borrow of [`ForgeConn`].
///
/// Every method reborrows the wrapper as a `&mut PgConnection` (through its
/// `DerefMut` impl) and forwards to that connection. `fetch_optional`,
/// `execute`, `fetch_all` and `fetch_one` additionally run inside a
/// `db.query` tracing span carrying `db.system = "postgresql"` and a
/// `db.operation.name` derived from the SQL text by `sql_operation`.
/// `fetch_many`, `prepare_with` and `describe` are forwarded without a span.
impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
    type Database = Postgres;

    /// Forwards to the connection's `fetch_many`; no tracing span is attached.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Reborrow down to the raw connection for the lifetime of the stream.
        let conn: &'e mut PgConnection = &mut *self;
        conn.fetch_many(query)
    }

    /// Runs the query on the connection and returns at most one row, inside
    /// a `db.query` span.
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is handed to the connection.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_optional(query).instrument(span))
    }

    /// Executes the query on the connection inside a `db.query` span.
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is handed to the connection.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.execute(query).instrument(span))
    }

    /// Runs the query on the connection and collects every row, inside a
    /// `db.query` span.
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is handed to the connection.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_all(query).instrument(span))
    }

    /// Runs the query on the connection and returns a single row, inside a
    /// `db.query` span.
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // The operation name is read before `query` is handed to the connection.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_one(query).instrument(span))
    }

    /// Forwards statement preparation to the connection; no tracing span is
    /// attached.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.prepare_with(sql, parameters)
    }

    /// Forwards statement description to the connection; no tracing span is
    /// attached.
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.describe(sql)
    }
}
309
/// A job that has been queued in an [`OutboxBuffer`] instead of being handed
/// to a dispatcher straight away.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Identifier assigned to the job when it is queued.
    pub id: Uuid,
    /// Name of the job type to run.
    pub job_type: String,
    /// Job arguments, already serialized to JSON.
    pub args: serde_json::Value,
    /// Extra JSON context stored alongside the job.
    pub context: serde_json::Value,
    /// Principal on whose behalf the job was queued, if any.
    pub owner_subject: Option<String>,
    /// Priority value taken from the job type's registered info.
    pub priority: i32,
    /// Maximum number of attempts taken from the job type's retry settings.
    pub max_attempts: i32,
    /// Worker capability copied from the job type's registered info, if any.
    pub worker_capability: Option<String>,
}
321
/// A workflow start request that has been queued in an [`OutboxBuffer`]
/// instead of being handed to a dispatcher straight away.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Identifier assigned to the workflow when it is queued.
    pub id: Uuid,
    /// Name of the workflow to start.
    pub workflow_name: String,
    /// Workflow input, already serialized to JSON.
    pub input: serde_json::Value,
    /// Principal on whose behalf the workflow was queued, if any.
    pub owner_subject: Option<String>,
}
329
/// In-memory buffer of jobs and workflows queued by a transactional
/// [`MutationContext`].
///
/// The context only appends to this buffer; whoever holds the handle returned
/// by `MutationContext::with_transaction` decides what happens to the entries.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Jobs queued so far, in insertion order.
    pub jobs: Vec<PendingJob>,
    /// Workflows queued so far, in insertion order.
    pub workflows: Vec<PendingWorkflow>,
}
335
/// Identity and authorization data carried by [`QueryContext`] and
/// [`MutationContext`].
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// UUID of the user, when one is known. `None` for unauthenticated
    /// callers and for callers built with `authenticated_without_uuid`.
    user_id: Option<Uuid>,
    /// Role names consulted by `has_role` / `require_role`.
    roles: Vec<String>,
    /// Claims keyed by name; `subject()` reads `"sub"` and `tenant_id()`
    /// reads `"tenant_id"`.
    claims: HashMap<String, serde_json::Value>,
    /// Whether the caller is considered authenticated.
    authenticated: bool,
}
348
349impl AuthContext {
350 pub fn unauthenticated() -> Self {
352 Self {
353 user_id: None,
354 roles: Vec::new(),
355 claims: HashMap::new(),
356 authenticated: false,
357 }
358 }
359
360 pub fn authenticated(
362 user_id: Uuid,
363 roles: Vec<String>,
364 claims: HashMap<String, serde_json::Value>,
365 ) -> Self {
366 Self {
367 user_id: Some(user_id),
368 roles,
369 claims,
370 authenticated: true,
371 }
372 }
373
374 pub fn authenticated_without_uuid(
380 roles: Vec<String>,
381 claims: HashMap<String, serde_json::Value>,
382 ) -> Self {
383 Self {
384 user_id: None,
385 roles,
386 claims,
387 authenticated: true,
388 }
389 }
390
391 pub fn is_authenticated(&self) -> bool {
393 self.authenticated
394 }
395
396 pub fn user_id(&self) -> Option<Uuid> {
398 self.user_id
399 }
400
401 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
403 self.user_id
404 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
405 }
406
407 pub fn has_role(&self, role: &str) -> bool {
409 self.roles.iter().any(|r| r == role)
410 }
411
412 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
414 if self.has_role(role) {
415 Ok(())
416 } else {
417 Err(crate::error::ForgeError::Forbidden(format!(
418 "Required role '{}' not present",
419 role
420 )))
421 }
422 }
423
424 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
426 self.claims.get(key)
427 }
428
429 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
431 &self.claims
432 }
433
434 pub fn roles(&self) -> &[String] {
436 &self.roles
437 }
438
439 pub fn subject(&self) -> Option<&str> {
445 self.claims.get("sub").and_then(|v| v.as_str())
446 }
447
448 pub fn require_subject(&self) -> crate::error::Result<&str> {
450 if !self.authenticated {
451 return Err(crate::error::ForgeError::Unauthorized(
452 "Authentication required".to_string(),
453 ));
454 }
455 self.subject().ok_or_else(|| {
456 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
457 })
458 }
459
460 pub fn principal_id(&self) -> Option<String> {
464 self.subject()
465 .map(ToString::to_string)
466 .or_else(|| self.user_id.map(|id| id.to_string()))
467 }
468
469 pub fn is_admin(&self) -> bool {
471 self.roles.iter().any(|r| r == "admin")
472 }
473
474 pub fn tenant_id(&self) -> Option<uuid::Uuid> {
479 self.claims
480 .get("tenant_id")
481 .and_then(|v| v.as_str())
482 .and_then(|s| uuid::Uuid::parse_str(s).ok())
483 }
484}
485
/// Per-request bookkeeping carried by [`QueryContext`] and
/// [`MutationContext`].
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Identifier of this request; the constructors generate a random UUID v4.
    pub request_id: Uuid,
    /// Trace identifier; `new` generates a random UUID string, while
    /// `with_trace_id` stores the caller-supplied value.
    pub trace_id: String,
    /// Client IP address, if known. The constructors leave it `None`.
    pub client_ip: Option<String>,
    /// Client user agent, if known. The constructors leave it `None`.
    pub user_agent: Option<String>,
    /// Correlation identifier, if any. The constructors leave it `None`.
    pub correlation_id: Option<String>,
    /// UTC time captured when the value was constructed.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
502
503impl RequestMetadata {
504 pub fn new() -> Self {
506 Self {
507 request_id: Uuid::new_v4(),
508 trace_id: Uuid::new_v4().to_string(),
509 client_ip: None,
510 user_agent: None,
511 correlation_id: None,
512 timestamp: chrono::Utc::now(),
513 }
514 }
515
516 pub fn with_trace_id(trace_id: String) -> Self {
518 Self {
519 request_id: Uuid::new_v4(),
520 trace_id,
521 client_ip: None,
522 user_agent: None,
523 correlation_id: None,
524 timestamp: chrono::Utc::now(),
525 }
526 }
527}
528
529impl Default for RequestMetadata {
530 fn default() -> Self {
531 Self::new()
532 }
533}
534
/// Context for query handlers: who is calling, request metadata, a database
/// pool and an environment provider.
pub struct QueryContext {
    /// Identity and authorization data of the caller.
    pub auth: AuthContext,
    /// Metadata describing the current request.
    pub request: RequestMetadata,
    /// Pool handed out (wrapped in [`ForgeDb`]) by `db()`.
    db_pool: sqlx::PgPool,
    /// Provider exposed through the [`EnvAccess`] impl.
    env_provider: Arc<dyn EnvProvider>,
}
546
547impl QueryContext {
548 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
550 Self {
551 auth,
552 request,
553 db_pool,
554 env_provider: Arc::new(RealEnvProvider::new()),
555 }
556 }
557
558 pub fn with_env(
560 db_pool: sqlx::PgPool,
561 auth: AuthContext,
562 request: RequestMetadata,
563 env_provider: Arc<dyn EnvProvider>,
564 ) -> Self {
565 Self {
566 auth,
567 request,
568 db_pool,
569 env_provider,
570 }
571 }
572
573 pub fn db(&self) -> ForgeDb {
582 ForgeDb(self.db_pool.clone())
583 }
584
585 pub fn user_id(&self) -> crate::error::Result<Uuid> {
587 self.auth.require_user_id()
588 }
589
590 pub fn tenant_id(&self) -> Option<Uuid> {
592 self.auth.tenant_id()
593 }
594}
595
596impl EnvAccess for QueryContext {
597 fn env_provider(&self) -> &dyn EnvProvider {
598 self.env_provider.as_ref()
599 }
600}
601
/// Shared, thread-safe callback that resolves a job type name to its
/// [`JobInfo`], returning `None` when the name is unknown.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
604
/// Lifetimes applied when issuing access and refresh tokens.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access token lifetime, in seconds.
    pub access_token_secs: i64,
    /// Refresh token lifetime, in days.
    pub refresh_token_days: i64,
}

impl Default for AuthTokenTtl {
    /// Defaults to a one-hour access token and a thirty-day refresh token.
    fn default() -> Self {
        const ONE_HOUR_SECS: i64 = 60 * 60;
        const THIRTY_DAYS: i64 = 30;

        Self {
            access_token_secs: ONE_HOUR_SECS,
            refresh_token_days: THIRTY_DAYS,
        }
    }
}
622
/// Context for mutation handlers.
///
/// Besides caller identity and request metadata it carries the database
/// pool, an HTTP client, optional job/workflow dispatchers, and — when built
/// with `with_transaction` — a shared transaction plus an outbox buffer that
/// collects dispatched jobs and workflows instead of sending them directly.
pub struct MutationContext {
    /// Identity and authorization data of the caller.
    pub auth: AuthContext,
    /// Metadata describing the current request.
    pub request: RequestMetadata,
    /// Pool used by `conn()` when no transaction is attached, and by the
    /// refresh-token helpers.
    db_pool: sqlx::PgPool,
    /// Circuit-breaker-wrapped HTTP client behind `http()` and `raw_http()`.
    http_client: CircuitBreakerClient,
    /// Optional timeout handed to the HTTP client by `http()`.
    http_timeout: Option<Duration>,
    /// Dispatcher used for jobs when the outbox path is not active.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Dispatcher used for workflows when no outbox is attached.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Provider exposed through the [`EnvAccess`] impl.
    env_provider: Arc<dyn EnvProvider>,
    /// Shared transaction; set only by `with_transaction`.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Buffer that receives queued jobs/workflows; set only by
    /// `with_transaction`.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Lookup used to fill in job settings when queueing into the outbox;
    /// set only by `with_transaction`.
    job_info_lookup: Option<JobInfoLookup>,
    /// Signer used by the token helpers; `None` until `set_token_issuer`.
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Token lifetimes used by the token-pair helpers.
    token_ttl: AuthTokenTtl,
}
653
654impl MutationContext {
655 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
657 Self {
658 auth,
659 request,
660 db_pool,
661 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
662 http_timeout: None,
663 job_dispatch: None,
664 workflow_dispatch: None,
665 env_provider: Arc::new(RealEnvProvider::new()),
666 tx: None,
667 outbox: None,
668 job_info_lookup: None,
669 token_issuer: None,
670 token_ttl: AuthTokenTtl::default(),
671 }
672 }
673
674 pub fn with_dispatch(
676 db_pool: sqlx::PgPool,
677 auth: AuthContext,
678 request: RequestMetadata,
679 http_client: CircuitBreakerClient,
680 job_dispatch: Option<Arc<dyn JobDispatch>>,
681 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
682 ) -> Self {
683 Self {
684 auth,
685 request,
686 db_pool,
687 http_client,
688 http_timeout: None,
689 job_dispatch,
690 workflow_dispatch,
691 env_provider: Arc::new(RealEnvProvider::new()),
692 tx: None,
693 outbox: None,
694 job_info_lookup: None,
695 token_issuer: None,
696 token_ttl: AuthTokenTtl::default(),
697 }
698 }
699
700 pub fn with_env(
702 db_pool: sqlx::PgPool,
703 auth: AuthContext,
704 request: RequestMetadata,
705 http_client: CircuitBreakerClient,
706 job_dispatch: Option<Arc<dyn JobDispatch>>,
707 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
708 env_provider: Arc<dyn EnvProvider>,
709 ) -> Self {
710 Self {
711 auth,
712 request,
713 db_pool,
714 http_client,
715 http_timeout: None,
716 job_dispatch,
717 workflow_dispatch,
718 env_provider,
719 tx: None,
720 outbox: None,
721 job_info_lookup: None,
722 token_issuer: None,
723 token_ttl: AuthTokenTtl::default(),
724 }
725 }
726
727 #[allow(clippy::type_complexity)]
729 pub fn with_transaction(
730 db_pool: sqlx::PgPool,
731 tx: Transaction<'static, Postgres>,
732 auth: AuthContext,
733 request: RequestMetadata,
734 http_client: CircuitBreakerClient,
735 job_info_lookup: JobInfoLookup,
736 ) -> (
737 Self,
738 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
739 Arc<Mutex<OutboxBuffer>>,
740 ) {
741 let tx_handle = Arc::new(AsyncMutex::new(tx));
742 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
743
744 let ctx = Self {
745 auth,
746 request,
747 db_pool,
748 http_client,
749 http_timeout: None,
750 job_dispatch: None,
751 workflow_dispatch: None,
752 env_provider: Arc::new(RealEnvProvider::new()),
753 tx: Some(tx_handle.clone()),
754 outbox: Some(outbox.clone()),
755 job_info_lookup: Some(job_info_lookup),
756 token_issuer: None,
757 token_ttl: AuthTokenTtl::default(),
758 };
759
760 (ctx, tx_handle, outbox)
761 }
762
763 pub fn is_transactional(&self) -> bool {
764 self.tx.is_some()
765 }
766
767 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
779 match &self.tx {
780 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
781 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
782 }
783 }
784
785 pub fn pool(&self) -> &sqlx::PgPool {
787 &self.db_pool
788 }
789
790 pub fn http(&self) -> crate::http::HttpClient {
796 self.http_client.with_timeout(self.http_timeout)
797 }
798
799 pub fn raw_http(&self) -> &reqwest::Client {
801 self.http_client.inner()
802 }
803
804 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
806 self.http_timeout = timeout;
807 }
808
809 pub fn user_id(&self) -> crate::error::Result<Uuid> {
811 self.auth.require_user_id()
812 }
813
814 pub fn tenant_id(&self) -> Option<Uuid> {
816 self.auth.tenant_id()
817 }
818
819 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
821 self.token_issuer = Some(issuer);
822 }
823
824 pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
826 self.token_ttl = ttl;
827 }
828
829 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
844 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
845 crate::error::ForgeError::Internal(
846 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
847 .into(),
848 )
849 })?;
850 issuer.sign(claims)
851 }
852
853 pub async fn issue_token_pair(
863 &self,
864 user_id: Uuid,
865 roles: &[&str],
866 ) -> crate::error::Result<crate::auth::TokenPair> {
867 let issuer = self.token_issuer.clone().ok_or_else(|| {
868 crate::error::ForgeError::Internal(
869 "Token issuer not available. Configure [auth] in forge.toml".into(),
870 )
871 })?;
872 let access_ttl = self.token_ttl.access_token_secs;
873 let refresh_ttl = self.token_ttl.refresh_token_days;
874 crate::auth::tokens::issue_token_pair(
875 &self.db_pool,
876 user_id,
877 roles,
878 access_ttl,
879 refresh_ttl,
880 move |uid, r, ttl| {
881 let claims = Claims::builder()
882 .subject(uid)
883 .roles(r.iter().map(|s| s.to_string()).collect())
884 .duration_secs(ttl)
885 .build()
886 .map_err(crate::error::ForgeError::Internal)?;
887 issuer.sign(&claims)
888 },
889 )
890 .await
891 }
892
893 pub async fn rotate_refresh_token(
898 &self,
899 old_refresh_token: &str,
900 ) -> crate::error::Result<crate::auth::TokenPair> {
901 let issuer = self.token_issuer.clone().ok_or_else(|| {
902 crate::error::ForgeError::Internal(
903 "Token issuer not available. Configure [auth] in forge.toml".into(),
904 )
905 })?;
906 let access_ttl = self.token_ttl.access_token_secs;
907 let refresh_ttl = self.token_ttl.refresh_token_days;
908 crate::auth::tokens::rotate_refresh_token(
909 &self.db_pool,
910 old_refresh_token,
911 &["user"],
912 access_ttl,
913 refresh_ttl,
914 move |uid, r, ttl| {
915 let claims = Claims::builder()
916 .subject(uid)
917 .roles(r.iter().map(|s| s.to_string()).collect())
918 .duration_secs(ttl)
919 .build()
920 .map_err(crate::error::ForgeError::Internal)?;
921 issuer.sign(&claims)
922 },
923 )
924 .await
925 }
926
927 pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
929 crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
930 }
931
932 pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
934 crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
935 }
936
937 pub async fn dispatch_job<T: serde::Serialize>(
939 &self,
940 job_type: &str,
941 args: T,
942 ) -> crate::error::Result<Uuid> {
943 let args_json = serde_json::to_value(args)?;
944
945 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
947 let job_info = job_info_lookup(job_type).ok_or_else(|| {
948 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
949 })?;
950
951 let pending = PendingJob {
952 id: Uuid::new_v4(),
953 job_type: job_type.to_string(),
954 args: args_json,
955 context: serde_json::json!({}),
956 owner_subject: self.auth.principal_id(),
957 priority: job_info.priority.as_i32(),
958 max_attempts: job_info.retry.max_attempts as i32,
959 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
960 };
961
962 let job_id = pending.id;
963 outbox
964 .lock()
965 .expect("outbox lock poisoned")
966 .jobs
967 .push(pending);
968 return Ok(job_id);
969 }
970
971 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
973 crate::error::ForgeError::Internal("Job dispatch not available".into())
974 })?;
975 dispatcher
976 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
977 .await
978 }
979
980 pub async fn dispatch_job_with_context<T: serde::Serialize>(
982 &self,
983 job_type: &str,
984 args: T,
985 context: serde_json::Value,
986 ) -> crate::error::Result<Uuid> {
987 let args_json = serde_json::to_value(args)?;
988
989 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
990 let job_info = job_info_lookup(job_type).ok_or_else(|| {
991 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
992 })?;
993
994 let pending = PendingJob {
995 id: Uuid::new_v4(),
996 job_type: job_type.to_string(),
997 args: args_json,
998 context,
999 owner_subject: self.auth.principal_id(),
1000 priority: job_info.priority.as_i32(),
1001 max_attempts: job_info.retry.max_attempts as i32,
1002 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
1003 };
1004
1005 let job_id = pending.id;
1006 outbox
1007 .lock()
1008 .expect("outbox lock poisoned")
1009 .jobs
1010 .push(pending);
1011 return Ok(job_id);
1012 }
1013
1014 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1015 crate::error::ForgeError::Internal("Job dispatch not available".into())
1016 })?;
1017 dispatcher
1018 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
1019 .await
1020 }
1021
1022 pub async fn cancel_job(
1024 &self,
1025 job_id: Uuid,
1026 reason: Option<String>,
1027 ) -> crate::error::Result<bool> {
1028 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1029 crate::error::ForgeError::Internal("Job dispatch not available".into())
1030 })?;
1031 dispatcher.cancel(job_id, reason).await
1032 }
1033
1034 pub async fn start_workflow<T: serde::Serialize>(
1036 &self,
1037 workflow_name: &str,
1038 input: T,
1039 ) -> crate::error::Result<Uuid> {
1040 let input_json = serde_json::to_value(input)?;
1041
1042 if let Some(outbox) = &self.outbox {
1044 let pending = PendingWorkflow {
1045 id: Uuid::new_v4(),
1046 workflow_name: workflow_name.to_string(),
1047 input: input_json,
1048 owner_subject: self.auth.principal_id(),
1049 };
1050
1051 let workflow_id = pending.id;
1052 outbox
1053 .lock()
1054 .expect("outbox lock poisoned")
1055 .workflows
1056 .push(pending);
1057 return Ok(workflow_id);
1058 }
1059
1060 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1062 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1063 })?;
1064 dispatcher
1065 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1066 .await
1067 }
1068}
1069
1070impl EnvAccess for MutationContext {
1071 fn env_provider(&self) -> &dyn EnvProvider {
1072 self.env_provider.as_ref()
1073 }
1074}
1075
// Unit tests for the parts of this module that need no database:
// `AuthContext` and `RequestMetadata`.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// An unauthenticated context has no user id and `require_user_id` fails.
    #[test]
    fn test_auth_context_unauthenticated() {
        let ctx = AuthContext::unauthenticated();
        assert!(!ctx.is_authenticated());
        assert!(ctx.user_id().is_none());
        assert!(ctx.require_user_id().is_err());
    }

    /// An authenticated context exposes its user id and answers role checks
    /// by exact role name.
    #[test]
    fn test_auth_context_authenticated() {
        let user_id = Uuid::new_v4();
        let ctx = AuthContext::authenticated(
            user_id,
            vec!["admin".to_string(), "user".to_string()],
            HashMap::new(),
        );

        assert!(ctx.is_authenticated());
        assert_eq!(ctx.user_id(), Some(user_id));
        assert!(ctx.require_user_id().is_ok());
        assert!(ctx.has_role("admin"));
        assert!(ctx.has_role("user"));
        assert!(!ctx.has_role("superadmin"));
        assert!(ctx.require_role("admin").is_ok());
        assert!(ctx.require_role("superadmin").is_err());
    }

    /// Claims supplied at construction are retrievable by key; unknown keys
    /// yield `None`.
    #[test]
    fn test_auth_context_with_claims() {
        let mut claims = HashMap::new();
        claims.insert("org_id".to_string(), serde_json::json!("org-123"));

        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);

        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
        assert!(ctx.claim("nonexistent").is_none());
    }

    /// `new` generates a non-empty trace id and leaves the client IP unset;
    /// `with_trace_id` stores the supplied trace id verbatim.
    #[test]
    fn test_request_metadata() {
        let meta = RequestMetadata::new();
        assert!(!meta.trace_id.is_empty());
        assert!(meta.client_ip.is_none());

        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(meta2.trace_id, "trace-123");
    }
}