opentelemetry_proto/transform/
logs.rs1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3 use crate::{
4 tonic::{
5 common::v1::{
6 any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
7 KeyValueList,
8 },
9 logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
10 resource::v1::Resource,
11 },
12 transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
13 };
14 use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
15 use opentelemetry_sdk::logs::LogBatch;
16 use std::borrow::Cow;
17 use std::collections::HashMap;
18
19 impl From<LogsAnyValue> for AnyValue {
20 fn from(value: LogsAnyValue) -> Self {
21 AnyValue {
22 value: Some(value.into()),
23 }
24 }
25 }
26
27 impl From<LogsAnyValue> for Value {
28 fn from(value: LogsAnyValue) -> Self {
29 match value {
30 LogsAnyValue::Double(f) => Value::DoubleValue(f),
31 LogsAnyValue::Int(i) => Value::IntValue(i),
32 LogsAnyValue::String(s) => Value::StringValue(s.into()),
33 LogsAnyValue::Boolean(b) => Value::BoolValue(b),
34 LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue {
35 values: v
36 .into_iter()
37 .map(|v| AnyValue {
38 value: Some(v.into()),
39 })
40 .collect(),
41 }),
42 LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList {
43 values: m
44 .into_iter()
45 .map(|(key, value)| KeyValue {
46 key: key.into(),
47 value: Some(AnyValue {
48 value: Some(value.into()),
49 }),
50 })
51 .collect(),
52 }),
53 LogsAnyValue::Bytes(v) => Value::BytesValue(*v),
54 _ => unreachable!("Nonexistent value type"),
55 }
56 }
57 }
58
59 impl From<&opentelemetry_sdk::logs::SdkLogRecord> for LogRecord {
60 fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self {
61 let trace_context = log_record.trace_context();
62 let severity_number = match log_record.severity_number() {
63 Some(Severity::Trace) => SeverityNumber::Trace,
64 Some(Severity::Trace2) => SeverityNumber::Trace2,
65 Some(Severity::Trace3) => SeverityNumber::Trace3,
66 Some(Severity::Trace4) => SeverityNumber::Trace4,
67 Some(Severity::Debug) => SeverityNumber::Debug,
68 Some(Severity::Debug2) => SeverityNumber::Debug2,
69 Some(Severity::Debug3) => SeverityNumber::Debug3,
70 Some(Severity::Debug4) => SeverityNumber::Debug4,
71 Some(Severity::Info) => SeverityNumber::Info,
72 Some(Severity::Info2) => SeverityNumber::Info2,
73 Some(Severity::Info3) => SeverityNumber::Info3,
74 Some(Severity::Info4) => SeverityNumber::Info4,
75 Some(Severity::Warn) => SeverityNumber::Warn,
76 Some(Severity::Warn2) => SeverityNumber::Warn2,
77 Some(Severity::Warn3) => SeverityNumber::Warn3,
78 Some(Severity::Warn4) => SeverityNumber::Warn4,
79 Some(Severity::Error) => SeverityNumber::Error,
80 Some(Severity::Error2) => SeverityNumber::Error2,
81 Some(Severity::Error3) => SeverityNumber::Error3,
82 Some(Severity::Error4) => SeverityNumber::Error4,
83 Some(Severity::Fatal) => SeverityNumber::Fatal,
84 Some(Severity::Fatal2) => SeverityNumber::Fatal2,
85 Some(Severity::Fatal3) => SeverityNumber::Fatal3,
86 Some(Severity::Fatal4) => SeverityNumber::Fatal4,
87 None => SeverityNumber::Unspecified,
88 };
89
90 LogRecord {
91 time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(),
92 observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()),
93 attributes: {
94 log_record
95 .attributes_iter()
96 .map(|kv| KeyValue {
97 key: kv.0.to_string(),
98 value: Some(AnyValue {
99 value: Some(kv.1.clone().into()),
100 }),
101 })
102 .collect()
103 },
104 event_name: log_record.event_name().unwrap_or_default().into(),
105 severity_number: severity_number.into(),
106 severity_text: log_record
107 .severity_text()
108 .map(Into::into)
109 .unwrap_or_default(),
110 body: log_record.body().cloned().map(Into::into),
111 dropped_attributes_count: 0,
112 flags: trace_context
113 .map(|ctx| {
114 ctx.trace_flags
115 .map(|flags| flags.to_u8() as u32)
116 .unwrap_or_default()
117 })
118 .unwrap_or_default(),
119 span_id: trace_context
120 .map(|ctx| ctx.span_id.to_bytes().to_vec())
121 .unwrap_or_default(),
122 trace_id: trace_context
123 .map(|ctx| ctx.trace_id.to_bytes().to_vec())
124 .unwrap_or_default(),
125 }
126 }
127 }
128
129 impl
130 From<(
131 (
132 &opentelemetry_sdk::logs::SdkLogRecord,
133 &opentelemetry::InstrumentationScope,
134 ),
135 &ResourceAttributesWithSchema,
136 )> for ResourceLogs
137 {
138 fn from(
139 data: (
140 (
141 &opentelemetry_sdk::logs::SdkLogRecord,
142 &opentelemetry::InstrumentationScope,
143 ),
144 &ResourceAttributesWithSchema,
145 ),
146 ) -> Self {
147 let ((log_record, instrumentation), resource) = data;
148
149 ResourceLogs {
150 resource: Some(Resource {
151 attributes: resource.attributes.0.clone(),
152 dropped_attributes_count: 0,
153 }),
154 schema_url: resource.schema_url.clone().unwrap_or_default(),
155 scope_logs: vec![ScopeLogs {
156 schema_url: instrumentation
157 .schema_url()
158 .map(ToOwned::to_owned)
159 .unwrap_or_default(),
160 scope: Some((instrumentation, log_record.target().cloned()).into()),
161 log_records: vec![log_record.into()],
162 }],
163 }
164 }
165 }
166
167 pub fn group_logs_by_resource_and_scope(
168 logs: LogBatch<'_>,
169 resource: &ResourceAttributesWithSchema,
170 ) -> Vec<ResourceLogs> {
171 let scope_map = logs.iter().fold(
173 HashMap::new(),
174 |mut scope_map: HashMap<
175 Cow<'static, str>,
176 Vec<(
177 &opentelemetry_sdk::logs::SdkLogRecord,
178 &opentelemetry::InstrumentationScope,
179 )>,
180 >,
181 (log_record, instrumentation)| {
182 let key = log_record
183 .target()
184 .cloned()
185 .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned()));
186 scope_map
187 .entry(key)
188 .or_default()
189 .push((log_record, instrumentation));
190 scope_map
191 },
192 );
193
194 let scope_logs = scope_map
195 .into_iter()
196 .map(|(key, log_data)| ScopeLogs {
197 scope: Some(InstrumentationScope::from((
198 log_data.first().unwrap().1,
199 Some(key.into_owned().into()),
200 ))),
201 schema_url: resource.schema_url.clone().unwrap_or_default(),
202 log_records: log_data
203 .into_iter()
204 .map(|(log_record, _)| log_record.into())
205 .collect(),
206 })
207 .collect();
208
209 vec![ResourceLogs {
210 resource: Some(Resource {
211 attributes: resource.attributes.0.clone(),
212 dropped_attributes_count: 0,
213 }),
214 scope_logs,
215 schema_url: resource.schema_url.clone().unwrap_or_default(),
216 }]
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use crate::transform::common::tonic::ResourceAttributesWithSchema;
223 use opentelemetry::logs::LogRecord as _;
224 use opentelemetry::logs::Logger;
225 use opentelemetry::logs::LoggerProvider;
226 use opentelemetry::time::now;
227 use opentelemetry::InstrumentationScope;
228 use opentelemetry_sdk::error::OTelSdkResult;
229 use opentelemetry_sdk::logs::LogProcessor;
230 use opentelemetry_sdk::logs::SdkLoggerProvider;
231 use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};
232
233 #[derive(Debug)]
234 struct MockProcessor;
235
236 impl LogProcessor for MockProcessor {
237 fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}
238
239 fn force_flush(&self) -> OTelSdkResult {
240 Ok(())
241 }
242
243 fn shutdown(&self) -> OTelSdkResult {
244 Ok(())
245 }
246 }
247
248 fn create_test_log_data(
249 instrumentation_name: &str,
250 _message: &str,
251 ) -> (SdkLogRecord, InstrumentationScope) {
252 let processor = MockProcessor {};
253 let logger = SdkLoggerProvider::builder()
254 .with_log_processor(processor)
255 .build()
256 .logger("test");
257 let mut logrecord = logger.create_log_record();
258 logrecord.set_timestamp(now());
259 logrecord.set_observed_timestamp(now());
260 let instrumentation =
261 InstrumentationScope::builder(instrumentation_name.to_string()).build();
262 (logrecord, instrumentation)
263 }
264
265 #[test]
266 fn test_group_logs_by_resource_and_scope_single_scope() {
267 let resource = Resource::builder().build();
268 let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
269 let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");
270
271 let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
272 let log_batch = LogBatch::new(&logs);
273 let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_logs =
276 crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
277
278 assert_eq!(grouped_logs.len(), 1);
279 let resource_logs = &grouped_logs[0];
280 assert_eq!(resource_logs.scope_logs.len(), 1);
281
282 let scope_logs = &resource_logs.scope_logs[0];
283 assert_eq!(scope_logs.log_records.len(), 2);
284 }
285
286 #[test]
287 fn test_group_logs_by_resource_and_scope_multiple_scopes() {
288 let resource = Resource::builder().build();
289 let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
290 let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");
291
292 let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
293 let log_batch = LogBatch::new(&logs);
294 let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_logs =
296 crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
297
298 assert_eq!(grouped_logs.len(), 1);
299 let resource_logs = &grouped_logs[0];
300 assert_eq!(resource_logs.scope_logs.len(), 2);
301
302 let scope_logs_1 = &resource_logs
303 .scope_logs
304 .iter()
305 .find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
306 .unwrap();
307 let scope_logs_2 = &resource_logs
308 .scope_logs
309 .iter()
310 .find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
311 .unwrap();
312
313 assert_eq!(scope_logs_1.log_records.len(), 1);
314 assert_eq!(scope_logs_2.log_records.len(), 1);
315 }
316}