libdd_telemetry/worker/
mod.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod http_client;
5mod scheduler;
6pub mod store;
7
8use crate::{
9    config::Config,
10    data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry},
11    metrics::{ContextKey, MetricBuckets, MetricContexts},
12};
13use libdd_common::Endpoint;
14use libdd_common::{hyper_migration, tag::Tag, worker::Worker};
15
16use std::iter::Sum;
17use std::ops::Add;
18use std::{
19    collections::hash_map::DefaultHasher,
20    hash::{Hash, Hasher},
21    ops::ControlFlow,
22    sync::{
23        atomic::{AtomicU64, Ordering},
24        Arc, Condvar, Mutex,
25    },
26    time,
27};
28use std::{fmt::Debug, time::Duration};
29
30use crate::metrics::MetricBucketStats;
31use futures::{
32    channel::oneshot,
33    future::{self},
34};
35use http::{header, HeaderValue};
36use serde::{Deserialize, Serialize};
37use tokio::{
38    runtime::{self, Handle},
39    sync::mpsc,
40    task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use tracing::debug;
44
45const CONTINUE: ControlFlow<()> = ControlFlow::Continue(());
46const BREAK: ControlFlow<()> = ControlFlow::Break(());
47
/// Returns the current Unix time as fractional seconds.
///
/// Never panics: if the system clock reports a time before the Unix epoch,
/// `elapsed()` errors and we fall back to `0.0` via `unwrap_or_default`.
/// (The previous `#[allow(clippy::unwrap_used)]` was stale — there is no
/// `unwrap` here.)
fn time_now() -> f64 {
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}
55
/// Logs a telemetry-worker message at ERROR or DEBUG level.
///
/// Always emits a `tracing` debug event carrying the worker's runtime id.
/// When `telemetry_debug_logging_enabled` is set in the worker's config,
/// the message is additionally mirrored to stderr (ERROR) or stdout
/// (DEBUG), prefixed with the current Unix timestamp.
macro_rules! telemetry_worker_log {
    // ERROR variant: mirrors to stderr when debug logging is enabled.
    ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
    // DEBUG variant: mirrors to stdout when debug logging is enabled.
    ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => {
        {
            debug!(
                worker.runtime_id = %$worker.runtime_id,
                worker.debug_logging = $worker.config.telemetry_debug_logging_enabled,
                $fmt_str,
                $($arg)*
            );
            if $worker.config.telemetry_debug_logging_enabled {
                println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*);
            }
        }
    };
}
84
/// Actions that can be sent to a telemetry worker's mailbox.
#[derive(Debug, Serialize, Deserialize)]
pub enum TelemetryActions {
    /// Record one metric point: (value, metric context key, extra tags).
    AddPoint((f64, ContextKey, Vec<Tag>)),
    /// Record a configuration entry.
    AddConfig(data::Configuration),
    /// Record a loaded dependency.
    AddDependency(Dependency),
    /// Record an enabled integration.
    AddIntegration(Integration),
    /// Record a log; logs with the same identifier are deduplicated.
    AddLog((LogIdentifier, Log)),
    /// Drive the worker's lifecycle state machine.
    Lifecycle(LifecycleAction),
    /// Request a stats snapshot over the provided channel.
    /// Not serializable (carries a oneshot sender), hence `serde(skip)`.
    #[serde(skip)]
    CollectStats(oneshot::Sender<TelemetryWorkerStats>),
}
96
/// Lifecycle transitions driving the worker's state machine, sent either
/// through the mailbox or scheduled by the worker's own deadlines.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleAction {
    /// Begin sending telemetry; arms the periodic flush deadlines.
    Start,
    /// Flush remaining data and stop (or pause, if restartable) the worker.
    Stop,
    /// Fold pending metric points into time buckets.
    FlushMetricAggr,
    /// Send queued payloads to the agent.
    FlushData,
    /// Resend the full current state (app-started with all stored data).
    ExtendedHeartbeat,
}
105
/// Identifies a logging location uniquely
///
/// The identifier is a single 64 bit integer to save space and memory,
/// and to stay generic over the way different languages identify a
/// logging call site.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LogIdentifier {
    // Collisions? Never heard of them
    pub identifier: u64,
}
115
// Holds the current state of the telemetry worker
#[derive(Debug)]
struct TelemetryWorkerData {
    // True once a Start action has been processed and flushes are armed.
    started: bool,
    // Stored items not yet flushed to the agent.
    dependencies: store::Store<Dependency>,
    configurations: store::Store<data::Configuration>,
    integrations: store::Store<data::Integration>,
    // Pending logs, deduplicated by identifier; drained FIFO on send
    // (see `payload_sent_success` popping from the front).
    logs: store::QueueHashMap<LogIdentifier, Log>,
    // Metric metadata (name, tags, type) keyed by ContextKey.
    metric_contexts: MetricContexts,
    // Aggregated points/sketches awaiting flush.
    metric_buckets: MetricBuckets,
    host: Host,
    app: Application,
}
129
/// A telemetry worker: owns all telemetry state and sends payloads to the
/// agent in response to mailbox actions and scheduled deadlines.
pub struct TelemetryWorker {
    // Which subset of telemetry this worker handles (Full vs MetricsLogs).
    flavor: TelemetryWorkerFlavor,
    config: Config,
    // Receiving end of the action channel fed by the worker handle.
    mailbox: mpsc::Receiver<TelemetryActions>,
    // Aborts in-flight requests and stops the run loop when cancelled.
    cancellation_token: CancellationToken,
    // Monotonically increasing sequence number for outgoing payloads.
    seq_id: AtomicU64,
    runtime_id: String,
    // Trait object, so no Debug bound; omitted from the manual Debug impl.
    client: Box<dyn http_client::HttpClient + Sync + Send>,
    // Reported as the `interval` field on emitted series and sketches.
    metrics_flush_interval: Duration,
    // Schedules periodic lifecycle actions (flushes, heartbeats).
    deadlines: scheduler::Scheduler<LifecycleAction>,
    data: TelemetryWorkerData,
}
// Manual Debug impl: the `client` trait object has no Debug bound, so it
// is intentionally omitted from the output.
impl Debug for TelemetryWorker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TelemetryWorker")
            .field("flavor", &self.flavor)
            .field("config", &self.config)
            .field("mailbox", &self.mailbox)
            .field("cancellation_token", &self.cancellation_token)
            .field("seq_id", &self.seq_id)
            .field("runtime_id", &self.runtime_id)
            .field("metrics_flush_interval", &self.metrics_flush_interval)
            .field("deadlines", &self.deadlines)
            .field("data", &self.data)
            .finish()
    }
}
157
impl Worker for TelemetryWorker {
    // Runs a state machine that waits for actions, either from the worker's
    // mailbox, or scheduled actions from the worker's deadline object.
    async fn run(&mut self) {
        debug!(
            worker.flavor = ?self.flavor,
            worker.runtime_id = %self.runtime_id,
            "Starting telemetry worker"
        );

        loop {
            // Cooperative cancellation check before waiting for more work.
            if self.cancellation_token.is_cancelled() {
                debug!(
                    worker.runtime_id = %self.runtime_id,
                    "Telemetry worker cancelled, shutting down"
                );
                return;
            }

            let action = self.recv_next_action().await;
            debug!(
                worker.runtime_id = %self.runtime_id,
                action = ?action,
                "Received telemetry action"
            );

            // Each flavor handles a different subset of actions.
            let action_result = match self.flavor {
                TelemetryWorkerFlavor::Full => self.dispatch_action(action).await,
                TelemetryWorkerFlavor::MetricsLogs => {
                    self.dispatch_metrics_logs_action(action).await
                }
            };

            match action_result {
                ControlFlow::Continue(()) => {}
                ControlFlow::Break(()) => {
                    debug!(
                        worker.runtime_id = %self.runtime_id,
                        worker.restartable = self.config.restartable,
                        "Telemetry worker received break signal"
                    );
                    // A restartable worker keeps looping after Stop so a
                    // later Start action can revive it.
                    if !self.config.restartable {
                        break;
                    }
                }
            };
        }

        debug!(
            worker.runtime_id = %self.runtime_id,
            "Telemetry worker stopped"
        );
    }
}
212
/// Counters describing a worker's internal queues and metric storage,
/// returned in response to `TelemetryActions::CollectStats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
    pub dependencies_stored: u32,
    pub dependencies_unflushed: u32,
    pub configurations_stored: u32,
    pub configurations_unflushed: u32,
    pub integrations_stored: u32,
    pub integrations_unflushed: u32,
    pub logs: u32,
    pub metric_contexts: u32,
    pub metric_buckets: MetricBucketStats,
}
225
226impl Add for TelemetryWorkerStats {
227    type Output = Self;
228
229    fn add(self, rhs: Self) -> Self::Output {
230        TelemetryWorkerStats {
231            dependencies_stored: self.dependencies_stored + rhs.dependencies_stored,
232            dependencies_unflushed: self.dependencies_unflushed + rhs.dependencies_unflushed,
233            configurations_stored: self.configurations_stored + rhs.configurations_stored,
234            configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed,
235            integrations_stored: self.integrations_stored + rhs.integrations_stored,
236            integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed,
237            logs: self.logs + rhs.logs,
238            metric_contexts: self.metric_contexts + rhs.metric_contexts,
239            metric_buckets: MetricBucketStats {
240                buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets,
241                series: self.metric_buckets.series + rhs.metric_buckets.series,
242                series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points,
243                distributions: self.metric_buckets.distributions
244                    + self.metric_buckets.distributions,
245                distributions_points: self.metric_buckets.distributions_points
246                    + self.metric_buckets.distributions_points,
247            },
248        }
249    }
250}
251
252impl Sum for TelemetryWorkerStats {
253    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
254        iter.fold(Self::default(), |a, b| a + b)
255    }
256}
257
258mod serialize {
259    use crate::data;
260    use http::HeaderValue;
261    #[allow(clippy::declare_interior_mutable_const)]
262    pub const CONTENT_TYPE_VALUE: HeaderValue = libdd_common::header::APPLICATION_JSON;
263    pub fn serialize(telemetry: &data::Telemetry) -> anyhow::Result<Vec<u8>> {
264        Ok(serde_json::to_vec(telemetry)?)
265    }
266}
267
268impl TelemetryWorker {
    /// Logs an error through the worker's debug-logging macro.
    fn log_err(&self, err: &anyhow::Error) {
        telemetry_worker_log!(self, ERROR, "{}", err);
    }
272
    /// Waits for the next action to process.
    ///
    /// If a scheduled deadline is already due, its lifecycle action is
    /// returned immediately; otherwise the mailbox is awaited with a
    /// timeout at the next deadline. A closed mailbox (all handles dropped)
    /// is converted into a `Stop` action, with restartability disabled so
    /// the worker cannot linger forever.
    async fn recv_next_action(&mut self) -> TelemetryActions {
        let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() {
            // If deadline passed, directly return associated action
            if deadline
                .checked_duration_since(time::Instant::now())
                .is_none()
            {
                return TelemetryActions::Lifecycle(*deadline_action);
            };

            // Otherwise run it in a timeout against the mailbox
            match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await {
                Ok(mailbox_action) => mailbox_action,
                Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)),
            }
        } else {
            self.mailbox.recv().await
        };

        // if no action is received, then it means the channel is stopped
        action.unwrap_or_else(|| {
            // the worker handle no longer lives - we must remove restartable here to avoid leaks
            self.config.restartable = false;
            TelemetryActions::Lifecycle(LifecycleAction::Stop)
        })
    }
