Skip to main content

fuel_telemetry/
telemetry_layer.rs

1use std::{env::var, io::Write, path::PathBuf};
2use tracing::{
3    span::{self, Id, Record},
4    Event,
5};
6use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
7use tracing_subscriber::{
8    fmt::{format::DefaultFields, Layer},
9    layer::Context,
10    Layer as LayerTrait, Registry,
11};
12
13use crate::{
14    errors::TelemetryError, telemetry_config, telemetry_formatter::TelemetryFormatter, Result,
15};
16
17/// A `tracing` `Layer` to generate telemetry to be later consumed into InfluxDB
18///
19/// This `tracing` `Layer` formats telemetry and stores the output in a known
20/// location ($FUEL_HOME/tmp/<crate>.telemetry), ready for ingestion by an
21/// InfluxDB collector.
22pub struct TelemetryLayer {
23    pub inner_layer: Layer<Registry, DefaultFields, TelemetryFormatter, NonBlocking>,
24}
25
26impl TelemetryLayer {
27    /// Although public, `__new()` is only intended to be called via the
28    /// following constructor macros:
29    ///
30    /// - `fuel_telemetry::new!()`
31    /// - `fuel_telemetry::new_with_watchers!()`
32    /// - `fuel_telemetry::new_with_watchers_and_init!()`
33    pub fn __new() -> Result<(Self, WorkerGuard)> {
34        Self::__new_with_helpers(&mut DefaultNewHelpers)
35    }
36
37    fn __new_with_helpers(helpers: &mut impl NewHelpers) -> Result<(Self, WorkerGuard)> {
38        let (writer, guard) = {
39            if var("FUELUP_NO_TELEMETRY").is_ok() {
40                // If telemetry is disabled, discards all output
41                helpers.create_non_blocking_sink()
42            } else {
43                let telemetry_pkg_name = var("TELEMETRY_PKG_NAME");
44
45                // This value needs to come from the cargo target which must be
46                // set from a macro constructor. Calling `env!('CARGO_PKG_NAME')`
47                // here will be incorrect as the macro will have already expaneded
48                // leading to the constant value "fuel-telemetry" for all targets
49                if telemetry_pkg_name.is_err() || var("TELEMETRY_PKG_VERSION").is_err() {
50                    return Err(TelemetryError::InvalidUsage);
51                }
52
53                // If telemetry is enabled, telemetry will be written to a file
54                // that is rotated hourly with the filename format:
55                // "$FUELUP_TMP/<crate>.telemetry.YYYY-MM-DD-HH"
56                helpers.create_non_blocking_appender(tracing_appender::rolling::hourly(
57                    PathBuf::from(telemetry_config()?.fuelup_tmp.clone()),
58                    format!(
59                        "{}.telemetry",
60                        telemetry_pkg_name.map_err(|_| TelemetryError::UnreadableCrateName)?
61                    ),
62                ))
63            }
64        };
65
66        // We need to disable ANSI codes as it breaks InfluxDB parsing
67        let inner_layer = tracing_subscriber::fmt::layer()
68            .with_writer(writer)
69            .with_ansi(false)
70            .event_format(TelemetryFormatter::new());
71
72        Ok((Self { inner_layer }, guard))
73    }
74}
75
76/// Sets `TRACE_ID` env variable to a new UUID.
77pub fn set_trace_id_env_to_new_uuid() {
78    let trace_id = uuid::Uuid::new_v4().to_string();
79    std::env::set_var("TRACE_ID", trace_id);
80}
81
82trait NewHelpers {
83    fn create_non_blocking_sink(&mut self) -> (NonBlocking, WorkerGuard) {
84        tracing_appender::non_blocking(std::io::sink())
85    }
86
87    fn create_non_blocking_appender(
88        &mut self,
89        writer: impl Write + Send + 'static,
90    ) -> (NonBlocking, WorkerGuard) {
91        tracing_appender::non_blocking(writer)
92    }
93}
94
95struct DefaultNewHelpers;
96impl NewHelpers for DefaultNewHelpers {}
97
98// Implement the `Layer` trait for `TelemetryLayer`
99//
100// Here we simply proxy calls to the inner layer.
101impl LayerTrait<Registry> for TelemetryLayer {
102    fn on_close(&self, id: Id, ctx: Context<'_, Registry>) {
103        self.inner_layer.on_close(id, ctx);
104    }
105
106    fn on_enter(&self, id: &span::Id, ctx: Context<'_, Registry>) {
107        self.inner_layer.on_enter(id, ctx);
108    }
109
110    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, Registry>) {
111        self.inner_layer.on_event(event, ctx);
112    }
113
114    fn on_exit(&self, id: &span::Id, ctx: Context<'_, Registry>) {
115        self.inner_layer.on_exit(id, ctx);
116    }
117
118    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, Registry>) {
119        self.inner_layer.on_id_change(old, new, ctx);
120    }
121
122    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, Registry>) {
123        self.inner_layer.on_new_span(attrs, id, ctx);
124    }
125
126    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, Registry>) {
127        self.inner_layer.on_follows_from(span, follows, ctx);
128    }
129
130    fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, Registry>) {
131        self.inner_layer.on_record(span, values, ctx);
132    }
133}
134
// Tests for `TelemetryLayer::__new()` / `__new_with_helpers()`.
//
// Every test mutates process-wide environment variables, so each one runs in
// its own forked process via `rusty_fork_test!`.
#[cfg(test)]
mod __new {
    use super::*;
    use crate::setup_fuelup_home;
    use rusty_fork::rusty_fork_test;
    use std::env::{remove_var, set_var};

