1use std::fmt;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{
9 self, Receiver as CrossbeamReceiver, Sender as CrossbeamSender, TryRecvError,
10};
11
12use futures::{channel::oneshot, Future, TryFutureExt};
13
14use crate::instruments::switches::*;
15use crate::instruments::*;
16use crate::processor::{
17 AggregatesProcessors, ProcessesTelemetryMessages, ProcessingOutcome, ProcessingStrategy,
18};
19use crate::snapshot::{ItemKind, Snapshot};
20use crate::util;
21use crate::{Descriptive, PutsSnapshot};
22
23pub struct DriverBuilder {
25 pub name: Option<String>,
29 pub title: Option<String>,
33 pub description: Option<String>,
37 pub processing_strategy: ProcessingStrategy,
41 pub with_driver_metrics: bool,
46}
47
48impl DriverBuilder {
49 pub fn new<T: Into<String>>(name: T) -> DriverBuilder {
50 let mut me = Self::default();
51 me.name = Some(name.into());
52 me
53 }
54
55 pub fn set_name<T: Into<String>>(mut self, name: T) -> Self {
56 self.name = Some(name.into());
57 self
58 }
59
60 pub fn set_title<T: Into<String>>(mut self, title: T) -> Self {
61 self.title = Some(title.into());
62 self
63 }
64
65 pub fn set_description<T: Into<String>>(mut self, description: T) -> Self {
66 self.description = Some(description.into());
67 self
68 }
69
70 pub fn set_processing_strategy(mut self, processing_strategy: ProcessingStrategy) -> Self {
71 self.processing_strategy = processing_strategy;
72 self
73 }
74
75 pub fn set_driver_metrics(mut self, enabled: bool) -> Self {
76 self.with_driver_metrics = enabled;
77 self
78 }
79
80 pub fn build(self) -> TelemetryDriver {
81 TelemetryDriver::new(
82 self.name,
83 self.title,
84 self.description,
85 self.processing_strategy,
86 self.with_driver_metrics,
87 )
88 }
89}
90
91impl Default for DriverBuilder {
92 fn default() -> Self {
93 Self {
94 name: None,
95 title: None,
96 description: None,
97 processing_strategy: ProcessingStrategy::default(),
98 with_driver_metrics: true,
99 }
100 }
101}
102
/// Handle to a metrix telemetry driver.
///
/// Creating a driver spawns a background thread (named "metrix") that owns all
/// processors and snapshooters; this handle communicates with it via a channel.
/// Clones share the same thread. Once the last clone is dropped, the shared
/// `DropGuard` clears the running flag and the thread leaves its loop.
#[derive(Clone)]
pub struct TelemetryDriver {
    // Name, title and description; a copy is also handed to the driver thread.
    descriptives: Descriptives,
    // Shared by all clones; dropping the last one stops the driver thread.
    // NOTE: declared before `sender`, so it is dropped first.
    drop_guard: Arc<DropGuard>,
    // Sends commands to the driver thread.
    sender: CrossbeamSender<DriverMessage>,
}
157
/// Clears the shared `is_running` flag when dropped.
///
/// A `TelemetryDriver` keeps its guard behind an `Arc`, so the flag is only
/// cleared when the last clone of the driver goes away.
struct DropGuard {
    pub is_running: Arc<AtomicBool>,
}