299
    /// Handles one action for the reduced `MetricsLogs` worker flavor.
    ///
    /// Unlike `dispatch_action`, this flavor sends no app lifecycle
    /// payloads (app-started, heartbeats, dependencies, ...): it only
    /// aggregates and flushes logs, metric series and distributions.
    ///
    /// Returns `ControlFlow::Break` when the run loop should stop.
    async fn dispatch_metrics_logs_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling metric action {:?}", action);
        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: only arm the periodic flush deadlines once.
                if !self.data.started {
                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddLog((identifier, log)) => {
                // Deduplicate by identifier; repeats only bump the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                // Fold pending points into buckets, then re-arm the timer.
                self.data.metric_buckets.flush_aggregates();

                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the next data flush before sending.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(e) => self.log_err(&e),
                    }
                }
            }
            // Irrelevant for this flavor: deliberately ignored.
            AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                // Best-effort final flush of whatever is still queued.
                let observability_events = self.build_observability_batch();
                if let Err(e) = self
                    .send_payload(&data::Payload::MessageBatch(observability_events))
                    .await
                {
                    self.log_err(&e);
                }
                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }
                return BREAK;
            }
            CollectStats(stats_sender) => {
                // The receiver may already be gone; dropping the stats is fine.
                stats_sender.send(self.stats()).ok();
            }
        };
        CONTINUE
    }
