unleash_edge_http_client/
instance_data.rs1use reqwest::{StatusCode, Url};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4
5use crate::{ClientMetaInformation, UnleashClient};
6use tracing::{debug, warn};
7use unleash_edge_cli::{CliArgs, EdgeMode};
8use unleash_edge_types::BackgroundTask;
9use unleash_edge_types::errors::EdgeError;
10use unleash_edge_types::metrics::instance_data::EdgeInstanceData;
11
12#[derive(Debug, Clone)]
13pub struct InstanceDataSender {
14 pub unleash_client: Arc<UnleashClient>,
15 pub token: String,
16 pub base_path: String,
17}
18
19#[derive(Debug, Clone)]
20pub enum InstanceDataSending {
21 SendNothing,
22 SendInstanceData(InstanceDataSender),
23}
24
25impl InstanceDataSending {
26 pub fn from_args(
27 args: CliArgs,
28 client_meta_information: &ClientMetaInformation,
29 http_client: reqwest::Client,
30 ) -> Result<Self, EdgeError> {
31 match args.mode {
32 EdgeMode::Edge(edge_args) => edge_args
33 .tokens
34 .first()
35 .map(|token| {
36 let unleash_client = Url::parse(&edge_args.upstream_url.clone())
37 .map(|url| {
38 UnleashClient::from_url_with_backing_client(
39 url,
40 args.auth_headers
41 .upstream_auth_header
42 .clone()
43 .unwrap_or("Authorization".to_string()),
44 http_client,
45 client_meta_information.clone(),
46 )
47 })
48 .map(|c| {
49 c.with_custom_client_headers(edge_args.custom_client_headers.clone())
50 })
51 .map(Arc::new)
52 .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone()))
53 .expect("Could not construct UnleashClient");
54 let instance_data_sender = InstanceDataSender {
55 unleash_client,
56 token: token.clone(),
57 base_path: args.http.base_path.clone(),
58 };
59 InstanceDataSending::SendInstanceData(instance_data_sender)
60 })
61 .map(Ok)
62 .unwrap_or(Ok(InstanceDataSending::SendNothing)),
63 _ => Ok(InstanceDataSending::SendNothing),
64 }
65 }
66}
67
/// Failure modes of a single attempt to post instance data.
#[derive(Debug)]
enum InstanceDataSendError {
    /// The upstream rejected the data in a way that warrants waiting longer
    /// before the next attempt; carries the message to log.
    Backoff(String),
    /// Any other failure; carries the message to log.
    Unexpected(String),
}
73
74async fn send_instance_data(
75 instance_data_sender: &Arc<InstanceDataSending>,
76 our_instance_data: &Arc<EdgeInstanceData>,
77 downstream_instance_data: &Arc<RwLock<Vec<EdgeInstanceData>>>,
78) -> Result<(), InstanceDataSendError> {
79 match instance_data_sender.as_ref() {
80 InstanceDataSending::SendNothing => {
81 debug!("No instance data sender found. Doing nothing.");
82 Ok(())
83 }
84 InstanceDataSending::SendInstanceData(instance_data_sender) => {
85 let observed_data = our_instance_data.observe(
86 downstream_instance_data.read().await.clone(),
87 &instance_data_sender.base_path,
88 );
89 let status = instance_data_sender
90 .unleash_client
91 .post_edge_observability_data(observed_data, &instance_data_sender.token)
92 .await;
93
94 if let Err(e) = status {
95 match e {
96 EdgeError::EdgeMetricsRequestError(status, _) => {
97 match status {
98 StatusCode::NOT_FOUND => {
99 downstream_instance_data.write().await.clear();
100 our_instance_data.clear_time_windowed_metrics();
101 Err(InstanceDataSendError::Backoff("Our upstream is not running a version that supports edge metrics.".into()))
102 }
103 StatusCode::FORBIDDEN => {
104 downstream_instance_data.write().await.clear();
105 our_instance_data.clear_time_windowed_metrics();
106 Err(InstanceDataSendError::Backoff("Upstream edge metrics said our token wasn't allowed to post data".into()))
107 }
108 _ => Err(InstanceDataSendError::Unexpected(format!(
109 "Failed to post instance data due to unknown error {e:?}"
110 ))),
111 }
112 }
113 _ => Err(InstanceDataSendError::Unexpected(format!(
114 "Failed to post instance data due to unknown error {e:?}"
115 ))),
116 }
117 } else {
118 downstream_instance_data.write().await.clear();
119 our_instance_data.clear_time_windowed_metrics();
120 Ok(())
121 }
122 }
123 }
124}
125
126pub fn create_once_off_send_instance_data(
127 instance_data_sender: Arc<InstanceDataSending>,
128 our_instance_data: Arc<EdgeInstanceData>,
129 downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
130) -> BackgroundTask {
131 let instance_data_sender = instance_data_sender.clone();
132 let our_instance_data = our_instance_data.clone();
133 let downstream_instance_data = downstream_instance_data.clone();
134
135 Box::pin(async move {
136 let result = send_instance_data(
137 &instance_data_sender,
138 &our_instance_data,
139 &downstream_instance_data,
140 )
141 .await;
142
143 if let Err(err) = result {
144 warn!("Failed to send last set of instance data during graceful exit: {err:?}");
145 }
146 })
147}
148
149pub fn create_send_instance_data_task(
150 instance_data_sender: Arc<InstanceDataSending>,
151 our_instance_data: Arc<EdgeInstanceData>,
152 downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
153) -> BackgroundTask {
154 let mut errors = 0;
155 let delay = std::time::Duration::from_secs(60);
156 Box::pin(async move {
157 loop {
158 tokio::time::sleep(
159 std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10),
160 )
161 .await;
162
163 let result = send_instance_data(
164 &instance_data_sender,
165 &our_instance_data,
166 &downstream_instance_data,
167 )
168 .await;
169 match result {
170 Ok(_) => {
171 debug!("Successfully posted observability metrics.");
172 errors = 0;
173 downstream_instance_data.write().await.clear();
174 our_instance_data.clear_time_windowed_metrics();
175 }
176 Err(err) => match err {
177 InstanceDataSendError::Backoff(message) => {
178 warn!(message);
179 errors += 1;
180 }
181 InstanceDataSendError::Unexpected(message) => {
182 warn!(message);
183 }
184 },
185 }
186 }
187 })
188}