unleash_edge_http_client/
instance_data.rs

1use reqwest::{StatusCode, Url};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4
5use crate::{ClientMetaInformation, UnleashClient};
6use tracing::{debug, warn};
7use unleash_edge_cli::{CliArgs, EdgeMode};
8use unleash_edge_types::BackgroundTask;
9use unleash_edge_types::errors::EdgeError;
10use unleash_edge_types::metrics::instance_data::EdgeInstanceData;
11
12#[derive(Debug, Clone)]
13pub struct InstanceDataSender {
14    pub unleash_client: Arc<UnleashClient>,
15    pub token: String,
16    pub base_path: String,
17}
18
19#[derive(Debug, Clone)]
20pub enum InstanceDataSending {
21    SendNothing,
22    SendInstanceData(InstanceDataSender),
23}
24
25impl InstanceDataSending {
26    pub fn from_args(
27        args: CliArgs,
28        client_meta_information: &ClientMetaInformation,
29        http_client: reqwest::Client,
30    ) -> Result<Self, EdgeError> {
31        match args.mode {
32            EdgeMode::Edge(edge_args) => edge_args
33                .tokens
34                .first()
35                .map(|token| {
36                    let unleash_client = Url::parse(&edge_args.upstream_url.clone())
37                        .map(|url| {
38                            UnleashClient::from_url_with_backing_client(
39                                url,
40                                args.auth_headers
41                                    .upstream_auth_header
42                                    .clone()
43                                    .unwrap_or("Authorization".to_string()),
44                                http_client,
45                                client_meta_information.clone(),
46                            )
47                        })
48                        .map(|c| {
49                            c.with_custom_client_headers(edge_args.custom_client_headers.clone())
50                        })
51                        .map(Arc::new)
52                        .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone()))
53                        .expect("Could not construct UnleashClient");
54                    let instance_data_sender = InstanceDataSender {
55                        unleash_client,
56                        token: token.clone(),
57                        base_path: args.http.base_path.clone(),
58                    };
59                    InstanceDataSending::SendInstanceData(instance_data_sender)
60                })
61                .map(Ok)
62                .unwrap_or(Ok(InstanceDataSending::SendNothing)),
63            _ => Ok(InstanceDataSending::SendNothing),
64        }
65    }
66}
67
/// Why an attempt to post instance data failed, as reported to the sending task.
#[derive(Debug)]
enum InstanceDataSendError {
    /// The upstream answered 404 or 403; the periodic task counts these towards its
    /// growing delay between attempts. Carries the message to log.
    Backoff(String),
    /// Any other failure. Carries the message to log.
    Unexpected(String),
}
73
74async fn send_instance_data(
75    instance_data_sender: &Arc<InstanceDataSending>,
76    our_instance_data: &Arc<EdgeInstanceData>,
77    downstream_instance_data: &Arc<RwLock<Vec<EdgeInstanceData>>>,
78) -> Result<(), InstanceDataSendError> {
79    match instance_data_sender.as_ref() {
80        InstanceDataSending::SendNothing => {
81            debug!("No instance data sender found. Doing nothing.");
82            Ok(())
83        }
84        InstanceDataSending::SendInstanceData(instance_data_sender) => {
85            let observed_data = our_instance_data.observe(
86                downstream_instance_data.read().await.clone(),
87                &instance_data_sender.base_path,
88            );
89            let status = instance_data_sender
90                .unleash_client
91                .post_edge_observability_data(observed_data, &instance_data_sender.token)
92                .await;
93
94            if let Err(e) = status {
95                match e {
96                    EdgeError::EdgeMetricsRequestError(status, _) => {
97                        match status {
98                            StatusCode::NOT_FOUND => {
99                                downstream_instance_data.write().await.clear();
100                                our_instance_data.clear_time_windowed_metrics();
101                                Err(InstanceDataSendError::Backoff("Our upstream is not running a version that supports edge metrics.".into()))
102                            }
103                            StatusCode::FORBIDDEN => {
104                                downstream_instance_data.write().await.clear();
105                                our_instance_data.clear_time_windowed_metrics();
106                                Err(InstanceDataSendError::Backoff("Upstream edge metrics said our token wasn't allowed to post data".into()))
107                            }
108                            _ => Err(InstanceDataSendError::Unexpected(format!(
109                                "Failed to post instance data due to unknown error {e:?}"
110                            ))),
111                        }
112                    }
113                    _ => Err(InstanceDataSendError::Unexpected(format!(
114                        "Failed to post instance data due to unknown error {e:?}"
115                    ))),
116                }
117            } else {
118                downstream_instance_data.write().await.clear();
119                our_instance_data.clear_time_windowed_metrics();
120                Ok(())
121            }
122        }
123    }
124}
125
126pub fn create_once_off_send_instance_data(
127    instance_data_sender: Arc<InstanceDataSending>,
128    our_instance_data: Arc<EdgeInstanceData>,
129    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
130) -> BackgroundTask {
131    let instance_data_sender = instance_data_sender.clone();
132    let our_instance_data = our_instance_data.clone();
133    let downstream_instance_data = downstream_instance_data.clone();
134
135    Box::pin(async move {
136        let result = send_instance_data(
137            &instance_data_sender,
138            &our_instance_data,
139            &downstream_instance_data,
140        )
141        .await;
142
143        if let Err(err) = result {
144            warn!("Failed to send last set of instance data during graceful exit: {err:?}");
145        }
146    })
147}
148
149pub fn create_send_instance_data_task(
150    instance_data_sender: Arc<InstanceDataSending>,
151    our_instance_data: Arc<EdgeInstanceData>,
152    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
153) -> BackgroundTask {
154    let mut errors = 0;
155    let delay = std::time::Duration::from_secs(60);
156    Box::pin(async move {
157        loop {
158            tokio::time::sleep(
159                std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10),
160            )
161            .await;
162
163            let result = send_instance_data(
164                &instance_data_sender,
165                &our_instance_data,
166                &downstream_instance_data,
167            )
168            .await;
169            match result {
170                Ok(_) => {
171                    debug!("Successfully posted observability metrics.");
172                    errors = 0;
173                    downstream_instance_data.write().await.clear();
174                    our_instance_data.clear_time_windowed_metrics();
175                }
176                Err(err) => match err {
177                    InstanceDataSendError::Backoff(message) => {
178                        warn!(message);
179                        errors += 1;
180                    }
181                    InstanceDataSendError::Unexpected(message) => {
182                        warn!(message);
183                    }
184                },
185            }
186        }
187    })
188}