mod delete;
mod post;
pub(crate) use delete::execute_delete;
pub(crate) use post::execute_post;
use std::sync::atomic::Ordering;
use axum::{
body::Bytes,
http::{header, HeaderMap, HeaderValue, StatusCode},
response::IntoResponse,
};
use crate::{
auth, can_read, can_write, http_semantics as hs, is_insufficient_storage_error,
needs_write_approve, not_found, payload_too_large, server_error, storage_error, store,
to_header_map, unauthorized, world, AuthGate, Core, ErrorReason, Phase, TraceCtx, Verb,
};
/// Routes a request to the handler that implements its verb.
///
/// `body` is only forwarded to `execute_put` and `execute_post`; the other
/// handlers receive the headers, tier, world name, `core` and `trace` alone.
/// The `Phase` produced by the selected handler is returned unchanged.
pub(crate) async fn execute(
    verb: Verb,
    headers: HeaderMap,
    body: Bytes,
    tier: auth::Tier,
    world: String,
    core: &Core,
    trace: &TraceCtx,
) -> Phase {
    match verb {
        // Verbs whose handlers take the request body.
        Verb::Put => execute_put(headers, body, tier, world, core, trace).await,
        Verb::Post => execute_post(headers, body, tier, world, core, trace).await,
        // Verbs whose handlers are called without the request body.
        Verb::Get => execute_get(headers, tier, world, core, trace).await,
        Verb::Head => execute_head(headers, tier, world, core, trace).await,
        Verb::Delete => execute_delete(headers, tier, world, core, trace).await,
    }
}
pub(crate) async fn execute_get(
headers: HeaderMap,
tier: auth::Tier,
world: String,
core: &Core,
trace: &TraceCtx,
) -> Phase {
if !can_read(core, tier) {
return Phase::Error {
resp: unauthorized("read requires read token"),
reason: ErrorReason::Auth(AuthGate::Read),
};
}
let Some((stage, etag)) = (match core.read_world_with_etag(&world) {
Ok(current) => current,
Err(e) => {
return Phase::Error {
resp: storage_error("storage read", e),
reason: ErrorReason::StorageRead,
};
}
}) else {
return Phase::Error {
resp: not_found(),
reason: ErrorReason::NotFound,
};
};
if hs::read_not_modified(&headers, &etag) {
trace.emit_aux_kv("body_size", &stage.body.len().to_string());
return Phase::ExecutedRead(hs::not_modified(&world, &etag, &stage));
}
let mut resp_headers = vec![
(
header::CONTENT_TYPE,
HeaderValue::from_str(&stage.content_type)
.unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream")),
),
(header::ACCEPT_RANGES, HeaderValue::from_static("bytes")),
(header::ETAG, hs::etag_header(&etag)),
];
hs::apply_world_links(&world, &mut resp_headers);
hs::apply_meta_headers(&stage.headers, &mut resp_headers);
match hs::effective_range(&headers, stage.body.len(), &etag) {
Ok(Some((start, end))) => {
let chunk = stage.body[start..=end].to_vec();
resp_headers.push((
header::CONTENT_LENGTH,
HeaderValue::from_str(&chunk.len().to_string()).unwrap(),
));
resp_headers.push((
header::CONTENT_RANGE,
HeaderValue::from_str(&format!("bytes {start}-{end}/{}", stage.body.len()))
.unwrap(),
));
trace.emit_aux_kv("body_size", &chunk.len().to_string());
Phase::ExecutedRead(
(
StatusCode::PARTIAL_CONTENT,
to_header_map(resp_headers),
chunk,
)
.into_response(),
)
}
Ok(None) => {
resp_headers.push((
header::CONTENT_LENGTH,
HeaderValue::from_str(&stage.body.len().to_string()).unwrap(),
));
trace.emit_aux_kv("body_size", &stage.body.len().to_string());
Phase::ExecutedRead(
(StatusCode::OK, to_header_map(resp_headers), stage.body).into_response(),
)
}
Err(()) => Phase::Error {
resp: hs::range_not_satisfiable(stage.body.len()),
reason: ErrorReason::RangeNotSatisfiable,
},
}
}
/// Handles `HEAD` for `world`: same checks and headers as the `GET` path, empty body.
///
/// Error outcomes are `ErrorReason::Auth(AuthGate::Read)`, `ErrorReason::StorageRead`,
/// `ErrorReason::NotFound` and `ErrorReason::RangeNotSatisfiable`. Successful outcomes
/// are `Phase::ExecutedRead` with the `hs::not_modified` response, `200 OK`, or
/// `206 Partial Content`; each emits a `body_size` aux trace value describing the
/// length that is advertised, although no body bytes are sent.
///
/// NOTE(review): a `Range` header on `HEAD` yields `206` plus `Content-Range` here,
/// while RFC 9110 §14.2 defines range handling for `GET` only — confirm that
/// mirroring `GET` is the intended behaviour.
pub(crate) async fn execute_head(
    headers: HeaderMap,
    tier: auth::Tier,
    world: String,
    core: &Core,
    trace: &TraceCtx,
) -> Phase {
    if !can_read(core, tier) {
        return Phase::Error {
            resp: unauthorized("read requires read token"),
            reason: ErrorReason::Auth(AuthGate::Read),
        };
    }

    // Storage failures and "no such world" are reported as distinct errors.
    let lookup = match core.read_world_with_etag(&world) {
        Ok(found) => found,
        Err(e) => {
            return Phase::Error {
                resp: storage_error("storage read", e),
                reason: ErrorReason::StorageRead,
            };
        }
    };
    let Some((stage, etag)) = lookup else {
        return Phase::Error {
            resp: not_found(),
            reason: ErrorReason::NotFound,
        };
    };
    let total_len = stage.body.len();

    if hs::read_not_modified(&headers, &etag) {
        trace.emit_aux_kv("body_size", &total_len.to_string());
        return Phase::ExecutedRead(hs::not_modified(&world, &etag, &stage));
    }

    // A stored content type that is not a valid header value falls back to octet-stream.
    let content_type = HeaderValue::from_str(&stage.content_type)
        .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream"));
    // Content-Length starts as the full length and is swapped out for range requests.
    let mut resp_headers = vec![
        (header::CONTENT_TYPE, content_type),
        (header::CONTENT_LENGTH, HeaderValue::from(total_len)),
        (header::ACCEPT_RANGES, HeaderValue::from_static("bytes")),
        (header::ETAG, hs::etag_header(&etag)),
    ];
    hs::apply_world_links(&world, &mut resp_headers);
    hs::apply_meta_headers(&stage.headers, &mut resp_headers);

    let status = match hs::effective_range(&headers, total_len, &etag) {
        Err(()) => {
            return Phase::Error {
                resp: hs::range_not_satisfiable(total_len),
                reason: ErrorReason::RangeNotSatisfiable,
            };
        }
        Ok(None) => {
            trace.emit_aux_kv("body_size", &total_len.to_string());
            StatusCode::OK
        }
        Ok(Some((start, end))) => {
            // Replace the full-length Content-Length with the length of the range.
            resp_headers.retain(|(name, _)| name != header::CONTENT_LENGTH);
            let chunk_len = end - start + 1;
            resp_headers.push((header::CONTENT_LENGTH, HeaderValue::from(chunk_len)));
            // Digits, '-', '/' and "bytes " are always a valid header value.
            let content_range =
                HeaderValue::from_str(&format!("bytes {start}-{end}/{total_len}")).unwrap();
            resp_headers.push((header::CONTENT_RANGE, content_range));
            trace.emit_aux_kv("body_size", &chunk_len.to_string());
            StatusCode::PARTIAL_CONTENT
        }
    };

    Phase::ExecutedRead((status, to_header_map(resp_headers), "").into_response())
}
pub(crate) async fn execute_put(
headers: HeaderMap,
body: Bytes,
tier: auth::Tier,
world: String,
core: &Core,
trace: &TraceCtx,
) -> Phase {
if !can_write(&world, tier) {
let gate = if needs_write_approve(&world) {
AuthGate::WriteApprove
} else {
AuthGate::Write
};
return Phase::Error {
resp: unauthorized("write requires token; system worlds need approve token"),
reason: ErrorReason::Auth(gate),
};
}
if body.len() > core.max_world_bytes {
return Phase::Error {
resp: payload_too_large(core.max_world_bytes),
reason: ErrorReason::PayloadTooLarge,
};
}
let content_type = hs::request_content_type(&headers);
let meta = hs::request_meta_headers(
&headers,
&core.persist_header_allowlist,
&core.persist_header_user_deny,
);
let _write_guard = core.acquire_world_lock(&world).await;
trace.emit_aux("lock_acquired");
core.clear_tombstone(&world);
if let Err(resp) = hs::check_write_preconditions(core, &world, &headers) {
let reason = if resp.status() == StatusCode::PRECONDITION_FAILED {
ErrorReason::PreconditionFailed
} else {
ErrorReason::StorageRead
};
return Phase::Error { resp, reason };
}
let (existed, etag) = if store::is_persistent(&world) {
let prev_len_opt = match world::body_len(&core.data, &world) {
Ok(v) => v,
Err(e) => {
return Phase::Error {
resp: storage_error("storage metadata", e),
reason: ErrorReason::StorageRead,
};
}
};
let existed = prev_len_opt.is_some();
let prev_len = prev_len_opt.unwrap_or(0);
if let Some(quota) = core.max_storage_bytes {
let used = core.storage_body_bytes.load(Ordering::Relaxed);
trace.emit_aux_kv("quota_check", &format!("used={used} quota={quota}"));
}
if let Err(boxed) = core.reserve_storage(prev_len, body.len()) {
return Phase::Error {
resp: *boxed,
reason: ErrorReason::QuotaExceeded,
};
}
match world::write_with_audit_checked(
&core.data,
&world,
&body,
&content_type,
&meta,
&core.hmac_key,
None,
) {
Ok(result) => {
if !existed {
core.durable_world_count.fetch_add(1, Ordering::Relaxed);
}
let etag = hs::hmac_etag(&result.hmac);
trace.emit_aux_kv("sqlite_committed", &format!("etag={}", etag_preview(&etag)));
(existed, etag)
}
Err(world::WriteAuditError::Quota { .. }) => {
core.rollback_storage_reservation(prev_len, body.len());
return Phase::Error {
resp: server_error("unexpected quota error".to_string()),
reason: ErrorReason::StorageWriteAudit,
};
}
Err(world::WriteAuditError::Sqlite(e)) => {
core.rollback_storage_reservation(prev_len, body.len());
let reason = if is_insufficient_storage_error(&e) {
ErrorReason::InsufficientStorage
} else {
ErrorReason::StorageWriteAudit
};
return Phase::Error {
resp: storage_error("storage/audit", e),
reason,
};
}
}
} else {
match core
.mem
.write_with_quota(&world, &body, &content_type, &meta, core.max_memory_bytes)
{
Ok(outcome) => {
let etag = hs::body_etag(&body);
trace.emit_aux_kv("sqlite_committed", &format!("etag={}", etag_preview(&etag)));
(outcome.existed, etag)
}
Err(store::MemoryQuotaError { quota, .. }) => {
return Phase::Error {
resp: payload_too_large(quota),
reason: ErrorReason::PayloadTooLarge,
};
}
}
};
core.notify("PUT", &world, &etag);
trace.emit_aux("notify_sent");
let status = if existed {
StatusCode::OK
} else {
StatusCode::CREATED
};
let mut resp_headers = vec![(header::ETAG, hs::etag_header(&etag))];
if status == StatusCode::CREATED {
resp_headers.push((
header::LOCATION,
HeaderValue::from_str(&hs::world_url(&world))
.unwrap_or_else(|_| HeaderValue::from_static("/")),
));
}
Phase::CommittedWrite((status, to_header_map(resp_headers), "").into_response())
}
/// Returns at most the first 16 characters of `etag` as an owned `String`.
///
/// The cut is made on a `char` boundary, so multi-byte characters are never split;
/// inputs of 16 characters or fewer are returned whole.
pub(in crate::handler) fn etag_preview(etag: &str) -> String {
    // Byte offset of the 17th character, if there is one, marks where to cut.
    match etag.char_indices().nth(16) {
        Some((cut, _)) => etag[..cut].to_owned(),
        None => etag.to_owned(),
    }
}