opentelemetry_proto/transform/
logs.rs

1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3    use crate::{
4        tonic::{
5            common::v1::{
6                any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
7                KeyValueList,
8            },
9            logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
10            resource::v1::Resource,
11        },
12        transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
13    };
14    use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
15    use opentelemetry_sdk::logs::LogBatch;
16    use std::borrow::Cow;
17    use std::collections::HashMap;
18
19    impl From<LogsAnyValue> for AnyValue {
20        fn from(value: LogsAnyValue) -> Self {
21            AnyValue {
22                value: Some(value.into()),
23            }
24        }
25    }
26
27    impl From<LogsAnyValue> for Value {
28        fn from(value: LogsAnyValue) -> Self {
29            match value {
30                LogsAnyValue::Double(f) => Value::DoubleValue(f),
31                LogsAnyValue::Int(i) => Value::IntValue(i),
32                LogsAnyValue::String(s) => Value::StringValue(s.into()),
33                LogsAnyValue::Boolean(b) => Value::BoolValue(b),
34                LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue {
35                    values: v
36                        .into_iter()
37                        .map(|v| AnyValue {
38                            value: Some(v.into()),
39                        })
40                        .collect(),
41                }),
42                LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList {
43                    values: m
44                        .into_iter()
45                        .map(|(key, value)| KeyValue {
46                            key: key.into(),
47                            value: Some(AnyValue {
48                                value: Some(value.into()),
49                            }),
50                        })
51                        .collect(),
52                }),
53                LogsAnyValue::Bytes(v) => Value::BytesValue(*v),
54                _ => unreachable!("Nonexistent value type"),
55            }
56        }
57    }
58
59    impl From<&opentelemetry_sdk::logs::SdkLogRecord> for LogRecord {
60        fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self {
61            let trace_context = log_record.trace_context();
62            let severity_number = match log_record.severity_number() {
63                Some(Severity::Trace) => SeverityNumber::Trace,
64                Some(Severity::Trace2) => SeverityNumber::Trace2,
65                Some(Severity::Trace3) => SeverityNumber::Trace3,
66                Some(Severity::Trace4) => SeverityNumber::Trace4,
67                Some(Severity::Debug) => SeverityNumber::Debug,
68                Some(Severity::Debug2) => SeverityNumber::Debug2,
69                Some(Severity::Debug3) => SeverityNumber::Debug3,
70                Some(Severity::Debug4) => SeverityNumber::Debug4,
71                Some(Severity::Info) => SeverityNumber::Info,
72                Some(Severity::Info2) => SeverityNumber::Info2,
73                Some(Severity::Info3) => SeverityNumber::Info3,
74                Some(Severity::Info4) => SeverityNumber::Info4,
75                Some(Severity::Warn) => SeverityNumber::Warn,
76                Some(Severity::Warn2) => SeverityNumber::Warn2,
77                Some(Severity::Warn3) => SeverityNumber::Warn3,
78                Some(Severity::Warn4) => SeverityNumber::Warn4,
79                Some(Severity::Error) => SeverityNumber::Error,
80                Some(Severity::Error2) => SeverityNumber::Error2,
81                Some(Severity::Error3) => SeverityNumber::Error3,
82                Some(Severity::Error4) => SeverityNumber::Error4,
83                Some(Severity::Fatal) => SeverityNumber::Fatal,
84                Some(Severity::Fatal2) => SeverityNumber::Fatal2,
85                Some(Severity::Fatal3) => SeverityNumber::Fatal3,
86                Some(Severity::Fatal4) => SeverityNumber::Fatal4,
87                None => SeverityNumber::Unspecified,
88            };
89
90            LogRecord {
91                time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(),
92                observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()),
93                attributes: {
94                    log_record
95                        .attributes_iter()
96                        .map(|kv| KeyValue {
97                            key: kv.0.to_string(),
98                            value: Some(AnyValue {
99                                value: Some(kv.1.clone().into()),
100                            }),
101                        })
102                        .collect()
103                },
104                event_name: log_record.event_name().unwrap_or_default().into(),
105                severity_number: severity_number.into(),
106                severity_text: log_record
107                    .severity_text()
108                    .map(Into::into)
109                    .unwrap_or_default(),
110                body: log_record.body().cloned().map(Into::into),
111                dropped_attributes_count: 0,
112                flags: trace_context
113                    .map(|ctx| {
114                        ctx.trace_flags
115                            .map(|flags| flags.to_u8() as u32)
116                            .unwrap_or_default()
117                    })
118                    .unwrap_or_default(),
119                span_id: trace_context
120                    .map(|ctx| ctx.span_id.to_bytes().to_vec())
121                    .unwrap_or_default(),
122                trace_id: trace_context
123                    .map(|ctx| ctx.trace_id.to_bytes().to_vec())
124                    .unwrap_or_default(),
125            }
126        }
127    }
128
129    impl
130        From<(
131            (
132                &opentelemetry_sdk::logs::SdkLogRecord,
133                &opentelemetry::InstrumentationScope,
134            ),
135            &ResourceAttributesWithSchema,
136        )> for ResourceLogs
137    {
138        fn from(
139            data: (
140                (
141                    &opentelemetry_sdk::logs::SdkLogRecord,
142                    &opentelemetry::InstrumentationScope,
143                ),
144                &ResourceAttributesWithSchema,
145            ),
146        ) -> Self {
147            let ((log_record, instrumentation), resource) = data;
148
149            ResourceLogs {
150                resource: Some(Resource {
151                    attributes: resource.attributes.0.clone(),
152                    dropped_attributes_count: 0,
153                }),
154                schema_url: resource.schema_url.clone().unwrap_or_default(),
155                scope_logs: vec![ScopeLogs {
156                    schema_url: instrumentation
157                        .schema_url()
158                        .map(ToOwned::to_owned)
159                        .unwrap_or_default(),
160                    scope: Some((instrumentation, log_record.target().cloned()).into()),
161                    log_records: vec![log_record.into()],
162                }],
163            }
164        }
165    }
166
167    pub fn group_logs_by_resource_and_scope(
168        logs: LogBatch<'_>,
169        resource: &ResourceAttributesWithSchema,
170    ) -> Vec<ResourceLogs> {
171        // Group logs by target or instrumentation name
172        let scope_map = logs.iter().fold(
173            HashMap::new(),
174            |mut scope_map: HashMap<
175                Cow<'static, str>,
176                Vec<(
177                    &opentelemetry_sdk::logs::SdkLogRecord,
178                    &opentelemetry::InstrumentationScope,
179                )>,
180            >,
181             (log_record, instrumentation)| {
182                let key = log_record
183                    .target()
184                    .cloned()
185                    .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned()));
186                scope_map
187                    .entry(key)
188                    .or_default()
189                    .push((log_record, instrumentation));
190                scope_map
191            },
192        );
193
194        let scope_logs = scope_map
195            .into_iter()
196            .map(|(key, log_data)| ScopeLogs {
197                scope: Some(InstrumentationScope::from((
198                    log_data.first().unwrap().1,
199                    Some(key.into_owned().into()),
200                ))),
201                schema_url: resource.schema_url.clone().unwrap_or_default(),
202                log_records: log_data
203                    .into_iter()
204                    .map(|(log_record, _)| log_record.into())
205                    .collect(),
206            })
207            .collect();
208
209        vec![ResourceLogs {
210            resource: Some(Resource {
211                attributes: resource.attributes.0.clone(),
212                dropped_attributes_count: 0,
213            }),
214            scope_logs,
215            schema_url: resource.schema_url.clone().unwrap_or_default(),
216        }]
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use crate::transform::common::tonic::ResourceAttributesWithSchema;
223    use opentelemetry::logs::LogRecord as _;
224    use opentelemetry::logs::Logger;
225    use opentelemetry::logs::LoggerProvider;
226    use opentelemetry::time::now;
227    use opentelemetry::InstrumentationScope;
228    use opentelemetry_sdk::error::OTelSdkResult;
229    use opentelemetry_sdk::logs::LogProcessor;
230    use opentelemetry_sdk::logs::SdkLoggerProvider;
231    use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};
232
233    #[derive(Debug)]
234    struct MockProcessor;
235
236    impl LogProcessor for MockProcessor {
237        fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}
238
239        fn force_flush(&self) -> OTelSdkResult {
240            Ok(())
241        }
242
243        fn shutdown(&self) -> OTelSdkResult {
244            Ok(())
245        }
246    }
247
248    fn create_test_log_data(
249        instrumentation_name: &str,
250        _message: &str,
251    ) -> (SdkLogRecord, InstrumentationScope) {
252        let processor = MockProcessor {};
253        let logger = SdkLoggerProvider::builder()
254            .with_log_processor(processor)
255            .build()
256            .logger("test");
257        let mut logrecord = logger.create_log_record();
258        logrecord.set_timestamp(now());
259        logrecord.set_observed_timestamp(now());
260        let instrumentation =
261            InstrumentationScope::builder(instrumentation_name.to_string()).build();
262        (logrecord, instrumentation)
263    }
264
265    #[test]
266    fn test_group_logs_by_resource_and_scope_single_scope() {
267        let resource = Resource::builder().build();
268        let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
269        let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");
270
271        let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
272        let log_batch = LogBatch::new(&logs);
273        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
274
275        let grouped_logs =
276            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
277
278        assert_eq!(grouped_logs.len(), 1);
279        let resource_logs = &grouped_logs[0];
280        assert_eq!(resource_logs.scope_logs.len(), 1);
281
282        let scope_logs = &resource_logs.scope_logs[0];
283        assert_eq!(scope_logs.log_records.len(), 2);
284    }
285
286    #[test]
287    fn test_group_logs_by_resource_and_scope_multiple_scopes() {
288        let resource = Resource::builder().build();
289        let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
290        let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");
291
292        let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
293        let log_batch = LogBatch::new(&logs);
294        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
295        let grouped_logs =
296            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
297
298        assert_eq!(grouped_logs.len(), 1);
299        let resource_logs = &grouped_logs[0];
300        assert_eq!(resource_logs.scope_logs.len(), 2);
301
302        let scope_logs_1 = &resource_logs
303            .scope_logs
304            .iter()
305            .find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
306            .unwrap();
307        let scope_logs_2 = &resource_logs
308            .scope_logs
309            .iter()
310            .find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
311            .unwrap();
312
313        assert_eq!(scope_logs_1.log_records.len(), 1);
314        assert_eq!(scope_logs_2.log_records.len(), 1);
315    }
316}