use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::client::TransportArc;
use crate::protocol::EnvelopeItem;
use crate::Envelope;
use sentry_types::protocol::v7::Log;
/// Queue length at which `enqueue` flushes immediately instead of waiting for the worker.
const MAX_ITEMS: usize = 100;
/// Time the background worker lets pass between two of its periodic flushes.
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);
/// Items that have been enqueued but not yet handed to the transport.
#[derive(Debug)]
struct BatchQueue<T> {
/// Pending items, in the order they were pushed.
items: Vec<T>,
}
/// Conversion of a whole batch of items into a single `EnvelopeItem`.
pub(crate) trait IntoBatchEnvelopeItem: Sized {
/// Consumes `items` and returns the `EnvelopeItem` that carries all of them.
fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem;
}
// Any item type whose `Vec` already converts into an `EnvelopeItem` gets the batch
// conversion for free by delegating to that `Into` implementation.
impl<T> IntoBatchEnvelopeItem for T
where
    Vec<T>: Into<EnvelopeItem>,
{
    /// Wraps `items` by way of the `Into<EnvelopeItem>` implementation of `Vec<Self>`.
    fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem {
        <Vec<Self> as Into<EnvelopeItem>>::into(items)
    }
}
/// An item type that a `Batcher` can collect and send in batches.
pub(crate) trait Batch: IntoBatchEnvelopeItem {
    /// Short name of the item type; it appears in the worker thread's name and in the
    /// batcher's debug output.
    const TYPE_NAME: &'static str;
}
// Log records are batched under the name "logs".
impl Batch for Log {
    const TYPE_NAME: &'static str = "logs";
}
/// Collects items of type `T` and sends them through the transport in batches.
///
/// A batch goes out when the queue reaches `MAX_ITEMS`, when the background worker finds
/// that `FLUSH_INTERVAL` has elapsed, on an explicit `flush`, and when the batcher is dropped.
pub(crate) struct Batcher<T: Batch> {
/// Transport handle the envelopes are sent through.
transport: TransportArc,
/// Pending items, shared with the worker thread.
queue: Arc<Mutex<BatchQueue<T>>>,
/// Shutdown flag together with the condition variable that wakes the worker.
shutdown: Arc<(Mutex<bool>, Condvar)>,
/// Handle of the worker thread; taken and joined when the batcher is dropped.
worker: Option<JoinHandle<()>>,
}
impl<T> Batcher<T>
where
T: Batch + Send + 'static,
{
pub(crate) fn new(transport: TransportArc) -> Self {
let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() }));
#[allow(clippy::mutex_atomic)]
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
let worker_transport = transport.clone();
let worker_queue = queue.clone();
let worker_shutdown = shutdown.clone();
let worker = std::thread::Builder::new()
.name(format!("sentry-{}-batcher", T::TYPE_NAME))
.spawn(move || {
let (lock, cvar) = worker_shutdown.as_ref();
let mut shutdown = lock.lock().unwrap();
if *shutdown {
return;
}
let mut last_flush = Instant::now();
loop {
let timeout = FLUSH_INTERVAL
.checked_sub(last_flush.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
if *shutdown {
return;
}
if last_flush.elapsed() >= FLUSH_INTERVAL {
Batcher::flush_queue_internal(
worker_queue.lock().unwrap(),
&worker_transport,
);
last_flush = Instant::now();
}
}
})
.unwrap();
Self {
transport,
queue,
shutdown,
worker: Some(worker),
}
}
}
impl<T: Batch> Batcher<T> {
    /// Adds `item` to the queue and flushes right away once the queue holds `MAX_ITEMS`.
    pub(crate) fn enqueue(&self, item: T) {
        let mut pending = self.queue.lock().unwrap();
        pending.items.push(item);
        if pending.items.len() < MAX_ITEMS {
            return;
        }
        Self::flush_queue_internal(pending, &self.transport);
    }

    /// Sends everything that is currently queued.
    pub(crate) fn flush(&self) {
        Self::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
    }

    /// Empties the queue behind `queue_lock` and sends its contents as a single envelope.
    ///
    /// The guard is consumed so that the queue lock is released before any sending happens.
    /// Nothing is sent when the queue is empty, and the items are discarded when no
    /// transport is installed.
    fn flush_queue_internal(
        mut queue_lock: MutexGuard<'_, BatchQueue<T>>,
        transport: &TransportArc,
    ) {
        let batch = std::mem::take(&mut queue_lock.items);
        drop(queue_lock);

        if batch.is_empty() {
            return;
        }

        sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, batch.len());

        if let Some(sender) = &*transport.read().unwrap() {
            let mut envelope = Envelope::new();
            envelope.add_item(T::into_envelope_item(batch));
            sender.send_envelope(envelope);
        }
    }
}
impl<T: Batch> Drop for Batcher<T> {
    /// Signals the worker thread to stop, waits for it to exit, and then flushes whatever
    /// is still queued.
    ///
    /// Poisoned mutexes are recovered rather than unwrapped. A panic inside `drop` would
    /// skip the final flush, and would abort the process if the drop happens while another
    /// panic is already unwinding.
    fn drop(&mut self) {
        let (lock, cvar) = self.shutdown.as_ref();
        // The worker holds this guard while it flushes, so a panic during a flush poisons
        // the mutex. The protected value is a plain flag, so reusing the guard is safe.
        *lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner()) = true;
        cvar.notify_one();

        if let Some(worker) = self.worker.take() {
            // `join` reports a panicked worker as `Err`; there is nothing left to do with it.
            worker.join().ok();
        }

        // Send the remaining items even if another thread panicked while holding the queue.
        let remaining = self
            .queue
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        Batcher::flush_queue_internal(remaining, &self.transport);
    }
}
#[cfg(all(test, feature = "test"))]
mod tests {
    use crate::logger_info;
    use crate::protocol::{EnvelopeItem, ItemContainer};
    use crate::test;
    use crate::ClientOptions;

    /// Default client options with log capturing switched on.
    fn options_with_logs() -> ClientOptions {
        ClientOptions {
            enable_logs: true,
            ..Default::default()
        }
    }

    /// 150 logs must arrive as two envelopes that together carry every log.
    #[test]
    fn test_logs_batching() {
        let emit_logs = || {
            for i in 0..150 {
                logger_info!("test log {}", i);
            }
        };
        let envelopes = test::with_captured_envelopes_options(emit_logs, options_with_logs());

        assert_eq!(2, envelopes.len());

        let mut total_logs = 0;
        for envelope in &envelopes {
            for item in envelope.items() {
                if let EnvelopeItem::ItemContainer(ItemContainer::Logs(logs)) = item {
                    total_logs += logs.len();
                }
            }
        }
        assert_eq!(150, total_logs);
    }

    /// A dozen logs must still be delivered, all in one envelope.
    #[test]
    fn test_logs_batcher_flush() {
        let emit_logs = || {
            for i in 0..12 {
                logger_info!("test log {}", i);
            }
        };
        let envelopes = test::with_captured_envelopes_options(emit_logs, options_with_logs());

        assert_eq!(1, envelopes.len());

        for envelope in &envelopes {
            for item in envelope.items() {
                if let EnvelopeItem::ItemContainer(ItemContainer::Logs(logs)) = item {
                    assert_eq!(12, logs.len());
                    // Only the first log container of an envelope is checked.
                    break;
                }
            }
        }
    }
}