1#[cfg(feature = "release-health")]
6pub use session_impl::*;
7
8#[cfg(feature = "release-health")]
9mod session_impl {
10
11 use std::collections::HashMap;
12 use std::sync::{Arc, Condvar, Mutex, MutexGuard};
13 use std::thread::JoinHandle;
14 use std::time::{Duration, Instant, SystemTime};
15
16 use crate::client::TransportArc;
17 use crate::clientoptions::SessionMode;
18 use crate::protocol::{
19 EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes,
20 SessionStatus, SessionUpdate,
21 };
22
23 use crate::scope::StackLayer;
24
25 use crate::types::random_uuid;
26 use crate::{Client, Envelope};
27
28 #[derive(Clone, Debug)]
29 pub struct Session {
30 client: Arc<Client>,
31 session_update: SessionUpdate<'static>,
32 started: Instant,
33 dirty: bool,
34 }
35
36 impl Drop for Session {
37 fn drop(&mut self) {
38 self.close(SessionStatus::Exited);
39 if self.dirty {
40 self.client.enqueue_session(self.session_update.clone());
41 }
42 }
43 }
44
45 impl Session {
46 pub fn from_stack(stack: &StackLayer) -> Option<Self> {
47 let client = stack.client.as_ref()?;
48 let options = client.options();
49 let user = stack.scope.user.as_deref();
50 let distinct_id = user
51 .and_then(|user| {
52 user.id
53 .as_ref()
54 .or(user.email.as_ref())
55 .or(user.username.as_ref())
56 })
57 .cloned();
58 Some(Self {
59 client: client.clone(),
60 session_update: SessionUpdate {
61 session_id: random_uuid(),
62 distinct_id,
63 sequence: None,
64 timestamp: None,
65 started: SystemTime::now(),
66 init: true,
67 duration: None,
68 status: SessionStatus::Ok,
69 errors: 0,
70 attributes: SessionAttributes {
71 release: options.release.clone()?,
72 environment: options.environment.clone(),
73 ip_address: None,
74 user_agent: None,
75 },
76 },
77 started: Instant::now(),
78 dirty: true,
79 })
80 }
81
82 pub(crate) fn update_from_event(&mut self, event: &Event<'static>) {
83 if self.session_update.status != SessionStatus::Ok {
84 return;
87 }
88 let mut has_error = event.level >= Level::Error;
89 let mut is_crash = false;
90 for exc in &event.exception.values {
91 has_error = true;
92 if let Some(mechanism) = &exc.mechanism {
93 if let Some(false) = mechanism.handled {
94 is_crash = true;
95 break;
96 }
97 }
98 }
99
100 if is_crash {
101 self.session_update.status = SessionStatus::Crashed;
102 }
103 if has_error {
104 self.session_update.errors += 1;
105 self.dirty = true;
106 }
107 }
108
109 pub(crate) fn close(&mut self, status: SessionStatus) {
110 if self.session_update.status == SessionStatus::Ok {
111 let status = match status {
112 SessionStatus::Ok => SessionStatus::Exited,
113 s => s,
114 };
115 self.session_update.duration = Some(self.started.elapsed().as_secs_f64());
116 self.session_update.status = status;
117 self.dirty = true;
118 }
119 }
120
121 pub(crate) fn create_envelope_item(&mut self) -> Option<EnvelopeItem> {
122 if self.dirty {
123 let item = self.session_update.clone().into();
124 self.session_update.init = false;
125 self.dirty = false;
126 return Some(item);
127 }
128 None
129 }
130 }
131
/// Upper bound of individual session updates per envelope; reaching it in the
/// queue triggers an immediate flush.
// NOTE(review): presumably mirrors Sentry's documented envelope size limits — confirm.
const MAX_SESSION_ITEMS: usize = 100;

/// Time the background worker waits between two periodic flushes.
const FLUSH_INTERVAL: Duration = Duration::from_secs(60);
135
136 #[derive(Debug, Default)]
137 struct SessionQueue {
138 individual: Vec<SessionUpdate<'static>>,
139 aggregated: Option<AggregatedSessions>,
140 }
141
142 #[derive(Debug)]
143 struct AggregatedSessions {
144 buckets: HashMap<AggregationKey, AggregationCounts>,
145 attributes: SessionAttributes<'static>,
146 }
147
148 impl From<AggregatedSessions> for EnvelopeItem {
149 fn from(sessions: AggregatedSessions) -> Self {
150 let aggregates = sessions
151 .buckets
152 .into_iter()
153 .map(|(key, counts)| SessionAggregateItem {
154 started: key.started,
155 distinct_id: key.distinct_id,
156 exited: counts.exited,
157 errored: counts.errored,
158 abnormal: counts.abnormal,
159 crashed: counts.crashed,
160 })
161 .collect();
162
163 SessionAggregates {
164 aggregates,
165 attributes: sessions.attributes,
166 }
167 .into()
168 }
169 }
170
/// Identifies one aggregation bucket.
#[derive(Debug, PartialEq, Eq, Hash)]
struct AggregationKey {
    /// Start time shared by all sessions counted in the bucket.
    started: SystemTime,
    /// Distinct id shared by all sessions in the bucket, if they have one.
    distinct_id: Option<String>,
}
176
/// Number of sessions per final outcome within one aggregation bucket.
#[derive(Debug, Default)]
struct AggregationCounts {
    /// Sessions that exited without any recorded error.
    exited: u32,
    /// Sessions that exited after recording at least one error.
    errored: u32,
    /// Sessions that ended with the `Abnormal` status.
    abnormal: u32,
    /// Sessions that ended with the `Crashed` status.
    crashed: u32,
}
184
185 pub(crate) struct SessionFlusher {
191 transport: TransportArc,
192 mode: SessionMode,
193 queue: Arc<Mutex<SessionQueue>>,
194 shutdown: Arc<(Mutex<bool>, Condvar)>,
195 worker: Option<JoinHandle<()>>,
196 }
197
198 impl SessionFlusher {
199 pub fn new(transport: TransportArc, mode: SessionMode) -> Self {
201 let queue = Arc::new(Mutex::new(Default::default()));
202 #[allow(clippy::mutex_atomic)]
203 let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
204
205 let worker_transport = transport.clone();
206 let worker_queue = queue.clone();
207 let worker_shutdown = shutdown.clone();
208 let worker = std::thread::Builder::new()
209 .name("sentry-session-flusher".into())
210 .spawn(move || {
211 let (lock, cvar) = worker_shutdown.as_ref();
212 let mut shutdown = lock.lock().unwrap();
213 if *shutdown {
215 return;
216 }
217 let mut last_flush = Instant::now();
218 loop {
219 let timeout = FLUSH_INTERVAL
220 .checked_sub(last_flush.elapsed())
221 .unwrap_or_else(|| Duration::from_secs(0));
222 shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
223 if *shutdown {
224 return;
225 }
226 if last_flush.elapsed() < FLUSH_INTERVAL {
227 continue;
228 }
229 SessionFlusher::flush_queue_internal(
230 worker_queue.lock().unwrap(),
231 &worker_transport,
232 );
233 last_flush = Instant::now();
234 }
235 })
236 .unwrap();
237
238 Self {
239 transport,
240 mode,
241 queue,
242 shutdown,
243 worker: Some(worker),
244 }
245 }
246
247 pub fn enqueue(&self, session_update: SessionUpdate<'static>) {
252 let mut queue = self.queue.lock().unwrap();
253 if self.mode == SessionMode::Application || !session_update.init {
254 queue.individual.push(session_update);
255 if queue.individual.len() >= MAX_SESSION_ITEMS {
256 SessionFlusher::flush_queue_internal(queue, &self.transport);
257 }
258 return;
259 }
260
261 let aggregate = queue.aggregated.get_or_insert_with(|| AggregatedSessions {
262 buckets: HashMap::with_capacity(1),
263 attributes: session_update.attributes.clone(),
264 });
265
266 let duration = session_update
267 .started
268 .duration_since(SystemTime::UNIX_EPOCH)
269 .unwrap();
270 let duration = (duration.as_secs() / 60) * 60;
271 let started = SystemTime::UNIX_EPOCH
272 .checked_add(Duration::from_secs(duration))
273 .unwrap();
274
275 let key = AggregationKey {
276 started,
277 distinct_id: session_update.distinct_id,
278 };
279
280 let bucket = aggregate.buckets.entry(key).or_default();
281
282 match session_update.status {
283 SessionStatus::Exited => {
284 if session_update.errors > 0 {
285 bucket.errored += 1;
286 } else {
287 bucket.exited += 1;
288 }
289 }
290 SessionStatus::Crashed => {
291 bucket.crashed += 1;
292 }
293 SessionStatus::Abnormal => {
294 bucket.abnormal += 1;
295 }
296 SessionStatus::Ok => {
297 sentry_debug!("unreachable: only closed sessions will be enqueued");
298 }
299 }
300 }
301
302 pub fn flush(&self) {
304 let queue = self.queue.lock().unwrap();
305 SessionFlusher::flush_queue_internal(queue, &self.transport);
306 }
307
308 fn flush_queue_internal(
313 mut queue_lock: MutexGuard<SessionQueue>,
314 transport: &TransportArc,
315 ) {
316 let queue = std::mem::take(&mut queue_lock.individual);
317 let aggregate = queue_lock.aggregated.take();
318 drop(queue_lock);
319
320 if let Some(aggregate) = aggregate {
322 if let Some(ref transport) = *transport.read().unwrap() {
323 let mut envelope = Envelope::new();
324 envelope.add_item(aggregate);
325 transport.send_envelope(envelope);
326 }
327 }
328
329 if queue.is_empty() {
331 return;
332 }
333
334 let mut envelope = Envelope::new();
335 let mut items = 0;
336
337 for session_update in queue {
338 if items >= MAX_SESSION_ITEMS {
339 if let Some(ref transport) = *transport.read().unwrap() {
340 transport.send_envelope(envelope);
341 }
342 envelope = Envelope::new();
343 items = 0;
344 }
345
346 envelope.add_item(session_update);
347 items += 1;
348 }
349
350 if let Some(ref transport) = *transport.read().unwrap() {
351 transport.send_envelope(envelope);
352 }
353 }
354 }
355
356 impl Drop for SessionFlusher {
357 fn drop(&mut self) {
358 let (lock, cvar) = self.shutdown.as_ref();
359 *lock.lock().unwrap() = true;
360 cvar.notify_one();
361
362 if let Some(worker) = self.worker.take() {
363 worker.join().ok();
364 }
365 SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
366 }
367 }
368
369 #[cfg(all(test, feature = "test"))]
370 mod tests {
371 use std::cmp::Ordering;
372
373 use super::*;
374 use crate as sentry;
375 use crate::protocol::{Envelope, EnvelopeItem, SessionStatus};
376
377 fn capture_envelopes<F>(f: F) -> Vec<Envelope>
378 where
379 F: FnOnce(),
380 {
381 crate::test::with_captured_envelopes_options(
382 f,
383 crate::ClientOptions {
384 release: Some("some-release".into()),
385 ..Default::default()
386 },
387 )
388 }
389
#[test]
fn test_session_startstop() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();
        // Let some time pass so the reported duration is measurable.
        std::thread::sleep(std::time::Duration::from_millis(10));
    });
    assert_eq!(envelopes.len(), 1);

    let mut items = envelopes[0].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert!(session.duration.unwrap() > 0.01);
            assert_eq!(session.errors, 0);
            assert_eq!(session.attributes.release, "some-release");
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
410
#[test]
fn test_session_batching() {
    let envelopes = capture_envelopes(|| {
        for _ in 0..(MAX_SESSION_ITEMS * 2) {
            sentry::start_session();
        }
    });
    // Twice the per-envelope limit is expected to arrive in two envelopes.
    assert_eq!(envelopes.len(), 2);

    let combined = envelopes[0].items().chain(envelopes[1].items());
    assert_eq!(combined.clone().count(), MAX_SESSION_ITEMS * 2);
    combined.for_each(|item| assert!(matches!(item, EnvelopeItem::SessionUpdate(_))));
}
427
#[test]
fn test_session_aggregation() {
    let options = crate::ClientOptions {
        release: Some("some-release".into()),
        session_mode: SessionMode::Request,
        ..Default::default()
    };
    let envelopes = crate::test::with_captured_envelopes_options(
        || {
            sentry::start_session();
            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);

            for _ in 0..50 {
                sentry::start_session();
            }
            sentry::end_session();

            sentry::configure_scope(|scope| {
                scope.set_user(Some(sentry::User {
                    id: Some("foo-bar".into()),
                    ..Default::default()
                }));
                // This processor discards every event.
                scope.add_event_processor(Box::new(|_| None));
            });

            for _ in 0..50 {
                sentry::start_session();
            }

            // Discarded by the processor above; the assertions below expect
            // that it does not count as a session error.
            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);
        },
        options,
    );
    assert_eq!(envelopes.len(), 2);

    // The first envelope carries nothing but the event.
    let mut event_items = envelopes[0].items();
    assert!(matches!(event_items.next(), Some(EnvelopeItem::Event(_))));
    assert_eq!(event_items.next(), None);

    let mut session_items = envelopes[1].items();
    match session_items.next() {
        Some(EnvelopeItem::SessionAggregates(aggregate)) => {
            let mut buckets = aggregate.aggregates.clone();
            assert_eq!(buckets.len(), 2);
            // The buckets come out of a hash map, so sort them to get a
            // stable order for the assertions.
            buckets.sort_by(|a, b| {
                a.distinct_id
                    .partial_cmp(&b.distinct_id)
                    .unwrap_or(Ordering::Less)
            });

            let anonymous = &buckets[0];
            assert_eq!(anonymous.distinct_id, None);
            assert_eq!(anonymous.exited, 50);

            let identified = &buckets[1];
            assert_eq!(identified.errored, 0);
            assert_eq!(identified.distinct_id, Some("foo-bar".into()));
            assert_eq!(identified.exited, 50);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(session_items.next(), None);
}
493
#[test]
fn test_session_error() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);
    });
    assert_eq!(envelopes.len(), 2);

    // First envelope: the event together with the initial session update.
    let mut first = envelopes[0].items();
    assert!(matches!(first.next(), Some(EnvelopeItem::Event(_))));
    match first.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Ok);
            assert_eq!(session.errors, 1);
            assert_eq!(session.attributes.release, "some-release");
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(first.next(), None);

    // Second envelope: the closing update, no longer flagged as `init`.
    let mut second = envelopes[1].items();
    match second.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 1);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(second.next(), None);
}
526
#[test]
fn test_session_abnormal() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();
        sentry::end_session_with_status(SessionStatus::Abnormal);
    });
    assert_eq!(envelopes.len(), 1);

    let mut items = envelopes[0].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            // The explicitly requested status is reported as-is.
            assert_eq!(session.status, SessionStatus::Abnormal);
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
544
#[test]
fn test_session_sampled_errors() {
    let options = crate::ClientOptions {
        release: Some("some-release".into()),
        sample_rate: 0.5,
        ..Default::default()
    };
    let mut envelopes = crate::test::with_captured_envelopes_options(
        || {
            sentry::start_session();

            for _ in 0..100 {
                let err = "NaN".parse::<usize>().unwrap_err();
                sentry::capture_error(&err);
            }
        },
        options,
    );
    // With a sample rate of 0.5 roughly half of the events should arrive;
    // the bounds are deliberately loose.
    let received = envelopes.len();
    assert!(received > 25);
    assert!(received < 75);

    // The closing session update still has to account for every error.
    let last = envelopes.pop().unwrap();
    let mut items = last.items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 100);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
