1pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9 config::Config,
10 data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry},
11 metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13use libdd_common::Endpoint;
14use libdd_common::{hyper_migration, tag::Tag, worker::Worker};
15
16use std::fmt::Debug;
17use std::iter::Sum;
18use std::ops::Add;
19use std::{
20 collections::hash_map::DefaultHasher,
21 hash::{Hash, Hasher},
22 ops::ControlFlow,
23 sync::{
24 atomic::{AtomicU64, Ordering},
25 Arc, Condvar, Mutex,
26 },
27 time,
28};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32 channel::oneshot,
33 future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38 runtime::{self, Handle},
39 sync::mpsc,
40 task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
/// Shorthand for `ControlFlow::Continue(())`: keep processing actions.
const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
/// Shorthand for `ControlFlow::Break(())`: leave the worker dispatch loop.
const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Returns the current wall-clock time as seconds since the Unix epoch.
///
/// Falls back to `0.0` when the system clock reads before the epoch
/// (`elapsed()` errors in that case and `unwrap_or_default` applies).
fn time_now() -> f64 {
    // The previous `#[allow(clippy::unwrap_used)]` was stale: the body uses
    // `unwrap_or_default`, not `unwrap`, so no lint suppression is needed.
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Emits a worker log line through `tracing::debug!` and, when
/// `config.telemetry_debug_logging_enabled` is set, mirrors it to stderr
/// (ERROR arm) or stdout (DEBUG arm) prefixed with the Unix timestamp from
/// `time_now()`.
macro_rules! telemetry_worker_log {
    // ERROR arm: mirrored to stderr when debug logging is enabled.
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    // DEBUG arm: mirrored to stdout when debug logging is enabled.
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Actions a [`TelemetryWorker`] processes from its mailbox.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record a metric point: (value, context key, extra tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    /// Buffer a configuration entry for reporting.
    AddConfig(data::Configuration),
    /// Buffer a loaded dependency for reporting.
    AddDependency(Dependency),
    /// Buffer an integration change for reporting.
    AddIntegration(Integration),
    /// Buffer a log entry (duplicates bump the existing entry's count).
    AddLog((LogIdentifier, Log)),
    /// Drive the worker lifecycle (start/stop/flush/heartbeat).
    Lifecycle(LifecycleAction),
    /// Request a stats snapshot; `serde(skip)` because the oneshot channel
    /// cannot be serialized.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
96
/// Lifecycle transitions understood by the worker dispatchers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    /// Begin reporting; in the `Full` flavor this sends `AppStarted`.
    Start,
    /// Flush remaining data and stop (or pause, if restartable).
    Stop,
    /// Aggregate pending metric points into buckets.
    FlushMetricAggr,
    /// Send accumulated payloads to the backend.
    FlushData,
    /// Periodically re-send full state (dependencies, integrations, configs).
    ExtendedHeartbeat,
}
105
/// Deduplication key for a log entry.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    /// 64-bit hash of a caller-supplied identifier (see
    /// `TelemetryWorkerHandle::add_log`, which hashes with `DefaultHasher`).
    pub identifier: u64,
}
115
/// Mutable state accumulated by a worker between flushes.
#[derive(Debug)]
struct TelemetryWorkerData {
    /// Set after a `Start` lifecycle action has been handled.
    started: bool,
    /// Dependencies pending transmission in `AppDependenciesLoaded`.
    dependencies: store::Store<Dependency>,
    /// Config entries for `AppStarted` / `AppClientConfigurationChange`.
    configurations: store::Store<data::Configuration>,
    /// Integrations pending transmission in `AppIntegrationsChange`.
    integrations: store::Store<data::Integration>,
    /// Deduplicated log queue keyed by `LogIdentifier`.
    logs: store::QueueHashMap<LogIdentifier, Log>,
    /// Metric-context registry shared with handles.
    metric_contexts: MetricContexts,
    /// Aggregated metric points awaiting flush.
    metric_buckets: MetricBuckets,
    /// Host information attached to every telemetry request.
    host: Host,
    /// Application information attached to every telemetry request.
    app: Application,
}
129
/// Background worker that aggregates telemetry and ships it over HTTP.
pub struct TelemetryWorker {
    /// Selects the action dispatcher (full vs metrics/logs only).
    flavor: TelemetryWorkerFlavor,
    config: Config,
    /// Receiving end of the handle's action channel.
    mailbox: mpsc::Receiver<TelemetryActions>,
    /// Cooperative cancellation for the run loop and in-flight requests.
    cancellation_token: CancellationToken,
    /// Monotonically increasing per-request sequence id.
    seq_id: AtomicU64,
    runtime_id: String,
    /// HTTP transport; boxed trait object, omitted from `Debug`.
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    /// Scheduler producing recurring lifecycle deadlines.
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
impl Debug for TelemetryWorker {
    /// Manual `Debug`: `client` is a boxed trait object without a `Debug`
    /// bound, so it is intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TelemetryWorker")
            .field("flavor", &self.flavor)
            .field("config", &self.config)
            .field("mailbox", &self.mailbox)
            .field("cancellation_token", &self.cancellation_token)
            .field("seq_id", &self.seq_id)
            .field("runtime_id", &self.runtime_id)
            .field("deadlines", &self.deadlines)
            .field("data", &self.data)
            .finish()
    }
}
155
impl Worker for TelemetryWorker {
    /// Main loop: receives actions (mailbox messages or due deadlines) and
    /// dispatches them by flavor until stopped or cancelled.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // External cancellation ends the loop immediately, regardless of
            // the restartable setting.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // Restartable workers stay alive awaiting a new Start.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
210
/// Point-in-time snapshot of a worker's internal buffer sizes.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    /// Dependencies retained in the store.
    pub dependencies_stored: u32,
    /// Dependencies not yet flushed to the backend.
    pub dependencies_unflushed: u32,
    /// Configuration entries retained in the store.
    pub configurations_stored: u32,
    /// Configuration entries not yet flushed.
    pub configurations_unflushed: u32,
    /// Integrations retained in the store.
    pub integrations_stored: u32,
    /// Integrations not yet flushed.
    pub integrations_unflushed: u32,
    /// Queued (deduplicated) log entries.
    pub logs: u32,
    /// Registered metric contexts.
    pub metric_contexts: u32,
    /// Aggregated metric bucket statistics.
    pub metric_buckets: MetricBucketStats,
}
223
224impl Add for TelemetryWorkerStats {
225 type Output = Self;
226
227 fn add(self, rhs: Self) -> Self::Output {
228 TelemetryWorkerStats {
229 dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
230 dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
231 configurations_stored: self.configurations_stored + rhs.configurations_stored,
232 configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
233 integrations_stored: self.integrations_stored + rhs.integrations_stored,
234 integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
235 logs: self.logs + rhs.logs,
236 metric_contexts: self.metric_contexts + rhs.metric_contexts,
237 metric_buckets: MetricBucketStats {
238 buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
239 series: self.metric_buckets.series + rhs.metric_buckets.series,
240 series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
241 distributions: self.metric_buckets.distributions
242 + self.metric_buckets.distributions,
243 distributions_points: self.metric_buckets.distributions_points
244 + self.metric_buckets.distributions_points,
245 },
246 }
247 }
248}
249
250impl Sum for TelemetryWorkerStats {
251 fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
252 iter.fold(Self::default(), |a, b| a + b)
253 }
254}
255
/// JSON serialization helpers for telemetry payloads.
mod serialize {
    use crate::data;
    use http::HeaderValue;
    /// `Content-Type` header value used for every telemetry request.
    #[allow(clippy::declare_interior_mutable_const)]
    pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
    /// Serializes a telemetry envelope to a JSON byte vector.
    pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
        Ok(serde_json::to_vec(telemetry)?)
    }
}
265
266impl TelemetryWorker {
    /// Reports an error through the worker's ERROR logging path.
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
270
    /// Waits for the next action to process.
    ///
    /// If a scheduled deadline is already due, its action is returned
    /// immediately. Otherwise the mailbox is awaited up to the next deadline
    /// (or indefinitely when none is pending). A closed mailbox (all handles
    /// dropped) disables restart and yields a final `Stop`.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                // Deadline already elapsed: fire its action right away.
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Race the mailbox against the deadline; timeout means the
            // scheduled lifecycle action wins.
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        action.unwrap_or_else(|| {
            // Channel closed: nothing can restart us, so force a final stop.
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
297
    /// Action dispatcher for the `MetricsLogs` flavor: app-level events
    /// (configs, dependencies, integrations, extended heartbeats) are ignored
    /// and only metrics and logs are aggregated and flushed.
    ///
    /// Returns `Break` when the worker should leave its run loop.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: only the first Start schedules the recurring
                // flush deadlines.
                if !self.data.started {
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Duplicate logs only bump the stored entry's count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the recurring aggregation deadline.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the recurring data-flush deadline.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // App-level events are not handled by this flavor.
            AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush before stopping.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may have been dropped; ignore the send failure.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
379
    /// Action dispatcher for the `Full` flavor: handles app lifecycle
    /// payloads (AppStarted, heartbeats, dependency/integration/config
    /// changes) in addition to metrics and logs.
    ///
    /// Returns `Break` when the worker should leave its run loop.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: only the first Start sends AppStarted and
                // schedules the recurring flush deadlines.
                if !self.data.started {
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddLog((identifier, log)) => {
                // Duplicate logs only bump the stored entry's count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the recurring aggregation deadline.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the recurring data-flush deadline.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App events ride along with a heartbeat; an empty batch
                // degenerates to a bare heartbeat payload.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Metrics/logs are sent as a separate batch.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Re-mark all stored state as unflushed so the full picture
                // is re-sent, then deliver a fresh AppStarted.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // Send the final payloads concurrently: the RwLock lets sends
                // share `&self` while success bookkeeping briefly takes
                // exclusive access.
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may have been dropped; ignore the send failure.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
528
    /// Builds app-level payloads (dependencies, integrations, configurations)
    /// from entries not yet flushed.
    ///
    /// NOTE(review): `flush_not_empty()` presumably starts a flush on the
    /// store as a side effect — entries are only removed later by
    /// `payload_sent_success`; confirm against `store::Store`.
    fn build_app_events_batch(&mut self) -> Vec<Payload> {
        let mut payloads = Vec::new();

        if self.data.dependencies.flush_not_empty() {
            payloads.push(data::Payload::AppDependenciesLoaded(
                data::AppDependenciesLoaded {
                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.integrations.flush_not_empty() {
            payloads.push(data::Payload::AppIntegrationsChange(
                data::AppIntegrationsChange {
                    integrations: self.data.integrations.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.configurations.flush_not_empty() {
            payloads.push(data::Payload::AppClientConfigurationChange(
                data::AppClientConfigurationChange {
                    configuration: self.data.configurations.unflushed().cloned().collect(),
                },
            ))
        }
        payloads
    }
556
557 fn build_observability_batch(&mut self) -> Vec<Payload> {
559 let mut payloads = Vec::new();
560
561 let logs = self.build_logs();
562 if !logs.is_empty() {
563 payloads.push(data::Payload::Logs(logs));
564 }
565 let metrics = self.build_metrics_series();
566 if !metrics.series.is_empty() {
567 payloads.push(data::Payload::GenerateMetrics(metrics))
568 }
569 let distributions = self.build_metrics_distributions();
570 if !distributions.series.is_empty() {
571 payloads.push(data::Payload::Sketches(distributions))
572 }
573 payloads
574 }
575
    /// Drains aggregated distribution buckets into wire-format sketches.
    ///
    /// Each bucket's context is looked up in the shared registry; buckets
    /// whose context has vanished are logged and skipped. Sketch points are
    /// base64-encoded for transport.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point tags come first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(),
            });
        }
        data::Distributions { series }
    }
603
    /// Drains aggregated metric buckets into wire-format series.
    ///
    /// Buckets whose context is no longer registered are logged and skipped.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point tags come first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
628
629 fn build_app_started(&mut self) -> data::AppStarted {
630 data::AppStarted {
631 configuration: self.data.configurations.unflushed().cloned().collect(),
632 }
633 }
634
    /// Drops the configuration entries that were just delivered inside an
    /// `AppStarted` (or `AppExtendedHeartbeat`) payload.
    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
        self.data
            .configurations
            .removed_flushed(p.configuration.len());
    }
640
    /// Clears buffered data corresponding to a successfully sent payload so
    /// it is not re-sent on the next flush.
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            MessageBatch(batch) => {
                // A batch clears whatever each inner payload would clear.
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            Logs(p) => {
                // One queue entry is popped per log that was sent.
                for _ in p {
                    self.data.logs.pop_front();
                }
            }
            // Heartbeats/closing carry no buffered state.
            AppHeartbeat(()) | AppClosing(()) => {}
            // Metric buckets were already drained when the payload was built.
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
670
671 fn build_logs(&self) -> Vec<Log> {
672 let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
674 logs
675 }
676
    /// Returns the current request sequence id and post-increments the
    /// shared counter.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
680
    /// Builds an HTTP request for `payload` and sends it, logging the
    /// outcome.
    ///
    /// NOTE(review): transport failures are only logged — this returns
    /// `Ok(())` unless building the request itself fails, so callers invoke
    /// `payload_sent_success` (dropping buffered data) even when the send
    /// failed. Confirm this fire-and-forget behavior is intentional.
    async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
        debug!(
            worker.runtime_id = %self.runtime_id,
            payload.type = payload.request_type(),
            seq_id = self.seq_id.load(Ordering::Acquire),
            "Sending telemetry payload"
        );
        let req = self.build_request(payload)?;
        let result = self.send_request(req).await;
        match &result {
            Ok(resp) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                response.status = resp.status().as_u16(),
                "Successfully sent telemetry payload"
            ),
            Err(e) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                error = ?e,
                "Failed to send telemetry payload"
            ),
        }
        Ok(())
    }
706
    /// Assembles the HTTP POST for a telemetry payload: JSON body wrapped in
    /// a versioned envelope, plus the request-type/api-version/library
    /// headers the intake expects.
    fn build_request(
        &self,
        payload: &data::Payload,
    ) -> anyhow::Result<hyper_migration::HttpRequest> {
        // Consumes one sequence id per built request, even if sending fails.
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Whole seconds since the Unix epoch; 0 if the clock is pre-epoch.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = hyper_migration::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
751
    /// Sends a prepared request, racing it against the cancellation token and
    /// the endpoint's timeout (falling back to `Endpoint::DEFAULT_TIMEOUT`
    /// when no endpoint is configured).
    async fn send_request(
        &self,
        req: hyper_migration::HttpRequest,
    ) -> Result<hyper_migration::HttpResponse, hyper_migration::Error> {
        let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
            endpoint.timeout_ms
        } else {
            Endpoint::DEFAULT_TIMEOUT
        };

        debug!(
            worker.runtime_id = %self.runtime_id,
            http.timeout_ms = timeout_ms,
            "Sending HTTP request"
        );

        // Whichever branch completes first wins; the losing future (and thus
        // any in-flight request) is dropped.
        tokio::select! {
            _ = self.cancellation_token.cancelled() => {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry request cancelled"
                );
                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request cancelled")))
            },
            _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    http.timeout_ms = timeout_ms,
                    "Telemetry request timed out"
                );
                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request timed out")))
            },
            r = self.client.request(req) => {
                match r {
                    Ok(resp) => {
                        Ok(resp)
                    }
                    Err(e) => {
                        Err(e)
                    },
                }
            }
        }
    }
796
    /// Produces a point-in-time snapshot of the worker's buffer sizes.
    fn stats(&self) -> TelemetryWorkerStats {
        TelemetryWorkerStats {
            dependencies_stored: self.data.dependencies.len_stored() as u32,
            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
            configurations_stored: self.data.configurations.len_stored() as u32,
            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
            integrations_stored: self.data.integrations.len_stored() as u32,
            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
            logs: self.data.logs.len() as u32,
            metric_contexts: self.data.metric_contexts.lock().len() as u32,
            metric_buckets: self.data.metric_buckets.stats(),
        }
    }
810}
811
/// Condvar-backed latch letting callers block until the worker has shut down.
#[derive(Debug)]
struct InnerTelemetryShutdown {
    /// Set to `true` once shutdown completes.
    is_shutdown: Mutex<bool>,
    /// Wakes waiters when `is_shutdown` flips to `true`.
    condvar: Condvar,
}
817
818impl InnerTelemetryShutdown {
819 fn wait_for_shutdown(&self) {
820 drop(
821 #[allow(clippy::unwrap_used)]
822 self.condvar
823 .wait_while(self.is_shutdown.lock().unwrap(), |is_shutdown| {
824 !*is_shutdown
825 })
826 .unwrap(),
827 )
828 }
829
830 #[allow(clippy::unwrap_used)]
831 fn shutdown_finished(&self) {
832 *self.is_shutdown.lock().unwrap() = true;
833 self.condvar.notify_all();
834 }
835}
836
/// Cloneable handle used to feed actions to a running [`TelemetryWorker`].
#[derive(Clone, Debug)]
pub struct TelemetryWorkerHandle {
    /// Sending side of the worker mailbox.
    sender: mpsc::Sender<TelemetryActions>,
    /// Signalled once the worker has fully stopped.
    shutdown: Arc<InnerTelemetryShutdown>,
    /// Cancels the worker loop and in-flight requests.
    cancellation_token: CancellationToken,
    /// Runtime used to spawn deadline-cancellation tasks.
    runtime: runtime::Handle,

    /// Metric-context registry shared with the worker.
    contexts: MetricContexts,
}
854
855impl TelemetryWorkerHandle {
    /// Registers a metric context in the shared registry and returns its key
    /// for use with [`TelemetryWorkerHandle::add_point`].
    pub fn register_metric_context(
        &self,
        name: String,
        tags: Vec<Tag>,
        metric_type: data::metrics::MetricType,
        common: bool,
        namespace: data::metrics::MetricNamespace,
    ) -> ContextKey {
        self.contexts
            .register_metric_context(name, tags, metric_type, common, namespace)
    }
867
868 pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
869 Ok(self.sender.try_send(msg)?)
870 }
871
872 pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
873 Ok(self.sender.send(msg).await?)
874 }
875
876 pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
877 where
878 T: IntoIterator<Item = TelemetryActions>,
879 {
880 for msg in msgs {
881 self.sender.send(msg).await?;
882 }
883
884 Ok(())
885 }
886
887 pub async fn send_msg_timeout(
888 &self,
889 msg: TelemetryActions,
890 timeout: time::Duration,
891 ) -> anyhow::Result<()> {
892 Ok(self.sender.send_timeout(msg, timeout).await?)
893 }
894
895 pub fn send_start(&self) -> anyhow::Result<()> {
896 Ok(self
897 .sender
898 .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
899 }
900
901 pub fn send_stop(&self) -> anyhow::Result<()> {
902 Ok(self
903 .sender
904 .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
905 }
906
907 fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
908 let token = self.cancellation_token.clone();
909 let f = async move {
910 tokio::time::sleep_until(deadline.into()).await;
911 token.cancel()
912 };
913 self.runtime.spawn(f);
914 }
915
    /// Blocks until the worker has shut down, force-cancelling its in-flight
    /// requests once `deadline` passes so the wait cannot hang indefinitely.
    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
        self.cancel_requests_with_deadline(deadline);
        self.wait_for_shutdown()
    }
920
921 pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
922 self.sender
923 .try_send(TelemetryActions::AddDependency(Dependency {
924 name,
925 version,
926 }))?;
927 Ok(())
928 }
929
930 pub fn add_integration(
931 &self,
932 name: String,
933 enabled: bool,
934 version: Option<String>,
935 compatible: Option<bool>,
936 auto_enabled: Option<bool>,
937 ) -> anyhow::Result<()> {
938 self.sender
939 .try_send(TelemetryActions::AddIntegration(Integration {
940 name,
941 version,
942 compatible,
943 enabled,
944 auto_enabled,
945 }))?;
946 Ok(())
947 }
948
949 pub fn add_log<T: Hash>(
950 &self,
951 identifier: T,
952 message: String,
953 level: data::LogLevel,
954 stack_trace: Option<String>,
955 ) -> anyhow::Result<()> {
956 let mut hasher = DefaultHasher::new();
957 identifier.hash(&mut hasher);
958 self.sender.try_send(TelemetryActions::AddLog((
959 LogIdentifier {
960 identifier: hasher.finish(),
961 },
962 data::Log {
963 message,
964 level,
965 stack_trace,
966 count: 1,
967 tags: String::new(),
968 is_sensitive: false,
969 is_crash: false,
970 },
971 )))?;
972 Ok(())
973 }
974
975 pub fn add_point(
976 &self,
977 value: f64,
978 context: &ContextKey,
979 extra_tags: Vec<Tag>,
980 ) -> anyhow::Result<()> {
981 self.sender
982 .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
983 Ok(())
984 }
985
    /// Blocks the calling thread until the worker signals shutdown complete.
    pub fn wait_for_shutdown(&self) {
        self.shutdown.wait_for_shutdown();
    }
989
990 pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
991 let (sender, receiver) = oneshot::channel();
992 self.sender
993 .try_send(TelemetryActions::CollectStats(sender))?;
994 Ok(receiver)
995 }
996}
997
/// Maximum number of entries kept per builder store (dependencies,
/// integrations, configurations).
pub const MAX_ITEMS: usize = 5000;
1000
/// Selects which subset of telemetry a worker handles.
#[derive(Debug, Default, Clone, Copy)]
pub enum TelemetryWorkerFlavor {
    /// Full telemetry: app lifecycle events, dependencies, integrations,
    /// configurations, metrics and logs.
    #[default]
    Full,
    /// Metrics and logs only; app-level actions are ignored.
    MetricsLogs,
}
1010
/// Configures and constructs a [`TelemetryWorker`] / handle pair.
pub struct TelemetryWorkerBuilder {
    /// Host metadata attached to every telemetry request.
    pub host: Host,
    /// Application metadata attached to every telemetry request.
    pub application: Application,
    /// Explicit runtime id; a UUID is generated when `None`.
    pub runtime_id: Option<String>,
    /// Pre-seeded dependency store (bounded by `MAX_ITEMS`).
    pub dependencies: store::Store<data::Dependency>,
    /// Pre-seeded integration store (bounded by `MAX_ITEMS`).
    pub integrations: store::Store<data::Integration>,
    /// Pre-seeded configuration store (bounded by `MAX_ITEMS`).
    pub configurations: store::Store<data::Configuration>,
    pub native_deps: bool,
    pub rust_shared_lib_deps: bool,
    pub config: Config,
    /// Which dispatcher the built worker uses.
    pub flavor: TelemetryWorkerFlavor,
}
1023
1024impl TelemetryWorkerBuilder {
    /// Like [`Self::new`], but fills host information via
    /// `crate::build_host()` instead of taking an explicit hostname.
    pub fn new_fetch_host(
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: crate::build_host(),
            // Delegate everything else to `new`; its empty hostname is
            // overridden by the `host` field above.
            ..Self::new(
                String::new(),
                service_name,
                language_name,
                language_version,
                tracer_version,
            )
        }
    }
