observer/
lib.rs

1#[macro_use]
2extern crate serde_json;
3#[allow(unused_imports)]
4#[macro_use]
5extern crate lazy_static;
6extern crate failure;
7#[allow(unused_imports)]
8#[macro_use]
9extern crate observer_attribute;
10#[macro_use]
11extern crate serde_derive;
12
13pub mod backends;
14pub mod context;
15#[cfg(feature = "mysql")]
16pub mod mysql;
17pub mod observe;
18pub mod observe_fields;
19#[cfg(feature = "postgres")]
20pub mod pg;
21pub mod span;
22mod sql_parse;
23
24pub use crate::context::Context;
25pub use crate::observe::Observe;
26pub use crate::observe_fields::*;
27pub use crate::span::{Span, SpanItem};
28
29#[macro_use]
30extern crate log;
31
32#[cfg(test)]
33mod tests;
34
35pub type Result<T> = std::result::Result<T, failure::Error>;
36
37pub trait Backend: Send + Sync {
38    fn app_started(&self) {}
39    fn app_ended(&self) {}
40    fn context_created(&self, _id: &str) {}
41    fn context_ended(&self, _ctx: &crate::Context) {}
42    fn span_created(&self, _id: &str) {}
43    fn span_data(&self, _key: &str, _value: &str) {}
44    fn span_ended(&self, _span: Option<&crate::span::Span>) {}
45}
46
47pub struct Observer {
48    backends: Vec<Box<dyn Backend>>,
49}
50
51lazy_static! {
52    static ref OBSERVER: std::sync::Arc<antidote::RwLock<Option<Observer>>> =
53        std::sync::Arc::new(antidote::RwLock::new(None));
54}
55
56thread_local! {
57    static CONTEXT: std::cell::RefCell<Option<Context>> = std::cell::RefCell::new(None);
58}
59
60pub fn builder(backend: Box<dyn Backend>) -> Observer {
61    Observer::builder(backend)
62}
63
64pub fn create_context(context_id: &str) {
65    let obj = OBSERVER.as_ref().read();
66    if let Some(obj) = obj.as_ref() {
67        obj.create_context(context_id);
68    }
69}
70
71pub fn end_context() -> Option<impl serde::Serialize> {
72    let obj = OBSERVER.as_ref().read();
73    if let Some(obj) = obj.as_ref() {
74        Some(obj.end_context())
75    } else {
76        None
77    }
78}
79
80pub fn printed_context() -> Option<String> {
81    use backends::logger::print_context;
82    CONTEXT.with(|context| {
83        if let Some(ctx) = context.borrow().as_ref() {
84            Some(print_context(ctx))
85        } else {
86            None
87        }
88    })
89}
90
91pub fn shape_hash() -> String {
92    use sha2::Digest;
93
94    let trace_without_data = shape_trace().unwrap_or_else(|| "".to_string());
95    format!("{:x}", sha2::Sha256::digest(trace_without_data.as_bytes()))
96}
97
98pub fn shape_trace() -> Option<String> {
99    CONTEXT.with(|context| {
100        if let Some(ctx) = context.borrow().as_ref() {
101            Some(ctx.trace_without_data(false))
102        } else {
103            None
104        }
105    })
106}
107
108pub fn test_trace() -> Option<String> {
109    CONTEXT.with(|context| {
110        if let Some(ctx) = context.borrow().as_ref() {
111            Some(ctx.trace_without_data(true))
112        } else {
113            None
114        }
115    })
116}
117
118pub fn trace() -> Option<String> {
119    CONTEXT.with(|context| {
120        if let Some(ctx) = context.borrow().as_ref() {
121            Some(ctx.trace_without_data(true))
122        } else {
123            None
124        }
125    })
126}
127
128pub fn log(value: &'static str) {
129    let obj = OBSERVER.as_ref().read();
130    if let Some(obj) = obj.as_ref() {
131        obj.span_log(value);
132    }
133}
134
135pub(crate) fn start_span(id: &str) {
136    let obj = OBSERVER.as_ref().read();
137    if let Some(obj) = obj.as_ref() {
138        obj.create_span(id);
139    }
140}
141
142pub(crate) fn end_span(is_critical: bool, err: Option<String>) {
143    let obj = OBSERVER.as_ref().read();
144    if let Some(obj) = obj.as_ref() {
145        obj.end_span(is_critical, err);
146    }
147}
148
149pub(crate) fn field(key: &'static str, value: serde_json::Value) {
150    CONTEXT.with(|context| {
151        if let Some(ctx) = context.borrow().as_ref() {
152            ctx.observe_span_field(key, value);
153        }
154    });
155}
156
157pub(crate) fn transient_field(key: &'static str, value: serde_json::Value) {
158    CONTEXT.with(|context| {
159        if let Some(ctx) = context.borrow().as_ref() {
160            ctx.observe_span_transient_field(key, value);
161        }
162    });
163}
164
165#[allow(dead_code)]
166pub(crate) fn observe_query(
167    query: String,
168    bind: Option<String>,
169    result: std::result::Result<usize, String>,
170) {
171    CONTEXT.with(|context| {
172        if let Some(ctx) = context.borrow().as_ref() {
173            ctx.observe_query(query, bind, result);
174        }
175    });
176}
177
178pub(crate) fn observe_result(result: impl serde::Serialize) {
179    CONTEXT.with(|ctx| {
180        if let Some(ctx) = ctx.borrow().as_ref() {
181            ctx.observe_span_result(result);
182        }
183    });
184}
185
186#[allow(dead_code)]
187pub fn observe_span_id(id: &str) {
188    CONTEXT.with(|context| {
189        if let Some(ctx) = context.borrow().as_ref() {
190            ctx.observe_span_id(id);
191        }
192    });
193}
194
195impl Observer {
196    /// Initialized Observer with different backends(NewRelic, StatsD, Sentry, Jaeger, etc...)
197    /// and call their app started method
198
199    pub fn builder(backend: Box<dyn Backend>) -> Self {
200        Observer {
201            backends: vec![backend],
202        }
203    }
204
205    pub fn add_backend(mut self, backend: Box<dyn Backend>) -> Self {
206        self.backends.push(backend);
207        self
208    }
209
210    pub fn init(self) {
211        for backend in self.backends.iter() {
212            backend.app_started()
213        }
214
215        let mut obj = OBSERVER.as_ref().write();
216        obj.replace(self);
217    }
218
219    /// It will iterate through all backends and call their context_created method.
220    pub(crate) fn create_context(&self, context_id: &str) {
221        CONTEXT.with(|obj| {
222            let mut context = obj.borrow_mut();
223            if context.is_none() {
224                context.replace(Context::new(context_id.to_string()));
225            }
226            for backend in self.backends.iter() {
227                backend.context_created(context_id);
228            }
229        });
230    }
231
232    /// It will end context object and drop things if needed.
233    pub(crate) fn end_context(&self) -> impl serde::Serialize {
234        CONTEXT.with(|ctx| {
235            let mut ctx = ctx.borrow_mut();
236            match ctx.as_ref() {
237                Some(ctx) => {
238                    ctx.finalise();
239                    for backend in self.backends.iter() {
240                        backend.context_ended(&ctx);
241                    }
242                }
243                None => {
244                    unreachable!("this is bug");
245                }
246            };
247            ctx.take()
248        })
249    }
250
251    pub(crate) fn create_span(&self, id: &str) {
252        CONTEXT.with(|ctx| {
253            if let Some(ctx) = ctx.borrow().as_ref() {
254                ctx.start_span(id);
255                for backend in self.backends.iter() {
256                    backend.span_created(id);
257                }
258            }
259        });
260    }
261
262    pub(crate) fn end_span(&self, is_critical: bool, err: Option<String>) {
263        CONTEXT.with(|ctx| {
264            if let Some(ctx) = ctx.borrow().as_ref() {
265                ctx.end_span(is_critical, err);
266                for backend in self.backends.iter() {
267                    backend.span_ended(ctx.span_stack.borrow().last());
268                }
269            }
270        });
271    }
272
273    pub(crate) fn span_log(&self, value: &'static str) {
274        CONTEXT.with(|ctx| {
275            if let Some(ctx) = ctx.borrow().as_ref() {
276                ctx.span_log(value);
277            }
278        });
279    }
280}