// Compiled only when the `decoder` feature is enabled.
#[cfg(feature = "decoder")]
pub mod decoder;

// Compiled only when the `encoder` feature is enabled.
#[cfg(feature = "encoder")]
pub mod encoder;

// The next two modules are unconditional: they are available with any feature combination.
pub mod codec;

pub mod protobuf_conversions;

// Compiled only when the `decoder` feature is enabled.
#[cfg(feature = "decoder")]
pub mod legacy;

// Private module: requires the `encoder` feature and is excluded on `wasm32` targets.
// Only the two types re-exported at the bottom of this list are visible outside the crate.
#[cfg(feature = "encoder")]
#[cfg(not(target_arch = "wasm32"))]
mod file_sink;

// Compiled only when the `stream_from_http` feature is enabled.
#[cfg(feature = "stream_from_http")]
pub mod stream_rrd_from_http;

// The re-export carries exactly the same two gates as `mod file_sink`, so it can never name a
// module that was compiled out.
#[cfg(feature = "encoder")]
#[cfg(not(target_arch = "wasm32"))]
pub use file_sink::{FileSink, FileSinkError};
28
/// The four ASCII bytes `RRF2`; compiled when either the `encoder` or `decoder` feature is on.
///
/// NOTE(review): presumably the value expected in `FileHeader::magic` for the current format.
/// The use sites are not in this file — confirm in the encoder/decoder modules.
#[cfg(any(feature = "encoder", feature = "decoder"))]
const RRD_HEADER: &[u8; 4] = b"RRF2";

/// The four-byte ASCII tags `RRF0` and `RRF1`; compiled only with the `decoder` feature.
///
/// NOTE(review): presumably magic values of earlier format revisions that the decoder wants to
/// recognise — confirm against the decoder module, which is where this would be consumed.
#[cfg(feature = "decoder")]
const OLD_RRD_HEADERS: &[[u8; 4]] = &[*b"RRF0", *b"RRF1"];
36
/// Compression selector, represented as a single byte (`#[repr(u8)]`).
///
/// The explicit discriminants are the values `EncodingOptions::to_bytes` emits for the first
/// option byte and `EncodingOptions::from_bytes` accepts back, so they must not be renumbered.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// Written as byte `0`.
    Off = 0,

    /// Written as byte `1`.
    LZ4 = 1,
}
48
/// Serializer selector, represented as a single byte (`#[repr(u8)]`).
///
/// The explicit discriminants are the values `EncodingOptions::to_bytes` emits for the second
/// option byte and `EncodingOptions::from_bytes` accepts back. No variant uses `0`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Serializer {
    /// Written as byte `1`.
    LegacyMsgPack = 1,

    /// Written as byte `2`.
    Protobuf = 2,
}
58
59#[derive(Clone, Copy, Debug, PartialEq, Eq)]
60pub struct EncodingOptions {
61 pub compression: Compression,
62 pub serializer: Serializer,
63}
64
65impl EncodingOptions {
66 pub const PROTOBUF_COMPRESSED: Self = Self {
67 compression: Compression::LZ4,
68 serializer: Serializer::Protobuf,
69 };
70 pub const PROTOBUF_UNCOMPRESSED: Self = Self {
71 compression: Compression::Off,
72 serializer: Serializer::Protobuf,
73 };
74
75 pub fn from_bytes(bytes: [u8; 4]) -> Result<Self, OptionsError> {
76 match bytes {
77 [compression, serializer, 0, 0] => {
78 let compression = match compression {
79 0 => Compression::Off,
80 1 => Compression::LZ4,
81 _ => return Err(OptionsError::UnknownCompression(compression)),
82 };
83 let serializer = match serializer {
84 1 => Serializer::LegacyMsgPack,
85 2 => Serializer::Protobuf,
86 _ => return Err(OptionsError::UnknownSerializer(serializer)),
87 };
88 Ok(Self {
89 compression,
90 serializer,
91 })
92 }
93 _ => Err(OptionsError::UnknownReservedBytes),
94 }
95 }
96
97 pub fn to_bytes(self) -> [u8; 4] {
98 [
99 self.compression as u8,
100 self.serializer as u8,
101 0, 0, ]
104 }
105}
106
107#[derive(thiserror::Error, Debug)]
109#[allow(clippy::enum_variant_names)]
110pub enum OptionsError {
111 #[error("Reserved bytes not zero")]
112 UnknownReservedBytes,
113
114 #[error("Unknown compression: {0}")]
115 UnknownCompression(u8),
116
117 #[error("Unknown serializer: {0}")]
118 UnknownSerializer(u8),
119}
120
/// Fixed 12-byte header: 4 magic bytes, 4 version bytes, then the 4 option bytes, in that order.
///
/// This type only moves bytes in and out; it does not check `magic` or interpret `version`.
#[cfg(any(feature = "encoder", feature = "decoder"))]
#[derive(Clone, Copy, Debug)]
pub(crate) struct FileHeader {
    /// Header bytes `0..4`, stored verbatim.
    pub magic: [u8; 4],

