use serde::{Deserialize, Serialize};
use crate::errors::ZerobusError;
use crate::offset_generator::OffsetId;
use crate::ZerobusResult;
pub const STREAM_READY_OFFSET: OffsetId = -1;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlightBatchMetadata {
pub offset_id: OffsetId,
}
impl FlightBatchMetadata {
pub fn new(offset_id: OffsetId) -> Self {
Self { offset_id }
}
#[allow(clippy::result_large_err)]
pub fn to_bytes(&self) -> ZerobusResult<Vec<u8>> {
serde_json::to_vec(self).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to serialize FlightBatchMetadata: {}", e))
})
}
#[cfg(test)]
#[allow(clippy::result_large_err)]
pub fn from_bytes(bytes: &[u8]) -> ZerobusResult<Self> {
if bytes.is_empty() {
return Err(ZerobusError::InvalidArgument(
"Empty app_metadata in FlightData - client must provide offset_id".to_string(),
));
}
serde_json::from_slice(bytes).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to parse FlightBatchMetadata: {}", e))
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlightAckMetadata {
pub ack_up_to_offset: OffsetId,
pub ack_up_to_records: u64,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub close_stream_duration_ms: Option<u64>,
}
impl FlightAckMetadata {
#[allow(dead_code)]
pub fn new(ack_up_to_offset: OffsetId, ack_up_to_records: u64) -> Self {
Self {
ack_up_to_offset,
ack_up_to_records,
close_stream_duration_ms: None,
}
}
#[allow(dead_code)]
pub fn stream_ready() -> Self {
Self {
ack_up_to_offset: STREAM_READY_OFFSET,
ack_up_to_records: 0,
close_stream_duration_ms: None,
}
}
pub fn is_close_signal(&self) -> bool {
self.close_stream_duration_ms.is_some()
}
pub fn is_stream_ready(&self) -> bool {
self.ack_up_to_offset == STREAM_READY_OFFSET
}
#[allow(clippy::result_large_err)]
pub fn from_bytes(bytes: &[u8]) -> ZerobusResult<Self> {
if bytes.is_empty() {
return Err(ZerobusError::InvalidArgument(
"Empty app_metadata in PutResult - server should provide ack offset".to_string(),
));
}
serde_json::from_slice(bytes).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to parse FlightAckMetadata: {}", e))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_flight_batch_metadata_roundtrip() {
let metadata = FlightBatchMetadata::new(42);
let bytes = metadata.to_bytes().unwrap();
let parsed = FlightBatchMetadata::from_bytes(&bytes).unwrap();
assert_eq!(parsed.offset_id, 42);
}
#[test]
fn test_flight_batch_metadata_parse() {
let json = r#"{"offset_id": 42}"#;
let parsed = FlightBatchMetadata::from_bytes(json.as_bytes()).unwrap();
assert_eq!(parsed.offset_id, 42);
}
#[test]
fn test_flight_batch_metadata_empty_error() {
let result = FlightBatchMetadata::from_bytes(&[]);
assert!(result.is_err());
}
#[test]
fn test_flight_ack_metadata_parse_with_records() {
let json = r#"{"ack_up_to_offset": 99, "ack_up_to_records": 5000}"#;
let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
assert_eq!(parsed.ack_up_to_offset, 99);
assert_eq!(parsed.ack_up_to_records, 5000);
assert!(parsed.close_stream_duration_ms.is_none());
assert!(!parsed.is_close_signal());
}
#[test]
fn test_flight_ack_metadata_with_close_signal() {
let json = r#"{"ack_up_to_offset": 5, "ack_up_to_records": 100, "close_stream_duration_ms": 2000}"#;
let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
assert_eq!(parsed.ack_up_to_offset, 5);
assert_eq!(parsed.ack_up_to_records, 100);
assert_eq!(parsed.close_stream_duration_ms, Some(2000));
assert!(parsed.is_close_signal());
}
#[test]
fn test_flight_ack_metadata_close_signal_only() {
let json =
r#"{"ack_up_to_offset": -1, "ack_up_to_records": 0, "close_stream_duration_ms": 5000}"#;
let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
assert!(parsed.is_close_signal());
assert!(parsed.is_stream_ready());
assert_eq!(parsed.close_stream_duration_ms, Some(5000));
}
#[test]
fn test_flight_ack_metadata_empty_error() {
let result = FlightAckMetadata::from_bytes(&[]);
assert!(result.is_err());
}
}