use crate::error::{Error, Result};
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, NaiveDateTime};
use std::convert::TryFrom;
use std::time::Duration;
pub const METADATA_LENGTH_SIZE: usize = 4;
const NUMBER_SIZE: usize = 4;
const OFFSET_SIZE: usize = 8;
const LENGTH_SIZE: usize = 8;
const DIGEST_LENGTH_SIZE: usize = 4;
const PARENT_ID_LENGTH_SIZE: usize = 4;
const TRAFFIC_TYPE_SIZE: usize = 1;
const COST_SIZE: usize = 8;
const CREATED_AT_SIZE: usize = 8;
#[derive(Debug, Clone)]
pub struct CachePieceContent {
metadata_length: u32,
metadata: CachePieceMetadata,
}
impl CachePieceContent {
#[allow(clippy::too_many_arguments)]
pub fn new(
number: u32,
offset: u64,
length: u64,
digest: String,
parent_id: String,
traffic_type: u8,
cost: Duration,
created_at: NaiveDateTime,
) -> Self {
Self {
metadata_length: (NUMBER_SIZE
+ OFFSET_SIZE
+ LENGTH_SIZE
+ DIGEST_LENGTH_SIZE
+ digest.len()
+ PARENT_ID_LENGTH_SIZE
+ parent_id.len()
+ TRAFFIC_TYPE_SIZE
+ COST_SIZE
+ CREATED_AT_SIZE) as u32,
metadata: CachePieceMetadata {
number,
offset,
length,
digest,
parent_id,
traffic_type,
cost,
created_at,
},
}
}
pub fn metadata(&self) -> CachePieceMetadata {
self.metadata.clone()
}
pub fn metadata_len(&self) -> u32 {
self.metadata_length
}
pub fn is_empty(&self) -> bool {
self.metadata.length == 0
}
}
impl TryFrom<Bytes> for CachePieceContent {
type Error = Error;
fn try_from(bytes: Bytes) -> Result<Self> {
let metadata_length = u32::from_be_bytes(
bytes
.get(..METADATA_LENGTH_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for metadata length".to_string(),
))?
.try_into()?,
);
if bytes.len() != METADATA_LENGTH_SIZE + metadata_length as usize {
return Err(Error::InvalidPacket(format!(
"expected {} bytes for CachePieceContent, got {}",
METADATA_LENGTH_SIZE + metadata_length as usize,
bytes.len()
)));
}
let metadata = (
bytes.slice(METADATA_LENGTH_SIZE..METADATA_LENGTH_SIZE + metadata_length as usize),
metadata_length,
)
.try_into()?;
Ok(CachePieceContent {
metadata_length,
metadata,
})
}
}
impl From<CachePieceContent> for Bytes {
fn from(content: CachePieceContent) -> Bytes {
let (metadata_bytes, metadata_length) = content.metadata.into();
let mut bytes = BytesMut::with_capacity(METADATA_LENGTH_SIZE + metadata_length as usize);
bytes.put_u32(metadata_length);
bytes.extend_from_slice(&metadata_bytes);
bytes.freeze()
}
}
#[derive(Debug, Clone)]
pub struct CachePieceMetadata {
pub number: u32,
pub offset: u64,
pub length: u64,
pub digest: String,
pub parent_id: String,
pub traffic_type: u8,
pub cost: Duration,
pub created_at: NaiveDateTime,
}
impl CachePieceMetadata {
#[allow(clippy::too_many_arguments)]
pub fn new(
number: u32,
offset: u64,
length: u64,
digest: String,
parent_id: String,
traffic_type: u8,
cost: Duration,
created_at: NaiveDateTime,
) -> Self {
Self {
number,
offset,
length,
digest,
parent_id,
traffic_type,
cost,
created_at,
}
}
}
impl TryFrom<(Bytes, u32)> for CachePieceMetadata {
type Error = Error;
fn try_from(input: (Bytes, u32)) -> Result<Self> {
let (bytes, length) = input;
if bytes.len() != length as usize {
return Err(Error::InvalidLength(format!(
"expected {} bytes for CachePieceMetadata, got {}",
length,
bytes.len()
)));
}
let mut bytes_offset = 0;
let number = u32::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + NUMBER_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for piece number".to_string(),
))?
.try_into()?,
);
bytes_offset += NUMBER_SIZE;
let offset = u64::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + OFFSET_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for piece offset".to_string(),
))?
.try_into()?,
);
bytes_offset += OFFSET_SIZE;
let length = u64::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + LENGTH_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for piece length".to_string(),
))?
.try_into()?,
);
bytes_offset += LENGTH_SIZE;
let digest_length = u32::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + DIGEST_LENGTH_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for digest length".to_string(),
))?
.try_into()?,
) as usize;
bytes_offset += DIGEST_LENGTH_SIZE;
let digest = String::from_utf8(
bytes
.get(bytes_offset..bytes_offset + digest_length)
.ok_or(Error::InvalidPacket(
"insufficient bytes for digest length".to_string(),
))?
.to_vec(),
)?;
bytes_offset += digest_length;
let parent_id_length = u32::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + PARENT_ID_LENGTH_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for parent id length".to_string(),
))?
.try_into()?,
) as usize;
bytes_offset += PARENT_ID_LENGTH_SIZE;
let parent_id = String::from_utf8(
bytes
.get(bytes_offset..bytes_offset + parent_id_length)
.ok_or(Error::InvalidPacket(
"insufficient bytes for parent id".to_string(),
))?
.to_vec(),
)?;
bytes_offset += parent_id_length;
let traffic_type = bytes
.get(bytes_offset)
.ok_or(Error::InvalidPacket(
"insufficient bytes for traffic type".to_string(),
))?
.to_owned();
bytes_offset += TRAFFIC_TYPE_SIZE;
let cost = Duration::from_secs(u64::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + COST_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for cost".to_string(),
))?
.try_into()?,
));
bytes_offset += COST_SIZE;
let created_at = DateTime::from_timestamp(
i64::from_be_bytes(
bytes
.get(bytes_offset..bytes_offset + CREATED_AT_SIZE)
.ok_or(Error::InvalidPacket(
"insufficient bytes for created_at".to_string(),
))?
.try_into()?,
),
0,
)
.ok_or_else(|| Error::InvalidPacket("invalid timestamp for created_at".to_string()))?
.naive_utc();
Ok(CachePieceMetadata {
number,
offset,
length,
digest,
parent_id,
traffic_type,
cost,
created_at,
})
}
}
impl From<CachePieceMetadata> for (Bytes, u32) {
fn from(metadata: CachePieceMetadata) -> (Bytes, u32) {
let CachePieceMetadata {
number,
offset,
length,
digest,
parent_id,
traffic_type,
cost,
created_at,
} = metadata;
let parent_id = parent_id.as_bytes();
let bytes_length = NUMBER_SIZE
+ OFFSET_SIZE
+ LENGTH_SIZE
+ DIGEST_LENGTH_SIZE
+ digest.len()
+ PARENT_ID_LENGTH_SIZE
+ parent_id.len()
+ TRAFFIC_TYPE_SIZE
+ COST_SIZE
+ CREATED_AT_SIZE;
let mut bytes = BytesMut::with_capacity(bytes_length);
bytes.put_u32(number);
bytes.put_u64(offset);
bytes.put_u64(length);
bytes.put_u32(digest.len() as u32);
bytes.extend_from_slice(digest.as_bytes());
bytes.put_u32(parent_id.len() as u32);
bytes.extend_from_slice(parent_id);
bytes.put_u8(traffic_type);
bytes.put_u64(cost.as_secs());
bytes.put_i64(created_at.and_utc().timestamp());
(bytes.freeze(), bytes_length as u32)
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use std::time::Duration;
fn create_test_cache_piece_content() -> CachePieceContent {
CachePieceContent::new(
42,
1024,
2048,
"a".repeat(32),
"test_parent_id".to_string(),
1,
Duration::from_secs(5),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
)
}
fn create_test_metadata() -> CachePieceMetadata {
CachePieceMetadata {
number: 42,
offset: 1024,
length: 2048,
digest: "a".repeat(32),
parent_id: "test_parent_id".to_string(),
traffic_type: 1,
cost: Duration::from_secs(5),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
}
}
#[test]
fn test_cache_piece_content_conversion_roundtrip() {
let original = create_test_cache_piece_content();
let bytes = Bytes::from(original.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().number, original.metadata().number);
assert_eq!(result.metadata().offset, original.metadata().offset);
assert_eq!(result.metadata().length, original.metadata().length);
assert_eq!(result.metadata().digest, original.metadata().digest);
assert_eq!(result.metadata().parent_id, original.metadata().parent_id);
assert_eq!(
result.metadata().traffic_type,
original.metadata().traffic_type
);
assert_eq!(result.metadata().cost, original.metadata().cost);
assert_eq!(result.metadata().created_at, original.metadata().created_at);
assert_eq!(result.metadata_len(), original.metadata_len());
}
#[test]
fn test_cache_piece_content_try_from_insufficient_bytes_for_metadata_length() {
let short_bytes = Bytes::from(vec![0u8; 4]); let result = CachePieceContent::try_from(short_bytes);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::InvalidPacket(_)));
}
#[test]
fn test_cache_piece_content_try_from_insufficient_metadata_bytes() {
let mut bytes = BytesMut::new();
bytes.put_u32(100); bytes.put(&vec![0u8; 50][..]);
let result = CachePieceContent::try_from(bytes.freeze());
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::InvalidPacket(_)));
}
#[test]
fn test_cache_piece_content_with_empty_parent_id() {
let cache_piece_content = CachePieceContent::new(
1,
0,
100,
"b".repeat(32),
String::new(), 2,
Duration::from_secs(1),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().parent_id, "");
assert_eq!(result.metadata().number, 1);
assert_eq!(result.metadata().traffic_type, 2);
assert_eq!(result.metadata().length, 100);
}
#[test]
fn test_cache_piece_content_with_long_parent_id() {
let long_parent_id = "x".repeat(1000);
let cache_piece_content = CachePieceContent::new(
999,
12345,
67890,
"c".repeat(32),
long_parent_id.clone(),
255,
Duration::from_secs(3600),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().parent_id, long_parent_id);
assert_eq!(result.metadata().number, 999);
assert_eq!(result.metadata().traffic_type, 255);
assert_eq!(result.metadata().length, 67890);
}
#[test]
fn test_cache_piece_content_with_zero_values() {
let cache_piece_content = CachePieceContent::new(
0,
0,
0,
"d".repeat(32),
"zero_test".to_string(),
0,
Duration::from_secs(0),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().number, 0);
assert_eq!(result.metadata().offset, 0);
assert_eq!(result.metadata().length, 0);
assert_eq!(result.metadata().traffic_type, 0);
assert_eq!(result.metadata().cost, Duration::from_secs(0));
assert!(result.is_empty());
}
#[test]
fn test_cache_piece_content_with_max_values() {
let cache_piece_content = CachePieceContent::new(
u32::MAX,
u64::MAX,
u64::MAX,
"e".repeat(32),
"max_values_test".to_string(),
u8::MAX,
Duration::from_secs(u64::MAX),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().number, u32::MAX);
assert_eq!(result.metadata().offset, u64::MAX);
assert_eq!(result.metadata().length, u64::MAX);
assert_eq!(result.metadata().traffic_type, u8::MAX);
assert_eq!(result.metadata().cost, Duration::from_secs(u64::MAX));
}
#[test]
fn test_cache_piece_content_metadata_length_calculation() {
let cache_piece_content = CachePieceContent::new(
123,
456,
789,
"f".repeat(32),
"length_test".to_string(),
42,
Duration::from_secs(100),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let expected_length = (NUMBER_SIZE
+ OFFSET_SIZE
+ LENGTH_SIZE
+ DIGEST_LENGTH_SIZE
+ 32 + PARENT_ID_LENGTH_SIZE
+ "length_test".len()
+ TRAFFIC_TYPE_SIZE
+ COST_SIZE
+ CREATED_AT_SIZE) as u32;
assert_eq!(cache_piece_content.metadata_len(), expected_length);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata_len(), expected_length);
}
#[test]
fn test_cache_piece_content_with_short_digest() {
let cache_piece_content = CachePieceContent::new(
1,
0,
100,
"short".to_string(), "test".to_string(),
1,
Duration::from_secs(1),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().digest, "short");
assert_eq!(result.metadata().number, 1);
}
#[test]
fn test_cache_piece_content_with_long_digest() {
let long_digest = "g".repeat(128); let cache_piece_content = CachePieceContent::new(
5,
1000,
2000,
long_digest.clone(),
"digest_test".to_string(),
10,
Duration::from_secs(50),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let bytes = Bytes::from(cache_piece_content.clone());
let result = CachePieceContent::try_from(bytes).unwrap();
assert_eq!(result.metadata().digest, long_digest);
assert_eq!(result.metadata().number, 5);
assert_eq!(result.metadata().offset, 1000);
assert_eq!(result.metadata().length, 2000);
}
#[test]
fn test_cache_piece_content_bytes_structure() {
let cache_piece_content = create_test_cache_piece_content();
let bytes: Bytes = cache_piece_content.clone().into();
let metadata_length_bytes = &bytes[..METADATA_LENGTH_SIZE];
let metadata_length = u32::from_be_bytes(metadata_length_bytes.try_into().unwrap());
assert_eq!(metadata_length, cache_piece_content.metadata_len());
assert_eq!(bytes.len(), METADATA_LENGTH_SIZE + metadata_length as usize);
}
#[test]
fn test_cache_piece_content_new() {
let cache_piece_content = CachePieceContent::new(
42,
1024,
2048,
"a".repeat(32),
"test_parent_id".to_string(),
1,
Duration::from_secs(5),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
assert_eq!(cache_piece_content.metadata().number, 42);
assert_eq!(cache_piece_content.metadata().offset, 1024);
assert_eq!(cache_piece_content.metadata().length, 2048);
assert_eq!(cache_piece_content.metadata().digest, "a".repeat(32));
assert_eq!(cache_piece_content.metadata().parent_id, "test_parent_id");
assert_eq!(cache_piece_content.metadata().traffic_type, 1);
assert_eq!(cache_piece_content.metadata().cost, Duration::from_secs(5));
assert_eq!(
cache_piece_content.metadata_len(),
(NUMBER_SIZE
+ OFFSET_SIZE
+ LENGTH_SIZE
+ DIGEST_LENGTH_SIZE
+ cache_piece_content.metadata().digest.len()
+ PARENT_ID_LENGTH_SIZE
+ cache_piece_content.metadata().parent_id.len()
+ TRAFFIC_TYPE_SIZE
+ COST_SIZE
+ CREATED_AT_SIZE) as u32,
);
}
#[test]
fn test_cache_piece_content_is_empty() {
let empty_cache_piece_content = CachePieceContent::new(
0,
0,
0,
"a".repeat(32),
"test".to_string(),
0,
Duration::from_secs(0),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
let non_empty_cache_piece_content = CachePieceContent::new(
1,
0,
100,
"a".repeat(32),
"test".to_string(),
0,
Duration::from_secs(0),
DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
);
assert!(empty_cache_piece_content.is_empty());
assert!(!non_empty_cache_piece_content.is_empty());
}
#[test]
fn test_cache_piece_metadata_conversion_roundtrip() {
let metadata = create_test_metadata();
let (bytes, length) = <(Bytes, u32)>::from(metadata.clone());
let result = CachePieceMetadata::try_from((bytes, length)).unwrap();
assert_eq!(result.number, metadata.number);
assert_eq!(result.offset, metadata.offset);
assert_eq!(result.length, metadata.length);
assert_eq!(result.digest, metadata.digest);
assert_eq!(result.parent_id, metadata.parent_id);
assert_eq!(result.traffic_type, metadata.traffic_type);
assert_eq!(result.cost, metadata.cost);
assert_eq!(result.created_at, metadata.created_at);
}
#[test]
fn test_cache_piece_metadata_try_from_invalid_length() {
let metadata = create_test_metadata();
let (bytes, correct_length) = <(Bytes, u32)>::from(metadata);
let wrong_length = correct_length + 10;
let result = CachePieceMetadata::try_from((bytes, wrong_length));
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::InvalidLength(_)));
}
#[test]
fn test_cache_piece_metadata_try_from_too_short_bytes() {
let short_bytes = Bytes::from(vec![0u8; 10]);
let result = CachePieceMetadata::try_from((short_bytes, 10));
assert!(result.is_err());
}
#[test]
fn test_cache_piece_metadata_with_empty_parent_id() {
let metadata = CachePieceMetadata {
number: 1,
offset: 0,
length: 100,
digest: "b".repeat(32),
parent_id: String::new(),
traffic_type: 2,
cost: Duration::from_secs(1),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
};
let (bytes, length) = <(Bytes, u32)>::from(metadata.clone());
let result = CachePieceMetadata::try_from((bytes, length)).unwrap();
assert_eq!(result.parent_id, "");
assert_eq!(result.number, 1);
assert_eq!(result.traffic_type, 2);
}
#[test]
fn test_cache_piece_metadata_with_long_parent_id() {
let long_parent_id = "x".repeat(1000); let metadata = CachePieceMetadata {
number: 999,
offset: 12345,
length: 67890,
digest: "c".repeat(32),
parent_id: long_parent_id.clone(),
traffic_type: 255,
cost: Duration::from_secs(3600),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
};
let (bytes, length) = <(Bytes, u32)>::from(metadata.clone());
let result = CachePieceMetadata::try_from((bytes, length)).unwrap();
assert_eq!(result.parent_id, long_parent_id);
assert_eq!(result.number, 999);
assert_eq!(result.traffic_type, 255);
}
#[test]
fn test_cache_piece_metadata_with_zero_cost() {
let metadata = CachePieceMetadata {
number: 0,
offset: 0,
length: 0,
digest: "d".repeat(32),
parent_id: "zero_cost_test".to_string(),
traffic_type: 0,
cost: Duration::from_secs(0),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
};
let (bytes, length) = <(Bytes, u32)>::from(metadata.clone());
let result = CachePieceMetadata::try_from((bytes, length)).unwrap();
assert_eq!(result.cost, Duration::from_secs(0));
assert_eq!(result.parent_id, "zero_cost_test");
}
#[test]
fn test_cache_piece_metadata_with_max_values() {
let metadata = CachePieceMetadata {
number: u32::MAX,
offset: u64::MAX,
length: u64::MAX,
digest: "e".repeat(32),
parent_id: "max_values_test".to_string(),
traffic_type: u8::MAX,
cost: Duration::from_secs(u64::MAX),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
};
let (bytes, length) = <(Bytes, u32)>::from(metadata.clone());
let result = CachePieceMetadata::try_from((bytes, length)).unwrap();
assert_eq!(result.number, u32::MAX);
assert_eq!(result.offset, u64::MAX);
assert_eq!(result.length, u64::MAX);
assert_eq!(result.traffic_type, u8::MAX);
assert_eq!(result.cost, Duration::from_secs(u64::MAX));
}
#[test]
fn test_cache_piece_metadata_invalid_utf8_in_digest() {
let metadata_with_short_digest = CachePieceMetadata {
number: 1,
offset: 0,
length: 100,
digest: "short".to_string(),
parent_id: "test".to_string(),
traffic_type: 1,
cost: Duration::from_secs(1),
created_at: DateTime::from_timestamp(1693152000, 0).unwrap().naive_utc(),
};
let (bytes, length) = <(Bytes, u32)>::from(metadata_with_short_digest);
let result = CachePieceMetadata::try_from((bytes, length));
assert!(result.is_ok());
}
}