micromegas_analytics/
payload.rs1use anyhow::{Context, Result};
2use micromegas_telemetry::{
3 blob_storage::BlobStorage, compression::decompress, stream_info::StreamInfo,
4};
5use micromegas_tracing::{parsing::make_custom_readers, prelude::*};
6use micromegas_transit::{parse_object_buffer, read_dependencies, value::Value};
7use std::sync::Arc;
8
9#[span_fn]
10pub async fn fetch_block_payload(
11 blob_storage: Arc<BlobStorage>,
12 process_id: sqlx::types::Uuid,
13 stream_id: sqlx::types::Uuid,
14 block_id: sqlx::types::Uuid,
15) -> Result<micromegas_telemetry::block_wire_format::BlockPayload> {
16 let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
17 let buffer: Vec<u8> = blob_storage
18 .read_blob(&obj_path)
19 .await
20 .with_context(|| "reading block payload from blob storage")?
21 .into();
22 {
23 span_scope!("decode");
24 let payload: micromegas_telemetry::block_wire_format::BlockPayload =
25 ciborium::from_reader(&buffer[..])
26 .with_context(|| format!("reading payload {}", &block_id))?;
27 Ok(payload)
28 }
29}
30
31#[span_fn]
33pub fn parse_block<F>(
34 stream: &StreamInfo,
35 payload: µmegas_telemetry::block_wire_format::BlockPayload,
36 fun: F,
37) -> Result<bool>
38where
39 F: FnMut(Value) -> Result<bool>,
40{
41 let dep_udts = &stream.dependencies_metadata;
42 let custom_readers = make_custom_readers();
43 let dependencies = read_dependencies(
44 &custom_readers,
45 dep_udts,
46 &decompress(&payload.dependencies).with_context(|| "decompressing dependencies payload")?,
47 )
48 .with_context(|| "reading dependencies")?;
49 let obj_udts = &stream.objects_metadata;
50 let continue_iterating = parse_object_buffer(
51 &custom_readers,
52 &dependencies,
53 obj_udts,
54 &decompress(&payload.objects).with_context(|| "decompressing objects payload")?,
55 fun,
56 )
57 .with_context(|| "parsing object buffer")?;
58 Ok(continue_iterating)
59}