plane_core/
logging.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
use crate::{
    messages::logging::{Component, LogMessage, SerializableLevel},
    nats::TypedNats,
};
use anyhow::{anyhow, Result};
use chrono::Utc;
use std::{collections::BTreeMap, fmt::Debug};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{
    field::{Field, Visit},
    span::Attributes,
    Id,
};
use tracing_stackdriver::Stackdriver;
use tracing_subscriber::{layer::Context, util::SubscriberInitExt, EnvFilter, Layer};
use tracing_subscriber::{layer::SubscriberExt, registry::LookupSpan};

/// Name of the environment variable checked by `TracingHandle::init`: when
/// it can be read successfully, the Stackdriver layer is installed instead
/// of the default `fmt` layer.
const TRACE_STACKDRIVER: &str = "TRACE_STACKDRIVER";
/// Filter directives used by `TracingHandle::init` when a filter cannot be
/// built from the default environment.
const LOG_DEFAULT: &str = "info,sqlx=warn";

/// Drains `recv` and publishes every received message through `nc`.
///
/// A failed publish is printed to stdout and the loop moves on to the next
/// message. The function returns once `recv.recv()` yields `None`.
async fn do_logs(nc: &TypedNats, mut recv: tokio::sync::mpsc::Receiver<LogMessage>) {
    loop {
        let msg = match recv.recv().await {
            Some(msg) => msg,
            None => break,
        };

        match nc.publish(&msg).await {
            Ok(_) => {}
            Err(err) => {
                println!("{:?}", err);
            }
        }
    }
}

/// Handle returned by `TracingHandle::init`.
///
/// Holds the receiving end of the log-message channel until `attach_nats`
/// takes it.
pub struct TracingHandle {
    /// Receiver paired with the sender given to `LogManagerLogger`; `None`
    /// once `attach_nats` has taken it.
    recv: Option<Receiver<LogMessage>>,
}

impl TracingHandle {
    /// Builds the tracing subscriber for `component` and installs it as the
    /// global default.
    ///
    /// The subscriber is a registry with three layers:
    /// * a `LogManagerLogger` that pushes every event onto a bounded channel
    ///   (capacity 128) whose receiver is stored in the returned handle,
    /// * an `EnvFilter` built from the default environment, falling back to
    ///   `LOG_DEFAULT` when that fails,
    /// * the Stackdriver layer if `TRACE_STACKDRIVER` can be read from the
    ///   environment, otherwise the plain `fmt` layer.
    ///
    /// # Errors
    ///
    /// Returns an error if the fallback filter directives cannot be parsed,
    /// or if the subscriber cannot be installed (for example because a
    /// global subscriber has already been set).
    pub fn init(component: Component) -> Result<Self> {
        let (send, recv) = tokio::sync::mpsc::channel::<LogMessage>(128);

        let filter_layer =
            EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new(LOG_DEFAULT))?;

        let registry = tracing_subscriber::registry()
            .with(LogManagerLogger::new(send, component))
            .with(filter_layer);

        // The two branches produce differently-typed subscribers, so each one
        // has to be installed inside its own branch. `try_init` is used so
        // that a failed installation surfaces as an `Err` instead of a panic.
        if std::env::var(TRACE_STACKDRIVER).is_ok() {
            registry.with(Stackdriver::layer()).try_init()?;
        } else {
            registry.with(tracing_subscriber::fmt::layer()).try_init()?;
        }

        Ok(TracingHandle { recv: Some(recv) })
    }

    /// Spawns a background task that forwards queued log messages to `nats`.
    ///
    /// The receiver stored in this handle is moved into the task, so this
    /// can only succeed once per handle.
    ///
    /// # Errors
    ///
    /// Returns an error if the receiver has already been taken by an earlier
    /// call.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn attach_nats(&mut self, nats: TypedNats) -> Result<()> {
        let recv = self.recv.take().ok_or_else(|| {
            anyhow!("attach_nats on TracingHandle should not be called more than once.")
        })?;
        tokio::spawn(async move {
            do_logs(&nats, recv).await;
            // `do_logs` only returns when the channel has been closed.
            tracing::error!("do_logs terminated.");
        });

        Ok(())
    }
}

/// Helper trait for inspecting and logging errors without further processing.
/// Mostly useful for when a function returns a `Result<()>` and we want to
/// know if it failed but still continue execution.
///
/// `log_error` takes `&self`, so the value is only borrowed and remains
/// usable afterwards.
pub trait LogError<E: Debug> {
    /// Reports `message` when `self` holds a failure. The implementations in
    /// this file (for `Result` and `Option`) emit a `tracing::error!` event
    /// on `Err` / `None` and do nothing otherwise.
    fn log_error(&self, message: &str);
}

