1use std::{cell::RefCell, sync::OnceLock};
5
6use parking_lot::RwLock;
7
8use crate::{RecordingStream, StoreKind};
9
/// Selects which registry slot an operation targets.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RecordingScope {
    /// The process-wide slot (one per store kind).
    Global,

    /// The slot owned by the calling thread (one per store kind and thread).
    ThreadLocal,
}

impl std::fmt::Display for RecordingScope {
    /// Writes the lowercase, human-readable name of the scope.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Global => "global",
            Self::ThreadLocal => "thread-local",
        };
        f.write_str(label)
    }
}
26
27#[derive(Default)]
31struct ThreadLocalRecording {
32 stream: Option<RecordingStream>,
33}
34
35impl ThreadLocalRecording {
36 fn replace(&mut self, stream: Option<RecordingStream>) -> Option<RecordingStream> {
37 std::mem::replace(&mut self.stream, stream)
38 }
39
40 fn get(&self) -> Option<RecordingStream> {
41 self.stream.clone()
42 }
43}
44
// macOS / Windows only: when a per-thread slot is destroyed while it still holds a
// stream, the stream is deliberately leaked instead of being dropped.
#[cfg(any(target_os = "macos", target_os = "windows"))]
impl Drop for ThreadLocalRecording {
    fn drop(&mut self) {
        // An empty slot needs no special handling.
        let Some(leaked) = self.stream.take() else {
            return;
        };

        re_log::warn!(
            "Using thread-local RecordingStream on macOS & Windows can result in data loss because of https://github.com/rerun-io/rerun/issues/3937"
        );

        // NOTE(review): presumably this pause gives background workers of the stream a
        // chance to process pending data before the thread goes away — confirm against
        // the linked issue.
        std::thread::sleep(std::time::Duration::from_millis(500));

        // Skip the stream's destructor entirely by leaking it.
        #[expect(clippy::mem_forget)]
        std::mem::forget(leaked);
    }
}
65
/// Process-wide slot for the [`StoreKind::Recording`] stream.
///
/// The lock is created lazily on first access; the inner `Option` is `None` until a
/// stream is registered.
static GLOBAL_DATA_RECORDING: OnceLock<RwLock<Option<RecordingStream>>> = OnceLock::new();
thread_local! {
    /// Per-thread slot for the [`StoreKind::Recording`] stream; starts out empty.
    static LOCAL_DATA_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
}

