1use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38use futures_core::future::BoxFuture;
39use futures_core::stream::BoxStream;
40use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
41use sqlx::{Postgres, Transaction};
42use tokio::sync::Mutex as AsyncMutex;
43use uuid::Uuid;
44
45use tracing::Instrument;
46
47use super::dispatch::{JobDispatch, WorkflowDispatch};
48use crate::auth::Claims;
49use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
50use crate::http::CircuitBreakerClient;
51use crate::job::JobInfo;
52
53pub trait TokenIssuer: Send + Sync {
58 fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
60}
61
62pub enum ForgeConn<'a> {
75 Pool(sqlx::pool::PoolConnection<Postgres>),
76 Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
77}
78
79impl std::ops::Deref for ForgeConn<'_> {
80 type Target = PgConnection;
81 fn deref(&self) -> &PgConnection {
82 match self {
83 ForgeConn::Pool(c) => c,
84 ForgeConn::Tx(g) => g,
85 }
86 }
87}
88
89impl std::ops::DerefMut for ForgeConn<'_> {
90 fn deref_mut(&mut self) -> &mut PgConnection {
91 match self {
92 ForgeConn::Pool(c) => c,
93 ForgeConn::Tx(g) => g,
94 }
95 }
96}
97
98#[derive(Clone)]
110pub struct ForgeDb(sqlx::PgPool);
111
112impl std::fmt::Debug for ForgeDb {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_tuple("ForgeDb").finish()
115 }
116}
117
118impl ForgeDb {
119 pub fn from_pool(pool: &sqlx::PgPool) -> Self {
121 Self(pool.clone())
122 }
123}
124
125fn sql_operation(sql: &str) -> &'static str {
126 let bytes = sql.trim_start().as_bytes();
127 match bytes.get(..6) {
128 Some(prefix) if prefix.eq_ignore_ascii_case(b"select") => "SELECT",
129 Some(prefix) if prefix.eq_ignore_ascii_case(b"insert") => "INSERT",
130 Some(prefix) if prefix.eq_ignore_ascii_case(b"update") => "UPDATE",
131 Some(prefix) if prefix.eq_ignore_ascii_case(b"delete") => "DELETE",
132 _ => "OTHER",
133 }
134}
135
136impl sqlx::Executor<'static> for ForgeDb {
137 type Database = Postgres;
138
139 fn fetch_many<'e, 'q: 'e, E>(
140 self,
141 query: E,
142 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
143 where
144 E: sqlx::Execute<'q, Postgres> + 'q,
145 {
146 (&self.0).fetch_many(query)
147 }
148
149 fn fetch_optional<'e, 'q: 'e, E>(
150 self,
151 query: E,
152 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
153 where
154 E: sqlx::Execute<'q, Postgres> + 'q,
155 {
156 let op = sql_operation(query.sql());
157 let span =
158 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
159 Box::pin(
160 async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
161 )
162 }
163
164 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
165 where
166 E: sqlx::Execute<'q, Postgres> + 'q,
167 {
168 let op = sql_operation(query.sql());
169 let span =
170 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
171 Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
172 }
173
174 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
175 where
176 E: sqlx::Execute<'q, Postgres> + 'q,
177 {
178 let op = sql_operation(query.sql());
179 let span =
180 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
181 Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
182 }
183
184 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
185 where
186 E: sqlx::Execute<'q, Postgres> + 'q,
187 {
188 let op = sql_operation(query.sql());
189 let span =
190 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
191 Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
192 }
193
194 fn prepare_with<'e, 'q: 'e>(
195 self,
196 sql: &'q str,
197 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
198 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
199 Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
200 }
201
202 fn describe<'e, 'q: 'e>(
203 self,
204 sql: &'q str,
205 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
206 Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
207 }
208}
209
210impl std::fmt::Debug for ForgeConn<'_> {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 match self {
213 ForgeConn::Pool(_) => f.debug_tuple("ForgeConn::Pool").finish(),
214 ForgeConn::Tx(_) => f.debug_tuple("ForgeConn::Tx").finish(),
215 }
216 }
217}
218
219impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
220 type Database = Postgres;
221
222 fn fetch_many<'e, 'q: 'e, E>(
223 self,
224 query: E,
225 ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
226 where
227 'c: 'e,
228 E: sqlx::Execute<'q, Postgres> + 'q,
229 {
230 let conn: &'e mut PgConnection = &mut *self;
231 conn.fetch_many(query)
232 }
233
234 fn fetch_optional<'e, 'q: 'e, E>(
235 self,
236 query: E,
237 ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
238 where
239 'c: 'e,
240 E: sqlx::Execute<'q, Postgres> + 'q,
241 {
242 let op = sql_operation(query.sql());
243 let span =
244 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
245 let conn: &'e mut PgConnection = &mut *self;
246 Box::pin(conn.fetch_optional(query).instrument(span))
247 }
248
249 fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
250 where
251 'c: 'e,
252 E: sqlx::Execute<'q, Postgres> + 'q,
253 {
254 let op = sql_operation(query.sql());
255 let span =
256 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
257 let conn: &'e mut PgConnection = &mut *self;
258 Box::pin(conn.execute(query).instrument(span))
259 }
260
261 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
262 where
263 'c: 'e,
264 E: sqlx::Execute<'q, Postgres> + 'q,
265 {
266 let op = sql_operation(query.sql());
267 let span =
268 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
269 let conn: &'e mut PgConnection = &mut *self;
270 Box::pin(conn.fetch_all(query).instrument(span))
271 }
272
273 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
274 where
275 'c: 'e,
276 E: sqlx::Execute<'q, Postgres> + 'q,
277 {
278 let op = sql_operation(query.sql());
279 let span =
280 tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
281 let conn: &'e mut PgConnection = &mut *self;
282 Box::pin(conn.fetch_one(query).instrument(span))
283 }
284
285 fn prepare_with<'e, 'q: 'e>(
286 self,
287 sql: &'q str,
288 parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
289 ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
290 where
291 'c: 'e,
292 {
293 let conn: &'e mut PgConnection = &mut *self;
294 conn.prepare_with(sql, parameters)
295 }
296
297 fn describe<'e, 'q: 'e>(
298 self,
299 sql: &'q str,
300 ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
301 where
302 'c: 'e,
303 {
304 let conn: &'e mut PgConnection = &mut *self;
305 conn.describe(sql)
306 }
307}
308
309#[derive(Debug, Clone)]
310pub struct PendingJob {
311 pub id: Uuid,
312 pub job_type: String,
313 pub args: serde_json::Value,
314 pub context: serde_json::Value,
315 pub owner_subject: Option<String>,
316 pub priority: i32,
317 pub max_attempts: i32,
318 pub worker_capability: Option<String>,
319}
320
321#[derive(Debug, Clone)]
322pub struct PendingWorkflow {
323 pub id: Uuid,
324 pub workflow_name: String,
325 pub input: serde_json::Value,
326 pub owner_subject: Option<String>,
327}
328
329#[derive(Default)]
330pub struct OutboxBuffer {
331 pub jobs: Vec<PendingJob>,
332 pub workflows: Vec<PendingWorkflow>,
333}
334
335#[derive(Debug, Clone)]
337pub struct AuthContext {
338 user_id: Option<Uuid>,
340 roles: Vec<String>,
342 claims: HashMap<String, serde_json::Value>,
344 authenticated: bool,
346}
347
348impl AuthContext {
349 pub fn unauthenticated() -> Self {
351 Self {
352 user_id: None,
353 roles: Vec::new(),
354 claims: HashMap::new(),
355 authenticated: false,
356 }
357 }
358
359 pub fn authenticated(
361 user_id: Uuid,
362 roles: Vec<String>,
363 claims: HashMap<String, serde_json::Value>,
364 ) -> Self {
365 Self {
366 user_id: Some(user_id),
367 roles,
368 claims,
369 authenticated: true,
370 }
371 }
372
373 pub fn authenticated_without_uuid(
379 roles: Vec<String>,
380 claims: HashMap<String, serde_json::Value>,
381 ) -> Self {
382 Self {
383 user_id: None,
384 roles,
385 claims,
386 authenticated: true,
387 }
388 }
389
390 pub fn is_authenticated(&self) -> bool {
392 self.authenticated
393 }
394
395 pub fn user_id(&self) -> Option<Uuid> {
397 self.user_id
398 }
399
400 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
402 self.user_id
403 .ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
404 }
405
406 pub fn has_role(&self, role: &str) -> bool {
408 self.roles.iter().any(|r| r == role)
409 }
410
411 pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
413 if self.has_role(role) {
414 Ok(())
415 } else {
416 Err(crate::error::ForgeError::Forbidden(format!(
417 "Required role '{}' not present",
418 role
419 )))
420 }
421 }
422
423 pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
425 self.claims.get(key)
426 }
427
428 pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
430 &self.claims
431 }
432
433 pub fn roles(&self) -> &[String] {
435 &self.roles
436 }
437
438 pub fn subject(&self) -> Option<&str> {
444 self.claims.get("sub").and_then(|v| v.as_str())
445 }
446
447 pub fn require_subject(&self) -> crate::error::Result<&str> {
449 if !self.authenticated {
450 return Err(crate::error::ForgeError::Unauthorized(
451 "Authentication required".to_string(),
452 ));
453 }
454 self.subject().ok_or_else(|| {
455 crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
456 })
457 }
458
459 pub fn principal_id(&self) -> Option<String> {
463 self.subject()
464 .map(ToString::to_string)
465 .or_else(|| self.user_id.map(|id| id.to_string()))
466 }
467
468 pub fn is_admin(&self) -> bool {
470 self.roles.iter().any(|r| r == "admin")
471 }
472
473 pub fn check_identity_args(
480 &self,
481 function_name: &str,
482 args: &serde_json::Value,
483 enforce_scope: bool,
484 ) -> crate::error::Result<()> {
485 use crate::error::ForgeError;
486
487 if self.is_admin() {
488 return Ok(());
489 }
490
491 if !self.is_authenticated() && !enforce_scope {
492 return Ok(());
493 }
494
495 let Some(obj) = args.as_object() else {
496 if enforce_scope && self.is_authenticated() {
497 return Err(ForgeError::Forbidden(format!(
498 "Function '{function_name}' must include identity or tenant scope arguments"
499 )));
500 }
501 return Ok(());
502 };
503
504 let mut principal_values: Vec<String> = Vec::new();
505 if let Some(user_id) = self.user_id().map(|id| id.to_string()) {
506 principal_values.push(user_id);
507 }
508 if let Some(subject) = self.principal_id()
509 && !principal_values.iter().any(|v| v == &subject)
510 {
511 principal_values.push(subject);
512 }
513
514 let mut has_scope_key = false;
515
516 for key in [
517 "user_id",
518 "userId",
519 "owner_id",
520 "ownerId",
521 "owner_subject",
522 "ownerSubject",
523 "subject",
524 "sub",
525 "principal_id",
526 "principalId",
527 ] {
528 let Some(value) = obj.get(key) else {
529 continue;
530 };
531 has_scope_key = true;
532
533 if !self.is_authenticated() {
534 return Err(ForgeError::Unauthorized(format!(
535 "Function '{function_name}' requires authentication for identity-scoped argument '{key}'"
536 )));
537 }
538
539 let serde_json::Value::String(actual) = value else {
540 return Err(ForgeError::InvalidArgument(format!(
541 "Function '{function_name}' argument '{key}' must be a non-empty string"
542 )));
543 };
544
545 if actual.trim().is_empty() || !principal_values.iter().any(|v| v == actual) {
546 return Err(ForgeError::Forbidden(format!(
547 "Function '{function_name}' argument '{key}' does not match authenticated principal"
548 )));
549 }
550 }
551
552 for key in ["tenant_id", "tenantId"] {
553 let Some(value) = obj.get(key) else {
554 continue;
555 };
556 has_scope_key = true;
557
558 if !self.is_authenticated() {
559 return Err(ForgeError::Unauthorized(format!(
560 "Function '{function_name}' requires authentication for tenant-scoped argument '{key}'"
561 )));
562 }
563
564 let expected = self
565 .claim("tenant_id")
566 .and_then(|v| v.as_str())
567 .ok_or_else(|| {
568 ForgeError::Forbidden(format!(
569 "Function '{function_name}' argument '{key}' is not allowed for this principal"
570 ))
571 })?;
572
573 let serde_json::Value::String(actual) = value else {
574 return Err(ForgeError::InvalidArgument(format!(
575 "Function '{function_name}' argument '{key}' must be a non-empty string"
576 )));
577 };
578
579 if actual.trim().is_empty() || actual != expected {
580 return Err(ForgeError::Forbidden(format!(
581 "Function '{function_name}' argument '{key}' does not match authenticated tenant"
582 )));
583 }
584 }
585
586 if enforce_scope && self.is_authenticated() && !has_scope_key {
587 return Err(ForgeError::Forbidden(format!(
588 "Function '{function_name}' must include identity or tenant scope arguments"
589 )));
590 }
591
592 Ok(())
593 }
594}
595
596#[derive(Debug, Clone)]
598pub struct RequestMetadata {
599 pub request_id: Uuid,
601 pub trace_id: String,
603 pub client_ip: Option<String>,
605 pub user_agent: Option<String>,
607 pub timestamp: chrono::DateTime<chrono::Utc>,
609}
610
611impl RequestMetadata {
612 pub fn new() -> Self {
614 Self {
615 request_id: Uuid::new_v4(),
616 trace_id: Uuid::new_v4().to_string(),
617 client_ip: None,
618 user_agent: None,
619 timestamp: chrono::Utc::now(),
620 }
621 }
622
623 pub fn with_trace_id(trace_id: String) -> Self {
625 Self {
626 request_id: Uuid::new_v4(),
627 trace_id,
628 client_ip: None,
629 user_agent: None,
630 timestamp: chrono::Utc::now(),
631 }
632 }
633}
634
635impl Default for RequestMetadata {
636 fn default() -> Self {
637 Self::new()
638 }
639}
640
641pub struct QueryContext {
643 pub auth: AuthContext,
645 pub request: RequestMetadata,
647 db_pool: sqlx::PgPool,
649 env_provider: Arc<dyn EnvProvider>,
651}
652
653impl QueryContext {
654 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
656 Self {
657 auth,
658 request,
659 db_pool,
660 env_provider: Arc::new(RealEnvProvider::new()),
661 }
662 }
663
664 pub fn with_env(
666 db_pool: sqlx::PgPool,
667 auth: AuthContext,
668 request: RequestMetadata,
669 env_provider: Arc<dyn EnvProvider>,
670 ) -> Self {
671 Self {
672 auth,
673 request,
674 db_pool,
675 env_provider,
676 }
677 }
678
679 pub fn db(&self) -> ForgeDb {
688 ForgeDb(self.db_pool.clone())
689 }
690
691 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
692 self.auth.require_user_id()
693 }
694
695 pub fn require_subject(&self) -> crate::error::Result<&str> {
697 self.auth.require_subject()
698 }
699}
700
701impl EnvAccess for QueryContext {
702 fn env_provider(&self) -> &dyn EnvProvider {
703 self.env_provider.as_ref()
704 }
705}
706
707pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
709
710pub struct MutationContext {
712 pub auth: AuthContext,
714 pub request: RequestMetadata,
716 db_pool: sqlx::PgPool,
718 http_client: CircuitBreakerClient,
720 job_dispatch: Option<Arc<dyn JobDispatch>>,
722 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
724 env_provider: Arc<dyn EnvProvider>,
726 tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
728 outbox: Option<Arc<Mutex<OutboxBuffer>>>,
730 job_info_lookup: Option<JobInfoLookup>,
732 token_issuer: Option<Arc<dyn TokenIssuer>>,
734}
735
736impl MutationContext {
737 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
739 Self {
740 auth,
741 request,
742 db_pool,
743 http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
744 job_dispatch: None,
745 workflow_dispatch: None,
746 env_provider: Arc::new(RealEnvProvider::new()),
747 tx: None,
748 outbox: None,
749 job_info_lookup: None,
750 token_issuer: None,
751 }
752 }
753
754 pub fn with_dispatch(
756 db_pool: sqlx::PgPool,
757 auth: AuthContext,
758 request: RequestMetadata,
759 http_client: CircuitBreakerClient,
760 job_dispatch: Option<Arc<dyn JobDispatch>>,
761 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
762 ) -> Self {
763 Self {
764 auth,
765 request,
766 db_pool,
767 http_client,
768 job_dispatch,
769 workflow_dispatch,
770 env_provider: Arc::new(RealEnvProvider::new()),
771 tx: None,
772 outbox: None,
773 job_info_lookup: None,
774 token_issuer: None,
775 }
776 }
777
778 pub fn with_env(
780 db_pool: sqlx::PgPool,
781 auth: AuthContext,
782 request: RequestMetadata,
783 http_client: CircuitBreakerClient,
784 job_dispatch: Option<Arc<dyn JobDispatch>>,
785 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
786 env_provider: Arc<dyn EnvProvider>,
787 ) -> Self {
788 Self {
789 auth,
790 request,
791 db_pool,
792 http_client,
793 job_dispatch,
794 workflow_dispatch,
795 env_provider,
796 tx: None,
797 outbox: None,
798 job_info_lookup: None,
799 token_issuer: None,
800 }
801 }
802
803 #[allow(clippy::type_complexity)]
805 pub fn with_transaction(
806 db_pool: sqlx::PgPool,
807 tx: Transaction<'static, Postgres>,
808 auth: AuthContext,
809 request: RequestMetadata,
810 http_client: CircuitBreakerClient,
811 job_info_lookup: JobInfoLookup,
812 ) -> (
813 Self,
814 Arc<AsyncMutex<Transaction<'static, Postgres>>>,
815 Arc<Mutex<OutboxBuffer>>,
816 ) {
817 let tx_handle = Arc::new(AsyncMutex::new(tx));
818 let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
819
820 let ctx = Self {
821 auth,
822 request,
823 db_pool,
824 http_client,
825 job_dispatch: None,
826 workflow_dispatch: None,
827 env_provider: Arc::new(RealEnvProvider::new()),
828 tx: Some(tx_handle.clone()),
829 outbox: Some(outbox.clone()),
830 job_info_lookup: Some(job_info_lookup),
831 token_issuer: None,
832 };
833
834 (ctx, tx_handle, outbox)
835 }
836
837 pub fn is_transactional(&self) -> bool {
838 self.tx.is_some()
839 }
840
841 pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
853 match &self.tx {
854 Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
855 None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
856 }
857 }
858
859 pub fn pool(&self) -> &sqlx::PgPool {
861 &self.db_pool
862 }
863
864 pub fn http(&self) -> &reqwest::Client {
870 self.http_client.inner()
871 }
872
873 pub fn http_with_circuit_breaker(&self) -> &CircuitBreakerClient {
875 &self.http_client
876 }
877
878 pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
879 self.auth.require_user_id()
880 }
881
882 pub fn require_subject(&self) -> crate::error::Result<&str> {
883 self.auth.require_subject()
884 }
885
886 pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
888 self.token_issuer = Some(issuer);
889 }
890
891 pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
906 let issuer = self.token_issuer.as_ref().ok_or_else(|| {
907 crate::error::ForgeError::Internal(
908 "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
909 .into(),
910 )
911 })?;
912 issuer.sign(claims)
913 }
914
915 pub async fn dispatch_job<T: serde::Serialize>(
917 &self,
918 job_type: &str,
919 args: T,
920 ) -> crate::error::Result<Uuid> {
921 let args_json = serde_json::to_value(args)?;
922
923 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
925 let job_info = job_info_lookup(job_type).ok_or_else(|| {
926 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
927 })?;
928
929 let pending = PendingJob {
930 id: Uuid::new_v4(),
931 job_type: job_type.to_string(),
932 args: args_json,
933 context: serde_json::json!({}),
934 owner_subject: self.auth.principal_id(),
935 priority: job_info.priority.as_i32(),
936 max_attempts: job_info.retry.max_attempts as i32,
937 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
938 };
939
940 let job_id = pending.id;
941 outbox
942 .lock()
943 .expect("outbox lock poisoned")
944 .jobs
945 .push(pending);
946 return Ok(job_id);
947 }
948
949 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
951 crate::error::ForgeError::Internal("Job dispatch not available".into())
952 })?;
953 dispatcher
954 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
955 .await
956 }
957
958 pub async fn dispatch_job_with_context<T: serde::Serialize>(
960 &self,
961 job_type: &str,
962 args: T,
963 context: serde_json::Value,
964 ) -> crate::error::Result<Uuid> {
965 let args_json = serde_json::to_value(args)?;
966
967 if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
968 let job_info = job_info_lookup(job_type).ok_or_else(|| {
969 crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
970 })?;
971
972 let pending = PendingJob {
973 id: Uuid::new_v4(),
974 job_type: job_type.to_string(),
975 args: args_json,
976 context,
977 owner_subject: self.auth.principal_id(),
978 priority: job_info.priority.as_i32(),
979 max_attempts: job_info.retry.max_attempts as i32,
980 worker_capability: job_info.worker_capability.map(|s| s.to_string()),
981 };
982
983 let job_id = pending.id;
984 outbox
985 .lock()
986 .expect("outbox lock poisoned")
987 .jobs
988 .push(pending);
989 return Ok(job_id);
990 }
991
992 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
993 crate::error::ForgeError::Internal("Job dispatch not available".into())
994 })?;
995 dispatcher
996 .dispatch_by_name(job_type, args_json, self.auth.principal_id())
997 .await
998 }
999
1000 pub async fn cancel_job(
1002 &self,
1003 job_id: Uuid,
1004 reason: Option<String>,
1005 ) -> crate::error::Result<bool> {
1006 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
1007 crate::error::ForgeError::Internal("Job dispatch not available".into())
1008 })?;
1009 dispatcher.cancel(job_id, reason).await
1010 }
1011
1012 pub async fn start_workflow<T: serde::Serialize>(
1014 &self,
1015 workflow_name: &str,
1016 input: T,
1017 ) -> crate::error::Result<Uuid> {
1018 let input_json = serde_json::to_value(input)?;
1019
1020 if let Some(outbox) = &self.outbox {
1022 let pending = PendingWorkflow {
1023 id: Uuid::new_v4(),
1024 workflow_name: workflow_name.to_string(),
1025 input: input_json,
1026 owner_subject: self.auth.principal_id(),
1027 };
1028
1029 let workflow_id = pending.id;
1030 outbox
1031 .lock()
1032 .expect("outbox lock poisoned")
1033 .workflows
1034 .push(pending);
1035 return Ok(workflow_id);
1036 }
1037
1038 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
1040 crate::error::ForgeError::Internal("Workflow dispatch not available".into())
1041 })?;
1042 dispatcher
1043 .start_by_name(workflow_name, input_json, self.auth.principal_id())
1044 .await
1045 }
1046}
1047
1048impl EnvAccess for MutationContext {
1049 fn env_provider(&self) -> &dyn EnvProvider {
1050 self.env_provider.as_ref()
1051 }
1052}
1053
1054#[cfg(test)]
1055#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
1056mod tests {
1057 use super::*;
1058
1059 #[test]
1060 fn test_auth_context_unauthenticated() {
1061 let ctx = AuthContext::unauthenticated();
1062 assert!(!ctx.is_authenticated());
1063 assert!(ctx.user_id().is_none());
1064 assert!(ctx.require_user_id().is_err());
1065 }
1066
1067 #[test]
1068 fn test_auth_context_authenticated() {
1069 let user_id = Uuid::new_v4();
1070 let ctx = AuthContext::authenticated(
1071 user_id,
1072 vec!["admin".to_string(), "user".to_string()],
1073 HashMap::new(),
1074 );
1075
1076 assert!(ctx.is_authenticated());
1077 assert_eq!(ctx.user_id(), Some(user_id));
1078 assert!(ctx.require_user_id().is_ok());
1079 assert!(ctx.has_role("admin"));
1080 assert!(ctx.has_role("user"));
1081 assert!(!ctx.has_role("superadmin"));
1082 assert!(ctx.require_role("admin").is_ok());
1083 assert!(ctx.require_role("superadmin").is_err());
1084 }
1085
1086 #[test]
1087 fn test_auth_context_with_claims() {
1088 let mut claims = HashMap::new();
1089 claims.insert("org_id".to_string(), serde_json::json!("org-123"));
1090
1091 let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
1092
1093 assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
1094 assert!(ctx.claim("nonexistent").is_none());
1095 }
1096
1097 #[test]
1098 fn test_request_metadata() {
1099 let meta = RequestMetadata::new();
1100 assert!(!meta.trace_id.is_empty());
1101 assert!(meta.client_ip.is_none());
1102
1103 let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
1104 assert_eq!(meta2.trace_id, "trace-123");
1105 }
1106}