impl<T, E: Debug> LogError<E> for Result<T, E> {
    /// Emits an error-level event with `message` and the `Debug` rendering
    /// of the error when `self` is `Err`; does nothing for `Ok`.
    fn log_error(&self, message: &str) {
        match self {
            Ok(_) => {}
            Err(error) => {
                tracing::error!(?error, message);
            }
        }
    }
}

impl<T> LogError<()> for Option<T> {
    /// Emits an error-level event with `message` when `self` is `None`;
    /// does nothing for `Some`.
    fn log_error(&self, message: &str) {
        match self {
            Some(_) => {}
            None => {
                tracing::error!(message);
            }
        }
    }
}

/// Tracing layer that converts each event into a `LogMessage` and pushes it
/// onto a channel.
pub struct LogManagerLogger {
    /// Sending half of the channel that receives one `LogMessage` per event.
    sender: Sender<LogMessage>,
    /// Component value cloned into every `LogMessage` this layer produces.
    component: Component,
}

impl LogManagerLogger {
    fn new(sender: Sender<LogMessage>, component: Component) -> LogManagerLogger {
        LogManagerLogger { sender, component }
    }
}

impl<S> Layer<S> for LogManagerLogger
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        // Adapted from:
        // https://github.com/LukeMathWalker/tracing-bunyan-formatter/blob/master/src/storage_layer.rs
        let span = ctx.span(id).expect("Span not found, this is a bug");

        let mut visitor = if let Some(parent_span) = span.parent() {
            let mut extensions = parent_span.extensions_mut();
            extensions
                .get_mut::<JsonVisitor>()
                .map(|v| v.clone())
                .unwrap_or_default()
        } else {
            JsonVisitor::default()
        };

        let mut extensions = span.extensions_mut();

        attrs.record(&mut visitor);
        extensions.insert(visitor);
    }

    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        let mut visitor = if let Some(span) = ctx.lookup_current() {
            let extensions = span.extensions();
            if let Some(visitor) = extensions.get::<JsonVisitor>() {
                visitor.clone()
            } else {
                JsonVisitor::default()
            }
        } else {
            JsonVisitor::default()
        };

        event.record(&mut visitor);

        let output = LogMessage {
            component: self.component.clone(),
            target: event.metadata().target().to_string(),
            name: event.metadata().name().to_string(),
            severity: SerializableLevel(*event.metadata().level()),
            time: Utc::now(),
            fields: visitor.0,
        };

        // make sure logging in this function call does not trigger an infinite loop
        // self.nc.publish_drone_log_message(&msg).unwrap();
        if self.sender.try_send(output).is_err() {
            println!("Warning: sender buffer is full.");
        }
    }
}

/// Field visitor that collects recorded span/event fields into a map from
/// field name to JSON value.
#[derive(Clone, Default)]
struct JsonVisitor(BTreeMap<String, serde_json::Value>);

impl JsonVisitor {
    /// Stores `value` under the field's name, replacing any earlier entry
    /// with the same name.
    fn put(&mut self, field: &Field, value: serde_json::Value) {
        self.0.insert(field.name().to_string(), value);
    }
}

impl Visit for JsonVisitor {
    fn record_f64(&mut self, field: &Field, value: f64) {
        self.put(field, serde_json::json!(value));
    }

    fn record_i64(&mut self, field: &Field, value: i64) {
        self.put(field, serde_json::json!(value));
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        self.put(field, serde_json::json!(value));
    }

    fn record_bool(&mut self, field: &Field, value: bool) {
        self.put(field, serde_json::json!(value));
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        self.put(field, serde_json::json!(value));
    }

    /// Records a `Debug`-formatted value.
    ///
    /// The formatted text is first tried as JSON: a `serde_json::Value`
    /// debug-prints as valid JSON, and we want to embed it as JSON rather
    /// than as a string containing JSON. Anything that does not parse is
    /// stored as a plain JSON string. The encode/decode round trip is
    /// wasteful, but it's the best available (see
    /// https://github.com/tokio-rs/tracing/issues/663).
    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
        let rendered = format!("{:?}", value);
        let json = serde_json::from_str::<serde_json::Value>(&rendered)
            .unwrap_or_else(|_| rendered.into());
        self.put(field, json);
    }
}