micromegas_telemetry_sink/
stream_info.rs1use micromegas_telemetry::stream_info::StreamInfo;
2use micromegas_tracing::event::{EventStream, ExtractDeps, TracingBlock};
3use micromegas_transit::HeterogeneousQueue;
4use micromegas_transit::UserDefinedType;
5use std::collections::HashMap;
6
7fn extract_secondary_udts(
8 secondary_types: &mut HashMap<String, UserDefinedType>,
9 udt: &UserDefinedType,
10) {
11 for secondary in &udt.secondary_udts {
12 secondary_types.insert((*secondary.name).clone(), secondary.clone());
13 extract_secondary_udts(secondary_types, secondary);
14 }
15}
16
17fn flatten_metadata(udts: Vec<UserDefinedType>) -> Vec<UserDefinedType> {
18 let mut secondary_types = HashMap::new();
19 let mut result = vec![];
20 for udt in udts {
21 extract_secondary_udts(&mut secondary_types, &udt);
22 result.push(udt);
23 }
24 for (_k, v) in secondary_types {
25 result.push(v);
26 }
27 result
28}
29
30pub fn make_stream_info<Block>(stream: &EventStream<Block>) -> StreamInfo
35where
36 Block: TracingBlock,
37 <Block as TracingBlock>::Queue: micromegas_transit::HeterogeneousQueue,
38 <<Block as TracingBlock>::Queue as ExtractDeps>::DepsQueue:
39 micromegas_transit::HeterogeneousQueue,
40{
41 let dependencies_meta = flatten_metadata(
42 <<Block as TracingBlock>::Queue as ExtractDeps>::DepsQueue::reflect_contained(),
43 );
44 let obj_meta = flatten_metadata(<Block as TracingBlock>::Queue::reflect_contained());
45 StreamInfo {
46 process_id: stream.process_id().to_owned(),
47 stream_id: stream.stream_id().to_owned(),
48 dependencies_metadata: dependencies_meta,
49 objects_metadata: obj_meta,
50 tags: stream.tags().to_owned(),
51 properties: stream.properties().clone(),
52 }
53}