1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
//! Glue between this crate and [`axum`].
//!
//! This module provides integration with the [Axum](https://github.com/tokio-rs/axum) web framework.
//! It enables parsing NiFi Flow Files from HTTP requests and generating Flow File responses.
//!
//! # HTTP Integration
//!
//! ## Extractors
//!
//! - [StreamedFlowFiles](crate::axum::StreamedFlowFiles) - Extract a stream of multiple flow files from a request body.
//! - [StreamedFlowFileFuture](crate::axum::StreamedFlowFileFuture) - Extract a single flow file, returning an error if zero or
//! more than one is provided.
//!
//! ## Content-Type
//!
//! Both extractors require the `Content-Type: application/flowfile-v3` header. Requests with
//! other content types will receive a `415 UNSUPPORTED_MEDIA_TYPE` response.
//!
//! ## Responses
//!
//! - [`FlowFile`](crate::FlowFile) implements `IntoResponse`, enabling direct return from handlers.
//! - [`FlowFileParsingError`] implements `IntoResponse`, returning appropriate error codes.
//!
//! # Example
//!
//! ```
//! use nifioxide::axum::StreamedFlowFiles;
//! use futures::StreamExt;
//!
//! async fn handle_multiple(mut stream: StreamedFlowFiles) -> impl axum::response::IntoResponse {
//! while let Some(result) = stream.next().await {
//! let mut ff = result.unwrap();
//! }
//! "OK"
//! }
//! ```
pub use StreamedFlowFileFuture;
pub use ;
use ;
use AsyncWrite;
use ReaderStream;
use crateFlowFileParsingError;
/// HTTP response implementation for [`FlowFileParsingError`].
///
/// Maps parsing errors to appropriate HTTP status codes:
///
/// - [`BadMagicBytes`](FlowFileParsingError::BadMagicBytes): `400 Bad Request`
/// - [`Malformed`](FlowFileParsingError::Malformed): `400 Bad Request`
/// - [`BrokenChannel`](FlowFileParsingError::BrokenChannel): `400 Bad Request`
/// - [`ContentLengthLengthMismatch`](FlowFileParsingError::ContentLengthLengthMismatch): `400 Bad Request`
/// - [`FlowFileExpected`](FlowFileParsingError::FlowFileExpected): `400 Bad Request`
/// - [`SingleFlowFileExpected`](FlowFileParsingError::SingleFlowFileExpected): `422 Unprocessable Entity`
/// - [`Io`](FlowFileParsingError::Io): `500 Internal Server Error`
/// Make an [`AsyncWrite`] that is connected to a [`axum::body::Body`].
///
/// Writing bytes into the writer will end up in the streamed response body.
///
/// This is useful when you want to process some stream of data into the response, but avoid having
/// the whole response buffered in memory. Axum needs to be given a response body so that it can
/// start pulling bytes from it onto the network connection. But we also want to keep processing
/// the data stream at the same time. We solve this by spawning a task that produces bytes into a
/// writer and returning the connected body to axum.
///
/// See [`tokio::io::simplex`] for the meaning of `max_buf_size`.
///
/// # Example
///
/// ```
/// # use tokio::io::AsyncWriteExt;
/// async fn streamed_response_body() -> impl axum::response::IntoResponse {
/// let (mut w, body) = nifioxide::axum::make_response_stream(1024);
/// tokio::spawn(async move {
/// let data = b"Hello, World!";
/// for _ in 0..10 {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// w.write_all(data.as_ref()).await?;
/// }
/// Ok::<_, tokio::io::Error>(())
/// });
/// (axum::http::StatusCode::OK, body)
/// }
/// ```