1use base64::Engine;
93use chrono::{DateTime, Duration, Utc};
94use serde::{Deserialize, Serialize, Serializer};
95use std::any::TypeId;
96use std::borrow::Cow;
97use std::collections::HashMap;
98use std::fmt::{Debug, Display};
99use std::str::FromStr;
100use std::sync::{Arc, LazyLock, Mutex, RwLock};
101use tracing::field::Field;
102use tracing::span::{Attributes, Record};
103use tracing::{Event, Id, Level, Subscriber};
104use tracing_subscriber::layer::Context;
105use tracing_subscriber::registry::LookupSpan;
106use tracing_subscriber::{field, Layer as TracingLayer};
107
108#[cfg(feature = "opentelemetry")]
109use {
110 opentelemetry::TraceId,
111 tracing_opentelemetry::OtelData
112};
113
114#[derive(Serialize, Default)]
134#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
135pub enum Severity {
136 #[default] Default,
137 Debug,
138 Info,
139 Notice,
140 Warning,
141 Error,
142 Critical,
143 Alert,
144 Emergency,
145}
146
147impl Display for Severity {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.write_str(match self {
150 Severity::Default => "DEFAULT",
151 Severity::Debug => "DEBUG",
152 Severity::Info => "INFO",
153 Severity::Notice => "NOTICE",
154 Severity::Warning => "WARNING",
155 Severity::Error => "ERROR",
156 Severity::Critical => "CRITICAL",
157 Severity::Alert => "ALERT",
158 Severity::Emergency => "EMERGENCY",
159 })
160 }
161}
162
163#[derive(Debug)]
171pub struct InvalidSeverity(Box<str>);
172
173impl Display for InvalidSeverity {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(f, "invalid severity: {:?}", self.0)
176 }
177}
178
179impl std::error::Error for InvalidSeverity {}
180
181impl FromStr for Severity {
182 type Err = InvalidSeverity;
183
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s {
186 "DEFAULT" => Ok(Severity::Default),
187 "DEBUG" => Ok(Severity::Debug),
188 "INFO" => Ok(Severity::Info),
189 "NOTICE" => Ok(Severity::Notice),
190 "WARNING" => Ok(Severity::Warning),
191 "ERROR" => Ok(Severity::Error),
192 "CRITICAL" => Ok(Severity::Critical),
193 "ALERT" => Ok(Severity::Alert),
194 "EMERGENCY" => Ok(Severity::Emergency),
195 other => Err(InvalidSeverity(other.into())),
196 }
197 }
198}
199
200#[derive(Serialize, Clone)]
203#[serde(rename_all = "camelCase")]
204struct SpanExposition {
205 #[serde(rename = "@trace", skip_serializing_if = "Option::is_none")]
206 trace_id: Option<Box<str>>,
207 #[serde(rename = "@id")]
208 id: Box<str>,
209 #[serde(flatten)]
210 fields: HashMap<Cow<'static, str>, serde_json::Value>
211}
212
213#[derive(Serialize, Default)]
214#[serde(rename_all = "camelCase")]
215struct LogEntry<'a> {
216 #[serde(skip_serializing_if = "Option::is_none")]
217 severity: Option<Severity>,
218 #[serde(skip_serializing_if = "Option::is_none")]
219 message: Option<Box<str>>,
220 #[serde(skip_serializing_if = "Option::is_none")]
221 http_request: Option<HttpRequestInfo>,
222 #[serde(skip_serializing_if = "Option::is_none")]
223 time: Option<DateTime<Utc>>,
224 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/insertId")]
225 insert_id: Option<&'a str>,
226 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/labels")]
227 labels: Option<HashMap<Cow<'static, str>, serde_json::Value>>,
228 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/operation")]
229 operation: Option<OperationDetail>,
230 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/sourceLocation")]
231 source_location: Option<SourceLocation<'a>>,
232 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/spanId")]
233 span_id: Option<Box<str>>,
234 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/trace")]
235 trace: Option<Box<str>>,
236 #[serde(skip_serializing_if = "Option::is_none", rename = "logging.googleapis.com/trace_sampled")]
237 trace_sampled: Option<bool>,
238
239 #[serde(skip_serializing_if = "Vec::is_empty", rename = "@spans")]
240 x_spans: Vec<Arc<SpanExposition>>,
241 #[serde(skip_serializing_if = "HashMap::is_empty", rename = "@effective_fields")]
242 x_fields: HashMap<Cow<'static, str>, serde_json::Value>,
243
244 #[serde(flatten)]
245 extra: HashMap<Cow<'static, str>, serde_json::Value>,
246}
247
248fn duration_serializer<S: Serializer>(dur: &Option<Duration>, ser: S) -> Result<S::Ok, S::Error> {
249 ser.serialize_str(&format!("{:.5}s", dur.unwrap_or_else(|| unreachable!("this should never be serialized if None")).as_seconds_f32()))
250}
251
252#[derive(Serialize, Default, Clone)]
253#[serde(rename_all = "camelCase")]
254struct HttpRequestInfo {
255 #[serde(skip_serializing_if = "Option::is_none")]
256 request_method: Option<Box<str>>,
257 #[serde(skip_serializing_if = "Option::is_none")]
258 request_url: Option<Box<str>>,
259 #[serde(skip_serializing_if = "Option::is_none")]
260 request_size: Option<u64>,
261 #[serde(skip_serializing_if = "Option::is_none")]
262 status: Option<u16>,
263 #[serde(skip_serializing_if = "Option::is_none")]
264 response_size: Option<u64>,
265 #[serde(skip_serializing_if = "Option::is_none")]
266 user_agent: Option<Box<str>>,
267 #[serde(skip_serializing_if = "Option::is_none")]
268 remote_ip: Option<Box<str>>,
269 #[serde(skip_serializing_if = "Option::is_none")]
270 server_ip: Option<Box<str>>,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 referer: Option<Box<str>>,
273 #[serde(skip_serializing_if = "Option::is_none", serialize_with = "duration_serializer")]
274 latency: Option<Duration>,
275 #[serde(skip_serializing_if = "Option::is_none")]
276 cache_lookup: Option<bool>,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 cache_hit: Option<bool>,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 cache_validated_with_origin_server: Option<bool>,
281 #[serde(skip_serializing_if = "Option::is_none")]
282 cache_fill_bytes: Option<u64>,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 protocol: Option<Box<str>>,
285}
286
287#[derive(Deserialize, Serialize, Default, Clone)]
288#[serde(rename_all = "camelCase")]
289#[non_exhaustive]
290pub struct OperationInfo {
291 #[serde(skip_serializing_if = "Option::is_none")]
292 pub id: Option<Box<str>>,
293 #[serde(skip_serializing_if = "Option::is_none")]
294 pub producer: Option<Box<str>>,
295}
296
297impl OperationInfo {
298 pub fn new(id: impl AsRef<str>, producer: Option<impl AsRef<str>>) -> Self {
299 let mut v = Self::default();
300 v.id = Some(id.as_ref().into());
301 v.producer = producer.map(|v| v.as_ref().into());
302 v
303 }
304}
305
306#[derive(Serialize, Default)]
307#[serde(rename_all = "camelCase")]
308struct OperationDetail {
309 #[serde(skip_serializing_if = "Option::is_none", flatten)]
310 pub info: Option<OperationInfo>,
311 #[serde(skip_serializing_if = "Option::is_none")]
312 pub first: Option<bool>,
313 #[serde(skip_serializing_if = "Option::is_none")]
314 pub last: Option<bool>,
315}
316
317#[derive(Serialize, Default)]
318#[serde(rename_all = "camelCase")]
319struct SourceLocation<'a> {
320 #[serde(skip_serializing_if = "Option::is_none")]
321 pub file: Option<&'a str>,
322 #[serde(skip_serializing_if = "Option::is_none")]
323 pub line: Option<u64>,
324 #[serde(skip_serializing_if = "Option::is_none")]
325 pub function: Option<Box<str>>,
326}
327
328#[derive(Clone)]
329struct OperationData {
330 info: OperationInfo,
331 first: bool,
332 last: bool,
333}
334
335impl OperationData {
336 pub fn new(info: OperationInfo) -> Self {
337 Self {
338 info,
339 first: true,
340 last: false,
341 }
342 }
343}
344
345pub struct Layer<W: std::io::Write> {
346 writer: Mutex<W>,
347 #[cfg_attr(not(feature = "opentelemetry"), allow(unused))]
348 project_id: Box<str>,
349 operations: Mutex<HashMap<Id, Arc<Mutex<OperationData>>>>,
350 expositions: RwLock<HashMap<Id, Arc<SpanExposition>>>
351}
352
353impl<W: std::io::Write> Layer<W> {
354 fn new(writer: W, project_id: impl AsRef<str>) -> Self {
355 Self {
356 writer: writer.into(),
357 project_id: project_id.as_ref().into(),
358 operations: HashMap::new().into(),
359 expositions: HashMap::new().into()
360 }
361 }
362}
363
364impl<W: std::io::Write + 'static, S: Subscriber + for<'a> LookupSpan<'a>> TracingLayer<S> for Layer<W> {
365 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, #[allow(unused)] ctx: Context<'_, S>) {
366 if let Some(mut expositions) = self.expositions.write().ok() {
367 let span_id = format!("{:016x}", id.into_u64());
368
369 let mut exposition = SpanExposition {
370 trace_id: None,
371 id: span_id.into(),
372 fields: HashMap::new()
373 };
374
375 #[cfg(feature = "opentelemetry")]
376 self.try_find_trace_id(id, ctx, &mut exposition);
377 attrs.record(&mut exposition.visit());
378
379 expositions.insert(id.clone(), Arc::new(exposition));
380 }
381 }
382
383 fn on_record(&self, id: &Id, values: &Record<'_>, #[allow(unused)] ctx: Context<'_, S>) {
384 if let Some(mut expositions) = self.expositions.write().ok() {
385 let arc = expositions.get_mut(id).unwrap();
386 let mut exposition = (**arc).clone();
387
388 if exposition.trace_id.is_none() {
389 #[cfg(feature = "opentelemetry")]
390 self.try_find_trace_id(id, ctx, &mut exposition);
391 }
392
393 values.record(&mut exposition.visit());
394 expositions.insert(id.clone(), Arc::new(exposition));
395 }
396 }
397
398 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
399 let mut log = LogEntry {
400 severity: match *event.metadata().level() {
401 Level::TRACE | Level::DEBUG => Some(Severity::Debug),
402 Level::INFO => Some(Severity::Info),
403 Level::WARN => Some(Severity::Warning),
404 Level::ERROR => Some(Severity::Error),
405 },
406 time: Some(Utc::now()),
407 source_location: Some(SourceLocation {
408 file: event.metadata().file(),
409 line: event.metadata().line().map(|v| v as u64),
410 function: None
411 }),
412 ..LogEntry::default()
413 };
414
415 let mut span_ref = ctx.event_span(event);
416
417 let mut ops = self.operations.lock().ok();
418 let expositions = self.expositions.read().ok();
419 let mut looking_for_function = true;
420
421 while let Some(span) = span_ref {
422 let drop_ops = if let Some(op) = ops.as_ref().and_then(|v| v.get(&span.id())) && log.operation.is_none() {
423 if let Ok(mut op_m) = op.lock() {
424 log.operation = Some(OperationDetail {
425 info: Some(op_m.info.clone()),
426 first: if op_m.first {
427 op_m.first = false;
428 Some(true)
429 } else { None },
430 last: if op_m.last {
431 op_m.last = false;
432 Some(true)
433 } else { None },
434 });
435
436 true
437 } else {
438 false
439 }
440 } else {
441 false
442 };
443
444 if drop_ops {
445 ops.take();
447 }
448
449 let span_id = format!("{:016x}", span.id().into_u64());
450 if log.span_id.is_none() {
451 log.span_id = Some(span_id.clone().into())
452 }
453
454 #[cfg(feature = "opentelemetry")]
455 if let Some(otel) = span.extensions().get::<OtelData>() {
456 if let Some(trace) = otel.trace_id() {
457 if log.trace.is_none() && trace != TraceId::INVALID {
458 log.trace = Some(format!("projects/{}/traces/{:032x}", self.project_id, trace).into());
459 }
460 }
461 }
462
463 if let Some(exposition) = expositions.as_ref().and_then(|v| v.get(&span.id())).cloned() {
464 for (k, v) in &exposition.fields {
465 if !log.x_fields.contains_key(k) {
466 log.x_fields.insert(k.clone(), v.clone());
467 }
468 }
469
470 if looking_for_function {
471 if let Some(serde_json::Value::String(s)) = exposition.fields.get("function") {
472 if let Some(src) = log.source_location.as_mut() && src.function.is_none() {
473 src.function = Some(s.as_str().into());
474 looking_for_function = false;
475 }
476 }
477 }
478
479 log.x_spans.push(exposition);
480 }
481
482 span_ref = span.parent();
483 }
484
485 event.record(&mut log);
486
487 let _ = writeln!(self.writer.lock().unwrap(), "{}", serde_json::to_string(&log).unwrap());
488 }
489
490 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
491 if let Some(mut ops) = self.operations.lock().ok() {
492 ops.remove(&id);
493 }
494
495 if let Some(mut expositions) = self.expositions.write().ok() {
496 expositions.remove(&id);
497 }
498 }
499
500 fn on_id_change(&self, old: &Id, new: &Id, _ctx: Context<'_, S>) {
501 if let Some(mut ops) = self.operations.lock().ok() {
502 if let Some(data) = ops.remove(old) {
503 ops.insert(new.clone(), data);
504 }
505 }
506 }
507
508 unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
509 if id == TypeId::of::<Self>() {
510 Some(self as *const _ as *const ())
511 } else if id == TypeId::of::<Mutex<HashMap<Id, Arc<Mutex<OperationData>>>>>() {
512 let access = &self.operations;
513 Some(access as *const _ as *const ())
514 } else {
515 None
516 }
517 }
518}
519
520#[cfg(feature = "opentelemetry")]
521impl<W: std::io::Write> Layer<W> {
522 fn try_find_trace_id<S: Subscriber + for<'a> LookupSpan<'a>>(&self, id: &Id, ctx: Context<'_, S>, exposition: &mut SpanExposition) {
523 if let Some(span) = ctx.span(id) {
524 if let Some(otel) = span.extensions().get::<OtelData>() {
525 if let Some(trace) = otel.trace_id() {
526 exposition.trace_id = Some(format!("projects/{}/traces/{:032x}", self.project_id, trace).into());
527 }
528 }
529 }
530 }
531}
532
533static B64: LazyLock<base64::engine::GeneralPurpose> = LazyLock::new(|| {
534 base64::engine::GeneralPurpose::new(&base64::alphabet::STANDARD, Default::default())
535});
536
537impl<'a> LogEntry<'a> {
538 fn record(&mut self, field: &Field, value: impl Into<serde_json::Value>) {
539 if field.name().starts_with("labels.") {
540 self.labels.get_or_insert(HashMap::new()).insert(
541 field.name()[7..].into(),
542 value.into()
543 );
544 } else {
545 self.extra.insert(field.name().into(), value.into());
546 }
547 }
548}
549
550impl<'a> field::Visit for LogEntry<'a> {
551 fn record_f64(&mut self, field: &Field, value: f64) {
552 self.record(field, value);
553 }
554
555 fn record_i64(&mut self, field: &Field, value: i64) {
556 if value >= 0 {
557 self.record_u64(field, value as u64);
558 } else {
559 self.record(field, value);
560 }
561 }
562
563 fn record_u64(&mut self, field: &Field, value: u64) {
564 match field.name() {
565 name if name.starts_with("http.") => {
566 let http = self.http_request.get_or_insert_default();
567 match &name[5..] {
568 "request_size" => http.request_size = Some(value),
569 "status" if value < 65536 => http.status = Some(value as u16),
570 "response_size" => http.response_size = Some(value),
571 "cache_fill_bytes" => http.cache_fill_bytes = Some(value),
572 "latency_ns" => http.latency = Some(Duration::nanoseconds(value as i64)),
573 "latency_ms" => http.latency = Some(Duration::milliseconds(value as i64)),
574 "latency_sec" => http.latency = Some(Duration::seconds(value as i64)),
575 _ => self.record(field, value)
576 }
577 },
578 _ => self.record(field, value)
579 }
580 }
581
582 fn record_bool(&mut self, field: &Field, value: bool) {
583 match field.name() {
584 name if name.starts_with("http.") => {
585 let http = self.http_request.get_or_insert_default();
586 match &name[5..] {
587 "cache_lookup" => http.cache_lookup = Some(value),
588 "cache_hit" => http.cache_hit = Some(value),
589 "cache_validated_with_origin_server" => http.cache_validated_with_origin_server = Some(value),
590 _ => self.record(field, value)
591 }
592 },
593 _ => self.record(field, value)
594 }
595 }
596
597 fn record_str(&mut self, field: &Field, value: &str) {
598 match field.name() {
599 "message" => self.message = Some(value.into()),
600 "severity" => self.severity = value.parse().ok(),
601 name if name.starts_with("http.") => {
602 let http = self.http_request.get_or_insert_default();
603 match &name[5..] {
604 "request_method" => http.request_method = Some(value.into()),
605 "request_url" => http.request_url = Some(value.into()),
606 "user_agent" => http.user_agent = Some(value.into()),
607 "remote_ip" => http.remote_ip = Some(value.into()),
608 "server_ip" => http.server_ip = Some(value.into()),
609 "referer" => http.referer = Some(value.into()),
610 "protocol" => http.protocol = Some(value.into()),
611 _ => self.record(field, value)
612 }
613 },
614 _ => self.record(field, value)
615 }
616 }
617
618 fn record_bytes(&mut self, field: &Field, value: &[u8]) {
619 self.record(field, B64.encode(value));
620 }
621
622 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
623 self.record_str(field, &format!("{:?}", value));
624 }
625}
626
627trait Exposition {
628 fn record(&mut self, field: &Field, value: impl Into<serde_json::Value>);
629
630 fn visit(&'_ mut self) -> Visit<'_, Self> {
631 Visit(self)
632 }
633}
634
635impl Exposition for SpanExposition {
636 fn record(&mut self, field: &Field, value: impl Into<serde_json::Value>) {
637 self.fields.record(field, value);
638 }
639}
640
641impl Exposition for HashMap<Cow<'static, str>, serde_json::Value> {
642 fn record(&mut self, field: &Field, value: impl Into<serde_json::Value>) {
643 let mut s = Cow::Borrowed(field.name());
644
645 if s.len() >= 1 && s.starts_with('@') {
646 s.to_mut().insert(1, '@');
647 }
648
649 self.insert(s.into(), value.into());
650 }
651}
652
653struct Visit<'a, T: Exposition + ?Sized>(&'a mut T);
654
655impl<T: Exposition + ?Sized> field::Visit for Visit<'_, T> {
656 fn record_f64(&mut self, field: &Field, value: f64) {
657 self.0.record(field, value);
658 }
659
660 fn record_i64(&mut self, field: &Field, value: i64) {
661 self.0.record(field, value);
662 }
663
664 fn record_u64(&mut self, field: &Field, value: u64) {
665 self.0.record(field, value);
666 }
667
668 fn record_bool(&mut self, field: &Field, value: bool) {
669 self.0.record(field, value);
670 }
671
672 fn record_str(&mut self, field: &Field, value: &str) {
673 self.0.record(field, value);
674 }
675
676 fn record_bytes(&mut self, field: &Field, value: &[u8]) {
677 self.0.record(field, B64.encode(value));
678 }
679
680 fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
681 self.record_str(field, &format!("{:?}", value));
682 }
683}
684
685pub struct Operation(Option<Arc<Mutex<OperationData>>>);
690
691impl Operation {
692 pub fn init(&self, info: OperationInfo) -> &Self {
701 if let Some(mut data) = self.0.as_ref().and_then(|v| v.lock().ok()) {
702 data.info = info;
703 }
704
705 self
706 }
707
708 pub fn import(&self, info: OperationInfo) -> &Self {
712 if let Some(mut data) = self.0.as_ref().and_then(|v| v.lock().ok()) {
713 data.info = info;
714 data.first = false;
715 }
716
717 self
718 }
719
720 pub fn export(&self) -> OperationInfo {
724 if let Some(data) = self.0.as_ref().and_then(|v| v.lock().ok()) {
725 data.info.clone()
726 } else {
727 OperationInfo::default()
728 }
729 }
730
731 pub fn end(self) {
738 if let Some(mut data) = self.0.as_ref().and_then(|v| v.lock().ok()) {
739 data.last = true;
740 }
741 }
742}
743
744pub trait SpanExt {
749 fn operation(&'_ self) -> Operation;
756
757 fn start_operation(&'_ self, id: impl AsRef<str>, producer: Option<impl AsRef<str>>) -> Operation {
774 let op = self.operation();
775 op.init(OperationInfo::new(id, producer));
776 op
777 }
778}
779
780impl SpanExt for tracing::Span {
781 fn operation(&'_ self) -> Operation {
782 Operation(self.with_subscriber(|(id, subscriber)| {
783 if let Some(operations) = subscriber.downcast_ref::<Mutex<HashMap<Id, Arc<Mutex<OperationData>>>>>() {
784 operations.lock().ok().map(|mut v| {
785 v.entry(id.clone()).or_insert_with(|| Arc::new(Mutex::new(OperationData::new(OperationInfo::default())))).clone()
786 })
787 } else { None }
788 }).and_then(|v| v))
789 }
790}
791
792pub struct LayerBuilder<ProjectId = (), W: std::io::Write = std::io::Stdout> {
793 project_id: ProjectId,
794 writer: W
795}
796
797impl<W: std::io::Write> LayerBuilder<(), W> {
798 pub fn with_project_id<T: AsRef<str>>(self, text: T) -> LayerBuilder<T, W> {
809 LayerBuilder {
810 project_id: text,
811 writer: self.writer
812 }
813 }
814}
815
816impl<ProjectId, W: std::io::Write> LayerBuilder<ProjectId, W> {
817 pub fn with_writer<N: std::io::Write>(self, writer: N) -> LayerBuilder<ProjectId, N> {
820 LayerBuilder {
821 project_id: self.project_id,
822 writer
823 }
824 }
825}
826
827trait ProjectIdTrait {
828 fn string(&self) -> &str;
829}
830
831#[cfg(not(feature = "opentelemetry"))]
832impl<T> ProjectIdTrait for T {
833 fn string(&self) -> &str {
834 ""
835 }
836}
837
838#[cfg(feature = "opentelemetry")]
839impl<T: AsRef<str>> ProjectIdTrait for T {
840 fn string(&self) -> &str {
841 self.as_ref()
842 }
843}
844
845#[allow(private_bounds)]
846impl<ProjectId: ProjectIdTrait, W: std::io::Write> LayerBuilder<ProjectId, W> {
847 pub fn build(self) -> Layer<W> {
848 Layer::new(self.writer, self.project_id.string())
849 }
850}
851
852pub fn builder() -> LayerBuilder {
857 LayerBuilder {
858 project_id: (),
859 writer: std::io::stdout()
860 }
861}