1use re_build_info::CrateVersion;
13use re_log_types::{BlueprintActivationCommand, SetStoreInfo};
14
15use crate::ApplicationIdInjector;
16use crate::rrd::CodecError;
17
18pub trait ToTransport {
27 type Output;
28 type Context<'a>;
29
30 fn to_transport(&self, context: Self::Context<'_>) -> Result<Self::Output, CodecError>;
31}
32
33impl ToTransport for re_log_types::LogMsg {
34 type Output = re_protos::log_msg::v1alpha1::log_msg::Msg;
35 type Context<'a> = crate::rrd::Compression;
36
37 fn to_transport(&self, compression: Self::Context<'_>) -> Result<Self::Output, CodecError> {
38 log_msg_app_to_transport(self, compression)
39 }
40}
41
42impl ToTransport for re_log_types::ArrowMsg {
43 type Output = re_protos::log_msg::v1alpha1::ArrowMsg;
44 type Context<'a> = (re_log_types::StoreId, crate::rrd::Compression);
45
46 fn to_transport(
47 &self,
48 (store_id, compression): Self::Context<'_>,
49 ) -> Result<Self::Output, CodecError> {
50 arrow_msg_app_to_transport(self, store_id, compression)
51 }
52}
53
54impl ToTransport for crate::RrdFooter {
55 type Output = re_protos::log_msg::v1alpha1::RrdFooter;
56 type Context<'a> = ();
57
58 fn to_transport(&self, _: Self::Context<'_>) -> Result<Self::Output, CodecError> {
59 let manifests: Result<Vec<_>, _> = self
60 .manifests
61 .values()
62 .map(|manifest| manifest.to_transport(()))
63 .collect();
64
65 Ok(Self::Output {
66 manifests: manifests?,
67 })
68 }
69}
70
71impl ToTransport for crate::RrdManifest {
72 type Output = re_protos::log_msg::v1alpha1::RrdManifest;
73 type Context<'a> = ();
74
75 fn to_transport(&self, (): Self::Context<'_>) -> Result<Self::Output, CodecError> {
76 {
77 self.sanity_check_cheap()?;
78
79 #[cfg(test)]
81 self.sanity_check_heavy()?;
82 }
83
84 let sorbet_schema = re_protos::common::v1alpha1::Schema::try_from(&self.sorbet_schema)
85 .map_err(CodecError::ArrowSerialization)?;
86
87 Ok(Self::Output {
88 store_id: Some(self.store_id.clone().into()),
89 sorbet_schema_sha256: Some(self.sorbet_schema_sha256.to_vec().into()),
90 sorbet_schema: Some(sorbet_schema),
91 data: Some(self.data.clone().into()),
92 })
93 }
94}
95
96pub trait ToApplication {
98 type Output;
99 type Context<'a>;
100
101 fn to_application(&self, context: Self::Context<'_>) -> Result<Self::Output, CodecError>;
102}
103
104impl ToApplication for re_protos::log_msg::v1alpha1::log_msg::Msg {
105 type Output = re_log_types::LogMsg;
106 type Context<'a> = (&'a mut dyn ApplicationIdInjector, Option<CrateVersion>);
107
108 fn to_application(
109 &self,
110 (app_id_injector, patched_version): Self::Context<'_>,
111 ) -> Result<Self::Output, CodecError> {
112 let mut log_msg = log_msg_transport_to_app(app_id_injector, self)?;
113
114 if let Some(patched_version) = patched_version
115 && let re_log_types::LogMsg::SetStoreInfo(msg) = &mut log_msg
116 {
117 msg.info.store_version = Some(patched_version);
122 }
123
124 Ok(log_msg)
125 }
126}
127
128impl ToApplication for re_protos::log_msg::v1alpha1::LogMsg {
129 type Output = re_log_types::LogMsg;
130 type Context<'a> = (&'a mut dyn ApplicationIdInjector, Option<CrateVersion>);
131
132 fn to_application(
133 &self,
134 app_id_injector: Self::Context<'_>,
135 ) -> Result<Self::Output, CodecError> {
136 let Some(msg) = self.msg.as_ref() else {
137 return Err(re_protos::missing_field!(Self, "msg").into());
138 };
139
140 msg.to_application(app_id_injector)
141 }
142}
143
144impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg {
145 type Output = re_log_types::ArrowMsg;
146 type Context<'a> = ();
147
148 fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
149 arrow_msg_transport_to_app(self)
150 }
151}
152
153impl ToApplication for re_protos::log_msg::v1alpha1::RrdFooter {
154 type Output = crate::RrdFooter;
155 type Context<'a> = ();
156
157 fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
158 let manifests: Result<std::collections::HashMap<_, _>, _> = self
159 .manifests
160 .iter()
161 .map(|manifest| {
162 let manifest = manifest.to_application(())?;
163 Ok::<_, CodecError>((manifest.store_id.clone(), manifest))
164 })
165 .collect();
166
167 Ok(Self::Output {
168 manifests: manifests?,
169 })
170 }
171}
172
173impl ToApplication for re_protos::log_msg::v1alpha1::RrdManifest {
174 type Output = crate::RrdManifest;
175 type Context<'a> = ();
176
177 fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
178 let store_id = self
179 .store_id
180 .as_ref()
181 .ok_or_else(|| re_protos::missing_field!(Self, "store_id"))?;
182
183 let sorbet_schema = self
184 .sorbet_schema
185 .as_ref()
186 .ok_or_else(|| re_protos::missing_field!(Self, "sorbet_schema"))?;
187
188 let sorbet_schema_sha256 = self
189 .sorbet_schema_sha256
190 .as_ref()
191 .ok_or_else(|| re_protos::missing_field!(Self, "sorbet_schema_sha256"))?;
192 let sorbet_schema_sha256: [u8; 32] = (**sorbet_schema_sha256)
193 .try_into()
194 .map_err(|err| re_protos::invalid_field!(Self, "sorbet_schema_sha256", err))?;
195
196 let data = self
197 .data
198 .as_ref()
199 .ok_or_else(|| re_protos::missing_field!(Self, "data"))?;
200
201 let rrd_manifest = Self::Output {
202 store_id: store_id.clone().try_into()?,
203 sorbet_schema: sorbet_schema
204 .try_into()
205 .map_err(CodecError::ArrowDeserialization)?,
206 sorbet_schema_sha256,
207 data: data.try_into()?,
208 };
209
210 {
211 rrd_manifest.sanity_check_cheap()?;
212
213 #[cfg(test)]
215 rrd_manifest.sanity_check_heavy()?;
216 }
217
218 Ok(rrd_manifest)
219 }
220}
221
222#[tracing::instrument(level = "trace", skip_all)]
232fn log_msg_transport_to_app<I: ApplicationIdInjector + ?Sized>(
233 app_id_injector: &mut I,
234 message: &re_protos::log_msg::v1alpha1::log_msg::Msg,
235) -> Result<re_log_types::LogMsg, CodecError> {
236 re_tracing::profile_function!();
237
238 use re_protos::log_msg::v1alpha1::log_msg::Msg;
239 use re_protos::missing_field;
240
241 match message {
242 Msg::SetStoreInfo(set_store_info) => {
243 let set_store_info: SetStoreInfo = set_store_info.clone().try_into()?;
244 app_id_injector.store_info_received(&set_store_info.info);
245 Ok(re_log_types::LogMsg::SetStoreInfo(set_store_info))
246 }
247
248 Msg::ArrowMsg(arrow_msg) => {
249 let encoded = arrow_msg_transport_to_app(arrow_msg)?;
250
251 let store_id: re_log_types::StoreId = match arrow_msg
253 .store_id
254 .as_ref()
255 .ok_or_else(|| missing_field!(re_protos::log_msg::v1alpha1::ArrowMsg, "store_id"))?
256 .clone()
257 .try_into()
258 {
259 Ok(store_id) => store_id,
260 Err(err) => {
261 let Some(store_id) = app_id_injector.recover_store_id(err.clone()) else {
262 return Err(err.into());
263 };
264
265 store_id
266 }
267 };
268
269 Ok(re_log_types::LogMsg::ArrowMsg(store_id, encoded))
270 }
271
272 Msg::BlueprintActivationCommand(blueprint_activation_command) => {
273 let blueprint_id: re_log_types::StoreId = match blueprint_activation_command
275 .blueprint_id
276 .as_ref()
277 .ok_or_else(|| {
278 missing_field!(
279 re_protos::log_msg::v1alpha1::BlueprintActivationCommand,
280 "blueprint_id"
281 )
282 })?
283 .clone()
284 .try_into()
285 {
286 Ok(store_id) => store_id,
287 Err(err) => {
288 let Some(store_id) = app_id_injector.recover_store_id(err.clone()) else {
289 return Err(err.into());
290 };
291
292 store_id
293 }
294 };
295
296 Ok(re_log_types::LogMsg::BlueprintActivationCommand(
297 BlueprintActivationCommand {
298 blueprint_id,
299 make_active: blueprint_activation_command.make_active,
300 make_default: blueprint_activation_command.make_default,
301 },
302 ))
303 }
304 }
305}
306
307#[tracing::instrument(level = "trace", skip_all)]
309fn arrow_msg_transport_to_app(
310 arrow_msg: &re_protos::log_msg::v1alpha1::ArrowMsg,
311) -> Result<re_log_types::ArrowMsg, CodecError> {
312 re_tracing::profile_function!();
313
314 use re_protos::log_msg::v1alpha1::Encoding;
315
316 if arrow_msg.encoding() != Encoding::ArrowIpc {
317 return Err(CodecError::UnsupportedEncoding);
318 }
319
320 let batch = decode_arrow(
321 &arrow_msg.payload,
322 arrow_msg.uncompressed_size as usize,
323 arrow_msg.compression().into(),
324 )?;
325
326 let chunk_id = re_sorbet::chunk_id_of_schema(batch.schema_ref())?.as_tuid();
327
328 let chunk_batch = re_sorbet::ChunkBatch::try_from(&batch)?;
340
341 Ok(re_log_types::ArrowMsg {
346 chunk_id,
347 batch: chunk_batch.into(),
348 on_release: None,
349 })
350}
351
352#[tracing::instrument(level = "trace", skip_all)]
354fn log_msg_app_to_transport(
355 message: &re_log_types::LogMsg,
356 compression: crate::rrd::Compression,
357) -> Result<re_protos::log_msg::v1alpha1::log_msg::Msg, CodecError> {
358 re_tracing::profile_function!();
359
360 let proto_msg = match message {
361 re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
362 re_protos::log_msg::v1alpha1::log_msg::Msg::SetStoreInfo(set_store_info.clone().into())
363 }
364
365 re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
366 let arrow_msg = arrow_msg_app_to_transport(arrow_msg, store_id.clone(), compression)?;
367 re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg)
368 }
369
370 re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
371 re_protos::log_msg::v1alpha1::log_msg::Msg::BlueprintActivationCommand(
372 blueprint_activation_command.clone().into(),
373 )
374 }
375 };
376
377 Ok(proto_msg)
378}
379
380#[tracing::instrument(level = "trace", skip_all)]
382fn arrow_msg_app_to_transport(
383 arrow_msg: &re_log_types::ArrowMsg,
384 store_id: re_log_types::StoreId,
385 compression: crate::rrd::Compression,
386) -> Result<re_protos::log_msg::v1alpha1::ArrowMsg, CodecError> {
387 re_tracing::profile_function!();
388
389 let re_log_types::ArrowMsg {
390 chunk_id,
391 batch,
392 on_release: _,
393 } = arrow_msg;
394
395 let payload = encode_arrow(batch, compression)?;
396
397 Ok(re_protos::log_msg::v1alpha1::ArrowMsg {
398 store_id: Some(store_id.into()),
399 chunk_id: Some((*chunk_id).into()),
400 compression: re_protos::log_msg::v1alpha1::Compression::from(compression) as i32,
401 uncompressed_size: payload.uncompressed_size,
402 encoding: re_protos::log_msg::v1alpha1::Encoding::ArrowIpc as i32,
403 payload: payload.data.into(),
404 is_static: re_sorbet::is_static_chunk(batch),
405 })
406}
407
408struct EncodedArrowRecordBatch {
411 uncompressed_size: u64,
412 data: Vec<u8>,
413}
414
415#[tracing::instrument(level = "debug", skip_all)]
417fn encode_arrow(
418 batch: &arrow::array::RecordBatch,
419 compression: crate::rrd::Compression,
420) -> Result<EncodedArrowRecordBatch, CodecError> {
421 re_tracing::profile_function!();
422
423 let mut uncompressed = Vec::new();
424 {
425 let schema = batch.schema_ref().as_ref();
426
427 let mut sw = {
428 let _span = tracing::trace_span!("schema").entered();
429 ::arrow::ipc::writer::StreamWriter::try_new(&mut uncompressed, schema)
430 .map_err(CodecError::ArrowSerialization)?
431 };
432
433 {
434 let _span = tracing::trace_span!("data").entered();
435 sw.write(batch).map_err(CodecError::ArrowSerialization)?;
436 }
437
438 sw.finish().map_err(CodecError::ArrowSerialization)?;
439 }
440
441 let uncompressed_size = uncompressed.len().try_into()?;
444
445 let data = match compression {
446 crate::rrd::Compression::Off => uncompressed,
447 crate::rrd::Compression::LZ4 => {
448 re_tracing::profile_scope!("lz4::compress");
449 let _span = tracing::trace_span!("lz4::compress").entered();
450 lz4_flex::block::compress(&uncompressed)
451 }
452 };
453
454 Ok(EncodedArrowRecordBatch {
455 uncompressed_size,
456 data,
457 })
458}
459
460#[tracing::instrument(level = "debug", skip_all)]
466fn decode_arrow(
467 data: &[u8],
468 uncompressed_size: usize,
469 compression: crate::rrd::Compression,
470) -> Result<arrow::array::RecordBatch, CodecError> {
471 let mut uncompressed = Vec::new();
472 let data = match compression {
473 crate::rrd::Compression::Off => data,
474 crate::rrd::Compression::LZ4 => {
475 re_tracing::profile_scope!("LZ4-decompress");
476 let _span = tracing::trace_span!("lz4::decompress").entered();
477 uncompressed.resize(uncompressed_size, 0);
478 lz4_flex::block::decompress_into(data, &mut uncompressed)?;
479 uncompressed.as_slice()
480 }
481 };
482
483 let mut stream = {
484 let _span = tracing::trace_span!("schema").entered();
485 ::arrow::ipc::reader::StreamReader::try_new(data, None)
486 .map_err(CodecError::ArrowDeserialization)?
487 };
488
489 let _span = tracing::trace_span!("data").entered();
490 stream
491 .next()
492 .ok_or(CodecError::MissingRecordBatch)?
493 .map_err(CodecError::ArrowDeserialization)
494}