    /// `NewHelpers` test double that records which factory method the
    /// constructor invoked while still returning working writers.
    #[derive(Default)]
    struct RecordingHelpers {
        create_non_blocking_sink_called: bool,
        create_non_blocking_appender_called: bool,
    }

    impl NewHelpers for RecordingHelpers {
        fn create_non_blocking_sink(&mut self) -> (NonBlocking, WorkerGuard) {
            self.create_non_blocking_sink_called = true;
            tracing_appender::non_blocking(std::io::sink())
        }

        fn create_non_blocking_appender(
            &mut self,
            writer: impl Write + Send + 'static,
        ) -> (NonBlocking, WorkerGuard) {
            self.create_non_blocking_appender_called = true;
            tracing_appender::non_blocking(writer)
        }
    }

    rusty_fork_test! {
        #[test]
        fn opt_out_is_true() {
            setup_fuelup_home();

            set_var("FUELUP_NO_TELEMETRY", "true");

            let mut helpers = RecordingHelpers::default();
            let result = TelemetryLayer::__new_with_helpers(&mut helpers);

            assert!(result.is_ok());
            assert!(helpers.create_non_blocking_sink_called);
            assert!(!helpers.create_non_blocking_appender_called);
        }

        #[test]
        fn opt_out_is_empty() {
            setup_fuelup_home();

            // Even though it's empty, we only care if it's set
            set_var("FUELUP_NO_TELEMETRY", "");

            let mut helpers = RecordingHelpers::default();
            let result = TelemetryLayer::__new_with_helpers(&mut helpers);

            assert!(result.is_ok());
            assert!(helpers.create_non_blocking_sink_called);
            assert!(!helpers.create_non_blocking_appender_called);
        }

        #[test]
        fn telemetry_pkg_name_is_not_set() {
            setup_fuelup_home();

            // The forked test process inherits the developer's environment;
            // an ambient opt-out would bypass the validation under test.
            remove_var("FUELUP_NO_TELEMETRY");
            remove_var("TELEMETRY_PKG_NAME");
            set_var("TELEMETRY_PKG_VERSION", "1.0.0");

            let result = TelemetryLayer::__new();

            assert_eq!(result.err(), Some(TelemetryError::InvalidUsage));
        }

        #[test]
        fn telemetry_pkg_version_is_not_set() {
            setup_fuelup_home();

            // See `telemetry_pkg_name_is_not_set` for why the opt-out is cleared
            remove_var("FUELUP_NO_TELEMETRY");
            remove_var("TELEMETRY_PKG_VERSION");
            set_var("TELEMETRY_PKG_NAME", "test_pkg_name");

            let result = TelemetryLayer::__new();

            assert_eq!(result.err(), Some(TelemetryError::InvalidUsage));
        }

        #[test]
        fn ok() {
            setup_fuelup_home();

            // Make sure the enabled path is exercised regardless of the
            // environment the tests were launched from.
            remove_var("FUELUP_NO_TELEMETRY");
            set_var("TELEMETRY_PKG_NAME", "test_pkg_name");
            set_var("TELEMETRY_PKG_VERSION", "1.0.0");

            let mut helpers = RecordingHelpers::default();
            let result = TelemetryLayer::__new_with_helpers(&mut helpers);

            assert!(result.is_ok());
            assert!(!helpers.create_non_blocking_sink_called);
            assert!(helpers.create_non_blocking_appender_called);
        }
    }
}