use compact_u64::*;
use ufotofu::{BulkConsumer, BulkProducer};
use ufotofu_codec::{
Blame, DecodableCanonic, DecodeError, Encodable, EncodableKnownSize, EncodableSync,
RelativeDecodable, RelativeDecodableCanonic, RelativeDecodableSync, RelativeEncodable,
RelativeEncodableKnownSize, RelativeEncodableSync,
};
use willow_encoding::is_bitflagged;
use crate::{Entry, NamespaceId, Path, PayloadDigest, SubspaceId};
/// Encodes an [`Entry`] relative to another (reference) [`Entry`].
///
/// Fields that are equal to the reference's are omitted, and the timestamp is
/// written as an absolute difference to the reference's timestamp.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
    RelativeEncodable<Entry<MCL, MCC, MPL, N, S, PD>> for Entry<MCL, MCC, MPL, N, S, PD>
where
    N: NamespaceId + Encodable,
    S: SubspaceId + Encodable,
    PD: PayloadDigest + Encodable,
{
    /// Writes the encoding of `self` relative to `reference` into `consumer`.
    ///
    /// Bytes are emitted in this order:
    /// 1. a header byte:
    ///    - `0b1000_0000` set iff the namespace ids differ,
    ///    - `0b0100_0000` set iff the subspace ids differ,
    ///    - `0b0010_0000` set iff `self` has the strictly greater timestamp,
    ///    - the two-bit tag of the timestamp difference, placed at offset 4,
    ///    - the two-bit tag of the payload length, placed at offset 6;
    /// 2. the namespace id, only if it differs from the reference's;
    /// 3. the subspace id, only if it differs from the reference's;
    /// 4. the path, encoded relative to the reference's path;
    /// 5. the absolute timestamp difference, as a compact `u64`;
    /// 6. the payload length, as a compact `u64`;
    /// 7. the payload digest.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `consumer`.
    async fn relative_encode<Consumer>(
        &self,
        consumer: &mut Consumer,
        reference: &Entry<MCL, MCC, MPL, N, S, PD>,
    ) -> Result<(), Consumer::Error>
    where
        Consumer: BulkConsumer<Item = u8>,
    {
        // Decide once which fields need to be written out explicitly.
        let namespace_differs = self.namespace_id() != reference.namespace_id();
        let subspace_differs = self.subspace_id() != reference.subspace_id();
        let is_newer = self.timestamp() > reference.timestamp();

        let time_diff = self.timestamp().abs_diff(reference.timestamp());
        let payload_length = self.payload_length();
        let time_diff_tag = Tag::min_tag(time_diff, TagWidth::two());
        let payload_length_tag = Tag::min_tag(payload_length, TagWidth::two());

        // The three flags occupy the top three bits; the bit below them stays zero.
        let mut header = (u8::from(namespace_differs) << 7)
            | (u8::from(subspace_differs) << 6)
            | (u8::from(is_newer) << 5);
        header |= time_diff_tag.data_at_offset(4);
        header |= payload_length_tag.data_at_offset(6);
        consumer.consume(header).await?;

        if namespace_differs {
            self.namespace_id().encode(consumer).await?;
        }
        if subspace_differs {
            self.subspace_id().encode(consumer).await?;
        }

        self.path()
            .relative_encode(consumer, reference.path())
            .await?;

        let time_diff_width = time_diff_tag.encoding_width();
        CompactU64(time_diff)
            .relative_encode(consumer, &time_diff_width)
            .await?;

        let payload_length_width = payload_length_tag.encoding_width();
        CompactU64(payload_length)
            .relative_encode(consumer, &payload_length_width)
            .await?;

        self.payload_digest().encode(consumer).await
    }
}
/// Decodes an [`Entry`] relative to a reference [`Entry`], without enforcing
/// canonicity of the encoding.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
RelativeDecodable<Entry<MCL, MCC, MPL, N, S, PD>, Blame> for Entry<MCL, MCC, MPL, N, S, PD>
where
N: NamespaceId + DecodableCanonic,
S: SubspaceId + DecodableCanonic,
PD: PayloadDigest + DecodableCanonic,
Blame: From<N::ErrorReason>
+ From<S::ErrorReason>
+ From<PD::ErrorReason>
+ From<N::ErrorCanonic>
+ From<S::ErrorCanonic>
+ From<PD::ErrorCanonic>,
{
/// Reads an entry from `producer`, using `r` as the reference entry.
///
/// Delegates to `relative_decode_maybe_canonic` with `CANONIC = false`, so
/// none of that function's canonicity-only rejections apply.
///
/// # Errors
///
/// Returns whatever `relative_decode_maybe_canonic` returns: a producer
/// error, or `DecodeError::Other` carrying a `Blame`.
async fn relative_decode<P>(
producer: &mut P,
r: &Entry<MCL, MCC, MPL, N, S, PD>,
) -> Result<Self, DecodeError<P::Final, P::Error, Blame>>
where
P: BulkProducer<Item = u8>,
Self: Sized,
{
relative_decode_maybe_canonic::<false, MCL, MCC, MPL, N, S, PD, P>(producer, r).await
}
}
/// Decodes an [`Entry`] relative to a reference [`Entry`], additionally
/// rejecting encodings that fail the canonicity checks.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
RelativeDecodableCanonic<Entry<MCL, MCC, MPL, N, S, PD>, Blame, Blame>
for Entry<MCL, MCC, MPL, N, S, PD>
where
N: NamespaceId + DecodableCanonic,
S: SubspaceId + DecodableCanonic,
PD: PayloadDigest + DecodableCanonic,
Blame: From<N::ErrorReason>
+ From<S::ErrorReason>
+ From<PD::ErrorReason>
+ From<N::ErrorCanonic>
+ From<S::ErrorCanonic>
+ From<PD::ErrorCanonic>,
{
/// Reads an entry from `producer`, using `r` as the reference entry.
///
/// Delegates to `relative_decode_maybe_canonic` with `CANONIC = true`, which
/// enables that function's extra checks (e.g. rejecting an explicitly
/// encoded namespace id that equals the reference's).
///
/// # Errors
///
/// Returns whatever `relative_decode_maybe_canonic` returns: a producer
/// error, or `DecodeError::Other` carrying a `Blame`.
async fn relative_decode_canonic<P>(
producer: &mut P,
r: &Entry<MCL, MCC, MPL, N, S, PD>,
) -> Result<Self, DecodeError<P::Final, P::Error, Blame>>
where
P: BulkProducer<Item = u8>,
Self: Sized,
{
relative_decode_maybe_canonic::<true, MCL, MCC, MPL, N, S, PD, P>(producer, r).await
}
}
/// Computes, without encoding anything, how many bytes the relative encoding
/// of an [`Entry`] against a reference [`Entry`] occupies.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
    RelativeEncodableKnownSize<Entry<MCL, MCC, MPL, N, S, PD>> for Entry<MCL, MCC, MPL, N, S, PD>
