1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::ratelimit::{RateLimiter, RateLimitingCategory};
use crate::{sentry_debug, Envelope};
#[expect(
    clippy::large_enum_variant,
    reason = "In normal usage this is usually SendEnvelope, the other variants are only used when \
        the user manually calls transport.flush() or when the transport is shut down."
)]
/// Work items queued to the background transport thread.
enum Task {
    /// An [`Envelope`] to be submitted upstream (subject to rate limiting).
    SendEnvelope(Envelope),
    /// Signal the given channel once every task queued before this one has
    /// been processed; used to implement `flush()`.
    Flush(SyncSender<()>),
    /// Make the worker loop return immediately.
    Shutdown,
}
/// A background-thread powered by [`tokio`] dedicated to sending [`Envelope`]s while respecting the rate limits imposed in the responses.
pub struct TransportThread {
    /// Bounded queue (capacity 30) feeding [`Task`]s to the worker thread.
    sender: SyncSender<Task>,
    /// Flag checked by the worker loop before each task; set on drop to make
    /// the worker exit.
    shutdown: Arc<AtomicBool>,
    /// Handle of the worker thread; `None` if spawning the thread failed.
    handle: Option<JoinHandle<()>>,
}
impl TransportThread {
    /// Spawn a new background thread.
    ///
    /// The thread runs a current-thread [`tokio`] runtime and drives `send`
    /// for every [`Envelope`] that passes the rate limiter. `send` takes
    /// ownership of the [`RateLimiter`] and must return it (possibly updated
    /// from response headers) from its future.
    ///
    /// If spawning the OS thread fails, the error is logged and all
    /// subsequently queued envelopes are silently dropped.
    pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
    where
        SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
        // NOTE: returning RateLimiter here, otherwise we are in borrow hell
        SendFuture: std::future::Future<Output = RateLimiter>,
    {
        // Bounded queue: `send()` below uses `try_send`, so a slow worker
        // causes envelope drops instead of back-pressure on the caller.
        let (sender, receiver) = sync_channel(30);
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_worker = shutdown.clone();
        let handle = thread::Builder::new()
            .name("sentry-transport".into())
            .spawn(move || {
                // create a runtime on the transport thread
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let mut rl = RateLimiter::new();
                // and block on an async fn in this runtime/thread
                rt.block_on(async move {
                    // Blocking iteration is fine here: this runtime exists
                    // solely to poll the `send` futures on this thread.
                    for task in receiver.into_iter() {
                        if shutdown_worker.load(Ordering::SeqCst) {
                            return;
                        }
                        let envelope = match task {
                            Task::SendEnvelope(envelope) => envelope,
                            Task::Flush(sender) => {
                                // All earlier tasks have been handled (FIFO
                                // channel), so the flush is complete.
                                sender.send(()).ok();
                                continue;
                            }
                            Task::Shutdown => {
                                return;
                            }
                        };
                        if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
                            sentry_debug!(
                                "Skipping event send because we're disabled due to rate limits for {}s",
                                time_left.as_secs()
                            );
                            continue;
                        }
                        match rl.filter_envelope(envelope) {
                            Some(envelope) => {
                                rl = send(envelope, rl).await;
                            }
                            None => {
                                sentry_debug!("Envelope was discarded due to per-item rate limits");
                            }
                        };
                    }
                })
            })
            // Previously a failed spawn was discarded with a bare `.ok()`;
            // leave a debug trace so the resulting envelope drops are
            // explainable.
            .map_err(|err| sentry_debug!("failed to spawn sentry-transport thread: {err}"))
            .ok();
        Self {
            sender,
            shutdown,
            handle,
        }
    }
    /// Send an [`Envelope`].
    ///
    /// In case the background thread cannot keep up, the [`Envelope`] is dropped.
    pub fn send(&self, envelope: Envelope) {
        // Using send here would mean that when the channel fills up for whatever
        // reason, trying to send an envelope would block everything. We'd rather
        // drop the envelope in that case.
        if let Err(e) = self.sender.try_send(Task::SendEnvelope(envelope)) {
            sentry_debug!("envelope dropped: {e}");
        }
    }
    /// Flush all pending [`Envelope`]s.
    ///
    /// Returns true if successful within given timeout.
    pub fn flush(&self, timeout: Duration) -> bool {
        let (sender, receiver) = sync_channel(1);
        // If the worker is gone the send fails and the recv below times out,
        // correctly reporting the flush as unsuccessful.
        let _ = self.sender.send(Task::Flush(sender));
        receiver.recv_timeout(timeout).is_ok()
    }
}
impl Drop for TransportThread {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
let _ = self.sender.send(Task::Shutdown);
if let Some(handle) = self.handle.take() {
handle.join().unwrap();
}
}
}