381
    /// Handles one action for the `Full` worker flavor: app lifecycle
    /// payloads (app-started, heartbeats, dependency/integration/config
    /// changes) in addition to the logs and metrics handled by the
    /// metrics-logs flavor.
    ///
    /// Returns `ControlFlow::Break` when the run loop should stop.
    async fn dispatch_action(&mut self, action: TelemetryActions) -> ControlFlow<()> {
        telemetry_worker_log!(self, DEBUG, "Handling action {:?}", action);

        use LifecycleAction::*;
        use TelemetryActions::*;
        match action {
            Lifecycle(Start) => {
                // Idempotent: send app-started and arm the flush timers once.
                if !self.data.started {
                    let app_started = data::Payload::AppStarted(self.build_app_started());
                    match self.send_payload(&app_started).await {
                        Ok(()) => self.payload_sent_success(&app_started),
                        Err(err) => self.log_err(&err),
                    }

                    #[allow(clippy::unwrap_used)]
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushMetricAggr)
                        .unwrap();

                    #[allow(clippy::unwrap_used)]
                    // flush data should be scheduled last so that previously flushed metrics are sent
                    self.deadlines
                        .schedule_event(LifecycleAction::FlushData)
                        .unwrap();
                    self.data.started = true;
                }
            }
            AddDependency(dep) => self.data.dependencies.insert(dep),
            AddIntegration(integration) => self.data.integrations.insert(integration),
            AddConfig(cfg) => self.data.configurations.insert(cfg),
            AddLog((identifier, log)) => {
                // Deduplicate by identifier; repeats only bump the count.
                let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
                if !new {
                    l.count += 1;
                }
            }
            AddPoint((point, key, extra_tags)) => {
                self.data.metric_buckets.add_point(key, point, extra_tags)
            }
            Lifecycle(FlushMetricAggr) => {
                // Fold pending points into buckets, then re-arm the timer.
                self.data.metric_buckets.flush_aggregates();

                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushMetricAggr)
                    .unwrap();
            }
            Lifecycle(FlushData) => {
                if !(self.data.started || self.config.restartable) {
                    return CONTINUE;
                }

                // Re-arm the next data flush before sending.
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_event(LifecycleAction::FlushData)
                    .unwrap();

                // App lifecycle events; a heartbeat is always included.
                let mut batch = self.build_app_events_batch();
                let payload = if batch.is_empty() {
                    data::Payload::AppHeartbeat(())
                } else {
                    batch.push(data::Payload::AppHeartbeat(()));
                    data::Payload::MessageBatch(batch)
                };
                match self.send_payload(&payload).await {
                    Ok(()) => self.payload_sent_success(&payload),
                    Err(err) => self.log_err(&err),
                }

                // Logs/metrics are sent in a separate batch from app events.
                let batch = self.build_observability_batch();
                if !batch.is_empty() {
                    let payload = data::Payload::MessageBatch(batch);
                    match self.send_payload(&payload).await {
                        Ok(()) => self.payload_sent_success(&payload),
                        Err(err) => self.log_err(&err),
                    }
                }
            }
            Lifecycle(ExtendedHeartbeat) => {
                // Resend the full current state: mark every store unflushed
                // again, then replay app-started.
                self.data.dependencies.unflush_stored();
                self.data.integrations.unflush_stored();
                self.data.configurations.unflush_stored();

                let app_started = data::Payload::AppStarted(self.build_app_started());
                match self.send_payload(&app_started).await {
                    Ok(()) => self.payload_sent_success(&app_started),
                    Err(err) => self.log_err(&err),
                }
                #[allow(clippy::unwrap_used)]
                self.deadlines
                    .schedule_events(
                        &mut [
                            LifecycleAction::FlushData,
                            LifecycleAction::ExtendedHeartbeat,
                        ]
                        .into_iter(),
                    )
                    .unwrap();
            }
            Lifecycle(Stop) => {
                if !self.data.started {
                    return BREAK;
                }
                self.data.metric_buckets.flush_aggregates();

                let mut app_events = self.build_app_events_batch();
                app_events.push(data::Payload::AppClosing(()));

                let observability_events = self.build_observability_batch();

                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
                if !observability_events.is_empty() {
                    payloads.push(data::Payload::MessageBatch(observability_events));
                }

                // Send the final payloads concurrently while retaining the
                // ability to mutate self on success (via the write lock).
                let self_arc = Arc::new(tokio::sync::RwLock::new(&mut *self));
                let futures = payloads.into_iter().map(|payload| {
                    let self_arc = self_arc.clone();
                    async move {
                        // This is different from the non-functional:
                        // match self_arc.read().await.send_payload(&payload).await { ... }
                        // presumably because the temp read guard would live till end of match
                        let res = {
                            let self_rguard = self_arc.read().await;
                            self_rguard.send_payload(&payload).await
                        };
                        match res {
                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
                            Err(err) => self_arc.read().await.log_err(&err),
                        }
                    }
                });
                future::join_all(futures).await;

                self.data.started = false;
                if !self.config.restartable {
                    self.deadlines.clear_pending();
                }

                return BREAK;
            }
            CollectStats(stats_sender) => {
                // The receiver may already be gone; dropping the stats is fine.
                stats_sender.send(self.stats()).ok();
            }
        }

        CONTINUE
    }
