Skip to main content

sentry_core/
session.rs

1//! Release Health Sessions
2//!
3//! <https://develop.sentry.dev/sdk/sessions/>
4
5#[cfg(feature = "release-health")]
6pub use session_impl::*;
7
8#[cfg(feature = "release-health")]
9mod session_impl {
10
11    use std::collections::HashMap;
12    use std::sync::{Arc, Condvar, Mutex, MutexGuard};
13    use std::thread::JoinHandle;
14    use std::time::{Duration, Instant, SystemTime};
15
16    use crate::client::TransportArc;
17    use crate::clientoptions::SessionMode;
18    use crate::protocol::{
19        EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes,
20        SessionStatus, SessionUpdate,
21    };
22
23    use crate::scope::StackLayer;
24
25    use crate::types::random_uuid;
26    use crate::{Client, Envelope};
27
    /// A single release-health session, bound to the client that reports it.
    ///
    /// Dropping a `Session` closes it and, if it still has unsent changes,
    /// enqueues the final update on the client.
    #[derive(Clone, Debug)]
    pub struct Session {
        /// Client that receives the final update when the session is dropped.
        client: Arc<Client>,
        /// Protocol payload describing the current state of this session.
        session_update: SessionUpdate<'static>,
        /// Monotonic start time, used to compute the duration when closing.
        started: Instant,
        /// Whether `session_update` has changes not yet turned into an envelope item.
        dirty: bool,
    }
35
36    impl Drop for Session {
37        fn drop(&mut self) {
38            self.close(SessionStatus::Exited);
39            if self.dirty {
40                self.client.enqueue_session(self.session_update.clone());
41            }
42        }
43    }
44
45    impl Session {
46        pub fn from_stack(stack: &StackLayer) -> Option<Self> {
47            let client = stack.client.as_ref()?;
48            let options = client.options();
49            let user = stack.scope.user.as_deref();
50            let distinct_id = user
51                .and_then(|user| {
52                    user.id
53                        .as_ref()
54                        .or(user.email.as_ref())
55                        .or(user.username.as_ref())
56                })
57                .cloned();
58            Some(Self {
59                client: client.clone(),
60                session_update: SessionUpdate {
61                    session_id: random_uuid(),
62                    distinct_id,
63                    sequence: None,
64                    timestamp: None,
65                    started: SystemTime::now(),
66                    init: true,
67                    duration: None,
68                    status: SessionStatus::Ok,
69                    errors: 0,
70                    attributes: SessionAttributes {
71                        release: options.release.clone()?,
72                        environment: options.environment.clone(),
73                        ip_address: None,
74                        user_agent: None,
75                    },
76                },
77                started: Instant::now(),
78                dirty: true,
79            })
80        }
81
82        pub(crate) fn update_from_event(&mut self, event: &Event<'static>) {
83            if self.session_update.status != SessionStatus::Ok {
84                // a session that has already transitioned to a "terminal" state
85                // should not receive any more updates
86                return;
87            }
88            let mut has_error = event.level >= Level::Error;
89            let mut is_crash = false;
90            for exc in &event.exception.values {
91                has_error = true;
92                if let Some(mechanism) = &exc.mechanism {
93                    if let Some(false) = mechanism.handled {
94                        is_crash = true;
95                        break;
96                    }
97                }
98            }
99
100            if is_crash {
101                self.session_update.status = SessionStatus::Crashed;
102            }
103            if has_error {
104                self.session_update.errors += 1;
105                self.dirty = true;
106            }
107        }
108
109        pub(crate) fn close(&mut self, status: SessionStatus) {
110            if self.session_update.status == SessionStatus::Ok {
111                let status = match status {
112                    SessionStatus::Ok => SessionStatus::Exited,
113                    s => s,
114                };
115                self.session_update.duration = Some(self.started.elapsed().as_secs_f64());
116                self.session_update.status = status;
117                self.dirty = true;
118            }
119        }
120
121        pub(crate) fn create_envelope_item(&mut self) -> Option<EnvelopeItem> {
122            if self.dirty {
123                let item = self.session_update.clone().into();
124                self.session_update.init = false;
125                self.dirty = false;
126                return Some(item);
127            }
128            None
129        }
130    }
131
    /// Maximum number of session items put into a single envelope, and the
    /// queue length at which `enqueue` flushes right away;
    /// as defined here: <https://develop.sentry.dev/sdk/envelopes/#size-limits>
    const MAX_SESSION_ITEMS: usize = 100;
    /// Interval at which the background thread flushes the session queue.
    const FLUSH_INTERVAL: Duration = Duration::from_secs(60);
135
    /// Session data that is waiting to be flushed to the transport.
    #[derive(Debug, Default)]
    struct SessionQueue {
        /// Session updates that are sent as individual envelope items.
        individual: Vec<SessionUpdate<'static>>,
        /// Aggregated session counts collected since the last flush, if any.
        aggregated: Option<AggregatedSessions>,
    }
141
    /// Session counts grouped into buckets, plus the attributes they are
    /// reported with.
    #[derive(Debug)]
    struct AggregatedSessions {
        /// Outcome counters per aggregation bucket.
        buckets: HashMap<AggregationKey, AggregationCounts>,
        /// Attributes sent along with all buckets of this aggregate.
        attributes: SessionAttributes<'static>,
    }
147
148    impl From<AggregatedSessions> for EnvelopeItem {
149        fn from(sessions: AggregatedSessions) -> Self {
150            let aggregates = sessions
151                .buckets
152                .into_iter()
153                .map(|(key, counts)| SessionAggregateItem {
154                    started: key.started,
155                    distinct_id: key.distinct_id,
156                    exited: counts.exited,
157                    errored: counts.errored,
158                    abnormal: counts.abnormal,
159                    crashed: counts.crashed,
160                })
161                .collect();
162
163            SessionAggregates {
164                aggregates,
165                attributes: sessions.attributes,
166            }
167            .into()
168        }
169    }
170
    /// Identifies one aggregation bucket.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct AggregationKey {
        /// Start time shared by all sessions counted in the bucket.
        started: SystemTime,
        /// Distinct id shared by all sessions counted in the bucket, if any.
        distinct_id: Option<String>,
    }