impl Drop for DropGuard {
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.is_running;
        flag.store(false, Ordering::Relaxed);
    }
}
167
168impl TelemetryDriver {
169 pub fn new(
176 name: Option<String>,
177 title: Option<String>,
178 description: Option<String>,
179 processing_strategy: ProcessingStrategy,
180 with_driver_metrics: bool,
181 ) -> TelemetryDriver {
182 let is_running = Arc::new(AtomicBool::new(true));
183
184 let driver_metrics = if with_driver_metrics {
185 Some(DriverMetrics {
186 instruments: DriverInstruments::default(),
187 })
188 } else {
189 None
190 };
191
192 let (sender, receiver) = crossbeam_channel::unbounded();
193
194 let mut descriptives = Descriptives::default();
195 descriptives.name = name;
196 descriptives.title = title;
197 descriptives.description = description;
198
199 let driver = TelemetryDriver {
200 descriptives: descriptives.clone(),
201 drop_guard: Arc::new(DropGuard {
202 is_running: is_running.clone(),
203 }),
204 sender,
205 };
206
207 start_telemetry_loop(
208 descriptives,
209 is_running,
210 processing_strategy,
211 driver_metrics,
212 receiver,
213 );
214
215 driver
216 }
217
218 pub fn name(&self) -> Option<&str> {
220 self.descriptives.name.as_deref()
221 }
222
223 pub fn change_processing_stragtegy(&self, strategy: ProcessingStrategy) {
225 let _ = self
226 .sender
227 .send(DriverMessage::SetProcessingStrategy(strategy));
228 }
229
230 pub fn pause(&self) {
232 let _ = self.sender.send(DriverMessage::Pause);
233 }
234
235 pub fn resume(&self) {
237 let _ = self.sender.send(DriverMessage::Resume);
238 }
239
240 pub fn snapshot(&self, descriptive: bool) -> Result<Snapshot, GetSnapshotError> {
241 let snapshot = Snapshot::default();
242 let (tx, rx) = crossbeam_channel::unbounded();
243 let _ = self
244 .sender
245 .send(DriverMessage::GetSnapshotSync(snapshot, tx, descriptive));
246 rx.recv().map_err(|_err| GetSnapshotError)
247 }
248
249 pub fn snapshot_async(
250 &self,
251 descriptive: bool,
252 ) -> impl Future<Output = Result<Snapshot, GetSnapshotError>> + Send + 'static {
253 let snapshot = Snapshot::default();
254 let (tx, rx) = oneshot::channel();
255 let _ = self
256 .sender
257 .send(DriverMessage::GetSnapshotAsync(snapshot, tx, descriptive));
258 rx.map_err(|_| GetSnapshotError)
259 }
260}
261
/// Error returned when the driver thread could not deliver a snapshot.
///
/// This happens when the request cannot be sent to the driver thread or the
/// reply channel is dropped without an answer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct GetSnapshotError;

impl ::std::error::Error for GetSnapshotError {
    // Kept for callers still using the deprecated `description` API.
    fn description(&self) -> &str {
        "could not create a snapshot"
    }

    // `source` supersedes the deprecated `cause`; the default `cause`
    // delegates here, so both keep returning `None`.
    fn source(&self) -> Option<&(dyn ::std::error::Error + 'static)> {
        None
    }
}

