use crate::protocol::error::Error;
use crate::protocol::headers::{self, names};
use crate::protocol::json_mode;
use crate::protocol::problem::{ProblemResponse, Result, request_instance};
use crate::protocol::producer;
use crate::protocol::stream_name::StreamName;
use crate::storage::{ProducerAppendResult, Storage};
use axum::{
body::Body,
extract::{OriginalUri, State},
http::{HeaderMap, HeaderValue, StatusCode},
response::{IntoResponse, Response},
};
use std::sync::Arc;
/// Axum handler: appends the request body to the stream named in the path.
///
/// Request semantics (as implemented below):
/// - A non-empty body is appended either as a single raw message, or — when
///   the normalized Content-Type is a JSON-mode type — as the batch of
///   messages produced by `json_mode::process_append`.
/// - An empty body is accepted only as a "close-only" request, i.e. when the
///   stream-closed header is set; otherwise `Error::EmptyBody` is returned.
/// - Content-Type is required except for close-only requests.
/// - If producer headers are present, the idempotent producer path is taken;
///   otherwise the plain append path is used.
///
/// Any error is converted into a `ProblemResponse` and stamped with the
/// request's instance URI before being returned to the client.
pub async fn append_data<S: Storage>(
    State(storage): State<Arc<S>>,
    StreamName(name): StreamName,
    original_uri: OriginalUri,
    headers: HeaderMap,
    body: Body,
) -> Result<Response> {
    // Capture the problem-details "instance" URI up front so it can be
    // attached to whatever error the async block below produces.
    let instance = request_instance(&original_uri);
    let result = async {
        // Buffer the entire body. No explicit size cap is applied here
        // (usize::MAX); presumably a body-size limit is enforced by
        // middleware elsewhere — TODO confirm.
        // NOTE(review): a body read failure is surfaced as an
        // InvalidHeader("Content-Length") problem.
        let body_bytes =
            axum::body::to_bytes(body, usize::MAX)
                .await
                .map_err(|e| Error::InvalidHeader {
                    header: "Content-Length".to_string(),
                    reason: format!("Failed to read body: {e}"),
                })?;
        // Close flag: header present AND parses as a truthy boolean.
        let should_close = headers
            .get(names::STREAM_CLOSED)
            .and_then(|v| v.to_str().ok())
            .is_some_and(headers::parse_bool);
        // "Close-only" = no payload, just closing the stream.
        let is_close_only = body_bytes.is_empty() && should_close;
        let content_type_raw = headers.get("content-type").and_then(|v| v.to_str().ok());
        // Content-Type is optional for close-only requests (defaults to the
        // empty string), mandatory otherwise.
        let normalized_ct = if is_close_only {
            content_type_raw
                .map(headers::normalize_content_type)
                .unwrap_or_default()
        } else {
            let ct = content_type_raw.ok_or_else(|| Error::InvalidHeader {
                header: "Content-Type".to_string(),
                reason: "missing required header".to_string(),
            })?;
            headers::normalize_content_type(ct)
        };
        // Parsed before the empty-body check, so malformed producer headers
        // are reported even when the body would be rejected as empty.
        let producer_headers = producer::parse_producer_headers(&headers)?;
        // Optional expected-sequence header, kept as an owned String so it
        // outlives the header map borrow.
        let stream_seq = headers
            .get(names::STREAM_SEQ)
            .and_then(|v| v.to_str().ok())
            .map(str::to_string);
        // Empty body without the close flag is a client error.
        if body_bytes.is_empty() && !should_close {
            return Err(ProblemResponse::from(Error::EmptyBody));
        }
        // Build the message batch: none for close-only, a parsed batch for
        // JSON mode (an empty JSON array is only valid when closing), or the
        // raw bytes as a single message otherwise.
        let messages = if body_bytes.is_empty() {
            vec![]
        } else if json_mode::is_json_content_type(&normalized_ct) {
            let parsed = json_mode::process_append(&body_bytes)?;
            if parsed.is_empty() && !should_close {
                return Err(ProblemResponse::from(Error::EmptyArray));
            }
            parsed
        } else {
            vec![body_bytes]
        };
        let seq_ref = stream_seq.as_deref();
        // Dispatch on whether producer (idempotency) headers were supplied.
        if let Some(ref prod) = producer_headers {
            handle_producer_append(
                &storage,
                &name,
                messages,
                &normalized_ct,
                prod,
                should_close,
                is_close_only,
                seq_ref,
            )
        } else {
            handle_non_producer_append(
                &storage,
                &name,
                messages,
                &normalized_ct,
                should_close,
                seq_ref,
            )
        }
    }
    .await;
    // Attach the request instance to any problem response.
    result.map_err(|problem| problem.with_instance(instance))
}
fn handle_non_producer_append<S: Storage>(
storage: &Arc<S>,
name: &str,
messages: Vec<bytes::Bytes>,
content_type: &str,
should_close: bool,
seq: Option<&str>,
) -> Result<Response> {
let next_offset = if messages.is_empty() {
storage.head(name)?.next_offset
} else {
match storage.batch_append(name, messages, content_type, seq) {
Ok(next_offset) => next_offset,
Err(Error::StreamClosed) => {
return Err(stream_closed_response(storage, name));
}
Err(e) => return Err(e.into()),
}
};
if should_close {
storage.close_stream(name)?;
}
let mut response_headers = HeaderMap::new();
response_headers.insert(
names::STREAM_NEXT_OFFSET,
HeaderValue::from_bytes(next_offset.as_str().as_bytes()).unwrap(),
);
if should_close {
response_headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
}
Ok((StatusCode::NO_CONTENT, response_headers).into_response())
}
#[allow(clippy::too_many_arguments)] fn handle_producer_append<S: Storage>(
storage: &Arc<S>,
name: &str,
messages: Vec<bytes::Bytes>,
content_type: &str,
producer: &producer::ProducerHeaders,
should_close: bool,
is_close_only: bool,
seq: Option<&str>,
) -> Result<Response> {
match storage.append_with_producer(name, messages, content_type, producer, should_close, seq) {
Ok(result) => {
let (status, epoch, seq, next_offset, closed) = match result {
ProducerAppendResult::Accepted {
epoch,
seq,
next_offset,
closed,
} => {
let status = if is_close_only {
StatusCode::NO_CONTENT
} else {
StatusCode::OK
};
(status, epoch, seq, next_offset, closed)
}
ProducerAppendResult::Duplicate {
epoch,
seq,
next_offset,
closed,
} => (StatusCode::NO_CONTENT, epoch, seq, next_offset, closed),
};
let mut response_headers = HeaderMap::new();
response_headers.insert(
names::STREAM_NEXT_OFFSET,
HeaderValue::from_bytes(next_offset.as_str().as_bytes()).unwrap(),
);
response_headers.insert(names::PRODUCER_EPOCH, epoch.to_string().parse().unwrap());
response_headers.insert(names::PRODUCER_SEQ, seq.to_string().parse().unwrap());
if closed {
response_headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
}
Ok((status, response_headers).into_response())
}
Err(Error::StreamClosed) => Err(stream_closed_response(storage, name)),
Err(Error::EpochFenced { current, .. }) => Err(ProblemResponse::from(Error::EpochFenced {
current,
received: producer.epoch,
})
.with_header(names::PRODUCER_EPOCH, current.to_string().parse().unwrap())),
Err(Error::SequenceGap { expected, actual }) => Err(ProblemResponse::from(
Error::SequenceGap { expected, actual },
)
.with_header(
names::PRODUCER_EXPECTED_SEQ,
expected.to_string().parse().unwrap(),
)
.with_header(
names::PRODUCER_RECEIVED_SEQ,
actual.to_string().parse().unwrap(),
)),
Err(e) => Err(e.into()),
}
}
fn stream_closed_response<S: Storage>(storage: &Arc<S>, name: &str) -> ProblemResponse {
let response = ProblemResponse::from(Error::StreamClosed)
.with_header(names::STREAM_CLOSED, "true".parse().unwrap());
if let Ok(metadata) = storage.head(name) {
response.with_header(
names::STREAM_NEXT_OFFSET,
HeaderValue::from_bytes(metadata.next_offset.as_str().as_bytes()).unwrap(),
)
} else {
response
}
}