use bytes::{Bytes, BytesMut};
use crate::owned::remote_log_segment_metadata_record::RemoteLogSegmentMetadataRecord;
use crate::owned::remote_log_segment_metadata_snapshot_record::RemoteLogSegmentMetadataSnapshotRecord;
use crate::owned::remote_log_segment_metadata_update_record::RemoteLogSegmentMetadataUpdateRecord;
use crate::owned::remote_partition_delete_metadata_record::RemotePartitionDeleteMetadataRecord;
use crate::records::metadata::envelope::{decode_value_header, encode_value};
use crate::{Decode, Encode, ProtocolError};
#[derive(Debug, Clone, PartialEq)]
pub enum RemoteLogMetadataRecord {
SegmentMetadata(RemoteLogSegmentMetadataRecord), SegmentMetadataUpdate(RemoteLogSegmentMetadataUpdateRecord), PartitionDelete(RemotePartitionDeleteMetadataRecord), SegmentMetadataSnapshot(RemoteLogSegmentMetadataSnapshotRecord), Unknown {
api_key: u32,
api_version: u32,
body: Bytes,
},
}
fn api_version_to_i16(version: u32) -> Result<i16, ProtocolError> {
i16::try_from(version)
.map_err(|_| ProtocolError::SchemaMismatch("remote-log metadata record apiVersion"))
}
impl RemoteLogMetadataRecord {
#[must_use]
pub fn api_key(&self) -> u32 {
match self {
Self::SegmentMetadata(_) => 0,
Self::SegmentMetadataUpdate(_) => 1,
Self::PartitionDelete(_) => 2,
Self::SegmentMetadataSnapshot(_) => 3,
Self::Unknown { api_key, .. } => *api_key,
}
}
pub fn encode_value(&self) -> Result<Bytes, ProtocolError> {
let api_key = self.api_key();
let v: i16 = 0;
let mut body = BytesMut::new();
match self {
Self::SegmentMetadata(r) => r.encode(&mut body, v)?,
Self::SegmentMetadataUpdate(r) => r.encode(&mut body, v)?,
Self::PartitionDelete(r) => r.encode(&mut body, v)?,
Self::SegmentMetadataSnapshot(r) => r.encode(&mut body, v)?,
Self::Unknown {
body: raw,
api_version,
..
} => {
return Ok(encode_value(api_key, *api_version, raw));
}
}
Ok(encode_value(api_key, 0, &body))
}
pub fn decode_value(value: &[u8]) -> Result<Self, ProtocolError> {
let mut cur: &[u8] = value;
let hdr = decode_value_header(&mut cur)
.map_err(|_| ProtocolError::SchemaMismatch("remote-log metadata record envelope"))?;
let v = api_version_to_i16(hdr.api_version)?;
let rec = match hdr.api_key {
0 => Self::SegmentMetadata(RemoteLogSegmentMetadataRecord::decode(&mut cur, v)?),
1 => Self::SegmentMetadataUpdate(RemoteLogSegmentMetadataUpdateRecord::decode(
&mut cur, v,
)?),
2 => Self::PartitionDelete(RemotePartitionDeleteMetadataRecord::decode(&mut cur, v)?),
3 => Self::SegmentMetadataSnapshot(RemoteLogSegmentMetadataSnapshotRecord::decode(
&mut cur, v,
)?),
other => {
return Ok(Self::Unknown {
api_key: other,
api_version: hdr.api_version,
body: Bytes::copy_from_slice(cur),
});
}
};
if !cur.is_empty() {
return Err(ProtocolError::SchemaMismatch(
"trailing bytes after remote-log metadata record body",
));
}
Ok(rec)
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn segment_record_roundtrips_through_dispatch() {
let rec =
RemoteLogMetadataRecord::SegmentMetadata(RemoteLogSegmentMetadataRecord::default());
let bytes = rec.encode_value().expect("encodes");
let back = RemoteLogMetadataRecord::decode_value(&bytes).expect("decodes");
assert!(back == rec);
}
#[test]
fn unknown_api_key_is_preserved() {
use crate::records::metadata::envelope::encode_value;
let bytes = encode_value(99, 0, &[0xAA]);
let rec = RemoteLogMetadataRecord::decode_value(&bytes).expect("decodes");
assert!(matches!(
rec,
RemoteLogMetadataRecord::Unknown { api_key: 99, .. }
));
}
}