use axum::{
extract::{Path, Query, State},
http::{header::AUTHORIZATION, HeaderMap},
Json,
};
use mlua_swarm::store::output::{OutputEvent, OutputRecord, OutputRef};
use mlua_swarm::{types::Verb, CapToken, TaskId};
use serde::{Deserialize, Serialize};
use crate::{ApiError, AppState};
#[derive(Debug, Deserialize)]
pub struct DataEmitReq {
pub task_id: String,
pub attempt: u32,
pub producer_agent: String,
pub event: OutputEvent,
#[serde(default)]
pub parent_refs: Vec<OutputRef>,
}
#[derive(Debug, Deserialize)]
pub struct DataEmitNamedReq {
pub task_id: String,
pub attempt: u32,
pub event: OutputEvent,
#[serde(default)]
pub parent_refs: Vec<OutputRef>,
}
#[derive(Debug, Serialize)]
pub struct DataEmitResp {
pub out_id: OutputRef,
}
#[derive(Debug, Deserialize)]
pub struct TokenQuery {
pub token: Option<String>,
}
pub async fn data_emit(
State(state): State<AppState>,
Query(q): Query<TokenQuery>,
headers: HeaderMap,
Json(req): Json<DataEmitReq>,
) -> Result<Json<DataEmitResp>, ApiError> {
emit_inner(&state, &headers, q.token.as_deref(), req).await
}
pub async fn data_emit_named(
State(state): State<AppState>,
Path(name): Path<String>,
Query(q): Query<TokenQuery>,
headers: HeaderMap,
Json(req): Json<DataEmitNamedReq>,
) -> Result<Json<DataEmitResp>, ApiError> {
let req = DataEmitReq {
task_id: req.task_id,
attempt: req.attempt,
producer_agent: name,
event: req.event,
parent_refs: req.parent_refs,
};
emit_inner(&state, &headers, q.token.as_deref(), req).await
}
async fn emit_inner(
state: &AppState,
headers: &HeaderMap,
query_token: Option<&str>,
req: DataEmitReq,
) -> Result<Json<DataEmitResp>, ApiError> {
let token = extract_captoken(headers, query_token)?;
let tid = TaskId(req.task_id.clone());
state
.engine
.verify_token_for_task(&token, Verb::EmitOutput, &tid)
.await
.map_err(|e| ApiError::engine(format!("data_emit verify: {e}")))?;
let out_id = state
.data_store
.append(
&req.task_id,
req.attempt,
&req.producer_agent,
req.event,
req.parent_refs,
)
.await
.map_err(|e| ApiError::engine(format!("data_emit: {e}")))?;
Ok(Json(DataEmitResp { out_id }))
}
fn extract_captoken(headers: &HeaderMap, query_token: Option<&str>) -> Result<CapToken, ApiError> {
let encoded: &str = if let Some(v) = headers.get(AUTHORIZATION) {
v.to_str()
.map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?
.strip_prefix("Bearer ")
.ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <token>'".into()))?
.trim()
} else if let Some(t) = query_token {
t.trim()
} else {
return Err(ApiError::bad_request(
"missing token: pass 'Authorization: Bearer <token>' or '?token=<token>'".into(),
));
};
if encoded.is_empty() {
return Err(ApiError::bad_request("token is empty".into()));
}
CapToken::decode(encoded).map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))
}
pub async fn data_get(
State(state): State<AppState>,
Path(key): Path<String>,
) -> Result<Json<OutputRecord>, ApiError> {
use mlua_swarm::store::output::OutputStoreError;
let id = OutputRef(key.clone());
match state.data_store.get(&id).await {
Ok(record) => Ok(Json(record)),
Err(OutputStoreError::NotFound(_)) => {
let record = state
.data_store
.get_latest_by_name(&key)
.await
.map_err(|e| match e {
OutputStoreError::NotFound(k) => {
ApiError::not_found(format!("output not found (id nor name): {k}"))
}
other => ApiError::engine(format!("data_get by name: {other}")),
})?;
Ok(Json(record))
}
Err(other) => Err(ApiError::engine(format!("data_get: {other}"))),
}
}