use crate::{
reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
LAUNCHDARKLY_PAYLOAD_ID_HEADER,
};
use chrono::DateTime;
use crossbeam_channel::Sender;
use launchdarkly_sdk_transport::HttpTransport;
use std::collections::HashMap;
#[cfg(feature = "event-compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "event-compression")]
use flate2::Compression;
#[cfg(feature = "event-compression")]
use std::io::Write;
use bytes::Bytes;
use futures::future::BoxFuture;
use tokio::time::{sleep, Duration};
use uuid::Uuid;
use super::event::OutputEvent;
pub struct EventSenderResult {
pub(super) time_from_server: u128,
pub(super) success: bool,
pub(super) must_shutdown: bool,
pub(super) flush_signal: Option<Sender<()>>,
}
pub trait EventSender: Send + Sync {
fn send_event_data(
&self,
events: Vec<OutputEvent>,
result_tx: Sender<EventSenderResult>,
flush_signal: Option<Sender<()>>,
) -> BoxFuture<'_, ()>;
}
#[derive(Clone)]
pub struct HttpEventSender<T: HttpTransport> {
url: http::Uri,
sdk_key: String,
transport: T,
default_headers: HashMap<&'static str, String>,
#[allow(dead_code)]
compress_events: bool,
}
impl<T: HttpTransport> HttpEventSender<T> {
pub fn new(
transport: T,
url: http::Uri,
sdk_key: &str,
default_headers: HashMap<&'static str, String>,
compress_events: bool,
) -> Self {
Self {
url,
sdk_key: sdk_key.to_owned(),
transport,
default_headers,
compress_events,
}
}
fn get_server_time_from_response<Body>(&self, response: &http::Response<Body>) -> u128 {
let date_value = response
.headers()
.get("date")
.unwrap_or(&crate::EMPTY_HEADER)
.to_str()
.unwrap_or("")
.to_owned();
match DateTime::parse_from_rfc2822(&date_value) {
Ok(date) => date.timestamp_millis() as u128,
Err(_) => 0,
}
}
}
impl<T: HttpTransport> EventSender for HttpEventSender<T> {
fn send_event_data(
&self,
events: Vec<OutputEvent>,
result_tx: Sender<EventSenderResult>,
flush_signal: Option<Sender<()>>,
) -> BoxFuture<'_, ()> {
Box::pin(async move {
let uuid = Uuid::new_v4();
debug!(
"Sending ({}): {}",
uuid,
serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string())
);
#[allow(unused_mut)]
let mut payload = match serde_json::to_vec(&events) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize event payload. Some events were dropped: {e:?}");
return;
}
};
#[allow(unused_mut)]
let mut additional_headers = self.default_headers.clone();
#[cfg(feature = "event-compression")]
if self.compress_events {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
if e.write_all(payload.as_slice()).is_ok() {
if let Ok(compressed) = e.finish() {
payload = compressed;
additional_headers.insert("Content-Encoding", "gzip".into());
}
}
}
for attempt in 1..=2 {
if attempt == 2 {
sleep(Duration::from_secs(1)).await;
}
let mut request_builder = http::Request::builder()
.uri(self.url.clone())
.method("POST")
.header("Content-Type", "application/json")
.header("Authorization", self.sdk_key.clone())
.header("User-Agent", &*crate::USER_AGENT)
.header(
LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
crate::CURRENT_EVENT_SCHEMA,
)
.header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string());
for default_header in &additional_headers {
request_builder =
request_builder.header(*default_header.0, default_header.1.as_str());
}
let body_bytes = Bytes::from(payload.clone());
let request = request_builder.body(Some(body_bytes)).unwrap();
let result = self.transport.request(request).await;
let response = match result {
Ok(response) => response,
Err(_) if attempt == 1 => continue,
Err(e) => {
error!("Failed to send events. Some events were dropped: {e:?}");
result_tx
.send(EventSenderResult {
success: false,
time_from_server: 0,
must_shutdown: false,
flush_signal,
})
.unwrap();
return;
}
};
if response.status().is_success() {
let _ = result_tx.send(EventSenderResult {
success: true,
time_from_server: self.get_server_time_from_response(&response),
must_shutdown: false,
flush_signal,
});
return;
}
if !is_http_error_recoverable(response.status().as_u16()) {
result_tx
.send(EventSenderResult {
success: false,
time_from_server: 0,
must_shutdown: true,
flush_signal,
})
.unwrap();
return;
}
}
result_tx
.send(EventSenderResult {
success: false,
time_from_server: 0,
must_shutdown: false,
flush_signal,
})
.unwrap();
})
}
}
#[cfg(test)]
pub(crate) struct InMemoryEventSender {
event_tx: Sender<OutputEvent>,
}
#[cfg(test)]
impl InMemoryEventSender {
pub(crate) fn new(event_tx: Sender<OutputEvent>) -> Self {
Self { event_tx }
}
}
#[cfg(test)]
impl EventSender for InMemoryEventSender {
fn send_event_data(
&self,
events: Vec<OutputEvent>,
sender: Sender<EventSenderResult>,
flush_signal: Option<Sender<()>>,
) -> BoxFuture<'_, ()> {
Box::pin(async move {
for event in events {
self.event_tx.send(event).unwrap();
}
sender
.send(EventSenderResult {
time_from_server: 0,
success: true,
must_shutdown: true,
flush_signal,
})
.unwrap();
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crossbeam_channel::bounded;
use std::str::FromStr;
use test_case::test_case;
#[test_case(100, true; "100 CONTINUE is recoverable")]
#[test_case(200, true; "200 OK is recoverable")]
#[test_case(300, true; "300 MULTIPLE_CHOICES is recoverable")]
#[test_case(400, true; "400 BAD_REQUEST is recoverable")]
#[test_case(401, false; "401 UNAUTHORIZED is not recoverable")]
#[test_case(408, true; "408 REQUEST_TIMEOUT is recoverable")]
#[test_case(409, false; "409 CONFLICT is not recoverable")]
#[test_case(429, true; "429 TOO_MANY_REQUESTS is recoverable")]
#[test_case(431, false; "431 REQUEST_HEADER_FIELDS_TOO_LARGE is not recoverable")]
#[test_case(500, true; "500 INTERNAL_SERVER_ERROR is recoverable")]
fn can_determine_recoverable_errors(status: u16, is_recoverable: bool) {
assert_eq!(is_recoverable, is_http_error_recoverable(status));
}
#[tokio::test]
async fn can_parse_server_time_from_response() {
let mut server = mockito::Server::new_async().await;
server
.mock("POST", "/bulk")
.with_status(200)
.with_header("date", "Fri, 13 Feb 2009 23:31:30 GMT")
.create_async()
.await;
let (tx, rx) = bounded::<EventSenderResult>(5);
let event_sender = build_event_sender(server.url());
event_sender.send_event_data(vec![], tx, None).await;
let sender_result = rx.recv().unwrap();
assert!(sender_result.success);
assert!(!sender_result.must_shutdown);
assert_eq!(sender_result.time_from_server, 1234567890000);
}
#[tokio::test]
async fn unrecoverable_failure_requires_shutdown() {
let mut server = mockito::Server::new_async().await;
server
.mock("POST", "/bulk")
.with_status(401)
.create_async()
.await;
let (tx, rx) = bounded::<EventSenderResult>(5);
let event_sender = build_event_sender(server.url());
event_sender.send_event_data(vec![], tx, None).await;
let sender_result = rx.recv().expect("Failed to receive sender_result");
assert!(!sender_result.success);
assert!(sender_result.must_shutdown);
}
#[tokio::test]
async fn recoverable_failures_are_attempted_multiple_times() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/bulk")
.with_status(400)
.expect(2)
.create_async()
.await;
let (tx, rx) = bounded::<EventSenderResult>(5);
let event_sender = build_event_sender(server.url());
event_sender.send_event_data(vec![], tx, None).await;
let sender_result = rx.recv().expect("Failed to receive sender_result");
assert!(!sender_result.success);
assert!(!sender_result.must_shutdown);
mock.assert();
}
#[tokio::test]
async fn retrying_requests_can_eventually_succeed() {
let mut server = mockito::Server::new_async().await;
server
.mock("POST", "/bulk")
.with_status(400)
.create_async()
.await;
server
.mock("POST", "/bulk")
.with_status(200)
.with_header("date", "Fri, 13 Feb 2009 23:31:30 GMT")
.create_async()
.await;
let (tx, rx) = bounded::<EventSenderResult>(5);
let event_sender = build_event_sender(server.url());
event_sender.send_event_data(vec![], tx, None).await;
let sender_result = rx.recv().expect("Failed to receive sender_result");
assert!(sender_result.success);
assert!(!sender_result.must_shutdown);
assert_eq!(sender_result.time_from_server, 1234567890000);
}
fn build_event_sender(
url: String,
) -> HttpEventSender<launchdarkly_sdk_transport::HyperTransport> {
let url = format!("{}/bulk", &url);
let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url");
let transport = launchdarkly_sdk_transport::HyperTransport::new()
.expect("Failed to create HyperTransport");
HttpEventSender::new(
transport,
url,
"sdk-key",
HashMap::<&str, String>::new(),
false,
)
}
}