// libdd_telemetry/worker/mod.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9    config::Config,
10    data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry},
11    metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13use libdd_common::Endpoint;
14use libdd_common::{hyper_migration, tag::Tag, worker::Worker};
15
16use std::fmt::Debug;
17use std::iter::Sum;
18use std::ops::Add;
19use std::{
20    collections::hash_map::DefaultHasher,
21    hash::{Hash, Hasher},
22    ops::ControlFlow,
23    sync::{
24        atomic::{AtomicU64, Ordering},
25        Arc, Condvar, Mutex,
26    },
27    time,
28};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32    channel::oneshot,
33    future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38    runtime::{self, Handle},
39    sync::mpsc,
40    task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
// Shorthand results for the dispatch state machine: keep processing actions
// (CONTINUE) vs. leave the run loop (BREAK).
const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Seconds since the Unix epoch as a float, used to timestamp the optional
/// stdout/stderr debug log lines.
///
/// Falls back to 0.0 if the system clock is set before the Unix epoch.
fn time_now() -> f64 {
    // NOTE: the previous `#[allow(clippy::unwrap_used)]` was stale — no
    // `unwrap` is used here, `unwrap_or_default` handles the error case.
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Logs a worker message through `tracing` and, when
/// `telemetry_debug_logging_enabled` is set in the worker config, also echoes
/// it to stderr (ERROR) or stdout (DEBUG) prefixed with the current time.
macro_rules! telemetry_worker_log {
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Actions delivered to the telemetry worker's mailbox, either from a worker
/// handle or from the worker's internal scheduler.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record one metric point: (value, metric context key, extra tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    /// Record a configuration entry.
    AddConfig(data::Configuration),
    /// Record a loaded dependency.
    AddDependency(Dependency),
    /// Record an enabled integration.
    AddIntegration(Integration),
    /// Record a log; entries with the same `LogIdentifier` bump a counter
    /// instead of being stored again.
    AddLog((LogIdentifier, Log)),
    /// Drive the worker's lifecycle state machine.
    Lifecycle(LifecycleAction),
    /// Request a snapshot of worker stats. Skipped by serde because a oneshot
    /// sender cannot be (de)serialized.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
96
/// Lifecycle transitions handled by the worker's dispatch methods, mostly
/// produced by the deadline scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    /// Begin operating and schedule the recurring flush events.
    Start,
    /// Flush remaining data and stop (or pause, when the worker is restartable).
    Stop,
    /// Fold pending metric points into the aggregation buckets.
    FlushMetricAggr,
    /// Send accumulated payloads to the endpoint.
    FlushData,
    /// Re-send the full application state (dependencies, integrations, config).
    ExtendedHeartbeat,
}
105
/// Identifies a logging location uniquely
///
/// The identifier is a single 64 bit integer to save space and memory,
/// and to stay generic over how different languages identify a log site.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    // Collisions? Never heard of them
    pub identifier: u64,
}
115
// Holds the current state of the telemetry worker
#[derive(Debug)]
struct TelemetryWorkerData {
    // Set after Lifecycle(Start) is processed; cleared again on Stop.
    started: bool,
    // Stores tracking which entries have already been flushed to the endpoint.
    dependencies: store::Store<Dependency>,
    configurations: store::Store<data::Configuration>,
    integrations: store::Store<data::Integration>,
    // Logs deduplicated by LogIdentifier; repeats increment a counter.
    logs: store::QueueHashMap<LogIdentifier, Log>,
    // Metric definitions and their aggregated points.
    metric_contexts: MetricContexts,
    metric_buckets: MetricBuckets,
    // Host and application descriptors attached to every telemetry request.
    host: Host,
    app: Application,
}
129
/// Worker that aggregates telemetry data and periodically flushes it to the
/// configured endpoint, driven by [`TelemetryActions`] from its mailbox and
/// scheduled lifecycle events from its deadline scheduler.
pub struct TelemetryWorker {
    // Selects the dispatch path used by `run` (full vs. metrics/logs only).
    flavor: TelemetryWorkerFlavor,
    config: Config,
    // Receiving end of the handle-to-worker action channel.
    mailbox: mpsc::Receiver<TelemetryActions>,
    cancellation_token: CancellationToken,
    // Monotonically increasing sequence id attached to each request.
    seq_id: AtomicU64,
    runtime_id: String,
    // Trait object without `Debug`; the manual `Debug` impl omits it.
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    // Schedules recurring lifecycle actions (flushes, heartbeats).
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
141impl Debug for TelemetryWorker {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("TelemetryWorker")
144            .field("flavor", &self.flavor)
145            .field("config", &self.config)
146            .field("mailbox", &self.mailbox)
147            .field("cancellation_token", &self.cancellation_token)
148            .field("seq_id", &self.seq_id)
149            .field("runtime_id", &self.runtime_id)
150            .field("deadlines", &self.deadlines)
151            .field("data", &self.data)
152            .finish()
153    }
154}
155
impl Worker for TelemetryWorker {
    // Runs a state machine that waits for actions, either from the worker's
    // mailbox, or scheduled actions from the worker's deadline object.
    //
    // Exits when the cancellation token fires, or when a dispatch returns
    // Break while the worker is not restartable.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // Cooperative cancellation: checked once per loop iteration.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            // Route the action through the flavor-specific dispatcher.
            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // A restartable worker keeps looping so it can be
                    // started again; otherwise Break terminates the loop.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
210
/// Snapshot of the worker's in-memory storage sizes, returned through
/// `TelemetryActions::CollectStats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    pub dependencies_stored: u32,
    pub dependencies_unflushed: u32,
    pub configurations_stored: u32,
    pub configurations_unflushed: u32,
    pub integrations_stored: u32,
    pub integrations_unflushed: u32,
    pub logs: u32,
    pub metric_contexts: u32,
    pub metric_buckets: MetricBucketStats,
}
223
224impl Add for TelemetryWorkerStats {
225    type Output = Self;
226
227    fn add(self, rhs: Self) -> Self::Output {
228        TelemetryWorkerStats {
229            dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
230            dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
231            configurations_stored: self.configurations_stored + rhs.configurations_stored,
232            configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
233            integrations_stored: self.integrations_stored + rhs.integrations_stored,
234            integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
235            logs: self.logs + rhs.logs,
236            metric_contexts: self.metric_contexts + rhs.metric_contexts,
237            metric_buckets: MetricBucketStats {
238                buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
239                series: self.metric_buckets.series + rhs.metric_buckets.series,
240                series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
241                distributions: self.metric_buckets.distributions
242                    + self.metric_buckets.distributions,
243                distributions_points: self.metric_buckets.distributions_points
244                    + self.metric_buckets.distributions_points,
245            },
246        }
247    }
248}
249
250impl Sum for TelemetryWorkerStats {
251    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
252        iter.fold(Self::default(), |a, b| a + b)
253    }
254}
255
/// JSON wire-format helpers for telemetry payloads.
mod serialize {
    use crate::data;
    use http::HeaderValue;
    // `HeaderValue` has interior mutability, hence the lint allow on a const.
    #[allow(clippy::declare_interior_mutable_const)]
    pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
    /// Serializes a telemetry envelope to its JSON body bytes.
    pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
        Ok(serde_json::to_vec(telemetry)?)
    }
}
265
266impl TelemetryWorker {
    /// Logs `err` through the worker's ERROR logging macro.
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
270
    /// Waits for the next action: either a message from the mailbox or, if a
    /// scheduled deadline fires first, the corresponding lifecycle action.
    ///
    /// A closed mailbox is translated into `Lifecycle(Stop)` (and the worker
    /// is made non-restartable) so the run loop can terminate cleanly.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            // If deadline passed, directly return associated action
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Otherwise run it in a timeout against the mailbox
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        // if no action is received, then it means the channel is stopped
        action.unwrap_or_else(|| {
            // the worker handle no longer lives - we must remove restartable here to avoid leaks
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
297
    /// Dispatch path for the metrics/logs-only worker flavor: handles logs,
    /// metric points and lifecycle events, and ignores dependency,
    /// integration and configuration actions.
    ///
    /// Returns `BREAK` to signal the run loop to stop, `CONTINUE` otherwise.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: only schedules the recurring flushes once.
                if !self.data.started {
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Duplicate log sites only bump the occurrence counter.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the recurring aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the recurring data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // Not tracked by this flavor.
            AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush before stopping.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may be gone; dropping the error is intentional.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
379
    /// Dispatch path for the full worker flavor: handles every action kind,
    /// including dependencies, integrations, configurations and the extended
    /// heartbeat.
    ///
    /// Returns `BREAK` to signal the run loop to stop, `CONTINUE` otherwise.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: sends app-started and schedules flushes once.
                if !self.data.started {
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    // flush data should be last so previously flushed metrics are sent
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddLog((identifier, log)) => {
                // Duplicate log sites only bump the occurrence counter.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                self.data.metric_buckets.flush_aggregates();

                // Re-arm the recurring aggregation flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the recurring data flush.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App lifecycle events ride along with (or are replaced by) a
                // heartbeat so the endpoint always sees activity.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Logs/metrics/distributions are flushed in a separate batch.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Mark everything as unflushed so the full app state is
                // re-sent, then re-schedule the periodic events.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // Wrap `self` in an RwLock so the concurrent send futures can
                // alternate shared reads (sending) and exclusive writes
                // (post-send book-keeping).
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        // This is different from the non-functional:
                        // match self_arc.read().await.send_payload(&payload).await { ... }
                        // presumably because the temp read guard would live till end of match
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // Receiver may be gone; dropping the error is intentional.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
528
    // Builds telemetry payloads containing lifecycle events
    // (dependencies loaded, integrations changed, configuration changed).
    // Calling `flush_not_empty` moves pending entries into the "unflushed"
    // window of each store, so this method mutates store state.
    fn build_app_events_batch(&mut self) -> Vec<Payload> {
        let mut payloads = Vec::new();

        if self.data.dependencies.flush_not_empty() {
            payloads.push(data::Payload::AppDependenciesLoaded(
                data::AppDependenciesLoaded {
                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.integrations.flush_not_empty() {
            payloads.push(data::Payload::AppIntegrationsChange(
                data::AppIntegrationsChange {
                    integrations: self.data.integrations.unflushed().cloned().collect(),
                },
            ))
        }
        if self.data.configurations.flush_not_empty() {
            payloads.push(data::Payload::AppClientConfigurationChange(
                data::AppClientConfigurationChange {
                    configuration: self.data.configurations.unflushed().cloned().collect(),
                },
            ))
        }
        payloads
    }
556
557    // Builds telemetry payloads containing logs, metrics and distributions
558    fn build_observability_batch(&mut self) -> Vec<Payload> {
559        let mut payloads = Vec::new();
560
561        let logs = self.build_logs();
562        if !logs.is_empty() {
563            payloads.push(data::Payload::Logs(logs));
564        }
565        let metrics = self.build_metrics_series();
566        if !metrics.series.is_empty() {
567            payloads.push(data::Payload::GenerateMetrics(metrics))
568        }
569        let distributions = self.build_metrics_distributions();
570        if !distributions.series.is_empty() {
571            payloads.push(data::Payload::Sketches(distributions))
572        }
573        payloads
574    }
575
    /// Drains the aggregated distribution buckets into a `Distributions`
    /// payload, joining each bucket with its registered metric context.
    /// Points are encoded as a base64 serialized sketch.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            // A missing context means the bucket can't be attributed to a
            // metric definition; log and drop it.
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point extra tags come first, then the context's base tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(),
            });
        }
        data::Distributions { series }
    }
603
    /// Drains the aggregated metric buckets into a `GenerateMetrics` payload,
    /// joining each bucket with its registered metric context.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            // A missing context means the bucket can't be attributed to a
            // metric definition; log and drop it.
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point extra tags come first, then the context's base tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
628
    /// Builds the app-started payload from the currently unflushed
    /// configuration entries.
    fn build_app_started(&mut self) -> data::AppStarted {
        data::AppStarted {
            configuration: self.data.configurations.unflushed().cloned().collect(),
        }
    }
634
    /// After an app-started payload was delivered, drops the configuration
    /// entries it contained from the flushed window of the store.
    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
        self.data
            .configurations
            .removed_flushed(p.configuration.len());
    }
640
    /// Book-keeping after `payload` was successfully delivered: removes the
    /// flushed entries from their stores so they are not sent again.
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            // A batch is acknowledged recursively, payload by payload.
            MessageBatch(batch) => {
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            // Logs are sent in FIFO order, so pop one queued entry per log.
            Logs(p) => {
                for _ in p {
                    self.data.logs.pop_front();
                }
            }
            // Nothing stored for these; metric buckets are drained at build time.
            AppHeartbeat(()) | AppClosing(()) => {}
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
670
671    fn build_logs(&self) -> Vec<Log> {
672        // TODO: change the data model to take a &[Log] so don't have to clone data here
673        let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
674        logs
675    }
676
    /// Returns the current sequence id and atomically increments it for the
    /// next request.
    // NOTE(review): fetch_add uses Release while send_payload loads with
    // Acquire; for a plain id counter Relaxed would suffice — confirm before
    // changing.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
680
681    async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
682        debug!(
683            worker.runtime_id = %self.runtime_id,
684            payload.type = payload.request_type(),
685            seq_id = self.seq_id.load(Ordering::Acquire),
686            "Sending telemetry payload"
687        );
688        let req = self.build_request(payload)?;
689        let result = self.send_request(req).await;
690        match &result {
691            Ok(resp) => debug!(
692                worker.runtime_id = %self.runtime_id,
693                payload.type = payload.request_type(),
694                response.status = resp.status().as_u16(),
695                "Successfully sent telemetry payload"
696            ),
697            Err(e) => debug!(
698                worker.runtime_id = %self.runtime_id,
699                payload.type = payload.request_type(),
700                error = ?e,
701                "Failed to send telemetry payload"
702            ),
703        }
704        Ok(())
705    }
706
    /// Builds the HTTP POST request for `payload`: wraps it in the `Telemetry`
    /// envelope (api version, tracer time, runtime id, sequence id, host and
    /// application metadata), sets the telemetry headers and serializes the
    /// JSON body.
    ///
    /// Note: consumes one sequence id even if a later step fails.
    fn build_request(
        &self,
        payload: &data::Payload,
    ) -> anyhow::Result<hyper_migration::HttpRequest> {
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Seconds since the Unix epoch; 0 if the clock is pre-epoch.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                // Note: passing by ref here just causes the clone to happen underneath
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = hyper_migration::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
751
752    async fn send_request(
753        &self,
754        req: hyper_migration::HttpRequest,
755    ) -> Result<hyper_migration::HttpResponse, hyper_migration::Error> {
756        let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
757            endpoint.timeout_ms
758        } else {
759            Endpoint::DEFAULT_TIMEOUT
760        };
761
762        debug!(
763            worker.runtime_id = %self.runtime_id,
764            http.timeout_ms = timeout_ms,
765            "Sending HTTP request"
766        );
767
768        tokio::select! {
769            _ = self.cancellation_token.cancelled() => {
770                debug!(
771                    worker.runtime_id = %self.runtime_id,
772                    "Telemetry request cancelled"
773                );
774                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request cancelled")))
775            },
776            _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
777                debug!(
778                    worker.runtime_id = %self.runtime_id,
779                    http.timeout_ms = timeout_ms,
780                    "Telemetry request timed out"
781                );
782                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request timed out")))
783            },
784            r = self.client.request(req) => {
785                match r {
786                    Ok(resp) => {
787                        Ok(resp)
788                    }
789                    Err(e) => {
790                        Err(e)
791                    },
792                }
793            }
794        }
795    }
796
    /// Builds a snapshot of the worker's current in-memory storage usage.
    fn stats(&self) -> TelemetryWorkerStats {
        TelemetryWorkerStats {
            dependencies_stored: self.data.dependencies.len_stored() as u32,
            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
            configurations_stored: self.data.configurations.len_stored() as u32,
            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
            integrations_stored: self.data.integrations.len_stored() as u32,
            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
            logs: self.data.logs.len() as u32,
            metric_contexts: self.data.metric_contexts.lock().len() as u32,
            metric_buckets: self.data.metric_buckets.stats(),
        }
    }
810}
811
#[derive(Debug)]
/// Shared one-shot "worker has shut down" signal: a boolean flag guarded by a
/// mutex plus a condvar so blocked callers can be woken when the flag flips.
struct InnerTelemetryShutdown {
    // True once `shutdown_finished` has been called; never reset.
    is_shutdown: Mutex<bool>,
    // Notified (notify_all) whenever `is_shutdown` becomes true.
    condvar: Condvar,
}
817
818impl InnerTelemetryShutdown {
819    fn wait_for_shutdown(&self) {
820        drop(
821            #[allow(clippy::unwrap_used)]
822            self.condvar
823                .wait_while(self.is_shutdown.lock().unwrap(), |is_shutdown| {
824                    !*is_shutdown
825                })
826                .unwrap(),
827        )
828    }
829
830    #[allow(clippy::unwrap_used)]
831    fn shutdown_finished(&self) {
832        *self.is_shutdown.lock().unwrap() = true;
833        self.condvar.notify_all();
834    }
835}
836
#[derive(Clone, Debug)]
/// TelemetryWorkerHandle is a handle which allows interactions with the telemetry worker.
/// The handle is safe to use across threads.
///
/// The worker won't send data to the agent until you call `TelemetryWorkerHandle::send_start`
///
/// To stop the worker, call `TelemetryWorkerHandle::send_stop` which triggers a flush
/// asynchronously, then `TelemetryWorkerHandle::wait_for_shutdown`
pub struct TelemetryWorkerHandle {
    /// Channel delivering `TelemetryActions` into the worker's mailbox.
    sender: mpsc::Sender<TelemetryActions>,
    /// Flag + condvar signalled once the worker has fully shut down.
    shutdown: Arc<InnerTelemetryShutdown>,
    /// Cancels in-flight telemetry HTTP requests when triggered.
    cancellation_token: CancellationToken,
    // Used to spawn cancellation tasks
    runtime: runtime::Handle,

    /// Metric context registry shared with the worker (cloned into its data).
    contexts: MetricContexts,
}
854
855impl TelemetryWorkerHandle {
856    pub fn register_metric_context(
857        &self,
858        name: String,
859        tags: Vec<Tag>,
860        metric_type: data::metrics::MetricType,
861        common: bool,
862        namespace: data::metrics::MetricNamespace,
863    ) -> ContextKey {
864        self.contexts
865            .register_metric_context(name, tags, metric_type, common, namespace)
866    }
867
868    pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
869        Ok(self.sender.try_send(msg)?)
870    }
871
872    pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
873        Ok(self.sender.send(msg).await?)
874    }
875
876    pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
877    where
878        T: IntoIterator<Item = TelemetryActions>,
879    {
880        for msg in msgs {
881            self.sender.send(msg).await?;
882        }
883
884        Ok(())
885    }
886
887    pub async fn send_msg_timeout(
888        &self,
889        msg: TelemetryActions,
890        timeout: time::Duration,
891    ) -> anyhow::Result<()> {
892        Ok(self.sender.send_timeout(msg, timeout).await?)
893    }
894
895    pub fn send_start(&self) -> anyhow::Result<()> {
896        Ok(self
897            .sender
898            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
899    }
900
901    pub fn send_stop(&self) -> anyhow::Result<()> {
902        Ok(self
903            .sender
904            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
905    }
906
907    fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
908        let token = self.cancellation_token.clone();
909        let f = async move {
910            tokio::time::sleep_until(deadline.into()).await;
911            token.cancel()
912        };
913        self.runtime.spawn(f);
914    }
915
916    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
917        self.cancel_requests_with_deadline(deadline);
918        self.wait_for_shutdown()
919    }
920
921    pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
922        self.sender
923            .try_send(TelemetryActions::AddDependency(Dependency {
924                name,
925                version,
926            }))?;
927        Ok(())
928    }
929
930    pub fn add_integration(
931        &self,
932        name: String,
933        enabled: bool,
934        version: Option<String>,
935        compatible: Option<bool>,
936        auto_enabled: Option<bool>,
937    ) -> anyhow::Result<()> {
938        self.sender
939            .try_send(TelemetryActions::AddIntegration(Integration {
940                name,
941                version,
942                compatible,
943                enabled,
944                auto_enabled,
945            }))?;
946        Ok(())
947    }
948
949    pub fn add_log<T: Hash>(
950        &self,
951        identifier: T,
952        message: String,
953        level: data::LogLevel,
954        stack_trace: Option<String>,
955    ) -> anyhow::Result<()> {
956        let mut hasher = DefaultHasher::new();
957        identifier.hash(&mut hasher);
958        self.sender.try_send(TelemetryActions::AddLog((
959            LogIdentifier {
960                identifier: hasher.finish(),
961            },
962            data::Log {
963                message,
964                level,
965                stack_trace,
966                count: 1,
967                tags: String::new(),
968                is_sensitive: false,
969                is_crash: false,
970            },
971        )))?;
972        Ok(())
973    }
974
975    pub fn add_point(
976        &self,
977        value: f64,
978        context: &ContextKey,
979        extra_tags: Vec<Tag>,
980    ) -> anyhow::Result<()> {
981        self.sender
982            .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
983        Ok(())
984    }
985
986    pub fn wait_for_shutdown(&self) {
987        self.shutdown.wait_for_shutdown();
988    }
989
990    pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
991        let (sender, receiver) = oneshot::channel();
992        self.sender
993            .try_send(TelemetryActions::CollectStats(sender))?;
994        Ok(receiver)
995    }
996}
997
/// How many dependencies/integrations/configs we keep in memory at most
/// (the capacity passed to each `store::Store` in `TelemetryWorkerBuilder::new`).
pub const MAX_ITEMS: usize = 5000;
1000
#[derive(Debug, Default, Clone, Copy)]
/// Selects which categories of telemetry messages a worker emits.
pub enum TelemetryWorkerFlavor {
    /// Send all telemetry messages including lifecycle events like app-started, heartbeats,
    /// dependencies and configurations
    #[default]
    Full,
    /// Only send telemetry data not tied to the lifecycle of the app like logs and metrics
    MetricsLogs,
}
1010
/// Builder used to configure and construct a `TelemetryWorker` together with
/// its `TelemetryWorkerHandle`.
pub struct TelemetryWorkerBuilder {
    /// Host information sent with telemetry payloads.
    pub host: Host,
    /// Application (service/language/tracer) metadata sent with payloads.
    pub application: Application,
    /// Runtime id; `build_worker` generates a random UUID when `None`.
    pub runtime_id: Option<String>,
    /// Bounded store of dependencies to report (capped at `MAX_ITEMS`).
    pub dependencies: store::Store<data::Dependency>,
    /// Bounded store of integrations to report (capped at `MAX_ITEMS`).
    pub integrations: store::Store<data::Integration>,
    /// Bounded store of configurations to report (capped at `MAX_ITEMS`).
    pub configurations: store::Store<data::Configuration>,
    // NOTE(review): defaulted in `new` (true) but not read in this module —
    // presumably consumed by dependency-collection code elsewhere; confirm.
    pub native_deps: bool,
    // NOTE(review): defaulted in `new` (false) but not read in this module.
    pub rust_shared_lib_deps: bool,
    /// Worker configuration (endpoint, heartbeat interval, debug logging, ...).
    pub config: Config,
    /// Which categories of telemetry the worker should emit.
    pub flavor: TelemetryWorkerFlavor,
}
1023
1024impl TelemetryWorkerBuilder {
1025    /// Creates a new telemetry worker builder and infer host information automatically
1026    pub fn new_fetch_host(
1027        service_name: String,
1028        language_name: String,
1029        language_version: String,
1030        tracer_version: String,
1031    ) -> Self {
1032        Self {
1033            host: crate::build_host(),
1034            ..Self::new(
1035                String::new(),
1036                service_name,
1037                language_name,
1038                language_version,
1039                tracer_version,
1040            )
1041        }
1042    }
1043
1044    /// Creates a new telemetry worker builder with the given hostname
1045    pub fn new(
1046        hostname: String,
1047        service_name: String,
1048        language_name: String,
1049        language_version: String,
1050        tracer_version: String,
1051    ) -> Self {
1052        Self {
1053            host: Host {
1054                hostname,
1055                ..Default::default()
1056            },
1057            application: Application {
1058                service_name,
1059                language_name,
1060                language_version,
1061                tracer_version,
1062                ..Default::default()
1063            },
1064            runtime_id: None,
1065            dependencies: store::Store::new(MAX_ITEMS),
1066            integrations: store::Store::new(MAX_ITEMS),
1067            configurations: store::Store::new(MAX_ITEMS),
1068            native_deps: true,
1069            rust_shared_lib_deps: false,
1070            config: Config::default(),
1071            flavor: TelemetryWorkerFlavor::default(),
1072        }
1073    }
1074
1075    /// Build the corresponding worker and it's handle.
1076    /// The runtime handle is wrapped in the worker handle and should be the one used to run the
1077    /// worker task.
1078    pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
1079        let (tx, mailbox) = mpsc::channel(5000);
1080        let shutdown = Arc::new(InnerTelemetryShutdown {
1081            is_shutdown: Mutex::new(false),
1082            condvar: Condvar::new(),
1083        });
1084        let contexts = MetricContexts::default();
1085        let token = CancellationToken::new();
1086        let config = self.config;
1087        let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
1088        let client = http_client::from_config(&config);
1089
1090        #[allow(clippy::unwrap_used)]
1091        let worker = TelemetryWorker {
1092            flavor: self.flavor,
1093            data: TelemetryWorkerData {
1094                started: false,
1095                dependencies: self.dependencies,
1096                integrations: self.integrations,
1097                configurations: self.configurations,
1098                logs: store::QueueHashMap::default(),
1099                metric_contexts: contexts.clone(),
1100                metric_buckets: MetricBuckets::default(),
1101                host: self.host,
1102                app: self.application,
1103            },
1104            config,
1105            mailbox,
1106            seq_id: AtomicU64::new(1),
1107            runtime_id: self
1108                .runtime_id
1109                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1110            client,
1111            deadlines: scheduler::Scheduler::new(vec![
1112                (
1113                    MetricBuckets::METRICS_FLUSH_INTERVAL,
1114                    LifecycleAction::FlushMetricAggr,
1115                ),
1116                (telemetry_heartbeat_interval, LifecycleAction::FlushData),
1117                (
1118                    time::Duration::from_secs(60 * 60 * 24),
1119                    LifecycleAction::ExtendedHeartbeat,
1120                ),
1121            ]),
1122            cancellation_token: token.clone(),
1123        };
1124
1125        (
1126            TelemetryWorkerHandle {
1127                sender: tx,
1128                shutdown,
1129                cancellation_token: token,
1130                runtime: tokio_runtime,
1131                contexts,
1132            },
1133            worker,
1134        )
1135    }
1136
1137    /// Spawns a telemetry worker task in the current tokio runtime
1138    /// The worker will capture a reference to the runtime and use it to run its tasks
1139    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
1140        let tokio_runtime = tokio::runtime::Handle::current();
1141
1142        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());
1143
1144        let join_handle = tokio_runtime.spawn(async move { worker.run().await });
1145
1146        (worker_handle, join_handle)
1147    }
1148
1149    /// Spawns a telemetry worker in a new thread and returns a handle to interact with it
1150    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
1151        let runtime = tokio::runtime::Builder::new_current_thread()
1152            .enable_all()
1153            .build()?;
1154        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
1155        let notify_shutdown = handle.shutdown.clone();
1156        std::thread::spawn(move || {
1157            runtime.block_on(worker.run());
1158            runtime.shutdown_background();
1159            notify_shutdown.shutdown_finished();
1160        });
1161
1162        Ok(handle)
1163    }
1164}
1165
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    // Marker functions: calling them only compiles if `T` has the bound.
    fn assert_send<T: Send>() {}
    fn assert_sync<T: Sync>() {}

    #[test]
    fn test_handle_sync_send() {
        // Compile-time check that the handle can be shared across threads.
        assert_send::<TelemetryWorkerHandle>();
        assert_sync::<TelemetryWorkerHandle>();
    }
}