530
531    // Builds telemetry payloads containing lifecycle events
532    fn build_app_events_batch(&mut self) -> Vec<Payload> {
533        let mut payloads = Vec::new();
534
535        if self.data.dependencies.flush_not_empty() {
536            payloads.push(data::Payload::AppDependenciesLoaded(
537                data::AppDependenciesLoaded {
538                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
539                },
540            ))
541        }
542        if self.data.integrations.flush_not_empty() {
543            payloads.push(data::Payload::AppIntegrationsChange(
544                data::AppIntegrationsChange {
545                    integrations: self.data.integrations.unflushed().cloned().collect(),
546                },
547            ))
548        }
549        if self.data.configurations.flush_not_empty() {
550            payloads.push(data::Payload::AppClientConfigurationChange(
551                data::AppClientConfigurationChange {
552                    configuration: self.data.configurations.unflushed().cloned().collect(),
553                },
554            ))
555        }
556        payloads
557    }
558
559    // Builds telemetry payloads containing logs, metrics and distributions
560    fn build_observability_batch(&mut self) -> Vec<Payload> {
561        let mut payloads = Vec::new();
562
563        let logs = self.build_logs();
564        if !logs.is_empty() {
565            payloads.push(data::Payload::Logs(logs));
566        }
567        let metrics = self.build_metrics_series();
568        if !metrics.series.is_empty() {
569            payloads.push(data::Payload::GenerateMetrics(metrics))
570        }
571        let distributions = self.build_metrics_distributions();
572        if !distributions.series.is_empty() {
573            payloads.push(data::Payload::Sketches(distributions))
574        }
575        payloads
576    }
577
    /// Drains aggregated distribution buckets into a `Sketches` payload.
    ///
    /// Each sketch is serialized to protobuf and base64-encoded. Buckets
    /// whose metric context can no longer be resolved are skipped with an
    /// error log.
    fn build_metrics_distributions(&mut self) -> data::Distributions {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_distributions() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };
            // Per-point tags first, then the context's static tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Distribution {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                sketch: data::metrics::SerializedSketch::B64 {
                    sketch_b64: base64::Engine::encode(
                        &base64::engine::general_purpose::STANDARD,
                        points.encode_to_vec(),
                    ),
                },
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }
        data::Distributions { series }
    }