impl fmt::Display for GetSnapshotError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("could not create a snapshot")
    }
}
280
281impl ProcessesTelemetryMessages for TelemetryDriver {
282 fn process(&mut self, _max: usize, _strategy: ProcessingStrategy) -> ProcessingOutcome {
284 ProcessingOutcome::default()
285 }
286}
287
288impl PutsSnapshot for TelemetryDriver {
289 fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
290 if let Ok(snapshot) = self.snapshot(descriptive) {
291 snapshot
292 .items
293 .into_iter()
294 .for_each(|(k, v)| into.push(k, v));
295 }
296 }
297}
298
299impl Default for TelemetryDriver {
300 fn default() -> TelemetryDriver {
301 TelemetryDriver::new(None, None, None, ProcessingStrategy::default(), true)
302 }
303}
304
305impl AggregatesProcessors for TelemetryDriver {
306 fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
307 let _ = self
308 .sender
309 .send(DriverMessage::AddProcessor(Box::new(processor)));
310 }
311
312 fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
313 let _ = self
314 .sender
315 .send(DriverMessage::AddSnapshooter(Box::new(snapshooter)));
316 }
317}
318
319impl Descriptive for TelemetryDriver {
320 fn title(&self) -> Option<&str> {
321 self.descriptives.title.as_deref()
322 }
323
324 fn description(&self) -> Option<&str> {
325 self.descriptives.description.as_deref()
326 }
327}
328
329fn start_telemetry_loop(
330 descriptives: Descriptives,
331 is_running: Arc<AtomicBool>,
332 processing_strategy: ProcessingStrategy,
333 driver_metrics: Option<DriverMetrics>,
334 receiver: CrossbeamReceiver<DriverMessage>,
335) {
336 let builder = thread::Builder::new().name("metrix".to_string());
337 builder
338 .spawn(move || {
339 telemetry_loop(
340 descriptives,
341 &is_running,
342 processing_strategy,
343 driver_metrics,
344 receiver,
345 )
346 })
347 .unwrap();
348}
349
350enum DriverMessage {
351 AddProcessor(Box<dyn ProcessesTelemetryMessages>),
352 AddSnapshooter(Box<dyn PutsSnapshot>),
353 GetSnapshotSync(Snapshot, CrossbeamSender<Snapshot>, bool),
354 GetSnapshotAsync(Snapshot, oneshot::Sender<Snapshot>, bool),
355 SetProcessingStrategy(ProcessingStrategy),
356 Pause,
357 Resume,
358}
359
360fn telemetry_loop(
361 descriptives: Descriptives,
362 is_running: &AtomicBool,
363 processing_strategy: ProcessingStrategy,
364 mut driver_metrics: Option<DriverMetrics>,
365 receiver: CrossbeamReceiver<DriverMessage>,
366) {
367 let mut last_outcome_logged = Instant::now() - Duration::from_secs(60);
368 let mut dropped_since_last_logged = 0usize;
369
370 let mut processors: Vec<Box<dyn ProcessesTelemetryMessages>> = Vec::new();
371 let mut snapshooters: Vec<Box<dyn PutsSnapshot>> = Vec::new();
372
373 let mut processing_stragtegy = processing_strategy;
374
375 let mut paused = false;
376
377 loop {
378 if !is_running.load(Ordering::Relaxed) {
379 break;
380 }
381
382 let iteration_started = Instant::now();
383
384 match receiver.try_recv() {
385 Ok(message) => match message {
386 DriverMessage::AddProcessor(processor) => processors.push(processor),
387 DriverMessage::AddSnapshooter(snapshooter) => snapshooters.push(snapshooter),
388 DriverMessage::GetSnapshotSync(mut snapshot, back_channel, descriptive) => {
389 put_values_into_snapshot(
390 &mut snapshot,
391 &processors,
392 &snapshooters,
393 driver_metrics.as_mut(),
394 &descriptives,
395 descriptive,
396 );
397 let _ = back_channel.send(snapshot);
398 }
399 DriverMessage::GetSnapshotAsync(mut snapshot, back_channel, descriptive) => {
400 put_values_into_snapshot(
401 &mut snapshot,
402 &processors,
403 &snapshooters,
404 driver_metrics.as_mut(),
405 &descriptives,
406 descriptive,
407 );
408 let _ = back_channel.send(snapshot);
409 }
410 DriverMessage::SetProcessingStrategy(strategy) => {
411 util::log_info(&format!("Processing strategy changed to {:?}", strategy));
412 processing_stragtegy = strategy
413 }
414 DriverMessage::Pause => {
415 util::log_info("pausing");
416 paused = true
417 }
418 DriverMessage::Resume => {
419 paused = {
420 util::log_info("resuming");
421 false
422 }
423 }
424 },
425 Err(TryRecvError::Empty) => {}
426 Err(TryRecvError::Disconnected) => {
427 util::log_warning(
428 "Driver failed to receive message. Channel disconnected. Exiting",
429 );
430 break;
431 }
432 }
433
434 if paused {
435 thread::sleep(Duration::from_millis(50));
436 continue;
437 }
438
439 let run_started = Instant::now();
440 let outcome = do_a_run(&mut processors, 1_000, processing_stragtegy);
441 let run_time = run_started.elapsed();
442
443 dropped_since_last_logged += outcome.dropped;
444
445 if dropped_since_last_logged > 0 && last_outcome_logged.elapsed() > Duration::from_secs(5) {
446 log_outcome(dropped_since_last_logged);
447 last_outcome_logged = Instant::now();
448 dropped_since_last_logged = 0;
449 }
450
451 if let Some(ref mut driver_metrics) = driver_metrics {
452 driver_metrics.update_post_collection(&outcome, run_started);
453 }
454
455 if outcome.dropped > 0 || outcome.processed > 100 {
456 continue;
457 }
458
459 let finished = Instant::now();
460 let elapsed = finished - run_started;
461 if outcome.dropped == 0 && elapsed < Duration::from_millis(10) {
462 thread::sleep(Duration::from_millis(10) - elapsed);
463 }
464 report_elapsed_stats(iteration_started, run_time, driver_metrics.as_mut());
465 }
466
467 util::log_info("Metrix driver stopped");
468}
469
470fn do_a_run(
471 processors: &mut [Box<dyn ProcessesTelemetryMessages>],
472 max: usize,
473 strategy: ProcessingStrategy,
474) -> ProcessingOutcome {
475 let mut outcome = ProcessingOutcome::default();
476
477 for processor in processors.iter_mut() {
478 outcome.combine_with(&processor.process(max, strategy));
479 }
480
481 outcome
482}
483
484fn report_elapsed_stats(
485 iteration_started: Instant,
486 run_time: Duration,
487 metrics: Option<&mut DriverMetrics>,
488) {
489 if let Some(metrics) = metrics {
490 let iteration_time = iteration_started.elapsed().as_secs_f64();
491 let run_time = run_time.as_secs_f64();
492
493 if iteration_time > 0.0 {
494 let ratio = run_time / iteration_time;
495 metrics.update_run_ratio(ratio);
496 }
497 }
498}
499
500fn put_values_into_snapshot(
501 into: &mut Snapshot,
502 processors: &[Box<dyn ProcessesTelemetryMessages>],
503 snapshooters: &[Box<dyn PutsSnapshot>],
504 driver_metrics: Option<&mut DriverMetrics>,
505 descriptives: &Descriptives,
506 descriptive: bool,
507) {
508 let started = Instant::now();
509
510 if let Some(ref name) = descriptives.name {
511 let mut new_level = Snapshot::default();
512 add_snapshot_values(
513 &mut new_level,
514 &processors,
515 &snapshooters,
516 driver_metrics,
517 &descriptives,
518 descriptive,
519 started,
520 );
521 into.items
522 .push((name.clone(), ItemKind::Snapshot(new_level)));
523 } else {
524 add_snapshot_values(
525 into,
526 &processors,
527 &snapshooters,
528 driver_metrics,
529 &descriptives,
530 descriptive,
531 started,
532 );
533 }
534}
535
536fn add_snapshot_values(
537 into: &mut Snapshot,
538 processors: &[Box<dyn ProcessesTelemetryMessages>],
539 snapshooters: &[Box<dyn PutsSnapshot>],
540 driver_metrics: Option<&mut DriverMetrics>,
541 descriptives: &Descriptives,
542 descriptive: bool,
543 started: Instant,
544) {
545 util::put_default_descriptives(descriptives, into, descriptive);
546 processors
547 .iter()
548 .for_each(|p| p.put_snapshot(into, descriptive));
549
550 snapshooters
551 .iter()
552 .for_each(|s| s.put_snapshot(into, descriptive));
553
554 if let Some(driver_metrics) = driver_metrics {
555 driver_metrics.update_post_snapshot(started);
556 driver_metrics.put_snapshot(into, descriptive);
557 }
558}
559
/// Name, title and description of a driver.
///
/// One copy lives in the `TelemetryDriver` handle and another on the driver
/// thread, which uses it when assembling snapshots.
#[derive(Clone, Default)]
struct Descriptives {
    /// If set, the driver's snapshot values are nested under this name.
    pub name: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
}
576
577impl Descriptive for Descriptives {
578 fn title(&self) -> Option<&str> {
579 self.title.as_deref()
580 }
581
582 fn description(&self) -> Option<&str> {
583 self.description.as_deref()
584 }
585}
586
/// Logs a warning with the number of observations dropped since the last report.
#[cfg(feature = "log")]
#[inline]
fn log_outcome(dropped: usize) {
    warn!("{} observations have been dropped.", dropped);
}

