use serde::{Deserialize, Serialize};
use crate::errors::ZerobusError;
use crate::offset_generator::OffsetId;
use crate::ZerobusResult;
/// Sentinel offset value: `FlightAckMetadata::stream_ready` stores it in
/// `ack_up_to_offset`, and `FlightAckMetadata::is_stream_ready` compares against it.
// NOTE(review): presumably real offsets are never negative, so `-1` cannot
// collide with a genuine acknowledgement — confirm against the offset generator.
pub const STREAM_READY_OFFSET: OffsetId = -1;
/// Per-batch metadata that is serialized to and from JSON via `serde_json`.
///
/// Judging by the error text in `from_bytes`, this travels in the
/// `app_metadata` of a `FlightData` message and is supplied by the client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlightBatchMetadata {
/// Offset identifier attached to the batch.
// NOTE(review): presumably assigned by the client's offset generator — confirm.
pub offset_id: OffsetId,
}
impl FlightBatchMetadata {
pub fn new(offset_id: OffsetId) -> Self {
Self { offset_id }
}
#[allow(clippy::result_large_err)]
pub fn to_bytes(&self) -> ZerobusResult<Vec<u8>> {
serde_json::to_vec(self).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to serialize FlightBatchMetadata: {}", e))
})
}
#[cfg(test)]
#[allow(clippy::result_large_err)]
pub fn from_bytes(bytes: &[u8]) -> ZerobusResult<Self> {
if bytes.is_empty() {
return Err(ZerobusError::InvalidArgument(
"Empty app_metadata in FlightData - client must provide offset_id".to_string(),
));
}
serde_json::from_slice(bytes).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to parse FlightBatchMetadata: {}", e))
})
}
}
/// Acknowledgement metadata that is deserialized from JSON via `serde_json`.
///
/// Judging by the error text in `from_bytes`, this travels in the
/// `app_metadata` of a `PutResult` and is supplied by the server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlightAckMetadata {
/// Offset carried by the acknowledgement; equals `STREAM_READY_OFFSET`
/// for the value built by `stream_ready`.
// NOTE(review): presumably "everything up to and including this offset is
// acknowledged" — confirm inclusivity against the server protocol.
pub ack_up_to_offset: OffsetId,
/// Record count carried by the acknowledgement; `0` for the value built by
/// `stream_ready`.
// NOTE(review): presumably a cumulative count of acknowledged records — confirm.
pub ack_up_to_records: u64,
}
impl FlightAckMetadata {
#[allow(dead_code)]
pub fn new(ack_up_to_offset: OffsetId, ack_up_to_records: u64) -> Self {
Self {
ack_up_to_offset,
ack_up_to_records,
}
}
#[allow(dead_code)]
pub fn stream_ready() -> Self {
Self {
ack_up_to_offset: STREAM_READY_OFFSET,
ack_up_to_records: 0,
}
}
pub fn is_stream_ready(&self) -> bool {
self.ack_up_to_offset == STREAM_READY_OFFSET
}
#[allow(clippy::result_large_err)]
pub fn from_bytes(bytes: &[u8]) -> ZerobusResult<Self> {
if bytes.is_empty() {
return Err(ZerobusError::InvalidArgument(
"Empty app_metadata in PutResult - server should provide ack offset".to_string(),
));
}
serde_json::from_slice(bytes).map_err(|e| {
ZerobusError::InvalidArgument(format!("Failed to parse FlightAckMetadata: {}", e))
})
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the JSON encoding/decoding of the Flight metadata types,
    //! including the stream-ready sentinel and the error paths.

    use super::*;

    /// Serializing and then parsing yields the same offset.
    #[test]
    fn test_flight_batch_metadata_roundtrip() {
        let metadata = FlightBatchMetadata::new(42);
        let bytes = metadata.to_bytes().unwrap();
        let parsed = FlightBatchMetadata::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.offset_id, 42);
    }

    /// A hand-written JSON payload parses into the expected offset.
    #[test]
    fn test_flight_batch_metadata_parse() {
        let json = r#"{"offset_id": 42}"#;
        let parsed = FlightBatchMetadata::from_bytes(json.as_bytes()).unwrap();
        assert_eq!(parsed.offset_id, 42);
    }

    /// An empty payload is rejected instead of being treated as a default.
    #[test]
    fn test_flight_batch_metadata_empty_error() {
        let result = FlightBatchMetadata::from_bytes(&[]);
        assert!(result.is_err());
    }

    /// Bytes that are not JSON at all are rejected.
    #[test]
    fn test_flight_batch_metadata_malformed_error() {
        let result = FlightBatchMetadata::from_bytes(b"not json");
        assert!(result.is_err());
    }

    /// Both acknowledgement fields are read from the payload.
    #[test]
    fn test_flight_ack_metadata_parse_with_records() {
        let json = r#"{"ack_up_to_offset": 99, "ack_up_to_records": 5000}"#;
        let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
        assert_eq!(parsed.ack_up_to_offset, 99);
        assert_eq!(parsed.ack_up_to_records, 5000);
        // An ordinary acknowledgement must not be mistaken for the sentinel.
        assert!(!parsed.is_stream_ready());
    }

    /// An empty payload is rejected instead of being treated as a default.
    #[test]
    fn test_flight_ack_metadata_empty_error() {
        let result = FlightAckMetadata::from_bytes(&[]);
        assert!(result.is_err());
    }

    /// Bytes that are not JSON at all are rejected.
    #[test]
    fn test_flight_ack_metadata_malformed_error() {
        let result = FlightAckMetadata::from_bytes(b"not json");
        assert!(result.is_err());
    }

    /// Neither field has a serde default, so omitting one is a parse error.
    #[test]
    fn test_flight_ack_metadata_missing_field_error() {
        let json = r#"{"ack_up_to_offset": 7}"#;
        let result = FlightAckMetadata::from_bytes(json.as_bytes());
        assert!(result.is_err());
    }

    /// The sentinel constructor produces a value recognized as stream-ready.
    #[test]
    fn test_flight_ack_metadata_stream_ready() {
        let ready = FlightAckMetadata::stream_ready();
        assert!(ready.is_stream_ready());
        assert_eq!(ready.ack_up_to_offset, STREAM_READY_OFFSET);
        assert_eq!(ready.ack_up_to_records, 0);

        // A regular acknowledgement at offset zero is not the sentinel.
        assert!(!FlightAckMetadata::new(0, 0).is_stream_ready());
    }

    /// A sentinel received over the wire is recognized after parsing.
    #[test]
    fn test_flight_ack_metadata_parse_stream_ready() {
        let json = r#"{"ack_up_to_offset": -1, "ack_up_to_records": 0}"#;
        let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
        assert!(parsed.is_stream_ready());
    }
}