re_log_encoding/
encoder.rs1use crate::codec;
4use crate::codec::file::{self, encoder};
5use crate::FileHeader;
6use crate::Serializer;
7use crate::{Compression, EncodingOptions};
8use re_build_info::CrateVersion;
9use re_chunk::{ChunkError, ChunkResult};
10use re_log_types::LogMsg;
11
12#[derive(thiserror::Error, Debug)]
16pub enum EncodeError {
17 #[error("Failed to write: {0}")]
18 Write(#[from] std::io::Error),
19
20 #[error("lz4 error: {0}")]
21 Lz4(#[from] lz4_flex::block::CompressError),
22
23 #[error("Protobuf error: {0}")]
24 Protobuf(#[from] re_protos::external::prost::EncodeError),
25
26 #[error("Arrow error: {0}")]
27 Arrow(#[from] arrow::error::ArrowError),
28
29 #[error("{0}")]
30 Codec(#[from] codec::CodecError),
31
32 #[error("Chunk error: {0}")]
33 Chunk(#[from] ChunkError),
34
35 #[error("Called append on already finished encoder")]
36 AlreadyFinished,
37}
38
39pub fn encode_to_bytes<'a>(
42 version: CrateVersion,
43 options: EncodingOptions,
44 msgs: impl IntoIterator<Item = &'a LogMsg>,
45) -> Result<Vec<u8>, EncodeError> {
46 let mut bytes: Vec<u8> = vec![];
47 {
48 let mut encoder = Encoder::new(version, options, std::io::Cursor::new(&mut bytes))?;
49 for msg in msgs {
50 encoder.append(msg)?;
51 }
52 }
53 Ok(bytes)
54}
55
56pub struct DroppableEncoder<W: std::io::Write> {
62 encoder: Encoder<W>,
63
64 is_finished: bool,
66}
67
68impl<W: std::io::Write> DroppableEncoder<W> {
69 #[inline]
70 pub fn new(
71 version: CrateVersion,
72 options: EncodingOptions,
73 write: W,
74 ) -> Result<Self, EncodeError> {
75 Ok(Self {
76 encoder: Encoder::new(version, options, write)?,
77 is_finished: false,
78 })
79 }
80
81 #[inline]
83 pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
84 self.encoder.append(message)
85 }
86
87 #[inline]
88 pub fn finish(&mut self) -> Result<(), EncodeError> {
89 if !self.is_finished {
90 self.encoder.finish()?;
91 }
92
93 self.is_finished = true;
94
95 Ok(())
96 }
97
98 #[inline]
99 pub fn flush_blocking(&mut self) -> std::io::Result<()> {
100 self.encoder.flush_blocking()
101 }
102}
103
104impl<W: std::io::Write> std::ops::Drop for DroppableEncoder<W> {
105 fn drop(&mut self) {
106 if !self.is_finished {
107 if let Err(err) = self.finish() {
108 re_log::warn!("encoder couldn't be finished: {err}");
109 }
110 }
111 }
112}
113
114pub struct Encoder<W: std::io::Write> {
119 serializer: Serializer,
120 compression: Compression,
121 write: W,
122 scratch: Vec<u8>,
123}
124
125impl<W: std::io::Write> Encoder<W> {
126 pub fn new(
127 version: CrateVersion,
128 options: EncodingOptions,
129 mut write: W,
130 ) -> Result<Self, EncodeError> {
131 FileHeader {
132 magic: *crate::RRD_HEADER,
133 version: version.to_bytes(),
134 options,
135 }
136 .encode(&mut write)?;
137
138 Ok(Self {
139 serializer: options.serializer,
140 compression: options.compression,
141 write,
142 scratch: Vec::new(),
143 })
144 }
145
146 pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
148 re_tracing::profile_function!();
149
150 self.scratch.clear();
151 match self.serializer {
152 Serializer::Protobuf => {
153 encoder::encode(&mut self.scratch, message, self.compression)?;
154
155 self.write
156 .write_all(&self.scratch)
157 .map(|_| self.scratch.len() as _)
158 .map_err(EncodeError::Write)
159 }
160 }
161 }
162
163 #[inline]
166 pub fn finish(&mut self) -> Result<(), EncodeError> {
167 match self.serializer {
168 Serializer::Protobuf => {
169 file::MessageHeader {
170 kind: file::MessageKind::End,
171 len: 0,
172 }
173 .encode(&mut self.write)?;
174 }
175 }
176 Ok(())
177 }
178
179 #[inline]
180 pub fn flush_blocking(&mut self) -> std::io::Result<()> {
181 self.write.flush()
182 }
183
184 #[inline]
185 pub fn into_inner(self) -> W {
186 self.write
187 }
188}
189
190pub fn encode(
192 version: CrateVersion,
193 options: EncodingOptions,
194 messages: impl Iterator<Item = ChunkResult<LogMsg>>,
195 write: &mut impl std::io::Write,
196) -> Result<u64, EncodeError> {
197 re_tracing::profile_function!();
198 let mut encoder = DroppableEncoder::new(version, options, write)?;
199 let mut size_bytes = 0;
200 for message in messages {
201 size_bytes += encoder.append(&message?)?;
202 }
203 Ok(size_bytes)
204}
205
206pub fn encode_ref<'a>(
208 version: CrateVersion,
209 options: EncodingOptions,
210 messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
211 write: &mut impl std::io::Write,
212) -> Result<u64, EncodeError> {
213 re_tracing::profile_function!();
214 let mut encoder = DroppableEncoder::new(version, options, write)?;
215 let mut size_bytes = 0;
216 for message in messages {
217 size_bytes += encoder.append(message?)?;
218 }
219 Ok(size_bytes)
220}
221
222pub fn encode_as_bytes(
223 version: CrateVersion,
224 options: EncodingOptions,
225 messages: impl Iterator<Item = ChunkResult<LogMsg>>,
226) -> Result<Vec<u8>, EncodeError> {
227 re_tracing::profile_function!();
228 let mut bytes: Vec<u8> = vec![];
229 let mut encoder = Encoder::new(version, options, &mut bytes)?;
230 for message in messages {
231 encoder.append(&message?)?;
232 }
233 encoder.finish()?;
234 Ok(bytes)
235}
236
237#[inline]
238pub fn local_encoder() -> Result<DroppableEncoder<Vec<u8>>, EncodeError> {
239 DroppableEncoder::new(
240 CrateVersion::LOCAL,
241 EncodingOptions::PROTOBUF_COMPRESSED,
242 Vec::new(),
243 )
244}
245
246#[inline]
247pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
248 Encoder::new(
249 CrateVersion::LOCAL,
250 EncodingOptions::PROTOBUF_COMPRESSED,
251 Vec::new(),
252 )
253}
254
255#[inline]
256pub fn encode_as_bytes_local(
257 messages: impl Iterator<Item = ChunkResult<LogMsg>>,
258) -> Result<Vec<u8>, EncodeError> {
259 let mut encoder = local_raw_encoder()?;
260 for message in messages {
261 encoder.append(&message?)?;
262 }
263 encoder.finish()?;
264 Ok(encoder.into_inner())
265}
266
267#[inline]
268pub fn encode_ref_as_bytes_local<'a>(
269 messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
270) -> Result<Vec<u8>, EncodeError> {
271 let mut encoder = local_raw_encoder()?;
272 for message in messages {
273 encoder.append(message?)?;
274 }
275 encoder.finish()?;
276 Ok(encoder.into_inner())
277}