micromegas_analytics/
payload.rs

1use anyhow::{Context, Result};
2use micromegas_telemetry::{
3    blob_storage::BlobStorage, compression::decompress, stream_info::StreamInfo,
4};
5use micromegas_tracing::{parsing::make_custom_readers, prelude::*};
6use micromegas_transit::{parse_object_buffer, read_dependencies, value::Value};
7use std::sync::Arc;
8
9#[span_fn]
10pub async fn fetch_block_payload(
11    blob_storage: Arc<BlobStorage>,
12    process_id: sqlx::types::Uuid,
13    stream_id: sqlx::types::Uuid,
14    block_id: sqlx::types::Uuid,
15) -> Result<micromegas_telemetry::block_wire_format::BlockPayload> {
16    let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
17    let buffer: Vec<u8> = blob_storage
18        .read_blob(&obj_path)
19        .await
20        .with_context(|| "reading block payload from blob storage")?
21        .into();
22    {
23        span_scope!("decode");
24        let payload: micromegas_telemetry::block_wire_format::BlockPayload =
25            ciborium::from_reader(&buffer[..])
26                .with_context(|| format!("reading payload {}", &block_id))?;
27        Ok(payload)
28    }
29}
30
31// parse_block calls fun for each object in the block until fun returns `false`
32#[span_fn]
33pub fn parse_block<F>(
34    stream: &StreamInfo,
35    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
36    fun: F,
37) -> Result<bool>
38where
39    F: FnMut(Value) -> Result<bool>,
40{
41    let dep_udts = &stream.dependencies_metadata;
42    let custom_readers = make_custom_readers();
43    let dependencies = read_dependencies(
44        &custom_readers,
45        dep_udts,
46        &decompress(&payload.dependencies).with_context(|| "decompressing dependencies payload")?,
47    )
48    .with_context(|| "reading dependencies")?;
49    let obj_udts = &stream.objects_metadata;
50    let continue_iterating = parse_object_buffer(
51        &custom_readers,
52        &dependencies,
53        obj_udts,
54        &decompress(&payload.objects).with_context(|| "decompressing objects payload")?,
55        fun,
56    )
57    .with_context(|| "parsing object buffer")?;
58    Ok(continue_iterating)
59}