/// Without the `log` feature, dropped observations are not reported.
#[cfg(not(feature = "log"))]
#[inline]
fn log_outcome(_dropped: usize) {}
596
597struct DriverMetrics {
598 instruments: DriverInstruments,
599}
600
601impl DriverMetrics {
602 pub fn update_post_collection(
603 &mut self,
604 outcome: &ProcessingOutcome,
605 collection_started: Instant,
606 ) {
607 self.instruments
608 .update_post_collection(outcome, collection_started);
609 }
610
611 pub fn update_post_snapshot(&mut self, snapshot_started: Instant) {
612 self.instruments.update_post_snapshot(snapshot_started);
613 }
614
615 pub fn put_snapshot(&mut self, into: &mut Snapshot, descriptive: bool) {
616 self.instruments.put_snapshot(into, descriptive);
617 }
618 pub fn update_run_ratio(&mut self, ratio: f64) {
619 let now = Instant::now();
620 let per_mille = (ratio * 1_000f64) as u64;
621
622 self.instruments
623 .iteration_update_per_mille_ratio_histo
624 .update(&Update::ObservationWithValue(per_mille.into(), now));
625 self.instruments
626 .iteration_update_per_mille_ratio
627 .update(&Update::ObservationWithValue(per_mille.into(), now));
628 }
629}
630
/// The instruments behind the driver's metrics about itself.
///
/// Their values are put into snapshots under the `_metrix` key.
struct DriverInstruments {
    // Rate of processing runs (fed once per run).
    collections_per_second: Meter,
    // Duration of each processing run (fed from a `Duration`; the name suggests
    // microseconds — TODO confirm the conversion).
    collection_times_us: Histogram,
    // Rate of processed observations (only fed when a run processed any).
    observations_processed_per_second: Meter,
    // Observations processed per run (only fed when non-zero).
    observations_processed_per_collection: Histogram,
    // Rate of dropped observations (only fed when a run dropped any).
    observations_dropped_per_second: Meter,
    // Observations dropped per run (only fed when non-zero).
    observations_dropped_per_collection: Histogram,
    // `observations_enqueued` as reported by each run's outcome.
    observations_enqueued: Gauge,
    // Rate of instrument updates (only fed when non-zero).
    instruments_updated_per_second: Meter,
    // Rate of snapshots taken.
    snapshots_per_second: Meter,
    // Duration of each snapshot (same unit caveat as `collection_times_us`).
    snapshots_times_us: Histogram,
    // Fed whenever a run dropped observations.
    dropped_observations_alarm: StaircaseTimer,
    // Fed on every run; presumably signals when runs stop happening — verify
    // against `NonOccurrenceIndicator`.
    inactivity_alarm: NonOccurrenceIndicator,
    // Run time / iteration time in per mille, as a histogram.
    iteration_update_per_mille_ratio_histo: Histogram,
    // Run time / iteration time in per mille, as a gauge.
    iteration_update_per_mille_ratio: Gauge,
}
647
648impl Default for DriverInstruments {
649 fn default() -> Self {
650 DriverInstruments {
651 collections_per_second: Meter::new_with_defaults("collections_per_second"),
652 collection_times_us: Histogram::new_with_defaults("collection_times_us"),
653 observations_processed_per_second: Meter::new_with_defaults(
654 "observations_processed_per_second",
655 ),
656 observations_processed_per_collection: Histogram::new_with_defaults(
657 "observations_processed_per_collection",
658 ),
659 observations_dropped_per_second: Meter::new_with_defaults(
660 "observations_dropped_per_second",
661 ),
662 observations_dropped_per_collection: Histogram::new_with_defaults(
663 "observations_dropped_per_collection",
664 ),
665 observations_enqueued: Gauge::new_with_defaults("observations_enqueued")
666 .tracking(60)
667 .group_values(true),
668 instruments_updated_per_second: Meter::new_with_defaults(
669 "instruments_updated_per_second",
670 ),
671 snapshots_per_second: Meter::new_with_defaults("snapshots_per_second"),
672 snapshots_times_us: Histogram::new_with_defaults("snapshots_times_us"),
673 dropped_observations_alarm: StaircaseTimer::new_with_defaults(
674 "dropped_observations_alarm",
675 ),
676 inactivity_alarm: NonOccurrenceIndicator::new_with_defaults("inactivity_alarm"),
677 iteration_update_per_mille_ratio_histo: Histogram::new_with_defaults(
678 "iteration_update_ratio_per_mille_histo",
679 ),
680 iteration_update_per_mille_ratio: Gauge::new_with_defaults(
681 "iteration_update_ratio_per_mille",
682 )
683 .tracking(60)
684 .group_values(true),
685 }
686 }
687}
688
689impl DriverInstruments {
690 pub fn update_post_collection(
691 &mut self,
692 outcome: &ProcessingOutcome,
693 collection_started: Instant,
694 ) {
695 let now = Instant::now();
696 self.collections_per_second
697 .update(&Update::Observation(now));
698 self.collection_times_us
699 .update(&Update::ObservationWithValue(
700 (now - collection_started).into(),
701 now,
702 ));
703 if outcome.processed > 0 {
704 self.observations_processed_per_second
705 .update(&Update::Observations(outcome.processed as u64, now));
706 self.observations_processed_per_collection
707 .update(&Update::ObservationWithValue(outcome.processed.into(), now));
708 }
709 self.observations_enqueued
710 .update(&Update::ObservationWithValue(
711 outcome.observations_enqueued.into(),
712 now,
713 ));
714 if outcome.dropped > 0 {
715 self.observations_dropped_per_second
716 .update(&Update::Observations(outcome.dropped as u64, now));
717 self.observations_dropped_per_collection
718 .update(&Update::ObservationWithValue(outcome.dropped.into(), now));
719 self.dropped_observations_alarm
720 .update(&Update::Observation(now));
721 }
722 if outcome.instruments_updated > 0 {
723 self.instruments_updated_per_second
724 .update(&Update::Observations(
725 outcome.instruments_updated as u64,
726 now,
727 ));
728 }
729 self.inactivity_alarm.update(&Update::Observation(now));
730 }
731
732 pub fn update_post_snapshot(&mut self, snapshot_started: Instant) {
733 let now = Instant::now();
734 self.snapshots_per_second.update(&Update::Observation(now));
735 self.snapshots_times_us
736 .update(&Update::ObservationWithValue(
737 (now - snapshot_started).into(),
738 now,
739 ));
740 }
741
742 pub fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
743 let mut container = Snapshot::default();
744 self.collections_per_second
745 .put_snapshot(&mut container, descriptive);
746 self.collection_times_us
747 .put_snapshot(&mut container, descriptive);
748 self.observations_processed_per_second
749 .put_snapshot(&mut container, descriptive);
750 self.observations_processed_per_collection
751 .put_snapshot(&mut container, descriptive);
752 self.observations_dropped_per_second
753 .put_snapshot(&mut container, descriptive);
754 self.observations_dropped_per_collection
755 .put_snapshot(&mut container, descriptive);
756 self.observations_enqueued
757 .put_snapshot(&mut container, descriptive);
758 self.instruments_updated_per_second
759 .put_snapshot(&mut container, descriptive);
760 self.snapshots_per_second
761 .put_snapshot(&mut container, descriptive);
762 self.snapshots_times_us
763 .put_snapshot(&mut container, descriptive);
764 self.dropped_observations_alarm
765 .put_snapshot(&mut container, descriptive);
766 self.inactivity_alarm
767 .put_snapshot(&mut container, descriptive);
768 self.iteration_update_per_mille_ratio
769 .put_snapshot(&mut container, descriptive);
770 self.iteration_update_per_mille_ratio_histo
771 .put_snapshot(&mut container, descriptive);
772
773 into.items
774 .push(("_metrix".into(), ItemKind::Snapshot(container)));
775 }
776}
777
778