use axum::http::{
StatusCode,
header::{CONTENT_LENGTH, CONTENT_TYPE},
};
use futures::{FutureExt, StreamExt};
use super::streamreader::{IntoFlowFiles, StreamedFlowFile, StreamedFlowFiles};
use crate::FlowFileParsingError;
/// Allows [`StreamedFlowFiles`] to be used directly as an axum extractor.
///
/// The handler state is ignored; all validation lives in the
/// `TryFrom<axum::extract::Request>` implementation, whose error type is reused
/// as the rejection.
impl<S: Send + Sync> axum::extract::FromRequest<S> for StreamedFlowFiles {
    type Rejection = <Self as TryFrom<axum::extract::Request>>::Error;

    async fn from_request(
        req: axum::extract::Request,
        _state: &S,
    ) -> Result<Self, Self::Rejection> {
        Self::try_from(req)
    }
}
/// Converts an incoming HTTP request into a stream of flow files.
///
/// # Errors
///
/// Returns `415 Unsupported Media Type` when the `Content-Type` header is
/// missing, cannot be read as a string, or names a media type other than
/// `application/flowfile-v3`.
impl TryFrom<axum::extract::Request> for StreamedFlowFiles {
    type Error = (StatusCode, &'static str);

    fn try_from(req: axum::extract::Request) -> Result<Self, Self::Error> {
        const FLOW_FILE_V3: &str = "application/flowfile-v3";

        // Media types compare case-insensitively and may be followed by
        // `;`-separated parameters (RFC 9110 §8.3.1), so only the part before
        // the first `;` is compared, ignoring ASCII case and surrounding
        // whitespace.
        let is_flow_file_v3 = req
            .headers()
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok())
            .map(|value| value.split_once(';').map_or(value, |(essence, _)| essence))
            .is_some_and(|essence| essence.trim().eq_ignore_ascii_case(FLOW_FILE_V3));
        if !is_flow_file_v3 {
            return Err((
                StatusCode::UNSUPPORTED_MEDIA_TYPE,
                "Only `Content-Type: application/flowfile-v3` is accepted.",
            ));
        }

        // A missing or unparsable `Content-Length` is not an error; it is
        // forwarded as `None` alongside the body stream.
        let maybe_content_length = req
            .headers()
            .get(CONTENT_LENGTH)
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.parse().ok());

        let stream = req.into_body().into_data_stream();
        Ok((stream, maybe_content_length).into_flow_files())
    }
}
/// Allows [`StreamedFlowFileFuture`] to be used directly as an axum extractor.
///
/// The handler state is ignored; extraction is delegated to the
/// `TryFrom<axum::extract::Request>` implementation, whose error type is reused
/// as the rejection.
impl<S: Send + Sync> axum::extract::FromRequest<S> for StreamedFlowFileFuture {
    type Rejection = <Self as TryFrom<axum::extract::Request>>::Error;

    async fn from_request(
        req: axum::extract::Request,
        _state: &S,
    ) -> Result<Self, Self::Rejection> {
        Self::try_from(req)
    }
}
impl TryFrom<axum::extract::Request> for StreamedFlowFileFuture {
type Error = (StatusCode, &'static str);
fn try_from(req: axum::extract::Request) -> Result<Self, Self::Error> {
req.try_into().map(StreamedFlowFileFuture)
}
}
/// Future that resolves to the single flow file carried by a request.
///
/// Wraps a [`StreamedFlowFiles`] stream. Its `Future` implementation takes the
/// first item of the stream and resolves to
/// `FlowFileParsingError::FlowFileExpected` when the stream yields nothing, or
/// to `FlowFileParsingError::SingleFlowFileExpected` when the stream is still
/// non-empty after the first item.
pub struct StreamedFlowFileFuture(StreamedFlowFiles);
impl Future for StreamedFlowFileFuture {
type Output = Result<StreamedFlowFile, FlowFileParsingError>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let Some(mut ff) = std::task::ready!(self.0.next().poll_unpin(cx)) else {
return std::task::Poll::Ready(Err(FlowFileParsingError::FlowFileExpected));
};
if !self.0.is_empty() {
return std::task::Poll::Ready(Err(FlowFileParsingError::SingleFlowFileExpected));
}
if let Ok(ff) = ff.as_mut() {
ff.disable_automatic_return_of_internal_reader();
}
std::task::Poll::Ready(ff)
}
}
#[cfg(test)]
mod tests {
    use axum::{body::Body, http::Request};

    use super::*;

    /// Builds a request whose body is `content`, with `Content-Type` set to
    /// `content_type` and `Content-Length` set to the byte length of `content`.
    fn make_flow_file_request(content: Vec<u8>, content_type: &str) -> Request<Body> {
        let declared_length = content.len();
        let body = Body::from(axum::body::Bytes::from(content));
        Request::builder()
            .header("Content-Type", content_type)
            .header("Content-Length", declared_length)
            .body(body)
            .unwrap()
    }

    /// A request with a foreign media type must be rejected.
    #[tokio::test]
    async fn flow_file_iterator_rejects_wrong_content_type() {
        let json_request = Request::builder()
            .header("Content-Type", "application/json")
            .body(Body::empty())
            .unwrap();

        let conversion = StreamedFlowFiles::try_from(json_request);

        assert!(conversion.is_err());
    }

    /// The flow-file v3 media type is accepted, even with an empty body.
    #[tokio::test]
    async fn flow_file_iterator_accepts_flowfile_v3() {
        let flow_file_request = make_flow_file_request(Vec::new(), "application/flowfile-v3");

        let conversion = StreamedFlowFiles::try_from(flow_file_request);

        assert!(conversion.is_ok());
    }

    /// A non-zero declared `Content-Length` makes the stream report non-empty.
    #[tokio::test]
    async fn flow_file_iterator_non_empty_when_non_zero_content_length() {
        let request = Request::builder()
            .header("Content-Type", "application/flowfile-v3")
            .header("Content-Length", "12345")
            .body(Body::empty())
            .unwrap();

        let flow_files = StreamedFlowFiles::try_from(request).unwrap();

        assert!(!flow_files.is_empty());
    }

    /// A declared `Content-Length` of zero makes the stream report empty.
    #[tokio::test]
    async fn flow_file_iterator_empty_when_zero_content_length() {
        let request = Request::builder()
            .header("Content-Type", "application/flowfile-v3")
            .header("Content-Length", "0")
            .body(Body::empty())
            .unwrap();

        let flow_files = StreamedFlowFiles::try_from(request).unwrap();

        assert!(flow_files.is_empty());
    }

    /// Building the future succeeds for an empty body, but awaiting it fails
    /// because there is no flow file to return.
    #[tokio::test]
    async fn streamed_flow_file_future_requires_content() {
        let request = Request::builder()
            .header("Content-Type", "application/flowfile-v3")
            .body(Body::empty())
            .unwrap();

        let conversion = StreamedFlowFileFuture::try_from(request);
        assert!(conversion.is_ok());

        let outcome = conversion.unwrap().await;
        assert!(outcome.is_err());
    }
}