575
// Errors captured on a hub created from the current one, and in a scope
// nested inside it, are expected to count towards the session that was
// started beforehand.
#[test]
fn test_inherit_session_from_top() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);

        let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));

        sentry::Hub::run(hub, || {
            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);

            sentry::with_scope(
                |_| {},
                || {
                    let err = "NaN".parse::<usize>().unwrap_err();
                    sentry::capture_error(&err);
                },
            );
        });
    });

    // Three captured errors plus the envelope that closes the session.
    assert_eq!(envelopes.len(), 4);

    let mut closing = envelopes[3].items();
    match closing.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 3);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(closing.next(), None);
}
616
// A session started inside a nested scope is expected to stay there: errors
// captured after leaving that scope must not be attributed to it.
#[test]
fn test_dont_inherit_session_backwards() {
    let envelopes = capture_envelopes(|| {
        let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));

        sentry::Hub::run(hub, || {
            sentry::with_scope(
                |_| {},
                || {
                    sentry::start_session();

                    let err = "NaN".parse::<usize>().unwrap_err();
                    sentry::capture_error(&err);
                },
            );

            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);
        });

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);
    });

    assert_eq!(envelopes.len(), 4);

    // The error captured inside the scope travels with the initial session
    // update.
    let mut first = envelopes[0].items();
    assert!(matches!(first.next(), Some(EnvelopeItem::Event(_))));
    match first.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Ok);
            assert_eq!(session.errors, 1);
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(first.next(), None);

    // The two errors captured outside the scope are plain events without
    // any session update attached.
    let mut second = envelopes[1].items();
    assert!(matches!(second.next(), Some(EnvelopeItem::Event(_))));
    assert_eq!(second.next(), None);

    let mut third = envelopes[2].items();
    assert!(matches!(third.next(), Some(EnvelopeItem::Event(_))));
    assert_eq!(third.next(), None);

    // The closing update still reports the single in-scope error.
    let mut closing = envelopes[3].items();
    match closing.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 1);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(closing.next(), None);
}
677 }
678}