1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use sqlx::postgres::{PgArguments, PgQueryResult, PgRow};
39use sqlx::{FromRow, Postgres, Transaction};
40use tokio::sync::Mutex as AsyncMutex;
41use uuid::Uuid;
42
43use tracing::Instrument;
44
45use super::dispatch::{JobDispatch, WorkflowDispatch};
46use crate::auth::Claims;
47use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
48use crate::http::CircuitBreakerClient;
49use crate::job::JobInfo;
50
51pub trait TokenIssuer: Send + Sync {
56 fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
58}
59
60pub enum DbConn<'a> {
62 Pool(&'a sqlx::PgPool),
63 Transaction(Arc<AsyncMutex<Transaction<'static, Postgres>>>),
64}
65
66impl DbConn<'_> {
67 pub async fn fetch_one<'q, O>(
68 &self,
69 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
70 ) -> sqlx::Result<O>
71 where
72 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
73 {
74 let span = tracing::info_span!(
75 "db.query",
76 db.system = "postgresql",
77 db.operation.name = "fetch_one",
78 );
79 async {
80 match self {
81 DbConn::Pool(pool) => query.fetch_one(*pool).await,
82 DbConn::Transaction(tx) => query.fetch_one(&mut **tx.lock().await).await,
83 }
84 }
85 .instrument(span)
86 .await
87 }
88
89 pub async fn fetch_optional<'q, O>(
90 &self,
91 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
92 ) -> sqlx::Result<Option<O>>
93 where
94 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
95 {
96 let span = tracing::info_span!(
97 "db.query",
98 db.system = "postgresql",
99 db.operation.name = "fetch_optional",
100 );
101 async {
102 match self {
103 DbConn::Pool(pool) => query.fetch_optional(*pool).await,
104 DbConn::Transaction(tx) => query.fetch_optional(&mut **tx.lock().await).await,
105 }
106 }
107 .instrument(span)
108 .await
109 }
110
111 pub async fn fetch_all<'q, O>(
112 &self,
113 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
114 ) -> sqlx::Result<Vec<O>>
115 where
116 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
117 {
118 let span = tracing::info_span!(
119 "db.query",
120 db.system = "postgresql",
121 db.operation.name = "fetch_all",
122 );
123 async {
124 match self {
125 DbConn::Pool(pool) => query.fetch_all(*pool).await,
126 DbConn::Transaction(tx) => query.fetch_all(&mut **tx.lock().await).await,
127 }
128 }
129 .instrument(span)
130 .await
131 }
132
133 pub async fn execute<'q>(
134 &self,
135 query: sqlx::query::Query<'q, Postgres, PgArguments>,
136 ) -> sqlx::Result<PgQueryResult> {
137 let span = tracing::info_span!(
138 "db.query",
139 db.system = "postgresql",
140 db.operation.name = "execute",
141 );
142 async {
143 match self {
144 DbConn::Pool(pool) => query.execute(*pool).await,
145 DbConn::Transaction(tx) => query.execute(&mut **tx.lock().await).await,
146 }
147 }
148 .instrument(span)
149 .await
150 }
151}
152
153#[derive(Debug, Clone)]
154pub struct PendingJob {
155 pub id: Uuid,
156 pub job_type: String,
157 pub args: serde_json::Value,
158 pub context: serde_json::Value,
159 pub owner_subject: Option<String>,
160 pub priority: i32,
161 pub max_attempts: i32,
162 pub worker_capability: Option<String>,
163}
164
165#[derive(Debug, Clone)]
166pub struct PendingWorkflow {
167 pub id: Uuid,
168 pub workflow_name: String,
169 pub input: serde_json::Value,
170 pub owner_subject: Option<String>,
171}
172
173#[derive(Default)]
174pub struct OutboxBuffer {
175 pub jobs: Vec<PendingJob>,
176 pub workflows: Vec<PendingWorkflow>,
177}
178
179#[derive(Debug, Clone)]
181pub struct AuthContext {
182 user_id: Option<Uuid>,
184 roles: Vec<String>,
186 claims: HashMap<String, serde_json::Value>,
188 authenticated: bool,
190}
191
192impl AuthContext {
193 pub fn unauthenticated() -> Self {
195 Self {
196 user_id: None,
197 roles: Vec::new(),
198 claims: HashMap::new(),
199 authenticated: false,
200 }
201 }
202
203 pub fn authenticated(
205 user_id: Uuid,
206 roles: Vec<String>,
207 claims: HashMap<String, serde_json::Value>,
208 ) -> Self {
209 Self {
210 user_id: Some(user_id),
211 roles,
212 claims,
213 authenticated: true,
214 }
215 }
216
217 pub fn authenticated_without_uuid(
223 roles: Vec<String>,
224 claims: HashMap<String, serde_json::Value>,
225 ) -> Self {
226 Self {
227 user_id: None,
228 roles,
229 claims,
230 authenticated: true,
231 }
232 }
233
234 pub fn is_authenticated(&self) -> bool {
236 self.authenticated
237 }
238
239 pub fn user_id(&self) -> Option<Uuid> {
241 self.user_id
242 }
243
244 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
246 self.user_id
247 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
248 }
249
250 pub fn has_role(&self, role: &str) -> bool {
252 self.roles.iter().any(|r| r == role)
253 }
254
255 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
257 if self.has_role(role) {
258 Ok(())
259 } else {
260 Err(crate::error::ForgeError::Forbidden(format!(
261 "Required role '{}' not present",
262 role
263 )))
264 }
265 }
266
267 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
269 self.claims.get(key)
270 }
271
272 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
274 &self.claims
275 }
276
277 pub fn roles(&self) -> &[String] {
279 &self.roles
280 }
281
282 pub fn subject(&self) -> Option<&str> {
288 self.claims.get("sub").and_then(|v| v.as_str())
289 }
290
291 pub fn require_subject(&self) -> crate::error::Result<&str> {
293 if !self.authenticated {
294 return Err(crate::error::ForgeError::Unauthorized(
295 "Authentication required".to_string(),
296 ));
297 }
298 self.subject().ok_or_else(|| {
299 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
300 })
301 }
302
303 pub fn principal_id(&self) -> Option<String> {
307 self.subject()
308 .map(ToString::to_string)
309 .or_else(|| self.user_id.map(|id| id.to_string()))
310 }
311
312 pub fn is_admin(&self) -> bool {
314 self.roles.iter().any(|r| r == "admin")
315 }
316}
317
318#[derive(Debug, Clone)]
320pub struct RequestMetadata {
321 pub request_id: Uuid,
323 pub trace_id: String,
325 pub client_ip: Option<String>,
327 pub user_agent: Option<String>,
329 pub timestamp: chrono::DateTime<chrono::Utc>,
331}
332
333impl RequestMetadata {
334 pub fn new() -> Self {
336 Self {
337 request_id: Uuid::new_v4(),
338 trace_id: Uuid::new_v4().to_string(),
339 client_ip: None,
340 user_agent: None,
341 timestamp: chrono::Utc::now(),
342 }
343 }
344
345 pub fn with_trace_id(trace_id: String) -> Self {
347 Self {
348 request_id: Uuid::new_v4(),
349 trace_id,
350 client_ip: None,
351 user_agent: None,
352 timestamp: chrono::Utc::now(),
353 }
354 }
355}
356
357impl Default for RequestMetadata {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
363pub struct QueryContext {
365 pub auth: AuthContext,
367 pub request: RequestMetadata,
369 db_pool: sqlx::PgPool,
371 env_provider: Arc<dyn EnvProvider>,
373}
374
375impl QueryContext {
376 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
378 Self {
379 auth,
380 request,
381 db_pool,
382 env_provider: Arc::new(RealEnvProvider::new()),
383 }
384 }
385
386 pub fn with_env(
388 db_pool: sqlx::PgPool,
389 auth: AuthContext,
390 request: RequestMetadata,
391 env_provider: Arc<dyn EnvProvider>,
392 ) -> Self {
393 Self {
394 auth,
395 request,
396 db_pool,
397 env_provider,
398 }
399 }
400
401 pub fn db(&self) -> &sqlx::PgPool {
402 &self.db_pool
403 }
404
405 pub fn db_conn(&self) -> DbConn<'_> {
408 DbConn::Pool(&self.db_pool)
409 }
410
411 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
412 self.auth.require_user_id()
413 }
414
415 pub fn require_subject(&self) -> crate::error::Result<&str> {
417 self.auth.require_subject()
418 }
419}
420
421impl EnvAccess for QueryContext {
422 fn env_provider(&self) -> &dyn EnvProvider {
423 self.env_provider.as_ref()
424 }
425}
426
427pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
429
430pub struct MutationContext {
432 pub auth: AuthContext,
434 pub request: RequestMetadata,
436 db_pool: sqlx::PgPool,
438 http_client: CircuitBreakerClient,
440 job_dispatch: Option<Arc<dyn JobDispatch>>,
442 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
444 env_provider: Arc<dyn EnvProvider>,
446 tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
448 outbox: Option<Arc<Mutex<OutboxBuffer>>>,
450 job_info_lookup: Option<JobInfoLookup>,
452 token_issuer: Option<Arc<dyn TokenIssuer>>,
454}
455
456impl MutationContext {
457 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
459 Self {
460 auth,
461 request,
462 db_pool,
463 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
464 job_dispatch: None,
465 workflow_dispatch: None,
466 env_provider: Arc::new(RealEnvProvider::new()),
467 tx: None,
468 outbox: None,
469 job_info_lookup: None,
470 token_issuer: None,
471 }
472 }
473
474 pub fn with_dispatch(
476 db_pool: sqlx::PgPool,
477 auth: AuthContext,
478 request: RequestMetadata,
479 http_client: CircuitBreakerClient,
480 job_dispatch: Option<Arc<dyn JobDispatch>>,
481 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
482 ) -> Self {
483 Self {
484 auth,
485 request,
486 db_pool,
487 http_client,
488 job_dispatch,
489 workflow_dispatch,
490 env_provider: Arc::new(RealEnvProvider::new()),
491 tx: None,
492 outbox: None,
493 job_info_lookup: None,
494 token_issuer: None,
495 }
496 }
497
498 pub fn with_env(
500 db_pool: sqlx::PgPool,
501 auth: AuthContext,
502 request: RequestMetadata,
503 http_client: CircuitBreakerClient,
504 job_dispatch: Option<Arc<dyn JobDispatch>>,
505 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
506 env_provider: Arc<dyn EnvProvider>,
507 ) -> Self {
508 Self {
509 auth,
510 request,
511 db_pool,
512 http_client,
513 job_dispatch,
514 workflow_dispatch,
515 env_provider,
516 tx: None,
517 outbox: None,
518 job_info_lookup: None,
519 token_issuer: None,
520 }
521 }
522
523 #[allow(clippy::type_complexity)]
525 pub fn with_transaction(
526 db_pool: sqlx::PgPool,
527 tx: Transaction<'static, Postgres>,
528 auth: AuthContext,
529 request: RequestMetadata,
530 http_client: CircuitBreakerClient,
531 job_info_lookup: JobInfoLookup,
532 ) -> (
533 Self,
534 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
535 Arc<Mutex<OutboxBuffer>>,
536 ) {
537 let tx_handle = Arc::new(AsyncMutex::new(tx));
538 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
539
540 let ctx = Self {
541 auth,
542 request,
543 db_pool,
544 http_client,
545 job_dispatch: None,
546 workflow_dispatch: None,
547 env_provider: Arc::new(RealEnvProvider::new()),
548 tx: Some(tx_handle.clone()),
549 outbox: Some(outbox.clone()),
550 job_info_lookup: Some(job_info_lookup),
551 token_issuer: None,
552 };
553
554 (ctx, tx_handle, outbox)
555 }
556
557 pub fn is_transactional(&self) -> bool {
558 self.tx.is_some()
559 }
560
561 pub fn db(&self) -> DbConn<'_> {
562 match &self.tx {
563 Some(tx) => DbConn::Transaction(tx.clone()),
564 None => DbConn::Pool(&self.db_pool),
565 }
566 }
567
568 pub fn pool(&self) -> &sqlx::PgPool {
570 &self.db_pool
571 }
572
573 pub fn http(&self) -> &reqwest::Client {
579 self.http_client.inner()
580 }
581
582 pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
584 &self.http_client
585 }
586
587 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
588 self.auth.require_user_id()
589 }
590
591 pub fn require_subject(&self) -> crate::error::Result<&str> {
592 self.auth.require_subject()
593 }
594
595 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
597 self.token_issuer = Some(issuer);
598 }
599
600 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
615 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
616 crate::error::ForgeError::Internal(
617 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
618 .into(),
619 )
620 })?;
621 issuer.sign(claims)
622 }
623
624 pub async fn dispatch_job<T: serde::Serialize>(
626 &self,
627 job_type: &str,
628 args: T,
629 ) -> crate::error::Result<Uuid> {
630 let args_json = serde_json::to_value(args)?;
631
632 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
634 let job_info = job_info_lookup(job_type).ok_or_else(|| {
635 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
636 })?;
637
638 let pending = PendingJob {
639 id: Uuid::new_v4(),
640 job_type: job_type.to_string(),
641 args: args_json,
642 context: serde_json::json!({}),
643 owner_subject: self.auth.principal_id(),
644 priority: job_info.priority.as_i32(),
645 max_attempts: job_info.retry.max_attempts as i32,
646 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
647 };
648
649 let job_id = pending.id;
650 outbox
651 .lock()
652 .expect("outbox lock poisoned")
653 .jobs
654 .push(pending);
655 return Ok(job_id);
656 }
657
658 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
660 crate::error::ForgeError::Internal("Job dispatch not available".into())
661 })?;
662 dispatcher
663 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
664 .await
665 }
666
667 pub async fn dispatch_job_with_context<T: serde::Serialize>(
669 &self,
670 job_type: &str,
671 args: T,
672 context: serde_json::Value,
673 ) -> crate::error::Result<Uuid> {
674 let args_json = serde_json::to_value(args)?;
675
676 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
677 let job_info = job_info_lookup(job_type).ok_or_else(|| {
678 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
679 })?;
680
681 let pending = PendingJob {
682 id: Uuid::new_v4(),
683 job_type: job_type.to_string(),
684 args: args_json,
685 context,
686 owner_subject: self.auth.principal_id(),
687 priority: job_info.priority.as_i32(),
688 max_attempts: job_info.retry.max_attempts as i32,
689 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
690 };
691
692 let job_id = pending.id;
693 outbox
694 .lock()
695 .expect("outbox lock poisoned")
696 .jobs
697 .push(pending);
698 return Ok(job_id);
699 }
700
701 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
702 crate::error::ForgeError::Internal("Job dispatch not available".into())
703 })?;
704 dispatcher
705 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
706 .await
707 }
708
709 pub async fn cancel_job(
711 &self,
712 job_id: Uuid,
713 reason: Option<String>,
714 ) -> crate::error::Result<bool> {
715 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
716 crate::error::ForgeError::Internal("Job dispatch not available".into())
717 })?;
718 dispatcher.cancel(job_id, reason).await
719 }
720
721 pub async fn start_workflow<T: serde::Serialize>(
723 &self,
724 workflow_name: &str,
725 input: T,
726 ) -> crate::error::Result<Uuid> {
727 let input_json = serde_json::to_value(input)?;
728
729 if let Some(outbox) = &self.outbox {
731 let pending = PendingWorkflow {
732 id: Uuid::new_v4(),
733 workflow_name: workflow_name.to_string(),
734 input: input_json,
735 owner_subject: self.auth.principal_id(),
736 };
737
738 let workflow_id = pending.id;
739 outbox
740 .lock()
741 .expect("outbox lock poisoned")
742 .workflows
743 .push(pending);
744 return Ok(workflow_id);
745 }
746
747 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
749 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
750 })?;
751 dispatcher
752 .start_by_name(workflow_name, input_json, self.auth.principal_id())
753 .await
754 }
755}
756
757impl EnvAccess for MutationContext {
758 fn env_provider(&self) -> &dyn EnvProvider {
759 self.env_provider.as_ref()
760 }
761}
762
763#[cfg(test)]
764#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
765mod tests {
766 use super::*;
767
768 #[test]
769 fn test_auth_context_unauthenticated() {
770 let ctx = AuthContext::unauthenticated();
771 assert!(!ctx.is_authenticated());
772 assert!(ctx.user_id().is_none());
773 assert!(ctx.require_user_id().is_err());
774 }
775
776 #[test]
777 fn test_auth_context_authenticated() {
778 let user_id = Uuid::new_v4();
779 let ctx = AuthContext::authenticated(
780 user_id,
781 vec!["admin".to_string(), "user".to_string()],
782 HashMap::new(),
783 );
784
785 assert!(ctx.is_authenticated());
786 assert_eq!(ctx.user_id(), Some(user_id));
787 assert!(ctx.require_user_id().is_ok());
788 assert!(ctx.has_role("admin"));
789 assert!(ctx.has_role("user"));
790 assert!(!ctx.has_role("superadmin"));
791 assert!(ctx.require_role("admin").is_ok());
792 assert!(ctx.require_role("superadmin").is_err());
793 }
794
795 #[test]
796 fn test_auth_context_with_claims() {
797 let mut claims = HashMap::new();
798 claims.insert("org_id".to_string(), serde_json::json!("org-123"));
799
800 let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
801
802 assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
803 assert!(ctx.claim("nonexistent").is_none());
804 }
805
806 #[test]
807 fn test_request_metadata() {
808 let meta = RequestMetadata::new();
809 assert!(!meta.trace_id.is_empty());
810 assert!(meta.client_ip.is_none());
811
812 let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
813 assert_eq!(meta2.trace_id, "trace-123");
814 }
815}