where
    N: NamespaceId + EncodableKnownSize,
    S: SubspaceId + EncodableKnownSize,
    PD: PayloadDigest + EncodableKnownSize,
{
    /// Returns the length in bytes of the encoding of `self` relative to `r`.
    ///
    /// The total is one header byte plus the lengths of: the namespace id and
    /// the subspace id (each counted only when it differs from `r`'s), the
    /// path relative to `r`'s path, the compact timestamp difference, the
    /// compact payload length, and the payload digest.
    fn relative_len_of_encoding(&self, r: &Entry<MCL, MCC, MPL, N, S, PD>) -> usize {
        const HEADER_LEN: usize = 1;

        // Ids equal to the reference's are omitted from the encoding.
        let namespace_differs = self.namespace_id() != r.namespace_id();
        let namespace_bytes = if namespace_differs {
            self.namespace_id().len_of_encoding()
        } else {
            0
        };

        let subspace_differs = self.subspace_id() != r.subspace_id();
        let subspace_bytes = if subspace_differs {
            self.subspace_id().len_of_encoding()
        } else {
            0
        };

        let path_bytes = self.path().relative_len_of_encoding(r.path());

        let time_diff = self.timestamp().abs_diff(r.timestamp());
        let time_diff_width = Tag::min_tag(time_diff, TagWidth::two()).encoding_width();
        let time_diff_bytes = CompactU64(time_diff).relative_len_of_encoding(&time_diff_width);

        let payload_length = self.payload_length();
        let payload_length_width = Tag::min_tag(payload_length, TagWidth::two()).encoding_width();
        let payload_length_bytes =
            CompactU64(payload_length).relative_len_of_encoding(&payload_length_width);

        let digest_bytes = self.payload_digest().len_of_encoding();

        [
            HEADER_LEN,
            namespace_bytes,
            subspace_bytes,
            path_bytes,
            time_diff_bytes,
            payload_length_bytes,
            digest_bytes,
        ]
        .iter()
        .sum::<usize>()
    }
}
/// Opts [`Entry`] into `RelativeEncodableSync` whenever its namespace id,
/// subspace id and payload digest types are all `EncodableSync`.
///
/// NOTE(review): presumably a marker trait from `ufotofu_codec` enabling
/// synchronous relative encoding — confirm against the trait's documentation.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
RelativeEncodableSync<Entry<MCL, MCC, MPL, N, S, PD>> for Entry<MCL, MCC, MPL, N, S, PD>
where
N: NamespaceId + EncodableSync,
S: SubspaceId + EncodableSync,
PD: PayloadDigest + EncodableSync,
{
// Intentionally empty: no items are defined by this impl.
}
/// Opts [`Entry`] into `RelativeDecodableSync` under the same bounds as the
/// `RelativeDecodable` impl for `Entry` in this file.
///
/// NOTE(review): presumably a marker trait from `ufotofu_codec` enabling
/// synchronous relative decoding — confirm against the trait's documentation.
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
RelativeDecodableSync<Entry<MCL, MCC, MPL, N, S, PD>, Blame> for Entry<MCL, MCC, MPL, N, S, PD>
where
N: NamespaceId + DecodableCanonic,
S: SubspaceId + DecodableCanonic,
PD: PayloadDigest + DecodableCanonic,
Blame: From<N::ErrorReason>
+ From<S::ErrorReason>
+ From<PD::ErrorReason>
+ From<N::ErrorCanonic>
+ From<S::ErrorCanonic>
+ From<PD::ErrorCanonic>,
{
// Intentionally empty: no items are defined by this impl.
}
async fn relative_decode_maybe_canonic<
const CANONIC: bool,
const MCL: usize,
const MCC: usize,
const MPL: usize,
N,
S,
PD,
P,
>(
producer: &mut P,
r: &Entry<MCL, MCC, MPL, N, S, PD>,
) -> Result<Entry<MCL, MCC, MPL, N, S, PD>, DecodeError<P::Final, P::Error, Blame>>
where
P: BulkProducer<Item = u8>,
N: NamespaceId + DecodableCanonic,
S: SubspaceId + DecodableCanonic,
PD: PayloadDigest + DecodableCanonic,
Blame: From<N::ErrorReason>
+ From<S::ErrorReason>
+ From<PD::ErrorReason>
+ From<N::ErrorCanonic>
+ From<S::ErrorCanonic>
+ From<PD::ErrorCanonic>,
{
let header = producer.produce_item().await?;
if CANONIC && is_bitflagged(header, 3) {
return Err(DecodeError::Other(Blame::TheirFault));
}
let is_namespace_encoded = is_bitflagged(header, 0);
let is_subspace_encoded = is_bitflagged(header, 1);
let add_or_subtract_time_diff = is_bitflagged(header, 2);
let time_diff_tag = Tag::from_raw(header, TagWidth::two(), 4);
let payload_length_tag = Tag::from_raw(header, TagWidth::two(), 6);
let namespace_id = if is_namespace_encoded {
if CANONIC {
N::decode_canonic(producer)
.await
.map_err(DecodeError::map_other_from)?
} else {
N::decode(producer)
.await
.map_err(DecodeError::map_other_from)?
}
} else {
r.namespace_id().clone()
};
if CANONIC && is_namespace_encoded && &namespace_id == r.namespace_id() {
return Err(DecodeError::Other(Blame::TheirFault));
}
let subspace_id = if is_subspace_encoded {
if CANONIC {
S::decode_canonic(producer)
.await
.map_err(DecodeError::map_other_from)?
} else {
S::decode(producer)
.await
.map_err(DecodeError::map_other_from)?
}
} else {
r.subspace_id().clone()
};
if CANONIC && is_subspace_encoded && &subspace_id == r.subspace_id() {
return Err(DecodeError::Other(Blame::TheirFault));
}
let path = if CANONIC {
Path::<MCL, MCC, MPL>::relative_decode_canonic(producer, r.path()).await?
} else {
Path::<MCL, MCC, MPL>::relative_decode(producer, r.path()).await?
};
let time_diff = if CANONIC {
CompactU64::relative_decode_canonic(producer, &time_diff_tag)
.await
.map_err(DecodeError::map_other_from)?
.0
} else {
CompactU64::relative_decode(producer, &time_diff_tag)
.await
.map_err(DecodeError::map_other_from)?
.0
};
let timestamp = if add_or_subtract_time_diff {
r.timestamp()
.checked_add(time_diff)
.ok_or(DecodeError::Other(Blame::TheirFault))?
} else {
r.timestamp()
.checked_sub(time_diff)
.ok_or(DecodeError::Other(Blame::TheirFault))?
};
if CANONIC {
let should_have_subtracted = timestamp <= r.timestamp();
if add_or_subtract_time_diff && should_have_subtracted {
return Err(DecodeError::Other(Blame::TheirFault));
}
}
let payload_length = if CANONIC {
CompactU64::relative_decode_canonic(producer, &payload_length_tag)
.await
.map_err(DecodeError::map_other_from)?
.0
} else {
CompactU64::relative_decode(producer, &payload_length_tag)
.await
.map_err(DecodeError::map_other_from)?
.0
};
let payload_digest = if CANONIC {
PD::decode_canonic(producer)
.await
.map_err(DecodeError::map_other_from)?
} else {
PD::decode(producer)
.await
.map_err(DecodeError::map_other_from)?
};
Ok(Entry::new(
namespace_id,
subspace_id,
path,
timestamp,
payload_length,
payload_digest,
))
}