1use {
4 crate::{counter::CounterPoint, datapoint::DataPoint},
5 crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError},
6 gethostname::gethostname,
7 log::*,
8 solana_cluster_type::ClusterType,
9 solana_sha256_hasher::hash,
10 std::{
11 cmp,
12 collections::HashMap,
13 convert::Into,
14 env,
15 fmt::Write,
16 sync::{Arc, Barrier, Mutex, Once, RwLock},
17 thread,
18 time::{Duration, Instant, UNIX_EPOCH},
19 },
20 thiserror::Error,
21};
22
/// Counters pending on the agent thread, keyed by `(counter name, bucket)`.
///
/// Each `SubmitCounter` command adds its `count` to the matching entry (or inserts
/// it); the map is drained into data points on every write.
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
24
/// Errors produced while reading the metrics configuration or talking to the
/// metrics server.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// `SOLANA_METRICS_CONFIG` could not be read. An empty value is also reported
    /// as `VarError::NotPresent` by `get_metrics_config`.
    #[error(transparent)]
    VarError(#[from] env::VarError),
    /// An HTTP request or response-body failure reported by `reqwest`.
    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),
    /// Part of `SOLANA_METRICS_CONFIG` was malformed or used an unknown key;
    /// carries the offending text.
    #[error("SOLANA_METRICS_CONFIG is invalid: '{0}'")]
    ConfigInvalid(String),
    /// One of `host`, `db`, `u` or `p` was missing or empty.
    #[error("SOLANA_METRICS_CONFIG is incomplete")]
    ConfigIncomplete,
    /// The configured database does not belong to the running cluster type;
    /// carries a description of the cluster type, host and database.
    #[error("SOLANA_METRICS_CONFIG database mismatch: {0}")]
    DbMismatch(String),
}
38
39impl From<MetricsError> for String {
40 fn from(error: MetricsError) -> Self {
41 error.to_string()
42 }
43}
44
45impl From<&CounterPoint> for DataPoint {
46 fn from(counter_point: &CounterPoint) -> Self {
47 let mut point = Self::new(counter_point.name);
48 point.timestamp = counter_point.timestamp;
49 point.add_field_i64("count", counter_point.count);
50 point
51 }
52}
53
/// Messages sent from a `MetricsAgent` handle to its background thread.
#[derive(Debug)]
enum MetricsCommand {
    /// Write everything buffered so far, then wait on the barrier so the sender
    /// can block until that write has completed.
    Flush(Arc<Barrier>),
    /// Log the point at the given level and buffer it for the next write.
    Submit(DataPoint, log::Level),
    /// Add the counter into the `(name, bucket)` entry of the pending counters.
    /// The level is currently ignored by the agent thread.
    SubmitCounter(CounterPoint, log::Level, u64),
}
60
/// Handle to a background thread that buffers data points and counters and
/// periodically hands them to a [`MetricsWriter`].
///
/// Dropping the handle flushes whatever is still buffered.
pub struct MetricsAgent {
    /// Unbounded channel feeding commands to the agent thread.
    sender: Sender<MetricsCommand>,
}
64
/// Destination for batches of data points.
///
/// `MetricsAgent` calls `write` from its background thread, handing over
/// ownership of each batch.
pub trait MetricsWriter {
    fn write(&self, points: Vec<DataPoint>);
}
70
/// [`MetricsWriter`] that POSTs points to the `/write` endpoint described by
/// `SOLANA_METRICS_CONFIG`.
struct InfluxDbMetricsWriter {
    /// Complete write URL (credentials included), or `None` when the metrics
    /// configuration is missing or invalid, in which case batches are discarded.
    write_url: Option<String>,
}
74
75impl InfluxDbMetricsWriter {
76 fn new() -> Self {
77 Self {
78 write_url: Self::build_write_url().ok(),
79 }
80 }
81
82 fn build_write_url() -> Result<String, MetricsError> {
83 let config = get_metrics_config().map_err(|err| {
84 info!("metrics disabled: {err}");
85 err
86 })?;
87
88 info!(
89 "metrics configuration: host={} db={} username={}",
90 config.host, config.db, config.username
91 );
92
93 let write_url = format!(
94 "{}/write?db={}&u={}&p={}&precision=n",
95 &config.host, &config.db, &config.username, &config.password
96 );
97
98 Ok(write_url)
99 }
100}
101
102pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
103 const TIMESTAMP_LEN: usize = 20;
104 const HOST_ID_LEN: usize = 8; const EXTRA_LEN: usize = 2; let mut len = 0;
107 for point in points {
108 for (name, value) in &point.fields {
109 len += name.len() + value.len() + EXTRA_LEN;
110 }
111 for (name, value) in &point.tags {
112 len += name.len() + value.len() + EXTRA_LEN;
113 }
114 len += point.name.len();
115 len += TIMESTAMP_LEN;
116 len += host_id.len() + HOST_ID_LEN;
117 }
118 let mut line = String::with_capacity(len);
119 for point in points {
120 let _ = write!(line, "{},host_id={}", &point.name, host_id);
121 for (name, value) in point.tags.iter() {
122 let _ = write!(line, ",{name}={value}");
123 }
124
125 let mut first = true;
126 for (name, value) in point.fields.iter() {
127 let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
128 first = false;
129 }
130 let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
131 let nanos = timestamp.unwrap().as_nanos();
132 let _ = writeln!(line, " {nanos}");
133 }
134 line
135}
136
137impl MetricsWriter for InfluxDbMetricsWriter {
138 fn write(&self, points: Vec<DataPoint>) {
139 if let Some(ref write_url) = self.write_url {
140 debug!("submitting {} points", points.len());
141
142 let host_id = HOST_ID.read().unwrap();
143
144 let line = serialize_points(&points, &host_id);
145
146 let client = reqwest::blocking::Client::builder()
147 .timeout(Duration::from_secs(5))
148 .build();
149 let client = match client {
150 Ok(client) => client,
151 Err(err) => {
152 warn!("client instantiation failed: {err}");
153 return;
154 }
155 };
156
157 let response = client.post(write_url.as_str()).body(line).send();
158 if let Ok(resp) = response {
159 let status = resp.status();
160 if !status.is_success() {
161 let text = resp
162 .text()
163 .unwrap_or_else(|_| "[text body empty]".to_string());
164 warn!("submit response unsuccessful: {status} {text}",);
165 }
166 } else {
167 warn!("submit error: {}", response.unwrap_err());
168 }
169 }
170 }
171}
172
173impl Default for MetricsAgent {
174 fn default() -> Self {
175 let max_points_per_sec = env::var("SOLANA_METRICS_MAX_POINTS_PER_SECOND")
176 .map(|x| {
177 x.parse()
178 .expect("Failed to parse SOLANA_METRICS_MAX_POINTS_PER_SECOND")
179 })
180 .unwrap_or(4000);
181
182 Self::new(
183 Arc::new(InfluxDbMetricsWriter::new()),
184 Duration::from_secs(10),
185 max_points_per_sec,
186 )
187 }
188}
189
190impl MetricsAgent {
191 pub fn new(
192 writer: Arc<dyn MetricsWriter + Send + Sync>,
193 write_frequency: Duration,
194 max_points_per_sec: usize,
195 ) -> Self {
196 let (sender, receiver) = unbounded::<MetricsCommand>();
197
198 thread::Builder::new()
199 .name("solMetricsAgent".into())
200 .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
201 .unwrap();
202
203 Self { sender }
204 }
205
206 fn combine_points(
215 max_points: usize,
216 max_points_per_sec: usize,
217 secs_since_last_write: u64,
218 points_buffered: usize,
219 points: &mut Vec<DataPoint>,
220 counters: &mut CounterMap,
221 ) -> Vec<DataPoint> {
222 let max_points = max_points.saturating_sub(1);
224
225 let num_points = points.len().saturating_add(counters.len());
226 let fit_counters = max_points.saturating_sub(points.len());
227 let points_written = cmp::min(num_points, max_points);
228
229 debug!("run: attempting to write {num_points} points");
230
231 if num_points > max_points {
232 warn!(
233 "Max submission rate of {max_points_per_sec} datapoints per second exceeded. Only \
234 the first {max_points} of {num_points} points will be submitted."
235 );
236 }
237
238 let mut combined = std::mem::take(points);
239 combined.truncate(points_written);
240
241 combined.extend(counters.values().take(fit_counters).map(|v| v.into()));
242 counters.clear();
243
244 combined.push(
245 DataPoint::new("metrics")
246 .add_field_i64("points_written", points_written as i64)
247 .add_field_i64("num_points", num_points as i64)
248 .add_field_i64("points_lost", (num_points - points_written) as i64)
249 .add_field_i64("points_buffered", points_buffered as i64)
250 .add_field_i64("secs_since_last_write", secs_since_last_write as i64)
251 .to_owned(),
252 );
253
254 combined
255 }
256
257 fn write(
262 writer: &Arc<dyn MetricsWriter + Send + Sync>,
263 max_points: usize,
264 max_points_per_sec: usize,
265 last_write_time: Instant,
266 points_buffered: usize,
267 points: &mut Vec<DataPoint>,
268 counters: &mut CounterMap,
269 ) -> Instant {
270 let now = Instant::now();
271 let secs_since_last_write = now.duration_since(last_write_time).as_secs();
272
273 writer.write(Self::combine_points(
274 max_points,
275 max_points_per_sec,
276 secs_since_last_write,
277 points_buffered,
278 points,
279 counters,
280 ));
281
282 now
283 }
284
285 fn run(
286 receiver: &Receiver<MetricsCommand>,
287 writer: &Arc<dyn MetricsWriter + Send + Sync>,
288 write_frequency: Duration,
289 max_points_per_sec: usize,
290 ) {
291 trace!("run: enter");
292 let mut last_write_time = Instant::now();
293 let mut points = Vec::<DataPoint>::new();
294 let mut counters = CounterMap::new();
295
296 let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
297
298 let write = |last_write_time: Instant,
300 points: &mut Vec<DataPoint>,
301 counters: &mut CounterMap|
302 -> Instant {
303 Self::write(
304 writer,
305 max_points,
306 max_points_per_sec,
307 last_write_time,
308 receiver.len(),
309 points,
310 counters,
311 )
312 };
313
314 loop {
315 match receiver.try_recv() {
316 Ok(cmd) => match cmd {
317 MetricsCommand::Flush(barrier) => {
318 debug!("metrics_thread: flush");
319 last_write_time = write(last_write_time, &mut points, &mut counters);
320 barrier.wait();
321 }
322 MetricsCommand::Submit(point, level) => {
323 log!(level, "{point}");
324 points.push(point);
325 }
326 MetricsCommand::SubmitCounter(counter, _level, bucket) => {
327 debug!("{counter:?}");
328 let key = (counter.name, bucket);
329 if let Some(value) = counters.get_mut(&key) {
330 value.count += counter.count;
331 } else {
332 counters.insert(key, counter);
333 }
334 }
335 },
336 Err(TryRecvError::Empty) => {
337 std::thread::sleep(Duration::from_millis(5));
338 }
339 Err(TryRecvError::Disconnected) => {
340 debug!("run: sender disconnected");
341 break;
342 }
343 };
344
345 let now = Instant::now();
346 if now.duration_since(last_write_time) >= write_frequency {
347 last_write_time = write(last_write_time, &mut points, &mut counters);
348 }
349 }
350
351 debug_assert!(
352 points.is_empty() && counters.is_empty(),
353 "Controlling `MetricsAgent` is expected to call `flush()` from the `Drop` \
354 implementation, before exiting. So both `points` and `counters` must be empty at \
355 this point. `points`: {points:?}, `counters`: {counters:?}",
356 );
357
358 trace!("run: exit");
359 }
360
361 pub fn submit(&self, point: DataPoint, level: log::Level) {
362 self.sender
363 .send(MetricsCommand::Submit(point, level))
364 .unwrap();
365 }
366
367 pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
368 self.sender
369 .send(MetricsCommand::SubmitCounter(counter, level, bucket))
370 .unwrap();
371 }
372
373 pub fn flush(&self) {
374 debug!("Flush");
375 let barrier = Arc::new(Barrier::new(2));
376 self.sender
377 .send(MetricsCommand::Flush(Arc::clone(&barrier)))
378 .unwrap();
379
380 barrier.wait();
381 }
382}
383
384impl Drop for MetricsAgent {
385 fn drop(&mut self) {
386 self.flush();
387 }
388}
389
390fn get_singleton_agent() -> &'static MetricsAgent {
391 static AGENT: std::sync::LazyLock<MetricsAgent> =
392 std::sync::LazyLock::new(MetricsAgent::default);
393 &AGENT
394}
395
396static HOST_ID: std::sync::LazyLock<RwLock<String>> = std::sync::LazyLock::new(|| {
397 RwLock::new({
398 let hostname: String = gethostname()
399 .into_string()
400 .unwrap_or_else(|_| "".to_string());
401 format!("{}", hash(hostname.as_bytes()))
402 })
403});
404
405pub fn set_host_id(host_id: String) {
406 info!("host id: {host_id}");
407 *HOST_ID.write().unwrap() = host_id;
408}
409
410pub fn get_host_id() -> String {
411 HOST_ID.read().unwrap().clone()
412}
413
414pub fn submit(point: DataPoint, level: log::Level) {
417 let agent = get_singleton_agent();
418 agent.submit(point, level);
419}
420
421pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
424 let agent = get_singleton_agent();
425 agent.submit_counter(point, level, bucket);
426}
427
/// Connection settings parsed from `SOLANA_METRICS_CONFIG`.
#[derive(Debug, Default)]
struct MetricsConfig {
    pub host: String,
    pub db: String,
    pub username: String,
    pub password: String,
}

