apptrail_application_events_sdk/
lib.rs1pub mod event;
2
3#[macro_use]
4extern crate lazy_static;
5
6use backoff::{future::retry, ExponentialBackoff, ExponentialBackoffBuilder};
7use jsonschema::JSONSchema;
8use reqwest::{
9 multipart::{Form, Part},
10 Response, StatusCode,
11};
12use serde::Deserialize;
13use std::time::Duration;
14use std::{cell::RefCell, pin::Pin};
15use std::{collections::HashMap, io::Write};
16use uuid::Uuid;
17
18use crate::event::ApptrailEvent;
19
20type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
22
23static EVENT_SCHEMA_RAW: &'static str = include_str!("raw-event-schema.json");
24
25lazy_static! {
26 static ref EVENT_SCHEMA: JSONSchema = get_event_schema();
27}
28
29fn get_event_schema() -> JSONSchema {
30 let schema_value = serde_json::from_str::<serde_json::Value>(EVENT_SCHEMA_RAW).unwrap();
31 let schema = JSONSchema::compile(&schema_value).unwrap();
32 schema
33}
34
35fn default_exponential_backoff() -> ExponentialBackoff {
36 ExponentialBackoffBuilder::new()
37 .with_initial_interval(Duration::from_millis(200))
38 .with_max_elapsed_time(Some(Duration::from_millis(4500)))
39 .with_multiplier(1.2)
40 .with_randomization_factor(0.2)
41 .build()
42}
43
44#[derive(Clone)]
45pub struct ApptrailEventsClient {
52 pub region: String,
53
54 base_api_url: String,
55 api_key: String,
56 application_id: String,
57 req_client: reqwest::Client,
58
59 upload_url: RefCell<Option<String>>,
60 form_data: RefCell<Option<HashMap<String, String>>>,
61}
62
63fn parse_application_id(api_key_str: String) -> Result<String> {
64 let api_key_bytes = base64::decode_config(api_key_str, base64::URL_SAFE_NO_PAD)?;
65 let api_key_parsed = std::str::from_utf8(&api_key_bytes)?;
66 let parts = api_key_parsed.split(',').collect::<Vec<_>>();
67 parts
68 .get(0)
69 .map_or_else(|| Err("Invalid API Key.".into()), |s| Ok(s.to_string()))
70}
71
72impl ApptrailEventsClient {
73 pub fn new<T>(region: T, api_key: T) -> Result<Self>
86 where
87 T: ToString,
88 {
89 let application_id = match parse_application_id(api_key.to_string()) {
90 Ok(aid) => aid,
91 Err(_) => return Err("Invalid API Key.".into()),
92 };
93
94 Ok(Self {
95 region: region.to_string(),
96 api_key: api_key.to_string(),
97 base_api_url: format!(
98 "https://events.{}.apptrail.com/applications/session",
99 region.to_string()
100 ),
101 application_id: application_id.to_string(),
102 req_client: reqwest::Client::new(),
103 upload_url: RefCell::new(None),
104 form_data: RefCell::new(None),
105 })
106 }
107
108 async fn refresh_post_policy(&mut self) -> Result<()> {
109 let op = || async {
110 let res = (&self.req_client)
111 .get(&self.base_api_url)
112 .bearer_auth(&self.api_key)
113 .send()
114 .await?;
115 match res.status() {
116 s if s.is_client_error() => {
117 res.error_for_status().map_err(backoff::Error::Permanent)
118 }
119 s if s.is_success() => Ok(res),
120 _ => res
121 .error_for_status()
122 .map_err(|e| backoff::Error::Transient {
123 err: e,
124 retry_after: None,
125 }),
126 }
127 };
128 let res_result: std::result::Result<Response, reqwest::Error> =
129 retry(default_exponential_backoff(), op).await;
130
131 let data = res_result?.json::<SessionData>().await?;
132 Pin::new(&mut self.upload_url).set(RefCell::new(Some(data.uploadUrl)));
133 Pin::new(&mut self.form_data).set(RefCell::new(Some(data.form)));
134
135 Ok(())
136 }
137
138 fn is_session_empty(&self) -> bool {
139 self.upload_url.borrow().is_none() || self.form_data.borrow().is_none()
140 }
141
142 pub async fn put_event(&mut self, event: &ApptrailEvent) -> Result<()> {
144 self.put_events(&vec![event]).await
145 }
146
147 pub async fn put_events(&mut self, events: &Vec<&ApptrailEvent>) -> Result<()> {
150 if events.len() == 0 || events.len() > 1000 {
151 return Err("Can put between 0 and 1000 events in a single call.".into());
152 }
153 if self.is_session_empty() {
154 self.refresh_post_policy().await?
155 }
156 if self.is_session_empty() {
158 return Err("Invalid session".into());
159 }
160
161 let mut body_buf = Vec::<u8>::new();
162 for event in events.into_iter() {
163 let event_value = serde_json::to_value(event)?;
164 if !EVENT_SCHEMA.is_valid(&event_value) {
165 let message = EVENT_SCHEMA
166 .validate(&event_value)
167 .err()
168 .and_then(|mut err| err.next().map(|e| e.to_string()))
169 .unwrap_or_else(|| "".to_string());
170 return Err(format!("Invalid event format: {}.", message).into());
171 }
172 body_buf.write(&serde_json::to_vec(&event_value)?)?;
173 body_buf.write("\n".as_bytes())?;
174 }
175
176 let application_id: String = self.application_id.to_owned();
177 let form_data = &self.form_data.borrow().as_ref().unwrap().to_owned();
178
179 let create_form = || {
180 let form: Form = form_data
181 .to_owned()
182 .into_iter()
183 .fold(Form::new(), |form, (k, v)| form.text(k, v));
184 let filename = Uuid::new_v4().to_hyphenated().to_string() + ".jsonl";
185 let s3_key = format!("{}/{}", application_id.to_owned(), filename);
186
187 let form: Form = form.text("key", s3_key);
188
189 let body = std::str::from_utf8(&body_buf)?.to_owned();
190 let file_part = Part::text(body)
191 .file_name(filename)
192 .mime_str("application/jsonlines")
193 .unwrap();
194 let form: Form = form.part("file", file_part);
195 Result::Ok(form)
196 };
197
198 let upload_url = &self.upload_url.borrow().as_ref().unwrap().to_owned();
199 let do_upload = || async {
200 let form = create_form()
201 .map_err(|_| backoff::Error::Permanent("Failed to create form".into()));
202 let res = (&self.req_client)
203 .post(upload_url)
204 .multipart(form?)
205 .send()
206 .await
207 .map_err(|e| e.to_string())?;
208
209 match res.status() {
210 s if s.is_success() => Ok(res),
211 StatusCode::FORBIDDEN => {
212 let refresh_result = self.to_owned().refresh_post_policy().await;
213 if let Err(_) = refresh_result {
214 Err(backoff::Error::Permanent(
215 "Failed to refresh session".into(),
216 ))
217 } else {
218 Err(backoff::Error::Transient {
219 err: "Expired session".into(),
220 retry_after: None,
221 })
222 }
223 }
224 s if s.is_client_error() => res
225 .error_for_status()
226 .map_err(|e| backoff::Error::Permanent(e.to_string())),
227
228 _ => res
229 .error_for_status()
230 .map_err(|e| backoff::Error::Transient {
231 err: e.to_string(),
232 retry_after: None,
233 }),
234 }
235 };
236 let res_result: std::result::Result<Response, String> =
237 retry(default_exponential_backoff(), do_upload).await;
238 match res_result {
239 Ok(_) => {
240 println!("Apptrail: Succesfully put {} events.", events.len());
241 Ok(())
242 }
243 Err(_) => Err("Failed to put event.".into()),
244 }
245 }
246}
247
248impl std::fmt::Display for ApptrailEventsClient {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "ApptrailEventsClient {{ region: {} }}", &self.region)
251 }
252}
253
254#[derive(Deserialize)]
255struct SessionData {
256 uploadUrl: String,
257 form: HashMap<String, String>,
258}