1pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9 config::Config,
10 data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry},
11 metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13
14use libdd_common::{http_common, tag::Tag, worker::Worker};
15
16use std::iter::Sum;
17use std::ops::Add;
18use std::{
19 collections::hash_map::DefaultHasher,
20 hash::{Hash, Hasher},
21 ops::ControlFlow,
22 sync::{
23 atomic::{AtomicU64, Ordering},
24 Arc, Condvar, Mutex,
25 },
26 time,
27};
28use std::{collections::HashSet, fmt::Debug, time::Duration};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32 channel::oneshot,
33 future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38 runtime::{self, Handle},
39 sync::mpsc,
40 task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
/// Shorthand for the dispatch loop's "keep processing actions" result.
const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
/// Shorthand for the dispatch loop's "stop the worker loop" result.
const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Seconds since the Unix epoch as a float, used to timestamp the console
/// copies of debug/error log lines.
///
/// Returns `0.0` when the system clock reads before the epoch
/// (`elapsed()` errors), so this never panics.
fn time_now() -> f64 {
    // The stale `#[allow(clippy::unwrap_used)]` was removed: this path uses
    // `unwrap_or_default()` and contains no unwrap.
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Logs a worker message through `tracing::debug!` and, when
/// `telemetry_debug_logging_enabled` is set in the worker config, also
/// prints it to stderr (ERROR level) or stdout (DEBUG level) prefixed with a
/// unix timestamp.
///
/// `$worker` must expose `runtime_id` and `config`; the level token
/// (`ERROR`/`DEBUG`) selects the console stream.
macro_rules! telemetry_worker_log {
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                // Console copy is opt-in: useful when no tracing subscriber
                // is wired up in the host process.
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Messages accepted by the telemetry worker's mailbox.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record a metric point: (value, context key from
    /// `register_metric_context`, extra tags merged with the context's tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    AddConfig(data::Configuration),
    AddDependency(Dependency),
    AddIntegration(Integration),
    /// Add a log entry; a repeated identifier bumps the stored log's `count`.
    AddLog((LogIdentifier, Log)),
    AddEndpoint(Endpoint),
    /// Worker lifecycle control (start/stop/flush/heartbeat).
    Lifecycle(LifecycleAction),
    /// One-shot request for worker statistics; not serializable, so skipped
    /// by serde.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
97
/// Lifecycle control events, either user-initiated (`Start`/`Stop`) or
/// raised by the internal deadline scheduler (flushes, extended heartbeat).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    Start,
    Stop,
    /// Fold pending metric points into the aggregation buckets.
    FlushMetricAggr,
    /// Send the accumulated payloads to the backend.
    FlushData,
    /// Periodic full re-send of the app-started state.
    ExtendedHeartbeat,
}
106
/// Deduplication key for logs: a 64-bit hash of a caller-chosen identifier
/// (see `TelemetryWorkerHandle::add_log`).
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    // Hash of the caller-provided identifier (DefaultHasher output).
    pub identifier: u64,
}
116
/// Mutable state owned by the worker loop.
#[derive(Debug)]
struct TelemetryWorkerData {
    /// True between a handled `Start` and the matching `Stop`.
    started: bool,
    dependencies: store::Store<Dependency>,
    configurations: store::Store<data::Configuration>,
    integrations: store::Store<data::Integration>,
    endpoints: HashSet<data::Endpoint>,
    /// Deduplicated logs keyed by `LogIdentifier`; drained from the front
    /// after a successful send (see `payload_sent_success`).
    logs: store::QueueHashMap<LogIdentifier, Log>,
    /// Shared with `TelemetryWorkerHandle` for context registration.
    metric_contexts: MetricContexts,
    metric_buckets: MetricBuckets,
    host: Host,
    app: Application,
}
131
/// Background worker that aggregates telemetry data and ships it over HTTP.
/// Construct via `TelemetryWorkerBuilder`; drive it with
/// `TelemetryWorkerHandle`.
pub struct TelemetryWorker {
    /// Which payload families this worker emits.
    flavor: TelemetryWorkerFlavor,
    config: Config,
    /// Receiving side of the handle's action channel.
    mailbox: mpsc::Receiver<TelemetryActions>,
    /// Cancelled externally to abort in-flight requests and the loop.
    cancellation_token: CancellationToken,
    /// Monotonic per-request sequence number (initialized to 1).
    seq_id: AtomicU64,
    runtime_id: String,
    // Trait object — the reason Debug is implemented manually below.
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    metrics_flush_interval: Duration,
    /// Schedules the periodic lifecycle actions (flushes, heartbeat).
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
144impl Debug for TelemetryWorker {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct("TelemetryWorker")
147 .field("flavor", &self.flavor)
148 .field("config", &self.config)
149 .field("mailbox", &self.mailbox)
150 .field("cancellation_token", &self.cancellation_token)
151 .field("seq_id", &self.seq_id)
152 .field("runtime_id", &self.runtime_id)
153 .field("metrics_flush_interval", &self.metrics_flush_interval)
154 .field("deadlines", &self.deadlines)
155 .field("data", &self.data)
156 .finish()
157 }
158}
159
impl Worker for TelemetryWorker {
    /// Main worker loop: repeatedly receives the next action (mailbox
    /// message or fired deadline) and dispatches it according to the worker
    /// flavor, until cancelled or stopped.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // Cooperative cancellation check between actions.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // A restartable worker stays in the loop awaiting a new
                    // Start; otherwise Break ends the worker.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
214
/// Point-in-time size counters, reported via `TelemetryActions::CollectStats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    pub dependencies_stored: u32,
    pub dependencies_unflushed: u32,
    pub configurations_stored: u32,
    pub configurations_unflushed: u32,
    pub integrations_stored: u32,
    pub integrations_unflushed: u32,
    pub logs: u32,
    pub metric_contexts: u32,
    pub metric_buckets: MetricBucketStats,
}
227
228impl Add for TelemetryWorkerStats {
229 type Output = Self;
230
231 fn add(self, rhs: Self) -> Self::Output {
232 TelemetryWorkerStats {
233 dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
234 dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
235 configurations_stored: self.configurations_stored + rhs.configurations_stored,
236 configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
237 integrations_stored: self.integrations_stored + rhs.integrations_stored,
238 integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
239 logs: self.logs + rhs.logs,
240 metric_contexts: self.metric_contexts + rhs.metric_contexts,
241 metric_buckets: MetricBucketStats {
242 buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
243 series: self.metric_buckets.series + rhs.metric_buckets.series,
244 series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
245 distributions: self.metric_buckets.distributions
246 + self.metric_buckets.distributions,
247 distributions_points: self.metric_buckets.distributions_points
248 + self.metric_buckets.distributions_points,
249 },
250 }
251 }
252}
253
254impl Sum for TelemetryWorkerStats {
255 fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
256 iter.fold(Self::default(), |a, b| a + b)
257 }
258}
259
/// JSON serialization helpers for outgoing telemetry requests.
mod serialize {
    use crate::data;
    use http::HeaderValue;
    /// `Content-Type: application/json` header value for telemetry requests.
    #[allow(clippy::declare_interior_mutable_const)]
    pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
    /// Serializes a telemetry envelope to a JSON byte vector.
    pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
        Ok(serde_json::to_vec(telemetry)?)
    }
}
269
270impl TelemetryWorker {
    /// Logs an error through the worker's dual (tracing + console) channel.
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
274
    /// Waits for the next action to process: either a mailbox message or,
    /// if a scheduled deadline fires first, the corresponding lifecycle
    /// action.
    ///
    /// If the mailbox is closed (all handles dropped), restart is disabled
    /// and `Stop` is returned so the worker shuts down.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            // Deadline already in the past: fire it immediately without
            // touching the mailbox.
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Otherwise wait for mail, but no longer than until the deadline.
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        action.unwrap_or_else(|| {
            // Channel closed: no further messages can arrive, so a restart
            // would hang forever.
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
301
    /// Action handler for the `MetricsLogs` flavor: only metric, log, and
    /// lifecycle actions are processed; dependency / integration /
    /// configuration / endpoint data is deliberately dropped.
    ///
    /// Returns `BREAK` to signal the worker loop to stop, `CONTINUE`
    /// otherwise.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                if !self.data.started {
                    // Arm the periodic flush schedules exactly once.
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Duplicate logs (same identifier) just bump the counter.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the next aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                // Nothing to flush for a worker that never started and
                // cannot restart.
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the next data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // Irrelevant for this flavor: silently ignored.
            AddConfig(_)
            | AddDependency(_)
            | AddIntegration(_)
            | AddEndpoint(_)
            | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush before stopping.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may already be gone; ignore the send error.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
387
    /// Action handler for the `Full` flavor: processes every telemetry
    /// action, including app lifecycle payloads and dependency /
    /// integration / configuration / endpoint tracking.
    ///
    /// Returns `BREAK` to signal the worker loop to stop, `CONTINUE`
    /// otherwise.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                if !self.data.started {
                    // First start: announce the app, then arm the periodic
                    // flush schedules.
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddEndpoint(endpoint) => {
                self.data.endpoints.insert(endpoint);
            }
            AddLog((identifier, log)) => {
                // Duplicate logs (same identifier) just bump the counter.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the next aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the next data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App events always ship with a heartbeat appended; a bare
                // heartbeat is sent when there are no app events.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Metrics and logs go out in a separate batch.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Re-send the full app state: everything previously flushed
                // becomes unflushed again.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // `self` is shared across the final send futures through an
                // RwLock: sends hold a read lock, success bookkeeping takes
                // a write lock.
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may already be gone; ignore the send error.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
539
    /// Builds the app-level payloads (dependencies, integrations,
    /// configurations, endpoints) that currently have unflushed content;
    /// empty sections are omitted.
    fn build_app_events_batch(&mut self) -> Vec<Payload> {
        let mut payloads = Vec::new();

        if self.data.dependencies.flush_not_empty() {
            payloads.push(data::Payload::AppDependenciesLoaded(
                data::AppDependenciesLoaded {
                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.integrations.flush_not_empty() {
            payloads.push(data::Payload::AppIntegrationsChange(
                data::AppIntegrationsChange {
                    integrations: self.data.integrations.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.configurations.flush_not_empty() {
            payloads.push(data::Payload::AppClientConfigurationChange(
                data::AppClientConfigurationChange {
                    configuration: self.data.configurations.unflushed().cloned().collect(),
                },
            ))
        }
        if !self.data.endpoints.is_empty() {
            payloads.push(data::Payload::AppEndpoints(data::AppEndpoints {
                is_first: true,
                // Endpoints that fail JSON conversion become a default
                // (non-object) value and are filtered out.
                endpoints: self
                    .data
                    .endpoints
                    .iter()
                    .map(|e| e.to_json_value().unwrap_or_default())
                    .filter(|e| e.is_object())
                    .collect(),
            }));
        }
        payloads
    }
579
580 fn build_observability_batch(&mut self) -> Vec<Payload> {
582 let mut payloads = Vec::new();
583
584 let logs = self.build_logs();
585 if !logs.logs.is_empty() {
586 payloads.push(data::Payload::Logs(logs));
587 }
588 let metrics = self.build_metrics_series();
589 if !metrics.series.is_empty() {
590 payloads.push(data::Payload::GenerateMetrics(metrics))
591 }
592 let distributions = self.build_metrics_distributions();
593 if !distributions.series.is_empty() {
594 payloads.push(data::Payload::Sketches(distributions))
595 }
596 payloads
597 }
598
    /// Drains the aggregated distribution buckets into a `Distributions`
    /// payload. Each sketch is encoded to bytes and base64'd; points whose
    /// metric context can no longer be found are logged and skipped.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point tags first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }
        data::Distributions { series }
    }
626
    /// Drains the aggregated series buckets into a `GenerateMetrics`
    /// payload; points whose metric context can no longer be found are
    /// logged and skipped.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point tags first, then the context's registered tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
651
652 fn build_app_started(&mut self) -> data::AppStarted {
653 data::AppStarted {
654 configuration: self.data.configurations.unflushed().cloned().collect(),
655 }
656 }
657
    /// Acknowledges a sent `AppStarted`/`AppExtendedHeartbeat` payload by
    /// removing the configurations it carried from the flushed set.
    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
        self.data
            .configurations
            .removed_flushed(p.configuration.len());
    }
663
    /// Bookkeeping after a successful send: marks the data carried by the
    /// payload as delivered so it is not sent again (logs and endpoints are
    /// removed outright).
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            AppEndpoints(_) => self.data.endpoints.clear(),
            MessageBatch(batch) => {
                // Recurse so every payload inside the batch is acknowledged.
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            Logs(p) => {
                // Drop exactly as many queued logs as were sent, from the
                // front of the queue.
                for _ in &p.logs {
                    self.data.logs.pop_front();
                }
            }
            // Nothing to acknowledge for these payload kinds.
            AppHeartbeat(()) | AppClosing(()) => {}
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
694
695 fn build_logs(&self) -> data::Logs {
696 let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
698 data::Logs { logs }
699 }
700
    /// Returns the current sequence id and atomically increments it for the
    /// next request.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
704
705 async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
706 debug!(
707 worker.runtime_id = %self.runtime_id,
708 payload.type = payload.request_type(),
709 seq_id = self.seq_id.load(Ordering::Acquire),
710 "Sending telemetry payload"
711 );
712 let req = self.build_request(payload)?;
713 let result = self.send_request(req).await;
714 match &result {
715 Ok(resp) => debug!(
716 worker.runtime_id = %self.runtime_id,
717 payload.type = payload.request_type(),
718 response.status = resp.status().as_u16(),
719 "Successfully sent telemetry payload"
720 ),
721 Err(e) => debug!(
722 worker.runtime_id = %self.runtime_id,
723 payload.type = payload.request_type(),
724 error = ?e,
725 "Failed to send telemetry payload"
726 ),
727 }
728 Ok(())
729 }
730
    /// Serializes `payload` into a complete HTTP request: JSON body plus the
    /// telemetry headers (request type, API version, library language and
    /// version).
    ///
    /// # Errors
    /// Fails if the request builder cannot be created from the config, a
    /// header value is invalid, or JSON serialization fails.
    fn build_request(&self, payload: &data::Payload) -> anyhow::Result<http_common::HttpRequest> {
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Seconds since the epoch; 0 if the clock reads before it.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = http_common::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
772
773 async fn send_request(
774 &self,
775 req: http_common::HttpRequest,
776 ) -> Result<http_common::HttpResponse, http_common::Error> {
777 let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
778 endpoint.timeout_ms
779 } else {
780 libdd_common::Endpoint::DEFAULT_TIMEOUT
781 };
782
783 debug!(
784 worker.runtime_id = %self.runtime_id,
785 http.timeout_ms = timeout_ms,
786 "Sending HTTP request"
787 );
788
789 tokio::select! {
790 _ = self.cancellation_token.cancelled() => {
791 debug!(
792 worker.runtime_id = %self.runtime_id,
793 "Telemetry request cancelled"
794 );
795 Err(http_common::Error::Other(anyhow::anyhow!("Request cancelled")))
796 },
797 _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
798 debug!(
799 worker.runtime_id = %self.runtime_id,
800 http.timeout_ms = timeout_ms,
801 "Telemetry request timed out"
802 );
803 Err(http_common::Error::Other(anyhow::anyhow!("Request timed out")))
804 },
805 r = self.client.request(req) => {
806 match r {
807 Ok(resp) => {
808 Ok(resp)
809 }
810 Err(e) => {
811 Err(e)
812 },
813 }
814 }
815 }
816 }
817
    /// Snapshot of current store / log / metric sizes, answering a
    /// `CollectStats` request.
    fn stats(&self) -> TelemetryWorkerStats {
        TelemetryWorkerStats {
            dependencies_stored: self.data.dependencies.len_stored() as u32,
            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
            configurations_stored: self.data.configurations.len_stored() as u32,
            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
            integrations_stored: self.data.integrations.len_stored() as u32,
            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
            logs: self.data.logs.len() as u32,
            metric_contexts: self.data.metric_contexts.lock().len() as u32,
            metric_buckets: self.data.metric_buckets.stats(),
        }
    }
831}
832
/// Mutex + condvar pair used to block non-async threads until the worker's
/// shutdown has completed.
#[derive(Debug)]
struct InnerTelemetryShutdown {
    // Set to true exactly once, by `shutdown_finished`.
    is_shutdown: Mutex<bool>,
    condvar: Condvar,
}
838
impl InnerTelemetryShutdown {
    /// Blocks the calling thread until `shutdown_finished` has been called.
    /// Returns immediately if it already was.
    fn wait_for_shutdown(&self) {
        drop(
            #[allow(clippy::unwrap_used)]
            self.condvar
                .wait_while(self.is_shutdown.lock().unwrap(), |is_shutdown| {
                    !*is_shutdown
                })
                .unwrap(),
        )
    }

    /// Marks shutdown as complete and wakes every thread blocked in
    /// `wait_for_shutdown`.
    #[allow(clippy::unwrap_used)]
    fn shutdown_finished(&self) {
        *self.is_shutdown.lock().unwrap() = true;
        self.condvar.notify_all();
    }
}
857
/// Cloneable client-side handle to a running `TelemetryWorker`.
#[derive(Clone, Debug)]
pub struct TelemetryWorkerHandle {
    /// Sending side of the worker's mailbox.
    sender: mpsc::Sender<TelemetryActions>,
    /// Signalled via `shutdown_finished` once the worker has fully stopped.
    shutdown: Arc<InnerTelemetryShutdown>,
    /// Cancelling this aborts the worker loop and in-flight requests.
    cancellation_token: CancellationToken,
    /// Runtime used to spawn the deadline-cancellation task.
    runtime: runtime::Handle,

    /// Metric-context registry shared with the worker.
    contexts: MetricContexts,
}
875
impl TelemetryWorkerHandle {
    /// Registers a metric context (name, tags, type, namespace) and returns
    /// the key to use with `add_point`.
    pub fn register_metric_context(
        &self,
        name: String,
        tags: Vec<Tag>,
        metric_type: data::metrics::MetricType,
        common: bool,
        namespace: data::metrics::MetricNamespace,
    ) -> ContextKey {
        self.contexts
            .register_metric_context(name, tags, metric_type, common, namespace)
    }

    /// Non-blocking send; errors if the mailbox is full or closed.
    pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
        Ok(self.sender.try_send(msg)?)
    }

    /// Awaits mailbox capacity; errors if the worker is gone.
    pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
        Ok(self.sender.send(msg).await?)
    }

    /// Sends a sequence of actions in order, stopping at the first failure.
    pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
    where
        T: IntoIterator<Item = TelemetryActions>,
    {
        for msg in msgs {
            self.sender.send(msg).await?;
        }

        Ok(())
    }

    /// Like `send_msg`, but gives up after `timeout`.
    pub async fn send_msg_timeout(
        &self,
        msg: TelemetryActions,
        timeout: time::Duration,
    ) -> anyhow::Result<()> {
        Ok(self.sender.send_timeout(msg, timeout).await?)
    }

    /// Queues a lifecycle `Start` action (non-blocking).
    pub fn send_start(&self) -> anyhow::Result<()> {
        Ok(self
            .sender
            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
    }

    /// Queues a lifecycle `Stop` action (non-blocking).
    pub fn send_stop(&self) -> anyhow::Result<()> {
        Ok(self
            .sender
            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
    }

    /// Spawns a task that fires the cancellation token at `deadline`,
    /// aborting outstanding requests and the worker loop.
    fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
        let token = self.cancellation_token.clone();
        let f = async move {
            tokio::time::sleep_until(deadline.into()).await;
            token.cancel()
        };
        self.runtime.spawn(f);
    }

    /// Blocks until worker shutdown completes, force-cancelling outstanding
    /// work at `deadline` so the wait is bounded.
    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
        self.cancel_requests_with_deadline(deadline);
        self.wait_for_shutdown()
    }

    /// Queues an `AddDependency` action (non-blocking).
    pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddDependency(Dependency {
                name,
                version,
            }))?;
        Ok(())
    }

    /// Queues an `AddIntegration` action (non-blocking).
    pub fn add_integration(
        &self,
        name: String,
        enabled: bool,
        version: Option<String>,
        compatible: Option<bool>,
        auto_enabled: Option<bool>,
    ) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddIntegration(Integration {
                name,
                version,
                compatible,
                enabled,
                auto_enabled,
            }))?;
        Ok(())
    }

    /// Queues a log entry. `identifier` is hashed into the dedup key: logs
    /// sharing an identifier are counted on the worker side rather than
    /// stored twice.
    pub fn add_log<T: Hash>(
        &self,
        identifier: T,
        message: String,
        level: data::LogLevel,
        stack_trace: Option<String>,
    ) -> anyhow::Result<()> {
        let mut hasher = DefaultHasher::new();
        identifier.hash(&mut hasher);
        self.sender.try_send(TelemetryActions::AddLog((
            LogIdentifier {
                identifier: hasher.finish(),
            },
            data::Log {
                message,
                level,
                stack_trace,
                count: 1,
                tags: String::new(),
                is_sensitive: false,
                is_crash: false,
            },
        )))?;
        Ok(())
    }

    /// Queues a metric point for a context previously registered with
    /// `register_metric_context` (non-blocking).
    pub fn add_point(
        &self,
        value: f64,
        context: &ContextKey,
        extra_tags: Vec<Tag>,
    ) -> anyhow::Result<()> {
        self.sender
            .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
        Ok(())
    }

    /// Blocks the current thread until the worker signals shutdown.
    pub fn wait_for_shutdown(&self) {
        self.shutdown.wait_for_shutdown();
    }

    /// Requests a stats snapshot from the worker; await/resolve the returned
    /// receiver to obtain it.
    pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .try_send(TelemetryActions::CollectStats(sender))?;
        Ok(receiver)
    }
}
1018
/// Capacity cap for the dependency / integration / configuration stores
/// created by `TelemetryWorkerBuilder::new`.
pub const MAX_ITEMS: usize = 5000;
1021
/// Selects which payload families a worker emits.
#[derive(Debug, Default, Clone, Copy)]
pub enum TelemetryWorkerFlavor {
    /// Everything: app lifecycle events, dependencies, integrations,
    /// configurations, endpoints, metrics, and logs.
    #[default]
    Full,
    /// Restricted mode that only reports metrics and logs.
    MetricsLogs,
}
1031
/// Builder for a `TelemetryWorker`; fields may be adjusted directly before
/// calling `build_worker`, `spawn`, or `run`.
pub struct TelemetryWorkerBuilder {
    pub host: Host,
    pub application: Application,
    /// Worker runtime id; a fresh UUID is generated when `None`.
    pub runtime_id: Option<String>,
    pub dependencies: store::Store<data::Dependency>,
    pub integrations: store::Store<data::Integration>,
    pub configurations: store::Store<data::Configuration>,
    pub endpoints: HashSet<data::Endpoint>,
    pub native_deps: bool,
    pub rust_shared_lib_deps: bool,
    pub config: Config,
    pub flavor: TelemetryWorkerFlavor,
}
1045
1046impl TelemetryWorkerBuilder {
    /// Like `new`, but fills `host` via `crate::build_host()` instead of an
    /// explicit hostname.
    pub fn new_fetch_host(
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: crate::build_host(),
            // Hostname passed to `new` is discarded by the struct update.
            ..Self::new(
                String::new(),
                service_name,
                language_name,
                language_version,
                tracer_version,
            )
        }
    }
1065
    /// Creates a builder with the given identity, stores capped at
    /// `MAX_ITEMS`, default config, and the default (`Full`) flavor.
    pub fn new(
        hostname: String,
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: Host {
                hostname,
                ..Default::default()
            },
            application: Application {
                service_name,
                language_name,
                language_version,
                tracer_version,
                ..Default::default()
            },
            runtime_id: None,
            dependencies: store::Store::new(MAX_ITEMS),
            integrations: store::Store::new(MAX_ITEMS),
            configurations: store::Store::new(MAX_ITEMS),
            endpoints: HashSet::new(),
            native_deps: true,
            rust_shared_lib_deps: false,
            config: Config::default(),
            flavor: TelemetryWorkerFlavor::default(),
        }
    }
