use crate::{
commit::CommitHash, encoding::encoding_error, events::EventRecord,
UtcDateTime,
};
use async_trait::async_trait;
use binary_stream::futures::{
BinaryReader, BinaryWriter, Decodable, Encodable,
};
use std::io::{Result, SeekFrom};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
#[async_trait]
impl Encodable for EventRecord {
async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
&self,
writer: &mut BinaryWriter<W>,
) -> Result<()> {
let size_pos = writer.stream_position().await?;
writer.write_u32(0).await?;
self.0.encode(&mut *writer).await?;
writer.write_bytes(self.1.as_ref()).await?;
writer.write_bytes(self.2.as_ref()).await?;
writer.write_u32(self.3.len() as u32).await?;
writer.write_bytes(&self.3).await?;
let row_pos = writer.stream_position().await?;
let row_len = row_pos - (size_pos + 4);
writer.seek(SeekFrom::Start(size_pos)).await?;
writer.write_u32(row_len as u32).await?;
writer.seek(SeekFrom::Start(row_pos)).await?;
writer.write_u32(row_len as u32).await?;
Ok(())
}
}
#[async_trait]
impl Decodable for EventRecord {
async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
&mut self,
reader: &mut BinaryReader<R>,
) -> Result<()> {
let _ = reader.read_u32().await?;
let mut time: UtcDateTime = Default::default();
time.decode(&mut *reader).await?;
let previous: [u8; 32] = reader
.read_bytes(32)
.await?
.as_slice()
.try_into()
.map_err(encoding_error)?;
let commit: [u8; 32] = reader
.read_bytes(32)
.await?
.as_slice()
.try_into()
.map_err(encoding_error)?;
let length = reader.read_u32().await?;
let buffer = reader.read_bytes(length as usize).await?;
self.0 = time;
self.1 = CommitHash(previous);
self.2 = CommitHash(commit);
self.3 = buffer;
let _ = reader.read_u32().await?;
Ok(())
}
}