1pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9 config::Config,
10 data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry},
11 metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13use libdd_common::Endpoint;
14use libdd_common::{hyper_migration, tag::Tag, worker::Worker};
15
16use std::iter::Sum;
17use std::ops::Add;
18use std::{
19 collections::hash_map::DefaultHasher,
20 hash::{Hash, Hasher},
21 ops::ControlFlow,
22 sync::{
23 atomic::{AtomicU64, Ordering},
24 Arc, Condvar, Mutex,
25 },
26 time,
27};
28use std::{fmt::Debug, time::Duration};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32 channel::oneshot,
33 future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38 runtime::{self, Handle},
39 sync::mpsc,
40 task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
/// Shorthand: keep the worker loop running after handling an action.
const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
/// Shorthand: signal the worker loop to stop.
const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock is set before the epoch
/// (`elapsed()` errs, and `unwrap_or_default` maps that to a zero duration).
fn time_now() -> f64 {
    // No `unwrap` here, so the previous `#[allow(clippy::unwrap_used)]` was stale.
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Worker-scoped logging macro.
///
/// Always emits a `tracing::debug!` event tagged with the worker's runtime id;
/// additionally mirrors the message to stderr (`ERROR`) or stdout (`DEBUG`)
/// when `telemetry_debug_logging_enabled` is set in the worker's config.
/// The level token (`ERROR` / `DEBUG`) is matched literally by the macro.
macro_rules! telemetry_worker_log {
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                // Prefix with wall-clock seconds so interleaved output can be ordered.
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Messages accepted by the telemetry worker through its mailbox.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record a metric point: (value, context key, extra tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    /// Record a configuration entry.
    AddConfig(data::Configuration),
    /// Record a loaded dependency.
    AddDependency(Dependency),
    /// Record an integration change.
    AddIntegration(Integration),
    /// Record a log; logs sharing a `LogIdentifier` are deduplicated.
    AddLog((LogIdentifier, Log)),
    /// Drive the worker's lifecycle (start/stop/flush/heartbeat).
    Lifecycle(LifecycleAction),
    /// Request a stats snapshot; the reply goes through the oneshot sender.
    /// Not serializable, hence `#[serde(skip)]`.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
96
/// Lifecycle transitions and periodic tasks the worker schedules for itself
/// (or receives from its handle).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    /// Begin operation; sends app-started data and schedules flushes.
    Start,
    /// Flush remaining data and stop (worker may restart if `restartable`).
    Stop,
    /// Fold pending metric points into aggregation buckets.
    FlushMetricAggr,
    /// Send accumulated payloads to the backend.
    FlushData,
    /// Periodic re-send of full state (dependencies/configs/integrations).
    ExtendedHeartbeat,
}
105
/// Deduplication key for log records.
///
/// Produced by hashing a caller-supplied identifier (see
/// `TelemetryWorkerHandle::add_log`); logs with equal identifiers are merged
/// by incrementing the existing record's count.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    // Hash of the caller-provided identifier (DefaultHasher output).
    pub identifier: u64,
}
115
/// Mutable state accumulated by the worker between flushes.
#[derive(Debug)]
struct TelemetryWorkerData {
    // True once Lifecycle(Start) has been processed.
    started: bool,
    // Pending/flushed dependency entries.
    dependencies: store::Store<Dependency>,
    // Pending/flushed configuration entries.
    configurations: store::Store<data::Configuration>,
    // Pending/flushed integration entries.
    integrations: store::Store<data::Integration>,
    // Deduplicated log queue keyed by LogIdentifier.
    logs: store::QueueHashMap<LogIdentifier, Log>,
    // Registered metric contexts (shared with the handle).
    metric_contexts: MetricContexts,
    // Aggregated metric points awaiting flush.
    metric_buckets: MetricBuckets,
    // Host info attached to every telemetry envelope.
    host: Host,
    // Application info attached to every telemetry envelope.
    app: Application,
}
129
/// Background worker that aggregates telemetry and ships it over HTTP.
///
/// Driven by `TelemetryActions` received through `mailbox`; owned state lives
/// in `data`. Constructed via `TelemetryWorkerBuilder::build_worker`.
pub struct TelemetryWorker {
    // Which subset of telemetry this worker emits (Full vs MetricsLogs).
    flavor: TelemetryWorkerFlavor,
    config: Config,
    // Receiving end of the action channel; senders live in the handle.
    mailbox: mpsc::Receiver<TelemetryActions>,
    // Cooperative cancellation for in-flight requests and the run loop.
    cancellation_token: CancellationToken,
    // Monotonically increasing sequence id stamped on each payload.
    seq_id: AtomicU64,
    runtime_id: String,
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    metrics_flush_interval: Duration,
    // Scheduler of future lifecycle actions (flushes, heartbeats).
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
142impl Debug for TelemetryWorker {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("TelemetryWorker")
145 .field("flavor", &self.flavor)
146 .field("config", &self.config)
147 .field("mailbox", &self.mailbox)
148 .field("cancellation_token", &self.cancellation_token)
149 .field("seq_id", &self.seq_id)
150 .field("runtime_id", &self.runtime_id)
151 .field("metrics_flush_interval", &self.metrics_flush_interval)
152 .field("deadlines", &self.deadlines)
153 .field("data", &self.data)
154 .finish()
155 }
156}
157
impl Worker for TelemetryWorker {
    /// Main loop: receive actions and dispatch them until cancelled or until
    /// a dispatch returns `Break` on a non-restartable worker.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // Cancellation is checked once per iteration, before blocking on
            // the mailbox again.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            // Dispatch according to flavor: MetricsLogs ignores app lifecycle
            // payloads, Full handles everything.
            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // A restartable worker stays in the loop awaiting a new
                    // Start; otherwise exit for good.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
212
/// Snapshot of a worker's internal queue/store sizes, used for diagnostics.
/// Obtained via `TelemetryActions::CollectStats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    pub dependencies_stored: u32,
    pub dependencies_unflushed: u32,
    pub configurations_stored: u32,
    pub configurations_unflushed: u32,
    pub integrations_stored: u32,
    pub integrations_unflushed: u32,
    pub logs: u32,
    pub metric_contexts: u32,
    pub metric_buckets: MetricBucketStats,
}
225
226impl Add for TelemetryWorkerStats {
227 type Output = Self;
228
229 fn add(self, rhs: Self) -> Self::Output {
230 TelemetryWorkerStats {
231 dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
232 dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
233 configurations_stored: self.configurations_stored + rhs.configurations_stored,
234 configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
235 integrations_stored: self.integrations_stored + rhs.integrations_stored,
236 integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
237 logs: self.logs + rhs.logs,
238 metric_contexts: self.metric_contexts + rhs.metric_contexts,
239 metric_buckets: MetricBucketStats {
240 buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
241 series: self.metric_buckets.series + rhs.metric_buckets.series,
242 series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
243 distributions: self.metric_buckets.distributions
244 + self.metric_buckets.distributions,
245 distributions_points: self.metric_buckets.distributions_points
246 + self.metric_buckets.distributions_points,
247 },
248 }
249 }
250}
251
252impl Sum for TelemetryWorkerStats {
253 fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
254 iter.fold(Self::default(), |a, b| a + b)
255 }
256}
257
258mod serialize {
259 use crate::data;
260 use http::HeaderValue;
261 #[allow(clippy::declare_interior_mutable_const)]
262 pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
263 pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
264 Ok(serde_json::to_vec(telemetry)?)
265 }
266}
267
268impl TelemetryWorker {
    // Log an error through the worker-scoped macro (tracing + optional stderr).
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
272
    /// Wait for the next action to process.
    ///
    /// If a scheduled deadline expires before a mailbox message arrives, the
    /// deadline's lifecycle action is returned instead. A closed mailbox
    /// (all senders dropped) clears `restartable` and yields a final `Stop`
    /// so the run loop actually terminates.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                // Deadline already in the past: fire its action immediately
                // without touching the mailbox.
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Race the mailbox against the deadline; timeout means the
            // scheduled action wins.
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        action.unwrap_or_else(|| {
            // Channel closed: force a non-restartable stop.
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
299
    /// Handle one action for the `MetricsLogs` flavor.
    ///
    /// Only metrics and logs are collected; dependency/config/integration
    /// actions and extended heartbeats are deliberately ignored. Returns
    /// `Break` when the worker should leave its loop.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: a second Start while running is a no-op.
                if !self.data.started {
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Duplicate identifiers are merged by bumping the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Reschedule the next periodic aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // Ship logs + metric series/sketches, if any accumulated.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // Not relevant to this flavor: swallow silently.
            AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush before stopping.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may already be gone; ignore send failure.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
381
    /// Handle one action for the `Full` flavor (app lifecycle, dependencies,
    /// configuration, integrations, metrics and logs).
    ///
    /// Returns `Break` when the worker should leave its loop.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: a second Start while running is a no-op.
                if !self.data.started {
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddLog((identifier, log)) => {
                // Duplicate identifiers are merged by bumping the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Reschedule the next periodic aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App events always include a heartbeat; if nothing else is
                // pending the heartbeat is sent on its own.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Logs and metrics ship as a separate batch.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Re-mark stored data as unflushed so the full state is
                // re-sent with the next AppStarted payload.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // NOTE(review): `self` is shared behind an RwLock so the final
                // payloads can be sent via read guards, while the
                // `payload_sent_success` bookkeeping takes a write guard.
                // Presumably intended to overlap the sends — confirm the
                // futures aren't fully serialized by the guard scopes.
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may already be gone; ignore send failure.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
530
531 fn build_app_events_batch(&mut self) -> Vec<Payload> {
533 let mut payloads = Vec::new();
534
535 if self.data.dependencies.flush_not_empty() {
536 payloads.push(data::Payload::AppDependenciesLoaded(
537 data::AppDependenciesLoaded {
538 dependencies: self.data.dependencies.unflushed().cloned().collect(),
539 },
540 ))
541 }
542 if self.data.integrations.flush_not_empty() {
543 payloads.push(data::Payload::AppIntegrationsChange(
544 data::AppIntegrationsChange {
545 integrations: self.data.integrations.unflushed().cloned().collect(),
546 },
547 ))
548 }
549 if self.data.configurations.flush_not_empty() {
550 payloads.push(data::Payload::AppClientConfigurationChange(
551 data::AppClientConfigurationChange {
552 configuration: self.data.configurations.unflushed().cloned().collect(),
553 },
554 ))
555 }
556 payloads
557 }
558
559 fn build_observability_batch(&mut self) -> Vec<Payload> {
561 let mut payloads = Vec::new();
562
563 let logs = self.build_logs();
564 if !logs.is_empty() {
565 payloads.push(data::Payload::Logs(logs));
566 }
567 let metrics = self.build_metrics_series();
568 if !metrics.series.is_empty() {
569 payloads.push(data::Payload::GenerateMetrics(metrics))
570 }
571 let distributions = self.build_metrics_distributions();
572 if !distributions.series.is_empty() {
573 payloads.push(data::Payload::Sketches(distributions))
574 }
575 payloads
576 }
577
    /// Drain aggregated distribution buckets into a `Distributions` payload.
    ///
    /// Each sketch is protobuf-encoded and base64'd; points whose context has
    /// disappeared from the registry are logged and skipped.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point tags come first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }
        data::Distributions { series }
    }
605
    /// Drain aggregated metric points into a `GenerateMetrics` payload.
    ///
    /// Points whose context has disappeared from the registry are logged and
    /// skipped.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point tags come first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
630
    // Build the AppStarted payload from currently-unflushed configurations.
    fn build_app_started(&mut self) -> data::AppStarted {
        data::AppStarted {
            configuration: self.data.configurations.unflushed().cloned().collect(),
        }
    }
636
    // After an AppStarted payload is delivered, drop the configurations it
    // carried from the flush queue.
    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
        self.data
            .configurations
            .removed_flushed(p.configuration.len());
    }
642
    /// Bookkeeping after a payload is sent: remove the delivered items from
    /// their respective stores/queues so they are not re-sent. Recurses into
    /// `MessageBatch` contents.
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            MessageBatch(batch) => {
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            Logs(p) => {
                // Logs are queued FIFO; pop one entry per delivered log.
                for _ in p {
                    self.data.logs.pop_front();
                }
            }
            // Heartbeats, closes and metric payloads carry no stored state.
            AppHeartbeat(()) | AppClosing(()) => {}
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
672
673 fn build_logs(&self) -> Vec<Log> {
674 let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
676 logs
677 }
678
    // Allocate the next payload sequence id (post-increment).
    // NOTE(review): `Release` ordering for a plain counter is unusual — the
    // matching load in `send_payload` uses `Acquire` for logging only; confirm
    // whether `Relaxed` would suffice.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
682
    /// Build and send a single telemetry payload, logging the outcome.
    ///
    /// NOTE(review): transport failures from `send_request` are logged but NOT
    /// propagated — this function returns `Ok(())` regardless, so callers'
    /// `payload_sent_success` bookkeeping runs (and data is dropped) even when
    /// the HTTP request failed. Only `build_request` errors surface via `?`.
    /// Confirm this best-effort behavior is intended.
    async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
        debug!(
            worker.runtime_id = %self.runtime_id,
            payload.type = payload.request_type(),
            seq_id = self.seq_id.load(Ordering::Acquire),
            "Sending telemetry payload"
        );
        let req = self.build_request(payload)?;
        let result = self.send_request(req).await;
        match &result {
            Ok(resp) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                response.status = resp.status().as_u16(),
                "Successfully sent telemetry payload"
            ),
            Err(e) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                error = ?e,
                "Failed to send telemetry payload"
            ),
        }
        Ok(())
    }
708
    /// Assemble the HTTP POST for a payload: JSON body wrapped in a
    /// `Telemetry` envelope plus the telemetry intake headers
    /// (request type, API version, library language/version).
    fn build_request(
        &self,
        payload: &data::Payload,
    ) -> anyhow::Result<hyper_migration::HttpRequest> {
        // Consumes one sequence id per request, even if the send later fails.
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Seconds since epoch; 0 if the clock is before the epoch.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = hyper_migration::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
753
754 async fn send_request(
755 &self,
756 req: hyper_migration::HttpRequest,
757 ) -> Result<hyper_migration::HttpResponse, hyper_migration::Error> {
758 let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
759 endpoint.timeout_ms
760 } else {
761 Endpoint::DEFAULT_TIMEOUT
762 };
763
764 debug!(
765 worker.runtime_id = %self.runtime_id,
766 http.timeout_ms = timeout_ms,
767 "Sending HTTP request"
768 );
769
770 tokio::select! {
771 _ = self.cancellation_token.cancelled() => {
772 debug!(
773 worker.runtime_id = %self.runtime_id,
774 "Telemetry request cancelled"
775 );
776 Err(hyper_migration::Error::Other(anyhow::anyhow!("Request cancelled")))
777 },
778 _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
779 debug!(
780 worker.runtime_id = %self.runtime_id,
781 http.timeout_ms = timeout_ms,
782 "Telemetry request timed out"
783 );
784 Err(hyper_migration::Error::Other(anyhow::anyhow!("Request timed out")))
785 },
786 r = self.client.request(req) => {
787 match r {
788 Ok(resp) => {
789 Ok(resp)
790 }
791 Err(e) => {
792 Err(e)
793 },
794 }
795 }
796 }
797 }
798
    /// Snapshot current store/queue sizes for a `CollectStats` request.
    /// Counts are truncated to u32.
    fn stats(&self) -> TelemetryWorkerStats {
        TelemetryWorkerStats {
            dependencies_stored: self.data.dependencies.len_stored() as u32,
            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
            configurations_stored: self.data.configurations.len_stored() as u32,
            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
            integrations_stored: self.data.integrations.len_stored() as u32,
            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
            logs: self.data.logs.len() as u32,
            metric_contexts: self.data.metric_contexts.lock().len() as u32,
            metric_buckets: self.data.metric_buckets.stats(),
        }
    }
812}
813
/// Condvar-based latch: lets callers block until the worker thread reports
/// that shutdown has completed (see `TelemetryWorkerBuilder::run`).
#[derive(Debug)]
struct InnerTelemetryShutdown {
    is_shutdown: Mutex<bool>,
    condvar: Condvar,
}
819
820impl InnerTelemetryShutdown {
821 fn wait_for_shutdown(&self) {
822 drop(
823 #[allow(clippy::unwrap_used)]
824 self.condvar
825 .wait_while(self.is_shutdown.lock().unwrap(), |is_shutdown| {
826 !*is_shutdown
827 })
828 .unwrap(),
829 )
830 }
831
832 #[allow(clippy::unwrap_used)]
833 fn shutdown_finished(&self) {
834 *self.is_shutdown.lock().unwrap() = true;
835 self.condvar.notify_all();
836 }
837}
838
/// Cloneable front-end to a running [`TelemetryWorker`]: submits actions
/// through the mpsc channel and coordinates shutdown/cancellation.
#[derive(Clone, Debug)]
pub struct TelemetryWorkerHandle {
    // Sending side of the worker's mailbox.
    sender: mpsc::Sender<TelemetryActions>,
    // Latch signalled once the worker thread finishes (see builder `run`).
    shutdown: Arc<InnerTelemetryShutdown>,
    // Cancels in-flight requests and the worker loop.
    cancellation_token: CancellationToken,
    // Runtime used to spawn deadline-cancellation tasks.
    runtime: runtime::Handle,

