test_span/
report.rs

1use ::daggy::{Dag, NodeIndex};
2use ::serde::{Deserialize, Serialize};
3use daggy::petgraph::graph::DefaultIx;
4use daggy::Walker;
5use indexmap::IndexMap;
6use linked_hash_map::LinkedHashMap;
7use once_cell::sync::Lazy;
8use std::collections::{HashMap, HashSet};
9use tracing::Level;
10
11use crate::attribute::OwnedMetadata;
12use crate::layer::{ALL_LOGS, ALL_SPANS, SPAN_ID_TO_ROOT_AND_NODE_INDEX};
13use crate::log::LogsRecorder;
14use crate::record::{Record, RecordValue, RecordWithMetadata, Recorder};
15use crate::LazyMutex;
16
/// Process-wide table of span DAGs, keyed by a `u64` id.
///
/// `Report::from_root` looks a DAG up by the id of the global root a span belongs to and
/// clones it. NOTE(review): presumably filled in by the tracing layer; the writer is not
/// visible from this file.
pub(crate) static ALL_DAGS: LazyMutex<IndexMap<u64, Dag<u64, ()>>> = Lazy::new(Default::default);
18
19#[derive(Debug)]
20pub struct Filter {
21    default_level: Level,
22    targets: HashMap<String, Level>,
23}
24
25impl Filter {
26    pub fn new(default_level: Level) -> Self {
27        Self {
28            default_level,
29            targets: Default::default(),
30        }
31    }
32
33    pub fn with_target(self, key: String, value: Level) -> Self {
34        let mut targets = self.targets;
35        targets.insert(key, value);
36        Self { targets, ..self }
37    }
38
39    pub fn is_enabled(&self, metadata: &OwnedMetadata) -> bool {
40        let mut for_target = self
41            .targets
42            .iter()
43            .filter(|(key, _)| metadata.target.starts_with(key.as_str()))
44            .collect::<Vec<_>>();
45        for_target.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len()));
46
47        for_target
48            .first()
49            .map(|(_, level)| level)
50            .unwrap_or(&&self.default_level)
51            .ge(&&metadata
52                .level
53                .parse::<Level>()
54                .expect("metadata level is invalid"))
55    }
56}
57
/// A node in the tree of recorded spans.
///
/// Every node owns its child spans, so the root `Span` holds the whole tree.
/// It can't do much yet, except being serialized, which comes in handy for snapshots.
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Span {
    /// The span id; skipped when serializing.
    #[serde(skip_serializing)]
    id: u64,
    /// The span name; `Report` builds it as `"{target}::{name}"` from the span metadata.
    name: String,
    /// The recorded variables and logs.
    record: RecordWithMetadata,
    /// The node's children, in insertion order.
    children: LinkedHashMap<ChildKey, Span>,
}
73
74#[derive(Default, Debug, Hash, PartialEq, Eq)]
75struct ChildKey(String, usize);
76
77impl Serialize for ChildKey {
78    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
79    where
80        S: serde::Serializer,
81    {
82        serializer.serialize_str(&self.0)
83    }
84}
85
86impl Span {
87    // Create a span from a name, a span_id, and recorded variables
88    pub fn from(name: String, id: u64, record: RecordWithMetadata) -> Self {
89        Self {
90            name,
91            id,
92            record,
93            children: Default::default(),
94        }
95    }
96}
97pub struct Report {
98    root_index: NodeIndex,
99    root_id: u64,
100    dag: Dag<u64, (), DefaultIx>,
101    spans: IndexMap<u64, Recorder>,
102    logs: LogsRecorder,
103    node_to_id: IndexMap<NodeIndex, u64>,
104}
105
106impl Report {
107    pub fn from_root(root_node: u64) -> Self {
108        let id_to_node = SPAN_ID_TO_ROOT_AND_NODE_INDEX.lock().unwrap().clone();
109        let (global_root, root_node_index) = id_to_node
110            .get(&root_node)
111            .copied()
112            .expect("couldn't find rood node");
113
114        let node_to_id: IndexMap<NodeIndex, u64> = id_to_node
115            .into_iter()
116            .filter(|&(_key, (root, _value))| root == global_root)
117            .map(|(key, (_root, value))| (value, key))
118            .collect();
119
120        let relevant_spans = node_to_id.values().cloned().collect::<HashSet<_>>();
121        let spans = ALL_SPANS
122            .lock()
123            .unwrap()
124            .clone()
125            .into_iter()
126            .filter(|(span_id, _)| relevant_spans.contains(span_id))
127            .collect();
128        let logs = ALL_LOGS.lock().unwrap().for_spans(relevant_spans);
129
130        let dag = ALL_DAGS
131            .lock()
132            .unwrap()
133            .get(&global_root)
134            .expect("no dag for root")
135            .clone();
136
137        Self {
138            root_index: root_node_index,
139            root_id: root_node,
140            dag,
141            spans,
142            node_to_id,
143            logs,
144        }
145    }
146
147    pub fn logs(&self, filter: &Filter) -> Records {
148        if let Some(recorder) = self.spans.get(&self.root_id) {
149            let mut contents = recorder.contents(filter);
150            contents.append(
151                self.logs
152                    .record_for_span_id_and_filter(self.root_id, filter),
153            );
154
155            let mut records: Vec<_> = contents.entries().cloned().collect();
156
157            self.dfs_logs_insert(&mut records, self.root_index, filter);
158
159            Records::new(records)
160        } else {
161            Default::default()
162        }
163    }
164
165    pub fn spans(&self, filter: &Filter) -> Span {
166        if let Some(recorder) = self.spans.get(&self.root_id) {
167            let metadata = recorder
168                .metadata()
169                .expect("recorder without metadata");
170            let span_name = format!("{}::{}", metadata.target, metadata.name);
171
172            let mut root_span = Span::from(span_name, self.root_id, recorder.contents(filter));
173
174            self.dfs_span_insert(&mut root_span, self.root_index, filter);
175
176            root_span
177        } else {
178            Span::from("root".to_string(), 0, RecordWithMetadata::for_root())
179        }
180    }
181
182    fn dfs_logs_insert(&self, records: &mut Vec<Record>, current_node: NodeIndex, filter: &Filter) {
183        for child_node in self.sorted_children(current_node) {
184            let child_id = self
185                .node_to_id
186                .get(&child_node)
187                .expect("couldn't find span id for node");
188
189            let mut child_record = self
190                .spans
191                .get(child_id)
192                .expect("graph and hashmap are tied; qed")
193                .contents(filter);
194
195            child_record.append(self.logs.record_for_span_id_and_filter(*child_id, filter));
196            records.extend(child_record.entries().cloned());
197            self.dfs_logs_insert(records, child_node, filter);
198        }
199    }
200
201    fn dfs_span_insert(&self, current_span: &mut Span, current_node: NodeIndex, filter: &Filter) {
202        current_span.children = self
203            .sorted_children(current_node)
204            .flat_map(|child_node| {
205                let child_id = self
206                    .node_to_id
207                    .get(&child_node)
208                    .expect("couldn't find span id for node");
209                let child_recorder = self
210                    .spans
211                    .get(child_id)
212                    .expect("graph and hashmap are tied; qed");
213
214                let metadata = child_recorder
215                    .metadata()
216                    .expect("couldn't find metadata for child record");
217
218                let span_name = format!("{}::{}", metadata.target, metadata.name);
219                let mut contents = child_recorder.contents(filter);
220                contents.append(self.logs.record_for_span_id_and_filter(*child_id, filter));
221
222                if !filter.is_enabled(metadata) {
223                    // We continue to fetch children spans with an enabled filter
224                    let mut child_span = Span::from(span_name, *child_id, contents);
225                    self.dfs_span_insert(&mut child_span, child_node, filter);
226
227                    child_span
228                        .children
229                        .into_iter()
230                        .collect::<Vec<(ChildKey, Span)>>()
231                } else {
232                    let mut child_span = Span::from(span_name.clone(), *child_id, contents);
233                    self.dfs_span_insert(&mut child_span, child_node, filter);
234
235                    vec![(ChildKey(span_name, child_node.index()), child_span)]
236                }
237            })
238            .collect();
239    }
240
241    fn sorted_children(&self, node: NodeIndex) -> impl Iterator<Item = NodeIndex> {
242        let mut children = self
243            .dag
244            .children(node)
245            .iter(&self.dag)
246            .map(|(_, node)| node)
247            .collect::<Vec<_>>();
248        children.sort();
249
250        children.into_iter()
251    }
252}
253
254/// A Vec of log entries.
255#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
256pub struct Records(Vec<Record>);
257
258impl Records {
259    /// Create a Records from log entries
260    pub fn new(records: Vec<Record>) -> Self {
261        Self(records)
262    }
263
264    /// check if log message has been stored with the given payload.
265    pub fn contains_message(&self, lookup: impl AsRef<str>) -> bool {
266        self.contains_value("message", RecordValue::Debug(lookup.as_ref().to_string()))
267    }
268
269    /// check if log entry (this can be span attributes or log messages) has been stored with the given payload.
270    pub fn contains_value(&self, field_name: impl AsRef<str>, lookup: RecordValue) -> bool {
271        self.0
272            .iter()
273            .any(|(field, value)| field.as_str() == field_name.as_ref() && value == &lookup)
274    }
275}