605
    /// Drains aggregated metric buckets into a `GenerateMetrics` payload.
    ///
    /// Buckets whose metric context can no longer be resolved are skipped
    /// with an error log.
    fn build_metrics_series(&mut self) -> data::GenerateMetrics {
        let mut series = Vec::new();
        let context_guard = self.data.metric_contexts.lock();
        for (context_key, extra_tags, points) in self.data.metric_buckets.flush_series() {
            let Some(context) = context_guard.read(context_key) else {
                telemetry_worker_log!(self, ERROR, "Context not found for key {:?}", context_key);
                continue;
            };

            // Per-point tags first, then the context's static tags.
            let mut tags = extra_tags;
            tags.extend(context.tags.iter().cloned());
            series.push(data::metrics::Serie {
                namespace: context.namespace,
                metric: context.name.clone(),
                tags,
                points,
                common: context.common,
                _type: context.metric_type,
                interval: self.metrics_flush_interval.as_secs(),
            });
        }

        data::GenerateMetrics { series }
    }
630
631    fn build_app_started(&mut self) -> data::AppStarted {
632        data::AppStarted {
633            configuration: self.data.configurations.unflushed().cloned().collect(),
634        }
635    }
636
637    fn app_started_sent_success(&mut self, p: &data::AppStarted) {
638        self.data
639            .configurations
640            .removed_flushed(p.configuration.len());
641    }
642
    /// Bookkeeping after a payload was delivered: marks the corresponding
    /// stored items as flushed and removes sent logs so they are not sent
    /// again.
    fn payload_sent_success(&mut self, payload: &data::Payload) {
        use data::Payload::*;
        match payload {
            AppStarted(p) => self.app_started_sent_success(p),
            // Extended heartbeats carry the same body as app-started.
            AppExtendedHeartbeat(p) => self.app_started_sent_success(p),
            AppDependenciesLoaded(p) => {
                self.data.dependencies.removed_flushed(p.dependencies.len())
            }
            AppIntegrationsChange(p) => {
                self.data.integrations.removed_flushed(p.integrations.len())
            }
            AppClientConfigurationChange(p) => self
                .data
                .configurations
                .removed_flushed(p.configuration.len()),
            MessageBatch(batch) => {
                // Recurse into each payload of the batch.
                for p in batch {
                    self.payload_sent_success(p);
                }
            }
            Logs(p) => {
                // Logs are queued FIFO; drop as many as were just sent.
                for _ in p {
                    self.data.logs.pop_front();
                }
            }
            // Nothing is stored for these; metric buckets are already
            // drained when the payload is built.
            AppHeartbeat(()) | AppClosing(()) => {}
            GenerateMetrics(_) | Sketches(_) => {}
        }
    }
672
673    fn build_logs(&self) -> Vec<Log> {
674        // TODO: change the data model to take a &[Log] so don't have to clone data here
675        let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect();
676        logs
677    }
678
    /// Atomically bumps the payload sequence counter, returning the
    /// pre-increment value to use for the current request.
    fn next_seq_id(&self) -> u64 {
        self.seq_id.fetch_add(1, Ordering::Release)
    }
682
683    async fn send_payload(&self, payload: &data::Payload) -> anyhow::Result<()> {
684        debug!(
685            worker.runtime_id = %self.runtime_id,
686            payload.type = payload.request_type(),
687            seq_id = self.seq_id.load(Ordering::Acquire),
688            "Sending telemetry payload"
689        );
690        let req = self.build_request(payload)?;
691        let result = self.send_request(req).await;
692        match &result {
693            Ok(resp) => debug!(
694                worker.runtime_id = %self.runtime_id,
695                payload.type = payload.request_type(),
696                response.status = resp.status().as_u16(),
697                "Successfully sent telemetry payload"
698            ),
699            Err(e) => debug!(
700                worker.runtime_id = %self.runtime_id,
701                payload.type = payload.request_type(),
702                error = ?e,
703                "Failed to send telemetry payload"
704            ),
705        }
706        Ok(())
707    }
708
    /// Builds the HTTP request for one payload: wraps it in a `Telemetry`
    /// envelope (with a fresh sequence id), sets the telemetry headers and
    /// attaches the JSON-serialized body.
    fn build_request(
        &self,
        payload: &data::Payload,
    ) -> anyhow::Result<hyper_migration::HttpRequest> {
        let seq_id = self.next_seq_id();
        let tel = Telemetry {
            api_version: data::ApiVersion::V2,
            // Seconds since the Unix epoch; 0 if the clock is before it.
            tracer_time: time::SystemTime::UNIX_EPOCH
                .elapsed()
                .map_or(0, |d| d.as_secs()),
            runtime_id: &self.runtime_id,
            seq_id,
            host: &self.data.host,
            origin: None,
            application: &self.data.app,
            payload,
        };

        telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

        let req = http_client::request_builder(&self.config)?
            .method(http::Method::POST)
            .header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
            .header(
                http_client::header::REQUEST_TYPE,
                HeaderValue::from_static(payload.request_type()),
            )
            .header(
                http_client::header::API_VERSION,
                HeaderValue::from_static(data::ApiVersion::V2.to_str()),
            )
            .header(
                http_client::header::LIBRARY_LANGUAGE,
                // Note: passing by ref here just causes the clone to happen underneath
                tel.application.language_name.clone(),
            )
            .header(
                http_client::header::LIBRARY_VERSION,
                tel.application.tracer_version.clone(),
            );

        let body = hyper_migration::Body::from(serialize::serialize(&tel)?);
        Ok(req.body(body)?)
    }
