use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use sqlx::postgres::{PgConnection, PgQueryResult, PgRow};
use sqlx::{Postgres, Transaction};
use tokio::sync::Mutex as AsyncMutex;
use uuid::Uuid;
use tracing::Instrument;
use super::dispatch::{JobDispatch, WorkflowDispatch};
use crate::auth::Claims;
use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
use crate::http::CircuitBreakerClient;
use crate::job::JobInfo;
/// Signs JWT-style claims into an encoded token string.
///
/// Implementations are injected into [`MutationContext`] via
/// `set_token_issuer` and used by the token-issuing helpers there.
pub trait TokenIssuer: Send + Sync {
    /// Serialize and sign `claims`, returning the encoded token.
    fn sign(&self, claims: &Claims) -> crate::error::Result<String>;
}
/// A live database connection: either a connection checked out of the pool,
/// or the locked guard of a shared transaction. Both variants deref to
/// `PgConnection` (see the `Deref`/`DerefMut` impls below).
pub enum ForgeConn<'a> {
    /// Connection checked out of the pool for the duration of this value.
    Pool(sqlx::pool::PoolConnection<Postgres>),
    /// Guard over the context's shared transaction; holding this serializes
    /// all other users of the same transaction.
    Tx(tokio::sync::MutexGuard<'a, Transaction<'static, Postgres>>),
}
// Both connection flavors ultimately expose a `PgConnection`, so callers can
// write queries once against the deref target regardless of the variant.
impl std::ops::Deref for ForgeConn<'_> {
    type Target = PgConnection;
    fn deref(&self) -> &PgConnection {
        match self {
            ForgeConn::Pool(pooled) => pooled,
            ForgeConn::Tx(guard) => guard,
        }
    }
}
// Mutable counterpart of the `Deref` impl above; queries need `&mut
// PgConnection`, so this is the form actually used by executors.
impl std::ops::DerefMut for ForgeConn<'_> {
    fn deref_mut(&mut self) -> &mut PgConnection {
        match self {
            ForgeConn::Pool(pooled) => pooled,
            ForgeConn::Tx(guard) => guard,
        }
    }
}
/// Pool-backed query executor that wraps each query in a tracing span
/// (see the `sqlx::Executor` impl for `ForgeDb` below). Cloning is cheap:
/// it clones the pool handle only.
#[derive(Clone)]
pub struct ForgeDb(sqlx::PgPool);
impl std::fmt::Debug for ForgeDb {
    /// Opaque debug form — deliberately omits the inner pool's state.
    /// Produces exactly `ForgeDb`, matching `debug_tuple` with no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ForgeDb")
    }
}
impl ForgeDb {
    /// Wrap an existing pool handle in a span-instrumented executor.
    pub fn from_pool(pool: &sqlx::PgPool) -> Self {
        Self(pool.clone())
    }
}
/// Classify a SQL string by its leading keyword, for use as the
/// `db.operation.name` attribute on tracing spans.
///
/// Returns `"SELECT"` / `"INSERT"` / `"UPDATE"` / `"DELETE"` when the
/// (left-trimmed) statement starts with that keyword case-insensitively,
/// and `"OTHER"` for anything else — including statements shorter than
/// six bytes.
fn sql_operation(sql: &str) -> &'static str {
    let head = sql.trim_start().as_bytes();
    let Some(prefix) = head.get(..6) else {
        return "OTHER";
    };
    for (keyword, label) in [
        (&b"select"[..], "SELECT"),
        (&b"insert"[..], "INSERT"),
        (&b"update"[..], "UPDATE"),
        (&b"delete"[..], "DELETE"),
    ] {
        if prefix.eq_ignore_ascii_case(keyword) {
            return label;
        }
    }
    "OTHER"
}
// `ForgeDb` executes queries directly against its pool. Each future-returning
// call is wrapped in a `db.query` tracing span carrying OpenTelemetry-style
// attributes (`db.system` and `db.operation.name`, the SQL verb from
// `sql_operation`).
impl sqlx::Executor<'static> for ForgeDb {
    type Database = Postgres;
    // NOTE(review): the streaming path is NOT wrapped in a `db.query` span,
    // unlike every other query method below — confirm whether fetch_many
    // should also be instrumented.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        (&self.0).fetch_many(query)
    }
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Classify the SQL before `query` is moved into the async block.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(
            async move { sqlx::Executor::fetch_optional(&self.0, query).await }.instrument(span),
        )
    }
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::execute(&self.0, query).await }.instrument(span))
    }
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_all(&self.0, query).await }.instrument(span))
    }
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        Box::pin(async move { sqlx::Executor::fetch_one(&self.0, query).await }.instrument(span))
    }
    // Statement preparation and describe are metadata operations; they are
    // forwarded without a query span.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::prepare_with(&self.0, sql, parameters).await })
    }
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>> {
        Box::pin(async move { sqlx::Executor::describe(&self.0, sql).await })
    }
}
/// A query handle that routes either straight to the pool or through a
/// shared (mutex-guarded) transaction. The second `Transaction` field keeps
/// the originating pool available alongside the transaction handle.
pub enum DbConn<'a> {
    Pool(sqlx::PgPool),
    Transaction(
        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
        &'a sqlx::PgPool,
    ),
}
impl DbConn<'_> {
pub async fn fetch_one<'q, O>(
&self,
query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
) -> sqlx::Result<O>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
{
match self {
DbConn::Pool(pool) => query.fetch_one(pool).await,
DbConn::Transaction(tx, _) => {
let mut guard = tx.lock().await;
query.fetch_one(&mut **guard).await
}
}
}
pub async fn fetch_optional<'q, O>(
&self,
query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
) -> sqlx::Result<Option<O>>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
{
match self {
DbConn::Pool(pool) => query.fetch_optional(pool).await,
DbConn::Transaction(tx, _) => {
let mut guard = tx.lock().await;
query.fetch_optional(&mut **guard).await
}
}
}
pub async fn fetch_all<'q, O>(
&self,
query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
) -> sqlx::Result<Vec<O>>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
{
match self {
DbConn::Pool(pool) => query.fetch_all(pool).await,
DbConn::Transaction(tx, _) => {
let mut guard = tx.lock().await;
query.fetch_all(&mut **guard).await
}
}
}
pub async fn execute<'q>(
&self,
query: sqlx::query::Query<'q, Postgres, sqlx::postgres::PgArguments>,
) -> sqlx::Result<PgQueryResult> {
match self {
DbConn::Pool(pool) => query.execute(pool).await,
DbConn::Transaction(tx, _) => {
let mut guard = tx.lock().await;
query.execute(&mut **guard).await
}
}
}
}
impl std::fmt::Debug for DbConn<'_> {
    /// Variant name only; the pool/transaction contents are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            DbConn::Pool(_) => "DbConn::Pool",
            DbConn::Transaction(..) => "DbConn::Transaction",
        };
        f.debug_tuple(variant).finish()
    }
}
impl std::fmt::Debug for ForgeConn<'_> {
    /// Variant name only; connection internals are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            ForgeConn::Pool(_) => "ForgeConn::Pool",
            ForgeConn::Tx(_) => "ForgeConn::Tx",
        };
        f.debug_tuple(variant).finish()
    }
}
// Executor impl for a borrowed `ForgeConn`, so handler code can run queries
// against either a pooled connection or the shared transaction uniformly.
// Each method reborrows `self` as `&mut PgConnection` via ForgeConn's
// `DerefMut` (the `&mut *self` lines), shortening the borrow to 'e.
// Future-returning methods get the same `db.query` span instrumentation as
// the `ForgeDb` executor.
impl<'c> sqlx::Executor<'c> for &'c mut ForgeConn<'_> {
    type Database = Postgres;
    // NOTE(review): like the ForgeDb impl, the streaming path carries no
    // span — confirm whether that is intentional.
    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<sqlx::Either<PgQueryResult, PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.fetch_many(query)
    }
    fn fetch_optional<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        // Classify the SQL before `query` is consumed by the inner call.
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_optional(query).instrument(span))
    }
    fn execute<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgQueryResult, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.execute(query).instrument(span))
    }
    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Vec<PgRow>, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_all(query).instrument(span))
    }
    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<PgRow, sqlx::Error>>
    where
        'c: 'e,
        E: sqlx::Execute<'q, Postgres> + 'q,
    {
        let op = sql_operation(query.sql());
        let span =
            tracing::info_span!("db.query", db.system = "postgresql", db.operation.name = op,);
        let conn: &'e mut PgConnection = &mut *self;
        Box::pin(conn.fetch_one(query).instrument(span))
    }
    // Metadata operations: forwarded without a query span.
    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [<Postgres as sqlx::Database>::TypeInfo],
    ) -> BoxFuture<'e, Result<<Postgres as sqlx::Database>::Statement<'q>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.prepare_with(sql, parameters)
    }
    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Postgres>, sqlx::Error>>
    where
        'c: 'e,
    {
        let conn: &'e mut PgConnection = &mut *self;
        conn.describe(sql)
    }
}
/// A job staged in the transactional outbox, to be persisted/dispatched by
/// the caller after the surrounding transaction commits.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Pre-generated job id, returned to the caller at staging time.
    pub id: Uuid,
    /// Registered job type name used to resolve the handler.
    pub job_type: String,
    /// Serialized job arguments.
    pub args: serde_json::Value,
    /// Arbitrary JSON context stored with the job (empty object by default).
    pub context: serde_json::Value,
    /// Principal that enqueued the job, when known.
    pub owner_subject: Option<String>,
    /// Scheduling priority, taken from the job's registered `JobInfo`.
    pub priority: i32,
    /// Maximum attempts, taken from the job's registered retry policy.
    pub max_attempts: i32,
    /// Worker capability requirement, if the job is restricted.
    pub worker_capability: Option<String>,
}
/// A workflow start staged in the transactional outbox, flushed after commit.
#[derive(Debug, Clone)]
pub struct PendingWorkflow {
    /// Pre-generated workflow id, returned to the caller at staging time.
    pub id: Uuid,
    /// Registered workflow name.
    pub workflow_name: String,
    /// Version captured from the dispatcher's workflow info at staging time.
    pub workflow_version: String,
    /// Signature captured from the dispatcher's workflow info.
    pub workflow_signature: String,
    /// Serialized workflow input.
    pub input: serde_json::Value,
    /// Principal that started the workflow, when known.
    pub owner_subject: Option<String>,
}
/// Jobs and workflows buffered during a transaction (see
/// `MutationContext::with_transaction`); the caller drains this after commit.
#[derive(Default)]
pub struct OutboxBuffer {
    /// Staged via `dispatch_job` / `dispatch_job_with_context`.
    pub jobs: Vec<PendingJob>,
    /// Staged via `start_workflow`.
    pub workflows: Vec<PendingWorkflow>,
}
/// Identity and authorization data for the current request.
#[derive(Debug, Clone)]
pub struct AuthContext {
    /// Local user id, when the principal maps to a database user.
    user_id: Option<Uuid>,
    /// Role names granted to the principal.
    roles: Vec<String>,
    /// Raw token claims (e.g. "sub", "tenant_id").
    claims: HashMap<String, serde_json::Value>,
    /// Whether the request carried valid credentials.
    authenticated: bool,
}
impl AuthContext {
pub fn unauthenticated() -> Self {
Self {
user_id: None,
roles: Vec::new(),
claims: HashMap::new(),
authenticated: false,
}
}
pub fn authenticated(
user_id: Uuid,
roles: Vec<String>,
claims: HashMap<String, serde_json::Value>,
) -> Self {
Self {
user_id: Some(user_id),
roles,
claims,
authenticated: true,
}
}
pub fn authenticated_without_uuid(
roles: Vec<String>,
claims: HashMap<String, serde_json::Value>,
) -> Self {
Self {
user_id: None,
roles,
claims,
authenticated: true,
}
}
pub fn is_authenticated(&self) -> bool {
self.authenticated
}
pub fn user_id(&self) -> Option<Uuid> {
self.user_id
}
pub fn require_user_id(&self) -> crate::error::Result<Uuid> {
self.user_id
.ok_or_else(|| crate::error::ForgeError::Unauthorized("Authentication required".into()))
}
pub fn has_role(&self, role: &str) -> bool {
self.roles.iter().any(|r| r == role)
}
pub fn require_role(&self, role: &str) -> crate::error::Result<()> {
if self.has_role(role) {
Ok(())
} else {
Err(crate::error::ForgeError::Forbidden(format!(
"Required role '{}' not present",
role
)))
}
}
pub fn claim(&self, key: &str) -> Option<&serde_json::Value> {
self.claims.get(key)
}
pub fn claims(&self) -> &HashMap<String, serde_json::Value> {
&self.claims
}
pub fn roles(&self) -> &[String] {
&self.roles
}
pub fn subject(&self) -> Option<&str> {
self.claims.get("sub").and_then(|v| v.as_str())
}
pub fn require_subject(&self) -> crate::error::Result<&str> {
if !self.authenticated {
return Err(crate::error::ForgeError::Unauthorized(
"Authentication required".to_string(),
));
}
self.subject().ok_or_else(|| {
crate::error::ForgeError::Unauthorized("No subject claim in token".to_string())
})
}
pub fn principal_id(&self) -> Option<String> {
self.subject()
.map(ToString::to_string)
.or_else(|| self.user_id.map(|id| id.to_string()))
}
pub fn is_admin(&self) -> bool {
self.roles.iter().any(|r| r == "admin")
}
pub fn tenant_id(&self) -> Option<uuid::Uuid> {
self.claims
.get("tenant_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
}
}
/// Per-request identifiers and client details, used for logging/tracing.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Unique id generated for this request.
    pub request_id: Uuid,
    /// Trace id: generated, or adopted from the caller via `with_trace_id`.
    pub trace_id: String,
    /// Client IP, when the transport layer provides one.
    pub client_ip: Option<String>,
    /// Client user-agent string, when provided.
    pub user_agent: Option<String>,
    /// Cross-service correlation id, when provided.
    pub correlation_id: Option<String>,
    /// UTC time at which this metadata was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl RequestMetadata {
    /// Fresh metadata: random request and trace ids, current UTC timestamp,
    /// and no client details.
    pub fn new() -> Self {
        Self {
            request_id: Uuid::new_v4(),
            trace_id: Uuid::new_v4().to_string(),
            client_ip: None,
            user_agent: None,
            correlation_id: None,
            timestamp: chrono::Utc::now(),
        }
    }
    /// Like [`Self::new`], but adopts a caller-supplied trace id (e.g. taken
    /// from an incoming tracing header) instead of generating one.
    pub fn with_trace_id(trace_id: String) -> Self {
        // Struct-update over `new()` keeps the two constructors from drifting
        // if fields are added later.
        Self {
            trace_id,
            ..Self::new()
        }
    }
}
impl Default for RequestMetadata {
    /// Same as [`RequestMetadata::new`]: fresh ids, current timestamp.
    fn default() -> Self {
        Self::new()
    }
}
/// Read-side request context: auth, request metadata, and pooled DB access
/// (queries never run inside a transaction here).
pub struct QueryContext {
    pub auth: AuthContext,
    pub request: RequestMetadata,
    /// Shared connection pool.
    db_pool: sqlx::PgPool,
    /// Environment access, injectable for tests via `with_env`.
    env_provider: Arc<dyn EnvProvider>,
}
impl QueryContext {
    /// Build a context backed by the real process environment.
    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
        // Delegate to `with_env` so the field list lives in one place.
        Self::with_env(db_pool, auth, request, Arc::new(RealEnvProvider::new()))
    }
    /// Build a context with an injected environment provider (for tests).
    pub fn with_env(
        db_pool: sqlx::PgPool,
        auth: AuthContext,
        request: RequestMetadata,
        env_provider: Arc<dyn EnvProvider>,
    ) -> Self {
        Self {
            auth,
            request,
            db_pool,
            env_provider,
        }
    }
    /// Span-instrumented executor wrapping the pool.
    pub fn db(&self) -> ForgeDb {
        ForgeDb(self.db_pool.clone())
    }
    /// Pool-backed query handle.
    pub fn db_conn(&self) -> DbConn<'_> {
        DbConn::Pool(self.db_pool.clone())
    }
    /// Authenticated user id, or `Unauthorized` when anonymous.
    pub fn user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
    /// Tenant id from the token claims, when present.
    pub fn tenant_id(&self) -> Option<Uuid> {
        self.auth.tenant_id()
    }
}
impl EnvAccess for QueryContext {
    /// Expose the injected environment provider to `EnvAccess` consumers.
    fn env_provider(&self) -> &dyn EnvProvider {
        self.env_provider.as_ref()
    }
}
/// Resolver from a job type name to its registered [`JobInfo`] (priority,
/// retry policy, worker capability); used when staging outbox jobs.
pub type JobInfoLookup = Arc<dyn Fn(&str) -> Option<JobInfo> + Send + Sync>;
/// Lifetimes applied to issued auth tokens.
#[derive(Debug, Clone)]
pub struct AuthTokenTtl {
    /// Access-token lifetime in seconds (default: 3600 = one hour).
    pub access_token_secs: i64,
    /// Refresh-token lifetime in days (default: 30).
    pub refresh_token_days: i64,
}
impl Default for AuthTokenTtl {
    /// Defaults: one-hour access tokens, thirty-day refresh tokens.
    fn default() -> Self {
        Self {
            access_token_secs: 60 * 60, // 3600 seconds
            refresh_token_days: 30,
        }
    }
}
/// Write-side request context: DB access (optionally transactional), HTTP
/// client, job/workflow dispatch, environment, and token issuance.
pub struct MutationContext {
    pub auth: AuthContext,
    pub request: RequestMetadata,
    /// Shared connection pool; used directly when no transaction is active.
    db_pool: sqlx::PgPool,
    /// Outbound HTTP client with circuit-breaker behavior.
    http_client: CircuitBreakerClient,
    /// Optional per-context timeout applied by `http()`.
    http_timeout: Option<Duration>,
    /// Immediate job dispatcher; absent in buffered/transactional contexts.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Immediate workflow dispatcher (also supplies workflow metadata).
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Environment access, injectable for tests.
    env_provider: Arc<dyn EnvProvider>,
    /// Shared transaction; `Some` only for contexts built by `with_transaction`.
    tx: Option<Arc<AsyncMutex<Transaction<'static, Postgres>>>>,
    /// Buffer for jobs/workflows staged while transactional.
    outbox: Option<Arc<Mutex<OutboxBuffer>>>,
    /// Job metadata resolver, required to stage outbox jobs.
    job_info_lookup: Option<JobInfoLookup>,
    /// Claims signer; `None` until configured via `set_token_issuer`.
    token_issuer: Option<Arc<dyn TokenIssuer>>,
    /// Token lifetimes used by the issuing helpers.
    token_ttl: AuthTokenTtl,
}
impl MutationContext {
    /// Build a context with a default circuit-breaker HTTP client and no
    /// job/workflow dispatchers configured.
    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
        Self::with_dispatch(
            db_pool,
            auth,
            request,
            CircuitBreakerClient::with_defaults(reqwest::Client::new()),
            None,
            None,
        )
    }
    /// Build a context with an explicit HTTP client and optional dispatchers;
    /// the real process environment is used for env lookups.
    pub fn with_dispatch(
        db_pool: sqlx::PgPool,
        auth: AuthContext,
        request: RequestMetadata,
        http_client: CircuitBreakerClient,
        job_dispatch: Option<Arc<dyn JobDispatch>>,
        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    ) -> Self {
        Self::with_env(
            db_pool,
            auth,
            request,
            http_client,
            job_dispatch,
            workflow_dispatch,
            Arc::new(RealEnvProvider::new()),
        )
    }
    /// Most general non-transactional constructor: every collaborator is
    /// injectable, including the environment provider (useful in tests).
    /// `new` and `with_dispatch` delegate here so the field list lives in
    /// exactly one place.
    pub fn with_env(
        db_pool: sqlx::PgPool,
        auth: AuthContext,
        request: RequestMetadata,
        http_client: CircuitBreakerClient,
        job_dispatch: Option<Arc<dyn JobDispatch>>,
        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
        env_provider: Arc<dyn EnvProvider>,
    ) -> Self {
        Self {
            auth,
            request,
            db_pool,
            http_client,
            http_timeout: None,
            job_dispatch,
            workflow_dispatch,
            env_provider,
            tx: None,
            outbox: None,
            job_info_lookup: None,
            token_issuer: None,
            token_ttl: AuthTokenTtl::default(),
        }
    }
    /// Build a transactional context. Database access goes through `tx`, and
    /// job/workflow dispatches are buffered in the returned [`OutboxBuffer`]
    /// instead of being dispatched immediately, so the caller can flush them
    /// only after the transaction commits.
    #[allow(clippy::type_complexity)]
    pub fn with_transaction(
        db_pool: sqlx::PgPool,
        tx: Transaction<'static, Postgres>,
        auth: AuthContext,
        request: RequestMetadata,
        http_client: CircuitBreakerClient,
        job_info_lookup: JobInfoLookup,
    ) -> (
        Self,
        Arc<AsyncMutex<Transaction<'static, Postgres>>>,
        Arc<Mutex<OutboxBuffer>>,
    ) {
        let tx_handle = Arc::new(AsyncMutex::new(tx));
        let outbox = Arc::new(Mutex::new(OutboxBuffer::default()));
        let ctx = Self {
            auth,
            request,
            db_pool,
            http_client,
            http_timeout: None,
            // No immediate dispatchers: dispatches are buffered instead.
            job_dispatch: None,
            workflow_dispatch: None,
            env_provider: Arc::new(RealEnvProvider::new()),
            tx: Some(tx_handle.clone()),
            outbox: Some(outbox.clone()),
            job_info_lookup: Some(job_info_lookup),
            token_issuer: None,
            token_ttl: AuthTokenTtl::default(),
        };
        (ctx, tx_handle, outbox)
    }
    /// True when this context was built with [`Self::with_transaction`].
    pub fn is_transactional(&self) -> bool {
        self.tx.is_some()
    }
    /// Acquire a connection: the shared transaction when present, otherwise a
    /// fresh pooled connection. The `Tx` variant holds the async mutex guard,
    /// so avoid holding the returned value across unrelated awaits.
    pub async fn conn(&self) -> sqlx::Result<ForgeConn<'_>> {
        match &self.tx {
            Some(tx) => Ok(ForgeConn::Tx(tx.lock().await)),
            None => Ok(ForgeConn::Pool(self.db_pool.acquire().await?)),
        }
    }
    /// The underlying pool, bypassing any active transaction.
    pub fn pool(&self) -> &sqlx::PgPool {
        &self.db_pool
    }
    /// A query handle that routes through the transaction when one is active.
    pub fn db(&self) -> DbConn<'_> {
        match &self.tx {
            Some(tx) => DbConn::Transaction(tx.clone(), &self.db_pool),
            None => DbConn::Pool(self.db_pool.clone()),
        }
    }
    /// Alias for [`Self::db`], mirroring `QueryContext::db_conn`.
    pub fn db_conn(&self) -> DbConn<'_> {
        self.db()
    }
    /// HTTP client with this context's (optional) timeout applied.
    pub fn http(&self) -> crate::http::HttpClient {
        self.http_client.with_timeout(self.http_timeout)
    }
    /// The raw reqwest client, without circuit-breaker or timeout handling.
    pub fn raw_http(&self) -> &reqwest::Client {
        self.http_client.inner()
    }
    /// Set (or clear) the timeout used by [`Self::http`].
    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
        self.http_timeout = timeout;
    }
    /// Authenticated user id, or `Unauthorized` when anonymous.
    pub fn user_id(&self) -> crate::error::Result<Uuid> {
        self.auth.require_user_id()
    }
    /// Tenant id from the token claims, when present.
    pub fn tenant_id(&self) -> Option<Uuid> {
        self.auth.tenant_id()
    }
    pub fn set_token_issuer(&mut self, issuer: Arc<dyn TokenIssuer>) {
        self.token_issuer = Some(issuer);
    }
    pub fn set_token_ttl(&mut self, ttl: AuthTokenTtl) {
        self.token_ttl = ttl;
    }
    /// Sign `claims` with the configured issuer; errors if none is set.
    pub fn issue_token(&self, claims: &Claims) -> crate::error::Result<String> {
        let issuer = self.token_issuer.as_ref().ok_or_else(|| {
            crate::error::ForgeError::Internal(
                "Token issuer not available. Configure [auth] with an HMAC algorithm in forge.toml"
                    .into(),
            )
        })?;
        issuer.sign(claims)
    }
    /// Issue a fresh access/refresh token pair for `user_id` with `roles`.
    pub async fn issue_token_pair(
        &self,
        user_id: Uuid,
        roles: &[&str],
    ) -> crate::error::Result<crate::auth::TokenPair> {
        let issuer = self.token_issuer.clone().ok_or_else(|| {
            crate::error::ForgeError::Internal(
                "Token issuer not available. Configure [auth] in forge.toml".into(),
            )
        })?;
        let access_ttl = self.token_ttl.access_token_secs;
        let refresh_ttl = self.token_ttl.refresh_token_days;
        crate::auth::tokens::issue_token_pair(
            &self.db_pool,
            user_id,
            roles,
            access_ttl,
            refresh_ttl,
            // The token helper calls back here to sign the access token.
            move |uid, r, ttl| {
                let claims = Claims::builder()
                    .subject(uid)
                    .roles(r.iter().map(|s| s.to_string()).collect())
                    .duration_secs(ttl)
                    .build()
                    .map_err(crate::error::ForgeError::Internal)?;
                issuer.sign(&claims)
            },
        )
        .await
    }
    /// Exchange a refresh token for a new pair, invalidating the old one.
    /// NOTE(review): rotated pairs are re-issued with the fixed role set
    /// `["user"]` rather than the original token's roles — confirm intended.
    pub async fn rotate_refresh_token(
        &self,
        old_refresh_token: &str,
    ) -> crate::error::Result<crate::auth::TokenPair> {
        let issuer = self.token_issuer.clone().ok_or_else(|| {
            crate::error::ForgeError::Internal(
                "Token issuer not available. Configure [auth] in forge.toml".into(),
            )
        })?;
        let access_ttl = self.token_ttl.access_token_secs;
        let refresh_ttl = self.token_ttl.refresh_token_days;
        crate::auth::tokens::rotate_refresh_token(
            &self.db_pool,
            old_refresh_token,
            &["user"],
            access_ttl,
            refresh_ttl,
            move |uid, r, ttl| {
                let claims = Claims::builder()
                    .subject(uid)
                    .roles(r.iter().map(|s| s.to_string()).collect())
                    .duration_secs(ttl)
                    .build()
                    .map_err(crate::error::ForgeError::Internal)?;
                issuer.sign(&claims)
            },
        )
        .await
    }
    /// Revoke a single refresh token.
    pub async fn revoke_refresh_token(&self, refresh_token: &str) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_refresh_token(&self.db_pool, refresh_token).await
    }
    /// Revoke all refresh tokens belonging to `user_id`.
    pub async fn revoke_all_refresh_tokens(&self, user_id: Uuid) -> crate::error::Result<()> {
        crate::auth::tokens::revoke_all_refresh_tokens(&self.db_pool, user_id).await
    }
    /// Dispatch a job with an empty context object; see
    /// [`Self::dispatch_job_with_context`] for the buffering semantics.
    pub async fn dispatch_job<T: serde::Serialize>(
        &self,
        job_type: &str,
        args: T,
    ) -> crate::error::Result<Uuid> {
        // Delegate so the outbox/dispatcher logic exists in one place.
        self.dispatch_job_with_context(job_type, args, serde_json::json!({}))
            .await
    }
    /// Dispatch a job. In a transactional context the job is staged in the
    /// outbox (flushed by the caller after commit); otherwise it is handed to
    /// the immediate dispatcher.
    /// NOTE(review): on the immediate-dispatch path `context` is dropped —
    /// `dispatch_by_name` takes no context argument. Confirm this asymmetry.
    pub async fn dispatch_job_with_context<T: serde::Serialize>(
        &self,
        job_type: &str,
        args: T,
        context: serde_json::Value,
    ) -> crate::error::Result<Uuid> {
        let args_json = serde_json::to_value(args)?;
        if let (Some(outbox), Some(job_info_lookup)) = (&self.outbox, &self.job_info_lookup) {
            let job_info = job_info_lookup(job_type).ok_or_else(|| {
                crate::error::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
            })?;
            let pending = PendingJob {
                id: Uuid::new_v4(),
                job_type: job_type.to_string(),
                args: args_json,
                context,
                owner_subject: self.auth.principal_id(),
                priority: job_info.priority.as_i32(),
                max_attempts: job_info.retry.max_attempts as i32,
                worker_capability: job_info.worker_capability.map(|s| s.to_string()),
            };
            let job_id = pending.id;
            outbox
                .lock()
                .expect("outbox lock poisoned")
                .jobs
                .push(pending);
            return Ok(job_id);
        }
        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
            crate::error::ForgeError::Internal("Job dispatch not available".into())
        })?;
        dispatcher
            .dispatch_by_name(job_type, args_json, self.auth.principal_id())
            .await
    }
    /// Request cancellation of a previously dispatched job. Requires an
    /// immediate dispatcher (not available in transactional contexts).
    pub async fn cancel_job(
        &self,
        job_id: Uuid,
        reason: Option<String>,
    ) -> crate::error::Result<bool> {
        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
            crate::error::ForgeError::Internal("Job dispatch not available".into())
        })?;
        dispatcher.cancel(job_id, reason).await
    }
    /// Start a workflow: buffered in the outbox when transactional, otherwise
    /// started immediately through the workflow dispatcher.
    pub async fn start_workflow<T: serde::Serialize>(
        &self,
        workflow_name: &str,
        input: T,
    ) -> crate::error::Result<Uuid> {
        let input_json = serde_json::to_value(input)?;
        if let Some(outbox) = &self.outbox {
            // Even when buffering, dispatcher metadata (version/signature) is
            // needed to stage the workflow row.
            let info = self
                .workflow_dispatch
                .as_ref()
                .and_then(|d| d.get_info(workflow_name))
                .ok_or_else(|| {
                    crate::error::ForgeError::NotFound(format!(
                        "No active version of workflow '{}'",
                        workflow_name
                    ))
                })?;
            let pending = PendingWorkflow {
                id: Uuid::new_v4(),
                workflow_name: workflow_name.to_string(),
                workflow_version: info.version.to_string(),
                workflow_signature: info.signature.to_string(),
                input: input_json,
                owner_subject: self.auth.principal_id(),
            };
            let workflow_id = pending.id;
            outbox
                .lock()
                .expect("outbox lock poisoned")
                .workflows
                .push(pending);
            return Ok(workflow_id);
        }
        let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
            crate::error::ForgeError::Internal("Workflow dispatch not available".into())
        })?;
        dispatcher
            .start_by_name(workflow_name, input_json, self.auth.principal_id())
            .await
    }
}
impl EnvAccess for MutationContext {
    /// Expose the injected environment provider to `EnvAccess` consumers.
    fn env_provider(&self) -> &dyn EnvProvider {
        self.env_provider.as_ref()
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    // An anonymous context exposes no user id and fails the require_* guard.
    #[test]
    fn test_auth_context_unauthenticated() {
        let ctx = AuthContext::unauthenticated();
        assert!(!ctx.is_authenticated());
        assert!(ctx.user_id().is_none());
        assert!(ctx.require_user_id().is_err());
    }
    // Role checks are exact string matches against the granted role list.
    #[test]
    fn test_auth_context_authenticated() {
        let user_id = Uuid::new_v4();
        let ctx = AuthContext::authenticated(
            user_id,
            vec!["admin".to_string(), "user".to_string()],
            HashMap::new(),
        );
        assert!(ctx.is_authenticated());
        assert_eq!(ctx.user_id(), Some(user_id));
        assert!(ctx.require_user_id().is_ok());
        assert!(ctx.has_role("admin"));
        assert!(ctx.has_role("user"));
        assert!(!ctx.has_role("superadmin"));
        assert!(ctx.require_role("admin").is_ok());
        assert!(ctx.require_role("superadmin").is_err());
    }
    // Claims are retrievable by key; unknown keys yield None.
    #[test]
    fn test_auth_context_with_claims() {
        let mut claims = HashMap::new();
        claims.insert("org_id".to_string(), serde_json::json!("org-123"));
        let ctx = AuthContext::authenticated(Uuid::new_v4(), vec![], claims);
        assert_eq!(ctx.claim("org_id"), Some(&serde_json::json!("org-123")));
        assert!(ctx.claim("nonexistent").is_none());
    }
    // Trace ids are auto-generated unless explicitly supplied.
    #[test]
    fn test_request_metadata() {
        let meta = RequestMetadata::new();
        assert!(!meta.trace_id.is_empty());
        assert!(meta.client_ip.is_none());
        let meta2 = RequestMetadata::with_trace_id("trace-123".to_string());
        assert_eq!(meta2.trace_id, "trace-123");
    }
}