use crate::middleware::proxy_trust::ProxyTrustResult;
use crate::protocol::error::Error;
use crate::protocol::headers::{self, names};
use crate::protocol::json_mode;
use crate::protocol::offset::Offset;
use crate::protocol::problem::{ProblemResponse, Result, request_instance};
use crate::protocol::stream_name::StreamName;
use crate::router::StreamBasePath;
use crate::storage::{CreateStreamResult, CreateWithDataResult, Storage, StreamConfig};
use axum::{
Extension,
body::Body,
extract::{OriginalUri, State},
http::{HeaderMap, HeaderValue, StatusCode},
response::{IntoResponse, Response},
};
use chrono::Utc;
use std::str::FromStr;
use std::sync::Arc;
#[allow(clippy::too_many_lines)]
pub async fn create_stream<S: Storage>(
State(storage): State<Arc<S>>,
StreamName(name): StreamName,
original_uri: OriginalUri,
Extension(StreamBasePath(stream_base_path)): Extension<StreamBasePath>,
Extension(request_origin): Extension<ProxyTrustResult>,
headers: HeaderMap,
body: Body,
) -> Result<Response> {
let instance = request_instance(&original_uri);
let result = async {
let body_bytes =
axum::body::to_bytes(body, usize::MAX)
.await
.map_err(|e| Error::InvalidHeader {
header: "Content-Length".to_string(),
reason: format!("Failed to read body: {e}"),
})?;
let content_type = headers.get("content-type").and_then(|v| v.to_str().ok());
if let Some(ct) = content_type
&& ct.trim().is_empty()
{
return Err(ProblemResponse::from(Error::InvalidHeader {
header: "Content-Type".to_string(),
reason: "empty value".to_string(),
}));
}
let normalized_ct = content_type.map_or_else(
|| "application/octet-stream".to_string(),
headers::normalize_content_type,
);
let ttl_seconds =
if let Some(ttl_value) = headers.get(names::STREAM_TTL).and_then(|v| v.to_str().ok()) {
Some(headers::parse_ttl(ttl_value)?)
} else {
None
};
let expires_at = if let Some(expires_value) = headers
.get(names::STREAM_EXPIRES_AT)
.and_then(|v| v.to_str().ok())
{
Some(headers::parse_expires_at(expires_value)?)
} else {
None
};
if ttl_seconds.is_some() && expires_at.is_some() {
return Err(ProblemResponse::from(Error::ConflictingExpiration));
}
let created_closed = headers
.get(names::STREAM_CLOSED)
.and_then(|v| v.to_str().ok())
.is_some_and(headers::parse_bool);
let mut config = StreamConfig::new(normalized_ct.clone());
if let Some(ttl) = ttl_seconds {
let expires_at =
Utc::now() + chrono::Duration::seconds(i64::try_from(ttl).unwrap_or(i64::MAX));
config = config.with_expires_at(expires_at);
config = config.with_ttl(ttl);
} else if let Some(expires) = expires_at {
config = config.with_expires_at(expires);
}
if created_closed {
config = config.with_created_closed(true);
}
let forked_from = headers
.get(names::STREAM_FORKED_FROM)
.and_then(|v| v.to_str().ok())
.map(String::from);
let fork_offset_raw = headers
.get(names::STREAM_FORK_OFFSET)
.and_then(|v| v.to_str().ok())
.map(String::from);
if let Some(forked_from_value) = forked_from {
let source_name = strip_stream_base_path(&forked_from_value, &stream_base_path);
let fork_offset = if let Some(ref raw) = fork_offset_raw {
Some(Offset::from_str(raw)?)
} else {
None
};
let create_result = storage
.create_fork(&name, &source_name, fork_offset.as_ref(), config)
.map_err(ProblemResponse::from)?;
let meta = storage.head(&name)?;
let status = if matches!(create_result, CreateStreamResult::Created) {
StatusCode::CREATED
} else {
StatusCode::OK
};
let location = build_location_url(&request_origin, &stream_base_path, &name);
let mut response_headers = HeaderMap::new();
response_headers.insert("content-type", meta.config.content_type.parse().unwrap());
response_headers.insert(
names::STREAM_NEXT_OFFSET,
HeaderValue::from_bytes(meta.next_offset.as_str().as_bytes()).unwrap(),
);
response_headers.insert("location", location.parse().unwrap());
if meta.closed {
response_headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
}
return Ok((status, response_headers).into_response());
}
let messages = if body_bytes.is_empty() {
vec![]
} else if json_mode::is_json_content_type(&normalized_ct) {
json_mode::process_append(&body_bytes)?
} else {
vec![body_bytes]
};
let create_with_data_result = storage
.create_stream_with_data(&name, config, messages, created_closed)
.map_err(ProblemResponse::from)?;
let CreateWithDataResult {
status: create_status,
next_offset,
closed,
} = create_with_data_result;
let status = if matches!(create_status, CreateStreamResult::Created) {
StatusCode::CREATED
} else {
StatusCode::OK
};
let location = build_location_url(&request_origin, &stream_base_path, &name);
let mut response_headers = HeaderMap::new();
response_headers.insert("content-type", normalized_ct.parse().unwrap());
response_headers.insert(
names::STREAM_NEXT_OFFSET,
HeaderValue::from_bytes(next_offset.as_str().as_bytes()).unwrap(),
);
response_headers.insert("location", location.parse().unwrap());
if closed {
response_headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
}
Ok((status, response_headers).into_response())
}
.await;
result.map_err(|problem| problem.with_instance(instance))
}
/// Removes a leading `<stream_base_path>/` from `value`, returning the remainder as an
/// owned string.
///
/// When `stream_base_path` is `"/"` only a single leading slash is removed. If `value`
/// does not start with the base path followed by `/`, it is returned unchanged.
fn strip_stream_base_path(value: &str, stream_base_path: &str) -> String {
    let remainder = match stream_base_path {
        // Root base path: the prefix to drop is just the leading slash.
        "/" => value.strip_prefix('/'),
        // Otherwise the base path must be followed by a separating slash.
        base => value
            .strip_prefix(base)
            .and_then(|rest| rest.strip_prefix('/')),
    };
    remainder.unwrap_or(value).to_owned()
}
/// Builds the absolute URL of stream `name` as `<scheme>://<authority><base path>/<name>`.
///
/// The scheme and authority come from `origin`; a missing authority falls back to
/// `"localhost"`. A `stream_base_path` of `"/"` contributes no path segment, so the
/// stream name sits directly under the root.
fn build_location_url(origin: &ProxyTrustResult, stream_base_path: &str, name: &str) -> String {
    let proto = origin.scheme.as_str();
    let authority = origin.authority.as_deref().unwrap_or("localhost");
    // The root base path is represented by an empty prefix so the separator is not doubled.
    let path_prefix = match stream_base_path {
        "/" => "",
        other => other,
    };
    format!("{proto}://{authority}{path_prefix}/{name}")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ProxyTrustResult` fixture; only the scheme, authority, and trust flag vary
    /// between tests.
    fn origin(scheme: &str, authority: Option<&str>, trusted: bool) -> ProxyTrustResult {
        ProxyTrustResult {
            peer_ip: None,
            trusted,
            scheme: scheme.to_string(),
            authority: authority.map(String::from),
            client_address: None,
        }
    }

    // The scheme and authority resolved for a trusted proxy are used verbatim.
    #[test]
    fn test_build_location_prefers_x_forwarded_host() {
        let location = build_location_url(
            &origin("https", Some("proxy.example.com"), true),
            "/v1/stream",
            "orders",
        );
        assert_eq!(location, "https://proxy.example.com/v1/stream/orders");
    }

    #[test]
    fn test_build_location_falls_back_to_host_and_http() {
        let location = build_location_url(
            &origin("http", Some("localhost:4437"), false),
            "/v1/stream",
            "orders",
        );
        assert_eq!(location, "http://localhost:4437/v1/stream/orders");
    }

    #[test]
    fn test_build_location_supports_custom_base_path() {
        let location = build_location_url(
            &origin("http", Some("localhost:4437"), false),
            "/streams",
            "orders",
        );
        assert_eq!(location, "http://localhost:4437/streams/orders");
    }

    #[test]
    fn test_build_location_supports_root_base_path() {
        let location =
            build_location_url(&origin("http", Some("localhost:4437"), false), "/", "orders");
        assert_eq!(location, "http://localhost:4437/orders");
    }

    // Without any authority the URL is built against "localhost".
    #[test]
    fn test_build_location_defaults_to_localhost_without_authority() {
        let location = build_location_url(&origin("http", None, false), "/v1/stream", "orders");
        assert_eq!(location, "http://localhost/v1/stream/orders");
    }

    #[test]
    fn test_strip_stream_base_path_removes_prefix() {
        assert_eq!(
            strip_stream_base_path("/v1/stream/orders", "/v1/stream"),
            "orders"
        );
        assert_eq!(strip_stream_base_path("/orders", "/"), "orders");
    }

    // Bare names and paths that merely share a textual prefix must pass through unchanged.
    #[test]
    fn test_strip_stream_base_path_keeps_unprefixed_values() {
        assert_eq!(strip_stream_base_path("orders", "/v1/stream"), "orders");
        assert_eq!(
            strip_stream_base_path("/v1/streamx/orders", "/v1/stream"),
            "/v1/streamx/orders"
        );
    }
}