micromegas_telemetry_sink/
stream_info.rs

1use micromegas_telemetry::stream_info::StreamInfo;
2use micromegas_tracing::event::{EventStream, ExtractDeps, TracingBlock};
3use micromegas_transit::HeterogeneousQueue;
4use micromegas_transit::UserDefinedType;
5use std::collections::HashMap;
6
7fn extract_secondary_udts(
8    secondary_types: &mut HashMap<String, UserDefinedType>,
9    udt: &UserDefinedType,
10) {
11    for secondary in &udt.secondary_udts {
12        secondary_types.insert((*secondary.name).clone(), secondary.clone());
13        extract_secondary_udts(secondary_types, secondary);
14    }
15}
16
17fn flatten_metadata(udts: Vec<UserDefinedType>) -> Vec<UserDefinedType> {
18    let mut secondary_types = HashMap::new();
19    let mut result = vec![];
20    for udt in udts {
21        extract_secondary_udts(&mut secondary_types, &udt);
22        result.push(udt);
23    }
24    for (_k, v) in secondary_types {
25        result.push(v);
26    }
27    result
28}
29
30/// Creates a `StreamInfo` object from an `EventStream`.
31///
32/// This function extracts metadata about the stream, including its process and stream IDs,
33/// and the serialized metadata for its dependencies and objects.
34pub fn make_stream_info<Block>(stream: &EventStream<Block>) -> StreamInfo
35where
36    Block: TracingBlock,
37    <Block as TracingBlock>::Queue: micromegas_transit::HeterogeneousQueue,
38    <<Block as TracingBlock>::Queue as ExtractDeps>::DepsQueue:
39        micromegas_transit::HeterogeneousQueue,
40{
41    let dependencies_meta = flatten_metadata(
42        <<Block as TracingBlock>::Queue as ExtractDeps>::DepsQueue::reflect_contained(),
43    );
44    let obj_meta = flatten_metadata(<Block as TracingBlock>::Queue::reflect_contained());
45    StreamInfo {
46        process_id: stream.process_id().to_owned(),
47        stream_id: stream.stream_id().to_owned(),
48        dependencies_metadata: dependencies_meta,
49        objects_metadata: obj_meta,
50        tags: stream.tags().to_owned(),
51        properties: stream.properties().clone(),
52    }
53}