1043
    /// Creates a builder with the given identity fields and default settings:
    /// empty stores bounded by `MAX_ITEMS`, default `Config`, and the
    /// default (`Full`) worker flavor.
    pub fn new(
        hostname: String,
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: Host {
                hostname,
                ..Default::default()
            },
            application: Application {
                service_name,
                language_name,
                language_version,
                tracer_version,
                ..Default::default()
            },
            runtime_id: None,
            dependencies: store::Store::new(MAX_ITEMS),
            integrations: store::Store::new(MAX_ITEMS),
            configurations: store::Store::new(MAX_ITEMS),
            native_deps: true,
            rust_shared_lib_deps: false,
            config: Config::default(),
            flavor: TelemetryWorkerFlavor::default(),
        }
    }
1074
    /// Wires up the worker and its handle: a bounded mailbox connecting them,
    /// a shared cancellation token and shutdown latch, and the recurring
    /// deadlines (metric aggregation, heartbeat-interval data flush, and a
    /// daily extended heartbeat).
    pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
        let (tx, mailbox) = mpsc::channel(5000);
        let shutdown = Arc::new(InnerTelemetryShutdown {
            is_shutdown: Mutex::new(false),
            condvar: Condvar::new(),
        });
        let contexts = MetricContexts::default();
        let token = CancellationToken::new();
        let config = self.config;
        let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
        let client = http_client::from_config(&config);

        #[allow(clippy::unwrap_used)]
        let worker = TelemetryWorker {
            flavor: self.flavor,
            data: TelemetryWorkerData {
                started: false,
                dependencies: self.dependencies,
                integrations: self.integrations,
                configurations: self.configurations,
                logs: store::QueueHashMap::default(),
                // Shared with the handle so contexts registered there are
                // visible to the worker.
                metric_contexts: contexts.clone(),
                metric_buckets: MetricBuckets::default(),
                host: self.host,
                app: self.application,
            },
            config,
            mailbox,
            seq_id: AtomicU64::new(1),
            runtime_id: self
                .runtime_id
                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
            client,
            deadlines: scheduler::Scheduler::new(vec![
                (
                    MetricBuckets::METRICS_FLUSH_INTERVAL,
                    LifecycleAction::FlushMetricAggr,
                ),
                (telemetry_heartbeat_interval, LifecycleAction::FlushData),
                (
                    time::Duration::from_secs(60 * 60 * 24),
                    LifecycleAction::ExtendedHeartbeat,
                ),
            ]),
            cancellation_token: token.clone(),
        };

        (
            TelemetryWorkerHandle {
                sender: tx,
                shutdown,
                cancellation_token: token,
                runtime: tokio_runtime,
                contexts,
            },
            worker,
        )
    }
1136
    /// Builds the worker and spawns it on the current Tokio runtime.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime context
    /// (`Handle::current`).
    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
        let tokio_runtime = tokio::runtime::Handle::current();

        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());

        let join_handle = tokio_runtime.spawn(async move { worker.run().await });

        (worker_handle, join_handle)
    }
1148
    /// Builds the worker and runs it on a dedicated single-threaded Tokio
    /// runtime hosted by a new OS thread. The shutdown latch is signalled
    /// once the worker exits, unblocking `wait_for_shutdown` callers.
    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
        let notify_shutdown = handle.shutdown.clone();
        std::thread::spawn(move || {
            runtime.block_on(worker.run());
            // Drop the runtime without blocking on lingering background tasks.
            runtime.shutdown_background();
            notify_shutdown.shutdown_finished();
        });

        Ok(handle)
    }
1164}
1165
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    // Compile-time probes: calling one of these with a value proves the
    // type satisfies the corresponding auto-trait bound.
    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}

    /// Asserts at compile time that the handle can cross thread boundaries.
    #[test]
    fn test_handle_sync_send() {
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_send(h);
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_sync(h);
    }
}