impl MetricsConfig {
    /// True when every setting (host, db, username, password) is non-empty.
    fn complete(&self) -> bool {
        [&self.host, &self.db, &self.username, &self.password]
            .iter()
            .all(|setting| !setting.is_empty())
    }
}
444
445fn get_metrics_config() -> Result<MetricsConfig, MetricsError> {
446 let mut config = MetricsConfig::default();
447 let config_var = env::var("SOLANA_METRICS_CONFIG")?;
448 if config_var.is_empty() {
449 Err(env::VarError::NotPresent)?;
450 }
451
452 for pair in config_var.split(',') {
453 let nv: Vec<_> = pair.split('=').collect();
454 if nv.len() != 2 {
455 return Err(MetricsError::ConfigInvalid(pair.to_string()));
456 }
457 let v = nv[1].to_string();
458 match nv[0] {
459 "host" => config.host = v,
460 "db" => config.db = v,
461 "u" => config.username = v,
462 "p" => config.password = v,
463 _ => return Err(MetricsError::ConfigInvalid(pair.to_string())),
464 }
465 }
466
467 if !config.complete() {
468 return Err(MetricsError::ConfigIncomplete);
469 }
470
471 Ok(config)
472}
473
474pub fn metrics_config_sanity_check(cluster_type: ClusterType) -> Result<(), MetricsError> {
475 let config = match get_metrics_config() {
476 Ok(config) => config,
477 Err(MetricsError::VarError(env::VarError::NotPresent)) => return Ok(()),
478 Err(e) => return Err(e),
479 };
480 match &config.db[..] {
481 "mainnet-beta" if cluster_type != ClusterType::MainnetBeta => (),
482 "tds" if cluster_type != ClusterType::Testnet => (),
483 "devnet" if cluster_type != ClusterType::Devnet => (),
484 _ => return Ok(()),
485 };
486 let (host, db) = (&config.host, &config.db);
487 let msg = format!("cluster_type={cluster_type:?} host={host} database={db}");
488 Err(MetricsError::DbMismatch(msg))
489}
490
491pub fn query(q: &str) -> Result<String, MetricsError> {
492 let config = get_metrics_config()?;
493 let query_url = format!(
494 "{}/query?u={}&p={}&q={}",
495 &config.host, &config.username, &config.password, &q
496 );
497
498 let response = reqwest::blocking::get(query_url.as_str())?.text()?;
499
500 Ok(response)
501}
502
503pub fn flush() {
506 let agent = get_singleton_agent();
507 agent.flush();
508}
509
510pub fn set_panic_hook(program: &'static str, version: Option<String>) {
512 static SET_HOOK: Once = Once::new();
513 SET_HOOK.call_once(|| {
514 let default_hook = std::panic::take_hook();
515 std::panic::set_hook(Box::new(move |ono| {
516 default_hook(ono);
517 let location = match ono.location() {
518 Some(location) => location.to_string(),
519 None => "?".to_string(),
520 };
521 submit(
522 DataPoint::new("panic")
523 .add_field_str("program", program)
524 .add_field_str("thread", thread::current().name().unwrap_or("?"))
525 .add_field_i64("one", 1)
528 .add_field_str("message", &ono.to_string())
529 .add_field_str("location", &location)
530 .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
531 .to_owned(),
532 Level::Error,
533 );
534 flush();
536
537 std::process::exit(1);
539 }));
540 });
541}
542
543pub mod test_mocks {
544 use super::*;
545
546 pub struct MockMetricsWriter {
547 pub points_written: Arc<Mutex<Vec<DataPoint>>>,
548 }
549 impl MockMetricsWriter {
550 pub fn new() -> Self {
551 MockMetricsWriter {
552 points_written: Arc::new(Mutex::new(Vec::new())),
553 }
554 }
555
556 pub fn points_written(&self) -> usize {
557 self.points_written.lock().unwrap().len()
558 }
559 }
560
561 impl Default for MockMetricsWriter {
562 fn default() -> Self {
563 Self::new()
564 }
565 }
566
567 impl MetricsWriter for MockMetricsWriter {
568 fn write(&self, points: Vec<DataPoint>) {
569 assert!(!points.is_empty());
570
571 let new_points = points.len();
572 self.points_written.lock().unwrap().extend(points);
573
574 info!(
575 "Writing {} points ({} total)",
576 new_points,
577 self.points_written(),
578 );
579 }
580 }
581}
582
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    // Every write appends one `metrics` summary point, so expected totals below
    // are "submitted points/counters + 1" per write.

    #[test]
    fn test_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..42 {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i)
                    .to_owned(),
                Level::Info,
            );
        }

        agent.flush();
        // 42 points + 1 summary point.
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_submit_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..10 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i);
        }

        agent.flush();
        // 2 names x 10 distinct buckets = 20 counters + 1 summary point.
        assert_eq!(writer.points_written(), 21);
    }

    #[test]
    fn test_submit_counter_increment() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for _ in 0..10 {
            agent.submit_counter(
                CounterPoint {
                    name: "counter",
                    count: 10,
                    timestamp: UNIX_EPOCH,
                },
                Level::Info,
                // Same bucket every time, so all ten samples merge into one entry.
                0,
            );
        }

        agent.flush();
        // One merged counter + 1 summary point.
        assert_eq!(writer.points_written(), 2);

        // Counters precede the summary point, and 10 x 10 = 100.
        let submitted_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
    }

    #[test]
    fn test_submit_bucketed_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..50 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i / 10);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i / 10);
        }

        agent.flush();
        // `i / 10` yields buckets 0..=4: 2 names x 5 buckets = 10 counters + 1 summary.
        assert_eq!(writer.points_written(), 11);
    }

    #[test]
    fn test_submit_with_delay() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);

        agent.submit(DataPoint::new("point 1"), Level::Info);
        // No explicit flush: relies on the periodic (1s) write having happened
        // exactly once during this sleep. Timing-dependent.
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_submit_exceed_max_rate() {
        let writer = Arc::new(MockMetricsWriter::new());

        let max_points_per_sec = 100;

        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec);

        for i in 0..(max_points_per_sec + 20) {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i.try_into().unwrap())
                    .to_owned(),
                Level::Info,
            );
        }

        agent.flush();

        // Budget is 1s x 100 points: 99 data points + 1 summary point; the rest are dropped.
        assert_eq!(writer.points_written(), max_points_per_sec);
    }

    #[test]
    fn test_multithread_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = Arc::new(Mutex::new(MetricsAgent::new(
            writer.clone(),
            Duration::from_secs(10),
            1000,
        )));

        let mut threads = Vec::new();
        for i in 0..42 {
            let mut point = DataPoint::new("measurement");
            point.add_field_i64("i", i);
            let agent = Arc::clone(&agent);
            threads.push(thread::spawn(move || {
                agent.lock().unwrap().submit(point, Level::Info);
            }));
        }

        for thread in threads {
            thread.join().unwrap();
        }

        agent.lock().unwrap().flush();
        // 42 points from 42 threads + 1 summary point.
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_flush_before_drop() {
        let writer = Arc::new(MockMetricsWriter::new());
        {
            // Write interval is effectively infinite, so only `Drop`'s flush can write.
            let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9_999_999), 1000);
            agent.submit(DataPoint::new("point 1"), Level::Info);
        }

        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_live_submit() {
        // Uses the InfluxDB writer; it only sends anything when
        // SOLANA_METRICS_CONFIG is set, otherwise the batch is discarded.
        let agent = MetricsAgent::default();

        let point = DataPoint::new("live_submit_test")
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", rand::random::<u8>() as i64)
            .to_owned();
        agent.submit(point, Level::Info);
    }

    #[test]
    fn test_host_id() {
        // Note: mutates the process-global HOST_ID.
        let test_host_id = "test_host_123".to_string();
        set_host_id(test_host_id.clone());
        assert_eq!(get_host_id(), test_host_id);
    }
}