Skip to main content

libdd_telemetry/worker/
mod.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9    config::Config,
10    data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry},
11    metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13
14use libdd_common::{http_common, tag::Tag, worker::Worker};
15
16use std::iter::Sum;
17use std::ops::Add;
18use std::{
19    collections::hash_map::DefaultHasher,
20    hash::{Hash, Hasher},
21    ops::ControlFlow,
22    sync::{
23        atomic::{AtomicU64, Ordering},
24        Arc, Condvar, Mutex,
25    },
26    time,
27};
28use std::{collections::HashSet, fmt::Debug, time::Duration};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32    channel::oneshot,
33    future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38    runtime::{self, Handle},
39    sync::mpsc,
40    task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
// Shorthands for the `ControlFlow` values returned by the dispatch methods:
// `CONTINUE` keeps the worker loop running, `BREAK` requests a stop.
const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Returns the current Unix time as fractional seconds.
///
/// Never panics: if the system clock reports a time before the Unix epoch,
/// this falls back to `Duration::default()` (i.e. 0.0).
fn time_now() -> f64 {
    // The previous `#[allow(clippy::unwrap_used)]` was stale — there is no
    // `unwrap()` here since the code uses `unwrap_or_default()`.
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Emits a worker log message through `tracing::debug!` and, when the
/// config flag `telemetry_debug_logging_enabled` is set, also mirrors it
/// to the process's stderr (ERROR) or stdout (DEBUG), prefixed with a
/// wall-clock timestamp from `time_now()`.
macro_rules! telemetry_worker_log {
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                // Timestamp prefix lets interleaved output be ordered.
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Actions that can be sent to a telemetry worker through its mailbox.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record one metric point: (value, metric context, extra tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    AddConfig(data::Configuration),
    AddDependency(Dependency),
    AddIntegration(Integration),
    /// Record a log entry, deduplicated by its `LogIdentifier`.
    AddLog((LogIdentifier, Log)),
    AddEndpoint(Endpoint),
    /// Worker lifecycle transitions (start/stop/flush/heartbeat).
    Lifecycle(LifecycleAction),
    /// Request a stats snapshot, answered over the oneshot channel.
    /// Not serializable, hence excluded from serde.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
97
/// Lifecycle events, either sent by clients or scheduled via the worker's
/// internal deadline scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    Start,
    Stop,
    /// Fold pending metric points into their aggregation buckets.
    FlushMetricAggr,
    /// Send the accumulated data to the backend.
    FlushData,
    ExtendedHeartbeat,
}
106
/// Identifies a logging location uniquely
///
/// The identifier is a single 64 bit integer to save space and memory,
/// and to be generic over the way different languages identify a logging
/// location.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    // Collisions? Never heard of them
    pub identifier: u64,
}
116
// Holds the current state of the telemetry worker
#[derive(Debug)]
struct TelemetryWorkerData {
    // True between a handled Lifecycle(Start) and Lifecycle(Stop).
    started: bool,
    dependencies: store::Store<Dependency>,
    configurations: store::Store<data::Configuration>,
    integrations: store::Store<data::Integration>,
    endpoints: HashSet<data::Endpoint>,
    // Logs deduplicated by LogIdentifier; duplicates bump the entry's count.
    logs: store::QueueHashMap<LogIdentifier, Log>,
    metric_contexts: MetricContexts,
    metric_buckets: MetricBuckets,
    host: Host,
    app: Application,
}
131
/// Background worker that collects telemetry actions from a mailbox and
/// periodically flushes them to the configured endpoint.
pub struct TelemetryWorker {
    // Selects which actions this worker handles (full vs metrics/logs only).
    flavor: TelemetryWorkerFlavor,
    config: Config,
    // Incoming actions from the worker handle.
    mailbox: mpsc::Receiver<TelemetryActions>,
    cancellation_token: CancellationToken,
    // Sequence id attached to each outgoing payload; incremented per request.
    seq_id: AtomicU64,
    runtime_id: String,
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    metrics_flush_interval: Duration,
    // Pending scheduled lifecycle actions with their deadlines.
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
144impl Debug for TelemetryWorker {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("TelemetryWorker")
147            .field("flavor", &self.flavor)
148            .field("config", &self.config)
149            .field("mailbox", &self.mailbox)
150            .field("cancellation_token", &self.cancellation_token)
151            .field("seq_id", &self.seq_id)
152            .field("runtime_id", &self.runtime_id)
153            .field("metrics_flush_interval", &self.metrics_flush_interval)
154            .field("deadlines", &self.deadlines)
155            .field("data", &self.data)
156            .finish()
157    }
158}
159
impl Worker for TelemetryWorker {
    // Runs a state machine that waits for actions, either from the worker's
    // mailbox, or scheduled actions from the worker's deadline object.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // Cancellation is checked once per iteration, before blocking
            // on the next action.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            // The worker flavor decides which dispatch table handles the
            // action (full telemetry vs metrics/logs only).
            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // Restartable workers keep looping after a break so a
                    // later Start can revive them.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
214
/// Point-in-time snapshot of the worker's internal counters, returned in
/// response to `TelemetryActions::CollectStats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    pub dependencies_stored: u32,
    pub dependencies_unflushed: u32,
    pub configurations_stored: u32,
    pub configurations_unflushed: u32,
    pub integrations_stored: u32,
    pub integrations_unflushed: u32,
    pub logs: u32,
    pub metric_contexts: u32,
    pub metric_buckets: MetricBucketStats,
}
227
228impl Add for TelemetryWorkerStats {
229    type Output = Self;
230
231    fn add(self, rhs: Self) -> Self::Output {
232        TelemetryWorkerStats {
233            dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
234            dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
235            configurations_stored: self.configurations_stored + rhs.configurations_stored,
236            configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
237            integrations_stored: self.integrations_stored + rhs.integrations_stored,
238            integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
239            logs: self.logs + rhs.logs,
240            metric_contexts: self.metric_contexts + rhs.metric_contexts,
241            metric_buckets: MetricBucketStats {
242                buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
243                series: self.metric_buckets.series + rhs.metric_buckets.series,
244                series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
245                distributions: self.metric_buckets.distributions
246                    + self.metric_buckets.distributions,
247                distributions_points: self.metric_buckets.distributions_points
248                    + self.metric_buckets.distributions_points,
249            },
250        }
251    }
252}
253
254impl Sum for TelemetryWorkerStats {
255    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
256        iter.fold(Self::default(), |a, b| a + b)
257    }
258}
259
/// JSON serialization helpers for outgoing telemetry payloads.
mod serialize {
    use crate::data;
    use http::HeaderValue;
    // HeaderValue has interior mutability, which trips this clippy lint on
    // consts; the value here is a fixed "application/json" header.
    #[allow(clippy::declare_interior_mutable_const)]
    pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
    /// Serializes a telemetry envelope into a JSON byte buffer.
    pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
        Ok(serde_json::to_vec(telemetry)?)
    }
}
269
270impl TelemetryWorker {
    /// Logs an error through the worker's debug logging facilities.
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
274
    /// Waits for the next action to process.
    ///
    /// If the earliest scheduled deadline has already passed, its action is
    /// returned immediately; otherwise the mailbox is raced against that
    /// deadline. When the mailbox channel is closed, the worker is forced
    /// into a non-restartable `Stop`.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            // If deadline passed, directly return associated action
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Otherwise run it in a timeout against the mailbox
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        // if no action is received, then it means the channel is stopped
        action.unwrap_or_else(|| {
            // the worker handle no longer lives - we must remove restartable here to avoid leaks
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
301
    /// Handles one action for a `MetricsLogs`-flavored worker.
    ///
    /// Only metrics and logs are collected and flushed; dependency,
    /// integration, configuration and endpoint actions as well as extended
    /// heartbeats are intentionally ignored for this flavor.
    ///
    /// Returns `BREAK` when the worker should stop, `CONTINUE` otherwise.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: schedules the periodic flushes only once.
                if !self.data.started {
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Deduplicated by identifier: repeats only bump the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the periodic metric aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the periodic data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // Not applicable to the metrics/logs flavor; dropped silently.
            AddConfig(_)
            | AddDependency(_)
            | AddIntegration(_)
            | AddEndpoint(_)
            | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush before stopping.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // The receiver may already be dropped; ignore send failure.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
387
    /// Handles one action for a `Full`-flavored worker.
    ///
    /// Returns `BREAK` when the worker should stop, `CONTINUE` otherwise.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: sends AppStarted and schedules the periodic
                // flushes only once.
                if !self.data.started {
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    // flush data should be last so previously flushed metrics are sent
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddEndpoint(endpoint) => {
                self.data.endpoints.insert(endpoint);
            }
            AddLog((identifier, log)) => {
                // Deduplicated by identifier: repeats only bump the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the periodic metric aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the periodic data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App lifecycle events; a heartbeat is always included so
                // an empty batch degenerates to a plain heartbeat.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Logs/metrics/distributions are sent separately and only
                // when non-empty.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Re-send the full state: mark everything stored as
                // unflushed again, then replay AppStarted.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // Wrap `self` in an RwLock so the concurrent send futures
                // can share read access and briefly take write access to
                // record successes.
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        // This is different from the non-functional:
                        // match self_arc.read().await.send_payload(&payload).await { ... }
                        // presumably because the temp read guard would live till end of match
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // The receiver may already be dropped; ignore send failure.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
539
540    // Builds telemetry payloads containing lifecycle events
    // Builds telemetry payloads containing lifecycle events
    //
    // Drains the unflushed portion of each store into its payload type;
    // the stores themselves are only trimmed later, in
    // `payload_sent_success`, once delivery succeeds.
    fn build_app_events_batch(&mut self) -> Vec<Payload> {
        let mut payloads = Vec::new();

        if self.data.dependencies.flush_not_empty() {
            payloads.push(data::Payload::AppDependenciesLoaded(
                data::AppDependenciesLoaded {
                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.integrations.flush_not_empty() {
            payloads.push(data::Payload::AppIntegrationsChange(
                data::AppIntegrationsChange {
                    integrations: self.data.integrations.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.configurations.flush_not_empty() {
            payloads.push(data::Payload::AppClientConfigurationChange(
                data::AppClientConfigurationChange {
                    configuration: self.data.configurations.unflushed().cloned().collect(),
                },
            ))
        }
        if !self.data.endpoints.is_empty() {
            payloads.push(data::Payload::AppEndpoints(data::AppEndpoints {
                // NOTE(review): always marks the batch as first — confirm
                // this is intended for repeated endpoint flushes.
                is_first: true,
                endpoints: self
                    .data
                    .endpoints
                    .iter()
                    // Endpoints that fail to serialize become the default
                    // value and are filtered out by the is_object check.
                    .map(|e| e.to_json_value().unwrap_or_default())
                    .filter(|e| e.is_object())
                    .collect(),
            }));
        }
        payloads
    }
579
580    // Builds telemetry payloads containing logs, metrics and distributions
581    fn build_observability_batch(&mut self) -> Vec<Payload> {
582        let mut payloads = Vec::new();
583
584        let logs = self.build_logs();
585        if !logs.logs.is_empty() {
586            payloads.push(data::Payload::Logs(logs));
587        }
588        let metrics = self.build_metrics_series();
589        if !metrics.series.is_empty() {
590            payloads.push(data::Payload::GenerateMetrics(metrics))
591        }
592        let distributions = self.build_metrics_distributions();
593        if !distributions.series.is_empty() {
594            payloads.push(data::Payload::Sketches(distributions))
595        }
596        payloads
597    }
598
    /// Drains the aggregated distribution buckets into a payload, encoding
    /// each sketch as base64. Points whose metric context can no longer be
    /// resolved are logged and skipped.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point extra tags come first, followed by the context's tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }
        data::Distributions { series }
    }
626
    /// Drains the aggregated metric series buckets into a payload. Points
    /// whose metric context can no longer be resolved are logged and
    /// skipped.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point extra tags come first, followed by the context's tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
651
    /// Builds an AppStarted payload from all not-yet-flushed configurations.
    fn build_app_started(&mut self) -> data::AppStarted {
        data::AppStarted {
            configuration: self.data.configurations.unflushed().cloned().collect(),
        }
    }
657
    /// After a successful AppStarted delivery, drops the configurations
    /// that were included in the payload from the pending store.
    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
        self.data
            .configurations
            .removed_flushed(p.configuration.len());
    }
663
    /// Records a successful delivery: removes the sent items from the
    /// corresponding pending stores so they are not re-sent.
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            AppEndpoints(_) => self.data.endpoints.clear(),
            // Batches are unwound recursively, one inner payload at a time.
            MessageBatch(batch) => {
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            // Logs are queued FIFO, so pop one entry per sent log.
            Logs(p) => {
                for _ in &p.logs {
                    self.data.logs.pop_front();
                }
            }
            // Nothing stored for these; no cleanup needed.
            AppHeartbeat(()) | AppClosing(()) => {}
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
694
    /// Snapshots all pending log entries into a payload (entries stay in
    /// the queue until `payload_sent_success` pops them).
    fn build_logs(&self) -> data::Logs {
        // TODO: change the data model to take a &[Log] so don't have to clone data here
        let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
        data::Logs { logs }
    }
700
    /// Returns the current sequence id and post-increments it for the next
    /// request.
    // NOTE(review): the increment uses Ordering::Release while the logging
    // read in send_payload uses Acquire — confirm the intended ordering.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
704
    /// Builds a request for `payload` and sends it, logging the outcome.
    ///
    /// NOTE(review): transport errors are only logged — this returns
    /// `Ok(())` even when `send_request` fails, so callers' `Err` arms are
    /// effectively unreachable and flushed data is discarded on failure.
    /// An `Err` is only produced if building the request itself fails.
    /// Confirm this best-effort behavior is intended.
    async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
        debug!(
            worker.runtime_id = %self.runtime_id,
            payload.type = payload.request_type(),
            seq_id = self.seq_id.load(Ordering::Acquire),
            "Sending telemetry payload"
        );
        let req = self.build_request(payload)?;
        let result = self.send_request(req).await;
        match &result {
            Ok(resp) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                response.status = resp.status().as_u16(),
                "Successfully sent telemetry payload"
            ),
            Err(e) => debug!(
                worker.runtime_id = %self.runtime_id,
                payload.type = payload.request_type(),
                error = ?e,
                "Failed to send telemetry payload"
            ),
        }
        Ok(())
    }
730
    /// Assembles the HTTP request for a telemetry payload: wraps it in a
    /// `Telemetry` envelope (consuming a sequence id), sets the telemetry
    /// headers, and serializes the body as JSON.
    fn build_request(&self, payload: &data::Payload) -> anyhow::Result<http_common::HttpRequest> {
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Seconds since the Unix epoch; 0 if the clock is pre-epoch.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                // Note: passing by ref here just causes the clone to happen underneath
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = http_common::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
772
773    async fn send_request(
774        &self,
775        req: http_common::HttpRequest,
776    ) -> Result<http_common::HttpResponse, http_common::Error> {
777        let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
778            endpoint.timeout_ms
779        } else {
780            libdd_common::Endpoint::DEFAULT_TIMEOUT
781        };
782
783        debug!(
784            worker.runtime_id = %self.runtime_id,
785            http.timeout_ms = timeout_ms,
786            "Sending HTTP request"
787        );
788
789        tokio::select! {
790            _ = self.cancellation_token.cancelled() => {
791                debug!(
792                    worker.runtime_id = %self.runtime_id,
793                    "Telemetry request cancelled"
794                );
795                Err(http_common::Error::Other(anyhow::anyhow!("Request cancelled")))
796            },
797            _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
798                debug!(
799                    worker.runtime_id = %self.runtime_id,
800                    http.timeout_ms = timeout_ms,
801                    "Telemetry request timed out"
802                );
803                Err(http_common::Error::Other(anyhow::anyhow!("Request timed out")))
804            },
805            r = self.client.request(req) => {
806                match r {
807                    Ok(resp) => {
808                        Ok(resp)
809                    }
810                    Err(e) => {
811                        Err(e)
812                    },
813                }
814            }
815        }
816    }
817
818    fn stats(&self) -> TelemetryWorkerStats {
819        TelemetryWorkerStats {
820            dependencies_stored: self.data.dependencies.len_stored() as u32,
821            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
822            configurations_stored: self.data.configurations.len_stored() as u32,
823            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
824            integrations_stored: self.data.integrations.len_stored() as u32,
825            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
826            logs: self.data.logs.len() as u32,
827            metric_contexts: self.data.metric_contexts.lock().len() as u32,
828            metric_buckets: self.data.metric_buckets.stats(),
829        }
830    }
831}
832
#[derive(Debug)]
struct InnerTelemetryShutdown {
    is_shutdown: Mutex<bool>,
    condvar: Condvar,
}

impl InnerTelemetryShutdown {
    /// Blocks the calling thread until `shutdown_finished` has been invoked.
    /// Returns immediately if shutdown was already signalled.
    #[allow(clippy::unwrap_used)]
    fn wait_for_shutdown(&self) {
        let mut done = self.is_shutdown.lock().unwrap();
        // Condvar wakeups can be spurious, so re-check the flag in a loop.
        while !*done {
            done = self.condvar.wait(done).unwrap();
        }
    }

    /// Records that shutdown is complete and wakes every thread currently
    /// blocked in `wait_for_shutdown`.
    #[allow(clippy::unwrap_used)]
    fn shutdown_finished(&self) {
        *self.is_shutdown.lock().unwrap() = true;
        self.condvar.notify_all();
    }
}
857
#[derive(Clone, Debug)]
/// TelemetryWorkerHandle is a handle which allows interactions with the telemetry worker.
/// The handle is safe to use across threads.
///
/// The worker won't send data to the agent until you call
/// `TelemetryWorkerHandle::send_start`.
///
/// To stop the worker, call `TelemetryWorkerHandle::send_stop`, which triggers
/// a flush asynchronously, then `TelemetryWorkerHandle::wait_for_shutdown`.
pub struct TelemetryWorkerHandle {
    // Bounded channel feeding the worker's mailbox with `TelemetryActions`.
    sender: mpsc::Sender<TelemetryActions>,
    // Flag + condvar pair signalled once the worker has fully shut down.
    shutdown: Arc<InnerTelemetryShutdown>,
    // Cancels in-flight HTTP requests (see `cancel_requests_with_deadline`).
    cancellation_token: CancellationToken,
    // Used to spawn cancellation tasks
    runtime: runtime::Handle,

    // Metric context registry shared with the worker side.
    contexts: MetricContexts,
}
875
876impl TelemetryWorkerHandle {
877    pub fn register_metric_context(
878        &self,
879        name: String,
880        tags: Vec<Tag>,
881        metric_type: data::metrics::MetricType,
882        common: bool,
883        namespace: data::metrics::MetricNamespace,
884    ) -> ContextKey {
885        self.contexts
886            .register_metric_context(name, tags, metric_type, common, namespace)
887    }
888
889    pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
890        Ok(self.sender.try_send(msg)?)
891    }
892
893    pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
894        Ok(self.sender.send(msg).await?)
895    }
896
897    pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
898    where
899        T: IntoIterator<Item = TelemetryActions>,
900    {
901        for msg in msgs {
902            self.sender.send(msg).await?;
903        }
904
905        Ok(())
906    }
907
908    pub async fn send_msg_timeout(
909        &self,
910        msg: TelemetryActions,
911        timeout: time::Duration,
912    ) -> anyhow::Result<()> {
913        Ok(self.sender.send_timeout(msg, timeout).await?)
914    }
915
916    pub fn send_start(&self) -> anyhow::Result<()> {
917        Ok(self
918            .sender
919            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
920    }
921
922    pub fn send_stop(&self) -> anyhow::Result<()> {
923        Ok(self
924            .sender
925            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
926    }
927
928    fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
929        let token = self.cancellation_token.clone();
930        let f = async move {
931            tokio::time::sleep_until(deadline.into()).await;
932            token.cancel()
933        };
934        self.runtime.spawn(f);
935    }
936
937    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
938        self.cancel_requests_with_deadline(deadline);
939        self.wait_for_shutdown()
940    }
941
942    pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
943        self.sender
944            .try_send(TelemetryActions::AddDependency(Dependency {
945                name,
946                version,
947            }))?;
948        Ok(())
949    }
950
951    pub fn add_integration(
952        &self,
953        name: String,
954        enabled: bool,
955        version: Option<String>,
956        compatible: Option<bool>,
957        auto_enabled: Option<bool>,
958    ) -> anyhow::Result<()> {
959        self.sender
960            .try_send(TelemetryActions::AddIntegration(Integration {
961                name,
962                version,
963                compatible,
964                enabled,
965                auto_enabled,
966            }))?;
967        Ok(())
968    }
969
970    pub fn add_log<T: Hash>(
971        &self,
972        identifier: T,
973        message: String,
974        level: data::LogLevel,
975        stack_trace: Option<String>,
976    ) -> anyhow::Result<()> {
977        let mut hasher = DefaultHasher::new();
978        identifier.hash(&mut hasher);
979        self.sender.try_send(TelemetryActions::AddLog((
980            LogIdentifier {
981                identifier: hasher.finish(),
982            },
983            data::Log {
984                message,
985                level,
986                stack_trace,
987                count: 1,
988                tags: String::new(),
989                is_sensitive: false,
990                is_crash: false,
991            },
992        )))?;
993        Ok(())
994    }
995
996    pub fn add_point(
997        &self,
998        value: f64,
999        context: &ContextKey,
1000        extra_tags: Vec<Tag>,
1001    ) -> anyhow::Result<()> {
1002        self.sender
1003            .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
1004        Ok(())
1005    }
1006
1007    pub fn wait_for_shutdown(&self) {
1008        self.shutdown.wait_for_shutdown();
1009    }
1010
1011    pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
1012        let (sender, receiver) = oneshot::channel();
1013        self.sender
1014            .try_send(TelemetryActions::CollectStats(sender))?;
1015        Ok(receiver)
1016    }
1017}
1018
/// How many dependencies/integrations/configs we keep in memory at most
/// (used as the capacity of each `store::Store` created by the builder).
pub const MAX_ITEMS: usize = 5000;
1021
/// Selects which classes of telemetry messages a worker emits.
#[derive(Debug, Default, Clone, Copy)]
pub enum TelemetryWorkerFlavor {
    /// Send all telemetry messages including lifecycle events like app-started, heartbeats,
    /// dependencies and configurations
    #[default]
    Full,
    /// Only send telemetry data not tied to the lifecycle of the app like logs and metrics
    MetricsLogs,
}
1031
/// Builder that configures and constructs a telemetry worker together with
/// its handle (see `build_worker`, `spawn` and `run`).
pub struct TelemetryWorkerBuilder {
    pub host: Host,
    pub application: Application,
    /// Runtime id for this process; a fresh UUID is generated at build time
    /// when left as `None`.
    pub runtime_id: Option<String>,
    pub dependencies: store::Store<data::Dependency>,
    pub integrations: store::Store<data::Integration>,
    pub configurations: store::Store<data::Configuration>,
    pub endpoints: HashSet<data::Endpoint>,
    // NOTE(review): the semantics of these two flags are not visible in this
    // file — confirm against their consumers before documenting further.
    pub native_deps: bool,
    pub rust_shared_lib_deps: bool,
    pub config: Config,
    /// Which classes of telemetry the built worker will emit.
    pub flavor: TelemetryWorkerFlavor,
}
1045
impl TelemetryWorkerBuilder {
    /// Creates a new telemetry worker builder and infers host information
    /// automatically via `crate::build_host()`.
    pub fn new_fetch_host(
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            // Replace the default (empty-hostname) host with one discovered
            // from the environment.
            host: crate::build_host(),
            ..Self::new(
                String::new(),
                service_name,
                language_name,
                language_version,
                tracer_version,
            )
        }
    }

    /// Creates a new telemetry worker builder with the given hostname
    pub fn new(
        hostname: String,
        service_name: String,
        language_name: String,
        language_version: String,
        tracer_version: String,
    ) -> Self {
        Self {
            host: Host {
                hostname,
                ..Default::default()
            },
            application: Application {
                service_name,
                language_name,
                language_version,
                tracer_version,
                ..Default::default()
            },
            runtime_id: None,
            // Each store is bounded by MAX_ITEMS entries.
            dependencies: store::Store::new(MAX_ITEMS),
            integrations: store::Store::new(MAX_ITEMS),
            configurations: store::Store::new(MAX_ITEMS),
            endpoints: HashSet::new(),
            native_deps: true,
            rust_shared_lib_deps: false,
            config: Config::default(),
            flavor: TelemetryWorkerFlavor::default(),
        }
    }

    /// Build the corresponding worker and its handle.
    /// The runtime handle is wrapped in the worker handle and should be the one used to run the
    /// worker task.
    pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
        // Bounded mailbox; the handle's send/try_send methods feed this channel.
        let (tx, mailbox) = mpsc::channel(5000);
        let shutdown = Arc::new(InnerTelemetryShutdown {
            is_shutdown: Mutex::new(false),
            condvar: Condvar::new(),
        });
        let contexts = MetricContexts::default();
        let token = CancellationToken::new();
        let config = self.config;
        let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
        let client = http_client::from_config(&config);

        // Flush metrics at least as often as the heartbeat fires.
        let metrics_flush_interval =
            telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL);

        #[allow(clippy::unwrap_used)]
        let worker = TelemetryWorker {
            flavor: self.flavor,
            data: TelemetryWorkerData {
                started: false,
                dependencies: self.dependencies,
                integrations: self.integrations,
                configurations: self.configurations,
                endpoints: self.endpoints,
                logs: store::QueueHashMap::default(),
                // Shared with the handle so contexts registered there are
                // visible to the worker.
                metric_contexts: contexts.clone(),
                metric_buckets: MetricBuckets::default(),
                host: self.host,
                app: self.application,
            },
            config,
            mailbox,
            seq_id: AtomicU64::new(1),
            // Generate a fresh UUID when the builder did not provide an id.
            runtime_id: self
                .runtime_id
                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
            client,
            metrics_flush_interval,
            // Periodic actions: metric aggregation flush, data flush, and an
            // extended heartbeat every 24 hours.
            deadlines: scheduler::Scheduler::new(vec![
                (metrics_flush_interval, LifecycleAction::FlushMetricAggr),
                (telemetry_heartbeat_interval, LifecycleAction::FlushData),
                (
                    time::Duration::from_secs(60 * 60 * 24),
                    LifecycleAction::ExtendedHeartbeat,
                ),
            ]),
            cancellation_token: token.clone(),
        };

        (
            TelemetryWorkerHandle {
                sender: tx,
                shutdown,
                cancellation_token: token,
                runtime: tokio_runtime,
                contexts,
            },
            worker,
        )
    }

    /// Spawns a telemetry worker task in the current tokio runtime
    /// The worker will capture a reference to the runtime and use it to run its tasks
    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
        let tokio_runtime = tokio::runtime::Handle::current();

        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());

        // NOTE(review): unlike `run`, this path never calls
        // `shutdown_finished`, so `wait_for_shutdown` on the handle appears
        // to rely on the worker signalling shutdown elsewhere — confirm, or
        // await the returned JoinHandle instead.
        let join_handle = tokio_runtime.spawn(async move { worker.run().await });

        (worker_handle, join_handle)
    }

    /// Spawns a telemetry worker in a new thread and returns a handle to interact with it
    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
        // Single-threaded runtime dedicated to the worker's own thread.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
        let notify_shutdown = handle.shutdown.clone();
        std::thread::spawn(move || {
            runtime.block_on(worker.run());
            // Tear the runtime down without blocking on background tasks,
            // then wake any thread blocked in `wait_for_shutdown`.
            runtime.shutdown_background();
            notify_shutdown.shutdown_finished();
        });

        Ok(handle)
    }
}
1190
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    fn require_send<T: Send>() {}
    fn require_sync<T: Sync>() {}

    /// Compile-time proof that the handle can be moved to and shared across
    /// threads.
    #[test]
    fn test_handle_sync_send() {
        require_send::<TelemetryWorkerHandle>();
        require_sync::<TelemetryWorkerHandle>();
    }
}
1205}