1097
1098 pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
1102 let (tx, mailbox) = mpsc::channel(5000);
1103 let shutdown = Arc::new(InnerTelemetryShutdown {
1104 is_shutdown: Mutex::new(false),
1105 condvar: Condvar::new(),
1106 });
1107 let contexts = MetricContexts::default();
1108 let token = CancellationToken::new();
1109 let config = self.config;
1110 let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
1111 let client = http_client::from_config(&config);
1112
1113 let metrics_flush_interval =
1114 telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL);
1115
1116 #[allow(clippy::unwrap_used)]
1117 let worker = TelemetryWorker {
1118 flavor: self.flavor,
1119 data: TelemetryWorkerData {
1120 started: false,
1121 dependencies: self.dependencies,
1122 integrations: self.integrations,
1123 configurations: self.configurations,
1124 endpoints: self.endpoints,
1125 logs: store::QueueHashMap::default(),
1126 metric_contexts: contexts.clone(),
1127 metric_buckets: MetricBuckets::default(),
1128 host: self.host,
1129 app: self.application,
1130 },
1131 config,
1132 mailbox,
1133 seq_id: AtomicU64::new(1),
1134 runtime_id: self
1135 .runtime_id
1136 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1137 client,
1138 metrics_flush_interval,
1139 deadlines: scheduler::Scheduler::new(vec![
1140 (metrics_flush_interval, LifecycleAction::FlushMetricAggr),
1141 (telemetry_heartbeat_interval, LifecycleAction::FlushData),
1142 (
1143 time::Duration::from_secs(60 * 60 * 24),
1144 LifecycleAction::ExtendedHeartbeat,
1145 ),
1146 ]),
1147 cancellation_token: token.clone(),
1148 };
1149
1150 (
1151 TelemetryWorkerHandle {
1152 sender: tx,
1153 shutdown,
1154 cancellation_token: token,
1155 runtime: tokio_runtime,
1156 contexts,
1157 },
1158 worker,
1159 )
1160 }
1161
    /// Spawns the worker as a task on the ambient tokio runtime.
    ///
    /// # Panics
    /// Panics if called outside a tokio runtime context
    /// (`Handle::current`).
    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
        let tokio_runtime = tokio::runtime::Handle::current();

        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());

        let join_handle = tokio_runtime.spawn(async move { worker.run().await });

        (worker_handle, join_handle)
    }
1173
    /// Runs the worker on a dedicated OS thread with its own current-thread
    /// tokio runtime, returning a handle for interacting with it.
    ///
    /// # Errors
    /// Fails only if the tokio runtime cannot be built.
    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
        let notify_shutdown = handle.shutdown.clone();
        std::thread::spawn(move || {
            runtime.block_on(worker.run());
            // Tear the runtime down off-thread, then wake any shutdown
            // waiters.
            runtime.shutdown_background();
            notify_shutdown.shutdown_finished();
        });

        Ok(handle)
    }
1189}
1190
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    // Compile-time helpers: calling these only type-checks when T is
    // Send / Sync respectively.
    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}

    /// Asserts (at compile time) that the public handle can be moved across
    /// and shared between threads.
    #[test]
    fn test_handle_sync_send() {
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_send(h);
        #[allow(clippy::redundant_closure)]
        let _ = |h: TelemetryWorkerHandle| is_sync(h);
    }
}
1205}