176
    /// Number of sessions in one bucket, split by how they ended.
    #[derive(Debug, Default)]
    struct AggregationCounts {
        /// Sessions that exited without any recorded error.
        exited: u32,
        /// Sessions that exited with at least one recorded error.
        errored: u32,
        /// Sessions that ended with the `Abnormal` status.
        abnormal: u32,
        /// Sessions that ended with the `Crashed` status.
        crashed: u32,
    }
184
    /// Background Session Flusher
    ///
    /// The background flusher queues session updates for delayed batched sending.
    /// It has its own background thread that will flush its queue once every
    /// `FLUSH_INTERVAL`.
    pub(crate) struct SessionFlusher {
        /// Transport that flushed envelopes are submitted to.
        transport: TransportArc,
        /// Decides whether new sessions are queued individually or aggregated.
        mode: SessionMode,
        /// Pending session data, shared with the background thread.
        queue: Arc<Mutex<SessionQueue>>,
        /// Shutdown flag and the condition variable used to wake the
        /// background thread when the flag is set.
        shutdown: Arc<(Mutex<bool>, Condvar)>,
        /// Handle of the background thread; taken and joined on drop.
        worker: Option<JoinHandle<()>>,
    }
197
198    impl SessionFlusher {
199        /// Creates a new Flusher that will submit envelopes to the given `transport`.
200        pub fn new(transport: TransportArc, mode: SessionMode) -> Self {
201            let queue = Arc::new(Mutex::new(Default::default()));
202            #[allow(clippy::mutex_atomic)]
203            let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
204
205            let worker_transport = transport.clone();
206            let worker_queue = queue.clone();
207            let worker_shutdown = shutdown.clone();
208            let worker = std::thread::Builder::new()
209                .name("sentry-session-flusher".into())
210                .spawn(move || {
211                    let (lock, cvar) = worker_shutdown.as_ref();
212                    let mut shutdown = lock.lock().unwrap();
213                    // check this immediately, in case the main thread is already shutting down
214                    if *shutdown {
215                        return;
216                    }
217                    let mut last_flush = Instant::now();
218                    loop {
219                        let timeout = FLUSH_INTERVAL
220                            .checked_sub(last_flush.elapsed())
221                            .unwrap_or_else(|| Duration::from_secs(0));
222                        shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
223                        if *shutdown {
224                            return;
225                        }
226                        if last_flush.elapsed() < FLUSH_INTERVAL {
227                            continue;
228                        }
229                        SessionFlusher::flush_queue_internal(
230                            worker_queue.lock().unwrap(),
231                            &worker_transport,
232                        );
233                        last_flush = Instant::now();
234                    }
235                })
236                .unwrap();
237
238            Self {
239                transport,
240                mode,
241                queue,
242                shutdown,
243                worker: Some(worker),
244            }
245        }
246
247        /// Enqueues a session update for delayed sending.
248        ///
249        /// This will aggregate session counts in request mode, for all sessions
250        /// that were not yet partially sent.
251        pub fn enqueue(&self, session_update: SessionUpdate<'static>) {
252            let mut queue = self.queue.lock().unwrap();
253            if self.mode == SessionMode::Application || !session_update.init {
254                queue.individual.push(session_update);
255                if queue.individual.len() >= MAX_SESSION_ITEMS {
256                    SessionFlusher::flush_queue_internal(queue, &self.transport);
257                }
258                return;
259            }
260
261            let aggregate = queue.aggregated.get_or_insert_with(|| AggregatedSessions {
262                buckets: HashMap::with_capacity(1),
263                attributes: session_update.attributes.clone(),
264            });
265
266            let duration = session_update
267                .started
268                .duration_since(SystemTime::UNIX_EPOCH)
269                .unwrap();
270            let duration = (duration.as_secs() / 60) * 60;
271            let started = SystemTime::UNIX_EPOCH
272                .checked_add(Duration::from_secs(duration))
273                .unwrap();
274
275            let key = AggregationKey {
276                started,
277                distinct_id: session_update.distinct_id,
278            };
279
280            let bucket = aggregate.buckets.entry(key).or_default();
281
282            match session_update.status {
283                SessionStatus::Exited => {
284                    if session_update.errors > 0 {
285                        bucket.errored += 1;
286                    } else {
287                        bucket.exited += 1;
288                    }
289                }
290                SessionStatus::Crashed => {
291                    bucket.crashed += 1;
292                }
293                SessionStatus::Abnormal => {
294                    bucket.abnormal += 1;
295                }
296                SessionStatus::Ok => {
297                    sentry_debug!("unreachable: only closed sessions will be enqueued");
298                }
299            }
300        }
301
302        /// Flushes the queue to the transport.
303        pub fn flush(&self) {
304            let queue = self.queue.lock().unwrap();
305            SessionFlusher::flush_queue_internal(queue, &self.transport);
306        }
307
308        /// Flushes the queue to the transport.
309        ///
310        /// This is a static method as it will be called from both the background
311        /// thread and the main thread on drop.
312        fn flush_queue_internal(
313            mut queue_lock: MutexGuard<SessionQueue>,
314            transport: &TransportArc,
315        ) {
316            let queue = std::mem::take(&mut queue_lock.individual);
317            let aggregate = queue_lock.aggregated.take();
318            drop(queue_lock);
319
320            // send aggregates
321            if let Some(aggregate) = aggregate {
322                if let Some(ref transport) = *transport.read().unwrap() {
323                    let mut envelope = Envelope::new();
324                    envelope.add_item(aggregate);
325                    transport.send_envelope(envelope);
326                }
327            }
328
329            // send individual items
330            if queue.is_empty() {
331                return;
332            }
333
334            let mut envelope = Envelope::new();
335            let mut items = 0;
336
337            for session_update in queue {
338                if items >= MAX_SESSION_ITEMS {
339                    if let Some(ref transport) = *transport.read().unwrap() {
340                        transport.send_envelope(envelope);
341                    }
342                    envelope = Envelope::new();
343                    items = 0;
344                }
345
346                envelope.add_item(session_update);
347                items += 1;
348            }
349
350            if let Some(ref transport) = *transport.read().unwrap() {
351                transport.send_envelope(envelope);
352            }
353        }
354    }
355
356    impl Drop for SessionFlusher {
357        fn drop(&mut self) {
358            let (lock, cvar) = self.shutdown.as_ref();
359            *lock.lock().unwrap() = true;
360            cvar.notify_one();
361
362            if let Some(worker) = self.worker.take() {
363                worker.join().ok();
364            }
365            SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
366        }
367    }
368
369    #[cfg(all(test, feature = "test"))]
370    mod tests {
371        use std::cmp::Ordering;
372
373        use super::*;
374        use crate as sentry;
375        use crate::protocol::{Envelope, EnvelopeItem, SessionStatus};
376
377        fn capture_envelopes<F>(f: F) -> Vec<Envelope>
378        where
379            F: FnOnce(),
380        {
381            crate::test::with_captured_envelopes_options(
382                f,
383                crate::ClientOptions {
384                    release: Some("some-release".into()),
385                    ..Default::default()
386                },
387            )
388        }
389
390        #[test]
391        fn test_session_startstop() {
392            let envelopes = capture_envelopes(|| {
393                sentry::start_session();
394                std::thread::sleep(std::time::Duration::from_millis(10));
395            });
396            assert_eq!(envelopes.len(), 1);
397
398            let mut items = envelopes[0].items();
399            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
400                assert_eq!(session.status, SessionStatus::Exited);
401                assert!(session.duration.unwrap() > 0.01);
402                assert_eq!(session.errors, 0);
403                assert_eq!(session.attributes.release, "some-release");
404                assert!(session.init);
405            } else {
406                panic!("expected session");
407            }
408            assert_eq!(items.next(), None);
409        }
410
411        #[test]
412        fn test_session_batching() {
413            let envelopes = capture_envelopes(|| {
414                for _ in 0..(MAX_SESSION_ITEMS * 2) {
415                    sentry::start_session();
416                }
417            });
418            // we only want *two* envelope for all the sessions
419            assert_eq!(envelopes.len(), 2);
420
421            let items = envelopes[0].items().chain(envelopes[1].items());
422            assert_eq!(items.clone().count(), MAX_SESSION_ITEMS * 2);
423            for item in items {
424                assert!(matches!(item, EnvelopeItem::SessionUpdate(_)));
425            }
426        }
427
428        #[test]
429        fn test_session_aggregation() {
430            let envelopes = crate::test::with_captured_envelopes_options(
431                || {
432                    sentry::start_session();
433                    let err = "NaN".parse::<usize>().unwrap_err();
434                    sentry::capture_error(&err);
435
436                    for _ in 0..50 {
437                        sentry::start_session();
438                    }
439                    sentry::end_session();
440
441                    sentry::configure_scope(|scope| {
442                        scope.set_user(Some(sentry::User {
443                            id: Some("foo-bar".into()),
444                            ..Default::default()
445                        }));
446                        scope.add_event_processor(Box::new(|_| None));
447                    });
448
449                    for _ in 0..50 {
450                        sentry::start_session();
451                    }
452
453                    // This error will be discarded because of the event processor,
454                    // and session will not be updated.
455                    // Only events dropped due to sampling should update the session.
456                    let err = "NaN".parse::<usize>().unwrap_err();
457                    sentry::capture_error(&err);
458                },
459                crate::ClientOptions {
460                    release: Some("some-release".into()),
461                    session_mode: SessionMode::Request,
462                    ..Default::default()
463                },
464            );
465            assert_eq!(envelopes.len(), 2);
466
467            let mut items = envelopes[0].items();
468            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
469            assert_eq!(items.next(), None);
470
471            let mut items = envelopes[1].items();
472            if let Some(EnvelopeItem::SessionAggregates(aggregate)) = items.next() {
473                let mut aggregates = aggregate.aggregates.clone();
474                assert_eq!(aggregates.len(), 2);
475                // the order depends on a hashmap and is not stable otherwise
476                aggregates.sort_by(|a, b| {
477                    a.distinct_id
478                        .partial_cmp(&b.distinct_id)
479                        .unwrap_or(Ordering::Less)
480                });
481
482                assert_eq!(aggregates[0].distinct_id, None);
483                assert_eq!(aggregates[0].exited, 50);
484
485                assert_eq!(aggregates[1].errored, 0);
486                assert_eq!(aggregates[1].distinct_id, Some("foo-bar".into()));
487                assert_eq!(aggregates[1].exited, 50);
488            } else {
489                panic!("expected session");
490            }
491            assert_eq!(items.next(), None);
492        }
493
494        #[test]
495        fn test_session_error() {
496            let envelopes = capture_envelopes(|| {
497                sentry::start_session();
498
499                let err = "NaN".parse::<usize>().unwrap_err();
500                sentry::capture_error(&err);
501            });
502            assert_eq!(envelopes.len(), 2);
503
504            let mut items = envelopes[0].items();
505            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
506            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
507                assert_eq!(session.status, SessionStatus::Ok);
508                assert_eq!(session.errors, 1);
509                assert_eq!(session.attributes.release, "some-release");
510                assert!(session.init);
511            } else {
512                panic!("expected session");
513            }
514            assert_eq!(items.next(), None);
515
516            let mut items = envelopes[1].items();
517            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
518                assert_eq!(session.status, SessionStatus::Exited);
519                assert_eq!(session.errors, 1);
520                assert!(!session.init);
521            } else {
522                panic!("expected session");
523            }
524            assert_eq!(items.next(), None);
525        }
526
527        #[test]
528        fn test_session_abnormal() {
529            let envelopes = capture_envelopes(|| {
530                sentry::start_session();
531                sentry::end_session_with_status(SessionStatus::Abnormal);
532            });
533            assert_eq!(envelopes.len(), 1);
534
535            let mut items = envelopes[0].items();
536            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
537                assert_eq!(session.status, SessionStatus::Abnormal);
538                assert!(session.init);
539            } else {
540                panic!("expected session");
541            }
542            assert_eq!(items.next(), None);
543        }
544
545        #[test]
546        fn test_session_sampled_errors() {
547            let mut envelopes = crate::test::with_captured_envelopes_options(
548                || {
549                    sentry::start_session();
550
551                    for _ in 0..100 {
552                        let err = "NaN".parse::<usize>().unwrap_err();
553                        sentry::capture_error(&err);
554                    }
555                },
556                crate::ClientOptions {
557                    release: Some("some-release".into()),
558                    sample_rate: 0.5,
559                    ..Default::default()
560                },
561            );
562            assert!(envelopes.len() > 25);
563            assert!(envelopes.len() < 75);
564
565            let envelope = envelopes.pop().unwrap();
566            let mut items = envelope.items();
567            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
568                assert_eq!(session.status, SessionStatus::Exited);
569                assert_eq!(session.errors, 100);
570            } else {
571                panic!("expected session");
572            }
573            assert_eq!(items.next(), None);
574        }
575
576        /// For _user-mode_ sessions, we want to inherit the session for any _new_
577        /// Hub that is spawned from the main thread Hub which already has a session
578        /// attached
579        #[test]
580        fn test_inherit_session_from_top() {
581            let envelopes = capture_envelopes(|| {
582                sentry::start_session();
583
584                let err = "NaN".parse::<usize>().unwrap_err();
585                sentry::capture_error(&err);
586
587                // create a new Hub which should have the same session
588                let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));
589
590                sentry::Hub::run(hub, || {
591                    let err = "NaN".parse::<usize>().unwrap_err();
592                    sentry::capture_error(&err);
593
594                    sentry::with_scope(
595                        |_| {},
596                        || {
597                            let err = "NaN".parse::<usize>().unwrap_err();
598                            sentry::capture_error(&err);
599                        },
600                    );
601                });
602            });
603
604            assert_eq!(envelopes.len(), 4); // 3 errors and one session end
605
606            let mut items = envelopes[3].items();
607            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
608                assert_eq!(session.status, SessionStatus::Exited);
609                assert_eq!(session.errors, 3);
610                assert!(!session.init);
611            } else {
612                panic!("expected session");
613            }
614            assert_eq!(items.next(), None);
615        }
616
617        /// We want to forward-inherit sessions as the previous test asserted, but
618        /// not *backwards*. So any new session created in a derived Hub and scope
619        /// will only get updates from that particular scope.
620        #[test]
621        fn test_dont_inherit_session_backwards() {
622            let envelopes = capture_envelopes(|| {
623                let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));
624
625                sentry::Hub::run(hub, || {
626                    sentry::with_scope(
627                        |_| {},
628                        || {
629                            sentry::start_session();
630
631                            let err = "NaN".parse::<usize>().unwrap_err();
632                            sentry::capture_error(&err);
633                        },
634                    );
635
636                    let err = "NaN".parse::<usize>().unwrap_err();
637                    sentry::capture_error(&err);
638                });
639
640                let err = "NaN".parse::<usize>().unwrap_err();
641                sentry::capture_error(&err);
642            });
643
644            assert_eq!(envelopes.len(), 4); // 3 errors and one session end
645
646            let mut items = envelopes[0].items();
647            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
648            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
649                assert_eq!(session.status, SessionStatus::Ok);
650                assert_eq!(session.errors, 1);
651                assert!(session.init);
652            } else {
653                panic!("expected session");
654            }
655            assert_eq!(items.next(), None);
656
657            // the other two events should not have session updates
658            let mut items = envelopes[1].items();
659            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
660            assert_eq!(items.next(), None);
661
662            let mut items = envelopes[2].items();
663            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
664            assert_eq!(items.next(), None);
665
666            // the session end is sent last as it is possibly batched
667            let mut items = envelopes[3].items();
668            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
669                assert_eq!(session.status, SessionStatus::Exited);
670                assert_eq!(session.errors, 1);
671                assert!(!session.init);
672            } else {
673                panic!("expected session");
674            }
675            assert_eq!(items.next(), None);
676        }
677    }
678}