newrelic_unofficial/
lib.rs1#![deny(unsafe_code)]
5
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::thread::{self, JoinHandle};
9use std::time::Duration;
10
11use crate::app_run::AppRun;
12use crate::collector::RpmError;
13pub use crate::config::Config;
14use crate::harvest::Harvest;
15use crate::sync_util::Shutdown;
16pub use crate::transaction::Transaction;
17
18mod apdex;
19mod app_run;
20mod collector;
21pub mod config;
22mod connect_reply;
23mod domain_defs;
24mod harvest;
25mod limits;
26mod metric_names;
27mod metrics;
28mod payloads;
29mod sync_util;
30mod transaction;
31mod transaction_trace;
32mod utilization;
33
34#[derive(Debug)]
35pub struct Daemon {
36 app: Application,
37 handle: Option<JoinHandle<()>>,
38}
39
40impl Daemon {
41 pub fn new(app_name: &str, license: &str) -> Result<Self, crate::config::ConfigError> {
42 Self::from_config(&Config::new(app_name, license))
43 }
44
45 pub(crate) fn from_config(config: &Config) -> Result<Self, crate::config::ConfigError> {
46 config.validate()?;
47
48 let app = Application::new(&config);
49 if !config.enabled {
50 return Ok(Daemon { app, handle: None });
51 }
52 let handle = {
53 let inner = app.inner.clone();
54 thread::spawn(move || {
55 inner.run();
56 })
57 };
58
59 Ok(Daemon {
60 app,
61 handle: Some(handle),
62 })
63 }
64
65 pub fn application(&self) -> &Application {
66 &self.app
67 }
68
69 pub fn start_transaction(&self, name: &str) -> Transaction {
70 self.app.start_transaction(name)
71 }
72
73 pub fn shutdown(&self) {
74 self.app.shutdown()
75 }
76}
77
78impl std::ops::Drop for Daemon {
79 fn drop(&mut self) {
80 self.shutdown();
81 if let Some(handle) = self.handle.take() {
82 let result = handle.join();
83 if let Err(e) = result {
84 log::error!("NewRelic daemon failed: {:?}", e);
85 }
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
91pub struct Application {
92 inner: Arc<ApplicationInner>,
93}
94
95impl Application {
96 fn new(config: &Config) -> Self {
97 Self {
98 inner: Arc::new(ApplicationInner::new(config)),
99 }
100 }
101
102 pub fn start_transaction(&self, name: &str) -> Transaction {
103 Transaction::new(&self.inner, name, None)
104 }
105
106 pub fn start_web_transaction<T>(&self, name: &str, request: &http::Request<T>) -> Transaction {
107 let mut parts = http::Request::new(()).into_parts().0;
108 parts.method = request.method().clone();
109 parts.uri = request.uri().clone();
110 parts.version = request.version();
111 parts.headers = request.headers().clone();
112 let request = http::Request::from_parts(parts, ());
113 Transaction::new(&self.inner, name, Some(request))
114 }
115
116 pub fn shutdown(&self) {
117 self.inner.shutdown.shutdown();
118 }
119}
120
121#[derive(Debug)]
122struct ApplicationInner {
123 config: Config,
124 state: Mutex<AppState>,
125 shutdown: Shutdown,
126}
127
128#[allow(clippy::large_enum_variant)]
129#[derive(Debug)]
130enum AppState {
131 Init,
132 Running { run: Arc<AppRun>, harvest: Harvest },
133 Dead,
134}
135
136impl ApplicationInner {
137 fn new(config: &Config) -> Self {
138 let state = if config.enabled {
139 AppState::Init
140 } else {
141 AppState::Dead
142 };
143 ApplicationInner {
144 config: config.clone(),
145 state: Mutex::new(state),
146 shutdown: Shutdown::new(),
147 }
148 }
149
150 fn run(self: &Arc<Self>) {
151 let mut attempt: u32 = 0;
152 loop {
153 let e = match crate::collector::connect_attempt(&self.config) {
154 Ok(run) => {
155 attempt = 0;
156 match self.run1(run) {
157 Ok(void) => match void {},
158 Err(e @ RpmError::Shutdown(..)) => {
159 self.shutdown();
160 e
161 }
162 Err(e) => e,
163 }
164 }
165 Err(e) => {
166 attempt = attempt.saturating_add(1);
167 e
168 }
169 };
170 if e.is_disconnect() {
171 log::error!("application disconnected: {}", e);
172 break;
173 } else {
174 let backoff_time = connect_backoff_time(attempt);
175 if let Err(_shutdown) = self.shutdown.sleep(backoff_time) {
176 break;
177 }
178 }
179 }
180 {
181 let mut state = self.state.lock();
182 *state = AppState::Dead;
183 }
184 }
185
186 fn run1(self: &Arc<Self>, run: AppRun) -> Result<Void, RpmError> {
187 log::debug!("run = {:#?}", run);
188 let harvest = Harvest::new(&run);
189 {
190 let mut state = self.state.lock();
191 *state = AppState::Running {
192 run: Arc::new(run),
193 harvest,
194 };
195 }
196 loop {
197 self.shutdown.sleep(Duration::from_secs(1))?;
198 let ready = {
200 let mut state = self.state.lock();
201 if let AppState::Running { run, harvest } = &mut *state {
202 Some((Arc::clone(run), harvest.ready(run, false)))
203 } else {
204 None
205 }
206 };
207 if let Some((run, ready)) = ready {
209 let result = ready.harvest(&run);
210 if let Err(e) = result {
211 if e.is_disconnect() || e.is_restart_exception() {
212 return Err(e);
213 } else {
214 log::warn!("harvest failure: {}", e);
215 }
216 }
217 }
218 }
219 }
220
221 fn shutdown(self: &Arc<Self>) {
222 log::debug!("shutting down...");
223 let mut old_state = {
224 let mut state = self.state.lock();
225 std::mem::replace(&mut *state, AppState::Dead)
226 };
227 if let AppState::Running { run, harvest } = &mut old_state {
228 let ready = harvest.ready(run, true);
229 let result = ready.harvest(run);
230 if let Err(e) = result {
231 log::warn!("harvest failure: {}", e);
232 }
233 }
234 }
235}
236
237enum Void {}
238
239fn connect_backoff_time(attempt: u32) -> Duration {
240 const CONNECT_BACKOFF_TIMES: &[Duration] = &[
241 Duration::from_secs(15),
242 Duration::from_secs(15),
243 Duration::from_secs(30),
244 Duration::from_secs(60),
245 Duration::from_secs(120),
246 Duration::from_secs(300),
247 ];
248 const BACKOFF_REPEAT: Duration = CONNECT_BACKOFF_TIMES[CONNECT_BACKOFF_TIMES.len() - 1];
249 CONNECT_BACKOFF_TIMES
250 .get(attempt as usize)
251 .copied()
252 .unwrap_or(BACKOFF_REPEAT)
253}