    /// Header bytes `4..8`, stored verbatim.
    pub version: [u8; 4],

    /// Header bytes `8..12`, converted with `EncodingOptions::from_bytes` / `to_bytes`.
    pub options: EncodingOptions,
}

#[cfg(any(feature = "encoder", feature = "decoder"))]
impl FileHeader {
    /// Number of bytes [`Self::decode`] consumes.
    #[cfg(feature = "decoder")]
    pub const SIZE: usize = 12;

    /// Writes the header to `write` as three consecutive `write_all` calls: magic, version,
    /// option bytes.
    ///
    /// # Errors
    ///
    /// The first I/O failure is wrapped in `encoder::EncodeError::Write` and returned
    /// immediately; fields already written are not rolled back.
    #[cfg(feature = "encoder")]
    pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> {
        let Self {
            magic,
            version,
            options,
        } = *self;

        // All three fields are 4-byte arrays, so they can share one write loop.
        for field in [magic, version, options.to_bytes()] {
            write
                .write_all(&field)
                .map_err(encoder::EncodeError::Write)?;
        }

        Ok(())
    }

    /// Reads exactly [`Self::SIZE`] bytes from `read` and splits them into the three fields.
    ///
    /// # Errors
    ///
    /// * `decoder::DecodeError::Read` if the bytes cannot be read in full.
    /// * Whatever `decoder::DecodeError` the `?` conversion produces from an `OptionsError`
    ///   when the option bytes are rejected by `EncodingOptions::from_bytes`.
    #[cfg(feature = "decoder")]
    pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
        let mut raw = [0_u8; Self::SIZE];
        if let Err(err) = read.read_exact(&mut raw) {
            return Err(decoder::DecodeError::Read(err));
        }

        // Destructuring the fixed-size array yields the three 4-byte groups with the lengths
        // checked at compile time, so no fallible slice-to-array conversion is needed.
        let [m0, m1, m2, m3, v0, v1, v2, v3, o0, o1, o2, o3] = raw;
        let options = EncodingOptions::from_bytes([o0, o1, o2, o3])?;

        Ok(Self {
            magic: [m0, m1, m2, m3],
            version: [v0, v1, v2, v3],
            options,
        })
    }
}
165
/// Eight-byte header made of two little-endian `u32` values: a compressed length followed by an
/// uncompressed length.
#[cfg(any(feature = "encoder", feature = "decoder"))]
#[derive(Debug, Clone, Copy)]
pub(crate) enum LegacyMessageHeader {
    /// Any header whose two lengths are not both zero. The lengths are stored exactly as read;
    /// no relationship between them is checked here.
    Data {
        /// First `u32` of the header (bytes `0..4`, little-endian).
        compressed_len: u32,

        /// Second `u32` of the header (bytes `4..8`, little-endian).
        uncompressed_len: u32,
    },

    /// A header in which both lengths are zero.
    EndOfStream,
}

#[cfg(any(feature = "encoder", feature = "decoder"))]
impl LegacyMessageHeader {
    /// Number of bytes [`Self::decode`] consumes.
    #[cfg(feature = "decoder")]
    pub const SIZE: usize = 8;

    /// Reads exactly [`Self::SIZE`] bytes from `read` and parses them with [`Self::from_bytes`].
    ///
    /// # Errors
    ///
    /// `decoder::DecodeError::Read` if the bytes cannot be read in full.
    #[cfg(feature = "decoder")]
    pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
        let mut raw = [0_u8; Self::SIZE];
        match read.read_exact(&mut raw) {
            Ok(()) => Self::from_bytes(&raw),
            Err(err) => Err(decoder::DecodeError::Read(err)),
        }
    }

    /// Parses a header from `data`, which must be exactly eight bytes long.
    ///
    /// Returns [`Self::EndOfStream`] when both lengths are zero and [`Self::Data`] otherwise.
    ///
    /// # Errors
    ///
    /// `decoder::DecodeError::Codec` wrapping `codec::CodecError::HeaderDecoding` (an
    /// `InvalidData` I/O error) when `data` is not eight bytes long.
    #[cfg(feature = "decoder")]
    pub fn from_bytes(data: &[u8]) -> Result<Self, decoder::DecodeError> {
        // The slice pattern only matches an 8-byte slice, so it doubles as the length check.
        let (compressed_len, uncompressed_len) = match *data {
            [c0, c1, c2, c3, u0, u1, u2, u3] => (
                u32::from_le_bytes([c0, c1, c2, c3]),
                u32::from_le_bytes([u0, u1, u2, u3]),
            ),
            _ => {
                return Err(decoder::DecodeError::Codec(
                    codec::CodecError::HeaderDecoding(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "invalid header length",
                    )),
                ));
            }
        };

        Ok(match (compressed_len, uncompressed_len) {
            (0, 0) => Self::EndOfStream,
            _ => Self::Data {
                compressed_len,
                uncompressed_len,
            },
        })
    }
}