1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use sqlx::postgres::{PgArguments, PgQueryResult, PgRow};
39use sqlx::{FromRow, Postgres, Transaction};
40use tokio::sync::Mutex as AsyncMutex;
41use uuid::Uuid;
42
43use tracing::Instrument;
44
45use super::dispatch::{JobDispatch, WorkflowDispatch};
46use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
47use crate::http::CircuitBreakerClient;
48use crate::job::JobInfo;
49
50pub enum DbConn<'a> {
52 Pool(&'a sqlx::PgPool),
53 Transaction(Arc<AsyncMutex<Transaction<'static, Postgres>>>),
54}
55
56impl DbConn<'_> {
57 pub async fn fetch_one<'q, O>(
58 &self,
59 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
60 ) -> sqlx::Result<O>
61 where
62 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
63 {
64 let span = tracing::info_span!(
65 "db.query",
66 db.system = "postgresql",
67 db.operation.name = "fetch_one",
68 );
69 async {
70 match self {
71 DbConn::Pool(pool) => query.fetch_one(*pool).await,
72 DbConn::Transaction(tx) => query.fetch_one(&mut **tx.lock().await).await,
73 }
74 }
75 .instrument(span)
76 .await
77 }
78
79 pub async fn fetch_optional<'q, O>(
80 &self,
81 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
82 ) -> sqlx::Result<Option<O>>
83 where
84 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
85 {
86 let span = tracing::info_span!(
87 "db.query",
88 db.system = "postgresql",
89 db.operation.name = "fetch_optional",
90 );
91 async {
92 match self {
93 DbConn::Pool(pool) => query.fetch_optional(*pool).await,
94 DbConn::Transaction(tx) => query.fetch_optional(&mut **tx.lock().await).await,
95 }
96 }
97 .instrument(span)
98 .await
99 }
100
101 pub async fn fetch_all<'q, O>(
102 &self,
103 query: sqlx::query::QueryAs<'q, Postgres, O, PgArguments>,
104 ) -> sqlx::Result<Vec<O>>
105 where
106 O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
107 {
108 let span = tracing::info_span!(
109 "db.query",
110 db.system = "postgresql",
111 db.operation.name = "fetch_all",
112 );
113 async {
114 match self {
115 DbConn::Pool(pool) => query.fetch_all(*pool).await,
116 DbConn::Transaction(tx) => query.fetch_all(&mut **tx.lock().await).await,
117 }
118 }
119 .instrument(span)
120 .await
121 }
122
123 pub async fn execute<'q>(
124 &self,
125 query: sqlx::query::Query<'q, Postgres, PgArguments>,
126 ) -> sqlx::Result<PgQueryResult> {
127 let span = tracing::info_span!(
128 "db.query",
129 db.system = "postgresql",
130 db.operation.name = "execute",
131 );
132 async {
133 match self {
134 DbConn::Pool(pool) => query.execute(*pool).await,
135 DbConn::Transaction(tx) => query.execute(&mut **tx.lock().await).await,
136 }
137 }
138 .instrument(span)
139 .await
140 }
141}
142
143#[derive(Debug, Clone)]
144pub struct PendingJob {
145 pub id: Uuid,
146 pub job_type: String,
147 pub args: serde_json::Value,
148 pub context: serde_json::Value,
149 pub owner_subject: Option<String>,
150 pub priority: i32,
151 pub max_attempts: i32,
152 pub worker_capability: Option<String>,
153}
154
155#[derive(Debug, Clone)]
156pub struct PendingWorkflow {
157 pub id: Uuid,
158 pub workflow_name: String,
159 pub input: serde_json::Value,
160 pub owner_subject: Option<String>,
161}
162
163#[derive(Default)]
164pub struct OutboxBuffer {
165 pub jobs: Vec<PendingJob>,
166 pub workflows: Vec<PendingWorkflow>,
167}
168
169#[derive(Debug, Clone)]
171pub struct AuthContext {
172 user_id: Option<Uuid>,
174 roles: Vec<String>,
176 claims: HashMap<String, serde_json::Value>,
178 authenticated: bool,
180}
181
182impl AuthContext {
183 pub fn unauthenticated() -> Self {
185 Self {
186 user_id: None,
187 roles: Vec::new(),
188 claims: HashMap::new(),
189 authenticated: false,
190 }
191 }
192
193 pub fn authenticated(
195 user_id: Uuid,
196 roles: Vec<String>,
197 claims: HashMap<String, serde_json::Value>,
198 ) -> Self {
199 Self {
200 user_id: Some(user_id),
201 roles,
202 claims,
203 authenticated: true,
204 }
205 }
206
207 pub fn authenticated_without_uuid(
213 roles: Vec<String>,
214 claims: HashMap<String, serde_json::Value>,
215 ) -> Self {
216 Self {
217 user_id: None,
218 roles,
219 claims,
220 authenticated: true,
221 }
222 }
223
224 pub fn is_authenticated(&self) -> bool {
226 self.authenticated
227 }
228
229 pub fn user_id(&self) -> Option<Uuid> {
231 self.user_id
232 }
233
234 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
236 self.user_id
237 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
238 }
239
240 pub fn has_role(&self, role: &str) -> bool {
242 self.roles.iter().any(|r| r == role)
243 }
244
245 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
247 if self.has_role(role) {
248 Ok(())
249 } else {
250 Err(crate::error::ForgeError::Forbidden(format!(
251 "Required role '{}' not present",
252 role
253 )))
254 }
255 }
256
257 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
259 self.claims.get(key)
260 }
261
262 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
264 &self.claims
265 }
266
267 pub fn roles(&self) -> &[String] {
269 &self.roles
270 }
271
272 pub fn subject(&self) -> Option<&str> {
278 self.claims.get("sub").and_then(|v| v.as_str())
279 }
280
281 pub fn require_subject(&self) -> crate::error::Result<&str> {
283 if !self.authenticated {
284 return Err(crate::error::ForgeError::Unauthorized(
285 "Authentication required".to_string(),
286 ));
287 }
288 self.subject().ok_or_else(|| {
289 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
290 })
291 }
292
293 pub fn principal_id(&self) -> Option<String> {
297 self.subject()
298 .map(ToString::to_string)
299 .or_else(|| self.user_id.map(|id| id.to_string()))
300 }
301
302 pub fn is_admin(&self) -> bool {
304 self.roles.iter().any(|r| r == "admin")
305 }
306}
307
308#[derive(Debug, Clone)]
310pub struct RequestMetadata {
311 pub request_id: Uuid,
313 pub trace_id: String,
315 pub client_ip: Option<String>,
317 pub user_agent: Option<String>,
319 pub timestamp: chrono::DateTime<chrono::Utc>,
321}
322
323impl RequestMetadata {
324 pub fn new() -> Self {
326 Self {
327 request_id: Uuid::new_v4(),
328 trace_id: Uuid::new_v4().to_string(),
329 client_ip: None,
330 user_agent: None,
331 timestamp: chrono::Utc::now(),
332 }
333 }
334
335 pub fn with_trace_id(trace_id: String) -> Self {
337 Self {
338 request_id: Uuid::new_v4(),
339 trace_id,
340 client_ip: None,
341 user_agent: None,
342 timestamp: chrono::Utc::now(),
343 }
344 }
345}
346
347impl Default for RequestMetadata {
348 fn default() -> Self {
349 Self::new()
350 }
351}
352
353pub struct QueryContext {
355 pub auth: AuthContext,
357 pub request: RequestMetadata,
359 db_pool: sqlx::PgPool,
361 env_provider: Arc<dyn EnvProvider>,
363}
364
365impl QueryContext {
366 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
368 Self {
369 auth,
370 request,
371 db_pool,
372 env_provider: Arc::new(RealEnvProvider::new()),
373 }
374 }
375
376 pub fn with_env(
378 db_pool: sqlx::PgPool,
379 auth: AuthContext,
380 request: RequestMetadata,
381 env_provider: Arc<dyn EnvProvider>,
382 ) -> Self {
383 Self {
384 auth,
385 request,
386 db_pool,
387 env_provider,
388 }
389 }
390
391 pub fn db(&self) -> &sqlx::PgPool {
392 &self.db_pool
393 }
394
395 pub fn db_conn(&self) -> DbConn<'_> {
398 DbConn::Pool(&self.db_pool)
399 }
400
401 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
402 self.auth.require_user_id()
403 }
404
405 pub fn require_subject(&self) -> crate::error::Result<&str> {
407 self.auth.require_subject()
408 }
409}
410
411impl EnvAccess for QueryContext {
412 fn env_provider(&self) -> &dyn EnvProvider {
413 self.env_provider.as_ref()
414 }
415}
416
417pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
419
420pub struct MutationContext {
422 pub auth: AuthContext,
424 pub request: RequestMetadata,
426 db_pool: sqlx::PgPool,
428 http_client: CircuitBreakerClient,
430 job_dispatch: Option<Arc<dyn JobDispatch>>,
432 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
434 env_provider: Arc<dyn EnvProvider>,
436 tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
438 outbox: Option<Arc<Mutex<OutboxBuffer>>>,
440 job_info_lookup: Option<JobInfoLookup>,
442}
443
444impl MutationContext {
445 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
447 Self {
448 auth,
449 request,
450 db_pool,
451 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
452 job_dispatch: None,
453 workflow_dispatch: None,
454 env_provider: Arc::new(RealEnvProvider::new()),
455 tx: None,
456 outbox: None,
457 job_info_lookup: None,
458 }
459 }
460
461 pub fn with_dispatch(
463 db_pool: sqlx::PgPool,
464 auth: AuthContext,
465 request: RequestMetadata,
466 http_client: CircuitBreakerClient,
467 job_dispatch: Option<Arc<dyn JobDispatch>>,
468 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
469 ) -> Self {
470 Self {
471 auth,
472 request,
473 db_pool,
474 http_client,
475 job_dispatch,
476 workflow_dispatch,
477 env_provider: Arc::new(RealEnvProvider::new()),
478 tx: None,
479 outbox: None,
480 job_info_lookup: None,
481 }
482 }
483
484 pub fn with_env(
486 db_pool: sqlx::PgPool,
487 auth: AuthContext,
488 request: RequestMetadata,
489 http_client: CircuitBreakerClient,
490 job_dispatch: Option<Arc<dyn JobDispatch>>,
491 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
492 env_provider: Arc<dyn EnvProvider>,
493 ) -> Self {
494 Self {
495 auth,
496 request,
497 db_pool,
498 http_client,
499 job_dispatch,
500 workflow_dispatch,
501 env_provider,
502 tx: None,
503 outbox: None,
504 job_info_lookup: None,
505 }
506 }
507
508 #[allow(clippy::type_complexity)]
510 pub fn with_transaction(
511 db_pool: sqlx::PgPool,
512 tx: Transaction<'static, Postgres>,
513 auth: AuthContext,
514 request: RequestMetadata,
515 http_client: CircuitBreakerClient,
516 job_info_lookup: JobInfoLookup,
517 ) -> (
518 Self,
519 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
520 Arc<Mutex<OutboxBuffer>>,
521 ) {
522 let tx_handle = Arc::new(AsyncMutex::new(tx));
523 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
524
525 let ctx = Self {
526 auth,
527 request,
528 db_pool,
529 http_client,
530 job_dispatch: None,
531 workflow_dispatch: None,
532 env_provider: Arc::new(RealEnvProvider::new()),
533 tx: Some(tx_handle.clone()),
534 outbox: Some(outbox.clone()),
535 job_info_lookup: Some(job_info_lookup),
536 };
537
538 (ctx, tx_handle, outbox)
539 }
540
541 pub fn is_transactional(&self) -> bool {
542 self.tx.is_some()
543 }
544
545 pub fn db(&self) -> DbConn<'_> {
546 match &self.tx {
547 Some(tx) => DbConn::Transaction(tx.clone()),
548 None => DbConn::Pool(&self.db_pool),
549 }
550 }
551
552 pub fn pool(&self) -> &sqlx::PgPool {
554 &self.db_pool
555 }
556
557 pub fn http(&self) -> &reqwest::Client {
563 self.http_client.inner()
564 }
565
566 pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
568 &self.http_client
569 }
570
571 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
572 self.auth.require_user_id()
573 }
574
575 pub fn require_subject(&self) -> crate::error::Result<&str> {
576 self.auth.require_subject()
577 }
578
579 pub async fn dispatch_job<T: serde::Serialize>(
581 &self,
582 job_type: &str,
583 args: T,
584 ) -> crate::error::Result<Uuid> {
585 let args_json = serde_json::to_value(args)?;
586
587 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
589 let job_info = job_info_lookup(job_type).ok_or_else(|| {
590 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
591 })?;
592
593 let pending = PendingJob {
594 id: Uuid::new_v4(),
595 job_type: job_type.to_string(),
596 args: args_json,
597 context: serde_json::json!({}),
598 owner_subject: self.auth.principal_id(),
599 priority: job_info.priority.as_i32(),
600 max_attempts: job_info.retry.max_attempts as i32,
601 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
602 };
603
604 let job_id = pending.id;
605 outbox
606 .lock()
607 .expect("outbox lock poisoned")
608 .jobs
609 .push(pending);
610 return Ok(job_id);
611 }
612
613 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
615 crate::error::ForgeError::Internal("Job dispatch not available".into())
616 })?;
617 dispatcher
618 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
619 .await
620 }
621
622 pub async fn dispatch_job_with_context<T: serde::Serialize>(
624 &self,
625 job_type: &str,
626 args: T,
627 context: serde_json::Value,
628 ) -> crate::error::Result<Uuid> {
629 let args_json = serde_json::to_value(args)?;
630
631 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
632 let job_info = job_info_lookup(job_type).ok_or_else(|| {
633 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
634 })?;
635
636 let pending = PendingJob {
637 id: Uuid::new_v4(),
638 job_type: job_type.to_string(),
639 args: args_json,
640 context,
641 owner_subject: self.auth.principal_id(),
642 priority: job_info.priority.as_i32(),
643 max_attempts: job_info.retry.max_attempts as i32,
644 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
645 };
646
647 let job_id = pending.id;
648 outbox
649 .lock()
650 .expect("outbox lock poisoned")
651 .jobs
652 .push(pending);
653 return Ok(job_id);
654 }
655
656 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
657 crate::error::ForgeError::Internal("Job dispatch not available".into())
658 })?;
659 dispatcher
660 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
661 .await
662 }
663
664 pub async fn cancel_job(
666 &self,
667 job_id: Uuid,
668 reason: Option<String>,
669 ) -> crate::error::Result<bool> {
670 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
671 crate::error::ForgeError::Internal("Job dispatch not available".into())
672 })?;
673 dispatcher.cancel(job_id, reason).await
674 }
675
676 pub async fn start_workflow<T: serde::Serialize>(
678 &self,
679 workflow_name: &str,
680 input: T,
681 ) -> crate::error::Result<Uuid> {
682 let input_json = serde_json::to_value(input)?;
683
684 if let Some(outbox) = &self.outbox {
686 let pending = PendingWorkflow {
687 id: Uuid::new_v4(),
688 workflow_name: workflow_name.to_string(),
689 input: input_json,
690 owner_subject: self.auth.principal_id(),
691 };
692
693 let workflow_id = pending.id;
694 outbox
695 .lock()
696 .expect("outbox lock poisoned")
697 .workflows
698 .push(pending);
699 return Ok(workflow_id);
700 }
701
702 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
704 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
705 })?;
706 dispatcher
707 .start_by_name(workflow_name, input_json, self.auth.principal_id())
708 .await
709 }
710}
711
712impl EnvAccess for MutationContext {
713 fn env_provider(&self) -> &dyn EnvProvider {
714 self.env_provider.as_ref()
715 }
716}
717
718#[cfg(test)]
719#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
720mod tests {
721 use super::*;
722
723 #[test]
724 fn test_auth_context_unauthenticated() {
725 let ctx = AuthContext::unauthenticated();
726 assert!(!ctx.is_authenticated());
727 assert!(ctx.user_id().is_none());
728 assert!(ctx.require_user_id().is_err());
729 }
730
731 #[test]
732 fn test_auth_context_authenticated() {
733 let user_id = Uuid::new_v4();
734 let ctx = AuthContext::authenticated(
735 user_id,
736 vec!["admin".to_string(), "user".to_string()],
737 HashMap::new(),
738 );
739
740 assert!(ctx.is_authenticated());
741 assert_eq!(ctx.user_id(), Some(user_id));
742 assert!(ctx.require_user_id().is_ok());
743 assert!(ctx.has_role("admin"));
744 assert!(ctx.has_role("user"));
745 assert!(!ctx.has_role("superadmin"));
746 assert!(ctx.require_role("admin").is_ok());
747 assert!(ctx.require_role("superadmin").is_err());
748 }
749
750 #[test]
751 fn test_auth_context_with_claims() {
752 let mut claims = HashMap::new();
753 claims.insert("org_id".to_string(), serde_json::json!("org-123"));
754
755 let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
756
757 assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
758 assert!(ctx.claim("nonexistent").is_none());
759 }
760
761 #[test]
762 fn test_request_metadata() {
763 let meta = RequestMetadata::new();
764 assert!(!meta.trace_id.is_empty());
765 assert!(meta.client_ip.is_none());
766
767 let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
768 assert_eq!(meta2.trace_id, "trace-123");
769 }
770}