use std::collections::HashMap;
use tokio::io::{AsyncWrite, AsyncWriteExt};
/// Header of a flow file: a size value plus a set of string attributes.
///
/// The header is written to a stream by `serialize_header_into`, which emits
/// the attributes followed by the size.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FlowFileHeader {
    /// Size value stored in the header.
    // NOTE(review): presumably the byte length of the content that follows
    // the header; confirm against callers.
    size: u64,
    /// Attribute name/value pairs carried by the header.
    attributes: HashMap<String, String>,
}

impl FlowFileHeader {
    /// Builds a header from a size and an attribute map.
    #[must_use]
    pub fn new(size: u64, attributes: HashMap<String, String>) -> Self {
        FlowFileHeader { size, attributes }
    }

    /// Returns the size value stored in this header.
    #[doc(alias = "len")]
    #[must_use]
    pub fn size(&self) -> u64 {
        let Self { size, .. } = self;
        *size
    }

    /// Returns `true` when the stored size is zero.
    ///
    /// Only the size is inspected; the attribute map is not considered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        matches!(self.size(), 0)
    }

    /// Borrows the attribute map.
    #[must_use]
    pub fn attributes(&self) -> &HashMap<String, String> {
        let Self { attributes, .. } = self;
        attributes
    }

    /// Mutably borrows the attribute map so entries can be added, changed or
    /// removed in place.
    #[must_use]
    pub fn attributes_mut(&mut self) -> &mut HashMap<String, String> {
        let Self { attributes, .. } = self;
        attributes
    }
}
/// Converts a `(size, attributes)` pair into a header.
///
/// Each element is converted with `Into`, so any type convertible to `u64`
/// may supply the size and any type convertible to
/// `HashMap<String, String>` may supply the attributes.
impl<S, T> From<(S, T)> for FlowFileHeader
where
    S: Into<u64>,
    T: Into<HashMap<String, String>>,
{
    fn from(pair: (S, T)) -> Self {
        let (size, attributes) = pair;
        Self::new(size.into(), attributes.into())
    }
}
impl FlowFileHeader {
    /// Writes this header to `writer`.
    ///
    /// The fields are emitted in this order:
    ///
    /// 1. the 7-byte magic `NiFiFF3`,
    /// 2. the number of attributes, as a field length,
    /// 3. every attribute as a length-prefixed key followed by a
    ///    length-prefixed value,
    /// 4. the size as a `u64`.
    ///
    /// Attributes are written in the map's iteration order, which `HashMap`
    /// does not specify. The writer is not flushed here.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `writer`, or an error when the
    /// attribute count or the byte length of a key or value does not fit in
    /// a `u32`.
    pub async fn serialize_header_into<W: AsyncWrite + Unpin>(
        &self,
        mut writer: W,
    ) -> tokio::io::Result<()> {
        let Self { size, attributes } = self;

        writer.write_all(b"NiFiFF3").await?;

        write_field_length(&mut writer, attributes.len()).await?;
        for (name, val) in attributes.iter() {
            write_string(&mut writer, name).await?;
            write_string(&mut writer, val).await?;
        }

        // The size is the last field, so its result is the function's result.
        writer.write_u64(*size).await
    }
}
/// Writes a field length using a variable-width encoding.
///
/// * Lengths below `u16::MAX` (65535) are written as a single `u16`.
/// * Lengths from `u16::MAX` up to `u32::MAX` are written as the marker
///   `u16::MAX` (`0xFFFF`) followed by the length as a `u32`.
///
/// The value `u16::MAX` itself must use the long form. Written as a bare
/// `u16` it would be byte-identical to the marker, and a reader could not
/// tell a literal length of 65535 from the start of a long-form length.
///
/// # Errors
///
/// Returns an error if `len` does not fit in a `u32`, or the first error
/// reported by `w`.
async fn write_field_length<W: AsyncWrite + Unpin>(w: &mut W, len: usize) -> tokio::io::Result<()> {
    let Ok(len) = u32::try_from(len) else {
        return Err(tokio::io::Error::other("Field length exceeds u32::MAX"));
    };
    match u16::try_from(len) {
        // Short form: only for values that cannot be mistaken for the marker.
        Ok(short) if short != u16::MAX => w.write_u16(short).await,
        // Long form: the marker, then the full 32-bit length.
        _ => {
            w.write_u16(u16::MAX).await?;
            w.write_u32(len).await
        }
    }
}
/// Writes `s` as a length-prefixed string.
///
/// The prefix is the length of `s` in bytes (not characters), encoded by
/// `write_field_length`; the raw bytes of `s` follow.
///
/// # Errors
///
/// Returns an error if the byte length cannot be encoded, or the first error
/// reported by `w`.
async fn write_string<W: AsyncWrite + Unpin>(w: &mut W, s: &str) -> tokio::io::Result<()> {
    let bytes = s.as_bytes();
    write_field_length(w, bytes.len()).await?;
    w.write_all(bytes).await
}