753
754    async fn send_request(
755        &self,
756        req: hyper_migration::HttpRequest,
757    ) -> Result<hyper_migration::HttpResponse, hyper_migration::Error> {
758        let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
759            endpoint.timeout_ms
760        } else {
761            Endpoint::DEFAULT_TIMEOUT
762        };
763
764        debug!(
765            worker.runtime_id = %self.runtime_id,
766            http.timeout_ms = timeout_ms,
767            "Sending HTTP request"
768        );
769
770        tokio::select! {
771            _ = self.cancellation_token.cancelled() => {
772                debug!(
773                    worker.runtime_id = %self.runtime_id,
774                    "Telemetry request cancelled"
775                );
776                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request cancelled")))
777            },
778            _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => {
779                debug!(
780                    worker.runtime_id = %self.runtime_id,
781                    http.timeout_ms = timeout_ms,
782                    "Telemetry request timed out"
783                );
784                Err(hyper_migration::Error::Other(anyhow::anyhow!("Request timed out")))
785            },
786            r = self.client.request(req) => {
787                match r {
788                    Ok(resp) => {
789                        Ok(resp)
790                    }
791                    Err(e) => {
792                        Err(e)
793                    },
794                }
795            }
796        }
797    }
798
    /// Snapshots counters describing the worker's internal queues and
    /// metric storage, answered to `TelemetryActions::CollectStats`.
    fn stats(&self) -> TelemetryWorkerStats {
        TelemetryWorkerStats {
            dependencies_stored: self.data.dependencies.len_stored() as u32,
            dependencies_unflushed: self.data.dependencies.len_unflushed() as u32,
            configurations_stored: self.data.configurations.len_stored() as u32,
            configurations_unflushed: self.data.configurations.len_unflushed() as u32,
            integrations_stored: self.data.integrations.len_stored() as u32,
            integrations_unflushed: self.data.integrations.len_unflushed() as u32,
            logs: self.data.logs.len() as u32,
            metric_contexts: self.data.metric_contexts.lock().len() as u32,
            metric_buckets: self.data.metric_buckets.stats(),
        }
    }
812}
813
/// Shared shutdown signal: a boolean flag paired with a condition variable so
/// threads can block until the worker has fully stopped.
#[derive(Debug)]
struct InnerTelemetryShutdown {
    // Set to true exactly once, by `shutdown_finished`; guarded for use with `condvar`.
    is_shutdown: Mutex<bool>,
    // Notified (notify_all) when `is_shutdown` flips to true.
    condvar: Condvar,
}
819
820impl InnerTelemetryShutdown {
821    fn wait_for_shutdown(&self) {
822        drop(
823            #[allow(clippy::unwrap_used)]
824            self.condvar
825                .wait_while(self.is_shutdown.lock().unwrap(), |is_shutdown| {
826                    !*is_shutdown
827                })
828                .unwrap(),
829        )
830    }
831
832    #[allow(clippy::unwrap_used)]
833    fn shutdown_finished(&self) {
834        *self.is_shutdown.lock().unwrap() = true;
835        self.condvar.notify_all();
836    }
837}
838
#[derive(Clone, Debug)]
/// TelemetryWorkerHandle is a handle which allows interactions with the telemetry worker.
/// The handle is safe to use across threads.
///
/// The worker won't send data to the agent until you call `TelemetryWorkerHandle::send_start`.
///
/// To stop the worker, call `TelemetryWorkerHandle::send_stop`, which triggers a flush
/// asynchronously, then `TelemetryWorkerHandle::wait_for_shutdown`.
pub struct TelemetryWorkerHandle {
    // Channel feeding `TelemetryActions` into the worker's mailbox (capacity set in build_worker).
    sender: mpsc::Sender<TelemetryActions>,
    // Flag + condvar signalled once the worker has fully stopped.
    shutdown: Arc<InnerTelemetryShutdown>,
    // Cancelled to abort in-flight worker requests (see `cancel_requests_with_deadline`).
    cancellation_token: CancellationToken,
    // Used to spawn cancellation tasks
    runtime: runtime::Handle,