    // Metric context registry shared with the worker.
    contexts: MetricContexts,
}
856
impl TelemetryWorkerHandle {
    /// Register a metric context (name, tags, type, namespace); the returned
    /// key is used to submit points via [`Self::add_point`].
    pub fn register_metric_context(
        &self,
        name: String,
        tags: Vec<Tag>,
        metric_type: data::metrics::MetricType,
        common: bool,
        namespace: data::metrics::MetricNamespace,
    ) -> ContextKey {
        self.contexts
            .register_metric_context(name, tags, metric_type, common, namespace)
    }

    /// Send an action without blocking; errors if the mailbox is full/closed.
    pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
        Ok(self.sender.try_send(msg)?)
    }

    /// Send an action, waiting for mailbox capacity if needed.
    pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
        Ok(self.sender.send(msg).await?)
    }

    /// Send a sequence of actions in order; stops at the first failure.
    pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
    where
        T: IntoIterator<Item = TelemetryActions>,
    {
        for msg in msgs {
            self.sender.send(msg).await?;
        }

        Ok(())
    }

    /// Send an action, giving up after `timeout`.
    pub async fn send_msg_timeout(
        &self,
        msg: TelemetryActions,
        timeout: time::Duration,
    ) -> anyhow::Result<()> {
        Ok(self.sender.send_timeout(msg, timeout).await?)
    }

    /// Queue a `Start` lifecycle action (non-blocking).
    pub fn send_start(&self) -> anyhow::Result<()> {
        Ok(self
            .sender
            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
    }

    /// Queue a `Stop` lifecycle action (non-blocking).
    pub fn send_stop(&self) -> anyhow::Result<()> {
        Ok(self
            .sender
            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
    }

    // Spawn a task that fires the cancellation token once `deadline` passes.
    fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
        let token = self.cancellation_token.clone();
        let f = async move {
            tokio::time::sleep_until(deadline.into()).await;
            token.cancel()
        };
        self.runtime.spawn(f);
    }

    /// Block until the worker shuts down, cancelling in-flight requests after
    /// `deadline` so shutdown cannot hang indefinitely.
    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
        self.cancel_requests_with_deadline(deadline);
        self.wait_for_shutdown()
    }

    /// Queue an `AddDependency` action (non-blocking).
    pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddDependency(Dependency {
                name,
                version,
            }))?;
        Ok(())
    }

    /// Queue an `AddIntegration` action (non-blocking).
    pub fn add_integration(
        &self,
        name: String,
        enabled: bool,
        version: Option<String>,
        compatible: Option<bool>,
        auto_enabled: Option<bool>,
    ) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddIntegration(Integration {
                name,
                version,
                compatible,
                enabled,
                auto_enabled,
            }))?;
        Ok(())
    }

    /// Queue a log record (non-blocking).
    ///
    /// `identifier` is hashed into a [`LogIdentifier`]; the worker merges
    /// records sharing an identifier by incrementing their count.
    pub fn add_log<T: Hash>(
        &self,
        identifier: T,
        message: String,
        level: data::LogLevel,
        stack_trace: Option<String>,
    ) -> anyhow::Result<()> {
        let mut hasher = DefaultHasher::new();
        identifier.hash(&mut hasher);
        self.sender.try_send(TelemetryActions::AddLog((
            LogIdentifier {
                identifier: hasher.finish(),
            },
            data::Log {
                message,
                level,
                stack_trace,
                count: 1,
                tags: String::new(),
                is_sensitive: false,
                is_crash: false,
            },
        )))?;
        Ok(())
    }

    /// Queue a metric point for a previously registered context (non-blocking).
    pub fn add_point(
        &self,
        value: f64,
        context: &ContextKey,
        extra_tags: Vec<Tag>,
    ) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
        Ok(())
    }

    /// Block the calling thread until the worker signals shutdown completion.
    pub fn wait_for_shutdown(&self) {
        self.shutdown.wait_for_shutdown();
    }

    /// Ask the worker for a stats snapshot; resolve it via the returned
    /// oneshot receiver.
    pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .try_send(TelemetryActions::CollectStats(sender))?;
        Ok(receiver)
    }
}
999
/// Capacity cap for each builder store (dependencies, integrations,
/// configurations).
pub const MAX_ITEMS: usize = 5000;
1002
/// Selects which subset of telemetry a worker emits.
#[derive(Debug, Default, Clone, Copy)]
pub enum TelemetryWorkerFlavor {
    /// Everything: app lifecycle events, dependencies, configurations,
    /// integrations, metrics and logs.
    #[default]
    Full,
    /// Metrics and logs only; other actions are ignored
    /// (see `dispatch_metrics_logs_action`).
    MetricsLogs,
}
1012
/// Configuration holder used to construct a [`TelemetryWorker`] plus its
/// [`TelemetryWorkerHandle`]; see `build_worker`, `spawn` and `run`.
pub struct TelemetryWorkerBuilder {
    pub host: Host,
    pub application: Application,
    // Defaults to a fresh UUID v4 when None (see `build_worker`).
    pub runtime_id: Option<String>,
    pub dependencies: store::Store<data::Dependency>,
    pub integrations: store::Store<data::Integration>,
    pub configurations: store::Store<data::Configuration>,
    pub native_deps: bool,
    pub rust_shared_lib_deps: bool,
    pub config: Config,
    pub flavor: TelemetryWorkerFlavor,
}
1025
impl TelemetryWorkerBuilder {
    /// Like [`Self::new`], but detects host information via
    /// `crate::build_host()` instead of taking a hostname.
    pub fn new_fetch_host(
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: crate::build_host(),
            ..Self::new(
                String::new(),
                service_name,
                language_name,
                language_version,
                tracer_version,
            )
        }
    }

    /// Create a builder with default stores (capped at `MAX_ITEMS`), default
    /// config and the default (`Full`) flavor.
    pub fn new(
        hostname: String,
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: Host {
                hostname,
                ..Default::default()
            },
            application: Application {
                service_name,
                language_name,
                language_version,
                tracer_version,
                ..Default::default()
            },
            runtime_id: None,
            dependencies: store::Store::new(MAX_ITEMS),
            integrations: store::Store::new(MAX_ITEMS),
            configurations: store::Store::new(MAX_ITEMS),
            native_deps: true,
            rust_shared_lib_deps: false,
            config: Config::default(),
            flavor: TelemetryWorkerFlavor::default(),
        }
    }

    /// Assemble the worker and its handle without starting anything.
    ///
    /// Wires up the action channel, cancellation token, shutdown latch and
    /// the initial flush/heartbeat schedule.
    pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
        let (tx, mailbox) = mpsc::channel(5000);
        let shutdown = Arc::new(InnerTelemetryShutdown {
            is_shutdown: Mutex::new(false),
            condvar: Condvar::new(),
        });
        let contexts = MetricContexts::default();
        let token = CancellationToken::new();
        let config = self.config;
        let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
        let client = http_client::from_config(&config);

        // Metrics aggregate at least as often as the heartbeat fires.
        let metrics_flush_interval =
            telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL);

        #[allow(clippy::unwrap_used)]
        let worker = TelemetryWorker {
            flavor: self.flavor,
            data: TelemetryWorkerData {
                started: false,
                dependencies: self.dependencies,
                integrations: self.integrations,
                configurations: self.configurations,
                logs: store::QueueHashMap::default(),
                metric_contexts: contexts.clone(),
                metric_buckets: MetricBuckets::default(),
                host: self.host,
                app: self.application,
            },
            config,
            mailbox,
            // Sequence ids start at 1; 0 is never emitted.
            seq_id: AtomicU64::new(1),
            runtime_id: self
                .runtime_id
                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
            client,
            metrics_flush_interval,
            deadlines: scheduler::Scheduler::new(vec![
                (metrics_flush_interval, LifecycleAction::FlushMetricAggr),
                (telemetry_heartbeat_interval, LifecycleAction::FlushData),
                (
                    // Extended heartbeat re-sends full state once per day.
                    time::Duration::from_secs(60 * 60 * 24),
                    LifecycleAction::ExtendedHeartbeat,
                ),
            ]),
            cancellation_token: token.clone(),
        };

        (
            TelemetryWorkerHandle {
                sender: tx,
                shutdown,
                cancellation_token: token,
                runtime: tokio_runtime,
                contexts,
            },
            worker,
        )
    }

    /// Spawn the worker on the current Tokio runtime; must be called from
    /// within a runtime context (`Handle::current` panics otherwise).
    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
        let tokio_runtime = tokio::runtime::Handle::current();

        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());

        let join_handle = tokio_runtime.spawn(async move { worker.run().await });

        (worker_handle, join_handle)
    }

    /// Run the worker on a dedicated OS thread with its own current-thread
    /// Tokio runtime; the shutdown latch is signalled when the worker exits.
    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
        let notify_shutdown = handle.shutdown.clone();
        std::thread::spawn(move || {
            runtime.block_on(worker.run());
            // Drop the runtime off-thread, then release shutdown waiters.
            runtime.shutdown_background();
            notify_shutdown.shutdown_finished();
        });

        Ok(handle)
    }
}
1168
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    // Compile-time helpers: a call only typechecks if T satisfies the bound.
    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}

    /// Asserts (at compile time) that the handle can be shared across threads.
    #[test]
    fn test_handle_sync_send() {
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_send(h);
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_sync(h);
    }
}
1183}