re_sdk/
global.rs

1//! Keeps track of global and thread-local [`RecordingStream`]s and handles fallback logic between
2//! them.
3
4use std::{cell::RefCell, sync::OnceLock};
5
6use parking_lot::RwLock;
7
8use crate::{RecordingStream, StoreKind};
9
10// ---
11
/// Identifies which registry an active recording is looked up in or stored to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RecordingScope {
    /// The process-wide registry.
    Global,

    /// The registry private to the current thread.
    ThreadLocal,
}

impl std::fmt::Display for RecordingScope {
    /// Writes the lowercase, human-readable name of the scope.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Global => "global",
            Self::ThreadLocal => "thread-local",
        };
        f.write_str(label)
    }
}
26
27// ---
28
29/// Required to work-around <https://github.com/rerun-io/rerun/issues/2889>
30#[derive(Default)]
31struct ThreadLocalRecording {
32    stream: Option<RecordingStream>,
33}
34
35impl ThreadLocalRecording {
36    fn replace(&mut self, stream: Option<RecordingStream>) -> Option<RecordingStream> {
37        std::mem::replace(&mut self.stream, stream)
38    }
39
40    fn get(&self) -> Option<RecordingStream> {
41        self.stream.clone()
42    }
43}
44
#[cfg(any(target_os = "macos", target_os = "windows"))]
impl Drop for ThreadLocalRecording {
    /// Leaks the stored stream (if any) instead of dropping it, after warning and waiting briefly.
    fn drop(&mut self) {
        // Nothing stored: nothing to work around.
        let Some(stream) = self.stream.take() else {
            return;
        };

        // Work-around for https://github.com/rerun-io/rerun/issues/2889
        // Calling drop on `self.stream` will panic the calling thread.
        // But we want to make sure we don't lose the data in the stream.
        // So how?
        re_log::warn!(
            "Using thread-local RecordingStream on macOS & Windows can result in data loss because of https://github.com/rerun-io/rerun/issues/3937"
        );

        // Give the batcher and sink threads a chance to process the data.
        let grace_period = std::time::Duration::from_millis(500);
        std::thread::sleep(grace_period);

        #[expect(clippy::mem_forget)] // Intentionally not calling `drop`
        std::mem::forget(stream);
    }
}
65
66static GLOBAL_DATA_RECORDING: OnceLock<RwLock<Option<RecordingStream>>> = OnceLock::new();
67thread_local! {
68    static LOCAL_DATA_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
69}
70
71static GLOBAL_BLUEPRINT_RECORDING: OnceLock<RwLock<Option<RecordingStream>>> = OnceLock::new();
72thread_local! {
73    static LOCAL_BLUEPRINT_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
74}
75
76/// Check whether we are the child of a fork.
77///
78/// If so, then our globals need to be cleaned up because they don't have associated batching
79/// or sink threads. The parent of the fork will continue to process any data in the original
80/// globals so nothing is being lost by doing this.
81pub fn cleanup_if_forked_child() {
82    if let Some(global_recording) = RecordingStream::global(StoreKind::Recording)
83        && global_recording.is_forked_child()
84    {
85        re_log::debug!("Fork detected. Forgetting global recording");
86        RecordingStream::forget_global(StoreKind::Recording);
87    }
88
89    if let Some(global_blueprint) = RecordingStream::global(StoreKind::Blueprint)
90        && global_blueprint.is_forked_child()
91    {
92        re_log::debug!("Fork detected. Forgetting global blueprint");
93        RecordingStream::forget_global(StoreKind::Recording);
94    }
95
96    if let Some(thread_recording) = RecordingStream::thread_local(StoreKind::Recording)
97        && thread_recording.is_forked_child()
98    {
99        re_log::debug!("Fork detected. Forgetting thread-local recording");
100        RecordingStream::forget_thread_local(StoreKind::Recording);
101    }
102
103    if let Some(thread_blueprint) = RecordingStream::thread_local(StoreKind::Blueprint)
104        && thread_blueprint.is_forked_child()
105    {
106        re_log::debug!("Fork detected. Forgetting thread-local blueprint");
107        RecordingStream::forget_thread_local(StoreKind::Blueprint);
108    }
109}
110
111impl RecordingStream {
112    /// Returns `overrides` if it exists, otherwise returns the most appropriate active recording
113    /// of the specified type (i.e. thread-local first, then global scope), if any.
114    #[inline]
115    pub fn get(kind: StoreKind, overrides: Option<Self>) -> Option<Self> {
116        let rec = overrides.or_else(|| {
117            Self::get_any(RecordingScope::ThreadLocal, kind)
118                .or_else(|| Self::get_any(RecordingScope::Global, kind))
119        });
120
121        if rec.is_none() {
122            // NOTE: This is the one and only place where a warning about missing active recording
123            // should be printed, don't stutter!
124            re_log::warn_once!(
125                "There is no currently active {kind} stream available \
126                for the current thread ({:?}): have you called `set_global()` and/or \
127                `set_thread_local()` first?",
128                std::thread::current().id(),
129            );
130        }
131
132        rec
133    }
134
135    // Internal implementation of `get()` that doesn't print a warning if no recording is found.
136    // Used from python-bridge.
137    #[inline]
138    #[doc(hidden)]
139    pub fn get_quiet(kind: StoreKind, overrides: Option<Self>) -> Option<Self> {
140        let rec = overrides.or_else(|| {
141            Self::get_any(RecordingScope::ThreadLocal, kind)
142                .or_else(|| Self::get_any(RecordingScope::Global, kind))
143        });
144
145        if rec.is_none() {
146            // NOTE: This is the one and only place where a warning about missing active recording
147            // should be printed, don't stutter!
148            re_log::debug_once!(
149                "There is no currently active {kind} stream available \
150                for the current thread ({:?}): have you called `set_global()` and/or \
151                `set_thread_local()` first?",
152                std::thread::current().id(),
153            );
154        }
155
156        rec
157    }
158
159    // --- Global ---
160
161    /// Returns the currently active recording of the specified type in the global scope, if any.
162    #[inline]
163    pub fn global(kind: StoreKind) -> Option<Self> {
164        Self::get_any(RecordingScope::Global, kind)
165    }
166
167    /// Replaces the currently active recording of the specified type in the global scope with
168    /// the specified one.
169    ///
170    /// Returns the previous one, if any.
171    #[inline]
172    pub fn set_global(kind: StoreKind, rec: Option<Self>) -> Option<Self> {
173        Self::set_any(RecordingScope::Global, kind, rec)
174    }
175
176    /// Forgets the currently active recording of the specified type in the global scope.
177    ///
178    /// WARNING: this intentionally bypasses any drop/flush logic. This should only ever be used in
179    /// cases where you know the batcher/sink threads have been lost such as in a forked process.
180    #[inline]
181    pub fn forget_global(kind: StoreKind) {
182        Self::forget_any(RecordingScope::Global, kind);
183    }
184
185    // --- Thread local ---
186
187    /// Returns the currently active recording of the specified type in the thread-local scope,
188    /// if any.
189    #[inline]
190    pub fn thread_local(kind: StoreKind) -> Option<Self> {
191        Self::get_any(RecordingScope::ThreadLocal, kind)
192    }
193
194    /// Replaces the currently active recording of the specified type in the thread-local scope
195    /// with the specified one.
196    #[inline]
197    pub fn set_thread_local(kind: StoreKind, rec: Option<Self>) -> Option<Self> {
198        Self::set_any(RecordingScope::ThreadLocal, kind, rec)
199    }
200
201    /// Forgets the currently active recording of the specified type in the thread-local scope.
202    ///
203    /// WARNING: this intentionally bypasses any drop/flush logic. This should only ever be used in
204    /// cases where you know the batcher/sink threads have been lost such as in a forked process.
205    #[inline]
206    pub fn forget_thread_local(kind: StoreKind) {
207        Self::forget_any(RecordingScope::ThreadLocal, kind);
208    }
209
210    // --- Internal helpers ---
211
212    fn get_any(scope: RecordingScope, kind: StoreKind) -> Option<Self> {
213        match kind {
214            StoreKind::Recording => match scope {
215                RecordingScope::Global => GLOBAL_DATA_RECORDING
216                    .get_or_init(Default::default)
217                    .read()
218                    .clone(),
219                RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|rec| rec.borrow().get()),
220            },
221            StoreKind::Blueprint => match scope {
222                RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
223                    .get_or_init(Default::default)
224                    .read()
225                    .clone(),
226                RecordingScope::ThreadLocal => {
227                    LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().get())
228                }
229            },
230        }
231    }
232
233    fn set_any(scope: RecordingScope, kind: StoreKind, rec: Option<Self>) -> Option<Self> {
234        match kind {
235            StoreKind::Recording => match scope {
236                RecordingScope::Global => std::mem::replace(
237                    &mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
238                    rec,
239                ),
240                RecordingScope::ThreadLocal => {
241                    LOCAL_DATA_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
242                }
243            },
244            StoreKind::Blueprint => match scope {
245                RecordingScope::Global => std::mem::replace(
246                    &mut *GLOBAL_BLUEPRINT_RECORDING
247                        .get_or_init(Default::default)
248                        .write(),
249                    rec,
250                ),
251                RecordingScope::ThreadLocal => {
252                    LOCAL_BLUEPRINT_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
253                }
254            },
255        }
256    }
257
258    fn forget_any(scope: RecordingScope, kind: StoreKind) {
259        #![expect(clippy::mem_forget)] // Intentionally leak memory and bypass drop cleanup
260        match kind {
261            StoreKind::Recording => match scope {
262                RecordingScope::Global => {
263                    if let Some(global) = GLOBAL_DATA_RECORDING.get() {
264                        std::mem::forget(global.write().take());
265                    }
266                }
267                RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
268                    std::mem::forget(cell.take());
269                }),
270            },
271            StoreKind::Blueprint => match scope {
272                RecordingScope::Global => {
273                    if let Some(global) = GLOBAL_BLUEPRINT_RECORDING.get() {
274                        std::mem::forget(global.write().take());
275                    }
276                }
277                RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
278                    std::mem::forget(cell.take());
279                }),
280            },
281        }
282    }
283}
284
285// ---
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::RecordingStreamBuilder;

    /// Builds a buffered stream for `app_id`, panicking if construction fails.
    fn buffered_stream(app_id: &'static str) -> RecordingStream {
        RecordingStreamBuilder::new(app_id).buffered().unwrap()
    }

    /// Asserts that `actual` is present and carries the same store id as `expected`.
    fn assert_same_store(expected: &RecordingStream, actual: Option<RecordingStream>) {
        assert_eq!(
            expected.store_info().unwrap().store_id,
            actual.unwrap().store_info().unwrap().store_id
        );
    }

    /// Walks through every precedence combination of explicit / thread-local / global streams.
    #[test]
    fn fallbacks() {
        let kinds = [StoreKind::Recording, StoreKind::Blueprint];

        // Nothing has been registered yet, so every lookup comes back empty.
        for kind in kinds {
            assert!(RecordingStream::get(kind, None).is_none());
        }

        // Still nothing registered, but an explicit override is passed -- the override wins.
        let explicit = buffered_stream("rerun_example_explicit");
        for kind in kinds {
            assert_same_store(
                &explicit,
                RecordingStream::get(kind, Some(explicit.clone())),
            );
        }

        let global_data = buffered_stream("rerun_example_global_data");
        assert!(
            RecordingStream::set_global(StoreKind::Recording, Some(global_data.clone())).is_none()
        );

        let global_blueprint = buffered_stream("rerun_example_global_blueprint");
        assert!(
            RecordingStream::set_global(StoreKind::Blueprint, Some(global_blueprint.clone()))
                .is_none()
        );

        // Globals are registered and no override is given -- globals win.
        for (kind, expected) in [
            (StoreKind::Recording, &global_data),
            (StoreKind::Blueprint, &global_blueprint),
        ] {
            assert_same_store(expected, RecordingStream::get(kind, None));
        }

        // Re-registering the same globals must hand back the very same streams.
        for (kind, expected) in [
            (StoreKind::Recording, &global_data),
            (StoreKind::Blueprint, &global_blueprint),
        ] {
            assert_same_store(
                expected,
                RecordingStream::set_global(kind, Some(expected.clone())),
            );
        }

        let worker = std::thread::Builder::new()
            .spawn({
                let global_data = global_data.clone();
                let global_blueprint = global_blueprint.clone();
                move || {
                    // A fresh thread has no locals, so the globals are still what we get.
                    for (kind, expected) in [
                        (StoreKind::Recording, &global_data),
                        (StoreKind::Blueprint, &global_blueprint),
                    ] {
                        assert_same_store(expected, RecordingStream::get(kind, None));
                    }

                    let local_data = buffered_stream("rerun_example_local_data");
                    assert!(
                        RecordingStream::set_thread_local(
                            StoreKind::Recording,
                            Some(local_data.clone())
                        )
                        .is_none()
                    );

                    let local_blueprint = buffered_stream("rerun_example_local_blueprint");
                    assert!(
                        RecordingStream::set_thread_local(
                            StoreKind::Blueprint,
                            Some(local_blueprint.clone())
                        )
                        .is_none()
                    );

                    // Locals are now registered for this thread -- locals win over globals.
                    for (kind, expected) in [
                        (StoreKind::Recording, &local_data),
                        (StoreKind::Blueprint, &local_blueprint),
                    ] {
                        assert_same_store(expected, RecordingStream::get(kind, None));
                    }

                    // An explicit override still beats everything else.
                    for kind in kinds {
                        assert_same_store(
                            &explicit,
                            RecordingStream::get(kind, Some(explicit.clone())),
                        );
                    }
                }
            })
            .unwrap();
        worker.join().unwrap();

        // The worker's locals are invisible from this thread -- globals win here.
        for (kind, expected) in [
            (StoreKind::Recording, &global_data),
            (StoreKind::Blueprint, &global_blueprint),
        ] {
            assert_same_store(expected, RecordingStream::get(kind, None));
        }

        let local_data = buffered_stream("rerun_example_local_data");
        assert!(
            RecordingStream::set_thread_local(StoreKind::Recording, Some(local_data.clone()))
                .is_none()
        );

        let local_blueprint = buffered_stream("rerun_example_local_blueprint");
        assert!(
            RecordingStream::set_thread_local(StoreKind::Blueprint, Some(local_blueprint.clone()))
                .is_none()
        );

        // Clearing the globals hands back the streams that were registered.
        for (kind, expected) in [
            (StoreKind::Recording, &global_data),
            (StoreKind::Blueprint, &global_blueprint),
        ] {
            assert_same_store(expected, RecordingStream::set_global(kind, None));
        }

        // With the globals gone, the locals are still found.
        for (kind, expected) in [
            (StoreKind::Recording, &local_data),
            (StoreKind::Blueprint, &local_blueprint),
        ] {
            assert_same_store(expected, RecordingStream::get(kind, None));
        }
    }
}