    // Metric-context registry shared with the worker; used by `register_metric_context`.
    contexts: MetricContexts,
}
856
857impl TelemetryWorkerHandle {
858    pub fn register_metric_context(
859        &self,
860        name: String,
861        tags: Vec<Tag>,
862        metric_type: data::metrics::MetricType,
863        common: bool,
864        namespace: data::metrics::MetricNamespace,
865    ) -> ContextKey {
866        self.contexts
867            .register_metric_context(name, tags, metric_type, common, namespace)
868    }
869
870    pub fn try_send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
871        Ok(self.sender.try_send(msg)?)
872    }
873
874    pub async fn send_msg(&self, msg: TelemetryActions) -> anyhow::Result<()> {
875        Ok(self.sender.send(msg).await?)
876    }
877
878    pub async fn send_msgs<T>(&self, msgs: T) -> anyhow::Result<()>
879    where
880        T: IntoIterator<Item = TelemetryActions>,
881    {
882        for msg in msgs {
883            self.sender.send(msg).await?;
884        }
885
886        Ok(())
887    }
888
889    pub async fn send_msg_timeout(
890        &self,
891        msg: TelemetryActions,
892        timeout: time::Duration,
893    ) -> anyhow::Result<()> {
894        Ok(self.sender.send_timeout(msg, timeout).await?)
895    }
896
897    pub fn send_start(&self) -> anyhow::Result<()> {
898        Ok(self
899            .sender
900            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Start))?)
901    }
902
903    pub fn send_stop(&self) -> anyhow::Result<()> {
904        Ok(self
905            .sender
906            .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?)
907    }
908
909    fn cancel_requests_with_deadline(&self, deadline: time::Instant) {
910        let token = self.cancellation_token.clone();
911        let f = async move {
912            tokio::time::sleep_until(deadline.into()).await;
913            token.cancel()
914        };
915        self.runtime.spawn(f);
916    }
917
918    pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) {
919        self.cancel_requests_with_deadline(deadline);
920        self.wait_for_shutdown()
921    }
922
923    pub fn add_dependency(&self, name: String, version: Option<String>) -> anyhow::Result<()> {
924        self.sender
925            .try_send(TelemetryActions::AddDependency(Dependency {
926                name,
927                version,
928            }))?;
929        Ok(())
930    }
931
932    pub fn add_integration(
933        &self,
934        name: String,
935        enabled: bool,
936        version: Option<String>,
937        compatible: Option<bool>,
938        auto_enabled: Option<bool>,
939    ) -> anyhow::Result<()> {
940        self.sender
941            .try_send(TelemetryActions::AddIntegration(Integration {
942                name,
943                version,
944                compatible,
945                enabled,
946                auto_enabled,
947            }))?;
948        Ok(())
949    }
950
951    pub fn add_log<T: Hash>(
952        &self,
953        identifier: T,
954        message: String,
955        level: data::LogLevel,
956        stack_trace: Option<String>,
957    ) -> anyhow::Result<()> {
958        let mut hasher = DefaultHasher::new();
959        identifier.hash(&mut hasher);
960        self.sender.try_send(TelemetryActions::AddLog((
961            LogIdentifier {
962                identifier: hasher.finish(),
963            },
964            data::Log {
965                message,
966                level,
967                stack_trace,
968                count: 1,
969                tags: String::new(),
970                is_sensitive: false,
971                is_crash: false,
972            },
973        )))?;
974        Ok(())
975    }
976
977    pub fn add_point(
978        &self,
979        value: f64,
980        context: &ContextKey,
981        extra_tags: Vec<Tag>,
982    ) -> anyhow::Result<()> {
983        self.sender
984            .try_send(TelemetryActions::AddPoint((value, *context, extra_tags)))?;
985        Ok(())
986    }
987
988    pub fn wait_for_shutdown(&self) {
989        self.shutdown.wait_for_shutdown();
990    }
991
992    pub fn stats(&self) -> anyhow::Result<oneshot::Receiver<TelemetryWorkerStats>> {
993        let (sender, receiver) = oneshot::channel();
994        self.sender
995            .try_send(TelemetryActions::CollectStats(sender))?;
996        Ok(receiver)
997    }
998}
999
/// How many dependencies/integrations/configs we keep in memory at most;
/// used as the capacity for each `store::Store` created by the builder.
pub const MAX_ITEMS: usize = 5000;
1002
/// Selects which classes of telemetry payloads a worker emits.
#[derive(Debug, Default, Clone, Copy)]
pub enum TelemetryWorkerFlavor {
    /// Send all telemetry messages including lifecycle events like app-started, heartbeats,
    /// dependencies and configurations
    #[default]
    Full,
    /// Only send telemetry data not tied to the lifecycle of the app like logs and metrics
    MetricsLogs,
}
1012
/// Builder producing a paired `TelemetryWorker` and `TelemetryWorkerHandle`.
pub struct TelemetryWorkerBuilder {
    /// Host (machine) information reported in telemetry payloads.
    pub host: Host,
    /// Service/language/tracer identity of the instrumented application.
    pub application: Application,
    /// Runtime UUID; when `None`, a random UUID is generated in `build_worker`.
    pub runtime_id: Option<String>,
    /// Dependencies known up-front (store capacity `MAX_ITEMS`).
    pub dependencies: store::Store<data::Dependency>,
    /// Integrations known up-front (store capacity `MAX_ITEMS`).
    pub integrations: store::Store<data::Integration>,
    /// Configuration entries known up-front (store capacity `MAX_ITEMS`).
    pub configurations: store::Store<data::Configuration>,
    // NOTE(review): `native_deps` and `rust_shared_lib_deps` are not read in
    // this file — presumably consumed by dependency collection elsewhere; verify.
    pub native_deps: bool,
    pub rust_shared_lib_deps: bool,
    /// Worker configuration (endpoint, intervals, debug logging, ...).
    pub config: Config,
    /// Which classes of telemetry the built worker will emit.
    pub flavor: TelemetryWorkerFlavor,
}
1025
1026impl TelemetryWorkerBuilder {
1027    /// Creates a new telemetry worker builder and infer host information automatically
1028    pub fn new_fetch_host(
1029        service_name: String,
1030        language_name: String,
1031        language_version: String,
1032        tracer_version: String,
1033    ) -> Self {
1034        Self {
1035            host: crate::build_host(),
1036            ..Self::new(
1037                String::new(),
1038                service_name,
1039                language_name,
1040                language_version,
1041                tracer_version,
1042            )
1043        }
1044    }
1045
1046    /// Creates a new telemetry worker builder with the given hostname
1047    pub fn new(
1048        hostname: String,
1049        service_name: String,
1050        language_name: String,
1051        language_version: String,
1052        tracer_version: String,
1053    ) -> Self {
1054        Self {
1055            host: Host {
1056                hostname,
1057                ..Default::default()
1058            },
1059            application: Application {
1060                service_name,
1061                language_name,
1062                language_version,
1063                tracer_version,
1064                ..Default::default()
1065            },
1066            runtime_id: None,
1067            dependencies: store::Store::new(MAX_ITEMS),
1068            integrations: store::Store::new(MAX_ITEMS),
1069            configurations: store::Store::new(MAX_ITEMS),
1070            native_deps: true,
1071            rust_shared_lib_deps: false,
1072            config: Config::default(),
1073            flavor: TelemetryWorkerFlavor::default(),
1074        }
1075    }
1076
1077    /// Build the corresponding worker and it's handle.
1078    /// The runtime handle is wrapped in the worker handle and should be the one used to run the
1079    /// worker task.
1080    pub fn build_worker(self, tokio_runtime: Handle) -> (TelemetryWorkerHandle, TelemetryWorker) {
1081        let (tx, mailbox) = mpsc::channel(5000);
1082        let shutdown = Arc::new(InnerTelemetryShutdown {
1083            is_shutdown: Mutex::new(false),
1084            condvar: Condvar::new(),
1085        });
1086        let contexts = MetricContexts::default();
1087        let token = CancellationToken::new();
1088        let config = self.config;
1089        let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
1090        let client = http_client::from_config(&config);
1091
1092        let metrics_flush_interval =
1093            telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL);
1094
1095        #[allow(clippy::unwrap_used)]
1096        let worker = TelemetryWorker {
1097            flavor: self.flavor,
1098            data: TelemetryWorkerData {
1099                started: false,
1100                dependencies: self.dependencies,
1101                integrations: self.integrations,
1102                configurations: self.configurations,
1103                logs: store::QueueHashMap::default(),
1104                metric_contexts: contexts.clone(),
1105                metric_buckets: MetricBuckets::default(),
1106                host: self.host,
1107                app: self.application,
1108            },
1109            config,
1110            mailbox,
1111            seq_id: AtomicU64::new(1),
1112            runtime_id: self
1113                .runtime_id
1114                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1115            client,
1116            metrics_flush_interval,
1117            deadlines: scheduler::Scheduler::new(vec![
1118                (metrics_flush_interval, LifecycleAction::FlushMetricAggr),
1119                (telemetry_heartbeat_interval, LifecycleAction::FlushData),
1120                (
1121                    time::Duration::from_secs(60 * 60 * 24),
1122                    LifecycleAction::ExtendedHeartbeat,
1123                ),
1124            ]),
1125            cancellation_token: token.clone(),
1126        };
1127
1128        (
1129            TelemetryWorkerHandle {
1130                sender: tx,
1131                shutdown,
1132                cancellation_token: token,
1133                runtime: tokio_runtime,
1134                contexts,
1135            },
1136            worker,
1137        )
1138    }
1139
1140    /// Spawns a telemetry worker task in the current tokio runtime
1141    /// The worker will capture a reference to the runtime and use it to run its tasks
1142    pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) {
1143        let tokio_runtime = tokio::runtime::Handle::current();
1144
1145        let (worker_handle, mut worker) = self.build_worker(tokio_runtime.clone());
1146
1147        let join_handle = tokio_runtime.spawn(async move { worker.run().await });
1148
1149        (worker_handle, join_handle)
1150    }
1151
1152    /// Spawns a telemetry worker in a new thread and returns a handle to interact with it
1153    pub fn run(self) -> anyhow::Result<TelemetryWorkerHandle> {
1154        let runtime = tokio::runtime::Builder::new_current_thread()
1155            .enable_all()
1156            .build()?;
1157        let (handle, mut worker) = self.build_worker(runtime.handle().clone());
1158        let notify_shutdown = handle.shutdown.clone();
1159        std::thread::spawn(move || {
1160            runtime.block_on(worker.run());
1161            runtime.shutdown_background();
1162            notify_shutdown.shutdown_finished();
1163        });
1164
1165        Ok(handle)
1166    }
1167}
1168
#[cfg(test)]
mod tests {
    use crate::worker::TelemetryWorkerHandle;

    /// Compile-time probe: instantiating this fails to build unless `T` is
    /// both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_handle_sync_send() {
        // The handle is documented as safe to use across threads; keep the
        // auto-trait guarantee pinned down by the type checker.
        assert_send_sync::<TelemetryWorkerHandle>();
    }
}