/// Process-wide slot for the [`StoreKind::Blueprint`] stream.
///
/// The lock is created lazily on first access; the inner `Option` is `None` until a
/// stream is registered.
static GLOBAL_BLUEPRINT_RECORDING: OnceLock<RwLock<Option<RecordingStream>>> = OnceLock::new();
thread_local! {
    /// Per-thread slot for the [`StoreKind::Blueprint`] stream; starts out empty.
    static LOCAL_BLUEPRINT_RECORDING: RefCell<ThreadLocalRecording> = Default::default();
}
75
76pub fn cleanup_if_forked_child() {
82 if let Some(global_recording) = RecordingStream::global(StoreKind::Recording)
83 && global_recording.is_forked_child()
84 {
85 re_log::debug!("Fork detected. Forgetting global recording");
86 RecordingStream::forget_global(StoreKind::Recording);
87 }
88
89 if let Some(global_blueprint) = RecordingStream::global(StoreKind::Blueprint)
90 && global_blueprint.is_forked_child()
91 {
92 re_log::debug!("Fork detected. Forgetting global blueprint");
93 RecordingStream::forget_global(StoreKind::Recording);
94 }
95
96 if let Some(thread_recording) = RecordingStream::thread_local(StoreKind::Recording)
97 && thread_recording.is_forked_child()
98 {
99 re_log::debug!("Fork detected. Forgetting thread-local recording");
100 RecordingStream::forget_thread_local(StoreKind::Recording);
101 }
102
103 if let Some(thread_blueprint) = RecordingStream::thread_local(StoreKind::Blueprint)
104 && thread_blueprint.is_forked_child()
105 {
106 re_log::debug!("Fork detected. Forgetting thread-local blueprint");
107 RecordingStream::forget_thread_local(StoreKind::Blueprint);
108 }
109}
110
111impl RecordingStream {
112 #[inline]
115 pub fn get(kind: StoreKind, overrides: Option<Self>) -> Option<Self> {
116 let rec = overrides.or_else(|| {
117 Self::get_any(RecordingScope::ThreadLocal, kind)
118 .or_else(|| Self::get_any(RecordingScope::Global, kind))
119 });
120
121 if rec.is_none() {
122 re_log::warn_once!(
125 "There is no currently active {kind} stream available \
126 for the current thread ({:?}): have you called `set_global()` and/or \
127 `set_thread_local()` first?",
128 std::thread::current().id(),
129 );
130 }
131
132 rec
133 }
134
135 #[inline]
138 #[doc(hidden)]
139 pub fn get_quiet(kind: StoreKind, overrides: Option<Self>) -> Option<Self> {
140 let rec = overrides.or_else(|| {
141 Self::get_any(RecordingScope::ThreadLocal, kind)
142 .or_else(|| Self::get_any(RecordingScope::Global, kind))
143 });
144
145 if rec.is_none() {
146 re_log::debug_once!(
149 "There is no currently active {kind} stream available \
150 for the current thread ({:?}): have you called `set_global()` and/or \
151 `set_thread_local()` first?",
152 std::thread::current().id(),
153 );
154 }
155
156 rec
157 }
158
159 #[inline]
163 pub fn global(kind: StoreKind) -> Option<Self> {
164 Self::get_any(RecordingScope::Global, kind)
165 }
166
167 #[inline]
172 pub fn set_global(kind: StoreKind, rec: Option<Self>) -> Option<Self> {
173 Self::set_any(RecordingScope::Global, kind, rec)
174 }
175
176 #[inline]
181 pub fn forget_global(kind: StoreKind) {
182 Self::forget_any(RecordingScope::Global, kind);
183 }
184
185 #[inline]
190 pub fn thread_local(kind: StoreKind) -> Option<Self> {
191 Self::get_any(RecordingScope::ThreadLocal, kind)
192 }
193
194 #[inline]
197 pub fn set_thread_local(kind: StoreKind, rec: Option<Self>) -> Option<Self> {
198 Self::set_any(RecordingScope::ThreadLocal, kind, rec)
199 }
200
201 #[inline]
206 pub fn forget_thread_local(kind: StoreKind) {
207 Self::forget_any(RecordingScope::ThreadLocal, kind);
208 }
209
210 fn get_any(scope: RecordingScope, kind: StoreKind) -> Option<Self> {
213 match kind {
214 StoreKind::Recording => match scope {
215 RecordingScope::Global => GLOBAL_DATA_RECORDING
216 .get_or_init(Default::default)
217 .read()
218 .clone(),
219 RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|rec| rec.borrow().get()),
220 },
221 StoreKind::Blueprint => match scope {
222 RecordingScope::Global => GLOBAL_BLUEPRINT_RECORDING
223 .get_or_init(Default::default)
224 .read()
225 .clone(),
226 RecordingScope::ThreadLocal => {
227 LOCAL_BLUEPRINT_RECORDING.with(|rec| rec.borrow().get())
228 }
229 },
230 }
231 }
232
233 fn set_any(scope: RecordingScope, kind: StoreKind, rec: Option<Self>) -> Option<Self> {
234 match kind {
235 StoreKind::Recording => match scope {
236 RecordingScope::Global => std::mem::replace(
237 &mut *GLOBAL_DATA_RECORDING.get_or_init(Default::default).write(),
238 rec,
239 ),
240 RecordingScope::ThreadLocal => {
241 LOCAL_DATA_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
242 }
243 },
244 StoreKind::Blueprint => match scope {
245 RecordingScope::Global => std::mem::replace(
246 &mut *GLOBAL_BLUEPRINT_RECORDING
247 .get_or_init(Default::default)
248 .write(),
249 rec,
250 ),
251 RecordingScope::ThreadLocal => {
252 LOCAL_BLUEPRINT_RECORDING.with(|cell| cell.borrow_mut().replace(rec))
253 }
254 },
255 }
256 }
257
258 fn forget_any(scope: RecordingScope, kind: StoreKind) {
259 #![expect(clippy::mem_forget)] match kind {
261 StoreKind::Recording => match scope {
262 RecordingScope::Global => {
263 if let Some(global) = GLOBAL_DATA_RECORDING.get() {
264 std::mem::forget(global.write().take());
265 }
266 }
267 RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| {
268 std::mem::forget(cell.take());
269 }),
270 },
271 StoreKind::Blueprint => match scope {
272 RecordingScope::Global => {
273 if let Some(global) = GLOBAL_BLUEPRINT_RECORDING.get() {
274 std::mem::forget(global.write().take());
275 }
276 }
277 RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| {
278 std::mem::forget(cell.take());
279 }),
280 },
281 }
282 }
283}
284
// Tests for the override -> thread-local -> global lookup order.
#[cfg(test)]
mod tests {
    use crate::RecordingStreamBuilder;

    use super::*;

