apptrail_application_events_sdk/
lib.rs

1pub mod event;
2
3#[macro_use]
4extern crate lazy_static;
5
6use backoff::{future::retry, ExponentialBackoff, ExponentialBackoffBuilder};
7use jsonschema::JSONSchema;
8use reqwest::{
9    multipart::{Form, Part},
10    Response, StatusCode,
11};
12use serde::Deserialize;
13use std::time::Duration;
14use std::{cell::RefCell, pin::Pin};
15use std::{collections::HashMap, io::Write};
16use uuid::Uuid;
17
18use crate::event::ApptrailEvent;
19
// Crate-wide `Result` alias: fallible functions report failures as a boxed `std::error::Error`.
21type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
22
23static EVENT_SCHEMA_RAW: &'static str = include_str!("raw-event-schema.json");
24
25lazy_static! {
26    static ref EVENT_SCHEMA: JSONSchema = get_event_schema();
27}
28
29fn get_event_schema() -> JSONSchema {
30    let schema_value = serde_json::from_str::<serde_json::Value>(EVENT_SCHEMA_RAW).unwrap();
31    let schema = JSONSchema::compile(&schema_value).unwrap();
32    schema
33}
34
35fn default_exponential_backoff() -> ExponentialBackoff {
36    ExponentialBackoffBuilder::new()
37        .with_initial_interval(Duration::from_millis(200))
38        .with_max_elapsed_time(Some(Duration::from_millis(4500)))
39        .with_multiplier(1.2)
40        .with_randomization_factor(0.2)
41        .build()
42}
43
44#[derive(Clone)]
45/// The Apptrail Application Events Client lets you send audit logs from your Rust applications
46/// to your customers.
47///
48/// ## Learn more
49/// - [Working with events](https://apptrail.com/docs/applications/guide/working-with-events)
50/// - [SDK Reference](http://apptrail.com/docs/applications/guide/working-with-events/using-the-events-sdk/application-events-sdk-rust)
51pub struct ApptrailEventsClient {
52    pub region: String,
53
54    base_api_url: String,
55    api_key: String,
56    application_id: String,
57    req_client: reqwest::Client,
58
59    upload_url: RefCell<Option<String>>,
60    form_data: RefCell<Option<HashMap<String, String>>>,
61}
62
63fn parse_application_id(api_key_str: String) -> Result<String> {
64    let api_key_bytes = base64::decode_config(api_key_str, base64::URL_SAFE_NO_PAD)?;
65    let api_key_parsed = std::str::from_utf8(&api_key_bytes)?;
66    let parts = api_key_parsed.split(',').collect::<Vec<_>>();
67    parts
68        .get(0)
69        .map_or_else(|| Err("Invalid API Key.".into()), |s| Ok(s.to_string()))
70}
71
72impl ApptrailEventsClient {
73    /// Create a new events client. Instantiate this once at the top of your application and reuse it.
74    ///
75    /// ## Arguments
76    /// #### region
77    ///
78    /// The Apptrail region to send events to. Create a single instance per region. Regions are specified as strings, e.g. `us-west-2`.
79    /// [Learn more about Apptrail regions](https://apptrail.com/docs/applications/guide/regions).
80    ///
81    /// #### api_key
82    ///
83    /// Your Apptrail secret API Key. You can generate and retrieve API Keys from the Apptrail Dashboard.
84    /// [Learn more about managing API Keys](https://apptrail.com/docs/applications/guide/dashboard/managing-api-keys).
85    pub fn new<T>(region: T, api_key: T) -> Result<Self>
86    where
87        T: ToString,
88    {
89        let application_id = match parse_application_id(api_key.to_string()) {
90            Ok(aid) => aid,
91            Err(_) => return Err("Invalid API Key.".into()),
92        };
93
94        Ok(Self {
95            region: region.to_string(),
96            api_key: api_key.to_string(),
97            base_api_url: format!(
98                "https://events.{}.apptrail.com/applications/session",
99                region.to_string()
100            ),
101            application_id: application_id.to_string(),
102            req_client: reqwest::Client::new(),
103            upload_url: RefCell::new(None),
104            form_data: RefCell::new(None),
105        })
106    }
107
108    async fn refresh_post_policy(&mut self) -> Result<()> {
109        let op = || async {
110            let res = (&self.req_client)
111                .get(&self.base_api_url)
112                .bearer_auth(&self.api_key)
113                .send()
114                .await?;
115            match res.status() {
116                s if s.is_client_error() => {
117                    res.error_for_status().map_err(backoff::Error::Permanent)
118                }
119                s if s.is_success() => Ok(res),
120                _ => res
121                    .error_for_status()
122                    .map_err(|e| backoff::Error::Transient {
123                        err: e,
124                        retry_after: None,
125                    }),
126            }
127        };
128        let res_result: std::result::Result<Response, reqwest::Error> =
129            retry(default_exponential_backoff(), op).await;
130
131        let data = res_result?.json::<SessionData>().await?;
132        Pin::new(&mut self.upload_url).set(RefCell::new(Some(data.uploadUrl)));
133        Pin::new(&mut self.form_data).set(RefCell::new(Some(data.form)));
134
135        Ok(())
136    }
137
138    fn is_session_empty(&self) -> bool {
139        self.upload_url.borrow().is_none() || self.form_data.borrow().is_none()
140    }
141
142    /// Send a single audit log to Apptrail.
143    pub async fn put_event(&mut self, event: &ApptrailEvent) -> Result<()> {
144        self.put_events(&vec![event]).await
145    }
146
147    /// Log multiple audit events to Apptrail. You can pass up to 1000 events to this method. To log
148    /// more events, make multiple calls to this method.
149    pub async fn put_events(&mut self, events: &Vec<&ApptrailEvent>) -> Result<()> {
150        if events.len() == 0 || events.len() > 1000 {
151            return Err("Can put between 0 and 1000 events in a single call.".into());
152        }
153        if self.is_session_empty() {
154            self.refresh_post_policy().await?
155        }
156        // todo: better way lol
157        if self.is_session_empty() {
158            return Err("Invalid session".into());
159        }
160
161        let mut body_buf = Vec::<u8>::new();
162        for event in events.into_iter() {
163            let event_value = serde_json::to_value(event)?;
164            if !EVENT_SCHEMA.is_valid(&event_value) {
165                let message = EVENT_SCHEMA
166                    .validate(&event_value)
167                    .err()
168                    .and_then(|mut err| err.next().map(|e| e.to_string()))
169                    .unwrap_or_else(|| "".to_string());
170                return Err(format!("Invalid event format: {}.", message).into());
171            }
172            body_buf.write(&serde_json::to_vec(&event_value)?)?;
173            body_buf.write("\n".as_bytes())?;
174        }
175
176        let application_id: String = self.application_id.to_owned();
177        let form_data = &self.form_data.borrow().as_ref().unwrap().to_owned();
178
179        let create_form = || {
180            let form: Form = form_data
181                .to_owned()
182                .into_iter()
183                .fold(Form::new(), |form, (k, v)| form.text(k, v));
184            let filename = Uuid::new_v4().to_hyphenated().to_string() + ".jsonl";
185            let s3_key = format!("{}/{}", application_id.to_owned(), filename);
186
187            let form: Form = form.text("key", s3_key);
188
189            let body = std::str::from_utf8(&body_buf)?.to_owned();
190            let file_part = Part::text(body)
191                .file_name(filename)
192                .mime_str("application/jsonlines")
193                .unwrap();
194            let form: Form = form.part("file", file_part);
195            Result::Ok(form)
196        };
197
198        let upload_url = &self.upload_url.borrow().as_ref().unwrap().to_owned();
199        let do_upload = || async {
200            let form = create_form()
201                .map_err(|_| backoff::Error::Permanent("Failed to create form".into()));
202            let res = (&self.req_client)
203                .post(upload_url)
204                .multipart(form?)
205                .send()
206                .await
207                .map_err(|e| e.to_string())?;
208
209            match res.status() {
210                s if s.is_success() => Ok(res),
211                StatusCode::FORBIDDEN => {
212                    let refresh_result = self.to_owned().refresh_post_policy().await;
213                    if let Err(_) = refresh_result {
214                        Err(backoff::Error::Permanent(
215                            "Failed to refresh session".into(),
216                        ))
217                    } else {
218                        Err(backoff::Error::Transient {
219                            err: "Expired session".into(),
220                            retry_after: None,
221                        })
222                    }
223                }
224                s if s.is_client_error() => res
225                    .error_for_status()
226                    .map_err(|e| backoff::Error::Permanent(e.to_string())),
227
228                _ => res
229                    .error_for_status()
230                    .map_err(|e| backoff::Error::Transient {
231                        err: e.to_string(),
232                        retry_after: None,
233                    }),
234            }
235        };
236        let res_result: std::result::Result<Response, String> =
237            retry(default_exponential_backoff(), do_upload).await;
238        match res_result {
239            Ok(_) => {
240                println!("Apptrail: Succesfully put {} events.", events.len());
241                Ok(())
242            }
243            Err(_) => Err("Failed to put event.".into()),
244        }
245    }
246}
247
248impl std::fmt::Display for ApptrailEventsClient {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "ApptrailEventsClient {{ region: {} }}", &self.region)
251    }
252}
253
254#[derive(Deserialize)]
255struct SessionData {
256    uploadUrl: String,
257    form: HashMap<String, String>,
258}