use std::time::Instant;
use axum::{
extract::State,
http::{HeaderMap, StatusCode},
Extension, Json,
};
use chrono::Utc;
use gradatum_acl_policy::{AclDecision, AclOp};
use gradatum_core::audit::http::{HttpAuditActor, HttpAuditEvent};
use gradatum_core::trust::TrustContext;
use gradatum_core::{
CurateSpec, Job, JobClass, JobLifecycle, JobLineage, JobMode, JobPriority, JobRecord, JobRetry,
JobScheduling, JobScope, JobSpec, JobStatus, TriggerSource,
};
use gradatum_queue::NewJob;
use ulid::Ulid;
use crate::api_v1::dto::{
EnqueuedResponse, EnqueuedResponseUlid, VaultClassifyRequest, VaultDowngradeRequest,
VaultWriteRequest,
};
use crate::state::AppState;
fn parse_sha256_hex(hex: &str) -> Option<[u8; 32]> {
if hex.len() != 64 {
return None;
}
let mut bytes = [0u8; 32];
for (i, chunk) in hex.as_bytes().chunks(2).enumerate() {
let hi = (chunk[0] as char).to_digit(16)?;
let lo = (chunk[1] as char).to_digit(16)?;
bytes[i] = (hi * 16 + lo) as u8;
}
Some(bytes)
}
fn build_curate_job_record(req: &VaultWriteRequest) -> (JobRecord, Ulid) {
let now = Utc::now();
let note_id = Ulid::new();
let class = JobClass::Agent;
let expected_sha256: Option<[u8; 32]> =
req.expected_sha256.as_deref().and_then(parse_sha256_hex);
let record = JobRecord {
id: Ulid::new(),
spec: JobSpec {
kind: Job::Curate(CurateSpec {
note_id,
tenant_id: req.tenant_id.clone(),
title: Some(req.title.clone()),
body: Some(req.body.clone()),
author: req.author.clone(),
tags: req.tags.clone(),
section_hint: req.section_hint.clone(),
expected_sha256,
}),
class,
mode: JobMode::Batch,
scope: JobScope::VaultWide,
priority: JobPriority::High,
},
scheduling: JobScheduling {
trigger: TriggerSource::Demand,
scheduled_at: now,
await_jobs: vec![],
deadline: None,
cron_expr: None,
},
lifecycle: JobLifecycle {
status: JobStatus::Pending,
created_at: now,
started_at: None,
completed_at: None,
lease_until: None,
result: None,
},
retry: JobRetry::default(),
lineage: JobLineage {
triggered_by: None,
parent_job: None,
pipeline_id: None,
pipeline_step: None,
children: vec![],
cost_usd: None,
},
};
(record, note_id)
}
fn extract_request_id(headers: &HeaderMap) -> String {
headers
.get("X-Request-ID")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| ulid::Ulid::new().to_string())
}
fn actor_from_trust(trust: &TrustContext) -> HttpAuditActor {
match trust {
TrustContext::BearerToken { kid, sub, aud, .. } => HttpAuditActor {
kid: kid.clone(),
sub: sub.clone(),
aud: aud.clone(),
},
TrustContext::Mtls { cn, .. } => HttpAuditActor {
kid: format!("mtls:{cn}"),
sub: cn.clone(),
aud: "gradatum".into(),
},
TrustContext::Studio { user, .. } => HttpAuditActor {
kid: "studio".into(),
sub: user.clone(),
aud: "gradatum-studio".into(),
},
TrustContext::Unauthenticated => HttpAuditActor {
kid: String::new(),
sub: String::new(),
aud: String::new(),
},
}
}
async fn emit_auth_failure_audit(
state: &AppState,
trust: &TrustContext,
tenant_id: &str,
request_id: &str,
error_msg: &str,
) {
let evt = HttpAuditEvent {
ts: chrono::Utc::now(),
event: "auth_failure".into(),
actor: actor_from_trust(trust),
tenant_id: tenant_id.into(),
locus: format!("{}/main", tenant_id),
note_id: None,
content_hash: None,
outcome: "denied".into(),
curator: None,
request_id: request_id.into(),
};
if let Err(e) = state.audit.record(evt).await {
tracing::warn!(error = %e, error_msg = error_msg, "audit emit auth_failure échoué");
}
}
pub async fn vault_write(
State(state): State<AppState>,
Extension(trust): Extension<TrustContext>,
headers: HeaderMap,
Json(req): Json<VaultWriteRequest>,
) -> Result<(StatusCode, Json<EnqueuedResponseUlid>), StatusCode> {
let start = Instant::now();
let request_id = extract_request_id(&headers);
if !trust.is_authenticated() {
emit_auth_failure_audit(
&state,
&trust,
&req.tenant_id,
&request_id,
"unauthenticated",
)
.await;
return Err(StatusCode::UNAUTHORIZED);
}
let locus = format!("{}/main", req.tenant_id);
if state.acl.evaluate(&trust, AclOp::Write, &locus) != AclDecision::Allow {
emit_auth_failure_audit(&state, &trust, &req.tenant_id, &request_id, "acl_deny").await;
return Err(StatusCode::FORBIDDEN);
}
let (record, _note_id_prealloc) = build_curate_job_record(&req);
let job_ulid = state
.job_store
.enqueue(record)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_id_str = job_ulid.to_string();
let duration_ms = start.elapsed().as_millis() as i64;
let audit_evt = HttpAuditEvent {
ts: chrono::Utc::now(),
event: "vault_write".into(),
actor: actor_from_trust(&trust),
tenant_id: req.tenant_id.clone(),
locus: locus.clone(),
note_id: None,
content_hash: None,
outcome: "queued".into(),
curator: Some(serde_json::json!({ "job_id": job_id_str, "duration_ms": duration_ms })),
request_id: request_id.clone(),
};
if let Err(e) = state.audit.record(audit_evt).await {
tracing::warn!(error = %e, "audit emit vault_write échoué — non fatal");
}
Ok((
StatusCode::ACCEPTED,
Json(EnqueuedResponseUlid {
job_id: job_id_str,
status: "queued",
poll_url: format!("/api/v1/jobs/{job_ulid}/v2"),
}),
))
}
pub async fn vault_classify(
State(state): State<AppState>,
Extension(trust): Extension<TrustContext>,
headers: HeaderMap,
Json(req): Json<VaultClassifyRequest>,
) -> Result<(StatusCode, Json<EnqueuedResponse>), StatusCode> {
let start = Instant::now();
let request_id = extract_request_id(&headers);
if !trust.is_authenticated() {
emit_auth_failure_audit(
&state,
&trust,
&req.tenant_id,
&request_id,
"unauthenticated",
)
.await;
return Err(StatusCode::UNAUTHORIZED);
}
let locus = format!("{}/main", req.tenant_id);
if state.acl.evaluate(&trust, AclOp::Write, &locus) != AclDecision::Allow {
emit_auth_failure_audit(&state, &trust, &req.tenant_id, &request_id, "acl_deny").await;
return Err(StatusCode::FORBIDDEN);
}
let payload = bincode::serde::encode_to_vec(&req, bincode::config::standard())
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_id = state
.queue
.enqueue(NewJob {
tenant_id: req.tenant_id.clone(),
kind: "classify".to_string(),
payload,
max_attempts: 5,
})
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let duration_ms = start.elapsed().as_millis() as i64;
let audit_evt = HttpAuditEvent {
ts: chrono::Utc::now(),
event: "vault_classify".into(),
actor: actor_from_trust(&trust),
tenant_id: req.tenant_id.clone(),
locus: locus.clone(),
note_id: Some(req.note_id.clone()),
content_hash: None,
outcome: "queued".into(),
curator: Some(serde_json::json!({ "job_id": job_id, "duration_ms": duration_ms })),
request_id,
};
if let Err(e) = state.audit.record(audit_evt).await {
tracing::warn!(error = %e, "audit emit vault_classify échoué — non fatal");
}
Ok((
StatusCode::ACCEPTED,
Json(EnqueuedResponse {
job_id,
status: "queued",
poll_url: format!("/api/v1/jobs/{job_id}"),
}),
))
}
#[allow(dead_code)] pub async fn vault_downgrade(
State(state): State<AppState>,
Extension(trust): Extension<TrustContext>,
headers: HeaderMap,
Json(req): Json<VaultDowngradeRequest>,
) -> Result<(StatusCode, Json<EnqueuedResponse>), StatusCode> {
let start = Instant::now();
let request_id = extract_request_id(&headers);
if !trust.is_authenticated() {
emit_auth_failure_audit(
&state,
&trust,
&req.tenant_id,
&request_id,
"unauthenticated",
)
.await;
return Err(StatusCode::UNAUTHORIZED);
}
let locus = format!("{}/main", req.tenant_id);
if state.acl.evaluate(&trust, AclOp::Write, &locus) != AclDecision::Allow {
emit_auth_failure_audit(&state, &trust, &req.tenant_id, &request_id, "acl_deny").await;
return Err(StatusCode::FORBIDDEN);
}
let payload = bincode::serde::encode_to_vec(&req, bincode::config::standard())
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_id = state
.queue
.enqueue(NewJob {
tenant_id: req.tenant_id.clone(),
kind: "downgrade".to_string(),
payload,
max_attempts: 5,
})
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let duration_ms = start.elapsed().as_millis() as i64;
let audit_evt = HttpAuditEvent {
ts: chrono::Utc::now(),
event: "vault_downgrade".into(),
actor: actor_from_trust(&trust),
tenant_id: req.tenant_id.clone(),
locus: locus.clone(),
note_id: Some(req.note_id.clone()),
content_hash: None,
outcome: "queued".into(),
curator: Some(serde_json::json!({ "job_id": job_id, "duration_ms": duration_ms })),
request_id,
};
if let Err(e) = state.audit.record(audit_evt).await {
tracing::warn!(error = %e, "audit emit vault_downgrade échoué — non fatal");
}
Ok((
StatusCode::ACCEPTED,
Json(EnqueuedResponse {
job_id,
status: "queued",
poll_url: format!("/api/v1/jobs/{job_id}"),
}),
))
}