    // Walks through every combination of explicit / thread-local / global streams and
    // checks which one `RecordingStream::get` resolves to. The steps mutate shared
    // global state, so their order matters.
    #[test]
    fn fallbacks() {
        // Asserts that `got` is `Some` and refers to the same store as `expected`.
        fn check_store_id(expected: &RecordingStream, got: Option<RecordingStream>) {
            assert_eq!(
                expected.store_info().unwrap().store_id,
                got.unwrap().store_info().unwrap().store_id
            );
        }

        // Nothing is registered yet: lookups without an override find nothing.
        assert!(RecordingStream::get(StoreKind::Recording, None).is_none());
        assert!(RecordingStream::get(StoreKind::Blueprint, None).is_none());

        // Nothing is registered, but an explicit override is passed: the override wins.
        let explicit = RecordingStreamBuilder::new("rerun_example_explicit")
            .buffered()
            .unwrap();
        check_store_id(
            &explicit,
            RecordingStream::get(StoreKind::Recording, explicit.clone().into()),
        );
        check_store_id(
            &explicit,
            RecordingStream::get(StoreKind::Blueprint, explicit.clone().into()),
        );

        // Register globals; the slots were empty, so the previous value is `None`.
        let global_data = RecordingStreamBuilder::new("rerun_example_global_data")
            .buffered()
            .unwrap();
        assert!(
            RecordingStream::set_global(StoreKind::Recording, Some(global_data.clone())).is_none()
        );

        let global_blueprint = RecordingStreamBuilder::new("rerun_example_global_blueprint")
            .buffered()
            .unwrap();
        assert!(
            RecordingStream::set_global(StoreKind::Blueprint, Some(global_blueprint.clone()))
                .is_none()
        );

        // Globals are set and there is no override: globals are returned.
        check_store_id(
            &global_data,
            RecordingStream::get(StoreKind::Recording, None),
        );
        check_store_id(
            &global_blueprint,
            RecordingStream::get(StoreKind::Blueprint, None),
        );

        // Overwriting the globals with themselves hands back the same streams.
        check_store_id(
            &global_data,
            RecordingStream::set_global(StoreKind::Recording, Some(global_data.clone())),
        );
        check_store_id(
            &global_blueprint,
            RecordingStream::set_global(StoreKind::Blueprint, Some(global_blueprint.clone())),
        );

        std::thread::Builder::new()
            .spawn({
                let global_data = global_data.clone();
                let global_blueprint = global_blueprint.clone();
                move || {
                    // In a fresh thread without locals, the globals are still visible.
                    check_store_id(
                        &global_data,
                        RecordingStream::get(StoreKind::Recording, None),
                    );
                    check_store_id(
                        &global_blueprint,
                        RecordingStream::get(StoreKind::Blueprint, None),
                    );

                    // Register thread-locals for this thread; the slots start empty.
                    let local_data = RecordingStreamBuilder::new("rerun_example_local_data")
                        .buffered()
                        .unwrap();
                    assert!(
                        RecordingStream::set_thread_local(
                            StoreKind::Recording,
                            Some(local_data.clone())
                        )
                        .is_none()
                    );

                    let local_blueprint =
                        RecordingStreamBuilder::new("rerun_example_local_blueprint")
                            .buffered()
                            .unwrap();
                    assert!(
                        RecordingStream::set_thread_local(
                            StoreKind::Blueprint,
                            Some(local_blueprint.clone())
                        )
                        .is_none()
                    );

                    // Thread-locals take precedence over the globals.
                    check_store_id(
                        &local_data,
                        RecordingStream::get(StoreKind::Recording, None),
                    );
                    check_store_id(
                        &local_blueprint,
                        RecordingStream::get(StoreKind::Blueprint, None),
                    );

                    // An explicit override still beats both locals and globals.
                    check_store_id(
                        &explicit,
                        RecordingStream::get(StoreKind::Recording, explicit.clone().into()),
                    );
                    check_store_id(
                        &explicit,
                        RecordingStream::get(StoreKind::Blueprint, explicit.clone().into()),
                    );
                }
            })
            .unwrap()
            .join()
            .unwrap();

        // The other thread's locals are not visible here: globals are returned.
        check_store_id(
            &global_data,
            RecordingStream::get(StoreKind::Recording, None),
        );
        check_store_id(
            &global_blueprint,
            RecordingStream::get(StoreKind::Blueprint, None),
        );

        // Register thread-locals on this thread; its slots are still empty.
        let local_data = RecordingStreamBuilder::new("rerun_example_local_data")
            .buffered()
            .unwrap();
        assert!(
            RecordingStream::set_thread_local(StoreKind::Recording, Some(local_data.clone()))
                .is_none()
        );

        let local_blueprint = RecordingStreamBuilder::new("rerun_example_local_blueprint")
            .buffered()
            .unwrap();
        assert!(
            RecordingStream::set_thread_local(StoreKind::Blueprint, Some(local_blueprint.clone()))
                .is_none()
        );

        // Clear the globals; clearing returns the streams that were registered.
        check_store_id(
            &global_data,
            RecordingStream::set_global(StoreKind::Recording, None),
        );
        check_store_id(
            &global_blueprint,
            RecordingStream::set_global(StoreKind::Blueprint, None),
        );

        // With the globals gone, the thread-locals are still returned.
        check_store_id(
            &local_data,
            RecordingStream::get(StoreKind::Recording, None),
        );
        check_store_id(
            &local_blueprint,
            RecordingStream::get(StoreKind::Blueprint, None),
        );
    }
}