sos_core/encoding/v1/
event_record.rs

1use crate::{
2    commit::CommitHash, encoding::encoding_error, events::EventRecord,
3    UtcDateTime,
4};
5use async_trait::async_trait;
6use binary_stream::futures::{
7    BinaryReader, BinaryWriter, Decodable, Encodable,
8};
9use std::io::{Result, SeekFrom};
10use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
11
12#[async_trait]
13impl Encodable for EventRecord {
14    async fn encode<W: AsyncWrite + AsyncSeek + Unpin + Send>(
15        &self,
16        writer: &mut BinaryWriter<W>,
17    ) -> Result<()> {
18        // Prepare the bytes for the row length
19        let size_pos = writer.stream_position().await?;
20        writer.write_u32(0).await?;
21
22        // Encodable the time component
23        self.0.encode(&mut *writer).await?;
24
25        // Write the previous commit hash bytes
26        writer.write_bytes(self.1.as_ref()).await?;
27
28        // Write the commit hash bytes
29        writer.write_bytes(self.2.as_ref()).await?;
30
31        // FIXME: ensure the buffer size does not exceed u32
32
33        // Write the data bytes
34        writer.write_u32(self.3.len() as u32).await?;
35        writer.write_bytes(&self.3).await?;
36
37        // Backtrack to size_pos and write new length
38        let row_pos = writer.stream_position().await?;
39        let row_len = row_pos - (size_pos + 4);
40        writer.seek(SeekFrom::Start(size_pos)).await?;
41        writer.write_u32(row_len as u32).await?;
42        writer.seek(SeekFrom::Start(row_pos)).await?;
43
44        // Write out the row len at the end of the record too
45        // so we can support double ended iteration
46        writer.write_u32(row_len as u32).await?;
47
48        Ok(())
49    }
50}
51
52#[async_trait]
53impl Decodable for EventRecord {
54    async fn decode<R: AsyncRead + AsyncSeek + Unpin + Send>(
55        &mut self,
56        reader: &mut BinaryReader<R>,
57    ) -> Result<()> {
58        // Read in the row length
59        let _ = reader.read_u32().await?;
60
61        // Decodable the time component
62        let mut time: UtcDateTime = Default::default();
63        time.decode(&mut *reader).await?;
64
65        // Read the hash bytes
66        let previous: [u8; 32] = reader
67            .read_bytes(32)
68            .await?
69            .as_slice()
70            .try_into()
71            .map_err(encoding_error)?;
72        let commit: [u8; 32] = reader
73            .read_bytes(32)
74            .await?
75            .as_slice()
76            .try_into()
77            .map_err(encoding_error)?;
78
79        // Read the data bytes
80        let length = reader.read_u32().await?;
81        let buffer = reader.read_bytes(length as usize).await?;
82
83        self.0 = time;
84        self.1 = CommitHash(previous);
85        self.2 = CommitHash(commit);
86        self.3 = buffer;
87
88        // Read in the row length appended to the end of the record
89        let _ = reader.read_u32().await?;
90
91        Ok(())
92    }
93}