use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime};
use reqwest::header::{Headers, RetryAfter};
use reqwest::{Client, Proxy, StatusCode};
use api::protocol::Event;
use client::ClientOptions;
use Dsn;
pub trait Transport: Send + Sync + 'static {
fn send_event(&self, event: Event<'static>);
fn drain(&self, timeout: Option<Duration>) -> bool {
let _timeout = timeout;
true
}
}
pub trait InternalTransportFactoryClone {
fn clone_factory(&self) -> Box<TransportFactory>;
}
impl<T: 'static + TransportFactory + Clone> InternalTransportFactoryClone for T {
fn clone_factory(&self) -> Box<TransportFactory> {
Box::new(self.clone())
}
}
pub trait TransportFactory: Send + Sync + InternalTransportFactoryClone {
fn create_transport(&self, options: &ClientOptions) -> Box<Transport>;
}
impl<F> TransportFactory for F
where
F: Fn(&ClientOptions) -> Box<Transport> + Clone + Send + Sync + 'static,
{
fn create_transport(&self, options: &ClientOptions) -> Box<Transport> {
(*self)(options)
}
}
impl<T: Transport> Transport for Arc<T> {
fn send_event(&self, event: Event<'static>) {
(**self).send_event(event)
}
fn drain(&self, timeout: Option<Duration>) -> bool {
(**self).drain(timeout)
}
}
impl<T: Transport> TransportFactory for Arc<T> {
fn create_transport(&self, options: &ClientOptions) -> Box<Transport> {
let _options = options;
Box::new(self.clone())
}
}
#[derive(Clone)]
pub struct DefaultTransportFactory;
impl TransportFactory for DefaultTransportFactory {
fn create_transport(&self, options: &ClientOptions) -> Box<Transport> {
Box::new(HttpTransport::new(options))
}
}
#[derive(Debug)]
pub struct HttpTransport {
dsn: Dsn,
sender: Mutex<SyncSender<Option<Event<'static>>>>,
drain_signal: Arc<Condvar>,
queue_size: Arc<Mutex<usize>>,
_handle: Option<JoinHandle<()>>,
}
fn spawn_http_sender(
client: Client,
receiver: Receiver<Option<Event<'static>>>,
dsn: Dsn,
signal: Arc<Condvar>,
queue_size: Arc<Mutex<usize>>,
user_agent: String,
) -> JoinHandle<()> {
let mut disabled: Option<(Instant, RetryAfter)> = None;
thread::spawn(move || {
let url = dsn.store_api_url().to_string();
while let Some(event) = receiver.recv().unwrap_or(None) {
match disabled {
Some((disabled_at, RetryAfter::Delay(disabled_for))) => {
if disabled_at.elapsed() > disabled_for {
disabled = None;
} else {
continue;
}
}
Some((_, RetryAfter::DateTime(wait_until))) => {
if SystemTime::from(wait_until) > SystemTime::now() {
disabled = None;
} else {
continue;
}
}
None => {}
}
let auth = dsn.to_auth(Some(&user_agent));
let mut headers = Headers::new();
headers.set_raw("X-Sentry-Auth", auth.to_string());
if let Ok(resp) = client
.post(url.as_str())
.json(&event)
.headers(headers)
.send()
{
if resp.status() == StatusCode::TooManyRequests {
disabled = resp
.headers()
.get::<RetryAfter>()
.map(|x| (Instant::now(), *x));
}
}
let mut size = queue_size.lock().unwrap();
*size -= 1;
if *size == 0 {
signal.notify_all();
}
}
})
}
impl HttpTransport {
pub fn new(options: &ClientOptions) -> HttpTransport {
let dsn = options.dsn.clone().unwrap();
let user_agent = options.user_agent.to_string();
let http_proxy = options.http_proxy.as_ref().map(|x| x.to_string());
let https_proxy = options.https_proxy.as_ref().map(|x| x.to_string());
let (sender, receiver) = sync_channel(30);
let drain_signal = Arc::new(Condvar::new());
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
let queue_size = Arc::new(Mutex::new(0));
let mut client = Client::builder();
if let Some(url) = http_proxy {
client.proxy(Proxy::http(&url).unwrap());
};
if let Some(url) = https_proxy {
client.proxy(Proxy::https(&url).unwrap());
};
let _handle = Some(spawn_http_sender(
client.build().unwrap(),
receiver,
dsn.clone(),
drain_signal.clone(),
queue_size.clone(),
user_agent,
));
HttpTransport {
dsn,
sender: Mutex::new(sender),
drain_signal,
queue_size,
_handle,
}
}
}
impl Transport for HttpTransport {
fn send_event(&self, event: Event<'static>) {
*self.queue_size.lock().unwrap() += 1;
if self.sender.lock().unwrap().try_send(Some(event)).is_err() {
*self.queue_size.lock().unwrap() -= 1;
}
}
fn drain(&self, timeout: Option<Duration>) -> bool {
let guard = self.queue_size.lock().unwrap();
if *guard == 0 {
return true;
}
if let Some(timeout) = timeout {
self.drain_signal.wait_timeout(guard, timeout).is_ok()
} else {
self.drain_signal.wait(guard).is_ok()
}
}
}
impl Drop for HttpTransport {
fn drop(&mut self) {
if let Ok(sender) = self.sender.lock() {
sender.send(None).ok();
}
}
}